Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Features: Batching and graphile-worker upgrade to 0.16.0-rc.1 #524

Open
wants to merge 102 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 98 commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
3eea949
Upgrade graphile-worker
nicktrn Oct 2, 2023
e4630f6
CronItem.pattern to match
nicktrn Oct 2, 2023
8e887af
Update graphile job schema
nicktrn Oct 2, 2023
0400e02
Add FAIL_LOCKED_JOBS_ON_STARTUP
nicktrn Oct 2, 2023
6440e30
Add to .env.example
nicktrn Oct 2, 2023
e3c9559
Detect migration errors
nicktrn Oct 2, 2023
25d6478
Rename migration env var
nicktrn Oct 2, 2023
00eade2
Improve migration helper, don't require restart
nicktrn Oct 2, 2023
8efb0a5
Remove restart comment for now
nicktrn Oct 3, 2023
d20a49e
Add migration docs
nicktrn Oct 3, 2023
aab2c8b
Link to migration docs
nicktrn Oct 3, 2023
b45f9fb
Lowercase
nicktrn Oct 3, 2023
6c3921b
Merge branch 'main' into worker-upgrade
nicktrn Oct 3, 2023
4f1c52f
Merge branch 'main' into worker-upgrade
nicktrn Oct 16, 2023
86d6343
Fix merge
nicktrn Oct 16, 2023
823506c
Merge branch 'main' into worker-upgrade
nicktrn Oct 16, 2023
07af335
Merge branch 'main' into worker-upgrade
nicktrn Oct 24, 2023
1219cbf
Don't fail locked jobs, add migration delay
nicktrn Oct 24, 2023
0c449bf
Shut down existing workers prior to new migrations
nicktrn Oct 25, 2023
d156d62
Merge branch 'main' into worker-upgrade
nicktrn Oct 25, 2023
fd2ddb7
Latest lockfile
nicktrn Oct 25, 2023
6be9909
Replace migration error screenshot
nicktrn Oct 25, 2023
c2dce5b
Remove redundant env var
nicktrn Oct 25, 2023
37e25ac
Merge branch 'main' into worker-upgrade
nicktrn Oct 25, 2023
4fa7e96
Check graphile schema exists
nicktrn Oct 25, 2023
59e16a9
Upsert batch job function
nicktrn Oct 25, 2023
3eecc27
Abstract migration helper into service
nicktrn Oct 25, 2023
7cbc928
Tweak add_batch_job function
nicktrn Oct 26, 2023
2070b14
Fix batch function, more logging
nicktrn Oct 26, 2023
288341d
Fix batch function.. again
nicktrn Oct 26, 2023
d35ddfe
Add enqueueBatch method
nicktrn Oct 26, 2023
284acb2
Simulate batch jobs
nicktrn Oct 26, 2023
ab17db0
Merge branch 'main' into worker-upgrade
nicktrn Oct 26, 2023
d4a5922
Notify before migration delay
nicktrn Oct 26, 2023
82163c5
Merge branch 'main' into worker-upgrade
nicktrn Nov 24, 2023
6b12a92
Merge branch 'main' into worker-upgrade
nicktrn Nov 24, 2023
546588d
Merge branch 'main' into worker-upgrade
nicktrn Nov 27, 2023
1637d25
Fix migration error detection
nicktrn Nov 27, 2023
5e37d95
Merge branch 'main' into worker-upgrade
nicktrn Nov 27, 2023
f648fb6
Batch enqueue types
nicktrn Nov 27, 2023
285e7bb
Upgrade graphile to latest rc
nicktrn Nov 27, 2023
8ac89a0
Make batch enqueue only accept batch tasks
nicktrn Nov 28, 2023
01bd1af
Fix cleanup and simulate batch
nicktrn Nov 28, 2023
84c28bc
Fix custom batch job function
nicktrn Nov 28, 2023
4eef340
Add batch opts to event trigger
nicktrn Nov 28, 2023
de428f0
Lint
nicktrn Nov 28, 2023
f7872a4
Expose batch function on even triggers only
nicktrn Nov 28, 2023
f0d7b63
Add batching to event dispatch
nicktrn Nov 28, 2023
175b58c
Event trigger batching tests
nicktrn Nov 28, 2023
0d53568
Pass jobKey to queueName func
nicktrn Nov 29, 2023
5495f93
Batch support for Job Runs
nicktrn Nov 29, 2023
497bbbd
Update UI to support batch payloads
nicktrn Nov 29, 2023
57fe918
Update batch send example
nicktrn Nov 29, 2023
b26bf16
Merge branch 'main' into worker-upgrade
nicktrn Nov 29, 2023
d10a7d0
Shorten redisinsight container name
nicktrn Nov 29, 2023
531b9e7
Improve KV get types
nicktrn Nov 30, 2023
ccb6208
Webhook delivery batching and some fixes
nicktrn Nov 30, 2023
5aa448b
Fix payload size chunking
nicktrn Dec 4, 2023
32c6815
Fix batched payloads for n = 1
nicktrn Dec 4, 2023
9971267
Start fixing batched webhook delivery
nicktrn Dec 4, 2023
127514d
Lower default max interval
nicktrn Dec 4, 2023
563736d
Rename dispatch batcher task
nicktrn Dec 4, 2023
df92240
Merge branch 'main' into worker-upgrade
nicktrn Dec 4, 2023
6d914b6
Fix webhook delivery
nicktrn Dec 5, 2023
5bd1c20
Batched run UI
nicktrn Dec 5, 2023
ad1b512
Add read replica support
nicktrn Dec 5, 2023
fb81d90
Use read replica for jobs view queries
nicktrn Dec 5, 2023
60f47c4
Fix debug log
nicktrn Dec 5, 2023
cb09814
Simplify batch options
nicktrn Dec 5, 2023
5324f9a
Add shopify delete all webhooks job
nicktrn Dec 6, 2023
6006ba0
Batched payload types
nicktrn Dec 6, 2023
0227271
Merge branch 'main' into worker-upgrade
nicktrn Dec 6, 2023
da439ce
Improve tests and add one for batch chunking
nicktrn Dec 6, 2023
16ca511
Batched webhook delivery examples
nicktrn Dec 6, 2023
3738794
Batched webhook trigger catalog entry
nicktrn Dec 6, 2023
a9754a0
Add total events to trigger card
nicktrn Dec 6, 2023
6b702da
Migration comments and logging
nicktrn Dec 6, 2023
ac8d301
Adjust batch defaults and limits
nicktrn Dec 6, 2023
5f94edf
Document current event payload limit
nicktrn Dec 6, 2023
dca8f40
Batching docs
nicktrn Dec 6, 2023
1d677e1
Merge branch 'main' into worker-upgrade
nicktrn Dec 6, 2023
93d32d1
Fix merge
nicktrn Dec 6, 2023
3db8feb
Fix rerun
nicktrn Dec 6, 2023
d5ac342
Consider test and external account when batching
nicktrn Dec 6, 2023
9ea24f8
Context on job run
nicktrn Dec 7, 2023
61b7a7a
Fix testing package
nicktrn Dec 7, 2023
d330134
Remove redundant batch param
nicktrn Dec 7, 2023
b926ce8
Batch example respects event payload limit
nicktrn Dec 7, 2023
61cfe97
Move testing package paths to compilerOptions
nicktrn Dec 7, 2023
d8698e2
Changeset
nicktrn Dec 7, 2023
2afa199
Merge branch 'main' into worker-upgrade
nicktrn Dec 7, 2023
9c6f6aa
Consider total code length for highlight limit
nicktrn Dec 7, 2023
c250eb6
Use eventId not eventRecordId
nicktrn Dec 7, 2023
4cb87c5
Only include single event metadata when unbatched
nicktrn Dec 7, 2023
59a0370
Include run id in batch trigger example
nicktrn Dec 7, 2023
7a1485b
Release worker utils on shutdown
nicktrn Dec 7, 2023
8415eac
Use reschedule helper to handle full batches
nicktrn Dec 7, 2023
a051fac
Safely get queue name from ID
nicktrn Dec 7, 2023
9cb70df
Backward compatible webhook delivery
nicktrn Dec 8, 2023
d2cf1cc
Merge branch 'main' into worker-upgrade
nicktrn Dec 8, 2023
8edd2c4
Remove dead code
nicktrn Dec 8, 2023
f130fbd
Merge branch 'main' into worker-upgrade
nicktrn Dec 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/curvy-pens-cover.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@trigger.dev/airtable": patch
"@trigger.dev/shopify": patch
"@trigger.dev/sdk": patch
"@trigger.dev/testing": patch
"@trigger.dev/core": patch
---

