Wait for data to arrive. If you see an exception, stop the program and fix it before running again.
Our task was:
1) Read JSON-formatted data from Kafka.
2) Filter messages that include certain keywords.
3) Write the results to Elasticsearch.
Note: add the required Kafka streaming JARs to your Zeppelin interpreter or
application classpath.
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.rdd.RDD
import scalaj.http._
import org.apache.spark.sql.SQLContext
import com.stratio.datasource.util.Config._
import org.apache.spark.sql.functions._
import org.elasticsearch.spark.sql._
import scalaj.http._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
// Create a streaming context that processes incoming data in 30-second
// micro-batches. `sc` is the SparkContext supplied by the Zeppelin/shell
// environment.
val ssc = new StreamingContext(sc, Seconds(30))

// Kafka topic -> number of consumer threads for the receiver.
val topicMap: Map[String, Int] = Map("YourTopic" -> 1)

// Connect to Kafka through ZooKeeper (host:2181) with consumer group
// "group", keeping only the message payload (`_._2`) and discarding the key.
val lines = KafkaUtils.createStream(ssc, "your.com:2181", "group", topicMap).map(_._2)

lines.foreachRDD((rdd: RDD[String], time: Time) => {
  // Parse this micro-batch of JSON strings into a DataFrame.
  val df2 = sqlContext.read.json(rdd)
  // Keep only messages whose `msg` field contains at least one keyword.
  // BUG FIX: the original tested "mykeyword1" twice; the second clause now
  // checks "mykeyword2" so the filter actually covers both keywords.
  val df = df2.filter($"msg".contains("mykeyword1") || $"msg".contains("mykeyword2"))
  // Elasticsearch connection settings: node address and REST port.
  val esConfig = Map("es.nodes" -> "192.168.1.151", "es.port" -> "9200")
  // Write the filtered batch to the "elastic/target" index/type.
  df.saveToEs("elastic/target", esConfig)
})

// NOTE(review): no data flows until the context is started. If it is not
// started elsewhere in the notebook/application, add:
//   ssc.start()
//   ssc.awaitTermination()
No comments:
Post a Comment