Java源码示例:org.apache.flink.api.java.operators.SortedGrouping
示例1
private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(SortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
return op1
.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonGroupReducePreStep")
.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
.setParallelism(info.parallelism).name(info.name);
}
示例2
private DataSet<Tuple> translateGroupBy(DataSet<Tuple> input, FlowNode node, int dop) {
GroupBy groupBy = (GroupBy) node.getSourceElements().iterator().next();
Scope outScope = getOutScope(node);
List<Scope> inScopes = getInputScopes(node, groupBy);
Fields outFields;
if(outScope.isEvery()) {
outFields = outScope.getOutGroupingFields();
}
else {
outFields = outScope.getOutValuesFields();
}
registerKryoTypes(outFields);
// get input scope
Scope inScope = inScopes.get(0);
// get grouping keys
Fields groupKeyFields = groupBy.getKeySelectors().get(inScope.getName());
// get group sorting keys
Fields sortKeyFields = groupBy.getSortingSelectors().get(inScope.getName());
String[] groupKeys = registerKeyFields(input, groupKeyFields);
String[] sortKeys = null;
if (sortKeyFields != null) {
sortKeys = registerKeyFields(input, sortKeyFields);
}
Order sortOrder = groupBy.isSortReversed() ? Order.DESCENDING : Order.ASCENDING;
if(sortOrder == Order.DESCENDING) {
// translate groupBy with inverse sort order
return translateInverseSortedGroupBy(input, node, dop, groupKeys, sortKeys, outFields);
}
else if(groupKeys == null || groupKeys.length == 0) {
// translate key-less (global) groupBy
return translateGlobalGroupBy(input, node, dop, sortKeys, sortOrder, outFields);
}
else {
UnsortedGrouping<Tuple> grouping = input
.groupBy(groupKeys);
if(sortKeys != null && sortKeys.length > 0) {
// translate groupBy with group sorting
SortedGrouping<Tuple> sortedGrouping = grouping
.sortGroup(sortKeys[0], Order.ASCENDING);
for(int i=1; i<sortKeys.length; i++) {
sortedGrouping = sortedGrouping
.sortGroup(sortKeys[i], Order.DESCENDING);
}
return sortedGrouping
.reduceGroup(new GroupByReducer(node))
.returns(new TupleTypeInfo(outFields))
.withParameters(this.getFlinkNodeConfig(node))
.setParallelism(dop)
.name("reduce-" + node.getID());
}
else {
// translate groupBy without group sorting
return grouping
.reduceGroup(new GroupByReducer(node))
.returns(new TupleTypeInfo(outFields))
.withParameters(this.getFlinkNodeConfig(node))
.setParallelism(dop)
.name("reduce-" + node.getID());
}
}
}
示例3
/**
* Adds the given {@link SortedGrouping} to this cache for the given ID.
*
* @param id Set ID
* @param set SortedGrouping to add
* @param <S> SortedGrouping class
*/
public <S extends SortedGrouping<?>> void add(int id, S set) {
cacheSetType(id, SetType.SORTED_GROUPING);
sortedGroupings.put(id, set);
}
示例4
/**
* Returns the cached {@link SortedGrouping} for the given ID.
*
* @param id Set ID
* @param <T> SortedGrouping type
* @return Cached SortedGrouping
* @throws IllegalStateException if the cached set is not a SortedGrouping
*/
@SuppressWarnings("unchecked")
public <T> SortedGrouping<T> getSortedGrouping(int id) {
return verifyType(id, sortedGroupings.get(id), SetType.SORTED_GROUPING);
}