Java源码示例:org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker

示例1
@Test
public void testDoFnWithReturn() throws Exception {
  class MockFn extends DoFn<String, String> {
    @DoFn.ProcessElement
    public ProcessContinuation processElement(
        ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) throws Exception {
      return null;
    }

    @GetInitialRestriction
    public SomeRestriction getInitialRestriction(@Element String element) {
      return null;
    }

    @NewTracker
    public SomeRestrictionTracker newTracker(@Restriction SomeRestriction restriction) {
      return null;
    }
  }

  MockFn fn = mock(MockFn.class);
  when(fn.processElement(mockProcessContext, null)).thenReturn(resume());
  assertEquals(resume(), invokeProcessElement(fn));
}
 
示例2
@Test
public void testDefaultWatermarkEstimatorStateAndCoder() throws Exception {
  class MockFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(
        ProcessContext c, RestrictionTracker<RestrictionWithDefaultTracker, Void> tracker) {}

    @GetInitialRestriction
    public RestrictionWithDefaultTracker getInitialRestriction(@Element String element) {
      return null;
    }
  }

  MockFn fn = mock(MockFn.class);
  DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);

  CoderRegistry coderRegistry = CoderRegistry.createDefault();
  coderRegistry.registerCoderProvider(
      CoderProviders.fromStaticMethods(
          RestrictionWithDefaultTracker.class, CoderForDefaultTracker.class));
  assertEquals(VoidCoder.of(), invoker.invokeGetWatermarkEstimatorStateCoder(coderRegistry));
  assertNull(invoker.invokeGetInitialWatermarkEstimatorState(new FakeArgumentProvider<>()));
}
 
示例3
@Test
public void testMissingNewWatermarkEstimatorMethod() throws Exception {
  class BadFn extends DoFn<Integer, String> {
    @ProcessElement
    public void process(
        ProcessContext context,
        RestrictionTracker<SomeRestriction, Void> tracker,
        ManualWatermarkEstimator<Instant> watermarkEstimator) {}

    @GetInitialRestriction
    public SomeRestriction getInitialRestriction() {
      return null;
    }

    @GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState() {
      return null;
    }
  }

  thrown.expectMessage(
      "Splittable, either @NewWatermarkEstimator method must be defined or Instant must implement HasDefaultWatermarkEstimator.");
  DoFnSignatures.getSignature(BadFn.class);
}
 
示例4
@Test
public void testSplittableMissingNewTrackerMethod() throws Exception {
  class OtherRestriction {}

  class BadFn extends DoFn<Integer, String> {
    @ProcessElement
    public void process(
        ProcessContext context, RestrictionTracker<OtherRestriction, Void> tracker) {}

    @GetInitialRestriction
    public OtherRestriction getInitialRestriction() {
      return null;
    }
  }

  thrown.expectMessage(
      "Splittable, either @NewTracker method must be defined or OtherRestriction must implement HasDefaultTracker.");
  DoFnSignatures.getSignature(BadFn.class);
}
 
示例5
@Test
public void testHasDefaultTracker() throws Exception {
  class Fn extends DoFn<Integer, String> {
    @ProcessElement
    public void process(
        ProcessContext c, RestrictionTracker<RestrictionWithDefaultTracker, Void> tracker) {}

    @GetInitialRestriction
    public RestrictionWithDefaultTracker getInitialRestriction(@Element Integer element) {
      return null;
    }
  }

  DoFnSignature signature = DoFnSignatures.getSignature(Fn.class);
  assertEquals(RestrictionTracker.class, signature.processElement().trackerT().getRawType());
}
 
