Java源码示例:org.apache.flink.runtime.checkpoint.CheckpointCoordinator
示例1
/**
 * Forwards a task's checkpoint decline to the checkpoint coordinator, if there is one.
 *
 * <p>The decline is processed asynchronously on the RPC service's executor. If the execution
 * graph has no coordinator, the message is only logged: at error level while the job is
 * RUNNING, at debug level otherwise.
 *
 * @param decline the decline message received from a task
 */
@Override
public void declineCheckpoint(DeclineCheckpoint decline) {
    final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
    if (coordinator == null) {
        final String message = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
        final boolean jobRunning = executionGraph.getState() == JobStatus.RUNNING;
        if (jobRunning) {
            log.error(message, jobGraph.getJobID());
        } else {
            log.debug(message, jobGraph.getJobID());
        }
        return;
    }

    getRpcService().execute(() -> {
        try {
            coordinator.receiveDeclineMessage(decline);
        } catch (Exception processingError) {
            // Failures are logged rather than propagated out of the executor task.
            log.error("Error in CheckpointCoordinator while processing {}", decline, processingError);
        }
    });
}
示例2
/**
 * Creates a new execution graph and tries to restore its state.
 *
 * <p>The latest checkpoint is tried first. Only if that reports no restore does it fall back to
 * the savepoint configured in the job graph's savepoint restore settings. Graphs without a
 * checkpoint coordinator are returned as created.
 *
 * @param currentJobManagerJobMetricGroup metric group for the new execution graph
 * @return the newly created (and possibly restored) execution graph
 * @throws Exception if creating or restoring the graph fails
 */
private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception {
    final ExecutionGraph graph = createExecutionGraph(currentJobManagerJobMetricGroup);
    final CheckpointCoordinator coordinator = graph.getCheckpointCoordinator();
    if (coordinator == null) {
        // No checkpoint coordinator configured: nothing to restore.
        return graph;
    }

    final boolean restoredFromCheckpoint =
        coordinator.restoreLatestCheckpointedState(graph.getAllVertices(), false, false);
    if (!restoredFromCheckpoint) {
        tryRestoreExecutionGraphFromSavepoint(graph, jobGraph.getSavepointRestoreSettings());
    }
    return graph;
}
示例3
/**
 * Creates a new execution graph with the given shuffle master and partition tracker, then tries
 * to restore its state.
 *
 * <p>The latest checkpoint is tried first. If that reports no restore, the savepoint from the
 * job graph's savepoint restore settings is tried. Graphs without a checkpoint coordinator are
 * returned unchanged.
 *
 * @param currentJobManagerJobMetricGroup metric group for the new execution graph
 * @param shuffleMaster shuffle master passed through to graph creation
 * @param partitionTracker partition tracker passed through to graph creation
 * @return the newly created (and possibly restored) execution graph
 * @throws Exception if creating or restoring the graph fails
 */
private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        PartitionTracker partitionTracker) throws Exception {
    final ExecutionGraph createdGraph =
        createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
    final CheckpointCoordinator coordinator = createdGraph.getCheckpointCoordinator();

    if (coordinator != null
            && !coordinator.restoreLatestCheckpointedState(createdGraph.getAllVertices(), false, false)) {
        // No checkpoint was restored; fall back to a configured savepoint, if any.
        tryRestoreExecutionGraphFromSavepoint(createdGraph, jobGraph.getSavepointRestoreSettings());
    }
    return createdGraph;
}
示例4
/**
 * Forwards a task's checkpoint decline, together with the task manager location of the
 * declining task, to the checkpoint coordinator.
 *
 * <p>Must be called from the main thread. The decline is processed on the I/O executor. Without
 * a coordinator, the message is logged at error level while the job is RUNNING and at debug
 * level otherwise.
 *
 * @param decline the decline message received from a task
 */
@Override
public void declineCheckpoint(final DeclineCheckpoint decline) {
    mainThreadExecutor.assertRunningInMainThread();

    final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
    // The location is resolved on the calling thread, before the coordinator check.
    final String location = retrieveTaskManagerLocation(decline.getTaskExecutionId());

    if (coordinator == null) {
        final String message = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
        if (executionGraph.getState() != JobStatus.RUNNING) {
            log.debug(message, jobGraph.getJobID());
        } else {
            log.error(message, jobGraph.getJobID());
        }
        return;
    }

    ioExecutor.execute(() -> {
        try {
            coordinator.receiveDeclineMessage(decline, location);
        } catch (Exception e) {
            log.error("Error in CheckpointCoordinator while processing {}", decline, e);
        }
    });
}
示例5
/**
 * Lets the checkpoint coordinator receive an acknowledgement from each of the given execution
 * vertices, so that the expected checkpoint (with id {@code checkpointId}) can complete.
 *
 * <p>Each vertex acknowledges exactly once, for its own subtask index, with a generated
 * partitionable operator state handle. The previous implementation re-acknowledged every subtask
 * of the vertex's job vertex once per execution vertex. That sent parallelism-many duplicate
 * acknowledgements per subtask when iterating over all vertices.
 *
 * @param checkpointCoordinator coordinator that receives the acknowledgements
 * @param executionVertexes vertices that should acknowledge the checkpoint
 * @throws IOException if generating the test state handle fails
 * @throws CheckpointException if the coordinator rejects an acknowledgement
 */
