Java源码示例:org.apache.flink.api.java.operators.SortedGrouping

示例1
/**
 * Applies a Python group-reduce to a sorted grouping.
 *
 * <p>The grouping is first passed through a non-combinable {@link IdentityGroupReduce}
 * step named {@code "PythonGroupReducePreStep"}; the resulting data set is then handed
 * to a {@link PythonMapPartition} built from {@code operatorConfig}, the environment ID,
 * the set ID and the requested output type. Both steps run with {@code info.parallelism},
 * and the final operator is named {@code info.name}.
 *
 * @param op1  sorted grouping to reduce
 * @param info operation info supplying parallelism, environment ID, set ID and name
 * @param type type information of the produced elements
 * @param <IN>  element type of the grouping
 * @param <OUT> element type of the result
 * @return the data set produced by the map-partition step
 */
private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(SortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
	// Identity pre-step: turns the sorted grouping back into a plain data set.
	DataSet<IN> preStep = op1
		.reduceGroup(new IdentityGroupReduce<IN>())
		.setCombinable(false)
		.setParallelism(info.parallelism)
		.name("PythonGroupReducePreStep");

	// The actual Python operation runs as a map-partition over the pre-step output.
	return preStep
		.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
		.setParallelism(info.parallelism)
		.name(info.name);
}
 
示例2
/**
 * Translates the {@link GroupBy} element of the given flow node into a group-reduce
 * on {@code input}.
 *
 * <p>Dispatch, in order:
 * <ol>
 *   <li>reversed sort order: delegated to {@code translateInverseSortedGroupBy};</li>
 *   <li>no grouping keys: delegated to {@code translateGlobalGroupBy};</li>
 *   <li>grouping keys with sort keys: group, sort every sort key in {@code sortOrder},
 *       then reduce with a {@link GroupByReducer};</li>
 *   <li>grouping keys without sort keys: group and reduce with a {@link GroupByReducer}.</li>
 * </ol>
 *
 * @param input data set to group
 * @param node  flow node whose first source element is the {@link GroupBy}
 * @param dop   parallelism to set on the reduce operator
 * @return the data set produced by the translated group-by
 */
private DataSet<Tuple> translateGroupBy(DataSet<Tuple> input, FlowNode node, int dop) {

	GroupBy groupBy = (GroupBy) node.getSourceElements().iterator().next();

	Scope outScope = getOutScope(node);
	List<Scope> inScopes = getInputScopes(node, groupBy);

	Fields outFields;
	if (outScope.isEvery()) {
		outFields = outScope.getOutGroupingFields();
	}
	else {
		outFields = outScope.getOutValuesFields();
	}
	registerKryoTypes(outFields);

	// get input scope
	Scope inScope = inScopes.get(0);

	// get grouping keys
	Fields groupKeyFields = groupBy.getKeySelectors().get(inScope.getName());
	// get group sorting keys
	Fields sortKeyFields = groupBy.getSortingSelectors().get(inScope.getName());

	String[] groupKeys = registerKeyFields(input, groupKeyFields);
	String[] sortKeys = null;
	if (sortKeyFields != null) {
		sortKeys = registerKeyFields(input, sortKeyFields);
	}
	Order sortOrder = groupBy.isSortReversed() ? Order.DESCENDING : Order.ASCENDING;

	if (sortOrder == Order.DESCENDING) {
		// translate groupBy with inverse sort order
		return translateInverseSortedGroupBy(input, node, dop, groupKeys, sortKeys, outFields);
	}
	else if (groupKeys == null || groupKeys.length == 0) {
		// translate key-less (global) groupBy
		return translateGlobalGroupBy(input, node, dop, sortKeys, sortOrder, outFields);
	}
	else {

		UnsortedGrouping<Tuple> grouping = input
				.groupBy(groupKeys);

		if (sortKeys != null && sortKeys.length > 0) {
			// translate groupBy with group sorting

			// Every sort key must follow the requested sort order. Previously only the
			// first key did; all further keys were hard-coded to DESCENDING, which
			// contradicted the (ascending) order that is the only one reaching this branch.
			SortedGrouping<Tuple> sortedGrouping = grouping
					.sortGroup(sortKeys[0], sortOrder);
			for (int i = 1; i < sortKeys.length; i++) {
				sortedGrouping = sortedGrouping
						.sortGroup(sortKeys[i], sortOrder);
			}

			return sortedGrouping
					.reduceGroup(new GroupByReducer(node))
					.returns(new TupleTypeInfo(outFields))
					.withParameters(this.getFlinkNodeConfig(node))
					.setParallelism(dop)
					.name("reduce-" + node.getID());
		}
		else {
			// translate groupBy without group sorting

			return grouping
					.reduceGroup(new GroupByReducer(node))
					.returns(new TupleTypeInfo(outFields))
					.withParameters(this.getFlinkNodeConfig(node))
					.setParallelism(dop)
					.name("reduce-" + node.getID());
		}
	}
}
 
示例3
/**
 * Registers a {@link SortedGrouping} in this cache under the supplied ID.
 *
 * <p>The ID is first recorded as {@link SetType#SORTED_GROUPING}; the grouping is then
 * stored in the sorted-grouping map under that ID.
 *
 * @param id  ID to store the grouping under
 * @param set grouping to cache
 * @param <S> concrete SortedGrouping type
 */
public <S extends SortedGrouping<?>> void add(int id, S set) {
	this.cacheSetType(id, SetType.SORTED_GROUPING);
	this.sortedGroupings.put(id, set);
}
 
示例4
/**
 * Looks up the {@link SortedGrouping} stored under the given ID.
 *
 * <p>The map entry for {@code id} is passed to {@code verifyType} together with
 * {@link SetType#SORTED_GROUPING}; whatever that check returns is the result.
 *
 * @param id  ID the grouping was stored under
 * @param <T> element type the caller expects of the grouping (the conversion is unchecked)
 * @return the SortedGrouping cached for {@code id}
 * @throws IllegalStateException if the cached set is not a SortedGrouping
 */
@SuppressWarnings("unchecked")
public <T> SortedGrouping<T> getSortedGrouping(int id) {
	final SortedGrouping<T> cached = verifyType(id, sortedGroupings.get(id), SetType.SORTED_GROUPING);
	return cached;
}