Java源码示例:org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows

示例1
/**
 * Merges a set of windows in which two windows each fully cover another one, and expects
 * exactly two merge callbacks: one per covering window, with the covering window as result.
 */
@Test
public void testMergeCoveringWindow() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// Each "outer" window completely contains the matching "inner" window.
	TimeWindow firstInner = new TimeWindow(1, 1);
	TimeWindow firstOuter = new TimeWindow(0, 2);
	TimeWindow secondOuter = new TimeWindow(4, 7);
	TimeWindow secondInner = new TimeWindow(5, 6);

	sessionWindows.mergeWindows(
			Lists.newArrayList(firstInner, firstOuter, secondOuter, secondInner),
			mergeCallback);

	// First pair collapses into its covering window.
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(firstInner, firstOuter)),
			eq(firstOuter));

	// Second pair collapses into its covering window.
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(secondInner, secondOuter)),
			eq(secondOuter));

	// No merge callbacks beyond the two checked above.
	verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
 
示例2
/**
 * Merges a set of windows in which two windows each fully cover another one, and expects
 * exactly two merge callbacks: one per covering window, with the covering window as result.
 */
@Test
public void testMergeCoveringWindow() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// Each "outer" window completely contains the matching "inner" window.
	TimeWindow firstInner = new TimeWindow(1, 1);
	TimeWindow firstOuter = new TimeWindow(0, 2);
	TimeWindow secondOuter = new TimeWindow(4, 7);
	TimeWindow secondInner = new TimeWindow(5, 6);

	sessionWindows.mergeWindows(
			Lists.newArrayList(firstInner, firstOuter, secondOuter, secondInner),
			mergeCallback);

	// First pair collapses into its covering window.
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(firstInner, firstOuter)),
			eq(firstOuter));

	// Second pair collapses into its covering window.
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(secondInner, secondOuter)),
			eq(secondOuter));

	// No merge callbacks beyond the two checked above.
	verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
 
示例3
/**
 * Merges a set of windows in which two windows each fully cover another one, and expects
 * exactly two merge callbacks: one per covering window, with the covering window as result.
 */
@Test
public void testMergeCoveringWindow() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// Each "outer" window completely contains the matching "inner" window.
	TimeWindow firstInner = new TimeWindow(1, 1);
	TimeWindow firstOuter = new TimeWindow(0, 2);
	TimeWindow secondOuter = new TimeWindow(4, 7);
	TimeWindow secondInner = new TimeWindow(5, 6);

	sessionWindows.mergeWindows(
			Lists.newArrayList(firstInner, firstOuter, secondOuter, secondInner),
			mergeCallback);

	// First pair collapses into its covering window.
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(firstInner, firstOuter)),
			eq(firstOuter));

	// Second pair collapses into its covering window.
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(secondInner, secondOuter)),
			eq(secondOuter));

	// No merge callbacks beyond the two checked above.
	verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
 
示例4
/**
 * Builds a dynamic-gap assigner through {@code EventTimeSessionWindows.withDynamicGap} and
 * checks that the result is non-null and reports itself as an event-time assigner.
 */
@Test
public void testDynamicGapProperties() {
	SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);

	DynamicEventTimeSessionWindows<String> dynamicAssigner =
			EventTimeSessionWindows.withDynamicGap(gapExtractor);

	// Null check comes first so a missing assigner fails as an assertion, not an NPE.
	assertNotNull(dynamicAssigner);
	assertTrue(dynamicAssigner.isEventTime());
}
 
示例5
/**
 * Checks that each element gets a single window that starts at the element timestamp and
 * ends at the timestamp plus the gap the mocked extractor returns for that element.
 */
@Test
public void testWindowAssignment() {
	SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	// One stubbed gap per element value; the stubs do not depend on each other.
	when(gapExtractor.extract(eq("gap9000"))).thenReturn(9000L);
	when(gapExtractor.extract(eq("gap4000"))).thenReturn(4000L);
	when(gapExtractor.extract(eq("gap5000"))).thenReturn(5000L);

	WindowAssigner.WindowAssignerContext assignerContext = mock(WindowAssigner.WindowAssignerContext.class);
	DynamicEventTimeSessionWindows<String> sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	assertThat(
			sessionWindows.assignWindows("gap5000", 0L, assignerContext),
			contains(timeWindow(0, 5000)));
	assertThat(
			sessionWindows.assignWindows("gap4000", 4999L, assignerContext),
			contains(timeWindow(4999, 8999)));
	assertThat(
			sessionWindows.assignWindows("gap9000", 5000L, assignerContext),
			contains(timeWindow(5000, 14000)));
}
 
