Java源码示例:org.apache.flink.runtime.checkpoint.CheckpointCoordinator

示例1
/**
 * Forwards a task's checkpoint decline to the job's {@link CheckpointCoordinator}.
 *
 * <p>The coordinator call is submitted to the RPC service's executor. If the execution graph has no
 * coordinator, the message is logged: at error level while the job is RUNNING, at debug level otherwise.
 */
@Override
public void declineCheckpoint(DeclineCheckpoint decline) {
	final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();

	if (coordinator == null) {
		final String noCoordinatorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
		if (executionGraph.getState() != JobStatus.RUNNING) {
			log.debug(noCoordinatorMessage, jobGraph.getJobID());
		} else {
			log.error(noCoordinatorMessage, jobGraph.getJobID());
		}
		return;
	}

	getRpcService().execute(() -> {
		try {
			coordinator.receiveDeclineMessage(decline);
		} catch (Exception e) {
			// failures are logged only; they are not propagated to the caller
			log.error("Error in CheckpointCoordinator while processing {}", decline, e);
		}
	});
}
 
示例2
/**
 * Creates a new {@link ExecutionGraph} and, if it has a {@link CheckpointCoordinator}, restores its state.
 *
 * <p>Restoring from the latest checkpoint is tried first. Only if that reports no restore is the
 * savepoint from the job graph's restore settings attempted.
 *
 * @param currentJobManagerJobMetricGroup metric group passed through to graph creation
 * @return the newly created (and possibly restored) execution graph
 * @throws Exception if creation or restoring fails
 */
private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception {
	final ExecutionGraph restoredGraph = createExecutionGraph(currentJobManagerJobMetricGroup);
	final CheckpointCoordinator coordinator = restoredGraph.getCheckpointCoordinator();

	if (coordinator == null) {
		return restoredGraph;
	}

	final boolean restoredFromCheckpoint = coordinator.restoreLatestCheckpointedState(
		restoredGraph.getAllVertices(),
		false,
		false);

	if (!restoredFromCheckpoint) {
		// no valid checkpoint found -> fall back to the configured savepoint (if any)
		tryRestoreExecutionGraphFromSavepoint(restoredGraph, jobGraph.getSavepointRestoreSettings());
	}

	return restoredGraph;
}
 
示例3
/**
 * Creates a new {@link ExecutionGraph} and, if it has a {@link CheckpointCoordinator}, restores its state.
 *
 * <p>Restoring from the latest checkpoint is tried first. Only if that reports no restore is the
 * savepoint from the job graph's restore settings attempted.
 *
 * @param currentJobManagerJobMetricGroup metric group passed through to graph creation
 * @param shuffleMaster shuffle master passed through to graph creation
 * @param partitionTracker partition tracker passed through to graph creation
 * @return the newly created (and possibly restored) execution graph
 * @throws Exception if creation or restoring fails
 */
private ExecutionGraph createAndRestoreExecutionGraph(
		JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
		ShuffleMaster<?> shuffleMaster,
		PartitionTracker partitionTracker) throws Exception {

	final ExecutionGraph restoredGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
	final CheckpointCoordinator coordinator = restoredGraph.getCheckpointCoordinator();

	if (coordinator == null) {
		return restoredGraph;
	}

	final boolean restoredFromCheckpoint = coordinator.restoreLatestCheckpointedState(
		restoredGraph.getAllVertices(),
		false,
		false);

	if (!restoredFromCheckpoint) {
		// no valid checkpoint found -> fall back to the configured savepoint (if any)
		tryRestoreExecutionGraphFromSavepoint(restoredGraph, jobGraph.getSavepointRestoreSettings());
	}

	return restoredGraph;
}
 
示例4
/**
 * Forwards a task's checkpoint decline to the job's {@link CheckpointCoordinator}.
 *
 * <p>Must be called from the main thread. The task manager location is resolved up front, and the
 * coordinator call is submitted to the I/O executor. Without a coordinator, the message is logged:
 * at error level while the job is RUNNING, at debug level otherwise.
 */
@Override
public void declineCheckpoint(final DeclineCheckpoint decline) {
	mainThreadExecutor.assertRunningInMainThread();

	final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
	final String tmLocation = retrieveTaskManagerLocation(decline.getTaskExecutionId());

	if (coordinator == null) {
		final String noCoordinatorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
		if (executionGraph.getState() != JobStatus.RUNNING) {
			log.debug(noCoordinatorMessage, jobGraph.getJobID());
		} else {
			log.error(noCoordinatorMessage, jobGraph.getJobID());
		}
		return;
	}

	ioExecutor.execute(() -> {
		try {
			coordinator.receiveDeclineMessage(decline, tmLocation);
		} catch (Exception e) {
			// failures are logged only; they are not propagated to the caller
			log.error("Error in CheckpointCoordinator while processing {}", decline, e);
		}
	});
}
 
