Java源码示例:org.apache.samza.system.IncomingMessageEnvelope

示例1
/**
 * Returns the envelopes currently buffered for the given partition, waiting up to
 * {@code timeoutMillis} for the first one to show up.
 *
 * <p>After draining, {@code available.release} is called with the number of envelopes handed
 * back. A failure stored in {@code lastException} is rethrown wrapped in a
 * {@link RuntimeException}, both before polling and after draining.
 *
 * @param ssp partition whose queue is read
 * @param timeoutMillis maximum wait for the first envelope, in milliseconds
 * @return the drained envelopes; empty when nothing arrived within the timeout
 * @throws InterruptedException if interrupted while waiting on the queue
 */
private List<IncomingMessageEnvelope> getNextMessages(
    SystemStreamPartition ssp, long timeoutMillis) throws InterruptedException {
  if (lastException != null) {
    throw new RuntimeException(lastException);
  }

  final BlockingQueue<IncomingMessageEnvelope> sspQueue = queues.get(ssp);
  final List<IncomingMessageEnvelope> batch = new ArrayList<>();

  // Block only for the first envelope; whatever else is already queued is taken without waiting.
  final IncomingMessageEnvelope first = sspQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
  if (first != null) {
    batch.add(first);
    sspQueue.drainTo(batch);
  }

  available.release(batch.size());

  // Re-check: a failure may have been recorded while this call was waiting on the queue.
  if (lastException != null) {
    throw new RuntimeException(lastException);
  }
  return batch;
}
 
示例2
/**
 * Creates a reader task over the given readers.
 *
 * <p>One empty {@link LinkedBlockingQueue} is created per distinct partition found in
 * {@code readerToSsp}, and {@code available} is initialised as a semaphore holding
 * {@code capacity} permits.
 *
 * @param readerToSsp mapping from each reader to the partition it feeds
 * @param checkpointMarkCoder coder stored for checkpoint marks
 * @param capacity initial number of permits of the {@code available} semaphore
 * @param watermarkInterval value stored in {@code watermarkInterval}
 * @param metricsWrapper wrapper stored in {@code metricsWrapper}
 */
private ReaderTask(
    Map<UnboundedReader, SystemStreamPartition> readerToSsp,
    Coder<CheckpointMarkT> checkpointMarkCoder,
    int capacity,
    long watermarkInterval,
    FnWithMetricsWrapper metricsWrapper) {
  this.readerToSsp = readerToSsp;
  this.checkpointMarkCoder = checkpointMarkCoder;
  this.readers = ImmutableList.copyOf(readerToSsp.keySet());
  this.watermarkInterval = watermarkInterval;
  this.available = new Semaphore(capacity);
  this.metricsWrapper = metricsWrapper;

  // Collect into a HashMap first so several readers sharing a partition end up with one queue.
  final Map<SystemStreamPartition, LinkedBlockingQueue<IncomingMessageEnvelope>> queuesBySsp =
      new HashMap<>();
  for (SystemStreamPartition ssp : readerToSsp.values()) {
    queuesBySsp.put(ssp, new LinkedBlockingQueue<>());
  }
  this.queues = ImmutableMap.copyOf(queuesBySsp);
}
 
示例3
/**
 * Test double for a consumer poll: asserts that exactly the expected partition is requested and,
 * on the first invocation only, returns two set-config messages followed by a delete message.
 * Every later invocation returns an empty map.
 *
 * @param systemStreamPartitions partitions being polled; must contain exactly the expected one
 * @param timeout ignored
 * @return the canned envelopes on the first call, an empty map afterwards
 */
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = new LinkedHashMap<>();
  assertEquals(1, systemStreamPartitions.size());
  SystemStreamPartition ssp = systemStreamPartitions.iterator().next();
  assertEquals(expectedSystemStreamPartition, ssp);

  boolean firstPoll = pollCount++ == 0;
  if (!firstPoll) {
    return result;
  }

  SetConfig jobName = new SetConfig("test", "job.name", "my-job-name");
  SetConfig jobId = new SetConfig("test", "job.id", "1234");
  Delete deleteJobName = new Delete("test", "job.name", SetConfig.TYPE);

  List<IncomingMessageEnvelope> envelopes = new ArrayList<>();
  envelopes.add(new IncomingMessageEnvelope(ssp, null, serialize(jobName.getKeyArray()), serialize(jobName.getMessageMap())));
  envelopes.add(new IncomingMessageEnvelope(ssp, null, serialize(jobId.getKeyArray()), serialize(jobId.getMessageMap())));
  // NOTE(review): unlike the two envelopes above, the delete's message map is passed unserialized.
  envelopes.add(new IncomingMessageEnvelope(ssp, null, serialize(deleteJobName.getKeyArray()), deleteJobName.getMessageMap()));
  result.put(ssp, envelopes);
  return result;
}
 
