Skip to content

Commit

Permalink
fix: propagate Go errors all the way to the caller
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Apr 27, 2020
1 parent 8b63024 commit ea5ba38
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 60 deletions.
7 changes: 5 additions & 2 deletions packages/cosmic-swingset/lib/ag-solo/vats/bridge.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ export function makeBridgeManager(E, D, bridgeDevice) {
if (!bridgeDevice) {
throw Error(`bridge device not yet connected`);
}
const retval = D(bridgeDevice).callOutbound(dstID, obj);
const retobj = D(bridgeDevice).callOutbound(dstID, obj);
// note: *we* get this return value synchronously, but any callers (in
// separate vats) only get a Promise, and will receive the value in some
// future turn
return retval;
if (retobj && retobj.error) {
throw Error(retobj.error);
}
return retobj;
}

// We now manage the device.
Expand Down
235 changes: 199 additions & 36 deletions packages/cosmic-swingset/lib/ag-solo/vats/ibc.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import { makeWithQueue } from './queue';

const DEFAULT_PACKET_TIMEOUT = 1000;

// FIXME: this constitutes a security flaw, but is currently the
// only way to create channels.
const ALLOW_NAIVE_RELAYS = true;

/**
* @typedef {import('@agoric/swingset-vat/src/vats/network').ProtocolHandler} ProtocolHandler
* @typedef {import('@agoric/swingset-vat/src/vats/network').ProtocolImpl} ProtocolImpl
Expand All @@ -36,6 +40,20 @@ const DEFAULT_PACKET_TIMEOUT = 1000;
* @typedef {import('@agoric/store').Store<K, V>} Store
*/

const goodLetters = 'abcdefghijklmnopqrstuvwxyz';
/**
* Get a sequence of letters chosen from `goodLetters`.
* @param {number} n
*/
const getGoodLetters = n => {
let gl = '';
do {
gl += goodLetters[n % goodLetters.length];
n = Math.floor(n / goodLetters.length);
} while (n > 0);
return gl;
};

let seed = 0;

/**
Expand Down Expand Up @@ -69,16 +87,38 @@ export function makeIBCProtocolHandler(
* @property {string} channelID
* @property {Counterparty} counterparty
* @property {string} version
* @property {string|undefined} counterpartyVersion
* @property {string} [counterpartyVersion]
*/

/**
* @type {Store<string, ConnectingInfo>}
*/
const channelKeyToConnectingInfo = makeStore('CHANNEL:PORT');

/**
* @type {Set<string>}
*/
const usedChannels = new Set();

seed += 1;
const sparseInts = generateSparseInts(seed);
const portSparseInts = generateSparseInts(seed);
const channelSparseInts = generateSparseInts(seed * 2);

const generateChannelID = () => {
let channelID;
for (;;) {
const n = channelSparseInts.next().value;
if (typeof n !== 'number') {
throw Error(`internal: channelSparseInts is out of ints`);
}

channelID = `chan${getGoodLetters(n)}`;
if (!usedChannels.has(channelID)) {
usedChannels.add(channelID);
return channelID;
}
}
};

/**
* @type {Store<string, PromiseRecord<Bytes, any>>}
Expand Down Expand Up @@ -160,6 +200,8 @@ export function makeIBCProtocolHandler(
onReceive,
async onClose(_conn, _reason, _handler) {
await callIBCDevice('channelCloseInit', { channelID, portID });
usedChannels.delete(channelID);
usedChannels.delete(rChannelID);
},
});
}
Expand All @@ -183,26 +225,29 @@ export function makeIBCProtocolHandler(
let protocolImpl;

/**
* @type {Store<string, Store<string, (typeof makeIBCConnectionHandler)[]>>}
* @typedef {Object} ConnectedRecord
* @property {string} channelID
* @property {string} rChannelID
* @property {typeof makeIBCConnectionHandler} connected
*/

/**
* @type {Store<string, Store<string, ConnectedRecord[]>>}
*/
const outboundWaiters = makeStore('destination');

const goodLetters = 'abcdefghijklmnopqrstuvwxyz';