示例5
/**
 * Lets the checkpoint coordinator receive acknowledgements from the given execution vertices so that
 * the pending checkpoint {@code checkpointId} can complete.
 *
 * <p>For every vertex taken from the iterator, an acknowledgement is sent for each subtask index of
 * that vertex's job vertex. Each acknowledgement carries a generated partitionable operator state
 * handle for that index.
 *
 * <p>NOTE(review): if the iterator yields several execution vertices of the same job vertex, each of
 * that job vertex's subtasks is acknowledged more than once. Confirm this is intended.
 *
 * @param checkpointCoordinator coordinator receiving the acknowledgements
 * @param executionVertexes execution vertices to acknowledge for (consumed by this call)
 */
private void acknowledgeAllCheckpoints(CheckpointCoordinator checkpointCoordinator, Iterator<ExecutionVertex> executionVertexes) throws IOException, CheckpointException {
	while (executionVertexes.hasNext()) {
		final ExecutionVertex vertex = executionVertexes.next();

		for (int subtaskIndex = 0; subtaskIndex < vertex.getJobVertex().getParallelism(); subtaskIndex++) {
			final JobVertexID vertexId = vertex.getJobvertexId();
			final OperatorStateHandle stateHandle =
				CheckpointCoordinatorTest.generatePartitionableStateHandle(vertexId, subtaskIndex, 2, 8, false);

			// single-operator snapshot: operator id derived from the job vertex id
			final TaskStateSnapshot snapshot = new TaskStateSnapshot();
			snapshot.putSubtaskStateByOperatorID(
				OperatorID.fromJobVertexID(vertexId),
				new OperatorSubtaskState(stateHandle, null, null, null));

			checkpointCoordinator.receiveAcknowledgeMessage(
				new AcknowledgeCheckpoint(
					vertex.getJobId(),
					vertex.getJobVertex().getTaskVertices()[subtaskIndex].getCurrentExecutionAttempt().getAttemptId(),
					checkpointId,
					new CheckpointMetrics(),
					snapshot),
				"Unknown location");
		}
	}
}
 
示例6
/**
 * Creates a new {@link ExecutionGraph} and, if it has a {@link CheckpointCoordinator}, restores its state.
 *
 * <p>Restoring the latest checkpoint to all job vertices is tried first. Only if that reports no
 * restore is the savepoint from the job graph's restore settings attempted.
 *
 * @param currentJobManagerJobMetricGroup metric group passed through to graph creation
 * @param shuffleMaster shuffle master passed through to graph creation
 * @param partitionTracker partition tracker passed through to graph creation
 * @return the newly created (and possibly restored) execution graph
 * @throws Exception if creation or restoring fails
 */
private ExecutionGraph createAndRestoreExecutionGraph(
	JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
	ShuffleMaster<?> shuffleMaster,
	JobMasterPartitionTracker partitionTracker) throws Exception {

	final ExecutionGraph restoredGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
	final CheckpointCoordinator coordinator = restoredGraph.getCheckpointCoordinator();

	if (coordinator == null) {
		return restoredGraph;
	}

	final boolean restoredFromCheckpoint = coordinator.restoreLatestCheckpointedStateToAll(
		new HashSet<>(restoredGraph.getAllVertices().values()),
		false);

	if (!restoredFromCheckpoint) {
		// no valid checkpoint found -> fall back to the configured savepoint (if any)
		tryRestoreExecutionGraphFromSavepoint(restoredGraph, jobGraph.getSavepointRestoreSettings());
	}

	return restoredGraph;
}
 
示例7
/**
 * Restores checkpointed state into the job vertices involved with the given execution vertices.
 *
 * <p>Does nothing when the execution graph has no {@link CheckpointCoordinator}. Otherwise, pending
 * checkpoints are aborted first. Then the latest checkpoint is restored either to all involved job
 * vertices (global recovery) or only to their subtasks (local recovery).
 *
 * @param vertices execution vertices being recovered
 * @param isGlobalRecovery whether to use {@code restoreLatestCheckpointedStateToAll}
 *        instead of {@code restoreLatestCheckpointedStateToSubtasks}
 */
