Java源码示例:org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager

示例1
/**
 * Creates the holder for the services a TaskExecutor runs with.
 *
 * <p>All arguments are mandatory. A {@code null} argument is rejected with a
 * {@link NullPointerException} whose message names the missing argument, so a
 * misconfigured setup can be diagnosed without a debugger.
 *
 * @param taskManagerLocation the task manager location
 * @param memoryManager the memory manager
 * @param ioManager the I/O manager
 * @param networkEnvironment the network environment
 * @param broadcastVariableManager the broadcast variable manager
 * @param taskSlotTable the task slot table
 * @param jobManagerTable the job manager table
 * @param jobLeaderService the job leader service
 * @param taskManagerStateStore the manager of the task-local state stores
 * @throws NullPointerException if any argument is {@code null}
 */
TaskManagerServices(
	TaskManagerLocation taskManagerLocation,
	MemoryManager memoryManager,
	IOManager ioManager,
	NetworkEnvironment networkEnvironment,
	BroadcastVariableManager broadcastVariableManager,
	TaskSlotTable taskSlotTable,
	JobManagerTable jobManagerTable,
	JobLeaderService jobLeaderService,
	TaskExecutorLocalStateStoresManager taskManagerStateStore) {

	this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation, "taskManagerLocation");
	this.memoryManager = Preconditions.checkNotNull(memoryManager, "memoryManager");
	this.ioManager = Preconditions.checkNotNull(ioManager, "ioManager");
	this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment, "networkEnvironment");
	this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager, "broadcastVariableManager");
	this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable, "taskSlotTable");
	this.jobManagerTable = Preconditions.checkNotNull(jobManagerTable, "jobManagerTable");
	this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService, "jobLeaderService");
	this.taskManagerStateStore = Preconditions.checkNotNull(taskManagerStateStore, "taskManagerStateStore");
}
 
示例2
/**
 * Creates a builder pre-populated with test defaults: lightweight real instances for some
 * services and Mockito mocks for the others. Individual defaults can be overridden via the
 * corresponding setters.
 */
public TaskManagerServicesBuilder() {
	// services replaced by Mockito mocks
	this.ioManager = mock(IOManager.class);
	this.networkEnvironment = mock(NetworkEnvironment.class);
	this.taskSlotTable = mock(TaskSlotTable.class);
	this.taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);

	// services backed by real instances
	this.taskManagerLocation = new LocalTaskManagerLocation();
	this.memoryManager = new MemoryManager(
		MemoryManager.MIN_PAGE_SIZE,
		1,
		MemoryManager.MIN_PAGE_SIZE,
		MemoryType.HEAP,
		false);
	this.broadcastVariableManager = new BroadcastVariableManager();
	this.jobManagerTable = new JobManagerTable();

	// needs the location created above
	this.jobLeaderService = new JobLeaderService(
		this.taskManagerLocation,
		RetryingRegistrationConfiguration.defaultConfiguration());
}
 
示例3
/**
 * Creates the holder for the services a TaskExecutor runs with.
 *
 * <p>All arguments are mandatory. A {@code null} argument is rejected with a
 * {@link NullPointerException} whose message names the missing argument, so a
 * misconfigured setup can be diagnosed without a debugger.
 *
 * @param taskManagerLocation the task manager location
 * @param memoryManager the memory manager
 * @param ioManager the I/O manager
 * @param shuffleEnvironment the shuffle environment
 * @param kvStateService the KvState service
 * @param broadcastVariableManager the broadcast variable manager
 * @param taskSlotTable the task slot table
 * @param jobManagerTable the job manager table
 * @param jobLeaderService the job leader service
 * @param taskManagerStateStore the manager of the task-local state stores
 * @param taskEventDispatcher the task event dispatcher
 * @throws NullPointerException if any argument is {@code null}
 */
TaskManagerServices(
	TaskManagerLocation taskManagerLocation,
	MemoryManager memoryManager,
	IOManager ioManager,
	ShuffleEnvironment<?, ?> shuffleEnvironment,
	KvStateService kvStateService,
	BroadcastVariableManager broadcastVariableManager,
	TaskSlotTable taskSlotTable,
	JobManagerTable jobManagerTable,
	JobLeaderService jobLeaderService,
	TaskExecutorLocalStateStoresManager taskManagerStateStore,
	TaskEventDispatcher taskEventDispatcher) {

	this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation, "taskManagerLocation");
	this.memoryManager = Preconditions.checkNotNull(memoryManager, "memoryManager");
	this.ioManager = Preconditions.checkNotNull(ioManager, "ioManager");
	this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment, "shuffleEnvironment");
	this.kvStateService = Preconditions.checkNotNull(kvStateService, "kvStateService");
	this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager, "broadcastVariableManager");
	this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable, "taskSlotTable");
	this.jobManagerTable = Preconditions.checkNotNull(jobManagerTable, "jobManagerTable");
	this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService, "jobLeaderService");
	this.taskManagerStateStore = Preconditions.checkNotNull(taskManagerStateStore, "taskManagerStateStore");
	this.taskEventDispatcher = Preconditions.checkNotNull(taskEventDispatcher, "taskEventDispatcher");
}
 
示例4
/**
 * Creates a builder pre-populated with test defaults: lightweight real instances for some
 * services and Mockito mocks for the others. Individual defaults can be overridden via the
 * corresponding setters.
 */
public TaskManagerServicesBuilder() {
	// services replaced by Mockito mocks
	this.ioManager = mock(IOManager.class);
	this.shuffleEnvironment = mock(ShuffleEnvironment.class);
	this.taskSlotTable = mock(TaskSlotTable.class);
	this.taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);

	// services backed by real instances
	this.taskManagerLocation = new LocalTaskManagerLocation();
	this.memoryManager = new MemoryManager(
		MemoryManager.MIN_PAGE_SIZE,
		1,
		MemoryManager.MIN_PAGE_SIZE,
		MemoryType.HEAP,
		false);
	this.kvStateService = new KvStateService(new KvStateRegistry(), null, null);
	this.broadcastVariableManager = new BroadcastVariableManager();
	this.taskEventDispatcher = new TaskEventDispatcher();
	this.jobManagerTable = new JobManagerTable();

	// needs the location created above
	this.jobLeaderService = new JobLeaderService(
		this.taskManagerLocation,
		RetryingRegistrationConfiguration.defaultConfiguration());
}
 
示例5
/**
 * Creates a TaskExecutor for tests together with a testing JobMaster that accepts every slot
 * offer, and then announces that JobMaster as the job leader.
 *
 * @param taskSlotTable the slot table the TaskExecutor is built with
 * @return a context holding the JobMaster gateway, the slot table and the TaskExecutor
 * @throws IOException declared by this helper; presumably raised while creating the local
 *         state stores manager — TODO confirm
 */
