提问者:小点点

谷歌云数据流-在bigquery中批量插入


我能够创建一个数据流管道,它从pub/sub读取数据,并在处理后以流模式写入大查询。

现在,我想以批处理模式运行我的管道,以降低成本,而不是流模式。

目前我的流水线正在使用动态目的地在bigquery中进行流式插入。我想知道是否有办法使用动态目的地执行批量插入操作。

下面是

public class StarterPipeline {  
   public interface StarterPipelineOption extends PipelineOptions {

    /**
     * Set this required option to specify where to read the input.
     */
    @Description("Path of the file to read from")
    @Default.String(Constants.pubsub_event_pipeline_url)
    String getInputFile();

    void setInputFile(String value);

}

@SuppressWarnings("serial")
public static void main(String[] args) throws SocketTimeoutException {

    StarterPipelineOption options = PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(StarterPipelineOption.class);

    Pipeline p = Pipeline.create(options);

    PCollection<String> datastream = p.apply("Read Events From Pubsub",
            PubsubIO.readStrings().fromSubscription(Constants.pubsub_event_pipeline_url));

    PCollection<String> windowed_items = datastream.apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(300))))
            .withAllowedLateness(Duration.standardDays(10)).discardingFiredPanes());

    // Write into Big Query
     windowed_items.apply("Read and make event table row", new
     ReadEventJson_bigquery())

     .apply("Write_events_to_BQ",
     BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
     public String getDestination(ValueInSingleWindow<TableRow> element) {
     String destination = EventSchemaBuilder
     .fetch_destination_based_on_event(element.getValue().get("event").toString());
     return destination;
     }

     @Override
     public TableDestination getTable(String table) {
     String destination =
     EventSchemaBuilder.fetch_table_name_based_on_event(table);
     return new TableDestination(destination, destination);
     }

     @Override
     public TableSchema getSchema(String table) {
     TableSchema table_schema =
     EventSchemaBuilder.fetch_table_schema_based_on_event(table);
     return table_schema;
     }
     }).withCreateDisposition(CreateDisposition.CREATE_NEVER)
     .withWriteDisposition(WriteDisposition.WRITE_APPEND)
     .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

    p.run().waitUntilFinish();

    log.info("Events Pipeline Job Stopped");

}

}


共2个答案

匿名用户

批处理或流由PCollection决定,因此您需要将数据流PCollection从Pub/Sub转换为批处理PCollection以写入BigQuery。允许这样做的转换是GroupIntoBatches

请注意,由于此转换使用键值对,批处理将仅包含单个键的元素。对于非KV元素,请检查此相关答案。

使用此转换将PCollection作为批处理创建后,请像使用流PCollection一样应用带有动态目标的BigQuery写入。

匿名用户

您可以通过对流式作业使用文件加载来限制成本。插入方法部分指出,BigQueryIO. Write支持两种方法将数据插入到使用BigQueryIO.Write.与方法(org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method)指定的BigQuery中。如果未提供任何方法,则将根据输入PCollection选择默认方法。有关方法的更多信息,请参阅BigQueryIO.Write.Method。

不同的插入方法提供了成本、配额和数据展示一致性的不同权衡。有关这些权衡的更多信息,请参阅BigQuery留档。