Java源码示例:org.apache.flink.runtime.taskmanager.TaskActions

示例1
/**
 * Helper to create simple {@link ResultPartition} instance for use by a {@link Task} inside
 * {@link NetworkEnvironment#registerTask(Task)}.
 *
 * @param partitionType
 * 		the produced partition type
 * @param channels
 * 		the number of output channels
 *
 * @return instance with minimal data set and some mocks so that it is useful for {@link
 * NetworkEnvironment#registerTask(Task)}
 */
private static ResultPartition createResultPartition(
		final ResultPartitionType partitionType, final int channels) {
	return new ResultPartition(
		"TestTask-" + partitionType + ":" + channels,
		mock(TaskActions.class),
		new JobID(),
		new ResultPartitionID(),
		partitionType,
		channels,
		channels,
		mock(ResultPartitionManager.class),
		new NoOpResultPartitionConsumableNotifier(),
		mock(IOManager.class),
		false);
}
 
示例2
private SingleInputGate createInputGate(
		int numberOfInputChannels, ResultPartitionType partitionType) {
	SingleInputGate inputGate = new SingleInputGate(
		"Test Task Name",
		new JobID(),
		new IntermediateDataSetID(),
		partitionType,
		0,
		numberOfInputChannels,
		mock(TaskActions.class),
		UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
		enableCreditBasedFlowControl);

	assertEquals(partitionType, inputGate.getConsumedPartitionType());

	return inputGate;
}
 
示例3
/**
 * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already finished.
 *
 * @param pipelined the result partition type to set up
 */
protected void testAddOnFinishedPartition(final ResultPartitionType pipelined)
	throws Exception {
	BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
	ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
	try {
		ResultPartition partition = createPartition(notifier, pipelined, true);
		partition.finish();
		reset(notifier);
		// partition.add() should fail
		partition.addBufferConsumer(bufferConsumer, 0);
		Assert.fail("exception expected");
	} catch (IllegalStateException e) {
		// expected => ignored
	} finally {
		if (!bufferConsumer.isRecycled()) {
			bufferConsumer.close();
			Assert.fail("bufferConsumer not recycled");
		}
		// should not have notified either
		verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
	}
}
 
示例4
/**
 * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already been released.
 *
 * @param pipelined the result partition type to set up
 */
protected void testAddOnReleasedPartition(final ResultPartitionType pipelined)
	throws Exception {
	BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
	ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
	try {
		ResultPartition partition = createPartition(notifier, pipelined, true);
		partition.release();
		// partition.add() silently drops the bufferConsumer but recycles it
		partition.addBufferConsumer(bufferConsumer, 0);
		assertTrue(partition.isReleased());
	} finally {
		if (!bufferConsumer.isRecycled()) {
			bufferConsumer.close();
			Assert.fail("bufferConsumer not recycled");
		}
		// should not have notified either
		verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
	}
}
 
示例5
/**
 * Tests {@link ResultPartition#addBufferConsumer(BufferConsumer, int)} on a working partition.
 *
 * @param pipelined the result partition type to set up
 */
protected void testAddOnPartition(final ResultPartitionType pipelined)
	throws Exception {
	ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
	ResultPartition partition = createPartition(notifier, pipelined, true);
	BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
	try {
		// partition.add() adds the bufferConsumer without recycling it (if not spilling)
		partition.addBufferConsumer(bufferConsumer, 0);
		assertFalse("bufferConsumer should not be recycled (still in the queue)", bufferConsumer.isRecycled());
	} finally {
		if (!bufferConsumer.isRecycled()) {
			bufferConsumer.close();
		}
		// should have been notified for pipelined partitions
		if (pipelined.isPipelined()) {
			verify(notifier, times(1))
				.notifyPartitionConsumable(
					eq(partition.getJobId()),
					eq(partition.getPartitionId()),
					any(TaskActions.class));
		}
	}
}
 