private TaskExecutorTestingContext createTaskExecutorTestingContext(final TaskSlotTable<Task> taskSlotTable) throws IOException {
	// NOTE(review): this latch is triggered on every slot offer but is never awaited in this helper
	final OneShotLatch slotsOfferedLatch = new OneShotLatch();
	final TestingJobMasterGateway jobMaster = new TestingJobMasterGatewayBuilder()
		.setOfferSlotsFunction((resourceID, slotOffers) -> {
			slotsOfferedLatch.trigger();
			// echoing the offers back accepts all of them
			return CompletableFuture.completedFuture(slotOffers);
		})
		.build();
	rpc.registerGateway(jobMaster.getAddress(), jobMaster);

	final JobLeaderService leaderService = new DefaultJobLeaderService(
		unresolvedTaskManagerLocation,
		RetryingRegistrationConfiguration.defaultConfiguration());
	final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

	final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(
		new TaskManagerServicesBuilder()
			.setTaskSlotTable(taskSlotTable)
			.setJobLeaderService(leaderService)
			.setTaskStateManager(localStateStoresManager)
			.build());

	// announce the testing JobMaster as leader so the TaskExecutor can connect to it
	jobManagerLeaderRetriever.notifyListener(
		jobMaster.getAddress(),
		jobMaster.getFencingToken().toUUID());

	return new TaskExecutorTestingContext(jobMaster, taskSlotTable, taskExecutor);
}
 
示例6
/**
 * Checks that a TaskExecutor registers at the ResourceManager as soon as a ResourceManager
 * leader is announced to it.
 */
@Test
public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
	final String resourceManagerAddress = "/resource/manager/address/one";
	final CountDownLatch taskManagerRegisteredLatch = new CountDownLatch(1);

	// the testing RM acknowledges every registration and counts down the latch
	final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
	resourceManagerGateway.setRegisterTaskExecutorFunction(FunctionUtils.uncheckedFunction(
		ignored -> {
			taskManagerRegisteredLatch.countDown();
			return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
				new InstanceID(),
				new ResourceID(resourceManagerAddress),
				new ClusterInformation("localhost", 1234)));
		}));
	rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);

	final TaskSlotTable slotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = createTaskExecutor(services);
	try {
		taskExecutor.start();

		// publishing the leader must trigger the registration
		resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, UUID.randomUUID());
		final boolean registered = taskManagerRegisteredLatch.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
		assertTrue(registered);
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
 
示例7
/**
 * Checks that a TaskExecutor registers at the ResourceManager as soon as a ResourceManager
 * leader is announced to it.
 */
@Test
public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
	final String resourceManagerAddress = "/resource/manager/address/one";
	final CountDownLatch taskManagerRegisteredLatch = new CountDownLatch(1);

	// the testing RM acknowledges every registration and counts down the latch
	final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
	resourceManagerGateway.setRegisterTaskExecutorFunction(FunctionUtils.uncheckedFunction(
		ignored -> {
			taskManagerRegisteredLatch.countDown();
			return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
				new InstanceID(),
				new ResourceID(resourceManagerAddress),
				new ClusterInformation("localhost", 1234)));
		}));
	rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);

	final TaskSlotTable slotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = createTaskExecutor(services);
	try {
		taskExecutor.start();

		// publishing the leader must trigger the registration
		resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, UUID.randomUUID());
		final boolean registered = taskManagerRegisteredLatch.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
		assertTrue(registered);
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
 
示例8
/**
 * Creates the holder for the services a TaskExecutor runs with.
 *
 * <p>All reference arguments are mandatory. A {@code null} argument is rejected with a
 * {@link NullPointerException} whose message names the missing argument, so a
 * misconfigured setup can be diagnosed without a debugger.
 *
 * @param unresolvedTaskManagerLocation the (unresolved) task manager location
 * @param managedMemorySize the managed memory size; stored as-is without validation
 * @param ioManager the I/O manager
 * @param shuffleEnvironment the shuffle environment
 * @param kvStateService the KvState service
 * @param broadcastVariableManager the broadcast variable manager
 * @param taskSlotTable the task slot table
 * @param jobTable the job table
 * @param jobLeaderService the job leader service
 * @param taskManagerStateStore the manager of the task-local state stores
 * @param taskEventDispatcher the task event dispatcher
 * @param ioExecutor executor service for I/O work
 * @param libraryCacheManager the library cache manager
 * @throws NullPointerException if any reference argument is {@code null}
 */
TaskManagerServices(
	UnresolvedTaskManagerLocation unresolvedTaskManagerLocation,
	long managedMemorySize,
	IOManager ioManager,
	ShuffleEnvironment<?, ?> shuffleEnvironment,
	KvStateService kvStateService,
	BroadcastVariableManager broadcastVariableManager,
	TaskSlotTable<Task> taskSlotTable,
	JobTable jobTable,
	JobLeaderService jobLeaderService,
	TaskExecutorLocalStateStoresManager taskManagerStateStore,
	TaskEventDispatcher taskEventDispatcher,
	ExecutorService ioExecutor,
	LibraryCacheManager libraryCacheManager) {

	this.unresolvedTaskManagerLocation = Preconditions.checkNotNull(unresolvedTaskManagerLocation, "unresolvedTaskManagerLocation");
	this.managedMemorySize = managedMemorySize;
	this.ioManager = Preconditions.checkNotNull(ioManager, "ioManager");
	this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment, "shuffleEnvironment");
	this.kvStateService = Preconditions.checkNotNull(kvStateService, "kvStateService");
	this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager, "broadcastVariableManager");
	this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable, "taskSlotTable");
	this.jobTable = Preconditions.checkNotNull(jobTable, "jobTable");
	this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService, "jobLeaderService");
	this.taskManagerStateStore = Preconditions.checkNotNull(taskManagerStateStore, "taskManagerStateStore");
	this.taskEventDispatcher = Preconditions.checkNotNull(taskEventDispatcher, "taskEventDispatcher");
	this.ioExecutor = Preconditions.checkNotNull(ioExecutor, "ioExecutor");
	this.libraryCacheManager = Preconditions.checkNotNull(libraryCacheManager, "libraryCacheManager");
}
 
示例9
public TaskManagerServicesBuilder() {
	unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
	ioManager = mock(IOManager.class);
	shuffleEnvironment = mock(ShuffleEnvironment.class);
	kvStateService = new KvStateService(new KvStateRegistry(), null, null);
	broadcastVariableManager = new BroadcastVariableManager();
	taskEventDispatcher = new TaskEventDispatcher();
	taskSlotTable = TestingTaskSlotTable.<Task>newBuilder().closeAsyncReturns(CompletableFuture.completedFuture(null)).build();
	jobTable = DefaultJobTable.create();
	jobLeaderService = new DefaultJobLeaderService(unresolvedTaskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
	taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);
	ioExecutor = TestingUtils.defaultExecutor();
	libraryCacheManager = TestingLibraryCacheManager.newBuilder().build();
}
 
示例10
/**
 * Checks that terminating the TaskExecutor closes the task slot table and the shuffle
 * environment and shuts down the KvState service it was given via its services.
 */
