Java源码示例:org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl
示例1
/**
* Tests that no checkpoints happen when the fetcher is not running.
*/
@Test
public void ignoreCheckpointWhenNotRunning() throws Exception {
@SuppressWarnings("unchecked")
final MockFetcher<String> fetcher = new MockFetcher<>();
final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
fetcher,
mock(AbstractPartitionDiscoverer.class),
false);
final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
setupConsumer(consumer, false, listState, true, 0, 1);
// snapshot before the fetcher starts running
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
// no state should have been checkpointed
assertFalse(listState.get().iterator().hasNext());
// acknowledgement of the checkpoint should also not result in any offset commits
consumer.notifyCheckpointComplete(1L);
assertNull(fetcher.getAndClearLastCommittedOffsets());
assertEquals(0, fetcher.getCommitCount());
}
示例2
/**
* Tests that no checkpoints happen when the fetcher is not running.
*/
@Test
public void ignoreCheckpointWhenNotRunning() throws Exception {
@SuppressWarnings("unchecked") final MockFetcher<String> fetcher = new MockFetcher<>();
final FlinkPulsarSource<String> source = new DummyFlinkPulsarSource<>(
fetcher,
mock(PulsarMetadataReader.class),
dummyProperties);
final TestingListState<Tuple2<String, MessageId>> listState = new TestingListState<>();
setupSource(source, false, listState, true, 0, 1);
// snapshot before the fetcher starts running
source.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
// no state should have been checkpointed
assertFalse(listState.get().iterator().hasNext());
// acknowledgement of the checkpoint should also not result in any offset commits
source.notifyCheckpointComplete(1L);
assertNull(fetcher.getAndClearLastCommittedOffsets());
assertEquals(0, fetcher.getCommitCount());
}
示例3
/**
* Tests that no checkpoints happen when the fetcher is not running.
*/
@Test
public void ignoreCheckpointWhenNotRunning() throws Exception {
@SuppressWarnings("unchecked")
final MockFetcher<String> fetcher = new MockFetcher<>();
final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
fetcher,
mock(AbstractPartitionDiscoverer.class),
false);
final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
setupConsumer(consumer, false, listState, true, 0, 1);
// snapshot before the fetcher starts running
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
// no state should have been checkpointed
assertFalse(listState.get().iterator().hasNext());
// acknowledgement of the checkpoint should also not result in any offset commits
consumer.notifyCheckpointComplete(1L);
assertNull(fetcher.getAndClearLastCommittedOffsets());
assertEquals(0, fetcher.getCommitCount());
}
示例4
/**
* Checks that the state snapshot context is closed after a successful snapshot operation.
*/
@Test
public void testSnapshotMethod() throws Exception {
final long checkpointId = 42L;
final long timestamp = 1L;
final CloseableRegistry closeableRegistry = new CloseableRegistry();
StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(0L, 0L));
whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenCallRealMethod();
doReturn(containingTask).when(operator).getContainingTask();
operator.snapshotState(
checkpointId,
timestamp,
CheckpointOptions.forCheckpointWithDefaultLocation(),
new MemCheckpointStreamFactory(Integer.MAX_VALUE));
}
示例5
/**
* Tests that no checkpoints happen when the fetcher is not running.
*/
@Test
public void ignoreCheckpointWhenNotRunning() throws Exception {
@SuppressWarnings("unchecked")
final MockFetcher<String> fetcher = new MockFetcher<>();
final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
fetcher,
mock(AbstractPartitionDiscoverer.class),
false);
final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
setupConsumer(consumer, false, listState, true, 0, 1);
// snapshot before the fetcher starts running
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
// no state should have been checkpointed
assertFalse(listState.get().iterator().hasNext());
// acknowledgement of the checkpoint should also not result in any offset commits
consumer.notifyCheckpointComplete(1L);
assertNull(fetcher.getAndClearLastCommittedOffsets());
assertEquals(0, fetcher.getCommitCount());
}
示例6
/**
* Checks that the state snapshot context is closed after a successful snapshot operation.
*/
@Test
public void testSnapshotMethod() throws Exception {
final long checkpointId = 42L;
final long timestamp = 1L;
final CloseableRegistry closeableRegistry = new CloseableRegistry();
StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(0L, 0L));
whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenCallRealMethod();
doReturn(containingTask).when(operator).getContainingTask();
operator.snapshotState(
checkpointId,
timestamp,
CheckpointOptions.forCheckpointWithDefaultLocation(),
new MemCheckpointStreamFactory(Integer.MAX_VALUE));
verify(context).close();
}
示例7
/**
* Tests that the created StateSnapshotContextSynchronousImpl is closed in case of a failing
* Operator#snapshotState(StateSnapshotContextSynchronousImpl) call.
*/
@Test
public void testFailingSnapshotMethod() throws Exception {
final long checkpointId = 42L;
final long timestamp = 1L;
final Exception failingException = new Exception("Test exception");
final CloseableRegistry closeableRegistry = new CloseableRegistry();
StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenCallRealMethod();
doReturn(containingTask).when(operator).getContainingTask();
// lets fail when calling the actual snapshotState method
doThrow(failingException).when(operator).snapshotState(eq(context));
try {
operator.snapshotState(
checkpointId,
timestamp,
CheckpointOptions.forCheckpointWithDefaultLocation(),
new MemCheckpointStreamFactory(Integer.MAX_VALUE));
fail("Exception expected.");
} catch (Exception e) {
assertEquals(failingException, e.getCause());
}
verify(context).close();
}
示例8
@Before
public void setUp() throws Exception {
CloseableRegistry closableRegistry = new CloseableRegistry();
CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(1024);
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
this.snapshotContext = new StateSnapshotContextSynchronousImpl(42, 4711, streamFactory, keyGroupRange, closableRegistry);
}
示例9
/**
* Tests that closing the StateSnapshotContextSynchronousImpl will also close the associated
* output streams.
*/
@Test
public void testStreamClosingWhenClosing() throws Exception {
long checkpointId = 42L;
long checkpointTimestamp = 1L;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)).thenReturn(outputStream1, outputStream2);
InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
checkpointId,
checkpointTimestamp,
streamFactory,
keyGroupRange,
closableRegistry);
// creating the output streams
context.getRawKeyedOperatorStateOutput();
context.getRawOperatorStateOutput();
verify(streamFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
assertEquals(2, closableRegistry.size());
assertTrue(closableRegistry.contains(outputStream1));
assertTrue(closableRegistry.contains(outputStream2));
context.close();
verify(outputStream1).close();
verify(outputStream2).close();
assertEquals(0, closableRegistry.size());
}
示例10
/**
* Tests that the created StateSnapshotContextSynchronousImpl is closed in case of a failing
* Operator#snapshotState(StateSnapshotContextSynchronousImpl) call.
*/
@Test
public void testFailingSnapshotMethod() throws Exception {
final long checkpointId = 42L;
final long timestamp = 1L;
final Exception failingException = new Exception("Test exception");
final CloseableRegistry closeableRegistry = new CloseableRegistry();
StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenCallRealMethod();
doReturn(containingTask).when(operator).getContainingTask();
// lets fail when calling the actual snapshotState method
doThrow(failingException).when(operator).snapshotState(eq(context));
try {
operator.snapshotState(
checkpointId,
timestamp,
CheckpointOptions.forCheckpointWithDefaultLocation(),
new MemCheckpointStreamFactory(Integer.MAX_VALUE));
fail("Exception expected.");
} catch (Exception e) {
assertEquals(failingException, e.getCause());
}
}
示例11
@Before
public void setUp() throws Exception {
CloseableRegistry closableRegistry = new CloseableRegistry();
CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(1024);
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
this.snapshotContext = new StateSnapshotContextSynchronousImpl(42, 4711, streamFactory, keyGroupRange, closableRegistry);
}
示例12
@Test
public void testStreamClosingExceptionally() throws Exception {
long checkpointId = 42L;
long checkpointTimestamp = 1L;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)).thenReturn(outputStream1, outputStream2);
InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
checkpointId,
checkpointTimestamp,
streamFactory,
keyGroupRange,
closableRegistry);
// creating the output streams
context.getRawKeyedOperatorStateOutput();
context.getRawOperatorStateOutput();
verify(streamFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
assertEquals(2, closableRegistry.size());
assertTrue(closableRegistry.contains(outputStream1));
assertTrue(closableRegistry.contains(outputStream2));
context.closeExceptionally();
verify(outputStream1).close();
verify(outputStream2).close();
assertEquals(0, closableRegistry.size());
}
示例13
public OperatorSnapshotFutures snapshotState(
CheckpointedStreamOperator streamOperator,
Optional<InternalTimeServiceManager<?>> timeServiceManager,
String operatorName,
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory) throws CheckpointException {
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
factory,
keyGroupRange,
closeableRegistry);
snapshotState(
streamOperator,
timeServiceManager,
operatorName,
checkpointId,
timestamp,
checkpointOptions,
factory,
snapshotInProgress,
snapshotContext);
return snapshotInProgress;
}
示例14
@Test
public void testSnapshotState() throws Exception {
StateInitializationContext stateContext = getStateContext();
operator.initializeState(stateContext);
operator.open();
MockSourceSplit newSplit = new MockSourceSplit((2));
operator.handleOperatorEvent(new AddSplitEvent<>(
Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));
// Verify the splits in state.
List<MockSourceSplit> splitsInState = CollectionUtil.iterableToList(operator.getReaderState().get());
assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), splitsInState);
}
示例15
@Before
public void setUp() throws Exception {
CloseableRegistry closableRegistry = new CloseableRegistry();
CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(1024);
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
this.snapshotContext = new StateSnapshotContextSynchronousImpl(42, 4711, streamFactory, keyGroupRange, closableRegistry);
}
示例16
@Test
public void testStreamClosingExceptionally() throws Exception {
long checkpointId = 42L;
long checkpointTimestamp = 1L;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)).thenReturn(outputStream1, outputStream2);
InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
checkpointId,
checkpointTimestamp,
streamFactory,
keyGroupRange,
closableRegistry);
// creating the output streams
context.getRawKeyedOperatorStateOutput();
context.getRawOperatorStateOutput();
verify(streamFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
assertEquals(2, closableRegistry.size());
assertTrue(closableRegistry.contains(outputStream1));
assertTrue(closableRegistry.contains(outputStream2));
context.closeExceptionally();
verify(outputStream1).close();
verify(outputStream2).close();
assertEquals(0, closableRegistry.size());
}
示例17
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
Properties config = TestUtils.getStandardProperties();
List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3)))),
new SequenceNumber("1")));
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Tuple2<StreamShardMetadata, SequenceNumber> state : globalUnionState) {
listState.add(state);
}
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
RuntimeContext context = mock(RuntimeContext.class);
when(context.getIndexOfThisSubtask()).thenReturn(0);
when(context.getNumberOfParallelSubtasks()).thenReturn(2);
consumer.setRuntimeContext(context);
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
consumer.initializeState(initializationContext);
// only opened, not run
consumer.open(new Configuration());
// arbitrary checkpoint id and timestamp
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));
assertTrue(listState.isClearCalled());
// the checkpointed list state should contain only the shards that it should subscribe to
assertEquals(globalUnionState.size() / 2, listState.getList().size());
assertTrue(listState.getList().contains(globalUnionState.get(0)));
assertTrue(listState.getList().contains(globalUnionState.get(2)));
}
示例18
@Override
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory) throws Exception {
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
factory,
keyGroupRange,
getContainingTask().getCancelables())) {
snapshotState(snapshotContext);
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
} catch (Exception snapshotException) {
try {
snapshotInProgress.cancel();
} catch (Exception e) {
snapshotException.addSuppressed(e);
}
String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
getOperatorName() + ".";
if (!getContainingTask().isCanceled()) {
LOG.info(snapshotFailMessage, snapshotException);
}
throw new Exception(snapshotFailMessage, snapshotException);
}
return snapshotInProgress;
}
示例19
/**
* Tests that a failing snapshot method call to the keyed state backend will trigger the closing
* of the StateSnapshotContextSynchronousImpl and the cancellation of the
* OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures.
*/
@Test
public void testFailingBackendSnapshotMethod() throws Exception {
final long checkpointId = 42L;
final long timestamp = 1L;
final Exception failingException = new Exception("Test exception");
final CloseableRegistry closeableRegistry = new CloseableRegistry();
RunnableFuture<SnapshotResult<KeyedStateHandle>> futureKeyedStateHandle = mock(RunnableFuture.class);
RunnableFuture<SnapshotResult<OperatorStateHandle>> futureOperatorStateHandle = mock(RunnableFuture.class);
StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(checkpointId, timestamp));
when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
OperatorSnapshotFutures operatorSnapshotResult = spy(new OperatorSnapshotFutures());
whenNew(StateSnapshotContextSynchronousImpl.class)
.withArguments(
anyLong(),
anyLong(),
any(CheckpointStreamFactory.class),
nullable(KeyGroupRange.class),
any(CloseableRegistry.class))
.thenReturn(context);
whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult);
StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenCallRealMethod();
doCallRealMethod().when(operator).close();
doCallRealMethod().when(operator).dispose();
doReturn(containingTask).when(operator).getContainingTask();
RunnableFuture<SnapshotResult<OperatorStateHandle>> futureManagedOperatorStateHandle = mock(RunnableFuture.class);
OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class);
when(operatorStateBackend.snapshot(
eq(checkpointId),
eq(timestamp),
any(CheckpointStreamFactory.class),
any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);
AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
when(keyedStateBackend.snapshot(
eq(checkpointId),
eq(timestamp),
any(CheckpointStreamFactory.class),
eq(CheckpointOptions.forCheckpointWithDefaultLocation()))).thenThrow(failingException);
closeableRegistry.registerCloseable(operatorStateBackend);
closeableRegistry.registerCloseable(keyedStateBackend);
Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
try {
operator.snapshotState(
checkpointId,
timestamp,
CheckpointOptions.forCheckpointWithDefaultLocation(),
new MemCheckpointStreamFactory(Integer.MAX_VALUE));
fail("Exception expected.");
} catch (Exception e) {
assertEquals(failingException, e.getCause());
}
// verify that the context has been closed, the operator snapshot result has been cancelled
// and that all futures have been cancelled.
verify(context).close();
verify(operatorSnapshotResult).cancel();
verify(futureKeyedStateHandle).cancel(anyBoolean());
verify(futureOperatorStateHandle).cancel(anyBoolean());
verify(futureKeyedStateHandle).cancel(anyBoolean());
operator.close();
operator.dispose();
verify(operatorStateBackend).close();
verify(keyedStateBackend).close();
verify(operatorStateBackend).dispose();
verify(keyedStateBackend).dispose();
}
示例20
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
Properties config = TestUtils.getStandardProperties();
List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3)))),
new SequenceNumber("1")));
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Tuple2<StreamShardMetadata, SequenceNumber> state : globalUnionState) {
listState.add(state);
}
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
RuntimeContext context = mock(RuntimeContext.class);
when(context.getIndexOfThisSubtask()).thenReturn(0);
when(context.getNumberOfParallelSubtasks()).thenReturn(2);
consumer.setRuntimeContext(context);
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
consumer.initializeState(initializationContext);
// only opened, not run
consumer.open(new Configuration());
// arbitrary checkpoint id and timestamp
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));
assertTrue(listState.isClearCalled());
// the checkpointed list state should contain only the shards that it should subscribe to
assertEquals(globalUnionState.size() / 2, listState.getList().size());
assertTrue(listState.getList().contains(globalUnionState.get(0)));
assertTrue(listState.getList().contains(globalUnionState.get(2)));
}
示例21
/**
* Tests that a failing snapshot method call to the keyed state backend will trigger the closing
* of the StateSnapshotContextSynchronousImpl and the cancellation of the
* OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures.
*/
@Test
public void testFailingBackendSnapshotMethod() throws Exception {
final long checkpointId = 42L;
final long timestamp = 1L;
final Exception failingException = new Exception("Test exception");
final CloseableRegistry closeableRegistry = new CloseableRegistry();
RunnableFuture<SnapshotResult<KeyedStateHandle>> futureKeyedStateHandle = mock(RunnableFuture.class);
RunnableFuture<SnapshotResult<OperatorStateHandle>> futureOperatorStateHandle = mock(RunnableFuture.class);
StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(checkpointId, timestamp));
when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
OperatorSnapshotFutures operatorSnapshotResult = spy(new OperatorSnapshotFutures());
whenNew(StateSnapshotContextSynchronousImpl.class)
.withArguments(
anyLong(),
anyLong(),
any(CheckpointStreamFactory.class),
nullable(KeyGroupRange.class),
any(CloseableRegistry.class))
.thenReturn(context);
whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult);
StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenCallRealMethod();
doCallRealMethod().when(operator).close();
doCallRealMethod().when(operator).dispose();
doReturn(containingTask).when(operator).getContainingTask();
RunnableFuture<SnapshotResult<OperatorStateHandle>> futureManagedOperatorStateHandle = mock(RunnableFuture.class);
OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class);
when(operatorStateBackend.snapshot(
eq(checkpointId),
eq(timestamp),
any(CheckpointStreamFactory.class),
any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);
AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
when(keyedStateBackend.snapshot(
eq(checkpointId),
eq(timestamp),
any(CheckpointStreamFactory.class),
eq(CheckpointOptions.forCheckpointWithDefaultLocation()))).thenThrow(failingException);
closeableRegistry.registerCloseable(operatorStateBackend);
closeableRegistry.registerCloseable(keyedStateBackend);
Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
try {
operator.snapshotState(
checkpointId,
timestamp,
CheckpointOptions.forCheckpointWithDefaultLocation(),
new MemCheckpointStreamFactory(Integer.MAX_VALUE));
fail("Exception expected.");
} catch (Exception e) {
assertEquals(failingException, e.getCause());
}
// verify that the context has been closed, the operator snapshot result has been cancelled
// and that all futures have been cancelled.
verify(operatorSnapshotResult).cancel();
verify(futureKeyedStateHandle).cancel(anyBoolean());
verify(futureOperatorStateHandle).cancel(anyBoolean());
verify(futureKeyedStateHandle).cancel(anyBoolean());
operator.close();
operator.dispose();
verify(operatorStateBackend).close();
verify(keyedStateBackend).close();
verify(operatorStateBackend).dispose();
verify(keyedStateBackend).dispose();
}
示例22
/**
* Tests that closing the StateSnapshotContextSynchronousImpl will also close the associated
* output streams.
*/
@Test
public void testStreamClosingWhenClosing() throws Exception {
long checkpointId = 42L;
long checkpointTimestamp = 1L;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)).thenReturn(outputStream1, outputStream2);
InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
checkpointId,
checkpointTimestamp,
streamFactory,
keyGroupRange,
closableRegistry);
// creating the output streams
context.getRawKeyedOperatorStateOutput();
context.getRawOperatorStateOutput();
verify(streamFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
assertEquals(2, closableRegistry.size());
assertTrue(closableRegistry.contains(outputStream1));
assertTrue(closableRegistry.contains(outputStream2));
context.getKeyedStateStreamFuture().run();
context.getOperatorStateStreamFuture().run();
verify(outputStream1).closeAndGetHandle();
verify(outputStream2).closeAndGetHandle();
assertEquals(0, closableRegistry.size());
}
示例23
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
Properties config = TestUtils.getStandardProperties();
List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3)))),
new SequenceNumber("1")));
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Tuple2<StreamShardMetadata, SequenceNumber> state : globalUnionState) {
listState.add(state);
}
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
RuntimeContext context = mock(RuntimeContext.class);
when(context.getIndexOfThisSubtask()).thenReturn(0);
when(context.getNumberOfParallelSubtasks()).thenReturn(2);
consumer.setRuntimeContext(context);
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
consumer.initializeState(initializationContext);
// only opened, not run
consumer.open(new Configuration());
// arbitrary checkpoint id and timestamp
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));
assertTrue(listState.isClearCalled());
// the checkpointed list state should contain only the shards that it should subscribe to
assertEquals(globalUnionState.size() / 2, listState.getList().size());
assertTrue(listState.getList().contains(globalUnionState.get(0)));
assertTrue(listState.getList().contains(globalUnionState.get(2)));
}
示例24
/**
* Tests that closing the StateSnapshotContextSynchronousImpl will also close the associated
* output streams.
*/
@Test
public void testStreamClosingWhenClosing() throws Exception {
long checkpointId = 42L;
long checkpointTimestamp = 1L;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)).thenReturn(outputStream1, outputStream2);
InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
checkpointId,
checkpointTimestamp,
streamFactory,
keyGroupRange,
closableRegistry);
// creating the output streams
context.getRawKeyedOperatorStateOutput();
context.getRawOperatorStateOutput();
verify(streamFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
assertEquals(2, closableRegistry.size());
assertTrue(closableRegistry.contains(outputStream1));
assertTrue(closableRegistry.contains(outputStream2));
context.getKeyedStateStreamFuture().run();
context.getOperatorStateStreamFuture().run();
verify(outputStream1).closeAndGetHandle();
verify(outputStream2).closeAndGetHandle();
assertEquals(0, closableRegistry.size());
}