Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/optimize opts properties #1694

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ const logger = debuglog('bull');
const optsDecodeMap = {
fpof: 'failParentOnFailure',
kl: 'keepLogs',
roc: 'removeOnComplete',
rof: 'removeOnFail',
i: 'jobId',
p: 'priority',
d: 'delay',
a: 'attempts',
b: 'backoff',
l: 'lifo',
stl: 'stackTraceLimit',
sl: 'sizeLimit',
};

const optsEncodeMap = invert(optsDecodeMap);
Expand Down
4 changes: 0 additions & 4 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -812,10 +812,6 @@ export class Scripts {
}),
];

if (opts.limiter) {
args.push(opts.limiter.max, opts.limiter.duration);
}

const result = await (<any>client).moveToActive(
(<(string | number | boolean | Buffer)[]>keys).concat(args),
);
Expand Down
6 changes: 3 additions & 3 deletions src/commands/addJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ end

-- Store the job.
local jsonOpts = cjson.encode(opts)
local delay = opts['delay'] or 0
local priority = opts['priority'] or 0
local delay = opts['d'] or 0
local priority = opts['p'] or 0

local optionalValues = {}
if parentKey ~= nil then
Expand Down Expand Up @@ -150,7 +150,7 @@ else
-- Standard or priority add
if priority == 0 then
-- LIFO or FIFO
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH';
local pushCmd = opts['l'] and 'RPUSH' or 'LPUSH';
rcall(pushCmd, target, jobId)
else
-- Priority add
Expand Down
13 changes: 8 additions & 5 deletions src/commands/includes/checkStalledJobs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ local function checkStalledJobs(stalledKey, waitKey, activeKey, failedKey,
if (stalledCount > MAX_STALLED_JOB_COUNT) then
local rawOpts = rcall("HGET", jobKey, "opts")
local opts = cjson.decode(rawOpts)
local removeOnFailType = type(opts["removeOnFail"])

local removeOnFail = opts["rof"] or opts["removeOnFail"]

local removeOnFailType = type(removeOnFail)
rcall("ZADD", failedKey, timestamp, jobId)
local failedReason =
"job stalled more than allowable limit"
Expand All @@ -82,16 +85,16 @@ local function checkStalledJobs(stalledKey, waitKey, activeKey, failedKey,
'failedReason', failedReason)

if removeOnFailType == "number" then
removeJobsByMaxCount(opts["removeOnFail"],
removeJobsByMaxCount(removeOnFail,
failedKey, queueKeyPrefix)
elseif removeOnFailType == "boolean" then
if opts["removeOnFail"] then
if removeOnFail then
removeJob(jobId, false, queueKeyPrefix)
rcall("ZREM", failedKey, jobId)
end
elseif removeOnFailType ~= "nil" then
local maxAge = opts["removeOnFail"]["age"]
local maxCount = opts["removeOnFail"]["count"]
local maxAge = removeOnFail["age"]
local maxCount = removeOnFail["count"]

if maxAge ~= nil then
removeJobsByMaxAge(timestamp, maxAge,
Expand Down
10 changes: 6 additions & 4 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ describe('Job', function () {
await removeAllQueueData(new IORedis(), parentQueueName);
});

it('removes 4000 jobs in time rage of 4000ms', async function () {
this.timeout(4000);
it('removes 4000 jobs in time rage of 8000ms', async function () {
this.timeout(8000);
const numJobs = 4000;

// Create waiting jobs
Expand All @@ -279,8 +279,10 @@ describe('Job', function () {
const delayedJobs = await queue.addBulk(jobsDataWithDelay);

// Remove all jobs
await Promise.all(delayedJobs.map(job => job.remove()));
await Promise.all(waitingJobs.map(job => job.remove()));
await Promise.all([
Promise.all(delayedJobs.map(job => job.remove())),
Promise.all(waitingJobs.map(job => job.remove())),
]);

const countJobs = await queue.getJobCountByTypes('waiting', 'delayed');
expect(countJobs).to.be.equal(0);
Expand Down