Java源码示例:org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber

示例1
/**
 * Resolves a sentinel sequence number into the shard iterator that reading should start from.
 *
 * @param sentinelSequenceNumber the sentinel value describing where to start reading the subscribed shard
 * @return the shard iterator to start from, or {@code null} when there is nothing to iterate over
 *         (shard-ending sentinel, or the latest position requested on an already closed shard)
 * @throws InterruptedException declared by this method; presumably raised by the Kinesis proxy call
 * @throws RuntimeException if the given sequence number is not one of the handled sentinel values
 */
protected String getShardIteratorForSentinel(SequenceNumber sentinelSequenceNumber) throws InterruptedException {
	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
		// a closed shard will never receive a "latest" record, so no iterator is requested for it
		return subscribedShard.isClosed()
			? null
			: kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
	}

	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
		return kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
	}

	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
		// the shard-ending sentinel maps to "no iterator"
		return null;
	}

	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
		return kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
	}

	throw new RuntimeException("Unknown sentinel type: " + sentinelSequenceNumber);
}
 
示例2
/**
 * Records the last processed sequence number for one of the subscribed shards.
 * Invoked by {@link ShardConsumer}s.
 *
 * @param shardStateIndex position of the shard's state within subscribedShardsState; expected to be the
 *                        value previously returned by
 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}
 *                        when that shard state was registered.
 * @param lastSequenceNumber the sequence number to store as the shard's last processed one
 */
protected final void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
	synchronized (checkpointLock) {
		subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);

		// any value other than SENTINEL_SHARD_ENDING_SEQUENCE_NUM means the shard is still being read,
		// so the active-shard bookkeeping below does not apply
		if (!lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
			return;
		}

		LOG.info("Subtask {} has reached the end of subscribed shard: {}",
			indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getStreamShardHandle());

		// one fewer active shard; only when none remain does the source need to be flagged as idle.
		// On resharding, if registerNewSubscribedShardState is invoked for newly discovered shards AFTER
		// the old shards ended, the subtask is toggled back to active as soon as records from the new
		// shards are collected.
		if (this.numberOfActiveShards.decrementAndGet() != 0) {
			return;
		}

		LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...",
			indexOfThisConsumerSubtask);

		sourceContext.markAsTemporarilyIdle();
	}
}
 
示例3
/**
 * Resolves a sentinel sequence number into the shard iterator that reading should start from.
 *
 * @param sentinelSequenceNumber the sentinel value describing where to start reading the subscribed shard
 * @return the shard iterator to start from, or {@code null} when there is nothing to iterate over
 *         (shard-ending sentinel, or the latest position requested on an already closed shard)
 * @throws InterruptedException declared by this method; presumably raised by the Kinesis proxy call
 * @throws RuntimeException if the given sequence number is not one of the handled sentinel values
 */
protected String getShardIteratorForSentinel(SequenceNumber sentinelSequenceNumber) throws InterruptedException {
	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
		// a closed shard will never receive a "latest" record, so no iterator is requested for it
		return subscribedShard.isClosed()
			? null
			: kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
	}

	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
		return kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
	}

	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
		// the shard-ending sentinel maps to "no iterator"
		return null;
	}

	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
		return kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
	}

	throw new RuntimeException("Unknown sentinel type: " + sentinelSequenceNumber);
}
 
示例4
/**
 * Records the last processed sequence number for one of the subscribed shards.
 * Invoked by {@link ShardConsumer}s.
 *
 * @param shardStateIndex position of the shard's state within subscribedShardsState; expected to be the
 *                        value previously returned by
 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}
 *                        when that shard state was registered.
 * @param lastSequenceNumber the sequence number to store as the shard's last processed one
 */
protected final void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
	synchronized (checkpointLock) {
		subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);

		// any value other than SENTINEL_SHARD_ENDING_SEQUENCE_NUM means the shard is still being read,
		// so the active-shard bookkeeping below does not apply
		if (!lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
			return;
		}

		LOG.info("Subtask {} has reached the end of subscribed shard: {}",
			indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getStreamShardHandle());

		// one fewer active shard; only when none remain does the source need to be flagged as idle.
		// On resharding, if registerNewSubscribedShardState is invoked for newly discovered shards AFTER
		// the old shards ended, the subtask is toggled back to active as soon as records from the new
		// shards are collected.
		if (this.numberOfActiveShards.decrementAndGet() != 0) {
			return;
		}

		LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...",
			indexOfThisConsumerSubtask);

		sourceContext.markAsTemporarilyIdle();
	}
}
 
示例5
/**
 * Resolves a sentinel sequence number into the shard iterator that reading should start from.
 *
 * @param sentinelSequenceNumber the sentinel value describing where to start reading the subscribed shard
 * @return the shard iterator to start from, or {@code null} when there is nothing to iterate over
 *         (shard-ending sentinel, or the latest position requested on an already closed shard)
 * @throws InterruptedException declared by this method; presumably raised by the Kinesis proxy call
 * @throws RuntimeException if the given sequence number is not one of the handled sentinel values
 */
protected String getShardIteratorForSentinel(SequenceNumber sentinelSequenceNumber) throws InterruptedException {
	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
		// a closed shard will never receive a "latest" record, so no iterator is requested for it
		return subscribedShard.isClosed()
			? null
			: kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
	}

	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
		return kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
	}

	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
		// the shard-ending sentinel maps to "no iterator"
		return null;
	}

	if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
		return kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
	}

	throw new RuntimeException("Unknown sentinel type: " + sentinelSequenceNumber);
}
 
示例6
/**
 * Records the last processed sequence number for one of the subscribed shards.
 * Invoked by {@link ShardConsumer}s.
 *
 * @param shardStateIndex position of the shard's state within subscribedShardsState; expected to be the
 *                        value previously returned by
 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}
 *                        when that shard state was registered.
 * @param lastSequenceNumber the sequence number to store as the shard's last processed one
 */
protected final void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
	synchronized (checkpointLock) {
		subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);

		// any value other than SENTINEL_SHARD_ENDING_SEQUENCE_NUM means the shard is still being read,
		// so the active-shard bookkeeping below does not apply
		if (!lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
			return;
		}

		LOG.info("Subtask {} has reached the end of subscribed shard: {}",
			indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getStreamShardHandle());

		// one fewer active shard; only when none remain does the source need to be flagged as idle.
		// On resharding, if registerNewSubscribedShardState is invoked for newly discovered shards AFTER
		// the old shards ended, the subtask is toggled back to active as soon as records from the new
		// shards are collected.
		if (this.numberOfActiveShards.decrementAndGet() != 0) {
			return;
		}

		LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...",
			indexOfThisConsumerSubtask);

		sourceContext.markAsTemporarilyIdle();
	}
}
 
