提问者:小点点

阅读Kafka时Spark丢失99.9%的消息


TL;DR:我的spark应用程序接收了Kafka发送的0.1%的短信。我主要怀疑的是:对于每个批处理间隔(在这个例子中是1秒),新的JVM被实例化。我正在尝试使用摄取数据。延迟加载的map()转换。司机有没有可能

详细信息的长版本:

我的事件流程如下:一个java类产生sample(。json作为字符串)数据

问题:在实验中,我从数据生成器发送10000条消息。我有一个kibana仪表板来处理这些输入数据

Q1。剩下的400个msg都丢在哪里了?

现在火花(批处理间隔为1秒

它持续读取10(有时20)条消息。

如果它读取10条消息,则计数值为1-10

Q2。为什么Spark只能获得10(或最多20)条消息?

我在 Spark 应用程序中更改了“auto.offset.reset”、“最小”设置,但这并没有真正帮助。它只从计数 1-10 读取 10 条消息。

Q3.需要做些什么才能让它从Kafka主题的开头开始阅读?

我能想到的 1 件事是这里的破坏运动是我在 .map 函数中摄取味精:

JavaDStream<String> lines = stream.map(new Function<Tuple2<String, String>, String>() {
  public String call(Tuple2<String, String> tuple2) {
  my_fn(tuple2._2().toString());
  return tuple2._2();
}

有人能阐明窗口和减少功能吗?

注意:我使用logstash实例从spark迁移数据-

我需要在数据生成器脚本中操作json obj。我正在使用maven

问题 4.如何使用 kafka-run-class 方法运行着色的 jar,或者它是否需要作为独立的 java 程序运行,在这种情况下,它会以与使用 kafka 运行时相同的速率发出 msgs 开箱即用的脚本,因为我认为它确实负责并行化

使用这个 kafka 脚本,我能够在我的机器上渗出 1.4Mpps。

编辑:更多关于Kafka-火花分区的信息

图例:topic_1是来自生产者(脚本)的数据-

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic_1 
          Topic:topic_1 PartitionCount:1    ReplicationFactor:1 Configs: 
          Topic: topic_1 Partition: 0   Leader: 0   Replicas: 0 Isr: 0 

curl 'localhost:9200/_cat/indices?v'
health status index          pri rep docs.count docs.deleted store.size pri.store.size 
yellow open   topic_1     5   1   11002386            0    702.1mb        702.1mb 
yellow open   topic_2     5   1       6307            0    786.4kb        786.4kb 
yellow open   .kibana         1   1          9            0       47kb        47kb 

火花,我和一个执行者一起跑

Data_gen:简单的java kafka生产者代码,以1:1的比例发送值为0或1的json字符串。Spark_app:主qn中的代码。my_fn()获取字符串msg,将其转换为json


共1个答案

匿名用户

map() transformations are lazy so they don’t like to work until asked for.
I was expecting messages in a map() transformation.
put this expectation in rdd.take() which in non-lazy inside foreachRDD()
It worked.