protected void restoreState(final Set<ExecutionVertexID> vertices, final boolean isGlobalRecovery) throws Exception {
	final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();

	if (coordinator != null) {
		// Abort pending checkpoints so that
		// i) new checkpoints can be triggered without waiting for the last one to expire, and
		// ii) EXACTLY_ONCE semantics are ensured if needed.
		coordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));

		final Set<ExecutionJobVertex> involvedJobVertices = getInvolvedExecutionJobVertices(vertices);
		if (!isGlobalRecovery) {
			coordinator.restoreLatestCheckpointedStateToSubtasks(involvedJobVertices);
		} else {
			coordinator.restoreLatestCheckpointedStateToAll(involvedJobVertices, true);
		}
	}
}
 
示例8
/**
 * Forwards a task's checkpoint decline to the job's {@link CheckpointCoordinator}.
 *
 * <p>Must be called from the main thread. The coordinator call runs on the I/O executor. Without a
 * coordinator, the message is logged: at error level while the job is RUNNING, at debug level otherwise.
 */
@Override
public void declineCheckpoint(final DeclineCheckpoint decline) {
	mainThreadExecutor.assertRunningInMainThread();

	final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
	final String tmLocation = retrieveTaskManagerLocation(decline.getTaskExecutionId());

	if (coordinator != null) {
		ioExecutor.execute(() -> {
			try {
				coordinator.receiveDeclineMessage(decline, tmLocation);
			} catch (Exception e) {
				// failures are logged only; they are not propagated to the caller
				log.error("Error in CheckpointCoordinator while processing {}", decline, e);
			}
		});
		return;
	}

	final String noCoordinatorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
	final boolean jobRunning = executionGraph.getState() == JobStatus.RUNNING;
	if (jobRunning) {
		log.error(noCoordinatorMessage, jobGraph.getJobID());
	} else {
		log.debug(noCoordinatorMessage, jobGraph.getJobID());
	}
}
 
示例9
/**
 * Releases checkpointing resources once the job has reached a terminal state.
 *
 * <p>Clears the {@code checkpointCoordinator} field and shuts the coordinator down with the given status.
 * The checkpoint coordinator timer is always shut down and cleared, even if the coordinator shutdown
 * throws. Previously a failing coordinator shutdown skipped the timer cleanup and leaked it. Any
 * exception is logged rather than propagated, and {@code terminationFuture} is always completed with
 * {@code status}.
 *
 * @param status the terminal job status
 */
private void onTerminalState(JobStatus status) {
	try {
		CheckpointCoordinator coord = this.checkpointCoordinator;
		this.checkpointCoordinator = null;
		try {
			if (coord != null) {
				coord.shutdown(status);
			}
		}
		finally {
			// release the timer regardless of whether the coordinator shut down cleanly
			if (checkpointCoordinatorTimer != null) {
				checkpointCoordinatorTimer.shutdownNow();
				checkpointCoordinatorTimer = null;
			}
		}
	}
	catch (Exception e) {
		LOG.error("Error while cleaning up after execution", e);
	}
	finally {
		terminationFuture.complete(status);
	}
}
 
示例10
/**
 * Acknowledges the scheduler's single pending checkpoint on behalf of every current execution attempt.
 *
 * <p>Fails the calling test if there is not exactly one pending checkpoint. It also fails if the
 * thread is interrupted while waiting for the operator coordinators to acknowledge.
 *
 * @param scheduler scheduler whose checkpoint coordinator holds the pending checkpoint
 */
public static void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
	final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
	// the previous message ("Coordinator has not ") was truncated and gave no hint on failure
	assertEquals("Coordinator does not have exactly one pending checkpoint", 1, checkpointCoordinator.getNumberOfPendingCheckpoints());

	final PendingCheckpoint pc = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();

	// Because of races against the async thread in the coordinator, we need to wait here until the
	// operator coordinator state is acknowledged. This can be removed once the CheckpointCoordinator
	// executes all actions in the Scheduler's main thread executor.
	while (pc.getNumberOfNonAcknowledgedOperatorCoordinators() > 0) {
		try {
			Thread.sleep(1);
		} catch (InterruptedException e) {
			// restore the interrupt flag before failing the test
			Thread.currentThread().interrupt();
			fail("interrupted");
		}
	}

	getAllCurrentExecutionAttempts(scheduler).forEach(
		(attemptId) -> scheduler.acknowledgeCheckpoint(pc.getJobId(), attemptId, pc.getCheckpointId(), new CheckpointMetrics(), null));
}
 
示例11
/**
 * Verifies that a pending checkpoint is aborted when the only task fails and is restarted.
 */