示例6
/**
 * Passes a single zero-length window to {@code mergeWindows} and expects that the merge
 * callback is never invoked.
 */
@Test
public void testMergeSinglePointWindow() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// Start and end are both 0: a window covering a single point.
	TimeWindow pointWindow = new TimeWindow(0, 0);
	sessionWindows.mergeWindows(Lists.newArrayList(pointWindow), mergeCallback);

	verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
 
示例7
/**
 * Passes a single window to {@code mergeWindows} and expects that the merge callback is
 * never invoked.
 */
@Test
public void testMergeSingleWindow() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// The only window in the input, so there is nothing to merge it with.
	TimeWindow onlyWindow = new TimeWindow(0, 1);
	sessionWindows.mergeWindows(Lists.newArrayList(onlyWindow), mergeCallback);

	verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
 
示例8
/**
 * Merges two runs of back-to-back windows and expects one merge callback per run, each
 * spanning from the start of the run's first window to the end of its last window.
 */
@Test
public void testMergeConsecutiveWindows() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// First run: 0..1, 1..2, 2..3. Second run: 4..5, 5..6. Nothing bridges 3 and 4.
	TimeWindow runOneA = new TimeWindow(0, 1);
	TimeWindow runOneB = new TimeWindow(1, 2);
	TimeWindow runOneC = new TimeWindow(2, 3);
	TimeWindow runTwoA = new TimeWindow(4, 5);
	TimeWindow runTwoB = new TimeWindow(5, 6);

	sessionWindows.mergeWindows(
			Lists.newArrayList(runOneA, runOneB, runOneC, runTwoA, runTwoB),
			mergeCallback);

	TimeWindow mergedRunOne = new TimeWindow(0, 3);
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(runOneA, runOneB, runOneC)),
			eq(mergedRunOne));

	TimeWindow mergedRunTwo = new TimeWindow(4, 6);
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(runTwoA, runTwoB)),
			eq(mergedRunTwo));

	// No merge callbacks beyond the two checked above.
	verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
 
示例9
/**
 * Checks the static properties of a dynamic-gap assigner: it is an event-time assigner, its
 * window serializer equals a {@code TimeWindow.Serializer}, and its default trigger is an
 * {@code EventTimeTrigger}.
 */
@Test
public void testProperties() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	assertTrue(sessionWindows.isEventTime());

	ExecutionConfig executionConfig = new ExecutionConfig();
	assertEquals(new TimeWindow.Serializer(), sessionWindows.getWindowSerializer(executionConfig));

	StreamExecutionEnvironment environment = mock(StreamExecutionEnvironment.class);
	assertThat(sessionWindows.getDefaultTrigger(environment), instanceOf(EventTimeTrigger.class));
}
 
示例10
/**
 * Builds a dynamic-gap assigner through {@code EventTimeSessionWindows.withDynamicGap} and
 * checks that the result is non-null and reports itself as an event-time assigner.
 */
@Test
public void testDynamicGapProperties() {
	SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);

	DynamicEventTimeSessionWindows<String> dynamicAssigner =
			EventTimeSessionWindows.withDynamicGap(gapExtractor);

	// Null check comes first so a missing assigner fails as an assertion, not an NPE.
	assertNotNull(dynamicAssigner);
	assertTrue(dynamicAssigner.isEventTime());
}
 
示例11
/**
 * Checks that each element gets a single window that starts at the element timestamp and
 * ends at the timestamp plus the gap the mocked extractor returns for that element.
 */
@Test
public void testWindowAssignment() {
	SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	// One stubbed gap per element value; the stubs do not depend on each other.
	when(gapExtractor.extract(eq("gap9000"))).thenReturn(9000L);
	when(gapExtractor.extract(eq("gap4000"))).thenReturn(4000L);
	when(gapExtractor.extract(eq("gap5000"))).thenReturn(5000L);

	WindowAssigner.WindowAssignerContext assignerContext = mock(WindowAssigner.WindowAssignerContext.class);
	DynamicEventTimeSessionWindows<String> sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	assertThat(
			sessionWindows.assignWindows("gap5000", 0L, assignerContext),
			contains(timeWindow(0, 5000)));
	assertThat(
			sessionWindows.assignWindows("gap4000", 4999L, assignerContext),
			contains(timeWindow(4999, 8999)));
	assertThat(
			sessionWindows.assignWindows("gap9000", 5000L, assignerContext),
			contains(timeWindow(5000, 14000)));
}
 
