Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(job-scheduler): add delayed job and scheduler in same script #2993

Merged
merged 9 commits into from
Jan 6, 2025
132 changes: 109 additions & 23 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,71 @@ export class JobScheduler extends QueueBase {
const multi = (await this.client).multi();
if (nextMillis) {
if (override) {
this.scripts.addJobScheduler(
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
JSON.stringify(typeof jobData === 'undefined' ? {} : jobData),
Job.optsAsJSON(opts),
{
name: jobName,
endDate: endDate ? new Date(endDate).getTime() : undefined,
tz: repeatOpts.tz,
pattern,
every,
return this.trace<Job<T, R, N>>(
Copy link
Collaborator Author

@roggervalf roggervalf Jan 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copied for now, when update job scheduler get refactored, we will reuse same wrapper

SpanKind.PRODUCER,
'add',
`${this.name}.${jobName}`,
async (span, srcPropagationMedatada) => {
let telemetry = opts.telemetry;

if (srcPropagationMedatada) {
const omitContext = opts.telemetry?.omitContext;
const telemetryMetadata =
opts.telemetry?.metadata ||
(!omitContext && srcPropagationMedatada);

if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
}

const mergedOpts = this.getNextJobOpts(
nextMillis,
jobSchedulerId,
{
...opts,
repeat: filteredRepeatOpts,
telemetry,
},
iterationCount,
newOffset,
);

const jobId = await this.scripts.addJobScheduler(
jobSchedulerId,
nextMillis,
JSON.stringify(typeof jobData === 'undefined' ? {} : jobData),
Job.optsAsJSON(opts),
{
name: jobName,
endDate: endDate ? new Date(endDate).getTime() : undefined,
tz: repeatOpts.tz,
pattern,
every,
},
Job.optsAsJSON(mergedOpts),
producerId,
);

const job = new this.Job<T, R, N>(
this,
jobName,
jobData,
mergedOpts,
jobId,
);

job.id = jobId;

span?.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});

return job;
},
);
} else {
Expand Down Expand Up @@ -222,10 +275,44 @@ export class JobScheduler extends QueueBase {
nextMillis,
});

const mergedOpts = this.getNextJobOpts(
nextMillis,
jobSchedulerId,
opts,
currentCount,
offset,
);

const job = new this.Job<T, R, N>(this, name, data, mergedOpts, jobId);
job.addJob(client);

if (producerId) {
const producerJobKey = this.toKey(producerId);
client.hset(producerJobKey, 'nrjid', job.id);
}

return job;
}

private getNextJobOpts(
nextMillis: number,
jobSchedulerId: string,
opts: JobsOptions,
currentCount: number,
offset?: number,
): JobsOptions {
//
// Generate unique job id for this iteration.
//
const jobId = this.getSchedulerNextJobId({
jobSchedulerId,
nextMillis,
});

const now = Date.now();
const delay = nextMillis + offset - now;

const mergedOpts = {
const mergedOpts: JobsOptions = {
...opts,
jobId,
delay: delay < 0 ? 0 : delay,
Expand All @@ -234,17 +321,16 @@ export class JobScheduler extends QueueBase {
repeatJobKey: jobSchedulerId,
};

mergedOpts.repeat = { ...opts.repeat, count: currentCount };

const job = new this.Job<T, R, N>(this, name, data, mergedOpts, jobId);
job.addJob(client);

if (producerId) {
const producerJobKey = this.toKey(producerId);
client.hset(producerJobKey, 'nrjid', job.id);
}
mergedOpts.repeat = {
...opts.repeat,
count: currentCount,
offset,
endDate: opts.repeat?.endDate
? new Date(opts.repeat.endDate).getTime()
: undefined,
};

return job;
return mergedOpts;
}

async removeJobScheduler(jobSchedulerId: string): Promise<number> {
Expand Down
1 change: 0 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { ChainableCommander } from 'ioredis';
import { debuglog } from 'util';
import {
BackoffOptions,
Expand Down
17 changes: 15 additions & 2 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
FinishedPropValAttribute,
MinimalQueue,
RedisJobOptions,
JobsOptions,
} from '../types';
import { ErrorCode } from '../enums';
import {
Expand Down Expand Up @@ -313,18 +314,26 @@ export class Scripts {
}

async addJobScheduler(
client: RedisClient,
jobSchedulerId: string,
nextMillis: number,
templateData: string,
templateOpts: RedisJobOptions,
opts: RepeatableOptions,
delayedJobOpts: JobsOptions,
// The job id of the job that produced this next iteration
producerId?: string,
): Promise<string> {
const client = await this.queue.client;

const queueKeys = this.queue.keys;

const keys: (string | number | Buffer)[] = [
queueKeys.repeat,
queueKeys.marker,
queueKeys.meta,
queueKeys.id,
queueKeys.delayed,
queueKeys.events,
queueKeys.repeat,
];

const args = [
Expand All @@ -333,8 +342,12 @@ export class Scripts {
jobSchedulerId,
templateData,
pack(templateOpts),
pack(delayedJobOpts),
Date.now(),
queueKeys[''],
producerId ? this.queue.toKey(producerId) : '',
];

return this.execCommand(client, 'addJobScheduler', keys.concat(args));
}

Expand Down
22 changes: 4 additions & 18 deletions src/commands/addDelayedJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
[8] parent? {id, queueKey}
[9] repeat job key
[10] deduplication key

ARGV[2] Json stringified job data
ARGV[3] msgpacked options

Expand Down Expand Up @@ -56,12 +56,10 @@ local deduplicationKey = args[10]
local parentData

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/addDelayedJob"
--- @include "includes/deduplicateJob"
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
--- @include "includes/storeJob"

if parentKey ~= nil then
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
Expand Down Expand Up @@ -96,20 +94,8 @@ if deduplicationJobId then
return deduplicationJobId
end

-- Store the job.
local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
opts, timestamp, parentKey, parentData,
repeatJobKey)

local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay))

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
"jobId", jobId, "delay", delayedTimestamp)

-- mark that a delayed job is available
local markerKey = KEYS[1]
addDelayMarkerIfNeeded(markerKey, delayedKey)
addDelayedJob(jobIdKey, jobId, delayedKey, eventsKey, args[3], ARGV[2], opts, timestamp, repeatJobKey,
maxEvents, KEYS[1], parentKey, parentData)

-- Check if this job is a child of another job, if so add it to the parents dependencies
if parentDependenciesKey ~= nil then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
Adds a job scheduler, i.e. a job factory that creates jobs based on a given schedule (repeat options).

Input:
KEYS[1] 'repeat' key
KEYS[2] 'delayed' key
KEYS[1] 'marker',
KEYS[2] 'meta'
KEYS[3] 'id'
KEYS[4] 'delayed'
KEYS[5] events stream key
KEYS[6] 'repeat' key

ARGV[1] next milliseconds
ARGV[2] msgpacked options
Expand All @@ -14,27 +18,31 @@
[5] every?
ARGV[3] jobs scheduler id
ARGV[4] Json stringified template data
ARGV[5] mspacked template opts
ARGV[6] prefix key
ARGV[5] msgpacked template opts
ARGV[6] msgpacked delayed opts
ARGV[7] timestamp
ARGV[8] prefix key
ARGV[9] producer key

Output:
repeatableKey - OK
next delayed job id - OK
]]
local rcall = redis.call
local repeatKey = KEYS[1]
local delayedKey = KEYS[2]