@Test
public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {
	final TaskSlotTableImpl<Task> slotTable = TaskSlotUtils.createTaskSlotTable(1);
	final JobLeaderService leaderService = new DefaultJobLeaderService(
		unresolvedTaskManagerLocation,
		RetryingRegistrationConfiguration.defaultConfiguration());

	final IOManager ioManager = new IOManagerAsync(tmp.newFolder().getAbsolutePath());
	final TaskExecutorLocalStateStoresManager stateStoresManager = new TaskExecutorLocalStateStoresManager(
		false,
		ioManager.getSpillingDirectories(),
		Executors.directExecutor());

	// start the environments before handing them over; the assertions below expect the
	// TaskExecutor's termination to have stopped them
	nettyShuffleEnvironment.start();
	final KvStateService kvStateService = new KvStateService(new KvStateRegistry(), null, null);
	kvStateService.start();

	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
		.setIoManager(ioManager)
		.setShuffleEnvironment(nettyShuffleEnvironment)
		.setKvStateService(kvStateService)
		.setTaskSlotTable(slotTable)
		.setJobLeaderService(leaderService)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = createTaskExecutor(services);
	try {
		taskExecutor.start();
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}

	assertThat(slotTable.isClosed(), is(true));
	assertThat(nettyShuffleEnvironment.isClosed(), is(true));
	assertThat(kvStateService.isShutdown(), is(true));
}
 
示例11
/**
 * Checks that a TaskExecutor registers at the ResourceManager as soon as a ResourceManager
 * leader is announced to it.
 */
@Test
public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
	final String resourceManagerAddress = "/resource/manager/address/one";
	final CountDownLatch taskManagerRegisteredLatch = new CountDownLatch(1);

	// the testing RM acknowledges every registration and counts down the latch
	final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
	resourceManagerGateway.setRegisterTaskExecutorFunction(FunctionUtils.uncheckedFunction(
		ignored -> {
			taskManagerRegisteredLatch.countDown();
			return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
				new InstanceID(),
				new ResourceID(resourceManagerAddress),
				new ClusterInformation("localhost", 1234)));
		}));
	rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);

	final TaskSlotTable slotTable = TaskSlotUtils.createTaskSlotTable(1);
	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();
	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = createTaskExecutor(services);
	try {
		taskExecutor.start();

		// publishing the leader must trigger the registration
		resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, UUID.randomUUID());
		final boolean registered = taskManagerRegisteredLatch.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
		assertTrue(registered);
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
 
示例12
/**
 * Returns the manager of the task-local state stores held by these services.
 *
 * @return the configured local state stores manager
 */
public TaskExecutorLocalStateStoresManager getTaskManagerStateStore() {
	return this.taskManagerStateStore;
}
 
示例13
/**
 * Assembles the {@link TaskManagerServices} of a task manager from its configuration.
 *
 * <p>The network environment is created and started first: the {@link TaskManagerLocation}
 * needs its data port, and the memory manager must be created strictly after the network
 * stack has been initialized.
 *
 * @param taskManagerServicesConfiguration task manager configuration
 * @param resourceID resource ID of the task manager
 * @param taskIOExecutor executor for async IO operations; handed to the local state stores manager
 * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
 * @param maxJvmHeapMemory the maximum JVM heap size
 * @return task manager components
 * @throws Exception if any of the setup steps fails
 */
public static TaskManagerServices fromConfiguration(
		TaskManagerServicesConfiguration taskManagerServicesConfiguration,
		ResourceID resourceID,
		Executor taskIOExecutor,
		long freeHeapMemoryWithDefrag,
		long maxJvmHeapMemory) throws Exception {

	final TaskManagerServicesConfiguration config = taskManagerServicesConfiguration;

	// pre-start checks on the temporary directories
	checkTempDirs(config.getTmpDirPaths());

	// network first: the location below needs its data port
	final NetworkEnvironment networkEnvironment = createNetworkEnvironment(config, maxJvmHeapMemory);
	networkEnvironment.start();

	final TaskManagerLocation location = new TaskManagerLocation(
		resourceID,
		config.getTaskManagerAddress(),
		networkEnvironment.getConnectionManager().getDataPort());

	// must happen strictly after the network stack has been initialized
	final MemoryManager memoryManager = createMemoryManager(config, freeHeapMemoryWithDefrag, maxJvmHeapMemory);

	// starting the I/O manager creates some temp directories
	final IOManager ioManager = new IOManagerAsync(config.getTmpDirPaths());

	final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

	// one ANY profile per configured slot
	final int slotCount = config.getNumberOfSlots();
	final List<ResourceProfile> slotProfiles = new ArrayList<>(slotCount);
	while (slotProfiles.size() < slotCount) {
		slotProfiles.add(ResourceProfile.ANY);
	}

	final TimerService<AllocationID> timerService = new TimerService<>(
		new ScheduledThreadPoolExecutor(1),
		config.getTimerServiceShutdownTimeout());
	final TaskSlotTable taskSlotTable = new TaskSlotTable(slotProfiles, timerService);

	final JobManagerTable jobManagerTable = new JobManagerTable();
	final JobLeaderService jobLeaderService = new JobLeaderService(location, config.getRetryingRegistrationConfiguration());

	// resolve LOCAL_STATE_SUB_DIRECTORY_ROOT underneath every configured local-recovery root
	final String[] localStateRoots = config.getLocalRecoveryStateRootDirectories();
	final File[] localStateDirectories = new File[localStateRoots.length];
	int position = 0;
	for (String root : localStateRoots) {
		localStateDirectories[position++] = new File(root, LOCAL_STATE_SUB_DIRECTORY_ROOT);
	}

	final TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
		config.isLocalRecoveryEnabled(),
		localStateDirectories,
		taskIOExecutor);

	return new TaskManagerServices(
		location,
		memoryManager,
		ioManager,
		networkEnvironment,
		broadcastVariableManager,
		taskSlotTable,
		jobManagerTable,
		jobLeaderService,
		localStateStoresManager);
}
 
示例14
/**
 * Replaces the local state stores manager this builder uses.
 *
 * @param taskStateManager the local state stores manager to use
 * @return this builder, for call chaining
 */
public TaskManagerServicesBuilder setTaskStateManager(TaskExecutorLocalStateStoresManager taskStateManager) {
	this.taskStateManager = taskStateManager;
	return this;
}
 
示例15
/**
 * Checks that terminating the TaskExecutor shuts down the memory manager, the network
 * environment and the I/O manager it was given via its services.
 */
