You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-08-06 12:02:40 +03:00
Add MatrixClient.resendEvent to manually resend an event that was not sent.
Bundle txnId as MatrixEvent._txnId instead of exposing it to every place that happens to need it (since it's so tightly coupled with MatrixEvent)
This commit is contained in:
@@ -53,6 +53,24 @@ rl.on('line', function(line) {
|
|||||||
else if (line === "/members" && viewingRoom) {
|
else if (line === "/members" && viewingRoom) {
|
||||||
printMemberList();
|
printMemberList();
|
||||||
}
|
}
|
||||||
|
else if (line === "/resend" && viewingRoom) {
|
||||||
|
// get the oldest not sent event.
|
||||||
|
var notSentEvent;
|
||||||
|
for (var i = 0; i < viewingRoom.timeline.length; i++) {
|
||||||
|
if (viewingRoom.timeline[i].status == sdk.EventStatus.NOT_SENT) {
|
||||||
|
notSentEvent = viewingRoom.timeline[i];
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (notSentEvent) {
|
||||||
|
matrixClient.resendEvent(notSentEvent, viewingRoom).done(function() {
|
||||||
|
printMessages();
|
||||||
|
rl.prompt();
|
||||||
|
}, function(err) {
|
||||||
|
console.log("/resend Error: %s", err);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
else if (line.indexOf("/more ") === 0 && viewingRoom) {
|
else if (line.indexOf("/more ") === 0 && viewingRoom) {
|
||||||
var amount = parseInt(line.split(" ")[1]) || 20;
|
var amount = parseInt(line.split(" ")[1]) || 20;
|
||||||
matrixClient.scrollback(viewingRoom, amount).done(function(room) {
|
matrixClient.scrollback(viewingRoom, amount).done(function(room) {
|
||||||
@@ -66,15 +84,11 @@ rl.on('line', function(line) {
|
|||||||
printHelp();
|
printHelp();
|
||||||
}
|
}
|
||||||
else if (viewingRoom) {
|
else if (viewingRoom) {
|
||||||
matrixClient.sendTextMessage(viewingRoom.roomId, line).done(function() {
|
matrixClient.sendTextMessage(viewingRoom.roomId, line).finally(function() {
|
||||||
console.log(CLEAR_CONSOLE);
|
|
||||||
printMessages();
|
printMessages();
|
||||||
rl.prompt();
|
rl.prompt();
|
||||||
}, function(err) {
|
|
||||||
console.log("Error: %s", err);
|
|
||||||
});
|
});
|
||||||
// print local echo immediately
|
// print local echo immediately
|
||||||
console.log(CLEAR_CONSOLE);
|
|
||||||
printMessages();
|
printMessages();
|
||||||
}
|
}
|
||||||
rl.prompt();
|
rl.prompt();
|
||||||
@@ -127,11 +141,12 @@ function printHelp() {
|
|||||||
console.log(" '/exit' Return to the room list index.");
|
console.log(" '/exit' Return to the room list index.");
|
||||||
console.log(" '/members' Show the room member list.");
|
console.log(" '/members' Show the room member list.");
|
||||||
console.log(" '/more 15' Scrollback 15 events");
|
console.log(" '/more 15' Scrollback 15 events");
|
||||||
|
console.log(" '/resend' Resend the oldest event which failed to send.");
|
||||||
}
|
}
|
||||||
|
|
||||||
function completer(line) {
|
function completer(line) {
|
||||||
var completions = [
|
var completions = [
|
||||||
"/help", "/join ", "/exit", "/members", "/more "
|
"/help", "/join ", "/exit", "/members", "/more ", "/resend"
|
||||||
];
|
];
|
||||||
var hits = completions.filter(function(c) { return c.indexOf(line) == 0 });
|
var hits = completions.filter(function(c) { return c.indexOf(line) == 0 });
|
||||||
// show all completions if none found
|
// show all completions if none found
|
||||||
|
@@ -49,8 +49,8 @@ function MatrixClient(opts) {
|
|||||||
this.scheduler = opts.scheduler;
|
this.scheduler = opts.scheduler;
|
||||||
if (this.scheduler) {
|
if (this.scheduler) {
|
||||||
var self = this;
|
var self = this;
|
||||||
this.scheduler.setProcessFunction(function(eventToSend, txnId) {
|
this.scheduler.setProcessFunction(function(eventToSend) {
|
||||||
return _sendEvent(self, eventToSend, txnId);
|
return _sendEventHttpRequest(self, eventToSend);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
// track our position in the overall eventstream
|
// track our position in the overall eventstream
|
||||||
@@ -141,6 +141,19 @@ MatrixClient.prototype.joinRoom = function(roomIdOrAlias, callback) {
|
|||||||
return defer.promise;
|
return defer.promise;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resend an event.
|
||||||
|
* @param {MatrixEvent} event The event to resend.
|
||||||
|
* @param {Room} room Optional. The room the event is in. Will update the
|
||||||
|
* timeline entry if provided.
|
||||||
|
* @return {module:client.Promise} Resolves: TODO
|
||||||
|
* @return {module:http-api.MatrixError} Rejects: with an error response.
|
||||||
|
*/
|
||||||
|
MatrixClient.prototype.resendEvent = function(event, room) {
|
||||||
|
event.status = EventStatus.SENDING;
|
||||||
|
return _sendEvent(this, room, event);
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {string} roomId
|
* @param {string} roomId
|
||||||
* @param {string} name
|
* @param {string} name
|
||||||
@@ -253,6 +266,9 @@ MatrixClient.prototype.sendStateEvent = function(roomId, eventType, content, sta
|
|||||||
MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId,
|
MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId,
|
||||||
callback) {
|
callback) {
|
||||||
if (utils.isFunction(txnId)) { callback = txnId; txnId = undefined; }
|
if (utils.isFunction(txnId)) { callback = txnId; txnId = undefined; }
|
||||||
|
if (!txnId) {
|
||||||
|
txnId = "m" + new Date().getTime();
|
||||||
|
}
|
||||||
|
|
||||||
// we always construct a MatrixEvent when sending because the store and
|
// we always construct a MatrixEvent when sending because the store and
|
||||||
// scheduler use them. We'll extract the params back out if it turns out
|
// scheduler use them. We'll extract the params back out if it turns out
|
||||||
@@ -266,6 +282,7 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId,
|
|||||||
origin_server_ts: new Date().getTime(),
|
origin_server_ts: new Date().getTime(),
|
||||||
content: content
|
content: content
|
||||||
});
|
});
|
||||||
|
localEvent._txnId = txnId;
|
||||||
|
|
||||||
// add this event immediately to the local store as 'sending'.
|
// add this event immediately to the local store as 'sending'.
|
||||||
if (room) {
|
if (room) {
|
||||||
@@ -273,23 +290,27 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId,
|
|||||||
room.addEventsToTimeline([localEvent]);
|
room.addEventsToTimeline([localEvent]);
|
||||||
}
|
}
|
||||||
|
|
||||||
var publicDefer = q.defer(); // need a public defer to invoke callback fns..
|
return _sendEvent(this, room, localEvent, callback);
|
||||||
|
};
|
||||||
|
|
||||||
|
function _sendEvent(client, room, event, callback) {
|
||||||
|
var defer = q.defer();
|
||||||
var promise;
|
var promise;
|
||||||
// this event may be queued
|
// this event may be queued
|
||||||
if (this.scheduler) {
|
if (client.scheduler) {
|
||||||
// if this returns a promsie then the scheduler has control now and will
|
// if this returns a promsie then the scheduler has control now and will
|
||||||
// resolve/reject when it is done. Internally, the scheduler will invoke
|
// resolve/reject when it is done. Internally, the scheduler will invoke
|
||||||
// processFn which is set to this._sendEvent so the same code path is
|
// processFn which is set to this._sendEventHttpRequest so the same code
|
||||||
// executed regardless.
|
// path is executed regardless.
|
||||||
promise = this.scheduler.queueEvent(localEvent, txnId);
|
promise = client.scheduler.queueEvent(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!promise) {
|
if (!promise) {
|
||||||
promise = _sendEvent(this, localEvent, txnId);
|
promise = _sendEventHttpRequest(client, event);
|
||||||
}
|
}
|
||||||
|
|
||||||
promise.done(function(res) { // the request was sent OK
|
promise.done(function(res) { // the request was sent OK
|
||||||
if (room && localEvent) {
|
if (room) {
|
||||||
var eventId = res.event_id;
|
var eventId = res.event_id;
|
||||||
// try to find an event with this event_id. If we find it, this is
|
// try to find an event with this event_id. If we find it, this is
|
||||||
// the echo of this event *from the event stream* so we can remove
|
// the echo of this event *from the event stream* so we can remove
|
||||||
@@ -301,45 +322,33 @@ MatrixClient.prototype.sendEvent = function(roomId, eventType, content, txnId,
|
|||||||
}, true);
|
}, true);
|
||||||
if (matchingEvent) {
|
if (matchingEvent) {
|
||||||
utils.removeElement(room.timeline, function(ev) {
|
utils.removeElement(room.timeline, function(ev) {
|
||||||
return ev.getId() === localEvent.getId();
|
return ev.getId() === event.getId();
|
||||||
}, true);
|
}, true);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
localEvent.event.event_id = res.event_id;
|
event.event.event_id = res.event_id;
|
||||||
localEvent.status = null;
|
event.status = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// resolve public api callbacks/promises
|
_resolve(callback, defer, res);
|
||||||
if (callback) {
|
}, function(err) {
|
||||||
callback(null, res);
|
// the request failed to send.
|
||||||
}
|
event.status = EventStatus.NOT_SENT;
|
||||||
publicDefer.resolve(res);
|
_reject(callback, defer, err);
|
||||||
}, function(err) { // the request failed to send.
|
|
||||||
if (localEvent) {
|
|
||||||
localEvent.status = EventStatus.NOT_SENT;
|
|
||||||
}
|
|
||||||
|
|
||||||
// reject public api callbacks/promises
|
|
||||||
if (callback) {
|
|
||||||
callback(err);
|
|
||||||
}
|
|
||||||
publicDefer.reject(err);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
return publicDefer.promise;
|
return defer.promise;
|
||||||
};
|
}
|
||||||
|
|
||||||
function _sendEvent(client, event, txnId) {
|
function _sendEventHttpRequest(client, event) {
|
||||||
if (!txnId) {
|
|
||||||
txnId = "m" + new Date().getTime();
|
|
||||||
}
|
|
||||||
var pathParams = {
|
var pathParams = {
|
||||||
$roomId: event.getRoomId(),
|
$roomId: event.getRoomId(),
|
||||||
$eventType: event.getType(),
|
$eventType: event.getType(),
|
||||||
$stateKey: event.getStateKey(),
|
$stateKey: event.getStateKey(),
|
||||||
$txnId: txnId
|
$txnId: event._txnId ? event._txnId : new Date().getTime()
|
||||||
};
|
};
|
||||||
|
console.log("_txnId = %s", event._txnId);
|
||||||
|
|
||||||
var path;
|
var path;
|
||||||
|
|
||||||
@@ -793,8 +802,8 @@ MatrixClient.prototype.roomState = function(roomId, callback) {
|
|||||||
/**
|
/**
|
||||||
* Retrieve older messages from the given room and put them in the timeline.
|
* Retrieve older messages from the given room and put them in the timeline.
|
||||||
* @param {Room} room The room to get older messages in.
|
* @param {Room} room The room to get older messages in.
|
||||||
* @param {Number} limit Optional. The maximum number of previous events to pull
|
* @param {Integer} limit Optional. The maximum number of previous events to
|
||||||
* in. Default: 30.
|
* pull in. Default: 30.
|
||||||
* @param {module:client.callback} callback Optional.
|
* @param {module:client.callback} callback Optional.
|
||||||
* @return {module:client.Promise} Resolves: Room.
|
* @return {module:client.Promise} Resolves: Room.
|
||||||
* @return {module:http-api.MatrixError} Rejects: with an error response.
|
* @return {module:http-api.MatrixError} Rejects: with an error response.
|
||||||
|
@@ -28,7 +28,6 @@ function MatrixScheduler(retryAlgorithm, queueAlgorithm) {
|
|||||||
// queueName: [{
|
// queueName: [{
|
||||||
// event: MatrixEvent, // event to send
|
// event: MatrixEvent, // event to send
|
||||||
// defer: Deferred, // defer to resolve/reject at the END of the retries
|
// defer: Deferred, // defer to resolve/reject at the END of the retries
|
||||||
// txnId: String // desired transaction ID
|
|
||||||
// attempts: Number // number of times we've called processFn
|
// attempts: Number // number of times we've called processFn
|
||||||
// }, ...]
|
// }, ...]
|
||||||
};
|
};
|
||||||
@@ -51,11 +50,10 @@ MatrixScheduler.prototype.setProcessFunction = function(fn) {
|
|||||||
/**
|
/**
|
||||||
* Queue an event if it is required and start processing queues.
|
* Queue an event if it is required and start processing queues.
|
||||||
* @param {MatrixEvent} event The event that may be queued.
|
* @param {MatrixEvent} event The event that may be queued.
|
||||||
* @param {String} txnId Optional. The txnId to set when sending.
|
|
||||||
* @return {?Promise} A promise if the event was queued, which will be
|
* @return {?Promise} A promise if the event was queued, which will be
|
||||||
* resolved or rejected in due time, else null.
|
* resolved or rejected in due time, else null.
|
||||||
*/
|
*/
|
||||||
MatrixScheduler.prototype.queueEvent = function(event, txnId) {
|
MatrixScheduler.prototype.queueEvent = function(event) {
|
||||||
var queueName = this.queueAlgorithm(event);
|
var queueName = this.queueAlgorithm(event);
|
||||||
if (!queueName) {
|
if (!queueName) {
|
||||||
return null;
|
return null;
|
||||||
@@ -68,7 +66,6 @@ MatrixScheduler.prototype.queueEvent = function(event, txnId) {
|
|||||||
this._queues[queueName].push({
|
this._queues[queueName].push({
|
||||||
event: event,
|
event: event,
|
||||||
defer: defer,
|
defer: defer,
|
||||||
txnId: txnId,
|
|
||||||
attempts: 0
|
attempts: 0
|
||||||
});
|
});
|
||||||
debuglog(
|
debuglog(
|
||||||
@@ -154,7 +151,7 @@ function _processQueue(scheduler, queueName) {
|
|||||||
);
|
);
|
||||||
// fire the process function and if it resolves, resolve the deferred. Else
|
// fire the process function and if it resolves, resolve the deferred. Else
|
||||||
// invoke the retry algorithm.
|
// invoke the retry algorithm.
|
||||||
scheduler._procFn(obj.event, obj.txnId).done(function(res) {
|
scheduler._procFn(obj.event).done(function(res) {
|
||||||
// remove this from the queue
|
// remove this from the queue
|
||||||
_removeNextEvent(scheduler, queueName);
|
_removeNextEvent(scheduler, queueName);
|
||||||
debuglog("Queue '%s' sent event %s", queueName, obj.event.getId());
|
debuglog("Queue '%s' sent event %s", queueName, obj.event.getId());
|
||||||
@@ -242,7 +239,6 @@ function debuglog() {
|
|||||||
* The function to invoke to process (send) events in the queue.
|
* The function to invoke to process (send) events in the queue.
|
||||||
* @callback processFn
|
* @callback processFn
|
||||||
* @param {MatrixEvent} event The event to send.
|
* @param {MatrixEvent} event The event to send.
|
||||||
* @param {string} txnId The desired transaction ID for this event.
|
|
||||||
* @return {Promise} Resolved/rejected depending on the outcome of the request.
|
* @return {Promise} Resolved/rejected depending on the outcome of the request.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user