From e7ea32047f5a87a18c5ed4722d4d6e2163f7f2aa Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Tue, 19 Oct 2021 14:29:03 -0600 Subject: [PATCH] fix(pegasus): more POLA and less global state --- packages/pegasus/src/courier.js | 124 +++++ packages/pegasus/src/once-promise-kit.js | 33 ++ packages/pegasus/src/pegasus.js | 591 ++++++++--------------- packages/pegasus/src/types.js | 56 +++ packages/pegasus/test/test-peg.js | 65 ++- 5 files changed, 470 insertions(+), 399 deletions(-) create mode 100644 packages/pegasus/src/courier.js create mode 100644 packages/pegasus/src/once-promise-kit.js diff --git a/packages/pegasus/src/courier.js b/packages/pegasus/src/courier.js new file mode 100644 index 00000000000..4e27eff9627 --- /dev/null +++ b/packages/pegasus/src/courier.js @@ -0,0 +1,124 @@ +import { details as X } from '@agoric/assert'; + +import { AmountMath } from '@agoric/ertp'; +import { E } from '@agoric/eventual-send'; +import { Far } from '@agoric/marshal'; +import { makeOncePromiseKit } from './once-promise-kit.js'; + +/** + * Create or return an existing courier promise kit. + * + * @template K + * @param {K} key + * @param {Store>} keyToCourierPK + */ +export const getCourierPK = (key, keyToCourierPK) => { + if (keyToCourierPK.has(key)) { + return keyToCourierPK.get(key); + } + + // This is the first packet for this denomination. + // Create a new Courier promise kit for it. + const courierPK = makeOncePromiseKit(() => X`${key} already pegged`); + + keyToCourierPK.init(key, courierPK); + return courierPK; +}; + +/** + * Create the [send, receive] pair. + * + * @typedef {Object} CourierArgs + * @property {ContractFacet} zcf + * @property {ERef} board + * @property {ERef} namesByAddress + * @property {Denom} remoteDenom + * @property {Brand} localBrand + * @property {(zcfSeat: ZCFSeat, amounts: AmountKeywordRecord) => void} retain + * @property {(zcfSeat: ZCFSeat, amounts: AmountKeywordRecord) => void} redeem + * @property {ERef} transferProtocol + * @param {ERef} connection + * @returns {(args: CourierArgs) => Courier} + */ +export const makeCourierMaker = connection => ({ + zcf, + board, + namesByAddress, + remoteDenom, + localBrand, + retain, + redeem, + transferProtocol, +}) => { + /** @type {Sender} */ + const send = async (zcfSeat, depositAddress) => { + const tryToSend = async () => { + const amount = zcfSeat.getAmountAllocated('Transfer', localBrand); + const transferPacket = await E(transferProtocol).makeTransferPacket({ + value: amount.value, + remoteDenom, + depositAddress, + }); + + // Retain the payment. We must not proceed on failure. + retain(zcfSeat, { Transfer: amount }); + + // The payment is already escrowed, and proposed to retain, so try sending. + return E(connection) + .send(transferPacket) + .then(ack => E(transferProtocol).assertTransferPacketAck(ack)) + .then( + _ => zcfSeat.exit(), + reason => { + // Return the payment to the seat, if possible. + redeem(zcfSeat, { Transfer: amount }); + throw reason; + }, + ); + }; + + // Reflect any error back to the seat. + return tryToSend().catch(reason => { + zcfSeat.fail(reason); + }); + }; + + /** @type {Receiver} */ + const receive = async ({ value, depositAddress }) => { + const localAmount = AmountMath.make(localBrand, value); + + // Look up the deposit facet for this board address, if there is one. + /** @type {DepositFacet} */ + const depositFacet = await E(board) + .getValue(depositAddress) + .catch(_ => E(namesByAddress).lookup(depositAddress, 'depositFacet')); + + const { userSeat, zcfSeat } = zcf.makeEmptySeatKit(); + + // Redeem the backing payment. + try { + redeem(zcfSeat, { Transfer: localAmount }); + zcfSeat.exit(); + } catch (e) { + zcfSeat.fail(e); + throw e; + } + + // Once we've gotten to this point, their payment is committed and + // won't be refunded on a failed receive. + const payout = await E(userSeat).getPayout('Transfer'); + + // Send the payout promise to the deposit facet. + // + // We don't want to wait for the depositFacet to return, so that + // it can't hang up (i.e. DoS) an ordered channel, which relies on + // us returning promptly. + E(depositFacet) + .receive(payout) + .catch(_ => {}); + + return E(transferProtocol).makeTransferPacketAck(true); + }; + + return Far('courier', { send, receive }); +}; diff --git a/packages/pegasus/src/once-promise-kit.js b/packages/pegasus/src/once-promise-kit.js new file mode 100644 index 00000000000..077b03019d7 --- /dev/null +++ b/packages/pegasus/src/once-promise-kit.js @@ -0,0 +1,33 @@ +import { assert } from '@agoric/assert'; +import { makePromiseKit } from '@agoric/promise-kit'; + +/** + * Create a promise kit that will throw an exception if it is resolved or + * rejected more than once. + * + * @param {() => Details} makeReinitDetails + */ +export const makeOncePromiseKit = makeReinitDetails => { + const { promise, resolve, reject } = makePromiseKit(); + + let initialized = false; + /** + * @template {any[]} A + * @template R + * @param {(...args: A) => R} fn + * @returns {(...args: A) => R} + */ + const onceOnly = fn => (...args) => { + assert(!initialized, makeReinitDetails()); + initialized = true; + return fn(...args); + }; + + /** @type {PromiseRecord} */ + const oncePK = harden({ + promise, + resolve: onceOnly(resolve), + reject: onceOnly(reject), + }); + return oncePK; +}; diff --git a/packages/pegasus/src/pegasus.js b/packages/pegasus/src/pegasus.js index 51456f70463..d7434ab0ba7 100644 --- a/packages/pegasus/src/pegasus.js +++ b/packages/pegasus/src/pegasus.js @@ -1,7 +1,7 @@ // @ts-check -import { assert, details as X, q } from '@agoric/assert'; -import { makeLegacyWeakMap, makeStore, makeLegacyMap } from '@agoric/store'; +import { assert, details as X } from '@agoric/assert'; +import { makeLegacyWeakMap, makeLegacyMap } from '@agoric/store'; import { E } from '@agoric/eventual-send'; import { assertProposalShape } from '@agoric/zoe/src/contractSupport/index.js'; import { Far } from '@endo/marshal'; @@ -12,170 +12,17 @@ import '@agoric/swingset-vat/src/vats/network/types.js'; import '@agoric/zoe/exported.js'; import '../exported.js'; -import { makePromiseKit } from '@agoric/promise-kit'; -import { AmountMath } from '@agoric/ertp'; import { ICS20TransferProtocol } from './ics20.js'; +import { makeCourierMaker, getCourierPK } from './courier.js'; const DEFAULT_TRANSFER_PROTOCOL = ICS20TransferProtocol; -const DEFAULT_AMOUNT_MATH_KIND = 'nat'; - const TRANSFER_PROPOSAL_SHAPE = { give: { Transfer: null, }, }; -/** - * Create a promise kit that will throw an exception if it is resolved or - * rejected more than once. - * - * @param {() => Details} makeReinitDetails - */ -const makeOncePromiseKit = makeReinitDetails => { - const { promise, resolve, reject } = makePromiseKit(); - - let initialized = false; - /** - * @template {any[]} A - * @template R - * @param {(...args: A) => R} fn - * @returns {(...args: A) => R} - */ - const onceOnly = fn => (...args) => { - assert(!initialized, makeReinitDetails()); - initialized = true; - return fn(...args); - }; - - /** @type {PromiseRecord} */ - const oncePK = harden({ - promise, - resolve: onceOnly(resolve), - reject: onceOnly(reject), - }); - return oncePK; -}; - -/** - * Create or return an existing courier promise kit. - * - * @param {Denom} remoteDenom - * @param {Store>} remoteDenomToCourierPK - */ -const getCourierPK = (remoteDenom, remoteDenomToCourierPK) => { - if (remoteDenomToCourierPK.has(remoteDenom)) { - return remoteDenomToCourierPK.get(remoteDenom); - } - - // This is the first packet for this denomination. - // Create a new Courier promise kit for it. - const courierPK = makeOncePromiseKit(() => X`${remoteDenom} already pegged`); - - remoteDenomToCourierPK.init(remoteDenom, courierPK); - return courierPK; -}; - -/** - * Create the [send, receive] pair. - * - * @typedef {Object} CourierArgs - * @property {ContractFacet} zcf - * @property {ERef} connection - * @property {ERef} board - * @property {ERef} namesByAddress - * @property {Denom} remoteDenom - * @property {Brand} localBrand - * @property {(zcfSeat: ZCFSeat, amounts: AmountKeywordRecord) => void} retain - * @property {(zcfSeat: ZCFSeat, amounts: AmountKeywordRecord) => void} redeem - * @property {ERef} transferProtocol - * @param {CourierArgs} arg0 - * @returns {Courier} - */ -const makeCourier = ({ - zcf, - connection, - board, - namesByAddress, - remoteDenom, - localBrand, - retain, - redeem, - transferProtocol, -}) => { - /** @type {Sender} */ - const send = async (zcfSeat, depositAddress) => { - const tryToSend = async () => { - const amount = zcfSeat.getAmountAllocated('Transfer', localBrand); - const transferPacket = await E(transferProtocol).makeTransferPacket({ - value: amount.value, - remoteDenom, - depositAddress, - }); - - // Retain the payment. We must not proceed on failure. - retain(zcfSeat, { Transfer: amount }); - - // The payment is already escrowed, and proposed to retain, so try sending. - return E(connection) - .send(transferPacket) - .then(ack => E(transferProtocol).assertTransferPacketAck(ack)) - .then( - _ => zcfSeat.exit(), - reason => { - // Return the payment to the seat, if possible. - redeem(zcfSeat, { Transfer: amount }); - throw reason; - }, - ); - }; - - // Reflect any error back to the seat. - return tryToSend().catch(reason => { - zcfSeat.fail(reason); - }); - }; - - /** @type {Receiver} */ - const receive = async ({ value, depositAddress }) => { - const localAmount = AmountMath.make(localBrand, value); - - // Look up the deposit facet for this board address, if there is one. - /** @type {DepositFacet} */ - const depositFacet = await E(board) - .getValue(depositAddress) - .catch(_ => E(namesByAddress).lookup(depositAddress, 'depositFacet')); - - const { userSeat, zcfSeat } = zcf.makeEmptySeatKit(); - - // Redeem the backing payment. - try { - redeem(zcfSeat, { Transfer: localAmount }); - zcfSeat.exit(); - } catch (e) { - zcfSeat.fail(e); - throw e; - } - - // Once we've gotten to this point, their payment is committed and - // won't be refunded on a failed receive. - const payout = await E(userSeat).getPayout('Transfer'); - - // Send the payout promise to the deposit facet. - // - // We don't want to wait for the depositFacet to return, so that - // it can't hang up (i.e. DoS) an ordered channel, which relies on - // us returning promptly. - E(depositFacet) - .receive(payout) - .catch(_ => {}); - - return E(transferProtocol).makeTransferPacketAck(true); - }; - - return Far('courier', { send, receive }); -}; - /** * Make a Pegasus public API. * @@ -187,16 +34,10 @@ const makePegasus = (zcf, board, namesByAddress) => { /** * @typedef {Object} LocalDenomState * @property {Store>} remoteDenomToCourierPK + * @property {IterationObserver} remoteDenomPublication * @property {Subscription} remoteDenomSubscription * @property {number} lastDenomNonce - * @property {ERef} transferProtocol - */ - - /** - * @type {LegacyWeakMap} */ - // Legacy because the value contains a JS Set - const connectionToLocalDenomState = makeLegacyWeakMap('Connection'); let lastLocalIssuerNonce = 0; /** @@ -210,23 +51,23 @@ const makePegasus = (zcf, board, namesByAddress) => { }; /** - * @type {Store} + * @type {LegacyWeakMap} */ - const pegToConnection = makeStore('Peg'); + const pegToDenomState = makeLegacyWeakMap('Peg'); /** * Create a fresh Peg associated with a descriptor. * - * @typedef {Object} PegDescriptor + * @typedef {Object} PegasusDescriptor * @property {Brand} localBrand * @property {Denom} remoteDenom * @property {string} allegedName * - * @param {Connection} c - * @param {PegDescriptor} desc + * @param {LocalDenomState} state + * @param {PegasusDescriptor} desc * @returns {Peg} */ - const makePeg = (c, desc) => { + const makePeg = (state, desc) => { /** @type {Peg} */ const peg = Far('peg', { getAllegedName() { @@ -240,42 +81,214 @@ const makePegasus = (zcf, board, namesByAddress) => { }, }); - pegToConnection.init(peg, c); + pegToDenomState.init(peg, state); return peg; }; + /** + * @param {Object} param0 + * @param {ReturnType} param0.makeCourier + * @param {LocalDenomState} param0.localDenomState + * @param {ERef} param0.transferProtocol + * @returns {PegasusConnectionActions} + */ + const makePegasusConnectionActions = ({ + makeCourier, + localDenomState, + transferProtocol, + }) => { + /** @type {PegasusConnectionActions} */ + const pegasusConnectionActions = { + async rejectStuckTransfers(remoteDenom) { + const { remoteDenomToCourierPK } = localDenomState; + + const { reject, promise } = remoteDenomToCourierPK.get(remoteDenom); + promise.catch(() => {}); + reject(assert.error(X`${remoteDenom} is temporarily unavailable`)); + + // Allow new transfers to be initiated. + remoteDenomToCourierPK.delete(remoteDenom); + }, + async pegRemote( + allegedName, + remoteDenom, + assetKind = undefined, + displayInfo = undefined, + ) { + const { remoteDenomToCourierPK } = localDenomState; + + // Create the issuer for the local erights corresponding to the remote values. + const localKeyword = createLocalIssuerKeyword(); + const zcfMint = await zcf.makeZCFMint( + localKeyword, + assetKind, + displayInfo, + ); + const { brand: localBrand } = zcfMint.getIssuerRecord(); + + // Describe how to retain/redeem pegged shadow erights. + const courier = makeCourier({ + zcf, + localBrand, + board, + namesByAddress, + remoteDenom, + retain: (zcfSeat, amounts) => + zcfMint.burnLosses(harden(amounts), zcfSeat), + redeem: (zcfSeat, amounts) => { + zcfMint.mintGains(harden(amounts), zcfSeat); + }, + transferProtocol, + }); + + const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK); + courierPK.resolve(courier); + + return makePeg(localDenomState, { + localBrand, + remoteDenom, + allegedName, + }); + }, + + async pegLocal(allegedName, localIssuer) { + // We need the last nonce for our denom name. + localDenomState.lastDenomNonce += 1; + const remoteDenom = `pegasus${localDenomState.lastDenomNonce}`; + + // Create a seat in which to keep our denomination. + const { zcfSeat: poolSeat } = zcf.makeEmptySeatKit(); + + // Ensure the issuer can be used in Zoe offers. + const localKeyword = createLocalIssuerKeyword(); + const { brand: localBrand } = await zcf.saveIssuer( + localIssuer, + localKeyword, + ); + + /** + * Transfer amount (of localBrand) from loser to winner seats. + * + * @param {Amount} amount amount to transfer + * @param {Keyword} loserKeyword the keyword to take from the loser + * @param {ZCFSeat} loser seat to transfer from + * @param {Keyword} winnerKeyword the keyword to give to the winner + * @param {ZCFSeat} winner seat to transfer to + */ + const transferAmountFrom = ( + amount, + loserKeyword, + loser, + winnerKeyword, + winner, + ) => { + // Transfer the amount to our backing seat. + loser.decrementBy(harden({ [loserKeyword]: amount })); + winner.incrementBy(harden({ [winnerKeyword]: amount })); + zcf.reallocate(loser, winner); + }; + + // Describe how to retain/redeem real local erights. + const courier = makeCourier({ + zcf, + board, + namesByAddress, + remoteDenom, + localBrand, + retain: (transferSeat, amounts) => + transferAmountFrom( + amounts.Transfer, + 'Transfer', + transferSeat, + 'Pool', + poolSeat, + ), + redeem: (transferSeat, amounts) => + transferAmountFrom( + amounts.Transfer, + 'Pool', + poolSeat, + 'Transfer', + transferSeat, + ), + transferProtocol, + }); + + const { remoteDenomToCourierPK } = localDenomState; + + const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK); + courierPK.resolve(courier); + + return makePeg(localDenomState, { + localBrand, + remoteDenom, + allegedName, + }); + }, + }; + return Far('pegasusConnectionActions', pegasusConnectionActions); + }; + return Far('pegasus', { /** * Return a handler that can be used with the Network API. * * @param {ERef} [transferProtocol=DEFAULT_TRANSFER_PROTOCOL] - * @returns {ConnectionHandler} + * @returns {PegasusConnectionKit} */ - makePegConnectionHandler(transferProtocol = DEFAULT_TRANSFER_PROTOCOL) { + makePegasusConnectionKit(transferProtocol = DEFAULT_TRANSFER_PROTOCOL) { /** - * @type {Store>} + * @type {WeakStore} */ - const remoteDenomToCourierPK = makeLegacyMap('Denomination'); + // Legacy because the value contains a JS Set + const connectionToLocalDenomState = makeLegacyWeakMap('Connection'); /** - * @type {SubscriptionRecord} + * @type {SubscriptionRecord} */ const { - subscription: remoteDenomSubscription, - publication: remoteDenomPublication, + subscription: connectionSubscription, + publication: connectionPublication, } = makeSubscriptionKit(); - return Far('pegConnectionHandler', { - async onOpen(c) { + /** @type {ConnectionHandler} */ + const handler = { + async onOpen(c, localAddr, remoteAddr) { // Register C with the table of Peg receivers. - connectionToLocalDenomState.init(c, { + const { + subscription: remoteDenomSubscription, + publication: remoteDenomPublication, + } = makeSubscriptionKit(); + const remoteDenomToCourierPK = makeLegacyMap('Denomination'); + + /** @type {LocalDenomState} */ + const localDenomState = { remoteDenomToCourierPK, lastDenomNonce: 0, + remoteDenomPublication, + remoteDenomSubscription, + }; + + // The courier is the only thing that we use to send messages to C. + const makeCourier = makeCourierMaker(c); + const actions = makePegasusConnectionActions({ + localDenomState, + makeCourier, transferProtocol, + }); + + connectionToLocalDenomState.init(c, localDenomState); + + /** @type {PegasusConnectionSubscription} */ + const subData = harden({ + localAddr, + remoteAddr, + actions, remoteDenomSubscription, }); + connectionPublication.updateState(subData); }, - async onReceive(_c, packetBytes) { + async onReceive(c, packetBytes) { const doReceive = async () => { // Dispatch the packet to the appropriate Peg for this connection. const parts = await E(transferProtocol).parseTransferPacket( @@ -285,6 +298,11 @@ const makePegasus = (zcf, board, namesByAddress) => { const { remoteDenom } = parts; assert.typeof(remoteDenom, 'string'); + const { + remoteDenomToCourierPK, + remoteDenomPublication, + } = connectionToLocalDenomState.get(c); + if (!remoteDenomToCourierPK.has(remoteDenom)) { // This is the first time we've heard of this denomination. remoteDenomPublication.updateState(remoteDenom); @@ -302,216 +320,21 @@ const makePegasus = (zcf, board, namesByAddress) => { }, async onClose(c) { // Unregister C. Pending transfers will be rejected by the Network API. + const { remoteDenomPublication } = connectionToLocalDenomState.get(c); connectionToLocalDenomState.delete(c); remoteDenomPublication.fail( - assert.error(X`pegConnectionHandler closed`), + assert.error(X`pegasusConnectionHandler closed`), ); }, - }); - }, - - /** - * Get a subscription for remote denoms added on a connection. - * - * @param {ERef} connectionP - */ - async getRemoteDenomSubscription(connectionP) { - const connection = await connectionP; - const { remoteDenomSubscription } = connectionToLocalDenomState.get( - connection, - ); - return remoteDenomSubscription; - }, - - /** - * Abort any in-progress remoteDenom transfers if there has not yet been a - * pegRemote or pegLocal for it. - * - * This races against any attempts to obtain metadata and establish a given - * peg. - * - * It's alright to expose to the holder of the connection. - * - * @param {ERef} connectionP - * @param {string} remoteDenom - */ - async rejectStuckTransfers(connectionP, remoteDenom) { - const connection = await connectionP; - const { remoteDenomToCourierPK } = connectionToLocalDenomState.get( - connection, - ); - - const { reject, promise } = remoteDenomToCourierPK.get(remoteDenom); - promise.catch(() => {}); - reject(assert.error(X`${remoteDenom} is temporarily unavailable`)); - - // Allow new transfers to be initiated. - remoteDenomToCourierPK.delete(remoteDenom); - }, - - /** - * Peg a remote asset over a network connection. - * - * @param {string} allegedName - * @param {ERef} connectionP The network connection (such as IBC - * channel) to communicate over - * @param {Denom} remoteDenom Remote denomination - * @param {string} [assetKind=DEFAULT_AMOUNT_MATH_KIND] The kind of - * amount math for the pegged values - * @param {DisplayInfo} [displayInfo] - * @returns {Promise} - */ - async pegRemote( - allegedName, - connectionP, - remoteDenom, - assetKind = DEFAULT_AMOUNT_MATH_KIND, - displayInfo = undefined, - ) { - // Assertions - assert( - assetKind === 'nat', - X`Unimplemented assetKind ${q(assetKind)}; need "nat"`, - ); - - const c = await connectionP; - assert( - connectionToLocalDenomState.has(c), - X`The connection must use .makePegConnectionHandler()`, - ); - - const { transferProtocol } = connectionToLocalDenomState.get(c); - - // Create the issuer for the local erights corresponding to the remote values. - const localKeyword = createLocalIssuerKeyword(); - const zcfMint = await zcf.makeZCFMint( - localKeyword, - assetKind, - displayInfo, - ); - const { brand: localBrand } = zcfMint.getIssuerRecord(); - - // Describe how to retain/redeem pegged shadow erights. - const courier = makeCourier({ - zcf, - connection: c, - localBrand, - board, - namesByAddress, - remoteDenom, - retain: (zcfSeat, amounts) => - zcfMint.burnLosses(harden(amounts), zcfSeat), - redeem: (zcfSeat, amounts) => { - zcfMint.mintGains(harden(amounts), zcfSeat); - }, - transferProtocol, - }); - - const { remoteDenomToCourierPK } = connectionToLocalDenomState.get(c); - - const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK); - courierPK.resolve(courier); - - return makePeg(c, { localBrand, remoteDenom, allegedName }); - }, - - /** - * Peg a local asset over a network connection. - * - * @param {string} allegedName - * @param {ERef} connectionP The network connection (such as IBC - * channel) to communicate over - * @param {Issuer} localIssuer Local ERTP issuer whose assets should be - * pegged to the connection - * @returns {Promise} - */ - async pegLocal(allegedName, connectionP, localIssuer) { - const c = await connectionP; - assert( - connectionToLocalDenomState.has(c), - X`The connection must use .makePegConnectionHandler()`, - ); - - // We need the last nonce for our denom name. - const localDenomState = connectionToLocalDenomState.get(c); - const { transferProtocol } = localDenomState; - localDenomState.lastDenomNonce += 1; - const remoteDenom = `pegasus${localDenomState.lastDenomNonce}`; - - // Create a seat in which to keep our denomination. - const { zcfSeat: poolSeat } = zcf.makeEmptySeatKit(); - - // Ensure the issuer can be used in Zoe offers. - const localKeyword = createLocalIssuerKeyword(); - const { brand: localBrand } = await zcf.saveIssuer( - localIssuer, - localKeyword, - ); - - /** - * Transfer amount (of localBrand) from loser to winner seats. - * - * @param {Amount} amount amount to transfer - * @param {Keyword} loserKeyword the keyword to take from the loser - * @param {ZCFSeat} loser seat to transfer from - * @param {Keyword} winnerKeyword the keyword to give to the winner - * @param {ZCFSeat} winner seat to transfer to - */ - const transferAmountFrom = ( - amount, - loserKeyword, - loser, - winnerKeyword, - winner, - ) => { - // Transfer the amount to our backing seat. - loser.decrementBy(harden({ [loserKeyword]: amount })); - winner.incrementBy(harden({ [winnerKeyword]: amount })); - zcf.reallocate(loser, winner); }; - // Describe how to retain/redeem real local erights. - const courier = makeCourier({ - zcf, - connection: c, - board, - namesByAddress, - remoteDenom, - localBrand, - retain: (transferSeat, amounts) => - transferAmountFrom( - amounts.Transfer, - 'Transfer', - transferSeat, - 'Pool', - poolSeat, - ), - redeem: (transferSeat, amounts) => - transferAmountFrom( - amounts.Transfer, - 'Pool', - poolSeat, - 'Transfer', - transferSeat, - ), - transferProtocol, + return harden({ + handler: Far('pegasusConnectionHandler', handler), + subscription: connectionSubscription, }); - - const { remoteDenomToCourierPK } = localDenomState; - - const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK); - courierPK.resolve(courier); - - return makePeg(c, { localBrand, remoteDenom, allegedName }); }, - /** - * Find one of our registered issuers. - * - * @param {Brand} localBrand - * @returns {Promise} - */ - async getLocalIssuer(localBrand) { + getLocalIssuer(localBrand) { return zcf.getIssuerForBrand(localBrand); }, @@ -525,11 +348,11 @@ const makePegasus = (zcf, board, namesByAddress) => { async makeInvitationToTransfer(pegP, depositAddress) { // Verify the peg. const peg = await pegP; - const c = pegToConnection.get(peg); + const denomState = pegToDenomState.get(peg); // Get details from the peg. const remoteDenom = await E(peg).getRemoteDenom(); - const { remoteDenomToCourierPK } = connectionToLocalDenomState.get(c); + const { remoteDenomToCourierPK } = denomState; const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK); const { send } = await courierPK.promise; diff --git a/packages/pegasus/src/types.js b/packages/pegasus/src/types.js index 6ffb6a5bc4e..28745f4eeca 100644 --- a/packages/pegasus/src/types.js +++ b/packages/pegasus/src/types.js @@ -40,3 +40,59 @@ * @property {Sender} send * @property {Receiver} receive */ + +/** + * @callback RejectStuckTransfers + * Abort any in-progress remoteDenom transfers if there has not yet been a + * pegRemote or pegLocal for it. + * + * This races against any attempts to obtain metadata and establish a given + * peg. + * + * It's alright to expose to the holder of the connection. + * + * @param {Denom} remoteDenom + * @returns {Promise} + */ + +/** + * @callback PegRemote + * Peg a remote asset over a network connection. + * + * @param {string} allegedName + * @param {Denom} remoteDenom + * @param {AssetKind} [assetKind] The kind of the pegged values + * @param {DisplayInfo} [displayInfo] + * @returns {Promise} + */ + +/** + * @callback PegLocal + * Peg a local asset over a network connection. + * + * @param {string} allegedName + * @param {Issuer} localIssuer Local ERTP issuer whose assets should be + * pegged to the connection + * @returns {Promise} + */ + +/** + * @typedef {Object} PegasusConnectionActions + * @property {PegLocal} pegLocal + * @property {PegRemote} pegRemote + * @property {RejectStuckTransfers} rejectStuckTransfers + */ + +/** + * @typedef {Object} PegasusConnectionSubscription + * @property {PegasusConnectionActions} actions + * @property {Address} localAddr + * @property {Address} remoteAddr + * @property {Subscription} remoteDenomSubscription + */ + +/** + * @typedef {Object} PegasusConnectionKit + * @property {ConnectionHandler} handler + * @property {Subscription} subscription + */ diff --git a/packages/pegasus/test/test-peg.js b/packages/pegasus/test/test-peg.js index c08d0d4d72d..8280b2e38ab 100644 --- a/packages/pegasus/test/test-peg.js +++ b/packages/pegasus/test/test-peg.js @@ -24,10 +24,20 @@ const dirname = path.dirname(filename); const contractPath = `${dirname}/../src/pegasus.js`; /** - * @param {import('tape-promise/tape').Test} t + * @template T + * @param {ERef>} sub + * @returns {AsyncIterator} + */ +const makeAsyncIteratorFromSubscription = sub => + makeSubscription(E(sub).getSharableSubscriptionInternals())[ + Symbol.asyncIterator + ](); + +/** + * @param {import('ava').Assertions} t */ async function testRemotePeg(t) { - t.plan(16); + t.plan(20); /** * @type {PromiseRecord} @@ -107,7 +117,9 @@ async function testRemotePeg(t) { ); // Pretend we're Agoric. - const chandler = E(pegasus).makePegConnectionHandler(); + const { handler: chandler, subscription: connectionSubscription } = await E( + pegasus, + ).makePegasusConnectionKit(); const connP = E(portP).connect(portName, chandler); // Get some local Atoms. @@ -117,23 +129,41 @@ async function testRemotePeg(t) { receiver: '0x1234', sender: 'FIXME:sender', }; - await connP; + t.assert(await connP); const sendAckDataP = E(gaiaConnection).send(JSON.stringify(sendPacket)); // Note that we can create the peg after the fact. - const remoteDenomSub = makeSubscription( - E( - E(pegasus).getRemoteDenomSubscription(connP), - ).getSharableSubscriptionInternals(), + const connectionAit = makeAsyncIteratorFromSubscription( + connectionSubscription, + ); + const { + value: { + actions: pegConnActions, + localAddr, + remoteAddr, + remoteDenomSubscription, + }, + } = await connectionAit.next(); + + // Check the connection metadata. + t.is(localAddr, '/ibc-channel/chanabc/ibc-port/portdef/nonce/1', 'localAddr'); + t.is( + remoteAddr, + '/ibc-channel/chanabc/ibc-port/portdef/nonce/2', + 'remoteAddr', + ); + + // Find the first remoteDenom. + const remoteDenomAit = makeAsyncIteratorFromSubscription( + remoteDenomSubscription, ); - const remoteDenomAit = remoteDenomSub[Symbol.asyncIterator](); t.deepEqual(await remoteDenomAit.next(), { done: false, value: 'uatom' }); - const pegP = await E(pegasus).pegRemote('Gaia', connP, 'uatom'); + const pegP = E(pegConnActions).pegRemote('Gaia', 'uatom'); const localBrand = await E(pegP).getLocalBrand(); - const localIssuer = await E(pegasus).getLocalIssuer(localBrand); + const localIssuerP = E(pegasus).getLocalIssuer(localBrand); - const localPurseP = E(localIssuer).makeEmptyPurse(); + const localPurseP = E(localIssuerP).makeEmptyPurse(); resolveLocalDepositFacet(E(localPurseP).getDepositFacet()); const sendAckData = await sendAckDataP; @@ -183,7 +213,7 @@ async function testRemotePeg(t) { // Wait for the packet to go through. t.deepEqual(await remoteDenomAit.next(), { done: false, value: 'umuon' }); - E(pegasus).rejectStuckTransfers(connP, 'umuon'); + E(pegConnActions).rejectStuckTransfers('umuon'); const sendAckData3 = await sendAckData3P; const sendAck3 = JSON.parse(sendAckData3); @@ -212,13 +242,18 @@ async function testRemotePeg(t) { t.is(outcome, undefined, 'transfer is successful'); const paymentPs = await seat.getPayouts(); - const refundAmount = await E(localIssuer).getAmountOf(paymentPs.Transfer); + const refundAmount = await E(localIssuerP).getAmountOf(paymentPs.Transfer); const isEmptyRefund = AmountMath.isEmpty(refundAmount, localBrand); t.assert(isEmptyRefund, 'no refund from success'); - const stillIsLive = await E(localIssuer).isLive(localAtoms); + const stillIsLive = await E(localIssuerP).isLive(localAtoms); t.assert(!stillIsLive, 'payment is consumed'); + + await E(connP).close(); + await t.throwsAsync(() => remoteDenomAit.next(), { + message: 'pegasusConnectionHandler closed', + }); } test('remote peg', t => testRemotePeg(t));