@Test
public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {
	final TaskSlotTable slotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
	final JobLeaderService leaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());

	final IOManager ioManager = new IOManagerAsync(tmp.newFolder().getAbsolutePath());
	final TaskExecutorLocalStateStoresManager stateStoresManager = new TaskExecutorLocalStateStoresManager(
		false,
		ioManager.getSpillingDirectories(),
		Executors.directExecutor());

	final int pageSize = 4096;
	final MemoryManager memoryManager = new MemoryManager(pageSize, 1, pageSize, MemoryType.HEAP, false);

	final NetworkEnvironment networkEnvironment = new NetworkEnvironment(1, 1, 0, 0, 2, 8, true);
	networkEnvironment.start();

	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setMemoryManager(memoryManager)
		.setIoManager(ioManager)
		.setNetworkEnvironment(networkEnvironment)
		.setTaskSlotTable(slotTable)
		.setJobLeaderService(leaderService)
		.setTaskStateManager(stateStoresManager)
		.build();

	final long heartbeatIntervalMillis = 1000L;
	final long heartbeatTimeoutMillis = 1000L;

	final TaskExecutor taskExecutor = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		services,
		new HeartbeatServices(heartbeatIntervalMillis, heartbeatTimeoutMillis),
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		taskExecutor.start();
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}

	assertThat(memoryManager.isShutdown(), is(true));
	assertThat(networkEnvironment.isShutdown(), is(true));
	assertThat(ioManager.isProperlyShutDown(), is(true));
}
 
示例16
/**
 * Checks that a heartbeat timeout between TaskExecutor and ResourceManager disconnects the
 * TaskExecutor from the ResourceManager and that the TaskExecutor then registers again.
 *
 * <p>Every wait uses {@code timeout} in milliseconds. The reconnect wait previously passed a
 * millisecond value together with {@link TimeUnit#SECONDS}, which waited 1000 times longer
 * than intended.
 */
@Test
public void testHeartbeatTimeoutWithResourceManager() throws Exception {
	final String rmAddress = "rm";
	final ResourceID rmResourceId = new ResourceID(rmAddress);

	final long heartbeatInterval = 1L;
	final long heartbeatTimeout = 3L;

	final ResourceManagerId rmLeaderId = ResourceManagerId.generate();

	TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
		rmLeaderId,
		rmResourceId,
		rmAddress,
		rmAddress);

	final TaskExecutorRegistrationSuccess registrationResponse = new TaskExecutorRegistrationSuccess(
		new InstanceID(),
		rmResourceId,
		new ClusterInformation("localhost", 1234));

	// the first registration completes the future; a second registration (the reconnect)
	// brings the latch to zero
	final CompletableFuture<ResourceID> taskExecutorRegistrationFuture = new CompletableFuture<>();
	final CountDownLatch registrationAttempts = new CountDownLatch(2);
	rmGateway.setRegisterTaskExecutorFunction(
		registration -> {
			taskExecutorRegistrationFuture.complete(registration.f1);
			registrationAttempts.countDown();
			return CompletableFuture.completedFuture(registrationResponse);
		});

	final CompletableFuture<ResourceID> taskExecutorDisconnectFuture = new CompletableFuture<>();
	rmGateway.setDisconnectTaskExecutorConsumer(
		disconnectInfo -> taskExecutorDisconnectFuture.complete(disconnectInfo.f0));

	rpc.registerGateway(rmAddress, rmGateway);

	final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
	final SlotReport slotReport = new SlotReport();
	when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);

	HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout);

	final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

	final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(taskSlotTable)
		.setTaskStateManager(localStateStoresManager)
		.build();

	final TaskExecutor taskManager = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		taskManagerServices,
		heartbeatServices,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		taskManager.start();

		// define a leader and see that a registration happens
		resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId.toUUID());

		// register resource manager success will trigger monitoring heartbeat target between tm and rm
		assertThat(taskExecutorRegistrationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));

		// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
		assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));

		// the value is in milliseconds, so the unit must be MILLISECONDS as well
		assertTrue(
			"The TaskExecutor should try to reconnect to the RM",
			registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
	} finally {
		RpcUtils.terminateRpcEndpoint(taskManager, timeout);
	}
}
 
示例17
/**
 * Tests that the correct slot report is sent as part of the heartbeat response.
 *
 * <p>The testing slot table hands out {@code slotReport1} first, which is sent right after
 * registration. {@code slotReport2} comes next and must be the one attached to the heartbeat
 * response. Every wait is bounded by {@code timeout}, so a missing message fails the test
 * instead of hanging it.
 */
@Test
public void testHeartbeatSlotReporting() throws Exception {
	final String rmAddress = "rm";
	final UUID rmLeaderId = UUID.randomUUID();

	// register the mock resource manager gateway
	final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
	final CompletableFuture<ResourceID> taskExecutorRegistrationFuture = new CompletableFuture<>();
	final ResourceID rmResourceId = rmGateway.getOwnResourceId();
	final CompletableFuture<RegistrationResponse> registrationResponse = CompletableFuture.completedFuture(
		new TaskExecutorRegistrationSuccess(
			new InstanceID(),
			rmResourceId,
			new ClusterInformation("localhost", 1234)));

	rmGateway.setRegisterTaskExecutorFunction(stringResourceIDIntegerHardwareDescriptionTuple4 -> {
		taskExecutorRegistrationFuture.complete(stringResourceIDIntegerHardwareDescriptionTuple4.f1);
		return registrationResponse;
	});

	final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();
	rmGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
		initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f2);
		return CompletableFuture.completedFuture(Acknowledge.get());
	});

	final CompletableFuture<SlotReport> heartbeatSlotReportFuture = new CompletableFuture<>();
	rmGateway.setTaskExecutorHeartbeatConsumer((resourceID, slotReport) -> heartbeatSlotReportFuture.complete(slotReport));

	rpc.registerGateway(rmAddress, rmGateway);

	final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0);
	final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
	final SlotReport slotReport1 = new SlotReport(
		new SlotStatus(
			slotId,
			resourceProfile));
	final SlotReport slotReport2 = new SlotReport(
		new SlotStatus(
			slotId,
			resourceProfile,
			new JobID(),
			new AllocationID()));

	// reports are handed out in queue order: slotReport1 first, then slotReport2
	final TestingTaskSlotTable taskSlotTable = new TestingTaskSlotTable(new ArrayDeque<>(Arrays.asList(slotReport1, slotReport2)));

	final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

	final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(taskSlotTable)
		.setTaskStateManager(localStateStoresManager)
		.build();

	final TaskExecutor taskManager = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		taskManagerServices,
		HEARTBEAT_SERVICES,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	final long waitMillis = timeout.toMilliseconds();

	try {
		taskManager.start();

		// define a leader and see that a registration happens
		resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId);

		// register resource manager success will trigger monitoring heartbeat target between tm and rm
		assertThat(taskExecutorRegistrationFuture.get(waitMillis, TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));
		assertThat(initialSlotReportFuture.get(waitMillis, TimeUnit.MILLISECONDS), equalTo(slotReport1));

		TaskExecutorGateway taskExecutorGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);

		// trigger the heartbeat asynchronously
		taskExecutorGateway.heartbeatFromResourceManager(rmResourceId);

		// wait (bounded) for heartbeat response
		SlotReport actualSlotReport = heartbeatSlotReportFuture.get(waitMillis, TimeUnit.MILLISECONDS);

		// the new slot report should be reported
		assertEquals(slotReport2, actualSlotReport);
	} finally {
		RpcUtils.terminateRpcEndpoint(taskManager, timeout);
	}
}
 