private void acknowledgeAllCheckpoints(CheckpointCoordinator checkpointCoordinator, Iterator<ExecutionVertex> executionVertexes) throws IOException, CheckpointException {
    while (executionVertexes.hasNext()) {
        final ExecutionVertex executionVertex = executionVertexes.next();
        final JobVertexID jobVertexID = executionVertex.getJobvertexId();
        final int subtaskIndex = executionVertex.getParallelSubtaskIndex();

        final OperatorStateHandle opStateBackend =
            CheckpointCoordinatorTest.generatePartitionableStateHandle(jobVertexID, subtaskIndex, 2, 8, false);
        final OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(opStateBackend, null, null, null);
        final TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
        taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID), operatorSubtaskState);

        final AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
            executionVertex.getJobId(),
            executionVertex.getCurrentExecutionAttempt().getAttemptId(),
            checkpointId,
            new CheckpointMetrics(),
            taskOperatorSubtaskStates);
        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
    }
}
示例6
/**
 * Creates a new execution graph and restores it from the latest checkpoint, falling back to the
 * configured savepoint when no checkpoint was restored.
 *
 * @param currentJobManagerJobMetricGroup metric group for the new execution graph
 * @param shuffleMaster shuffle master passed through to graph creation
 * @param partitionTracker partition tracker passed through to graph creation
 * @return the newly created (and possibly restored) execution graph
 * @throws Exception if creating or restoring the graph fails
 */
private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker) throws Exception {
    final ExecutionGraph graph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
    final CheckpointCoordinator coordinator = graph.getCheckpointCoordinator();
    if (coordinator == null) {
        return graph;
    }

    // Restore every job vertex of the new graph from the latest checkpoint, if one exists.
    final boolean checkpointRestored = coordinator.restoreLatestCheckpointedStateToAll(
        new HashSet<>(graph.getAllVertices().values()),
        false);
    if (!checkpointRestored) {
        tryRestoreExecutionGraphFromSavepoint(graph, jobGraph.getSavepointRestoreSettings());
    }
    return graph;
}
示例7
/**
 * Restores checkpointed state into the job vertices involved with the given execution vertices.
 *
 * <p>Does nothing when the execution graph has no checkpoint coordinator. Otherwise all pending
 * checkpoints are first aborted with reason {@code JOB_FAILOVER_REGION}. Then the latest
 * checkpointed state is restored either to all involved job vertices (global recovery) or only
 * to their subtasks.
 *
 * @param vertices execution vertices being recovered
 * @param isGlobalRecovery whether this is a global (rather than regional) recovery
 * @throws Exception if restoring state fails
 */
protected void restoreState(final Set<ExecutionVertexID> vertices, final boolean isGlobalRecovery) throws Exception {
    final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
    if (coordinator != null) {
        // Abort pending checkpoints so that
        //   i) new checkpoints can be triggered without waiting for the old ones to expire, and
        //  ii) EXACTLY_ONCE semantics are preserved if needed.
        coordinator.abortPendingCheckpoints(
            new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));

        final Set<ExecutionJobVertex> affectedJobVertices = getInvolvedExecutionJobVertices(vertices);
        if (!isGlobalRecovery) {
            coordinator.restoreLatestCheckpointedStateToSubtasks(affectedJobVertices);
        } else {
            coordinator.restoreLatestCheckpointedStateToAll(affectedJobVertices, true);
        }
    }
}
示例8
/**
 * Handles a checkpoint decline from a task by passing it, with the declining task's task manager
 * location, to the checkpoint coordinator on the I/O executor.
 *
 * <p>Must run on the main thread. If no coordinator exists, the message is logged at error level
 * while the job is RUNNING and at debug level otherwise.
 *
 * @param decline the decline message received from a task
 */
