Skip to content

Commit

Permalink
v3: automatic pod cleanup (#1092)
Browse files Browse the repository at this point in the history
* add automatic pod cleaner

* fix task monitor namespace override
  • Loading branch information
nicktrn committed May 9, 2024
1 parent cfe4b54 commit 2306217
Show file tree
Hide file tree
Showing 3 changed files with 279 additions and 0 deletions.
9 changes: 9 additions & 0 deletions apps/kubernetes-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
import { Machine, PostStartCauses, PreStopCauses, EnvironmentType } from "@trigger.dev/core/v3";
import { randomUUID } from "crypto";
import { TaskMonitor } from "./taskMonitor";
import { PodCleaner } from "./podCleaner";

const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
const NODE_NAME = process.env.NODE_NAME || "local";
Expand Down Expand Up @@ -543,3 +544,11 @@ const taskMonitor = new TaskMonitor({
});

taskMonitor.start();

const podCleaner = new PodCleaner({
runtimeEnv: RUNTIME_ENV,
namespace: "default",
intervalInSeconds: 300,
});

podCleaner.start();
264 changes: 264 additions & 0 deletions apps/kubernetes-provider/src/podCleaner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
import * as k8s from "@kubernetes/client-node";
import { SimpleLogger } from "@trigger.dev/core-apps";

type PodCleanerOptions = {
runtimeEnv: "local" | "kubernetes";
namespace?: string;
intervalInSeconds?: number;
};

export class PodCleaner {
private enabled = false;
private namespace = "default";
private intervalInSeconds = 300;

private logger = new SimpleLogger("[PodCleaner]");
private k8sClient: {
core: k8s.CoreV1Api;
kubeConfig: k8s.KubeConfig;
};

constructor(private opts: PodCleanerOptions) {
if (opts.namespace) {
this.namespace = opts.namespace;
}

if (opts.intervalInSeconds) {
this.intervalInSeconds = opts.intervalInSeconds;
}

this.k8sClient = this.#createK8sClient();
}

#createK8sClient() {
const kubeConfig = new k8s.KubeConfig();

if (this.opts.runtimeEnv === "local") {
kubeConfig.loadFromDefault();
} else if (this.opts.runtimeEnv === "kubernetes") {
kubeConfig.loadFromCluster();
} else {
throw new Error(`Unsupported runtime environment: ${this.opts.runtimeEnv}`);
}

return {
core: kubeConfig.makeApiClient(k8s.CoreV1Api),
kubeConfig: kubeConfig,
};
}

#isRecord(candidate: unknown): candidate is Record<string, unknown> {
if (typeof candidate !== "object" || candidate === null) {
return false;
} else {
return true;
}
}

#logK8sError(err: unknown, debugOnly = false) {
if (debugOnly) {
this.logger.debug("K8s API Error", err);
} else {
this.logger.error("K8s API Error", err);
}
}

#handleK8sError(err: unknown) {
if (!this.#isRecord(err) || !this.#isRecord(err.body)) {
this.#logK8sError(err);
return;
}

this.#logK8sError(err, true);

if (typeof err.body.message === "string") {
this.#logK8sError({ message: err.body.message });
return;
}

this.#logK8sError({ body: err.body });
}

async #deletePods(opts: {
namespace: string;
dryRun?: boolean;
fieldSelector?: string;
labelSelector?: string;
}) {
return await this.k8sClient.core
.deleteCollectionNamespacedPod(
opts.namespace,
undefined, // pretty
undefined, // continue
opts.dryRun ? "All" : undefined,
opts.fieldSelector,
undefined, // gracePeriodSeconds
opts.labelSelector
)
.catch(this.#handleK8sError.bind(this));
}

async #deleteCompletedRuns() {
this.logger.log("Deleting completed runs");

const start = Date.now();

const result = await this.#deletePods({
namespace: this.namespace,
fieldSelector: "status.phase=Succeeded",
labelSelector: "app=task-run",
});

const elapsedMs = Date.now() - start;

if (!result) {
this.logger.log("Deleting completed runs: No delete result", { elapsedMs });
return;
}

const total = (result.response as any)?.body?.items?.length ?? 0;

