Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queue): add global concurrency #2496

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
"nyc": "^15.1.0",
"prettier": "^2.7.1",
"pretty-quick": "^3.1.3",
"progress": "^2.0.3",
"rimraf": "^3.0.2",
"rrule": "^2.6.9",
"semantic-release": "^19.0.3",
Expand Down
8 changes: 5 additions & 3 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-7.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-8.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-2.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
Expand Down Expand Up @@ -172,6 +172,8 @@ def moveToWaitingChildrenArgs(self, job_id, token, opts: dict = {}):
self.keys['active'],
self.keys['waiting-children'],
self.toKey(job_id),
self.keys[''],
self.keys['meta'],
self.keys['stalled']]
child_key = opts.get("child") if opts else None
args = [token, get_parent_key(child_key) or "", round(time.time() * 1000), job_id,
Expand Down Expand Up @@ -310,7 +312,7 @@ async def promote(self, job_id: str):
return None

def remove(self, job_id: str, remove_children: bool):
keys = self.getKeys([''])
keys = self.getKeys(['', 'meta'])
args = [job_id, 1 if remove_children else 0]

return self.commands["removeJob"](keys=keys, args=args)
Expand Down
21 changes: 21 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,18 @@ export class Queue<
});
}

/**
* Enable and set global concurrency value.
* @param concurrency - Maximum number of simultaneous jobs that the workers can handle.
* For instance, setting this value to 1 ensures that no more than one job
* is processed at any given time. If this limit is not defined, there will be no
* restriction on the number of concurrent jobs.
*/
async setGlobalConcurrency(concurrency: number) {
const client = await this.client;
return client.hset(this.keys.meta, 'concurrency', concurrency);
}

/**
* Adds a new job to the queue.
*
Expand Down Expand Up @@ -301,6 +313,15 @@ export class Queue<
return pausedKeyExists === 1;
}

/**
* Returns true if the queue is currently maxed.
*/
async isMaxed(): Promise<boolean> {
const client = await this.client;
const maxed = await client.hexists(this.keys.meta, 'maxed');
return maxed === 1;
}

/**
* Get all repeatable meta jobs.
*
Expand Down
11 changes: 8 additions & 3 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,9 @@ export class Scripts {
async remove(jobId: string, removeChildren: boolean): Promise<number> {
const client = await this.queue.client;

const keys: (string | number)[] = [''].map(name => this.queue.toKey(name));
const keys: (string | number)[] = ['', 'meta'].map(name =>
this.queue.toKey(name),
);
return (<any>client).removeJob(
keys.concat([jobId, removeChildren ? 1 : 0]),
);
Expand Down Expand Up @@ -843,6 +845,8 @@ export class Scripts {
'active',
'waiting-children',
jobId,
'',
'meta',
'stalled',
].map(name => {
return this.queue.toKey(name);
Expand Down Expand Up @@ -1172,7 +1176,7 @@ export class Scripts {
const client = await this.queue.client;
const lockKey = `${this.queue.toKey(jobId)}:lock`;

const keys = [
const keys: (string | number)[] = [
this.queue.keys.active,
this.queue.keys.wait,
this.queue.keys.stalled,
Expand All @@ -1183,9 +1187,10 @@ export class Scripts {
this.queue.keys.prioritized,
this.queue.keys.marker,
this.queue.keys.events,
this.queue.toKey(''),
];

const args = [jobId, token, this.queue.toKey(jobId)];
const args = [jobId, token, this.queue.toKey(jobId), Date.now()];

const pttl = await (<any>client).moveJobFromActiveToWait(keys.concat(args));

Expand Down
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ will never work with more accuracy than 1ms. */

