You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-08-09 10:22:46 +03:00
Add glue code to hook up the sync accumulator
The user of the SDK is responsible for DIing the main components: let store = new IndexedDBStore( new IndexedDBStoreBackend(window.indexedDB), new SyncAccumulator(), }); await store.startup(); let client = matrix.createClient({store: store});
This commit is contained in:
@@ -38,6 +38,7 @@ const Filter = require("./filter");
|
||||
const SyncApi = require("./sync");
|
||||
const MatrixBaseApis = require("./base-apis");
|
||||
const MatrixError = httpApi.MatrixError;
|
||||
const IndexedDBStore = require("./store/indexeddb").IndexedDBStore;
|
||||
|
||||
const SCROLLBACK_DELAY_MS = 3000;
|
||||
let CRYPTO_ENABLED = false;
|
||||
@@ -163,6 +164,11 @@ function MatrixClient(opts) {
|
||||
|
||||
this.olmVersion = Crypto.getOlmVersion();
|
||||
}
|
||||
|
||||
// Set up a sync accumulator if we can persist room data
|
||||
if (this.store instanceof IndexedDBStore) {
|
||||
this._syncAccumulator = this.store.getSyncAccumulator();
|
||||
}
|
||||
}
|
||||
utils.inherits(MatrixClient, EventEmitter);
|
||||
utils.extend(MatrixClient.prototype, MatrixBaseApis.prototype);
|
||||
@@ -2695,6 +2701,7 @@ MatrixClient.prototype.startClient = function(opts) {
|
||||
opts = Object.assign({}, opts);
|
||||
|
||||
opts.crypto = this._crypto;
|
||||
opts.syncAccumulator = this._syncAccumulator;
|
||||
this._clientOpts = opts;
|
||||
|
||||
this._syncApi = new SyncApi(this, opts);
|
||||
|
@@ -25,6 +25,8 @@ module.exports.MatrixInMemoryStore = require("./store/memory").MatrixInMemorySto
|
||||
module.exports.IndexedDBStore = require("./store/indexeddb").IndexedDBStore;
|
||||
/** The {@link module:store/indexeddb.IndexedDBStoreBackend|IndexedDBStoreBackend} class. */
|
||||
module.exports.IndexedDBStoreBackend = require("./store/indexeddb").IndexedDBStoreBackend;
|
||||
/** The {@link module:sync-accumulator.SyncAccumulator|SyncAccumulator} class. */
|
||||
module.exports.SyncAccumulator = require("./sync-accumulator");
|
||||
/** The {@link module:http-api.MatrixHttpApi|MatrixHttpApi} class. */
|
||||
module.exports.MatrixHttpApi = require("./http-api").MatrixHttpApi;
|
||||
/** The {@link module:http-api.MatrixError|MatrixError} class. */
|
||||
|
@@ -18,7 +18,6 @@ limitations under the License.
|
||||
import q from "q";
|
||||
import {MatrixInMemoryStore} from "./memory";
|
||||
import User from "../models/user";
|
||||
import Room from "../models/room";
|
||||
import {MatrixEvent} from "../models/event";
|
||||
import utils from "../utils";
|
||||
|
||||
@@ -77,25 +76,18 @@ IndexedDBStoreBackend.prototype = {
|
||||
},
|
||||
|
||||
/**
|
||||
* Persist a list of Room objects. Rooms with the same 'roomId' will be replaced.
|
||||
* @param {Room[]} rooms An array of rooms
|
||||
* @return {Promise} Resolves if the rooms were persisted.
|
||||
* Persist rooms /sync data along with the next batch token.
|
||||
* @param {string} nextBatch The next_batch /sync value.
|
||||
* @param {Object} roomsData The 'rooms' /sync data from a SyncAccumulator
|
||||
* @return {Promise} Resolves if the data was persisted.
|
||||
*/
|
||||
persistRooms: function(rooms) {
|
||||
return this._upsert("rooms", rooms);
|
||||
},
|
||||
|
||||
/**
|
||||
* Persist a sync token. This will replace any existing sync token.
|
||||
* @param {string} syncToken The token to persist.
|
||||
* @return {Promise} Resolves if the token was persisted.
|
||||
*/
|
||||
persistSyncToken: function(syncToken) {
|
||||
persistSyncData: function(nextBatch, roomsData) {
|
||||
const obj = {
|
||||
clobber: "-", // constant key so will always clobber
|
||||
syncToken: syncToken,
|
||||
nextBatch: nextBatch,
|
||||
roomsData: roomsData,
|
||||
};
|
||||
return this._upsert("config", [obj]);
|
||||
return this._upsert("sync", [obj]);
|
||||
},
|
||||
|
||||
/**
|
||||
@@ -134,14 +126,6 @@ IndexedDBStoreBackend.prototype = {
|
||||
return this._deserializeAll("users", User);
|
||||
},
|
||||
|
||||
/**
|
||||
* Load all the rooms from the database. This is not cached.
|
||||
* @return {Promise<Room[]>} A list of rooms.
|
||||
*/
|
||||
loadRooms: function() {
|
||||
return this._deserializeAll("rooms", Room);
|
||||
},
|
||||
|
||||
/**
|
||||
* Load all the account data events from the database. This is not cached.
|
||||
* @return {Promise<MatrixEvent[]>} A list of events.
|
||||
@@ -157,20 +141,20 @@ IndexedDBStoreBackend.prototype = {
|
||||
},
|
||||
|
||||
/**
|
||||
* Load the sync token from the database.
|
||||
* @return {Promise<?string>} The sync token
|
||||
* Load the sync data from the database.
|
||||
* @return {Promise<Object>} An object with "roomsData" and "nextBatch" keys.
|
||||
*/
|
||||
loadSyncToken: function() {
|
||||
loadSyncData: function() {
|
||||
return q.try(() => {
|
||||
const txn = this.db.transaction(["config"], "readonly");
|
||||
const store = txn.objectStore("config");
|
||||
const txn = this.db.transaction(["sync"], "readonly");
|
||||
const store = txn.objectStore("sync");
|
||||
const results = selectQuery(store, undefined, (cursor) => {
|
||||
return cursor.value;
|
||||
});
|
||||
if (results.length > 1) {
|
||||
console.warn("loadSyncToken: More than 1 config row found.");
|
||||
console.warn("loadSyncData: More than 1 sync row found.");
|
||||
}
|
||||
return (results.length > 0 ? results[0].syncToken : null);
|
||||
return (results.length > 0 ? results[0] : {});
|
||||
});
|
||||
},
|
||||
|
||||
@@ -211,8 +195,11 @@ IndexedDBStoreBackend.prototype = {
|
||||
* sync from the server is not required. This does not reduce memory usage as all
|
||||
* the data is eagerly fetched when <code>startup()</code> is called.
|
||||
* <pre>
|
||||
* let syncAccumulator = new SyncAccumulator();
|
||||
* let opts = { localStorage: window.localStorage };
|
||||
* let store = new IndexedDBStore(new IndexedDBStoreBackend(window.indexedDB), opts);
|
||||
* let store = new IndexedDBStore(
|
||||
* new IndexedDBStoreBackend(window.indexedDB), syncAccumulator, opts
|
||||
* );
|
||||
* await store.startup(); // load from indexed db
|
||||
* let client = sdk.createClient({
|
||||
* store: store,
|
||||
@@ -228,12 +215,14 @@ IndexedDBStoreBackend.prototype = {
|
||||
* @constructor
|
||||
* @extends MatrixInMemoryStore
|
||||
* @param {IndexedDBStoreBackend} backend The indexed db backend instance.
|
||||
* @param {SyncAccumulator} syncAccumulator The sync accumulator which will be
|
||||
* loaded from IndexedDB and periodically saved to IndexedDB.
|
||||
* @param {Object=} opts Options for MatrixInMemoryStore.
|
||||
* @prop {IndexedDBStoreBackend} backend The backend instance. Call through to
|
||||
* this API if you need to perform specific indexeddb actions like deleting the
|
||||
* database.
|
||||
*/
|
||||
const IndexedDBStore = function IndexedDBStore(backend, opts) {
|
||||
const IndexedDBStore = function IndexedDBStore(backend, syncAccumulator, opts) {
|
||||
MatrixInMemoryStore.call(this, opts);
|
||||
this.backend = backend;
|
||||
this.startedUp = false;
|
||||
@@ -243,6 +232,11 @@ const IndexedDBStore = function IndexedDBStore(backend, opts) {
|
||||
this._userModifiedMap = {
|
||||
// user_id : timestamp
|
||||
};
|
||||
this._syncAccumulator = syncAccumulator;
|
||||
|
||||
if (!this.backend || !this._syncAccumulator) {
|
||||
throw new Error("Missing backend or syncAccumulator");
|
||||
}
|
||||
};
|
||||
utils.inherits(IndexedDBStore, MatrixInMemoryStore);
|
||||
|
||||
@@ -258,10 +252,10 @@ IndexedDBStore.prototype.startup = function() {
|
||||
this.backend.loadUsers(),
|
||||
this.backend.loadAccountData(),
|
||||
this.backend.loadRooms(),
|
||||
this.backend.loadSyncToken(),
|
||||
this.backend.loadSyncData(),
|
||||
]);
|
||||
}).then((values) => {
|
||||
const [users, accountData, rooms, syncToken] = values;
|
||||
const [users, accountData, rooms, syncData] = values;
|
||||
console.log(
|
||||
"Loaded data from database. Reticulating splines...",
|
||||
accountData, users,
|
||||
@@ -275,10 +269,20 @@ IndexedDBStore.prototype.startup = function() {
|
||||
this.storeRoom(r);
|
||||
});
|
||||
this._syncTs = Date.now(); // pretend we've written so we don't rewrite
|
||||
this.setSyncToken(syncToken);
|
||||
this.setSyncToken(syncData.syncToken);
|
||||
this._setSyncData(syncData.syncToken, syncData.roomsData);
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* Return the accumulator which will have the initial /sync data when startup()
|
||||
* is called.
|
||||
* @return {SyncAccumulator}
|
||||
*/
|
||||
IndexedDBStore.prototype.getSyncAccumulator = function() {
|
||||
return this._syncAccumulator;
|
||||
};
|
||||
|
||||
/**
|
||||
* Set a new sync token and possibly write to the database.
|
||||
* Overrides MatrixInMemoryStore.
|
||||
@@ -295,6 +299,13 @@ IndexedDBStore.prototype.setSyncToken = function(token) {
|
||||
return null;
|
||||
};
|
||||
|
||||
IndexedDBStore.prototype._setSyncData = function(nextBatch, roomsData) {
|
||||
this._syncAccumulator.accumulateRooms({
|
||||
next_batch: nextBatch,
|
||||
rooms: roomsData,
|
||||
});
|
||||
};
|
||||
|
||||
IndexedDBStore.prototype._syncToDatabase = function() {
|
||||
console.log("_syncToDatabase");
|
||||
this._syncTs = Date.now(); // set now to guard against multi-writes
|
||||
@@ -315,16 +326,16 @@ IndexedDBStore.prototype._syncToDatabase = function() {
|
||||
return this.accountData[etype];
|
||||
});
|
||||
|
||||
const syncData = this._syncAccumulator.getJSON();
|
||||
|
||||
return q.all([
|
||||
this.backend.persistUsers(changedUsers),
|
||||
this.backend.persistAccountData(changedAccountData),
|
||||
this.backend.persistSyncData(syncData.nextBatch, syncData.roomsData),
|
||||
]);
|
||||
};
|
||||
|
||||
function createDatabase(db) {
|
||||
// Make room store, clobber based on room ID. (roomId property of Room objects)
|
||||
db.createObjectStore("rooms", { keyPath: ["roomId"] });
|
||||
|
||||
// Make user store, clobber based on user ID. (userId property of User objects)
|
||||
db.createObjectStore("users", { keyPath: ["userId"] });
|
||||
|
||||
@@ -332,8 +343,8 @@ function createDatabase(db) {
|
||||
// (event.type property of MatrixEvent objects)
|
||||
db.createObjectStore("accountData", { keyPath: ["type"] });
|
||||
|
||||
// Make configuration store (sync tokens, etc), always clobber (const key).
|
||||
db.createObjectStore("config", { keyPath: ["clobber"] });
|
||||
// Make /sync store (sync tokens, room data, etc), always clobber (const key).
|
||||
db.createObjectStore("sync", { keyPath: ["clobber"] });
|
||||
}
|
||||
|
||||
/**
|
||||
|
@@ -15,10 +15,13 @@ limitations under the License.
|
||||
*/
|
||||
"use strict";
|
||||
|
||||
/**
|
||||
* This is an internal module. See {@link SyncAccumulator} for the public class.
|
||||
* @module sync-accumulator
|
||||
*/
|
||||
|
||||
|
||||
/**
|
||||
* Internal class.
|
||||
*
|
||||
* The purpose of this class is to accumulate /sync responses such that a
|
||||
* complete "initial" JSON response can be returned which accurately represents
|
||||
* the sum total of the /sync responses accumulated to date. It only handles
|
||||
|
31
src/sync.js
31
src/sync.js
@@ -60,7 +60,8 @@ function debuglog() {
|
||||
* @param {Object} opts Config options
|
||||
* @param {module:crypto=} opts.crypto Crypto manager
|
||||
* @param {SyncAccumulator=} opts.syncAccumulator An accumulator which will be
|
||||
* kept up-to-date.
|
||||
* kept up-to-date. If one is supplied, the response to getJSON() will be used
|
||||
* initially.
|
||||
*/
|
||||
function SyncApi(client, opts) {
|
||||
this.client = client;
|
||||
@@ -513,9 +514,26 @@ SyncApi.prototype._sync = function(syncOptions) {
|
||||
// normal timeout= plus buffer time
|
||||
const clientSideTimeoutMs = this.opts.pollTimeout + BUFFER_PERIOD_MS;
|
||||
|
||||
this._currentSyncRequest = client._http.authedRequest(
|
||||
undefined, "GET", "/sync", qps, undefined, clientSideTimeoutMs,
|
||||
);
|
||||
let isCachedResponse = false;
|
||||
if (self.opts.syncAccumulator && !syncOptions.hasSyncedBefore) {
|
||||
const data = self.opts.syncAccumulator.getJSON();
|
||||
// Don't do an HTTP hit to /sync. Instead, load up the persisted /sync data,
|
||||
// if there is data there.
|
||||
if (data.nextBatch) {
|
||||
console.log("sync(): not doing HTTP hit, instead returning stored /sync");
|
||||
this._currentSyncRequest = q.resolve({
|
||||
next_batch: data.nextBatch,
|
||||
rooms: data.roomsData,
|
||||
});
|
||||
isCachedResponse = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!isCachedResponse) {
|
||||
this._currentSyncRequest = client._http.authedRequest(
|
||||
undefined, "GET", "/sync", qps, undefined, clientSideTimeoutMs,
|
||||
);
|
||||
}
|
||||
|
||||
this._currentSyncRequest.done(function(data) {
|
||||
// set the sync token NOW *before* processing the events. We do this so
|
||||
@@ -531,7 +549,10 @@ SyncApi.prototype._sync = function(syncOptions) {
|
||||
console.error("Caught /sync error", e.stack || e);
|
||||
}
|
||||
|
||||
if(self.opts.syncAccumulator) {
|
||||
// If there's an accumulator then the first HTTP response is actually the
|
||||
// accumulated data. We don't want to accumulate the same thing twice, so
|
||||
// only accumulate if this isn't a cached response.
|
||||
if (!isCachedResponse) {
|
||||
self.opts.syncAccumulator.accumulateRooms(data);
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user