提问者:小点点

Apache BeamPython窗口和GroupByKey从流不工作


我正在使用Apache Beam的PythonSDK,我无法从无界PCollection中按窗口和键执行聚合。数据来自Kafka主题,它被组织为带有键、值和时间戳的字典。我在beam_nuggets包中与Kafka消费者一起阅读它(因为我无法使默认的Kafka消费者工作),应用三分钟长的固定窗口,GroupByKey并计算平均值。我目前对处理后期数据不感兴趣(默认触发器应该可以很好地工作)。似乎所有数据在windows中都被正确划分,但从未调用GroupByKey之后的聚合函数。

这是我使用的代码:

import json
import apache_beam as beam
from apache_beam.transforms import window, trigger
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

beam_options = PipelineOptions(
        runner = "DirectRunner",
        streaming = True,
        )

class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        unix_timestamp = element["datetime"]/1000

        yield beam.window.TimestampedValue(element, unix_timestamp)

def add_key(x):
    print("add key", x["datetime"])
    return (x["key"], x)

def process_group(x):
    print("process_group")
    return sum(x)/len(x)

with beam.Pipeline(options = beam_options) as pipeline:
    data = (pipeline | kafkaio.KafkaConsume(consumer_config = {"bootstrap_servers": "localhost:9092",
                                                      "topic": "foo",
                                                      "group_id": "consumer_group",
                                                      "auto_offset_reset": "latest"},
                                                      value_decoder = bytes.decode
                                                )
                     | "ToDict" >> beam.MapTuple(lambda k,v: json.loads(v))
                     | "Add timestamp" >> beam.ParDo(AddTimestampDoFn())
                     | "Add key" >> beam.Map(add_key)
                     | "Window" >> beam.WindowInto(window.FixedWindows(60*3))
            )
    grouped = (data | f"Group" >> beam.GroupByKey()
                    | f"ProcessGroup" >> beam.Map(process_group)
              )

第一部分似乎工作正常,因为Kafka消费者收到的每条消息都会打印“添加键”调试日志。窗口似乎设置正确,每个数据点都分配给一个窗口。然而,“process_group”日志永远不会打印,就好像管道永远不会到达那个点一样。

我知道StackOverflow上有几个类似的问题(比如这个、这个或这个),但似乎没有一个解决方案有效。

我还尝试了不同的触发功能(如AfterWatermark),但似乎仍然不起作用。

Apache Beam版本是2.41.0


共1个答案

匿名用户

当我阅读带有静态数量记录的测试Kafka主题时,Flink和CoGroupByKey也遇到了类似的问题。一旦我开始每隔几秒钟产生一次新消息,问题就消失了,CoGroupByKey开始按预期运行。

在这里找到解决方案:

https://github.com/apache/beam/issues/22809#issuecomment-1310971785