this.logger.log("Deleting completed runs: Done", { total, elapsedMs });
}

async #deleteFailedRuns() {
this.logger.log("Deleting failed runs");

const start = Date.now();

const result = await this.#deletePods({
namespace: this.namespace,
fieldSelector: "status.phase=Failed",
labelSelector: "app=task-run",
});

const elapsedMs = Date.now() - start;

if (!result) {
this.logger.log("Deleting failed runs: No delete result", { elapsedMs });
return;
}

const total = (result.response as any)?.body?.items?.length ?? 0;

this.logger.log("Deleting failed runs: Done", { total, elapsedMs });
}

async #deleteUnrecoverableRuns() {
await this.#deletePods({
namespace: this.namespace,
fieldSelector: "status.phase=?",
labelSelector: "app=task-run",
});
}

async start() {
this.enabled = true;
this.logger.log("Starting");

const completedInterval = setInterval(async () => {
if (!this.enabled) {
clearInterval(completedInterval);
return;
}

try {
await this.#deleteCompletedRuns();
} catch (error) {
this.logger.error("Error deleting completed runs", error);
}
}, this.intervalInSeconds * 1000);

const failedInterval = setInterval(
async () => {
if (!this.enabled) {
clearInterval(failedInterval);
return;
}

try {
await this.#deleteFailedRuns();
} catch (error) {
this.logger.error("Error deleting completed runs", error);
}
},
// Use a longer interval for failed runs. This is only a backup in case the task monitor fails.
2 * this.intervalInSeconds * 1000
);

// this.#launchTests();
}

async stop() {
if (!this.enabled) {
return;
}

this.enabled = false;
this.logger.log("Shutting down..");
}

async #launchTests() {
const createPod = async (
container: k8s.V1Container,
name: string,
labels?: Record<string, string>
) => {
this.logger.log("Creating pod:", name);

const pod = {
metadata: {
name,
labels,
},
spec: {
restartPolicy: "Never",
automountServiceAccountToken: false,
terminationGracePeriodSeconds: 1,
containers: [container],
},
} satisfies k8s.V1Pod;

await this.k8sClient.core
.createNamespacedPod(this.namespace, pod)
.catch(this.#handleK8sError.bind(this));
};

const createIdlePod = async (name: string, labels?: Record<string, string>) => {
const container = {
name,
image: "docker.io/library/busybox",
command: ["sh"],
args: ["-c", "sleep infinity"],
} satisfies k8s.V1Container;

await createPod(container, name, labels);
};

const createCompletedPod = async (name: string, labels?: Record<string, string>) => {
const container = {
name,
image: "docker.io/library/busybox",
command: ["sh"],
args: ["-c", "true"],
} satisfies k8s.V1Container;

await createPod(container, name, labels);
};

const createFailedPod = async (name: string, labels?: Record<string, string>) => {
const container = {
name,
image: "docker.io/library/busybox",
command: ["sh"],
args: ["-c", "false"],
} satisfies k8s.V1Container;

await createPod(container, name, labels);
};

await createIdlePod("test-idle-1", { app: "task-run" });
await createFailedPod("test-failed-1", { app: "task-run" });
await createCompletedPod("test-completed-1", { app: "task-run" });
}
}
6 changes: 6 additions & 0 deletions apps/kubernetes-provider/src/taskMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ type TaskMonitorOptions = {

export class TaskMonitor {
#enabled = false;

#logger = new SimpleLogger("[TaskMonitor]");
#taskInformer: ReturnType<typeof k8s.makeInformer<k8s.V1Pod>>;
#processedPods = new Map<string, number>();
#queue = new PQueue({ concurrency: 10 });

#k8sClient: {
core: k8s.CoreV1Api;
kubeConfig: k8s.KubeConfig;
Expand All @@ -44,6 +46,10 @@ export class TaskMonitor {
private labelSelector = "app in (task-index, task-run)";

constructor(private opts: TaskMonitorOptions) {
if (opts.namespace) {
this.namespace = opts.namespace;
}

this.#k8sClient = this.#createK8sClient();

this.#taskInformer = this.#createTaskInformer();
Expand Down

0 comments on commit 2306217

Please sign in to comment.