Java源码示例:org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability

示例1
@Override
public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
	BufferAndBacklog next = subpartitionView.getNextBuffer();
	if (next != null) {
		sequenceNumber++;

		if (next.buffer().isBuffer() && --numCreditsAvailable < 0) {
			throw new IllegalStateException("no credit available");
		}

		return new BufferAndAvailability(
			next.buffer(), isAvailable(next), next.buffersInBacklog());
	} else {
		return null;
	}
}
 
示例2
@Override
public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
	BufferAndBacklog next = subpartitionView.getNextBuffer();
	if (next != null) {
		sequenceNumber++;

		if (next.buffer().isBuffer() && --numCreditsAvailable < 0) {
			throw new IllegalStateException("no credit available");
		}

		return new BufferAndAvailability(
			next.buffer(), isAvailable(next), next.buffersInBacklog());
	} else {
		return null;
	}
}
 
示例3
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
	if (hasReceivedAllEndOfPartitionEvents) {
		return Optional.empty();
	}

	if (closeFuture.isDone()) {
		throw new IllegalStateException("Released");
	}

	Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
	if (!next.isPresent()) {
		return Optional.empty();
	}

	InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
	return Optional.of(transformToBufferOrEvent(
		inputWithData.data.buffer(),
		inputWithData.moreAvailable,
		inputWithData.input));
}
 
示例4
@Override
public BufferAndAvailability getNextBuffer() throws IOException {
	BufferAndBacklog next = subpartitionView.getNextBuffer();
	if (next != null) {
		sequenceNumber++;

		if (next.buffer().isBuffer() && --numCreditsAvailable < 0) {
			throw new IllegalStateException("no credit available");
		}

		return new BufferAndAvailability(
			next.buffer(), isAvailable(next), next.buffersInBacklog());
	} else {
		return null;
	}
}
 
示例5
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
	if (hasReceivedAllEndOfPartitionEvents) {
		return Optional.empty();
	}

	if (closeFuture.isDone()) {
		throw new CancelTaskException("Input gate is already closed.");
	}

	Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
	if (!next.isPresent()) {
		return Optional.empty();
	}

	InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
	return Optional.of(transformToBufferOrEvent(
		inputWithData.data.buffer(),
		inputWithData.moreAvailable,
		inputWithData.input));
}
 
示例6
@Override
public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
	BufferAndBacklog next = subpartitionView.getNextBuffer();
	if (next != null) {
		sequenceNumber++;
		return new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog());
	} else {
		return null;
	}
}
 
示例7
private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
	inputIterator = iterator;
	serializer = new SpanningRecordSerializer<T>();

	// The input iterator can produce an infinite stream. That's why we have to serialize each
	// record on demand and cannot do it upfront.
	final BufferAndAvailabilityProvider answer = new BufferAndAvailabilityProvider() {

		private boolean hasData = inputIterator.next(reuse) != null;

		@Override
		public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
			if (hasData) {
				serializer.serializeRecord(reuse);
				BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
				serializer.copyToBufferBuilder(bufferBuilder);

				hasData = inputIterator.next(reuse) != null;

				// Call getCurrentBuffer to ensure size is set
				return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), true, 0));
			} else {
				inputChannel.setReleased();

				return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
					false,
					0));
			}
		}
	};

	inputChannel.addBufferAndAvailability(answer);

	inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);

	return this;
}
 
示例8
@Override
public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
	BufferAndBacklog next = subpartitionView.getNextBuffer();
	if (next != null) {
		sequenceNumber++;
		return new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog());
	} else {
		return null;
	}
}
 