示例6
@Test
public void testHasDefaultWatermarkEstimator() throws Exception {
  class Fn extends DoFn<Integer, String> {
    @ProcessElement
    public void process(
        ProcessContext c,
        RestrictionTracker<SomeRestriction, Void> tracker,
        WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator>
            watermarkEstimator) {}

    @GetInitialRestriction
    public SomeRestriction getInitialRestriction(@Element Integer element) {
      return null;
    }

    @GetInitialWatermarkEstimatorState
    public WatermarkEstimatorStateWithDefaultWatermarkEstimator
        getInitialWatermarkEstimatorState() {
      return null;
    }
  }

  DoFnSignature signature = DoFnSignatures.getSignature(Fn.class);
  assertEquals(
      WatermarkEstimator.class, signature.processElement().watermarkEstimatorT().getRawType());
}
 
示例7
@Test
public void testNewTrackerReturnsWrongType() throws Exception {
  class BadFn extends DoFn<Integer, String> {
    @ProcessElement
    public void process(
        ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {}

    @NewTracker
    public void newTracker(@Restriction SomeRestriction restriction) {}

    @GetInitialRestriction
    public SomeRestriction getInitialRestriction(@Element Integer element) {
      return null;
    }
  }

  thrown.expectMessage(
      "Returns void, but must return a subtype of RestrictionTracker<SomeRestriction, ?>");
  DoFnSignatures.getSignature(BadFn.class);
}
 
示例8
@Test
public void testNewWatermarkEstimatorReturnsWrongType() throws Exception {
  class BadFn extends DoFn<Integer, String> {
    @ProcessElement
    public void process(
        ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {}

    @GetInitialRestriction
    public SomeRestriction getInitialRestriction(@Element Integer element) {
      return null;
    }

    @NewWatermarkEstimator
    public void newWatermarkEstimator() {}
  }

  thrown.expectMessage("Returns void, but must return a subtype of WatermarkEstimator<Void>");
  DoFnSignatures.getSignature(BadFn.class);
}
 
示例9
@Test
public void testGetInitialRestrictionMismatchesNewTracker() throws Exception {
  class BadFn extends DoFn<Integer, String> {
    @ProcessElement
    public void process(
        ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {}

    @NewTracker
    public SomeRestrictionTracker newTracker(@Restriction SomeRestriction restriction) {
      return null;
    }

    @GetInitialRestriction
    public String getInitialRestriction(@Element Integer element) {
      return null;
    }
  }

  thrown.expectMessage("but must return a subtype of RestrictionTracker<String, ?>");
  thrown.expectMessage("newTracker(SomeRestriction): Returns SomeRestrictionTracker");
  DoFnSignatures.getSignature(BadFn.class);
}
 
示例10
@Test
public void testGetWatermarkEstimatorStateCoderReturnsWrongType() throws Exception {
  class BadFn extends DoFn<Integer, String> {
    @ProcessElement
    public void process(
        ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {}

    @GetInitialRestriction
    public SomeRestriction getInitialRestriction(@Element Integer element) {
      return null;
    }

    @GetWatermarkEstimatorStateCoder
    public KvCoder getWatermarkEstimatorStateCoder() {
      return null;
    }
  }

  thrown.expectMessage(
      "getWatermarkEstimatorStateCoder() returns KvCoder which is not a subtype of Coder<Void>");
  DoFnSignatures.getSignature(BadFn.class);
}
 
示例11
@ProcessElement
public ProcessContinuation processElement(
    ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
  int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
  int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
  for (int i = trueStart, numIterations = 1;
      tracker.tryClaim((long) blockStarts[i]);
      ++i, ++numIterations) {
    for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
      c.output(index);
    }
    if (numIterations == numClaimsPerCall) {
      return resume();
    }
  }
  return stop();
}
 
示例12
@ProcessElement
public ProcessContinuation processElement(
    ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
  int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
  int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
  for (int i = trueStart, numIterations = 1;
      tracker.tryClaim((long) blockStarts[i]);
      ++i, ++numIterations) {
    for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
      c.output(KV.of(c.sideInput(sideInput) + ":" + c.element(), index));
    }
    if (numIterations == numClaimsPerCall) {
      return resume();
    }
  }
  return stop();
}
 
示例13
@ProcessElement
public ProcessContinuation process(
    @Element String element,
    OutputReceiver<String> receiver,
    RestrictionTracker<OffsetRange, Long> tracker,
    BundleFinalizer bundleFinalizer)
    throws InterruptedException {
  if (wasFinalized.get()) {
    // Claim beyond the end now that we know we have been finalized.
    tracker.tryClaim(Long.MAX_VALUE);
    receiver.output(element);
    return stop();
  }
  if (tracker.tryClaim(tracker.currentRestriction().getFrom() + 1)) {
    bundleFinalizer.afterBundleCommit(
        Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
        () -> wasFinalized.set(true));
    // We sleep here instead of setting a resume time since the resume time doesn't need to
    // be honored.
    sleep(1000L); // 1 second
    return resume();
  }
  return stop();
}
 
示例14
@ProcessElement
public void processElement(
    @Element Read read,
    OutputReceiver<Result> out,
    RestrictionTracker<ByteKeyRange, ByteKey> tracker)
    throws Exception {
  Connection connection = ConnectionFactory.createConnection(read.getConfiguration());
  TableName tableName = TableName.valueOf(read.getTableId());
  Table table = connection.getTable(tableName);
  final ByteKeyRange range = tracker.currentRestriction();
  try (ResultScanner scanner =
      table.getScanner(HBaseUtils.newScanInRange(read.getScan(), range))) {
    for (Result result : scanner) {
      ByteKey key = ByteKey.copyFrom(result.getRow());
      if (!tracker.tryClaim(key)) {
        return;
      }
      out.output(result);
    }
    tracker.tryClaim(ByteKey.EMPTY);
  }
}
 
示例15
@ProcessElement
public ProcessContinuation process(
    ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {
  Uninterruptibles.sleepUninterruptibly(
      sleepBeforeFirstClaim.getMillis(), TimeUnit.MILLISECONDS);
  for (long i = tracker.currentRestriction().getFrom(), numIterations = 1;
      tracker.tryClaim(i);
      ++i, ++numIterations) {
    Uninterruptibles.sleepUninterruptibly(
        sleepBeforeEachOutput.getMillis(), TimeUnit.MILLISECONDS);
    context.output("" + i);
    if (numIterations == numOutputsPerProcessCall) {
      return resume();
    }
  }
  return stop();
}
 
示例16
@Test
public void testInvokeProcessElementOutputDisallowedBeforeTryClaim() throws Exception {
  DoFn<Void, String> brokenFn =
      new DoFn<Void, String>() {
        @ProcessElement
        public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
          c.output("foo");
        }

        @GetInitialRestriction
        public OffsetRange getInitialRestriction(@Element Void element) {
          throw new UnsupportedOperationException("Should not be called in this test");
        }
      };
  e.expectMessage("Output is not allowed before tryClaim()");
  runTest(brokenFn, new OffsetRange(0, 5));
}
 
示例17
@Test
public void testInvokeProcessElementOutputDisallowedAfterFailedTryClaim() throws Exception {
  DoFn<Void, String> brokenFn =
      new DoFn<Void, String>() {
        @ProcessElement
        public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
          assertFalse(tracker.tryClaim(6L));
          c.output("foo");
        }

        @GetInitialRestriction
        public OffsetRange getInitialRestriction(@Element Void element) {
          throw new UnsupportedOperationException("Should not be called in this test");
        }
      };
  e.expectMessage("Output is not allowed after a failed tryClaim()");
  runTest(brokenFn, new OffsetRange(0, 5));
}
 
示例18
@ProcessElement
public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
  // create the channel
  String fileName = c.element().getKey();
  try (SeekableByteChannel channel = getReader(c.element().getValue())) {
    ByteBuffer readBuffer = ByteBuffer.allocate(BATCH_SIZE);
    ByteString buffer = ByteString.EMPTY;
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      long startOffset = (i * BATCH_SIZE) - BATCH_SIZE;
      channel.position(startOffset);
      readBuffer = ByteBuffer.allocate(BATCH_SIZE);
      buffer = ByteString.EMPTY;
      channel.read(readBuffer);
      readBuffer.flip();
      buffer = ByteString.copyFrom(readBuffer);
      readBuffer.clear();
      LOG.debug(
          "Current Restriction {}, Content Size{}",
          tracker.currentRestriction(),
          buffer.size());
      c.output(KV.of(fileName, buffer.toStringUtf8().trim()));
    }
  } catch (Exception e) {

    c.output(textReaderFailedElements, e.getMessage());
  }
}
 