示例4
/**
 * Polls the consumer repeatedly until either a watermark message is the last envelope received
 * or {@code timeoutMillis} has elapsed, and returns everything received along the way.
 *
 * <p>Each poll is given the time still remaining until the deadline (never a negative value),
 * so the helper waits for data instead of passing a non-positive timeout on every iteration.
 *
 * @param consumer consumer to poll
 * @param ssp partition to poll
 * @param timeoutMillis overall time budget in milliseconds; must be {@code >= 0}
 * @return all envelopes received, in arrival order
 * @throws InterruptedException if interrupted while polling
 */
private static List<IncomingMessageEnvelope> consumeUntilTimeoutOrWatermark(
    SystemConsumer consumer, SystemStreamPartition ssp, long timeoutMillis)
    throws InterruptedException {
  assertTrue("Expected timeoutMillis (" + timeoutMillis + ") >= 0", timeoutMillis >= 0);

  final List<IncomingMessageEnvelope> accumulator = new ArrayList<>();
  final long deadline = System.currentTimeMillis() + timeoutMillis;
  long remainingMillis = timeoutMillis;
  while (remainingMillis >= 0) {
    // pollOnce is defined elsewhere; presumably it forwards this timeout to consumer.poll.
    accumulator.addAll(pollOnce(consumer, ssp, remainingMillis));
    if (!accumulator.isEmpty()
        && MessageType.of(accumulator.get(accumulator.size() - 1).getMessage())
            == MessageType.WATERMARK) {
      break;
    }
    remainingMillis = deadline - System.currentTimeMillis();
  }
  return accumulator;
}
 
示例5
/**
 * Polls the consumer repeatedly until either the last envelope received is an end-of-stream
 * marker or {@code timeoutMillis} has elapsed, and returns everything received along the way.
 *
 * <p>Each poll is given the time still remaining until the deadline (never a negative value),
 * so the helper waits for data instead of passing a non-positive timeout on every iteration.
 *
 * @param consumer consumer to poll
 * @param ssp partition to poll
 * @param timeoutMillis overall time budget in milliseconds; must be {@code >= 0}
 * @return all envelopes received, in arrival order
 * @throws InterruptedException if interrupted while polling
 */
private static List<IncomingMessageEnvelope> consumeUntilTimeoutOrEos(
    SystemConsumer consumer, SystemStreamPartition ssp, long timeoutMillis)
    throws InterruptedException {
  assertTrue("Expected timeoutMillis (" + timeoutMillis + ") >= 0", timeoutMillis >= 0);

  final List<IncomingMessageEnvelope> accumulator = new ArrayList<>();
  final long deadline = System.currentTimeMillis() + timeoutMillis;
  long remainingMillis = timeoutMillis;
  while (remainingMillis >= 0) {
    // pollOnce is defined elsewhere; presumably it forwards this timeout to consumer.poll.
    accumulator.addAll(pollOnce(consumer, ssp, remainingMillis));
    if (!accumulator.isEmpty() && accumulator.get(accumulator.size() - 1).isEndOfStream()) {
      break;
    }
    remainingMillis = deadline - System.currentTimeMillis();
  }
  return accumulator;
}
 
示例6
/**
 * Applies one side-input envelope to every store registered for its SSP and then records the
 * envelope's offset as the last processed offset of that SSP.
 * The method is synchronized in order to be exclusive with flush().
 *
 * @param envelope incoming envelope to be processed
 */
public synchronized void process(IncomingMessageEnvelope envelope) {
  SystemStreamPartition ssp = envelope.getSystemStreamPartition();
  String offset = envelope.getOffset();

  for (String storeName : this.sspToStores.get(ssp)) {
    SideInputsProcessor processor = this.storeToProcessor.get(storeName);
    KeyValueStore store = (KeyValueStore) this.taskSideInputStorageManager.getStore(storeName);
    Collection<Entry<?, ?>> pendingWrites = processor.process(envelope, store);

    // TODO: SAMZA-2255: optimize writes to side input stores
    for (Entry entry : pendingWrites) {
      Object key = entry.getKey();
      if (key == null) {
        // Entries without a key are ignored.
        continue;
      }

      Object value = entry.getValue();
      if (value == null) {
        // A null value means the key is to be removed.
        store.delete(key);
      } else {
        store.put(key, value);
      }
    }
  }

  this.lastProcessedOffsets.put(ssp, offset);
}
 