let timeout: NodeJS.Timeout;
try {
if (!this.closing) {
if (!this.closing && !this.limitUntil) {
let blockTimeout = this.getBlockTimeout(blockUntil);

if (blockTimeout > 0) {
Expand Down
21 changes: 21 additions & 0 deletions src/commands/includes/decreaseConcurrency.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
local function decreaseConcurrency(prefix, metaKey)
local maxConcurrency = rcall("HGET", metaKey, "concurrency")
if maxConcurrency then
local activeCountKey = prefix .. 'active:count'
local activeCount = rcall("GET", activeCountKey)
if activeCount then
local count
if activeCount == 1 then
rcall("DEL", activeCountKey)
count = 0
else
count = rcall("DECR", activeCountKey)
end

if count < tonumber(maxConcurrency) then
rcall("HDEL", metaKey, "maxed")
end
end
end
end

2 changes: 1 addition & 1 deletion src/commands/includes/getNextDelayedTimestamp.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ local function getNextDelayedTimestamp(delayedKey)
local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
if #result then
local nextTimestamp = tonumber(result[2])
if (nextTimestamp ~= nil) then
if nextTimestamp ~= nil then
nextTimestamp = nextTimestamp / 0x1000
end
return nextTimestamp
Expand Down
12 changes: 12 additions & 0 deletions src/commands/includes/increaseConcurrency.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
local function increaseConcurrency(prefix, metaKey)
local maxConcurrency = rcall("HGET", metaKey, "concurrency")
if maxConcurrency then
local count = rcall("INCR", prefix .. 'active:count')

if count >= tonumber(maxConcurrency) then
rcall("HSET", metaKey, "maxed", 1)
return true
end
end
return false
end
6 changes: 6 additions & 0 deletions src/commands/includes/isQueueMaxed.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
--[[
Function to check for the meta.maxed key to decide if we are maxed or not.
]]
local function isQueueMaxed(queueMetaKey)
return rcall("HEXISTS", queueMetaKey, "maxed") == 1
end
2 changes: 1 addition & 1 deletion src/commands/includes/isQueuePaused.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
(since an empty list and !EXISTS are not really the same).
]]
local function isQueuePaused(queueMetaKey)
return rcall("HEXISTS", queueMetaKey, "paused") == 1
return rcall("HEXISTS", queueMetaKey, "paused") == 1
end
6 changes: 5 additions & 1 deletion src/commands/includes/removeJobFromAnyState.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
prev state
]]

local function removeJobFromAnyState( prefix, jobId)
-- Includes
--- @include "decreaseConcurrency"

local function removeJobFromAnyState( prefix, jobId, metaKey)
-- We start with the ZSCORE checks, since they have O(1) complexity
if rcall("ZSCORE", prefix .. "completed", jobId) then
rcall("ZREM", prefix .. "completed", jobId)
Expand All @@ -28,6 +31,7 @@ local function removeJobFromAnyState( prefix, jobId)
elseif rcall("LREM", prefix .. "paused", 1, jobId) == 1 then
return "paused"
elseif rcall("LREM", prefix .. "active", 1, jobId) == 1 then
decreaseConcurrency(prefix, metaKey)
return "active"
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
KEYS[8] prioritized key
KEYS[9] marker key
KEYS[10] event key
KEYS[11] prefix key

ARGV[1] job id
ARGV[2] lock token
Expand All @@ -21,6 +22,7 @@ local rcall = redis.call

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/decreaseConcurrency"
--- @include "includes/pushBackJobWithPriority"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
Expand All @@ -47,6 +49,7 @@ if lockToken == token then
addJobInTargetList(target, KEYS[9], "RPUSH", isPaused, jobId)
end

decreaseConcurrency(KEYS[11], metaKey)
rcall("DEL", lockKey)

local maxEvents = getOrSetMaxEvents(metaKey)
Expand Down
2 changes: 2 additions & 0 deletions src/commands/moveStalledJobsToWait-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ local rcall = redis.call
-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/batches"
--- @include "includes/decreaseConcurrency"
--- @include "includes/getTargetQueueList"
--- @include "includes/moveParentFromWaitingChildrenToFailed"
--- @include "includes/moveParentToWaitIfNeeded"
Expand Down Expand Up @@ -79,6 +80,7 @@ if (#stalling > 0) then
local removed = rcall("LREM", activeKey, 1, jobId)

if (removed > 0) then
decreaseConcurrency(queueKeyPrefix, metaKey)
-- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
local stalledCount =
rcall("HINCRBY", jobKey, "stalledCounter", 1)
Expand Down
13 changes: 11 additions & 2 deletions src/commands/moveToActive-11.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ local opts = cmsgpack.unpack(ARGV[3])
--- @include "includes/getNextDelayedTimestamp"
--- @include "includes/getRateLimitTTL"
--- @include "includes/getTargetQueueList"
--- @include "includes/increaseConcurrency"
--- @include "includes/isQueueMaxed"
--- @include "includes/moveJobFromPriorityToActive"
--- @include "includes/prepareJobForProcessing"
--- @include "includes/promoteDelayedJobs"
Expand All @@ -57,14 +59,19 @@ local markerKey = KEYS[11]
promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], eventStreamKey, ARGV[1],
ARGV[2], KEYS[10], paused)

