1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-11-26 17:03:12 +03:00

Merge branch 'develop' into kegan/sync-v3

This commit is contained in:
kegsay
2022-10-13 10:56:36 +01:00
committed by GitHub
56 changed files with 2718 additions and 2730 deletions

View File

@@ -15,11 +15,11 @@ limitations under the License.
*/
import { logger } from './logger';
import { IAbortablePromise } from "./@types/partials";
import { MatrixClient } from "./client";
import { IRoomEvent, IStateEvent } from "./sync-accumulator";
import { TypedEventEmitter } from "./models//typed-event-emitter";
import { TypedEventEmitter } from "./models/typed-event-emitter";
import { sleep, IDeferred, defer } from "./utils";
import { ConnectionError } from "./http-api";
// /sync requests allow you to set a timeout= but the request may continue
// beyond that and wedge forever, so we need to track how long we are willing
@@ -353,7 +353,8 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
private desiredRoomSubscriptions = new Set<string>(); // the *desired* room subscriptions
private confirmedRoomSubscriptions = new Set<string>();
private pendingReq?: IAbortablePromise<MSC3575SlidingSyncResponse>;
private pendingReq?: Promise<MSC3575SlidingSyncResponse>;
private abortController?: AbortController;
/**
* Create a new sliding sync instance
@@ -700,7 +701,8 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
...d,
txnId: this.txnId,
});
this.pendingReq?.abort();
this.abortController?.abort();
this.abortController = new AbortController();
return d.promise;
}
@@ -728,7 +730,7 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
this.txnIdDefers[i].reject(this.txnIdDefers[i].txnId);
}
this.txnIdDefers[txnIndex].resolve(txnId);
// clear out settled promises, incuding the one we resolved.
// clear out settled promises, including the one we resolved.
this.txnIdDefers = this.txnIdDefers.slice(txnIndex+1);
}
@@ -737,7 +739,7 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
*/
public stop(): void {
this.terminated = true;
this.pendingReq?.abort();
this.abortController?.abort();
// remove all listeners so things can be GC'd
this.removeAllListeners(SlidingSyncEvent.Lifecycle);
this.removeAllListeners(SlidingSyncEvent.List);
@@ -768,6 +770,8 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
* Start syncing with the server. Blocks until stopped.
*/
public async start() {
this.abortController = new AbortController();
let currentPos: string;
while (!this.terminated) {
this.needsResend = false;
@@ -800,7 +804,7 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
reqBody.txn_id = this.txnId;
this.txnId = null;
}
this.pendingReq = this.client.slidingSync(reqBody, this.proxyBaseUrl);
this.pendingReq = this.client.slidingSync(reqBody, this.proxyBaseUrl, this.abortController.signal);
resp = await this.pendingReq;
currentPos = resp.pos;
// update what we think we're subscribed to.
@@ -847,11 +851,8 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
await sleep(50); // in case the 400 was for something else; don't tightloop
continue;
} // else fallthrough to generic error handling
} else if (this.needsResend || err === "aborted") {
// don't sleep as we caused this error by abort()ing the request.
// we check for 'aborted' because that's the error Jest returns and without it
// we get warnings about not exiting fast enough.
continue;
} else if (this.needsResend || err instanceof ConnectionError) {
continue; // don't sleep as we caused this error by abort()ing the request.
}
logger.error(err);
await sleep(5000);