示例7
/**
 * Compares two multi-file style offsets. Such an offset carries both the index of the file
 * and the offset within that file, formatted as {@code "fileIndex:offsetWithinFile"},
 * for example {@code "2:0"} or {@code "3:127"}.
 * The format of the offset within a file is defined by the implementation of
 * {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} itself.
 *
 * <p>The END_OF_STREAM offset equals itself and is greater than every other offset. Offsets in
 * different files are ordered by file index; offsets in the same file are delegated to
 * {@code HdfsReaderFactory.offsetComparator}.
 *
 * @param offset1 First offset for comparison.
 * @param offset2 Second offset for comparison.
 * @return -1 if offset1 &lt; offset2, 0 if offset1 == offset2, 1 if offset1 &gt; offset2,
 *         or {@code null} if either offset is blank
 */
@Override
public Integer offsetComparator(String offset1, String offset2) {
  if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) {
    return null;
  }

  // END_OF_STREAM is handled up front since it has no file index to parse.
  final boolean firstAtEnd = offset1.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET);
  final boolean secondAtEnd = offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET);
  if (firstAtEnd) {
    return secondAtEnd ? 0 : 1;
  }
  if (secondAtEnd) {
    return -1;
  }

  final int firstFile = MultiFileHdfsReader.getCurFileIndex(offset1);
  final int secondFile = MultiFileHdfsReader.getCurFileIndex(offset2);
  if (firstFile != secondFile) {
    return Integer.compare(firstFile, secondFile);
  }

  // Same file: the reader-type specific comparator decides.
  return HdfsReaderFactory.offsetComparator(
      readerType,
      MultiFileHdfsReader.getCurSingleFileOffset(offset1),
      MultiFileHdfsReader.getCurSingleFileOffset(offset2));
}
 
示例8
/**
 * {@inheritDoc}
 *
 * <p>Before delegating to the parent implementation, every requested partition whose reader
 * future has completed is inspected; if retrieving its result throws, the reader is reconnected
 * and a new {@code ReaderRunnable} is submitted for it.
 */
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
  Set<SystemStreamPartition> systemStreamPartitions, long timeout)
  throws InterruptedException {
  for (SystemStreamPartition ssp : systemStreamPartitions) {
    Future readerStatus = readerRunnableStatus.get(ssp);
    if (!readerStatus.isDone()) {
      continue;
    }
    try {
      // The future is already done, so this only surfaces a failure of the runnable.
      readerStatus.get();
    } catch (ExecutionException | InterruptedException e) {
      MultiFileHdfsReader reader = readers.get(ssp);
      LOG.warn(
        String.format("Detect failure in ReaderRunnable for ssp: %s. Try to reconnect now.", ssp),
        e);
      reader.reconnect();
      Future restarted = executorService.submit(new ReaderRunnable(reader));
      readerRunnableStatus.put(ssp, restarted);
    }
  }
  return super.poll(systemStreamPartitions, timeout);
}
 
示例9
/**
 * Reconnects a reader as many times as its retry limit allows, checking each time that the
 * envelope read after the reconnect equals the one read before it, then performs one more
 * read/reconnect and expects a {@code SamzaException}.
 */
@Test(expected = SamzaException.class)
public void testReachingMaxReconnect() {
  final int maxRetries = 3;
  final SystemStreamPartition ssp =
      new SystemStreamPartition("hdfs", "testStream", new Partition(0));
  final MultiFileHdfsReader reader = new MultiFileHdfsReader(
      HdfsReaderFactory.ReaderType.AVRO, ssp, Arrays.asList(descriptors), "0:0", maxRetries);

  // first read a few events, and then reconnect
  final int warmUpReads = NUM_EVENTS / 2;
  for (int read = 0; read < warmUpReads; read++) {
    reader.readNext();
  }

  for (int attempt = 0; attempt < maxRetries; attempt++) {
    IncomingMessageEnvelope beforeReconnect = reader.readNext();
    reader.reconnect();
    IncomingMessageEnvelope afterReconnect = reader.readNext();
    Assert.assertEquals(beforeReconnect, afterReconnect);
  }

  // One more read/reconnect than the limit; reaching fail() means nothing was thrown.
  reader.readNext();
  reader.reconnect();
  Assert.fail();
}
 
示例10
/**
 * Resets the counters and the captured throwable, installs a listener that records completions
 * and failures, and creates the callback under test around a mocked envelope.
 */
