From 2b659656cc74dff9836483400d871178c1abd85a Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 8 Dec 2015 17:41:09 +0000 Subject: [PATCH] Move alllll the sync code to sync.js - still more to do (in FIXME XXX) --- lib/client.js | 489 ++------------------------------------------------ lib/sync.js | 415 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 426 insertions(+), 478 deletions(-) create mode 100644 lib/sync.js diff --git a/lib/client.js b/lib/client.js index 1ec64be36..e9c77469a 100644 --- a/lib/client.js +++ b/lib/client.js @@ -13,12 +13,11 @@ var httpApi = require("./http-api"); var MatrixEvent = require("./models/event").MatrixEvent; var EventStatus = require("./models/event").EventStatus; var StubStore = require("./store/stub"); -var Room = require("./models/room"); -var User = require("./models/user"); var webRtcCall = require("./webrtc/call"); var utils = require("./utils"); var contentRepo = require("./content-repo"); var Filter = require("./filter"); +var SyncApi = require("./sync"); var SCROLLBACK_DELAY_MS = 3000; var CRYPTO_ENABLED = false; @@ -622,9 +621,9 @@ MatrixClient.prototype.joinRoom = function(roomIdOrAlias, opts, callback) { this._http.authedRequest(undefined, "POST", path, undefined, {}).then( function(res) { var roomId = res.room_id; - var room = createNewRoom(self, roomId); + var room = createNewRoom(self, roomId); // XXX FIXME TODO if (opts.syncRoom) { - return _syncRoom(self, room); + return _syncRoom(self, room); // XXX FIXME TODO } return q(room); }, function(err) { @@ -2174,136 +2173,12 @@ MatrixClient.prototype.isLoggedIn = function() { /** * This is an internal method. * @param {MatrixClient} client - * @param {integer} historyLen - * @param {integer} includeArchived - * @param {integer} attempt */ -function doInitialSync(client, historyLen, includeArchived, attempt) { - attempt = attempt || 1; - var qps = { limit: historyLen }; - if (includeArchived) { - qps.archived = true; - } - if (client._guestRooms && client._isGuest) { - console.log(client._guestRooms); - qps.room_id = JSON.stringify(client._guestRooms); - } - client._http.authedRequest( - undefined, "GET", "/initialSync", qps - ).done(function(data) { - var i, j; - // intercept the results and put them into our store - if (!(client.store instanceof StubStore)) { - utils.forEach( - utils.map(data.presence, _PojoToMatrixEventMapper(client)), - function(e) { - var user = createNewUser(client, e.getContent().user_id); - user.setPresenceEvent(e); - client.store.storeUser(user); - }); - - // group receipts by room ID. - var receiptsByRoom = {}; - data.receipts = data.receipts || []; - utils.forEach(data.receipts.map(_PojoToMatrixEventMapper(client)), - function(receiptEvent) { - if (!receiptsByRoom[receiptEvent.getRoomId()]) { - receiptsByRoom[receiptEvent.getRoomId()] = []; - } - receiptsByRoom[receiptEvent.getRoomId()].push(receiptEvent); - } - ); - - for (i = 0; i < data.rooms.length; i++) { - var room = createNewRoom(client, data.rooms[i].room_id); - if (!data.rooms[i].state) { - data.rooms[i].state = []; - } - if (data.rooms[i].membership === "invite") { - var inviteEvent = data.rooms[i].invite; - if (!inviteEvent) { - // fallback for servers which don't serve the invite key yet - inviteEvent = { - event_id: "$fake_" + room.roomId, - content: { - membership: "invite" - }, - state_key: client.credentials.userId, - user_id: data.rooms[i].inviter, - room_id: room.roomId, - type: "m.room.member" - }; - } - data.rooms[i].state.push(inviteEvent); - } - - _processRoomEvents( - client, room, data.rooms[i].state, data.rooms[i].messages - ); - - var receipts = receiptsByRoom[room.roomId] || []; - for (j = 0; j < receipts.length; j++) { - room.addReceipt(receipts[j]); - } - - var privateUserData = data.rooms[i].account_data || []; - var privateUserDataEvents = - utils.map(privateUserData, _PojoToMatrixEventMapper(client)); - for (j = 0; j < privateUserDataEvents.length; j++) { - var event = privateUserDataEvents[j]; - if (event.getType() === "m.tag") { - room.addTags(event); - } - // XXX: unhandled private user data event - we should probably - // put it somewhere useful once the API has settled - } - - // cache the name/summary/etc prior to storage since we don't - // know how the store will serialise the Room. - room.recalculate(client.credentials.userId); - - client.store.storeRoom(room); - client.emit("Room", room); - } - } - - if (data) { - client.store.setSyncToken(data.end); - var events = []; - for (i = 0; i < data.presence.length; i++) { - events.push(new MatrixEvent(data.presence[i])); - } - for (i = 0; i < data.rooms.length; i++) { - if (data.rooms[i].state) { - for (j = 0; j < data.rooms[i].state.length; j++) { - events.push(new MatrixEvent(data.rooms[i].state[j])); - } - } - if (data.rooms[i].messages) { - for (j = 0; j < data.rooms[i].messages.chunk.length; j++) { - events.push( - new MatrixEvent(data.rooms[i].messages.chunk[j]) - ); - } - } - } - utils.forEach(events, function(e) { - client.emit("event", e); - }); - } - - client.clientRunning = true; - updateSyncState(client, "PREPARED"); - // assume success until we fail which may be 30+ secs - updateSyncState(client, "SYNCING"); - _pollForEvents(client); - }, function(err) { - console.error("/initialSync error (%s attempts): %s", attempt, err); - attempt += 1; - startSyncingRetryTimer(client, attempt, function() { - doInitialSync(client, historyLen, includeArchived, attempt); - }); - updateSyncState(client, "ERROR", { error: err }); +function doSync(client) { + var syncApi = new SyncApi(client); + syncApi.sync({ + historyLen: client._config.initialSyncLimit, + includeArchived: client._config.includeArchivedRooms }); } @@ -2352,250 +2227,12 @@ MatrixClient.prototype.startClient = function(opts) { this.uploadKeys(5); } - if (this.store.getSyncToken()) { - // resume from where we left off. - _pollForEvents(this); - return; - } - // periodically poll for turn servers if we support voip checkTurnServers(this); - prepareForSync(this); + doSync(this); }; -function prepareForSync(client, attempt) { - if (client.isGuest()) { - // no push rules for guests - doInitialSync( - client, - client._config.initialSyncLimit, - client._config.includeArchivedRooms - ); - return; - } - - attempt = attempt || 1; - // we do push rules before syncing so when we gets events down we know immediately - // whether they are bing-worthy. - client.pushRules().done(function(result) { - client.pushRules = result; - doInitialSync( - client, - client._config.initialSyncLimit, - client._config.includeArchivedRooms - ); - }, function(err) { - attempt += 1; - startSyncingRetryTimer(client, attempt, function() { - prepareForSync(client, attempt); - }); - updateSyncState(client, "ERROR", { error: err }); - }); -} - -/** - * This is an internal method. - * @param {MatrixClient} client - * @param {Number} attempt The attempt number - */ -function _pollForEvents(client, attempt) { - attempt = attempt || 1; - var self = client; - if (!client.clientRunning) { - return; - } - var timeoutMs = client._config.pollTimeout; - if (attempt > 1) { - // we think the connection is dead. If it comes back up, we won't know - // about it till /events returns. If the timeout= is high, this could - // be a long time. Set it to 1 when doing retries. - timeoutMs = 1; - } - var discardResult = false; - var timeoutObj = setTimeout(function() { - discardResult = true; - console.error("/events request timed out."); - _pollForEvents(client); - }, timeoutMs + (20 * 1000)); // 20s buffer - - var queryParams = { - from: client.store.getSyncToken(), - timeout: timeoutMs - }; - if (client._guestRooms && client._isGuest) { - queryParams.room_id = client._guestRooms; - } - - client._http.authedRequest(undefined, "GET", "/events", queryParams).done( - function(data) { - if (discardResult) { - return; - } - else { - clearTimeout(timeoutObj); - } - - if (self._syncState !== "SYNCING") { - updateSyncState(self, "SYNCING"); - } - - try { - var events = []; - if (data) { - events = utils.map(data.chunk, _PojoToMatrixEventMapper(self)); - } - if (!(self.store instanceof StubStore)) { - var roomIdsWithNewInvites = {}; - // bucket events based on room. - var i = 0; - var roomIdToEvents = {}; - for (i = 0; i < events.length; i++) { - var roomId = events[i].getRoomId(); - // possible to have no room ID e.g. for presence events. - if (roomId) { - if (!roomIdToEvents[roomId]) { - roomIdToEvents[roomId] = []; - } - roomIdToEvents[roomId].push(events[i]); - if (events[i].getType() === "m.room.member" && - events[i].getContent().membership === "invite") { - roomIdsWithNewInvites[roomId] = true; - } - } - else if (events[i].getType() === "m.presence") { - var usr = self.store.getUser(events[i].getContent().user_id); - if (usr) { - usr.setPresenceEvent(events[i]); - } - else { - usr = createNewUser(self, events[i].getContent().user_id); - usr.setPresenceEvent(events[i]); - self.store.storeUser(usr); - } - } - } - - // add events to room - var roomIds = utils.keys(roomIdToEvents); - utils.forEach(roomIds, function(roomId) { - var room = self.store.getRoom(roomId); - var isBrandNewRoom = false; - if (!room) { - room = createNewRoom(self, roomId); - isBrandNewRoom = true; - } - - var wasJoined = room.hasMembershipState( - self.credentials.userId, "join" - ); - - room.addEvents(roomIdToEvents[roomId], "replace"); - room.recalculate(self.credentials.userId); - - // store the Room for things like invite events so developers - // can update the UI - if (isBrandNewRoom) { - self.store.storeRoom(room); - self.emit("Room", room); - } - - var justJoined = room.hasMembershipState( - self.credentials.userId, "join" - ); - - if (!wasJoined && justJoined) { - // we've just transitioned into a join state for this room, - // so sync state. - _syncRoom(self, room); - } - }); - - Object.keys(roomIdsWithNewInvites).forEach(function(inviteRoomId) { - _resolveInvites(self, self.store.getRoom(inviteRoomId)); - }); - } - if (data) { - self.store.setSyncToken(data.end); - utils.forEach(events, function(e) { - self.emit("event", e); - }); - } - } - catch (e) { - console.error("Event stream error:"); - console.error(e); - } - _pollForEvents(self); - }, function(err) { - console.error("/events error: %s", JSON.stringify(err)); - if (discardResult) { - return; - } - else { - clearTimeout(timeoutObj); - } - - attempt += 1; - startSyncingRetryTimer(self, attempt, function() { - _pollForEvents(self, attempt); - }); - updateSyncState(self, "ERROR", { error: err }); - }); -} - -function _syncRoom(client, room) { - if (client._syncingRooms[room.roomId]) { - return client._syncingRooms[room.roomId]; - } - var defer = q.defer(); - client._syncingRooms[room.roomId] = defer.promise; - client.roomInitialSync(room.roomId, client._config.initialSyncLimit).done( - function(res) { - room.timeline = []; // blow away any previous messages. - _processRoomEvents(client, room, res.state, res.messages); - room.recalculate(client.credentials.userId); - client.store.storeRoom(room); - client.emit("Room", room); - defer.resolve(room); - client._syncingRooms[room.roomId] = undefined; - }, function(err) { - defer.reject(err); - client._syncingRooms[room.roomId] = undefined; - }); - return defer.promise; -} - -function _processRoomEvents(client, room, stateEventList, messageChunk) { - // "old" and "current" state are the same initially; they - // start diverging if the user paginates. - // We must deep copy otherwise membership changes in old state - // will leak through to current state! - var oldStateEvents = utils.map( - utils.deepCopy(stateEventList), _PojoToMatrixEventMapper(client) - ); - var stateEvents = utils.map(stateEventList, _PojoToMatrixEventMapper(client)); - room.oldState.setStateEvents(oldStateEvents); - room.currentState.setStateEvents(stateEvents); - - _resolveInvites(client, room); - - // add events to the timeline *after* setting the state - // events so messages use the right display names. Initial sync - // returns messages in chronological order, so we need to reverse - // it to get most recent -> oldest. We need it in that order in - // order to diverge old/current state correctly. - room.addEventsToTimeline( - utils.map( - messageChunk ? messageChunk.chunk : [], - _PojoToMatrixEventMapper(client) - ).reverse(), true - ); - if (messageChunk) { - room.oldState.paginationToken = messageChunk.start; - } -} - /** * High level helper method to stop the client from polling and allow a * clean shutdown. @@ -2605,66 +2242,6 @@ MatrixClient.prototype.stopClient = function() { // TODO: f.e. Room => self.store.storeRoom(room) ? }; - -function reEmit(reEmitEntity, emittableEntity, eventNames) { - utils.forEach(eventNames, function(eventName) { - // setup a listener on the entity (the Room, User, etc) for this event - emittableEntity.on(eventName, function() { - // take the args from the listener and reuse them, adding the - // event name to the arg list so it works with .emit() - // Transformation Example: - // listener on "foo" => function(a,b) { ... } - // Re-emit on "thing" => thing.emit("foo", a, b) - var newArgs = [eventName]; - for (var i = 0; i < arguments.length; i++) { - newArgs.push(arguments[i]); - } - reEmitEntity.emit.apply(reEmitEntity, newArgs); - }); - }); -} - -function _resolveInvites(client, room) { - if (!room || !client._config.resolveInvitesToProfiles) { - return; - } - // For each invited room member we want to give them a displayname/avatar url - // if they have one (the m.room.member invites don't contain this). - room.getMembersWithMembership("invite").forEach(function(member) { - if (member._requestedProfileInfo) { - return; - } - member._requestedProfileInfo = true; - // try to get a cached copy first. - var user = client.getUser(member.userId); - var promise; - if (user) { - promise = q({ - avatar_url: user.avatarUrl, - displayname: user.displayName - }); - } - else { - promise = client.getProfileInfo(member.userId); - } - promise.done(function(info) { - // slightly naughty by doctoring the invite event but this means all - // the code paths remain the same between invite/join display name stuff - // which is a worthy trade-off for some minor pollution. - var inviteEvent = member.events.member; - if (inviteEvent.getContent().membership !== "invite") { - // between resolving and now they have since joined, so don't clobber - return; - } - inviteEvent.getContent().avatar_url = info.avatar_url; - inviteEvent.getContent().displayname = info.displayname; - member.setMembershipEvent(inviteEvent, room.currentState); // fire listeners - }, function(err) { - // OH WELL. - }); - }); -} - function setupCallEventHandler(client) { var candidatesByCall = { // callId: [Candidate] @@ -2820,20 +2397,6 @@ function setupCallEventHandler(client) { }); } -function startSyncingRetryTimer(client, attempt, fn) { - client._syncingRetry = {}; - client._syncingRetry.fn = fn; - client._syncingRetry.timeoutId = setTimeout(function() { - fn(); - }, retryTimeMsForAttempt(attempt)); -} - -function updateSyncState(client, newState, data) { - var old = client._syncState; - client._syncState = newState; - client.emit("sync", client._syncState, old, data); -} - function checkTurnServers(client) { if (!client._supportsVoip) { return; @@ -2861,37 +2424,6 @@ function checkTurnServers(client) { }); } -function createNewUser(client, userId) { - var user = new User(userId); - reEmit(client, user, ["User.avatarUrl", "User.displayName", "User.presence"]); - return user; -} - -function createNewRoom(client, roomId) { - var room = new Room(roomId, { - pendingEventOrdering: client._config.pendingEventOrdering - }); - reEmit(client, room, ["Room.name", "Room.timeline", "Room.receipt", "Room.tags"]); - - // we need to also re-emit room state and room member events, so hook it up - // to the client now. We need to add a listener for RoomState.members in - // order to hook them correctly. (TODO: find a better way?) - reEmit(client, room.currentState, [ - "RoomState.events", "RoomState.members", "RoomState.newMember" - ]); - room.currentState.on("RoomState.newMember", function(event, state, member) { - member.user = client.getUser(member.userId); - reEmit( - client, member, - [ - "RoomMember.name", "RoomMember.typing", "RoomMember.powerLevel", - "RoomMember.membership" - ] - ); - }); - return room; -} - function retryTimeMsForAttempt(attempt) { // 2,4,8,16,32,64,128,128,128,... seconds // max 2^7 secs = 2.1 mins @@ -2970,7 +2502,8 @@ MatrixClient.prototype.generateClientSecret = function() { module.exports.MatrixClient = MatrixClient; /** */ module.exports.CRYPTO_ENABLED = CRYPTO_ENABLED; - +/** */ +module.exports.EventMapper = _PojoToMatrixEventMapper; // MatrixClient Event JSDocs diff --git a/lib/sync.js b/lib/sync.js new file mode 100644 index 000000000..c34e7a242 --- /dev/null +++ b/lib/sync.js @@ -0,0 +1,415 @@ +"use strict"; + +/* + * TODO: + * This class mainly serves to take all the syncing logic out of client.js and + * into a separate file. It's all very fluid, and this class gut wrenches a lot + * of MatrixClient props (e.g. _http). Given we want to support WebSockets as + * an alternative syncing API, we may want to have a proper syncing interface + * for HTTP and WS at some point. + */ + +var StubStore = require("./store/stub"); +var EventMapper = require("./client").EventMapper; +var User = require("./models/user"); +var Room = require("./models/room"); +var utils = require("./utils"); +var MatrixEvent = require("./models/event").MatrixEvent; + +function startSyncingRetryTimer(client, attempt, fn) { + client._syncingRetry = {}; + client._syncingRetry.fn = fn; + client._syncingRetry.timeoutId = setTimeout(function() { + fn(); + }, retryTimeMsForAttempt(attempt)); +} + +function updateSyncState(client, newState, data) { + var old = client._syncState; + client._syncState = newState; + client.emit("sync", client._syncState, old, data); +} + +function createNewUser(client, userId) { + var user = new User(userId); + reEmit(client, user, ["User.avatarUrl", "User.displayName", "User.presence"]); + return user; +} + +function createNewRoom(client, roomId) { + var room = new Room(roomId, { + pendingEventOrdering: client._config.pendingEventOrdering + }); + reEmit(client, room, ["Room.name", "Room.timeline", "Room.receipt", "Room.tags"]); + + // we need to also re-emit room state and room member events, so hook it up + // to the client now. We need to add a listener for RoomState.members in + // order to hook them correctly. (TODO: find a better way?) + reEmit(client, room.currentState, [ + "RoomState.events", "RoomState.members", "RoomState.newMember" + ]); + room.currentState.on("RoomState.newMember", function(event, state, member) { + member.user = client.getUser(member.userId); + reEmit( + client, member, + [ + "RoomMember.name", "RoomMember.typing", "RoomMember.powerLevel", + "RoomMember.membership" + ] + ); + }); + return room; +} + +function reEmit(reEmitEntity, emittableEntity, eventNames) { + utils.forEach(eventNames, function(eventName) { + // setup a listener on the entity (the Room, User, etc) for this event + emittableEntity.on(eventName, function() { + // take the args from the listener and reuse them, adding the + // event name to the arg list so it works with .emit() + // Transformation Example: + // listener on "foo" => function(a,b) { ... } + // Re-emit on "thing" => thing.emit("foo", a, b) + var newArgs = [eventName]; + for (var i = 0; i < arguments.length; i++) { + newArgs.push(arguments[i]); + } + reEmitEntity.emit.apply(reEmitEntity, newArgs); + }); + }); +} + +/** + * Internal class - unstable. + * Construct an entity which is able to sync with a homeserver. + * @param {MatrixClient} client The matrix client instance to use. + */ +function SyncApi(client) { + this.client = client; + this.opts = {}; +} + +/** + * @param {Object} opts + * @param {Number} opts.historyLen + * @param {Boolean} opts.includeArchived + */ +SyncApi.prototype.sync = function(opts) { + console.log("SyncApi.sync -> %s", opts); + this.opts = opts || {}; + return this._sync(); +} + + +SyncApi.prototype._prepareForSync = function(attempt) { + var client = this.client; + var self = this; + if (client.isGuest()) { + // no push rules for guests + this._sync(); + return; + } + + attempt = attempt || 1; + // we do push rules before syncing so when we gets events down we know immediately + // whether they are bing-worthy. + client.pushRules().done(function(result) { + client.pushRules = result; + self._sync(); + }, function(err) { + attempt += 1; + startSyncingRetryTimer(client, attempt, function() { + prepareForSync(client, attempt); + }); + updateSyncState(client, "ERROR", { error: err }); + }); +} + +SyncApi.prototype._sync = function(attempt) { + var opts = this.opts; + var client = this.client; + var self = this; + var historyLen = opts.historyLen; + var includeArchived = opts.includeArchived; + attempt = attempt || 1; + + var qps = { limit: historyLen }; + if (includeArchived) { + qps.archived = true; + } + if (client._guestRooms && client._isGuest) { + qps.room_id = JSON.stringify(client._guestRooms); + } + client._http.authedRequest( + undefined, "GET", "/initialSync", qps + ).done(function(data) { + var i, j; + // intercept the results and put them into our store + if (!(client.store instanceof StubStore)) { + utils.forEach( + utils.map(data.presence, EventMapper(client)), + function(e) { + var user = createNewUser(client, e.getContent().user_id); + user.setPresenceEvent(e); + client.store.storeUser(user); + }); + + // group receipts by room ID. + var receiptsByRoom = {}; + data.receipts = data.receipts || []; + utils.forEach(data.receipts.map(EventMapper(client)), + function(receiptEvent) { + if (!receiptsByRoom[receiptEvent.getRoomId()]) { + receiptsByRoom[receiptEvent.getRoomId()] = []; + } + receiptsByRoom[receiptEvent.getRoomId()].push(receiptEvent); + } + ); + + for (i = 0; i < data.rooms.length; i++) { + var room = createNewRoom(client, data.rooms[i].room_id); + if (!data.rooms[i].state) { + data.rooms[i].state = []; + } + if (data.rooms[i].membership === "invite") { + var inviteEvent = data.rooms[i].invite; + if (!inviteEvent) { + // fallback for servers which don't serve the invite key yet + inviteEvent = { + event_id: "$fake_" + room.roomId, + content: { + membership: "invite" + }, + state_key: client.credentials.userId, + user_id: data.rooms[i].inviter, + room_id: room.roomId, + type: "m.room.member" + }; + } + data.rooms[i].state.push(inviteEvent); + } + + _processRoomEvents( // XXX + client, room, data.rooms[i].state, data.rooms[i].messages + ); + + var receipts = receiptsByRoom[room.roomId] || []; + for (j = 0; j < receipts.length; j++) { + room.addReceipt(receipts[j]); + } + + var privateUserData = data.rooms[i].account_data || []; + var privateUserDataEvents = + utils.map(privateUserData, EventMapper(client)); + for (j = 0; j < privateUserDataEvents.length; j++) { + var event = privateUserDataEvents[j]; + if (event.getType() === "m.tag") { + room.addTags(event); + } + // XXX: unhandled private user data event - we should probably + // put it somewhere useful once the API has settled + } + + // cache the name/summary/etc prior to storage since we don't + // know how the store will serialise the Room. + room.recalculate(client.credentials.userId); + + client.store.storeRoom(room); + client.emit("Room", room); + } + } + + if (data) { + client.store.setSyncToken(data.end); + var events = []; + for (i = 0; i < data.presence.length; i++) { + events.push(new MatrixEvent(data.presence[i])); + } + for (i = 0; i < data.rooms.length; i++) { + if (data.rooms[i].state) { + for (j = 0; j < data.rooms[i].state.length; j++) { + events.push(new MatrixEvent(data.rooms[i].state[j])); + } + } + if (data.rooms[i].messages) { + for (j = 0; j < data.rooms[i].messages.chunk.length; j++) { + events.push( + new MatrixEvent(data.rooms[i].messages.chunk[j]) + ); + } + } + } + utils.forEach(events, function(e) { + client.emit("event", e); + }); + } + + client.clientRunning = true; + updateSyncState(client, "PREPARED"); + // assume success until we fail which may be 30+ secs + updateSyncState(client, "SYNCING"); + self._pollForEvents(); + }, function(err) { + console.error("/initialSync error (%s attempts): %s", attempt, err); + attempt += 1; + startSyncingRetryTimer(client, attempt, function() { + self._sync(opts, attempt); + }); + updateSyncState(client, "ERROR", { error: err }); + }); +}; + +/** + * This is an internal method. + * @param {MatrixClient} client + * @param {Number} attempt The attempt number + */ +SyncApi.prototype._pollForEvents = function(attempt) { + var client = this.client; + var self = this; + + attempt = attempt || 1; + + if (!client.clientRunning) { + return; + } + var timeoutMs = client._config.pollTimeout; + if (attempt > 1) { + // we think the connection is dead. If it comes back up, we won't know + // about it till /events returns. If the timeout= is high, this could + // be a long time. Set it to 1 when doing retries. + timeoutMs = 1; + } + var discardResult = false; + var timeoutObj = setTimeout(function() { + discardResult = true; + console.error("/events request timed out."); + self._pollForEvents(); + }, timeoutMs + (20 * 1000)); // 20s buffer + + var queryParams = { + from: client.store.getSyncToken(), + timeout: timeoutMs + }; + if (client._guestRooms && client._isGuest) { + queryParams.room_id = client._guestRooms; + } + + client._http.authedRequest(undefined, "GET", "/events", queryParams).done( + function(data) { + if (discardResult) { + return; + } + else { + clearTimeout(timeoutObj); + } + + if (client._syncState !== "SYNCING") { + updateSyncState(self, "SYNCING"); + } + + try { + var events = []; + if (data) { + events = utils.map(data.chunk, EventMapper(self)); + } + if (!(client.store instanceof StubStore)) { + var roomIdsWithNewInvites = {}; + // bucket events based on room. + var i = 0; + var roomIdToEvents = {}; + for (i = 0; i < events.length; i++) { + var roomId = events[i].getRoomId(); + // possible to have no room ID e.g. for presence events. + if (roomId) { + if (!roomIdToEvents[roomId]) { + roomIdToEvents[roomId] = []; + } + roomIdToEvents[roomId].push(events[i]); + if (events[i].getType() === "m.room.member" && + events[i].getContent().membership === "invite") { + roomIdsWithNewInvites[roomId] = true; + } + } + else if (events[i].getType() === "m.presence") { + var usr = client.store.getUser(events[i].getContent().user_id); + if (usr) { + usr.setPresenceEvent(events[i]); + } + else { + usr = createNewUser(self, events[i].getContent().user_id); + usr.setPresenceEvent(events[i]); + client.store.storeUser(usr); + } + } + } + + // add events to room + var roomIds = utils.keys(roomIdToEvents); + utils.forEach(roomIds, function(roomId) { + var room = client.store.getRoom(roomId); + var isBrandNewRoom = false; + if (!room) { + room = createNewRoom(self, roomId); + isBrandNewRoom = true; + } + + var wasJoined = room.hasMembershipState( + client.credentials.userId, "join" + ); + + room.addEvents(roomIdToEvents[roomId], "replace"); + room.recalculate(client.credentials.userId); + + // store the Room for things like invite events so developers + // can update the UI + if (isBrandNewRoom) { + client.store.storeRoom(room); + client.emit("Room", room); + } + + var justJoined = room.hasMembershipState( + client.credentials.userId, "join" + ); + + if (!wasJoined && justJoined) { + // we've just transitioned into a join state for this room, + // so sync state. + _syncRoom(self, room); // XXX + } + }); + + Object.keys(roomIdsWithNewInvites).forEach(function(inviteRoomId) { + _resolveInvites(self, client.store.getRoom(inviteRoomId)); // XXX + }); + } + if (data) { + client.store.setSyncToken(data.end); + utils.forEach(events, function(e) { + client.emit("event", e); + }); + } + } + catch (e) { + console.error("Event stream error:"); + console.error(e); + } + client._pollForEvents(); + }, function(err) { + console.error("/events error: %s", JSON.stringify(err)); + if (discardResult) { + return; + } + else { + clearTimeout(timeoutObj); + } + + attempt += 1; + startSyncingRetryTimer(self, attempt, function() { + client._pollForEvents(attempt); + }); + updateSyncState(self, "ERROR", { error: err }); + }); +} + +module.exports = SyncApi;