Java源码示例:org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl

示例1
/**
 * Verifies that a snapshot requested while the fetcher is not running checkpoints nothing,
 * and that acknowledging such a checkpoint does not commit any offsets.
 */
@Test
public void ignoreCheckpointWhenNotRunning() throws Exception {
	@SuppressWarnings("unchecked")
	final MockFetcher<String> mockFetcher = new MockFetcher<>();
	final FlinkKafkaConsumerBase<String> kafkaConsumer =
		new DummyFlinkKafkaConsumer<>(mockFetcher, mock(AbstractPartitionDiscoverer.class), false);

	final TestingListState<Tuple2<KafkaTopicPartition, Long>> offsetsState = new TestingListState<>();
	setupConsumer(kafkaConsumer, false, offsetsState, true, 0, 1);

	// take the snapshot before the fetcher has started running
	final StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(1, 1);
	kafkaConsumer.snapshotState(snapshotContext);

	// the list state must still be empty
	final boolean anythingCheckpointed = offsetsState.get().iterator().hasNext();
	assertFalse(anythingCheckpointed);

	// completing the checkpoint must not commit any offsets either
	kafkaConsumer.notifyCheckpointComplete(1L);
	assertNull(mockFetcher.getAndClearLastCommittedOffsets());
	assertEquals(0, mockFetcher.getCommitCount());
}
 
示例2
/**
 * Verifies that the Pulsar source checkpoints nothing when a snapshot is requested while the
 * fetcher is not running, and that acknowledging the checkpoint commits no offsets.
 */
@Test
public void ignoreCheckpointWhenNotRunning() throws Exception {
    @SuppressWarnings("unchecked")
    final MockFetcher<String> mockFetcher = new MockFetcher<>();
    final FlinkPulsarSource<String> pulsarSource =
            new DummyFlinkPulsarSource<>(mockFetcher, mock(PulsarMetadataReader.class), dummyProperties);

    final TestingListState<Tuple2<String, MessageId>> offsetsState = new TestingListState<>();
    setupSource(pulsarSource, false, offsetsState, true, 0, 1);

    // request the snapshot while the fetcher has not started yet
    final StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(1, 1);
    pulsarSource.snapshotState(snapshotContext);

    // nothing may have been written to the list state
    final boolean anythingCheckpointed = offsetsState.get().iterator().hasNext();
    assertFalse(anythingCheckpointed);

    // acknowledging the checkpoint must not lead to an offset commit
    pulsarSource.notifyCheckpointComplete(1L);
    assertNull(mockFetcher.getAndClearLastCommittedOffsets());
    assertEquals(0, mockFetcher.getCommitCount());
}
 
示例3
/**
 * Ensures a checkpoint taken before the fetcher runs leaves the list state empty and that the
 * subsequent checkpoint-complete notification commits no offsets.
 */
@Test
public void ignoreCheckpointWhenNotRunning() throws Exception {
	@SuppressWarnings("unchecked")
	final MockFetcher<String> idleFetcher = new MockFetcher<>();
	final FlinkKafkaConsumerBase<String> consumerUnderTest = new DummyFlinkKafkaConsumer<>(
		idleFetcher,
		mock(AbstractPartitionDiscoverer.class),
		false);

	final TestingListState<Tuple2<KafkaTopicPartition, Long>> checkpointedOffsets = new TestingListState<>();
	setupConsumer(consumerUnderTest, false, checkpointedOffsets, true, 0, 1);

	// the fetcher is not running yet when this snapshot is triggered
	consumerUnderTest.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));

	// so the state must not contain any entry
	final boolean stateHasEntries = checkpointedOffsets.get().iterator().hasNext();
	assertFalse(stateHasEntries);

	// and confirming the checkpoint must not commit offsets
	consumerUnderTest.notifyCheckpointComplete(1L);
	assertNull(idleFetcher.getAndClearLastCommittedOffsets());
	assertEquals(0, idleFetcher.getCommitCount());
}
 
示例4
/**
 * Runs the real four-argument {@code snapshotState} of a mocked operator with a spied
 * StateSnapshotContextSynchronousImpl injected for every constructor call.
 *
 * <p>NOTE(review): this method contains no verification, so it only passes or fails on whether
 * the snapshot call throws. Confirm whether a check on the context's close behaviour is
 * intended here.
 */