示例12
/**
 * Passes a single zero-length window to {@code mergeWindows} and expects that the merge
 * callback is never invoked.
 */
@Test
public void testMergeSinglePointWindow() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// Start and end are both 0: a window covering a single point.
	TimeWindow pointWindow = new TimeWindow(0, 0);
	sessionWindows.mergeWindows(Lists.newArrayList(pointWindow), mergeCallback);

	verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
 
示例13
/**
 * Passes a single window to {@code mergeWindows} and expects that the merge callback is
 * never invoked.
 */
@Test
public void testMergeSingleWindow() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// The only window in the input, so there is nothing to merge it with.
	TimeWindow onlyWindow = new TimeWindow(0, 1);
	sessionWindows.mergeWindows(Lists.newArrayList(onlyWindow), mergeCallback);

	verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
 
示例14
/**
 * Merges two runs of back-to-back windows and expects one merge callback per run, each
 * spanning from the start of the run's first window to the end of its last window.
 */
@Test
public void testMergeConsecutiveWindows() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// First run: 0..1, 1..2, 2..3. Second run: 4..5, 5..6. Nothing bridges 3 and 4.
	TimeWindow runOneA = new TimeWindow(0, 1);
	TimeWindow runOneB = new TimeWindow(1, 2);
	TimeWindow runOneC = new TimeWindow(2, 3);
	TimeWindow runTwoA = new TimeWindow(4, 5);
	TimeWindow runTwoB = new TimeWindow(5, 6);

	sessionWindows.mergeWindows(
			Lists.newArrayList(runOneA, runOneB, runOneC, runTwoA, runTwoB),
			mergeCallback);

	TimeWindow mergedRunOne = new TimeWindow(0, 3);
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(runOneA, runOneB, runOneC)),
			eq(mergedRunOne));

	TimeWindow mergedRunTwo = new TimeWindow(4, 6);
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(runTwoA, runTwoB)),
			eq(mergedRunTwo));

	// No merge callbacks beyond the two checked above.
	verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
 
示例15
/**
 * Checks the static properties of a dynamic-gap assigner: it is an event-time assigner, its
 * window serializer equals a {@code TimeWindow.Serializer}, and its default trigger is an
 * {@code EventTimeTrigger}.
 */
@Test
public void testProperties() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	assertTrue(sessionWindows.isEventTime());

	ExecutionConfig executionConfig = new ExecutionConfig();
	assertEquals(new TimeWindow.Serializer(), sessionWindows.getWindowSerializer(executionConfig));

	StreamExecutionEnvironment environment = mock(StreamExecutionEnvironment.class);
	assertThat(sessionWindows.getDefaultTrigger(environment), instanceOf(EventTimeTrigger.class));
}
 
示例16
/**
 * Builds a dynamic-gap assigner through {@code EventTimeSessionWindows.withDynamicGap} and
 * checks that the result is non-null and reports itself as an event-time assigner.
 */
@Test
public void testDynamicGapProperties() {
	SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);

	DynamicEventTimeSessionWindows<String> dynamicAssigner =
			EventTimeSessionWindows.withDynamicGap(gapExtractor);

	// Null check comes first so a missing assigner fails as an assertion, not an NPE.
	assertNotNull(dynamicAssigner);
	assertTrue(dynamicAssigner.isEventTime());
}
 
示例17
/**
 * Checks that each element gets a single window that starts at the element timestamp and
 * ends at the timestamp plus the gap the mocked extractor returns for that element.
 */
@Test
public void testWindowAssignment() {
	SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	// One stubbed gap per element value; the stubs do not depend on each other.
	when(gapExtractor.extract(eq("gap9000"))).thenReturn(9000L);
	when(gapExtractor.extract(eq("gap4000"))).thenReturn(4000L);
	when(gapExtractor.extract(eq("gap5000"))).thenReturn(5000L);

	WindowAssigner.WindowAssignerContext assignerContext = mock(WindowAssigner.WindowAssignerContext.class);
	DynamicEventTimeSessionWindows<String> sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	assertThat(
			sessionWindows.assignWindows("gap5000", 0L, assignerContext),
			contains(timeWindow(0, 5000)));
	assertThat(
			sessionWindows.assignWindows("gap4000", 4999L, assignerContext),
			contains(timeWindow(4999, 8999)));
	assertThat(
			sessionWindows.assignWindows("gap9000", 5000L, assignerContext),
			contains(timeWindow(5000, 14000)));
}
 
