Skip to content

Commit

Permalink
perf(worker): do not call bzpopmin when blockDelay is lower or equal 0 (
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Apr 27, 2024
1 parent 723457f commit 9760b85
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 22 deletions.
28 changes: 16 additions & 12 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,20 +621,22 @@ will never work with more accuracy than 1ms. */
if (!this.closing) {
let blockTimeout = this.getBlockTimeout(blockUntil);

blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
? blockTimeout
: Math.ceil(blockTimeout);
if (blockTimeout > 0) {
blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
? blockTimeout
: Math.ceil(blockTimeout);

this.updateDelays(); // reset delays to avoid reusing same values in next iteration
// Markers should only be used for un-blocking, so we will handle them in this
// function only.
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
this.updateDelays(); // reset delays to avoid reusing same values in next iteration
// Markers should only be used for un-blocking, so we will handle them in this
// function only.
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);

if (result) {
const [_key, member, score] = result;
if (result) {
const [_key, member, score] = result;

if (member) {
return parseInt(score);
if (member) {
return parseInt(score);
}
}
}

Expand All @@ -658,7 +660,9 @@ will never work with more accuracy than 1ms. */
if (blockUntil) {
const blockDelay = blockUntil - Date.now();
// when we reach the time to get new jobs
if (blockDelay < this.minimumBlockTimeout * 1000) {
if (blockDelay <= 0) {
return blockDelay;
} else if (blockDelay < this.minimumBlockTimeout * 1000) {
return this.minimumBlockTimeout;
} else {
// We restrict the maximum block timeout to 10 second to avoid
Expand Down
14 changes: 4 additions & 10 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -949,23 +949,17 @@ describe('workers', function () {

describe('when blockUntil is greater than 0', () => {
describe('when blockUntil is lower than date now value', () => {
it('returns minimumBlockTimeout', async () => {
it('returns blockDelay value lower or equal 0', async () => {
const worker = new Worker(queueName, async () => {}, {
connection,
prefix,
autorun: false,
});
await worker.waitUntilReady();

if (isRedisVersionLowerThan(worker.redisVersion, '7.0.8')) {
expect(worker['getBlockTimeout'](Date.now() - 1)).to.be.equal(
0.002,
);
} else {
expect(worker['getBlockTimeout'](Date.now() - 1)).to.be.equal(
0.001,
);
}
expect(
worker['getBlockTimeout'](Date.now() - 1),
).to.be.lessThanOrEqual(0);
await worker.close();
});
});
Expand Down

0 comments on commit 9760b85

Please sign in to comment.