@Before
public void setup() {
  throwable = null;
  failureCount = new AtomicInteger();
  completeCount = new AtomicInteger();

  listener = new TaskCallbackListener() {

    /** Counts a successful completion. */
    @Override
    public void onComplete(TaskCallback callback) {
      completeCount.incrementAndGet();
    }

    /** Remembers the failure cause and counts the failure. */
    @Override
    public void onFailure(TaskCallback callback, Throwable t) {
      throwable = t;
      failureCount.incrementAndGet();
    }
  };

  IncomingMessageEnvelope mockedEnvelope = mock(IncomingMessageEnvelope.class);
  callback = new TaskCallbackImpl(listener, null, mockedEnvelope, null, 0L, 0L);
}
 
示例11
/**
 * Removes the next pending envelope from the pending queue so the task can process it, and
 * publishes the remaining queue size to the task metrics.
 *
 * <p>The chooser is updated for SSP-level flow control only when the envelope is processed for
 * the first time; after that the RunLoop can choose new messages from this SSP. This matters
 * for broadcast streams, where the same envelope sits in the pendingEnvelopeQueue of several
 * tasks: only the first fetch updates the chooser with the next envelope of the broadcast
 * stream partition.
 *
 * <p>The function is called in the run loop thread, so there is no synchronization.
 *
 * @return the envelope that was removed from the pending queue
 */
private IncomingMessageEnvelope fetchEnvelope() {
  final PendingEnvelope next = pendingEnvelopeQueue.remove();
  final int remaining = pendingEnvelopeQueue.size();
  taskMetrics.pendingMessages().set(remaining);

  final IncomingMessageEnvelope envelope = next.envelope;
  final SystemStreamPartition ssp = envelope.getSystemStreamPartition();
  log.trace("fetch envelope ssp {} offset {} to process.", ssp, envelope.getOffset());
  log.debug("Task {} pending envelopes count is {} after fetching.", taskName, remaining);

  if (next.markProcessed()) {
    consumerMultiplexer.tryUpdate(ssp);
    log.debug("Update chooser for {}", ssp);
  }
  return envelope;
}
 
示例12
/**
 * Polls the consumer once for the given partitions and flattens the per-partition results into
 * a single list.
 *
 * <p>Any exception raised while polling fails the test with an {@link AssertionError} that
 * keeps the original exception as its cause, so the root failure is visible in the test report.
 *
 * @param consumer consumer to poll
 * @param sspsToPoll partitions to poll
 * @return all envelopes returned by the poll, in the iteration order of the poll result
 */
private List<IncomingMessageEnvelope> consumeRawMessages(SystemConsumer consumer, Set<SystemStreamPartition> sspsToPoll) {
  try {
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = consumer.poll(sspsToPoll, POLL_TIMEOUT_MS);

    // Empty lists contribute nothing to the flattened stream, so no explicit filter is needed.
    return results.values()
        .stream()
        .flatMap(List::stream)
        .collect(Collectors.toList());
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Keep the interrupt visible to the caller instead of swallowing it.
      Thread.currentThread().interrupt();
    }
    throw new AssertionError("Unable to consume messages", e);
  }
}
 
示例13
/**
 * Wraps every generated input message in an {@code IncomingMessageEnvelope} whose offset is the
 * message's position within its partition, then runs the shared multi-partition helper with
 * those envelopes as the input values.
 */
@Test
public void testSyncTaskWithMultiplePartitionMultithreadedWithCustomIME() throws Exception {
  Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
  Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
  genData(inputPartitionData, expectedOutputPartitionData);

  Map<Integer, List<KV>> inputPartitionIME = new HashMap<>();
  inputPartitionData.forEach((partitionId, messages) -> {
    SystemStreamPartition ssp = new SystemStreamPartition("test", "input", new Partition(partitionId));
    List<KV> wrapped = new ArrayList<>();
    inputPartitionIME.put(partitionId, wrapped);

    int offset = 0;
    for (KV message : messages) {
      IncomingMessageEnvelope ime =
          new IncomingMessageEnvelope(ssp, String.valueOf(offset++), message.key, message.getValue());
      wrapped.add(KV.of(message.key, ime));
    }
  });

  syncTaskWithMultiplePartitionMultithreadedHelper(inputPartitionIME, expectedOutputPartitionData);
}
 
示例14
/**
 * Reads the last checkpoint through a checkpoint manager wired with serdes named as
 * exception-throwing, and expects the read to complete without an exception.
 * NOTE(review): the {@code false} constructor argument is presumably the "validate checkpoint"
 * switch referred to by the test name; confirm against KafkaCheckpointManager.
 */
