Most big data projects are migrations, and most utility packages do not work as in the samples because of version differences and library conflicts.
We decided to use Stratio/Spark-MongoDB for writing from Spark to MongoDB.
We had to move data from old Solr files to Mongo. When we tried to move everything at once we got exceptions that pointed to size problems (sorry, I forgot the exact exceptions), so we needed a way to write the items in batches.
We tried different batch sizes and found that 50000 worked.
Solr also returns the total number of matching records (numFound) even if we do not fetch them all.
So we defined a function (of course we could have done this inside the loop) to check whether we had consumed all of them in batches.
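If you only want the total before starting the loop, Solr accepts rows=0, so only the header with numFound comes back. Here is a minimal sketch of that check; it was not part of our original job, it just reuses the same scalaj-http and Spark JSON route as the code below:

// Sketch: ask Solr for rows=0 so only the response header with numFound is returned
import scalaj.http._

val countJson = Http("http://192.168.2.155:8983/solr/collection1/select?q=*%3A*&rows=0&wt=json")
  .timeout(connTimeoutMs = 1000000, readTimeoutMs = 500000)
  .asString.body

val countDf = sqlContext.read.json(sc.parallelize(Seq(countJson)))
val totalFound = countDf.select("response.numFound").take(1)(0)(0).asInstanceOf[Long]
println("numFound = " + totalFound)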
We got some errors using the Stratio library. The standard sample code was not compatible with our environment, so I found the working configuration by digging through the source code. I have omitted a lot of code below, but it gives a clear idea of the approach, and it is in fact runnable if you build your data frames correctly.
Our environment was Spark 1.6.2.
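If you want to try this, the Stratio connector and scalaj-http have to be on the classpath. As a rough sketch, something like the following build.sbt entries should do it; the coordinates and versions here are my assumptions for a Scala 2.10 / Spark 1.6 build, not taken from our project, so verify them against your own environment:

// build.sbt sketch; artifact versions are assumptions, check them against your Spark/Scala build
libraryDependencies ++= Seq(
  "com.stratio.datasource" %% "spark-mongodb" % "0.11.2",
  "org.scalaj" %% "scalaj-http" % "2.3.0"
)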
import scalaj.http._
import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.stratio.datasource._
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.schema._
import com.stratio.datasource.mongodb.writer._
import com.stratio.datasource.mongodb.config._
import com.stratio.datasource.mongodb.config.MongodbConfig._
import org.apache.spark.sql.SQLContext
import com.stratio.datasource.util.Config._
import org.apache.spark.sql.functions._

// MongoDB write configuration for the Stratio connector
val wbuilder = MongodbConfigBuilder(Map(
  Host -> List("192.168.2.150:27018"),
  Database -> "ManagementDev",
  Collection -> "arsiv",
  SamplingRatio -> 1.0,
  WriteConcern -> "normal",
  ReadPreference -> "Secondary"))
val writeConfig = wbuilder.build()

var condition = true
var start = 0
var pass = 0

// True while there are still records beyond the current 50000-row page
def checkPaging(total: Long, start: Int): Boolean = {
  (start + 50000) < total
}

do {
  // Fetch one 50000-row page from Solr as JSON
  val response1 = Http("http://192.168.2.155:8983/solr/collection1/select?q=*%3A*&sort=tw_id+asc&start=" + start + "&rows=50000&fl=AuthorEmail&wt=json")
    .timeout(connTimeoutMs = 1000000, readTimeoutMs = 500000)
    .header("key", "val")
    .method("get")
    .execute().body

  val rdd1 = sc.parallelize(Seq(response1))
  val df1 = sqlContext.read.json(rdd1)
  val df2 = df1.select($"response.docs")
  // Lots of omitted code
  // Total number of matching records reported by Solr
  val dcount = df1.select("response.numFound").take(1)(0)(0).asInstanceOf[Long]
  // Lots of omitted code (dfx and dfy are built here from df2)
  dfy.write.format("parquet")
    .mode(org.apache.spark.sql.SaveMode.Append)
    .partitionBy("saveCriteria")
    .save("hdfs://your.com/user/Archive")
  dfx.saveToMongodb(writeConfig)

  condition = checkPaging(dcount, start)
  start = start + 50000
  pass = pass + 1
  println("!!PASS " + pass)
} while (condition)
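To sanity-check the migration afterwards, you can read the collection back with the same connector and compare the count with Solr's numFound. A minimal sketch using the same host and collection (the read-back itself was not part of our job):

// Sketch: read the target collection back through the Stratio connector
val rbuilder = MongodbConfigBuilder(Map(
  Host -> List("192.168.2.150:27018"),
  Database -> "ManagementDev",
  Collection -> "arsiv",
  SamplingRatio -> 1.0,
  ReadPreference -> "Secondary"))
val readConfig = rbuilder.build()

val archiveDf = sqlContext.fromMongoDB(readConfig)
println("rows in Mongo: " + archiveDf.count())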