Java Source Code Examples: org.apache.flink.optimizer.testfunctions.IdentityMapper

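All of the following snippets are Flink optimizer tests that exercise plan compilation using the identity map function from the test-utilities package named in the title. For orientation, here is a minimal sketch of such an identity mapper, assuming it simply returns each input element unchanged; the actual class in org.apache.flink.optimizer.testfunctions may differ in detail (for example, it may extend RichMapFunction rather than implementing MapFunction directly).

// Minimal sketch (an assumption, not the exact Flink source): an identity map
// function for the DataSet API that returns every input element unchanged.
import org.apache.flink.api.common.functions.MapFunction;

public class IdentityMapper<T> implements MapFunction<T, T> {

	@Override
	public T map(T value) {
		return value;
	}
}
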
Example 1
@Test
public void testBranchesOnlyInBCVariables1() {
	try{
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(100);

		DataSet<Long> input = env.generateSequence(1, 10);
		DataSet<Long> bc_input = env.generateSequence(1, 10);
		
		input
			.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
			.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
			.output(new DiscardingOutputFormat<Long>());
		
		Plan plan = env.createProgramPlan();
		compileNoStats(plan);
	}
	catch(Exception e){
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Example 2
/**
 * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce ->
 * |--------------------------/                  /
 * |--------------------------------------------/
 * 
 * First cross has SameKeyFirst output contract
 */
@Test
public void testTicket158() {
	// construct the plan
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(DEFAULT_PARALLELISM);
	DataSet<Long> set1 = env.generateSequence(0,1);

	set1.map(new IdentityMapper<Long>()).name("Map1")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
			.cross(set1).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2")
			.cross(set1).with(new IdentityCrosser<Long>()).name("Cross2")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3")
			.output(new DiscardingOutputFormat<Long>()).name("Sink");

	Plan plan = env.createProgramPlan();
	OptimizedPlan oPlan = compileNoStats(plan);

	JobGraphGenerator jobGen = new JobGraphGenerator();
	jobGen.compileJobGraph(oPlan);
}
 
Example 3
@Test
public void testBranchAfterIteration() {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(DEFAULT_PARALLELISM);
	DataSet<Long> sourceA = env.generateSequence(0,1);

	IterativeDataSet<Long> loopHead = sourceA.iterate(10);
	DataSet<Long> loopTail = loopHead.map(new IdentityMapper<Long>()).name("Mapper");
	DataSet<Long> loopRes = loopHead.closeWith(loopTail);

	loopRes.output(new DiscardingOutputFormat<Long>());
	loopRes.map(new IdentityMapper<Long>())
			.output(new DiscardingOutputFormat<Long>());

	Plan plan = env.createProgramPlan();

	try {
		compileNoStats(plan);
	}
	catch (Exception e) {
		e.printStackTrace();
		Assert.fail(e.getMessage());
	}
}
 
Example 4
@Test
public void testNoBreakerForIndependentVariable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<String> source1 = env.fromElements("test");
		DataSet<String> source2 = env.fromElements("test");
		
		source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name")
				.output(new DiscardingOutputFormat<String>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
		
		assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
		assertEquals(TempMode.NONE, mapper.getBroadcastInputs().get(0).getTempMode());
		
		assertEquals(DataExchangeMode.PIPELINED, mapper.getInput().getDataExchangeMode());
		assertEquals(DataExchangeMode.PIPELINED, mapper.getBroadcastInputs().get(0).getDataExchangeMode());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Example 5
@Test
public void testBreakerForDependentVariable() {
		try {
			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
			
			DataSet<String> source1 = env.fromElements("test");
			
			source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name")
					.output(new DiscardingOutputFormat<String>());
			
			Plan p = env.createProgramPlan();
			OptimizedPlan op = compileNoStats(p);
			
			SinkPlanNode sink = op.getDataSinks().iterator().next();
			SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
			SingleInputPlanNode beforeMapper = (SingleInputPlanNode) mapper.getInput().getSource();

			assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
			assertEquals(TempMode.NONE, beforeMapper.getInput().getTempMode());
			assertEquals(TempMode.NONE, mapper.getBroadcastInputs().get(0).getTempMode());

			assertEquals(DataExchangeMode.PIPELINED, mapper.getInput().getDataExchangeMode());
			assertEquals(DataExchangeMode.BATCH, beforeMapper.getInput().getDataExchangeMode());
			assertEquals(DataExchangeMode.BATCH, mapper.getBroadcastInputs().get(0).getDataExchangeMode());
		}
		catch (Exception e) {
			e.printStackTrace();
			fail(e.getMessage());
		}
}
 
Example 6
@Test
public void testDistinctDestroysPartitioningOfNonDistinctFields() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(4);
		
		@SuppressWarnings("unchecked")
		DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
				.map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
		
		data.distinct(1)
			.groupBy(0)
			.sum(1)
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
		SingleInputPlanNode distinctReducer = (SingleInputPlanNode) combiner.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		
		// reducer must repartition, because it works on a different field
		assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());

		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
		
		// distinct reducer is partitioned
		assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Example 7
/**
 * <pre>
 *             +---------Iteration-------+
 *             |                         |
 *    /--map--< >----\                   |
 *   /         |      \         /-------< >---sink
 * src-map     |     join------/         |
 *   \         |      /                  |
 *    \        +-----/-------------------+
 *     \            /
 *      \--reduce--/
 * </pre>
 */
@Test
public void testIterationWithStaticInput() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(100);

		DataSet<Long> source = env.generateSequence(1, 1000000);

		DataSet<Long> mapped = source.map(new IdentityMapper<Long>());

		DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());

		IterativeDataSet<Long> iteration = mapped.iterate(10);
		iteration.closeWith(
				iteration.join(reduced)
						.where(new IdentityKeyExtractor<Long>())
						.equalTo(new IdentityKeyExtractor<Long>())
						.with(new DummyFlatJoinFunction<Long>()))
				.output(new DiscardingOutputFormat<Long>());

		compileNoStats(env.createProgramPlan());
	}
	catch(Exception e){
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Example 8
@Test
public void testWorksetIterationPipelineBreakerPlacement() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(8);
		
		// the workset (input two of the delta iteration) is the same as what is consumed by the successive join
		DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
		
		DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
		
		// trivial iteration, since we are interested in the inputs to the iteration
		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);
		
		DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());
		
		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);
		
		initialWorkset
			.join(result, JoinHint.REPARTITION_HASH_FIRST)
			.where(0).equalTo(0)
			.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
		
		Plan p = env.createProgramPlan();
		compileNoStats(p);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Example 9
private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {

		// construct the plan
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(DEFAULT_PARALLELISM);
		DataSet<Tuple2<Long, Long>> solSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Solution Set");
		DataSet<Tuple2<Long, Long>> workSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Workset");
		DataSet<Tuple2<Long, Long>> invariantInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Invariant Input");

		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIt = solSetInput.iterateDelta(workSetInput, 100, 0).name(ITERATION_NAME);

		DataSet<Tuple2<Long, Long>> join1 = deltaIt.getWorkset().join(invariantInput).where(0).equalTo(0)
				.with(new IdentityJoiner<Tuple2<Long, Long>>())
				.withForwardedFieldsFirst("*").name(JOIN_WITH_INVARIANT_NAME);

		DataSet<Tuple2<Long, Long>> join2 = deltaIt.getSolutionSet().join(join1).where(0).equalTo(0)
				.with(new IdentityJoiner<Tuple2<Long, Long>>())
				.name(JOIN_WITH_SOLUTION_SET);
		if(joinPreservesSolutionSet) {
			((JoinOperator<?,?,?>)join2).withForwardedFieldsFirst("*");
		}

		DataSet<Tuple2<Long, Long>> nextWorkset = join2.groupBy(0).reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>())
				.withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME);

		if(mapBeforeSolutionDelta) {

			DataSet<Tuple2<Long, Long>> mapper = join2.map(new IdentityMapper<Tuple2<Long, Long>>())
					.withForwardedFields("*").name(SOLUTION_DELTA_MAPPER_NAME);

			deltaIt.closeWith(mapper, nextWorkset)
					.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
		}
		else {
			deltaIt.closeWith(join2, nextWorkset)
					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
		}

		return env.createProgramPlan();
	}
 
Example 10
@Test
public void testBulkIterationInClosure() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Long> data1 = env.generateSequence(1, 100);
		DataSet<Long> data2 = env.generateSequence(1, 100);
		
		IterativeDataSet<Long> firstIteration = data1.iterate(100);
		
		DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdentityMapper<Long>()));
		
		
		IterativeDataSet<Long> mainIteration = data2.map(new IdentityMapper<Long>()).iterate(100);
		
		DataSet<Long> joined = mainIteration.join(firstResult)
				.where(new IdentityKeyExtractor<Long>()).equalTo(new IdentityKeyExtractor<Long>())
				.with(new DummyFlatJoinFunction<Long>());
		
		DataSet<Long> mainResult = mainIteration.closeWith(joined);
		
		mainResult.output(new DiscardingOutputFormat<Long>());
		
		Plan p = env.createProgramPlan();
		
		// optimizer should be able to translate this
		OptimizedPlan op = compileNoStats(p);
		
		// job graph generator should be able to translate this
		new JobGraphGenerator().compileJobGraph(op);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Example 11
/**
 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
 * 
 * Increases parallelism between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable.
 * Expected to re-establish partitioning between reduce and map via hash partitioning, because a random
 * repartitioning would be a full network transfer as well.
 */
@Test
public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
	final int p = DEFAULT_PARALLELISM;

	// construct the plan
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(p);
	DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(p);

	set1.map(new IdentityMapper<Long>())
				.withForwardedFields("*").setParallelism(p).name("Map1")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
				.withForwardedFields("*").setParallelism(p).name("Reduce1")
			.map(new IdentityMapper<Long>())
				.withForwardedFields("*").setParallelism(p * 2).name("Map2")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
				.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
			.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");

	Plan plan = env.createProgramPlan();
	// submit the plan to the compiler
	OptimizedPlan oPlan = compileNoStats(plan);
	
	// check the optimized Plan
	// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
	// because map2 has twice as many instances, and key/value pairs with the same key need to be
	// processed by the same mapper and reducer instance
	SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
	SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
	SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
	
	ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
	ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
	
	Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
	Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
}
 
Example 12
@Test
public void testCostComputationWithMultipleDataSinks() {
	final int SINKS = 5;

	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(DEFAULT_PARALLELISM);

		DataSet<Long> source = env.generateSequence(1, 10000);

		DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
		DataSet<Long> mappedC = source.map(new IdentityMapper<Long>());

		for (int sink = 0; sink < SINKS; sink++) {
			mappedA.output(new DiscardingOutputFormat<Long>());
			mappedC.output(new DiscardingOutputFormat<Long>());
		}

		Plan plan = env.createProgramPlan("Plans With Multiple Data Sinks");
		OptimizedPlan oPlan = compileNoStats(plan);

		new JobGraphGenerator().compileJobGraph(oPlan);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Example 13
/**
 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
 * 
 * Increases parallelism between 1st reduce and 2nd map, such that more tasks are on one instance.
 * Expected to re-establish partitioning between map and reduce via a local hash.
 */
@Test
public void checkPropertyHandlingWithIncreasingLocalParallelism() {
	final int p = DEFAULT_PARALLELISM * 2;

	// construct the plan
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(p);
	DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(p);

	set1.map(new IdentityMapper<Long>())
			.withForwardedFields("*").setParallelism(p).name("Map1")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
			.withForwardedFields("*").setParallelism(p).name("Reduce1")
			.map(new IdentityMapper<Long>())
			.withForwardedFields("*").setParallelism(p * 2).name("Map2")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
			.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
			.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");

	Plan plan = env.createProgramPlan();
	// submit the plan to the compiler
	OptimizedPlan oPlan = compileNoStats(plan);
	
	// check the optimized Plan
	// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
	// because map2 has twice as many instances, and key/value pairs with the same key need to be
	// processed by the same mapper and reducer instance
	SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
	SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
	SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
	
	ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
	ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
	
	Assert.assertTrue("Invalid ship strategy for an operator.", 
			(ShipStrategyType.PARTITION_RANDOM ==  mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) || 
			(ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
}
 
Example 14
@Test
public void testMultipleIterations() {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(100);
	
	DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
	
	DataSet<String> reduced = input
			.map(new IdentityMapper<String>())
			.reduceGroup(new Top1GroupReducer<String>());
		
	IterativeDataSet<String> iteration1 = input.iterate(100);
	IterativeDataSet<String> iteration2 = input.iterate(20);
	IterativeDataSet<String> iteration3 = input.iterate(17);
	
	iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1"))
			.output(new DiscardingOutputFormat<String>());
	iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2"))
			.output(new DiscardingOutputFormat<String>());
	iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3"))
			.output(new DiscardingOutputFormat<String>());
	
	Plan plan = env.createProgramPlan();
	
	try{
		compileNoStats(plan);
	}catch(Exception e){
		e.printStackTrace();
		Assert.fail(e.getMessage());
	}
}
 
Example 15
@Test
public void testMultipleIterationsWithClosueBCVars() {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(100);

	DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
		
	IterativeDataSet<String> iteration1 = input.iterate(100);
	IterativeDataSet<String> iteration2 = input.iterate(20);
	IterativeDataSet<String> iteration3 = input.iterate(17);
	
	
	iteration1.closeWith(iteration1.map(new IdentityMapper<String>()))
			.output(new DiscardingOutputFormat<String>());
	iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()))
			.output(new DiscardingOutputFormat<String>());
	iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()))
			.output(new DiscardingOutputFormat<String>());
	
	Plan plan = env.createProgramPlan();
	
	try{
		compileNoStats(plan);
	}catch(Exception e){
		e.printStackTrace();
		Assert.fail(e.getMessage());
	}
}
 
Example 16
@Test
public void testBranchesOnlyInBCVariables2() {
	try{
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(100);

		DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
		
		DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
		DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 2");
		
		DataSet<Tuple2<Long, Long>> joinInput1 =
				input.map(new IdentityMapper<Tuple2<Long,Long>>())
					.withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
					.withBroadcastSet(bc_input2, "bc2");
		
		DataSet<Tuple2<Long, Long>> joinInput2 =
				input.map(new IdentityMapper<Tuple2<Long,Long>>())
					.withBroadcastSet(bc_input1, "bc1")
					.withBroadcastSet(bc_input2, "bc2");
		
		DataSet<Tuple2<Long, Long>> joinResult = joinInput1
			.join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
			.with(new DummyFlatJoinFunction<Tuple2<Long,Long>>());
		
		input
			.map(new IdentityMapper<Tuple2<Long,Long>>())
				.withBroadcastSet(bc_input1, "bc1")
			.union(joinResult)
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
		
		Plan plan = env.createProgramPlan();
		compileNoStats(plan);
	}
	catch(Exception e){
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Example 17
@Test
public void testPipelineBreakerWithBroadcastVariable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);
		env.setParallelism(64);
		
		DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
		
		DataSet<Long> result = source.map(new IdentityMapper<Long>())
									.map(new IdentityMapper<Long>())
										.withBroadcastSet(source, "bc");
		
		result.output(new DiscardingOutputFormat<Long>());
		
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode mapperInput = (SingleInputPlanNode) mapper.getInput().getSource();
		
		assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
		assertEquals(TempMode.NONE, mapper.getBroadcastInputs().get(0).getTempMode());
		
		assertEquals(DataExchangeMode.BATCH, mapperInput.getInput().getDataExchangeMode());
		assertEquals(DataExchangeMode.BATCH, mapper.getBroadcastInputs().get(0).getDataExchangeMode());
		
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Example 18
@Test
public void testDistinctPreservesPartitioningOfDistinctFields() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(4);
		
		@SuppressWarnings("unchecked")
		DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
				.map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
		
		data.distinct(0)
			.groupBy(0)
			.sum(1)
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
		SingleInputPlanNode distinctReducer = (SingleInputPlanNode) reducer.getInput().getSource();
		
		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
		
		// reducer can be forward, reuses partitioning from distinct
		assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
		
		// distinct reducer is partitioned
		assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Example 19
@Test
public void testGeneratingJobGraphWithUnconsumedResultPartition() {

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<>(1L, 2L))
		.setParallelism(1);

	DataSet<Tuple2<Long, Long>> ds = input.map(new IdentityMapper<>())
		.setParallelism(3);

	AbstractID intermediateDataSetID = new AbstractID();

	// this output branch will be excluded.
	ds.output(BlockingShuffleOutputFormat.createOutputFormat(intermediateDataSetID))
		.setParallelism(1);

	// this is the normal output branch.
	ds.output(new DiscardingOutputFormat<>())
		.setParallelism(1);

	JobGraph jobGraph = compileJob(env);

	Assert.assertEquals(3, jobGraph.getVerticesSortedTopologicallyFromSources().size());

	JobVertex mapVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
	Assert.assertThat(mapVertex, Matchers.instanceOf(JobVertex.class));

	// there are 2 output results, one of which is of ResultPartitionType.BLOCKING_PERSISTENT
	Assert.assertEquals(2, mapVertex.getProducedDataSets().size());

	Assert.assertTrue(mapVertex.getProducedDataSets().stream()
		.anyMatch(dataSet -> dataSet.getId().equals(new IntermediateDataSetID(intermediateDataSetID)) &&
			dataSet.getResultType() == ResultPartitionType.BLOCKING_PERSISTENT));
}
 
Example 20
@Test
public void testSolutionSetDeltaDependsOnBroadcastVariable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple2<Long, Long>> source =
					env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());
		
		DataSet<Tuple2<Long, Long>> invariantInput =
				env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());
		
		// iteration from here
		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = source.iterateDelta(source, 1000, 1);
		
		DataSet<Tuple2<Long, Long>> result =
			invariantInput
				.map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
				.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1);
		
		iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result)
				.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
		
		OptimizedPlan p = compileNoStats(env.createProgramPlan());
		
		// check that the JSON generator accepts this plan
		new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p);
		
		// check that the JobGraphGenerator accepts the plan
		new JobGraphGenerator().compileJobGraph(p);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 