示例18
/**
 * Checks that the TaskExecutor registers at a ResourceManager whenever a ResourceManager
 * leader is announced, including after the previous leader has been revoked.
 */
@Test
public void testTriggerRegistrationOnLeaderChange() throws Exception {
	final String address1 = "/resource/manager/address/one";
	final String address2 = "/resource/manager/address/two";
	final UUID leaderId1 = UUID.randomUUID();
	final UUID leaderId2 = UUID.randomUUID();
	final ResourceID rmResourceId1 = new ResourceID(address1);
	final ResourceID rmResourceId2 = new ResourceID(address2);

	// mocked resource manager gateways that accept every registration
	final ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
	final ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);

	final TaskExecutorRegistrationSuccess rmRegistration1 =
		new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, new ClusterInformation("localhost", 1234));
	when(rmGateway1.registerTaskExecutor(
			anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
		.thenReturn(CompletableFuture.completedFuture(rmRegistration1));

	final TaskExecutorRegistrationSuccess rmRegistration2 =
		new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, new ClusterInformation("localhost", 1234));
	when(rmGateway2.registerTaskExecutor(
			anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
		.thenReturn(CompletableFuture.completedFuture(rmRegistration2));

	rpc.registerGateway(address1, rmGateway1);
	rpc.registerGateway(address2, rmGateway2);

	// slot table mock that always answers with the same (empty) slot report
	final TaskSlotTable slotTable = mock(TaskSlotTable.class);
	final SlotReport emptySlotReport = new SlotReport();
	when(slotTable.createSlotReport(any(ResourceID.class))).thenReturn(emptySlotReport);

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();

	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		services,
		HEARTBEAT_SERVICES,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		taskExecutor.start();
		final String taskExecutorAddress = taskExecutor.getAddress();

		// without a leader there is no connection yet
		assertNull(taskExecutor.getResourceManagerConnection());

		// first leader: expect a registration at rmGateway1
		resourceManagerLeaderRetriever.notifyListener(address1, leaderId1);
		verify(rmGateway1, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
				eq(taskExecutorAddress), eq(taskManagerLocation.getResourceID()), anyInt(), any(HardwareDescription.class), any(Time.class));
		assertNotNull(taskExecutor.getResourceManagerConnection());

		// revoke the leader, then elect a new one: expect a registration at rmGateway2
		resourceManagerLeaderRetriever.notifyListener(null, null);
		resourceManagerLeaderRetriever.notifyListener(address2, leaderId2);
		verify(rmGateway2, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
				eq(taskExecutorAddress), eq(taskManagerLocation.getResourceID()), anyInt(), any(HardwareDescription.class), any(Time.class));
		assertNotNull(taskExecutor.getResourceManagerConnection());
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
 
示例19
/**
 * Tests that a TaskManager detects a job leader for which it has reserved slots. Upon detecting
 * the job leader, it will offer all reserved slots to the JobManager.
 *
 * <p>Every wait on a future is bounded by {@code timeout}, so a missing slot report,
 * acknowledgement or slot offer fails the test instead of hanging it.
 */
@Test
public void testJobLeaderDetection() throws Exception {
	final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
	final JobManagerTable jobManagerTable = new JobManagerTable();
	final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());

	final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
	CompletableFuture<Void> initialSlotReportFuture = new CompletableFuture<>();
	resourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
		initialSlotReportFuture.complete(null);
		return CompletableFuture.completedFuture(Acknowledge.get());
	});

	// the JobMaster records the offered slots and accepts all of them
	final CompletableFuture<Collection<SlotOffer>> offeredSlotsFuture = new CompletableFuture<>();
	final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
		.setOfferSlotsFunction((resourceID, slotOffers) -> {

			offeredSlotsFuture.complete(new ArrayList<>(slotOffers));
			return CompletableFuture.completedFuture(slotOffers);
		})
		.build();

	rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
	rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

	final AllocationID allocationId = new AllocationID();
	final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0);

	final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

	final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(taskSlotTable)
		.setJobManagerTable(jobManagerTable)
		.setJobLeaderService(jobLeaderService)
		.setTaskStateManager(localStateStoresManager)
		.build();

	TaskExecutor taskManager = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		taskManagerServices,
		HEARTBEAT_SERVICES,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	final long waitMillis = timeout.toMilliseconds();

	try {
		taskManager.start();

		final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);

		// tell the task manager about the rm leader
		resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());

		// wait (bounded) for the initial slot report
		initialSlotReportFuture.get(waitMillis, TimeUnit.MILLISECONDS);

		// request slots from the task manager under the given allocation id
		CompletableFuture<Acknowledge> slotRequestAck = tmGateway.requestSlot(
			slotId,
			jobId,
			allocationId,
			jobMasterGateway.getAddress(),
			resourceManagerGateway.getFencingToken(),
			timeout);

		slotRequestAck.get(waitMillis, TimeUnit.MILLISECONDS);

		// now inform the task manager about the new job leader
		jobManagerLeaderRetriever.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());

		final Collection<SlotOffer> offeredSlots = offeredSlotsFuture.get(waitMillis, TimeUnit.MILLISECONDS);
		final Collection<AllocationID> allocationIds = offeredSlots.stream().map(SlotOffer::getAllocationId).collect(Collectors.toList());
		assertThat(allocationIds, containsInAnyOrder(allocationId));
	} finally {
		RpcUtils.terminateRpcEndpoint(taskManager, timeout);
	}
}
 
示例20
/**
 * Tests that duplicate "JobMaster gained leadership" notifications are filtered out by the
 * TaskExecutor, so that the JobManager connection of a job is put into the JobManagerTable
 * only once.
 *
 * <p>See FLINK-7526.
 */
@Test
public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
	// observation window during which the JobManagerTable is watched for registrations
	final long verificationTimeout = 500L;
	final JobLeaderService jobLeaderService = mock(JobLeaderService.class);
	final HeartbeatServices heartbeatServicesMock = mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS);

	final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
	when(jobMasterGateway.getHostname()).thenReturn("localhost");
	final JMTMRegistrationSuccess registrationMessage = new JMTMRegistrationSuccess(ResourceID.generate());
	// spy on a real table so that put() invocations can be verified
	final JobManagerTable jobManagerTableMock = spy(new JobManagerTable());

	final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

	final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setJobManagerTable(jobManagerTableMock)
		.setJobLeaderService(jobLeaderService)
		.setTaskStateManager(localStateStoresManager)
		.build();

	final TestingTaskExecutor taskExecutor = new TestingTaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		taskManagerServices,
		heartbeatServicesMock,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		taskExecutor.start();
		taskExecutor.waitUntilStarted();

		// capture the listener which the TaskExecutor registers with the (mocked) JobLeaderService
		ArgumentCaptor<JobLeaderListener> jobLeaderListenerArgumentCaptor = ArgumentCaptor.forClass(JobLeaderListener.class);

		verify(jobLeaderService).start(anyString(), any(RpcService.class), any(HighAvailabilityServices.class), jobLeaderListenerArgumentCaptor.capture());

		JobLeaderListener taskExecutorListener = jobLeaderListenerArgumentCaptor.getValue();

		taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, registrationMessage);

		// duplicate job manager gained leadership message
		taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, registrationMessage);

		ArgumentCaptor<JobManagerConnection> jobManagerConnectionArgumentCaptor = ArgumentCaptor.forClass(JobManagerConnection.class);

		// The notifications may be processed asynchronously by the TaskExecutor. With
		// timeout(..).times(1), verification succeeds as soon as the first put() is observed,
		// so a second (duplicate) put() arriving later would go unnoticed. after(..) waits for
		// the whole window and only then requires exactly one invocation.
		verify(jobManagerTableMock, Mockito.after(verificationTimeout).times(1)).put(eq(jobId), jobManagerConnectionArgumentCaptor.capture());

		JobManagerConnection jobManagerConnection = jobManagerConnectionArgumentCaptor.getValue();

		assertEquals(jobMasterGateway, jobManagerConnection.getJobManagerGateway());
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
 
