Skip to content

Commit

Permalink
Merge pull request #710 from Agoric/mfig/better-errors-and-sync
Browse files Browse the repository at this point in the history
Fix ag-solo kernel reentrancy and fake-chain non-determinism
  • Loading branch information
michaelfig authored Mar 18, 2020
2 parents 4985f5d + c1282c9 commit ec5be6c
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 64 deletions.
24 changes: 17 additions & 7 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -364,16 +364,26 @@ export default function buildKernel(kernelEndowments) {
}
}

let processQueueRunning;
async function processQueueMessage(message) {
kdebug(`processQ ${JSON.stringify(message)}`);
if (message.type === 'send') {
await deliverToTarget(message.target, message.msg);
} else if (message.type === 'notify') {
await processNotify(message);
} else {
throw Error(`unable to process message.type ${message.type}`);
if (processQueueRunning) {
console.log(`We're currently already running at`, processQueueRunning);
throw Error(`Kernel reentrancy is forbidden`);
}
try {
processQueueRunning = Error('here');
if (message.type === 'send') {
await deliverToTarget(message.target, message.msg);
} else if (message.type === 'notify') {
await processNotify(message);
} else {
throw Error(`unable to process message.type ${message.type}`);
}
commitCrank();
} finally {
processQueueRunning = undefined;
}
commitCrank();
}

function validateVatSetupFn(setup) {
Expand Down
17 changes: 10 additions & 7 deletions packages/SwingSet/src/kernel/vatManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,11 @@ export default function makeVatManager(
return process(
() => dispatch[dispatchOp](...dispatchArgs),
() => transcriptFinishDispatch(),
err => console.log(`doProcess: ${errmsg}:`, err),
err => {
if (errmsg !== null) {
console.log(`doProcess: ${errmsg}:`, err);
}
},
);
}

Expand Down Expand Up @@ -382,13 +386,12 @@ export default function makeVatManager(
throw replayAbandonShip;
}
playbackSyscalls = Array.from(t.syscalls);
// We really don't care about "failed replays" because they're just
// exceptions that have been raised in a normal event.
//
// If we really fail, replayAbandonShip is set.
// eslint-disable-next-line no-await-in-loop
await doProcess(
t.d,
`Replay failed: [${t.d[0]}, ${t.d[1]}, ${t.d[2]}, ${JSON.stringify(
t.d[3],
)}]`,
);
await doProcess(t.d, null);
}

if (replayAbandonShip) {
Expand Down
35 changes: 25 additions & 10 deletions packages/cosmic-swingset/lib/ag-solo/fake-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import path from 'path';
import fs from 'fs';
import stringify from '@agoric/swingset-vat/src/kernel/json-stable-stringify';
import { launch } from '../launch-chain';
import makeBlockManager from '../block-manager';

const PRETEND_BLOCK_DELAY = 5;

Expand Down Expand Up @@ -36,33 +37,47 @@ export async function connectToFakeChain(basedir, GCI, role, delay, inbound) {
const argv = [`--role=${role}`, bootAddress];
const stateDBdir = path.join(basedir, `fake-chain-${GCI}-state`);
const s = await launch(stateDBdir, mailboxStorage, vatsdir, argv);
const { deliverInbound, beginBlock, saveChainState, saveOutsideState } = s;

let pretendLast = Date.now();
let blockHeight = 0;
const blockManager = makeBlockManager(s);
const { savedHeight, savedActions } = s;

let blockHeight = savedHeight;
let blockTime =
savedActions.length > 0
? savedActions[0].blockTime
: Math.floor(Date.now() / 1000);
let intoChain = [];
let thisBlock = [];

async function simulateBlock() {
const actualStart = Date.now();
// Gather up the new messages into the latest block.
thisBlock.push(...intoChain);
intoChain = [];

try {
const commitStamp = pretendLast + PRETEND_BLOCK_DELAY * 1000;
const blockTime = Math.floor(commitStamp / 1000);
await beginBlock(blockHeight, blockTime);
blockTime += PRETEND_BLOCK_DELAY;
blockHeight += 1;

await blockManager({ type: 'BEGIN_BLOCK', blockHeight, blockTime });
for (let i = 0; i < thisBlock.length; i += 1) {
const [newMessages, acknum] = thisBlock[i];
await deliverInbound(bootAddress, newMessages, acknum);
await blockManager({
type: 'DELIVER_INBOUND',
peer: bootAddress,
messages: newMessages,
ack: acknum,
blockHeight,
blockTime,
});
}
await blockManager({ type: 'END_BLOCK', blockHeight, blockTime });

// Done processing, "commit the block".
saveChainState();
saveOutsideState();
await blockManager({ type: 'COMMIT_BLOCK', blockHeight, blockTime });
await writeMap(mailboxFile, mailboxStorage);
thisBlock = [];
pretendLast = commitStamp + Date.now() - actualStart;
blockTime = blockTime + Date.now() - actualStart;
blockHeight += 1;
} catch (e) {
console.log(`error fake processing`, e);
Expand Down
43 changes: 43 additions & 0 deletions packages/cosmic-swingset/lib/ag-solo/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import makePromise from '@agoric/make-promise';

// Return a function that can wrap an async or sync method, but
// ensures only one of them (in order) is running at a time.
export const makeWithQueue = () => {
const queue = [];

// Execute the thunk at the front of the queue.
const dequeue = () => {
if (!queue.length) {
return;
}
const [thunk, resolve, reject] = queue[0];
// Run the thunk in a new turn.
Promise.resolve()
.then(thunk)
// Resolve or reject our caller with the thunk's value.
.then(resolve, reject)
// Rerun dequeue() after settling.
.finally(() => {
queue.shift();
dequeue();
});
};

return function withQueue(inner) {
return function queueCall(...args) {
// Curry the arguments into the inner function, and
// resolve/reject with whatever the inner function does.
const thunk = _ => inner(...args);
const pr = makePromise();
queue.push([thunk, pr.res, pr.rej]);

if (queue.length === 1) {
// Start running immediately.
dequeue();
}

// Allow the caller to retrieve our thunk's results.
return pr.p;
};
};
};
94 changes: 54 additions & 40 deletions packages/cosmic-swingset/lib/ag-solo/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import fs from 'fs';
import path from 'path';
import temp from 'temp';
import { promisify } from 'util';
import readlines from 'n-readlines';
// import { createHash } from 'crypto';

// import connect from 'lotion-connect';
Expand All @@ -24,10 +23,10 @@ import {

import { deliver, addDeliveryTarget } from './outbound';
import { makeHTTPListener } from './web';
import { makeWithQueue } from './queue';

import { connectToChain } from './chain-cosmos-sdk';
import { connectToFakeChain } from './fake-chain';
import bundle from './bundle';

// import { makeChainFollower } from './follower';
// import { makeDeliverator } from './deliver-with-ag-cosmos-helper';
Expand Down Expand Up @@ -123,42 +122,61 @@ async function buildSwingset(
}
}

async function deliverInboundToMbx(sender, messages, ack) {
const withInputQueue = makeWithQueue();

// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const deliverInboundToMbx = withInputQueue(async (sender, messages, ack) => {
if (!(messages instanceof Array)) {
throw new Error(`inbound given non-Array: ${messages}`);
}
// console.log(`deliverInboundToMbx`, messages, ack);
if (mb.deliverInbound(sender, messages, ack, true)) {
await processKernel();
}
}
});

async function deliverInboundCommand(obj) {
// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const deliverInboundCommand = withInputQueue(async obj => {
// this promise could take an arbitrarily long time to resolve, so don't
// wait on it
const p = cm.inboundCommand(obj);
// TODO: synchronize this somehow, make sure it doesn't overlap with the
// processKernel() call in deliverInbound()

// Register a handler in this turn so that we don't get complaints about
// asynchronously-handled callbacks.
p.catch(_ => {});

// The turn passes...
await processKernel();
return p;
}

const intervalMillis = 1200;
// TODO(hibbert) protect against kernel turns that take too long
// drop calls to moveTimeForward if it's fallen behind, to make sure we don't
// have two copies of controller.run() executing at the same time.
function moveTimeForward() {
// Rethrow any inboundCommand rejection in the new turn so that our
// caller must handle it (or be an unhandledRejection).
return p.catch(e => {
throw e;
});
});

let intervalMillis;

// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const moveTimeForward = withInputQueue(async () => {
const now = Math.floor(Date.now() / intervalMillis);
if (timer.poll(now)) {
const p = processKernel();
p.then(
_ => console.log(`timer-provoked kernel crank complete ${now}`),
err =>
console.log(`timer-provoked kernel crank failed at ${now}:`, err),
);
try {
if (timer.poll(now)) {
await processKernel();
console.log(`timer-provoked kernel crank complete ${now}`);
}
} catch (err) {
console.log(`timer-provoked kernel crank failed at ${now}:`, err);
} finally {
// We only rearm the timeout if moveTimeForward has completed, to
// make sure we don't have two copies of controller.run() executing
// at the same time.
setTimeout(moveTimeForward, intervalMillis);
}
}
setInterval(moveTimeForward, intervalMillis);
});

