Skip to content

Commit

Permalink
perf(job-scheduler): add delayed job and scheduler in same script (#2993
Browse files Browse the repository at this point in the history
)
  • Loading branch information
roggervalf authored Jan 6, 2025
1 parent d1fb113 commit 95718e8
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 64 deletions.
132 changes: 109 additions & 23 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,71 @@ export class JobScheduler extends QueueBase {
const multi = (await this.client).multi();
if (nextMillis) {
if (override) {
this.scripts.addJobScheduler(
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
JSON.stringify(typeof jobData === 'undefined' ? {} : jobData),
Job.optsAsJSON(opts),
{
name: jobName,
endDate: endDate ? new Date(endDate).getTime() : undefined,
tz: repeatOpts.tz,
pattern,
every,
return this.trace<Job<T, R, N>>(
SpanKind.PRODUCER,
'add',
`${this.name}.${jobName}`,
async (span, srcPropagationMedatada) => {
let telemetry = opts.telemetry;

if (srcPropagationMedatada) {
const omitContext = opts.telemetry?.omitContext;
const telemetryMetadata =
opts.telemetry?.metadata ||
(!omitContext && srcPropagationMedatada);

if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
}

const mergedOpts = this.getNextJobOpts(
nextMillis,
jobSchedulerId,
{
...opts,
repeat: filteredRepeatOpts,
telemetry,
},
iterationCount,
newOffset,
);

const jobId = await this.scripts.addJobScheduler(
jobSchedulerId,
nextMillis,
JSON.stringify(typeof jobData === 'undefined' ? {} : jobData),
Job.optsAsJSON(opts),
{
name: jobName,
endDate: endDate ? new Date(endDate).getTime() : undefined,
tz: repeatOpts.tz,
pattern,
every,
},
Job.optsAsJSON(mergedOpts),
producerId,
);

const job = new this.Job<T, R, N>(
this,
jobName,
jobData,
mergedOpts,
jobId,
);

job.id = jobId;

span?.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});

return job;
},
);
} else {
Expand Down Expand Up @@ -222,10 +275,44 @@ export class JobScheduler extends QueueBase {
nextMillis,
});

const mergedOpts = this.getNextJobOpts(
nextMillis,
jobSchedulerId,
opts,
currentCount,
offset,
);

const job = new this.Job<T, R, N>(this, name, data, mergedOpts, jobId);
job.addJob(client);

if (producerId) {
const producerJobKey = this.toKey(producerId);
client.hset(producerJobKey, 'nrjid', job.id);
}

return job;
}

private getNextJobOpts(
nextMillis: number,
jobSchedulerId: string,
opts: JobsOptions,
currentCount: number,
offset?: number,
): JobsOptions {
//
// Generate unique job id for this iteration.
//
const jobId = this.getSchedulerNextJobId({
jobSchedulerId,
nextMillis,
});

const now = Date.now();
const delay = nextMillis + offset - now;

const mergedOpts = {
const mergedOpts: JobsOptions = {
...opts,
jobId,
delay: delay < 0 ? 0 : delay,
Expand All @@ -234,17 +321,16 @@ export class JobScheduler extends QueueBase {
repeatJobKey: jobSchedulerId,
};

mergedOpts.repeat = { ...opts.repeat, count: currentCount };

const job = new this.Job<T, R, N>(this, name, data, mergedOpts, jobId);
job.addJob(client);

if (producerId) {
const producerJobKey = this.toKey(producerId);
client.hset(producerJobKey, 'nrjid', job.id);
}
mergedOpts.repeat = {
...opts.repeat,
count: currentCount,
offset,
endDate: opts.repeat?.endDate
? new Date(opts.repeat.endDate).getTime()
: undefined,
};

return job;
return mergedOpts;
}

async removeJobScheduler(jobSchedulerId: string): Promise<number> {
Expand Down
1 change: 0 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { ChainableCommander } from 'ioredis';
import { debuglog } from 'util';
import {
BackoffOptions,
Expand Down
17 changes: 15 additions & 2 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
FinishedPropValAttribute,
MinimalQueue,
RedisJobOptions,
JobsOptions,
} from '../types';
import { ErrorCode } from '../enums';
import {
Expand Down Expand Up @@ -313,18 +314,26 @@ export class Scripts {
}

async addJobScheduler(
client: RedisClient,
jobSchedulerId: string,
nextMillis: number,
templateData: string,
templateOpts: RedisJobOptions,
opts: RepeatableOptions,
delayedJobOpts: JobsOptions,
// The job id of the job that produced this next iteration
producerId?: string,
): Promise<string> {
const client = await this.queue.client;

const queueKeys = this.queue.keys;

const keys: (string | number | Buffer)[] = [
queueKeys.repeat,
queueKeys.marker,
queueKeys.meta,
queueKeys.id,
queueKeys.delayed,
queueKeys.events,
queueKeys.repeat,
];

const args = [
Expand All @@ -333,8 +342,12 @@ export class Scripts {
jobSchedulerId,
templateData,
pack(templateOpts),
pack(delayedJobOpts),
Date.now(),
queueKeys[''],
producerId ? this.queue.toKey(producerId) : '',
];

return this.execCommand(client, 'addJobScheduler', keys.concat(args));
}

Expand Down
22 changes: 4 additions & 18 deletions src/commands/addDelayedJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
[8] parent? {id, queueKey}
[9] repeat job key
[10] deduplication key
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand Down Expand Up @@ -56,12 +56,10 @@ local deduplicationKey = args[10]
local parentData

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/addDelayedJob"
--- @include "includes/deduplicateJob"
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
--- @include "includes/storeJob"

if parentKey ~= nil then
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
Expand Down Expand Up @@ -96,20 +94,8 @@ if deduplicationJobId then
return deduplicationJobId
end

-- Store the job.
local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
opts, timestamp, parentKey, parentData,
repeatJobKey)

local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay))

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
"jobId", jobId, "delay", delayedTimestamp)

-- mark that a delayed job is available
local markerKey = KEYS[1]
addDelayMarkerIfNeeded(markerKey, delayedKey)
addDelayedJob(jobIdKey, jobId, delayedKey, eventsKey, args[3], ARGV[2], opts, timestamp, repeatJobKey,
maxEvents, KEYS[1], parentKey, parentData)

-- Check if this job is a child of another job, if so add it to the parents dependencies
if parentDependenciesKey ~= nil then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
Adds a job scheduler, i.e. a job factory that creates jobs based on a given schedule (repeat options).
Input:
KEYS[1] 'repeat' key
KEYS[2] 'delayed' key
KEYS[1] 'marker',
KEYS[2] 'meta'
KEYS[3] 'id'
KEYS[4] 'delayed'
KEYS[5] events stream key
KEYS[6] 'repeat' key
ARGV[1] next milliseconds
ARGV[2] msgpacked options
Expand All @@ -14,27 +18,31 @@
[5] every?
ARGV[3] jobs scheduler id
ARGV[4] Json stringified template data
ARGV[5] mspacked template opts
ARGV[6] prefix key
ARGV[5] msgpacked template opts
ARGV[6] msgpacked delayed opts
ARGV[7] timestamp
ARGV[8] prefix key
ARGV[9] producer key
Output:
repeatableKey - OK
next delayed job id - OK
]]
local rcall = redis.call
local repeatKey = KEYS[1]
local delayedKey = KEYS[2]

