From 8ae894ce532849e9dae98981abe198240c3e7fad Mon Sep 17 00:00:00 2001 From: David Worms Date: Tue, 16 Mar 2021 14:56:55 +0100 Subject: [PATCH] feat(core): new end option in scheduler --- packages/core/lib/actions/fs/base/stat.js | 3 -- packages/core/lib/actions/fs/copy.js | 2 +- packages/core/lib/actions/fs/glob.js | 29 ++++++++++++++-- packages/core/lib/schedulers/native.js | 36 ++++++++++++++++---- packages/core/lib/session.js | 21 ++++++++---- packages/core/src/schedulers/native.coffee | 17 ++++++--- packages/core/src/session.coffee | 7 ++-- packages/core/test/scheduler/creation.coffee | 2 +- 8 files changed, 87 insertions(+), 30 deletions(-) diff --git a/packages/core/lib/actions/fs/base/stat.js b/packages/core/lib/actions/fs/base/stat.js index fae7545ab..9dfb612e9 100644 --- a/packages/core/lib/actions/fs/base/stat.js +++ b/packages/core/lib/actions/fs/base/stat.js @@ -131,9 +131,6 @@ if [ -d /private ]; then else stat ${dereference} -c '%f|%u|%g|%s|%X|%Y' ${config.target} # Linux fi`, - // sudo: config.sudo - // bash: config.bash - // arch_chroot: config.arch_chroot trim: true }))); [rawmodehex, uid, gid, size, atime, mtime] = stdout.split('|'); diff --git a/packages/core/lib/actions/fs/copy.js b/packages/core/lib/actions/fs/copy.js index 26099433b..06512095f 100644 --- a/packages/core/lib/actions/fs/copy.js +++ b/packages/core/lib/actions/fs/copy.js @@ -195,7 +195,7 @@ handler = async function({ }))); for (i = 0, len = files.length; i < len; i++) { source = files[i]; - (async(source) => { + await (async(source) => { var gid, mode, stats, target, uid; target = path.resolve(config.target, path.relative(config.source, source)); ({stats} = (await this.fs.base.stat({ diff --git a/packages/core/lib/actions/fs/glob.js b/packages/core/lib/actions/fs/glob.js index 334c54957..e8170ffd4 100644 --- a/packages/core/lib/actions/fs/glob.js +++ b/packages/core/lib/actions/fs/glob.js @@ -8,10 +8,33 @@ // The action use the POXIX `find` command to fetch all files and filter the // paths locally using the Minimatch package. -// ## Callback information +// ## Output -// * `hash` -// The hash of the file or directory identified by the "target" option. +// * `files` +// List of files matching the globing expression. + +// ## Examples + +// Short notation: + +// ```js +// const {files} = await nikita.fs.glob(`${process.cwd()}/*`) +// for(const file of files){ +// console.info(`Found: ${file}`) +// } +// ``` + +// Extended notation: + +// ```js +// const {files} = await nikita.fs.glob({ +// dot: true, +// target: `${process.cwd()}/*` +// }) +// for(const file of files){ +// console.info(`Found: ${file}`) +// } +// ``` // ## Hook var Minimatch, getprefix, handler, on_action, schema, utils; diff --git a/packages/core/lib/schedulers/native.js b/packages/core/lib/schedulers/native.js index 2e1861082..5b9dcd1ea 100644 --- a/packages/core/lib/schedulers/native.js +++ b/packages/core/lib/schedulers/native.js @@ -17,7 +17,7 @@ module.exports = function(tasks, options = {}) { // It is not possible to register managed handler once the scheduler has // resolved. if (options.managed == null) { - options.managed = false; + options.managed = !!tasks; } if (options.parallel == null) { options.parallel = 1; @@ -28,6 +28,9 @@ module.exports = function(tasks, options = {}) { if (options.parallel === true) { options.parallel = -1; } + if (options.end == null) { + options.end = true; + } state = { stack: [], pause: options.pause != null ? !!options.pause : false, @@ -56,6 +59,10 @@ module.exports = function(tasks, options = {}) { promise = new Promise(function(resolve, reject) { scheduler = { state: state, + end: function(end) { + options.end = end; + return scheduler.pump(); + }, pump: function() { var task; if (state.pause) { @@ -65,22 +72,30 @@ module.exports = function(tasks, options = {}) { return; } if (!state.managed.resolved) { - if (state.error) { + if (state.managed.error) { state.managed.resolved = true; // Any pending managed task is stripped out after an error clear_managed_tasks(); scheduler.pump(); - return reject(state.error); - } else if (count_pending_tasks() + state.managed.running === 0) { + return reject(state.managed.error); + } else if (options.managed && options.end && count_pending_tasks() + state.managed.running === 0) { state.managed.resolved = true; scheduler.pump(); return resolve(state.output); + } else if (!options.managed && options.end && state.stack.length === 0) { + state.managed.resolved = true; + return resolve(); } } if (!state.stack.length) { return; } task = state.stack.shift(); + if (options.strict === true && !task.managed && state.error) { + task.reject(state.error); + setImmediate(scheduler.pump); + return; + } state.running++; state.pending--; if (task.managed) { @@ -114,9 +129,12 @@ module.exports = function(tasks, options = {}) { state.rejected++; state.resolved++; task.reject.call(null, error); - if (task.managed) { + if (options.strict) { state.error = error; } + if (task.managed) { + state.managed.error = error; + } return setImmediate(scheduler.pump); } }); @@ -136,8 +154,10 @@ module.exports = function(tasks, options = {}) { var task; if (!isArray) { state.pending++; + if (tasks.managed == null) { + tasks.managed = options.managed; + } state.stack.unshift({ - ...options, ...tasks, resolve: resolve, reject: reject @@ -185,8 +205,10 @@ module.exports = function(tasks, options = {}) { var task; if (!isArray) { state.pending++; + if (tasks.managed == null) { + tasks.managed = options.managed; + } state.stack.push({ - ...options, ...tasks, resolve: resolve, reject: reject diff --git a/packages/core/lib/session.js b/packages/core/lib/session.js index 262a4f5e5..22e53935f 100644 --- a/packages/core/lib/session.js +++ b/packages/core/lib/session.js @@ -105,6 +105,9 @@ session = function(args, options = {}) { }); action.parent = options.parent; action.plugins = plugins; + if (action.scheduler == null) { + action.scheduler = {}; + } if ((base = action.metadata).namespace == null) { base.namespace = []; } @@ -124,8 +127,12 @@ session = function(args, options = {}) { }); // Local scheduler to execute children and be notified on finish schedulers = { - in: schedule(), + in: schedule(null, { + ...action.scheduler, + end: false + }), out: schedule(null, { + ...action.scheduler, pause: true }) }; @@ -168,13 +175,13 @@ session = function(args, options = {}) { }); // Ensure child actions are executed pump = function() { - var child; - while (child = schedulers.out.state.stack.shift()) { - // Now that the handler has been executed, - // import all the actions registered outside of it - action.scheduler.state.stack.push(child); + var task; + // Now that the handler has been executed, + // import all the actions registered outside of it + while (task = schedulers.out.state.stack.shift()) { + action.scheduler.state.stack.push(task); } - return action.scheduler.pump(); + return action.scheduler.end(true); }; output.then(pump, pump); // Make sure the promise is resolved after the scheduler and its children diff --git a/packages/core/src/schedulers/native.coffee b/packages/core/src/schedulers/native.coffee index 49f3b2702..5380ad18b 100644 --- a/packages/core/src/schedulers/native.coffee +++ b/packages/core/src/schedulers/native.coffee @@ -12,10 +12,11 @@ module.exports = (tasks, options = {}) -> # handler pushed or unshifted are not. # It is not possible to register managed handler once the scheduler has # resolved. - options.managed ?= false + options.managed ?= !!tasks options.parallel ?= 1 options.parallel = 1 if options.parallel is false options.parallel = -1 if options.parallel is true + options.end ?= true state = stack: [] pause: if options.pause? then !!options.pause else false @@ -39,20 +40,26 @@ module.exports = (tasks, options = {}) -> promise = new Promise (resolve, reject) -> scheduler = state: state + end: (end) -> + options.end = end + scheduler.pump() pump: -> return if state.pause return if state.running is options.parallel - unless state.managed.resolved + if not state.managed.resolved if state.managed.error state.managed.resolved = true # Any pending managed task is stripped out after an error clear_managed_tasks() scheduler.pump() return reject state.managed.error - else if count_pending_tasks() + state.managed.running is 0 + else if options.managed and options.end and count_pending_tasks() + state.managed.running is 0 state.managed.resolved = true scheduler.pump() return resolve state.output + else if not options.managed and options.end and state.stack.length is 0 + state.managed.resolved = true + return resolve() return unless state.stack.length task = state.stack.shift() if options.strict is true and not task.managed and state.error @@ -94,8 +101,8 @@ module.exports = (tasks, options = {}) -> new Promise (resolve, reject) -> unless isArray state.pending++ + tasks.managed ?= options.managed state.stack.unshift { - ...options ...tasks resolve: resolve reject: reject @@ -119,8 +126,8 @@ module.exports = (tasks, options = {}) -> prom = new Promise (resolve, reject) -> unless isArray state.pending++ + tasks.managed ?= options.managed state.stack.push { - ...options ...tasks resolve: resolve reject: reject diff --git a/packages/core/src/session.coffee b/packages/core/src/session.coffee index fb1b00aae..a21a62de1 100644 --- a/packages/core/src/session.coffee +++ b/packages/core/src/session.coffee @@ -74,7 +74,7 @@ session = (args, options={}) -> args: name: name, action: act # Local scheduler to execute children and be notified on finish schedulers = - in: schedule null, action.scheduler + in: schedule null, {...action.scheduler, end: false} out: schedule null, {...action.scheduler, pause: true} # Start with a paused scheduler to register actions out of the handler action.scheduler = schedulers.out @@ -108,8 +108,9 @@ session = (args, options={}) -> pump = -> # Now that the handler has been executed, # import all the actions registered outside of it - action.scheduler.state.stack.push child while child = schedulers.out.state.stack.shift() - action.scheduler.pump() + while task = schedulers.out.state.stack.shift() + action.scheduler.state.stack.push task + action.scheduler.end(true) output.then pump, pump # Make sure the promise is resolved after the scheduler and its children Promise.all [output, action.scheduler] diff --git a/packages/core/test/scheduler/creation.coffee b/packages/core/test/scheduler/creation.coffee index f3ec776cd..0885a1caa 100644 --- a/packages/core/test/scheduler/creation.coffee +++ b/packages/core/test/scheduler/creation.coffee @@ -36,7 +36,7 @@ describe 'scheduler.creation', -> ] scheduler.push -> new Promise (accept) -> accept 3 scheduler - .should.be.resolvedWith [] + .should.be.resolvedWith undefined describe 'error', ->