示例6
private static ResultPartition createPartition(
	ResultPartitionConsumableNotifier notifier,
	ResultPartitionType type,
	boolean sendScheduleOrUpdateConsumersMessage) {
	return new ResultPartition(
		"TestTask",
		mock(TaskActions.class),
		new JobID(),
		new ResultPartitionID(),
		type,
		1,
		1,
		mock(ResultPartitionManager.class),
		notifier,
		ioManager,
		sendScheduleOrUpdateConsumersMessage);
}
 
示例7
@SuppressWarnings("unchecked")
public FairnessVerifyingInputGate(
		String owningTaskName,
		JobID jobId,
		IntermediateDataSetID consumedResultId,
		int consumedSubpartitionIndex,
		int numberOfInputChannels,
		TaskActions taskActions,
		TaskIOMetricGroup metrics,
		boolean isCreditBased) {

	super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED,
		consumedSubpartitionIndex,
			numberOfInputChannels, taskActions, metrics, isCreditBased);

	try {
		Field f = SingleInputGate.class.getDeclaredField("inputChannelsWithData");
		f.setAccessible(true);
		channelsWithData = (ArrayDeque<InputChannel>) f.get(this);
	}
	catch (Exception e) {
		throw new RuntimeException(e);
	}

	this.uniquenessChecker = new HashSet<>();
}
 
示例8
/**
 * Tests {@link ResultPartition#addBufferConsumer(BufferConsumer, int)} on a working partition.
 *
 * @param partitionType the result partition type to set up
 */
private void testAddOnPartition(final ResultPartitionType partitionType) throws Exception {
	ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
	JobID jobId = new JobID();
	TaskActions taskActions = new NoOpTaskActions();
	ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
		partitionType,
		taskActions,
		jobId,
		notifier);
	BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
	try {
		// partition.add() adds the bufferConsumer without recycling it (if not spilling)
		consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
		assertFalse("bufferConsumer should not be recycled (still in the queue)", bufferConsumer.isRecycled());
	} finally {
		if (!bufferConsumer.isRecycled()) {
			bufferConsumer.close();
		}
		// should have been notified for pipelined partitions
		if (partitionType.isPipelined()) {
			verify(notifier, times(1))
				.notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions));
		}
	}
}
 
示例9
/**
 * Tests {@link ResultPartition#addBufferConsumer(BufferConsumer, int)} on a working partition.
 *
 * @param partitionType the result partition type to set up
 */
private void testAddOnPartition(final ResultPartitionType partitionType) throws Exception {
	ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
	JobID jobId = new JobID();
	TaskActions taskActions = new NoOpTaskActions();
	ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
		partitionType,
		taskActions,
		jobId,
		notifier);
	BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
	try {
		// partition.add() adds the bufferConsumer without recycling it (if not spilling)
		consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
		assertFalse("bufferConsumer should not be recycled (still in the queue)", bufferConsumer.isRecycled());
	} finally {
		if (!bufferConsumer.isRecycled()) {
			bufferConsumer.close();
		}
		// should have been notified for pipelined partitions
		if (partitionType.isPipelined()) {
			verify(notifier, times(1))
				.notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions));
		}
	}
}
 
示例10
@Override
public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
	CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);

	acknowledgeFuture.whenCompleteAsync(
		(Acknowledge ack, Throwable throwable) -> {
			if (throwable != null) {
				LOG.error("Could not schedule or update consumers at the JobManager.", throwable);

				taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
			}
		},
		executor);
}
 