@Test
public void testSnapshotMethod() throws Exception {
	final long snapshotId = 42L;
	final long snapshotTimestamp = 1L;

	final CloseableRegistry cancelables = new CloseableRegistry();

	// every "new StateSnapshotContextSynchronousImpl(...)" yields this spy
	StateSnapshotContextSynchronousImpl spiedContext = spy(new StateSnapshotContextSynchronousImpl(0L, 0L));
	whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(spiedContext);

	StreamTask<Void, AbstractStreamOperator<Void>> task = mock(StreamTask.class);
	doReturn(cancelables).when(task).getCancelables();

	// only the snapshot entry point runs real code; everything else stays mocked
	AbstractStreamOperator<Void> mockedOperator = mock(AbstractStreamOperator.class);
	when(mockedOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
		.thenCallRealMethod();
	doReturn(task).when(mockedOperator).getContainingTask();

	final CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(Integer.MAX_VALUE);
	mockedOperator.snapshotState(
		snapshotId,
		snapshotTimestamp,
		CheckpointOptions.forCheckpointWithDefaultLocation(),
		streamFactory);
}
 
示例5
/**
 * Checks that snapshotting a consumer whose fetcher is not running stores no state, and that
 * the checkpoint acknowledgement afterwards results in zero offset commits.
 */
@Test
public void ignoreCheckpointWhenNotRunning() throws Exception {
	@SuppressWarnings("unchecked")
	final MockFetcher<String> stoppedFetcher = new MockFetcher<>();
	final FlinkKafkaConsumerBase<String> testConsumer =
		new DummyFlinkKafkaConsumer<>(stoppedFetcher, mock(AbstractPartitionDiscoverer.class), false);

	final TestingListState<Tuple2<KafkaTopicPartition, Long>> unionOffsets = new TestingListState<>();
	setupConsumer(testConsumer, false, unionOffsets, true, 0, 1);

	// snapshot is triggered although the fetcher never started
	final StateSnapshotContextSynchronousImpl earlySnapshot = new StateSnapshotContextSynchronousImpl(1, 1);
	testConsumer.snapshotState(earlySnapshot);

	// expect an empty state
	assertFalse(unionOffsets.get().iterator().hasNext());

	// expect no commit after the checkpoint is acknowledged
	testConsumer.notifyCheckpointComplete(1L);
	assertNull(stoppedFetcher.getAndClearLastCommittedOffsets());
	assertEquals(0, stoppedFetcher.getCommitCount());
}
 
示例6
/**
 * Verifies that the StateSnapshotContextSynchronousImpl created during a successful
 * {@code snapshotState} call is closed afterwards.
 */
@Test
public void testSnapshotMethod() throws Exception {
	final long snapshotId = 42L;
	final long snapshotTimestamp = 1L;

	final CloseableRegistry cancelables = new CloseableRegistry();

	// every "new StateSnapshotContextSynchronousImpl(...)" yields this spy
	StateSnapshotContextSynchronousImpl spiedContext = spy(new StateSnapshotContextSynchronousImpl(0L, 0L));
	whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(spiedContext);

	StreamTask<Void, AbstractStreamOperator<Void>> task = mock(StreamTask.class);
	doReturn(cancelables).when(task).getCancelables();

	// only the snapshot entry point runs real code; everything else stays mocked
	AbstractStreamOperator<Void> mockedOperator = mock(AbstractStreamOperator.class);
	when(mockedOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
		.thenCallRealMethod();
	doReturn(task).when(mockedOperator).getContainingTask();

	final CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(Integer.MAX_VALUE);
	mockedOperator.snapshotState(
		snapshotId,
		snapshotTimestamp,
		CheckpointOptions.forCheckpointWithDefaultLocation(),
		streamFactory);

	// the injected context must have been closed exactly once
	verify(spiedContext).close();
}
 
示例7
/**
 * Verifies that the StateSnapshotContextSynchronousImpl is closed when the operator's
 * {@code snapshotState(StateSnapshotContext)} hook throws, and that the thrown exception is
 * reported as the cause of the failure seen by the caller.
 */
@Test
public void testFailingSnapshotMethod() throws Exception {
	final long snapshotId = 42L;
	final long snapshotTimestamp = 1L;

	final Exception expectedCause = new Exception("Test exception");

	final CloseableRegistry cancelables = new CloseableRegistry();

	// every "new StateSnapshotContextSynchronousImpl(...)" yields this mock
	StateSnapshotContextSynchronousImpl mockedContext = mock(StateSnapshotContextSynchronousImpl.class);
	whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(mockedContext);

	StreamTask<Void, AbstractStreamOperator<Void>> task = mock(StreamTask.class);
	doReturn(cancelables).when(task).getCancelables();

	AbstractStreamOperator<Void> mockedOperator = mock(AbstractStreamOperator.class);
	when(mockedOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
		.thenCallRealMethod();
	doReturn(task).when(mockedOperator).getContainingTask();

	// make the user-level snapshot hook fail
	doThrow(expectedCause).when(mockedOperator).snapshotState(eq(mockedContext));

	final CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(Integer.MAX_VALUE);
	try {
		mockedOperator.snapshotState(
			snapshotId,
			snapshotTimestamp,
			CheckpointOptions.forCheckpointWithDefaultLocation(),
			streamFactory);
		fail("Exception expected.");
	} catch (Exception wrapped) {
		assertEquals(expectedCause, wrapped.getCause());
	}

	// the context must be closed despite the failure
	verify(mockedContext).close();
}
 
示例8
/**
 * Creates a fresh snapshot context (checkpoint id 42, timestamp 4711) backed by an in-memory
 * stream factory with a 1024 limit, key groups 0..2 and a new closeable registry.
 */
@Before
public void setUp() throws Exception {
	final CloseableRegistry registry = new CloseableRegistry();
	final CheckpointStreamFactory memoryStreamFactory = new MemCheckpointStreamFactory(1024);
	final KeyGroupRange range = new KeyGroupRange(0, 2);

	this.snapshotContext = new StateSnapshotContextSynchronousImpl(
		42,
		4711,
		memoryStreamFactory,
		range,
		registry);
}
 
示例9
/**
 * Verifies that {@code close()} on a StateSnapshotContextSynchronousImpl closes both raw state
 * output streams it obtained from the factory and leaves the closeable registry empty.
 */
@Test
public void testStreamClosingWhenClosing() throws Exception {
	final long id = 42L;
	final long timestamp = 1L;

	CheckpointStreamFactory.CheckpointStateOutputStream firstStream =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
	CheckpointStreamFactory.CheckpointStateOutputStream secondStream =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);

	// the factory hands out the two mocked streams in order
	CheckpointStreamFactory factory = mock(CheckpointStreamFactory.class);
	when(factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
		.thenReturn(firstStream)
		.thenReturn(secondStream);

	InsightCloseableRegistry registry = new InsightCloseableRegistry();
	KeyGroupRange range = new KeyGroupRange(0, 2);

	StateSnapshotContextSynchronousImpl snapshotContext =
		new StateSnapshotContextSynchronousImpl(id, timestamp, factory, range, registry);

	// requesting both raw outputs creates one stream each
	snapshotContext.getRawKeyedOperatorStateOutput();
	snapshotContext.getRawOperatorStateOutput();

	verify(factory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

	// both streams are tracked by the registry while the context is open
	assertEquals(2, registry.size());
	assertTrue(registry.contains(firstStream));
	assertTrue(registry.contains(secondStream));

	snapshotContext.close();

	verify(firstStream).close();
	verify(secondStream).close();

	assertEquals(0, registry.size());
}
 
示例10
/**
 * Makes the operator's {@code snapshotState(StateSnapshotContext)} hook throw and checks that
 * the exception surfaces as the cause of the failure raised by the four-argument
 * {@code snapshotState}.
 *
 * <p>NOTE(review): unlike what the test name suggests, nothing here verifies that the injected
 * StateSnapshotContextSynchronousImpl gets closed; confirm whether such a check is intended.
 */
@Test
public void testFailingSnapshotMethod() throws Exception {
	final long snapshotId = 42L;
	final long snapshotTimestamp = 1L;

	final Exception expectedCause = new Exception("Test exception");

	final CloseableRegistry cancelables = new CloseableRegistry();

	// every "new StateSnapshotContextSynchronousImpl(...)" yields this mock
	StateSnapshotContextSynchronousImpl mockedContext = mock(StateSnapshotContextSynchronousImpl.class);
	whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(mockedContext);

	StreamTask<Void, AbstractStreamOperator<Void>> task = mock(StreamTask.class);
	doReturn(cancelables).when(task).getCancelables();

	AbstractStreamOperator<Void> mockedOperator = mock(AbstractStreamOperator.class);
	when(mockedOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
		.thenCallRealMethod();
	doReturn(task).when(mockedOperator).getContainingTask();

	// make the user-level snapshot hook fail
	doThrow(expectedCause).when(mockedOperator).snapshotState(eq(mockedContext));

	final CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(Integer.MAX_VALUE);
	try {
		mockedOperator.snapshotState(
			snapshotId,
			snapshotTimestamp,
			CheckpointOptions.forCheckpointWithDefaultLocation(),
			streamFactory);
		fail("Exception expected.");
	} catch (Exception wrapped) {
		assertEquals(expectedCause, wrapped.getCause());
	}
}
 
示例11
/**
 * Prepares the snapshot context used by the tests: checkpoint id 42, timestamp 4711, an
 * in-memory stream factory (1024), key-group range [0, 2] and a new closeable registry.
 */
@Before
public void setUp() throws Exception {
	final CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
	final CheckpointStreamFactory inMemoryFactory = new MemCheckpointStreamFactory(1024);
	final KeyGroupRange keyGroups = new KeyGroupRange(0, 2);

	this.snapshotContext =
		new StateSnapshotContextSynchronousImpl(42, 4711, inMemoryFactory, keyGroups, cancelStreamRegistry);
}
 
示例12
/**
 * Verifies that {@code closeExceptionally()} on a StateSnapshotContextSynchronousImpl closes
 * both raw state output streams it created and empties the closeable registry.
 */
@Test
public void testStreamClosingExceptionally() throws Exception {
	final long id = 42L;
	final long timestamp = 1L;

	CheckpointStreamFactory.CheckpointStateOutputStream firstStream =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
	CheckpointStreamFactory.CheckpointStateOutputStream secondStream =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);

	// the factory hands out the two mocked streams in order
	CheckpointStreamFactory factory = mock(CheckpointStreamFactory.class);
	when(factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
		.thenReturn(firstStream)
		.thenReturn(secondStream);

	InsightCloseableRegistry registry = new InsightCloseableRegistry();
	KeyGroupRange range = new KeyGroupRange(0, 2);

	StateSnapshotContextSynchronousImpl snapshotContext =
		new StateSnapshotContextSynchronousImpl(id, timestamp, factory, range, registry);

	// requesting both raw outputs creates one stream each
	snapshotContext.getRawKeyedOperatorStateOutput();
	snapshotContext.getRawOperatorStateOutput();

	verify(factory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

	// both streams are tracked by the registry while the context is open
	assertEquals(2, registry.size());
	assertTrue(registry.contains(firstStream));
	assertTrue(registry.contains(secondStream));

	snapshotContext.closeExceptionally();

	verify(firstStream).close();
	verify(secondStream).close();

	assertEquals(0, registry.size());
}
 
示例13
/**
 * Creates the snapshot futures and the synchronous snapshot context for one checkpoint and
 * delegates the actual work to the {@code snapshotState} overload that accepts them.
 *
 * <p>The key-group range is taken from the keyed state backend when one is present and is
 * {@link KeyGroupRange#EMPTY_KEY_GROUP_RANGE} otherwise.
 *
 * @param streamOperator operator passed through to the delegate overload
 * @param timeServiceManager optional time service manager passed through to the delegate
 * @param operatorName operator name passed through to the delegate
 * @param checkpointId id of the checkpoint
 * @param timestamp timestamp of the checkpoint
 * @param checkpointOptions options passed through to the delegate
 * @param factory stream factory used by the snapshot context and the delegate
 * @return the futures object that was handed to the delegate overload
 * @throws CheckpointException declared by this method; presumably raised by the delegate
 */
public OperatorSnapshotFutures snapshotState(
		CheckpointedStreamOperator streamOperator,
		Optional<InternalTimeServiceManager<?>> timeServiceManager,
		String operatorName,
		long checkpointId,
		long timestamp,
		CheckpointOptions checkpointOptions,
		CheckpointStreamFactory factory) throws CheckpointException {
	final KeyGroupRange range;
	if (keyedStateBackend != null) {
		range = keyedStateBackend.getKeyGroupRange();
	} else {
		range = KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
	}

	final OperatorSnapshotFutures futures = new OperatorSnapshotFutures();

	final StateSnapshotContextSynchronousImpl context =
		new StateSnapshotContextSynchronousImpl(checkpointId, timestamp, factory, range, closeableRegistry);

	snapshotState(
		streamOperator,
		timeServiceManager,
		operatorName,
		checkpointId,
		timestamp,
		checkpointOptions,
		factory,
		futures,
		context);

	return futures;
}
 
示例14
/**
 * Adds one split to an initialized and opened operator, takes a snapshot and checks that the
 * reader state then holds {@code MOCK_SPLIT} followed by the newly added split.
 */
@Test
public void testSnapshotState() throws Exception {
	operator.initializeState(getStateContext());
	operator.open();

	// hand the operator an additional split through an operator event
	final MockSourceSplit addedSplit = new MockSourceSplit(2);
	operator.handleOperatorEvent(
		new AddSplitEvent<>(Collections.singletonList(addedSplit), new MockSourceSplitSerializer()));

	operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));

	// Verify the splits in state.
	final List<MockSourceSplit> snapshottedSplits =
		CollectionUtil.iterableToList(operator.getReaderState().get());
	assertEquals(Arrays.asList(MOCK_SPLIT, addedSplit), snapshottedSplits);
}
 
示例15
/**
 * Builds the snapshot context under test with checkpoint id 42 and timestamp 4711, using an
 * in-memory stream factory (1024), the key-group range [0, 2] and a fresh closeable registry.
 */
@Before
public void setUp() throws Exception {
	final CloseableRegistry streamRegistry = new CloseableRegistry();
	final CheckpointStreamFactory checkpointStreams = new MemCheckpointStreamFactory(1024);
	final KeyGroupRange assignedKeyGroups = new KeyGroupRange(0, 2);

	this.snapshotContext = new StateSnapshotContextSynchronousImpl(
		42, 4711, checkpointStreams, assignedKeyGroups, streamRegistry);
}
 
示例16
/**
 * Checks that an exceptional close of the snapshot context closes the keyed and the operator
 * raw state streams and unregisters them from the closeable registry.
 */
@Test
public void testStreamClosingExceptionally() throws Exception {
	final long cpId = 42L;
	final long cpTimestamp = 1L;

	CheckpointStreamFactory.CheckpointStateOutputStream streamA =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
	CheckpointStreamFactory.CheckpointStateOutputStream streamB =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);

	// first factory call returns streamA, the second one streamB
	CheckpointStreamFactory mockedFactory = mock(CheckpointStreamFactory.class);
	when(mockedFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
		.thenReturn(streamA, streamB);

	InsightCloseableRegistry insightRegistry = new InsightCloseableRegistry();

	StateSnapshotContextSynchronousImpl ctx = new StateSnapshotContextSynchronousImpl(
		cpId,
		cpTimestamp,
		mockedFactory,
		new KeyGroupRange(0, 2),
		insightRegistry);

	// each getter opens one output stream
	ctx.getRawKeyedOperatorStateOutput();
	ctx.getRawOperatorStateOutput();

	verify(mockedFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

	// the registry knows both streams before closing
	assertEquals(2, insightRegistry.size());
	assertTrue(insightRegistry.contains(streamA));
	assertTrue(insightRegistry.contains(streamB));

	ctx.closeExceptionally();

	// both streams are closed and no longer registered
	verify(streamA).close();
	verify(streamB).close();
	assertEquals(0, insightRegistry.size());
}
 
示例17
/**
 * Restores a consumer (subtask 0 of 2) from a union state of four shards, opens it without
 * running it, snapshots, and checks that the list state was cleared and afterwards holds only
 * the shards at index 0 and 2 of the restored state.
 */
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
	Properties consumerConfig = TestUtils.getStandardProperties();

	// restored union state: shards 0..3 of "fakeStream", each at sequence number "1"
	final int shardCount = 4;
	List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(shardCount);
	for (int shardOrder = 0; shardOrder < shardCount; shardOrder++) {
		StreamShardHandle shardHandle = new StreamShardHandle(
			"fakeStream",
			new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardOrder)));
		globalUnionState.add(Tuple2.of(
			KinesisDataFetcher.convertToStreamShardMetadata(shardHandle),
			new SequenceNumber("1")));
	}

	TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> unionListState = new TestingListState<>();
	for (Tuple2<StreamShardMetadata, SequenceNumber> restoredEntry : globalUnionState) {
		unionListState.add(restoredEntry);
	}

	// the consumer acts as subtask 0 out of 2 parallel subtasks
	FlinkKinesisConsumer<String> kinesisConsumer =
		new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), consumerConfig);
	RuntimeContext runtimeContext = mock(RuntimeContext.class);
	when(runtimeContext.getIndexOfThisSubtask()).thenReturn(0);
	when(runtimeContext.getNumberOfParallelSubtasks()).thenReturn(2);
	kinesisConsumer.setRuntimeContext(runtimeContext);

	OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(unionListState);

	StateInitializationContext restoreContext = mock(StateInitializationContext.class);
	when(restoreContext.getOperatorStateStore()).thenReturn(stateStore);
	when(restoreContext.isRestored()).thenReturn(true);

	kinesisConsumer.initializeState(restoreContext);

	// only opened, not run
	kinesisConsumer.open(new Configuration());

	// arbitrary checkpoint id and timestamp
	kinesisConsumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));

	assertTrue(unionListState.isClearCalled());

	// the checkpointed list state should contain only the shards that it should subscribe to
	assertEquals(globalUnionState.size() / 2, unionListState.getList().size());
	assertTrue(unionListState.getList().contains(globalUnionState.get(0)));
	assertTrue(unionListState.getList().contains(globalUnionState.get(2)));
}
 
