Java源码示例:org.apache.flink.streaming.api.transformations.SideOutputTransformation

示例1
/**
 * Returns a {@link DataStream} carrying the records that an operation emitted into the side
 * output identified by the given {@link OutputTag}.
 *
 * <p>The tag is cleaned and copied before use. The type information of every requested tag is
 * recorded in {@code requestedSideOutputs}, so a later request whose tag maps to a different
 * recorded type is rejected.
 *
 * @param sideOutputTag tag of the requested side output; must not be {@code null}
 * @param <X> element type of the side output
 * @return a stream backed by a {@link SideOutputTransformation} on top of this stream's
 *         transformation
 * @throws UnsupportedOperationException if {@code wasSplitApplied} is set, or if a matching
 *         side output was already requested with a different type
 * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
 */
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
	if (wasSplitApplied) {
		throw new UnsupportedOperationException(
			"getSideOutput() and split() may not be called on the same DataStream. " +
			"As a work-around, please add a no-op map function before the split() call.");
	}

	final OutputTag<X> cleanedTag = clean(requireNonNull(sideOutputTag));

	// defensive copy: only the copy is registered and handed to the transformation
	final OutputTag<X> tagCopy = new OutputTag<X>(cleanedTag.getId(), cleanedTag.getTypeInfo());

	final TypeInformation<?> previousType = requestedSideOutputs.get(tagCopy);
	final boolean typeConflict =
		previousType != null && !previousType.equals(tagCopy.getTypeInfo());
	if (typeConflict) {
		throw new UnsupportedOperationException(
			"A side output with a matching id was " +
			"already requested with a different type. This is not allowed, side output " +
			"ids need to be unique.");
	}

	requestedSideOutputs.put(tagCopy, tagCopy.getTypeInfo());

	final SideOutputTransformation<X> transformation =
		new SideOutputTransformation<>(this.getTransformation(), tagCopy);
	return new DataStream<>(this.getExecutionEnvironment(), transformation);
}
 
示例2
/**
 * Translates a {@code SideOutputTransformation} into the {@code StreamGraph}.
 *
 * <p>The input of the side output is transformed first. For every node id that the input
 * produced, one virtual side-output node carrying the
 * {@link org.apache.flink.util.OutputTag} is added to {@code streamGraph}.
 *
 * @param sideOutput the side-output transformation to translate
 * @param <T> element type of the side output
 * @return the ids of the newly created virtual nodes, or the ids already stored in
 *         {@code alreadyTransformed} if transforming the input also transformed
 *         {@code sideOutput}
 * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
 */
private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
	final Collection<Integer> upstreamIds = transform(sideOutput.getInput());

	// transforming the input recursively may have reached this transformation already
	if (alreadyTransformed.containsKey(sideOutput)) {
		return alreadyTransformed.get(sideOutput);
	}

	final List<Integer> virtualIds = new ArrayList<>();
	for (int upstreamId : upstreamIds) {
		final int freshId = StreamTransformation.getNewNodeId();
		streamGraph.addVirtualSideOutputNode(upstreamId, freshId, sideOutput.getOutputTag());
		virtualIds.add(freshId);
	}

	return virtualIds;
}
 
示例3
/**
 * Obtains the {@link DataStream} of elements that an operation wrote to the side output
 * denoted by the given {@link OutputTag}.
 *
 * <p>A cleaned copy of the tag is used as key into {@code requestedSideOutputs}. If an entry
 * for that key exists and its type information differs from the tag's, the request fails.
 *
 * @param sideOutputTag tag identifying the side output; must not be {@code null}
 * @param <X> type of the elements in the side output
 * @return a new stream whose transformation is a {@link SideOutputTransformation} of this
 *         stream's transformation
 * @throws UnsupportedOperationException if {@code wasSplitApplied} is set, or if the same
 *         side output was requested before with a different type
 * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
 */
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
	if (wasSplitApplied) {
		throw new UnsupportedOperationException(
			"getSideOutput() and split() may not be called on the same DataStream. "
				+ "As a work-around, please add a no-op map function before the split() call.");
	}

	final OutputTag<X> sanitized = clean(requireNonNull(sideOutputTag));

	// never keep a reference to the tag instance supplied by the caller
	final OutputTag<X> ownTag = new OutputTag<X>(sanitized.getId(), sanitized.getTypeInfo());

	final TypeInformation<?> registered = requestedSideOutputs.get(ownTag);
	if (registered != null) {
		if (!registered.equals(ownTag.getTypeInfo())) {
			throw new UnsupportedOperationException(
				"A side output with a matching id was "
					+ "already requested with a different type. This is not allowed, side output "
					+ "ids need to be unique.");
		}
	}

	requestedSideOutputs.put(ownTag, ownTag.getTypeInfo());

	final SideOutputTransformation<X> sideOutput =
		new SideOutputTransformation<>(this.getTransformation(), ownTag);
	return new DataStream<>(this.getExecutionEnvironment(), sideOutput);
}
 