示例11
public SingleInputGate(
	String owningTaskName,
	JobID jobId,
	IntermediateDataSetID consumedResultId,
	final ResultPartitionType consumedPartitionType,
	int consumedSubpartitionIndex,
	int numberOfInputChannels,
	TaskActions taskActions,
	TaskIOMetricGroup metrics,
	boolean isCreditBased) {

	this.owningTaskName = checkNotNull(owningTaskName);
	this.jobId = checkNotNull(jobId);

	this.consumedResultId = checkNotNull(consumedResultId);
	this.consumedPartitionType = checkNotNull(consumedPartitionType);

	checkArgument(consumedSubpartitionIndex >= 0);
	this.consumedSubpartitionIndex = consumedSubpartitionIndex;

	checkArgument(numberOfInputChannels > 0);
	this.numberOfInputChannels = numberOfInputChannels;

	this.inputChannels = new HashMap<>(numberOfInputChannels);
	this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
	this.enqueuedInputChannelsWithData = new BitSet(numberOfInputChannels);

	this.taskActions = checkNotNull(taskActions);
	this.isCreditBased = isCreditBased;
}
 
示例12
/**
 * Creates and returns the single input gate for credit-based testing.
 *
 * @return The new created single input gate.
 */
static SingleInputGate createSingleInputGate() {
	return new SingleInputGate(
		"InputGate",
		new JobID(),
		new IntermediateDataSetID(),
		ResultPartitionType.PIPELINED,
		0,
		1,
		mock(TaskActions.class),
		UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
		true);
}
 
示例13
/**
 * Helper to create spy of a {@link SingleInputGate} for use by a {@link Task} inside
 * {@link NetworkEnvironment#registerTask(Task)}.
 *
 * @param partitionType
 * 		the consumed partition type
 * @param channels
 * 		the number of input channels
 *
 * @return input gate with some fake settings
 */
private SingleInputGate createSingleInputGate(
		final ResultPartitionType partitionType, final int channels) {
	return spy(new SingleInputGate(
		"Test Task Name",
		new JobID(),
		new IntermediateDataSetID(),
		partitionType,
		0,
		channels,
		mock(TaskActions.class),
		UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
		enableCreditBasedFlowControl));
}
 
示例14
ResultPartitionWithCountDownLatch(
		String owningTaskName,
		TaskActions taskActions,
		JobID jobId,
		ResultPartitionID partitionId,
		ResultPartitionType partitionType,
		int numberOfSubpartitions,
		int numTargetKeyGroups,
		ResultPartitionManager partitionManager,
		ResultPartitionConsumableNotifier partitionConsumableNotifier,
		IOManager ioManager,
		boolean sendScheduleOrUpdateConsumersMessage,
		CountDownLatch blockLatch,
		CountDownLatch doneLatch) {
	super(
		owningTaskName,
		taskActions,
		jobId,
		partitionId,
		partitionType,
		numberOfSubpartitions,
		numTargetKeyGroups,
		partitionManager,
		partitionConsumableNotifier,
		ioManager,
		sendScheduleOrUpdateConsumersMessage);
	this.blockLatch = Preconditions.checkNotNull(blockLatch);
	this.doneLatch = Preconditions.checkNotNull(doneLatch);
}
 
示例15
private SingleInputGate createSingleInputGate() {
	return new SingleInputGate(
		"InputGate",
		new JobID(),
		new IntermediateDataSetID(),
		ResultPartitionType.PIPELINED,
		0,
		1,
		mock(TaskActions.class),
		UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
		true);
}
 
示例16
@Test
public void testConsumptionWithLocalChannels() throws Exception {
	final int numberOfChannels = 11;
	final int buffersPerChannel = 1000;

	final ResultPartition resultPartition = mock(ResultPartition.class);

	final PipelinedSubpartition[] partitions = new PipelinedSubpartition[numberOfChannels];
	final Source[] sources = new Source[numberOfChannels];

	final ResultPartitionManager resultPartitionManager = createResultPartitionManager(partitions);

	final SingleInputGate gate = new SingleInputGate(
			"Test Task Name",
			new JobID(),
			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
			0, numberOfChannels,
			mock(TaskActions.class),
			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
			true);

	for (int i = 0; i < numberOfChannels; i++) {
		LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
				resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
		gate.setInputChannel(new IntermediateResultPartitionID(), channel);

		partitions[i] = new PipelinedSubpartition(0, resultPartition);
		sources[i] = new PipelinedSubpartitionSource(partitions[i]);
	}

	ProducerThread producer = new ProducerThread(sources, numberOfChannels * buffersPerChannel, 4, 10);
	ConsumerThread consumer = new ConsumerThread(gate, numberOfChannels * buffersPerChannel);
	producer.start();
	consumer.start();

	// the 'sync()' call checks for exceptions and failed assertions
	producer.sync();
	consumer.sync();
}
 