// now let the bootstrap functions run
await processKernel();
Expand All @@ -167,6 +185,10 @@ async function buildSwingset(
deliverInboundToMbx,
deliverInboundCommand,
deliverOutbound,
startTimer: interval => {
intervalMillis = interval;
setTimeout(moveTimeForward, intervalMillis);
},
};
}

Expand Down Expand Up @@ -199,7 +221,12 @@ export default async function start(basedir, withSES, argv) {
broadcast,
);

const { deliverInboundToMbx, deliverInboundCommand, deliverOutbound } = d;
const {
deliverInboundToMbx,
deliverInboundCommand,
deliverOutbound,
startTimer,
} = d;

await Promise.all(
connections.map(async c => {
Expand Down Expand Up @@ -252,23 +279,10 @@ export default async function start(basedir, withSES, argv) {
}),
);

// Start timer here!
startTimer(1200);

console.log(`swingset running`);
swingSetRunning = true;
deliverOutbound();

// Install the bundles as specified.
const initDir = path.join(basedir, 'init-bundles');
let list = [];
try {
list = await fs.promises.readdir(initDir);
} catch (e) {}
for (const initName of list.sort()) {
console.log('loading init bundle', initName);
const initFile = path.join(initDir, initName);
if (
await bundle(() => '.', ['--evaluate', '--once', '--input', initFile])
) {
return 0;
}
}
}

0 comments on commit ec5be6c

Please sign in to comment.