Skip to content

Commit

Permalink
Move receipt-specific logic into ReceiptAccumulator (#3320)
Browse files Browse the repository at this point in the history
* Extract receipt accumulation logic into ReceiptAccumulator

* Rename readReceipts to unthreadedReadReceipts

* Move AccumulatedReceipt into receipt-accumulator

* Move the logic for consuming events into ReceiptAccumulator
  • Loading branch information
andybalaam authored Apr 27, 2023
1 parent 261bc81 commit 3d86821
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 96 deletions.
138 changes: 127 additions & 11 deletions src/receipt-accumulator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,156 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

import { AccumulatedReceipt } from "./sync-accumulator";
import { MapWithDefault } from "./utils";
import { IMinimalEvent } from "./sync-accumulator";
import { EventType } from "./@types/event";
import { isSupportedReceiptType, MapWithDefault, recursiveMapToObject } from "./utils";
import { IContent } from "./models/event";
import { MAIN_ROOM_TIMELINE, ReceiptContent, ReceiptType } from "./@types/read_receipts";

interface AccumulatedReceipt {
data: IMinimalEvent;
type: ReceiptType;
eventId: string;
}

/**
* Summarises the read receipts within a room. Used by the sync accumulator.
*
* Given receipts for users, picks the most recently-received one and provides
* the results in a new fake receipt event returned from
* buildAccumulatedReceiptEvent().
*
* Handles unthreaded receipts and receipts in each thread separately, so the
* returned event contains the most recently received unthreaded receipt, and
* the most recently received receipt in each thread.
*/
export class ReceiptAccumulator {
private readReceipts: Map<string, AccumulatedReceipt> = new Map();
/** user_id -\> most-recently-received unthreaded receipt */
private unthreadedReadReceipts: Map<string, AccumulatedReceipt> = new Map();

/** thread_id -\> user_id -\> most-recently-received receipt for this thread */
private threadedReadReceipts: MapWithDefault<string, Map<string, AccumulatedReceipt>> = new MapWithDefault(
() => new Map(),
);

public setUnthreaded(userId: string, receipt: AccumulatedReceipt): void {
this.readReceipts.set(userId, receipt);
/**
* Provide an unthreaded receipt for this user. Overwrites any other
* unthreaded receipt we have for this user.
*/
private setUnthreaded(userId: string, receipt: AccumulatedReceipt): void {
this.unthreadedReadReceipts.set(userId, receipt);
}

public setThreaded(threadId: string, userId: string, receipt: AccumulatedReceipt): void {
/**
* Provide a receipt for this user in this thread. Overwrites any other
* receipt we have for this user in this thread.
*/
private setThreaded(threadId: string, userId: string, receipt: AccumulatedReceipt): void {
this.threadedReadReceipts.getOrCreate(threadId).set(userId, receipt);
}

/**
* @returns an iterator of pairs of [userId, AccumulatedReceipt] - all the
* unthreaded receipts for each user.
* most recently-received unthreaded receipts for each user.
*/
public allUnthreaded(): IterableIterator<[string, AccumulatedReceipt]> {
return this.readReceipts.entries();
private allUnthreaded(): IterableIterator<[string, AccumulatedReceipt]> {
return this.unthreadedReadReceipts.entries();
}

/**
* @returns an iterator of pairs of [userId, AccumulatedReceipt] - all the
* threaded receipts for each user, in all threads.
* most recently-received threaded receipts for each user, in all
* threads.
*/
public *allThreaded(): IterableIterator<[string, AccumulatedReceipt]> {
private *allThreaded(): IterableIterator<[string, AccumulatedReceipt]> {
for (const receiptsForThread of this.threadedReadReceipts.values()) {
for (const e of receiptsForThread.entries()) {
yield e;
}
}
}

/**
* Given a list of ephemeral events, find the receipts and store the
* relevant ones to be returned later from buildAccumulatedReceiptEvent().
*/
public consumeEphemeralEvents(events: IMinimalEvent[] | undefined): void {
events?.forEach((e) => {
if (e.type !== EventType.Receipt || !e.content) {
// This means we'll drop unknown ephemeral events but that
// seems okay.
return;
}

// Handle m.receipt events. They clobber based on:
// (user_id, receipt_type)
// but they are keyed in the event as:
// content:{ $event_id: { $receipt_type: { $user_id: {json} }}}
// so store them in the former so we can accumulate receipt deltas
// quickly and efficiently (we expect a lot of them). Fold the
// receipt type into the key name since we only have 1 at the
// moment (m.read) and nested JSON objects are slower and more
// of a hassle to work with. We'll inflate this back out when
// getJSON() is called.
Object.keys(e.content).forEach((eventId) => {
Object.entries<ReceiptContent>(e.content[eventId]).forEach(([key, value]) => {
if (!isSupportedReceiptType(key)) return;

for (const userId of Object.keys(value)) {
const data = e.content[eventId][key][userId];

const receipt = {
data: e.content[eventId][key][userId],
type: key as ReceiptType,
eventId,
};

if (!data.thread_id || data.thread_id === MAIN_ROOM_TIMELINE) {
this.setUnthreaded(userId, receipt);
} else {
this.setThreaded(data.thread_id, userId, receipt);
}
}
});
});
});
}

/**
* Build a receipt event that contains all relevant information for this
* room, taking the most recently received receipt for each user in an
* unthreaded context, and in each thread.
*/
public buildAccumulatedReceiptEvent(roomId: string): IMinimalEvent | null {
const receiptEvent: IMinimalEvent = {
type: EventType.Receipt,
room_id: roomId,
content: {
// $event_id: { "m.read": { $user_id: $json } }
} as IContent,
};

const receiptEventContent: MapWithDefault<
string,
MapWithDefault<ReceiptType, Map<string, object>>
> = new MapWithDefault(() => new MapWithDefault(() => new Map()));

for (const [userId, receiptData] of this.allUnthreaded()) {
receiptEventContent
.getOrCreate(receiptData.eventId)
.getOrCreate(receiptData.type)
.set(userId, receiptData.data);
}

for (const [userId, receiptData] of this.allThreaded()) {
receiptEventContent
.getOrCreate(receiptData.eventId)
.getOrCreate(receiptData.type)
.set(userId, receiptData.data);
}

receiptEvent.content = recursiveMapToObject(receiptEventContent);

return receiptEventContent.size > 0 ? receiptEvent : null;
}
}
101 changes: 16 additions & 85 deletions src/sync-accumulator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ limitations under the License.
*/

import { logger } from "./logger";
import { deepCopy, isSupportedReceiptType, MapWithDefault, recursiveMapToObject } from "./utils";
import { deepCopy } from "./utils";
import { IContent, IUnsigned } from "./models/event";
import { IRoomSummary } from "./models/room-summary";
import { EventType } from "./@types/event";
import { MAIN_ROOM_TIMELINE, ReceiptContent, ReceiptType } from "./@types/read_receipts";
import { UNREAD_THREAD_NOTIFICATIONS } from "./@types/sync";
import { ReceiptAccumulator } from "./receipt-accumulator";

Expand All @@ -41,15 +40,10 @@ interface IOpts {
export interface IMinimalEvent {
content: IContent;
type: EventType | string;
room_id?: string;
unsigned?: IUnsigned;
}

export interface AccumulatedReceipt {
data: IMinimalEvent;
type: ReceiptType;
eventId: string;
}

export interface IEphemeral {
events: IMinimalEvent[];
}
Expand Down Expand Up @@ -410,52 +404,17 @@ export class SyncAccumulator {
acc[INVITED_COUNT_KEY] = sum[INVITED_COUNT_KEY] || acc[INVITED_COUNT_KEY];
}

data.ephemeral?.events?.forEach((e) => {
// We purposefully do not persist m.typing events.
// Technically you could refresh a browser before the timer on a
// typing event is up, so it'll look like you aren't typing when
// you really still are. However, the alternative is worse. If
// we do persist typing events, it will look like people are
// typing forever until someone really does start typing (which
// will prompt Synapse to send down an actual m.typing event to
// clobber the one we persisted).
if (e.type !== EventType.Receipt || !e.content) {
// This means we'll drop unknown ephemeral events but that
// seems okay.
return;
}
// Handle m.receipt events. They clobber based on:
// (user_id, receipt_type)
// but they are keyed in the event as:
// content:{ $event_id: { $receipt_type: { $user_id: {json} }}}
// so store them in the former so we can accumulate receipt deltas
// quickly and efficiently (we expect a lot of them). Fold the
// receipt type into the key name since we only have 1 at the
// moment (m.read) and nested JSON objects are slower and more
// of a hassle to work with. We'll inflate this back out when
// getJSON() is called.
Object.keys(e.content).forEach((eventId) => {
Object.entries<ReceiptContent>(e.content[eventId]).forEach(([key, value]) => {
if (!isSupportedReceiptType(key)) return;

for (const userId of Object.keys(value)) {
const data = e.content[eventId][key][userId];

const receipt = {
data: e.content[eventId][key][userId],
type: key as ReceiptType,
eventId,
};

if (!data.thread_id || data.thread_id === MAIN_ROOM_TIMELINE) {
currentData._receipts.setUnthreaded(userId, receipt);
} else {
currentData._receipts.setThreaded(data.thread_id, userId, receipt);
}
}
});
});
});
// We purposefully do not persist m.typing events.
// Technically you could refresh a browser before the timer on a
// typing event is up, so it'll look like you aren't typing when
// you really still are. However, the alternative is worse. If
// we do persist typing events, it will look like people are
// typing forever until someone really does start typing (which
// will prompt Synapse to send down an actual m.typing event to
// clobber the one we persisted).

// Persist the receipts
currentData._receipts.consumeEphemeralEvents(data.ephemeral?.events);

// if we got a limited sync, we need to remove all timeline entries or else
// we will have gaps in the timeline.
Expand Down Expand Up @@ -561,39 +520,11 @@ export class SyncAccumulator {
roomJson.account_data.events.push(roomData._accountData[evType]);
});

// Add receipt data
const receiptEvent = {
type: EventType.Receipt,
room_id: roomId,
content: {
// $event_id: { "m.read": { $user_id: $json } }
} as IContent,
};

const receiptEventContent: MapWithDefault<
string,
MapWithDefault<ReceiptType, Map<string, object>>
> = new MapWithDefault(() => new MapWithDefault(() => new Map()));

for (const [userId, receiptData] of roomData._receipts.allUnthreaded()) {
receiptEventContent
.getOrCreate(receiptData.eventId)
.getOrCreate(receiptData.type)
.set(userId, receiptData.data);
}

for (const [userId, receiptData] of roomData._receipts.allThreaded()) {
receiptEventContent
.getOrCreate(receiptData.eventId)
.getOrCreate(receiptData.type)
.set(userId, receiptData.data);
}

receiptEvent.content = recursiveMapToObject(receiptEventContent);
const receiptEvent = roomData._receipts.buildAccumulatedReceiptEvent(roomId);

// add only if we have some receipt data
if (receiptEventContent.size > 0) {
roomJson.ephemeral.events.push(receiptEvent as IMinimalEvent);
if (receiptEvent) {
roomJson.ephemeral.events.push(receiptEvent);
}

// Add timeline data
Expand Down

0 comments on commit 3d86821

Please sign in to comment.