-
Notifications
You must be signed in to change notification settings - Fork 422
/
createBackgroundWorker.server.ts
220 lines (203 loc) · 6.74 KB
/
createBackgroundWorker.server.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import { CreateBackgroundWorkerRequestBody, TaskResource } from "@trigger.dev/core/v3";
import type { BackgroundWorker } from "@trigger.dev/database";
import { Prisma, PrismaClientOrTransaction } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion";
import { BaseService } from "./baseService.server";
import { projectPubSub } from "./projectPubSub.server";
export class CreateBackgroundWorkerService extends BaseService {
public async call(
projectRef: string,
environment: AuthenticatedEnvironment,
body: CreateBackgroundWorkerRequestBody
): Promise<BackgroundWorker> {
return this.traceWithEnv("call", environment, async (span) => {
span.setAttribute("projectRef", projectRef);
const project = await this._prisma.project.findUniqueOrThrow({
where: {
externalRef: projectRef,
environments: {
some: {
id: environment.id,
},
},
},
include: {
backgroundWorkers: {
where: {
runtimeEnvironmentId: environment.id,
},
orderBy: {
createdAt: "desc",
},
take: 1,
},
},
});
const latestBackgroundWorker = project.backgroundWorkers[0];
if (latestBackgroundWorker?.contentHash === body.metadata.contentHash) {
return latestBackgroundWorker;
}
const nextVersion = calculateNextBuildVersion(project.backgroundWorkers[0]?.version);
logger.debug(`Creating background worker`, {
nextVersion,
lastVersion: project.backgroundWorkers[0]?.version,
});
const backgroundWorker = await this._prisma.backgroundWorker.create({
data: {
friendlyId: generateFriendlyId("worker"),
version: nextVersion,
runtimeEnvironmentId: environment.id,
projectId: project.id,
metadata: body.metadata,
contentHash: body.metadata.contentHash,
cliVersion: body.metadata.cliPackageVersion,
sdkVersion: body.metadata.packageVersion,
supportsLazyAttempts: body.supportsLazyAttempts,
},
});
await createBackgroundTasks(body.metadata.tasks, backgroundWorker, environment, this._prisma);
try {
//send a notification that a new worker has been created
await projectPubSub.publish(
`project:${project.id}:env:${environment.id}`,
"WORKER_CREATED",
{
environmentId: environment.id,
environmentType: environment.type,
createdAt: backgroundWorker.createdAt,
taskCount: body.metadata.tasks.length,
type: "local",
}
);
await marqs?.updateEnvConcurrencyLimits(environment);
} catch (err) {
logger.error(
"Error publishing WORKER_CREATED event or updating global concurrency limits",
{
error:
err instanceof Error
? {
name: err.name,
message: err.message,
stack: err.stack,
}
: err,
project,
environment,
backgroundWorker,
}
);
}
return backgroundWorker;
});
}
}
export async function createBackgroundTasks(
tasks: TaskResource[],
worker: BackgroundWorker,
environment: AuthenticatedEnvironment,
prisma: PrismaClientOrTransaction
) {
for (const task of tasks) {
try {
await prisma.backgroundWorkerTask.create({
data: {
friendlyId: generateFriendlyId("task"),
projectId: worker.projectId,
runtimeEnvironmentId: worker.runtimeEnvironmentId,
workerId: worker.id,
slug: task.id,
filePath: task.filePath,
exportName: task.exportName,
retryConfig: task.retry,
queueConfig: task.queue,
machineConfig: task.machine,
triggerSource: task.triggerSource === "schedule" ? "SCHEDULED" : "STANDARD",
},
});
let queueName = sanitizeQueueName(task.queue?.name ?? `task/${task.id}`);
// Check that the queuename is not an empty string
if (!queueName) {
queueName = sanitizeQueueName(`task/${task.id}`);
}
const concurrencyLimit =
typeof task.queue?.concurrencyLimit === "number"
? Math.max(
Math.min(
task.queue.concurrencyLimit,
environment.maximumConcurrencyLimit,
environment.organization.maximumConcurrencyLimit
),
0
)
: null;
const taskQueue = await prisma.taskQueue.upsert({
where: {
runtimeEnvironmentId_name: {
runtimeEnvironmentId: worker.runtimeEnvironmentId,
name: queueName,
},
},
update: {
concurrencyLimit,
rateLimit: task.queue?.rateLimit,
},
create: {
friendlyId: generateFriendlyId("queue"),
name: queueName,
concurrencyLimit,
runtimeEnvironmentId: worker.runtimeEnvironmentId,
projectId: worker.projectId,
rateLimit: task.queue?.rateLimit,
type: task.queue?.name ? "NAMED" : "VIRTUAL",
},
});
if (typeof taskQueue.concurrencyLimit === "number") {
await marqs?.updateQueueConcurrencyLimits(
environment,
taskQueue.name,
taskQueue.concurrencyLimit
);
}
} catch (error) {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
// The error code for unique constraint violation in Prisma is P2002
if (error.code === "P2002") {
logger.warn("Task already exists", {
task,
worker,
});
} else {
logger.error("Prisma Error creating background worker task", {
error: {
code: error.code,
message: error.message,
},
task,
worker,
});
}
} else if (error instanceof Error) {
logger.error("Error creating background worker task", {
error: {
name: error.name,
message: error.message,
stack: error.stack,
},
task,
worker,
});
} else {
logger.error("Unknown error creating background worker task", {
error,
task,
worker,
});
}
}
}
}