示例7
/**
 * Adds a shard state to the set of shards this fetcher is subscribed to.
 *
 * @param newSubscribedShardState the shard state to start tracking
 * @return the index of the newly added state within subscribedShardsState
 */
public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
	synchronized (checkpointLock) {
		subscribedShardsState.add(newSubscribedShardState);
		final int shardStateIndex = subscribedShardsState.size() - 1;

		// A shard whose state already equals SENTINEL_SHARD_ENDING_SEQUENCE_NUM was fully read before
		// (e.g. the consumer finished it prior to a failure and restore); every other shard counts as
		// a new active shard of this subtask.
		final boolean alreadyFullyRead = newSubscribedShardState.getLastProcessedSequenceNum()
			.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
		if (!alreadyFullyRead) {
			this.numberOfActiveShards.incrementAndGet();
		}

		// track all discovered shards for watermark determination; an existing entry is left untouched
		if (shardWatermarks.get(shardStateIndex) == null) {
			ShardWatermarkState watermarkState = new ShardWatermarkState();
			try {
				watermarkState.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
			} catch (Exception e) {
				throw new RuntimeException("Failed to instantiate new WatermarkAssigner", e);
			}
			watermarkState.emitQueue = recordEmitter.getQueue(shardStateIndex);
			watermarkState.lastUpdated = getCurrentTimeMillis();
			watermarkState.lastRecordTimestamp = Long.MIN_VALUE;
			shardWatermarks.put(shardStateIndex, watermarkState);
		}

		return shardStateIndex;
	}
}
 
示例8
/**
 * Runs a shard consumer against a fake Kinesis that serves 1000 records and checks that all of them
 * are collected and that the shard state ends at the shard-ending sentinel.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
	// single fake shard, starting from an arbitrary non-sentinel sequence number
	StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
	KinesisStreamShardState initialShardState = new KinesisStreamShardState(
		KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
		fakeToBeConsumedShard,
		new SequenceNumber("fakeStartingState"));

	LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
	subscribedShardsStateUnderTest.add(initialShardState);

	TestSourceContext<String> sourceContext = new TestSourceContext<>();

	TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
		Collections.singletonList("fakeStream"),
		sourceContext,
		new Properties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		10,
		2,
		new AtomicReference<>(),
		subscribedShardsStateUnderTest,
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
		Mockito.mock(KinesisProxyInterface.class));

	int shardIndex = fetcher.registerNewSubscribedShardState(initialShardState);

	ShardConsumer<String> consumer = new ShardConsumer<>(
		fetcher,
		shardIndex,
		initialShardState.getStreamShardHandle(),
		initialShardState.getLastProcessedSequenceNum(),
		FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L),
		new ShardMetricsReporter());
	consumer.run();

	// every record must have been emitted, and the shard must be marked as fully consumed
	assertEquals(1000, sourceContext.getCollectedOutputs().size());
	assertEquals(
		SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
		subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
}
 
示例9
/**
 * Adds a shard state to the set of shards this fetcher is subscribed to.
 *
 * @param newSubscribedShardState the shard state to start tracking
 * @return the index of the newly added state within subscribedShardsState
 */
public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
	synchronized (checkpointLock) {
		subscribedShardsState.add(newSubscribedShardState);
		final int shardStateIndex = subscribedShardsState.size() - 1;

		// A shard whose state already equals SENTINEL_SHARD_ENDING_SEQUENCE_NUM was fully read before
		// (e.g. the consumer finished it prior to a failure and restore); every other shard counts as
		// a new active shard of this subtask.
		final boolean alreadyFullyRead = newSubscribedShardState.getLastProcessedSequenceNum()
			.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
		if (!alreadyFullyRead) {
			this.numberOfActiveShards.incrementAndGet();
		}

		// track all discovered shards for watermark determination; an existing entry is left untouched
		if (shardWatermarks.get(shardStateIndex) == null) {
			ShardWatermarkState watermarkState = new ShardWatermarkState();
			try {
				watermarkState.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
			} catch (Exception e) {
				throw new RuntimeException("Failed to instantiate new WatermarkAssigner", e);
			}
			watermarkState.emitQueue = recordEmitter.getQueue(shardStateIndex);
			watermarkState.lastUpdated = getCurrentTimeMillis();
			watermarkState.lastRecordTimestamp = Long.MIN_VALUE;
			shardWatermarks.put(shardStateIndex, watermarkState);
		}

		return shardStateIndex;
	}
}
 
示例10
/**
 * Runs a shard consumer against a fake Kinesis that serves 1000 records and checks that all of them
 * are collected and that the shard state ends at the shard-ending sentinel.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
	// single fake shard, starting from an arbitrary non-sentinel sequence number
	StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
	KinesisStreamShardState initialShardState = new KinesisStreamShardState(
		KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
		fakeToBeConsumedShard,
		new SequenceNumber("fakeStartingState"));

	LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
	subscribedShardsStateUnderTest.add(initialShardState);

	TestSourceContext<String> sourceContext = new TestSourceContext<>();

	TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
		Collections.singletonList("fakeStream"),
		sourceContext,
		new Properties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		10,
		2,
		new AtomicReference<>(),
		subscribedShardsStateUnderTest,
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
		Mockito.mock(KinesisProxyInterface.class));

	int shardIndex = fetcher.registerNewSubscribedShardState(initialShardState);

	ShardConsumer<String> consumer = new ShardConsumer<>(
		fetcher,
		shardIndex,
		initialShardState.getStreamShardHandle(),
		initialShardState.getLastProcessedSequenceNum(),
		FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L),
		new ShardMetricsReporter());
	consumer.run();

	// every record must have been emitted, and the shard must be marked as fully consumed
	assertEquals(1000, sourceContext.getCollectedOutputs().size());
	assertEquals(
		SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
		subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
}
 
示例11
/**
 * Adds a shard state to the set of shards this fetcher is subscribed to.
 *
 * @param newSubscribedShardState the shard state to start tracking
 * @return the index of the newly added state within subscribedShardsState
 */
public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
	synchronized (checkpointLock) {
		subscribedShardsState.add(newSubscribedShardState);
		final int shardStateIndex = subscribedShardsState.size() - 1;

		// A shard whose state already equals SENTINEL_SHARD_ENDING_SEQUENCE_NUM was fully read before
		// (e.g. the consumer finished it prior to a failure and restore); every other shard counts as
		// a new active shard of this subtask.
		final boolean alreadyFullyRead = newSubscribedShardState.getLastProcessedSequenceNum()
			.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
		if (!alreadyFullyRead) {
			this.numberOfActiveShards.incrementAndGet();
		}

		// track all discovered shards for watermark determination; an existing entry is left untouched
		if (shardWatermarks.get(shardStateIndex) == null) {
			ShardWatermarkState watermarkState = new ShardWatermarkState();
			try {
				watermarkState.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
			} catch (Exception e) {
				throw new RuntimeException("Failed to instantiate new WatermarkAssigner", e);
			}
			watermarkState.emitQueue = recordEmitter.getQueue(shardStateIndex);
			watermarkState.lastUpdated = getCurrentTimeMillis();
			watermarkState.lastRecordTimestamp = Long.MIN_VALUE;
			shardWatermarks.put(shardStateIndex, watermarkState);
		}

		return shardStateIndex;
	}
}
 