@Test
public void testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled() throws Exception {
  Config config = mock(Config.class);
  when(config.get(JobConfig.SSP_GROUPER_FACTORY)).thenReturn(GROUPER_FACTORY_CLASS);

  // mock out a consumer that returns a single checkpoint IME
  SystemStreamPartition inputSsp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  List<List<IncomingMessageEnvelope>> polledBatches = ImmutableList.of(
      ImmutableList.of(newCheckpointEnvelope(TASK1, inputSsp, "0")));
  SystemConsumer consumer = newConsumer(polledBatches);

  SystemAdmin admin = newAdmin("0", "1");
  SystemProducer producer = mock(SystemProducer.class);
  SystemFactory systemFactory = newFactory(producer, consumer, admin);

  // wire up an exception throwing serde with the checkpointmanager
  KafkaStreamSpec checkpointStream =
      new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC, CHECKPOINT_SYSTEM, 1);
  MetricsRegistry metrics = mock(MetricsRegistry.class);
  KafkaCheckpointManager manager = new KafkaCheckpointManager(
      checkpointStream,
      systemFactory,
      false,
      config,
      metrics,
      new ExceptionThrowingCheckpointSerde(),
      new ExceptionThrowingCheckpointKeySerde());
  manager.register(TASK1);
  manager.start();

  // expect the read to succeed in spite of the exception from ExceptionThrowingSerde
  manager.readLastCheckpoint(TASK1);
}
 
示例15
/**
 * Produces, for every requested partition, the next {@code numMessages / 4} envelopes and
 * advances that partition's position by the same amount. Indices below {@code numMessages}
 * yield regular envelopes; indices at or beyond it yield end-of-stream envelopes.
 * Sleeps for {@code sleepBetweenPollsMs} before returning when that value is positive.
 *
 * @param set partitions to produce envelopes for
 * @param timeout ignored
 * @return the generated envelopes keyed by partition
 * @throws InterruptedException if interrupted during the optional sleep
 */
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long timeout)
    throws InterruptedException {
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopesBySsp = new HashMap<>();
  for (SystemStreamPartition ssp : set) {
    int begin = curMessagesPerSsp.get(ssp);
    int end = begin + numMessages / 4;

    // We send num Messages and an end of stream message following that.
    List<IncomingMessageEnvelope> batch = IntStream.range(begin, end)
        .mapToObj(index -> {
          if (index >= numMessages) {
            return IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp);
          }
          return new IncomingMessageEnvelope(ssp, null, getKey(index, ssp), getData(index, ssp));
        })
        .collect(Collectors.toList());

    envelopesBySsp.put(ssp, batch);
    curMessagesPerSsp.put(ssp, end);
  }

  if (sleepBetweenPollsMs > 0) {
    Thread.sleep(sleepBetweenPollsMs);
  }
  return envelopesBySsp;
}
 
示例16
/**
 * Pass an invalid IME to processAsync. Any exceptions in processAsync should still get propagated through the
 * task callback.
 *
 * <p>The thread pool created for the task is shut down when the test finishes, whether it
 * passes or fails, so its worker threads do not outlive the test.
 */
@Test
public void testExceptionsInProcessInvokesTaskCallback() throws InterruptedException {
  ExecutorService taskThreadPool = Executors.newFixedThreadPool(2);
  try {
    TaskCallback mockTaskCallback = mock(TaskCallback.class);
    MessageCollector mockMessageCollector = mock(MessageCollector.class);
    TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
    StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class));
    operatorTask.setTaskThreadPool(taskThreadPool);

    CountDownLatch failureLatch = new CountDownLatch(1);

    // Release the latch as soon as the callback is told about the failure.
    doAnswer(ctx -> {
      failureLatch.countDown();
      return null;
    }).when(mockTaskCallback).failure(anyObject());

    operatorTask.processAsync(mock(IncomingMessageEnvelope.class), mockMessageCollector,
        mockTaskCoordinator, mockTaskCallback);
    failureLatch.await();
  } finally {
    // Graceful shutdown: lets any in-flight task finish, then releases the worker threads.
    taskThreadPool.shutdown();
  }
}
 
示例17
/**
 * Polls the consumer in one-second polls until at least {@code numEvents} envelopes have been
 * received in total, accumulating them per partition.
 *
 * <p>The loop only ends once enough events have arrived (or a poll throws), so this method
 * blocks for as long as the consumer keeps delivering fewer than {@code numEvents} events.
 *
 * @param ssps partitions to poll
 * @param consumer consumer to read from
 * @param numEvents minimum total number of envelopes to collect
 * @return all envelopes received, keyed by partition, in arrival order per partition
 * @throws InterruptedException if interrupted while polling
 */
