1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-08-21 04:02:35 +03:00

Design the API for the scheduler and hook MatrixClient up to it.

Scheduler itself still needs internal impl.
This commit is contained in:
Kegan Dougal
2015-06-19 15:50:05 +01:00
parent 8a9f84a4b2
commit 2f78ceb6fc
3 changed files with 135 additions and 75 deletions

View File

@@ -4,6 +4,7 @@
* @module client
*/
var EventEmitter = require("events").EventEmitter;
var q = require("q");
var httpApi = require("./http-api");
var MatrixEvent = require("./models/event").MatrixEvent;
@@ -33,7 +34,9 @@ var utils = require("./utils");
* @param {Object} opts.store Optional. The data store to use. If not specified,
* this client will not store any HTTP responses.
* @param {Object} opts.scheduler Optional. The scheduler to use. If not
* specified, this client will not retry requests on failure.
* specified, this client will not retry requests on failure. This client
* will supply its own processing function to
* {@link module:scheduler~MatrixScheduler#setProcessFunction}.
*/
function MatrixClient(opts) {
utils.checkObjectHasKeys(opts, ["baseUrl", "request"]);
@@ -43,6 +46,12 @@ function MatrixClient(opts) {
this.store = opts.store;
this.scheduler = opts.scheduler;
if (this.scheduler) {
var self = this;
this.scheduler.setProcessFunction(function(eventToSend, txnId) {
return _sendEvent(self, eventToSend, txnId);
});
}
// track our position in the overall eventstream
this.fromToken = undefined;
this.clientRunning = false;
@@ -233,22 +242,11 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId,
callback) {
if (utils.isFunction(txnId)) { callback = txnId; txnId = undefined; }
if (!txnId) {
txnId = "m" + new Date().getTime();
}
var path = utils.encodeUri("/rooms/$roomId/send/$eventType/$txnId", {
$roomId: roomId,
$eventType: eventType,
$txnId: txnId
});
// add this event immediately to the local store as 'sending'.
// NB: Don't need to check for this.store since getRoom does.
// we always construct a MatrixEvent when sending because the store and
// scheduler use them. We'll extract the params back out if it turns out
// the client has no scheduler or store.
var room = this.getRoom(roomId);
var localEvent = null;
if (room) {
localEvent = new MatrixEvent({
var localEvent = new MatrixEvent({
event_id: "~" + roomId + ":" + txnId,
user_id: this.credentials.userId,
room_id: roomId,
@@ -256,22 +254,30 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId,
origin_server_ts: new Date().getTime(),
content: content
});
// add this event immediately to the local store as 'sending'.
// NB: Don't need to check for this.store since room will be null otherwise.
if (room) {
localEvent.status = EventStatus.SENDING;
room.addEventsToTimeline([localEvent]);
}
var publicDefer = q.defer(); // need a public defer to invoke callback fns..
var promise;
// this event may be queued
if (this.scheduler) {
var queueDefer = this.scheduler.queueEvent(localEvent);
if (queueDefer) {
// the scheduler has control now, so return
// return queueDefer;
}
// if this returns a promsie then the scheduler has control now and will
// resolve/reject when it is done. Internally, the scheduler will invoke
// processFn which is set to this._sendEvent so the same code path is
// executed regardless.
// promise = this.scheduler.queueEvent(localEvent, txnId);
}
return this._http.authedRequest(
callback, "PUT", path, undefined, content
).then(function(res) {
if (!promise) {
promise = _sendEvent(this, localEvent, txnId);
}
promise.done(function(res) { // the request was sent OK
if (room && localEvent) {
var eventId = res.event_id;
// try to find an event with this event_id. If we find it, this is
@@ -292,13 +298,58 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId,
localEvent.status = null;
}
}
}, function(err) {
// resolve public api callbacks/promises
if (callback) {
callback(null, res);
}
publicDefer.resolve(res);
}, function(err) { // the request failed to send.
if (localEvent) {
localEvent.status = EventStatus.NOT_SENT;
}
// reject public api callbacks/promises
if (callback) {
callback(err);
}
publicDefer.reject(err);
});
return publicDefer.promise;
};
function _sendEvent(client, event, txnId) {
if (!txnId) {
txnId = "m" + new Date().getTime();
}
var pathParams = {
$roomId: event.getRoomId(),
$eventType: event.getType(),
$stateKey: event.getStateKey(),
$txnId: txnId
};
var path;
if (event.isState()) {
var pathTemplate = "/rooms/$roomId/state/$eventType";
if (event.getStateKey() && event.getStateKey().length > 0) {
pathTemplate = "/rooms/$roomId/state/$eventType/$stateKey";
}
path = utils.encodeUri(pathTemplate, pathParams);
}
else {
path = utils.encodeUri(
"/rooms/$roomId/send/$eventType/$txnId", pathParams
);
}
return client._http.authedRequest(
undefined, "PUT", path, undefined, event.getContent()
);
}
/**
* @param {string} roomId
* @param {Object} content

View File

@@ -57,9 +57,7 @@ module.exports.createClient = function(opts) {
}
opts.request = request;
opts.store = new module.exports.MatrixInMemoryStore();
opts.scheduler = new module.exports.MatrixScheduler(
function() {} // TODO
);
opts.scheduler = new module.exports.MatrixScheduler();
return new module.exports.MatrixClient(opts);
};

View File

@@ -5,12 +5,13 @@
* @module scheduler
*/
var utils = require("./utils");
var q = require("q");
/**
* Construct a scheduler for Matrix.
* Construct a scheduler for Matrix. Requires
* {@link module:scheduler~MatrixScheduler#setProcessFunction} to be provided
* with a way of processing events.
* @constructor
* @param {module:scheduler~processFn} processFn Required. The function that can
* process events in the queue.
* @param {module:scheduler~retryAlgorithm} retryAlgorithm Optional. The retry
* algorithm to apply when determining when to try to send an event again.
* Defaults to {@link module:scheduler~MatrixScheduler.RETRY_BACKOFF_RATELIMIT}.
@@ -18,62 +19,55 @@ var utils = require("./utils");
* algorithm to apply when determining which events should be sent before the
* given event. Defaults to {@link module:scheduler~MatrixScheduler.QUEUE_MESSAGES}.
*/
function MatrixScheduler(processFn, retryAlgorithm, queueAlgorithm) {
if (!utils.isFunction(processFn)) {
throw new Error("processFn must be a function.");
}
this.processFn = processFn;
function MatrixScheduler(retryAlgorithm, queueAlgorithm) {
this.retryAlgorithm = retryAlgorithm || MatrixScheduler.RETRY_BACKOFF_RATELIMIT;
this.queueAlgorithm = queueAlgorithm || MatrixScheduler.QUEUE_MESSAGES;
this._queues = {
// queueName: [MatrixEvent, ...]
// queueName: [{
// event: MatrixEvent, // event to send
// defer: Deferred, // defer to resolve/reject at the END of the retries
// txnId: String // desired transaction ID
// }, ...]
};
this._procFn = null;
}
/**
* Makes the scheduler start processing events in the queues if it isn't already.
* Set the process function. Required for events in the queue to be processed.
* If set after events have been added to the queue, this will immediately start
* processing them.
* @param {module:scheduler~processFn} fn The function that can process events
* in the queue.
*/
MatrixScheduler.prototype.checkQueuesRunning = function() {
// TODO
MatrixScheduler.prototype.setProcessFunction = function(fn) {
this._procFn = fn;
_startProcessingQueues(this);
};
/**
* Remove the head of the queue. Reduces the length of the queue by 1.
* @param {string} queueName The name of the queue to get the event from.
* @return {MatrixEvent} The head of the queue or <code>null</code>.
* Queue an event if it is required and start processing queues.
* @param {MatrixEvent} event The event that may be queued.
* @param {String} txnId Optional. The txnId to set when sending.
* @return {?Promise} A promise if the event was queued, which will be
* resolved or rejected in due time, else null.
*/
MatrixScheduler.prototype.removeNextEvent = function(queueName) {
var queue = this._queues[queueName];
if (!utils.isArray(queue)) {
MatrixScheduler.prototype.queueEvent = function(event, txnId) {
var queueName = this.queueAlgorithm(event);
if (!queueName) {
return null;
}
return queue.shift();
};
/**
* Add an event to the end of the queue.
* @param {string} queueName The name of the queue to add the event to.
* @param {MatrixEvent} event The event to queue.
*/
MatrixScheduler.prototype.addEventToQueue = function(queueName, event) {
// add the event to the queue and make a deferred for it.
if (!this._queues[queueName]) {
this._queues[queueName] = [];
}
this._queues[queueName].push(event);
};
/**
* Queue an event if it is required.
* @param {MatrixEvent} event The event that may be queued.
* @return {boolean} True if the event was queued, else false.
*/
MatrixScheduler.prototype.queueEvent = function(event) {
var queueName = this.queueAlgorithm(event);
if (!queueName) {
return false;
}
this.addEventToQueue(queueName, event);
return true;
var defer = q.defer();
this._queues[queueName].push({
event: event,
defer: defer,
txnId: txnId
});
_startProcessingQueues(this);
return defer.promise;
};
/**
@@ -116,6 +110,22 @@ MatrixScheduler.QUEUE_MESSAGES = function(event) {
return null;
};
function _startProcessingQueues(scheduler) {
if (!scheduler._procFn) {
return;
}
var obj = _removeNextEvent(scheduler, "");
return obj;
}
function _removeNextEvent(scheduler, queueName) {
var queue = scheduler._queues[queueName];
if (!utils.isArray(queue)) {
return null;
}
return queue.shift();
}
/**
* The retry algorithm to apply when retrying events. To stop retrying, return
* <code>-1</code>. If this event was part of a queue, it will be removed from
@@ -149,7 +159,8 @@ MatrixScheduler.QUEUE_MESSAGES = function(event) {
* The function to invoke to process (send) events in the queue.
* @callback processFn
* @param {MatrixEvent} event The event to send.
* @return {Promise} Resolves to the HTTP response, rejects with an HTTP error.
* @param {string} txnId The desired transaction ID for this event.
* @return {Promise} Resolved/rejected depending on the outcome of the request.
*/
/**