// We delegate to a loopback protocol, too, to connect locally.
const protocol = extendLoopbackProtocolHandler({
async onCreate(impl, _protocolHandler) {
console.info('IBC onCreate');
protocolImpl = impl;
},
async generatePortID(_localAddr, _protocolHandler) {
let n = /** @type {number} */ (sparseInts.next().value);
let nonceLetters = '';
do {
nonceLetters += goodLetters[n % goodLetters.length];
n = Math.floor(n / goodLetters.length);
} while (n > 0);
return `port${nonceLetters}`;
const n = portSparseInts.next().value;
if (!n) {
throw Error(`internal: portSparseInts is out of ints`);
}
return `port${getGoodLetters(n)}`;
},
async onBind(_port, localAddr, _protocolHandler) {
const portID = localAddrToPortID(localAddr);
Expand All @@ -211,23 +256,40 @@ export function makeIBCProtocolHandler(
async onConnect(_port, localAddr, remoteAddr, _protocolHandler) {
console.warn('IBC onConnect', localAddr, remoteAddr);
const portID = localAddrToPortID(localAddr);
if (!remoteAddr.match(/^(\/ibc-hop\/[^/]+)+\/ibc-port\/[^/]+$/)) {
const match = remoteAddr.match(
/^(\/ibc-hop\/[^/]+)*\/ibc-port\/([^/]+)\/(ordered|unordered)\/([^/]+)$/,
);
if (!match) {
throw TypeError(
`Remote address ${remoteAddr} must be /ibc-hop/HOP.../ibc-port/PORT`,
`Remote address ${remoteAddr} must be '(/ibc-hop/HOP...)*/ibc-port/PORT/(ordered|unordered)/VERSION'`,
);
}

// We need to wait around for a relayer to continue the connection.
// TODO: We should also send a ChanOpenInit to get a passive relayer flowing.
const hops = [];
let h = match[1];
while (h !== '') {
const m = h.match(/^\/ibc-hop\/([^/]+)/);
h = h.substr(m[0].length);
hops.push(m[1]);
}
const rPortID = match[2];
/**
* @type {'ORDERED'|'UNORDERED'}
*/
const order = match[4] === 'ordered' ? 'ORDERED' : 'UNORDERED';
const version = match[5];

const rChannelID = generateChannelID();
const channelID = generateChannelID();

/** @type {PromiseRecord<ConnectionHandler,any>} */
const chandler = producePromise();

/**
* @type {typeof makeIBCConnectionHandler}
*/
const connected = (...args) => {
const ch = makeIBCConnectionHandler(...args);
const connected = (cID, pID, rCID, rPID, ord) => {
const ch = makeIBCConnectionHandler(cID, pID, rCID, rPID, ord);
chandler.resolve(ch);
return ch;
};
Expand All @@ -243,9 +305,35 @@ export function makeIBCProtocolHandler(
waiterList = waiters.get(portID);
} else {
waiterList = [];
waiters.init(remoteAddr, waiterList);
waiters.init(portID, waiterList);
}
waiterList.push(connected);
waiterList.push({ channelID, rChannelID, connected });

// Try sending a ChanOpenInit to get a passive relayer flowing.
const packet = {
source_channel: channelID,
source_port: portID,
destination_channel: rChannelID,
destination_port: rPortID,
};

const obj = {
channelID,
portID,
counterparty: { channel_id: rChannelID, port_id: rPortID },
connectionHops: hops,
order,
version,
};
const channelKey = `${channelID}:${portID}`;
channelKeyToConnectingInfo.init(channelKey, obj);

await callIBCDevice('channelOpenInit', {
packet,
order,
hops,
version,
});
return chandler.promise;
},
async onListen(_port, localAddr, _listenHandler) {
Expand All @@ -259,25 +347,59 @@ export function makeIBCProtocolHandler(
},
});

function getWaiter(hops, portID, rPort, claim = false) {
const us = `/ibc-port/${portID}`;
/**
*
* @param {string[]} hops
* @param {string} channelID,
* @param {string} portID
* @param {string} rPortID
* @param {'ORDERED'|'UNORDERED'} order
* @param {string} version
* @param {boolean} [removeMatching=false]
* @param {boolean} [defaultMatchFirst=false]
* @returns {ConnectedRecord|undefined}
*/
function getWaiter(
hops,
channelID,
portID,
rPortID,
order,
version,
removeMatching = false,
defaultMatchFirst = false,
) {
const us = `/ibc-port/${portID}/${order.toLowerCase()}/${version}`;
for (let i = 0; i <= hops.length; i += 1) {
// Try most specific to least specific outbound connections.
const ibcHops = hops
.slice(0, hops.length - i)
.map(hop => `/ibc-hop/${hop}`)
.join('/');
const them = `${ibcHops}/ibc-port/${rPort}`;
const them = `${ibcHops}/ibc-port/${rPortID}`;
if (outboundWaiters.has(them)) {
const waiters = outboundWaiters.get(them);
if (waiters.has(us)) {
// Return the list of waiters.
if (!claim) {
return true;
}
const waiterList = waiters.get(us);
// Find the best waiter.
let bestWaiter = defaultMatchFirst ? 0 : -1;
for (let j = 0; i < waiterList.length; j += 1) {
if (waiterList[j].channelID === channelID) {
bestWaiter = j;
break;
}
}

if (bestWaiter < 0) {
return undefined;
}
const waiter = waiterList[bestWaiter];
if (!removeMatching) {
return waiter;
}

// Clean up the maps.
const waiter = waiterList.shift();
waiterList.splice(bestWaiter, 1);
if (waiterList.length === 0) {
waiters.delete(us);
}
Expand All @@ -288,7 +410,7 @@ export function makeIBCProtocolHandler(
}
}
}
return false;
return undefined;
}

return harden({
Expand All @@ -301,15 +423,41 @@ export function makeIBCProtocolHandler(
const {
channelID,
portID,
counterparty: { port_id: rPortID },
counterparty: { channel_id: rChannelID, port_id: rPortID },
connectionHops: hops,
order,
version,
} = obj;

const channelKey = `${channelID}:${portID}`;
if (!getWaiter(hops, portID, rPortID)) {
const waiter = getWaiter(
hops,
portID,
channelID,
rPortID,
order,
version,
false,
ALLOW_NAIVE_RELAYS,
);
if (waiter) {
if (ALLOW_NAIVE_RELAYS) {
// Update the used map if the relayer actively created the channel
// for us.
// This is the case with the "naive" relay strategy.
if (waiter.channelID !== channelID) {
usedChannels.delete(waiter.channelID);
usedChannels.add(channelID);
}
if (waiter.rChannelID !== rChannelID) {
usedChannels.delete(waiter.rChannelID);
}
}
channelKeyToConnectingInfo.set(channelKey, obj);
} else {
await E(protocolImpl).isListening([`/ibc-port/${portID}`]);
channelKeyToConnectingInfo.init(channelKey, obj);
}
channelKeyToConnectingInfo.init(channelKey, obj);
break;
}

Expand All @@ -334,10 +482,25 @@ export function makeIBCProtocolHandler(
const localAddr = `/ibc-port/${portID}/${order.toLowerCase()}/${version}`;
const ibcHops = hops.map(hop => `/ibc-hop/${hop}`).join('/');
const remoteAddr = `${ibcHops}/ibc-port/${rPortID}/${order.toLowerCase()}/${rVersion}`;
const waiter = getWaiter(hops, portID, rPortID, true);
if (typeof waiter === 'function') {
const waiter = getWaiter(
hops,
channelID,
portID,
rPortID,
order,
version,
true,
ALLOW_NAIVE_RELAYS,
);
if (waiter) {
// An outbound connection wants to use this channel.
waiter(channelID, portID, rChannelID, rPortID, order === 'ORDERED');
waiter.connected(
channelID,
portID,
rChannelID,
rPortID,
order === 'ORDERED',
);
break;
}

Expand Down Expand Up @@ -382,7 +545,7 @@ export function makeIBCProtocolHandler(
.send(data)
.then(ack => {
const ack64 = dataToBase64(/** @type {Bytes} */ (ack));
callIBCDevice('packetExecuted', { packet, ack: ack64 });
return callIBCDevice('packetExecuted', { packet, ack: ack64 });
})
.catch(e => console.error(e));
break;
Expand Down
Loading

0 comments on commit ea5ba38

Please sign in to comment.