private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> readEvents(Set<SystemStreamPartition> ssps,
    KinesisSystemConsumer consumer, int numEvents) throws InterruptedException {
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<>();
  int totalEventsConsumed = 0;

  while (totalEventsConsumed < numEvents) {
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> receivedMessages =
        consumer.poll(ssps, Duration.ofSeconds(1).toMillis());
    receivedMessages.forEach((ssp, envelopes) ->
        messages.computeIfAbsent(ssp, unused -> new ArrayList<>()).addAll(envelopes));
    totalEventsConsumed = messages.values().stream().mapToInt(List::size).sum();
  }

  // Reaching this point implies totalEventsConsumed >= numEvents, so no shortfall check is needed.
  return messages;
}
 
示例18
/**
 * Creates a reader task over the given bounded readers.
 *
 * <p>One empty {@link LinkedBlockingQueue} is created per distinct partition found in
 * {@code readerToSsp}, and {@code available} is initialised as a semaphore holding
 * {@code capacity} permits.
 *
 * @param readerToSsp mapping from each reader to the partition it feeds
 * @param capacity initial number of permits of the {@code available} semaphore
 * @param metricsWrapper wrapper stored in {@code metricsWrapper}
 */
private ReaderTask(
    Map<BoundedReader<T>, SystemStreamPartition> readerToSsp,
    int capacity,
    FnWithMetricsWrapper metricsWrapper) {
  this.readerToSsp = readerToSsp;
  this.available = new Semaphore(capacity);
  this.metricsWrapper = metricsWrapper;

  // Collect into a HashMap first so several readers sharing a partition end up with one queue.
  final Map<SystemStreamPartition, LinkedBlockingQueue<IncomingMessageEnvelope>> queuesBySsp =
      new HashMap<>();
  for (SystemStreamPartition ssp : readerToSsp.values()) {
    queuesBySsp.put(ssp, new LinkedBlockingQueue<>());
  }
  this.queues = ImmutableMap.copyOf(queuesBySsp);
}
 
示例19
/**
 * Runs the wrapped task on the envelope and reports the outcome through the callback:
 * {@code complete()} on normal return, {@code failure(...)} with whatever was thrown otherwise.
 *
 * @param envelope envelope handed to the wrapped task
 * @param collector collector handed to the wrapped task
 * @param coordinator coordinator handed to the wrapped task
 * @param callback callback notified of completion or failure
 */
private void process(
    IncomingMessageEnvelope envelope,
    MessageCollector collector,
    TaskCoordinator coordinator,
    TaskCallback callback) {
  try {
    wrappedTask.process(envelope, collector, coordinator);
    // Kept inside the try block: a throwable raised by complete() is reported as a failure too.
    callback.complete();
  } catch (Throwable failure) {
    callback.failure(failure);
  }
}
 
示例20
/**
 * Feeds an envelope with key "key" and message "msg" to an input operator built from a spec
 * whose boolean argument is {@code false}, and expects the first result to be the bare message.
 */
@Test
public void testWithUnkeyedInput() {
  InputOperatorSpec unkeyedSpec =
      new InputOperatorSpec("stream-id", null, null, null, false, "input-op-id");
  InputOperatorImpl operator = new InputOperatorImpl(unkeyedSpec);

  SystemStreamPartition ssp = mock(SystemStreamPartition.class);
  IncomingMessageEnvelope envelope = new IncomingMessageEnvelope(ssp, "123", "key", "msg");

  Collection<Object> emitted =
      operator.handleMessage(envelope, mock(MessageCollector.class), mock(TaskCoordinator.class));

  Object first = emitted.iterator().next();
  assertEquals("msg", first);
}
 
示例21
/**
 * Records the given exception in {@code lastException} and enqueues one placeholder envelope
 * (no offset, key or message) per partition in {@code readerToSsp}.
 *
 * @param exception failure to record
 */
private void setError(Exception exception) {
  this.lastException = exception;

  // A dummy message used to force the consumer to wake up immediately and check the
  // lastException field, which will be populated.
  for (SystemStreamPartition ssp : readerToSsp.values()) {
    final IncomingMessageEnvelope wakeUpEnvelope =
        new IncomingMessageEnvelope(ssp, null, null, null);
    enqueueUninterruptibly(wakeUpEnvelope);
  }
}
 
