Asked by: 小点点

How do I use Spark Streaming to process real-time streaming data/logs?


I am new to Spark and Scala.

I want to implement a real-time Spark consumer that reads network logs from a Kafka publisher every minute [roughly 1 GB of JSON log lines per minute] and finally stores the aggregated values in Elasticsearch.

The aggregation is over a few values [such as bytes_in, bytes_out, etc.], grouped by a composite key [such as client MAC, client IP, server MAC, server IP, etc.].

The Spark consumer I have written is:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.{count, sum}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.sql._

object LogsAnalyzerScalaCS {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("LOGS-AGGREGATION")
    sparkConf.set("es.nodes", "my ip address")
    sparkConf.set("es.port", "9200")
    sparkConf.set("es.index.auto.create", "true")
    sparkConf.set("es.nodes.discovery", "false")

    val elasticResource = "conrec_1min/1minute"
    // 30-second micro-batches
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    val zkQuorum = "my zk quorum IPs:2181"
    val consumerGroupId = "LogsConsumer"
    val topics = "Logs"
    val topicMap = topics.split(",").map((_, 3)).toMap
    val json = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupId, topicMap)
    val logJSON = json.map(_._2)
    try {
      logJSON.foreachRDD(rdd => {
        if (!rdd.isEmpty()) {
          val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
          import sqlContext.implicits._
          val df = sqlContext.read.json(rdd)
          // Aggregate the batch per composite key
          val groupedData = df
            .groupBy("id", "start_time_formated", "l2_c", "l3_c", "l4_c", "l2_s", "l3_s", "l4_s")
            .agg(count("f_id") as "total_f",
                 sum("p_out") as "total_p_out", sum("p_in") as "total_p_in",
                 sum("b_out") as "total_b_out", sum("b_in") as "total_b_in",
                 sum("duration") as "total_duration")
          val dataForES = groupedData.withColumnRenamed("start_time_formated", "start_time")
          dataForES.saveToEs(elasticResource)
          dataForES.show()
        }
      })
    }
    catch {
      case e: Exception => print("Exception has occurred : " + e.getMessage)
    }
    ssc.start()
    ssc.awaitTermination()
  }

  object SQLContextSingleton {
    @transient private var instance: org.apache.spark.sql.SQLContext = _
    def getInstance(sparkContext: SparkContext): org.apache.spark.sql.SQLContext = {
      if (instance == null) {
        instance = new org.apache.spark.sql.SQLContext(sparkContext)
      }
      instance
    }
  }
}

First of all, I would like to know whether my approach is correct at all [considering that I need one-minute log aggregation]?

There seem to be the following issues with this code:

1. The consumer pulls data from the Kafka broker every 30 seconds and saves the final aggregation of those 30 seconds of data to Elasticsearch, which multiplies the number of rows per unique key in Elasticsearch [at least 2 entries per minute]. The UI tool [say, Kibana] then has to do further aggregation on top of that. If I increase the polling interval from 30 seconds to 60 seconds, the aggregation takes so long that it cannot keep up with real time at all.
2. I would like to implement this in such a way that only one row per key is saved in Elasticsearch. So I want to keep aggregating until no new key values appear in the dataset fetched from the Kafka broker [every minute]. After some googling I found that this might be done with the groupByKey() and updateStateByKey() functions, but I cannot figure out how to use them in my case [should I convert the JSON log lines into a string of log lines with flat values and then use those functions there]? And if I do use these functions, when should I save the final aggregated values to Elasticsearch? (See the sketch after this list.)
3. Is there any other way to achieve this?
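For reference, here is a minimal sketch of what the updateStateByKey() idea in point 2 might look like, under some assumptions: the JSON lines are first parsed into (composite key, metrics) pairs, and the `Metrics` case class, the comma-separated socket source, and the checkpoint path below are illustrative placeholders; the real input would be the Kafka DStream from the code above.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative container for the per-key running totals (not from the original job).
case class Metrics(bytesIn: Long, bytesOut: Long)

object StatefulAggregationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("stateful-aggregation-sketch").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(60))   // one batch per minute
    ssc.checkpoint("/tmp/spark-checkpoint")             // required by updateStateByKey (placeholder path)

    // Placeholder source: in the real job this would be the Kafka DStream, with the
    // JSON parsed into ((clientMac, clientIp, serverMac, serverIp), Metrics).
    val pairs = ssc.socketTextStream("localhost", 9999).map { line =>
      val f = line.split(",")
      ((f(0), f(1), f(2), f(3)), Metrics(f(4).toLong, f(5).toLong))
    }

    // Keep one running total per composite key across batches.
    val totals = pairs.updateStateByKey[Metrics] { (newValues: Seq[Metrics], state: Option[Metrics]) =>
      val acc = state.getOrElse(Metrics(0L, 0L))
      Some(newValues.foldLeft(acc)((a, m) => Metrics(a.bytesIn + m.bytesIn, a.bytesOut + m.bytesOut)))
    }

    // Each batch re-emits the full state, so writing with a key-derived document id
    // would overwrite documents in Elasticsearch instead of appending new rows.
    totals.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Because each batch re-emits the complete state, the write to Elasticsearch could happen on every batch, as long as the document id is derived from the composite key so that existing documents are updated rather than duplicated.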

Your quick help will be highly appreciated.

Regards, Pupesh


1 answer in total

Anonymous user

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Main {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(15))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092", // e.g. ",localhost:9094,localhost:9095" for more brokers
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("test")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Keep only the message value (the log line) from each Kafka record.
    val out = stream.map(record => record.value)

    val words = out.flatMap(_.split(" "))
    val count = words.map(word => (word, 1))
    val wdc = count.reduceByKey(_ + _)

    val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())

    wdc.foreachRDD { rdd =>
      val es = sqlContext.createDataFrame(rdd).toDF("word", "count")
      import org.elasticsearch.spark.sql._
      es.saveToEs("wordcount/testing")
      es.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

See the full example and the sbt build.
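For the one-minute aggregation asked about in the question, one possible adaptation of this answer is sketched below. This is an assumption-laden sketch, not a tested solution: it combines a 60-second batch interval, the direct Kafka stream from above, and elasticsearch-spark's upsert settings (es.write.operation, es.mapping.id) so that each composite key maps to a single Elasticsearch document. The doc_id column and the reduced set of aggregated fields are illustrative; the column names are taken from the question.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.elasticsearch.spark.sql._

object OneMinuteAggregationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("one-minute-aggregation-sketch")
      .setMaster("local[*]")
      .set("es.nodes", "localhost")          // adjust to the real Elasticsearch address
      .set("es.port", "9200")
      .set("es.index.auto.create", "true")
      .set("es.write.operation", "upsert")   // overwrite documents instead of appending new rows
      .set("es.mapping.id", "doc_id")        // column used as the Elasticsearch document id

    val ssc = new StreamingContext(conf, Seconds(60))  // one batch per minute

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "logs-aggregation-sketch",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("Logs"), kafkaParams))

    stream.map(_.value).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        val df = spark.read.json(rdd) // deprecated in newer Spark versions in favour of a Dataset[String] source

        // Aggregate one minute of logs per composite key, then derive a stable document id
        // from that key so repeated writes update the same Elasticsearch document.
        val grouped = df
          .groupBy("l2_c", "l3_c", "l4_c", "l2_s", "l3_s", "l4_s")
          .agg(sum("b_in") as "total_b_in", sum("b_out") as "total_b_out")
          .withColumn("doc_id",
            concat_ws("_", col("l2_c"), col("l3_c"), col("l4_c"), col("l2_s"), col("l3_s"), col("l4_s")))

        grouped.saveToEs("conrec_1min/1minute")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

With this shape, each one-minute batch produces exactly one upsert per composite key, so Kibana would not need to re-aggregate multiple partial rows.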

Tags: apache-spark, scala, hadoop, kafka, apache-spark-sql, spark-streaming, apache-spark-2.0, elasticsearch