Upgrade `graphile-worker` and add batch support
4 changes: 2 additions & 2 deletions apps/webapp/app/components/code/CodeBlock.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ export const CodeBlock = forwardRef<HTMLDivElement, CodeBlockProps>(
Array.from({ length: end - start + 1 }, (_, i) => start + i)
);

// if there are more than 1000 lines, don't highlight
const shouldHighlight = lineCount <= 1000;
// if there are more than 1000 lines or 20_000 characters, don't highlight
const shouldHighlight = lineCount <= 1000 && code.length <= 20_000;

return (
<div
Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/components/run/RunOverview.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,13 @@ export function RunOverview({ run, trigger, showRerun, paths }: RunOverviewProps
<RunPanelHeader icon={trigger.icon} title={trigger.title} />
<RunPanelBody>
<RunPanelProperties
properties={[{ label: "Event name", text: run.event.name }]
properties={[
{ label: "Event name", text: run.event.name },
{ label: "Batched", text: String(run.batched) },
...(run.batched
? [{ label: "Total Events", text: String(run.eventIds.length) }]
: []),
]
.concat(
run.event.externalAccount
? [{ label: "Account ID", text: run.event.externalAccount.identifier }]
Expand Down
29 changes: 27 additions & 2 deletions apps/webapp/app/components/run/TriggerDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,25 @@ import { DisplayProperty } from "@trigger.dev/core";

export function TriggerDetail({
trigger,
payload,
context,
event,
properties,
batched = false,
eventIds,
}: {
trigger: DetailedEvent;
payload: string;
context: string;
event: {
title: string;
icon: string;
};
properties: DisplayProperty[];
batched?: boolean;
eventIds?: string[];
}) {
const { id, name, payload, context, timestamp, deliveredAt } = trigger;
const { id, name, timestamp, deliveredAt } = trigger;

return (
<RunPanel selected={false}>
Expand All @@ -45,7 +53,18 @@ export function TriggerDetail({
/>
)}
<RunPanelIconProperty icon="id" label="Event name" value={name} />
<RunPanelIconProperty icon="account" label="Event ID" value={id} />
{batched ? (
<>
<RunPanelIconProperty icon="packages" label="Batched" value={String(batched)} />
<RunPanelIconProperty
icon="hash"
label="Total Events"
value={eventIds ? eventIds.length : 0}
/>
</>
) : (
<RunPanelIconProperty icon="account" label="Event ID" value={id} />
)}
{trigger.externalAccount && (
<RunPanelIconProperty
icon="account"
Expand All @@ -66,6 +85,12 @@ export function TriggerDetail({
<CodeBlock code={payload} />
<Header3>Context</Header3>
<CodeBlock code={context} />
{batched && eventIds && (
<>
<Header3>Event IDs</Header3>
<CodeBlock code={JSON.stringify(eventIds, null, 2)} />
</>
)}
</div>
</RunPanelBody>
</RunPanel>
Expand Down
82 changes: 71 additions & 11 deletions apps/webapp/app/db.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,22 @@ export { Prisma };

export const prisma = singleton("prisma", getClient);

export const $replica: Omit<PrismaClient, "$transaction"> = singleton(
"replica",
() => getReplicaClient() ?? prisma
);

function getClient() {
const { DATABASE_URL } = process.env;
invariant(typeof DATABASE_URL === "string", "DATABASE_URL env var not set");

const databaseUrl = new URL(DATABASE_URL);

// We need to add the connection_limit and pool_timeout query params to the url, in a way that works if the DATABASE_URL already has query params
const query = databaseUrl.searchParams;
query.set("connection_limit", env.DATABASE_CONNECTION_LIMIT.toString());
query.set("pool_timeout", env.DATABASE_POOL_TIMEOUT.toString());
databaseUrl.search = query.toString();

// Remove the username:password in the url and print that to the console
const urlWithoutCredentials = new URL(databaseUrl.href);
urlWithoutCredentials.password = "";
const databaseUrl = extendQueryParams(DATABASE_URL, {
connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(),
pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(),
});

console.log(`🔌 setting up prisma client to ${urlWithoutCredentials.toString()}`);
console.log(`🔌 setting up prisma client to ${redactUrlSecrets(databaseUrl.href)}`);

const client = new PrismaClient({
datasources: {
Expand Down Expand Up @@ -127,6 +126,67 @@ function getClient() {
return client;
}

function getReplicaClient() {
if (!env.DATABASE_READ_REPLICA_URL) {
return;
}

const replicaUrl = extendQueryParams(env.DATABASE_READ_REPLICA_URL, {
connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(),
pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(),
});

console.log(`🔌 setting up read replica connection to ${redactUrlSecrets(replicaUrl)}`);

const replicaClient = new PrismaClient({
datasources: {
db: {
url: replicaUrl.href,
},
},
log: [
{
emit: "stdout",
level: "error",
},
{
emit: "stdout",
level: "info",
},
{
emit: "stdout",
level: "warn",
},
],
});

// connect eagerly
replicaClient.$connect();

console.log(`🔌 read replica connected`);

return replicaClient;
}

function extendQueryParams(hrefOrUrl: string | URL, queryParams: Record<string, string>) {
const url = new URL(hrefOrUrl);
const query = url.searchParams;

for (const [key, val] of Object.entries(queryParams)) {
query.set(key, val);
}

url.search = query.toString();

return url;
}

function redactUrlSecrets(hrefOrUrl: string | URL) {
const url = new URL(hrefOrUrl);
url.password = "";
return url.href;
}

export type { PrismaClient } from "@trigger.dev/database";

export const PrismaErrorSchema = z.object({
Expand Down
6 changes: 6 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,13 @@ function logError(error: unknown, request?: Request) {
);
}
}

console.error(error);

if (error instanceof Error && error.message.startsWith("There are locked jobs present")) {
console.log("⚠️ graphile-worker migration issue detected!");
console.log("⚠️ see: https://trigger.dev/docs/documentation/guides/self-hosting/graphile-migration");
}
}

const sqsEventConsumer = singleton("sqsEventConsumer", getSharedSqsEventConsumer);
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { isValidRegex } from "./utils/regex";
const EnvironmentSchema = z.object({
NODE_ENV: z.union([z.literal("development"), z.literal("production"), z.literal("test")]),
DATABASE_URL: z.string(),
DATABASE_READ_REPLICA_URL: z.string().optional(),
DATABASE_CONNECTION_LIMIT: z.coerce.number().int().default(10),
DATABASE_POOL_TIMEOUT: z.coerce.number().int().default(60),
DIRECT_URL: z.string(),
Expand Down