示例9
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
		throws IOException, InterruptedException {
	while (true) {
		Optional<InputChannel> inputChannel = getChannel(blocking);
		if (!inputChannel.isPresent()) {
			return Optional.empty();
		}

		// Do not query inputChannel under the lock, to avoid potential deadlocks coming from
		// notifications.
		Optional<BufferAndAvailability> result = inputChannel.get().getNextBuffer();

		synchronized (inputChannelsWithData) {
			if (result.isPresent() && result.get().moreAvailable()) {
				// enqueue the inputChannel at the end to avoid starvation
				inputChannelsWithData.add(inputChannel.get());
				enqueuedInputChannelsWithData.set(inputChannel.get().getChannelIndex());
			}

			if (inputChannelsWithData.isEmpty()) {
				resetIsAvailable();
			}

			if (result.isPresent()) {
				return Optional.of(new InputWithData<>(
					inputChannel.get(),
					result.get(),
					!inputChannelsWithData.isEmpty()));
			}
		}
	}
}
 
示例10
private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
	inputIterator = iterator;
	serializer = new SpanningRecordSerializer<T>();

	// The input iterator can produce an infinite stream. That's why we have to serialize each
	// record on demand and cannot do it upfront.
	final BufferAndAvailabilityProvider answer = new BufferAndAvailabilityProvider() {

		private boolean hasData = inputIterator.next(reuse) != null;

		@Override
		public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
			if (hasData) {
				serializer.serializeRecord(reuse);
				BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
				serializer.copyToBufferBuilder(bufferBuilder);

				hasData = inputIterator.next(reuse) != null;

				// Call getCurrentBuffer to ensure size is set
				return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), true, 0));
			} else {
				inputChannel.setReleased();

				return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
					false,
					0));
			}
		}
	};

	inputChannel.addBufferAndAvailability(answer);

	inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);

	return this;
}
 
示例11
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
		throws IOException, InterruptedException {
	while (true) {
		Optional<InputChannel> inputChannel = getChannel(blocking);
		if (!inputChannel.isPresent()) {
			return Optional.empty();
		}

		// Do not query inputChannel under the lock, to avoid potential deadlocks coming from
		// notifications.
		Optional<BufferAndAvailability> result = inputChannel.get().getNextBuffer();

		synchronized (inputChannelsWithData) {
			if (result.isPresent() && result.get().moreAvailable()) {
				// enqueue the inputChannel at the end to avoid starvation
				inputChannelsWithData.add(inputChannel.get());
				enqueuedInputChannelsWithData.set(inputChannel.get().getChannelIndex());
			}

			if (inputChannelsWithData.isEmpty()) {
				availabilityHelper.resetUnavailable();
			}

			if (result.isPresent()) {
				return Optional.of(new InputWithData<>(
					inputChannel.get(),
					result.get(),
					!inputChannelsWithData.isEmpty()));
			}
		}
	}
}
 
示例12
private Callable<Void> processRecoveredBufferTask(RecoveredInputChannel inputChannel, int totalStates, int[] states, boolean verifyRelease) {
	return () -> {
		// process all the queued state buffers and verify the data
		int numProcessedStates = 0;
		while (numProcessedStates < totalStates) {
			if (verifyRelease && inputChannel.isReleased()) {
				break;
			}
			if (inputChannel.getNumberOfQueuedBuffers() == 0) {
				Thread.sleep(1);
				continue;
			}
			try {
				Optional<BufferAndAvailability> bufferAndAvailability = inputChannel.getNextBuffer();
				if (bufferAndAvailability.isPresent()) {
					Buffer buffer = bufferAndAvailability.get().buffer();
					BufferBuilderAndConsumerTest.assertContent(buffer, null, states);
					buffer.recycleBuffer();
					numProcessedStates++;
				}
			} catch (Throwable t) {
				if (!(verifyRelease && inputChannel.isReleased())) {
					throw new AssertionError("Exceptions are expected here only if the input channel was released", t);
				}
			}
		}

		return null;
	};
}
 