示例18
/**
 * Passes a single zero-length window to {@code mergeWindows} and expects that the merge
 * callback is never invoked.
 */
@Test
public void testMergeSinglePointWindow() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// Start and end are both 0: a window covering a single point.
	TimeWindow pointWindow = new TimeWindow(0, 0);
	sessionWindows.mergeWindows(Lists.newArrayList(pointWindow), mergeCallback);

	verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
 
示例19
/**
 * Passes a single window to {@code mergeWindows} and expects that the merge callback is
 * never invoked.
 */
@Test
public void testMergeSingleWindow() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// The only window in the input, so there is nothing to merge it with.
	TimeWindow onlyWindow = new TimeWindow(0, 1);
	sessionWindows.mergeWindows(Lists.newArrayList(onlyWindow), mergeCallback);

	verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
 
示例20
/**
 * Merges two runs of back-to-back windows and expects one merge callback per run, each
 * spanning from the start of the run's first window to the end of its last window.
 */
@Test
public void testMergeConsecutiveWindows() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);

	// First run: 0..1, 1..2, 2..3. Second run: 4..5, 5..6. Nothing bridges 3 and 4.
	TimeWindow runOneA = new TimeWindow(0, 1);
	TimeWindow runOneB = new TimeWindow(1, 2);
	TimeWindow runOneC = new TimeWindow(2, 3);
	TimeWindow runTwoA = new TimeWindow(4, 5);
	TimeWindow runTwoB = new TimeWindow(5, 6);

	sessionWindows.mergeWindows(
			Lists.newArrayList(runOneA, runOneB, runOneC, runTwoA, runTwoB),
			mergeCallback);

	TimeWindow mergedRunOne = new TimeWindow(0, 3);
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(runOneA, runOneB, runOneC)),
			eq(mergedRunOne));

	TimeWindow mergedRunTwo = new TimeWindow(4, 6);
	verify(mergeCallback, times(1)).merge(
			(Collection<TimeWindow>) argThat(containsInAnyOrder(runTwoA, runTwoB)),
			eq(mergedRunTwo));

	// No merge callbacks beyond the two checked above.
	verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
 
示例21
/**
 * Checks the static properties of a dynamic-gap assigner: it is an event-time assigner, its
 * window serializer equals a {@code TimeWindow.Serializer}, and its default trigger is an
 * {@code EventTimeTrigger}.
 */
@Test
public void testProperties() {
	SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
	when(gapExtractor.extract(any())).thenReturn(5000L);
	DynamicEventTimeSessionWindows sessionWindows = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

	assertTrue(sessionWindows.isEventTime());

	ExecutionConfig executionConfig = new ExecutionConfig();
	assertEquals(new TimeWindow.Serializer(), sessionWindows.getWindowSerializer(executionConfig));

	StreamExecutionEnvironment environment = mock(StreamExecutionEnvironment.class);
	assertThat(sessionWindows.getDefaultTrigger(environment), instanceOf(EventTimeTrigger.class));
}
 
示例22
/**
 * Runs a {@code WindowOperator} configured with {@code DynamicEventTimeSessionWindows} through a
 * one-input test harness and compares the emitted records and watermarks with a hand-built
 * expected queue.
 *
 * <p>The mocked gap extractor returns 3000 for "key1" elements, 1000 for "key2" elements whose
 * second field is 10, 2000 for every other "key2" element, and 0 for any other key.
 *
 * @throws Exception declared by the test signature; which harness calls can throw is not
 *     visible in this block
 */
