Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(worker): add discardTtl option #1653

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
9 changes: 8 additions & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ export interface WorkerListener<

const RATE_LIMIT_ERROR = 'bullmq:rateLimitExceeded';

const DISCARD_TTL_ERROR = 'bullmq:discardTtlExceeded';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we will need to replace this constants by an enum soon.


/**
*
* This class represents a worker that is able to process jobs from the queue.
Expand Down Expand Up @@ -527,7 +529,6 @@ export class Worker<
? Math.ceil(blockTimeout)
: blockTimeout;

const now = Date.now();
const jobId = await client.brpoplpush(
this.keys.wait,
this.keys.active,
Expand Down Expand Up @@ -669,6 +670,12 @@ export class Worker<
lockExtender();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, seems a bit counterintuitive that we need to extend the lock if we are going to fail the job, but I guess this is necessary to avoid the risk of not having a lock when handling the failed job, which requires a valid lock.


try {
if (this.opts.discardTtl) {
if (this.opts.discardTtl < Date.now() - job.timestamp) {
return handleFailed(new Error(DISCARD_TTL_ERROR));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something hit me about this. Are we really sure we want to fail the job here? I am thinking that discarded jobs would probably just pollute the failed set where there could be legitimate jobs that failed for some reason that needs to be debugged. Maybe it is better to just remove the job while sending an event for it.

}
}

const result = await this.callProcessJob(job, token);
return await handleCompleted(result);
} catch (err) {
Expand Down
11 changes: 10 additions & 1 deletion src/interfaces/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,16 @@ export interface WorkerOptions extends QueueBaseOptions {
lockDuration?: number;
lockRenewTime?: number;
runRetryDelay?: number;
settings?: AdvancedOptions; // FIXME only backoffStrategies is used

/**
* Settings for advanced options.
*/
settings?: AdvancedOptions;

/**
* Time for discard jobs without processing them.
*/
discardTtl?: number;
}

export interface GetNextJobOptions {
Expand Down
47 changes: 47 additions & 0 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,53 @@ describe('workers', function () {
});
});

describe('when discardTtl is provided', function () {
it('moves jobs to failed when time is greater than discardTtl', async () => {
const maxJobs = 5;

let processor;
const processing = new Promise<void>(resolve => {
processor = async () => {
await delay(200);
resolve();
};
});

const worker = new Worker(queueName, processor, {
connection,
discardTtl: 100,
});
await worker.waitUntilReady();

const jobs = Array.from(Array(maxJobs).keys()).map(index => ({
name: 'test',
data: { index },
}));

await queue.addBulk(jobs);

const failing = new Promise<void>(resolve => {
worker.on(
'failed',
after(4, (job: Job, error) => {
expect(error.name).to.be.eql('Error');
expect(error.message).to.be.eql('bullmq:discardTtlExceeded');
expect(job.failedReason).to.be.eql('bullmq:discardTtlExceeded');
resolve();
}),
);
});

worker.run();

await processing;

await failing;

await worker.close();
});
});

it('process a job that updates progress as number', async () => {
let processor;

Expand Down