示例12
/**
 * Fetch loop of this consumer: repeatedly pulls record batches for the subscribed shard, hands every
 * deaggregated record on for collection, and stops once no further shard iterator is available or the
 * consumer is no longer running. Any throwable is reported to the fetcher via {@code stopWithError}.
 */
@SuppressWarnings("unchecked")
@Override
public void run() {
	try {
		String shardIterator = getShardIterator(lastSequenceNum);

		long loopStartNanos = System.nanoTime();

		while (isRunning()) {
			if (shardIterator == null) {
				// no iterator left: record the shard-ending sentinel and let this consumer thread finish
				fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
				break;
			}

			shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
			GetRecordsResult fetchResult = getRecords(shardIterator, maxNumberOfRecordsPerFetch);

			List<Record> rawRecords = fetchResult.getRecords();
			shardMetricsReporter.setNumberOfAggregatedRecords(rawRecords.size());

			// Kinesis records may be aggregated, so they are deaggregated before being processed
			List<UserRecord> userRecords = deaggregateRecords(
				rawRecords,
				subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
				subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

			long batchSizeBytes = 0L;
			for (UserRecord userRecord : userRecords) {
				batchSizeBytes += userRecord.getData().remaining();
				deserializeRecordForCollectionAndUpdateState(userRecord);
			}

			shardMetricsReporter.setNumberOfDeaggregatedRecords(userRecords.size());

			shardIterator = fetchResult.getNextShardIterator();

			long loopEndNanos = adjustRunLoopFrequency(loopStartNanos, System.nanoTime());
			long runLoopTimeNanos = loopEndNanos - loopStartNanos;
			maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, userRecords.size(), batchSizeBytes, maxNumberOfRecordsPerFetch);
			shardMetricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);

			// the end of this iteration is the starting point for timing the next one
			loopStartNanos = loopEndNanos;
		}
	} catch (Throwable t) {
		fetcherRef.stopWithError(t);
	}
}
 
示例13
/**
 * Creates an initial position backed by the given sentinel sequence number.
 *
 * @param sentinelSequenceNumber the sentinel value stored in this instance
 */
InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
	this.sentinelSequenceNumber = sentinelSequenceNumber;
}
 
示例14
/**
 * Returns the sentinel sequence number held by this instance.
 *
 * @return the value of the {@code sentinelSequenceNumber} field
 */
public SentinelSequenceNumber toSentinelSequenceNumber() {
	return this.sentinelSequenceNumber;
}
 
示例15
/**
 * Creates the fetcher, seeds it with the initially discovered shards, and then runs it until it terminates.
 *
 * <p>The starting state of each discovered shard is chosen as follows: with no restored state, the
 * configured initial position is used; with restored state, a known shard resumes from its restored
 * sequence number and an unknown shard starts from SENTINEL_EARLIEST_SEQUENCE_NUM.
 *
 * @param sourceContext the source context handed to the fetcher; closed after the fetcher has terminated
 * @throws Exception declared by this method; propagated from fetcher creation, discovery or execution
 */
@Override
public void run(SourceContext<T> sourceContext) throws Exception {

	// every subtask runs a fetcher, whether or not it initially has shards to subscribe to; fetchers
	// keep polling for changes in the shard list, so any subtask may get shards to subscribe to later on
	KinesisDataFetcher<T> dataFetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

	// initial discovery
	for (StreamShardHandle shard : dataFetcher.discoverNewShardsToSubscribe()) {
		StreamShardMetadata.EquivalenceWrapper kinesisStreamShard =
			new StreamShardMetadata.EquivalenceWrapper(KinesisDataFetcher.convertToStreamShardMetadata(shard));

		if (sequenceNumsToRestore == null) {
			// starting fresh: the configured start position becomes the initial state
			SentinelSequenceNumber startingSeqNum =
				InitialPosition.valueOf(configProps.getProperty(
					ConsumerConfigConstants.STREAM_INITIAL_POSITION,
					ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)).toSentinelSequenceNumber();

			dataFetcher.registerNewSubscribedShardState(
				new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, startingSeqNum.get()));

			if (LOG.isInfoEnabled()) {
				LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
					getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), startingSeqNum.get());
			}
			continue;
		}

		if (!sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
			// the shard is not part of the restored state, so it is consumed from the beginning
			dataFetcher.registerNewSubscribedShardState(
				new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));

			if (LOG.isInfoEnabled()) {
				LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
						" starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
					getRuntimeContext().getIndexOfThisSubtask(), shard.toString());
			}
			continue;
		}

		// the shard is contained in the restored state: resume from the stored sequence number
		dataFetcher.registerNewSubscribedShardState(
			new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, sequenceNumsToRestore.get(kinesisStreamShard)));

		if (LOG.isInfoEnabled()) {
			LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
					" starting state set to the restored sequence number {}",
				getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(kinesisStreamShard));
		}
	}

	// do not start the fetcher if the source is no longer running
	if (!running) {
		return;
	}

	// expose the fetcher only from this point on, so that state snapshots
	// can be taken from the fetcher's state holders
	this.fetcher = dataFetcher;

	// run the fetcher loop; it stops only when cancel() or close() is called,
	// or when a thread created by the fetcher throws an error
	dataFetcher.runFetcher();

	// make sure the fetcher has terminated before fully closing
	dataFetcher.awaitTermination();
	sourceContext.close();
}
 
