Java源码示例:org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable

示例1
@Test
public void testPartitionCustomOperatorPreservesFields() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));
		
		data.partitionCustom(new Partitioner<Long>() {
				public int partition(Long key, int numPartitions) { return key.intValue(); }
			}, 1)
			.groupBy(1)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long, Long>>())
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode partitioner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例2
@Test
public void testRangePartitionOperatorPreservesFields() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));

		data.partitionByRange(1)
			.groupBy(1)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode partitionNode = (SingleInputPlanNode)reducer.getInput().getSource();
		SingleInputPlanNode partitionIDRemover = (SingleInputPlanNode) partitionNode.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, partitionNode.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitionIDRemover.getInput().getShipStrategy());

		SourcePlanNode sourcePlanNode = op.getDataSources().iterator().next();
		List<Channel> sourceOutgoingChannels = sourcePlanNode.getOutgoingChannels();
		assertEquals(2, sourceOutgoingChannels.size());
		assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(0).getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(1).getShipStrategy());
		assertEquals(DataExchangeMode.PIPELINED, sourceOutgoingChannels.get(0).getDataExchangeMode());
		assertEquals(DataExchangeMode.BATCH, sourceOutgoingChannels.get(1).getDataExchangeMode());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例3
@Test
public void testCustomPartitioningTupleGroupReduce() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(0).withPartitioner(new TestPartitionerInt())
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例4
@Test
public void testCustomPartitioningTupleGroupReduceSorted() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(0).withPartitioner(new TestPartitionerInt())
			.sortGroup(1, Order.ASCENDING)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例5
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple4<Integer,Integer,Integer, Integer>> data = env.fromElements(new Tuple4<Integer,Integer,Integer,Integer>(0, 0, 0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(0).withPartitioner(new TestPartitionerInt())
			.sortGroup(1, Order.ASCENDING)
			.sortGroup(2, Order.DESCENDING)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer,Integer,Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple4<Integer, Integer, Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例6
@Test
public void testCustomPartitioningTupleGroupReduce() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Pojo2> data = env.fromElements(new Pojo2())
				.rebalance().setParallelism(4);

		data.groupBy("a").withPartitioner(new TestPartitionerInt())
				.reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
				.output(new DiscardingOutputFormat<Pojo2>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例7
@Test
public void testCustomPartitioningTupleGroupReduceSorted() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Pojo3> data = env.fromElements(new Pojo3())
				.rebalance().setParallelism(4);

		data.groupBy("a").withPartitioner(new TestPartitionerInt())
				.sortGroup("b", Order.ASCENDING)
				.reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
				.output(new DiscardingOutputFormat<Pojo3>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例8
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Pojo4> data = env.fromElements(new Pojo4())
				.rebalance().setParallelism(4);

		data.groupBy("a").withPartitioner(new TestPartitionerInt())
				.sortGroup("b", Order.ASCENDING)
				.sortGroup("c", Order.DESCENDING)
				.reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
				.output(new DiscardingOutputFormat<Pojo4>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例9
@Test
public void testCustomPartitioningKeySelectorGroupReduce() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
			.withPartitioner(new TestPartitionerInt())
				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例10
@Test
public void testCustomPartitioningKeySelectorGroupReduceSorted() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>())
			.withPartitioner(new TestPartitionerInt())
			.sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例11
@Test
public void testPartitionCustomOperatorPreservesFields() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));
		
		data.partitionCustom(new Partitioner<Long>() {
				public int partition(Long key, int numPartitions) { return key.intValue(); }
			}, 1)
			.groupBy(1)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long, Long>>())
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode partitioner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例12
@Test
public void testRangePartitionOperatorPreservesFields() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));

		data.partitionByRange(1)
			.groupBy(1)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode partitionNode = (SingleInputPlanNode)reducer.getInput().getSource();
		SingleInputPlanNode partitionIDRemover = (SingleInputPlanNode) partitionNode.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, partitionNode.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitionIDRemover.getInput().getShipStrategy());

		SourcePlanNode sourcePlanNode = op.getDataSources().iterator().next();
		List<Channel> sourceOutgoingChannels = sourcePlanNode.getOutgoingChannels();
		assertEquals(2, sourceOutgoingChannels.size());
		assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(0).getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(1).getShipStrategy());
		assertEquals(DataExchangeMode.PIPELINED, sourceOutgoingChannels.get(0).getDataExchangeMode());
		assertEquals(DataExchangeMode.BATCH, sourceOutgoingChannels.get(1).getDataExchangeMode());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例13