示例21
/**
 * Verifies that the TaskExecutor stops monitoring the ResourceManager's heartbeat target once it
 * is told that the ResourceManager lost its leadership, instead of waiting for a heartbeat timeout.
 *
 * <p>See FLINK-8462
 */
@Test
public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
	// The heartbeat timeout (10 s) is far larger than the time we poll for events (1 s), so a
	// target that gets unmonitored within the poll window was presumably removed because of the
	// leadership revocation, not because of a heartbeat timeout.
	final long interval = 1L;
	final long rmHeartbeatTimeout = 10000L;
	final long maxPollMillis = 1000L;
	final RecordingHeartbeatServices recordingHeartbeatServices = new RecordingHeartbeatServices(interval, rmHeartbeatTimeout);
	final ResourceID resourceManagerResourceId = ResourceID.generate();

	final TaskSlotTable slotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);

	final String resourceManagerAddress = "rm";
	final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(
		ResourceManagerId.generate(),
		resourceManagerResourceId,
		resourceManagerAddress,
		resourceManagerAddress);
	rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();

	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = new TaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		services,
		recordingHeartbeatServices,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		taskExecutor.start();

		final BlockingQueue<ResourceID> monitored = recordingHeartbeatServices.getMonitoredTargets();
		final BlockingQueue<ResourceID> unmonitored = recordingHeartbeatServices.getUnmonitoredTargets();

		resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, resourceManagerGateway.getFencingToken().toUUID());

		// the RM becomes a monitored heartbeat target once the TaskExecutor has registered
		final ResourceID firstMonitored = monitored.poll(maxPollMillis, TimeUnit.MILLISECONDS);
		assertThat(firstMonitored, equalTo(resourceManagerResourceId));

		// revoke the RM's leadership
		resourceManagerLeaderRetriever.notifyListener(null, null);

		// the RM target must now be unmonitored due to the revocation; the heartbeat timeout is
		// much too long to have fired within the poll window
		final ResourceID firstUnmonitored = unmonitored.poll(maxPollMillis, TimeUnit.MILLISECONDS);
		assertThat(firstUnmonitored, equalTo(resourceManagerResourceId));
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
 
示例22
/**
 * Verifies that the JobLeaderService stops tracking a job as soon as the TaskExecutor no longer
 * holds any slot allocated to that job.
 *
 * <p>See FLINK-8504
 */
@Test
public void testRemoveJobFromJobLeaderService() throws Exception {
	final TaskSlotTable slotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();

	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TestingTaskExecutor taskExecutor = new TestingTaskExecutor(
		rpc,
		taskManagerConfiguration,
		haServices,
		services,
		HEARTBEAT_SERVICES,
		UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
		null,
		dummyBlobCacheService,
		testingFatalErrorHandler);

	try {
		// a ResourceManager which signals when it receives the TaskExecutor's first slot report
		final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
		final CompletableFuture<Void> firstSlotReportReceived = new CompletableFuture<>();
		rmGateway.setSendSlotReportFunction(ignored -> {
			firstSlotReportReceived.complete(null);
			return CompletableFuture.completedFuture(Acknowledge.get());
		});
		final ResourceManagerId rmId = rmGateway.getFencingToken();

		rpc.registerGateway(rmGateway.getAddress(), rmGateway);
		resourceManagerLeaderRetriever.notifyListener(rmGateway.getAddress(), rmId.toUUID());

		// job master leader retrieval service that reports when it is started and stopped
		final CompletableFuture<LeaderRetrievalListener> retrieverStarted = new CompletableFuture<>();
		final CompletableFuture<Void> retrieverStopped = new CompletableFuture<>();
		haServices.setJobMasterLeaderRetriever(
			jobId,
			new StartStopNotifyingLeaderRetrievalService(retrieverStarted, retrieverStopped));

		taskExecutor.start();
		taskExecutor.waitUntilStarted();

		final TaskExecutorGateway gateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

		final SlotID slotToRequest = new SlotID(taskManagerLocation.getResourceID(), 0);
		final AllocationID allocation = new AllocationID();

		// before any slot has been requested, the job is unknown to the JobLeaderService
		assertThat(retrieverStarted.isDone(), is(false));
		final JobLeaderService jobLeaderService = services.getJobLeaderService();
		assertThat(jobLeaderService.containsJob(jobId), is(false));

		// the slot request must only happen after the initial slot report
		firstSlotReportReceived.get();

		gateway.requestSlot(slotToRequest, jobId, allocation, "foobar", rmId, timeout).get();

		// allocating a slot for the job starts the job master leader retrieval
		retrieverStarted.get();
		assertThat(jobLeaderService.containsJob(jobId), is(true));

		gateway.freeSlot(allocation, new FlinkException("Test exception"), timeout).get();

		// freeing the job's last slot removes the job, which stops its leader retrieval
		retrieverStopped.get();
		assertThat(jobLeaderService.containsJob(jobId), is(false));
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}
 
示例23
/**
 * Creates a local state stores manager for tests.
 *
 * <p>Local recovery is disabled. A fresh temporary folder is the only root directory, and the
 * manager is given {@code Executors.directExecutor()}.
 *
 * @return a new {@link TaskExecutorLocalStateStoresManager}
 * @throws IOException if the temporary folder cannot be created
 */
private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager() throws IOException {
	final File[] rootDirectories = {tmp.newFolder()};
	return new TaskExecutorLocalStateStoresManager(false, rootDirectories, Executors.directExecutor());
}
 
示例24
/**
 * Returns the manager of the task executor's local state stores.
 *
 * @return the {@link TaskExecutorLocalStateStoresManager} held by this instance
 */
public TaskExecutorLocalStateStoresManager getTaskManagerStateStore() {
	return this.taskManagerStateStore;
}
 
示例25
/**
 * Creates and returns the task manager services.
 *
 * <p>The services are created in a fixed order, and some are started along the way:
 * <ol>
 *   <li>the I/O manager;</li>
 *   <li>the shuffle environment, whose {@code start()} result becomes the data port of the
 *   {@link TaskManagerLocation};</li>
 *   <li>the KvState service;</li>
 *   <li>the memory manager;</li>
 *   <li>the slot table;</li>
 *   <li>the job manager table;</li>
 *   <li>the job leader service;</li>
 *   <li>the local state stores manager.</li>
 * </ol>
 *
 * @param taskManagerServicesConfiguration task manager configuration
 * @param taskManagerMetricGroup metric group of the task manager
 * @param taskIOExecutor executor for async IO operations; handed to the
 *     {@link TaskExecutorLocalStateStoresManager}
 * @return task manager components
 * @throws Exception if any of the services cannot be created or started; exceptions from the
 *     individual factory and {@code start()} calls propagate unchanged
 */
// NOTE(review): services already created/started (I/O manager, shuffle environment, KvState
// service, timer executor) are not cleaned up here if a later step throws — confirm whether
// the caller handles that.
public static TaskManagerServices fromConfiguration(
		TaskManagerServicesConfiguration taskManagerServicesConfiguration,
		MetricGroup taskManagerMetricGroup,
		Executor taskIOExecutor) throws Exception {

	// pre-start checks: validate the configured temp directories before anything uses them
	// (exact checks live in checkTempDirs)
	checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

	final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

	// start the I/O manager; it will create some temp directories.
	final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

	final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
		taskManagerServicesConfiguration,
		taskEventDispatcher,
		taskManagerMetricGroup);
	// the value returned by start() is advertised as the data port in the TaskManagerLocation below
	final int dataPort = shuffleEnvironment.start();

	final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
	kvStateService.start();

	final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
		taskManagerServicesConfiguration.getResourceID(),
		taskManagerServicesConfiguration.getTaskManagerAddress(),
		dataPort);

	// this call has to happen strictly after the network stack has been initialized
	final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration);
	// the managed memory size feeds into the per-slot resource profile below
	final long managedMemorySize = memoryManager.getMemorySize();

	final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

	// every slot gets the same resource profile, computed from the slot count and managed memory
	final int numOfSlots = taskManagerServicesConfiguration.getNumberOfSlots();
	final List<ResourceProfile> resourceProfiles =
		Collections.nCopies(numOfSlots, computeSlotResourceProfile(numOfSlots, managedMemorySize));

	// timer service for the slot table, backed by a single-thread scheduled executor
	final TimerService<AllocationID> timerService = new TimerService<>(
		new ScheduledThreadPoolExecutor(1),
		taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());

	final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);

	final JobManagerTable jobManagerTable = new JobManagerTable();

	final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

	// local state for recovery lives in LOCAL_STATE_SUB_DIRECTORY_ROOT under each configured root directory
	final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

	final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

	for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
		stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
	}

	final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
		taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
		stateRootDirectoryFiles,
		taskIOExecutor);

	return new TaskManagerServices(
		taskManagerLocation,
		memoryManager,
		ioManager,
		shuffleEnvironment,
		kvStateService,
		broadcastVariableManager,
		taskSlotTable,
		jobManagerTable,
		jobLeaderService,
		taskStateManager,
		taskEventDispatcher);
}
 