示例18
/**
 * Takes a snapshot of this operator for the given checkpoint.
 *
 * <p>A StateSnapshotContextSynchronousImpl is opened in a try-with-resources block, the
 * {@code snapshotState(StateSnapshotContext)} hook is invoked, and the raw keyed/operator
 * state futures of the context plus the managed futures of the operator and keyed state
 * backends (when present) are stored in the returned {@link OperatorSnapshotFutures}.
 *
 * <p>If anything fails, the futures collected so far are cancelled (a failure of the
 * cancellation is attached as suppressed exception), the failure is logged at INFO level
 * unless the containing task is canceled, and a new exception wrapping the cause is thrown.
 *
 * @param checkpointId id of the checkpoint
 * @param timestamp timestamp of the checkpoint
 * @param checkpointOptions options forwarded to the state backends
 * @param factory stream factory used by the snapshot context and the state backends
 * @return the futures of all state snapshots started for this checkpoint
 * @throws Exception wrapping the original failure if the snapshot could not be completed
 */
@Override
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
		CheckpointStreamFactory factory) throws Exception {

	final KeyGroupRange range;
	if (keyedStateBackend != null) {
		range = keyedStateBackend.getKeyGroupRange();
	} else {
		range = KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
	}

	final OperatorSnapshotFutures futures = new OperatorSnapshotFutures();

	try (StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
			checkpointId,
			timestamp,
			factory,
			range,
			getContainingTask().getCancelables())) {

		// user-level snapshot hook
		snapshotState(context);

		// raw state written through the context
		futures.setKeyedStateRawFuture(context.getKeyedStateStreamFuture());
		futures.setOperatorStateRawFuture(context.getOperatorStateStreamFuture());

		// managed state: operator backend first, keyed backend second
		if (operatorStateBackend != null) {
			futures.setOperatorStateManagedFuture(
				operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
		}

		if (keyedStateBackend != null) {
			futures.setKeyedStateManagedFuture(
				keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
		}
	} catch (Exception failure) {
		// best-effort cleanup; keep a cancellation failure attached to the original cause
		try {
			futures.cancel();
		} catch (Exception cancelFailure) {
			failure.addSuppressed(cancelFailure);
		}

		final String message = "Could not complete snapshot " + checkpointId + " for operator " +
			getOperatorName() + ".";

		final boolean taskCanceled = getContainingTask().isCanceled();
		if (!taskCanceled) {
			LOG.info(message, failure);
		}
		throw new Exception(message, failure);
	}

	return futures;
}
 
示例19
/**
 * Tests that a failing snapshot method call to the keyed state backend will trigger the closing
 * of the StateSnapshotContextSynchronousImpl and the cancellation of the
 * OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures, i.e. the
 * two raw state futures of the context and the managed operator state future that was obtained
 * before the keyed state backend failed.
 */
@Test
public void testFailingBackendSnapshotMethod() throws Exception {
	final long checkpointId = 42L;
	final long timestamp = 1L;

	final Exception failingException = new Exception("Test exception");

	final CloseableRegistry closeableRegistry = new CloseableRegistry();

	RunnableFuture<SnapshotResult<KeyedStateHandle>> futureKeyedStateHandle = mock(RunnableFuture.class);
	RunnableFuture<SnapshotResult<OperatorStateHandle>> futureOperatorStateHandle = mock(RunnableFuture.class);

	// spied context that hands out the mocked raw state futures
	StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(checkpointId, timestamp));
	when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
	when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);

	OperatorSnapshotFutures operatorSnapshotResult = spy(new OperatorSnapshotFutures());

	// inject the spies for the objects the operator creates internally
	whenNew(StateSnapshotContextSynchronousImpl.class)
		.withArguments(
			anyLong(),
			anyLong(),
			any(CheckpointStreamFactory.class),
			nullable(KeyGroupRange.class),
			any(CloseableRegistry.class))
		.thenReturn(context);
	whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult);

	StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
	when(containingTask.getCancelables()).thenReturn(closeableRegistry);

	AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
	when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenCallRealMethod();

	doCallRealMethod().when(operator).close();
	doCallRealMethod().when(operator).dispose();

	doReturn(containingTask).when(operator).getContainingTask();

	RunnableFuture<SnapshotResult<OperatorStateHandle>> futureManagedOperatorStateHandle = mock(RunnableFuture.class);

	// the operator state backend succeeds and returns a future ...
	OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class);
	when(operatorStateBackend.snapshot(
		eq(checkpointId),
		eq(timestamp),
		any(CheckpointStreamFactory.class),
		any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);

	// ... while the keyed state backend fails
	AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
	when(keyedStateBackend.snapshot(
		eq(checkpointId),
		eq(timestamp),
		any(CheckpointStreamFactory.class),
		eq(CheckpointOptions.forCheckpointWithDefaultLocation()))).thenThrow(failingException);

	closeableRegistry.registerCloseable(operatorStateBackend);
	closeableRegistry.registerCloseable(keyedStateBackend);

	Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
	Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);

	try {
		operator.snapshotState(
				checkpointId,
				timestamp,
				CheckpointOptions.forCheckpointWithDefaultLocation(),
				new MemCheckpointStreamFactory(Integer.MAX_VALUE));
		fail("Exception expected.");
	} catch (Exception e) {
		assertEquals(failingException, e.getCause());
	}

	// verify that the context has been closed, the operator snapshot result has been cancelled
	// and that all futures have been cancelled.
	verify(context).close();
	verify(operatorSnapshotResult).cancel();

	verify(futureKeyedStateHandle).cancel(anyBoolean());
	verify(futureOperatorStateHandle).cancel(anyBoolean());
	// the managed operator state future was assigned before the keyed backend failed,
	// so it has to be cancelled as well
	verify(futureManagedOperatorStateHandle).cancel(anyBoolean());

	operator.close();

	operator.dispose();

	verify(operatorStateBackend).close();
	verify(keyedStateBackend).close();
	verify(operatorStateBackend).dispose();
	verify(keyedStateBackend).dispose();
}
 