@Test
public void testCustomPartitioningTupleGroupReduce() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(0).withPartitioner(new TestPartitionerInt())
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例14
@Test
public void testCustomPartitioningTupleGroupReduceSorted() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(0).withPartitioner(new TestPartitionerInt())
			.sortGroup(1, Order.ASCENDING)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例15
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple4<Integer,Integer,Integer, Integer>> data = env.fromElements(new Tuple4<Integer,Integer,Integer,Integer>(0, 0, 0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(0).withPartitioner(new TestPartitionerInt())
			.sortGroup(1, Order.ASCENDING)
			.sortGroup(2, Order.DESCENDING)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer,Integer,Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple4<Integer, Integer, Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例16
@Test
public void testCustomPartitioningTupleGroupReduce() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Pojo2> data = env.fromElements(new Pojo2())
				.rebalance().setParallelism(4);

		data.groupBy("a").withPartitioner(new TestPartitionerInt())
				.reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
				.output(new DiscardingOutputFormat<Pojo2>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例17
@Test
public void testCustomPartitioningTupleGroupReduceSorted() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Pojo3> data = env.fromElements(new Pojo3())
				.rebalance().setParallelism(4);

		data.groupBy("a").withPartitioner(new TestPartitionerInt())
				.sortGroup("b", Order.ASCENDING)
				.reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
				.output(new DiscardingOutputFormat<Pojo3>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例18
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Pojo4> data = env.fromElements(new Pojo4())
				.rebalance().setParallelism(4);

		data.groupBy("a").withPartitioner(new TestPartitionerInt())
				.sortGroup("b", Order.ASCENDING)
				.sortGroup("c", Order.DESCENDING)
				.reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
				.output(new DiscardingOutputFormat<Pojo4>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例19
@Test
public void testCustomPartitioningKeySelectorGroupReduce() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
			.withPartitioner(new TestPartitionerInt())
				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例20
@Test
public void testCustomPartitioningKeySelectorGroupReduceSorted() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>())
			.withPartitioner(new TestPartitionerInt())
			.sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例21
@Test
public void testPartitionCustomOperatorPreservesFields() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));
		
		data.partitionCustom(new Partitioner<Long>() {
				public int partition(Long key, int numPartitions) { return key.intValue(); }
			}, 1)
			.groupBy(1)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long, Long>>())
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode partitioner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例22
@Test
public void testRangePartitionOperatorPreservesFields() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));

		data.partitionByRange(1)
			.groupBy(1)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode partitionNode = (SingleInputPlanNode)reducer.getInput().getSource();
		SingleInputPlanNode partitionIDRemover = (SingleInputPlanNode) partitionNode.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, partitionNode.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitionIDRemover.getInput().getShipStrategy());

		SourcePlanNode sourcePlanNode = op.getDataSources().iterator().next();
		List<Channel> sourceOutgoingChannels = sourcePlanNode.getOutgoingChannels();
		assertEquals(2, sourceOutgoingChannels.size());
		assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(0).getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(1).getShipStrategy());
		assertEquals(DataExchangeMode.PIPELINED, sourceOutgoingChannels.get(0).getDataExchangeMode());
		assertEquals(DataExchangeMode.BATCH, sourceOutgoingChannels.get(1).getDataExchangeMode());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例23
@Test
public void testCustomPartitioningTupleGroupReduce() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(0).withPartitioner(new TestPartitionerInt())
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例24
@Test
public void testCustomPartitioningTupleGroupReduceSorted() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(0).withPartitioner(new TestPartitionerInt())
			.sortGroup(1, Order.ASCENDING)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例25
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple4<Integer,Integer,Integer, Integer>> data = env.fromElements(new Tuple4<Integer,Integer,Integer,Integer>(0, 0, 0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(0).withPartitioner(new TestPartitionerInt())
			.sortGroup(1, Order.ASCENDING)
			.sortGroup(2, Order.DESCENDING)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer,Integer,Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple4<Integer, Integer, Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例26
@Test
public void testCustomPartitioningTupleGroupReduce() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Pojo2> data = env.fromElements(new Pojo2())
				.rebalance().setParallelism(4);

		data.groupBy("a").withPartitioner(new TestPartitionerInt())
				.reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
				.output(new DiscardingOutputFormat<Pojo2>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例27
@Test
public void testCustomPartitioningTupleGroupReduceSorted() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Pojo3> data = env.fromElements(new Pojo3())
				.rebalance().setParallelism(4);

		data.groupBy("a").withPartitioner(new TestPartitionerInt())
				.sortGroup("b", Order.ASCENDING)
				.reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
				.output(new DiscardingOutputFormat<Pojo3>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例28
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Pojo4> data = env.fromElements(new Pojo4())
				.rebalance().setParallelism(4);

		data.groupBy("a").withPartitioner(new TestPartitionerInt())
				.sortGroup("b", Order.ASCENDING)
				.sortGroup("c", Order.DESCENDING)
				.reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
				.output(new DiscardingOutputFormat<Pojo4>());

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例29
@Test
public void testCustomPartitioningKeySelectorGroupReduce() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
			.withPartitioner(new TestPartitionerInt())
				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例30
@Test
public void testCustomPartitioningKeySelectorGroupReduceSorted() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
				.rebalance().setParallelism(4);
		
		data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>())
			.withPartitioner(new TestPartitionerInt())
			.sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
			.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}