示例26
/**
 * Sets the local state stores manager held by this builder.
 *
 * <p>Presumably it is passed on to the {@link TaskManagerServices} created by {@code build()};
 * verify against {@code build()}.
 *
 * @param taskStateManager the local state stores manager to use
 * @return this builder, to allow call chaining
 */
public TaskManagerServicesBuilder setTaskStateManager(TaskExecutorLocalStateStoresManager taskStateManager) {
	this.taskStateManager = taskStateManager;

	return this;
}
 
示例27
/**
 * Verifies that terminating the TaskExecutor shuts down the services it was built with:
 * the memory manager, the shuffle environment and the KvState service.
 */
@Test
public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {
	final TaskSlotTable slotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);

	final JobLeaderService leaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());

	final IOManager asyncIoManager = new IOManagerAsync(tmp.newFolder().getAbsolutePath());

	// local state is rooted in the I/O manager's spilling directories
	final TaskExecutorLocalStateStoresManager stateStoresManager = new TaskExecutorLocalStateStoresManager(
		false,
		asyncIoManager.getSpillingDirectories(),
		Executors.directExecutor());

	final MemoryManager heapMemoryManager = new MemoryManager(4096, 1, 4096, MemoryType.HEAP, false);

	nettyShuffleEnvironment.start();

	final KvStateService kvStates = new KvStateService(new KvStateRegistry(), null, null);
	kvStates.start();

	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setMemoryManager(heapMemoryManager)
		.setIoManager(asyncIoManager)
		.setShuffleEnvironment(nettyShuffleEnvironment)
		.setKvStateService(kvStates)
		.setTaskSlotTable(slotTable)
		.setJobLeaderService(leaderService)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = createTaskExecutor(services);

	try {
		taskExecutor.start();
	} finally {
		// terminating the endpoint is what must shut the services down
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}

	assertThat(heapMemoryManager.isShutdown(), is(true));
	assertThat(nettyShuffleEnvironment.isClosed(), is(true));
	assertThat(kvStates.isShutdown(), is(true));
}
 
示例28
/**
 * Tests that a heartbeat timeout between TaskExecutor and ResourceManager disconnects the
 * TaskExecutor from the ResourceManager, and that the TaskExecutor then tries to register again.
 */
@Test
public void testHeartbeatTimeoutWithResourceManager() throws Exception {
	final String rmAddress = "rm";
	final ResourceID rmResourceId = new ResourceID(rmAddress);

	// a very short heartbeat timeout so that it expires quickly during the test
	final long heartbeatInterval = 1L;
	final long heartbeatTimeout = 3L;

	final ResourceManagerId rmLeaderId = ResourceManagerId.generate();

	TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
		rmLeaderId,
		rmResourceId,
		rmAddress,
		rmAddress);

	final TaskExecutorRegistrationSuccess registrationResponse = new TaskExecutorRegistrationSuccess(
		new InstanceID(),
		rmResourceId,
		new ClusterInformation("localhost", 1234));

	// completes on the first registration; the latch counts the initial registration plus one re-registration
	final CompletableFuture<ResourceID> taskExecutorRegistrationFuture = new CompletableFuture<>();
	final CountDownLatch registrationAttempts = new CountDownLatch(2);
	rmGateway.setRegisterTaskExecutorFunction(
		registration -> {
			taskExecutorRegistrationFuture.complete(registration.f1);
			registrationAttempts.countDown();
			return CompletableFuture.completedFuture(registrationResponse);
		});

	final CompletableFuture<ResourceID> taskExecutorDisconnectFuture = new CompletableFuture<>();
	rmGateway.setDisconnectTaskExecutorConsumer(
		disconnectInfo -> taskExecutorDisconnectFuture.complete(disconnectInfo.f0));

	rpc.registerGateway(rmAddress, rmGateway);

	final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
	final SlotReport slotReport = new SlotReport();
	when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);

	HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout);

	final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

	final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(taskSlotTable)
		.setTaskStateManager(localStateStoresManager)
		.build();

	final TaskExecutor taskManager = createTaskExecutor(taskManagerServices, heartbeatServices);

	try {
		taskManager.start();

		// define a leader and see that a registration happens
		resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId.toUUID());

		// a successful registration at the resource manager starts the heartbeat monitoring between TM and RM
		assertThat(taskExecutorRegistrationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));

		// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
		assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));

		// the timeout is given in milliseconds, so the unit must be MILLISECONDS
		// (passing it with SECONDS would wait 1000 times longer than intended)
		assertTrue(
			"The TaskExecutor should try to reconnect to the RM",
			registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
	} finally {
		RpcUtils.terminateRpcEndpoint(taskManager, timeout);
	}
}
 
