1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-11-26 17:03:12 +03:00

Add ability to do indexeddb sync work in webworker

This commit is contained in:
David Baker
2017-04-06 11:09:11 +01:00
parent b392656d60
commit f5f05a9a91
6 changed files with 342 additions and 52 deletions

View File

@@ -61,6 +61,8 @@ module.exports.Filter = require("./filter");
module.exports.TimelineWindow = require("./timeline-window").TimelineWindow;
/** The {@link module:interactive-auth} class. */
module.exports.InteractiveAuth = require("./interactive-auth");
/** The {@link module:indexeddb-remote-worker} class. */
module.exports.IndexedDbStoreWorker = require("./store/indexeddb-remote-worker.js");
/**

View File

@@ -15,8 +15,6 @@ limitations under the License.
*/
import q from "q";
import User from "../models/user";
import {MatrixEvent} from "../models/event";
import SyncAccumulator from "../sync-accumulator";
import utils from "../utils";
@@ -104,13 +102,6 @@ const LocalIndexedDBStoreBackend = function LocalIndexedDBStoreBackend(
this._dbName = "matrix-js-sdk:" + (dbName || "default");
this.db = null;
this._syncAccumulator = new SyncAccumulator();
// Records the last-modified-time of each user at the last point we saved
// the database, such that we can derive the set if users that have been
// modified since we last saved.
this._userModifiedMap = {
// user_id : timestamp
};
};
@@ -179,23 +170,16 @@ LocalIndexedDBStoreBackend.prototype = {
},
setSyncData: function(syncData) {
return q().then(() => {
this._syncAccumulator.accumulate(syncData);
});
},
syncToDatabase: function(users) {
// work out changed users (this doesn't handle deletions but you
// can't 'delete' users as they are just presence events).
const changedUsers = users.filter((user) => {
return this._userModifiedMap[user.userId] !== user.getLastModifiedTime();
});
changedUsers.forEach((u) => { // update times
this._userModifiedMap[u.userId] = u.getLastModifiedTime();
});
syncToDatabase: function(userTuples) {
const syncData = this._syncAccumulator.getJSON();
return q.all([
this._persistUsers(changedUsers),
this._persistUserPresenceEvents(userTuples),
this._persistAccountData(syncData.accountData),
this._persistSyncData(syncData.nextBatch, syncData.roomsData),
]);
@@ -239,21 +223,21 @@ LocalIndexedDBStoreBackend.prototype = {
},
/**
* Persist a list of User objects. Users with the same 'userId' will be
* replaced.
* @param {User[]} users An array of users
* Persist a list of [user id, presence event] they are for.
* Users with the same 'userId' will be replaced.
* Prfesence events should be the event in its raw form (not the Event
* object)
* @param {Object[]} users An array of users
* @return {Promise} Resolves if the users were persisted.
*/
_persistUsers: function(users) {
_persistUserPresenceEvents: function(tuples) {
return q.try(() => {
const txn = this.db.transaction(["users"], "readwrite");
const store = txn.objectStore("users");
for (let i = 0; i < users.length; i++) {
for (const tuple of tuples) {
store.put({
userId: users[i].userId,
event: (users[i].events.presence ?
users[i].events.presence.event :
null),
userId: tuple[0],
event: tuple[1],
}); // put == UPSERT
}
return promiseifyTxn(txn);
@@ -261,20 +245,17 @@ LocalIndexedDBStoreBackend.prototype = {
},
/**
* Load all the users from the database. This is not cached.
* @return {Promise<User[]>} A list of users.
* Load all user presence events from the database. This is not cached.
* FIXME: It would probably be more sensible to store the events in the
* sync.
* @return {Promise<Object[]>} A list of presence events in their raw form.
*/
loadUsers: function() {
loadUserPresenceEvents: function() {
return q.try(() => {
const txn = this.db.transaction(["users"], "readonly");
const store = txn.objectStore("users");
return selectQuery(store, undefined, (cursor) => {
const user = new User(cursor.value.userId);
if (cursor.value.event) {
user.setPresenceEvent(new MatrixEvent(cursor.value.event));
}
this._userModifiedMap[user.userId] = user.getLastModifiedTime();
return user;
return [cursor.value.userId, cursor.value.event];
});
});
},

View File

@@ -0,0 +1,157 @@
/*
Copyright 2017 Vector Creations Ltd
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import q from "q";
/**
* An IndexedDB store backend where the actual backend sits in a web
* worker.
*
* Construct a new Indexed Database store backend. This requires a call to
* <code>connect()</code> before this store can be used.
* @constructor
* @param {string} workerScript URL to the worker script
* @param {string=} dbName Optional database name. The same name must be used
* to open the same database.
*/
const RemoteIndexedDBStoreBackend = function RemoteIndexedDBStoreBackend(
workerScript, dbName,
) {
this._worker = new Worker(workerScript);
this._nextSeq = 0;
// The currently in-flight requests to the actual backend
this._inFlight = {
// seq: promise,
};
this._worker.onmessage = this._onWorkerMessage.bind(this);
this._doCmd('_setupWorker', [dbName]).done(() => {
console.log("IndexedDB worker is ready");
});
};
RemoteIndexedDBStoreBackend.prototype = {
/**
* Attempt to connect to the database. This can fail if the user does not
* grant permission.
* @return {Promise} Resolves if successfully connected.
*/
connect: function() {
return this._doCmd('connect');
},
/**
* Clear the entire database. This should be used when logging out of a client
* to prevent mixing data between accounts.
* @return {Promise} Resolved when the database is cleared.
*/
clearDatabase: function() {
return this._doCmd('clearDatabase');
},
/**
* @return {Promise} Resolves with a sync response to restore the
* client state to where it was at the last save, or null if there
* is no saved sync data.
*/
getSavedSync: function() {
return this._doCmd('getSavedSync');
},
setSyncData: function(syncData) {
return this._doCmd('setSyncData', [syncData]);
},
syncToDatabase: function(users) {
return this._doCmd('syncToDatabase', [users]);
},
/**
* Load all user presence events from the database. This is not cached.
* @return {Promise<Object[]>} A list of presence events in their raw form.
*/
loadUserPresenceEvents: function() {
return this._doCmd('loadUserPresenceEvents');
},
/**
* Load all the account data events from the database. This is not cached.
* @return {Promise<Object[]>} A list of raw global account events.
*/
loadAccountData: function() {
return this._doCmd('loadAccountData');
},
/**
* Load the sync data from the database.
* @return {Promise<Object>} An object with "roomsData" and "nextBatch" keys.
*/
loadSyncData: function() {
return this._doCmd('loadSyncData');
},
_doCmd: function(cmd, args) {
// wrap in a q so if the postMessage throws,
// the promise automatically gets rejected
return q().then(() => {
const seq = this._nextSeq++;
const def = q.defer();
this._inFlight[seq] = def;
this._worker.postMessage({
command: cmd,
seq: seq,
args: args,
});
return def.promise;
}).catch((e) => {
console.log(args[0]);
});
},
_onWorkerMessage: function(ev) {
const msg = ev.data;
if (msg.command == 'cmd_success' || msg.command == 'cmd_fail') {
if (msg.seq === undefined) {
console.error("Got reply from worker with no seq");
return;
}
const def = this._inFlight[msg.seq];
if (def === undefined) {
console.error("Got reply for unknown seq " + msg.seq);
return;
}
delete this._inFlight[msg.seq];
if (msg.command == 'cmd_success') {
def.resolve(msg.result);
} else {
def.reject(msg.error);
}
} else {
console.warn("Unrecognised message from worker: " + msg);
}
},
};
export default RemoteIndexedDBStoreBackend;

View File

@@ -0,0 +1,104 @@
/*
Copyright 2017 Vector Creations Ltd
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import q from "q";
import LocalIndexedDBStoreBackend from "./indexeddb-local-backend.js";
/**
* This class lives in the webworker and drives a LocalIndexedDbStoreBackend
* controlled by messages from the main process.
*/
class IndexedDbStoreWorker {
constructor(postMessage) {
this.backend = null;
this.postMessage = postMessage;
this.onMessage = this.onMessage.bind(this);
}
onMessage(ev) {
const msg = ev.data;
let prom;
switch (msg.command) {
case '_setupWorker':
this.backend = new LocalIndexedDBStoreBackend(
// this is the 'indexedDB' global (where global != window
// because it's a web worker and there is no window).
indexedDB, msg.args[0],
);
prom = q();
break;
case 'connect':
prom = this.backend.connect();
break;
case 'clearDatabase':
prom = this.backend.clearDatabase().then((result) => {
// This returns special classes which can't be cloned
// accross to the main script, so don't try.
return {};
});
break;
case 'getSavedSync':
prom = this.backend.getSavedSync(false);
break;
case 'setSyncData':
prom = this.backend.setSyncData(...msg.args);
break;
case 'syncToDatabase':
prom = this.backend.syncToDatabase(...msg.args).then(() => {
// This also returns IndexedDB events which are not cloneable
return {};
});
break;
case 'loadUserPresenceEvents':
prom = this.backend.loadUserPresenceEvents();
break;
case 'loadAccountData':
prom = this.backend.loadAccountData();
break;
case 'loadSyncData':
prom = this.backend.loadSyncData();
break;
}
if (prom === undefined) {
postMessage({
command: 'cmd_fail',
seq: msg.seq,
// Canb't be an Error because they're not structured cloneable
error: "Unrecognised command",
});
return;
}
prom.done((ret) => {
this.postMessage.call(null, {
command: 'cmd_success',
seq: msg.seq,
result: ret,
});
}, (err) => {
this.postMessage.call(null, {
command: 'cmd_fail',
seq: msg.seq,
error: err,
});
});
}
}
module.exports = IndexedDbStoreWorker;

View File

@@ -18,6 +18,9 @@ import q from "q";
import {MatrixInMemoryStore} from "./memory";
import utils from "../utils";
import LocalIndexedDBStoreBackend from "./indexeddb-local-backend.js";
import RemoteIndexedDBStoreBackend from "./indexeddb-remote-backend.js";
import User from "../models/user";
import {MatrixEvent} from "../models/event";
/**
* This is an internal module. See {@link IndexedDBStore} for the public class.
@@ -64,6 +67,10 @@ const WRITE_DELAY_MS = 1000 * 60 * 5; // once every 5 minutes
* <code>window.indexedDB</code>
* @param {string=} opts.dbName Optional database name. The same name must be used
* to open the same database.
* @param {string=} opts.workerScript Optional URL to a script to invooke a web
* worker with to run IndexedDB queries on the web worker. The IndexedDbStoreWorker
* class is provided for this purpose and requires the application to provide a
* trivial wrapper script around it.
* @prop {IndexedDBStoreBackend} backend The backend instance. Call through to
* this API if you need to perform specific indexeddb actions like deleting the
* database.
@@ -75,9 +82,21 @@ const IndexedDBStore = function IndexedDBStore(opts) {
throw new Error('Missing required option: indexedDB');
}
if (opts.workerScript) {
this.backend = new RemoteIndexedDBStoreBackend(opts.workerScript, opts.dbName);
} else {
this.backend = new LocalIndexedDBStoreBackend(opts.indexedDB, opts.dbName);
}
this.startedUp = false;
this._syncTs = Date.now(); // updated when writes to the database are performed
// Records the last-modified-time of each user at the last point we saved
// the database, such that we can derive the set if users that have been
// modified since we last saved.
this._userModifiedMap = {
// user_id : timestamp
};
};
utils.inherits(IndexedDBStore, MatrixInMemoryStore);
@@ -90,22 +109,27 @@ IndexedDBStore.prototype.startup = function() {
}
return this.backend.connect().then(() => {
return q.all([
this.backend.loadUsers(),
this.backend.loadUserPresenceEvents(),
this.backend.loadAccountData(),
this.backend.loadSyncData(),
]);
}).then((values) => {
const [users, accountData, syncData] = values;
const [userPresenceEvents, accountData, syncData] = values;
console.log(
"Loaded data from database: sync from ", syncData.nextBatch,
" -- Reticulating splines...",
);
users.forEach((u) => {
userPresenceEvents.forEach(([userId, rawEvent]) => {
const u = new User(userId);
if (rawEvent) {
u.setPresenceEvent(new MatrixEvent(rawEvent));
}
this._userModifiedMap[u.userId] = u.getLastModifiedTime();
this.storeUser(u);
});
this._syncTs = Date.now(); // pretend we've written so we don't rewrite
this.setSyncToken(syncData.nextBatch);
this.setSyncData({
return this.setSyncData({
next_batch: syncData.nextBatch,
rooms: syncData.roomsData,
account_data: {
@@ -146,7 +170,21 @@ IndexedDBStore.prototype.save = function() {
const now = Date.now();
if (now - this._syncTs > WRITE_DELAY_MS) {
this._syncTs = Date.now(); // set now to guard against multi-writes
return this.backend.syncToDatabase(this.getUsers()).catch((err) => {
// work out changed users (this doesn't handle deletions but you
// can't 'delete' users as they are just presence events).
const userTuples = [];
for (const u of this.getUsers()) {
if (this._userModifiedMap[u.userId] === u.getLastModifiedTime()) continue;
if (!u.events.presence) continue;
userTuples.push([u.userId, u.events.presence.event]);
// note that we've saved this version of the user
this._userModifiedMap[u.userId] = u.getLastModifiedTime();
}
return this.backend.syncToDatabase(userTuples).catch((err) => {
console.error("sync fail:", err);
});
}
@@ -154,7 +192,7 @@ IndexedDBStore.prototype.save = function() {
};
IndexedDBStore.prototype.setSyncData = function(syncData) {
this.backend.setSyncData(syncData);
return this.backend.setSyncData(syncData);
};
module.exports.IndexedDBStore = IndexedDBStore;

View File

@@ -573,7 +573,7 @@ SyncApi.prototype._sync = function(syncOptions) {
);
return this._currentSyncRequest;
}
}).done(function(data) {
}).then(function(data) {
//debuglog('Completed sync, next_batch=' + data.next_batch);
// set the sync token NOW *before* processing the events. We do this so
@@ -584,6 +584,19 @@ SyncApi.prototype._sync = function(syncOptions) {
// Reset after a successful sync
self._failedSyncCount = 0;
// We need to wait until the sync data has been sent to the backend
// because it appears that the sync data gets modified somewhere in
// processing it in such a way as to make it no longer cloneable.
// XXX: Find out what is modifying it!
if (!isCachedResponse) {
// Don't give the store back its own cached data
return client.store.setSyncData(data).then(() => {
return data;
});
} else {
return q(data);
}
}).done((data) => {
try {
self._processSyncResponse(syncToken, data);
} catch (e) {
@@ -592,11 +605,6 @@ SyncApi.prototype._sync = function(syncOptions) {
console.error("Caught /sync error", e.stack || e);
}
// Don't give the store back its own cached data
if (!isCachedResponse) {
client.store.setSyncData(data);
}
// emit synced events
const syncEventData = {
oldSyncToken: syncToken,