wait for data to arrive. If you see an exception, stop the program and run it again.
Our task was:
1) Read JSON-formatted data from Kafka.
2) Filter the messages, keeping only those that contain certain keywords.
3) Write the results to Elasticsearch.
Add the related Kafka streaming jars to your Zeppelin interpreter or to your application's classpath.
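As a sketch of that step in Zeppelin, you can pull the jars in with the `%dep` interpreter before starting the Spark interpreter. The artifact versions below are assumptions for a Spark 1.6 / Scala 2.10 cluster (the receiver-based `KafkaUtils.createStream` API used below belongs to that era); match them to your own cluster and Scala version:

```
%dep
// Versions are examples only -- align them with your Spark and Scala versions
z.load("org.apache.spark:spark-streaming-kafka_2.10:1.6.3")
z.load("org.elasticsearch:elasticsearch-spark_2.10:2.4.0")
```

In a standalone application, the same coordinates would go into your sbt or Maven dependencies instead.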
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.elasticsearch.spark.sql._

// 30-second micro-batches; sc is the SparkContext provided by Zeppelin
val ssc = new StreamingContext(sc, Seconds(30))

// Topic name -> number of consumer threads
val topicMap: Map[String, Int] = Map("YourTopic" -> 1)

// Receiver-based Kafka stream over ZooKeeper; keep only the message value (_._2)
val lines = KafkaUtils.createStream(ssc, "your.com:2181", "group", topicMap).map(_._2)

lines.foreachRDD((rdd: RDD[String], time: Time) => {
  // Parse the micro-batch of JSON strings into a DataFrame
  val df2 = sqlContext.read.json(rdd)

  // Keep only rows whose "msg" field contains one of the keywords
  val df = df2.filter($"msg".contains("mykeyword1") || $"msg".contains("mykeyword2"))

  // Write the filtered rows to the Elasticsearch index "elastic", type "target"
  val esConfig = Map("es.nodes" -> "192.168.1.151", "es.port" -> "9200")
  df.saveToEs("elastic/target", esConfig)
})

// Start the streaming job
ssc.start()
```
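The filter above assumes each Kafka message is a JSON object with a top-level `msg` string field, for example (the field value here is only an illustration):

```
{"msg": "a log line containing mykeyword1"}
```

Messages whose `msg` field contains neither keyword are dropped before anything is written to Elasticsearch.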