Sunday, January 15, 2017

Spark, Migrate Data from Solr to MongoDB in Batches


Most big data projects are migrations. And most utility packages do not work the way they do in the samples, because of version differences and library conflicts.

We decided to use Stratio/Spark-MongoDB for the Spark-to-MongoDB migration.
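If you want to try the same setup, the connector and scalaj-http have to be on the spark-shell classpath. The exact versions below are an assumption (I no longer have our original launch command), so pick artifacts matching your Spark and Scala versions:

spark-shell --packages com.stratio.datasource:spark-mongodb_2.10:0.12.0,org.scalaj:scalaj-http_2.10:2.3.0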

We had to move data from old Solr indexes to Mongo. When we tried to move all the data at once, we got exceptions that pointed to size problems. (Sorry, I forgot the exact exceptions.) So we needed a way to write the items in batches.
We tried different batch sizes and found that 50000 worked.

Solr also reports how many records there are in total (numFound) even if we do not fetch them all.
So we defined a function (of course we could have done it inside the loop) to check whether we had consumed all of them in batches.
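As a side note, here is a minimal sketch of that idea, assuming the same collection1 endpoint used below: asking Solr for rows=0 returns no documents but still includes numFound, so the total can be grabbed once up front.

import scalaj.http._
// Ask only for the count, no documents
val countBody = Http("http://192.168.2.155:8983/solr/collection1/select?q=*%3A*&rows=0&wt=json")
    .timeout(connTimeoutMs = 1000000, readTimeoutMs = 500000)
    .asString.body
// Let Spark parse the JSON and pull out response.numFound
val totalFound = sqlContext.read.json(sc.parallelize(Seq(countBody)))
    .select("response.numFound").first().getLong(0)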

We got some errors when using the Stratio library. The standard sample code was not compatible with our environment, so I found the magic configuration by reading the source code. I omitted a lot of code below, but it gives the general idea, and in fact it is runnable if you build your data frames correctly.
Our environment was Spark 1.6.2.


import scalaj.http._
import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.stratio.datasource._
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.schema._
import com.stratio.datasource.mongodb.writer._
import com.stratio.datasource.mongodb.config._
import com.stratio.datasource.mongodb.config.MongodbConfig._
import org.apache.spark.sql.SQLContext
import com.stratio.datasource.util.Config._
import org.apache.spark.sql.functions._

// MongoDB write configuration for the Stratio connector (this was the "magic" part for our environment)
val wbuilder = MongodbConfigBuilder(Map(Host -> List("192.168.2.150:27018"), Database -> "ManagementDev", Collection ->"arsiv", SamplingRatio -> 1.0, WriteConcern -> "normal", ReadPreference -> "Secondary"))
val writeConfig = wbuilder.build()

var condition = true // true while there are more Solr pages to fetch
var start = 0        // Solr paging offset (the start parameter)
var pass = 0         // number of completed batches, for logging

// Returns true while there are still documents beyond the current 50000-row page
def checkPaging(total: Long, start: Int): Boolean = {
    (start + 50000) < total
}

do {
    // Fetch one 50000-row page from Solr as raw JSON
    val response1 = Http("http://192.168.2.155:8983/solr/collection1/select?q=*%3A*&sort=tw_id+asc&start=" + start + "&rows=50000&fl=AuthorEmail&wt=json")
        .timeout(connTimeoutMs = 1000000, readTimeoutMs = 500000).header("key", "val").method("get")
        .execute().body
    val rdd1 = sc.parallelize(Seq(response1))

    // Let Spark infer the schema of the Solr response
    val df1 = sqlContext.read.json(rdd1)

    // The documents of this page; this feeds the omitted transformation code below
    val df2 = df1.select($"response.docs")

    //Lots of omitted codes
    // Total number of matching records reported by Solr
    val dcount = df1.select("response.numFound").take(1)(0)(0).asInstanceOf[Long]
    //Lots of omitted codes

    // dfy and dfx are the data frames produced by the omitted transformations above
    dfy.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("saveCriteria").save("hdfs://your.com/user/Archive")
    dfx.saveToMongodb(writeConfig)

    // Check whether there is another page before advancing the offset
    condition = checkPaging(dcount, start)
    start = start + 50000
    pass = pass + 1
    println("!!PASS " + pass)
} while (condition)
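
After the loop finishes, it is worth comparing counts. The snippet below is just a sketch using the same Stratio config style (same host and collection as above; readConfig and mongoDf are names introduced here), reading the migrated collection back through the connector:

val rbuilder = MongodbConfigBuilder(Map(Host -> List("192.168.2.150:27018"), Database -> "ManagementDev", Collection -> "arsiv", SamplingRatio -> 1.0, ReadPreference -> "Secondary"))
val readConfig = rbuilder.build()
// Read the collection back as a DataFrame and compare its count with Solr's numFound
val mongoDf = sqlContext.fromMongoDB(readConfig)
println("Documents in arsiv: " + mongoDf.count())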
