You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-07-31 15:24:23 +03:00
Add txn_id support to sliding sync
This allows clients to know when a request has been applied on the server. This allows us to change `resend(): void` to `resend(): Promise<string>` which resolves/rejects with the transaction ID when it has been applied/not.
This commit is contained in:
@ -562,6 +562,225 @@ describe("SlidingSync", () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("transaction IDs", () => {
|
||||||
|
beforeAll(setupClient);
|
||||||
|
afterAll(teardownClient);
|
||||||
|
const roomId = "!foo:bar";
|
||||||
|
|
||||||
|
let slidingSync: SlidingSync;
|
||||||
|
|
||||||
|
// really this applies to them all but it's easier to just test one
|
||||||
|
it("should resolve modifyRoomSubscriptions after SlidingSync.start() is called", async () => {
|
||||||
|
const roomSubInfo = {
|
||||||
|
timeline_limit: 1,
|
||||||
|
required_state: [
|
||||||
|
["m.room.name", ""],
|
||||||
|
],
|
||||||
|
};
|
||||||
|
// add the subscription
|
||||||
|
slidingSync = new SlidingSync(proxyBaseUrl, [], roomSubInfo, client, 1);
|
||||||
|
// modification before SlidingSync.start()
|
||||||
|
const subscribePromise = slidingSync.modifyRoomSubscriptions(new Set([roomId]));
|
||||||
|
let txnId;
|
||||||
|
httpBackend.when("POST", syncUrl).check(function(req) {
|
||||||
|
const body = req.data;
|
||||||
|
logger.log("txn got ", body);
|
||||||
|
expect(body.room_subscriptions).toBeTruthy();
|
||||||
|
expect(body.room_subscriptions[roomId]).toEqual(roomSubInfo);
|
||||||
|
expect(body.txn_id).toBeTruthy();
|
||||||
|
txnId = body.txn_id;
|
||||||
|
}).respond(200, function() {
|
||||||
|
return {
|
||||||
|
pos: "aaa",
|
||||||
|
txn_id: txnId,
|
||||||
|
lists: [],
|
||||||
|
extensions: {},
|
||||||
|
rooms: {
|
||||||
|
[roomId]: {
|
||||||
|
name: "foo bar",
|
||||||
|
required_state: [],
|
||||||
|
timeline: [],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
});
|
||||||
|
slidingSync.start();
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
await subscribePromise;
|
||||||
|
});
|
||||||
|
it("should resolve setList during a connection", async () => {
|
||||||
|
const newList = {
|
||||||
|
ranges: [[0, 20]],
|
||||||
|
};
|
||||||
|
const promise = slidingSync.setList(0, newList);
|
||||||
|
let txnId;
|
||||||
|
httpBackend.when("POST", syncUrl).check(function(req) {
|
||||||
|
const body = req.data;
|
||||||
|
logger.log("txn got ", body);
|
||||||
|
expect(body.room_subscriptions).toBeFalsy();
|
||||||
|
expect(body.lists[0]).toEqual(newList);
|
||||||
|
expect(body.txn_id).toBeTruthy();
|
||||||
|
txnId = body.txn_id;
|
||||||
|
}).respond(200, function() {
|
||||||
|
return {
|
||||||
|
pos: "bbb",
|
||||||
|
txn_id: txnId,
|
||||||
|
lists: [{ count: 5 }],
|
||||||
|
extensions: {},
|
||||||
|
};
|
||||||
|
});
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
await promise;
|
||||||
|
expect(txnId).toBeDefined();
|
||||||
|
});
|
||||||
|
it("should resolve setListRanges during a connection", async () => {
|
||||||
|
const promise = slidingSync.setListRanges(0, [[20, 40]]);
|
||||||
|
let txnId;
|
||||||
|
httpBackend.when("POST", syncUrl).check(function(req) {
|
||||||
|
const body = req.data;
|
||||||
|
logger.log("txn got ", body);
|
||||||
|
expect(body.room_subscriptions).toBeFalsy();
|
||||||
|
expect(body.lists[0]).toEqual({
|
||||||
|
ranges: [[20, 40]],
|
||||||
|
});
|
||||||
|
expect(body.txn_id).toBeTruthy();
|
||||||
|
txnId = body.txn_id;
|
||||||
|
}).respond(200, function() {
|
||||||
|
return {
|
||||||
|
pos: "ccc",
|
||||||
|
txn_id: txnId,
|
||||||
|
lists: [{ count: 5 }],
|
||||||
|
extensions: {},
|
||||||
|
};
|
||||||
|
});
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
await promise;
|
||||||
|
expect(txnId).toBeDefined();
|
||||||
|
});
|
||||||
|
it("should resolve modifyRoomSubscriptionInfo during a connection", async () => {
|
||||||
|
const promise = slidingSync.modifyRoomSubscriptionInfo({
|
||||||
|
timeline_limit: 99,
|
||||||
|
});
|
||||||
|
let txnId;
|
||||||
|
httpBackend.when("POST", syncUrl).check(function(req) {
|
||||||
|
const body = req.data;
|
||||||
|
logger.log("txn got ", body);
|
||||||
|
expect(body.room_subscriptions).toBeTruthy();
|
||||||
|
expect(body.room_subscriptions[roomId]).toEqual({
|
||||||
|
timeline_limit: 99,
|
||||||
|
});
|
||||||
|
expect(body.txn_id).toBeTruthy();
|
||||||
|
txnId = body.txn_id;
|
||||||
|
}).respond(200, function() {
|
||||||
|
return {
|
||||||
|
pos: "ddd",
|
||||||
|
txn_id: txnId,
|
||||||
|
extensions: {},
|
||||||
|
};
|
||||||
|
});
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
await promise;
|
||||||
|
expect(txnId).toBeDefined();
|
||||||
|
});
|
||||||
|
it("should reject earlier pending promises if a later transaction is acknowledged", async () => {
|
||||||
|
// i.e if we have [A,B,C] and see txn_id=C then A,B should be rejected.
|
||||||
|
const gotTxnIds = [];
|
||||||
|
const pushTxn = function(req) {
|
||||||
|
gotTxnIds.push(req.data.txn_id);
|
||||||
|
};
|
||||||
|
const failPromise = slidingSync.setListRanges(0, [[20, 40]]);
|
||||||
|
httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "e" }); // missing txn_id
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
const failPromise2 = slidingSync.setListRanges(0, [[60, 70]]);
|
||||||
|
httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "f" }); // missing txn_id
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
|
||||||
|
// attach rejection handlers now else if we do it later Jest treats that as an unhandled rejection
|
||||||
|
// which is a fail.
|
||||||
|
expect(failPromise).rejects.toEqual(gotTxnIds[0]);
|
||||||
|
expect(failPromise2).rejects.toEqual(gotTxnIds[1]);
|
||||||
|
|
||||||
|
const okPromise = slidingSync.setListRanges(0, [[0, 20]]);
|
||||||
|
let txnId;
|
||||||
|
httpBackend.when("POST", syncUrl).check((req) => {
|
||||||
|
txnId = req.data.txn_id;
|
||||||
|
}).respond(200, () => {
|
||||||
|
// include the txn_id, earlier requests should now be reject()ed.
|
||||||
|
return {
|
||||||
|
pos: "g",
|
||||||
|
txn_id: txnId,
|
||||||
|
};
|
||||||
|
});
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
await okPromise;
|
||||||
|
|
||||||
|
expect(txnId).toBeDefined();
|
||||||
|
});
|
||||||
|
it("should not reject later pending promises if an earlier transaction is acknowledged", async () => {
|
||||||
|
// i.e if we have [A,B,C] and see txn_id=B then C should not be rejected but A should.
|
||||||
|
const gotTxnIds = [];
|
||||||
|
const pushTxn = function(req) {
|
||||||
|
gotTxnIds.push(req.data.txn_id);
|
||||||
|
};
|
||||||
|
const A = slidingSync.setListRanges(0, [[20, 40]]);
|
||||||
|
httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "A" });
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
const B = slidingSync.setListRanges(0, [[60, 70]]);
|
||||||
|
httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "B" }); // missing txn_id
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
|
||||||
|
// attach rejection handlers now else if we do it later Jest treats that as an unhandled rejection
|
||||||
|
// which is a fail.
|
||||||
|
expect(A).rejects.toEqual(gotTxnIds[0]);
|
||||||
|
|
||||||
|
const C = slidingSync.setListRanges(0, [[0, 20]]);
|
||||||
|
let pendingC = true;
|
||||||
|
C.finally(() => {
|
||||||
|
pendingC = false;
|
||||||
|
});
|
||||||
|
httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, () => {
|
||||||
|
// include the txn_id for B, so C's promise is outstanding
|
||||||
|
return {
|
||||||
|
pos: "C",
|
||||||
|
txn_id: gotTxnIds[1],
|
||||||
|
};
|
||||||
|
});
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
// A is rejected, see above
|
||||||
|
expect(B).resolves.toEqual(gotTxnIds[1]); // B is resolved
|
||||||
|
expect(pendingC).toBe(true); // C is pending still
|
||||||
|
});
|
||||||
|
it("should do nothing for unknown txn_ids", async () => {
|
||||||
|
const promise = slidingSync.setListRanges(0, [[20, 40]]);
|
||||||
|
let pending = true;
|
||||||
|
promise.finally(() => {
|
||||||
|
pending = false;
|
||||||
|
});
|
||||||
|
let txnId;
|
||||||
|
httpBackend.when("POST", syncUrl).check(function(req) {
|
||||||
|
const body = req.data;
|
||||||
|
logger.log("txn got ", body);
|
||||||
|
expect(body.room_subscriptions).toBeFalsy();
|
||||||
|
expect(body.lists[0]).toEqual({
|
||||||
|
ranges: [[20, 40]],
|
||||||
|
});
|
||||||
|
expect(body.txn_id).toBeTruthy();
|
||||||
|
txnId = body.txn_id;
|
||||||
|
}).respond(200, function() {
|
||||||
|
return {
|
||||||
|
pos: "ccc",
|
||||||
|
txn_id: "bogus transaction id",
|
||||||
|
lists: [{ count: 5 }],
|
||||||
|
extensions: {},
|
||||||
|
};
|
||||||
|
});
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
expect(txnId).toBeDefined();
|
||||||
|
expect(pending).toBe(true);
|
||||||
|
slidingSync.stop();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe("extensions", () => {
|
describe("extensions", () => {
|
||||||
beforeAll(setupClient);
|
beforeAll(setupClient);
|
||||||
afterAll(teardownClient);
|
afterAll(teardownClient);
|
||||||
|
@ -68,6 +68,7 @@ export interface MSC3575SlidingSyncRequest {
|
|||||||
unsubscribe_rooms?: string[];
|
unsubscribe_rooms?: string[];
|
||||||
room_subscriptions?: Record<string, MSC3575RoomSubscription>;
|
room_subscriptions?: Record<string, MSC3575RoomSubscription>;
|
||||||
extensions?: object;
|
extensions?: object;
|
||||||
|
txn_id?: string;
|
||||||
|
|
||||||
// query params
|
// query params
|
||||||
pos?: string;
|
pos?: string;
|
||||||
@ -126,6 +127,7 @@ type Operation = DeleteOperation | InsertOperation | InvalidateOperation | SyncO
|
|||||||
*/
|
*/
|
||||||
export interface MSC3575SlidingSyncResponse {
|
export interface MSC3575SlidingSyncResponse {
|
||||||
pos: string;
|
pos: string;
|
||||||
|
txn_id?: string;
|
||||||
lists: ListResponse[];
|
lists: ListResponse[];
|
||||||
rooms: Record<string, MSC3575RoomData>;
|
rooms: Record<string, MSC3575RoomData>;
|
||||||
extensions: object;
|
extensions: object;
|
||||||
@ -334,6 +336,11 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
|
|||||||
private terminated = false;
|
private terminated = false;
|
||||||
// flag set when resend() is called because we cannot rely on detecting AbortError in JS SDK :(
|
// flag set when resend() is called because we cannot rely on detecting AbortError in JS SDK :(
|
||||||
private needsResend = false;
|
private needsResend = false;
|
||||||
|
// the txn_id to send with the next request.
|
||||||
|
private txnId?: string = null;
|
||||||
|
// a list (in chronological order of when they were sent) of objects containing the txn ID and
|
||||||
|
// a defer to resolve/reject depending on whether they were successfully sent or not.
|
||||||
|
private txnIdDefers: {txnId: string, resolve: Function, reject: Function}[] = [];
|
||||||
// map of extension name to req/resp handler
|
// map of extension name to req/resp handler
|
||||||
private extensions: Record<string, Extension> = {};
|
private extensions: Record<string, Extension> = {};
|
||||||
|
|
||||||
@ -404,9 +411,9 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
|
|||||||
* @param index The list index to modify
|
* @param index The list index to modify
|
||||||
* @param ranges The new ranges to apply.
|
* @param ranges The new ranges to apply.
|
||||||
*/
|
*/
|
||||||
public setListRanges(index: number, ranges: number[][]): void {
|
public setListRanges(index: number, ranges: number[][]): Promise<string> {
|
||||||
this.lists[index].updateListRange(ranges);
|
this.lists[index].updateListRange(ranges);
|
||||||
this.resend();
|
return this.resend();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -415,14 +422,14 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
|
|||||||
* @param index The index to modify
|
* @param index The index to modify
|
||||||
* @param list The new list parameters.
|
* @param list The new list parameters.
|
||||||
*/
|
*/
|
||||||
public setList(index: number, list: MSC3575List): void {
|
public setList(index: number, list: MSC3575List): Promise<string> {
|
||||||
if (this.lists[index]) {
|
if (this.lists[index]) {
|
||||||
this.lists[index].replaceList(list);
|
this.lists[index].replaceList(list);
|
||||||
} else {
|
} else {
|
||||||
this.lists[index] = new SlidingList(list);
|
this.lists[index] = new SlidingList(list);
|
||||||
}
|
}
|
||||||
this.listModifiedCount += 1;
|
this.listModifiedCount += 1;
|
||||||
this.resend();
|
return this.resend();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -439,9 +446,9 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
|
|||||||
* prepare the room subscriptions for when start() is called.
|
* prepare the room subscriptions for when start() is called.
|
||||||
* @param s The new desired room subscriptions.
|
* @param s The new desired room subscriptions.
|
||||||
*/
|
*/
|
||||||
public modifyRoomSubscriptions(s: Set<string>) {
|
public modifyRoomSubscriptions(s: Set<string>): Promise<string> {
|
||||||
this.desiredRoomSubscriptions = s;
|
this.desiredRoomSubscriptions = s;
|
||||||
this.resend();
|
return this.resend();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -449,10 +456,10 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
|
|||||||
* such that they will be sent up afresh.
|
* such that they will be sent up afresh.
|
||||||
* @param rs The new room subscription fields to fetch.
|
* @param rs The new room subscription fields to fetch.
|
||||||
*/
|
*/
|
||||||
public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): void {
|
public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): Promise<string> {
|
||||||
this.roomSubscriptionInfo = rs;
|
this.roomSubscriptionInfo = rs;
|
||||||
this.confirmedRoomSubscriptions = new Set<string>();
|
this.confirmedRoomSubscriptions = new Set<string>();
|
||||||
this.resend();
|
return this.resend();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -615,11 +622,52 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resend a Sliding Sync request. Used when something has changed in the request.
|
* Resend a Sliding Sync request. Used when something has changed in the request. Resolves with
|
||||||
|
* the transaction ID of this request on success. Rejects with the transaction ID of this request
|
||||||
|
* on failure.
|
||||||
*/
|
*/
|
||||||
public resend(): void {
|
public resend(): Promise<string> {
|
||||||
this.needsResend = true;
|
this.needsResend = true;
|
||||||
|
this.txnId = ""+Math.random();
|
||||||
|
const p: Promise<string> = new Promise((resolve, reject) => {
|
||||||
|
this.txnIdDefers.push({
|
||||||
|
txnId: this.txnId,
|
||||||
|
resolve: resolve,
|
||||||
|
reject: reject,
|
||||||
|
});
|
||||||
|
});
|
||||||
this.pendingReq?.abort();
|
this.pendingReq?.abort();
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
private resolveTransactionDefers(txnId?: string) {
|
||||||
|
if (!txnId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// find the matching index
|
||||||
|
let txnIndex = -1;
|
||||||
|
for (let i = 0; i < this.txnIdDefers.length; i++) {
|
||||||
|
if (this.txnIdDefers[i].txnId === txnId) {
|
||||||
|
txnIndex = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (txnIndex === -1) {
|
||||||
|
// this shouldn't happen; we shouldn't be seeing txn_ids for things we don't know about,
|
||||||
|
// whine about it.
|
||||||
|
logger.warn(`resolveTransactionDefers: seen ${txnId} but it isn't a pending txn, ignoring.`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// This list is sorted in time, so if the input txnId ACKs in the middle of this array,
|
||||||
|
// then everything before it that hasn't been ACKed yet never will and we should reject them.
|
||||||
|
for (let i = 0; i < txnIndex; i++) {
|
||||||
|
if (i < txnIndex) {
|
||||||
|
this.txnIdDefers[i].reject(this.txnIdDefers[i].txnId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.txnIdDefers[txnIndex].resolve(txnId);
|
||||||
|
// clear out settled promises, incuding the one we resolved.
|
||||||
|
this.txnIdDefers = this.txnIdDefers.slice(txnIndex+1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -666,6 +714,10 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
|
|||||||
reqBody.room_subscriptions[roomId] = this.roomSubscriptionInfo;
|
reqBody.room_subscriptions[roomId] = this.roomSubscriptionInfo;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (this.txnId) {
|
||||||
|
reqBody.txn_id = this.txnId;
|
||||||
|
this.txnId = null;
|
||||||
|
}
|
||||||
this.pendingReq = this.client.slidingSync(reqBody, this.proxyBaseUrl);
|
this.pendingReq = this.client.slidingSync(reqBody, this.proxyBaseUrl);
|
||||||
resp = await this.pendingReq;
|
resp = await this.pendingReq;
|
||||||
logger.debug(resp);
|
logger.debug(resp);
|
||||||
@ -747,6 +799,8 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
|
|||||||
i, this.lists[i].joinedCount, Object.assign({}, this.lists[i].roomIndexToRoomId),
|
i, this.lists[i].joinedCount, Object.assign({}, this.lists[i].roomIndexToRoomId),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this.resolveTransactionDefers(resp.txn_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user