@Override
public void declineCheckpoint(final DeclineCheckpoint decline) {
    mainThreadExecutor.assertRunningInMainThread();
    final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
    final String taskManagerLocation = retrieveTaskManagerLocation(decline.getTaskExecutionId());

    if (coordinator != null) {
        final Runnable forwardDecline = () -> {
            try {
                coordinator.receiveDeclineMessage(decline, taskManagerLocation);
            } catch (Exception processingError) {
                log.error("Error in CheckpointCoordinator while processing {}", decline, processingError);
            }
        };
        ioExecutor.execute(forwardDecline);
    } else {
        final String template = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
        final boolean jobRunning = executionGraph.getState() == JobStatus.RUNNING;
        if (jobRunning) {
            log.error(template, jobGraph.getJobID());
        } else {
            log.debug(template, jobGraph.getJobID());
        }
    }
}
示例9
/**
 * Releases checkpointing resources once the job reaches a terminal state, then completes the
 * termination future with that state.
 *
 * <p>The checkpoint coordinator is detached from this object and shut down, and the checkpoint
 * coordinator timer is shut down. Each step is guarded separately, so a failing coordinator
 * shutdown no longer skips the timer shutdown (which previously leaked the timer). Exceptions
 * from either step are logged. The termination future is completed in all cases.
 *
 * @param status the terminal job status
 */
private void onTerminalState(JobStatus status) {
    try {
        try {
            CheckpointCoordinator coord = this.checkpointCoordinator;
            this.checkpointCoordinator = null;
            if (coord != null) {
                coord.shutdown(status);
            }
        }
        catch (Exception e) {
            LOG.error("Error while cleaning up after execution", e);
        }

        try {
            if (checkpointCoordinatorTimer != null) {
                checkpointCoordinatorTimer.shutdownNow();
                checkpointCoordinatorTimer = null;
            }
        }
        catch (Exception e) {
            LOG.error("Error while cleaning up after execution", e);
        }
    }
    finally {
        terminationFuture.complete(status);
    }
}
示例10
/**
 * Acknowledges the single pending checkpoint of the scheduler's checkpoint coordinator on behalf
 * of every current execution attempt.
 *
 * <p>Asserts that exactly one checkpoint is pending. It then busy-waits (1 ms sleeps, no timeout)
 * until the pending checkpoint has no non-acknowledged operator coordinators left. Finally it
 * sends an acknowledgement with default metrics and {@code null} state for each current
 * execution attempt.
 *
 * @param scheduler scheduler whose pending checkpoint is acknowledged
 */
public static void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
    final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
    assertEquals("Coordinator does not have exactly one pending checkpoint", 1, checkpointCoordinator.getNumberOfPendingCheckpoints());
    final PendingCheckpoint pc = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();

    // Because of races against the async thread in the coordinator, we need to wait here until the
    // coordinator state is acknowledged. This can be removed once the CheckpointCoordinator
    // executes all actions in the Scheduler's main thread executor.
    while (pc.getNumberOfNonAcknowledgedOperatorCoordinators() > 0) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            fail("interrupted");
        }
    }

    getAllCurrentExecutionAttempts(scheduler).forEach(
        (attemptId) -> scheduler.acknowledgeCheckpoint(pc.getJobId(), attemptId, pc.getCheckpointId(), new CheckpointMetrics(), null));
}
示例11
/**
 * Verifies that a pending checkpoint is aborted when the only task fails and the scheduled
 * restart tasks are run.
 */
@Test
public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
    final JobGraph graph = singleNonParallelJobVertexJobGraph();
    enableCheckpointing(graph);
    final CountDownLatch triggeredLatch = getCheckpointTriggeredLatch();
    final DefaultScheduler scheduler = createSchedulerAndStartScheduling(graph);

    final ArchivedExecutionVertex vertex = Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
    final ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
    scheduler.updateTaskExecutionState(new TaskExecutionState(graph.getJobID(), attempt, ExecutionState.RUNNING));

    // Start exactly one checkpoint and wait until it has been triggered.
    final CheckpointCoordinator coordinator = getCheckpointCoordinator(scheduler);
    coordinator.triggerCheckpoint(false);
    triggeredLatch.await();
    assertThat(coordinator.getNumberOfPendingCheckpoints(), is(equalTo(1)));

    // Fail the task, then run whatever the restart executor has scheduled.
    scheduler.updateTaskExecutionState(new TaskExecutionState(graph.getJobID(), attempt, ExecutionState.FAILED));
    taskRestartExecutor.triggerScheduledTasks();
    assertThat(coordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
}
示例12
/**
 * Wraps a task's checkpoint acknowledgement in an {@code AcknowledgeCheckpoint} message and hands
 * it to the checkpoint coordinator on the RPC service's executor.
 *
 * <p>Processing errors are logged as warnings. Without a coordinator, the message is logged at
 * error level while the job is RUNNING and at debug level otherwise.
 *
 * @param jobID id of the job the acknowledgement belongs to
 * @param executionAttemptID attempt that acknowledges
 * @param checkpointId id of the acknowledged checkpoint
 * @param checkpointMetrics metrics reported by the task
 * @param checkpointState snapshot of the task's state
 */