示例22
/**
 * Registers three callbacks with the callback manager in the order 2, 1, 0 (the numeric
 * constructor arguments, presumably their sequence numbers) and checks what
 * {@code updateCallback} returns after each registration: nothing for the first two, and a list
 * of two callbacks (for envelopes "0" and "1") after the third.
 */
@Test
public void testUpdateCallbackWithCoordinatorRequests() {
  TaskName taskName = new TaskName("Partition 0");
  SystemStreamPartition ssp = new SystemStreamPartition("kafka", "topic", new Partition(0));

  // simulate out of order
  // Callback 2 arrives first; its coordinator carries a shutdown request for all tasks.
  IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp, "2", null, null);
  ReadableCoordinator coordinator2 = new ReadableCoordinator(taskName);
  coordinator2.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
  TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator2, 2, 0);
  List<TaskCallbackImpl> callbacksToUpdate = callbackManager.updateCallback(callback2);
  assertTrue(callbacksToUpdate.isEmpty());

  // Callback 1 arrives next; its coordinator carries a commit request for the current task.
  IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null);
  ReadableCoordinator coordinator1 = new ReadableCoordinator(taskName);
  coordinator1.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
  TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator1, 1, 0);
  callbacksToUpdate = callbackManager.updateCallback(callback1);
  assertTrue(callbacksToUpdate.isEmpty());

  // Callback 0 arrives last, with no coordinator request; two callbacks are expected back.
  IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null);
  ReadableCoordinator coordinator = new ReadableCoordinator(taskName);
  TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0);
  callbacksToUpdate = callbackManager.updateCallback(callback0);
  assertEquals(2, callbacksToUpdate.size());

  // Check for envelope0
  TaskCallbackImpl taskCallback = callbacksToUpdate.get(0);
  assertTrue(taskCallback.matchSeqNum(0));
  assertEquals(ssp, taskCallback.envelope.getSystemStreamPartition());
  assertEquals("0", taskCallback.envelope.getOffset());

  // Check for envelope1
  taskCallback = callbacksToUpdate.get(1);
  assertTrue(taskCallback.matchSeqNum(1));
  assertEquals(ssp, taskCallback.envelope.getSystemStreamPartition());
  assertEquals("1", taskCallback.envelope.getOffset());
}
 
示例23
/**
 * Collects the next messages of every requested partition from the reader task, one partition
 * after another, passing the same {@code timeout} to each lookup.
 *
 * @param systemStreamPartitions partitions to fetch messages for
 * @param timeout timeout handed to {@code readerTask.getNextMessages} for each partition
 * @return the fetched envelopes keyed by partition
 * @throws InterruptedException if interrupted while fetching
 */
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
    Set<SystemStreamPartition> systemStreamPartitions, long timeout)
    throws InterruptedException {
  assert !readerToSsp.isEmpty(); // start should be called before poll

  final Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = new HashMap<>();
  for (SystemStreamPartition partition : systemStreamPartitions) {
    final List<IncomingMessageEnvelope> nextMessages =
        readerTask.getNextMessages(partition, timeout);
    result.put(partition, nextMessages);
  }
  return result;
}
 
示例24
/**
 * Marks this task as having processed a message and then throws the configured exception
 * {@code e}, if one is set.
 */
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
  processed = true;
  if (e == null) {
    return;
  }
  throw e;
}
 
示例25
/**
 * Stores a delivered "postMessage" event in the home timeline under the key
 * {@code recipient:time:numMessages} and increments the message counter.
 *
 * @param envelope envelope whose message is a map with "event", "recipient" and "time" entries
 * @param collector unused
 * @param coordinator unused
 * @throws IllegalStateException if the event type is missing or is anything other than
 *     "postMessage"
 */
@Override
@SuppressWarnings("unchecked")
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
  Map<String, Object> message = (Map<String, Object>) envelope.getMessage();
  Object event = message.get("event");
  // Constant-first equals: a message without an "event" entry is rejected below instead of
  // failing with a NullPointerException.
  if (!"postMessage".equals(event)) {
    throw new IllegalStateException("Unexpected event type on deliveries stream: " + event);
  }
  String recipient = (String) message.get("recipient");
  String time = (String) message.get("time");

  homeTimeline.put(recipient + ":" + time + ":" + numMessages, message);
  numMessages++;
}
 
