Java源码示例:org.apache.flink.graph.gsa.GatherSumApplyIteration

示例1
/**
 * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
 *
 * @param gatherFunction the gather function collects information about adjacent vertices and edges
 * @param sumFunction the sum function aggregates the gathered information
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maximumNumberOfIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @param <M> the intermediate type used between gather, sum and apply
 *
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 * after maximumNumberOfIterations.
 */
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
		org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
		ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations,
		GSAConfiguration parameters) {

	GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
			edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);

	iteration.configure(parameters);

	DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);

	return new Graph<>(newVertices, this.edges, this.context);
}
 
示例2
@Test
public void testIterationConfiguration() throws Exception {
	/*
	 * Test name, parallelism and solutionSetUnmanaged parameters
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	GatherSumApplyIteration<Long, Long, Long, Long> iteration = GatherSumApplyIteration
		.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyGather(),
			new DummySum(), new DummyApply(), 10);

	GSAConfiguration parameters = new GSAConfiguration();
	parameters.setName("gelly iteration");
	parameters.setParallelism(2);
	parameters.setSolutionSetUnmanagedMemory(true);

	iteration.configure(parameters);

	Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
	Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
	Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());

	DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
	List<Vertex<Long, Long>> result = data.collect();

	expectedResult = "1,11\n" +
		"2,12\n" +
		"3,13\n" +
		"4,14\n" +
		"5,15";

	compareResultAsTuples(result, expectedResult);
}
 
示例3
/**
 * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
 *
 * @param gatherFunction the gather function collects information about adjacent vertices and edges
 * @param sumFunction the sum function aggregates the gathered information
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maximumNumberOfIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @param <M> the intermediate type used between gather, sum and apply
 *
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 * after maximumNumberOfIterations.
 */
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
		org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
		ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations,
		GSAConfiguration parameters) {

	GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
			edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);

	iteration.configure(parameters);

	DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);

	return new Graph<>(newVertices, this.edges, this.context);
}
 
示例4
@Test
public void testIterationConfiguration() throws Exception {
	/*
	 * Test name, parallelism and solutionSetUnmanaged parameters
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	GatherSumApplyIteration<Long, Long, Long, Long> iteration = GatherSumApplyIteration
		.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyGather(),
			new DummySum(), new DummyApply(), 10);

	GSAConfiguration parameters = new GSAConfiguration();
	parameters.setName("gelly iteration");
	parameters.setParallelism(2);
	parameters.setSolutionSetUnmanagedMemory(true);

	iteration.configure(parameters);

	Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
	Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
	Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());

	DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
	List<Vertex<Long, Long>> result = data.collect();

	expectedResult = "1,11\n" +
		"2,12\n" +
		"3,13\n" +
		"4,14\n" +
		"5,15";

	compareResultAsTuples(result, expectedResult);
}
 
示例5
/**
 * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
 *
 * @param gatherFunction the gather function collects information about adjacent vertices and edges
 * @param sumFunction the sum function aggregates the gathered information
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maximumNumberOfIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @param <M> the intermediate type used between gather, sum and apply
 *
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 * after maximumNumberOfIterations.
 */
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
		org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
		ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations,
		GSAConfiguration parameters) {

	GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
			edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);

	iteration.configure(parameters);

	DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);

	return new Graph<>(newVertices, this.edges, this.context);
}
 
示例6
@Test
public void testIterationConfiguration() throws Exception {
	/*
	 * Test name, parallelism and solutionSetUnmanaged parameters
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	GatherSumApplyIteration<Long, Long, Long, Long> iteration = GatherSumApplyIteration
		.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyGather(),
			new DummySum(), new DummyApply(), 10);

	GSAConfiguration parameters = new GSAConfiguration();
	parameters.setName("gelly iteration");
	parameters.setParallelism(2);
	parameters.setSolutionSetUnmanagedMemory(true);

	iteration.configure(parameters);

	Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
	Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
	Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());

	DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
	List<Vertex<Long, Long>> result = data.collect();

	expectedResult = "1,11\n" +
		"2,12\n" +
		"3,13\n" +
		"4,14\n" +
		"5,15";

	compareResultAsTuples(result, expectedResult);
}