perf: keep jobs in waiting list when queue is paused #2769

Open

wants to merge 35 commits into base: v6

Changes from all commits (35 commits)
481b664
refactor: use isQueuePausedOrMaxed instead of getTargetQueueList
roggervalf Sep 12, 2024
2654bb8
chore: consider moving jobs from paused to wait if there are
roggervalf Sep 12, 2024
9286d68
refactor: obliterate to consider removing from wait
roggervalf Sep 13, 2024
1f2f889
refactor: remove paused key from addStandardJob
roggervalf Sep 13, 2024
a66ddba
refactor: remove paused key scripts
roggervalf Sep 13, 2024
87447a8
refactor: remove paused key from other scripts
roggervalf Sep 13, 2024
c3acbb9
refactor: remove paused key on moveToActive
roggervalf Sep 13, 2024
e973ec5
refactor: remove paused key on moveToFinished
roggervalf Sep 13, 2024
65c3beb
chore: fix moveToFinishedKeys ref
roggervalf Sep 13, 2024
7a93835
chore: add repairPausedKey script
roggervalf Sep 14, 2024
0cde0c3
chore: fix merge conflicts
roggervalf Sep 21, 2024
537ce42
docs(migrations): add new section
roggervalf Sep 21, 2024
e593d8a
docs: fix typo
roggervalf Sep 21, 2024
efe1a56
chore: restore few decoded options
roggervalf Sep 24, 2024
42ebfcb
refactor: add executeMigrations script
roggervalf Sep 26, 2024
616e45d
Merge branch 'master' into refactor-paused
roggervalf Sep 26, 2024
a8103c0
docs(flows): add missing word in flows guide (#2788)
armpogart Sep 27, 2024
f5f7426
test: update test cases
roggervalf Sep 27, 2024
db131b4
Merge branch 'master' into refactor-paused
roggervalf Sep 27, 2024
9a3067b
chore: remove only statement
roggervalf Sep 27, 2024
77b81ba
test: fix test cases
roggervalf Sep 28, 2024
47408f3
Merge branch 'v6' into refactor-paused
roggervalf Sep 28, 2024
67e9306
chore: add POC for migrations
manast Sep 30, 2024
fcac031
feat(queue-base): add migrations checker
manast Sep 30, 2024
4cd34a1
refactor(script): consider using rename when no wait jobs
roggervalf Oct 1, 2024
50b0313
fix: return client in get client in all paths
manast Oct 1, 2024
9f97ec0
Merge branch 'refactor-paused' of https://github.com/taskforcesh/bull…
manast Oct 1, 2024
1f7245a
chore: fix merge conflicts
roggervalf Oct 12, 2024
ee33a81
chore: fix merge conflicts
roggervalf Oct 16, 2024
92d9154
chore: add migrations in list
roggervalf Oct 16, 2024
268b9d4
test: fix test cases
roggervalf Oct 16, 2024
6e20049
docs(guide): update migrations for v6
roggervalf Oct 16, 2024
d0c4015
test: waitUntilReady in test cases
roggervalf Oct 16, 2024
fd3ba17
feat: add skipMigrationsExecution option
roggervalf Oct 17, 2024
82da69a
docs: update v6 clarification for skipMigrationsExecution
roggervalf Oct 17, 2024
3 changes: 2 additions & 1 deletion docs/gitbook/SUMMARY.md
@@ -65,7 +65,8 @@
- [Producers](guide/nestjs/producers.md)
- [Queue Events Listeners](guide/nestjs/queue-events-listeners.md)
- [Going to production](guide/going-to-production.md)
- [Migration to newer versions](guide/migration-to-newer-versions.md)
- [Migration to newer versions](guide/migrations/migration-to-newer-versions.md)
- [Version 6](guide/migrations/v6.md)
- [Troubleshooting](guide/troubleshooting.md)

## Patterns
@@ -55,4 +55,3 @@ Since BullMQ supports global pause, one possible strategy, if suitable for your
### Use new queues altogether

This drastic solution involves discontinuing use of older queues and creating new ones. You could rename older queues (e.g., "myQueueV2"), use a new Redis host, or maintain two versions of the service—one running an older BullMQ version with old queues, and a newer one with the latest BullMQ and a different set of queues. When the older version has no more jobs to process, it can be retired, leaving only the upgraded version.

27 changes: 27 additions & 0 deletions docs/gitbook/guide/migrations/v6.md
@@ -0,0 +1,27 @@
---
description: Tips and hints on how to migrate to v6.
---

# Migration to v6

Make sure the **skipMigrationsExecution** option is set to false in any of our instances, in order to execute all necessary changes when coming from an older version.

## Migration of deprecated paused key

If you have paused queues when upgrading to this version, their jobs will be moved to the wait state when initializing any of our instances (Worker, Queue, QueueEvents or FlowProducer).

The paused key is no longer needed, as this state is already represented by the queue's meta key. This also improves the process of pausing or resuming a queue, since we no longer need to rename any key.

## Remove legacy markers

When migrating from versions older than v5, it is recommended to follow this process:

1. Pause your queues.
2. Upgrade to v6.
3. Instantiate any instance where migrations will be executed.
4. Resume your queues.

This way you will prevent your workers from picking up a legacy marker.

A second option would be to do incremental upgrades.
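The paused-to-wait migration described in this guide can be sketched with a minimal in-memory model (plain Python lists standing in for the Redis `paused` and `wait` lists; the function name and batch size are illustrative, not the actual BullMQ API):

```python
def migrate_deprecated_paused_key(paused: list, wait: list, max_count: int = 1000) -> int:
    """Move up to max_count jobs from the deprecated paused list into wait.

    Returns the number of jobs still left in paused (a cursor-like value:
    0 means the migration is complete)."""
    batch = paused[:max_count]
    del paused[:max_count]
    # The queue's paused state now lives in the meta key, so workers will
    # still not pick these jobs up while the queue is paused.
    wait.extend(batch)
    return len(paused)

paused = [f"job-{i}" for i in range(2500)]
wait: list = []
# Iterate until the cursor reaches 0, exactly like the migration loop.
while migrate_deprecated_paused_key(paused, wait, 1000):
    pass
```

With a batch size of 1000 the loop above runs three times for 2500 jobs, which bounds the amount of work done per script invocation.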
13 changes: 12 additions & 1 deletion python/bullmq/queue.py
@@ -141,7 +141,7 @@ async def getJobLogs(self, job_id:str, start = 0, end = -1, asc = True):
"logs": result[0],
"count": result[1]
}

async def obliterate(self, force: bool = False):
"""
Completely destroys the queue and all of its contents irreversibly.
@@ -198,6 +198,17 @@ def removeDeprecatedPriorityKey(self):
"""
return self.client.delete(self.toKey("priority"))

async def migrateDeprecatedPausedKey(self, maxCount: int = 1000):
"""
Migrate deprecated paused key.

@param maxCount: Max quantity of jobs to be moved to wait per iteration.
"""
while True:
cursor = await self.scripts.migrateDeprecatedPausedKey(maxCount)
if cursor is None or cursor == 0 or cursor == "0":
break

async def getJobCountByTypes(self, *types):
result = await self.getJobCounts(*types)
sum = 0
45 changes: 26 additions & 19 deletions python/bullmq/scripts.py
@@ -38,11 +38,11 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
self.redisConnection = redisConnection
self.redisClient = redisConnection.conn
self.commands = {
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-8.lua")),
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-7.lua")),
"addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")),
"addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")),
"addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-6.lua")),
"cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-3.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
@@ -51,18 +51,19 @@
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
"isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-13.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-9.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-8.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-2.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-8.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-8.lua")),
"migrateDeprecatedPausedKey": self.redisClient.register_script(self.getScript("migrateDeprecatedPausedKey-2.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
"saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
"updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
"updateProgress": self.redisClient.register_script(self.getScript("updateProgress-3.lua")),
@@ -131,7 +132,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None):
"""
Add a standard job to the queue
"""
keys = self.getKeys(['wait', 'paused', 'meta', 'id',
keys = self.getKeys(['wait', 'meta', 'id',
'completed', 'active', 'events', 'marker'])
args = self.addJobArgs(job, None)
args.append(timestamp)
@@ -259,15 +260,15 @@ def saveStacktraceArgs(self, job_id: str, stacktrace: str, failedReason: str):
return (keys, args)

def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}):
keys = self.getKeys(['active', 'wait', 'paused'])
keys = self.getKeys(['active', 'wait'])
keys.append(self.toKey(job_id))
keys.append(self.keys['meta'])
keys.append(self.keys['events'])
keys.append(self.keys['delayed'])
keys.append(self.keys['prioritized'])
keys.append(self.keys['pc'])
keys.append(self.keys['marker'])
keys.append(self.keys['stalled'])
keys.append(self.keys['marker'])

push_cmd = "RPUSH" if lifo else "LPUSH"

@@ -302,7 +303,6 @@ def promoteArgs(self, job_id: str):
keys = self.getKeys(['delayed', 'wait', 'paused', 'meta', 'prioritized', 'active', 'pc', 'events', 'marker'])
keys.append(self.toKey(job_id))
keys.append(self.keys['events'])
keys.append(self.keys['paused'])
keys.append(self.keys['meta'])

args = [self.keys[''], job_id]
@@ -374,7 +374,6 @@ async def isJobInList(self, list_key: str, job_id: str):

async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False):
keys = [self.keys['wait'],
self.keys['paused'],
self.keys['meta'],
self.keys['prioritized'],
self.keys['active'],
@@ -408,7 +407,6 @@ async def reprocessJob(self, job: Job, state: str):
keys.append(self.keys[state])
keys.append(self.keys['wait'])
keys.append(self.keys['meta'])
keys.append(self.keys['paused'])
keys.append(self.keys['active'])
keys.append(self.keys['marker'])

@@ -450,7 +448,7 @@ async def obliterate(self, count: int, force: bool = False):

def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int:
keys = self.getKeys(
['', 'events', state, 'wait', 'paused', 'meta', 'active', 'marker'])
['', 'events', state, 'wait', 'meta', 'active', 'marker'])

args = [count or 1000, timestamp or round(time.time()*1000), state]
return (keys, args)
@@ -465,6 +463,15 @@ async def retryJobs(self, state: str, count: int, timestamp: int):
result = await self.commands["moveJobsToWait"](keys=keys, args=args)
return result

async def migrateDeprecatedPausedKey(self, maxCount: int):
keys = self.getKeys(
['paused', 'wait'])

args = [maxCount]

result = await self.commands["migrateDeprecatedPausedKey"](keys=keys, args=args)
return result

async def promoteJobs(self, count: int):
"""
Promote jobs in delayed state
@@ -483,7 +490,7 @@ async def moveToActive(self, token: str, opts: dict) -> list[Any]:
limiter = opts.get("limiter", None)

keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', 'marker'])
'stalled', 'limiter', 'delayed', 'meta', 'pc', 'marker'])
packedOpts = msgpack.packb(
{"token": token, "lockDuration": lockDuration, "limiter": limiter}, use_bin_type=True)
args = [self.keys[''], timestamp, packedOpts]
@@ -516,7 +523,7 @@ def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, tar
metricsKey = self.toKey('metrics:' + target)

keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', target])
'stalled', 'limiter', 'delayed', 'meta', 'pc', target])
keys.append(self.toKey(job.id))
keys.append(metricsKey)
keys.append(self.keys['marker'])
@@ -580,7 +587,7 @@ def extendLock(self, jobId: str, token: str, duration: int, client: Redis = None

def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int):
keys = self.getKeys(['stalled', 'wait', 'active', 'failed',
'stalled-check', 'meta', 'paused', 'marker', 'events'])
'stalled-check', 'meta', 'marker', 'events'])
args = [maxStalledCount, self.keys[''], round(
time.time() * 1000), stalledInterval]
return self.commands["moveStalledJobsToWait"](keys, args)
3 changes: 2 additions & 1 deletion src/classes/job.ts
@@ -39,10 +39,11 @@ const logger = debuglog('bull');

const optsDecodeMap = {
de: 'deduplication',
ocf: 'onChildFailure',
fpof: 'failParentOnFailure',
Contributor:
Won't we need to keep these old mappings to avoid a data-breaking change?

Contributor:
Maybe we should introduce a "migration" mechanism here. Something like migration steps for going between versions that are run if required in Lua scripts for atomicity. But we would need to keep a version number in the meta key, adding complexity.

Collaborator Author:
We can keep these mappings; they will only be used to format options of jobs stored with the old values. No migration is needed, since we evaluate both the old and new options in the Lua scripts, and new jobs will use the new option.

idof: 'ignoreDependencyOnFailure',
kl: 'keepLogs',
ocf: 'onChildFailure',
rdof: 'removeDependencyOnFailure',
};

const optsEncodeMap = invertObject(optsDecodeMap);
116 changes: 116 additions & 0 deletions src/classes/migrations.ts
@@ -0,0 +1,116 @@
import { RedisClient } from '../interfaces';

export interface MigrationOptions {
prefix: string;
queueName: string;
}

export type MigrationFunction = (
client: RedisClient,
opts: MigrationOptions,
) => Promise<void>;

export const checkPendingMigrations = async (
client: RedisClient,
opts: MigrationOptions,
) => {
const migrationsKey = getRedisKeyFromOpts(opts, 'migrations');
const existingMigrations = await client.zrange(migrationsKey, 0, -1);
return migrations.some(
migration =>
!existingMigrations.includes(`${migration.version}-${migration.name}`),
);
};
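The check above can be modeled in a few lines (a plain Python sketch; a set stands in for the members returned by `zrange`, and the migration descriptors are illustrative):

```python
def check_pending_migrations(existing_members: set, migrations: list) -> bool:
    # A migration is pending if its "version-name" id has not yet been
    # recorded in the migrations sorted set.
    return any(
        f"{m['version']}-{m['name']}" not in existing_members
        for m in migrations
    )

known = [
    {"version": "6.0.0", "name": "remove-legacy-markers"},
    {"version": "6.0.0", "name": "migrate-paused-jobs"},
]

fresh_queue = set()  # nothing recorded yet, so migrations are pending
migrated_queue = {"6.0.0-remove-legacy-markers", "6.0.0-migrate-paused-jobs"}
```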

export const migrations: {
name: string;
version: string;
migrate: MigrationFunction;
}[] = [
{
name: 'remove-legacy-markers',
version: '6.0.0',
migrate: async (client: RedisClient, opts: MigrationOptions) => {
const keys: (string | number)[] = [
getRedisKeyFromOpts(opts, 'wait'),
getRedisKeyFromOpts(opts, 'paused'),
getRedisKeyFromOpts(opts, 'meta'),
getRedisKeyFromOpts(opts, 'completed'),
getRedisKeyFromOpts(opts, 'failed'),
];
const args = [getRedisKeyFromOpts(opts, '')];

await (<any>client).removeLegacyMarkers(keys.concat(args));
},
},
{
name: 'migrate-paused-jobs',
version: '6.0.0',
migrate: async (client: RedisClient, opts: MigrationOptions) => {
let cursor = 0;
do {
const keys: (string | number)[] = [
getRedisKeyFromOpts(opts, 'paused'),
getRedisKeyFromOpts(opts, 'wait'),
];
const args = [1000];
cursor = await (<any>client).migrateDeprecatedPausedKey(
keys.concat(args),
);
} while (cursor);
},
},
];

/**
* Run Migrations.
*
* This method is used to run possibly existing migrations for the queue.
*
* Normally, if there are pending migrations, the Queue, Worker and QueueEvents instances
* will throw an error when they are instantiated. Use this method to run the migrations
* before instantiating the instances.
*
* @param redisClient The Redis client instance
* @param opts The options for the migration
*
* @see https://docs.bullmq.io/guide/migrations
*/
export const runMigrations = async (
redisClient: RedisClient,
opts: {
prefix?: string;
queueName: string;
},
) => {
const prefix = opts.prefix || 'bull';
const migrationsKey = getRedisKeyFromOpts({ prefix, ...opts }, 'migrations');

// The migrations key is a ZSET with the migration timestamp as the score
for (const migration of migrations) {
const migrationId = `${migration.version}-${migration.name}`;
const alreadyExecuted = !!(await redisClient.zscore(
migrationsKey,
migrationId,
));
if (alreadyExecuted) {
continue;
}
console.log(`[BULLMQ] Running migration ${migrationId}`);
try {
await migration.migrate(redisClient, {
prefix,
queueName: opts.queueName,
});
await redisClient.zadd(migrationsKey, Date.now(), migrationId);
} catch (err) {
console.error(`[BULLMQ] Migration ${migrationId} failed: ${err}`);
break;
}
console.log(`[BULLMQ] Migration ${migrationId} completed`);
}
};
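The idempotency of `runMigrations` hinges on the ZSET bookkeeping: a migration id is recorded only after its `migrate` step succeeds, and recorded ids are skipped on later runs. A minimal sketch (plain Python, with a dict standing in for the Redis ZSET; names are illustrative):

```python
import time

migrations_zset: dict = {}  # member -> score, stand-in for the Redis ZSET

def run_migrations(migrations: list, executed_log: list) -> None:
    for m in migrations:
        migration_id = f"{m['version']}-{m['name']}"
        if migration_id in migrations_zset:
            continue  # already executed on a previous run, skip it
        executed_log.append(migration_id)  # stands in for migration.migrate()
        # Record the migration with its timestamp as the score.
        migrations_zset[migration_id] = time.time() * 1000

migs = [
    {"version": "6.0.0", "name": "remove-legacy-markers"},
    {"version": "6.0.0", "name": "migrate-paused-jobs"},
]
log: list = []
run_migrations(migs, log)
run_migrations(migs, log)  # second run is a no-op
```

Recording the id only after a successful `migrate` means a failed migration will be retried on the next run, matching the early `break` in the real loop.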

function getRedisKeyFromOpts(opts: MigrationOptions, key: string): string {
return `${opts.prefix}:${opts.queueName}:${key}`;
}
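Key construction follows the usual `prefix:queueName:key` layout, which a one-line helper mirrors (an illustrative Python equivalent of `getRedisKeyFromOpts`):

```python
def get_redis_key(prefix: str, queue_name: str, key: str) -> str:
    # Mirrors getRedisKeyFromOpts: "<prefix>:<queueName>:<key>"
    return f"{prefix}:{queue_name}:{key}"
```

For example, `get_redis_key("bull", "my-queue", "migrations")` yields `bull:my-queue:migrations`, the sorted set consulted by `checkPendingMigrations`.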