@Test
public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
	final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
	enableCheckpointing(jobGraph);

	final CountDownLatch triggeredLatch = getCheckpointTriggeredLatch();
	final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

	final ArchivedExecutionVertex onlyVertex = Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
	final ExecutionAttemptID attemptId = onlyVertex.getCurrentExecutionAttempt().getAttemptId();
	scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING));

	final CheckpointCoordinator coordinator = getCheckpointCoordinator(scheduler);

	// trigger one checkpoint and wait until it has been triggered
	coordinator.triggerCheckpoint(false);
	triggeredLatch.await();
	assertThat(coordinator.getNumberOfPendingCheckpoints(), is(equalTo(1)));

	// fail the task and run the scheduled restart; the pending checkpoint must be gone afterwards
	scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
	taskRestartExecutor.triggerScheduledTasks();
	assertThat(coordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
}
 
示例12
/**
 * Forwards a task's checkpoint acknowledgement to the job's {@link CheckpointCoordinator}.
 *
 * <p>The acknowledgement message is always built. If a coordinator exists, the message is handed to
 * it on the RPC service's executor, and any {@link Throwable} from it is logged as a warning.
 * Otherwise the message is logged: at error level while the job is RUNNING, at debug level otherwise.
 */
@Override
public void acknowledgeCheckpoint(
		final JobID jobID,
		final ExecutionAttemptID executionAttemptID,
		final long checkpointId,
		final CheckpointMetrics checkpointMetrics,
		final TaskStateSnapshot checkpointState) {

	final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
	final AcknowledgeCheckpoint acknowledgement =
		new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);

	if (coordinator == null) {
		final String noCoordinatorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
		if (executionGraph.getState() != JobStatus.RUNNING) {
			log.debug(noCoordinatorMessage, jobGraph.getJobID());
		} else {
			log.error(noCoordinatorMessage, jobGraph.getJobID());
		}
		return;
	}

	getRpcService().execute(() -> {
		try {
			coordinator.receiveAcknowledgeMessage(acknowledgement);
		} catch (Throwable t) {
			log.warn("Error while processing checkpoint acknowledgement message", t);
		}
	});
}
 
示例13
/**
 * Triggers a savepoint, optionally cancelling the job once it has been stored.
 *
 * <p>Returns an exceptionally completed future if the job has no {@link CheckpointCoordinator}, or
 * if neither a target directory nor a default savepoint location is available. When
 * {@code cancelJob} is set, the periodic checkpoint scheduler is stopped before triggering. It is
 * restarted if the savepoint fails, and the job is cancelled if it succeeds. The completion
 * handling runs on the main thread executor.
 *
 * @param targetDirectory savepoint directory, or {@code null} to use the configured default
 * @param cancelJob whether to cancel the job after a successful savepoint
 * @param timeout timeout passed to {@code cancel}
 * @return future with the savepoint's external pointer
 */
