Skip to content

Commit

Permalink
fix(stalled): consider ignoreDependencyOnFailure option (python) (#2540
Browse files Browse the repository at this point in the history
…) fixes #2531
  • Loading branch information
roggervalf committed Apr 24, 2024
1 parent 0fb9b85 commit 0140959
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 6 deletions.
1 change: 1 addition & 0 deletions python/bullmq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

# Lookup table from the abbreviated option keys ('fpof', 'idof', 'kl')
# to the full option names they stand for.
optsDecodeMap = dict(
    fpof='failParentOnFailure',
    idof='ignoreDependencyOnFailure',
    kl='keepLogs',
)

Expand Down
8 changes: 2 additions & 6 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,11 +512,6 @@ def getMetricsSize(opts: dict):
return metrics.get("maxDataPoints", "")
return ""

def getFailParentOnFailure(job: Job):
    """Return the ``failParentOnFailure`` option of ``job``.

    Args:
        job: object exposing an ``opts`` attribute that is either ``None``
            or a mapping supporting ``.get``.

    Returns:
        The value stored under ``"failParentOnFailure"`` in ``job.opts``,
        or ``False`` when the key is missing or when ``job.opts`` is ``None``.
    """
    opts = job.opts
    if opts is None:
        # Previously this path fell off the end and returned None implicitly;
        # return the same default used for a missing key instead.
        return False
    return opts.get("failParentOnFailure", False)

keepJobs = getKeepJobs(shouldRemove)

packedOpts = msgpack.packb({
Expand All @@ -527,7 +522,8 @@ def getFailParentOnFailure(job: Job):
"attempts": job.attempts,
"attemptsMade": job.attemptsMade,
"maxMetricsSize": getMetricsSize(opts),
"fpof": getFailParentOnFailure(job),
"fpof": opts.get("failParentOnFailure", False),
"idof": opts.get("ignoreDependencyOnFailure", False)
}, use_bin_type=True)

args = [job.id, timestamp, propVal, transformed_value or "", target,
Expand Down
11 changes: 11 additions & 0 deletions src/commands/moveStalledJobsToWait-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ local rcall = redis.call
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"
--- @include "includes/moveParentFromWaitingChildrenToFailed"
--- @include "includes/moveParentToWaitIfNeeded"
--- @include "includes/removeJob"
--- @include "includes/removeJobsByMaxAge"
--- @include "includes/removeJobsByMaxCount"
Expand Down Expand Up @@ -105,6 +106,16 @@ if (#stalling > 0) then
jobKey,
timestamp
)
elseif opts['idof'] then
local parentData = cjson.decode(rawParentData)
local parentKey = parentData['queueKey'] .. ':' .. parentData['id']
local dependenciesSet = parentKey .. ":dependencies"
if rcall("SREM", dependenciesSet, jobKey) == 1 then
moveParentToWaitIfNeeded(parentData['queueKey'], dependenciesSet,
parentKey, parentData['id'], timestamp)
local failedSet = parentKey .. ":failed"
rcall("HSET", failedSet, jobKey, failedReason)
end
end
if removeOnFailType == "number" then
removeJobsByMaxCount(opts["removeOnFail"],
Expand Down
89 changes: 89 additions & 0 deletions tests/test_stalled_jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,95 @@ describe('stalled jobs', function () {
});
});

describe('when ignoreDependencyOnFailure is provided as true', function () {
  // Scenario: a flow child created with `ignoreDependencyOnFailure: true`
  // is picked up by a worker, left active, and then failed by the stalled
  // check of a second worker (maxStalledCount: 0). The parent is expected
  // to end up in the 'waiting' state.
  it('should move parent to waiting when child is moved to failed', async function () {
    this.timeout(6000);
    const concurrency = 4;
    const parentQueueName = `parent-queue-${v4()}`;

    const parentQueue = new Queue(parentQueueName, {
      connection,
      prefix,
    });

    const flow = new FlowProducer({ connection, prefix });

    // The processor outlives the test timeout on purpose so that every job
    // it picks up is still active when the worker is closed below.
    const worker = new Worker(
      queueName,
      async () => {
        return delay(10000);
      },
      {
        connection,
        prefix,
        lockDuration: 1000,
        stalledInterval: 100,
        maxStalledCount: 0,
        concurrency,
      },
    );

    // Resolves once `concurrency` jobs have been reported as active.
    const allActive = new Promise(resolve => {
      worker.on('active', after(concurrency, resolve));
    });

    await worker.waitUntilReady();

    // One flow child (with the option under test) ...
    const { job: parent } = await flow.add({
      name: 'parent-job',
      queueName: parentQueueName,
      data: {},
      children: [
        {
          name: 'test',
          data: { foo: 'bar' },
          queueName,
          opts: { ignoreDependencyOnFailure: true },
        },
      ],
    });

    // ... plus three plain jobs, for a total of `concurrency` jobs.
    const jobs = Array.from(Array(3).keys()).map(index => ({
      name: 'test',
      data: { index },
    }));

    await queue.addBulk(jobs);
    await allActive;
    // close(true): presumably a forced close that does not wait for the
    // active jobs to finish — confirm against Worker#close.
    await worker.close(true);

    const worker2 = new Worker(queueName, async () => {}, {
      connection,
      prefix,
      stalledInterval: 100,
      maxStalledCount: 0,
      concurrency,
    });

    const errorMessage = 'job stalled more than allowable limit';
    const allFailed = new Promise<void>((resolve, reject) => {
      worker2.on(
        'failed',
        after(concurrency, async (job, failedReason, prev) => {
          // Assertions run inside an event callback: forward any failure
          // to the promise so the test reports the real assertion error
          // instead of hanging until the mocha timeout.
          try {
            const parentState = await parent.getState();

            expect(parentState).to.be.equal('waiting');
            expect(prev).to.be.equal('active');
            expect(failedReason.message).to.be.equal(errorMessage);
            resolve();
          } catch (err) {
            reject(err);
          }
        }),
      );
    });

    await allFailed;

    await worker2.close();
    await parentQueue.close();
    await flow.close();
    await removeAllQueueData(new IORedis(redisHost), parentQueueName);
  });
});

describe('when removeOnFail is provided as a number', function () {
it('keeps the specified number of jobs in failed', async function () {
this.timeout(6000);
Expand Down

0 comments on commit 0140959

Please sign in to comment.