Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job.runAt property to expose timestamp that job is supposed to run at #1732

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ export class Job<
*/
timestamp: number;

/**
* Timestamp when the job is expected to run
*/
runAt?: number;

/**
* Number of attempts after the job has failed.
* @defaultValue 0
Expand Down Expand Up @@ -168,6 +173,7 @@ export class Job<
this.repeatJobKey = repeatJobKey;

this.timestamp = opts.timestamp ? opts.timestamp : Date.now();
this.runAt = this.timestamp + (this.delay || 0);

this.opts.backoff = Backoffs.normalize(opts.backoff);

Expand Down Expand Up @@ -286,6 +292,10 @@ export class Job<

job.timestamp = parseInt(json.timestamp);

if (json.runAt) {
job.runAt = parseInt(json.runAt);
}

if (json.finishedOn) {
job.finishedOn = parseInt(json.finishedOn);
}
Expand Down Expand Up @@ -385,6 +395,7 @@ export class Job<
attemptsMade: this.attemptsMade,
finishedOn: this.finishedOn,
processedOn: this.processedOn,
runAt: this.runAt,
timestamp: this.timestamp,
failedReason: JSON.stringify(this.failedReason),
stacktrace: JSON.stringify(this.stacktrace),
Expand Down Expand Up @@ -729,7 +740,7 @@ export class Job<
* @returns void
*/
async changeDelay(delay: number): Promise<void> {
await this.scripts.changeDelay(this.id, delay);
this.runAt = await this.scripts.changeDelay(this.id, delay);
this.delay = delay;
}

Expand Down
21 changes: 13 additions & 8 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -520,36 +520,41 @@ export class Scripts {
return (<any>client).getStateV2(keys.concat([jobId]));
}

async changeDelay(jobId: string, delay: number): Promise<void> {
async changeDelay(jobId: string, delay: number): Promise<number> {
const client = await this.queue.client;

const args = this.changeDelayArgs(jobId, delay);
const delayedTimestamp = Date.now() + delay;
const args = this.changeDelayArgs(jobId, delay, delayedTimestamp);
const result = await (<any>client).changeDelay(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'changeDelay', 'delayed');
}

return delayedTimestamp;
}

private changeDelayArgs(jobId: string, delay: number): (string | number)[] {
private changeDelayArgs(
jobId: string,
delay: number,
delayedTimestamp: number,
): (string | number)[] {
//
// Bake in the job id first 12 bits into the timestamp
// to guarantee correct execution order of delayed jobs
// (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
//
// WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
//
let timestamp = Date.now() + delay;

if (timestamp > 0) {
timestamp = timestamp * 0x1000 + (+jobId & 0xfff);
if (delayedTimestamp > 0) {
delayedTimestamp = delayedTimestamp * 0x1000 + (+jobId & 0xfff);
}

const keys: (string | number)[] = ['delayed', jobId].map(name => {
return this.queue.toKey(name);
});
keys.push.apply(keys, [this.queue.keys.events]);

return keys.concat([delay, JSON.stringify(timestamp), jobId]);
return keys.concat([delay, JSON.stringify(delayedTimestamp), jobId]);
}

// Note: We have an issue here with jobs using custom job ids
Expand Down
10 changes: 6 additions & 4 deletions src/commands/addJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,16 @@ if repeatJobKey ~= nil then
table.insert(optionalValues, repeatJobKey)
end

-- Check if job is delayed
local delayedTimestamp = (delay > 0 and (timestamp + delay)) or 0
local runAt = (delayedTimestamp > 0 and delayedTimestamp) or timestamp

rcall("HMSET", jobIdKey, "name", args[3], "data", ARGV[2], "opts", jsonOpts,
"timestamp", timestamp, "delay", delay, "priority", priority, unpack(optionalValues))
"timestamp", timestamp, "delay", delay, "runAt", runAt, "priority", priority,
unpack(optionalValues))

rcall("XADD", KEYS[8], "*", "event", "added", "jobId", jobId, "name", args[3])

-- Check if job is delayed
local delayedTimestamp = (delay > 0 and (timestamp + delay)) or 0

-- Check if job is a parent, if so add to the parents set
local waitChildrenKey = args[6]
if waitChildrenKey ~= nil then
Expand Down
2 changes: 1 addition & 1 deletion src/commands/changeDelay-3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ if rcall("EXISTS", KEYS[2]) == 1 then
return -3
end

rcall("HSET", KEYS[2], "delay", tonumber(ARGV[1]))
rcall("HMSET", KEYS[2], "delay", tonumber(ARGV[1]), "runAt", delayedTimestamp)
rcall("ZADD", KEYS[1], score, jobId)

rcall("XADD", KEYS[3], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp);
Expand Down
10 changes: 6 additions & 4 deletions src/commands/includes/updateParentDepsIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDepende
local processedSet = parentKey .. ":processed"
rcall("HSET", processedSet, jobIdKey, returnvalue)
local activeParent = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId)
if rcall("SCARD", parentDependenciesKey) == 0 and activeParent then
if rcall("SCARD", parentDependenciesKey) == 0 and activeParent then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
local parentTarget = getTargetQueueList(parentQueueKey .. ":meta", parentQueueKey .. ":wait",
parentQueueKey .. ":paused")
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
local priority = tonumber(jobAttributes[1]) or 0
local delay = tonumber(jobAttributes[2]) or 0
local runAt = tonumber(timestamp) + delay