示例17
@Test
public void testConsumptionWithRemoteChannels() throws Exception {
	final int numberOfChannels = 11;
	final int buffersPerChannel = 1000;

	final ConnectionManager connManager = createDummyConnectionManager();
	final Source[] sources = new Source[numberOfChannels];

	final SingleInputGate gate = new SingleInputGate(
			"Test Task Name",
			new JobID(),
			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
			0,
			numberOfChannels,
			mock(TaskActions.class),
			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
			true);

	for (int i = 0; i < numberOfChannels; i++) {
		RemoteInputChannel channel = new RemoteInputChannel(
				gate, i, new ResultPartitionID(), mock(ConnectionID.class),
				connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
		gate.setInputChannel(new IntermediateResultPartitionID(), channel);

		sources[i] = new RemoteChannelSource(channel);
	}

	ProducerThread producer = new ProducerThread(sources, numberOfChannels * buffersPerChannel, 4, 10);
	ConsumerThread consumer = new ConsumerThread(gate, numberOfChannels * buffersPerChannel);
	producer.start();
	consumer.start();

	// the 'sync()' call checks for exceptions and failed assertions
	producer.sync();
	consumer.sync();
}
 
示例18
@Override
public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
	CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);

	acknowledgeFuture.whenCompleteAsync(
		(Acknowledge ack, Throwable throwable) -> {
			if (throwable != null) {
				LOG.error("Could not schedule or update consumers at the JobManager.", throwable);

				taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
			}
		},
		executor);
}
 
示例19
/**
 * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already finished.
 *
 * @param partitionType the result partition type to set up
 */
private void testAddOnFinishedPartition(final ResultPartitionType partitionType) throws Exception {
	BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
	ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
	JobID jobId = new JobID();
	TaskActions taskActions = new NoOpTaskActions();
	ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
		partitionType,
		taskActions,
		jobId,
		notifier);
	try {
		consumableNotifyingPartitionWriter.finish();
		reset(notifier);
		// partition.add() should fail
		consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
		Assert.fail("exception expected");
	} catch (IllegalStateException e) {
		// expected => ignored
	} finally {
		if (!bufferConsumer.isRecycled()) {
			bufferConsumer.close();
			Assert.fail("bufferConsumer not recycled");
		}
		// should not have notified either
		verify(notifier, never()).notifyPartitionConsumable(
			eq(jobId),
			eq(consumableNotifyingPartitionWriter.getPartitionId()),
			eq(taskActions));
	}
}
 
示例20
/**
 * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already been released.
 *
 * @param partitionType the result partition type to set up
 */
private void testAddOnReleasedPartition(final ResultPartitionType partitionType) throws Exception {
	BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
	ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
	JobID jobId = new JobID();
	TaskActions taskActions = new NoOpTaskActions();
	ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
		createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
	ResultPartitionWriter consumableNotifyingPartitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
		Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
		new ResultPartitionWriter[] {partition},
		taskActions,
		jobId,
		notifier)[0];
	try {
		partition.release();
		// partition.add() silently drops the bufferConsumer but recycles it
		consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
		assertTrue(partition.isReleased());
	} finally {
		if (!bufferConsumer.isRecycled()) {
			bufferConsumer.close();
			Assert.fail("bufferConsumer not recycled");
		}
		// should not have notified either
		verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions));
	}
}
 