示例16
/**
 * Same scenario as the plain record-count test, except that the fake Kinesis expires the shard
 * iterator unexpectedly during one of the getRecords() calls; all 1000 records must still be collected.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
	// single fake shard, starting from an arbitrary non-sentinel sequence number
	StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
	KinesisStreamShardState initialShardState = new KinesisStreamShardState(
		KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
		fakeToBeConsumedShard,
		new SequenceNumber("fakeStartingState"));

	LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
	subscribedShardsStateUnderTest.add(initialShardState);

	TestSourceContext<String> sourceContext = new TestSourceContext<>();

	TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
		Collections.singletonList("fakeStream"),
		sourceContext,
		new Properties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		10,
		2,
		new AtomicReference<>(),
		subscribedShardsStateUnderTest,
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
		Mockito.mock(KinesisProxyInterface.class));

	int shardIndex = fetcher.registerNewSubscribedShardState(initialShardState);

	ShardConsumer<String> consumer = new ShardConsumer<>(
		fetcher,
		shardIndex,
		initialShardState.getStreamShardHandle(),
		initialShardState.getLastProcessedSequenceNum(),
		// a total of 1000 records over 9 getRecords() calls,
		// with the 7th getRecords() call hitting an unexpected expired shard iterator
		FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
			1000, 9, 7, 500L),
		new ShardMetricsReporter());
	consumer.run();

	// every record must have been emitted, and the shard must be marked as fully consumed
	assertEquals(1000, sourceContext.getCollectedOutputs().size());
	assertEquals(
		SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
		subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
}
 
示例17
/**
 * Runs a shard consumer with the "flink.shard.adaptivereads" property enabled and checks the total
 * number of collected records as well as the final shard state.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
	Properties consumerProperties = new Properties();
	consumerProperties.put("flink.shard.adaptivereads", "true");

	// single fake shard, starting from an arbitrary non-sentinel sequence number
	StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
	KinesisStreamShardState initialShardState = new KinesisStreamShardState(
		KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
		fakeToBeConsumedShard,
		new SequenceNumber("fakeStartingState"));

	LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
	subscribedShardsStateUnderTest.add(initialShardState);

	TestSourceContext<String> sourceContext = new TestSourceContext<>();

	TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
		Collections.singletonList("fakeStream"),
		sourceContext,
		consumerProperties,
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		10,
		2,
		new AtomicReference<>(),
		subscribedShardsStateUnderTest,
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
		Mockito.mock(KinesisProxyInterface.class));

	int shardIndex = fetcher.registerNewSubscribedShardState(initialShardState);

	ShardConsumer<String> consumer = new ShardConsumer<>(
		fetcher,
		shardIndex,
		initialShardState.getStreamShardHandle(),
		initialShardState.getLastProcessedSequenceNum(),
		// Initial number of records to fetch --> 10
		FakeKinesisBehavioursFactory.initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(10, 2, 500L),
		new ShardMetricsReporter());
	consumer.run();

	// Avg record size for first batch --> 10 * 10 Kb/10 = 10 Kb
	// Number of records fetched in second batch --> 2 Mb/10Kb * 5 = 40
	// Total number of records = 10 + 40 = 50
	assertEquals(50, sourceContext.getCollectedOutputs().size());
	assertEquals(
		SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
		subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
}
 
示例18
/**
 * Restores the consumer from an empty snapshot and checks that the initially discovered shard is still
 * registered with the fetcher, starting from the earliest sequence number.
 */
@Test
public void testRestoreWithEmptyState() throws Exception {
	// one open shard (starting sequence number only) per entry of the test state
	final List<StreamShardHandle> initialDiscoveryShards = new ArrayList<>(TEST_STATE.size());
	for (StreamShardMetadata shardMetadata : TEST_STATE.keySet()) {
		Shard shard = new Shard();
		shard.setShardId(shardMetadata.getShardId());
		shard.setSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("1"));

		initialDiscoveryShards.add(new StreamShardHandle(shardMetadata.getStreamName(), shard));
	}

	final TestFetcher<String> fetcher = new TestFetcher<>(
		Collections.singletonList(TEST_STREAM_NAME),
		new TestSourceContext<>(),
		new TestRuntimeContext(true, 1, 0),
		TestUtils.getStandardProperties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		null,
		initialDiscoveryShards);

	final DummyFlinkKinesisConsumer<String> consumerFunction = new DummyFlinkKinesisConsumer<>(
		fetcher, new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()));

	StreamSource<String, DummyFlinkKinesisConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction);

	final AbstractStreamOperatorTestHarness<String> testHarness =
		new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

	testHarness.setup();
	final String snapshotResource = "kinesis-consumer-migration-test-flink" + testMigrateVersion + "-empty-snapshot";
	testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename(snapshotResource));
	testHarness.open();

	consumerFunction.run(new TestSourceContext<>());

	// nothing must have been restored
	assertTrue(consumerFunction.getRestoredState().isEmpty());

	// even with an empty restored state, the fetcher must have been registered with the initially discovered
	// shard; that shard is treated as one created while the job wasn't running, and is therefore consumed
	// from the earliest sequence number
	KinesisStreamShardState registeredShardState = fetcher.getSubscribedShardsState().get(0);
	assertEquals(TEST_STREAM_NAME, registeredShardState.getStreamShardHandle().getStreamName());
	assertEquals(TEST_SHARD_ID, registeredShardState.getStreamShardHandle().getShard().getShardId());
	assertFalse(registeredShardState.getStreamShardHandle().isClosed());
	assertEquals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(), registeredShardState.getLastProcessedSequenceNum());

	consumerOperator.close();
	consumerOperator.cancel();
}
 
示例19
/**
 * Restores the consumer from a snapshot while the stream has been resharded in the meantime: the
 * restored shard is now closed and two new shards with that parent have appeared. Checks that the
 * closed shard keeps its restored sequence number and the new shards start from the earliest one.
 */
