Java源码示例:storm.trident.operation.Aggregator
示例1
public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
Aggregator aggregator, BatchOutputCollector delegateCollector) {
this.windowTaskId = windowTaskId;
this.windowStore = windowStore;
this.aggregator = aggregator;
this.delegateCollector = delegateCollector;
windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;
windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
windowManager.setEvictionPolicy(evictionPolicy);
triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
windowManager.setTriggerPolicy(triggerPolicy);
}
示例2
@Override
public Stream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
projectionValidation(inputFields);
return _topology.addSourcedNode(this,
new ProcessorNode(_topology.getUniqueStreamId(),
_name,
functionFields,
functionFields,
new AggregateProcessor(inputFields, agg)));
}
示例3
private Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator,
Fields functionFields, boolean storeTuplesInStore) {
projectionValidation(inputFields);
windowConfig.validate();
Fields fields = addTriggerField(functionFields);
// when storeTuplesInStore is false then the given windowStoreFactory is only used to store triggers and
// that store is passed to WindowStateUpdater to remove them after committing the batch.
Stream stream = _topology.addSourcedNode(this,
new ProcessorNode(_topology.getUniqueStreamId(),
_name,
fields,
fields,
new WindowTridentProcessor(windowConfig, _topology.getUniqueWindowId(), windowStoreFactory,
inputFields, aggregator, storeTuplesInStore)));
Stream effectiveStream = stream.project(functionFields);
// create StateUpdater with the given windowStoreFactory to remove triggered aggregation results form store
// when they are successfully processed.
StateFactory stateFactory = new WindowsStateFactory();
StateUpdater stateUpdater = new WindowsStateUpdater(windowStoreFactory);
stream.partitionPersist(stateFactory, new Fields(WindowTridentProcessor.TRIGGER_FIELD_NAME), stateUpdater, new Fields());
return effectiveStream;
}
示例4
public GroupedAggregator(Aggregator agg, Fields group, Fields input, int outSize) {
_groupFields = group;
_inFields = input;
_agg = agg;
int[] sizes = new int[2];
sizes[0] = _groupFields.size();
sizes[1] = outSize;
_fact = new ComboList.Factory(sizes);
}
示例5
public ChainedAggregatorImpl(Aggregator[] aggs, Fields[] inputFields, ComboList.Factory fact) {
_aggs = aggs;
_inputFields = inputFields;
_fact = fact;
if (_aggs.length != _inputFields.length) {
throw new IllegalArgumentException("Require input fields for each aggregator");
}
}
示例6
private ChainedFullAggregatorDeclarer aggregate(Fields inputFields, Aggregator agg, Fields functionFields, boolean isCombiner) {
if (isCombiner) {
if (_type == null) {
_type = AggType.FULL_COMBINE;
}
} else {
_type = AggType.FULL;
}
_aggs.add(new AggSpec(inputFields, agg, functionFields));
return this;
}
示例7
@Override
public IAggregatableStream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
Aggregator groupedAgg = new GroupedAggregator(agg, _groupFields, inputFields, functionFields.size());
Fields allInFields = TridentUtils.fieldsUnion(_groupFields, inputFields);
Fields allOutFields = TridentUtils.fieldsConcat(_groupFields, functionFields);
Stream s = _stream.partitionAggregate(allInFields, groupedAgg, allOutFields);
return new GroupedStream(s, _groupFields);
}
示例8
public StoreBasedTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
BatchOutputCollector delegateCollector, Long maxTuplesCacheSize, Fields inputFields) {
super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);
this.maxCachedTuplesSize = maxTuplesCacheSize;
this.inputFields = inputFields;
freshOutputFactory = new TridentTupleView.FreshOutputFactory(inputFields);
windowTupleTaskId = TUPLE_PREFIX + windowTaskId;
}
示例9
public WindowTridentProcessor(WindowConfig windowConfig, String uniqueWindowId, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, boolean storeTuplesInStore) {
this.windowConfig = windowConfig;
this.windowId = uniqueWindowId;
this.windowStoreFactory = windowStoreFactory;
this.inputFields = inputFields;
this.aggregator = aggregator;
this.storeTuplesInStore = storeTuplesInStore;
}
示例10
public Stream partitionAggregate(Aggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}
示例11
private <T> Stream comparableAggregateStream(String inputFieldName, Aggregator<T> aggregator) {
if(inputFieldName != null) {
projectionValidation(new Fields(inputFieldName));
}
return partitionAggregate(getOutputFields(), aggregator, getOutputFields());
}
示例12
public Stream aggregate(Aggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
示例13
public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
projectionValidation(inputFields);
return chainedAgg()
.aggregate(inputFields, agg, functionFields)
.chainEnd();
}
示例14
public void cleanup() {
for (Aggregator a : _aggs) {
a.cleanup();
}
}
示例15
public SingleEmitAggregator(Aggregator agg, BatchToPartition batchToPartition) {
_agg = agg;
_batchToPartition = batchToPartition;
}
示例16
public AggregateProcessor(Fields inputFields, Aggregator agg) {
_agg = agg;
_inputFields = inputFields;
}
示例17
public AggSpec(Fields inFields, Aggregator agg, Fields outFields) {
this.inFields = inFields;
this.agg = agg;
this.outFields = outFields;
}
示例18
public ChainedPartitionAggregatorDeclarer partitionAggregate(Aggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}
示例19
public ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
_type = AggType.PARTITION;
_aggs.add(new AggSpec(inputFields, agg, functionFields));
return this;
}
示例20
public ChainedFullAggregatorDeclarer aggregate(Aggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
示例21
public ChainedFullAggregatorDeclarer aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
return aggregate(inputFields, agg, functionFields, false);
}
示例22
public Stream aggregate(Aggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
示例23
public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
return new ChainedAggregatorDeclarer(this, this).aggregate(inputFields, agg, functionFields).chainEnd();
}
示例24
public InMemoryTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
Aggregator aggregator, BatchOutputCollector delegateCollector) {
super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);
}
示例25
/**
* Returns a stream of aggregated results based on the given window configuration which uses inmemory windowing tuple store.
*
* @param windowConfig window configuration like window length and slide length.
* @param inputFields input fields
* @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
* @param functionFields fields of values to emit with aggregation.
*
* @return the new stream with this operation.
*/
public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields) {
// this store is used only for storing triggered aggregated results but not tuples as storeTuplesInStore is set
// as false int he below call.
InMemoryWindowsStoreFactory inMemoryWindowsStoreFactory = new InMemoryWindowsStoreFactory();
return window(windowConfig, inMemoryWindowsStoreFactory, inputFields, aggregator, functionFields, false);
}
示例26
/**
* This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} and it is
* assumed that its value is an instance of {@code Comparable}. If the value of tuple with field {@code inputFieldName} is not an
* instance of {@code Comparable} then it throws {@code ClassCastException}
*
* @param inputFieldName input field name
* @return the new stream with this operation.
*/
public Stream minBy(String inputFieldName) {
Aggregator<ComparisonAggregator.State> min = new Min(inputFieldName);
return comparableAggregateStream(inputFieldName, min);
}
示例27
/**
* This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} in a stream by
* using the given {@code comparator}. If the value of tuple with field {@code inputFieldName} is not an
* instance of {@code T} then it throws {@code ClassCastException}
*
* @param inputFieldName input field name
* @param comparator comparator used in for finding minimum of two tuple values of {@code inputFieldName}.
* @param <T> type of tuple's given input field value.
* @return the new stream with this operation.
*/
public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) {
Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(inputFieldName, comparator);
return comparableAggregateStream(inputFieldName, min);
}
示例28
/**
* This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} and it is
* assumed that its value is an instance of {@code Comparable}. If the value of tuple with field {@code inputFieldName} is not an
* instance of {@code Comparable} then it throws {@code ClassCastException}
*
* @param inputFieldName input field name
* @return the new stream with this operation.
*/
public Stream maxBy(String inputFieldName) {
Aggregator<ComparisonAggregator.State> max = new Max(inputFieldName);
return comparableAggregateStream(inputFieldName, max);
}
示例29
/**
* This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} in a stream by
* using the given {@code comparator}. If the value of tuple with field {@code inputFieldName} is not an
* instance of {@code T} then it throws {@code ClassCastException}
*
* @param inputFieldName input field name
* @param comparator comparator used in for finding maximum of two tuple values of {@code inputFieldName}.
* @param <T> type of tuple's given input field value.
* @return the new stream with this operation.
*/
public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) {
Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(inputFieldName, comparator);
return comparableAggregateStream(inputFieldName, max);
}
示例30
/**
* Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
*
* @param windowCount represents number of tuples in the window
* @param windowStoreFactory intermediary tuple store for storing windowing tuples
* @param inputFields projected fields for aggregator
* @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
* @param functionFields fields of values to emit with aggregation.
*
* @return the new stream with this operation.
*/
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields) {
return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
}