Skip to content

Commit

Permalink
Fix issues with special characters in queue/task names causing runs t…
Browse files Browse the repository at this point in the history
…o get stuck in queued
  • Loading branch information
ericallam committed May 16, 2024
1 parent ba61bfe commit c9733f3
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
14 changes: 12 additions & 2 deletions apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { prisma } from "~/db.server";
import { createNewSession, disconnectSession } from "~/models/runtimeEnvironment.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
import { EnvironmentVariablesRepository } from "../environmentVariables/environmentVariablesRepository.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import { CancelAttemptService } from "../services/cancelAttempt.server";
Expand Down Expand Up @@ -452,11 +452,21 @@ export class DevQueueConsumer {

const queue = await prisma.taskQueue.findUnique({
where: {
runtimeEnvironmentId_name: { runtimeEnvironmentId: this.env.id, name: lockedTaskRun.queue },
runtimeEnvironmentId_name: {
runtimeEnvironmentId: this.env.id,
name: sanitizeQueueName(lockedTaskRun.queue),
},
},
});

if (!queue) {
logger.debug("[DevQueueConsumer] Failed to find queue", {
queueName: lockedTaskRun.queue,
sanitizedName: sanitizeQueueName(lockedTaskRun.queue),
taskRun: lockedTaskRun.id,
messageId: message.messageId,
});

await marqs?.nackMessage(message.messageId);
setTimeout(() => this.#doWork(), 1000);
return;
Expand Down
11 changes: 8 additions & 3 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { z } from "zod";
import { prisma } from "~/db.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { marqs } from "~/v3/marqs/index.server";
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
import { EnvironmentVariablesRepository } from "../environmentVariables/environmentVariablesRepository.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import { socketIo } from "../handleSocketIo.server";
Expand Down Expand Up @@ -408,7 +408,7 @@ export class SharedQueueConsumer {
where: {
runtimeEnvironmentId_name: {
runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId,
name: lockedTaskRun.queue,
name: sanitizeQueueName(lockedTaskRun.queue),
},
},
});
Expand Down Expand Up @@ -635,12 +635,17 @@ export class SharedQueueConsumer {
where: {
runtimeEnvironmentId_name: {
runtimeEnvironmentId: resumableAttempt.runtimeEnvironmentId,
name: resumableRun.queue,
name: sanitizeQueueName(resumableRun.queue),
},
},
});

if (!queue) {
logger.debug("SharedQueueConsumer queue not found, so nacking message", {
queueName: sanitizeQueueName(resumableRun.queue),
attempt: resumableAttempt,
});

await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval);
return;
}
Expand Down
9 changes: 9 additions & 0 deletions references/v3-catalog/src/trigger/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ export const simplestTask = task({
},
});

export const taskWithSpecialCharacters = task({
id: "admin:special-characters",
run: async (payload: { url: string }) => {
return {
message: "This task has special characters in its ID",
};
},
});

export const createJsonHeroDoc = task({
id: "create-jsonhero-doc",
run: async (payload: { title: string; content: any }, { ctx }) => {
Expand Down

0 comments on commit c9733f3

Please sign in to comment.