Sunday, January 15, 2017

Write to Elasticsearch from Kafka

Reading from and writing to Kafka is time consuming because you write your code,
then wait for data to arrive. If you hit an exception, you stop the program, edit, and try again.

Our task was to:
1) Read JSON-formatted data from Kafka.
2) Filter messages that include certain keywords.
3) Write the results to Elasticsearch.
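Outside Spark, the keyword filter in step 2 boils down to a simple predicate. A minimal sketch, with placeholder keywords standing in for the real ones:

```scala
// Hypothetical keywords; substitute your own
val keywords = Seq("mykeyword1", "mykeyword2")

// A message passes the filter if it contains any keyword
def matches(msg: String): Boolean = keywords.exists(k => msg.contains(k))

val sample = Seq("""{"msg":"contains mykeyword1"}""", """{"msg":"nothing"}""")
println(sample.filter(matches))  // keeps only the first message
```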

**Add the related Kafka streaming and Elasticsearch jars to your Zeppelin
instance or application.
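In Zeppelin the jars can be loaded through the %dep interpreter before the Spark interpreter starts; the artifact coordinates and versions below are assumptions for a Spark 1.6-era, Scala 2.10 setup, so adjust them to your cluster:

```scala
// Run in a %dep paragraph before any Spark code (versions are assumptions)
%dep
z.load("org.apache.spark:spark-streaming-kafka_2.10:1.6.3")
z.load("org.elasticsearch:elasticsearch-spark_2.10:2.4.0")
```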



import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.elasticsearch.spark.sql._
import scalaj.http._
import com.stratio.datasource.util.Config._

val ssc = new StreamingContext(sc, Seconds(30))
val topicMap: Map[String, Int] = Map("YourTopic" -> 1)
val lines = KafkaUtils.createStream(ssc, "your.com:2181", "group", topicMap).map(_._2)

lines.foreachRDD((rdd: RDD[String], time: Time) => {
  // Get (or create) an SQLContext for this micro-batch and enable $-syntax
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // Parse the batch of JSON strings into a DataFrame
  val df2 = sqlContext.read.json(rdd)

  // Keep only messages containing one of the keywords
  val df = df2.filter($"msg".contains("mykeyword1") || $"msg".contains("mykeyword2"))

  // Elasticsearch connection settings
  val esConfig = Map("es.nodes" -> "192.168.1.151", "es.port" -> "9200")

  // Write the filtered batch to the "elastic/target" index/type
  df.saveToEs("elastic/target", esConfig)
})

ssc.start()
ssc.awaitTermination()