local repeatKey = KEYS[6]
local delayedKey = KEYS[4]
local timestamp = ARGV[7]
local nextMillis = ARGV[1]
local jobSchedulerId = ARGV[3]
local templateOpts = cmsgpack.unpack(ARGV[5])
local prefixKey = ARGV[6]
local prefixKey = ARGV[8]

-- Includes
--- @include "includes/addDelayedJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/removeJob"

local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, templateData, templateOpts)
local function storeRepeatableJob(schedulerId, schedulerKey, repeatKey, nextMillis, opts, templateData, templateOpts)
rcall("ZADD", repeatKey, nextMillis, schedulerId)
local opts = cmsgpack.unpack(rawOpts)

local optionalValues = {}
if opts['tz'] then
Expand Down Expand Up @@ -68,17 +76,19 @@ local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, t
table.insert(optionalValues, templateData)
end

rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'],
rcall("HMSET", schedulerKey, "name", opts['name'],
unpack(optionalValues))
end

local schedulerKey = repeatKey .. ":" .. jobSchedulerId
local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis

-- If we are overriding a repeatable job we must delete the delayed job for
-- the next iteration.
local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
if prevMillis ~= false then
local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
local nextDelayedJobKey = repeatKey .. ":" .. jobSchedulerId .. ":" .. nextMillis

if rcall("ZSCORE", delayedKey, delayedJobId) ~= false
and (rcall("EXISTS", nextDelayedJobKey) ~= 1
Expand All @@ -88,4 +98,23 @@ if prevMillis ~= false then
end
end

return storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts)
local schedulerOpts = cmsgpack.unpack(ARGV[2])

storeRepeatableJob(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, ARGV[4], templateOpts)

local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)

rcall("INCR", KEYS[3])

local delayedOpts = cmsgpack.unpack(ARGV[6])

addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerOpts['name'], ARGV[4], delayedOpts,
timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)

if ARGV[9] ~= "" then
rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId)
end

return nextDelayedJobId .. "" -- convert to string
25 changes: 25 additions & 0 deletions src/commands/includes/addDelayedJob.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
--[[
Add marker if needed when a job is available.
]]

-- Includes
--- @include "addDelayMarkerIfNeeded"
--- @include "getDelayedScore"
--- @include "storeJob"

local function addDelayedJob(jobIdKey, jobId, delayedKey, eventsKey, name, data, opts, timestamp, repeatJobKey,
maxEvents, markerKey, parentKey, parentData)
-- Store the job.
local delay, priority = storeJob(eventsKey, jobIdKey, jobId, name, data,
opts, timestamp, parentKey, parentData, repeatJobKey)

local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay))

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
"jobId", jobId, "delay", delayedTimestamp)

-- mark that a delayed job is available
addDelayMarkerIfNeeded(markerKey, delayedKey)
end

Loading

0 comments on commit 95718e8

Please sign in to comment.