local repeatKey = KEYS[6]
local delayedKey = KEYS[4]
local timestamp = ARGV[7]
local nextMillis = ARGV[1]
local jobSchedulerId = ARGV[3]
local templateOpts = cmsgpack.unpack(ARGV[5])
local prefixKey = ARGV[6]
local prefixKey = ARGV[8]

-- Includes
--- @include "includes/addDelayedJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/removeJob"

local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, templateData, templateOpts)
local function storeRepeatableJob(schedulerId, schedulerKey, repeatKey, nextMillis, opts, templateData, templateOpts)
rcall("ZADD", repeatKey, nextMillis, schedulerId)
local opts = cmsgpack.unpack(rawOpts)

local optionalValues = {}
if opts['tz'] then
Expand Down Expand Up @@ -68,17 +76,19 @@ local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, t
table.insert(optionalValues, templateData)
end

rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'],
rcall("HMSET", schedulerKey, "name", opts['name'],
unpack(optionalValues))
end

local schedulerKey = repeatKey .. ":" .. jobSchedulerId
local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis

-- If we are overriding a repeatable job we must delete the delayed job for
-- the next iteration.
local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
if prevMillis ~= false then
local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
local nextDelayedJobKey = repeatKey .. ":" .. jobSchedulerId .. ":" .. nextMillis

if rcall("ZSCORE", delayedKey, delayedJobId) ~= false
and (rcall("EXISTS", nextDelayedJobKey) ~= 1
Expand All @@ -88,4 +98,23 @@ if prevMillis ~= false then
end
end

return storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts)
local schedulerOpts = cmsgpack.unpack(ARGV[2])

storeRepeatableJob(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, ARGV[4], templateOpts)

local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)

rcall("INCR", KEYS[3])

local delayedOpts = cmsgpack.unpack(ARGV[6])

addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerOpts['name'], ARGV[4], delayedOpts,
timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)

if ARGV[9] ~= "" then
rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId)
end

return nextDelayedJobId .. "" -- convert to string
25 changes: 25 additions & 0 deletions src/commands/includes/addDelayedJob.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
--[[
Add marker if needed when a job is available.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment here seems to be wrong.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in #2997

]]

-- Includes
--- @include "addDelayMarkerIfNeeded"
--- @include "getDelayedScore"
--- @include "storeJob"

local function addDelayedJob(jobIdKey, jobId, delayedKey, eventsKey, name, data, opts, timestamp, repeatJobKey,
maxEvents, markerKey, parentKey, parentData)
-- Store the job.
local delay, priority = storeJob(eventsKey, jobIdKey, jobId, name, data,
opts, timestamp, parentKey, parentData, repeatJobKey)

local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay))

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
"jobId", jobId, "delay", delayedTimestamp)

-- mark that a delayed job is available
addDelayMarkerIfNeeded(markerKey, delayedKey)
end

Loading
Loading