示例19
/**
 * Returns a thread safe {@link RestrictionTracker} which reports all claim attempts to the
 * specified {@link ClaimObserver}.
 */
public static <RestrictionT, PositionT> RestrictionTracker<RestrictionT, PositionT> observe(
    RestrictionTracker<RestrictionT, PositionT> restrictionTracker,
    ClaimObserver<PositionT> claimObserver) {
  if (restrictionTracker instanceof RestrictionTracker.HasProgress) {
    return new RestrictionTrackerObserverWithProgress<>(restrictionTracker, claimObserver);
  } else {
    return new RestrictionTrackerObserver<>(restrictionTracker, claimObserver);
  }
}
 
示例20
private Progress getProgress() {
  synchronized (splitLock) {
    if (currentTracker instanceof RestrictionTracker.HasProgress) {
      return ((HasProgress) currentTracker).getProgress();
    }
  }
  return null;
}
 
示例21
@ProcessElement
public ProcessContinuation processElement(
    ProcessContext context,
    RestrictionTracker<OffsetRange, Long> tracker,
    ManualWatermarkEstimator<Instant> watermarkEstimator)
    throws Exception {
  long checkpointUpperBound = CHECKPOINT_UPPER_BOUND;
  long position = tracker.currentRestriction().getFrom();
  boolean claimStatus;
  while (true) {
    claimStatus = (tracker.tryClaim(position));
    if (!claimStatus) {
      break;
    } else if (position == SPLIT_ELEMENT) {
      enableAndWaitForTrySplitToHappen();
    }
    context.outputWithTimestamp(
        context.element() + ":" + position, GlobalWindow.TIMESTAMP_MIN_VALUE.plus(position));
    watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(position));
    position += 1L;
    if (position == checkpointUpperBound) {
      break;
    }
  }
  if (!claimStatus) {
    return ProcessContinuation.stop();
  } else {
    return ProcessContinuation.resume().withResumeDelay(Duration.millis(54321L));
  }
}
 
