Skip to content

Commit

Permalink
proof of concept for thread list api implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
justjanne committed Aug 18, 2022
1 parent 9eb7290 commit aec5b7e
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 14 deletions.
129 changes: 123 additions & 6 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,13 @@ interface IMessagesResponse {
state: IStateEvent[];
}

interface IThreadedMessagesResponse {
prev_batch: string;
next_batch: string;
chunk: IRoomEvent[];
state: IStateEvent[];
}

export interface IRequestTokenResponse {
sid: string;
submit_url?: string;
Expand Down Expand Up @@ -1178,12 +1185,12 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
}

try {
const { serverSupport, stable } = await this.doesServerSupportThread();
Thread.setServerSideSupport(serverSupport, stable);
const { serverSupport, stable, listThreads } = await this.doesServerSupportThread();
Thread.setServerSideSupport(serverSupport, stable, listThreads);
} catch (e) {
// Most likely cause is that `doesServerSupportThread` returned `null` (as it
// is allowed to do) and thus we enter "degraded mode" on threads.
Thread.setServerSideSupport(false, true);
Thread.setServerSideSupport(false, true, false);
}

// shallow-copy the opts dict before modifying and storing it
Expand Down Expand Up @@ -5480,6 +5487,63 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
return this.http.authedRequest(undefined, Method.Get, path, params);
}

/**
* Makes a request to /messages with the appropriate lazy loading filter set.
* XXX: if we do get rid of scrollback (as it's not used at the moment),
* we could inline this method again in paginateEventTimeline as that would
* then be the only call-site
* @param {string} roomId
* @param {string} fromToken
* @param {number} limit the maximum amount of events the retrieve
* @param {string} dir 'f' or 'b'
* @param {Filter} timelineFilter the timeline filter to pass
* @return {Promise}
*/
// XXX: Intended private, used by room.fetchRoomThreads
public createThreadMessagesRequest(
roomId: string,
fromToken: string | null,
limit = 30,
dir: Direction,
timelineFilter?: Filter,
): Promise<IMessagesResponse> {
const path = utils.encodeUri("/rooms/$roomId/threads", { $roomId: roomId });

const params: Record<string, string> = {
limit: limit.toString(),
dir: dir,
include: 'all',
};

if (fromToken) {
params.from = fromToken;
}

let filter = null;
if (this.clientOpts.lazyLoadMembers) {
// create a shallow copy of LAZY_LOADING_MESSAGES_FILTER,
// so the timelineFilter doesn't get written into it below
filter = Object.assign({}, Filter.LAZY_LOADING_MESSAGES_FILTER);
}
if (timelineFilter) {
// XXX: it's horrific that /messages' filter parameter doesn't match
// /sync's one - see https://matrix.org/jira/browse/SPEC-451
filter = filter || {};
Object.assign(filter, timelineFilter.getRoomTimelineFilterComponent()?.toJSON());
}
if (filter) {
params.filter = JSON.stringify(filter);
}

return this.http.authedRequest<IThreadedMessagesResponse>(undefined, Method.Get, path, params, undefined, {
prefix: "/_matrix/client/unstable/org.matrix.msc3856",
}).then(res => ({
...res,
start: res.prev_batch,
end: res.next_batch,
}));
}

