diff --git a/src/sync-accumulator.js b/src/sync-accumulator.js index 727d31baf..c0c71cfb5 100644 --- a/src/sync-accumulator.js +++ b/src/sync-accumulator.js @@ -42,19 +42,24 @@ class SyncAccumulator { opts = opts || {}; opts.maxTimelineEntries = opts.maxTimelineEntries || 50; this.opts = opts; - this.rooms = { - // $room_id : { - // category: invite|join|leave, - // data: { ... sync json data ... } - // } + this.inviteRooms = { + //$roomId: { ... sync 'invite' json data ... } + }; + this.joinRooms = { + //$roomId: { + // _currentState: { $event_type: { $state_key: json } }, + // _timeline: [json, json, json], + // _timelineToken: token, + // _accountData: { $event_type: json } + //} }; } /** - * Accumulate incremental /sync data. + * Accumulate incremental /sync room data. * @param {Object} syncResponse the complete /sync JSON */ - accumulate(syncResponse) { + accumulateRooms(syncResponse) { if (!syncResponse.rooms) { return; } @@ -93,78 +98,71 @@ class SyncAccumulator { // +------+ <======== +--------+ // // * equivalent to "no state" - - // We *NEVER* accumulate 'ephemeral' events because we don't want to - // store stale typing notifs. - if (data.ephemeral) { - delete data.ephemeral; - } - - if (!this.rooms[roomId]) { // (3) and (5) - this.rooms[roomId] = { - category: category, - data: data, - }; - return; - } - - const r = this.rooms[roomId]; - if (r.category === category) { - // append data to existing data structure - if (category === "invite") { - this._accumulateInviteState(r, data); - } else if (category === "join") { - this._accumulateJoinState(r, data); - } - } else if (category === "join" && r.category === "invite") { // (1) - // invite -> join, replace data structure. - this.rooms[roomId] = { - category: "join", - data: data, - }; - } else if (category === "leave") { // (2) and (4) - // invite|join -> leave, delete data structure, so (3) and (5) can - // be hit if they rejoin/get reinvited. - delete this.rooms[roomId]; + switch (category) { + case "invite": // (5) + this._accumulateInviteState(roomId, data); + break; + case "join": + if (this.inviteRooms[roomId]) { // (1) + // was previously invite, now join. We expect /sync to give + // the entire state and timeline on 'join', so delete previous + // invite state + delete this.inviteRooms[roomId]; + } + // (3) + // TODO: Check that join 'state' is the same if you leave then + // rejoin. We need to know if Synapse is instead returning + // a delta from the old leave state. If it is, this means + // we can NEVER delete 'leave' room data :/ + this._accumulateJoinState(roomId, data); + break; + case "leave": + if (this.inviteRooms[roomId]) { // (4) + delete this.inviteRooms[roomId]; + } else { // (2) + delete this.joinRooms[roomId]; + } + break; + default: + console.error("Unknown cateogory: ", category); } } - _accumulateInviteState(room, data) { + _accumulateInviteState(roomId, data) { if (!data.invite_state || !data.invite_state.events) { // no new data return; } - // ensure current data structure is sound - if (!room.data.invite_state) { - room.data.invite_state = {}; + if (!this.inviteRooms[roomId]) { + this.inviteRooms[roomId] = data; + return; } - if (!room.data.invite_state.events) { - room.data.invite_state.events = []; - } - + // accumulate extra keys for invite->invite transitions // clobber based on event type / state key // We expect invite_state to be small, so just loop over the events + const currentData = this.inviteRooms[roomId]; data.invite_state.events.forEach((e) => { let hasAdded = false; - for (let i = 0; i < room.data.invite_state.events.length; i++) { - const current = room.data.invite_state.events[i]; + for (let i = 0; i < currentData.invite_state.events.length; i++) { + const current = currentData.invite_state.events[i]; if (current.type === e.type && current.state_key == e.state_key) { - room.data.invite_state.events[i] = e; // update + currentData.invite_state.events[i] = e; // update hasAdded = true; } } if (!hasAdded) { - room.data.invite_state.events.push(e); + currentData.invite_state.events.push(e); } }); } // Accumulate timeline and state events in a room. - _accumulateJoinState(room, data) { + _accumulateJoinState(roomId, data) { // We expect this function to be called a lot (every /sync) so we want // this to be fast. /sync stores events in an array but we often want // to clobber based on type/state_key. Rather than convert arrays to - // maps all the time, just keep private maps which contains - // the set of updates to apply, which we'll do on getJSON(). + // maps all the time, just keep private maps which contain + // the actual current accumulated sync state, and array-ify it when + // getJSON() is called. // State resolution: // The 'state' key is the delta from the previous sync (or start of time @@ -193,53 +191,92 @@ class SyncAccumulator { // opts.maxTimelineEntries, and we may have a few less. We should never // have more though, provided that the /sync limit is less than or equal // to opts.maxTimelineEntries. + // + // We *NEVER* accumulate 'ephemeral' events because we don't want to + // store stale typing notifs. - // ensure current data structure is sound - room.state = room.state || {}; - room.state.events = room.state.events || []; - room._currentState = room._currentState || {}; - room.timeline = room.timeline || {}; - room.timeline.events = room.timeline.events || []; - room.account_data = room.account_data || {}; - room.account_data.events = room.account_data.events || []; - room.account_data._clobbers = room.account_data._clobbers || {}; + if (!this.joinRooms[roomId]) { + this.joinRooms[roomId] = { + _currentState: {}, + _timeline: [], + _timelineToken: null, + _accountData: {}, + }; + } - // TODO: state/timeline - - if (data.account_data) { + if (data.account_data && data.account_data.events) { // clobber based on type data.account_data.events.forEach((e) => { - room.account_data._clobbers[e.type] = e; + this.joinRooms[roomId]._accountData[e.type] = e; }); } + + // Work out the current state. The deltas need to be applied in the order: + // - existing state which didn't come down /sync. + // - State events under the 'state' key. + // - State events in the 'timeline'. + if (data.state && data.state.events) { + data.state.events.forEach((e) => { + setState(this.joinRooms[roomId]._currentState, e); + }); + } + if (data.timeline && data.timeline.events) { + data.timeline.events.forEach((e) => { + // this nops if 'e' isn't a state event + setState(this.joinRooms[roomId]._currentState, e); + }); + } + + // TODO: append events to our timeline and attempt to prune it } /** * Return everything under the 'rooms' key from a /sync response which - * accurately represents all room data. + * represents all room data that should be stored. This should be paired + * with the sync token which represents the most recent /sync response + * provided to accumulate(). Failure to do this can result in missing events. + *
+     * accumulator = new SyncAccumulator();
+     * // these 2 lines must occur on the same event loop tick to prevent
+     * // race conditions!
+     * accumulator.accumulateRooms(someSyncResponse);
+     * var outputSyncData = accumulator.getJSON();
+     * // the next batch pairs with outputSyncData.
+     * var syncToken = someSyncResponse.next_batch;
+     * 
* @return {Object} A JSON object which has the same API shape as /sync. */ getJSON() { const data = { join: {}, invite: {}, + // always empty. This is set by /sync when a room was previously + // in 'invite' or 'join'. On fresh startup, the client won't know + // about any previous room being in 'invite' or 'join' so we can + // just omit mentioning it at all, even if it has previously come + // down /sync. + // TODO: Check if full state is given upon rejoin. leave: {}, }; - Object.keys(this.rooms).forEach((roomId) => { - switch (this.rooms[roomId].category) { - case "join": - data.join[roomId] = this.rooms[roomId].data; - break; - case "invite": - data.invite[roomId] = this.rooms[roomId].data; - break; - case "leave": - data.leave[roomId] = this.rooms[roomId].data; - break; - } + Object.keys(this.inviteRooms).forEach((roomId) => { + data.invite[roomId] = this.inviteRooms[roomId]; + }); + Object.keys(this.joinRooms).forEach((roomId) => { + // TODO roll back current state to start of timeline. + data.join[roomId] = this.joinRooms[roomId].data; }); return data; } } +function setState(eventMap, event) { + if (!event.state_key || !event.type) { + return; + } + if (!eventMap[event.type]) { + eventMap[event.type] = {}; + } + eventMap[event.type][event.state_key] = event; +} + module.exports = SyncAccumulator;