Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci(upstash): test updates for Upstash #2304

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 23 additions & 20 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,26 +116,29 @@ jobs:
- run: yarn build
- run: BULLMQ_TEST_PREFIX={b} yarn test

# node-upstash:
# runs-on: ubuntu-latest
# continue-on-error: true

# env:
# node-version: lts/*
# REDIS_HOST: ${{ secrets.REDIS_HOST }}

# name: testing node@lts/*, upstash@latest
# steps:
# - name: Checkout repository
# uses: actions/checkout@v3 # v3
# - name: Use Node.js ${{ env.node-version }}
# uses: actions/setup-node@v3 # v3
# with:
# node-version: lts/*
# cache: 'yarn'
# - run: yarn install --ignore-engines --frozen-lockfile --non-interactive
# - run: yarn build
# - run: yarn test
node-upstash:
runs-on: ubuntu-latest
continue-on-error: true

env:
node-version: lts/*

name: testing node@lts/*, upstash@latest
steps:
- name: Checkout repository
uses: actions/checkout@v3 # v3
- name: Use Node.js ${{ env.node-version }}
uses: actions/setup-node@v3 # v3
with:
node-version: lts/*
cache: 'yarn'
- run: yarn install --ignore-engines --frozen-lockfile --non-interactive
- run: yarn build
- name: Run tests using upstash
run: REDIS_HOST=${{ secrets.UPSTASH_HOST }} yarn test
env:
REDIS_HOST: ${{ secrets.UPSTASH_HOST }}
NPM_TOKEN: ${{ secrets.NPM_TOKEN }}

python:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,9 @@ export class Worker<

stalled.forEach((jobId: string) => this.emit('stalled', jobId, 'active'));

// It is possible that a failed job has been removed from the queue
// before we are able to notify the worker about it (if using the removeOnFail option).
// In this case the job will be undefined.
const jobPromises: Promise<Job<DataType, ResultType, NameType>>[] = [];
for (let i = 0; i < failed.length; i++) {
jobPromises.push(
Expand Down
5 changes: 4 additions & 1 deletion tests/test_bulk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ describe('bulk jobs', () => {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand Down
5 changes: 4 additions & 1 deletion tests/test_clean.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ describe('Cleaner', () => {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async () => {
Expand Down
8 changes: 5 additions & 3 deletions tests/test_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ describe('connection', () => {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand All @@ -39,8 +42,7 @@ describe('connection', () => {

describe('prefix', () => {
it('should throw exception if using prefix with ioredis', async () => {
const connection = new IORedis({
host: redisHost,
const connection = new IORedis(redisHost, {
keyPrefix: 'bullmq',
});

Expand Down
5 changes: 4 additions & 1 deletion tests/test_delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ describe('Delayed jobs', function () {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand Down
23 changes: 16 additions & 7 deletions tests/test_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ describe('events', function () {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand Down Expand Up @@ -644,8 +647,10 @@ describe('events', function () {
await waitCompletedEvent;

const eventsLength = await client.xlen(trimmedQueue.keys.events);

expect(eventsLength).to.be.lte(35);
// Trim near is not guaranteed to trim all in redis spec.
if (!process.env.UPSTASH_HOST) {
expect(eventsLength).to.be.lte(35);
}
expect(eventsLength).to.be.gte(20);

await worker.close();
Expand Down Expand Up @@ -702,8 +707,10 @@ describe('events', function () {
await waitDelayedEvent;

const eventsLength = await client.xlen(trimmedQueue.keys.events);

expect(eventsLength).to.be.lte(35);
// Trim near is not guaranteed to trim all in redis spec.
if (!process.env.UPSTASH_HOST) {
expect(eventsLength).to.be.lte(35);
}
expect(eventsLength).to.be.gte(20);

await worker.close();
Expand Down Expand Up @@ -759,8 +766,10 @@ describe('events', function () {
await waitCompletedEvent;

const eventsLength = await client.xlen(trimmedQueue.keys.events);

expect(eventsLength).to.be.lte(35);
// Trim near is not guaranteed to trim all in redis spec.
if (!process.env.UPSTASH_HOST) {
expect(eventsLength).to.be.lte(35);
}
expect(eventsLength).to.be.gte(20);

await worker.close();
Expand Down
5 changes: 4 additions & 1 deletion tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ describe('flows', () => {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand Down
8 changes: 5 additions & 3 deletions tests/test_getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ describe('Jobs getters', function () {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand Down Expand Up @@ -128,8 +131,7 @@ describe('Jobs getters', function () {
describe('when sharing connection', () => {
// Test is very flaky on CI, so we skip it for now.
it('gets all workers for a given queue', async function () {
const ioredisConnection = new IORedis({
host: redisHost,
const ioredisConnection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
});

Expand Down
11 changes: 9 additions & 2 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ describe('Job', function () {
let queueName: string;
let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand Down Expand Up @@ -314,7 +317,11 @@ describe('Job', function () {
});

it('removes 4000 jobs in time rage of 4000ms', async function () {
this.timeout(8000);
if (process.env.UPSTASH_HOST) {
this.timeout(400000);
} else {
this.timeout(8000);
}
const numJobs = 4000;

// Create waiting jobs
Expand Down
5 changes: 4 additions & 1 deletion tests/test_metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ describe('metrics', function () {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(function () {
Expand Down
7 changes: 5 additions & 2 deletions tests/test_obliterate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ describe('Obliterate', function () {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async () => {
Expand Down Expand Up @@ -429,7 +432,7 @@ describe('Obliterate', function () {
});

it('should obliterate a queue with high number of jobs in different statuses', async function () {
this.timeout(6000);
this.timeout(60000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this also needed due to xtrim?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My answer has to be similar to #2304 (comment)
Uptash is probably slower because it is not just in memory. I don't really remember how much longer was this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that even for disk, the amount of time should not be that high considering the database would be almost empty as every test case deletes itself. I think it is worth to investigate if the delay is legit or if there is something else going on, as this could affect BullMQ users in production.

const arr1: Promise<Job<any, any, string>>[] = [];
for (let i = 0; i < 300; i++) {
arr1.push(queue.add('test', { foo: `barLoop${i}` }));
Expand Down
5 changes: 4 additions & 1 deletion tests/test_pause.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ describe('Pause', function () {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand Down
5 changes: 4 additions & 1 deletion tests/test_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ describe('queues', function () {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand Down
11 changes: 9 additions & 2 deletions tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ describe('Rate Limiter', function () {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand Down Expand Up @@ -660,7 +663,11 @@ describe('Rate Limiter', function () {
describe('when there are more added jobs than max limiter', () => {
it('processes jobs as max limiter from the beginning', async function () {
const numJobs = 400;
this.timeout(5000);
if (process.env.UPSTASH_HOST) {
this.timeout(10000);
} else {
this.timeout(5000);
}
let parallelJobs = 0;

const processor = async () => {
Expand Down
5 changes: 4 additions & 1 deletion tests/test_repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ describe('repeat', function () {

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand Down
5 changes: 4 additions & 1 deletion tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ function sandboxProcessTests(

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
connection = new IORedis(redisHost, {
maxRetriesPerRequest: null,
disconnectTimeout: 0,
});
});

beforeEach(async function () {
Expand Down