@Test
public void testRestoreWithReshardedStream() throws Exception {
	final List<StreamShardHandle> initialDiscoveryShards = new ArrayList<>(TEST_STATE.size());
	for (StreamShardMetadata shardMetadata : TEST_STATE.keySet()) {
		// the restored shard, now closed: having an ending sequence number represents a closed shard
		Shard closedShard = new Shard();
		closedShard.setShardId(shardMetadata.getShardId());
		closedShard.setSequenceNumberRange(
			new SequenceNumberRange()
				.withStartingSequenceNumber("1")
				.withEndingSequenceNumber("1087654321"));

		initialDiscoveryShards.add(new StreamShardHandle(shardMetadata.getStreamName(), closedShard));

		// first new shard, child of the test shard
		Shard newSplitShard1 = new Shard();
		newSplitShard1.setShardId(KinesisShardIdGenerator.generateFromShardOrder(1));
		newSplitShard1.setSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("1087654322"));
		newSplitShard1.setParentShardId(TEST_SHARD_ID);

		// second new shard, child of the test shard
		Shard newSplitShard2 = new Shard();
		newSplitShard2.setShardId(KinesisShardIdGenerator.generateFromShardOrder(2));
		newSplitShard2.setSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("2087654322"));
		newSplitShard2.setParentShardId(TEST_SHARD_ID);

		initialDiscoveryShards.add(new StreamShardHandle(shardMetadata.getStreamName(), newSplitShard1));
		initialDiscoveryShards.add(new StreamShardHandle(shardMetadata.getStreamName(), newSplitShard2));
	}

	final TestFetcher<String> fetcher = new TestFetcher<>(
		Collections.singletonList(TEST_STREAM_NAME),
		new TestSourceContext<>(),
		new TestRuntimeContext(true, 1, 0),
		TestUtils.getStandardProperties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		null,
		initialDiscoveryShards);

	final DummyFlinkKinesisConsumer<String> consumerFunction = new DummyFlinkKinesisConsumer<>(
		fetcher, new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()));

	StreamSource<String, DummyFlinkKinesisConsumer<String>> consumerOperator =
		new StreamSource<>(consumerFunction);

	final AbstractStreamOperatorTestHarness<String> testHarness =
		new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

	testHarness.setup();
	final String snapshotResource = "kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot";
	testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename(snapshotResource));
	testHarness.open();

	consumerFunction.run(new TestSourceContext<>());

	// the restored state must match the test state exactly
	assertNotEquals(null, consumerFunction.getRestoredState());
	assertEquals(1, consumerFunction.getRestoredState().size());
	assertEquals(TEST_STATE, removeEquivalenceWrappers(consumerFunction.getRestoredState()));

	// the fetcher must be registered with all shards, including the new ones
	assertEquals(3, fetcher.getSubscribedShardsState().size());

	// index 0: the closed shard, resuming from its restored sequence number
	KinesisStreamShardState closedShardState = fetcher.getSubscribedShardsState().get(0);
	assertEquals(TEST_STREAM_NAME, closedShardState.getStreamShardHandle().getStreamName());
	assertEquals(TEST_SHARD_ID, closedShardState.getStreamShardHandle().getShard().getShardId());
	assertTrue(closedShardState.getStreamShardHandle().isClosed());
	assertEquals(TEST_SEQUENCE_NUMBER, closedShardState.getLastProcessedSequenceNum());

	// index 1: first new shard, to be consumed from the beginning
	KinesisStreamShardState splitShard1State = fetcher.getSubscribedShardsState().get(1);
	assertEquals(TEST_STREAM_NAME, splitShard1State.getStreamShardHandle().getStreamName());
	assertEquals(KinesisShardIdGenerator.generateFromShardOrder(1), splitShard1State.getStreamShardHandle().getShard().getShardId());
	assertFalse(splitShard1State.getStreamShardHandle().isClosed());
	assertEquals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(), splitShard1State.getLastProcessedSequenceNum());

	// index 2: second new shard, to be consumed from the beginning
	KinesisStreamShardState splitShard2State = fetcher.getSubscribedShardsState().get(2);
	assertEquals(TEST_STREAM_NAME, splitShard2State.getStreamShardHandle().getStreamName());
	assertEquals(KinesisShardIdGenerator.generateFromShardOrder(2), splitShard2State.getStreamShardHandle().getShard().getShardId());
	assertFalse(splitShard2State.getStreamShardHandle().isClosed());
	assertEquals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(), splitShard2State.getLastProcessedSequenceNum());

	consumerOperator.close();
	consumerOperator.cancel();
}
 
示例20
/**
 * Verifies that, when restoring from state, the consumer registers every restored shard
 * with its restored sequence number and registers a shard that is absent from the restored
 * state with {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard() throws Exception {

	// --- state that the consumer will believe it restored -------------------

	HashMap<StreamShardHandle, SequenceNumber> expectedSeeding = getFakeRestoredStore("all");

	// --- union list state handed out by the mocked operator state store -----

	TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
	for (Map.Entry<StreamShardHandle, SequenceNumber> entry : expectedSeeding.entrySet()) {
		StreamShardMetadata metadata = KinesisDataFetcher.convertToStreamShardMetadata(entry.getKey());
		restoredListState.add(Tuple2.of(metadata, entry.getValue()));
	}

	OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

	StateInitializationContext restoreContext = mock(StateInitializationContext.class);
	when(restoreContext.getOperatorStateStore()).thenReturn(stateStore);
	when(restoreContext.isRestored()).thenReturn(true);

	// --- fetcher whose discovery returns the restored shards plus one extra --

	KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
	List<StreamShardHandle> discoveredShards = new ArrayList<>(expectedSeeding.keySet());
	discoveredShards.add(
		new StreamShardHandle(
			"fakeStream2",
			new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
	when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(discoveredShards);

	// the configuration is taken as valid: static config validation is mocked out
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	// --- exercise the seeding path -------------------------------------------

	TestableFlinkKinesisConsumer consumerUnderTest =
		new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
	consumerUnderTest.initializeState(restoreContext);
	consumerUnderTest.open(new Configuration());
	consumerUnderTest.run(Mockito.mock(SourceFunction.SourceContext.class));

	// the extra shard was not part of the restored state, so it is expected at the earliest position
	expectedSeeding.put(
		new StreamShardHandle(
			"fakeStream2",
			new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
		SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());

	for (Map.Entry<StreamShardHandle, SequenceNumber> expected : expectedSeeding.entrySet()) {
		StreamShardHandle handle = expected.getKey();
		Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
			new KinesisStreamShardState(
				KinesisDataFetcher.convertToStreamShardMetadata(handle),
				handle,
				expected.getValue()));
	}
}
 
示例21
/**
 * Polls the subscribed shard in a loop while {@code isRunning()} holds.
 *
 * <p>Each round fetches a batch, deaggregates it, hands every record to
 * {@code deserializeRecordForCollectionAndUpdateState}, and reports metrics. A {@code null}
 * shard iterator ends the loop after the shard state is set to
 * {@code SENTINEL_SHARD_ENDING_SEQUENCE_NUM}. Any {@link Throwable} is passed to
 * {@code fetcherRef.stopWithError}.
 */