@Override
public void acknowledgeCheckpoint(
        final JobID jobID,
        final ExecutionAttemptID executionAttemptID,
        final long checkpointId,
        final CheckpointMetrics checkpointMetrics,
        final TaskStateSnapshot checkpointState) {
    final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
    final AcknowledgeCheckpoint acknowledgement =
        new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);

    if (coordinator == null) {
        final String template = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
        if (executionGraph.getState() == JobStatus.RUNNING) {
            log.error(template, jobGraph.getJobID());
        } else {
            log.debug(template, jobGraph.getJobID());
        }
        return;
    }

    getRpcService().execute(() -> {
        try {
            coordinator.receiveAcknowledgeMessage(acknowledgement);
        } catch (Throwable t) {
            log.warn("Error while processing checkpoint acknowledgement message", t);
        }
    });
}
示例13
/**
 * Triggers a savepoint, optionally cancelling the job once the savepoint has been stored.
 *
 * <p>Returns an exceptionally completed future if the job has no checkpoint coordinator, or if
 * no target directory is given and no default savepoint location is configured. When
 * {@code cancelJob} is set, periodic checkpointing is stopped before triggering. It is restarted
 * if the savepoint fails; on success the job is cancelled. The completion handler runs on the main
 * thread executor.
 *
 * @param targetDirectory savepoint directory, or {@code null} to use the configured default
 * @param cancelJob whether to cancel the job after a successful savepoint
 * @param timeout timeout passed to the cancellation
 * @return future with the external pointer of the completed savepoint
 */
@Override
public CompletableFuture<String> triggerSavepoint(
        @Nullable final String targetDirectory,
        final boolean cancelJob,
        final Time timeout) {
    final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    if (checkpointCoordinator == null) {
        return FutureUtils.completedExceptionally(new IllegalStateException(
            String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
    } else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
        // Only mention cancellation when it was actually requested.
        log.info("Trying to trigger {}savepoint for job {}, but no savepoint directory configured.",
            cancelJob ? "cancel-with-" : "", jobGraph.getJobID());
        return FutureUtils.completedExceptionally(new IllegalStateException(
            "No savepoint directory configured. You can either specify a directory " +
                "while cancelling via -s :targetDirectory or configure a cluster-wide " +
                "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
    }

    if (cancelJob) {
        checkpointCoordinator.stopCheckpointScheduler();
    }

    return checkpointCoordinator
        .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
        .thenApply(CompletedCheckpoint::getExternalPointer)
        .handleAsync((path, throwable) -> {
            if (throwable != null) {
                if (cancelJob) {
                    // The savepoint failed, so resume the periodic checkpointing stopped above.
                    startCheckpointScheduler(checkpointCoordinator);
                }
                throw new CompletionException(throwable);
            } else if (cancelJob) {
                log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
                cancel(timeout);
            }
            return path;
        }, getMainThreadExecutor());
}
示例14
/**
 * Restarts periodic checkpointing on the given coordinator, if periodic checkpointing is
 * configured.
 *
 * @param checkpointCoordinator coordinator whose checkpoint scheduler is started
 */
private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
    if (!checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
        return;
    }
    try {
        checkpointCoordinator.startCheckpointScheduler();
    } catch (IllegalStateException ignored) {
        // The coordinator was shut down concurrently; there is nothing left to start.
    }
}
示例15
/**
 * Restores the given {@link ExecutionGraph} from the savepoint described by the given
 * {@link SavepointRestoreSettings}, if those settings request a savepoint restore.
 *
 * <p>Nothing happens if no savepoint restore is requested or the graph has no checkpoint
 * coordinator.
 *
 * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
 * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
 * @throws Exception if the {@link ExecutionGraph} could not be restored
 */
private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
    if (!savepointRestoreSettings.restoreSavepoint()) {
        return;
    }
    final CheckpointCoordinator coordinator = executionGraphToRestore.getCheckpointCoordinator();
    if (coordinator == null) {
        return;
    }
    coordinator.restoreSavepoint(
        savepointRestoreSettings.getRestorePath(),
        savepointRestoreSettings.allowNonRestoredState(),
        executionGraphToRestore.getAllVertices(),
        userCodeLoader);
}
示例16
/**
 * Shuts down the checkpoint coordinator once the job reaches a terminal state, then completes the
 * termination future.
 *
 * <p>The coordinator field is cleared before the shutdown is attempted. Any exception from the
 * shutdown is logged. The termination future is completed regardless.
 *
 * @param status the terminal job status
 */
private void onTerminalState(JobStatus status) {
    try {
        final CheckpointCoordinator coordinatorToShutDown = this.checkpointCoordinator;
        // Detach the coordinator from this object before shutting it down.
        this.checkpointCoordinator = null;
        if (coordinatorToShutDown != null) {
            coordinatorToShutDown.shutdown(status);
        }
    } catch (Exception e) {
        LOG.error("Error while cleaning up after execution", e);
    } finally {
        terminationFuture.complete(status);
    }
}
示例17
/**
 * Restores the given {@link ExecutionGraph} from a savepoint when the given
 * {@link SavepointRestoreSettings} ask for it and the graph has a checkpoint coordinator.
 *
 * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
 * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
 * @throws Exception if the {@link ExecutionGraph} could not be restored
 */
private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
    // The coordinator is only looked up when a savepoint restore is requested.
    final CheckpointCoordinator coordinator = savepointRestoreSettings.restoreSavepoint()
        ? executionGraphToRestore.getCheckpointCoordinator()
        : null;

    if (coordinator != null) {
        final String restorePath = savepointRestoreSettings.getRestorePath();
        final boolean allowNonRestoredState = savepointRestoreSettings.allowNonRestoredState();
        coordinator.restoreSavepoint(
            restorePath,
            allowNonRestoredState,
            executionGraphToRestore.getAllVertices(),
            userCodeLoader);
    }
}
示例18
/**
 * Triggers a savepoint, optionally cancelling the job once the savepoint has been stored.
 *
 * <p>Must be called from the main thread. Throws {@link IllegalStateException} synchronously if
 * the job has no checkpoint coordinator, or if no target directory is given and no default
 * savepoint location is configured. When {@code cancelJob} is set, periodic checkpointing is
 * stopped first. It is restarted if the savepoint fails; on success the job is cancelled. The
 * completion handler runs on the main thread executor.
 *
 * @param targetDirectory savepoint directory, or {@code null} to use the configured default
 * @param cancelJob whether to cancel the job after a successful savepoint
 * @return future with the external pointer of the completed savepoint
 */