示例20
/**
 * Checks that a consumer which was restored and opened but never run snapshots from its
 * restored state: with four restored shards and this consumer being subtask 0 of 2, the list
 * state is cleared and then contains exactly the restored entries at index 0 and 2.
 */
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
	Properties props = TestUtils.getStandardProperties();

	// four restored entries, one per shard order 0..3, all at sequence number "1"
	List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
	for (int order = 0; order < 4; order++) {
		Shard shard = new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(order));
		StreamShardMetadata shardMetadata =
			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream", shard));
		globalUnionState.add(Tuple2.of(shardMetadata, new SequenceNumber("1")));
	}

	TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
	for (Tuple2<StreamShardMetadata, SequenceNumber> entry : globalUnionState) {
		restoredListState.add(entry);
	}

	FlinkKinesisConsumer<String> consumerUnderTest =
		new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), props);

	// subtask index 0 with a parallelism of 2
	RuntimeContext mockedRuntimeContext = mock(RuntimeContext.class);
	when(mockedRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
	when(mockedRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(2);
	consumerUnderTest.setRuntimeContext(mockedRuntimeContext);

	// the state store returns the pre-filled union list state on restore
	OperatorStateStore mockedStateStore = mock(OperatorStateStore.class);
	when(mockedStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

	StateInitializationContext mockedInitContext = mock(StateInitializationContext.class);
	when(mockedInitContext.getOperatorStateStore()).thenReturn(mockedStateStore);
	when(mockedInitContext.isRestored()).thenReturn(true);

	consumerUnderTest.initializeState(mockedInitContext);

	// only opened, not run
	consumerUnderTest.open(new Configuration());

	// arbitrary checkpoint id and timestamp
	consumerUnderTest.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));

	assertTrue(restoredListState.isClearCalled());

	// the checkpointed list state should contain only the shards that it should subscribe to
	assertEquals(globalUnionState.size() / 2, restoredListState.getList().size());
	assertTrue(restoredListState.getList().contains(globalUnionState.get(0)));
	assertTrue(restoredListState.getList().contains(globalUnionState.get(2)));
}
 
