Features: Batching and graphile-worker upgrade to 0.16.0-rc.1 #524

Open · wants to merge 102 commits into main
Conversation


@nicktrn (Collaborator) commented Oct 2, 2023

Closes #396
Closes #348

✅ Checklist

  • I have followed every step in the contributing guide
  • The PR title follows the convention.
  • I ran and tested that the code works
  • More testing just to be safe
  • Create migration docs

Testing

Migration

  1. Create this docker-compose.yml
version: "3"

volumes:
  data:

networks:
  app_network:
    external: false

x-db-url: &db-url postgresql://postgres:postgres@db:5432/postgres

x-env: &env
  LOGIN_ORIGIN: http://localhost:3030
  APP_ORIGIN: http://localhost:3030
  PORT: 3030
  REMIX_APP_PORT: 3030
  MAGIC_LINK_SECRET: secret
  SESSION_SECRET: secret
  ENCRYPTION_KEY: ae13021afef0819c3a307ad487071c06
  DATABASE_URL: *db-url # alias reference to the x-db-url anchor above
  DIRECT_URL: *db-url
  GRACEFUL_SHUTDOWN_TIMEOUT: 5000
  NODE_ENV: production
  RUNTIME_PLATFORM: docker-compose
  TRIGGER_LOG_LEVEL: debug

services:
  old-webapp:
    image: ghcr.io/triggerdotdev/trigger.dev:v2.2.21
    restart: "no"
    environment:
      <<: *env
      WORKER_ENABLED: false
      EXECUTION_WORKER_ENABLED: false
      TASK_OPERATION_WORKER_ENABLED: false
    ports:
      - 3065:3030
    depends_on:
      - db
    networks:
      - app_network

  old-workers:
    image: ghcr.io/triggerdotdev/trigger.dev:v2.2.21
    restart: "no"
    environment:
      <<: *env
      HTTP_SERVER_DISABLED: true
    depends_on:
      - db
    networks:
      - app_network

  new-webapp:
    image: ghcr.io/triggerdotdev/trigger.dev:batching-rc.2
    restart: always
    environment:
      <<: *env
      WORKER_ENABLED: false
      EXECUTION_WORKER_ENABLED: false
      TASK_OPERATION_WORKER_ENABLED: false
    ports:
      - 3030:3030
    depends_on:
      - db
    networks:
      - app_network

  new-workers:
    image: ghcr.io/triggerdotdev/trigger.dev:batching-rc.2
    restart: always
    environment:
      <<: *env
      HTTP_SERVER_DISABLED: true
    depends_on:
      - db
    networks:
      - app_network
      
  db:
    image: postgres:14
    restart: always
    volumes:
      - data:/var/lib/postgresql/data/
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
    networks:
      - app_network
    ports:
      - 3085:5432
  2. Start up the DB
docker-compose up -d db
  3. Start up the old webapp and workers
docker-compose up old-webapp old-workers
  4. In another window, start up the new webapp and workers
docker-compose up new-webapp new-workers
  5. To delete the DB and start again, maybe after altering some env vars
docker-compose down -v
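
Once both stacks are up, a quick sanity check confirms each webapp responds. This is a hypothetical sketch (ports taken from the compose file above, Node 18+ assumed for the built-in fetch), not part of the PR:

// Hypothetical sanity check: the old webapp is published on 3065, the new one on 3030.
for (const port of [3065, 3030]) {
  const res = await fetch(`http://localhost:${port}`);
  console.log(`port ${port}: HTTP ${res.status}`);
}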

Batching

import { eventTrigger } from "@trigger.dev/sdk";
import { z } from "zod";

// `client` is an existing TriggerClient instance defined elsewhere in the project.

const maxPayloads = 10;
const maxInterval = 10;

client.defineJob({
  id: "batch-trigger-receive",
  name: "Batch Trigger Receive",
  version: "1.0.0",
  trigger: eventTrigger({
    name: "batch.trigger",
    batch: {
      maxPayloads,
      maxInterval,
    },
  }),
  run: async (payload, io) => {
    await io.logger.info(`Should at most receive ${maxPayloads} payloads per batch`);
    await io.logger.info(`Should wait no more than ${maxInterval} seconds between batches`);
    const totalPayloadSize = payload.reduce((sum, p) => sum + JSON.stringify(p).length, 0);
    return `Received ${payload.length} payloads. Total size in bytes: ${totalPayloadSize}`;
  },
});

const getLargeString = (bytes: number) => {
  return Array(bytes).fill("F").join("");
};

client.defineJob({
  id: "batch-trigger-send",
  name: "Batch Trigger Send",
  version: "1.0.0",
  trigger: eventTrigger({
    name: "batch.trigger.send",
    schema: z.object({
      payloads: z.number().default(12),
      // expect batches to be chunked as they will exceed the server limit
      oversized: z.boolean().default(false),
    }),
  }),
  run: async (payload, io) => {
    for (let i = 0; i < payload.payloads; i++) {
      await io.sendEvent(`send-${i}`, {
        name: "batch.trigger",
        payload: {
          count: i,
          ...(payload.oversized
            ? {
                largePayload: getLargeString(250 * 1000), // 250KB
              }
            : undefined),
        },
      });
    }
  },
});
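
As a quick smoke test, a hedged example (assuming the same `client` instance as above) that sends the event consumed by the send job:

// Hypothetical kick-off, assuming `client` is the TriggerClient from above:
await client.sendEvent({
  name: "batch.trigger.send",
  payload: { payloads: 12, oversized: true }, // oversized forces chunked batches
});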


Changelog

  • Upgrade to v0.16.0-rc.1
  • Detect migration errors and point to docs
  • Add migration delay to allow graceful shutdown to complete (rolling deploy)
  • Insert custom add_batch_job function
  • Add batched event triggers and webhook delivery
  • Read replica support via optional DATABASE_READ_REPLICA_URL env var (see the sketch below)
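
A minimal sketch of what the read-replica selection could look like. This is an assumption for illustration, not the PR's actual implementation; only the DATABASE_READ_REPLICA_URL variable comes from the changelog:

// Sketch only: fall back to the primary connection when no replica is configured.
import { PrismaClient } from "@prisma/client";

const writer = new PrismaClient({
  datasources: { db: { url: process.env.DATABASE_URL } },
});

const reader = process.env.DATABASE_READ_REPLICA_URL
  ? new PrismaClient({
      datasources: { db: { url: process.env.DATABASE_READ_REPLICA_URL } },
    })
  : writer;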

Screenshots

[screenshot: graphile-migration]



changeset-bot bot commented Oct 2, 2023

🦋 Changeset detected

Latest commit: f130fbd

The changes in this PR will be included in the next version bump.


@nicktrn nicktrn marked this pull request as ready for review October 3, 2023 09:45

nicktrn commented Oct 4, 2023

0.15.0 was released yesterday; the only change is that migration files are now embedded as strings. Just in case of unforeseen bugs, we should stay on 0.14.0-rc.0 for now.

@nicktrn nicktrn marked this pull request as draft October 24, 2023 15:16
@nicktrn nicktrn marked this pull request as ready for review October 25, 2023 10:18
@@ -51,10 +60,11 @@ const AddJobResultsSchema = z.array(GraphileJobSchema);

 export type ZodTasks<TConsumerSchema extends MessageCatalogSchema> = {
   [K in keyof TConsumerSchema]: {
-    queueName?: string | ((payload: z.infer<TConsumerSchema[K]>) => string);
+    queueName?: string | ((payload: z.infer<TConsumerSchema[K]>, jobKey?: string) => string);
nicktrn (Collaborator, Author) commented:

This helps decrease payload size for batch jobs
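
A hypothetical illustration (not code from this PR) of what the extra jobKey parameter enables, namely routing related batch messages to the same queue:

// Hypothetical task config using the extended queueName signature;
// messages that share a job key land in the same queue.
const invokeDispatcherTask = {
  queueName: (payload: { id: string }, jobKey?: string) =>
    jobKey ?? `dispatcher:${payload.id}`,
};

console.log(invokeDispatcherTask.queueName({ id: "evt_123" }, "batch:abc")); // "batch:abc"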

  })),
  runs: job.runs.map((r) => {
    // For compatibility with old Job Runs where payload is only available on the related Event Record
    const payload = r.payload !== null ? JSON.parse(r.payload) : r.event.payload;
nicktrn (Collaborator, Author) commented:

We'll be able to simplify this in a lot of places once we migrate existing data and make JobRun.payload non-optional.

"events.invokeDispatcher": {
priority: 0, // smaller number = higher priority
maxAttempts: 6,
handler: async (payload, job) => {
const service = new InvokeDispatcherService();

await service.call(payload.id, payload.eventRecordId);
await service.call(payload.id, [payload.eventRecordId]);
nicktrn (Collaborator, Author) commented:

Takes an array now, but only batches if the dispatcher has batching enabled.
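
For context, a hypothetical batched invocation (the collected IDs below are assumed, not from this diff) would pass several event record IDs in one call:

// Hypothetical: eventRecordIds gathered upstream by the batcher.
const eventRecordIds = ["evt_1", "evt_2", "evt_3"];
await service.call(payload.id, eventRecordIds);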

@nicktrn nicktrn marked this pull request as ready for review December 7, 2023 10:17
id,
name: execution.event.name,
context: parsedContext[i],
timestamp: execution.event.timestamp,
nicktrn (Collaborator, Author) commented:

Cheating here: the timestamp comes from the first event. Timestamps would either be another thing to attach to JobRun, get for each execution, or remove.

Alternatively, we could create many-to-many relations between JobRun and EventRecord. This would be a big change.
