From c06ebd1e4aa677727a860e19b125c24e1b5ccdba Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 6 Feb 2017 16:45:25 +0000 Subject: [PATCH] Implement /sync accumulation for everything but room timelines/state --- src/sync-accumulator.js | 144 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 142 insertions(+), 2 deletions(-) diff --git a/src/sync-accumulator.js b/src/sync-accumulator.js index bcaf84536..727d31baf 100644 --- a/src/sync-accumulator.js +++ b/src/sync-accumulator.js @@ -30,7 +30,18 @@ limitations under the License. */ class SyncAccumulator { - constructor() { + /** + * @param {Object} opts + * @param {Number=} opts.maxTimelineEntries The ideal maximum number of + * timeline entries to keep in the sync response. This is best-effort, as + * clients do not always have a back-pagination token for each event, so + * it's possible there may be slightly *less* than this value. There will + * never be more. + */ + constructor(opts) { + opts = opts || {}; + opts.maxTimelineEntries = opts.maxTimelineEntries || 50; + this.opts = opts; this.rooms = { // $room_id : { // category: invite|join|leave, @@ -71,7 +82,136 @@ class SyncAccumulator { } _accumulateRoom(roomId, category, data) { - return; + // Valid /sync state transitions + // +--------+ <======+ 1: Accept an invite + // +== | INVITE | | (5) 2: Leave a room + // | +--------+ =====+ | 3: Join a public room previously + // |(1) (4) | | left (handle as if new room) + // V (2) V | 4: Reject an invite + // +------+ ========> +--------+ 5: Invite to a room previously + // | JOIN | (3) | LEAVE* | left (handle as if new room) + // +------+ <======== +--------+ + // + // * equivalent to "no state" + + // We *NEVER* accumulate 'ephemeral' events because we don't want to + // store stale typing notifs. + if (data.ephemeral) { + delete data.ephemeral; + } + + if (!this.rooms[roomId]) { // (3) and (5) + this.rooms[roomId] = { + category: category, + data: data, + }; + return; + } + + const r = this.rooms[roomId]; + if (r.category === category) { + // append data to existing data structure + if (category === "invite") { + this._accumulateInviteState(r, data); + } else if (category === "join") { + this._accumulateJoinState(r, data); + } + } else if (category === "join" && r.category === "invite") { // (1) + // invite -> join, replace data structure. + this.rooms[roomId] = { + category: "join", + data: data, + }; + } else if (category === "leave") { // (2) and (4) + // invite|join -> leave, delete data structure, so (3) and (5) can + // be hit if they rejoin/get reinvited. + delete this.rooms[roomId]; + } + } + + _accumulateInviteState(room, data) { + if (!data.invite_state || !data.invite_state.events) { // no new data + return; + } + // ensure current data structure is sound + if (!room.data.invite_state) { + room.data.invite_state = {}; + } + if (!room.data.invite_state.events) { + room.data.invite_state.events = []; + } + + // clobber based on event type / state key + // We expect invite_state to be small, so just loop over the events + data.invite_state.events.forEach((e) => { + let hasAdded = false; + for (let i = 0; i < room.data.invite_state.events.length; i++) { + const current = room.data.invite_state.events[i]; + if (current.type === e.type && current.state_key == e.state_key) { + room.data.invite_state.events[i] = e; // update + hasAdded = true; + } + } + if (!hasAdded) { + room.data.invite_state.events.push(e); + } + }); + } + + // Accumulate timeline and state events in a room. + _accumulateJoinState(room, data) { + // We expect this function to be called a lot (every /sync) so we want + // this to be fast. /sync stores events in an array but we often want + // to clobber based on type/state_key. Rather than convert arrays to + // maps all the time, just keep private maps which contains + // the set of updates to apply, which we'll do on getJSON(). + + // State resolution: + // The 'state' key is the delta from the previous sync (or start of time + // if no token was supplied), to the START of the timeline. To obtain + // the current state, we need to "roll forward" state by reading the + // timeline. We want to store the current state so we can drop events + // out the end of the timeline based on opts.maxTimelineEntries. + // + // 'state' 'timeline' current state + // |-------x<======================>x + // T I M E + // + // When getJSON() is called, we 'roll back' the current state by the + // number of entries in the timeline to work out what 'state' should be. + + // Back-pagination: + // On an initial /sync, the server provides a back-pagination token for + // the start of the timeline. When /sync deltas come down, they also + // include back-pagination tokens for the start of the timeline. This + // means not all events in the timeline have back-pagination tokens, as + // it is only the ones at the START of the timeline which have them. + // In order for us to have a valid timeline (and back-pagination token + // to match), we need to make sure that when we remove old timeline + // events, that we roll forward to an event which has a back-pagination + // token. This means we can't keep a strict sliding-window based on + // opts.maxTimelineEntries, and we may have a few less. We should never + // have more though, provided that the /sync limit is less than or equal + // to opts.maxTimelineEntries. + + // ensure current data structure is sound + room.state = room.state || {}; + room.state.events = room.state.events || []; + room._currentState = room._currentState || {}; + room.timeline = room.timeline || {}; + room.timeline.events = room.timeline.events || []; + room.account_data = room.account_data || {}; + room.account_data.events = room.account_data.events || []; + room.account_data._clobbers = room.account_data._clobbers || {}; + + // TODO: state/timeline + + if (data.account_data) { + // clobber based on type + data.account_data.events.forEach((e) => { + room.account_data._clobbers[e.type] = e; + }); + } } /**