Skip to content

Commit

Permalink
test(worker): improve getBlockTimeout tests (#2535)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Apr 21, 2024
1 parent f65eb4b commit d6066f4
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 21 deletions.
20 changes: 8 additions & 12 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ import {

// 10 seconds is the maximum time a BRPOPLPUSH can block.
const maximumBlockTimeout = 10;
/* 1 millisecond is chosen because the granularity of our timestamps are milliseconds.
Obviously we can still process much faster than 1 job per millisecond but delays and rate limits
will never work with more accuracy than 1ms. */
const minimumBlockTimeout = 0.001;

// note: sandboxed processors would also like to define concurrency per process
// for better resource utilization.
Expand Down Expand Up @@ -594,7 +590,10 @@ export class Worker<

get minimumBlockTimeout(): number {
return this.blockingConnection.capabilities.canBlockFor1Ms
? minimumBlockTimeout
? /* 1 millisecond is chosen because the granularity of our timestamps are milliseconds.
Obviously we can still process much faster than 1 job per millisecond but delays and rate limits
will never work with more accuracy than 1ms. */
0.001
: 0.002;
}

Expand Down Expand Up @@ -657,19 +656,16 @@ export class Worker<

// when there are delayed jobs
if (blockUntil) {
let blockTimeout;
const blockDelay = blockUntil - Date.now();
// when we reach the time to get new jobs
if (blockDelay < this.minimumBlockTimeout * 1000) {
return this.minimumBlockTimeout;
} else {
blockTimeout = blockDelay / 1000;
// We restrict the maximum block timeout to 10 second to avoid
// blocking the connection for too long in the case of reconnections
// reference: https://github.com/taskforcesh/bullmq/issues/1658
return Math.min(blockDelay / 1000, maximumBlockTimeout);
}

// We restrict the maximum block timeout to 10 second to avoid
// blocking the connection for too long in the case of reconnections
// reference: https://github.com/taskforcesh/bullmq/issues/1658
return Math.min(blockTimeout, maximumBlockTimeout);
} else {
return Math.max(opts.drainDelay, this.minimumBlockTimeout);
}
Expand Down
40 changes: 31 additions & 9 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ describe('workers', function () {
});

expect(worker['getBlockTimeout'](0)).to.be.equal(5);
worker.close();
await worker.close();
});
});

Expand All @@ -935,9 +935,14 @@ describe('workers', function () {
prefix,
autorun: false,
});
await worker.waitUntilReady();

expect(worker['getBlockTimeout'](0)).to.be.equal(0.001);
worker.close();
if (isRedisVersionLowerThan(worker.redisVersion, '7.0.8')) {
expect(worker['getBlockTimeout'](0)).to.be.equal(0.002);
} else {
expect(worker['getBlockTimeout'](0)).to.be.equal(0.001);
}
await worker.close();
});
});
});
Expand All @@ -950,9 +955,18 @@ describe('workers', function () {
prefix,
autorun: false,
});
await worker.waitUntilReady();

expect(worker['getBlockTimeout'](Date.now() - 1)).to.be.equal(0.001);
worker.close();
if (isRedisVersionLowerThan(worker.redisVersion, '7.0.8')) {
expect(worker['getBlockTimeout'](Date.now() - 1)).to.be.equal(
0.002,
);
} else {
expect(worker['getBlockTimeout'](Date.now() - 1)).to.be.equal(
0.001,
);
}
await worker.close();
});
});

Expand All @@ -963,11 +977,19 @@ describe('workers', function () {
prefix,
autorun: false,
});
await worker.waitUntilReady();

expect(worker['getBlockTimeout'](Date.now() + 100)).to.be.greaterThan(
0.001,
);
worker.close();
if (isRedisVersionLowerThan(worker.redisVersion, '7.0.8')) {
expect(
worker['getBlockTimeout'](Date.now() + 100),
).to.be.greaterThan(0.002);
} else {
expect(
worker['getBlockTimeout'](Date.now() + 100),
).to.be.greaterThan(0.001);
}

await worker.close();
});
});
});
Expand Down

0 comments on commit d6066f4

Please sign in to comment.