diff --git a/examples/node/app.js b/examples/node/app.js index 89f3c80ca..e6f4a34fc 100644 --- a/examples/node/app.js +++ b/examples/node/app.js @@ -53,6 +53,24 @@ rl.on('line', function(line) { else if (line === "/members" && viewingRoom) { printMemberList(); } + else if (line === "/resend" && viewingRoom) { + // get the oldest not sent event. + var notSentEvent; + for (var i = 0; i < viewingRoom.timeline.length; i++) { + if (viewingRoom.timeline[i].status == sdk.EventStatus.NOT_SENT) { + notSentEvent = viewingRoom.timeline[i]; + break; + } + } + if (notSentEvent) { + matrixClient.resendEvent(notSentEvent, viewingRoom).done(function() { + printMessages(); + rl.prompt(); + }, function(err) { + console.log("/resend Error: %s", err); + }); + } + } else if (line.indexOf("/more ") === 0 && viewingRoom) { var amount = parseInt(line.split(" ")[1]) || 20; matrixClient.scrollback(viewingRoom, amount).done(function(room) { @@ -66,15 +84,11 @@ rl.on('line', function(line) { printHelp(); } else if (viewingRoom) { - matrixClient.sendTextMessage(viewingRoom.roomId, line).done(function() { - console.log(CLEAR_CONSOLE); + matrixClient.sendTextMessage(viewingRoom.roomId, line).finally(function() { printMessages(); rl.prompt(); - }, function(err) { - console.log("Error: %s", err); }); // print local echo immediately - console.log(CLEAR_CONSOLE); printMessages(); } rl.prompt(); @@ -127,11 +141,12 @@ function printHelp() { console.log(" '/exit' Return to the room list index."); console.log(" '/members' Show the room member list."); console.log(" '/more 15' Scrollback 15 events"); + console.log(" '/resend' Resend the oldest event which failed to send."); } function completer(line) { var completions = [ - "/help", "/join ", "/exit", "/members", "/more " + "/help", "/join ", "/exit", "/members", "/more ", "/resend" ]; var hits = completions.filter(function(c) { return c.indexOf(line) == 0 }); // show all completions if none found diff --git a/lib/client.js b/lib/client.js index 819280c44..79c55234f 100644 --- a/lib/client.js +++ b/lib/client.js @@ -49,8 +49,8 @@ function MatrixClient(opts) { this.scheduler = opts.scheduler; if (this.scheduler) { var self = this; - this.scheduler.setProcessFunction(function(eventToSend, txnId) { - return _sendEvent(self, eventToSend, txnId); + this.scheduler.setProcessFunction(function(eventToSend) { + return _sendEventHttpRequest(self, eventToSend); }); } // track our position in the overall eventstream @@ -141,6 +141,19 @@ MatrixClient.prototype.joinRoom = function(roomIdOrAlias, callback) { return defer.promise; }; +/** + * Resend an event. + * @param {MatrixEvent} event The event to resend. + * @param {Room} room Optional. The room the event is in. Will update the + * timeline entry if provided. + * @return {module:client.Promise} Resolves: TODO + * @return {module:http-api.MatrixError} Rejects: with an error response. + */ +MatrixClient.prototype.resendEvent = function(event, room) { + event.status = EventStatus.SENDING; + return _sendEvent(this, room, event); +}; + /** * @param {string} roomId * @param {string} name @@ -253,6 +266,9 @@ MatrixClient.prototype.sendStateEvent = function(roomId, eventType, content, sta MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId, callback) { if (utils.isFunction(txnId)) { callback = txnId; txnId = undefined; } + if (!txnId) { + txnId = "m" + new Date().getTime(); + } // we always construct a MatrixEvent when sending because the store and // scheduler use them. We'll extract the params back out if it turns out @@ -266,6 +282,7 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId, origin_server_ts: new Date().getTime(), content: content }); + localEvent._txnId = txnId; // add this event immediately to the local store as 'sending'. if (room) { @@ -273,23 +290,27 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId, room.addEventsToTimeline([localEvent]); } - var publicDefer = q.defer(); // need a public defer to invoke callback fns.. + return _sendEvent(this, room, localEvent, callback); +}; + +function _sendEvent(client, room, event, callback) { + var defer = q.defer(); var promise; // this event may be queued - if (this.scheduler) { + if (client.scheduler) { // if this returns a promsie then the scheduler has control now and will // resolve/reject when it is done. Internally, the scheduler will invoke - // processFn which is set to this._sendEvent so the same code path is - // executed regardless. - promise = this.scheduler.queueEvent(localEvent, txnId); + // processFn which is set to this._sendEventHttpRequest so the same code + // path is executed regardless. + promise = client.scheduler.queueEvent(event); } if (!promise) { - promise = _sendEvent(this, localEvent, txnId); + promise = _sendEventHttpRequest(client, event); } promise.done(function(res) { // the request was sent OK - if (room && localEvent) { + if (room) { var eventId = res.event_id; // try to find an event with this event_id. If we find it, this is // the echo of this event *from the event stream* so we can remove @@ -301,45 +322,33 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId, }, true); if (matchingEvent) { utils.removeElement(room.timeline, function(ev) { - return ev.getId() === localEvent.getId(); + return ev.getId() === event.getId(); }, true); } else { - localEvent.event.event_id = res.event_id; - localEvent.status = null; + event.event.event_id = res.event_id; + event.status = null; } } - // resolve public api callbacks/promises - if (callback) { - callback(null, res); - } - publicDefer.resolve(res); - }, function(err) { // the request failed to send. - if (localEvent) { - localEvent.status = EventStatus.NOT_SENT; - } - - // reject public api callbacks/promises - if (callback) { - callback(err); - } - publicDefer.reject(err); + _resolve(callback, defer, res); + }, function(err) { + // the request failed to send. + event.status = EventStatus.NOT_SENT; + _reject(callback, defer, err); }); - return publicDefer.promise; -}; + return defer.promise; +} -function _sendEvent(client, event, txnId) { - if (!txnId) { - txnId = "m" + new Date().getTime(); - } +function _sendEventHttpRequest(client, event) { var pathParams = { $roomId: event.getRoomId(), $eventType: event.getType(), $stateKey: event.getStateKey(), - $txnId: txnId + $txnId: event._txnId ? event._txnId : new Date().getTime() }; + console.log("_txnId = %s", event._txnId); var path; @@ -793,8 +802,8 @@ MatrixClient.prototype.roomState = function(roomId, callback) { /** * Retrieve older messages from the given room and put them in the timeline. * @param {Room} room The room to get older messages in. - * @param {Number} limit Optional. The maximum number of previous events to pull - * in. Default: 30. + * @param {Integer} limit Optional. The maximum number of previous events to + * pull in. Default: 30. * @param {module:client.callback} callback Optional. * @return {module:client.Promise} Resolves: Room. * @return {module:http-api.MatrixError} Rejects: with an error response. diff --git a/lib/scheduler.js b/lib/scheduler.js index 8f7d4adcc..2bc3ca34b 100644 --- a/lib/scheduler.js +++ b/lib/scheduler.js @@ -28,7 +28,6 @@ function MatrixScheduler(retryAlgorithm, queueAlgorithm) { // queueName: [{ // event: MatrixEvent, // event to send // defer: Deferred, // defer to resolve/reject at the END of the retries - // txnId: String // desired transaction ID // attempts: Number // number of times we've called processFn // }, ...] }; @@ -51,11 +50,10 @@ MatrixScheduler.prototype.setProcessFunction = function(fn) { /** * Queue an event if it is required and start processing queues. * @param {MatrixEvent} event The event that may be queued. - * @param {String} txnId Optional. The txnId to set when sending. * @return {?Promise} A promise if the event was queued, which will be * resolved or rejected in due time, else null. */ -MatrixScheduler.prototype.queueEvent = function(event, txnId) { +MatrixScheduler.prototype.queueEvent = function(event) { var queueName = this.queueAlgorithm(event); if (!queueName) { return null; @@ -68,7 +66,6 @@ MatrixScheduler.prototype.queueEvent = function(event, txnId) { this._queues[queueName].push({ event: event, defer: defer, - txnId: txnId, attempts: 0 }); debuglog( @@ -154,7 +151,7 @@ function _processQueue(scheduler, queueName) { ); // fire the process function and if it resolves, resolve the deferred. Else // invoke the retry algorithm. - scheduler._procFn(obj.event, obj.txnId).done(function(res) { + scheduler._procFn(obj.event).done(function(res) { // remove this from the queue _removeNextEvent(scheduler, queueName); debuglog("Queue '%s' sent event %s", queueName, obj.event.getId()); @@ -242,7 +239,6 @@ function debuglog() { * The function to invoke to process (send) events in the queue. * @callback processFn * @param {MatrixEvent} event The event to send. - * @param {string} txnId The desired transaction ID for this event. * @return {Promise} Resolved/rejected depending on the outcome of the request. */