-- paused queue
if paused then return {0, 0, 0, 0} end

local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])
local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)

-- Check if we are rate limited first.
if expireTime > 0 then return {0, 0, expireTime, 0} end

-- paused queue
if paused then return {0, 0, 0, 0} end
local isMaxed = isQueueMaxed(KEYS[9])

-- maxed queue
if isMaxed then return {0, 0, 0, 0} end

-- no job ID, try non-blocking move from wait to active
local jobId = rcall("RPOPLPUSH", waitKey, activeKey)
Expand All @@ -76,11 +83,13 @@ if jobId and string.sub(jobId, 1, 2) == "0:" then
end

if jobId then
increaseConcurrency(ARGV[1], KEYS[9])
return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
maxJobs, opts)
else
jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[10])
if jobId then
increaseConcurrency(ARGV[1], KEYS[9])
return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
maxJobs, opts)
end
Expand Down
3 changes: 3 additions & 0 deletions src/commands/moveToDelayed-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ local rcall = redis.call

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/decreaseConcurrency"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/isQueuePaused"
--- @include "includes/removeLock"
Expand All @@ -52,6 +53,8 @@ if rcall("EXISTS", jobKey) == 1 then
local numRemovedElements = rcall("LREM", KEYS[2], -1, jobId)
if numRemovedElements < 1 then return -3 end

decreaseConcurrency(ARGV[1], metaKey)

if ARGV[7] == "0" then
rcall("HINCRBY", jobKey, "atm", 1)
end
Expand Down
9 changes: 9 additions & 0 deletions src/commands/moveToFinished-14.lua
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ local rcall = redis.call

--- Includes
--- @include "includes/collectMetrics"
--- @include "includes/decreaseConcurrency"
--- @include "includes/getNextDelayedTimestamp"
--- @include "includes/getRateLimitTTL"
--- @include "includes/getTargetQueueList"
--- @include "includes/isQueueMaxed"
--- @include "includes/moveJobFromPriorityToActive"
--- @include "includes/moveParentFromWaitingChildrenToFailed"
--- @include "includes/moveParentToWaitIfNeeded"
Expand Down Expand Up @@ -116,6 +118,8 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
-- Trim events before emiting them to avoid trimming events emitted in this script
trimEvents(metaKey, eventStreamKey)

decreaseConcurrency(ARGV[7], metaKey)

-- If job has a parent we need to
-- 1) remove this job id from parents dependencies
-- 2) move the job Id to parent "processed" set
Expand Down Expand Up @@ -217,6 +221,11 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
-- paused queue
if paused then return {0, 0, 0, 0} end

local isMaxed = isQueueMaxed(KEYS[9])

-- maxed queue
if isMaxed then return {0, 0, 0, 0} end

jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2])

if jobId then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
KEYS[2] active key
KEYS[3] waitChildrenKey key
KEYS[4] job key
KEYS[5] stalled key
KEYS[5] key prefix
KEYS[6] meta key
KEYS[7] stalled key

ARGV[1] token
ARGV[2] child key
Expand All @@ -21,12 +23,15 @@
-3 - Job not in active set
]]
local rcall = redis.call
local prefix = KEYS[5]
local metaKey = KEYS[6]

-- Includes
--- Includes
--- @include "includes/decreaseConcurrency"
--- @include "includes/removeLock"

local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId,
timestamp)
timestamp, prefix, metaKey)
local score = tonumber(timestamp)

local numRemovedElements = rcall("LREM", activeKey, -1, jobId)
Expand All @@ -37,27 +42,28 @@ local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId,

rcall("ZADD", waitingChildrenKey, score, jobId)

decreaseConcurrency(prefix, metaKey)
return 0
end

if rcall("EXISTS", KEYS[4]) == 1 then
if ARGV[2] ~= "" then
if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then
local errorCode = removeLock(KEYS[4], KEYS[5], ARGV[1], ARGV[4])
local errorCode = removeLock(KEYS[4], KEYS[7], ARGV[1], ARGV[4])
if errorCode < 0 then
return errorCode
end
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3])
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], prefix, metaKey)
end

return 1
else
if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then
local errorCode = removeLock(KEYS[4], KEYS[5], ARGV[1], ARGV[4])
local errorCode = removeLock(KEYS[4], KEYS[7], ARGV[1], ARGV[4])
if errorCode < 0 then
return errorCode
end
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3])
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], prefix, metaKey)
end

return 1
Expand Down