示例22
@Override
@ProcessElement
public ProcessContinuation processElement(
    ProcessContext context,
    RestrictionTracker<OffsetRange, Long> tracker,
    ManualWatermarkEstimator<Instant> watermarkEstimator)
    throws Exception {
  long checkpointUpperBound = Long.parseLong(context.sideInput(singletonSideInput));
  long position = tracker.currentRestriction().getFrom();
  boolean claimStatus;
  while (true) {
    claimStatus = (tracker.tryClaim(position));
    if (!claimStatus) {
      break;
    } else if (position == NonWindowObservingTestSplittableDoFn.SPLIT_ELEMENT) {
      enableAndWaitForTrySplitToHappen();
    }
    context.outputWithTimestamp(
        context.element() + ":" + position, GlobalWindow.TIMESTAMP_MIN_VALUE.plus(position));
    watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(position));
    position += 1L;
    if (position == checkpointUpperBound) {
      break;
    }
  }
  if (!claimStatus) {
    return ProcessContinuation.stop();
  } else {
    return ProcessContinuation.resume().withResumeDelay(Duration.millis(54321L));
  }
}
 
示例23
@ProcessElement
public ProcessContinuation processElement(
    @Element SequenceDefinition srcElement,
    OutputReceiver<Instant> out,
    RestrictionTracker<OffsetRange, Long> restrictionTracker) {

  OffsetRange restriction = restrictionTracker.currentRestriction();
  Long interval = srcElement.durationMilliSec;
  Long nextOutput = restriction.getFrom() + interval;

  boolean claimSuccess = true;

  while (claimSuccess && Instant.ofEpochMilli(nextOutput).isBeforeNow()) {
    claimSuccess = restrictionTracker.tryClaim(nextOutput);
    if (claimSuccess) {
      Instant output = Instant.ofEpochMilli(nextOutput);
      out.outputWithTimestamp(output, output);
      nextOutput = nextOutput + interval;
    }
  }

  ProcessContinuation continuation = ProcessContinuation.stop();
  if (claimSuccess) {
    Duration offset = new Duration(Instant.now(), Instant.ofEpochMilli(nextOutput));
    continuation = ProcessContinuation.resume().withResumeDelay(offset);
  }
  return continuation;
}
 
