Merge branch 'v3/fix-checkpoint-failures' into v3/worker-attempt-creation
nicktrn committed May 8, 2024
2 parents 55cd522 + e6cea79 commit 2d30f86
Showing 3 changed files with 132 additions and 52 deletions.
5 changes: 5 additions & 0 deletions .changeset/warm-olives-provide.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Improve handling of IPC timeouts and fix checkpoint cancellation after failures
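To make the shape of that fix concrete, here is a minimal sketch of the new checkpoint result introduced in the coordinator diff below. The type names mirror the diff; `CheckpointResult` and `handleResult` are illustrative names only and are not part of the commit.

```ts
// Sketch only: the union mirrors the return type of checkpointAndPush() in the diff below.
type CheckpointData = { location: string; docker: boolean };

type CheckpointResult =
  | { success: true; checkpoint: CheckpointData }
  | { success: false; reason?: "CANCELED" };

// Illustrative caller (not in the commit): branch on the outcome instead of
// checking for an undefined return value.
function handleResult(result: CheckpointResult): CheckpointData | undefined {
  if (!result.success) {
    // "CANCELED" is expected, e.g. when the wait completed early; anything else
    // is a real failure and gets recorded so cancelCheckpoint() can pretend-cancel it.
    return undefined;
  }
  return result.checkpoint;
}
```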
141 changes: 101 additions & 40 deletions apps/coordinator/src/index.ts
@@ -1,5 +1,5 @@
import { createServer } from "node:http";
import { $ } from "execa";
import { $, type ExecaChildProcess } from "execa";
import { nanoid } from "nanoid";
import { Server } from "socket.io";
import {
@@ -19,6 +19,7 @@ collectDefaultMetrics();
const HTTP_SERVER_PORT = Number(process.env.HTTP_SERVER_PORT || 8020);
const NODE_NAME = process.env.NODE_NAME || "coordinator";
const DEFAULT_RETRY_DELAY_THRESHOLD_IN_MS = 30_000;
const CHAOS_MONKEY_ENABLED = !!process.env.CHAOS_MONKEY_ENABLED;

const REGISTRY_HOST = process.env.REGISTRY_HOST || "localhost:5000";
const CHECKPOINT_PATH = process.env.CHECKPOINT_PATH || "/checkpoints";
@@ -32,6 +33,10 @@ const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ?

const logger = new SimpleLogger(`[${NODE_NAME}]`);

if (CHAOS_MONKEY_ENABLED) {
logger.log("🍌 Chaos monkey enabled");
}

type CheckpointerInitializeReturn = {
canCheckpoint: boolean;
willSimulate: boolean;
@@ -49,13 +54,18 @@ type CheckpointData = {
docker: boolean;
};

function isExecaChildProcess(maybeExeca: unknown): maybeExeca is Awaited<ExecaChildProcess> {
return typeof maybeExeca === "object" && maybeExeca !== null && "escapedCommand" in maybeExeca;
}

class Checkpointer {
#initialized = false;
#canCheckpoint = false;
#dockerMode = !process.env.KUBERNETES_PORT;

#logger = new SimpleLogger("[checkptr]");
#abortControllers = new Map<string, AbortController>();
#failedCheckpoints = new Map<string, unknown>();

constructor(private opts = { forceSimulate: false }) {}

@@ -150,14 +160,25 @@ class Checkpointer {
success: !!result,
});

return result;
if (!result.success) {
return;
}

return result.checkpoint;
}

isCheckpointing(runId: string) {
return this.#abortControllers.has(runId);
}

cancelCheckpoint(runId: string): boolean {
// If the last checkpoint failed, pretend we canceled it
// This ensures tasks don't wait for external resume messages to continue
if (this.#hasFailedCheckpoint(runId)) {
this.#clearFailedCheckpoint(runId);
return true;
}

const controller = this.#abortControllers.get(runId);

if (!controller) {
@@ -176,44 +197,58 @@ class Checkpointer {
leaveRunning = true, // This mirrors kubernetes behaviour more accurately
projectRef,
deploymentVersion,
}: CheckpointAndPushOptions): Promise<CheckpointData | undefined> {
}: CheckpointAndPushOptions): Promise<
{ success: true; checkpoint: CheckpointData } | { success: false; reason?: "CANCELED" }
> {
await this.initialize();

const options = {
runId,
leaveRunning,
projectRef,
deploymentVersion,
};

if (!this.#dockerMode && !this.#canCheckpoint) {
this.#logger.error("No checkpoint support. Simulation requires docker.");
return;
return { success: false };
}

if (this.#abortControllers.has(runId)) {
logger.error("Checkpoint procedure already in progress", {
options: {
runId,
leaveRunning,
projectRef,
deploymentVersion,
},
});
return;
logger.error("Checkpoint procedure already in progress", { options });
return { success: false };
}

// This is a new checkpoint, clear any last failure for this run
this.#clearFailedCheckpoint(runId);

const controller = new AbortController();
this.#abortControllers.set(runId, controller);

const $$ = $({ signal: controller.signal });

try {
if (CHAOS_MONKEY_ENABLED) {
console.log("🍌 Chaos monkey wreaking havoc");

const random = Math.random();

if (random < 0.33) {
// Fake long checkpoint duration
await $$`sleep 300`;
} else if (random < 0.66) {
// Fake checkpoint error
await $$`false`;
} else {
// no-op
}
}

const shortCode = nanoid(8);
const imageRef = this.#getImageRef(projectRef, deploymentVersion, shortCode);
const exportLocation = this.#getExportLocation(projectRef, deploymentVersion, shortCode);

this.#logger.log("Checkpointing:", {
options: {
runId,
leaveRunning,
projectRef,
deploymentVersion,
},
});
this.#logger.log("Checkpointing:", { options });

const containterName = this.#getRunContainerName(runId);

@@ -234,9 +269,9 @@
);
}
}
} catch (error: any) {
this.#logger.error(error.stderr);
return;
} catch (error) {
this.#logger.error("Failed while creating docker checkpoint", { exportLocation });
throw error;
}

this.#logger.log("checkpoint created:", {
@@ -245,8 +280,11 @@
});

return {
location: exportLocation,
docker: true,
success: true,
checkpoint: {
location: exportLocation,
docker: true,
},
};
}

@@ -291,29 +329,52 @@ class Checkpointer {
// this.#logger.log("Deleted checkpoint image", { imageRef });
} catch (error) {
this.#logger.error("Failed during checkpoint cleanup", { exportLocation });
this.#logger.debug(error);
throw error;
}

return {
location: imageRef,
docker: false,
success: true,
checkpoint: {
location: imageRef,
docker: false,
},
};
} catch (error) {
this.#logger.error("checkpoint failed", {
options: {
runId,
leaveRunning,
projectRef,
deploymentVersion,
},
error,
});
return;
if (isExecaChildProcess(error)) {
if (error.isCanceled) {
this.#logger.error("Checkpoint canceled", { options, error });

return { success: false, reason: "CANCELED" };
}

// Everything that's not a cancellation is a failure
this.#failCheckpoint(runId, error);
this.#logger.error("Checkpoint command error", { options, error });

return { success: false };
}

this.#failCheckpoint(runId, error);
this.#logger.error("Unhandled checkpoint error", { options, error });

return { success: false };
} finally {
this.#abortControllers.delete(runId);
}
}

#failCheckpoint(runId: string, error: unknown) {
this.#failedCheckpoints.set(runId, error);
}

#clearFailedCheckpoint(runId: string) {
this.#failedCheckpoints.delete(runId);
}

#hasFailedCheckpoint(runId: string) {
return this.#failedCheckpoints.has(runId);
}

#getRunContainerName(suffix: string) {
return `task-run-${suffix}`;
}
@@ -1011,7 +1072,7 @@ class TaskCoordinator {
return provider;
}

#cancelCheckpoint(runId: string) {
#cancelCheckpoint(runId: string): boolean {
const checkpointWait = this.#checkpointableTasks.get(runId);

if (checkpointWait) {
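A side note on the cancellation handling above: the new `isExecaChildProcess` guard works because execa rejects with an error object that carries the command metadata, including `isCanceled` when the process was aborted through the `signal` option. A standalone sketch of that pattern, assuming an execa version that provides the `$` API as imported in the diff; `demo` is just an illustrative name:

```ts
import { $, type ExecaChildProcess } from "execa";

// Same guard as in the coordinator: execa errors expose the command metadata.
function isExecaChildProcess(maybeExeca: unknown): maybeExeca is Awaited<ExecaChildProcess> {
  return typeof maybeExeca === "object" && maybeExeca !== null && "escapedCommand" in maybeExeca;
}

// Illustrative only: shows how an aborted command surfaces as isCanceled,
// while other failures (non-zero exit, missing binary) do not.
async function demo() {
  const controller = new AbortController();
  const $$ = $({ signal: controller.signal });

  // Abort shortly after starting, to force a cancellation.
  setTimeout(() => controller.abort(), 100);

  try {
    await $$`sleep 300`;
  } catch (error) {
    if (isExecaChildProcess(error) && error.isCanceled) {
      return "CANCELED"; // aborted via the controller, e.g. cancelCheckpoint()
    }
    return "FAILED"; // everything else counts as a failed checkpoint
  }
  return "OK";
}
```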
38 changes: 26 additions & 12 deletions packages/core/src/v3/runtime/prodRuntimeManager.ts
@@ -55,10 +55,14 @@ export class ProdRuntimeManager implements RuntimeManager {
this._waitForDuration = { resolve, reject };
});

const { willCheckpointAndRestore } = await this.ipc.sendWithAck("WAIT_FOR_DURATION", {
ms,
now,
});
const { willCheckpointAndRestore } = await this.ipc.sendWithAck(
"WAIT_FOR_DURATION",
{
ms,
now,
},
10_000
);

if (!willCheckpointAndRestore) {
await internalTimeout;
@@ -74,14 +78,24 @@
// Resets the clock to the current time
clock.reset();

// The coordinator should cancel any in-progress checkpoints
const { checkpointCanceled, version } = await this.ipc.sendWithAck("CANCEL_CHECKPOINT", {
version: "v2",
reason: "WAIT_FOR_DURATION",
});

if (checkpointCanceled) {
// There won't be a checkpoint or external resume and we've already completed our internal timeout
try {
// The coordinator should cancel any in-progress checkpoints
const { checkpointCanceled, version } = await this.ipc.sendWithAck(
"CANCEL_CHECKPOINT",
{
version: "v2",
reason: "WAIT_FOR_DURATION",
},
10_000
);

if (checkpointCanceled) {
// There won't be a checkpoint or external resume and we've already completed our internal timeout
return;
}
} catch (error) {
// If the cancellation times out, we will proceed as if the checkpoint was canceled
logger.debug("Checkpoint cancellation timed out", { error });
return;
}

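The pattern in the runtime manager change above is worth stating on its own: the ack now has an explicit timeout, and a timed-out CANCEL_CHECKPOINT is treated as if the checkpoint had been canceled, so the run does not block forever waiting for a resume. A reduced sketch, assuming sendWithAck accepts a timeout in milliseconds as its third argument (as the diff shows); the `IpcLike` interface and `tryCancelCheckpoint` below are placeholders, not the real IPC client types:

```ts
// Placeholder interface: only the parts of the IPC client used in this sketch.
type CancelCheckpointAck = { version: "v2"; checkpointCanceled: boolean };

interface IpcLike {
  sendWithAck(
    type: "CANCEL_CHECKPOINT",
    data: { version: "v2"; reason: "WAIT_FOR_DURATION" },
    timeoutInMs?: number
  ): Promise<CancelCheckpointAck>;
}

// Illustrative only: returns true when the caller can continue immediately.
async function tryCancelCheckpoint(ipc: IpcLike): Promise<boolean> {
  try {
    const { checkpointCanceled } = await ipc.sendWithAck(
      "CANCEL_CHECKPOINT",
      { version: "v2", reason: "WAIT_FOR_DURATION" },
      10_000 // don't wait forever for the ack
    );

    // Canceled means no checkpoint and no external resume: continue right away.
    return checkpointCanceled;
  } catch {
    // The ack timed out (or failed): proceed as if the checkpoint was canceled
    // rather than leaving the run stuck waiting for a resume message.
    return true;
  }
}
```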