示例21
/**
 * Tests that a failing snapshot method call to the keyed state backend will trigger the
 * cancellation of the OperatorSnapshotResult. The latter is supposed to also cancel all
 * assigned futures, i.e. the two raw state futures of the context and the managed operator
 * state future that was obtained before the keyed state backend failed.
 *
 * <p>NOTE(review): closing of the StateSnapshotContextSynchronousImpl is not verified by this
 * test; confirm whether a corresponding verification is intended.
 */
@Test
public void testFailingBackendSnapshotMethod() throws Exception {
	final long checkpointId = 42L;
	final long timestamp = 1L;

	final Exception failingException = new Exception("Test exception");

	final CloseableRegistry closeableRegistry = new CloseableRegistry();

	RunnableFuture<SnapshotResult<KeyedStateHandle>> futureKeyedStateHandle = mock(RunnableFuture.class);
	RunnableFuture<SnapshotResult<OperatorStateHandle>> futureOperatorStateHandle = mock(RunnableFuture.class);

	// spied context that hands out the mocked raw state futures
	StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(checkpointId, timestamp));
	when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
	when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);

	OperatorSnapshotFutures operatorSnapshotResult = spy(new OperatorSnapshotFutures());

	// inject the spies for the objects the operator creates internally
	whenNew(StateSnapshotContextSynchronousImpl.class)
		.withArguments(
			anyLong(),
			anyLong(),
			any(CheckpointStreamFactory.class),
			nullable(KeyGroupRange.class),
			any(CloseableRegistry.class))
		.thenReturn(context);
	whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult);

	StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
	when(containingTask.getCancelables()).thenReturn(closeableRegistry);

	AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
	when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenCallRealMethod();

	doCallRealMethod().when(operator).close();
	doCallRealMethod().when(operator).dispose();

	doReturn(containingTask).when(operator).getContainingTask();

	RunnableFuture<SnapshotResult<OperatorStateHandle>> futureManagedOperatorStateHandle = mock(RunnableFuture.class);

	// the operator state backend succeeds and returns a future ...
	OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class);
	when(operatorStateBackend.snapshot(
		eq(checkpointId),
		eq(timestamp),
		any(CheckpointStreamFactory.class),
		any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);

	// ... while the keyed state backend fails
	AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
	when(keyedStateBackend.snapshot(
		eq(checkpointId),
		eq(timestamp),
		any(CheckpointStreamFactory.class),
		eq(CheckpointOptions.forCheckpointWithDefaultLocation()))).thenThrow(failingException);

	closeableRegistry.registerCloseable(operatorStateBackend);
	closeableRegistry.registerCloseable(keyedStateBackend);

	Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
	Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);

	try {
		operator.snapshotState(
				checkpointId,
				timestamp,
				CheckpointOptions.forCheckpointWithDefaultLocation(),
				new MemCheckpointStreamFactory(Integer.MAX_VALUE));
		fail("Exception expected.");
	} catch (Exception e) {
		assertEquals(failingException, e.getCause());
	}

	// verify that the operator snapshot result has been cancelled
	// and that all futures have been cancelled.
	verify(operatorSnapshotResult).cancel();

	verify(futureKeyedStateHandle).cancel(anyBoolean());
	verify(futureOperatorStateHandle).cancel(anyBoolean());
	// the managed operator state future was assigned before the keyed backend failed,
	// so it has to be cancelled as well
	verify(futureManagedOperatorStateHandle).cancel(anyBoolean());

	operator.close();

	operator.dispose();

	verify(operatorStateBackend).close();
	verify(keyedStateBackend).close();
	verify(operatorStateBackend).dispose();
	verify(keyedStateBackend).dispose();
}
 
