Skip to content

Commit

Permalink
fix: implement epochs and make tolerant of restarts
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Sep 16, 2020
1 parent 510f427 commit 1c786b8
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 30 deletions.
1 change: 1 addition & 0 deletions packages/SwingSet/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"@agoric/assert": "^0.0.9",
"@agoric/babel-parser": "^7.6.4",
"@agoric/bundle-source": "^1.1.7",
"@agoric/captp": "^1.4.0",
"@agoric/eventual-send": "^0.10.0",
"@agoric/import-bundle": "^0.0.9",
"@agoric/install-ses": "^0.3.0",
Expand Down
25 changes: 18 additions & 7 deletions packages/SwingSet/src/devices/plugin-src.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ export function buildRootDeviceNode(tools) {
const senders = {};
// Take a shallow copy so that these are not frozen.
const connectedMods = restart ? [...restart.connectedMods] : [];
const nextEpochs = restart ? [...restart.nextEpochs] : [];
const connectedState = restart ? [...restart.connectedState] : [];

function saveState() {
setDeviceState(
harden({
registeredReceiver,
// Take a shallow copy so that these are not frozen.
nextEpochs: [...nextEpochs],
connectedMods: [...connectedMods],
connectedState: [...connectedState],
}),
Expand All @@ -38,13 +40,15 @@ export function buildRootDeviceNode(tools) {
// Allocate this module first.
if (connectedMods[index] === undefined) {
connectedMods[index] = mod;
saveState();
}
if (connectedMods[index] !== mod) {
throw TypeError(
`Index ${index} is already allocated to ${connectedMods[index]}, not ${mod}`,
);
}
const epoch = nextEpochs[index] || 0;
nextEpochs[index] = epoch + 1;
saveState();

const modNS = endowments.require(mod);
const receiver = obj => {
Expand All @@ -69,7 +73,7 @@ export function buildRootDeviceNode(tools) {
);

// Establish a CapTP connection.
const { dispatch } = makeCapTP(mod, receiver, bootstrap);
const { dispatch } = makeCapTP(mod, receiver, bootstrap, { epoch });

// Save the dispatch function for later.
senders[index] = dispatch;
Expand All @@ -82,28 +86,35 @@ export function buildRootDeviceNode(tools) {

function send(index, obj) {
const mod = connectedMods[index];
console.info('send', index, obj, mod);
// console.info('send', index, obj, mod);
if (!mod) {
throw TypeError(`No module associated with ${index}`);
}
let sender = senders[index];
if (!sender) {
// Lazily create a sender.
console.info('Destroying', index);
SO(registeredReceiver).abort(index);
// Lazily create a fresh sender.
connect(mod, index);
sender = senders[index];
}
// Now actually send.
sender(obj);
}

endowments.registerResetter(() => {
connectedMods.forEach((mod, index) => {
if (mod) {
// console.info('Startup resetting', index, mod, nextEpochs[index]);
SO(registeredReceiver).reset(index, nextEpochs[index]);
}
});
});

return harden({
connect,
send,
registerReceiver(receiver) {
if (registeredReceiver) {
throw Error(`registerd receiver already set`);
throw Error(`registered receiver already set`);
}
registeredReceiver = receiver;
saveState();
Expand Down
16 changes: 15 additions & 1 deletion packages/SwingSet/src/devices/plugin.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
export function buildPlugin(pluginRequire, queueThunkForKernel) {
const srcPath = require.resolve('./plugin-src');
let resetter;

function reset() {
const init = resetter;
if (init) {
resetter = undefined;
init();
}
}

function registerResetter(init) {
resetter = init;
}

// srcPath and endowments are provided to buildRootDeviceNode() for use
// during configuration.
return {
srcPath,
endowments: { require: pluginRequire, queueThunkForKernel },
endowments: { require: pluginRequire, queueThunkForKernel, registerResetter },
reset,
};
}
10 changes: 9 additions & 1 deletion packages/captp/lib/captp.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export { E };
* @property {(err: any) => void} onReject
* @property {typeof defaultRemotable} Remotable
* @property {typeof defaultMakeMarshal} makeMarshal
* @property {number} epoch
*/
/**
* Create a CapTP connection.
Expand All @@ -38,6 +39,7 @@ export function makeCapTP(ourId, rawSend, bootstrapObj = undefined, opts = {}) {
onReject = err => console.error('CapTP', ourId, 'exception:', err),
Remotable = defaultRemotable,
makeMarshal = defaultMakeMarshal,
epoch = undefined,
} = opts;

const disconnectReason = id =>
Expand Down Expand Up @@ -179,6 +181,7 @@ export function makeCapTP(ourId, rawSend, bootstrapObj = undefined, opts = {}) {
const [questionID, pr] = makeQuestion();
send({
type: 'CTP_CALL',
epoch,
questionID,
target,
method: serialize(harden([prop])),
Expand All @@ -193,6 +196,7 @@ export function makeCapTP(ourId, rawSend, bootstrapObj = undefined, opts = {}) {
const [questionID, pr] = makeQuestion();
send({
type: 'CTP_CALL',
epoch,
questionID,
target,
method: serialize(harden([prop, args])),
Expand Down Expand Up @@ -259,6 +263,7 @@ export function makeCapTP(ourId, rawSend, bootstrapObj = undefined, opts = {}) {
answers.set(questionID, bootstrap);
return send({
type: 'CTP_RETURN',
epoch,
answerID: questionID,
result: serialize(bootstrap),
});
Expand Down Expand Up @@ -296,13 +301,15 @@ export function makeCapTP(ourId, rawSend, bootstrapObj = undefined, opts = {}) {
.then(res =>
send({
type: 'CTP_RETURN',
epoch,
answerID: questionID,
result: serialize(harden(res)),
}),
)
.catch(rej =>
send({
type: 'CTP_RETURN',
epoch,
answerID: questionID,
exception: serialize(harden(rej)),
}),
Expand Down Expand Up @@ -357,6 +364,7 @@ export function makeCapTP(ourId, rawSend, bootstrapObj = undefined, opts = {}) {
const [questionID, pr] = makeQuestion();
send({
type: 'CTP_BOOTSTRAP',
epoch,
questionID,
});
return harden(pr.p);
Expand All @@ -383,7 +391,7 @@ export function makeCapTP(ourId, rawSend, bootstrapObj = undefined, opts = {}) {

// Abort a connection.
const abort = (reason = undefined) => {
dispatch({ type: 'CTP_DISCONNECT', reason });
dispatch({ type: 'CTP_DISCONNECT', epoch, reason });
};

return harden({ abort, dispatch, getBootstrap, serialize, unserialize });
Expand Down
3 changes: 2 additions & 1 deletion packages/cosmic-swingset/lib/ag-solo/fake-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ async function makeMapStorage(file) {
await fs.promises.writeFile(file, json);
};

let obj = {};
try {
content = await fs.promises.readFile(file);
obj = JSON.parse(content);
} catch (e) {
return map;
}
const obj = JSON.parse(content);
Object.entries(obj).forEach(([k, v]) => map.set(k, importMailbox(v)));

return map;
Expand Down
2 changes: 1 addition & 1 deletion packages/cosmic-swingset/lib/ag-solo/hello.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export const bootPlugin = ({ getState, setState }) => {
state = { count: state.count + 1 };
setState(state);
await fs.writeFile(dataFile, `Said hello ${state.count} times\n`);
return `Hello, ${name}`;
return `Hello, ${name}!`;
},
});
};
6 changes: 6 additions & 0 deletions packages/cosmic-swingset/lib/ag-solo/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ async function buildSwingset(
intervalMillis = interval;
setTimeout(queuedMoveTimeForward, intervalMillis);
},
resetOutdatedState: withInputQueue(() => {
plugin.reset();
return processKernel();
}),
};
}

Expand Down Expand Up @@ -259,6 +263,7 @@ export default async function start(basedir, argv) {
deliverInboundCommand,
deliverOutbound,
startTimer,
resetOutdatedState,
} = d;

let hostport;
Expand Down Expand Up @@ -313,6 +318,7 @@ export default async function start(basedir, argv) {

// Start timer here!
startTimer(1200);
resetOutdatedState();

// Remove wallet traces.
await unlink('html/wallet').catch(_ => {});
Expand Down
Loading

0 comments on commit 1c786b8

Please sign in to comment.