/**
* Take an EventTimeline, and back/forward-fill results.
*
Expand All @@ -5495,6 +5559,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
*/
public paginateEventTimeline(eventTimeline: EventTimeline, opts: IPaginateOpts): Promise<boolean> {
const isNotifTimeline = (eventTimeline.getTimelineSet() === this.notifTimelineSet);
const room = this.getRoom(eventTimeline.getRoomId());
const isThreadTimeline = eventTimeline.getTimelineSet().isThreadTimeline;

// TODO: we should implement a backoff (as per scrollback()) to deal more
// nicely with HTTP errors.
Expand Down Expand Up @@ -5565,8 +5631,43 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventTimeline.paginationRequests[dir] = null;
});
eventTimeline.paginationRequests[dir] = promise;
} else if (isThreadTimeline) {
if (!room) {
throw new Error("Unknown room " + eventTimeline.getRoomId());
}

promise = this.createThreadMessagesRequest(
eventTimeline.getRoomId(),
token,
opts.limit,
dir,
eventTimeline.getFilter(),
).then((res) => {
if (res.state) {
const roomState = eventTimeline.getState(dir);
const stateEvents = res.state.map(this.getEventMapper());
roomState.setUnknownStateEvents(stateEvents);
}
const token = res.end;
const matrixEvents = res.chunk.map(this.getEventMapper());

const timelineSet = eventTimeline.getTimelineSet();
timelineSet.addEventsToTimeline(matrixEvents, backwards, eventTimeline, token);
this.processBeaconEvents(timelineSet.room, matrixEvents);
this.processThreadRoots(timelineSet.room, matrixEvents, backwards);

// if we've hit the end of the timeline, we need to stop trying to
// paginate. We need to keep the 'forwards' token though, to make sure
// we can recover from gappy syncs.
if (backwards && res.end == res.start) {
eventTimeline.setPaginationToken(null, dir);
}
return res.end != res.start;
}).finally(() => {
eventTimeline.paginationRequests[dir] = null;
});
eventTimeline.paginationRequests[dir] = promise;
} else {
const room = this.getRoom(eventTimeline.getRoomId());
if (!room) {
throw new Error("Unknown room " + eventTimeline.getRoomId());
}
Expand Down Expand Up @@ -6649,6 +6750,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
* @return {Promise<boolean>} true if the feature is supported
*/
public async doesServerSupportUnstableFeature(feature: string): Promise<boolean> {
// FIXME: WORKAROUND FOR NOW
if (feature === "org.matrix.msc3856") {
return this.http.opts.baseUrl === "https://threads-dev.lab.element.dev";
}
const response = await this.getVersions();
if (!response) return false;
const unstableFeatures = response["unstable_features"];
Expand Down Expand Up @@ -6678,16 +6783,21 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
public async doesServerSupportThread(): Promise<{
serverSupport: boolean;
stable: boolean;
listThreads: boolean;
} | null> {
try {
const hasUnstableSupport = await this.doesServerSupportUnstableFeature("org.matrix.msc3440");
const hasStableSupport = await this.doesServerSupportUnstableFeature("org.matrix.msc3440.stable");
const [hasUnstableSupport, hasStableSupport, hasListThreadsSupport] = await Promise.all([
this.doesServerSupportUnstableFeature("org.matrix.msc3440"),
this.doesServerSupportUnstableFeature("org.matrix.msc3440.stable"),
this.doesServerSupportUnstableFeature("org.matrix.msc3856"),
]);

// TODO: Use `this.isVersionSupported("v1.3")` for whatever spec version includes MSC3440 formally.

return {
serverSupport: hasUnstableSupport || hasStableSupport,
stable: hasStableSupport,
listThreads: hasListThreadsSupport,
};
} catch (e) {
// Assume server support and stability aren't available: null/no data return.
Expand Down Expand Up @@ -9060,6 +9170,13 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
room.processThreadedEvents(threadedEvents, toStartOfTimeline);
}

/**
* @experimental
*/
public processThreadRoots(room: Room, threadedEvents: MatrixEvent[], toStartOfTimeline: boolean): void {
room.processThreadRoots(threadedEvents, toStartOfTimeline);
}

public processBeaconEvents(
room?: Room,
events?: MatrixEvent[],
Expand Down
3 changes: 3 additions & 0 deletions src/models/event-timeline-set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,15 @@ export class EventTimelineSet extends TypedEventEmitter<EmittedEvents, EventTime
* @param {MatrixClient=} client the Matrix client which owns this EventTimelineSet,
* can be omitted if room is specified.
* @param {Thread=} thread the thread to which this timeline set relates.
* @param {boolean} isThreadTimeline Whether this timeline set relates to a thread list timeline
* (e.g., All threads or My threads)
*/
constructor(
public readonly room: Room | undefined,
opts: IOpts = {},
client?: MatrixClient,
public readonly thread?: Thread,
public readonly isThreadTimeline: boolean = false,
) {
super();

Expand Down
92 changes: 85 additions & 7 deletions src/models/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1583,9 +1583,37 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
return filter;
}

/**
* Add a timelineSet for this room with the given filter
* @param {ThreadFilterType?} filterType Thread list type (e.g., All threads or My threads)
* @param {Object=} opts Configuration options
* @return {EventTimelineSet} The timelineSet
*/
public getOrCreateThreadTimelineSet(
filterType?: ThreadFilterType,
{
pendingEvents = true,
}: ICreateFilterOpts = {},
): EventTimelineSet {
if (this.threadsTimelineSets[filterType]) {
return this.filteredTimelineSets[filterType];
}
const opts = Object.assign({ pendingEvents }, this.opts);
const timelineSet =
new EventTimelineSet(this, opts, undefined, undefined, Thread.hasServerSideSupportForThreadList);
this.reEmitter.reEmit(timelineSet, [
RoomEvent.Timeline,
RoomEvent.TimelineReset,
]);

return timelineSet;
}

private async createThreadTimelineSet(filterType?: ThreadFilterType): Promise<EventTimelineSet> {
let timelineSet: EventTimelineSet;
if (Thread.hasServerSideSupport) {
if (Thread.hasServerSideSupportForThreadList) {
timelineSet = this.getOrCreateThreadTimelineSet(filterType);
} else if (Thread.hasServerSideSupport) {
const filter = await this.getThreadListFilter(filterType);

timelineSet = this.getOrCreateFilteredTimelineSet(
Expand Down Expand Up @@ -1620,11 +1648,38 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>

public threadsReady = false;

public processThreadRoots(events: MatrixEvent[], toStartOfTimeline: boolean): void {
for (const rootEvent of events) {
EventTimeline.setEventMetadata(
rootEvent,
this.currentState,
toStartOfTimeline,
);
if (!this.getThread(rootEvent.getId())) {
this.createThread(rootEvent.getId(), rootEvent, [], toStartOfTimeline);
}
}
}

public async fetchRoomThreads(): Promise<void> {
if (this.threadsReady || !this.client.supportsExperimentalThreads()) {
return;
}

if (Thread.hasServerSideSupportForThreadList) {
await Promise.all([
this.fetchRoomThreadList(ThreadFilterType.All),
this.fetchRoomThreadList(ThreadFilterType.My),
]);
} else {
await this.fetchOldThreadList();
}

this.on(ThreadEvent.NewReply, this.onThreadNewReply);
this.threadsReady = true;
}

private async fetchOldThreadList(): Promise<void> {
const allThreadsFilter = await this.getThreadListFilter();

const { chunk: events } = await this.client.createMessagesRequest(
Expand Down Expand Up @@ -1673,20 +1728,43 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
});
latestMyThreadsRootEvent = rootEvent;
}

if (!this.getThread(rootEvent.getId())) {
this.createThread(rootEvent.getId(), rootEvent, [], true);
}
}

this.processThreadRoots(threadRoots, true);

this.client.decryptEventIfNeeded(threadRoots[threadRoots.length -1]);
if (latestMyThreadsRootEvent) {
this.client.decryptEventIfNeeded(latestMyThreadsRootEvent);
}
}

this.threadsReady = true;
private async fetchRoomThreadList(filter?: ThreadFilterType): Promise<void> {
const timelineSet = filter === ThreadFilterType.My
? this.threadsTimelineSets[1]
: this.threadsTimelineSets[0];

this.on(ThreadEvent.NewReply, this.onThreadNewReply);
const { chunk: events, end } = await this.client.createThreadMessagesRequest(
this.roomId,
null,
undefined,
Direction.Backward,
timelineSet.getFilter(),
);

timelineSet.getLiveTimeline().setPaginationToken(end, Direction.Backward);

if (!events.length) return;

const matrixEvents = events.map(this.client.getEventMapper());
this.processThreadRoots(matrixEvents, true);
const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS);
for (const rootEvent of matrixEvents) {
timelineSet.addLiveEvent(rootEvent, {
duplicateStrategy: DuplicateStrategy.Ignore,
fromCache: false,
roomState,
});
}
}

private onThreadNewReply(thread: Thread): void {
Expand Down
8 changes: 7 additions & 1 deletion src/models/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ interface IThreadOpts {
*/
export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
public static hasServerSideSupport: boolean;
public static hasServerSideSupportForThreadList: boolean;

/**
* A reference to all the events ID at the bottom of the threads
Expand Down Expand Up @@ -134,8 +135,13 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
this.emit(ThreadEvent.Update, this);
}

public static setServerSideSupport(hasServerSideSupport: boolean, useStable: boolean): void {
public static setServerSideSupport(
hasServerSideSupport: boolean,
useStable: boolean,
hasListThreads: boolean,
): void {
Thread.hasServerSideSupport = hasServerSideSupport;
Thread.hasServerSideSupportForThreadList = hasListThreads;
if (!useStable) {
FILTER_RELATED_BY_SENDERS.setPreferUnstable(true);
FILTER_RELATED_BY_REL_TYPES.setPreferUnstable(true);
Expand Down

0 comments on commit aec5b7e

Please sign in to comment.