示例21
private ResultPartitionWriter createConsumableNotifyingResultPartitionWriter(
		ResultPartitionType partitionType,
		TaskActions taskActions,
		JobID jobId,
		ResultPartitionConsumableNotifier notifier) {
	ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
		createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
	return ConsumableNotifyingResultPartitionWriterDecorator.decorate(
		Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
		new ResultPartitionWriter[] {partition},
		taskActions,
		jobId,
		notifier)[0];
}
 
示例22
@Override
public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
	CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);

	acknowledgeFuture.whenCompleteAsync(
		(Acknowledge ack, Throwable throwable) -> {
			if (throwable != null) {
				LOG.error("Could not schedule or update consumers at the JobManager.", throwable);

				taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
			}
		},
		executor);
}
 
示例23
/**
 * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already finished.
 *
 * @param partitionType the result partition type to set up
 */
private void testAddOnFinishedPartition(final ResultPartitionType partitionType) throws Exception {
	BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
	ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
	JobID jobId = new JobID();
	TaskActions taskActions = new NoOpTaskActions();
	ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
		partitionType,
		taskActions,
		jobId,
		notifier);
	try {
		consumableNotifyingPartitionWriter.finish();
		reset(notifier);
		// partition.add() should fail
		consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
		Assert.fail("exception expected");
	} catch (IllegalStateException e) {
		// expected => ignored
	} finally {
		if (!bufferConsumer.isRecycled()) {
			bufferConsumer.close();
			Assert.fail("bufferConsumer not recycled");
		}
		// should not have notified either
		verify(notifier, never()).notifyPartitionConsumable(
			eq(jobId),
			eq(consumableNotifyingPartitionWriter.getPartitionId()),
			eq(taskActions));
	}
}
 
示例24
/**
 * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already been released.
 *
 * @param partitionType the result partition type to set up
 */
private void testAddOnReleasedPartition(final ResultPartitionType partitionType) throws Exception {
	BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
	ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
	JobID jobId = new JobID();
	TaskActions taskActions = new NoOpTaskActions();
	ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
		createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
	ResultPartitionWriter consumableNotifyingPartitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
		Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
		new ResultPartitionWriter[] {partition},
		taskActions,
		jobId,
		notifier)[0];
	try {
		partition.release();
		// partition.add() silently drops the bufferConsumer but recycles it
		consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
		assertTrue(partition.isReleased());
	} finally {
		if (!bufferConsumer.isRecycled()) {
			bufferConsumer.close();
			Assert.fail("bufferConsumer not recycled");
		}
		// should not have notified either
		verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions));
	}
}
 
示例25
private ResultPartitionWriter createConsumableNotifyingResultPartitionWriter(
		ResultPartitionType partitionType,
		TaskActions taskActions,
		JobID jobId,
		ResultPartitionConsumableNotifier notifier) {
	ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
		createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
	return ConsumableNotifyingResultPartitionWriterDecorator.decorate(
		Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
		new ResultPartitionWriter[] {partition},
		taskActions,
		jobId,
		notifier)[0];
}
 
示例26
/**
 * Creates an input gate and all of its input channels.
 */