@Override
public CompletableFuture<String> triggerSavepoint(
		@Nullable final String targetDirectory,
		final boolean cancelJob,
		final Time timeout) {

	final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
	if (coordinator == null) {
		return FutureUtils.completedExceptionally(new IllegalStateException(
			String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
	}

	if (targetDirectory == null && !coordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
		log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

		return FutureUtils.completedExceptionally(new IllegalStateException(
			"No savepoint directory configured. You can either specify a directory " +
				"while cancelling via -s :targetDirectory or configure a cluster-wide " +
				"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
	}

	if (cancelJob) {
		coordinator.stopCheckpointScheduler();
	}

	final CompletableFuture<String> savepointPath = coordinator
		.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
		.thenApply(CompletedCheckpoint::getExternalPointer);

	return savepointPath.handleAsync(
		(path, throwable) -> {
			if (throwable == null) {
				if (cancelJob) {
					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
					cancel(timeout);
				}
				return path;
			}
			// savepoint failed: resume periodic checkpointing if we paused it, then propagate
			if (cancelJob) {
				startCheckpointScheduler(coordinator);
			}
			throw new CompletionException(throwable);
		},
		getMainThreadExecutor());
}
 
示例14
/**
 * Restarts the coordinator's periodic checkpoint scheduler if periodic checkpointing is configured.
 *
 * <p>An {@link IllegalStateException} from the coordinator is ignored. It is attributed to a
 * concurrent shutdown of the coordinator.
 */
private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
	if (!checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
		return;
	}

	try {
		checkpointCoordinator.startCheckpointScheduler();
	} catch (IllegalStateException ignored) {
		// Concurrent shut down of the coordinator
	}
}
 
示例15
/**
 * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}.
 *
 * <p>This is a no-op if the settings do not request a savepoint restore, or if the graph has no
 * {@link CheckpointCoordinator}.
 *
 * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
 * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
 * @throws Exception if the {@link ExecutionGraph} could not be restored
 */
private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
	if (!savepointRestoreSettings.restoreSavepoint()) {
		return;
	}

	final CheckpointCoordinator coordinator = executionGraphToRestore.getCheckpointCoordinator();
	if (coordinator == null) {
		return;
	}

	coordinator.restoreSavepoint(
		savepointRestoreSettings.getRestorePath(),
		savepointRestoreSettings.allowNonRestoredState(),
		executionGraphToRestore.getAllVertices(),
		userCodeLoader);
}
 
示例16
/**
 * Shuts down the checkpoint coordinator once the job has reached a terminal state.
 *
 * <p>The {@code checkpointCoordinator} field is cleared before the coordinator is shut down. Any
 * exception is logged, and {@code terminationFuture} is always completed with {@code status}.
 *
 * @param status the terminal job status
 */
private void onTerminalState(JobStatus status) {
	try {
		final CheckpointCoordinator coordinatorToShutDown = this.checkpointCoordinator;
		this.checkpointCoordinator = null;

		if (coordinatorToShutDown != null) {
			coordinatorToShutDown.shutdown(status);
		}
	} catch (Exception e) {
		LOG.error("Error while cleaning up after execution", e);
	} finally {
		// signal termination even if the coordinator shutdown failed
		terminationFuture.complete(status);
	}
}
 
示例17
/**
 * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}.
 *
 * <p>Only acts when the settings request a savepoint restore and the graph has a
 * {@link CheckpointCoordinator}; otherwise it returns without doing anything.
 *
 * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
 * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
 * @throws Exception if the {@link ExecutionGraph} could not be restored
 */
private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
	final CheckpointCoordinator coordinator = savepointRestoreSettings.restoreSavepoint()
		? executionGraphToRestore.getCheckpointCoordinator()
		: null;

	if (coordinator != null) {
		coordinator.restoreSavepoint(
			savepointRestoreSettings.getRestorePath(),
			savepointRestoreSettings.allowNonRestoredState(),
			executionGraphToRestore.getAllVertices(),
			userCodeLoader);
	}
}
 
示例18
/**
 * Triggers a savepoint, optionally cancelling the job once it has been stored.
 *
 * <p>Must be called from the main thread. Throws {@link IllegalStateException} if the job has no
 * {@link CheckpointCoordinator}, or if neither a target directory nor a default savepoint location
 * is available. When {@code cancelJob} is set, the periodic checkpoint scheduler is stopped first.
 * It is restarted if the savepoint fails, and the job is cancelled if it succeeds.
 *
 * @param targetDirectory savepoint directory, or {@code null} to use the configured default
 * @param cancelJob whether to cancel the job after a successful savepoint
 * @return future with the savepoint's external pointer
 */
@Override
public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
	mainThreadExecutor.assertRunningInMainThread();

	final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
	if (coordinator == null) {
		throw new IllegalStateException(
			String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
	}

	final boolean noSavepointDirectory =
		targetDirectory == null && !coordinator.getCheckpointStorage().hasDefaultSavepointLocation();
	if (noSavepointDirectory) {
		log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

		throw new IllegalStateException(
			"No savepoint directory configured. You can either specify a directory " +
				"while cancelling via -s :targetDirectory or configure a cluster-wide " +
				"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
	}

	if (cancelJob) {
		coordinator.stopCheckpointScheduler();
	}

	final CompletableFuture<String> savepointPath = coordinator
		.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
		.thenApply(CompletedCheckpoint::getExternalPointer);

	return savepointPath.handleAsync(
		(path, throwable) -> {
			if (throwable == null) {
				if (cancelJob) {
					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
					cancel();
				}
				return path;
			}
			// savepoint failed: resume periodic checkpointing if we paused it, then propagate
			if (cancelJob) {
				startCheckpointScheduler(coordinator);
			}
			throw new CompletionException(throwable);
		},
		mainThreadExecutor);
}
 
示例19
/**
 * Restarts the coordinator's periodic checkpoint scheduler if periodic checkpointing is configured.
 *
 * <p>Must be called from the main thread. An {@link IllegalStateException} from the coordinator is
 * ignored; it is attributed to a concurrent shutdown of the coordinator.
 */
private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
	mainThreadExecutor.assertRunningInMainThread();

	if (!checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
		return;
	}

	try {
		checkpointCoordinator.startCheckpointScheduler();
	} catch (IllegalStateException ignored) {
		// Concurrent shut down of the coordinator
	}
}
 
示例20
/**
 * Forwards a task's checkpoint acknowledgement to the job's {@link CheckpointCoordinator}.
 *
 * <p>Must be called from the main thread. The message and the task manager location are resolved
 * up front. If a coordinator exists, the acknowledgement is handed to it on the I/O executor, and
 * any {@link Throwable} from it is logged as a warning. Otherwise the message is logged: at error
 * level while the job is RUNNING, at debug level otherwise.
 */
@Override
public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
	mainThreadExecutor.assertRunningInMainThread();

	final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
	final AcknowledgeCheckpoint acknowledgement =
		new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
	final String tmLocation = retrieveTaskManagerLocation(executionAttemptID);

	if (coordinator == null) {
		final String noCoordinatorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
		if (executionGraph.getState() != JobStatus.RUNNING) {
			log.debug(noCoordinatorMessage, jobGraph.getJobID());
		} else {
			log.error(noCoordinatorMessage, jobGraph.getJobID());
		}
		return;
	}

	ioExecutor.execute(() -> {
		try {
			coordinator.receiveAcknowledgeMessage(acknowledgement, tmLocation);
		} catch (Throwable t) {
			log.warn("Error while processing checkpoint acknowledgement message", t);
		}
	});
}
 
