Java源码示例:org.apache.beam.examples.common.WriteOneFilePerWindow

示例1
/**
 * Entry point: parses {@code args} into {@code PubSubToGCSOptions}, switches the options to
 * streaming mode, and runs a pipeline that reads strings from the configured Pub/Sub topic,
 * assigns them to fixed windows, and applies {@code WriteOneFilePerWindow} to the result.
 *
 * @param args command-line arguments handed to {@code PipelineOptionsFactory.fromArgs}
 * @throws IOException as declared by this method's signature
 */
public static void main(String[] args) throws IOException {
  PubSubToGCSOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGCSOptions.class);
  options.setStreaming(true);

  // Shard count handed to WriteOneFilePerWindow.
  final int numShards = 1;

  Pipeline pipeline = Pipeline.create(options);

  // The window length is the configured window size, interpreted as whole minutes.
  Duration windowLength = Duration.standardMinutes(options.getWindowSize());
  FixedWindows perWindow = FixedWindows.of(windowLength);
  WriteOneFilePerWindow gcsWriter = new WriteOneFilePerWindow(options.getOutput(), numShards);

  pipeline
      // Step 1: string messages from the Pub/Sub input topic.
      .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
      // Step 2: assign each message to a fixed window of the length computed above.
      .apply(Window.into(perWindow))
      // Step 3: hand the windowed messages to the file-writing transform.
      .apply("Write Files to GCS", gcsWriter);

  // Start the pipeline and block until it finishes.
  pipeline.run().waitUntilFinish();
}
 
示例2
/**
 * Builds and runs the windowed word-count pipeline described by {@code options}.
 *
 * <p>Stages, in order: read text from {@code options.getInputFile()}, pass each line through
 * {@code AddTimestampFn} (constructed with the min/max instants taken from the options), window
 * into fixed windows of {@code options.getWindowSize()} minutes, apply
 * {@code WordCount.CountWords}, format with {@code WordCount.FormatAsTextFn}, and apply
 * {@code WriteOneFilePerWindow} with the configured output and shard count.
 *
 * <p>If waiting for the pipeline throws any {@code Exception}, the job is cancelled and the
 * exception is not rethrown.
 *
 * @param options pipeline options supplying input file, output, timestamp bounds, window size and
 *     shard count
 * @throws IOException as declared by this method's signature
 */
static void runWindowedWordCount(Options options) throws IOException {
  final String outputPrefix = options.getOutput();
  final Instant earliest = new Instant(options.getMinTimestampMillis());
  final Instant latest = new Instant(options.getMaxTimestampMillis());

  Pipeline wordCountPipeline = Pipeline.create(options);

  // Concept #1: the same pipeline shape can be run over a bounded or an unbounded source.
  // Here the lines come from the configured input file.
  PCollection<String> lines =
      wordCountPipeline.apply(TextIO.read().from(options.getInputFile()));

  // Concept #2: run every line through AddTimestampFn, which is given the earliest/latest
  // instants read from the options. See AddTimestampFn for what it does with them.
  PCollection<String> input = lines.apply(ParDo.of(new AddTimestampFn(earliest, latest)));

  // Concept #3: fixed windows whose length is the configured window size in minutes.
  Duration windowLength = Duration.standardMinutes(options.getWindowSize());
  PCollection<String> windowedWords = input.apply(Window.into(FixedWindows.of(windowLength)));

  // Concept #4: the existing CountWords transform is applied unchanged to the windowed input.
  PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());

  // Concept #5: format each count as text, then apply WriteOneFilePerWindow with the output
  // prefix and the configured number of shards.
  PCollection<String> formattedCounts =
      wordCounts.apply(MapElements.via(new WordCount.FormatAsTextFn()));
  formattedCounts.apply(new WriteOneFilePerWindow(outputPrefix, options.getNumShards()));

  PipelineResult job = wordCountPipeline.run();
  try {
    job.waitUntilFinish();
  } catch (Exception failure) {
    // Any failure while waiting cancels the job; the exception itself is not propagated.
    job.cancel();
  }
}
 
示例3
/**
 * Builds and runs the windowed word-count pipeline described by {@code options}.
 *
 * <p>Stages, in order: read text from {@code options.getInputFile()}, pass each line through
 * {@code AddTimestampFn} (constructed with the min/max instants taken from the options), window
 * into fixed windows of {@code options.getWindowSize()} minutes, apply
 * {@code WordCount.CountWords}, format with {@code WordCount.FormatAsTextFn}, and apply
 * {@code WriteOneFilePerWindow} with the configured output and shard count.
 *
 * <p>If waiting for the pipeline throws any {@code Exception}, the job is cancelled and the
 * exception is not rethrown.
 *
 * @param options pipeline options supplying input file, output, timestamp bounds, window size and
 *     shard count
 * @throws IOException as declared by this method's signature
 */
static void runWindowedWordCount(Options options) throws IOException {
  final String outputPrefix = options.getOutput();
  final Instant earliest = new Instant(options.getMinTimestampMillis());
  final Instant latest = new Instant(options.getMaxTimestampMillis());

  Pipeline wordCountPipeline = Pipeline.create(options);

  // Concept #1: the same pipeline shape can be run over a bounded or an unbounded source.
  // Here the lines come from the configured input file.
  PCollection<String> lines =
      wordCountPipeline.apply(TextIO.read().from(options.getInputFile()));

  // Concept #2: run every line through AddTimestampFn, which is given the earliest/latest
  // instants read from the options. See AddTimestampFn for what it does with them.
  PCollection<String> input = lines.apply(ParDo.of(new AddTimestampFn(earliest, latest)));

  // Concept #3: fixed windows whose length is the configured window size in minutes.
  Duration windowLength = Duration.standardMinutes(options.getWindowSize());
  PCollection<String> windowedWords = input.apply(Window.into(FixedWindows.of(windowLength)));

  // Concept #4: the existing CountWords transform is applied unchanged to the windowed input.
  PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());

  // Concept #5: format each count as text, then apply WriteOneFilePerWindow with the output
  // prefix and the configured number of shards.
  PCollection<String> formattedCounts =
      wordCounts.apply(MapElements.via(new WordCount.FormatAsTextFn()));
  formattedCounts.apply(new WriteOneFilePerWindow(outputPrefix, options.getNumShards()));

  PipelineResult job = wordCountPipeline.run();
  try {
    job.waitUntilFinish();
  } catch (Exception failure) {
    // Any failure while waiting cancels the job; the exception itself is not propagated.
    job.cancel();
  }
}