提问者:小点点

使用Apache Beam为每个窗口编写独特的镶木地板文件Python


我正在尝试将消息从kafka消费者流式传输到谷歌云存储,使用apache光束有30秒的窗口。用于beam_nuggets.io从kafka主题中读取。然而,我无法为每个窗口编写独特的镶木地板文件GCS。你可以在下面看到我的代码:'

import apache_beam as beam
from apache_beam.transforms.trigger import AfterAny, AfterCount, AfterProcessingTime, AfterWatermark, Repeatedly

from apache_beam.portability.api.beam_runner_api_pb2 import AccumulationMode
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio
import json
from datetime import datetime
import pandas as pd
import config as conf
import apache_beam.transforms.window as window
consumer_config = {"topic": "Uswrite",
                   "bootstrap_servers": "*.*.*.*:9092",
                   "group_id": "notification_consumer_group_33"}
folder_name = datetime.now().strftime('%Y-%m-%d')
def format_result(consume_message):
    data = json.loads(consume_message[1])
    file_name = datetime.now().strftime("%Y_%m_%d-%I_%M_%S")
    df = pd.DataFrame(data).T #, orient='index'
    df.to_parquet(f'gs://{conf.gcs}/{folder_name}/{file_name}.parquet',
               storage_options={"token": "gcp.json"}, engine='fastparquet')
    print(consume_message)
with beam.Pipeline(options=PipelineOptions()) as p:
    consumer_message = (p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(consumer_config=consumer_config)
                          | 'Windowing' >> beam.WindowInto(window.FixedWindows(30),
                                            trigger=AfterProcessingTime(30),
                                            allowed_lateness=900,
                                           accumulation_mode=AccumulationMode.ACCUMULATING)
                          | 'CombineGlobally' >> beam.Map(format_result))

# window.FixedWindows(30),trigger=beam.transforms.trigger.AfterProcessingTime(30),
# accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING
# allowed_lateness=100,CombineGlobally(format_result).without_defaults() allowed_lateness=30,

使用上面的代码,为每条消息生成一个新的拼花文件。我想做的是按30秒的窗口对消息进行分组,并为每个窗口生成一个拼花文件。我尝试了下面的不同配置,但没有成功:Beam. CombineGlobally(format_result).without_defaults())而不是beam.Map(format_result))Beam.ParDo(format_result))此外,我还有几个问题:

  1. 即使我将偏移量设置为auto. offset.set:最早,kafka生产者也会从最后一条消息开始读取,即使我更改了消费者组并且找不到原因。
  2. 另外,我对触发器、allowed_lateness、accumulation_mode的用法感到困惑。我不确定我是否需要它们来完成这项任务。

正如您在上面的代码块中看到的,我也尝试使用这些参数,但没有帮助。

我到处搜索,但找不到一个解释此用例的示例。


共2个答案

匿名用户

以下是您应该对管道进行的一些更改以获得此结果:

  • 如果您希望每个窗口只有一个输出,请删除您的触发器。触发器仅在每个窗口获得多个结果时才需要。
  • 添加GroupByKey组合操作来聚合元素。没有这样的操作,窗口没有任何效果。
  • 我建议使用Beam项目本身的parquetio来确保您获得可扩展的精确一次行为。(参见2.33.0版本的pydoc)

匿名用户

我看了一下python留档中的GroupByKey示例

  1. 我从Kafka消费者(我从beam_nuggets.io中使用了kafkaio)读取的消息有一种元组类型,为了使用GroupByKey,我试图通过附加我从Kafka消费者获得的元组来在convert_to_list函数中创建一个列表。然而,GroupByKey仍然没有产生任何输出。
import apache_beam as beam
from beam_nuggets.io import kafkaio

new_list = []
def convert_to_list(consume_message):
    new_list.append(consume_message)
    return new_list


with beam.Pipeline() as pipeline:
    dofn_params = (
            pipeline
            | "Reading messages from Kafka" >> kafkaio.KafkaConsume(consumer_config=consumer_config)
            | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
            | 'consume message added list' >> beam.ParDo(convert_to_list)
            | 'GroupBykey' >> beam.GroupByKey()
            | 'print' >> beam.Map(print))
import apache_beam as beam
from beam_nuggets.io import kafkaio


with beam.Pipeline() as pipeline:
    dofn_params = (
            pipeline
            | 'Created Pipeline' >> beam.Create([(None, '{"userId": "921","xx":"123"]),(None, '{"userId": "92111","yy":"123"]))
            | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
            | 'GroupBykey' >> beam.GroupByKey()
            | 'print' >> beam.Map(print))

我假设第一种方法中的问题与生成外部列表而不是pCollection有关,但我不确定。你能指导我如何继续吗?

我尝试的另一件事是使用apache_beam.io.kafka模块中的ReadFromKafka函数。但是这次我得到了以下错误:

ERROR:apache_beam.utils.subprocess_server:Starting job service with ['java', '-jar', 'user_directory’/.apache_beam/cache/jars\\beam-sdks-java-io-expansion-service-2.33.0.jar', '59627']

Java版本11.0.12已安装在我的计算机上,并且可以使用java命令。