示例24
/** Uses {@link HasDefaultTracker} to produce the tracker. */
@SuppressWarnings("unused")
public static <InputT, OutputT, RestrictionT, PositionT>
    RestrictionTracker<RestrictionT, PositionT> invokeNewTracker(
        DoFnInvoker.ArgumentProvider<InputT, OutputT> argumentProvider) {
  return ((HasDefaultTracker) argumentProvider.restriction()).newTracker();
}
 
示例25
/**
 * Generates a {@link TypeDescriptor} for {@code RestrictionTracker<RestrictionT, ?>} given {@code
 * RestrictionT}.
 */
private static <RestrictionT>
    TypeDescriptor<RestrictionTracker<RestrictionT, ?>> restrictionTrackerTypeOf(
        TypeDescriptor<RestrictionT> restrictionT) {
  return new TypeDescriptor<RestrictionTracker<RestrictionT, ?>>() {}.where(
      new TypeParameter<RestrictionT>() {}, restrictionT);
}
 
示例26
@ProcessElement
public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
  long position = tracker.currentRestriction().getFrom();
  while (tracker.tryClaim(position)) {
    TimestampedValue<OutputT> value = c.element().getValue().get((int) position);
    c.outputWithTimestamp(KV.of(c.element().getKey(), value.getValue()), value.getTimestamp());
    position += 1L;
  }
}
 
示例27
@ProcessElement
public void processElement(
    RestrictionTracker<BoundedSource<T>, TimestampedValue<T>[]> tracker,
    OutputReceiver<T> receiver)
    throws IOException {
  TimestampedValue<T>[] out = new TimestampedValue[1];
  while (tracker.tryClaim(out)) {
    receiver.outputWithTimestamp(out[0].getValue(), out[0].getTimestamp());
  }
}
 
示例28
@NewTracker
public RestrictionTracker<
        UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue<OutputT>[]>
    restrictionTracker(
        @Restriction UnboundedSourceRestriction<OutputT, CheckpointT> restriction,
        PipelineOptions pipelineOptions) {
  return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions);
}
 
示例29
@Override
public Progress getProgress() {
  // We treat the empty source as implicitly done.
  if (currentRestriction().getSource() instanceof EmptyUnboundedSource) {
    return RestrictionTracker.Progress.from(1, 0);
  }

  if (currentReader == null) {
    try {
      currentReader =
          initialRestriction
              .getSource()
              .createReader(pipelineOptions, initialRestriction.getCheckpoint());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  long size = currentReader.getSplitBacklogBytes();
  if (size != UnboundedReader.BACKLOG_UNKNOWN) {
    // The UnboundedSource/UnboundedReader API has no way of reporting how much work
    // has been completed so runners can only see the work remaining changing.
    return RestrictionTracker.Progress.from(0, size);
  }

  // TODO: Support "global" backlog reporting
  // size = reader.getTotalBacklogBytes();
  // if (size != UnboundedReader.BACKLOG_UNKNOWN) {
  //   return size;
  // }

  // We treat unknown as 0 progress
  return RestrictionTracker.Progress.from(0, 1);
}
 
示例30
@ProcessElement
public ProcessContinuation processElement(
    ProcessContext c,
    RestrictionTracker<SomeRestriction, Void> tracker,
    WatermarkEstimator<Instant> watermarkEstimator) {
  return null;
}