示例13
private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
	inputIterator = iterator;
	serializer = new SpanningRecordSerializer<T>();

	// The input iterator can produce an infinite stream. That's why we have to serialize each
	// record on demand and cannot do it upfront.
	final BufferAndAvailabilityProvider answer = new BufferAndAvailabilityProvider() {

		private boolean hasData = inputIterator.next(reuse) != null;

		@Override
		public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
			if (hasData) {
				serializer.serializeRecord(reuse);
				BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
				BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
				serializer.copyToBufferBuilder(bufferBuilder);

				hasData = inputIterator.next(reuse) != null;

				// Call getCurrentBuffer to ensure size is set
				return Optional.of(new BufferAndAvailability(bufferConsumer.build(), true, 0));
			} else {
				inputChannel.setReleased();

				return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
					false,
					0));
			}
		}
	};

	inputChannel.addBufferAndAvailability(answer);

	inputGate.setInputChannels(inputChannel);

	return this;
}
 
示例14
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
	if (hasReceivedAllEndOfPartitionEvents) {
		return Optional.empty();
	}

	if (isReleased) {
		throw new IllegalStateException("Released");
	}

	requestPartitions();

	InputChannel currentChannel;
	boolean moreAvailable;
	Optional<BufferAndAvailability> result = Optional.empty();

	do {
		synchronized (inputChannelsWithData) {
			while (inputChannelsWithData.size() == 0) {
				if (isReleased) {
					throw new IllegalStateException("Released");
				}

				if (blocking) {
					inputChannelsWithData.wait();
				}
				else {
					return Optional.empty();
				}
			}

			currentChannel = inputChannelsWithData.remove();
			enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
			moreAvailable = !inputChannelsWithData.isEmpty();
		}

		result = currentChannel.getNextBuffer();
	} while (!result.isPresent());

	// this channel was now removed from the non-empty channels queue
	// we re-add it in case it has more data, because in that case no "non-empty" notification
	// will come for that channel
	if (result.get().moreAvailable()) {
		queueChannel(currentChannel);
		moreAvailable = true;
	}

	final Buffer buffer = result.get().buffer();
	if (buffer.isBuffer()) {
		return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable));
	}
	else {
		final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());

		if (event.getClass() == EndOfPartitionEvent.class) {
			channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());

			if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
				// Because of race condition between:
				// 1. releasing inputChannelsWithData lock in this method and reaching this place
				// 2. empty data notification that re-enqueues a channel
				// we can end up with moreAvailable flag set to true, while we expect no more data.
				checkState(!moreAvailable || !pollNextBufferOrEvent().isPresent());
				moreAvailable = false;
				hasReceivedAllEndOfPartitionEvents = true;
			}

			currentChannel.notifySubpartitionConsumed();

			currentChannel.releaseAllResources();
		}

		return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable));
	}
}
 
示例15
@SuppressWarnings("unchecked")
private void setupInputChannels() throws IOException, InterruptedException {

	for (int i = 0; i < numInputChannels; i++) {
		final int channelIndex = i;
		final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
		final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
			new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));

		inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
		inputChannels[channelIndex] = new TestInputChannel(inputGate, i);

		final BufferAndAvailabilityProvider answer = () -> {
			ConcurrentLinkedQueue<InputValue<Object>> inputQueue = inputQueues[channelIndex];
			InputValue<Object> input;
			boolean moreAvailable;
			synchronized (inputQueue) {
				input = inputQueue.poll();
				moreAvailable = !inputQueue.isEmpty();
			}
			if (input != null && input.isStreamEnd()) {
				inputChannels[channelIndex].setReleased();
				return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
			} else if (input != null && input.isStreamRecord()) {
				Object inputElement = input.getStreamRecord();

				delegate.setInstance(inputElement);
				recordSerializer.serializeRecord(delegate);
				BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
				recordSerializer.copyToBufferBuilder(bufferBuilder);
				bufferBuilder.finish();

				// Call getCurrentBuffer to ensure size is set
				return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0));
			} else if (input != null && input.isEvent()) {
				AbstractEvent event = input.getEvent();
				return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
			} else {
				return Optional.empty();
			}
		};

		inputChannels[channelIndex].addBufferAndAvailability(answer);

		inputGate.setInputChannel(new IntermediateResultPartitionID(),
			inputChannels[channelIndex]);
	}
}
 