@Test
@SuppressWarnings("unchecked")
public void testDynamicEventTimeSessionWindows() throws Exception {
	// Resets a counter declared elsewhere in the test class (declaration not visible here).
	closeCalled.set(0);

	SessionWindowTimeGapExtractor<Tuple2<String, Integer>> extractor = mock(SessionWindowTimeGapExtractor.class);
	// The session gap depends on the element: on its key (f0) and, for "key2", on its value (f1).
	when(extractor.extract(any(Tuple2.class))).thenAnswer(invocation -> {
		Tuple2<String, Integer> element = (Tuple2<String, Integer>) invocation.getArguments()[0];
		switch (element.f0) {
			case "key1":
				return 3000L;
			case "key2":
				switch (element.f1) {
					case 10:
						return 1000L;
					default:
						return 2000L;
				}
			default:
				return 0L;
		}
	});

	ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
			STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
		DynamicEventTimeSessionWindows.withDynamicGap(extractor),
		new TimeWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new SessionWindowFunction()),
		EventTimeTrigger.create(),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
		createTestHarness(operator);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// test different gaps for different keys
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 10));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));

	testHarness.processWatermark(new Watermark(8999));

	// key1: timestamp 10 + gap 3000 = 3010; key2: last element 6000 + gap 2000 = 8000.
	// NOTE(review): the "-3" / "-9" suffixes presumably come from SessionWindowFunction summing
	// the f1 values (3, and 4 + 5); that class is not visible here — confirm.
	// NOTE(review): record timestamps 3009 / 7999 are one less than the window end — presumably
	// the window's max timestamp; confirm.
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 3010L), 3009));
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-9", 5000L, 8000L), 7999));
	expectedOutput.add(new Watermark(8999));

	// test gap when it produces an end time before current timeout
	// the furthest timeout is respected
	// (10500 + 1000 = 11500 is earlier than 10000 + 2000 = 12000; the expected end stays 12000)
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 9000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 10000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 10500));

	testHarness.processWatermark(new Watermark(12999));

	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-13", 9000L, 12000L), 11999));
	expectedOutput.add(new Watermark(12999));

	// test gap when it produces an end time after current timeout
	// (13000 + 1000 = 14000, 13500 + 1000 = 14500, then 14000 + 2000 = 16000 is the expected end)
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13500));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14000));

	testHarness.processWatermark(new Watermark(16999));

	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-21", 13000L, 16000L), 15999));
	expectedOutput.add(new Watermark(16999));

	// Compare using Tuple3ResultSortComparator (declared elsewhere) via the "sorted" assertion helper.
	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());

	// Only reached if every step above succeeded; there is no try/finally around the harness.
	testHarness.close();
}
 
示例23
/**
 * Runs a {@code WindowOperator} configured with {@code DynamicEventTimeSessionWindows} through a
 * one-input test harness and compares the emitted records and watermarks with a hand-built
 * expected queue.
 *
 * <p>The mocked gap extractor returns 3000 for "key1" elements, 1000 for "key2" elements whose
 * second field is 10, 2000 for every other "key2" element, and 0 for any other key.
 *
 * @throws Exception declared by the test signature; which harness calls can throw is not
 *     visible in this block
 */
@Test
@SuppressWarnings("unchecked")
public void testDynamicEventTimeSessionWindows() throws Exception {
	// Resets a counter declared elsewhere in the test class (declaration not visible here).
	closeCalled.set(0);

	SessionWindowTimeGapExtractor<Tuple2<String, Integer>> extractor = mock(SessionWindowTimeGapExtractor.class);
	// The session gap depends on the element: on its key (f0) and, for "key2", on its value (f1).
	when(extractor.extract(any(Tuple2.class))).thenAnswer(invocation -> {
		Tuple2<String, Integer> element = (Tuple2<String, Integer>) invocation.getArguments()[0];
		switch (element.f0) {
			case "key1":
				return 3000L;
			case "key2":
				switch (element.f1) {
					case 10:
						return 1000L;
					default:
						return 2000L;
				}
			default:
				return 0L;
		}
	});

	ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
			STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
		DynamicEventTimeSessionWindows.withDynamicGap(extractor),
		new TimeWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new SessionWindowFunction()),
		EventTimeTrigger.create(),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
		createTestHarness(operator);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// test different gaps for different keys
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 10));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));

	testHarness.processWatermark(new Watermark(8999));

	// key1: timestamp 10 + gap 3000 = 3010; key2: last element 6000 + gap 2000 = 8000.
	// NOTE(review): the "-3" / "-9" suffixes presumably come from SessionWindowFunction summing
	// the f1 values (3, and 4 + 5); that class is not visible here — confirm.
	// NOTE(review): record timestamps 3009 / 7999 are one less than the window end — presumably
	// the window's max timestamp; confirm.
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 3010L), 3009));
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-9", 5000L, 8000L), 7999));
	expectedOutput.add(new Watermark(8999));

	// test gap when it produces an end time before current timeout
	// the furthest timeout is respected
	// (10500 + 1000 = 11500 is earlier than 10000 + 2000 = 12000; the expected end stays 12000)
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 9000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 10000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 10500));

	testHarness.processWatermark(new Watermark(12999));

	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-13", 9000L, 12000L), 11999));
	expectedOutput.add(new Watermark(12999));

	// test gap when it produces an end time after current timeout
	// (13000 + 1000 = 14000, 13500 + 1000 = 14500, then 14000 + 2000 = 16000 is the expected end)
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13500));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14000));

	testHarness.processWatermark(new Watermark(16999));

	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-21", 13000L, 16000L), 15999));
	expectedOutput.add(new Watermark(16999));

	// Compare using Tuple3ResultSortComparator (declared elsewhere) via the "sorted" assertion helper.
	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());

	// Only reached if every step above succeeded; there is no try/finally around the harness.
	testHarness.close();
}
 