示例21
/**
 * Shuts down the checkpoint coordinator once the job has reached a terminal state.
 *
 * <p>The {@code checkpointCoordinator} field is cleared before shutdown. Failures are logged, and
 * {@code terminationFuture} is always completed with {@code status}.
 *
 * @param status the terminal job status
 */
private void onTerminalState(JobStatus status) {
	try {
		final CheckpointCoordinator detached = this.checkpointCoordinator;
		this.checkpointCoordinator = null;
		if (detached == null) {
			return;
		}
		detached.shutdown(status);
	} catch (Exception e) {
		LOG.error("Error while cleaning up after execution", e);
	} finally {
		// runs on every path above, including the early return
		terminationFuture.complete(status);
	}
}
 
示例22
/**
 * Attach pending checkpoints of chk-42 and chk-43 to the execution graph.
 * If {@link #acknowledgeAllCheckpoints(CheckpointCoordinator, Iterator)} called then,
 * chk-42 would become the completed checkpoint.
 *
 * <p>Concretely, pending checkpoints with ids {@code checkpointId} and {@code checkpointId + 1} are
 * created. Both share the map of current execution attempts of all execution vertices. The
 * coordinator's internal {@code pendingCheckpoints} map is then replaced via {@code Whitebox}.
 *
 * @param eg execution graph whose checkpoint coordinator receives the pending checkpoints
 * @throws IOException if a checkpoint storage location cannot be initialized
 */
private void attachPendingCheckpoints(ExecutionGraph eg) throws IOException {
	final Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm = new HashMap<>();
	for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
		final Execution attempt = vertex.getCurrentExecutionAttempt();
		if (attempt != null) {
			verticesToConfirm.put(attempt.getAttemptId(), vertex);
		}
	}

	final CheckpointCoordinator checkpointCoordinator = eg.getCheckpointCoordinator();
	assertNotNull(checkpointCoordinator);
	final CheckpointStorageCoordinatorView checkpointStorage = checkpointCoordinator.getCheckpointStorage();

	final long nextCheckpointId = checkpointId + 1;
	final Map<Long, PendingCheckpoint> pendingCheckpoints = new HashMap<>();
	pendingCheckpoints.put(checkpointId, createPendingCheckpoint(eg, checkpointId, verticesToConfirm, checkpointStorage));
	pendingCheckpoints.put(nextCheckpointId, createPendingCheckpoint(eg, nextCheckpointId, verticesToConfirm, checkpointStorage));

	// overwrite the coordinator's internal pending-checkpoint map through reflection
	Whitebox.setInternalState(checkpointCoordinator, "pendingCheckpoints", pendingCheckpoints);
}

/**
 * Creates a {@link PendingCheckpoint} for {@code id} with RETAIN_ON_FAILURE properties and a freshly
 * initialized storage location for that id.
 */
private PendingCheckpoint createPendingCheckpoint(
		ExecutionGraph eg,
		long id,
		Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
		CheckpointStorageCoordinatorView checkpointStorage) throws IOException {
	return new PendingCheckpoint(
		eg.getJobID(),
		id,
		0L,
		verticesToConfirm,
		CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
		checkpointStorage.initializeLocationForCheckpoint(id),
		eg.getFutureExecutor());
}
 
示例23
/**
 * Verifies that a partially acknowledged pending checkpoint is aborted once a failed vertex is
 * restarted. The abort is expected to happen only after the main thread's scheduled tasks run.
 */