示例16
@SuppressWarnings("unchecked")
private void setupInputChannels() throws IOException, InterruptedException {

	for (int i = 0; i < numInputChannels; i++) {
		final int channelIndex = i;
		final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
		final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
			new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));

		inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
		inputChannels[channelIndex] = new TestInputChannel(inputGate, i);

		final BufferAndAvailabilityProvider answer = () -> {
			ConcurrentLinkedQueue<InputValue<Object>> inputQueue = inputQueues[channelIndex];
			InputValue<Object> input;
			boolean moreAvailable;
			synchronized (inputQueue) {
				input = inputQueue.poll();
				moreAvailable = !inputQueue.isEmpty();
			}
			if (input != null && input.isStreamEnd()) {
				inputChannels[channelIndex].setReleased();
				return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
			} else if (input != null && input.isStreamRecord()) {
				Object inputElement = input.getStreamRecord();

				delegate.setInstance(inputElement);
				recordSerializer.serializeRecord(delegate);
				BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
				recordSerializer.copyToBufferBuilder(bufferBuilder);
				bufferBuilder.finish();

				// Call getCurrentBuffer to ensure size is set
				return Optional.of(new BufferAndAvailability(buildSingleBuffer(bufferBuilder), moreAvailable, 0));
			} else if (input != null && input.isEvent()) {
				AbstractEvent event = input.getEvent();
				if (event instanceof EndOfPartitionEvent) {
					inputChannels[channelIndex].setReleased();
				}

				return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
			} else {
				return Optional.empty();
			}
		};

		inputChannels[channelIndex].addBufferAndAvailability(answer);

		inputGate.setInputChannel(new IntermediateResultPartitionID(),
			inputChannels[channelIndex]);
	}
}
 
示例17
@SuppressWarnings("unchecked")
private void setupInputChannels() {

	for (int i = 0; i < numInputChannels; i++) {
		final int channelIndex = i;
		final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
		final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
			new SerializationDelegate<>(new StreamElementSerializer<T>(serializer));

		inputQueues[channelIndex] = new ConcurrentLinkedQueue<>();
		inputChannels[channelIndex] = new TestInputChannel(inputGate, i);

		final BufferAndAvailabilityProvider answer = () -> {
			ConcurrentLinkedQueue<InputValue<Object>> inputQueue = inputQueues[channelIndex];
			InputValue<Object> input;
			boolean moreAvailable;
			synchronized (inputQueue) {
				input = inputQueue.poll();
				moreAvailable = !inputQueue.isEmpty();
			}
			if (input != null && input.isStreamEnd()) {
				inputChannels[channelIndex].setReleased();
				return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), moreAvailable, 0));
			} else if (input != null && input.isStreamRecord()) {
				Object inputElement = input.getStreamRecord();

				delegate.setInstance(inputElement);
				recordSerializer.serializeRecord(delegate);
				BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
				BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
				recordSerializer.copyToBufferBuilder(bufferBuilder);
				bufferBuilder.finish();

				// Call getCurrentBuffer to ensure size is set
				return Optional.of(new BufferAndAvailability(bufferConsumer.build(), moreAvailable, 0));
			} else if (input != null && input.isEvent()) {
				AbstractEvent event = input.getEvent();
				if (event instanceof EndOfPartitionEvent) {
					inputChannels[channelIndex].setReleased();
				}

				return Optional.of(new BufferAndAvailability(EventSerializer.toBuffer(event), moreAvailable, 0));
			} else {
				return Optional.empty();
			}
		};

		inputChannels[channelIndex].addBufferAndAvailability(answer);
	}
	inputGate.setInputChannels(inputChannels);
}
 
示例18
BufferAndAvailability getNextBuffer() throws IOException, InterruptedException; 
示例19
BufferAndAvailability getNextBuffer() throws IOException, InterruptedException; 
示例20
BufferAndAvailability getNextBuffer() throws IOException;