示例24
/**
 * Runs a {@code WindowOperator} configured with {@code DynamicEventTimeSessionWindows} through a
 * one-input test harness and compares the emitted records and watermarks with a hand-built
 * expected queue.
 *
 * <p>The mocked gap extractor returns 3000 for "key1" elements, 1000 for "key2" elements whose
 * second field is 10, 2000 for every other "key2" element, and 0 for any other key.
 *
 * @throws Exception declared by the test signature; which harness calls can throw is not
 *     visible in this block
 */
@Test
@SuppressWarnings("unchecked")
public void testDynamicEventTimeSessionWindows() throws Exception {
	// Resets a counter declared elsewhere in the test class (declaration not visible here).
	closeCalled.set(0);

	SessionWindowTimeGapExtractor<Tuple2<String, Integer>> extractor = mock(SessionWindowTimeGapExtractor.class);
	// The session gap depends on the element: on its key (f0) and, for "key2", on its value (f1).
	when(extractor.extract(any(Tuple2.class))).thenAnswer(invocation -> {
		Tuple2<String, Integer> element = (Tuple2<String, Integer>) invocation.getArguments()[0];
		switch (element.f0) {
			case "key1":
				return 3000L;
			case "key2":
				switch (element.f1) {
					case 10:
						return 1000L;
					default:
						return 2000L;
				}
			default:
				return 0L;
		}
	});

	ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
			STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
		DynamicEventTimeSessionWindows.withDynamicGap(extractor),
		new TimeWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new SessionWindowFunction()),
		EventTimeTrigger.create(),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
		createTestHarness(operator);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// test different gaps for different keys
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 10));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));

	testHarness.processWatermark(new Watermark(8999));

	// key1: timestamp 10 + gap 3000 = 3010; key2: last element 6000 + gap 2000 = 8000.
	// NOTE(review): the "-3" / "-9" suffixes presumably come from SessionWindowFunction summing
	// the f1 values (3, and 4 + 5); that class is not visible here — confirm.
	// NOTE(review): record timestamps 3009 / 7999 are one less than the window end — presumably
	// the window's max timestamp; confirm.
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 3010L), 3009));
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-9", 5000L, 8000L), 7999));
	expectedOutput.add(new Watermark(8999));

	// test gap when it produces an end time before current timeout
	// the furthest timeout is respected
	// (10500 + 1000 = 11500 is earlier than 10000 + 2000 = 12000; the expected end stays 12000)
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 9000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 10000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 10500));

	testHarness.processWatermark(new Watermark(12999));

	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-13", 9000L, 12000L), 11999));
	expectedOutput.add(new Watermark(12999));

	// test gap when it produces an end time after current timeout
	// (13000 + 1000 = 14000, 13500 + 1000 = 14500, then 14000 + 2000 = 16000 is the expected end)
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13500));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14000));

	testHarness.processWatermark(new Watermark(16999));

	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-21", 13000L, 16000L), 15999));
	expectedOutput.add(new Watermark(16999));

	// Compare using Tuple3ResultSortComparator (declared elsewhere) via the "sorted" assertion helper.
	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());

	// Only reached if every step above succeeded; there is no try/finally around the harness.
	testHarness.close();
}