public static SingleInputGate create(
	String owningTaskName,
	JobID jobId,
	ExecutionAttemptID executionId,
	InputGateDeploymentDescriptor igdd,
	NetworkEnvironment networkEnvironment,
	TaskActions taskActions,
	TaskIOMetricGroup metrics) {

	final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
	final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType());

	final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex();
	checkArgument(consumedSubpartitionIndex >= 0);

	final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());

	final SingleInputGate inputGate = new SingleInputGate(
		owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
		icdd.length, taskActions, metrics, networkEnvironment.isCreditBased());

	// Create the input channels. There is one input channel for each consumed partition.
	final InputChannel[] inputChannels = new InputChannel[icdd.length];

	int numLocalChannels = 0;
	int numRemoteChannels = 0;
	int numUnknownChannels = 0;

	for (int i = 0; i < inputChannels.length; i++) {
		final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
		final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();

		if (partitionLocation.isLocal()) {
			inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId,
				networkEnvironment.getResultPartitionManager(),
				networkEnvironment.getTaskEventDispatcher(),
				networkEnvironment.getPartitionRequestInitialBackoff(),
				networkEnvironment.getPartitionRequestMaxBackoff(),
				metrics
			);

			numLocalChannels++;
		}
		else if (partitionLocation.isRemote()) {
			inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
				partitionLocation.getConnectionId(),
				networkEnvironment.getConnectionManager(),
				networkEnvironment.getPartitionRequestInitialBackoff(),
				networkEnvironment.getPartitionRequestMaxBackoff(),
				metrics
			);

			numRemoteChannels++;
		}
		else if (partitionLocation.isUnknown()) {
			inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
				networkEnvironment.getResultPartitionManager(),
				networkEnvironment.getTaskEventDispatcher(),
				networkEnvironment.getConnectionManager(),
				networkEnvironment.getPartitionRequestInitialBackoff(),
				networkEnvironment.getPartitionRequestMaxBackoff(),
				metrics
			);

			numUnknownChannels++;
		}
		else {
			throw new IllegalStateException("Unexpected partition location.");
		}

		inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
	}

	LOG.debug("{}: Created {} input channels (local: {}, remote: {}, unknown: {}).",
		owningTaskName,
		inputChannels.length,
		numLocalChannels,
		numRemoteChannels,
		numUnknownChannels);

	return inputGate;
}
 
示例27
public ResultPartition(
	String owningTaskName,
	TaskActions taskActions, // actions on the owning task
	JobID jobId,
	ResultPartitionID partitionId,
	ResultPartitionType partitionType,
	int numberOfSubpartitions,
	int numTargetKeyGroups,
	ResultPartitionManager partitionManager,
	ResultPartitionConsumableNotifier partitionConsumableNotifier,
	IOManager ioManager,
	boolean sendScheduleOrUpdateConsumersMessage) {

	this.owningTaskName = checkNotNull(owningTaskName);
	this.taskActions = checkNotNull(taskActions);
	this.jobId = checkNotNull(jobId);
	this.partitionId = checkNotNull(partitionId);
	this.partitionType = checkNotNull(partitionType);
	this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
	this.numTargetKeyGroups = numTargetKeyGroups;
	this.partitionManager = checkNotNull(partitionManager);
	this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
	this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;

	// Create the subpartitions.
	switch (partitionType) {
		case BLOCKING:
			for (int i = 0; i < subpartitions.length; i++) {
				subpartitions[i] = new SpillableSubpartition(i, this, ioManager);
			}

			break;

		case PIPELINED:
		case PIPELINED_BOUNDED:
			for (int i = 0; i < subpartitions.length; i++) {
				subpartitions[i] = new PipelinedSubpartition(i, this);
			}

			break;

		default:
			throw new IllegalArgumentException("Unsupported result partition type.");
	}

	// Initially, partitions should be consumed once before release.
	pin();

	LOG.debug("{}: Initialized {}", owningTaskName, this);
}
 