示例4
/**
 * Handles a {@code SideOutputTransformation} during graph generation.
 *
 * <p>After the input has been transformed, each resulting node id gets a companion virtual
 * node in the {@code StreamGraph} which records the side-output
 * {@link org.apache.flink.util.OutputTag}.
 *
 * @param sideOutput the transformation describing the side output
 * @param <T> element type of the side output
 * @return ids of the virtual nodes created here, or the entry of {@code alreadyTransformed}
 *         when one exists for {@code sideOutput} after the input was transformed
 * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
 */
private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
	final Transformation<?> upstream = sideOutput.getInput();
	final Collection<Integer> upstreamIds = transform(upstream);

	// the recursive call above can already have produced the result for this node
	if (alreadyTransformed.containsKey(sideOutput)) {
		return alreadyTransformed.get(sideOutput);
	}

	final List<Integer> createdIds = new ArrayList<>();
	final Iterator<Integer> ids = upstreamIds.iterator();
	while (ids.hasNext()) {
		final int sourceId = ids.next();
		final int virtualNodeId = Transformation.getNewNodeId();
		streamGraph.addVirtualSideOutputNode(sourceId, virtualNodeId, sideOutput.getOutputTag());
		createdIds.add(virtualNodeId);
	}
	return createdIds;
}
 
示例5
/**
 * Exposes, as a {@link DataStream}, the elements an operation emitted into the side output
 * named by the given {@link OutputTag}.
 *
 * <p>The method works on a cleaned copy of the tag and records the copy together with its
 * type information in {@code requestedSideOutputs}. A request is refused when that map already
 * holds a different type for the tag.
 *
 * @param sideOutputTag the side output to read from; must not be {@code null}
 * @param <X> element type emitted to the side output
 * @return a stream built from a {@link SideOutputTransformation} over this stream's
 *         transformation
 * @throws UnsupportedOperationException if {@code wasSplitApplied} is set, or if the tag
 *         clashes with an earlier request of a different type
 * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
 */
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
	if (wasSplitApplied) {
		final String splitConflict =
			"getSideOutput() and split() may not be called on the same DataStream. " +
			"As a work-around, please add a no-op map function before the split() call.";
		throw new UnsupportedOperationException(splitConflict);
	}

	final OutputTag<X> original = clean(requireNonNull(sideOutputTag));

	// copy id and type into a fresh tag; everything below uses the copy only
	final OutputTag<X> copy = new OutputTag<X>(original.getId(), original.getTypeInfo());

	final TypeInformation<?> known = requestedSideOutputs.get(copy);
	if (known != null && !known.equals(copy.getTypeInfo())) {
		final String typeConflict =
			"A side output with a matching id was " +
			"already requested with a different type. This is not allowed, side output " +
			"ids need to be unique.";
		throw new UnsupportedOperationException(typeConflict);
	}

	requestedSideOutputs.put(copy, copy.getTypeInfo());

	final SideOutputTransformation<X> result =
		new SideOutputTransformation<>(this.getTransformation(), copy);
	return new DataStream<>(this.getExecutionEnvironment(), result);
}
 
示例6
/**
 * Converts a {@code SideOutputTransformation} into virtual nodes of the {@code StreamGraph}.
 *
 * <p>The input transformation is processed first; then, for each node id it yielded, a new
 * node id is allocated and registered as a virtual side-output node holding the
 * {@link org.apache.flink.util.OutputTag}.
 *
 * @param sideOutput side-output transformation to convert
 * @param <T> element type of the side output
 * @return the allocated virtual node ids; if {@code alreadyTransformed} contains
 *         {@code sideOutput} once the input is processed, that stored collection instead
 * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
 */
