Skip to content

Commit

Permalink
fix: proper inbound IBC listening
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed May 2, 2020
1 parent 3424ca0 commit 3988235
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 49 deletions.
30 changes: 16 additions & 14 deletions packages/SwingSet/src/vats/network/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { toBytes } from './bytes';

const harden = /** @type {<T>(x: T) => T} */ (rawHarden);

export const ENDPOINT_SEPARATOR = '/';

/**
* @template T,U
* @typedef {import('@agoric/store').Store<T,U>} Store
Expand All @@ -23,7 +25,7 @@ const harden = /** @type {<T>(x: T) => T} */ (rawHarden);

/**
* @typedef {Object} Protocol The network Protocol
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if ending in '/', a fresh name
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if ending in ENDPOINT_SEPARATOR, a fresh name
*/

/**
Expand All @@ -38,7 +40,7 @@ const harden = /** @type {<T>(x: T) => T} */ (rawHarden);
/**
* @typedef {Object} ListenHandler A handler for incoming connections
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onListen] The listener has been registered
* @property {(port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<ConnectionHandler>} [onAccept] A new connection is incoming
* @property {(port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<ConnectionHandler>} onAccept A new connection is incoming
* @property {(port: Port, rej: any, l: ListenHandler) => Promise<void>} [onError] There was an error while listening
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onRemove] The listener has been removed
*/
Expand Down Expand Up @@ -230,18 +232,17 @@ export function crossoverConnection(
/**
* Get the list of prefixes from longest to shortest.
* @param {string} addr
* @param {string} [sep='/']
*/
export function getPrefixes(addr, sep = '/') {
const parts = addr.split(sep);
export function getPrefixes(addr) {
const parts = addr.split(ENDPOINT_SEPARATOR);

/**
* @type {string[]}
*/
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(sep);
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
ret.push(prefix);
}
return ret;
Expand Down Expand Up @@ -283,9 +284,7 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {

const lchandler =
/** @type {ConnectionHandler} */
(await E(listener)
.onAccept(port, localAddr, remoteAddr, listener)
.catch(rethrowUnlessMissing));
(await E(listener).onAccept(port, localAddr, remoteAddr, listener));

return crossoverConnection(
lchandler,
Expand All @@ -301,7 +300,7 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
/** @type {string} */
(await E(port).getLocalAddress());

const ret = getPrefixes(remoteAddr, '/');
const ret = getPrefixes(remoteAddr);
if (await protocolImpl.isListening(ret)) {
return protocolImpl.inbound(ret, remoteAddr, localAddr, lchandler);
}
Expand Down Expand Up @@ -345,7 +344,7 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
*/
const bind = async localAddr => {
// Check if we are underspecified (ends in slash)
if (localAddr.endsWith('/')) {
if (localAddr.endsWith(ENDPOINT_SEPARATOR)) {
for (;;) {
// eslint-disable-next-line no-await-in-loop
const portID = await E(protocolHandler).generatePortID(localAddr);
Expand Down Expand Up @@ -536,9 +535,12 @@ export function makeLoopbackProtocolHandler(E = defaultE) {
}
const [lport, lhandler] = listeners.get(remoteAddr);
// console.log(`looking up onAccept in`, lhandler);
const rport = await E(lhandler)
.onAccept(lport, remoteAddr, localAddr, lhandler)
.catch(rethrowUnlessMissing);
const rport = await E(lhandler).onAccept(
lport,
remoteAddr,
localAddr,
lhandler,
);
// console.log(`rport is`, rport);
return rport;
},
Expand Down
19 changes: 10 additions & 9 deletions packages/SwingSet/src/vats/network/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { E as defaultE } from '@agoric/eventual-send';
import rawHarden from '@agoric/harden';
import makeStore from '@agoric/store';
import { makeNetworkProtocol } from './network';
import { makeNetworkProtocol, ENDPOINT_SEPARATOR } from './network';

const harden = /** @type {<T>(x: T) => T} */ (rawHarden);

Expand All @@ -26,29 +26,31 @@ const harden = /** @type {<T>(x: T) => T} */ (rawHarden);
/**
* Create a slash-delimited router.
*
* @param {string} [sep='/'] the delimiter of the routing strings
* @returns {Router} a new Router
*/
export default function makeRouter(sep = '/') {
export default function makeRouter() {
/**
* @type {Store<string, any>}
*/
const prefixToRoute = makeStore('prefix');
return harden({
getRoutes(addr) {
const parts = addr.split(sep);
const parts = addr.split(ENDPOINT_SEPARATOR);
/**
* @type {[string, any][]}
*/
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(sep);
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
if (prefixToRoute.has(prefix)) {
ret.push([prefix, prefixToRoute.get(prefix)]);
}
// Trim off the last value (after the slash).
const defaultPrefix = prefix.substr(0, prefix.lastIndexOf('/') + 1);
const defaultPrefix = prefix.substr(
0,
prefix.lastIndexOf(ENDPOINT_SEPARATOR) + 1,
);
if (prefixToRoute.has(defaultPrefix)) {
ret.push([defaultPrefix, prefixToRoute.get(defaultPrefix)]);
}
Expand All @@ -75,12 +77,11 @@ export default function makeRouter(sep = '/') {
/**
* Create a router that behaves like a Protocol.
*
* @param {string} [sep='/'] the route separator
* @param {typeof defaultE} [E=defaultE] Eventual sender
* @returns {RouterProtocol} The new delegated protocol
*/
export function makeRouterProtocol(sep = '/', E = defaultE) {
const router = makeRouter(sep);
export function makeRouterProtocol(E = defaultE) {
const router = makeRouter();
const protocols = makeStore('prefix');
const protocolHandlers = makeStore('prefix');

Expand Down
5 changes: 5 additions & 0 deletions packages/SwingSet/test/test-network.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ const makeProtocolHandler = t => {
*/
let l;
let lp;
let nonce = 0;
return harden({
async onCreate(_protocol, _impl) {
log('created', _protocol, _impl);
},
async generatePortID() {
nonce += 1;
return `${nonce}`;
},
async onBind(port, localAddr) {
t.assert(port, `port is supplied to onBind`);
t.assert(localAddr, `local address is supplied to onBind`);
Expand Down
50 changes: 25 additions & 25 deletions packages/cosmic-swingset/lib/ag-solo/vats/ibc.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ export function makeIBCProtocolHandler(
{ timerService },
) {
/**
* @type {Store<string, Promise<Connection>>}
* @type {Store<string, [ConnectionHandler, Promise<Connection>]>}
*/
const channelKeyToConnP = makeStore('CHANNEL:PORT');
const channelKeyToHandler = makeStore('CHANNEL:PORT');

/**
* @typedef {Object} Counterparty
Expand All @@ -122,7 +122,7 @@ export function makeIBCProtocolHandler(
/**
* @type {Store<string, ConnectingInfo>}
*/
const channelKeyToConnectingInfo = makeStore('CHANNEL:PORT');
const channelKeyToInfo = makeStore('CHANNEL:PORT');

/**
* @type {Set<string>}
Expand Down Expand Up @@ -409,7 +409,7 @@ export function makeIBCProtocolHandler(
version,
};
const channelKey = `${channelID}:${portID}`;
channelKeyToConnectingInfo.init(channelKey, obj);
channelKeyToInfo.init(channelKey, obj);

if (!FIXME_ALLOW_NAIVE_RELAYS || !chandler) {
// Just wait until the connection handler resolves.
Expand Down Expand Up @@ -547,10 +547,10 @@ paths:
);
if (!waiter) {
await E(protocolImpl).isListening([`/ibc-port/${portID}`]);
channelKeyToConnectingInfo.init(channelKey, obj);
channelKeyToInfo.init(channelKey, obj);
} else {
// We have more specific information.
channelKeyToConnectingInfo.set(channelKey, obj);
channelKeyToInfo.set(channelKey, obj);
}
break;
}
Expand All @@ -569,8 +569,8 @@ paths:
connectionHops: hops,
counterparty: { port_id: rPortID, channel_id: rChannelID },
counterpartyVersion: storedVersion,
} = channelKeyToConnectingInfo.get(channelKey);
channelKeyToConnectingInfo.delete(channelKey);
} = channelKeyToInfo.get(channelKey);
channelKeyToInfo.delete(channelKey);

const rVersion = updatedVersion || storedVersion;
const localAddr = `/ibc-port/${portID}/${order.toLowerCase()}/${version}`;
Expand Down Expand Up @@ -610,7 +610,7 @@ paths:
}

// Check for a listener for this subprotocol.
const listenSearch = getPrefixes(localAddr, '/');
const listenSearch = getPrefixes(localAddr);
const rchandler = makeIBCConnectionHandler(
channelID,
portID,
Expand All @@ -620,17 +620,17 @@ paths:
);

// Actually connect.
const connP =
/** @type {Promise<Connection>} */
(E(protocolImpl).inbound(
listenSearch,
localAddr,
remoteAddr,
rchandler,
));
// eslint-disable-next-line prettier/prettier
const connP = /** @type {Promise<Connection>} */
(E(protocolImpl).inbound(listenSearch, localAddr, remoteAddr, rchandler))
.then(conn => {
console.info(`FIGME: got connection`, conn);
return conn;
});

/* Stash it for later use. */
channelKeyToConnP.init(channelKey, connP);
console.info(`FIGME: Stashing ${channelKey}`, rchandler);
channelKeyToHandler.init(channelKey, [rchandler, connP]);
break;
}

Expand All @@ -643,11 +643,11 @@ paths:
} = packet;
const channelKey = `${channelID}:${portID}`;

const connP = channelKeyToConnP.get(channelKey);
const [chandler, connP] = channelKeyToHandler.get(channelKey);
const data = base64ToBytes(data64);

E(connP)
.send(data)
connP
.then(conn => E(chandler).onReceive(conn, data, chandler))
.then(ack => {
const ack64 = dataToBase64(/** @type {Bytes} */ (ack));
return callIBCDevice('packetExecuted', { packet, ack: ack64 });
Expand Down Expand Up @@ -688,10 +688,10 @@ paths:
case 'channelCloseConfirm': {
const { portID, channelID } = obj;
const channelKey = `${channelID}:${portID}`;
if (channelKeyToConnP.has(channelKey)) {
const connP = channelKeyToConnP.get(channelKey);
channelKeyToConnP.delete(channelKey);
E(connP).close();
if (channelKeyToHandler.has(channelKey)) {
const [chandler, connP] = channelKeyToHandler.get(channelKey);
channelKeyToHandler.delete(channelKey);
connP.then(conn => E(chandler).onClose(conn, undefined, chandler));
}
break;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/cosmic-swingset/lib/ag-solo/vats/vat-network.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import harden from '@agoric/harden';
import { makeRouterProtocol } from '@agoric/swingset-vat/src/vats/network/router';

function build(E) {
return harden(makeRouterProtocol('/', E));
return harden(makeRouterProtocol(E));
}

export default function setup(syscall, state, helpers) {
Expand Down

0 comments on commit 3988235

Please sign in to comment.