示例28
public TestSingleInputGate(int numberOfInputChannels, boolean initialize) {
	checkArgument(numberOfInputChannels >= 1);

	SingleInputGate realGate = new SingleInputGate(
		"Test Task Name",
		new JobID(),
		new IntermediateDataSetID(),
		ResultPartitionType.PIPELINED,
		0,
		numberOfInputChannels,
		mock(TaskActions.class),
		UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
		true);

	this.inputGate = spy(realGate);

	// Notify about late registrations (added for DataSinkTaskTest#testUnionDataSinkTask).
	// After merging registerInputOutput and invoke, we have to make sure that the test
	// notifications happen at the expected time. In real programs, this is guaranteed by
	// the instantiation and request partition life cycle.
	try {
		Field f = realGate.getClass().getDeclaredField("inputChannelsWithData");
		f.setAccessible(true);
		final ArrayDeque<InputChannel> notifications = (ArrayDeque<InputChannel>) f.get(realGate);

		doAnswer(new Answer<Void>() {
			@Override
			public Void answer(InvocationOnMock invocation) throws Throwable {
				invocation.callRealMethod();

				synchronized (notifications) {
					if (!notifications.isEmpty()) {
						InputGateListener listener = (InputGateListener) invocation.getArguments()[0];
						listener.notifyInputGateNonEmpty(inputGate);
					}
				}

				return null;
			}
		}).when(inputGate).registerListener(any(InputGateListener.class));
	} catch (Exception e) {
		throw new RuntimeException(e);
	}

	this.inputChannels = new TestInputChannel[numberOfInputChannels];

	if (initialize) {
		for (int i = 0; i < numberOfInputChannels; i++) {
			inputChannels[i] = new TestInputChannel(inputGate, i);
			inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i]);
		}
	}
}
 
示例29
/**
 * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
 * value after receiving all end-of-partition events.
 *
 * <p>For buffer-or-event instances, it is important to verify that they have been set off to
 * the correct logical index.
 */
@Test(timeout = 120 * 1000)
public void testBasicGetNextLogic() throws Exception {
	// Setup
	final String testTaskName = "Test Task";
	final SingleInputGate ig1 = new SingleInputGate(
		testTaskName, new JobID(),
		new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
		0, 3,
		mock(TaskActions.class),
		UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
		true);
	final SingleInputGate ig2 = new SingleInputGate(
		testTaskName, new JobID(),
		new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
		0, 5,
		mock(TaskActions.class),
		UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
		true);

	final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});

	assertEquals(ig1.getNumberOfInputChannels() + ig2.getNumberOfInputChannels(), union.getNumberOfInputChannels());

	final TestInputChannel[][] inputChannels = new TestInputChannel[][]{
			TestInputChannel.createInputChannels(ig1, 3),
			TestInputChannel.createInputChannels(ig2, 5)
	};

	inputChannels[0][0].readBuffer(); // 0 => 0
	inputChannels[0][0].readEndOfPartitionEvent(); // 0 => 0
	inputChannels[1][2].readBuffer(); // 2 => 5
	inputChannels[1][2].readEndOfPartitionEvent(); // 2 => 5
	inputChannels[1][0].readBuffer(); // 0 => 3
	inputChannels[1][1].readBuffer(); // 1 => 4
	inputChannels[0][1].readBuffer(); // 1 => 1
	inputChannels[1][3].readBuffer(); // 3 => 6
	inputChannels[0][1].readEndOfPartitionEvent(); // 1 => 1
	inputChannels[1][3].readEndOfPartitionEvent(); // 3 => 6
	inputChannels[0][2].readBuffer(); // 1 => 2
	inputChannels[0][2].readEndOfPartitionEvent(); // 1 => 2
	inputChannels[1][4].readBuffer(); // 4 => 7
	inputChannels[1][4].readEndOfPartitionEvent(); // 4 => 7
	inputChannels[1][1].readEndOfPartitionEvent(); // 0 => 3
	inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3

	ig1.notifyChannelNonEmpty(inputChannels[0][0]);
	ig1.notifyChannelNonEmpty(inputChannels[0][1]);
	ig1.notifyChannelNonEmpty(inputChannels[0][2]);

	ig2.notifyChannelNonEmpty(inputChannels[1][0]);
	ig2.notifyChannelNonEmpty(inputChannels[1][1]);
	ig2.notifyChannelNonEmpty(inputChannels[1][2]);
	ig2.notifyChannelNonEmpty(inputChannels[1][3]);
	ig2.notifyChannelNonEmpty(inputChannels[1][4]);

	verifyBufferOrEvent(union, true, 0, true); // gate 1, channel 0
	verifyBufferOrEvent(union, true, 3, true); // gate 2, channel 0
	verifyBufferOrEvent(union, true, 1, true); // gate 1, channel 1
	verifyBufferOrEvent(union, true, 4, true); // gate 2, channel 1
	verifyBufferOrEvent(union, true, 2, true); // gate 1, channel 2
	verifyBufferOrEvent(union, true, 5, true); // gate 2, channel 1
	verifyBufferOrEvent(union, false, 0, true); // gate 1, channel 0
	verifyBufferOrEvent(union, true, 6, true); // gate 2, channel 1
	verifyBufferOrEvent(union, false, 1, true); // gate 1, channel 1
	verifyBufferOrEvent(union, true, 7, true); // gate 2, channel 1
	verifyBufferOrEvent(union, false, 2, true); // gate 1, channel 2
	verifyBufferOrEvent(union, false, 3, true); // gate 2, channel 0
	verifyBufferOrEvent(union, false, 4, true); // gate 2, channel 1
	verifyBufferOrEvent(union, false, 5, true); // gate 2, channel 2
	verifyBufferOrEvent(union, false, 6, true); // gate 2, channel 3
	verifyBufferOrEvent(union, false, 7, false); // gate 2, channel 4

	// Return null when the input gate has received all end-of-partition events
	assertTrue(union.isFinished());
	assertFalse(union.getNextBufferOrEvent().isPresent());
}
 
