提问者:小点点

运行束流水线时,对象没有属性窗口


在运行数据流作业时,我得到的“PBegin”对象没有属性“windows”。我正在pardo函数中调用连接类。

我正在尝试从Beam python连接NOSQL数据库SDK并运行sql从表中提取数据。然后使用另一个pardo将输出写入单独的文件。

class Connector(beam.DoFn):
    def __init__(self,username,seeds,keyspace,password,datacenter=None):
    self.username = username
    self.password = password
    self.seeds = seeds
    self.keyspace = keyspace
    self.datacenter = datacenter
    super(self.__class__, self).__init__()

    def process(self, element):

    if datacenter:
        load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=self.datacenter)
    auth_provider = PlainTextAuthProvider(username=self.username, password=self.password)
    cluster = Cluster(contact_points=self.seeds,
                      load_balancing_policy=load_balancing_policy,
                      auth_provider=auth_provider)
    session=cluster.connect(self.seeds,self.keyspace,self.username, self.password, self.datacenter)
    rows = session.execute(SQL Query)
    yield rows

共2个答案

匿名用户

只是偶然发现了同样的问题。尝试连接到RDBMS源,但我想就实现设计而言,非关系型数据库和SQL数据库没有区别。

除了Jayadeep Jayaraman建议的之外,这可以通过使用ParDo来实现。实际上,如果您的用例可以接受这样做的限制,使用ParDo进行连接是波束留档建议的:

对于有界(批处理)源,目前有两种创建Beam源的选项:

使用ParDo和GroupByKey。

使用Source接口并扩展BoundedSource抽象子类。

ParDo是推荐的选项,因为实现Source可能很棘手。请参阅何时使用

你没有展示你如何使用你的DoFn。对我来说,记住DoFn作用于已经存在的PCollection的元素是很有帮助的。它不能自己从头开始创建DoFn。因此,为了克服你提到的问题,你可能需要从内存中创建一个PCollection,其中包含一个用于从源中检索数据的查询元素。然后将从源读取的ParDo应用到这个PCollection。

BTW:我想出了每个分区一个元素,我想从我的RDBMS读取我的收藏-所以数据可以从我的SQL数据库并行读取。

解决方案可能如下所示:

p | beam.Create(["Your Query / source object qualifier goes here"]) 
  | "Read from Database" >> beam.ParDo(YourConnector())

让我也提一下,使用DoFn的start_bundle和finish_bundle方法来设置/断开连接可能是个好主意。

匿名用户

为此,您需要使用BeamIO。这里有一个关于如何在Python中构建自定义IO的指南[1]。

ParDo通常用于在PCollection上运行转换。您也可以查看SplittableDoFn来构建类似的东西。参考这里[2]

1 - https://beam.apache.org/documentation/io/developing-io-python/

2 - https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html