@Override
public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
    mainThreadExecutor.assertRunningInMainThread();
    final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    if (checkpointCoordinator == null) {
        throw new IllegalStateException(
            String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
    } else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
        // Only mention cancellation when it was actually requested.
        log.info("Trying to trigger {}savepoint for job {}, but no savepoint directory configured.",
            cancelJob ? "cancel-with-" : "", jobGraph.getJobID());
        throw new IllegalStateException(
            "No savepoint directory configured. You can either specify a directory " +
                "while cancelling via -s :targetDirectory or configure a cluster-wide " +
                "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
    }

    if (cancelJob) {
        checkpointCoordinator.stopCheckpointScheduler();
    }

    return checkpointCoordinator
        .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
        .thenApply(CompletedCheckpoint::getExternalPointer)
        .handleAsync((path, throwable) -> {
            if (throwable != null) {
                if (cancelJob) {
                    // The savepoint failed, so resume the periodic checkpointing stopped above.
                    startCheckpointScheduler(checkpointCoordinator);
                }
                throw new CompletionException(throwable);
            } else if (cancelJob) {
                log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
                cancel();
            }
            return path;
        }, mainThreadExecutor);
}
示例19
/**
 * Restarts periodic checkpointing on the given coordinator when periodic checkpointing is
 * configured. Must be called from the main thread.
 *
 * @param checkpointCoordinator coordinator whose checkpoint scheduler is started
 */
private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
    mainThreadExecutor.assertRunningInMainThread();
    if (!checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
        return;
    }
    try {
        checkpointCoordinator.startCheckpointScheduler();
    } catch (IllegalStateException ignored) {
        // The coordinator was shut down concurrently; nothing to start.
    }
}
示例20
/**
 * Passes a task's checkpoint acknowledgement, together with the acknowledging task's task
 * manager location, to the checkpoint coordinator on the I/O executor.
 *
 * <p>Must run on the main thread. Processing errors are logged as warnings. Without a
 * coordinator, the message is logged at error level while the job is RUNNING and at debug
 * level otherwise.
 *
 * @param jobID id of the job the acknowledgement belongs to
 * @param executionAttemptID attempt that acknowledges
 * @param checkpointId id of the acknowledged checkpoint
 * @param checkpointMetrics metrics reported by the task
 * @param checkpointState snapshot of the task's state
 */
