Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Search #2190

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
11 changes: 10 additions & 1 deletion src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
WORKER_SUFFIX,
} from '../utils';
import { JobState, JobType } from '../types';
import { Metrics } from '../interfaces';
import { FilteredJobsResult, Metrics } from '../interfaces';

/**
*
Expand Down Expand Up @@ -355,6 +355,15 @@ export class QueueGetters<
);
}

async getJobsByFilter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't forget the typedoc style documentation for this public method.

type: JobType,
filter: Record<string, any>,
cursor: number,
count = 20,
): Promise<FilteredJobsResult> {
return this.scripts.getJobsByFilter(type, filter, cursor, count);
}

/**
* Returns the logs for a given Job.
* @param jobId - the id of the job to get the logs for.
Expand Down
84 changes: 83 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

/*eslint-env node */
'use strict';
import { Job } from '../classes';
import { Packr } from 'msgpackr';

const packer = new Packr({
Expand All @@ -14,6 +15,7 @@ const packer = new Packr({
const pack = packer.pack;

import {
FilteredJobsResult,
JobJson,
JobJsonRaw,
MinimalJob,
Expand All @@ -32,7 +34,12 @@ import {
RedisJobOptions,
} from '../types';
import { ErrorCode } from '../enums';
import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils';
import {
array2obj,
isEmpty,
getParentKey,
isRedisVersionLowerThan,
} from '../utils';
import { ChainableCommander } from 'ioredis';

export type JobData = [JobJsonRaw | number, string?];
Expand Down Expand Up @@ -405,6 +412,81 @@ export class Scripts {
return (<any>client).getRanges(args);
}

/**
* Retrieve jobs by a user-defined mongo-compatible filter object
* @param type - type of job
* @param filter - mongo-like filter
* @param cursor - cursor position
* @param count - count of jobs to return per iteration
*/
async getJobsByFilter(
type: JobType,
filter: Record<string, unknown>,
cursor: number,
count = 20,
): Promise<FilteredJobsResult> {
const client = await this.queue.client;
type = type === 'waiting' ? 'wait' : type; // alias
const key = type ? this.queue.toKey(type) : '';
const prefix = this.queue.toKey('');
const criteria = JSON.stringify(filter);

const response = await (<any>client).getJobsByFilter(
key,
prefix,
criteria,
cursor,
count,
);

const newCursor = response[0] === '0' ? null : Number(response[0]);
const jobs: Job[] = [];

let currentJob: Record<string, any> = {};
let jobId: string = null;

const queue: MinimalQueue = this.queue;

function addJobIfNeeded() {
if (currentJob && !isEmpty(currentJob) && jobId) {
// TODO: verify this
const trace = currentJob['stacktrace'];
if (!Array.isArray(trace)) {
if (typeof trace === 'string') {
currentJob['stacktrace'] = JSON.parse(trace);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious about this logic, https://github.com/taskforcesh/bullmq/blob/master/src/classes/job.ts#L316 fromJSON method is handling stacktrace values

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely be better if we could just use Job.fromJson here.

} else {
currentJob['stacktrace'] = [];
}
}
const raw = currentJob as JobJsonRaw;
const job = Job.fromJSON(queue, raw, jobId);
const ts = currentJob['timestamp'];
job.timestamp = ts ? parseInt(ts) : Date.now();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jobs.push(job);
}
}

for (let i = 1; i < response.length; i += 2) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so the thing that makes this code more complex seems to be that all the jobs are returned flattened in one single array. Wouldn't be better to return an array of arrays from the lua script so that we could have much simpler code here?

const key = response[i];
const value = response[i + 1];

if (key === 'jobId') {
addJobIfNeeded();
jobId = value;
currentJob = {};
} else {
currentJob[key] = value;
}
}

addJobIfNeeded();

return {
cursor: newCursor,
jobs,
};
}

private getCountsArgs(types: JobType[]): (string | number)[] {
const queueKeys = this.queue.keys;
const transformedTypes = types.map(type => {
Expand Down