Skip to content

Commit

Permalink
perf: migrate to graphile worker v0.16.6 (#1097)
Browse files Browse the repository at this point in the history
* migrate to graphile worker v0.16.6

* remove stale docs link

* fix jobs cleanup query
  • Loading branch information
nicktrn committed May 10, 2024
1 parent c644912 commit 1642fd7
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 88 deletions.
4 changes: 4 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ Worker.init().catch((error) => {

function logError(error: unknown, request?: Request) {
console.error(error);

if (error instanceof Error && error.message.startsWith("There are locked jobs present")) {
console.log("⚠️ graphile-worker migration issue detected!");
}
}

const sqsEventConsumer = singleton("sqsEventConsumer", getSharedSqsEventConsumer);
Expand Down
81 changes: 61 additions & 20 deletions apps/webapp/app/platform/zodWorker.server.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import type {
CronItem,
CronItemOptions,
Job as GraphileJob,
DbJob as GraphileJob,
Runner as GraphileRunner,
JobHelpers,
RunnerOptions,
Task,
TaskList,
TaskSpec,
WorkerUtils,
} from "graphile-worker";
import { run as graphileRun, parseCronItems } from "graphile-worker";
import { run as graphileRun, makeWorkerUtils, parseCronItems } from "graphile-worker";
import { SpanKind, trace } from "@opentelemetry/api";

import omit from "lodash.omit";
import { z } from "zod";
import { PrismaClient, PrismaClientOrTransaction } from "~/db.server";
import { $replica, PrismaClient, PrismaClientOrTransaction } from "~/db.server";
import { PgListenService } from "~/services/db/pgListen.server";
import { workerLogger as logger } from "~/services/logger.server";
import { flattenAttributes } from "@trigger.dev/core/v3";
Expand All @@ -34,8 +35,8 @@ const RawCronPayloadSchema = z.object({

const GraphileJobSchema = z.object({
id: z.coerce.string(),
queue_name: z.string().nullable(),
task_identifier: z.string(),
job_queue_id: z.number().nullable(),
task_id: z.number(),
payload: z.unknown(),
priority: z.number(),
run_at: z.coerce.date(),
Expand Down Expand Up @@ -72,7 +73,7 @@ type RecurringTaskPayload = {

export type ZodRecurringTasks = {
[key: string]: {
pattern: string;
match: string;
options?: CronItemOptions;
handler: (payload: RecurringTaskPayload, job: GraphileJob) => Promise<void>;
};
Expand Down Expand Up @@ -129,6 +130,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
#rateLimiter?: ZodWorkerRateLimiter;
#shutdownTimeoutInMs?: number;
#shuttingDown = false;
#workerUtils?: WorkerUtils;

constructor(options: ZodWorkerOptions<TMessageCatalog>) {
this.#name = options.name;
Expand Down Expand Up @@ -158,6 +160,8 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

const parsedCronItems = parseCronItems(this.#createCronItemsFromRecurringTasks());

this.#workerUtils = await makeWorkerUtils(this.#runnerOptions);

this.#runner = await graphileRun({
...this.#runnerOptions,
noHandleSignals: true,
Expand Down Expand Up @@ -188,7 +192,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
this.#logDebug("Detected incoming migration", { latestMigration });

if (latestMigration > 10) {
// already migrated past v0.14 - nothing to do
this.#logDebug("Already migrated past v0.14 - nothing to do", { latestMigration });
return;
}

Expand Down Expand Up @@ -263,6 +267,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

public async stop() {
await this.#runner?.stop();
await this.#workerUtils?.release();
}

public async enqueue<K extends keyof TMessageCatalog>(
Expand Down Expand Up @@ -442,12 +447,29 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
return taskList;
}

async #getQueueName(queueId: number | null) {
if (queueId === null) {
return;
}

const schema = z.array(z.object({ queue_name: z.string() }));

const rawQueueNameResults = await $replica.$queryRawUnsafe(
`SELECT queue_name FROM ${this.graphileWorkerSchema}._private_job_queues WHERE id = $1`,
queueId
);

const queueNameResults = schema.parse(rawQueueNameResults);

return queueNameResults[0]?.queue_name;
}

async #rescheduleTask(payload: unknown, helpers: JobHelpers) {
this.#logDebug("Rescheduling task", { payload, job: helpers.job });

await this.enqueue(helpers.job.task_identifier, payload, {
runAt: helpers.job.run_at,
queueName: helpers.job.queue_name ?? undefined,
queueName: await this.#getQueueName(helpers.job.job_queue_id),
priority: helpers.job.priority,
jobKey: helpers.job.key ?? undefined,
flags: Object.keys(helpers.job.flags ?? []),
Expand All @@ -460,7 +482,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

if (this.#cleanup) {
cronItems.push({
pattern: this.#cleanup.frequencyExpression,
match: this.#cleanup.frequencyExpression,
identifier: CLEANUP_TASK_NAME,
task: CLEANUP_TASK_NAME,
options: this.#cleanup.taskOptions,
Expand All @@ -469,7 +491,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

if (this.#reporter) {
cronItems.push({
pattern: "50 * * * *", // Every hour at 50 minutes past the hour
match: "50 * * * *", // Every hour at 50 minutes past the hour
identifier: REPORTER_TASK_NAME,
task: REPORTER_TASK_NAME,
});
Expand All @@ -481,7 +503,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

for (const [key, task] of Object.entries(this.#recurringTasks)) {
const cronItem: CronItem = {
pattern: task.pattern,
match: task.match,
identifier: key,
task: key,
options: task.options,
Expand Down Expand Up @@ -529,7 +551,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
attributes: {
"job.task_identifier": job.task_identifier,
"job.id": job.id,
...(job.queue_name ? { "job.queue_name": job.queue_name } : {}),
...(job.job_queue_id ? { "job.queue_id": job.job_queue_id } : {}),
...flattenAttributes(job.payload as Record<string, unknown>, "job.payload"),
"job.priority": job.priority,
"job.run_at": job.run_at.toISOString(),
Expand Down Expand Up @@ -599,7 +621,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
attributes: {
"job.task_identifier": job.task_identifier,
"job.id": job.id,
...(job.queue_name ? { "job.queue_name": job.queue_name } : {}),
...(job.job_queue_id ? { "job.queue_id": job.job_queue_id } : {}),
...flattenAttributes(job.payload as Record<string, unknown>, "job.payload"),
"job.priority": job.priority,
"job.run_at": job.run_at.toISOString(),
Expand Down Expand Up @@ -638,6 +660,10 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
return;
}

if (!this.#workerUtils) {
throw new Error("WorkerUtils need to be initialized before running job cleanup.");
}

const job = helpers.job;

logger.debug("Received cleanup task", {
Expand All @@ -663,23 +689,38 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
payload,
});

const rawResults = await this.#prisma.$queryRawUnsafe(
`WITH rows AS (SELECT id FROM ${this.graphileWorkerSchema}.jobs WHERE run_at < $1 AND locked_at IS NULL AND max_attempts = attempts LIMIT $2 FOR UPDATE) DELETE FROM ${this.graphileWorkerSchema}.jobs WHERE id IN (SELECT id FROM rows) RETURNING id`,
const rawResults = await $replica.$queryRawUnsafe(
`SELECT id
FROM ${this.graphileWorkerSchema}.jobs
WHERE run_at < $1
AND locked_at IS NULL
AND max_attempts = attempts
LIMIT $2`,
expirationDate,
this.#cleanup.maxCount
);

const results = Array.isArray(rawResults) ? rawResults : [];
const results = z
.array(
z.object({
id: z.coerce.string(),
})
)
.parse(rawResults);

const completedJobs = await this.#workerUtils.completeJobs(results.map((job) => job.id));

logger.debug("Cleaned up old jobs", {
count: results.length,
found: results.length,
deleted: completedJobs.length,
expirationDate,
payload,
});

if (this.#reporter) {
await this.#reporter("cleanup_stats", {
count: results.length,
found: results.length,
deleted: completedJobs.length,
expirationDate,
ts: payload._cron.ts,
});
Expand Down Expand Up @@ -711,7 +752,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
const schema = z.array(z.object({ count: z.coerce.number() }));

// Count the number of jobs that have been added since the startAt date and before the payload._cron.ts date
const rawAddedResults = await this.#prisma.$queryRawUnsafe(
const rawAddedResults = await $replica.$queryRawUnsafe(
`SELECT COUNT(*) FROM ${this.graphileWorkerSchema}.jobs WHERE created_at > $1 AND created_at < $2`,
startAt,
payload._cron.ts
Expand All @@ -720,7 +761,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
const addedCountResults = schema.parse(rawAddedResults)[0];

// Count the total number of jobs in the jobs table
const rawTotalResults = await this.#prisma.$queryRawUnsafe(
const rawTotalResults = await $replica.$queryRawUnsafe(
`SELECT COUNT(*) FROM ${this.graphileWorkerSchema}.jobs`
);

Expand Down
95 changes: 95 additions & 0 deletions apps/webapp/app/services/db/graphileMigrationHelper.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { runMigrations } from "graphile-worker";
import { PrismaClient, prisma } from "~/db.server";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { PgNotifyService } from "./pgNotify.server";
import { z } from "zod";

export class GraphileMigrationHelperService {
#prismaClient: PrismaClient;

constructor(prismaClient: PrismaClient = prisma) {
this.#prismaClient = prismaClient;
}

public async call() {
this.#logDebug("GraphileMigrationHelperService.call");

await this.#detectAndPrepareForMigrations();

await runMigrations({
connectionString: env.DATABASE_URL,
schema: env.WORKER_SCHEMA,
});
}

#logDebug(message: string, args?: any) {
logger.debug(`[migrationHelper] ${message}`, args);
}

async #getLatestMigration() {
const migrationQueryResult = await this.#prismaClient.$queryRawUnsafe(`
SELECT id FROM ${env.WORKER_SCHEMA}.migrations
ORDER BY id DESC LIMIT 1
`);

const MigrationQueryResultSchema = z.array(z.object({ id: z.number() }));

const migrationResults = MigrationQueryResultSchema.parse(migrationQueryResult);

if (!migrationResults.length) {
// no migrations applied yet
return -1;
}

return migrationResults[0].id;
}

async #graphileSchemaExists() {
const schemaCount = await this.#prismaClient.$executeRaw`
SELECT schema_name FROM information_schema.schemata
WHERE schema_name = ${env.WORKER_SCHEMA}
`;

return schemaCount === 1;
}

/** Helper for graphile-worker v0.14.0 migration. No-op if already migrated. */
async #detectAndPrepareForMigrations() {
if (!(await this.#graphileSchemaExists())) {
// no schema yet, likely first start
return;
}

const latestMigration = await this.#getLatestMigration();

if (latestMigration < 0) {
// no migrations found
return;
}

// the first v0.14.0 migration has ID 11
if (latestMigration > 10) {
// already migrated
return;
}

// add 15s to graceful shutdown timeout, just to be safe
const migrationDelayInMs = env.GRACEFUL_SHUTDOWN_TIMEOUT + 15000;

this.#logDebug("Delaying worker startup due to pending migration", {
latestMigration,
migrationDelayInMs,
});

console.log(`⚠️ detected pending graphile migration`);
console.log(`⚠️ notifying running workers`);

const pgNotify = new PgNotifyService();
await pgNotify.call("trigger:graphile:migrate", { latestMigration });

console.log(`⚠️ delaying worker startup by ${migrationDelayInMs}ms`);

await new Promise((resolve) => setTimeout(resolve, migrationDelayInMs));
}
}
12 changes: 6 additions & 6 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import { TriggerScheduledTaskService } from "~/v3/services/triggerScheduledTask.
import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
import { GraphileMigrationHelperService } from "./db/graphileMigrationHelper.server";

const workerCatalog = {
indexEndpoint: z.object({
Expand Down Expand Up @@ -211,9 +212,8 @@ if (env.NODE_ENV === "production") {
}

export async function init() {
// const pgNotify = new PgNotifyService();
// await pgNotify.call("trigger:graphile:migrate", { latestMigration: 10 });
// await new Promise((resolve) => setTimeout(resolve, 10000))
const migrationHelper = new GraphileMigrationHelperService();
await migrationHelper.call();

if (env.WORKER_ENABLED === "true") {
await workerQueue.initialize();
Expand Down Expand Up @@ -250,7 +250,7 @@ function getWorkerQueue() {
recurringTasks: {
// Run this every 5 minutes
autoIndexProductionEndpoints: {
pattern: "*/5 * * * *",
match: "*/5 * * * *",
handler: async (payload, job) => {
const service = new RecurringEndpointIndexService();

Expand All @@ -259,7 +259,7 @@ function getWorkerQueue() {
},
// Run this every hour
purgeOldIndexings: {
pattern: "0 * * * *",
match: "0 * * * *",
handler: async (payload, job) => {
// Delete indexings that are older than 7 days
await prisma.endpointIndex.deleteMany({
Expand All @@ -273,7 +273,7 @@ function getWorkerQueue() {
},
// Run this every hour at the 13 minute mark
purgeOldTaskEvents: {
pattern: "47 * * * *",
match: "47 * * * *",
handler: async (payload, job) => {
await eventRepository.truncateEvents();
},
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
"evt": "^2.4.13",
"express": "^4.18.1",
"framer-motion": "^10.12.11",
"graphile-worker": "^0.13.0",
"graphile-worker": "0.16.6",
"highlight.run": "^7.3.4",
"humanize-duration": "^3.27.3",
"intl-parse-accept-language": "^1.0.0",
Expand Down

0 comments on commit 1642fd7

Please sign in to comment.