@Override
public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
    mainThreadExecutor.assertRunningInMainThread();
    final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
    final AcknowledgeCheckpoint acknowledgement =
        new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
    final String location = retrieveTaskManagerLocation(executionAttemptID);

    if (coordinator == null) {
        final String template = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
        if (executionGraph.getState() != JobStatus.RUNNING) {
            log.debug(template, jobGraph.getJobID());
        } else {
            log.error(template, jobGraph.getJobID());
        }
        return;
    }

    ioExecutor.execute(() -> {
        try {
            coordinator.receiveAcknowledgeMessage(acknowledgement, location);
        } catch (Throwable t) {
            log.warn("Error while processing checkpoint acknowledgement message", t);
        }
    });
}
示例21
/**
 * Called when the job reaches a terminal state. It clears and shuts down the checkpoint
 * coordinator, logs any exception from the shutdown, and always completes the termination
 * future with the given status.
 *
 * @param status the terminal job status
 */
private void onTerminalState(JobStatus status) {
    try {
        final CheckpointCoordinator coordinator = this.checkpointCoordinator;
        this.checkpointCoordinator = null;
        if (coordinator == null) {
            // Nothing to shut down; the finally block still completes the future.
            return;
        }
        coordinator.shutdown(status);
    } catch (Exception cleanupError) {
        LOG.error("Error while cleaning up after execution", cleanupError);
    } finally {
        terminationFuture.complete(status);
    }
}
示例22
/**
 * Attaches two pending checkpoints, chk-{@code checkpointId} and chk-{@code checkpointId + 1}
 * (chk-42 and chk-43), to the checkpoint coordinator of the given execution graph.
 * If {@link #acknowledgeAllCheckpoints(CheckpointCoordinator, Iterator)} is called afterwards,
 * chk-42 becomes the completed checkpoint.
 *
 * <p>Both checkpoints must be acknowledged by every current execution attempt of the graph. Each
 * checkpoint receives its own copy of the attempt-to-vertex map. Previously one mutable map was
 * shared, so acknowledgements for one checkpoint could alter the other if
 * {@code PendingCheckpoint} tracks its outstanding tasks in the map it is given.
 * NOTE(review): confirm whether {@code PendingCheckpoint} copies the map; copying here is
 * harmless either way.
 *
 * <p>The coordinator's internal {@code pendingCheckpoints} field is overwritten via Whitebox.
 *
 * @param eg execution graph whose checkpoint coordinator receives the pending checkpoints
 * @throws IOException if initializing a checkpoint storage location fails
 */
private void attachPendingCheckpoints(ExecutionGraph eg) throws IOException {
    final Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm = new HashMap<>();
    eg.getAllExecutionVertices().forEach(e -> {
        Execution ee = e.getCurrentExecutionAttempt();
        if (ee != null) {
            verticesToConfirm.put(ee.getAttemptId(), e);
        }
    });

    CheckpointCoordinator checkpointCoordinator = eg.getCheckpointCoordinator();
    assertNotNull(checkpointCoordinator);
    CheckpointStorageCoordinatorView checkpointStorage = checkpointCoordinator.getCheckpointStorage();

    final Map<Long, PendingCheckpoint> pendingCheckpoints = new HashMap<>();
    final long newCheckpointId = checkpointId + 1;
    pendingCheckpoints.put(checkpointId, newPendingCheckpointForAllAttempts(eg, checkpointId, verticesToConfirm, checkpointStorage));
    pendingCheckpoints.put(newCheckpointId, newPendingCheckpointForAllAttempts(eg, newCheckpointId, verticesToConfirm, checkpointStorage));

    Whitebox.setInternalState(checkpointCoordinator, "pendingCheckpoints", pendingCheckpoints);
}

/**
 * Creates a retained-on-failure pending checkpoint with the given id (timestamp 0). It must be
 * acknowledged by a private copy of the given attempts.
 */
private PendingCheckpoint newPendingCheckpointForAllAttempts(
        ExecutionGraph eg,
        long pendingCheckpointId,
        Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
        CheckpointStorageCoordinatorView checkpointStorage) throws IOException {
    return new PendingCheckpoint(
        eg.getJobID(),
        pendingCheckpointId,
        0L,
        new HashMap<>(verticesToConfirm),
        CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
        checkpointStorage.initializeLocationForCheckpoint(pendingCheckpointId),
        eg.getFutureExecutor());
}
示例23
/**
 * Verifies that a partially acknowledged pending checkpoint is cancelled once a failed vertex is
 * restarted and the main-thread tasks have run.
 */