@Test
public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
	final JobGraph jobGraph = createStreamingJobGraph();
	final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);

	final Iterator<ExecutionVertex> vertices = executionGraph.getAllExecutionVertices().iterator();
	final ExecutionVertex failingVertex = vertices.next();
	final ExecutionVertex otherVertex = vertices.next();
	setTasksRunning(executionGraph, failingVertex, otherVertex);

	final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
	checkState(coordinator != null);

	coordinator.triggerCheckpoint(System.currentTimeMillis(), false);
	assertEquals(1, coordinator.getNumberOfPendingCheckpoints());
	final long pendingCheckpointId = coordinator.getPendingCheckpoints().keySet().iterator().next();

	// a partial acknowledgement from the first vertex keeps the checkpoint pending
	coordinator.receiveAcknowledgeMessage(
		new AcknowledgeCheckpoint(
			jobGraph.getJobID(),
			failingVertex.getCurrentExecutionAttempt().getAttemptId(),
			pendingCheckpointId),
		"Unknown location");
	assertEquals(1, coordinator.getNumberOfPendingCheckpoints());

	// failing the vertex alone leaves the checkpoint pending; the failover strategy should cancel it on restart
	failVertex(failingVertex);
	assertEquals(1, coordinator.getNumberOfPendingCheckpoints());
	manualMainThreadExecutor.triggerScheduledTasks();

	assertNoPendingCheckpoints(coordinator);
}
 
示例24
/**
 * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}.
 *
 * <p>Nothing happens unless the settings request a savepoint restore and the graph has a
 * {@link CheckpointCoordinator}.
 *
 * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
 * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
 * @throws Exception if the {@link ExecutionGraph} could not be restored
 */
private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
	if (!savepointRestoreSettings.restoreSavepoint()) {
		return;
	}

	final CheckpointCoordinator coordinator = executionGraphToRestore.getCheckpointCoordinator();
	if (coordinator == null) {
		return;
	}

	final String restorePath = savepointRestoreSettings.getRestorePath();
	final boolean allowNonRestoredState = savepointRestoreSettings.allowNonRestoredState();
	coordinator.restoreSavepoint(
		restorePath,
		allowNonRestoredState,
		executionGraphToRestore.getAllVertices(),
		userCodeLoader);
}
 
示例25
/**
 * Triggers a savepoint, optionally cancelling the job once it has been stored.
 *
 * <p>Must be called from the main thread. Throws {@link IllegalStateException} if the job has no
 * {@link CheckpointCoordinator}, or if neither a target directory nor a default savepoint location
 * is available. When {@code cancelJob} is set, the periodic checkpoint scheduler is stopped first.
 * It is restarted if the savepoint fails, and the job is cancelled if it succeeds.
 *
 * @param targetDirectory savepoint directory, or {@code null} to use the configured default
 * @param cancelJob whether to cancel the job after a successful savepoint
 * @return future with the savepoint's external pointer
 */
@Override
public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
	mainThreadExecutor.assertRunningInMainThread();

	final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
	if (coordinator == null) {
		throw new IllegalStateException(
			String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
	}

	final boolean noSavepointDirectory =
		targetDirectory == null && !coordinator.getCheckpointStorage().hasDefaultSavepointLocation();
	if (noSavepointDirectory) {
		log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

		throw new IllegalStateException(
			"No savepoint directory configured. You can either specify a directory " +
				"while cancelling via -s :targetDirectory or configure a cluster-wide " +
				"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
	}

	log.info("Triggering {}savepoint for job {}.", cancelJob ? "cancel-with-" : "", jobGraph.getJobID());

	if (cancelJob) {
		coordinator.stopCheckpointScheduler();
	}

	final CompletableFuture<String> savepointPath = coordinator
		.triggerSavepoint(targetDirectory)
		.thenApply(CompletedCheckpoint::getExternalPointer);

	return savepointPath.handleAsync(
		(path, throwable) -> {
			if (throwable == null) {
				if (cancelJob) {
					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
					cancel();
				}
				return path;
			}
			// savepoint failed: resume periodic checkpointing if we paused it, then propagate
			if (cancelJob) {
				startCheckpointScheduler(coordinator);
			}
			throw new CompletionException(throwable);
		},
		mainThreadExecutor);
}
 
示例26
/**
 * Restarts the coordinator's periodic checkpoint scheduler, but only if periodic checkpointing is
 * configured.
 *
 * <p>Must be called from the main thread. An {@link IllegalStateException} from the coordinator is
 * swallowed; it is attributed to a concurrent shutdown of the coordinator.
 */
private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
	mainThreadExecutor.assertRunningInMainThread();

	final boolean periodic = checkpointCoordinator.isPeriodicCheckpointingConfigured();
	if (periodic) {
		try {
			checkpointCoordinator.startCheckpointScheduler();
		} catch (IllegalStateException ignored) {
			// Concurrent shut down of the coordinator
		}
	}
}
 