@SuppressWarnings("unchecked")
@Override
public void run() {
	try {
		String shardIterator = getShardIterator(lastSequenceNum);
		long roundStartNanos = System.nanoTime();

		while (isRunning()) {
			if (shardIterator == null) {
				// no further iterator: mark the shard as ended and let this consumer finish
				fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
				break;
			}

			shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
			GetRecordsResult fetchResult = getRecords(shardIterator, maxNumberOfRecordsPerFetch);

			List<Record> rawRecords = fetchResult.getRecords();
			shardMetricsReporter.setNumberOfAggregatedRecords(rawRecords.size());

			// fetched records may be aggregated; expand them before handing them on
			List<UserRecord> userRecords = deaggregateRecords(
				rawRecords,
				subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
				subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

			long batchSizeBytes = 0L;
			for (UserRecord userRecord : userRecords) {
				batchSizeBytes += userRecord.getData().remaining();
				deserializeRecordForCollectionAndUpdateState(userRecord);
			}

			shardMetricsReporter.setNumberOfDeaggregatedRecords(userRecords.size());

			shardIterator = fetchResult.getNextShardIterator();

			long roundEndNanos = adjustRunLoopFrequency(roundStartNanos, System.nanoTime());
			long roundDurationNanos = roundEndNanos - roundStartNanos;
			// the returned value becomes the fetch size used by the next getRecords call
			maxNumberOfRecordsPerFetch = adaptRecordsToRead(roundDurationNanos, userRecords.size(), batchSizeBytes, maxNumberOfRecordsPerFetch);
			shardMetricsReporter.setRunLoopTimeNanos(roundDurationNanos);

			// the next round is measured from where this one ended
			roundStartNanos = roundEndNanos;
		}
	} catch (Throwable t) {
		fetcherRef.stopWithError(t);
	}
}
 
示例22
/**
 * Stores the given sentinel sequence number on this instance.
 *
 * @param sentinelSequenceNumber the sentinel sequence number to keep
 */
InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
	this.sentinelSequenceNumber = sentinelSequenceNumber;
}
 
示例23
/**
 * Returns the sentinel sequence number held by this instance.
 *
 * @return the stored {@link SentinelSequenceNumber}
 */
public SentinelSequenceNumber toSentinelSequenceNumber() {
	return sentinelSequenceNumber;
}
 
示例24
/**
 * Creates a fetcher, seeds it with one shard state per discovered shard, and runs it until
 * it terminates.
 *
 * <p>Seeding rules, per discovered shard:
 * <ul>
 *   <li>no restored state at all: start at the position configured under
 *       {@code STREAM_INITIAL_POSITION};</li>
 *   <li>shard present in the restored state: start at its restored sequence number;</li>
 *   <li>shard absent from the restored state: start at {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.</li>
 * </ul>
 *
 * @param sourceContext the context passed on to the fetcher; closed after the fetcher terminates
 * @throws Exception propagated from fetcher creation, shard discovery, or the fetcher run
 */
@Override
public void run(SourceContext<T> sourceContext) throws Exception {

	// every subtask gets a fetcher, even one that has no shards at first: the fetcher keeps
	// polling the shard list, so shards may be assigned to this subtask later
	KinesisDataFetcher<T> shardFetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

	// initial discovery, followed by seeding one state per discovered shard
	for (StreamShardHandle discoveredShard : shardFetcher.discoverNewShardsToSubscribe()) {
		StreamShardMetadata.EquivalenceWrapper shardKey =
			new StreamShardMetadata.EquivalenceWrapper(KinesisDataFetcher.convertToStreamShardMetadata(discoveredShard));

		if (sequenceNumsToRestore == null) {
			// fresh start: the configured start position becomes the initial state
			SentinelSequenceNumber configuredStart =
				InitialPosition.valueOf(configProps.getProperty(
					ConsumerConfigConstants.STREAM_INITIAL_POSITION,
					ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)).toSentinelSequenceNumber();

			shardFetcher.registerNewSubscribedShardState(
				new KinesisStreamShardState(shardKey.getShardMetadata(), discoveredShard, configuredStart.get()));

			if (LOG.isInfoEnabled()) {
				LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
					getRuntimeContext().getIndexOfThisSubtask(), discoveredShard.toString(), configuredStart.get());
			}
		} else if (sequenceNumsToRestore.containsKey(shardKey)) {
			// known shard: continue from the sequence number kept in the restored state
			shardFetcher.registerNewSubscribedShardState(
				new KinesisStreamShardState(shardKey.getShardMetadata(), discoveredShard, sequenceNumsToRestore.get(shardKey)));

			if (LOG.isInfoEnabled()) {
				LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
						" starting state set to the restored sequence number {}",
					getRuntimeContext().getIndexOfThisSubtask(), discoveredShard.toString(), sequenceNumsToRestore.get(shardKey));
			}
		} else {
			// shard unknown to the previous run: read it from the beginning
			shardFetcher.registerNewSubscribedShardState(
				new KinesisStreamShardState(shardKey.getShardMetadata(), discoveredShard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));

			if (LOG.isInfoEnabled()) {
				LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
						" starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
					getRuntimeContext().getIndexOfThisSubtask(), discoveredShard.toString());
			}
		}
	}

	// do not start the fetcher if the running flag is already cleared
	if (!running) {
		return;
	}

	// publish the fetcher only now, so that state snapshots can be taken
	// from the fetcher's state holders from this point on
	this.fetcher = shardFetcher;

	// runs until cancel() or close() is called, or until a thread created
	// by the fetcher throws an error
	shardFetcher.runFetcher();

	// wait for the fetcher to terminate before closing the context
	shardFetcher.awaitTermination();
	sourceContext.close();
}
 
示例25
/**
 * Runs a {@code ShardConsumer} against a fake Kinesis behaviour that hits an unexpected
 * expired shard iterator, then checks that all 1000 records were collected and that the
 * shard state ended at {@code SENTINEL_SHARD_ENDING_SEQUENCE_NUM}.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
	StreamShardHandle shardToConsume = getMockStreamShard("fakeStream", 0);
	KinesisStreamShardState initialShardState = new KinesisStreamShardState(
		KinesisDataFetcher.convertToStreamShardMetadata(shardToConsume),
		shardToConsume,
		new SequenceNumber("fakeStartingState"));

	LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
	subscribedShardsStateUnderTest.add(initialShardState);

	TestSourceContext<String> collectingContext = new TestSourceContext<>();

	TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
		Collections.singletonList("fakeStream"),
		collectingContext,
		new Properties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		10,
		2,
		new AtomicReference<>(),
		subscribedShardsStateUnderTest,
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
		Mockito.mock(KinesisProxyInterface.class));

	int registeredIndex = fetcher.registerNewSubscribedShardState(initialShardState);

	// the fake behaviour serves 1000 records in total over 9 getRecords() calls;
	// the 7th getRecords() call runs into an unexpected expired shard iterator
	new ShardConsumer<>(
		fetcher,
		registeredIndex,
		initialShardState.getStreamShardHandle(),
		initialShardState.getLastProcessedSequenceNum(),
		FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
			1000, 9, 7, 500L),
		new ShardMetricsReporter()).run();

	assertEquals(1000, collectingContext.getCollectedOutputs().size());
	assertEquals(
		SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
		subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
}
 
示例26
/**
 * Runs a {@code ShardConsumer} with {@code flink.shard.adaptivereads} set to {@code "true"}
 * and checks the number of collected records and that the shard state ended at
 * {@code SENTINEL_SHARD_ENDING_SEQUENCE_NUM}.
 */
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
	Properties adaptiveReadProps = new Properties();
	adaptiveReadProps.put("flink.shard.adaptivereads", "true");

	StreamShardHandle shardToConsume = getMockStreamShard("fakeStream", 0);
	KinesisStreamShardState initialShardState = new KinesisStreamShardState(
		KinesisDataFetcher.convertToStreamShardMetadata(shardToConsume),
		shardToConsume,
		new SequenceNumber("fakeStartingState"));

	LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
	subscribedShardsStateUnderTest.add(initialShardState);

	TestSourceContext<String> collectingContext = new TestSourceContext<>();

	TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
		Collections.singletonList("fakeStream"),
		collectingContext,
		adaptiveReadProps,
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		10,
		2,
		new AtomicReference<>(),
		subscribedShardsStateUnderTest,
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
		Mockito.mock(KinesisProxyInterface.class));

	int registeredIndex = fetcher.registerNewSubscribedShardState(initialShardState);

	// the first fetch is limited to 10 records
	new ShardConsumer<>(
		fetcher,
		registeredIndex,
		initialShardState.getStreamShardHandle(),
		initialShardState.getLastProcessedSequenceNum(),
		FakeKinesisBehavioursFactory.initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(10, 2, 500L),
		new ShardMetricsReporter()).run();

	// Expected total, as derived for this scenario:
	//   average record size of the first batch: 10 * 10 Kb / 10 = 10 Kb
	//   records fetched in the second batch:    2 Mb / 10 Kb * 5 = 40
	//   total:                                  10 + 40 = 50
	assertEquals(50, collectingContext.getCollectedOutputs().size());
	assertEquals(
		SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
		subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
}
 