@Test
public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
    final JobGraph streamingJobGraph = createStreamingJobGraph();
    final ExecutionGraph graph = createExecutionGraph(streamingJobGraph);

    final Iterator<ExecutionVertex> vertices = graph.getAllExecutionVertices().iterator();
    final ExecutionVertex first = vertices.next();
    final ExecutionVertex second = vertices.next();
    setTasksRunning(graph, first, second);

    final CheckpointCoordinator coordinator = graph.getCheckpointCoordinator();
    checkState(coordinator != null);
    coordinator.triggerCheckpoint(System.currentTimeMillis(), false);
    assertEquals(1, coordinator.getNumberOfPendingCheckpoints());
    final long pendingId = coordinator.getPendingCheckpoints().keySet().iterator().next();

    // Only the first vertex acknowledges, then it fails; the failover strategy is then
    // expected to cancel all pending checkpoints on restart.
    final AcknowledgeCheckpoint firstVertexAck = new AcknowledgeCheckpoint(
        streamingJobGraph.getJobID(),
        first.getCurrentExecutionAttempt().getAttemptId(),
        pendingId);
    coordinator.receiveAcknowledgeMessage(firstVertexAck, "Unknown location");
    assertEquals(1, coordinator.getNumberOfPendingCheckpoints());

    failVertex(first);
    assertEquals(1, coordinator.getNumberOfPendingCheckpoints());

    manualMainThreadExecutor.triggerScheduledTasks();
    assertNoPendingCheckpoints(coordinator);
}
示例24
/**
 * Restores the given {@link ExecutionGraph} from the savepoint referenced by the given
 * {@link SavepointRestoreSettings}, if a restore is requested and the graph has a checkpoint
 * coordinator. Otherwise this is a no-op.
 *
 * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
 * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
 * @throws Exception if the {@link ExecutionGraph} could not be restored
 */
private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
    if (!savepointRestoreSettings.restoreSavepoint()) {
        return;
    }

    final CheckpointCoordinator restoringCoordinator = executionGraphToRestore.getCheckpointCoordinator();
    if (restoringCoordinator != null) {
        restoringCoordinator.restoreSavepoint(
            savepointRestoreSettings.getRestorePath(),
            savepointRestoreSettings.allowNonRestoredState(),
            executionGraphToRestore.getAllVertices(),
            userCodeLoader);
    }
}
示例25
/**
 * Triggers a savepoint, optionally cancelling the job once the savepoint has been stored.
 *
 * <p>Must be called from the main thread. Throws {@link IllegalStateException} synchronously if
 * the job has no checkpoint coordinator, or if no target directory is given and no default
 * savepoint location is configured. When {@code cancelJob} is set, periodic checkpointing is
 * stopped first. It is restarted if the savepoint fails; on success the job is cancelled. The
 * completion handler runs on the main thread executor.
 *
 * @param targetDirectory savepoint directory, or {@code null} to use the configured default
 * @param cancelJob whether to cancel the job after a successful savepoint
 * @return future with the external pointer of the completed savepoint
 */
@Override
public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
    mainThreadExecutor.assertRunningInMainThread();
    final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    if (checkpointCoordinator == null) {
        throw new IllegalStateException(
            String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
    } else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
        // Only mention cancellation when it was actually requested (same wording as the trigger log below).
        log.info("Trying to trigger {}savepoint for job {}, but no savepoint directory configured.",
            cancelJob ? "cancel-with-" : "", jobGraph.getJobID());
        throw new IllegalStateException(
            "No savepoint directory configured. You can either specify a directory " +
                "while cancelling via -s :targetDirectory or configure a cluster-wide " +
                "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
    }

    log.info("Triggering {}savepoint for job {}.", cancelJob ? "cancel-with-" : "", jobGraph.getJobID());

    if (cancelJob) {
        checkpointCoordinator.stopCheckpointScheduler();
    }

    return checkpointCoordinator
        .triggerSavepoint(targetDirectory)
        .thenApply(CompletedCheckpoint::getExternalPointer)
        .handleAsync((path, throwable) -> {
            if (throwable != null) {
                if (cancelJob) {
                    // The savepoint failed, so resume the periodic checkpointing stopped above.
                    startCheckpointScheduler(checkpointCoordinator);
                }
                throw new CompletionException(throwable);
            } else if (cancelJob) {
                log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
                cancel();
            }
            return path;
        }, mainThreadExecutor);
}
示例26
/**
 * Starts the coordinator's periodic checkpoint scheduler, if periodic checkpointing is
 * configured. Must be called from the main thread. A concurrently shut-down coordinator is
 * tolerated.
 *
 * @param checkpointCoordinator coordinator whose checkpoint scheduler is started
 */
private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
    mainThreadExecutor.assertRunningInMainThread();
    final boolean periodic = checkpointCoordinator.isPeriodicCheckpointingConfigured();
    if (periodic) {
        try {
            checkpointCoordinator.startCheckpointScheduler();
        } catch (IllegalStateException ignored) {
            // Raised when the coordinator has already been shut down concurrently.
        }
    }
}
示例27
/**
 * Forwards a task's checkpoint acknowledgement (plus the task manager location of the
 * acknowledging attempt) to the checkpoint coordinator, processing it on the I/O executor.
 *
 * <p>Must run on the main thread. Without a coordinator, the message is logged at error level
 * while the job is RUNNING and at debug level otherwise.
 *
 * @param jobID id of the job the acknowledgement belongs to
 * @param executionAttemptID attempt that acknowledges
 * @param checkpointId id of the acknowledged checkpoint
 * @param checkpointMetrics metrics reported by the task
 * @param checkpointState snapshot of the task's state
 */
