diff --git a/lib/client.js b/lib/client.js index 1edc036b4..bff18efb8 100644 --- a/lib/client.js +++ b/lib/client.js @@ -4,6 +4,7 @@ * @module client */ var EventEmitter = require("events").EventEmitter; +var q = require("q"); var httpApi = require("./http-api"); var MatrixEvent = require("./models/event").MatrixEvent; @@ -33,7 +34,9 @@ var utils = require("./utils"); * @param {Object} opts.store Optional. The data store to use. If not specified, * this client will not store any HTTP responses. * @param {Object} opts.scheduler Optional. The scheduler to use. If not - * specified, this client will not retry requests on failure. + * specified, this client will not retry requests on failure. This client + * will supply its own processing function to + * {@link module:scheduler~MatrixScheduler#setProcessFunction}. */ function MatrixClient(opts) { utils.checkObjectHasKeys(opts, ["baseUrl", "request"]); @@ -43,6 +46,12 @@ function MatrixClient(opts) { this.store = opts.store; this.scheduler = opts.scheduler; + if (this.scheduler) { + var self = this; + this.scheduler.setProcessFunction(function(eventToSend, txnId) { + return _sendEvent(self, eventToSend, txnId); + }); + } // track our position in the overall eventstream this.fromToken = undefined; this.clientRunning = false; @@ -233,45 +242,42 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId, callback) { if (utils.isFunction(txnId)) { callback = txnId; txnId = undefined; } - if (!txnId) { - txnId = "m" + new Date().getTime(); - } - - var path = utils.encodeUri("/rooms/$roomId/send/$eventType/$txnId", { - $roomId: roomId, - $eventType: eventType, - $txnId: txnId + // we always construct a MatrixEvent when sending because the store and + // scheduler use them. We'll extract the params back out if it turns out + // the client has no scheduler or store. + var room = this.getRoom(roomId); + var localEvent = new MatrixEvent({ + event_id: "~" + roomId + ":" + txnId, + user_id: this.credentials.userId, + room_id: roomId, + type: eventType, + origin_server_ts: new Date().getTime(), + content: content }); // add this event immediately to the local store as 'sending'. - // NB: Don't need to check for this.store since getRoom does. - var room = this.getRoom(roomId); - var localEvent = null; + // NB: Don't need to check for this.store since room will be null otherwise. if (room) { - localEvent = new MatrixEvent({ - event_id: "~" + roomId + ":" + txnId, - user_id: this.credentials.userId, - room_id: roomId, - type: eventType, - origin_server_ts: new Date().getTime(), - content: content - }); localEvent.status = EventStatus.SENDING; room.addEventsToTimeline([localEvent]); } + var publicDefer = q.defer(); // need a public defer to invoke callback fns.. + var promise; // this event may be queued if (this.scheduler) { - var queueDefer = this.scheduler.queueEvent(localEvent); - if (queueDefer) { - // the scheduler has control now, so return - // return queueDefer; - } + // if this returns a promsie then the scheduler has control now and will + // resolve/reject when it is done. Internally, the scheduler will invoke + // processFn which is set to this._sendEvent so the same code path is + // executed regardless. + // promise = this.scheduler.queueEvent(localEvent, txnId); } - return this._http.authedRequest( - callback, "PUT", path, undefined, content - ).then(function(res) { + if (!promise) { + promise = _sendEvent(this, localEvent, txnId); + } + + promise.done(function(res) { // the request was sent OK if (room && localEvent) { var eventId = res.event_id; // try to find an event with this event_id. If we find it, this is @@ -292,13 +298,58 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId, localEvent.status = null; } } - }, function(err) { + + // resolve public api callbacks/promises + if (callback) { + callback(null, res); + } + publicDefer.resolve(res); + }, function(err) { // the request failed to send. if (localEvent) { localEvent.status = EventStatus.NOT_SENT; } + + // reject public api callbacks/promises + if (callback) { + callback(err); + } + publicDefer.reject(err); }); + + return publicDefer.promise; }; +function _sendEvent(client, event, txnId) { + if (!txnId) { + txnId = "m" + new Date().getTime(); + } + var pathParams = { + $roomId: event.getRoomId(), + $eventType: event.getType(), + $stateKey: event.getStateKey(), + $txnId: txnId + }; + + var path; + + if (event.isState()) { + var pathTemplate = "/rooms/$roomId/state/$eventType"; + if (event.getStateKey() && event.getStateKey().length > 0) { + pathTemplate = "/rooms/$roomId/state/$eventType/$stateKey"; + } + path = utils.encodeUri(pathTemplate, pathParams); + } + else { + path = utils.encodeUri( + "/rooms/$roomId/send/$eventType/$txnId", pathParams + ); + } + + return client._http.authedRequest( + undefined, "PUT", path, undefined, event.getContent() + ); +} + /** * @param {string} roomId * @param {Object} content diff --git a/lib/matrix.js b/lib/matrix.js index bc7cf8f1f..fbe0bc801 100644 --- a/lib/matrix.js +++ b/lib/matrix.js @@ -57,9 +57,7 @@ module.exports.createClient = function(opts) { } opts.request = request; opts.store = new module.exports.MatrixInMemoryStore(); - opts.scheduler = new module.exports.MatrixScheduler( - function() {} // TODO - ); + opts.scheduler = new module.exports.MatrixScheduler(); return new module.exports.MatrixClient(opts); }; diff --git a/lib/scheduler.js b/lib/scheduler.js index da5f3c181..158a55396 100644 --- a/lib/scheduler.js +++ b/lib/scheduler.js @@ -5,12 +5,13 @@ * @module scheduler */ var utils = require("./utils"); +var q = require("q"); /** - * Construct a scheduler for Matrix. + * Construct a scheduler for Matrix. Requires + * {@link module:scheduler~MatrixScheduler#setProcessFunction} to be provided + * with a way of processing events. * @constructor - * @param {module:scheduler~processFn} processFn Required. The function that can - * process events in the queue. * @param {module:scheduler~retryAlgorithm} retryAlgorithm Optional. The retry * algorithm to apply when determining when to try to send an event again. * Defaults to {@link module:scheduler~MatrixScheduler.RETRY_BACKOFF_RATELIMIT}. @@ -18,62 +19,55 @@ var utils = require("./utils"); * algorithm to apply when determining which events should be sent before the * given event. Defaults to {@link module:scheduler~MatrixScheduler.QUEUE_MESSAGES}. */ -function MatrixScheduler(processFn, retryAlgorithm, queueAlgorithm) { - if (!utils.isFunction(processFn)) { - throw new Error("processFn must be a function."); - } - this.processFn = processFn; +function MatrixScheduler(retryAlgorithm, queueAlgorithm) { this.retryAlgorithm = retryAlgorithm || MatrixScheduler.RETRY_BACKOFF_RATELIMIT; this.queueAlgorithm = queueAlgorithm || MatrixScheduler.QUEUE_MESSAGES; this._queues = { - // queueName: [MatrixEvent, ...] + // queueName: [{ + // event: MatrixEvent, // event to send + // defer: Deferred, // defer to resolve/reject at the END of the retries + // txnId: String // desired transaction ID + // }, ...] }; + this._procFn = null; } /** - * Makes the scheduler start processing events in the queues if it isn't already. + * Set the process function. Required for events in the queue to be processed. + * If set after events have been added to the queue, this will immediately start + * processing them. + * @param {module:scheduler~processFn} fn The function that can process events + * in the queue. */ -MatrixScheduler.prototype.checkQueuesRunning = function() { - // TODO +MatrixScheduler.prototype.setProcessFunction = function(fn) { + this._procFn = fn; + _startProcessingQueues(this); }; /** - * Remove the head of the queue. Reduces the length of the queue by 1. - * @param {string} queueName The name of the queue to get the event from. - * @return {MatrixEvent} The head of the queue or null. + * Queue an event if it is required and start processing queues. + * @param {MatrixEvent} event The event that may be queued. + * @param {String} txnId Optional. The txnId to set when sending. + * @return {?Promise} A promise if the event was queued, which will be + * resolved or rejected in due time, else null. */ -MatrixScheduler.prototype.removeNextEvent = function(queueName) { - var queue = this._queues[queueName]; - if (!utils.isArray(queue)) { +MatrixScheduler.prototype.queueEvent = function(event, txnId) { + var queueName = this.queueAlgorithm(event); + if (!queueName) { return null; } - return queue.shift(); -}; - -/** - * Add an event to the end of the queue. - * @param {string} queueName The name of the queue to add the event to. - * @param {MatrixEvent} event The event to queue. - */ -MatrixScheduler.prototype.addEventToQueue = function(queueName, event) { + // add the event to the queue and make a deferred for it. if (!this._queues[queueName]) { this._queues[queueName] = []; } - this._queues[queueName].push(event); -}; - -/** - * Queue an event if it is required. - * @param {MatrixEvent} event The event that may be queued. - * @return {boolean} True if the event was queued, else false. - */ -MatrixScheduler.prototype.queueEvent = function(event) { - var queueName = this.queueAlgorithm(event); - if (!queueName) { - return false; - } - this.addEventToQueue(queueName, event); - return true; + var defer = q.defer(); + this._queues[queueName].push({ + event: event, + defer: defer, + txnId: txnId + }); + _startProcessingQueues(this); + return defer.promise; }; /** @@ -116,6 +110,22 @@ MatrixScheduler.QUEUE_MESSAGES = function(event) { return null; }; +function _startProcessingQueues(scheduler) { + if (!scheduler._procFn) { + return; + } + var obj = _removeNextEvent(scheduler, ""); + return obj; +} + +function _removeNextEvent(scheduler, queueName) { + var queue = scheduler._queues[queueName]; + if (!utils.isArray(queue)) { + return null; + } + return queue.shift(); +} + /** * The retry algorithm to apply when retrying events. To stop retrying, return * -1. If this event was part of a queue, it will be removed from @@ -149,7 +159,8 @@ MatrixScheduler.QUEUE_MESSAGES = function(event) { * The function to invoke to process (send) events in the queue. * @callback processFn * @param {MatrixEvent} event The event to send. - * @return {Promise} Resolves to the HTTP response, rejects with an HTTP error. + * @param {string} txnId The desired transaction ID for this event. + * @return {Promise} Resolved/rejected depending on the outcome of the request. */ /**