From ee0e4881dc2dd17fea8b4efea6e149bd86daab22 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Wed, 25 Mar 2020 13:12:31 -0600 Subject: [PATCH] fix: prevent deadlock in the input queue for delivered commands We want to free the queue after the kernel is run, not necessarily when the command's result promise has settled. --- .../cosmic-swingset/lib/ag-solo/fake-chain.js | 7 +- packages/cosmic-swingset/lib/ag-solo/queue.js | 4 +- packages/cosmic-swingset/lib/ag-solo/start.js | 103 ++++++++++-------- .../lib/ag-solo/vats/lib-wallet.js | 1 + .../lib/ag-solo/vats/vat-http.js | 3 +- packages/cosmic-swingset/lib/ag-solo/web.js | 6 +- 6 files changed, 67 insertions(+), 57 deletions(-) diff --git a/packages/cosmic-swingset/lib/ag-solo/fake-chain.js b/packages/cosmic-swingset/lib/ag-solo/fake-chain.js index e77b9121c8e..03c509fc274 100644 --- a/packages/cosmic-swingset/lib/ag-solo/fake-chain.js +++ b/packages/cosmic-swingset/lib/ag-solo/fake-chain.js @@ -59,7 +59,7 @@ export async function connectToFakeChain(basedir, GCI, role, delay, inbound) { const maximumDelay = (delay || PRETEND_BLOCK_DELAY) * 1000; const withBlockQueue = makeWithQueue(); - const simulateBlock = withBlockQueue(async () => { + const simulateBlock = withBlockQueue(async function simulateBlock() { const actualStart = Date.now(); // Gather up the new messages into the latest block. thisBlock.push(...intoChain); @@ -97,10 +97,7 @@ export async function connectToFakeChain(basedir, GCI, role, delay, inbound) { // TODO: maybe add latency to the inbound messages. const mailboxJSON = mailboxStorage.get(`mailbox.${bootAddress}`); const mailbox = mailboxJSON && JSON.parse(mailboxJSON); - const { outbox, ack } = mailbox || { - outbox: [], - ack: 0, - }; + const { outbox = [], ack = 0 } = mailbox || {}; inbound(GCI, outbox, ack); }); diff --git a/packages/cosmic-swingset/lib/ag-solo/queue.js b/packages/cosmic-swingset/lib/ag-solo/queue.js index d6fd30c386d..f2ceef0fec1 100644 --- a/packages/cosmic-swingset/lib/ag-solo/queue.js +++ b/packages/cosmic-swingset/lib/ag-solo/queue.js @@ -19,7 +19,9 @@ export const makeWithQueue = () => { // Rerun dequeue() after settling. .finally(() => { queue.shift(); - dequeue(); + if (queue.length) { + dequeue(); + } }); }; diff --git a/packages/cosmic-swingset/lib/ag-solo/start.js b/packages/cosmic-swingset/lib/ag-solo/start.js index 2f9d2dd69b0..d0e38a776d9 100644 --- a/packages/cosmic-swingset/lib/ag-solo/start.js +++ b/packages/cosmic-swingset/lib/ag-solo/start.js @@ -130,68 +130,81 @@ async function buildSwingset( // Use the input queue to make sure it doesn't overlap with // other inbound messages. - const deliverInboundToMbx = withInputQueue(async (sender, messages, ack) => { - if (!(messages instanceof Array)) { - throw new Error(`inbound given non-Array: ${messages}`); - } - // console.log(`deliverInboundToMbx`, messages, ack); - if (mb.deliverInbound(sender, messages, ack, true)) { - await processKernel(); - } - }); + const queuedDeliverInboundToMbx = withInputQueue( + async function deliverInboundToMbx(sender, messages, ack) { + if (!(messages instanceof Array)) { + throw new Error(`inbound given non-Array: ${messages}`); + } + // console.log(`deliverInboundToMbx`, messages, ack); + if (mb.deliverInbound(sender, messages, ack, true)) { + await processKernel(); + } + }, + ); // Use the input queue to make sure it doesn't overlap with // other inbound messages. - const deliverInboundCommand = withInputQueue(async obj => { - // this promise could take an arbitrarily long time to resolve, so don't - // wait on it - const p = cm.inboundCommand(obj); - - // Register a handler in this turn so that we don't get complaints about - // asynchronously-handled callbacks. - p.catch(_ => {}); - - // The turn passes... - await processKernel(); - - // Rethrow any inboundCommand rejection in the new turn so that our - // caller must handle it (or be an unhandledRejection). - return p.catch(e => { - throw e; - }); - }); + const queuedBoxedDeliverInboundCommand = withInputQueue( + async function deliverInboundCommand(obj) { + // this promise could take an arbitrarily long time to resolve, so don't + // wait on it + const p = cm.inboundCommand(obj); + + // Register a handler in this turn so that we don't get complaints about + // asynchronously-handled callbacks. + p.catch(_ => {}); + + // The turn passes... + await processKernel(); + + // Rethrow any inboundCommand rejection in the new turn so that our + // caller must handle it (or be an unhandledRejection). + + // We box the promise, so that this queue isn't stalled. + return [ + p.catch(e => { + throw e; + }), + ]; + }, + ); let intervalMillis; // Use the input queue to make sure it doesn't overlap with // other inbound messages. - const moveTimeForward = withInputQueue(async () => { - const now = Math.floor(Date.now() / intervalMillis); - try { - if (timer.poll(now)) { - await processKernel(); - log.debug(`timer-provoked kernel crank complete ${now}`); + const queuedMoveTimeForward = withInputQueue( + async function moveTimeForward() { + const now = Math.floor(Date.now() / intervalMillis); + try { + if (timer.poll(now)) { + await processKernel(); + log.debug(`timer-provoked kernel crank complete ${now}`); + } + } catch (err) { + log.error(`timer-provoked kernel crank failed at ${now}:`, err); + } finally { + // We only rearm the timeout if moveTimeForward has completed, to + // make sure we don't have two copies of controller.run() executing + // at the same time. + setTimeout(queuedMoveTimeForward, intervalMillis); } - } catch (err) { - log.error(`timer-provoked kernel crank failed at ${now}:`, err); - } finally { - // We only rearm the timeout if moveTimeForward has completed, to - // make sure we don't have two copies of controller.run() executing - // at the same time. - setTimeout(moveTimeForward, intervalMillis); - } - }); + }, + ); // now let the bootstrap functions run await processKernel(); + const queuedDeliverInboundCommand = obj => + queuedBoxedDeliverInboundCommand(obj).then(([p]) => p); + return { - deliverInboundToMbx, - deliverInboundCommand, + deliverInboundToMbx: queuedDeliverInboundToMbx, + deliverInboundCommand: queuedDeliverInboundCommand, deliverOutbound, startTimer: interval => { intervalMillis = interval; - setTimeout(moveTimeForward, intervalMillis); + setTimeout(queuedMoveTimeForward, intervalMillis); }, }; } diff --git a/packages/cosmic-swingset/lib/ag-solo/vats/lib-wallet.js b/packages/cosmic-swingset/lib/ag-solo/vats/lib-wallet.js index c48fc425304..42f25222b3f 100644 --- a/packages/cosmic-swingset/lib/ag-solo/vats/lib-wallet.js +++ b/packages/cosmic-swingset/lib/ag-solo/vats/lib-wallet.js @@ -362,6 +362,7 @@ export async function makeWallet( }) .catch(rejected); } catch (e) { + console.error('Have error', e); rejected(e); throw e; } diff --git a/packages/cosmic-swingset/lib/ag-solo/vats/vat-http.js b/packages/cosmic-swingset/lib/ag-solo/vats/vat-http.js index 7175d9ff474..f45ea3bc42c 100644 --- a/packages/cosmic-swingset/lib/ag-solo/vats/vat-http.js +++ b/packages/cosmic-swingset/lib/ag-solo/vats/vat-http.js @@ -199,7 +199,8 @@ function build(E, D) { D(commandDevice).sendResponse(count, false, harden(true)); } catch (rej) { console.debug(`Error ${dispatcher}:`, rej); - D(commandDevice).sendResponse(count, true, harden(rej)); + const jsonable = (rej && rej.message) || rej; + D(commandDevice).sendResponse(count, true, harden(jsonable)); } }, }; diff --git a/packages/cosmic-swingset/lib/ag-solo/web.js b/packages/cosmic-swingset/lib/ag-solo/web.js index 0b99a5338ec..450580ebf3e 100644 --- a/packages/cosmic-swingset/lib/ag-solo/web.js +++ b/packages/cosmic-swingset/lib/ag-solo/web.js @@ -167,11 +167,7 @@ export function makeHTTPListener(basedir, port, host, rawInboundCommand) { ).finally(() => channels.delete(channelID)); }); - inboundCommand( - { type: 'ws/meta' }, - { ...meta, dispatcher: 'onOpen' }, - id, - ); + inboundCommand({ type: 'ws/meta' }, { ...meta, dispatcher: 'onOpen' }, id); ws.on('message', async message => { let obj = {};