diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index f8646381eb..3362adedf3 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -118,18 +118,71 @@ export class JobScheduler extends QueueBase { const multi = (await this.client).multi(); if (nextMillis) { if (override) { - this.scripts.addJobScheduler( - (multi) as RedisClient, - jobSchedulerId, - nextMillis, - JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), - Job.optsAsJSON(opts), - { - name: jobName, - endDate: endDate ? new Date(endDate).getTime() : undefined, - tz: repeatOpts.tz, - pattern, - every, + return this.trace>( + SpanKind.PRODUCER, + 'add', + `${this.name}.${jobName}`, + async (span, srcPropagationMedatada) => { + let telemetry = opts.telemetry; + + if (srcPropagationMedatada) { + const omitContext = opts.telemetry?.omitContext; + const telemetryMetadata = + opts.telemetry?.metadata || + (!omitContext && srcPropagationMedatada); + + if (telemetryMetadata || omitContext) { + telemetry = { + metadata: telemetryMetadata, + omitContext, + }; + } + } + + const mergedOpts = this.getNextJobOpts( + nextMillis, + jobSchedulerId, + { + ...opts, + repeat: filteredRepeatOpts, + telemetry, + }, + iterationCount, + newOffset, + ); + + const jobId = await this.scripts.addJobScheduler( + jobSchedulerId, + nextMillis, + JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), + Job.optsAsJSON(opts), + { + name: jobName, + endDate: endDate ? new Date(endDate).getTime() : undefined, + tz: repeatOpts.tz, + pattern, + every, + }, + Job.optsAsJSON(mergedOpts), + producerId, + ); + + const job = new this.Job( + this, + jobName, + jobData, + mergedOpts, + jobId, + ); + + job.id = jobId; + + span?.setAttributes({ + [TelemetryAttributes.JobSchedulerId]: jobSchedulerId, + [TelemetryAttributes.JobId]: job.id, + }); + + return job; }, ); } else { @@ -222,10 +275,44 @@ export class JobScheduler extends QueueBase { nextMillis, }); + const mergedOpts = this.getNextJobOpts( + nextMillis, + jobSchedulerId, + opts, + currentCount, + offset, + ); + + const job = new this.Job(this, name, data, mergedOpts, jobId); + job.addJob(client); + + if (producerId) { + const producerJobKey = this.toKey(producerId); + client.hset(producerJobKey, 'nrjid', job.id); + } + + return job; + } + + private getNextJobOpts( + nextMillis: number, + jobSchedulerId: string, + opts: JobsOptions, + currentCount: number, + offset?: number, + ): JobsOptions { + // + // Generate unique job id for this iteration. + // + const jobId = this.getSchedulerNextJobId({ + jobSchedulerId, + nextMillis, + }); + const now = Date.now(); const delay = nextMillis + offset - now; - const mergedOpts = { + const mergedOpts: JobsOptions = { ...opts, jobId, delay: delay < 0 ? 0 : delay, @@ -234,17 +321,16 @@ export class JobScheduler extends QueueBase { repeatJobKey: jobSchedulerId, }; - mergedOpts.repeat = { ...opts.repeat, count: currentCount }; - - const job = new this.Job(this, name, data, mergedOpts, jobId); - job.addJob(client); - - if (producerId) { - const producerJobKey = this.toKey(producerId); - client.hset(producerJobKey, 'nrjid', job.id); - } + mergedOpts.repeat = { + ...opts.repeat, + count: currentCount, + offset, + endDate: opts.repeat?.endDate + ? new Date(opts.repeat.endDate).getTime() + : undefined, + }; - return job; + return mergedOpts; } async removeJobScheduler(jobSchedulerId: string): Promise { diff --git a/src/classes/job.ts b/src/classes/job.ts index df39be9ba6..3293b619e1 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -1,4 +1,3 @@ -import { ChainableCommander } from 'ioredis'; import { debuglog } from 'util'; import { BackoffOptions, diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index a36359a946..e309e8cd78 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -32,6 +32,7 @@ import { FinishedPropValAttribute, MinimalQueue, RedisJobOptions, + JobsOptions, } from '../types'; import { ErrorCode } from '../enums'; import { @@ -313,18 +314,26 @@ export class Scripts { } async addJobScheduler( - client: RedisClient, jobSchedulerId: string, nextMillis: number, templateData: string, templateOpts: RedisJobOptions, opts: RepeatableOptions, + delayedJobOpts: JobsOptions, + // The job id of the job that produced this next iteration + producerId?: string, ): Promise { + const client = await this.queue.client; + const queueKeys = this.queue.keys; const keys: (string | number | Buffer)[] = [ - queueKeys.repeat, + queueKeys.marker, + queueKeys.meta, + queueKeys.id, queueKeys.delayed, + queueKeys.events, + queueKeys.repeat, ]; const args = [ @@ -333,8 +342,12 @@ export class Scripts { jobSchedulerId, templateData, pack(templateOpts), + pack(delayedJobOpts), + Date.now(), queueKeys[''], + producerId ? this.queue.toKey(producerId) : '', ]; + return this.execCommand(client, 'addJobScheduler', keys.concat(args)); } diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index de2c0f764e..f615fe20f6 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -26,7 +26,7 @@ [8] parent? {id, queueKey} [9] repeat job key [10] deduplication key - + ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -56,12 +56,10 @@ local deduplicationKey = args[10] local parentData -- Includes ---- @include "includes/addDelayMarkerIfNeeded" +--- @include "includes/addDelayedJob" --- @include "includes/deduplicateJob" ---- @include "includes/getDelayedScore" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" ---- @include "includes/storeJob" if parentKey ~= nil then if rcall("EXISTS", parentKey) ~= 1 then return -5 end @@ -96,20 +94,8 @@ if deduplicationJobId then return deduplicationJobId end --- Store the job. -local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], - opts, timestamp, parentKey, parentData, - repeatJobKey) - -local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay)) - -rcall("ZADD", delayedKey, score, jobId) -rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed", - "jobId", jobId, "delay", delayedTimestamp) - --- mark that a delayed job is available -local markerKey = KEYS[1] -addDelayMarkerIfNeeded(markerKey, delayedKey) +addDelayedJob(jobIdKey, jobId, delayedKey, eventsKey, args[3], ARGV[2], opts, timestamp, repeatJobKey, + maxEvents, KEYS[1], parentKey, parentData) -- Check if this job is a child of another job, if so add it to the parents dependencies if parentDependenciesKey ~= nil then diff --git a/src/commands/addJobScheduler-2.lua b/src/commands/addJobScheduler-6.lua similarity index 57% rename from src/commands/addJobScheduler-2.lua rename to src/commands/addJobScheduler-6.lua index c5a84a641c..f13e09514d 100644 --- a/src/commands/addJobScheduler-2.lua +++ b/src/commands/addJobScheduler-6.lua @@ -2,8 +2,12 @@ Adds a job scheduler, i.e. a job factory that creates jobs based on a given schedule (repeat options). Input: - KEYS[1] 'repeat' key - KEYS[2] 'delayed' key + KEYS[1] 'marker', + KEYS[2] 'meta' + KEYS[3] 'id' + KEYS[4] 'delayed' + KEYS[5] events stream key + KEYS[6] 'repeat' key ARGV[1] next milliseconds ARGV[2] msgpacked options @@ -14,27 +18,31 @@ [5] every? ARGV[3] jobs scheduler id ARGV[4] Json stringified template data - ARGV[5] mspacked template opts - ARGV[6] prefix key + ARGV[5] msgpacked template opts + ARGV[6] msgpacked delayed opts + ARGV[7] timestamp + ARGV[8] prefix key + ARGV[9] producer key Output: - repeatableKey - OK + next delayed job id - OK ]] local rcall = redis.call -local repeatKey = KEYS[1] -local delayedKey = KEYS[2] - +local repeatKey = KEYS[6] +local delayedKey = KEYS[4] +local timestamp = ARGV[7] local nextMillis = ARGV[1] local jobSchedulerId = ARGV[3] local templateOpts = cmsgpack.unpack(ARGV[5]) -local prefixKey = ARGV[6] +local prefixKey = ARGV[8] -- Includes +--- @include "includes/addDelayedJob" +--- @include "includes/getOrSetMaxEvents" --- @include "includes/removeJob" -local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, templateData, templateOpts) +local function storeRepeatableJob(schedulerId, schedulerKey, repeatKey, nextMillis, opts, templateData, templateOpts) rcall("ZADD", repeatKey, nextMillis, schedulerId) - local opts = cmsgpack.unpack(rawOpts) local optionalValues = {} if opts['tz'] then @@ -68,17 +76,19 @@ local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, t table.insert(optionalValues, templateData) end - rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'], + rcall("HMSET", schedulerKey, "name", opts['name'], unpack(optionalValues)) end +local schedulerKey = repeatKey .. ":" .. jobSchedulerId +local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis +local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis + -- If we are overriding a repeatable job we must delete the delayed job for -- the next iteration. local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId) if prevMillis ~= false then local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis - local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis - local nextDelayedJobKey = repeatKey .. ":" .. jobSchedulerId .. ":" .. nextMillis if rcall("ZSCORE", delayedKey, delayedJobId) ~= false and (rcall("EXISTS", nextDelayedJobKey) ~= 1 @@ -88,4 +98,23 @@ if prevMillis ~= false then end end -return storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts) +local schedulerOpts = cmsgpack.unpack(ARGV[2]) + +storeRepeatableJob(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, ARGV[4], templateOpts) + +local eventsKey = KEYS[5] +local metaKey = KEYS[2] +local maxEvents = getOrSetMaxEvents(metaKey) + +rcall("INCR", KEYS[3]) + +local delayedOpts = cmsgpack.unpack(ARGV[6]) + +addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerOpts['name'], ARGV[4], delayedOpts, + timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) + +if ARGV[9] ~= "" then + rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId) +end + +return nextDelayedJobId .. "" -- convert to string diff --git a/src/commands/includes/addDelayedJob.lua b/src/commands/includes/addDelayedJob.lua new file mode 100644 index 0000000000..cfc5391216 --- /dev/null +++ b/src/commands/includes/addDelayedJob.lua @@ -0,0 +1,25 @@ +--[[ + Add marker if needed when a job is available. +]] + +-- Includes +--- @include "addDelayMarkerIfNeeded" +--- @include "getDelayedScore" +--- @include "storeJob" + +local function addDelayedJob(jobIdKey, jobId, delayedKey, eventsKey, name, data, opts, timestamp, repeatJobKey, + maxEvents, markerKey, parentKey, parentData) + -- Store the job. + local delay, priority = storeJob(eventsKey, jobIdKey, jobId, name, data, + opts, timestamp, parentKey, parentData, repeatJobKey) + + local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay)) + + rcall("ZADD", delayedKey, score, jobId) + rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed", + "jobId", jobId, "delay", delayedTimestamp) + + -- mark that a delayed job is available + addDelayMarkerIfNeeded(markerKey, delayedKey) +end + \ No newline at end of file diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 160d9c8a7a..2d63215fb3 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -768,8 +768,8 @@ describe('Job Scheduler', function () { worker.on('completed', async job => { try { if (prev) { - expect(prev.timestamp).to.be.lt(job.timestamp); - expect(job.timestamp - prev.timestamp).to.be.gte(2000); + expect(prev.timestamp).to.be.lt(job.processedOn!); + expect(job.processedOn! - prev.timestamp).to.be.gte(2000); } prev = job; counter++; @@ -791,8 +791,8 @@ describe('Job Scheduler', function () { worker.on('completed', async job => { try { if (prev2) { - expect(prev2.timestamp).to.be.lt(job.timestamp); - expect(job.timestamp - prev2.timestamp).to.be.gte(2000); + expect(prev2.timestamp).to.be.lt(job.processedOn!); + expect(job.processedOn! - prev2.timestamp).to.be.gte(2000); } prev2 = job; counter2++; @@ -838,7 +838,7 @@ describe('Job Scheduler', function () { async () => { this.clock.tick(nextTick); }, - { connection, prefix }, + { autorun: false, connection, prefix }, ); let prev: Job; @@ -876,6 +876,8 @@ describe('Job Scheduler', function () { const delayedCountBefore = await queue.getDelayedCount(); expect(delayedCountBefore).to.be.eq(1); + worker.run(); + await completing; const waitingCount = await queue.getWaitingCount(); @@ -1693,6 +1695,7 @@ describe('Job Scheduler', function () { resolve(); }, { + autorun: false, connection, prefix, }, @@ -1712,6 +1715,8 @@ describe('Job Scheduler', function () { const delayedCountBeforeFailing = await queue.getDelayedCount(); expect(delayedCountBeforeFailing).to.be.equal(0); + worker.run(); + await failing; const failedCount = await queue.getFailedCount(); @@ -1771,6 +1776,7 @@ describe('Job Scheduler', function () { } }, { + autorun: false, connection, prefix, }, @@ -1791,6 +1797,8 @@ describe('Job Scheduler', function () { this.clock.tick(177); + worker.run(); + await failing; this.clock.tick(177);