Skip to content

Commit

Permalink
refactor: add setGlobalConcurrency method
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Apr 7, 2024
1 parent 5e521e7 commit dccea30
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 21 deletions.
15 changes: 12 additions & 3 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,6 @@ export class Queue<
get metaValues(): Record<string, string | number> {
return {
'opts.maxLenEvents': this.opts?.streams?.events?.maxLen ?? 10000,
...(this.opts?.concurrency
? { concurrency: this.opts?.concurrency }
: {}),
};
}

Expand All @@ -186,6 +183,18 @@ export class Queue<
});
}

/**
* Enable and set global concurrency value.
* @param concurrency - Maximum number of simultaneous jobs that the workers can handle.
* For instance, setting this value to 1 ensures that no more than one job
* is processed at any given time. If this limit is not defined, there will be no
* restriction on the number of concurrent jobs.
*/
async setGlobalConcurrency(concurrency: number) {
const client = await this.client;
return client.hset(this.keys.meta, 'concurrency', concurrency);
}

/**
* Adds a new job to the queue.
*
Expand Down
8 changes: 0 additions & 8 deletions src/interfaces/queue-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,6 @@ export interface QueueOptions extends QueueBaseOptions {
};
};

/**
* Maximum number of simultaneous jobs that the workers can handle.
* For instance, setting this value to 1 ensures that no more than one job
* is processed at any given time. If this limit is not defined, there will be no
* restriction on the number of concurrent jobs.
*/
concurrency?: number;

settings?: AdvancedRepeatOptions;
}

Expand Down
23 changes: 13 additions & 10 deletions tests/test_concurrency.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ describe('Concurrency', () => {
});

it('should run max concurrency for jobs added', async () => {
const queue = new Queue(queueName, { connection, concurrency: 1, prefix });
const queue = new Queue(queueName, { connection, prefix });
const numJobs = 15;
const jobsData: { name: string; data: any }[] = [];
for (let j = 0; j < numJobs; j++) {
Expand All @@ -42,7 +42,7 @@ describe('Concurrency', () => {
}

await queue.addBulk(jobsData);

await queue.setGlobalConcurrency(1);
const bar = new ProgressBar(':bar', { total: numJobs });

let count = 0;
Expand Down Expand Up @@ -93,7 +93,7 @@ describe('Concurrency', () => {
}).timeout(16000);

it('emits drained global event only once when worker is idle', async function () {
const queue = new Queue(queueName, { connection, concurrency: 1, prefix });
const queue = new Queue(queueName, { connection, prefix });
const worker = new Worker(
queueName,
async () => {
Expand All @@ -119,6 +119,7 @@ describe('Concurrency', () => {
{ name: 'test', data: { foo: 'bar' } },
{ name: 'test', data: { foo: 'baz' } },
]);
await queue.setGlobalConcurrency(1);

await delay(4000);

Expand All @@ -138,11 +139,11 @@ describe('Concurrency', () => {

const queue = new Queue(queueName, {
connection,
concurrency: 1,
prefix,
});
const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();
await queue.setGlobalConcurrency(1);

const worker = new Worker(
queueName,
Expand Down Expand Up @@ -222,11 +223,11 @@ describe('Concurrency', () => {

const queue = new Queue(queueName, {
connection,
concurrency: 1,
prefix,
});
const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();
await queue.setGlobalConcurrency(1);

const worker = new Worker(
queueName,
Expand Down Expand Up @@ -304,7 +305,6 @@ describe('Concurrency', () => {
const flow = new FlowProducer({ connection, prefix });
const queue = new Queue(queueName, {
connection,
concurrency: 1,
prefix,
});

Expand All @@ -317,6 +317,8 @@ describe('Concurrency', () => {
}

await queue.addBulk(jobsData);
await queue.setGlobalConcurrency(1);

const name = 'child-job';

await flow.add({
Expand Down Expand Up @@ -394,13 +396,13 @@ describe('Concurrency', () => {
const globalConcurrency = 2;
const queue = new Queue(queueName, {
connection,
concurrency: globalConcurrency,
prefix,
});

for (let j = 0; j < numJobs; j++) {
await queue.add('test-stalled', { foo: j % 2 });
}
await queue.setGlobalConcurrency(globalConcurrency);

const concurrency = 4;

Expand Down Expand Up @@ -486,7 +488,6 @@ describe('Concurrency', () => {
const globalConcurrency = 1;
const queue = new Queue(queueName, {
connection,
concurrency: globalConcurrency,
prefix,
});

Expand All @@ -497,6 +498,7 @@ describe('Concurrency', () => {
{ attempts: 2, backoff: 100 },
);
}
await queue.setGlobalConcurrency(globalConcurrency);

const concurrency = 10;

Expand Down Expand Up @@ -538,7 +540,6 @@ describe('Concurrency', () => {
const globalConcurrency = 1;
const queue = new Queue(queueName, {
connection,
concurrency: globalConcurrency,
prefix,
});

Expand All @@ -549,6 +550,7 @@ describe('Concurrency', () => {
{ attempts: 2, backoff: 0 },
);
}
await queue.setGlobalConcurrency(globalConcurrency);

const concurrency = 4;

Expand Down Expand Up @@ -591,9 +593,10 @@ describe('Concurrency', () => {
const globalConcurrency = 1;
const queue = new Queue(queueName, {
connection,
concurrency: globalConcurrency,
prefix,
});
await queue.waitUntilReady();
await queue.setGlobalConcurrency(globalConcurrency);
const worker = new Worker(queueName, null, {
connection,
lockRenewTime: 200,
Expand Down

0 comments on commit dccea30

Please sign in to comment.