rcall("HSET", parentKey, "runAt", runAt)
if delay > 0 then
local delayedTimestamp = tonumber(timestamp) + delay
local score = delayedTimestamp * 0x1000
local parentDelayedKey = parentQueueKey .. ":delayed"
local score = runAt * 0x1000
local parentDelayedKey = parentQueueKey .. ":delayed"
rcall("ZADD", parentDelayedKey, score, parentId)

addDelayMarkerIfNeeded(parentTarget, parentDelayedKey)
Expand Down
1 change: 1 addition & 0 deletions src/commands/moveToDelayed-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ if rcall("EXISTS", jobKey) == 1 then
end

rcall("ZADD", delayedKey, score, jobId)
rcall("HSET", jobKey, "runAt", delayedTimestamp)
rcall("XADD", KEYS[6], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp)

-- Check if we need to push a marker job to wake up sleeping workers.
Expand Down
2 changes: 2 additions & 0 deletions src/interfaces/job-json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface JobJson {
attemptsMade: number;
finishedOn?: number;
processedOn?: number;
runAt?: number;
timestamp: number;
failedReason: string;
stacktrace: string;
Expand All @@ -29,6 +30,7 @@ export interface JobJsonRaw {
attemptsMade: string;
finishedOn?: string;
processedOn?: string;
runAt?: string;
timestamp: string;
failedReason: string;
stacktrace: string[];
Expand Down
4 changes: 4 additions & 0 deletions src/interfaces/minimal-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ export interface MinimalJob<
* Timestamp when the job was created (unless overridden with job options).
*/
timestamp: number;
/**
* Timestamp when the job is expected to run
*/
runAt?: number;
/**
* Number of attempts after the job has failed.
* @defaultValue 0
Expand Down
10 changes: 10 additions & 0 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ describe('Job', function () {
const storedJob = await Job.fromId(queue, job.id);
expect(storedJob).to.have.property('id');
expect(storedJob).to.have.property('data');
expect(storedJob).to.have.property('timestamp');
expect(storedJob).to.have.property('runAt');

expect(storedJob.data.foo).to.be.equal('bar');
expect(storedJob.opts).to.be.an('object');
expect(storedJob.opts.timestamp).to.be.equal(timestamp);
expect(storedJob.timestamp).to.be.equal(timestamp);
expect(storedJob.runAt).to.be.equal(timestamp);
});

it('should use the custom jobId if one is provided', async function () {
Expand Down Expand Up @@ -709,11 +713,17 @@ describe('Job', function () {
const isDelayed = await job.isDelayed();
expect(isDelayed).to.be.equal(true);

const changeDelayTime = new Date().getTime();
await job.changeDelay(4000);

const isDelayedAfterChangeDelay = await job.isDelayed();
expect(isDelayedAfterChangeDelay).to.be.equal(true);
expect(job.delay).to.be.equal(4000);
expect(job.runAt).to.be.gte(changeDelayTime + 4000);

const storedJob = await Job.fromId(queue, job.id);
expect(storedJob.delay).to.be.equal(job.delay);
expect(storedJob.runAt).to.be.equal(job.runAt);

await completing;

Expand Down