提问者:小点点

使用数据流批量插入到Bigquery


我正在使用apache光束管道,我想用python批量插入到bigquery。我的数据来自无界的Pub/Sub。根据我的研究,带触发器的GlobalWindows应该可以解决我的问题。我尝试了窗口化我的管道,但它仍然流式传输insertion.My管道代码如下:

p2 = (p | 'Read ' >> beam.io.ReadFromPubSub(subscription=subscription_path,
    with_attributes=True,
    timestamp_attribute=None,id_label=None)
       | 'Windowing' >>  beam.WindowInto(window.GlobalWindows(),
           trigger=Repeatedly(
                   AfterAny(
                AfterCount(100),
           AfterProcessingTime(1 * 60))), 
        accumulation_mode=AccumulationMode.DISCARDING)
      | 'Process ' >> beam.Map(getAttributes))
p3 = (p2 | 'Filter ' >> beam.Filter(lambda msg: (("xx" in msg) and (msg["xx"].lower() == "true")))
         | 'Delete ' >> beam.Map(deleteAttribute)
         | 'Write '  >> writeTable(bq_table_test, bq_batch_size))

def writeTable(table_name):
return beam.io.WriteToBigQuery(
    table=table_name,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    batch_size=100)

我正在从账单报告中检查插入是批处理还是流。当Streming插入使用增加时,我知道没有发生批量插入。我可以检查插入是流还是批处理的另一个功能吗?还有如何对bigquery进行批量插入?


共1个答案

匿名用户

根据您无法指定插入类型的留档,它会根据您的输入PCollection自动检测:

Python的BeamSDK目前不支持指定插入方法。

BigQueryIO支持两种向BigQuery中插入数据的方法:加载作业和流式插入。每种插入方法都提供了成本、配额和数据展示一致性的不同权衡。有关这些权衡的更多信息,请参阅BigQuery留档以获取加载作业和流式插入。

BigQueryIO根据输入PCollection选择默认插入方法。

当您将BigQueryIO写入转换应用于有界PCollection时,BigQueryIO使用加载作业。

当您将BigQueryIO写入转换应用于无界PCollection时,BigQueryIO使用流插入。

在您的情况下,您正在从无界源(Pubsub)读取,因此在这种情况下它始终是流式写入。窗口化不会改变数据的性质。

我能想到的一种解决方法是拆分管道,例如,流式管道将写入某个存储(GCS)的文件集合,然后另一个管道将读取和上传这些文件(文件是有界的)。