示例27
/**
 * Restores the consumer from the "-empty-snapshot" resource and checks that no state was
 * restored while the initially discovered shard is still registered with the fetcher at
 * {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
public void testRestoreWithEmptyState() throws Exception {
	// one open shard (starting sequence number only) per entry of TEST_STATE
	final List<StreamShardHandle> discoveredShards = new ArrayList<>(TEST_STATE.size());
	for (StreamShardMetadata metadata : TEST_STATE.keySet()) {
		Shard openShard = new Shard();
		openShard.setShardId(metadata.getShardId());
		openShard.setSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("1"));

		discoveredShards.add(new StreamShardHandle(metadata.getStreamName(), openShard));
	}

	final TestFetcher<String> fetcher = new TestFetcher<>(
		Collections.singletonList(TEST_STREAM_NAME),
		new TestSourceContext<>(),
		new TestRuntimeContext(true, 1, 0),
		TestUtils.getStandardProperties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		null,
		discoveredShards);

	final DummyFlinkKinesisConsumer<String> consumerFunction = new DummyFlinkKinesisConsumer<>(
		fetcher,
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()));

	StreamSource<String, DummyFlinkKinesisConsumer<String>> sourceOperator = new StreamSource<>(consumerFunction);

	final AbstractStreamOperatorTestHarness<String> harness =
		new AbstractStreamOperatorTestHarness<>(sourceOperator, 1, 1, 0);

	final String snapshotResource =
		"kinesis-consumer-migration-test-flink" + testMigrateVersion + "-empty-snapshot";

	harness.setup();
	harness.initializeState(OperatorSnapshotUtil.getResourceFilename(snapshotResource));
	harness.open();

	consumerFunction.run(new TestSourceContext<>());

	// nothing must have been restored
	assertTrue(consumerFunction.getRestoredState().isEmpty());

	// Even with an empty restored state the fetcher must have been given the initially
	// discovered shard. That shard counts as one created while the job was not running,
	// so it has to be read from the earliest sequence number.
	KinesisStreamShardState registeredState = fetcher.getSubscribedShardsState().get(0);
	StreamShardHandle registeredHandle = registeredState.getStreamShardHandle();
	assertEquals(TEST_STREAM_NAME, registeredHandle.getStreamName());
	assertEquals(TEST_SHARD_ID, registeredHandle.getShard().getShardId());
	assertFalse(registeredHandle.isClosed());
	assertEquals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(), registeredState.getLastProcessedSequenceNum());

	sourceOperator.close();
	sourceOperator.cancel();
}
 
示例28
/**
 * Restores the consumer from the "-snapshot" resource while discovery reports the restored
 * shard as closed together with two new shards whose parent is {@code TEST_SHARD_ID}.
 * Checks that the restored shard keeps {@code TEST_SEQUENCE_NUMBER} and both new shards
 * are registered at {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
public void testRestoreWithReshardedStream() throws Exception {
	final List<StreamShardHandle> discoveredShards = new ArrayList<>(TEST_STATE.size());
	for (StreamShardMetadata metadata : TEST_STATE.keySet()) {
		// the restored shard; an ending sequence number marks it as closed
		Shard parentShard = new Shard();
		parentShard.setShardId(metadata.getShardId());
		parentShard.setSequenceNumberRange(
			new SequenceNumberRange()
				.withStartingSequenceNumber("1")
				.withEndingSequenceNumber("1087654321"));

		// first new shard, child of the closed shard
		Shard firstChildShard = new Shard();
		firstChildShard.setShardId(KinesisShardIdGenerator.generateFromShardOrder(1));
		firstChildShard.setSequenceNumberRange(
			new SequenceNumberRange().withStartingSequenceNumber("1087654322"));
		firstChildShard.setParentShardId(TEST_SHARD_ID);

		// second new shard, child of the closed shard
		Shard secondChildShard = new Shard();
		secondChildShard.setShardId(KinesisShardIdGenerator.generateFromShardOrder(2));
		secondChildShard.setSequenceNumberRange(
			new SequenceNumberRange().withStartingSequenceNumber("2087654322"));
		secondChildShard.setParentShardId(TEST_SHARD_ID);

		// order matters: the assertions below address the shards by index
		discoveredShards.add(new StreamShardHandle(metadata.getStreamName(), parentShard));
		discoveredShards.add(new StreamShardHandle(metadata.getStreamName(), firstChildShard));
		discoveredShards.add(new StreamShardHandle(metadata.getStreamName(), secondChildShard));
	}

	final TestFetcher<String> fetcher = new TestFetcher<>(
		Collections.singletonList(TEST_STREAM_NAME),
		new TestSourceContext<>(),
		new TestRuntimeContext(true, 1, 0),
		TestUtils.getStandardProperties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		null,
		discoveredShards);

	final DummyFlinkKinesisConsumer<String> consumerFunction = new DummyFlinkKinesisConsumer<>(
		fetcher,
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()));

	StreamSource<String, DummyFlinkKinesisConsumer<String>> sourceOperator =
		new StreamSource<>(consumerFunction);

	final AbstractStreamOperatorTestHarness<String> harness =
		new AbstractStreamOperatorTestHarness<>(sourceOperator, 1, 1, 0);

	final String snapshotResource =
		"kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot";

	harness.setup();
	harness.initializeState(OperatorSnapshotUtil.getResourceFilename(snapshotResource));
	harness.open();

	consumerFunction.run(new TestSourceContext<>());

	// the restored state must match TEST_STATE exactly
	assertNotEquals(null, consumerFunction.getRestoredState());
	assertEquals(1, consumerFunction.getRestoredState().size());
	assertEquals(TEST_STATE, removeEquivalenceWrappers(consumerFunction.getRestoredState()));

	// all three shards, including the new ones, must be registered with the fetcher
	assertEquals(3, fetcher.getSubscribedShardsState().size());

	KinesisStreamShardState closedParentState = fetcher.getSubscribedShardsState().get(0);
	assertEquals(TEST_STREAM_NAME, closedParentState.getStreamShardHandle().getStreamName());
	assertEquals(TEST_SHARD_ID, closedParentState.getStreamShardHandle().getShard().getShardId());
	assertTrue(closedParentState.getStreamShardHandle().isClosed());
	assertEquals(TEST_SEQUENCE_NUMBER, closedParentState.getLastProcessedSequenceNum());

	// new shards are expected to start from the beginning
	KinesisStreamShardState firstChildState = fetcher.getSubscribedShardsState().get(1);
	assertEquals(TEST_STREAM_NAME, firstChildState.getStreamShardHandle().getStreamName());
	assertEquals(KinesisShardIdGenerator.generateFromShardOrder(1), firstChildState.getStreamShardHandle().getShard().getShardId());
	assertFalse(firstChildState.getStreamShardHandle().isClosed());
	assertEquals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(), firstChildState.getLastProcessedSequenceNum());

	KinesisStreamShardState secondChildState = fetcher.getSubscribedShardsState().get(2);
	assertEquals(TEST_STREAM_NAME, secondChildState.getStreamShardHandle().getStreamName());
	assertEquals(KinesisShardIdGenerator.generateFromShardOrder(2), secondChildState.getStreamShardHandle().getShard().getShardId());
	assertFalse(secondChildState.getStreamShardHandle().isClosed());
	assertEquals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get(), secondChildState.getLastProcessedSequenceNum());

	sourceOperator.close();
	sourceOperator.cancel();
}
 
示例29
/**
 * Verifies that, when restoring from state, the consumer registers every restored shard
 * with its restored sequence number and registers a shard that is absent from the restored
 * state with {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard() throws Exception {

	// --- state that the consumer will believe it restored -------------------

	HashMap<StreamShardHandle, SequenceNumber> expectedSeeding = getFakeRestoredStore("all");

	// --- union list state handed out by the mocked operator state store -----

	TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
	for (Map.Entry<StreamShardHandle, SequenceNumber> entry : expectedSeeding.entrySet()) {
		StreamShardMetadata metadata = KinesisDataFetcher.convertToStreamShardMetadata(entry.getKey());
		restoredListState.add(Tuple2.of(metadata, entry.getValue()));
	}

	OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

	StateInitializationContext restoreContext = mock(StateInitializationContext.class);
	when(restoreContext.getOperatorStateStore()).thenReturn(stateStore);
	when(restoreContext.isRestored()).thenReturn(true);

	// --- fetcher whose discovery returns the restored shards plus one extra --

	KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
	List<StreamShardHandle> discoveredShards = new ArrayList<>(expectedSeeding.keySet());
	discoveredShards.add(
		new StreamShardHandle(
			"fakeStream2",
			new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
	when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(discoveredShards);

	// the configuration is taken as valid: static config validation is mocked out
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	// --- exercise the seeding path -------------------------------------------

	TestableFlinkKinesisConsumer consumerUnderTest =
		new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
	consumerUnderTest.initializeState(restoreContext);
	consumerUnderTest.open(new Configuration());
	consumerUnderTest.run(Mockito.mock(SourceFunction.SourceContext.class));

	// the extra shard was not part of the restored state, so it is expected at the earliest position
	expectedSeeding.put(
		new StreamShardHandle(
			"fakeStream2",
			new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
		SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());

	for (Map.Entry<StreamShardHandle, SequenceNumber> expected : expectedSeeding.entrySet()) {
		StreamShardHandle handle = expected.getKey();
		Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
			new KinesisStreamShardState(
				KinesisDataFetcher.convertToStreamShardMetadata(handle),
				handle,
				expected.getValue()));
	}
}
 
示例30
/**
 * Polls the subscribed shard in a loop while {@code isRunning()} holds.
 *
 * <p>Each round fetches a batch, deaggregates it, hands every record to
 * {@code deserializeRecordForCollectionAndUpdateState}, and reports metrics. A {@code null}
 * shard iterator ends the loop after the shard state is set to
 * {@code SENTINEL_SHARD_ENDING_SEQUENCE_NUM}. Any {@link Throwable} is passed to
 * {@code fetcherRef.stopWithError}.
 */