示例22
/**
 * Verifies that running the keyed and operator state stream futures of a
 * StateSnapshotContextSynchronousImpl calls {@code closeAndGetHandle()} on both raw state
 * output streams and leaves the closeable registry empty.
 */
@Test
public void testStreamClosingWhenClosing() throws Exception {
	final long cpId = 42L;
	final long cpTimestamp = 1L;

	CheckpointStreamFactory.CheckpointStateOutputStream streamA =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
	CheckpointStreamFactory.CheckpointStateOutputStream streamB =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);

	// first factory call returns streamA, the second one streamB
	CheckpointStreamFactory mockedFactory = mock(CheckpointStreamFactory.class);
	when(mockedFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
		.thenReturn(streamA)
		.thenReturn(streamB);

	InsightCloseableRegistry insightRegistry = new InsightCloseableRegistry();

	StateSnapshotContextSynchronousImpl ctx = new StateSnapshotContextSynchronousImpl(
		cpId,
		cpTimestamp,
		mockedFactory,
		new KeyGroupRange(0, 2),
		insightRegistry);

	// each getter opens one output stream
	ctx.getRawKeyedOperatorStateOutput();
	ctx.getRawOperatorStateOutput();

	verify(mockedFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

	// the registry knows both streams before the futures run
	assertEquals(2, insightRegistry.size());
	assertTrue(insightRegistry.contains(streamA));
	assertTrue(insightRegistry.contains(streamB));

	// running the futures finalizes the streams
	ctx.getKeyedStateStreamFuture().run();
	ctx.getOperatorStateStreamFuture().run();

	verify(streamA).closeAndGetHandle();
	verify(streamB).closeAndGetHandle();

	assertEquals(0, insightRegistry.size());
}
 
示例23
/**
 * Snapshots a FlinkKinesisConsumer that was initialized from restored union state and opened,
 * but never run. The consumer is set up as subtask 0 of 2 and the restored state holds four
 * shards of "fakeStream"; after the snapshot the list state must have been cleared and must
 * hold half of the restored entries, including the ones for shard order 0 and 2.
 */
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
	final String streamName = "fakeStream";
	final int shardCount = 4;

	Properties config = TestUtils.getStandardProperties();

	// build the restored union state and mirror it into the list state, one entry per shard
	List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(shardCount);
	TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
	for (int shardOrder = 0; shardOrder < shardCount; shardOrder++) {
		Shard shard = new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardOrder));
		StreamShardMetadata shardMetadata =
			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle(streamName, shard));
		Tuple2<StreamShardMetadata, SequenceNumber> entry = Tuple2.of(shardMetadata, new SequenceNumber("1"));

		globalUnionState.add(entry);
		listState.add(entry);
	}

	// runtime context: this consumer is subtask 0 out of 2 parallel subtasks
	RuntimeContext runtimeContext = mock(RuntimeContext.class);
	when(runtimeContext.getIndexOfThisSubtask()).thenReturn(0);
	when(runtimeContext.getNumberOfParallelSubtasks()).thenReturn(2);

	// state store that serves the prepared list state as union list state
	OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);

	// initialization context that reports a restore
	StateInitializationContext restoreContext = mock(StateInitializationContext.class);
	when(restoreContext.getOperatorStateStore()).thenReturn(stateStore);
	when(restoreContext.isRestored()).thenReturn(true);

	FlinkKinesisConsumer<String> consumer =
		new FlinkKinesisConsumer<>(streamName, new SimpleStringSchema(), config);
	consumer.setRuntimeContext(runtimeContext);
	consumer.initializeState(restoreContext);

	// only opened, not run
	consumer.open(new Configuration());

	// arbitrary checkpoint id and timestamp
	consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));

	assertTrue(listState.isClearCalled());

	// the checkpointed list state should contain only the shards that it should subscribe to:
	// half of the restored entries, among them every second one starting at index 0
	assertEquals(globalUnionState.size() / 2, listState.getList().size());
	for (int index = 0; index < shardCount; index += 2) {
		assertTrue(listState.getList().contains(globalUnionState.get(index)));
	}
}
 
