Skip to content

Commit

Permalink
v2: Graphile auto-cleanup and auto-endpoint disabling (#1103)
Browse files Browse the repository at this point in the history
* Auto-cleanup failed graphile jobs instead of keeping them around

* Disable endpoint after a period of sequential indexing failures

* Remove log
  • Loading branch information
ericallam committed May 15, 2024
1 parent dc53f0f commit 2496917
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 40 deletions.
2 changes: 2 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ const EnvironmentSchema = z.object({

ORG_SLACK_INTEGRATION_CLIENT_ID: z.string().optional(),
ORG_SLACK_INTEGRATION_CLIENT_SECRET: z.string().optional(),

MAX_SEQUENTIAL_INDEX_FAILURE_COUNT: z.coerce.number().default(96),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
11 changes: 11 additions & 0 deletions apps/webapp/app/platform/zodWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,17 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
span.recordException(new Error(String(error)));
}

if (job.attempts >= job.max_attempts) {
logger.error("Job failed after max attempts", {
job,
attempts: job.attempts,
max_attempts: job.max_attempts,
error: error instanceof Error ? error.message : error,
});

return;
}

throw error;
} finally {
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export class TriggerEndpointIndexHookService {
},
});

if (!endpoint) {
if (!endpoint || !endpoint.url) {
throw new Error("Endpoint not found");
}

Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/services/apiRateLimit.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ export const apiRateLimiter = authorizationRateLimitMiddleware({
pathMatchers: [/^\/api/],
// Allow /api/v1/tasks/:id/callback/:secret
pathWhiteList: [
"/api/internal/stripe_webhooks",
"/api/v1/authorization-code",
"/api/v1/token",
/^\/api\/v1\/tasks\/[^\/]+\/callback\/[^\/]+$/, // /api/v1/tasks/$id/callback/$secret
Expand Down
154 changes: 124 additions & 30 deletions apps/webapp/app/services/endpoints/performEndpointIndexService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import { IndexEndpointStats } from "@trigger.dev/core";
import { RegisterHttpEndpointService } from "../triggers/registerHttpEndpoint.server";
import { RegisterWebhookService } from "../triggers/registerWebhook.server";
import { EndpointIndex } from "@trigger.dev/database";
import { env } from "~/env.server";

const MAX_SEQUENTIAL_FAILURE_COUNT = env.MAX_SEQUENTIAL_INDEX_FAILURE_COUNT;

export class PerformEndpointIndexService {
#prismaClient: PrismaClient;
Expand Down Expand Up @@ -56,9 +59,16 @@ export class PerformEndpointIndexService {

if (!endpointIndex.endpoint.url) {
logger.debug("Endpoint URL is not set", endpointIndex);
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: "Endpoint URL is not set",
});

return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: "Endpoint URL is not set",
},
false
);
}

// Make a request to the endpoint to fetch a list of jobs
Expand All @@ -69,9 +79,15 @@ export class PerformEndpointIndexService {
const { response, parser, headerParser, errorParser } = await client.indexEndpoint();

if (!response) {
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: `Could not connect to endpoint ${endpointIndex.endpoint.url}`,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: `Could not connect to endpoint ${endpointIndex.endpoint.url}`,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

if (isRedirect(response.status)) {
Expand All @@ -83,15 +99,27 @@ export class PerformEndpointIndexService {
const location = response.headers.get("location");

if (!location) {
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: `Endpoint ${endpointIndex.endpoint.url} is redirecting but no location header is present`,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: `Endpoint ${endpointIndex.endpoint.url} is redirecting but no location header is present`,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

if (redirectCount > 5) {
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: `Endpoint ${endpointIndex.endpoint.url} is redirecting too many times`,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: `Endpoint ${endpointIndex.endpoint.url} is redirecting too many times`,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

await this.#prismaClient.endpoint.update({
Expand All @@ -111,20 +139,38 @@ export class PerformEndpointIndexService {
const body = await safeBodyFromResponse(response, errorParser);

if (body) {
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: body.message,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: body.message,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

return updateEndpointIndexWithError(this.#prismaClient, id, {
message: "Trigger API key is invalid",
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: "Trigger API key is invalid",
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

if (!response.ok) {
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: `Could not connect to endpoint ${endpointIndex.endpoint.url}. Status code: ${response.status}`,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: `Could not connect to endpoint ${endpointIndex.endpoint.url}. Status code: ${response.status}`,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

const anyBody = await response.json();
Expand Down Expand Up @@ -152,21 +198,33 @@ export class PerformEndpointIndexService {
}).message;
}

return updateEndpointIndexWithError(this.#prismaClient, id, {
message: friendlyError,
raw: fromZodError(bodyResult.error).message,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: friendlyError,
raw: fromZodError(bodyResult.error).message,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

const headerResult = headerParser.safeParse(Object.fromEntries(response.headers.entries()));
if (!headerResult.success) {
const friendlyError = fromZodError(headerResult.error, {
prefix: "Your headers are invalid",
});
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: friendlyError.message,
raw: headerResult.error.issues,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: friendlyError.message,
raw: headerResult.error.issues,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

const { jobs, sources, dynamicTriggers, dynamicSchedules, httpEndpoints, webhooks } =
Expand Down Expand Up @@ -407,8 +465,44 @@ export class PerformEndpointIndexService {
async function updateEndpointIndexWithError(
prismaClient: PrismaClient,
id: string,
error: EndpointIndexError
endpointId: string,
error: EndpointIndexError,
checkDisabling = true
) {
// Check here to see if this endpoint has only failed for the last 50 times
// And if so, we disable the endpoint by setting the url to null
if (checkDisabling) {
const recentIndexes = await prismaClient.endpointIndex.findMany({
where: {
endpointId,
id: {
not: id,
},
},
orderBy: {
createdAt: "desc",
},
take: MAX_SEQUENTIAL_FAILURE_COUNT - 1,
select: {
status: true,
},
});

if (
recentIndexes.length === MAX_SEQUENTIAL_FAILURE_COUNT - 1 &&
recentIndexes.every((index) => index.status === "FAILURE")
) {
await prismaClient.endpoint.update({
where: {
id: endpointId,
},
data: {
url: null,
},
});
}
}

return await prismaClient.endpointIndex.update({
where: {
id,
Expand Down
7 changes: 4 additions & 3 deletions apps/webapp/app/services/sources/handleHttpSource.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ export class HandleHttpSourceService {
return { status: 200 };
}

if (!triggerSource.endpoint.url) {
return { status: 404 };
}

if (!triggerSource.organization.runsEnabled) {
logger.debug("HandleHttpSourceService: Runs are disabled for this organization", {
organizationId: triggerSource.organization.id,
});
return { status: 404 };
}

Expand Down
5 changes: 0 additions & 5 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,6 @@ function getWorkerQueue() {
return new ZodWorker({
name: "workerQueue",
prisma,
cleanup: {
frequencyExpression: "13,27,43 * * * *",
ttl: env.WORKER_CLEANUP_TTL_DAYS * 24 * 60 * 60 * 1000, // X days
maxCount: 1000,
},
runnerOptions: {
connectionString: env.DATABASE_URL,
concurrency: env.WORKER_CONCURRENCY,
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/tracer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ function getTracer() {
if (env.INTERNAL_OTEL_TRACE_EXPORTER_URL) {
const exporter = new OTLPTraceExporter({
url: env.INTERNAL_OTEL_TRACE_EXPORTER_URL,
timeoutMillis: 1000,
timeoutMillis: 10_000,
headers:
env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADER_NAME &&
env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADER_VALUE
Expand Down

0 comments on commit 2496917

Please sign in to comment.