提问者:小点点

apache波束数据流作业异常无限期重试


我正在GCP上运行一个使用apacheSDK2.39创建的流式传输管道作为数据流作业。基本上,我从PubSub读取并写入PubSub,通过墙壁时钟进行一些聚合。

当我的一个ParDo操作抛出运行时异常时,我遇到了一个问题(运行时异常的原因是PubSub中的一个输入元素“无效”:属性为null,但代码假定它是非null)。我观察到的是:

  • 光束开始无限期重试失败元素(实际上是聚合事件)
  • 新的输入元素(即将输入PubSub)被正确聚合
  • 每次重新处理有问题GCP聚合事件时,控制台中的日志消息数量都会增加
  • 如果我终止我的波束作业并重新启动它,所有问题都消失了,并且不再次处理违规的输入元素,这向我建议输入元素一旦被apache波束读取就会被确认。

我正在寻找一种通用的方法来处理这种情况:我想重试一个有问题的元素配置的次数(以覆盖一些瞬态问题,如在我的ParDo之一,我正在访问BigQuery),但经过多次重试后,我只想记录问题并忽略违规元素-我不在乎输出消息会乱序。

它可以在Apache Beam中以某种方式完成吗?我知道我可能会将所有ParDo逻辑包装在try/catch块中,但是:

  • 我如何知道元素是第一次处理还是重试
  • 我需要将所有ParDo包装在这种逻辑中(可以回退到某些Templatemethod模式)有什么想法吗?

共1个答案

匿名用户

最简单的方法是在DoFn的流程方法中放置一个try/catch块(可能使用for循环重试固定次数)。然后,如果您想用它做其他事情(例如计算它们或记录它们以供将来检查),您可以将持久失败的消息发送到不同的输出。这种模式(以重试为模,认为这可能是一个值得添加的功能)通常称为“死信”模式,可通过ParDo(…)在Python中使用。with_exception_handling(…)。