示例29
/**
 * Verifies that the TaskExecutor answers a ResourceManager heartbeat with a freshly created slot
 * report, not with the report it sent right after registering.
 */
@Test
public void testHeartbeatSlotReporting() throws Exception {
	final String rmAddress = "rm";
	final UUID rmLeaderId = UUID.randomUUID();

	// a ResourceManager gateway which records the registration, the initial slot report and the heartbeat
	final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
	final CompletableFuture<ResourceID> registeredTaskExecutorId = new CompletableFuture<>();
	final ResourceID rmResourceId = rmGateway.getOwnResourceId();
	final RegistrationResponse registrationSuccess = new TaskExecutorRegistrationSuccess(
		new InstanceID(),
		rmResourceId,
		new ClusterInformation("localhost", 1234));
	final CompletableFuture<RegistrationResponse> registrationResponse = CompletableFuture.completedFuture(registrationSuccess);

	rmGateway.setRegisterTaskExecutorFunction(registration -> {
		registeredTaskExecutorId.complete(registration.f1);
		return registrationResponse;
	});

	final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();
	rmGateway.setSendSlotReportFunction(slotReportMessage -> {
		initialSlotReportFuture.complete(slotReportMessage.f2);
		return CompletableFuture.completedFuture(Acknowledge.get());
	});

	final CompletableFuture<SlotReport> heartbeatSlotReportFuture = new CompletableFuture<>();
	rmGateway.setTaskExecutorHeartbeatConsumer(
		(senderId, reportedSlots) -> heartbeatSlotReportFuture.complete(reportedSlots));

	rpc.registerGateway(rmAddress, rmGateway);

	// Two reports for the same slot: first free, then allocated. Presumably the testing slot
	// table hands them out in this order, as the assertions below expect.
	final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0);
	final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
	final SlotReport freeSlotReport = new SlotReport(new SlotStatus(slotId, resourceProfile));
	final SlotReport allocatedSlotReport = new SlotReport(
		new SlotStatus(slotId, resourceProfile, new JobID(), new AllocationID()));

	final TestingTaskSlotTable taskSlotTable = new TestingTaskSlotTable(
		new ArrayDeque<>(Arrays.asList(freeSlotReport, allocatedSlotReport)));

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();

	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(taskSlotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskManager = createTaskExecutor(services);

	try {
		taskManager.start();

		// announce a leader, which makes the TaskExecutor register
		resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId);

		// a successful registration is followed by the initial slot report
		assertThat(registeredTaskExecutorId.get(), equalTo(taskManagerLocation.getResourceID()));
		assertThat(initialSlotReportFuture.get(), equalTo(freeSlotReport));

		final TaskExecutorGateway taskExecutorGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);

		// trigger a heartbeat; the response arrives asynchronously via the heartbeat consumer
		taskExecutorGateway.heartbeatFromResourceManager(rmResourceId);

		// the heartbeat must carry the newer slot report
		assertEquals(allocatedSlotReport, heartbeatSlotReportFuture.get());
	} finally {
		RpcUtils.terminateRpcEndpoint(taskManager, timeout);
	}
}
 
示例30
/**
 * Verifies that the TaskExecutor registers with every new ResourceManager leader it is told
 * about: first with the initial leader, then again with its successor.
 */
@Test
public void testTriggerRegistrationOnLeaderChange() throws Exception {
	final String firstRmAddress = "/resource/manager/address/one";
	final String secondRmAddress = "/resource/manager/address/two";
	final UUID firstLeaderId = UUID.randomUUID();
	final UUID secondLeaderId = UUID.randomUUID();
	final ResourceID firstRmResourceId = new ResourceID(firstRmAddress);
	final ResourceID secondRmResourceId = new ResourceID(secondRmAddress);

	// two mocked ResourceManagers, each accepting any registration
	final ResourceManagerGateway firstRmGateway = mock(ResourceManagerGateway.class);
	final ResourceManagerGateway secondRmGateway = mock(ResourceManagerGateway.class);

	when(firstRmGateway.registerTaskExecutor(
			anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
		.thenReturn(CompletableFuture.completedFuture(
			new TaskExecutorRegistrationSuccess(new InstanceID(), firstRmResourceId, new ClusterInformation("localhost", 1234))));
	when(secondRmGateway.registerTaskExecutor(
			anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
		.thenReturn(CompletableFuture.completedFuture(
			new TaskExecutorRegistrationSuccess(new InstanceID(), secondRmResourceId, new ClusterInformation("localhost", 1234))));

	rpc.registerGateway(firstRmAddress, firstRmGateway);
	rpc.registerGateway(secondRmAddress, secondRmGateway);

	// the slot table only needs to produce an (empty) slot report
	final TaskSlotTable slotTable = mock(TaskSlotTable.class);
	final SlotReport emptySlotReport = new SlotReport();
	when(slotTable.createSlotReport(any(ResourceID.class))).thenReturn(emptySlotReport);

	final TaskExecutorLocalStateStoresManager stateStoresManager = createTaskExecutorLocalStateStoresManager();

	final TaskManagerServices services = new TaskManagerServicesBuilder()
		.setTaskManagerLocation(taskManagerLocation)
		.setTaskSlotTable(slotTable)
		.setTaskStateManager(stateStoresManager)
		.build();

	final TaskExecutor taskExecutor = createTaskExecutor(services);

	try {
		taskExecutor.start();
		final String ownAddress = taskExecutor.getAddress();

		// without a leader there is no connection yet
		assertNull(taskExecutor.getResourceManagerConnection());

		// first leader: a registration with the first ResourceManager must happen
		resourceManagerLeaderRetriever.notifyListener(firstRmAddress, firstLeaderId);

		verify(firstRmGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
				eq(ownAddress), eq(taskManagerLocation.getResourceID()), anyInt(), any(HardwareDescription.class), any(Time.class));
		assertNotNull(taskExecutor.getResourceManagerConnection());

		// revoke the leadership
		resourceManagerLeaderRetriever.notifyListener(null, null);

		// second leader: a registration with the second ResourceManager must happen
		resourceManagerLeaderRetriever.notifyListener(secondRmAddress, secondLeaderId);

		verify(secondRmGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
				eq(ownAddress), eq(taskManagerLocation.getResourceID()), anyInt(), any(HardwareDescription.class), any(Time.class));
		assertNotNull(taskExecutor.getResourceManagerConnection());
	} finally {
		RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
	}
}