示例27
/**
 * Forwards a task's checkpoint acknowledgement to the job's {@link CheckpointCoordinator}.
 *
 * <p>Must be called from the main thread. The coordinator call runs on the I/O executor, and any
 * {@link Throwable} from it is logged as a warning. Without a coordinator, the message is logged:
 * at error level while the job is RUNNING, at debug level otherwise.
 */
@Override
public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
	mainThreadExecutor.assertRunningInMainThread();

	final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
	final AcknowledgeCheckpoint acknowledgement =
		new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
	final String tmLocation = retrieveTaskManagerLocation(executionAttemptID);

	if (coordinator != null) {
		ioExecutor.execute(() -> {
			try {
				coordinator.receiveAcknowledgeMessage(acknowledgement, tmLocation);
			} catch (Throwable t) {
				log.warn("Error while processing checkpoint acknowledgement message", t);
			}
		});
		return;
	}

	final String noCoordinatorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
	final boolean jobRunning = executionGraph.getState() == JobStatus.RUNNING;
	if (jobRunning) {
		log.error(noCoordinatorMessage, jobGraph.getJobID());
	} else {
		log.debug(noCoordinatorMessage, jobGraph.getJobID());
	}
}
 
示例28
/**
 * Sends an acknowledgement for {@code checkpointId} directly to the scheduler's checkpoint
 * coordinator, once for each current execution attempt.
 *
 * @param scheduler scheduler whose coordinator receives the acknowledgements
 * @param checkpointId id of the checkpoint being acknowledged
 * @throws CheckpointException if the coordinator rejects an acknowledgement
 */
public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {
	final CheckpointCoordinator coordinator = getCheckpointCoordinator(scheduler);
	final JobID jobId = scheduler.getJobId();

	for (ExecutionAttemptID attempt : getAllCurrentExecutionAttempts(scheduler)) {
		coordinator.receiveAcknowledgeMessage(
			new AcknowledgeCheckpoint(jobId, attempt, checkpointId),
			"Unknown location");
	}
}
 
示例29
/**
 * Triggers a checkpoint, acknowledges it for all current execution attempts, and returns the
 * completed checkpoint.
 *
 * <p>Fails the calling test if the trigger does not produce exactly one pending checkpoint, or if
 * the checkpoint is not complete right after acknowledging.
 *
 * @param scheduler scheduler whose checkpoint coordinator is used
 * @return the completed checkpoint
 */
public static CompletedCheckpoint takeCheckpoint(DefaultScheduler scheduler) throws Exception {
	final CheckpointCoordinator coordinator = getCheckpointCoordinator(scheduler);
	coordinator.triggerCheckpoint(false);
	assertEquals("test setup inconsistent", 1, coordinator.getNumberOfPendingCheckpoints());

	final PendingCheckpoint pending = coordinator.getPendingCheckpoints().values().iterator().next();
	// obtain the completion future before acknowledging, i.e. while the checkpoint is still pending
	final CompletableFuture<CompletedCheckpoint> completion = pending.getCompletionFuture();
	acknowledgePendingCheckpoint(scheduler, pending.getCheckpointId());

	final CompletedCheckpoint result = completion.getNow(null);
	assertNotNull("checkpoint not complete", result);
	return result;
}
 
示例30
/**
 * Verifies that restarting a failed task restores state from the latest completed checkpoint. This
 * is observed through the restore count of a registered master hook.
 */
@Test
public void restoreStateWhenRestartingTasks() throws Exception {
	final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
	enableCheckpointing(jobGraph);

	final CountDownLatch triggeredLatch = getCheckpointTriggeredLatch();
	final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);

	final ArchivedExecutionVertex onlyVertex = Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
	final ExecutionAttemptID attemptId = onlyVertex.getCurrentExecutionAttempt().getAttemptId();
	scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING));

	final CheckpointCoordinator coordinator = getCheckpointCoordinator(scheduler);

	// a stateful master hook lets us observe whether state was restored
	final TestMasterHook masterHook = TestMasterHook.fromId("testHook");
	coordinator.addMasterHook(masterHook);

	// complete exactly one checkpoint to restore from
	coordinator.triggerCheckpoint(false);
	triggeredLatch.await();
	final long pendingCheckpointId = coordinator.getPendingCheckpoints().keySet().iterator().next();
	acknowledgePendingCheckpoint(scheduler, pendingCheckpointId);

	// fail the task and run the scheduled restart, which should restore the hook's state once
	scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
	taskRestartExecutor.triggerScheduledTasks();
	assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
}