TL;DR:我的spark应用程序接收了Kafka发送的0.1%的短信。我主要怀疑的是:对于每个批处理间隔(在这个例子中是1秒),新的JVM被实例化。我正在尝试使用摄取数据。延迟加载的map()转换。司机有没有可能
详细信息的长版本:
我的事件流程如下:一个java类产生sample(。json作为字符串)数据
问题:在实验中,我从数据生成器发送10000条消息。我有一个kibana仪表板来处理这些输入数据
Q1。剩下的400个msg都丢在哪里了?
现在火花(批处理间隔为1秒
它持续读取10(有时20)条消息。
如果它读取10条消息,则计数值为1-10
Q2。为什么Spark只能获得10(或最多20)条消息?
我在 Spark 应用程序中更改了“auto.offset.reset”、“最小”设置,但这并没有真正帮助。它只从计数 1-10 读取 10 条消息。
Q3.需要做些什么才能让它从Kafka主题的开头开始阅读?
我能想到的 1 件事是这里的破坏运动是我在 .map 函数中摄取味精:
JavaDStream<String> lines = stream.map(new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> tuple2) {
my_fn(tuple2._2().toString());
return tuple2._2();
}
有人能阐明窗口和减少功能吗?
注意:我使用logstash实例从spark迁移数据-
我需要在数据生成器脚本中操作json obj。我正在使用maven
问题 4.如何使用 kafka-run-class 方法运行着色的 jar,或者它是否需要作为独立的 java 程序运行,在这种情况下,它会以与使用 kafka 运行时相同的速率发出 msgs 开箱即用的脚本,因为我认为它确实负责并行化
使用这个 kafka 脚本,我能够在我的机器上渗出 1.4Mpps。
编辑:更多关于Kafka-火花分区的信息
图例:topic_1是来自生产者(脚本)的数据-
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic_1 Topic:topic_1 PartitionCount:1 ReplicationFactor:1 Configs: Topic: topic_1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 curl 'localhost:9200/_cat/indices?v' health status index pri rep docs.count docs.deleted store.size pri.store.size yellow open topic_1 5 1 11002386 0 702.1mb 702.1mb yellow open topic_2 5 1 6307 0 786.4kb 786.4kb yellow open .kibana 1 1 9 0 47kb 47kb
火花,我和一个执行者一起跑
Data_gen:简单的java kafka生产者代码,以1:1的比例发送值为0或1的json字符串。Spark_app:主qn中的代码。my_fn()获取字符串msg,将其转换为json
map() transformations are lazy so they don’t like to work until asked for.
I was expecting messages in a map() transformation.
put this expectation in rdd.take() which in non-lazy inside foreachRDD()
It worked.