@Override
public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
    mainThreadExecutor.assertRunningInMainThread();
    final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
    final AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
        jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
    final String taskManagerLocation = retrieveTaskManagerLocation(executionAttemptID);

    if (coordinator != null) {
        final Runnable forwardAck = () -> {
            try {
                coordinator.receiveAcknowledgeMessage(message, taskManagerLocation);
            } catch (Throwable failure) {
                log.warn("Error while processing checkpoint acknowledgement message", failure);
            }
        };
        ioExecutor.execute(forwardAck);
    } else {
        final String template = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
        final boolean jobRunning = executionGraph.getState() == JobStatus.RUNNING;
        if (jobRunning) {
            log.error(template, jobGraph.getJobID());
        } else {
            log.debug(template, jobGraph.getJobID());
        }
    }
}
示例28
/**
 * Sends an acknowledgement for the given checkpoint to the scheduler's checkpoint coordinator on
 * behalf of every current execution attempt, using the location string "Unknown location".
 *
 * @param scheduler scheduler whose coordinator receives the acknowledgements
 * @param checkpointId id of the checkpoint to acknowledge
 * @throws CheckpointException if the coordinator rejects an acknowledgement
 */
public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {
    final CheckpointCoordinator coordinator = getCheckpointCoordinator(scheduler);
    final JobID jobId = scheduler.getJobId();
    for (final ExecutionAttemptID attempt : getAllCurrentExecutionAttempts(scheduler)) {
        coordinator.receiveAcknowledgeMessage(
            new AcknowledgeCheckpoint(jobId, attempt, checkpointId),
            "Unknown location");
    }
}
示例29
/**
 * Triggers a checkpoint on the scheduler's coordinator, acknowledges it from all current
 * execution attempts and returns the completed checkpoint.
 *
 * <p>Asserts that exactly one checkpoint is pending after triggering, and that the checkpoint's
 * completion future is already done after acknowledgement.
 *
 * @param scheduler scheduler to take the checkpoint on
 * @return the completed checkpoint
 * @throws Exception if triggering or acknowledging fails
 */
public static CompletedCheckpoint takeCheckpoint(DefaultScheduler scheduler) throws Exception {
    final CheckpointCoordinator coordinator = getCheckpointCoordinator(scheduler);
    coordinator.triggerCheckpoint(false);
    assertEquals("test setup inconsistent", 1, coordinator.getNumberOfPendingCheckpoints());

    final PendingCheckpoint pending = coordinator.getPendingCheckpoints().values().iterator().next();
    final CompletableFuture<CompletedCheckpoint> completion = pending.getCompletionFuture();
    acknowledgePendingCheckpoint(scheduler, pending.getCheckpointId());

    final CompletedCheckpoint result = completion.getNow(null);
    assertNotNull("checkpoint not complete", result);
    return result;
}
示例30
/**
 * Verifies that checkpointed state is restored when the only task fails and the restart
 * executor's scheduled tasks are run. A stateful master hook is used to observe the restore.
 */
@Test
public void restoreStateWhenRestartingTasks() throws Exception {
    final JobGraph graph = singleNonParallelJobVertexJobGraph();
    enableCheckpointing(graph);
    final CountDownLatch triggeredLatch = getCheckpointTriggeredLatch();
    final DefaultScheduler scheduler = createSchedulerAndStartScheduling(graph);

    final ArchivedExecutionVertex vertex = Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
    final ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
    scheduler.updateTaskExecutionState(new TaskExecutionState(graph.getJobID(), attempt, ExecutionState.RUNNING));

    final CheckpointCoordinator coordinator = getCheckpointCoordinator(scheduler);
    // The hook's restore count tells us whether state was restored.
    final TestMasterHook hook = TestMasterHook.fromId("testHook");
    coordinator.addMasterHook(hook);

    // Complete one checkpoint so that there is state to restore.
    coordinator.triggerCheckpoint(false);
    triggeredLatch.await();
    final long pendingId = coordinator.getPendingCheckpoints().keySet().iterator().next();
    acknowledgePendingCheckpoint(scheduler, pendingId);

    // Fail the task, then run whatever the restart executor has scheduled.
    scheduler.updateTaskExecutionState(new TaskExecutionState(graph.getJobID(), attempt, ExecutionState.FAILED));
    taskRestartExecutor.triggerScheduledTasks();

    assertThat(hook.getRestoreCount(), is(equalTo(1)));
}