我想知道当运行器是SparkRunner时,是否有人对Beam KafkaIO组件中的速率限制有任何经验。我使用的版本是:Beam 2.29、Spark 3.2.0和Kafka客户端2.5.0?
我将Beam参数maxRecordsPerBatch设置为一个很大的数字,100000000。但即使管道停止45分钟,这个值也永远不会达到。但是,当出现高于正常值的高突发数据时,Kafka延迟会增加,直到它最终赶上。在SparkUI中,我看到参数batchIntervalMillis=300000(5分钟)未达到,批次最多需要3分钟。看起来KafkaIO在某个点停止读取,即使延迟非常大。我的Kafka参数--fetchMaxWaitMs=1000-maxPollRecords=5000应该能够带来大量数据。特别是因为KafkaIO为每个分区创建一个消费者。在我的系统中,有多个主题,共有992个分区,spark.default.aparallelism=600。一些分区的数据非常少,而其他分区的数据量很大。主题是每个区域的,当一个区域下降时,数据将通过另一个区域/主题发送。这就是滞后发生的时候。
spark.streaming.receiver.maxRate和spark.streaming_receiver.maxRatePerPartition加上spark.streaming.backpressure.enabled的配置值是否起作用?据我所见,Beam似乎用操作员KafkaIO控制了Kafka的全部读数。此组件创建自己的使用者,因此使用者的速率只能通过使用包括fetchMaxWaitMs和maxPollRecords的使用者配置来设置。如果在IO源之后的管道的其余部分中,这些Spark参数可能会产生任何影响。但我不确定。
所以我终于弄清楚了这一切是如何运作的。首先,我想声明Spark配置值:spark.streaming.receiver.maxRate,spark.streaming.receiver.maxRatePerPartition,spark.streaming.backpressure.enabled在Beam中不起作用,因为它们仅在使用Spark中的源运算符从Kafka读取时才有效。由于Beam有自己的运算符KafkaIO,因此它们不发挥作用。
因此Beam在SparkPipelineOptions类中定义了一组参数,这些参数在SparkRunner中用于设置Kafka的读取。这些参数包括:
@Description("Minimum time to spend on read, for each micro-batch.")
@Default.Long(200)
Long getMinReadTimeMillis();
@Description(
"A value between 0-1 to describe the percentage of a micro-batch dedicated to reading from UnboundedSource.")
@Default.Double(0.1)
Double getReadTimePercentage();
Beam 创建一个 SourceDStream 对象,它将传递给 Spark 以用作从 Kafka 读取的源。在此类中,方法 boundReadDuration 返回计算两个读取值中较大值的结果:比例持续时间和下限持续时间。第一个是通过将 BatchIntervalMillis 从 readTimePercent 乘以来计算的。第二个只是minReadTimeMillis的磨机值。下面是来自SourceDStream的代码。从此函数返回的持续时间将用于单独从 Kafka 读取,其余时间将分配给管道中的其他任务。
最后但同样重要的是,以下参数还控制在批处理maxRecordsPerBatch期间处理多少条记录。管道不会在单个批处理中处理超过这些记录。
private Duration boundReadDuration(double readTimePercentage, long minReadTimeMillis) {
long batchDurationMillis = ssc().graph().batchDuration().milliseconds();
Duration proportionalDuration = new Duration(Math.round(batchDurationMillis * readTimePercentage));
Duration lowerBoundDuration = new Duration(minReadTimeMillis);
Duration readDuration = proportionalDuration.isLongerThan(lowerBoundDuration) ? proportionalDuration: lowerBoundDuration;
LOG.info("Read duration set to: " + readDuration);
return readDuration;
}