private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
	final Collection<Integer> inputNodeIds = transform(sideOutput.getInput());

	// processing the input may, through recursion, have handled this transformation too
	final boolean handledDuringRecursion = alreadyTransformed.containsKey(sideOutput);
	if (handledDuringRecursion) {
		return alreadyTransformed.get(sideOutput);
	}

	final List<Integer> sideOutputNodeIds = new ArrayList<>();
	for (int inputNodeId : inputNodeIds) {
		final int sideOutputNodeId = Transformation.getNewNodeId();
		streamGraph.addVirtualSideOutputNode(
			inputNodeId,
			sideOutputNodeId,
			sideOutput.getOutputTag());
		sideOutputNodeIds.add(sideOutputNodeId);
	}
	return sideOutputNodeIds;
}
 
示例7
/**
 * Verifies that a split may be applied on top of the given transformation.
 *
 * <p>The check fails if {@code input} is a select, split or side-output transformation. Union
 * transformations are checked by validating each of their inputs, and partition
 * transformations by validating their single input. Any other transformation is accepted.
 *
 * @param input the transformation a split is about to be applied to
 * @param <T> element type of the transformation
 * @throws IllegalStateException if a select, split or side-output transformation is found
 */
private <T> void validateSplitTransformation(StreamTransformation<T> input) {
	if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
		throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
	}
	if (input instanceof SideOutputTransformation) {
		throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
	}

	if (input instanceof UnionTransformation) {
		// a union is valid only if every one of its inputs is valid
		for (StreamTransformation<T> unionInput : ((UnionTransformation<T>) input).getInputs()) {
			validateSplitTransformation(unionInput);
		}
	} else if (input instanceof PartitionTransformation) {
		// wildcard cast instead of a raw type keeps the recursive call type-checked
		validateSplitTransformation(((PartitionTransformation<?>) input).getInput());
	}
}
 
示例8
/**
 * Verifies that a split may be applied on top of the given transformation.
 *
 * <p>The check fails if {@code input} is a select, split or side-output transformation. Union
 * transformations are checked by validating each of their inputs, and partition
 * transformations by validating their single input. Any other transformation is accepted.
 *
 * @param input the transformation a split is about to be applied to
 * @param <T> element type of the transformation
 * @throws IllegalStateException if a select, split or side-output transformation is found
 */
private <T> void validateSplitTransformation(Transformation<T> input) {
	if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
		throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
	}
	if (input instanceof SideOutputTransformation) {
		throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
	}

	if (input instanceof UnionTransformation) {
		// a union is valid only if every one of its inputs is valid
		for (Transformation<T> unionInput : ((UnionTransformation<T>) input).getInputs()) {
			validateSplitTransformation(unionInput);
		}
	} else if (input instanceof PartitionTransformation) {
		// wildcard cast instead of a raw type keeps the recursive call type-checked
		validateSplitTransformation(((PartitionTransformation<?>) input).getInput());
	}
}
 
示例9
/**
 * Ensures the given transformation is a legal input for a split.
 *
 * <p>Select, split and side-output transformations are rejected. For a union transformation
 * all of its inputs are validated in turn; for a partition transformation its input is
 * validated. All other transformations pass without further checks.
 *
 * @param input the transformation that would become the input of a split
 * @param <T> element type of the transformation
 * @throws IllegalStateException if a select, split or side-output transformation is found
 */
private <T> void validateSplitTransformation(Transformation<T> input) {
	if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
		throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
	}
	if (input instanceof SideOutputTransformation) {
		throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
	}

	if (input instanceof UnionTransformation) {
		// descend into every branch of the union
		final UnionTransformation<T> union = (UnionTransformation<T>) input;
		for (Transformation<T> branch : union.getInputs()) {
			validateSplitTransformation(branch);
		}
	} else if (input instanceof PartitionTransformation) {
		// parameterized (wildcard) cast avoids the unchecked call a raw type would cause
		final PartitionTransformation<?> partition = (PartitionTransformation<?>) input;
		validateSplitTransformation(partition.getInput());
	}
}