@SuppressWarnings("unchecked")
@Override
public void run() {
	try {
		String shardIterator = getShardIterator(lastSequenceNum);
		long roundStartNanos = System.nanoTime();

		while (isRunning()) {
			if (shardIterator == null) {
				// no further iterator: mark the shard as ended and let this consumer finish
				fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
				break;
			}

			shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
			GetRecordsResult fetchResult = getRecords(shardIterator, maxNumberOfRecordsPerFetch);

			List<Record> rawRecords = fetchResult.getRecords();
			shardMetricsReporter.setNumberOfAggregatedRecords(rawRecords.size());

			// fetched records may be aggregated; expand them before handing them on
			List<UserRecord> userRecords = deaggregateRecords(
				rawRecords,
				subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
				subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

			long batchSizeBytes = 0L;
			for (UserRecord userRecord : userRecords) {
				batchSizeBytes += userRecord.getData().remaining();
				deserializeRecordForCollectionAndUpdateState(userRecord);
			}

			shardMetricsReporter.setNumberOfDeaggregatedRecords(userRecords.size());

			shardIterator = fetchResult.getNextShardIterator();

			long roundEndNanos = adjustRunLoopFrequency(roundStartNanos, System.nanoTime());
			long roundDurationNanos = roundEndNanos - roundStartNanos;
			// the returned value becomes the fetch size used by the next getRecords call
			maxNumberOfRecordsPerFetch = adaptRecordsToRead(roundDurationNanos, userRecords.size(), batchSizeBytes, maxNumberOfRecordsPerFetch);
			shardMetricsReporter.setRunLoopTimeNanos(roundDurationNanos);

			// the next round is measured from where this one ended
			roundStartNanos = roundEndNanos;
		}
	} catch (Throwable t) {
		fetcherRef.stopWithError(t);
	}
}