Skip to content

Commit

Permalink
fix: match notifier semantics to async iterables (#1332)
Browse files Browse the repository at this point in the history
  • Loading branch information
erights authored Jul 26, 2020
1 parent e5034f9 commit efbf359
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 86 deletions.
6 changes: 3 additions & 3 deletions packages/cosmic-swingset/lib/ag-solo/vats/lib-wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ export async function makeWallet({
// handle the update, which has already resolved to a record. If the offer is
// 'done', mark the offer 'complete', otherwise resubscribe to the notifier.
function updateOrResubscribe(id, offerHandle, update) {
const { updateHandle, done } = update;
if (done) {
const { updateCount } = update;
if (updateCount === undefined) {
// TODO do we still need these?
idToOfferHandle.delete(id);

Expand All @@ -245,7 +245,7 @@ export async function makeWallet({
idToNotifierP.delete(id);
} else {
E(idToNotifierP.get(id))
.getUpdateSince(updateHandle)
.getUpdateSince(updateCount)
.then(nextUpdate => updateOrResubscribe(id, offerHandle, nextUpdate));
}
}
Expand Down
165 changes: 119 additions & 46 deletions packages/notifier/src/notifier.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,49 @@
import { producePromise } from '@agoric/produce-promise';

/**
* @typedef {Object} UpdateHandle a value used to mark the position in the update stream
* @typedef {number | undefined} UpdateCount a value used to mark the position
* in the update stream. For the last state, the updateCount is undefined.
*/

/**
* @template T the type of the state value
* @typedef {Object} UpdateRecord<T>
* @property {T} value is whatever state the service wants to publish
* @property {UpdateHandle} updateHandle is a value that identifies the update
* @property {boolean} done false until the updater publishes a final state
* @property {UpdateCount} updateCount is a value that identifies the update
*/

/**
* @template T the type of the notifier state
* @callback GetUpdateSince<T> Can be called repeatedly to get a sequence of update records
* @param {UpdateHandle} [updateHandle] return update record as of a handle
* If the handle argument is omitted or differs from the current handle, return the current record.
* Otherwise, after the next state change, the promise will resolve to the then-current value of the record.
* @callback GetUpdateSince<T> Can be called repeatedly to get a sequence of
* update records
* @param {UpdateCount} [updateCount] return update record as of a handle
* If the handle argument is omitted or differs from the current handle,
* return the current record.
* Otherwise, after the next state change, the promise will resolve to the
* then-current value of the record.
* @returns {Promise<UpdateRecord<T>>} resolves to the corresponding update
*/

/**
* @template T the type of the notifier state
* @typedef {Object} Notifier<T> an object that can be used to get the current state or updates
* @property {GetUpdateSince<T>} getUpdateSince return update record as of a handle
* @property {() => UpdateRecord<T>} getCurrentUpdate return the current update record
* @typedef {Object} Notifier<T> an object that can be used to get the current
* state or updates
* @property {GetUpdateSince<T>} getUpdateSince return update record as of a
* handle
*/

/**
* @template T the type of the notifier state
* @typedef {Object} Updater<T> an object that should be closely held, as anyone with access to
* @typedef {Object} Updater<T> an object that should be closely held, as
* anyone with access to
* it can provide updates
* @property {(state: T) => void} updateState sets the new state, and resolves the outstanding promise to send an update
* @property {(finalState: T) => void} resolve sets the final state, sends a final update, and freezes the
* @property {(state: T) => void} updateState sets the new state, and resolves
* the outstanding promise to send an update
* @property {(finalState: T) => void} finish sets the final state, sends a
* final update, and freezes the
* updater
* @property {(reason: T) => void} reject the stream becomes erroneously
* terminated, allegedly for the stated reason.
*/

/**
Expand All @@ -48,6 +57,13 @@ import { producePromise } from '@agoric/produce-promise';
* @property {Updater<T>} updater the (closely-held) notifier producer
*/

/**
* Whether to enable deprecated legacy features to support legacy clients
* during the transition. TODO once all clients are updated to the new API,
* remove this flag and all code enabled by this flag.
*/
const supportLegacy = true;

/**
* Produces a pair of objects, which allow a service to produce a stream of
* update promises.
Expand All @@ -56,55 +72,112 @@ import { producePromise } from '@agoric/produce-promise';
* @param {T} [initialState] the first state to be returned
* @returns {NotifierRecord<T>} the notifier and updater
*/
export const makeNotifierKit = (initialState = undefined) => {
let currentPromiseRec = producePromise();
let currentResponse = harden({
value: initialState,
updateHandle: {},
done: false,
});

function getCurrentUpdate() {
return currentResponse;
// The initial state argument has to be truly optional even though it can
// be any first class value including `undefined`. We need to distinguish the
// presence vs the absence of it, which we cannot do with the optional argument
// syntax. Rather we use the arity of the arguments array.
//
// If no initial state is provided to `makeNotifierKit`, then it starts without
// an initial state. Its initial state will instead be the state of the first
// update.
export const makeNotifierKit = (...args) => {
let nextPromiseKit = producePromise();
let currentUpdateCount = 1; // avoid falsy numbers
let currentResponse;

const hasState = () => currentResponse !== undefined;

const final = () => currentUpdateCount === undefined;

const extraProperties = () =>
supportLegacy
? {
updateHandle: currentUpdateCount,
done: final(),
}
: {};

if (args.length >= 1) {
// start as hasState() && !final()
currentResponse = harden({
value: args[0],
updateCount: currentUpdateCount,
...extraProperties(),
});
}

function getUpdateSince(updateHandle = undefined) {
if (updateHandle === currentResponse.updateHandle) {
return currentPromiseRec.promise;
// else start as !hasState() && !final()

// NaN matches nothing
function getUpdateSince(updateCount = NaN) {
if (
hasState() &&
(final() || currentResponse.updateCount !== updateCount)
) {
// If hasState() and either it is final() or it is
// not the state of updateCount, return the current state.
return Promise.resolve(currentResponse);
}
return Promise.resolve(currentResponse);
// otherwise return a promise for the next state.
return nextPromiseKit.promise;
}

function updateState(state) {
if (!currentResponse.updateHandle) {
throw new Error('Cannot update state after resolve.');
if (final()) {
throw new Error('Cannot update state after termination.');
}

currentResponse = harden({ value: state, updateHandle: {}, done: false });
currentPromiseRec.resolve(currentResponse);
currentPromiseRec = producePromise();
// become hasState() && !final()
currentUpdateCount += 1;
currentResponse = harden({
value: state,
updateCount: currentUpdateCount,
...extraProperties(),
});
nextPromiseKit.resolve(currentResponse);
nextPromiseKit = producePromise();
}

function resolve(finalState) {
if (!currentResponse.updateHandle) {
throw new Error('Cannot resolve again.');
function finish(finalState) {
if (final()) {
throw new Error('Cannot finish after termination.');
}

// become hasState() && final()
currentUpdateCount = undefined;
currentResponse = harden({
value: finalState,
updateHandle: undefined,
done: true,
updateCount: currentUpdateCount,
...extraProperties(),
});
currentPromiseRec.resolve(currentResponse);
nextPromiseKit.resolve(currentResponse);
nextPromiseKit = undefined;
}

function reject(reason) {
if (final()) {
throw new Error('Cannot reject after termination.');
}

// become !hasState() && final()
currentUpdateCount = undefined;
currentResponse = undefined;
nextPromiseKit.reject(reason);
}

// notifier facet is separate so it can be handed out loosely while updater is
// tightly held
const notifier = harden({ getUpdateSince, getCurrentUpdate });
const updater = harden({ updateState, resolve });
// notifier facet is separate so it can be handed out loosely while updater
// is tightly held
const notifier = harden({ getUpdateSince });
const updater = harden({
updateState,
finish,
reject,
...(supportLegacy ? { resolve: finish } : {}),
});
return harden({ notifier, updater });
};

// Deprecated. TODO remove

export { makeNotifierKit as produceNotifier };
// Deprecated. TODO remove once no clients need it.
// Unlike makeNotifierKit, produceIssuerKit always produces
// a notifier with an initial state, which defaults to undefined.
export const produceNotifier = (initialState = undefined) =>
makeNotifierKit(initialState);
34 changes: 17 additions & 17 deletions packages/notifier/test/test-notifier.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ test('notifier - single update', async t => {
const updateDeNovo = await notifier.getUpdateSince();
t.equals(updateDeNovo.value, 1, 'initial state is one');

const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateHandle);
const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateCount);
const all = Promise.all([updateInWaiting]).then(([update]) => {
t.equals(update.value, 3, 'updated state is eventually three');
});

const update2 = await notifier.getUpdateSince({});
const update2 = await notifier.getUpdateSince();
t.equals(update2.value, 1);
updater.updateState(3);
await all;
Expand All @@ -47,15 +47,15 @@ test('notifier - initial update', async t => {
/** @type {NotifierRecord<number>} */
const { notifier, updater } = makeNotifierKit(1);

const updateDeNovo = notifier.getCurrentUpdate();
const updateDeNovo = await notifier.getUpdateSince();
t.equals(updateDeNovo.value, 1, 'initial state is one');

const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateHandle);
const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateCount);
const all = Promise.all([updateInWaiting]).then(([update]) => {
t.equals(update.value, 3, 'updated state is eventually three');
});

const update2 = await notifier.getUpdateSince({});
const update2 = await notifier.getUpdateSince();
t.equals(update2.value, 1);
updater.updateState(3);
await all;
Expand All @@ -65,22 +65,22 @@ test('notifier - update after state change', async t => {
t.plan(5);
const { notifier, updater } = makeNotifierKit(1);

const updateDeNovo = notifier.getCurrentUpdate();
const updateDeNovo = await notifier.getUpdateSince();

const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateHandle);
const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateCount);
t.equals(updateDeNovo.value, 1, 'first state check (1)');
const all = Promise.all([updateInWaiting]).then(([update1]) => {
t.equals(update1.value, 3, '4th check (delayed) 3');
const thirdStatePromise = notifier.getUpdateSince(update1.updateHandle);
const thirdStatePromise = notifier.getUpdateSince(update1.updateCount);
Promise.all([thirdStatePromise]).then(([update2]) => {
t.equals(update2.value, 5, '5th check (delayed) 5');
});
});

t.equals(notifier.getCurrentUpdate().value, 1, '2nd check (1)');
t.equals((await notifier.getUpdateSince()).value, 1, '2nd check (1)');
updater.updateState(3);

t.equals(notifier.getCurrentUpdate().value, 3, '3rd check (3)');
t.equals((await notifier.getUpdateSince()).value, 3, '3rd check (3)');
updater.updateState(5);
await all;
});
Expand All @@ -90,21 +90,21 @@ test('notifier - final state', async t => {
/** @type {NotifierRecord<number|string>} */
const { notifier, updater } = makeNotifierKit(1);

const updateDeNovo = notifier.getCurrentUpdate();
const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateHandle);
const updateDeNovo = await notifier.getUpdateSince();
const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateCount);
t.equals(updateDeNovo.value, 1, 'initial state is one');
const all = Promise.all([updateInWaiting]).then(([update]) => {
t.equals(update.value, 'final', 'state is "final"');
t.notOk(update.updateHandle, 'no handle after close');
const postFinalUpdate = notifier.getUpdateSince(update.updateHandle);
t.notOk(update.updateCount, 'no handle after close');
const postFinalUpdate = notifier.getUpdateSince(update.updateCount);
Promise.all([postFinalUpdate]).then(([after]) => {
t.equals(after.value, 'final', 'stable');
t.notOk(after.updateHandle, 'no handle after close');
t.notOk(after.updateCount, 'no handle after close');
});
});

const invalidHandle = await notifier.getUpdateSince({});
const invalidHandle = await notifier.getUpdateSince();
t.equals(invalidHandle.value, 1, 'still one');
updater.resolve('final');
updater.finish('final');
await all;
});
6 changes: 3 additions & 3 deletions packages/swingset-runner/demo/zoeTests/vat-alice.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,11 @@ const build = async (zoe, issuers, payments, installations, timer) => {
await showPurseBalance(simoleanPurseP, 'aliceSimoleanPurse', log);
};

function logStateOnChanges(notifier, lastHandle = undefined) {
const updateRecordP = E(notifier).getUpdateSince(lastHandle);
function logStateOnChanges(notifier, lastCount = undefined) {
const updateRecordP = E(notifier).getUpdateSince(lastCount);
updateRecordP.then(updateRec => {
log(updateRec.value);
logStateOnChanges(notifier, updateRec.updateHandle);
logStateOnChanges(notifier, updateRec.updateCount);
});
}

Expand Down
2 changes: 1 addition & 1 deletion packages/zoe/src/contractFacet.js
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ export function buildRootObject(_vatPowers) {
addOffer: (offerHandle, proposal, allocation) => {
const ignoringUpdater = harden({
updateState: () => {},
resolve: () => {},
finish: () => {},
});
const offerRecord = {
instanceHandle,
Expand Down
2 changes: 1 addition & 1 deletion packages/zoe/src/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ const makeOfferTable = () => {
deleteOffers: offerHandles => {
return offerHandles.map(offerHandle => {
const { updater } = table.get(offerHandle);
updater.resolve(undefined);
updater.finish(undefined);
return table.delete(offerHandle);
});
},
Expand Down
6 changes: 3 additions & 3 deletions packages/zoe/src/zoe.js
Original file line number Diff line number Diff line change
Expand Up @@ -611,13 +611,13 @@ function makeZoe(vatAdminSvc) {
// deposited. Keywords in the want clause are mapped to the empty
// amount for that keyword's Issuer.
const recordOffer = amountsArray => {
const notifierRec = makeNotifierKit();
const notifierKit = makeNotifierKit(undefined);
const offerRecord = {
instanceHandle,
proposal: cleanedProposal,
currentAllocation: arrayToObj(amountsArray, userKeywords),
notifier: notifierRec.notifier,
updater: notifierRec.updater,
notifier: notifierKit.notifier,
updater: notifierKit.updater,
};
const { zcfForZoe } = instanceTable.get(instanceHandle);
payoutMap.init(offerHandle, producePromise());
Expand Down
6 changes: 3 additions & 3 deletions packages/zoe/test/swingsetTests/zoe/vat-alice.js
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,11 @@ const build = async (log, zoe, issuers, payments, installations, timer) => {
await showPurseBalance(simoleanPurseP, 'aliceSimoleanPurse', log);
};

function logStateOnChanges(notifier, lastHandle = undefined) {
const updateRecordP = E(notifier).getUpdateSince(lastHandle);
function logStateOnChanges(notifier, lastCount = undefined) {
const updateRecordP = E(notifier).getUpdateSince(lastCount);
updateRecordP.then(updateRec => {
log(updateRec.value);
logStateOnChanges(notifier, updateRec.updateHandle);
logStateOnChanges(notifier, updateRec.updateCount);
});
}

Expand Down
Loading

0 comments on commit efbf359

Please sign in to comment.