示例30
/**
 * Tests request back off configuration is correctly forwarded to the channels.
 */
@Test
public void testRequestBackoffConfiguration() throws Exception {
	ResultPartitionID[] partitionIds = new ResultPartitionID[] {
		new ResultPartitionID(),
		new ResultPartitionID(),
		new ResultPartitionID()
	};

	InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{
		// Local
		new InputChannelDeploymentDescriptor(
			partitionIds[0],
			ResultPartitionLocation.createLocal()),
		// Remote
		new InputChannelDeploymentDescriptor(
			partitionIds[1],
			ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
		// Unknown
		new InputChannelDeploymentDescriptor(
			partitionIds[2],
			ResultPartitionLocation.createUnknown())};

	InputGateDeploymentDescriptor gateDesc =
		new InputGateDeploymentDescriptor(new IntermediateDataSetID(),
			ResultPartitionType.PIPELINED, 0, channelDescs);

	int initialBackoff = 137;
	int maxBackoff = 1001;

	final NetworkEnvironment netEnv = new NetworkEnvironment(
		100, 32, initialBackoff, maxBackoff, 2, 8, enableCreditBasedFlowControl);

	SingleInputGate gate = SingleInputGate.create(
		"TestTask",
		new JobID(),
		new ExecutionAttemptID(),
		gateDesc,
		netEnv,
		mock(TaskActions.class),
		UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());

	try {
		assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());

		Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();

		assertEquals(3, channelMap.size());
		InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId());
		assertEquals(LocalInputChannel.class, localChannel.getClass());

		InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId());
		assertEquals(RemoteInputChannel.class, remoteChannel.getClass());

		InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId());
		assertEquals(UnknownInputChannel.class, unknownChannel.getClass());

		InputChannel[] channels =
			new InputChannel[] {localChannel, remoteChannel, unknownChannel};
		for (InputChannel ch : channels) {
			assertEquals(0, ch.getCurrentBackoff());

			assertTrue(ch.increaseBackoff());
			assertEquals(initialBackoff, ch.getCurrentBackoff());

			assertTrue(ch.increaseBackoff());
			assertEquals(initialBackoff * 2, ch.getCurrentBackoff());

			assertTrue(ch.increaseBackoff());
			assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff());

			assertTrue(ch.increaseBackoff());
			assertEquals(maxBackoff, ch.getCurrentBackoff());

			assertFalse(ch.increaseBackoff());
		}
	} finally {
		gate.releaseAllResources();
		netEnv.shutdown();
	}
}