Skip to content

Commit

Permalink
fix: prevent deadlock in the input queue for delivered commands
Browse files Browse the repository at this point in the history
We want to free the queue after the kernel is run, not necessarily when the command's result promise has settled.
  • Loading branch information
michaelfig committed Mar 25, 2020
1 parent 0f050cc commit ee0e488
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 57 deletions.
7 changes: 2 additions & 5 deletions packages/cosmic-swingset/lib/ag-solo/fake-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export async function connectToFakeChain(basedir, GCI, role, delay, inbound) {
const maximumDelay = (delay || PRETEND_BLOCK_DELAY) * 1000;

const withBlockQueue = makeWithQueue();
const simulateBlock = withBlockQueue(async () => {
const simulateBlock = withBlockQueue(async function simulateBlock() {
const actualStart = Date.now();
// Gather up the new messages into the latest block.
thisBlock.push(...intoChain);
Expand Down Expand Up @@ -97,10 +97,7 @@ export async function connectToFakeChain(basedir, GCI, role, delay, inbound) {
// TODO: maybe add latency to the inbound messages.
const mailboxJSON = mailboxStorage.get(`mailbox.${bootAddress}`);
const mailbox = mailboxJSON && JSON.parse(mailboxJSON);
const { outbox, ack } = mailbox || {
outbox: [],
ack: 0,
};
const { outbox = [], ack = 0 } = mailbox || {};
inbound(GCI, outbox, ack);
});

Expand Down
4 changes: 3 additions & 1 deletion packages/cosmic-swingset/lib/ag-solo/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ export const makeWithQueue = () => {
// Rerun dequeue() after settling.
.finally(() => {
queue.shift();
dequeue();
if (queue.length) {
dequeue();
}
});
};

Expand Down
103 changes: 58 additions & 45 deletions packages/cosmic-swingset/lib/ag-solo/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,68 +130,81 @@ async function buildSwingset(

// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const deliverInboundToMbx = withInputQueue(async (sender, messages, ack) => {
if (!(messages instanceof Array)) {
throw new Error(`inbound given non-Array: ${messages}`);
}
// console.log(`deliverInboundToMbx`, messages, ack);
if (mb.deliverInbound(sender, messages, ack, true)) {
await processKernel();
}
});
const queuedDeliverInboundToMbx = withInputQueue(
async function deliverInboundToMbx(sender, messages, ack) {
if (!(messages instanceof Array)) {
throw new Error(`inbound given non-Array: ${messages}`);
}
// console.log(`deliverInboundToMbx`, messages, ack);
if (mb.deliverInbound(sender, messages, ack, true)) {
await processKernel();
}
},
);

// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const deliverInboundCommand = withInputQueue(async obj => {
// this promise could take an arbitrarily long time to resolve, so don't
// wait on it
const p = cm.inboundCommand(obj);

// Register a handler in this turn so that we don't get complaints about
// asynchronously-handled callbacks.
p.catch(_ => {});

// The turn passes...
await processKernel();

// Rethrow any inboundCommand rejection in the new turn so that our
// caller must handle it (or be an unhandledRejection).
return p.catch(e => {
throw e;
});
});
const queuedBoxedDeliverInboundCommand = withInputQueue(
async function deliverInboundCommand(obj) {
// this promise could take an arbitrarily long time to resolve, so don't
// wait on it
const p = cm.inboundCommand(obj);

// Register a handler in this turn so that we don't get complaints about
// asynchronously-handled callbacks.
p.catch(_ => {});

// The turn passes...
await processKernel();

// Rethrow any inboundCommand rejection in the new turn so that our
// caller must handle it (or be an unhandledRejection).

// We box the promise, so that this queue isn't stalled.
return [
p.catch(e => {
throw e;
}),
];
},
);

let intervalMillis;

// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const moveTimeForward = withInputQueue(async () => {
const now = Math.floor(Date.now() / intervalMillis);
try {
if (timer.poll(now)) {
await processKernel();
log.debug(`timer-provoked kernel crank complete ${now}`);
const queuedMoveTimeForward = withInputQueue(
async function moveTimeForward() {
const now = Math.floor(Date.now() / intervalMillis);
try {
if (timer.poll(now)) {
await processKernel();
log.debug(`timer-provoked kernel crank complete ${now}`);
}
} catch (err) {
log.error(`timer-provoked kernel crank failed at ${now}:`, err);
} finally {
// We only rearm the timeout if moveTimeForward has completed, to
// make sure we don't have two copies of controller.run() executing
// at the same time.
setTimeout(queuedMoveTimeForward, intervalMillis);
}
} catch (err) {
log.error(`timer-provoked kernel crank failed at ${now}:`, err);
} finally {
// We only rearm the timeout if moveTimeForward has completed, to
// make sure we don't have two copies of controller.run() executing
// at the same time.
setTimeout(moveTimeForward, intervalMillis);
}
});
},
);

// now let the bootstrap functions run
await processKernel();

const queuedDeliverInboundCommand = obj =>
queuedBoxedDeliverInboundCommand(obj).then(([p]) => p);

return {
deliverInboundToMbx,
deliverInboundCommand,
deliverInboundToMbx: queuedDeliverInboundToMbx,
deliverInboundCommand: queuedDeliverInboundCommand,
deliverOutbound,
startTimer: interval => {
intervalMillis = interval;
setTimeout(moveTimeForward, intervalMillis);
setTimeout(queuedMoveTimeForward, intervalMillis);
},
};
}
Expand Down
1 change: 1 addition & 0 deletions packages/cosmic-swingset/lib/ag-solo/vats/lib-wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ export async function makeWallet(
})
.catch(rejected);
} catch (e) {
console.error('Have error', e);
rejected(e);
throw e;
}
Expand Down
3 changes: 2 additions & 1 deletion packages/cosmic-swingset/lib/ag-solo/vats/vat-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ function build(E, D) {
D(commandDevice).sendResponse(count, false, harden(true));
} catch (rej) {
console.debug(`Error ${dispatcher}:`, rej);
D(commandDevice).sendResponse(count, true, harden(rej));
const jsonable = (rej && rej.message) || rej;
D(commandDevice).sendResponse(count, true, harden(jsonable));
}
},
};
Expand Down
6 changes: 1 addition & 5 deletions packages/cosmic-swingset/lib/ag-solo/web.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,7 @@ export function makeHTTPListener(basedir, port, host, rawInboundCommand) {
).finally(() => channels.delete(channelID));
});

inboundCommand(
{ type: 'ws/meta' },
{ ...meta, dispatcher: 'onOpen' },
id,
);
inboundCommand({ type: 'ws/meta' }, { ...meta, dispatcher: 'onOpen' }, id);

ws.on('message', async message => {
let obj = {};
Expand Down

0 comments on commit ee0e488

Please sign in to comment.