示例24
/**
 * Exercises the raw state output streams of a StateSnapshotContextSynchronousImpl end to end.
 *
 * <p>Two mocked output streams are served by a mocked stream factory. After the raw keyed and
 * raw operator state outputs have been requested, both streams are expected to be registered
 * in the closeable registry. After the keyed and operator state stream futures have been run,
 * {@code closeAndGetHandle()} is expected to have been called once on each stream and the
 * registry is expected to be empty.
 */
@Test
public void testStreamClosingWhenClosing() throws Exception {
	final long snapshotId = 42L;
	final long snapshotTimestamp = 1L;
	final KeyGroupRange range = new KeyGroupRange(0, 2);
	final InsightCloseableRegistry trackedCloseables = new InsightCloseableRegistry();

	// mocked streams and the factory that returns them on its first and second call
	final CheckpointStreamFactory.CheckpointStateOutputStream streamA =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
	final CheckpointStreamFactory.CheckpointStateOutputStream streamB =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
	final CheckpointStreamFactory mockedFactory = mock(CheckpointStreamFactory.class);
	when(mockedFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
		.thenReturn(streamA, streamB);

	final StateSnapshotContextSynchronousImpl ctx = new StateSnapshotContextSynchronousImpl(
		snapshotId, snapshotTimestamp, mockedFactory, range, trackedCloseables);

	// creating the output streams
	ctx.getRawKeyedOperatorStateOutput();
	ctx.getRawOperatorStateOutput();

	// exactly two streams were created, and both are registered
	verify(mockedFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
	assertEquals(2, trackedCloseables.size());
	assertTrue(trackedCloseables.contains(streamA));
	assertTrue(trackedCloseables.contains(streamB));

	// run both snapshot futures
	ctx.getKeyedStateStreamFuture().run();
	ctx.getOperatorStateStreamFuture().run();

	// each stream was closed via closeAndGetHandle() exactly once and got unregistered
	verify(streamA, times(1)).closeAndGetHandle();
	verify(streamB, times(1)).closeAndGetHandle();
	assertEquals(0, trackedCloseables.size());
}