示例26
/**
 * Returns the constructed messages for every requested partition on the first call that flips
 * {@code isEnd} from false to true, and an empty map on every other call.
 *
 * @param ssps partitions to build messages for
 * @param timeout ignored
 * @return messages keyed by partition, or an empty map once {@code isEnd} is already set
 */
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
    Set<SystemStreamPartition> ssps, long timeout) throws InterruptedException {
  final boolean firstPoll = isEnd.compareAndSet(false, true);
  if (!firstPoll) {
    return Collections.emptyMap();
  }
  return ssps.stream()
      .collect(
          Collectors.toMap(
              Function.identity(), SamzaImpulseSystemConsumer::constructMessages));
}
 
示例27
/**
 * Builds an envelope with no key whose message is an element {@code OpMessage} wrapping the
 * given string, timestamped in the global window.
 *
 * @param ssp partition of the envelope
 * @param offset offset of the envelope
 * @param element element value to wrap
 * @param timestamp timestamp attached to the element
 * @return the constructed envelope
 */
static IncomingMessageEnvelope createElementMessage(
    SystemStreamPartition ssp, String offset, String element, Instant timestamp) {
  final WindowedValue<String> windowedElement =
      WindowedValue.timestampedValueInGlobalWindow(element, timestamp);
  return new IncomingMessageEnvelope(ssp, offset, null, OpMessage.ofElement(windowedElement));
}
 
示例28
/**
 * Hands the collector to every output stream and then passes the envelope's message, cast to
 * {@code ContentEvent}, to this instance's processor.
 *
 * @param envelope envelope whose message is processed
 * @param collector collector installed on each output stream
 * @param coordinator unused
 */
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
    throws Exception {
  // Every output stream needs the current collector before the processor may emit to it.
  for (SamzaStream outputStream : this.outputStreams) {
    outputStream.setCollector(collector);
  }

  this.getProcessor().process((ContentEvent) envelope.getMessage());
}
 
示例29
/**
 * Registers a consumer on every partition of the configured range starting at each partition's
 * oldest offset, consumes until at least {@code totalEvents} envelopes have been received,
 * prints the observed event rate and exits the JVM.
 *
 * <p>The rate is computed in {@code long} arithmetic and the elapsed time is clamped to at
 * least one millisecond, so large event counts do not overflow and a very short run does not
 * divide by zero.
 *
 * @throws IOException declared by the method signature
 * @throws InterruptedException if interrupted while polling
 */
public void start() throws IOException, InterruptedException {
  super.start();
  SystemAdmin systemAdmin = factory.getAdmin(systemName, config);
  SystemStreamMetadata ssm =
      systemAdmin.getSystemStreamMetadata(Collections.singleton(physicalStreamName)).get(physicalStreamName);

  NoOpMetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
  Set<SystemStreamPartition> ssps = createSSPs(systemName, physicalStreamName, startPartition, endPartition);
  SystemConsumer consumer = factory.getConsumer(systemName, config, metricsRegistry);
  for (SystemStreamPartition ssp : ssps) {
    consumer.register(ssp, ssm.getSystemStreamPartitionMetadata().get(ssp.getPartition()).getOldestOffset());
  }

  consumer.start();

  System.out.println("starting consumption at " + Instant.now());
  Instant startTime = Instant.now();
  int numEvents = 0;
  while (numEvents < totalEvents) {
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollResult = consumer.poll(ssps, 2000);
    numEvents += pollResult.values().stream().mapToInt(List::size).sum();
  }

  Instant endTime = Instant.now();
  System.out.println("Ending consumption at " + endTime);
  // Clamp to 1 ms so a run that finishes within the same millisecond cannot divide by zero.
  long elapsedMillis = Math.max(1L, Duration.between(startTime, endTime).toMillis());
  // 1000L forces the multiplication into long arithmetic before the division.
  System.out.println(String.format("Event Rate is %s Messages/Sec ",
      numEvents * 1000L / elapsedMillis));
  consumer.stop();
  System.exit(0);
}
 
示例30
/**
 * Reads the next record from the file and wraps it in an envelope whose offset is the
 * checkpoint computed before the read. The record counter is reset to 0 when the reader's
 * sync position differs from the tracked block start, and incremented otherwise.
 *
 * @return an envelope with no key carrying the record that was read
 */
@Override
public IncomingMessageEnvelope readNext() {
  // get checkpoint for THIS record
  final String offsetOfThisRecord = nextOffset();
  final GenericRecord record = fileReader.next();

  final long blockStart = fileReader.previousSync();
  if (blockStart == curBlockStart) {
    curRecordOffset++;
  } else {
    // The sync position changed: start counting records from the new position.
    curBlockStart = blockStart;
    curRecordOffset = 0;
  }

  // avro schema doesn't necessarily have key field
  return new IncomingMessageEnvelope(systemStreamPartition, offsetOfThisRecord, null, record);
}