1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-08-07 23:02:56 +03:00

Convert SearchResult, InteractiveAuth, PushProcessor and Scheduler to Typescript

This commit is contained in:
Michael Telatynski
2021-07-07 11:08:54 +01:00
parent dc90115a1b
commit e775bcac3c
11 changed files with 1210 additions and 1033 deletions

View File

@@ -30,7 +30,7 @@ import * as utils from './utils';
import { sleep } from './utils'; import { sleep } from './utils';
import { Group } from "./models/group"; import { Group } from "./models/group";
import { EventTimeline } from "./models/event-timeline"; import { EventTimeline } from "./models/event-timeline";
import { PushAction, PushProcessor } from "./pushprocessor"; import { IActionsObject, PushProcessor } from "./pushprocessor";
import { AutoDiscovery } from "./autodiscovery"; import { AutoDiscovery } from "./autodiscovery";
import * as olmlib from "./crypto/olmlib"; import * as olmlib from "./crypto/olmlib";
import { decodeBase64, encodeBase64 } from "./crypto/olmlib"; import { decodeBase64, encodeBase64 } from "./crypto/olmlib";
@@ -4092,7 +4092,7 @@ export class MatrixClient extends EventEmitter {
* @param {MatrixEvent} event The event to get push actions for. * @param {MatrixEvent} event The event to get push actions for.
* @return {module:pushprocessor~PushAction} A dict of actions to perform. * @return {module:pushprocessor~PushAction} A dict of actions to perform.
*/ */
public getPushActionsForEvent(event: MatrixEvent): PushAction { public getPushActionsForEvent(event: MatrixEvent): IActionsObject {
if (!event.getPushActions()) { if (!event.getPushActions()) {
event.setPushActions(this.pushProcessor.actionsForEvent(event)); event.setPushActions(this.pushProcessor.actionsForEvent(event));
} }

View File

@@ -19,12 +19,84 @@ limitations under the License.
/** @module interactive-auth */ /** @module interactive-auth */
import url from "url"; import url from "url";
import * as utils from "./utils"; import * as utils from "./utils";
import { logger } from './logger'; import { logger } from './logger';
import { MatrixClient } from "./client";
import { defer, IDeferred } from "./utils";
import { MatrixError } from "./http-api";
const EMAIL_STAGE_TYPE = "m.login.email.identity"; const EMAIL_STAGE_TYPE = "m.login.email.identity";
const MSISDN_STAGE_TYPE = "m.login.msisdn"; const MSISDN_STAGE_TYPE = "m.login.msisdn";
interface IFlow {
stages: AuthType[];
}
export interface IInputs {
emailAddress?: string;
phoneCountry?: string;
phoneNumber?: string;
}
export interface IStageStatus {
emailSid?: string;
errcode?: string;
error?: string;
}
export interface IAuthData {
session?: string;
completed?: string[];
flows?: IFlow[];
params?: Record<string, Record<string, any>>;
errcode?: string;
error?: MatrixError;
}
export enum AuthType {
Password = "m.login.password",
Recaptcha = "m.login.recaptcha",
Terms = "m.login.terms",
Email = "m.login.email.identity",
Msisdn = "m.login.msisdn",
Sso = "m.login.sso",
SsoUnstable = "org.matrix.login.sso",
Dummy = "m.login.dummy",
}
export interface IAuthDict {
// [key: string]: any;
type?: string;
// session?: string; // TODO
// TODO: Remove `user` once servers support proper UIA
// See https://github.com/vector-im/element-web/issues/10312
user?: string;
identifier?: any;
password?: string;
response?: string;
// TODO: Remove `threepid_creds` once servers support proper UIA
// See https://github.com/vector-im/element-web/issues/10312
// See https://github.com/matrix-org/matrix-doc/issues/2220
// eslint-disable-next-line camelcase
threepid_creds?: any;
threepidCreds?: any;
}
interface IOpts {
matrixClient: MatrixClient;
authData?: IAuthData;
inputs?: IInputs;
sessionId?: string;
clientSecret?: string;
emailSid?: string;
doRequest(auth: IAuthData, background: boolean): Promise<IAuthData>;
stateUpdated(nextStage: AuthType, status: IStageStatus): void;
requestEmailToken(email: string, secret: string, attempt: number, session: string): Promise<{ sid: string }>;
busyChanged?(busy: boolean): void;
startAuthStage?(nextStage: string): Promise<void>; // LEGACY
}
/** /**
* Abstracts the logic used to drive the interactive auth process. * Abstracts the logic used to drive the interactive auth process.
* *
@@ -51,12 +123,12 @@ const MSISDN_STAGE_TYPE = "m.login.msisdn";
* called with the new auth dict to submit the request. Also passes a * called with the new auth dict to submit the request. Also passes a
* second deprecated arg which is a flag set to true if this request * second deprecated arg which is a flag set to true if this request
* is a background request. The busyChanged callback should be used * is a background request. The busyChanged callback should be used
* instead of the backfround flag. Should return a promise which resolves * instead of the background flag. Should return a promise which resolves
* to the successful response or rejects with a MatrixError. * to the successful response or rejects with a MatrixError.
* *
* @param {function(bool): Promise} opts.busyChanged * @param {function(boolean): Promise} opts.busyChanged
* called whenever the interactive auth logic becomes busy submitting * called whenever the interactive auth logic becomes busy submitting
* information provided by the user or finsihes. After this has been * information provided by the user or finishes. After this has been
* called with true the UI should indicate that a request is in progress * called with true the UI should indicate that a request is in progress
* until it is called again with false. * until it is called again with false.
* *
@@ -102,33 +174,39 @@ const MSISDN_STAGE_TYPE = "m.login.msisdn";
* attemptAuth promise. * attemptAuth promise.
* *
*/ */
export function InteractiveAuth(opts) { export class InteractiveAuth {
this._matrixClient = opts.matrixClient; private readonly matrixClient: MatrixClient;
this._data = opts.authData || {}; private readonly inputs: IInputs;
this._requestCallback = opts.doRequest; private readonly clientSecret: string;
this._busyChangedCallback = opts.busyChanged; private readonly requestCallback: IOpts["doRequest"];
// startAuthStage included for backwards compat private readonly busyChangedCallback?: IOpts["busyChanged"];
this._stateUpdatedCallback = opts.stateUpdated || opts.startAuthStage; private readonly stateUpdatedCallback: IOpts["stateUpdated"];
this._resolveFunc = null; private readonly requestEmailTokenCallback: IOpts["requestEmailToken"];
this._rejectFunc = null;
this._inputs = opts.inputs || {};
this._requestEmailTokenCallback = opts.requestEmailToken;
if (opts.sessionId) this._data.session = opts.sessionId; private data: IAuthData;
this._clientSecret = opts.clientSecret || this._matrixClient.generateClientSecret(); private emailSid?: string;
this._emailSid = opts.emailSid; private requestingEmailToken = false;
if (this._emailSid === undefined) this._emailSid = null; private attemptAuthDeferred: IDeferred<IAuthData> = null;
this._requestingEmailToken = false; private chosenFlow: IFlow = null;
private currentStage: string = null;
this._chosenFlow = null;
this._currentStage = null;
// if we are currently trying to submit an auth dict (which includes polling) // if we are currently trying to submit an auth dict (which includes polling)
// the promise the will resolve/reject when it completes // the promise the will resolve/reject when it completes
this._submitPromise = null; private submitPromise: Promise<void> = null;
constructor(opts: IOpts) {
this.data = opts.authData || {};
this.requestCallback = opts.doRequest;
this.busyChangedCallback = opts.busyChanged;
// startAuthStage included for backwards compat
this.stateUpdatedCallback = opts.stateUpdated || opts.startAuthStage;
this.requestEmailTokenCallback = opts.requestEmailToken;
if (opts.sessionId) this.data.session = opts.sessionId;
this.clientSecret = opts.clientSecret || this.matrixClient.generateClientSecret();
this.emailSid = opts.emailSid ?? null;
} }
InteractiveAuth.prototype = {
/** /**
* begin the authentication process. * begin the authentication process.
* *
@@ -136,59 +214,58 @@ InteractiveAuth.prototype = {
* or rejects with the error on failure. Rejects with NoAuthFlowFoundError if * or rejects with the error on failure. Rejects with NoAuthFlowFoundError if
* no suitable authentication flow can be found * no suitable authentication flow can be found
*/ */
attemptAuth: function() { public attemptAuth(): Promise<IAuthData> {
// This promise will be quite long-lived and will resolve when the // This promise will be quite long-lived and will resolve when the
// request is authenticated and completes successfully. // request is authenticated and completes successfully.
return new Promise((resolve, reject) => { this.attemptAuthDeferred = defer();
this._resolveFunc = resolve;
this._rejectFunc = reject;
const hasFlows = this._data && this._data.flows; const hasFlows = this.data && this.data.flows;
// if we have no flows, try a request to acquire the flows // if we have no flows, try a request to acquire the flows
if (!hasFlows) { if (!hasFlows) {
if (this._busyChangedCallback) this._busyChangedCallback(true); this.busyChangedCallback?.(true);
// use the existing sessionid, if one is present. // use the existing sessionid, if one is present.
let auth = null; let auth = null;
if (this._data.session) { if (this.data.session) {
auth = { auth = {
session: this._data.session, session: this.data.session,
}; };
} }
this._doRequest(auth).finally(() => { this.doRequest(auth).finally(() => {
if (this._busyChangedCallback) this._busyChangedCallback(false); this.busyChangedCallback?.(false);
}); });
} else { } else {
this._startNextAuthStage(); this.startNextAuthStage();
}
return this.attemptAuthDeferred.promise;
} }
});
},
/** /**
* Poll to check if the auth session or current stage has been * Poll to check if the auth session or current stage has been
* completed out-of-band. If so, the attemptAuth promise will * completed out-of-band. If so, the attemptAuth promise will
* be resolved. * be resolved.
*/ */
poll: async function() { public async poll(): Promise<void> {
if (!this._data.session) return; if (!this.data.session) return;
// likewise don't poll if there is no auth session in progress // likewise don't poll if there is no auth session in progress
if (!this._resolveFunc) return; if (!this.attemptAuthDeferred) return;
// if we currently have a request in flight, there's no point making // if we currently have a request in flight, there's no point making
// another just to check what the status is // another just to check what the status is
if (this._submitPromise) return; if (this.submitPromise) return;
let authDict = {}; let authDict: IAuthDict = {};
if (this._currentStage == EMAIL_STAGE_TYPE) { if (this.currentStage == EMAIL_STAGE_TYPE) {
// The email can be validated out-of-band, but we need to provide the // The email can be validated out-of-band, but we need to provide the
// creds so the HS can go & check it. // creds so the HS can go & check it.
if (this._emailSid) { if (this.emailSid) {
const creds = { const creds: Record<string, string> = {
sid: this._emailSid, sid: this.emailSid,
client_secret: this._clientSecret, client_secret: this.clientSecret,
}; };
if (await this._matrixClient.doesServerRequireIdServerParam()) { if (await this.matrixClient.doesServerRequireIdServerParam()) {
const idServerParsedUrl = url.parse( const idServerParsedUrl = url.parse(
this._matrixClient.getIdentityServerUrl(), this.matrixClient.getIdentityServerUrl(),
); );
creds.id_server = idServerParsedUrl.host; creds.id_server = idServerParsedUrl.host;
} }
@@ -204,16 +281,16 @@ InteractiveAuth.prototype = {
} }
this.submitAuthDict(authDict, true); this.submitAuthDict(authDict, true);
}, }
/** /**
* get the auth session ID * get the auth session ID
* *
* @return {string} session id * @return {string} session id
*/ */
getSessionId: function() { public getSessionId(): string {
return this._data ? this._data.session : undefined; return this.data ? this.data.session : undefined;
}, }
/** /**
* get the client secret used for validation sessions * get the client secret used for validation sessions
@@ -221,9 +298,9 @@ InteractiveAuth.prototype = {
* *
* @return {string} client secret * @return {string} client secret
*/ */
getClientSecret: function() { public getClientSecret(): string {
return this._clientSecret; return this.clientSecret;
}, }
/** /**
* get the server params for a given stage * get the server params for a given stage
@@ -231,17 +308,13 @@ InteractiveAuth.prototype = {
* @param {string} loginType login type for the stage * @param {string} loginType login type for the stage
* @return {object?} any parameters from the server for this stage * @return {object?} any parameters from the server for this stage
*/ */
getStageParams: function(loginType) { public getStageParams(loginType: string): Record<string, any> {
let params = {}; return this.data.params?.[loginType];
if (this._data && this._data.params) {
params = this._data.params;
} }
return params[loginType];
},
getChosenFlow() { public getChosenFlow(): IFlow {
return this._chosenFlow; return this.chosenFlow;
}, }
/** /**
* submit a new auth dict and fire off the request. This will either * submit a new auth dict and fire off the request. This will either
@@ -249,38 +322,38 @@ InteractiveAuth.prototype = {
* to be called for a new stage. * to be called for a new stage.
* *
* @param {object} authData new auth dict to send to the server. Should * @param {object} authData new auth dict to send to the server. Should
* include a `type` propterty denoting the login type, as well as any * include a `type` property denoting the login type, as well as any
* other params for that stage. * other params for that stage.
* @param {bool} background If true, this request failing will not result * @param {boolean} background If true, this request failing will not result
* in the attemptAuth promise being rejected. This can be set to true * in the attemptAuth promise being rejected. This can be set to true
* for requests that just poll to see if auth has been completed elsewhere. * for requests that just poll to see if auth has been completed elsewhere.
*/ */
submitAuthDict: async function(authData, background) { public async submitAuthDict(authData: IAuthDict, background = false): Promise<void> {
if (!this._resolveFunc) { if (!this.attemptAuthDeferred) {
throw new Error("submitAuthDict() called before attemptAuth()"); throw new Error("submitAuthDict() called before attemptAuth()");
} }
if (!background && this._busyChangedCallback) { if (!background) {
this._busyChangedCallback(true); this.busyChangedCallback?.(true);
} }
// if we're currently trying a request, wait for it to finish // if we're currently trying a request, wait for it to finish
// as otherwise we can get multiple 200 responses which can mean // as otherwise we can get multiple 200 responses which can mean
// things like multiple logins for register requests. // things like multiple logins for register requests.
// (but discard any expections as we only care when its done, // (but discard any exceptions as we only care when its done,
// not whether it worked or not) // not whether it worked or not)
while (this._submitPromise) { while (this.submitPromise) {
try { try {
await this._submitPromise; await this.submitPromise;
} catch (e) { } catch (e) {
} }
} }
// use the sessionid from the last request, if one is present. // use the sessionid from the last request, if one is present.
let auth; let auth;
if (this._data.session) { if (this.data.session) {
auth = { auth = {
session: this._data.session, session: this.data.session,
}; };
utils.extend(auth, authData); utils.extend(auth, authData);
} else { } else {
@@ -290,15 +363,15 @@ InteractiveAuth.prototype = {
try { try {
// NB. the 'background' flag is deprecated by the busyChanged // NB. the 'background' flag is deprecated by the busyChanged
// callback and is here for backwards compat // callback and is here for backwards compat
this._submitPromise = this._doRequest(auth, background); this.submitPromise = this.doRequest(auth, background);
await this._submitPromise; await this.submitPromise;
} finally { } finally {
this._submitPromise = null; this.submitPromise = null;
if (!background && this._busyChangedCallback) { if (!background) {
this._busyChangedCallback(false); this.busyChangedCallback?.(false);
}
} }
} }
},
/** /**
* Gets the sid for the email validation session * Gets the sid for the email validation session
@@ -306,9 +379,9 @@ InteractiveAuth.prototype = {
* *
* @returns {string} The sid of the email auth session * @returns {string} The sid of the email auth session
*/ */
getEmailSid: function() { public getEmailSid(): string {
return this._emailSid; return this.emailSid;
}, }
/** /**
* Sets the sid for the email validation session * Sets the sid for the email validation session
@@ -318,9 +391,9 @@ InteractiveAuth.prototype = {
* *
* @param {string} sid The sid for the email validation session * @param {string} sid The sid for the email validation session
*/ */
setEmailSid: function(sid) { public setEmailSid(sid: string): void {
this._emailSid = sid; this.emailSid = sid;
}, }
/** /**
* Fire off a request, and either resolve the promise, or call * Fire off a request, and either resolve the promise, or call
@@ -328,33 +401,28 @@ InteractiveAuth.prototype = {
* *
* @private * @private
* @param {object?} auth new auth dict, including session id * @param {object?} auth new auth dict, including session id
* @param {bool?} background If true, this request is a background poll, so it * @param {boolean?} background If true, this request is a background poll, so it
* failing will not result in the attemptAuth promise being rejected. * failing will not result in the attemptAuth promise being rejected.
* This can be set to true for requests that just poll to see if auth has * This can be set to true for requests that just poll to see if auth has
* been completed elsewhere. * been completed elsewhere.
*/ */
_doRequest: async function(auth, background) { private async doRequest(auth: IAuthData, background = false): Promise<void> {
try { try {
const result = await this._requestCallback(auth, background); const result = await this.requestCallback(auth, background);
this._resolveFunc(result); this.attemptAuthDeferred.resolve(result);
this._resolveFunc = null;
this._rejectFunc = null;
} catch (error) { } catch (error) {
// sometimes UI auth errors don't come with flows // sometimes UI auth errors don't come with flows
const errorFlows = error.data ? error.data.flows : null; const errorFlows = error.data ? error.data.flows : null;
const haveFlows = this._data.flows || Boolean(errorFlows); const haveFlows = this.data.flows || Boolean(errorFlows);
if (error.httpStatus !== 401 || !error.data || !haveFlows) { if (error.httpStatus !== 401 || !error.data || !haveFlows) {
// doesn't look like an interactive-auth failure. // doesn't look like an interactive-auth failure.
if (!background) { if (!background) {
this._rejectFunc(error); this.attemptAuthDeferred?.reject(error);
} else { } else {
// We ignore all failures here (even non-UI auth related ones) // We ignore all failures here (even non-UI auth related ones)
// since we don't want to suddenly fail if the internet connection // since we don't want to suddenly fail if the internet connection
// had a blip whilst we were polling // had a blip whilst we were polling
logger.log( logger.log("Background poll request failed doing UI auth: ignoring", error);
"Background poll request failed doing UI auth: ignoring",
error,
);
} }
} }
// if the error didn't come with flows, completed flows or session ID, // if the error didn't come with flows, completed flows or session ID,
@@ -363,37 +431,35 @@ InteractiveAuth.prototype = {
// has not yet been validated). This appears to be a Synapse bug, which // has not yet been validated). This appears to be a Synapse bug, which
// we workaround here. // we workaround here.
if (!error.data.flows && !error.data.completed && !error.data.session) { if (!error.data.flows && !error.data.completed && !error.data.session) {
error.data.flows = this._data.flows; error.data.flows = this.data.flows;
error.data.completed = this._data.completed; error.data.completed = this.data.completed;
error.data.session = this._data.session; error.data.session = this.data.session;
} }
this._data = error.data; this.data = error.data;
try { try {
this._startNextAuthStage(); this.startNextAuthStage();
} catch (e) { } catch (e) {
this._rejectFunc(e); this.attemptAuthDeferred.reject(e);
this._resolveFunc = null;
this._rejectFunc = null;
} }
if ( if (
!this._emailSid && !this.emailSid &&
!this._requestingEmailToken && !this.requestingEmailToken &&
this._chosenFlow.stages.includes('m.login.email.identity') this.chosenFlow.stages.includes(AuthType.Email)
) { ) {
// If we've picked a flow with email auth, we send the email // If we've picked a flow with email auth, we send the email
// now because we want the request to fail as soon as possible // now because we want the request to fail as soon as possible
// if the email address is not valid (ie. already taken or not // if the email address is not valid (ie. already taken or not
// registered, depending on what the operation is). // registered, depending on what the operation is).
this._requestingEmailToken = true; this.requestingEmailToken = true;
try { try {
const requestTokenResult = await this._requestEmailTokenCallback( const requestTokenResult = await this.requestEmailTokenCallback(
this._inputs.emailAddress, this.inputs.emailAddress,
this._clientSecret, this.clientSecret,
1, // TODO: Multiple send attempts? 1, // TODO: Multiple send attempts?
this._data.session, this.data.session,
); );
this._emailSid = requestTokenResult.sid; this.emailSid = requestTokenResult.sid;
// NB. promise is not resolved here - at some point, doRequest // NB. promise is not resolved here - at some point, doRequest
// will be called again and if the user has jumped through all // will be called again and if the user has jumped through all
// the hoops correctly, auth will be complete and the request // the hoops correctly, auth will be complete and the request
@@ -407,15 +473,15 @@ InteractiveAuth.prototype = {
// to do) or it could be a network failure. Either way, pass // to do) or it could be a network failure. Either way, pass
// the failure up as the user can't complete auth if we can't // the failure up as the user can't complete auth if we can't
// send the email, for whatever reason. // send the email, for whatever reason.
this._rejectFunc(e); this.attemptAuthDeferred.reject(e);
this._resolveFunc = null;
this._rejectFunc = null;
} finally { } finally {
this._requestingEmailToken = false; this.requestingEmailToken = false;
} }
} }
} finally {
this.attemptAuthDeferred = null; // TODO
}
} }
},
/** /**
* Pick the next stage and call the callback * Pick the next stage and call the callback
@@ -423,34 +489,34 @@ InteractiveAuth.prototype = {
* @private * @private
* @throws {NoAuthFlowFoundError} If no suitable authentication flow can be found * @throws {NoAuthFlowFoundError} If no suitable authentication flow can be found
*/ */
_startNextAuthStage: function() { private startNextAuthStage(): void {
const nextStage = this._chooseStage(); const nextStage = this.chooseStage();
if (!nextStage) { if (!nextStage) {
throw new Error("No incomplete flows from the server"); throw new Error("No incomplete flows from the server");
} }
this._currentStage = nextStage; this.currentStage = nextStage;
if (nextStage === 'm.login.dummy') { if (nextStage === AuthType.Dummy) {
this.submitAuthDict({ this.submitAuthDict({
type: 'm.login.dummy', type: 'm.login.dummy',
}); });
return; return;
} }
if (this._data && this._data.errcode || this._data.error) { if (this.data && this.data.errcode || this.data.error) {
this._stateUpdatedCallback(nextStage, { this.stateUpdatedCallback(nextStage, {
errcode: this._data.errcode || "", errcode: this.data.errcode || "",
error: this._data.error || "", error: this.data.error || "",
}); });
return; return;
} }
const stageStatus = {}; const stageStatus: IStageStatus = {};
if (nextStage == EMAIL_STAGE_TYPE) { if (nextStage == EMAIL_STAGE_TYPE) {
stageStatus.emailSid = this._emailSid; stageStatus.emailSid = this.emailSid;
}
this.stateUpdatedCallback(nextStage, stageStatus);
} }
this._stateUpdatedCallback(nextStage, stageStatus);
},
/** /**
* Pick the next auth stage * Pick the next auth stage
@@ -459,15 +525,15 @@ InteractiveAuth.prototype = {
* @return {string?} login type * @return {string?} login type
* @throws {NoAuthFlowFoundError} If no suitable authentication flow can be found * @throws {NoAuthFlowFoundError} If no suitable authentication flow can be found
*/ */
_chooseStage: function() { private chooseStage(): AuthType {
if (this._chosenFlow === null) { if (this.chosenFlow === null) {
this._chosenFlow = this._chooseFlow(); this.chosenFlow = this.chooseFlow();
} }
logger.log("Active flow => %s", JSON.stringify(this._chosenFlow)); logger.log("Active flow => %s", JSON.stringify(this.chosenFlow));
const nextStage = this._firstUncompletedStage(this._chosenFlow); const nextStage = this.firstUncompletedStage(this.chosenFlow);
logger.log("Next stage: %s", nextStage); logger.log("Next stage: %s", nextStage);
return nextStage; return nextStage;
}, }
/** /**
* Pick one of the flows from the returned list * Pick one of the flows from the returned list
@@ -475,7 +541,7 @@ InteractiveAuth.prototype = {
* be returned, otherwise, null will be returned. * be returned, otherwise, null will be returned.
* *
* Only flows using all given inputs are chosen because it * Only flows using all given inputs are chosen because it
* is likley to be surprising if the user provides a * is likely to be surprising if the user provides a
* credential and it is not used. For example, for registration, * credential and it is not used. For example, for registration,
* this could result in the email not being used which would leave * this could result in the email not being used which would leave
* the account with no means to reset a password. * the account with no means to reset a password.
@@ -484,14 +550,14 @@ InteractiveAuth.prototype = {
* @return {object} flow * @return {object} flow
* @throws {NoAuthFlowFoundError} If no suitable authentication flow can be found * @throws {NoAuthFlowFoundError} If no suitable authentication flow can be found
*/ */
_chooseFlow: function() { private chooseFlow(): IFlow {
const flows = this._data.flows || []; const flows = this.data.flows || [];
// we've been given an email or we've already done an email part // we've been given an email or we've already done an email part
const haveEmail = Boolean(this._inputs.emailAddress) || Boolean(this._emailSid); const haveEmail = Boolean(this.inputs.emailAddress) || Boolean(this.emailSid);
const haveMsisdn = ( const haveMsisdn = (
Boolean(this._inputs.phoneCountry) && Boolean(this.inputs.phoneCountry) &&
Boolean(this._inputs.phoneNumber) Boolean(this.inputs.phoneNumber)
); );
for (const flow of flows) { for (const flow of flows) {
@@ -513,12 +579,13 @@ InteractiveAuth.prototype = {
// information such that the app can give a better one if so desired. // information such that the app can give a better one if so desired.
const err = new Error("No appropriate authentication flow found"); const err = new Error("No appropriate authentication flow found");
err.name = 'NoAuthFlowFoundError'; err.name = 'NoAuthFlowFoundError';
err.required_stages = []; const requiredStages: string[] = [];
if (haveEmail) err.required_stages.push(EMAIL_STAGE_TYPE); if (haveEmail) requiredStages.push(EMAIL_STAGE_TYPE);
if (haveMsisdn) err.required_stages.push(MSISDN_STAGE_TYPE); if (haveMsisdn) requiredStages.push(MSISDN_STAGE_TYPE);
err.available_flows = flows; (err as any).required_stages = requiredStages;
(err as any).available_flows = flows;
throw err; throw err;
}, }
/** /**
* Get the first uncompleted stage in the given flow * Get the first uncompleted stage in the given flow
@@ -527,14 +594,13 @@ InteractiveAuth.prototype = {
* @param {object} flow * @param {object} flow
* @return {string} login type * @return {string} login type
*/ */
_firstUncompletedStage: function(flow) { private firstUncompletedStage(flow: IFlow): AuthType {
const completed = (this._data || {}).completed || []; const completed = this.data.completed || [];
for (let i = 0; i < flow.stages.length; ++i) { for (let i = 0; i < flow.stages.length; ++i) {
const stageType = flow.stages[i]; const stageType = flow.stages[i];
if (completed.indexOf(stageType) === -1) { if (completed.indexOf(stageType) === -1) {
return stageType; return stageType;
} }
} }
}, }
}; }

View File

@@ -28,6 +28,7 @@ import { EventType, MsgType, RelationType } from "../@types/event";
import { Crypto } from "../crypto"; import { Crypto } from "../crypto";
import { deepSortedObjectEntries } from "../utils"; import { deepSortedObjectEntries } from "../utils";
import { RoomMember } from "./room-member"; import { RoomMember } from "./room-member";
import { IActionsObject } from '../pushprocessor';
/** /**
* Enum for event statuses. * Enum for event statuses.
@@ -148,7 +149,7 @@ export interface IDecryptOptions {
} }
export class MatrixEvent extends EventEmitter { export class MatrixEvent extends EventEmitter {
private pushActions: object = null; private pushActions: IActionsObject = null;
private _replacingEvent: MatrixEvent = null; private _replacingEvent: MatrixEvent = null;
private _localRedactionEvent: MatrixEvent = null; private _localRedactionEvent: MatrixEvent = null;
private _isCancelled = false; private _isCancelled = false;
@@ -935,7 +936,7 @@ export class MatrixEvent extends EventEmitter {
* *
* @return {?Object} push actions * @return {?Object} push actions
*/ */
public getPushActions(): object | null { public getPushActions(): IActionsObject | null {
return this.pushActions; return this.pushActions;
} }
@@ -944,7 +945,7 @@ export class MatrixEvent extends EventEmitter {
* *
* @param {Object} pushActions push actions * @param {Object} pushActions push actions
*/ */
public setPushActions(pushActions: object): void { public setPushActions(pushActions: IActionsObject): void {
this.pushActions = pushActions; this.pushActions = pushActions;
} }

View File

@@ -1,60 +0,0 @@
/*
Copyright 2015, 2016 OpenMarket Ltd
Copyright 2019 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/**
* @module models/search-result
*/
import { EventContext } from "./event-context";
/**
* Construct a new SearchResult
*
* @param {number} rank where this SearchResult ranks in the results
* @param {event-context.EventContext} eventContext the matching event and its
* context
*
* @constructor
*/
export function SearchResult(rank, eventContext) {
this.rank = rank;
this.context = eventContext;
}
/**
* Create a SearchResponse from the response to /search
* @static
* @param {Object} jsonObj
* @param {function} eventMapper
* @return {SearchResult}
*/
SearchResult.fromJson = function(jsonObj, eventMapper) {
const jsonContext = jsonObj.context || {};
const events_before = jsonContext.events_before || [];
const events_after = jsonContext.events_after || [];
const context = new EventContext(eventMapper(jsonObj.result));
context.setPaginateToken(jsonContext.start, true);
context.addEvents(events_before.map(eventMapper), true);
context.addEvents(events_after.map(eventMapper), false);
context.setPaginateToken(jsonContext.end, false);
return new SearchResult(jsonObj.rank, context);
};

View File

@@ -0,0 +1,77 @@
/*
Copyright 2015 - 2021 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/**
* @module models/search-result
*/
import { EventContext } from "./event-context";
import { EventMapper } from "../event-mapper";
import { IRoomEvent } from "../sync-accumulator";
/* eslint-disable camelcase */
interface IContext {
events_before?: IRoomEvent[];
events_after?: IRoomEvent[];
start?: string;
end?: string;
profile_info?: Record<string, {
displayname: string;
avatar_url: string;
}>;
}
/* eslint-enable camelcase */
interface ISearchResult {
rank: number;
result: IRoomEvent;
context: IContext;
}
export class SearchResult {
/**
* Create a SearchResponse from the response to /search
* @static
* @param {Object} jsonObj
* @param {function} eventMapper
* @return {SearchResult}
*/
public static fromJson(jsonObj: ISearchResult, eventMapper: EventMapper): SearchResult {
const jsonContext: IContext = jsonObj.context || {};
const eventsBefore = jsonContext.events_before || [];
const eventsAfter = jsonContext.events_after || [];
const context = new EventContext(eventMapper(jsonObj.result));
context.setPaginateToken(jsonContext.start, true);
context.addEvents(eventsBefore.map(eventMapper), true);
context.addEvents(eventsAfter.map(eventMapper), false);
context.setPaginateToken(jsonContext.end, false);
return new SearchResult(jsonObj.rank, context);
}
/**
* Construct a new SearchResult
*
* @param {number} rank where this SearchResult ranks in the results
* @param {event-context.EventContext} context the matching event and its
* context
*
* @constructor
*/
constructor(public readonly rank: number, public readonly context: EventContext) {}
}

View File

@@ -1,469 +0,0 @@
/*
Copyright 2015, 2016 OpenMarket Ltd
Copyright 2017 New Vector Ltd
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { escapeRegExp, globToRegexp, isNullOrUndefined } from "./utils";
import { logger } from './logger';
/**
* @module pushprocessor
*/
const RULEKINDS_IN_ORDER = ['override', 'content', 'room', 'sender', 'underride'];
// The default override rules to apply to the push rules that arrive from the server.
// We do this for two reasons:
// 1. Synapse is unlikely to send us the push rule in an incremental sync - see
// https://github.com/matrix-org/synapse/pull/4867#issuecomment-481446072 for
// more details.
// 2. We often want to start using push rules ahead of the server supporting them,
// and so we can put them here.
const DEFAULT_OVERRIDE_RULES = [
{
// For homeservers which don't support MSC1930 yet
rule_id: ".m.rule.tombstone",
default: true,
enabled: true,
conditions: [
{
kind: "event_match",
key: "type",
pattern: "m.room.tombstone",
},
{
kind: "event_match",
key: "state_key",
pattern: "",
},
],
actions: [
"notify",
{
set_tweak: "highlight",
value: true,
},
],
},
{
// For homeservers which don't support MSC2153 yet
rule_id: ".m.rule.reaction",
default: true,
enabled: true,
conditions: [
{
kind: "event_match",
key: "type",
pattern: "m.reaction",
},
],
actions: [
"dont_notify",
],
},
];
/**
* Construct a Push Processor.
* @constructor
* @param {Object} client The Matrix client object to use
*/
export function PushProcessor(client) {
const cachedGlobToRegex = {
// $glob: RegExp,
};
const matchingRuleFromKindSet = (ev, kindset) => {
for (let ruleKindIndex = 0;
ruleKindIndex < RULEKINDS_IN_ORDER.length;
++ruleKindIndex) {
const kind = RULEKINDS_IN_ORDER[ruleKindIndex];
const ruleset = kindset[kind];
if (!ruleset) {
continue;
}
for (let ruleIndex = 0; ruleIndex < ruleset.length; ++ruleIndex) {
const rule = ruleset[ruleIndex];
if (!rule.enabled) {
continue;
}
const rawrule = templateRuleToRaw(kind, rule);
if (!rawrule) {
continue;
}
if (this.ruleMatchesEvent(rawrule, ev)) {
rule.kind = kind;
return rule;
}
}
}
return null;
};
const templateRuleToRaw = function(kind, tprule) {
const rawrule = {
'rule_id': tprule.rule_id,
'actions': tprule.actions,
'conditions': [],
};
switch (kind) {
case 'underride':
case 'override':
rawrule.conditions = tprule.conditions;
break;
case 'room':
if (!tprule.rule_id) {
return null;
}
rawrule.conditions.push({
'kind': 'event_match',
'key': 'room_id',
'value': tprule.rule_id,
});
break;
case 'sender':
if (!tprule.rule_id) {
return null;
}
rawrule.conditions.push({
'kind': 'event_match',
'key': 'user_id',
'value': tprule.rule_id,
});
break;
case 'content':
if (!tprule.pattern) {
return null;
}
rawrule.conditions.push({
'kind': 'event_match',
'key': 'content.body',
'pattern': tprule.pattern,
});
break;
}
return rawrule;
};
const eventFulfillsCondition = function(cond, ev) {
const condition_functions = {
"event_match": eventFulfillsEventMatchCondition,
"contains_display_name": eventFulfillsDisplayNameCondition,
"room_member_count": eventFulfillsRoomMemberCountCondition,
"sender_notification_permission": eventFulfillsSenderNotifPermCondition,
};
if (condition_functions[cond.kind]) {
return condition_functions[cond.kind](cond, ev);
}
// unknown conditions: we previously matched all unknown conditions,
// but given that rules can be added to the base rules on a server,
// it's probably better to not match unknown conditions.
return false;
};
const eventFulfillsSenderNotifPermCondition = function(cond, ev) {
const notifLevelKey = cond['key'];
if (!notifLevelKey) {
return false;
}
const room = client.getRoom(ev.getRoomId());
if (!room || !room.currentState) {
return false;
}
// Note that this should not be the current state of the room but the state at
// the point the event is in the DAG. Unfortunately the js-sdk does not store
// this.
return room.currentState.mayTriggerNotifOfType(notifLevelKey, ev.getSender());
};
const eventFulfillsRoomMemberCountCondition = function(cond, ev) {
if (!cond.is) {
return false;
}
const room = client.getRoom(ev.getRoomId());
if (!room || !room.currentState || !room.currentState.members) {
return false;
}
const memberCount = room.currentState.getJoinedMemberCount();
const m = cond.is.match(/^([=<>]*)([0-9]*)$/);
if (!m) {
return false;
}
const ineq = m[1];
const rhs = parseInt(m[2]);
if (isNaN(rhs)) {
return false;
}
switch (ineq) {
case '':
case '==':
return memberCount == rhs;
case '<':
return memberCount < rhs;
case '>':
return memberCount > rhs;
case '<=':
return memberCount <= rhs;
case '>=':
return memberCount >= rhs;
default:
return false;
}
};
const eventFulfillsDisplayNameCondition = function(cond, ev) {
let content = ev.getContent();
if (ev.isEncrypted() && ev.getClearContent()) {
content = ev.getClearContent();
}
if (!content || !content.body || typeof content.body != 'string') {
return false;
}
const room = client.getRoom(ev.getRoomId());
if (!room || !room.currentState || !room.currentState.members ||
!room.currentState.getMember(client.credentials.userId)) {
return false;
}
const displayName = room.currentState.getMember(client.credentials.userId).name;
// N.B. we can't use \b as it chokes on unicode. however \W seems to be okay
// as shorthand for [^0-9A-Za-z_].
const pat = new RegExp("(^|\\W)" + escapeRegExp(displayName) + "(\\W|$)", 'i');
return content.body.search(pat) > -1;
};
const eventFulfillsEventMatchCondition = function(cond, ev) {
if (!cond.key) {
return false;
}
const val = valueForDottedKey(cond.key, ev);
if (typeof val !== 'string') {
return false;
}
if (cond.value) {
return cond.value === val;
}
if (typeof cond.pattern !== 'string') {
return false;
}
let regex;
if (cond.key == 'content.body') {
regex = createCachedRegex('(^|\\W)', cond.pattern, '(\\W|$)');
} else {
regex = createCachedRegex('^', cond.pattern, '$');
}
return !!val.match(regex);
};
const createCachedRegex = function(prefix, glob, suffix) {
if (cachedGlobToRegex[glob]) {
return cachedGlobToRegex[glob];
}
cachedGlobToRegex[glob] = new RegExp(
prefix + globToRegexp(glob) + suffix,
'i', // Case insensitive
);
return cachedGlobToRegex[glob];
};
const valueForDottedKey = function(key, ev) {
const parts = key.split('.');
let val;
// special-case the first component to deal with encrypted messages
const firstPart = parts[0];
if (firstPart === 'content') {
val = ev.getContent();
parts.shift();
} else if (firstPart === 'type') {
val = ev.getType();
parts.shift();
} else {
// use the raw event for any other fields
val = ev.event;
}
while (parts.length > 0) {
const thisPart = parts.shift();
if (isNullOrUndefined(val[thisPart])) {
return null;
}
val = val[thisPart];
}
return val;
};
const matchingRuleForEventWithRulesets = function(ev, rulesets) {
if (!rulesets) {
return null;
}
if (ev.getSender() === client.credentials.userId) {
return null;
}
return matchingRuleFromKindSet(ev, rulesets.global);
};
const pushActionsForEventAndRulesets = function(ev, rulesets) {
const rule = matchingRuleForEventWithRulesets(ev, rulesets);
if (!rule) {
return {};
}
const actionObj = PushProcessor.actionListToActionsObject(rule.actions);
// Some actions are implicit in some situations: we add those here
if (actionObj.tweaks.highlight === undefined) {
// if it isn't specified, highlight if it's a content
// rule but otherwise not
actionObj.tweaks.highlight = (rule.kind == 'content');
}
return actionObj;
};
this.ruleMatchesEvent = function(rule, ev) {
let ret = true;
for (let i = 0; i < rule.conditions.length; ++i) {
const cond = rule.conditions[i];
ret &= eventFulfillsCondition(cond, ev);
}
//console.log("Rule "+rule.rule_id+(ret ? " matches" : " doesn't match"));
return ret;
};
/**
* Get the user's push actions for the given event
*
* @param {module:models/event.MatrixEvent} ev
*
* @return {PushAction}
*/
this.actionsForEvent = function(ev) {
return pushActionsForEventAndRulesets(ev, client.pushRules);
};
/**
* Get one of the users push rules by its ID
*
* @param {string} ruleId The ID of the rule to search for
* @return {object} The push rule, or null if no such rule was found
*/
this.getPushRuleById = function(ruleId) {
for (const scope of ['global']) {
if (client.pushRules[scope] === undefined) continue;
for (const kind of RULEKINDS_IN_ORDER) {
if (client.pushRules[scope][kind] === undefined) continue;
for (const rule of client.pushRules[scope][kind]) {
if (rule.rule_id === ruleId) return rule;
}
}
}
return null;
};
}
/**
* Convert a list of actions into a object with the actions as keys and their values
* eg. [ 'notify', { set_tweak: 'sound', value: 'default' } ]
* becomes { notify: true, tweaks: { sound: 'default' } }
* @param {array} actionlist The actions list
*
* @return {object} A object with key 'notify' (true or false) and an object of actions
*/
PushProcessor.actionListToActionsObject = function(actionlist) {
const actionobj = { 'notify': false, 'tweaks': {} };
for (let i = 0; i < actionlist.length; ++i) {
const action = actionlist[i];
if (action === 'notify') {
actionobj.notify = true;
} else if (typeof action === 'object') {
if (action.value === undefined) {
action.value = true;
}
actionobj.tweaks[action.set_tweak] = action.value;
}
}
return actionobj;
};
/**
* Rewrites conditions on a client's push rules to match the defaults
* where applicable. Useful for upgrading push rules to more strict
* conditions when the server is falling behind on defaults.
* @param {object} incomingRules The client's existing push rules
* @returns {object} The rewritten rules
*/
PushProcessor.rewriteDefaultRules = function(incomingRules) {
let newRules = JSON.parse(JSON.stringify(incomingRules)); // deep clone
// These lines are mostly to make the tests happy. We shouldn't run into these
// properties missing in practice.
if (!newRules) newRules = {};
if (!newRules.global) newRules.global = {};
if (!newRules.global.override) newRules.global.override = [];
// Merge the client-level defaults with the ones from the server
const globalOverrides = newRules.global.override;
for (const override of DEFAULT_OVERRIDE_RULES) {
const existingRule = globalOverrides
.find((r) => r.rule_id === override.rule_id);
if (existingRule) {
// Copy over the actions, default, and conditions. Don't touch the user's
// preference.
existingRule.default = override.default;
existingRule.conditions = override.conditions;
existingRule.actions = override.actions;
} else {
// Add the rule
const ruleId = override.rule_id;
logger.warn(`Adding default global override for ${ruleId}`);
globalOverrides.push(override);
}
}
return newRules;
};
/**
* @typedef {Object} PushAction
* @type {Object}
* @property {boolean} notify Whether this event should notify the user or not.
* @property {Object} tweaks How this event should be notified.
* @property {boolean} tweaks.highlight Whether this event should be highlighted
* on the UI.
* @property {boolean} tweaks.sound Whether this notification should produce a
* noise.
*/

561
src/pushprocessor.ts Normal file
View File

@@ -0,0 +1,561 @@
/*
Copyright 2015 - 2021 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { escapeRegExp, globToRegexp, isNullOrUndefined } from "./utils";
import { logger } from './logger';
import { MatrixClient } from "./client";
import { MatrixEvent } from "./models/event";
/**
* @module pushprocessor
*/
enum RuleKind {
Override = "override",
Content = "content",
Room = "room",
Sender = "sender",
Underride = "underride",
}
const RULEKINDS_IN_ORDER = [RuleKind.Override, RuleKind.Content, RuleKind.Room, RuleKind.Sender, RuleKind.Underride];
enum ConditionKind {
EventMatch = "event_match",
ContainsDisplayName = "contains_display_name",
RoomMemberCount = "room_member_count",
SenderNotificationPermission = "sender_notification_permission",
}
enum Tweak {
Sound = "sound",
Highlight = "highlight",
}
interface ISetTweak {
// eslint-disable-next-line camelcase
set_tweak: Tweak;
value: any;
}
export enum Action {
Notify = "notify",
DontNotify = "dont_notify",
Coalesce = "coalesce",
}
type PushAction = Action | ISetTweak;
interface IEventMatchCondition {
kind: ConditionKind.EventMatch;
key: string;
pattern: string;
value?: string; // Legacy field?
}
interface IContainsDisplayNameCondition {
kind: ConditionKind.ContainsDisplayName;
}
interface IRoomMemberCountCondition {
kind: ConditionKind.RoomMemberCount;
is: string;
}
interface ISenderNotificationPermissionCondition {
kind: ConditionKind.SenderNotificationPermission;
key: string;
}
type PushCondition = IEventMatchCondition
| IContainsDisplayNameCondition
| IRoomMemberCountCondition
| ISenderNotificationPermissionCondition;
interface IPushRule {
// eslint-disable-next-line camelcase
rule_id: string;
default?: boolean;
enabled?: boolean;
conditions: PushCondition[];
actions: PushAction[];
kind?: RuleKind; // this is a locally monkey wrenched field
}
interface IRuleset {
[RuleKind.Content]: IPushRule[];
[RuleKind.Override]: IPushRule[];
[RuleKind.Room]: IPushRule[];
[RuleKind.Sender]: IPushRule[];
[RuleKind.Underride]: IPushRule[];
}
export interface IRulesets {
global: IRuleset;
}
// The default override rules to apply to the push rules that arrive from the server.
// We do this for two reasons:
// 1. Synapse is unlikely to send us the push rule in an incremental sync - see
// https://github.com/matrix-org/synapse/pull/4867#issuecomment-481446072 for
// more details.
// 2. We often want to start using push rules ahead of the server supporting them,
// and so we can put them here.
const DEFAULT_OVERRIDE_RULES: IPushRule[] = [
{
// For homeservers which don't support MSC1930 yet
rule_id: ".m.rule.tombstone",
default: true,
enabled: true,
conditions: [
{
kind: ConditionKind.EventMatch,
key: "type",
pattern: "m.room.tombstone",
},
{
kind: ConditionKind.EventMatch,
key: "state_key",
pattern: "",
},
],
actions: [
Action.Notify,
{
set_tweak: Tweak.Highlight,
value: true,
},
],
},
{
// For homeservers which don't support MSC2153 yet
rule_id: ".m.rule.reaction",
default: true,
enabled: true,
conditions: [
{
kind: ConditionKind.EventMatch,
key: "type",
pattern: "m.reaction",
},
],
actions: [
Action.DontNotify,
],
},
];
export interface IActionsObject {
notify: boolean;
tweaks: Partial<Record<Tweak, any>>;
}
export class PushProcessor {
/**
* Convert a list of actions into a object with the actions as keys and their values
* eg. [ 'notify', { set_tweak: 'sound', value: 'default' } ]
* becomes { notify: true, tweaks: { sound: 'default' } }
* @param {array} actionList The actions list
*
* @return {object} A object with key 'notify' (true or false) and an object of actions
*/
public static actionListToActionsObject(actionList: PushAction[]): IActionsObject {
const actionObj: IActionsObject = { notify: false, tweaks: {} };
for (let i = 0; i < actionList.length; ++i) {
const action = actionList[i];
if (action === Action.Notify) {
actionObj.notify = true;
} else if (typeof action === 'object') {
if (action.value === undefined) {
action.value = true;
}
actionObj.tweaks[action.set_tweak] = action.value;
}
}
return actionObj;
}
/**
* Rewrites conditions on a client's push rules to match the defaults
* where applicable. Useful for upgrading push rules to more strict
* conditions when the server is falling behind on defaults.
* @param {object} incomingRules The client's existing push rules
* @returns {object} The rewritten rules
*/
public static rewriteDefaultRules(incomingRules: IRulesets): IRulesets {
let newRules: IRulesets = JSON.parse(JSON.stringify(incomingRules)); // deep clone
// These lines are mostly to make the tests happy. We shouldn't run into these
// properties missing in practice.
if (!newRules) newRules = {} as IRulesets;
if (!newRules.global) newRules.global = {} as IRuleset;
if (!newRules.global.override) newRules.global.override = [];
// Merge the client-level defaults with the ones from the server
const globalOverrides = newRules.global.override;
for (const override of DEFAULT_OVERRIDE_RULES) {
const existingRule = globalOverrides
.find((r) => r.rule_id === override.rule_id);
if (existingRule) {
// Copy over the actions, default, and conditions. Don't touch the user's
// preference.
existingRule.default = override.default;
existingRule.conditions = override.conditions;
existingRule.actions = override.actions;
} else {
// Add the rule
const ruleId = override.rule_id;
logger.warn(`Adding default global override for ${ruleId}`);
globalOverrides.push(override);
}
}
return newRules;
}
private static cachedGlobToRegex: Record<string, RegExp> = {}; // $glob: RegExp
/**
* Construct a Push Processor.
* @constructor
* @param {Object} client The Matrix client object to use
*/
constructor(private readonly client: MatrixClient) {}
private matchingRuleFromKindSet(ev: MatrixEvent, kindset: IRuleset): IPushRule {
for (let ruleKindIndex = 0; ruleKindIndex < RULEKINDS_IN_ORDER.length; ++ruleKindIndex) {
const kind = RULEKINDS_IN_ORDER[ruleKindIndex];
const ruleset = kindset[kind];
if (!ruleset) {
continue;
}
for (let ruleIndex = 0; ruleIndex < ruleset.length; ++ruleIndex) {
const rule = ruleset[ruleIndex];
if (!rule.enabled) {
continue;
}
const rawrule = this.templateRuleToRaw(kind, rule);
if (!rawrule) {
continue;
}
if (this.ruleMatchesEvent(rawrule, ev)) {
rule.kind = kind;
return rule;
}
}
}
return null;
}
private templateRuleToRaw(kind: RuleKind, tprule: any): any {
const rawrule = {
'rule_id': tprule.rule_id,
'actions': tprule.actions,
'conditions': [],
};
switch (kind) {
case RuleKind.Underride:
case RuleKind.Override:
rawrule.conditions = tprule.conditions;
break;
case RuleKind.Room:
if (!tprule.rule_id) {
return null;
}
rawrule.conditions.push({
'kind': ConditionKind.EventMatch,
'key': 'room_id',
'value': tprule.rule_id,
});
break;
case RuleKind.Sender:
if (!tprule.rule_id) {
return null;
}
rawrule.conditions.push({
'kind': ConditionKind.EventMatch,
'key': 'user_id',
'value': tprule.rule_id,
});
break;
case RuleKind.Content:
if (!tprule.pattern) {
return null;
}
rawrule.conditions.push({
'kind': ConditionKind.EventMatch,
'key': 'content.body',
'pattern': tprule.pattern,
});
break;
}
return rawrule;
}
private eventFulfillsCondition(cond: PushCondition, ev: MatrixEvent): boolean {
switch (cond.kind) {
case ConditionKind.EventMatch:
return this.eventFulfillsEventMatchCondition(cond, ev);
case ConditionKind.ContainsDisplayName:
return this.eventFulfillsDisplayNameCondition(cond, ev);
case ConditionKind.RoomMemberCount:
return this.eventFulfillsRoomMemberCountCondition(cond, ev);
case ConditionKind.SenderNotificationPermission:
return this.eventFulfillsSenderNotifPermCondition(cond, ev);
}
// unknown conditions: we previously matched all unknown conditions,
// but given that rules can be added to the base rules on a server,
// it's probably better to not match unknown conditions.
return false;
}
private eventFulfillsSenderNotifPermCondition(
cond: ISenderNotificationPermissionCondition,
ev: MatrixEvent,
): boolean {
const notifLevelKey = cond['key'];
if (!notifLevelKey) {
return false;
}
const room = this.client.getRoom(ev.getRoomId());
if (!room?.currentState) {
return false;
}
// Note that this should not be the current state of the room but the state at
// the point the event is in the DAG. Unfortunately the js-sdk does not store
// this.
return room.currentState.mayTriggerNotifOfType(notifLevelKey, ev.getSender());
}
private eventFulfillsRoomMemberCountCondition(cond: IRoomMemberCountCondition, ev: MatrixEvent): boolean {
if (!cond.is) {
return false;
}
const room = this.client.getRoom(ev.getRoomId());
if (!room || !room.currentState || !room.currentState.members) {
return false;
}
const memberCount = room.currentState.getJoinedMemberCount();
const m = cond.is.match(/^([=<>]*)([0-9]*)$/);
if (!m) {
return false;
}
const ineq = m[1];
const rhs = parseInt(m[2]);
if (isNaN(rhs)) {
return false;
}
switch (ineq) {
case '':
case '==':
return memberCount == rhs;
case '<':
return memberCount < rhs;
case '>':
return memberCount > rhs;
case '<=':
return memberCount <= rhs;
case '>=':
return memberCount >= rhs;
default:
return false;
}
}
private eventFulfillsDisplayNameCondition(cond: IContainsDisplayNameCondition, ev: MatrixEvent): boolean {
let content = ev.getContent();
if (ev.isEncrypted() && ev.getClearContent()) {
content = ev.getClearContent();
}
if (!content || !content.body || typeof content.body != 'string') {
return false;
}
const room = this.client.getRoom(ev.getRoomId());
if (!room || !room.currentState || !room.currentState.members ||
!room.currentState.getMember(this.client.credentials.userId)) {
return false;
}
const displayName = room.currentState.getMember(this.client.credentials.userId).name;
// N.B. we can't use \b as it chokes on unicode. however \W seems to be okay
// as shorthand for [^0-9A-Za-z_].
const pat = new RegExp("(^|\\W)" + escapeRegExp(displayName) + "(\\W|$)", 'i');
return content.body.search(pat) > -1;
}
private eventFulfillsEventMatchCondition(cond: IEventMatchCondition, ev: MatrixEvent): boolean {
if (!cond.key) {
return false;
}
const val = this.valueForDottedKey(cond.key, ev);
if (typeof val !== 'string') {
return false;
}
if (cond.value) {
return cond.value === val;
}
if (typeof cond.pattern !== 'string') {
return false;
}
let regex;
if (cond.key == 'content.body') {
regex = this.createCachedRegex('(^|\\W)', cond.pattern, '(\\W|$)');
} else {
regex = this.createCachedRegex('^', cond.pattern, '$');
}
return !!val.match(regex);
}
private createCachedRegex(prefix: string, glob: string, suffix: string): RegExp {
if (PushProcessor.cachedGlobToRegex[glob]) {
return PushProcessor.cachedGlobToRegex[glob];
}
PushProcessor.cachedGlobToRegex[glob] = new RegExp(
prefix + globToRegexp(glob) + suffix,
'i', // Case insensitive
);
return PushProcessor.cachedGlobToRegex[glob];
}
private valueForDottedKey(key: string, ev: MatrixEvent): any {
const parts = key.split('.');
let val;
// special-case the first component to deal with encrypted messages
const firstPart = parts[0];
if (firstPart === 'content') {
val = ev.getContent();
parts.shift();
} else if (firstPart === 'type') {
val = ev.getType();
parts.shift();
} else {
// use the raw event for any other fields
val = ev.event;
}
while (parts.length > 0) {
const thisPart = parts.shift();
if (isNullOrUndefined(val[thisPart])) {
return null;
}
val = val[thisPart];
}
return val;
}
private matchingRuleForEventWithRulesets(ev: MatrixEvent, rulesets): IPushRule {
if (!rulesets) {
return null;
}
if (ev.getSender() === this.client.credentials.userId) {
return null;
}
return this.matchingRuleFromKindSet(ev, rulesets.global);
}
private pushActionsForEventAndRulesets(ev: MatrixEvent, rulesets): IActionsObject {
const rule = this.matchingRuleForEventWithRulesets(ev, rulesets);
if (!rule) {
return {} as IActionsObject;
}
const actionObj = PushProcessor.actionListToActionsObject(rule.actions);
// Some actions are implicit in some situations: we add those here
if (actionObj.tweaks.highlight === undefined) {
// if it isn't specified, highlight if it's a content
// rule but otherwise not
actionObj.tweaks.highlight = (rule.kind == 'content');
}
return actionObj;
}
public ruleMatchesEvent(rule: IPushRule, ev: MatrixEvent): boolean {
let ret = true;
for (let i = 0; i < rule.conditions.length; ++i) {
const cond = rule.conditions[i];
// @ts-ignore
ret &= this.eventFulfillsCondition(cond, ev);
}
//console.log("Rule "+rule.rule_id+(ret ? " matches" : " doesn't match"));
return ret;
}
/**
* Get the user's push actions for the given event
*
* @param {module:models/event.MatrixEvent} ev
*
* @return {PushAction}
*/
public actionsForEvent(ev: MatrixEvent): IActionsObject {
return this.pushActionsForEventAndRulesets(ev, this.client.pushRules);
}
/**
* Get one of the users push rules by its ID
*
* @param {string} ruleId The ID of the rule to search for
* @return {object} The push rule, or null if no such rule was found
*/
public getPushRuleById(ruleId: string): IPushRule {
for (const scope of ['global']) {
if (this.client.pushRules[scope] === undefined) continue;
for (const kind of RULEKINDS_IN_ORDER) {
if (this.client.pushRules[scope][kind] === undefined) continue;
for (const rule of this.client.pushRules[scope][kind]) {
if (rule.rule_id === ruleId) return rule;
}
}
}
return null;
}
}
/**
* @typedef {Object} PushAction
* @type {Object}
* @property {boolean} notify Whether this event should notify the user or not.
* @property {Object} tweaks How this event should be notified.
* @property {boolean} tweaks.highlight Whether this event should be highlighted
* on the UI.
* @property {boolean} tweaks.sound Whether this notification should produce a
* noise.
*/

View File

@@ -1,327 +0,0 @@
/*
Copyright 2015, 2016 OpenMarket Ltd
Copyright 2019 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/**
* This is an internal module which manages queuing, scheduling and retrying
* of requests.
* @module scheduler
*/
import * as utils from "./utils";
import { logger } from './logger';
const DEBUG = false; // set true to enable console logging.
/**
* Construct a scheduler for Matrix. Requires
* {@link module:scheduler~MatrixScheduler#setProcessFunction} to be provided
* with a way of processing events.
* @constructor
* @param {module:scheduler~retryAlgorithm} retryAlgorithm Optional. The retry
* algorithm to apply when determining when to try to send an event again.
* Defaults to {@link module:scheduler~MatrixScheduler.RETRY_BACKOFF_RATELIMIT}.
* @param {module:scheduler~queueAlgorithm} queueAlgorithm Optional. The queuing
* algorithm to apply when determining which events should be sent before the
* given event. Defaults to {@link module:scheduler~MatrixScheduler.QUEUE_MESSAGES}.
*/
export function MatrixScheduler(retryAlgorithm, queueAlgorithm) {
this.retryAlgorithm = retryAlgorithm || MatrixScheduler.RETRY_BACKOFF_RATELIMIT;
this.queueAlgorithm = queueAlgorithm || MatrixScheduler.QUEUE_MESSAGES;
this._queues = {
// queueName: [{
// event: MatrixEvent, // event to send
// defer: Deferred, // defer to resolve/reject at the END of the retries
// attempts: Number // number of times we've called processFn
// }, ...]
};
this._activeQueues = [];
this._procFn = null;
}
/**
* Retrieve a queue based on an event. The event provided does not need to be in
* the queue.
* @param {MatrixEvent} event An event to get the queue for.
* @return {?Array<MatrixEvent>} A shallow copy of events in the queue or null.
* Modifying this array will not modify the list itself. Modifying events in
* this array <i>will</i> modify the underlying event in the queue.
* @see MatrixScheduler.removeEventFromQueue To remove an event from the queue.
*/
MatrixScheduler.prototype.getQueueForEvent = function(event) {
const name = this.queueAlgorithm(event);
if (!name || !this._queues[name]) {
return null;
}
return this._queues[name].map(function(obj) {
return obj.event;
});
};
/**
* Remove this event from the queue. The event is equal to another event if they
* have the same ID returned from event.getId().
* @param {MatrixEvent} event The event to remove.
* @return {boolean} True if this event was removed.
*/
MatrixScheduler.prototype.removeEventFromQueue = function(event) {
const name = this.queueAlgorithm(event);
if (!name || !this._queues[name]) {
return false;
}
let removed = false;
utils.removeElement(this._queues[name], function(element) {
if (element.event.getId() === event.getId()) {
// XXX we should probably reject the promise?
// https://github.com/matrix-org/matrix-js-sdk/issues/496
removed = true;
return true;
}
});
return removed;
};
/**
* Set the process function. Required for events in the queue to be processed.
* If set after events have been added to the queue, this will immediately start
* processing them.
* @param {module:scheduler~processFn} fn The function that can process events
* in the queue.
*/
MatrixScheduler.prototype.setProcessFunction = function(fn) {
this._procFn = fn;
_startProcessingQueues(this);
};
/**
* Queue an event if it is required and start processing queues.
* @param {MatrixEvent} event The event that may be queued.
* @return {?Promise} A promise if the event was queued, which will be
* resolved or rejected in due time, else null.
*/
MatrixScheduler.prototype.queueEvent = function(event) {
const queueName = this.queueAlgorithm(event);
if (!queueName) {
return null;
}
// add the event to the queue and make a deferred for it.
if (!this._queues[queueName]) {
this._queues[queueName] = [];
}
const defer = utils.defer();
this._queues[queueName].push({
event: event,
defer: defer,
attempts: 0,
});
debuglog(
"Queue algorithm dumped event %s into queue '%s'",
event.getId(), queueName,
);
_startProcessingQueues(this);
return defer.promise;
};
/**
* Retries events up to 4 times using exponential backoff. This produces wait
* times of 2, 4, 8, and 16 seconds (30s total) after which we give up. If the
* failure was due to a rate limited request, the time specified in the error is
* waited before being retried.
* @param {MatrixEvent} event
* @param {Number} attempts
* @param {MatrixError} err
* @return {Number}
* @see module:scheduler~retryAlgorithm
*/
MatrixScheduler.RETRY_BACKOFF_RATELIMIT = function(event, attempts, err) {
if (err.httpStatus === 400 || err.httpStatus === 403 || err.httpStatus === 401) {
// client error; no amount of retrying with save you now.
return -1;
}
// we ship with browser-request which returns { cors: rejected } when trying
// with no connection, so if we match that, give up since they have no conn.
if (err.cors === "rejected") {
return -1;
}
// if event that we are trying to send is too large in any way then retrying won't help
if (err.name === "M_TOO_LARGE") {
return -1;
}
if (err.name === "M_LIMIT_EXCEEDED") {
const waitTime = err.data.retry_after_ms;
if (waitTime > 0) {
return waitTime;
}
}
if (attempts > 4) {
return -1; // give up
}
return (1000 * Math.pow(2, attempts));
};
/**
* Queues <code>m.room.message</code> events and lets other events continue
* concurrently.
* @param {MatrixEvent} event
* @return {string}
* @see module:scheduler~queueAlgorithm
*/
MatrixScheduler.QUEUE_MESSAGES = function(event) {
// enqueue messages or events that associate with another event (redactions and relations)
if (event.getType() === "m.room.message" || event.hasAssocation()) {
// put these events in the 'message' queue.
return "message";
}
// allow all other events continue concurrently.
return null;
};
function _startProcessingQueues(scheduler) {
if (!scheduler._procFn) {
return;
}
// for each inactive queue with events in them
Object.keys(scheduler._queues)
.filter(function(queueName) {
return scheduler._activeQueues.indexOf(queueName) === -1 &&
scheduler._queues[queueName].length > 0;
})
.forEach(function(queueName) {
// mark the queue as active
scheduler._activeQueues.push(queueName);
// begin processing the head of the queue
debuglog("Spinning up queue: '%s'", queueName);
_processQueue(scheduler, queueName);
});
}
function _processQueue(scheduler, queueName) {
// get head of queue
const obj = _peekNextEvent(scheduler, queueName);
if (!obj) {
// queue is empty. Mark as inactive and stop recursing.
const index = scheduler._activeQueues.indexOf(queueName);
if (index >= 0) {
scheduler._activeQueues.splice(index, 1);
}
debuglog("Stopping queue '%s' as it is now empty", queueName);
return;
}
debuglog(
"Queue '%s' has %s pending events",
queueName, scheduler._queues[queueName].length,
);
// fire the process function and if it resolves, resolve the deferred. Else
// invoke the retry algorithm.
// First wait for a resolved promise, so the resolve handlers for
// the deferred of the previously sent event can run.
// This way enqueued relations/redactions to enqueued events can receive
// the remove id of their target before being sent.
Promise.resolve().then(() => {
return scheduler._procFn(obj.event);
}).then(function(res) {
// remove this from the queue
_removeNextEvent(scheduler, queueName);
debuglog("Queue '%s' sent event %s", queueName, obj.event.getId());
obj.defer.resolve(res);
// keep processing
_processQueue(scheduler, queueName);
}, function(err) {
obj.attempts += 1;
// ask the retry algorithm when/if we should try again
const waitTimeMs = scheduler.retryAlgorithm(obj.event, obj.attempts, err);
debuglog(
"retry(%s) err=%s event_id=%s waitTime=%s",
obj.attempts, err, obj.event.getId(), waitTimeMs,
);
if (waitTimeMs === -1) { // give up (you quitter!)
debuglog(
"Queue '%s' giving up on event %s", queueName, obj.event.getId(),
);
// remove this from the queue
_removeNextEvent(scheduler, queueName);
obj.defer.reject(err);
// process next event
_processQueue(scheduler, queueName);
} else {
setTimeout(function() {
_processQueue(scheduler, queueName);
}, waitTimeMs);
}
});
}
function _peekNextEvent(scheduler, queueName) {
const queue = scheduler._queues[queueName];
if (!Array.isArray(queue)) {
return null;
}
return queue[0];
}
function _removeNextEvent(scheduler, queueName) {
const queue = scheduler._queues[queueName];
if (!Array.isArray(queue)) {
return null;
}
return queue.shift();
}
function debuglog() {
if (DEBUG) {
logger.log(...arguments);
}
}
/**
* The retry algorithm to apply when retrying events. To stop retrying, return
* <code>-1</code>. If this event was part of a queue, it will be removed from
* the queue.
* @callback retryAlgorithm
* @param {MatrixEvent} event The event being retried.
* @param {Number} attempts The number of failed attempts. This will always be
* >= 1.
* @param {MatrixError} err The most recent error message received when trying
* to send this event.
* @return {Number} The number of milliseconds to wait before trying again. If
* this is 0, the request will be immediately retried. If this is
* <code>-1</code>, the event will be marked as
* {@link module:models/event.EventStatus.NOT_SENT} and will not be retried.
*/
/**
* The queuing algorithm to apply to events. This function must be idempotent as
* it may be called multiple times with the same event. All queues created are
* serviced in a FIFO manner. To send the event ASAP, return <code>null</code>
* which will not put this event in a queue. Events that fail to send that form
* part of a queue will be removed from the queue and the next event in the
* queue will be sent.
* @callback queueAlgorithm
* @param {MatrixEvent} event The event to be sent.
* @return {string} The name of the queue to put the event into. If a queue with
* this name does not exist, it will be created. If this is <code>null</code>,
* the event is not put into a queue and will be sent concurrently.
*/
/**
* The function to invoke to process (send) events in the queue.
* @callback processFn
* @param {MatrixEvent} event The event to send.
* @return {Promise} Resolved/rejected depending on the outcome of the request.
*/

327
src/scheduler.ts Normal file
View File

@@ -0,0 +1,327 @@
/*
Copyright 2015 - 2021 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/**
* This is an internal module which manages queuing, scheduling and retrying
* of requests.
* @module scheduler
*/
import * as utils from "./utils";
import { logger } from './logger';
import { MatrixEvent } from "./models/event";
import { EventType } from "./@types/event";
import { IDeferred } from "./utils";
import { MatrixError } from "./http-api";
import { ISendEventResponse } from "./@types/requests";
const DEBUG = false; // set true to enable console logging.
interface IQueueEntry<T> {
event: MatrixEvent;
defer: IDeferred<T>;
attempts: number;
}
type ProcessFunction<T> = (event: MatrixEvent) => Promise<T>;
/**
* Construct a scheduler for Matrix. Requires
* {@link module:scheduler~MatrixScheduler#setProcessFunction} to be provided
* with a way of processing events.
* @constructor
* @param {module:scheduler~retryAlgorithm} retryAlgorithm Optional. The retry
* algorithm to apply when determining when to try to send an event again.
* Defaults to {@link module:scheduler~MatrixScheduler.RETRY_BACKOFF_RATELIMIT}.
* @param {module:scheduler~queueAlgorithm} queueAlgorithm Optional. The queuing
* algorithm to apply when determining which events should be sent before the
* given event. Defaults to {@link module:scheduler~MatrixScheduler.QUEUE_MESSAGES}.
*/
// eslint-disable-next-line camelcase
export class MatrixScheduler<T = ISendEventResponse> {
/**
* Retries events up to 4 times using exponential backoff. This produces wait
* times of 2, 4, 8, and 16 seconds (30s total) after which we give up. If the
* failure was due to a rate limited request, the time specified in the error is
* waited before being retried.
* @param {MatrixEvent} event
* @param {Number} attempts
* @param {MatrixError} err
* @return {Number}
* @see module:scheduler~retryAlgorithm
*/
public static RETRY_BACKOFF_RATELIMIT(event: MatrixEvent, attempts: number, err: MatrixError): number {
if (err.httpStatus === 400 || err.httpStatus === 403 || err.httpStatus === 401) {
// client error; no amount of retrying with save you now.
return -1;
}
// we ship with browser-request which returns { cors: rejected } when trying
// with no connection, so if we match that, give up since they have no conn.
if (err.cors === "rejected") {
return -1;
}
// if event that we are trying to send is too large in any way then retrying won't help
if (err.name === "M_TOO_LARGE") {
return -1;
}
if (err.name === "M_LIMIT_EXCEEDED") {
const waitTime = err.data.retry_after_ms;
if (waitTime > 0) {
return waitTime;
}
}
if (attempts > 4) {
return -1; // give up
}
return (1000 * Math.pow(2, attempts));
}
/**
* Queues <code>m.room.message</code> events and lets other events continue
* concurrently.
* @param {MatrixEvent} event
* @return {string}
* @see module:scheduler~queueAlgorithm
*/
public static QUEUE_MESSAGES(event: MatrixEvent) {
// enqueue messages or events that associate with another event (redactions and relations)
if (event.getType() === EventType.RoomMessage || event.hasAssocation()) {
// put these events in the 'message' queue.
return "message";
}
// allow all other events continue concurrently.
return null;
}
// queueName: [{
// event: MatrixEvent, // event to send
// defer: Deferred, // defer to resolve/reject at the END of the retries
// attempts: Number // number of times we've called processFn
// }, ...]
private readonly queues: Record<string, IQueueEntry<T>[]> = {};
private activeQueues: string[] = [];
private procFn: ProcessFunction<T> = null;
constructor(
public readonly retryAlgorithm = MatrixScheduler.RETRY_BACKOFF_RATELIMIT,
public readonly queueAlgorithm = MatrixScheduler.QUEUE_MESSAGES,
) {}
/**
* Retrieve a queue based on an event. The event provided does not need to be in
* the queue.
* @param {MatrixEvent} event An event to get the queue for.
* @return {?Array<MatrixEvent>} A shallow copy of events in the queue or null.
* Modifying this array will not modify the list itself. Modifying events in
* this array <i>will</i> modify the underlying event in the queue.
* @see MatrixScheduler.removeEventFromQueue To remove an event from the queue.
*/
public getQueueForEvent(event: MatrixEvent): MatrixEvent[] {
const name = this.queueAlgorithm(event);
if (!name || !this.queues[name]) {
return null;
}
return this.queues[name].map(function(obj) {
return obj.event;
});
}
/**
* Remove this event from the queue. The event is equal to another event if they
* have the same ID returned from event.getId().
* @param {MatrixEvent} event The event to remove.
* @return {boolean} True if this event was removed.
*/
public removeEventFromQueue(event: MatrixEvent): boolean {
const name = this.queueAlgorithm(event);
if (!name || !this.queues[name]) {
return false;
}
let removed = false;
utils.removeElement(this.queues[name], (element) => {
if (element.event.getId() === event.getId()) {
// XXX we should probably reject the promise?
// https://github.com/matrix-org/matrix-js-sdk/issues/496
removed = true;
return true;
}
});
return removed;
}
/**
* Set the process function. Required for events in the queue to be processed.
* If set after events have been added to the queue, this will immediately start
* processing them.
* @param {module:scheduler~processFn} fn The function that can process events
* in the queue.
*/
public setProcessFunction(fn: ProcessFunction<T>): void {
this.procFn = fn;
this.startProcessingQueues();
}
/**
* Queue an event if it is required and start processing queues.
* @param {MatrixEvent} event The event that may be queued.
* @return {?Promise} A promise if the event was queued, which will be
* resolved or rejected in due time, else null.
*/
public queueEvent(event: MatrixEvent): Promise<T> | null {
const queueName = this.queueAlgorithm(event);
if (!queueName) {
return null;
}
// add the event to the queue and make a deferred for it.
if (!this.queues[queueName]) {
this.queues[queueName] = [];
}
const defer = utils.defer<T>();
this.queues[queueName].push({
event: event,
defer: defer,
attempts: 0,
});
debuglog("Queue algorithm dumped event %s into queue '%s'", event.getId(), queueName);
this.startProcessingQueues();
return defer.promise;
}
private startProcessingQueues(): void {
if (!this.procFn) return;
// for each inactive queue with events in them
Object.keys(this.queues)
.filter((queueName) => {
return this.activeQueues.indexOf(queueName) === -1 &&
this.queues[queueName].length > 0;
})
.forEach((queueName) => {
// mark the queue as active
this.activeQueues.push(queueName);
// begin processing the head of the queue
debuglog("Spinning up queue: '%s'", queueName);
this.processQueue(queueName);
});
}
private processQueue = (queueName: string): void => {
// get head of queue
const obj = this.peekNextEvent(queueName);
if (!obj) {
// queue is empty. Mark as inactive and stop recursing.
const index = this.activeQueues.indexOf(queueName);
if (index >= 0) {
this.activeQueues.splice(index, 1);
}
debuglog("Stopping queue '%s' as it is now empty", queueName);
return;
}
debuglog("Queue '%s' has %s pending events", queueName, this.queues[queueName].length);
// fire the process function and if it resolves, resolve the deferred. Else
// invoke the retry algorithm.
// First wait for a resolved promise, so the resolve handlers for
// the deferred of the previously sent event can run.
// This way enqueued relations/redactions to enqueued events can receive
// the remove id of their target before being sent.
Promise.resolve().then(() => {
return this.procFn(obj.event);
}).then((res) => {
// remove this from the queue
this.removeNextEvent(queueName);
debuglog("Queue '%s' sent event %s", queueName, obj.event.getId());
obj.defer.resolve(res);
// keep processing
this.processQueue(queueName);
}, (err) => {
obj.attempts += 1;
// ask the retry algorithm when/if we should try again
const waitTimeMs = this.retryAlgorithm(obj.event, obj.attempts, err);
debuglog("retry(%s) err=%s event_id=%s waitTime=%s", obj.attempts, err, obj.event.getId(), waitTimeMs);
if (waitTimeMs === -1) { // give up (you quitter!)
debuglog("Queue '%s' giving up on event %s", queueName, obj.event.getId());
// remove this from the queue
this.removeNextEvent(queueName);
obj.defer.reject(err);
// process next event
this.processQueue(queueName);
} else {
setTimeout(this.processQueue, waitTimeMs, queueName);
}
});
};
private peekNextEvent(queueName: string): IQueueEntry<T> {
const queue = this.queues[queueName];
if (!Array.isArray(queue)) {
return null;
}
return queue[0];
}
private removeNextEvent(queueName: string): IQueueEntry<T> {
const queue = this.queues[queueName];
if (!Array.isArray(queue)) {
return null;
}
return queue.shift();
}
}
function debuglog(...args) {
if (DEBUG) {
logger.log(...args);
}
}
/**
* The retry algorithm to apply when retrying events. To stop retrying, return
* <code>-1</code>. If this event was part of a queue, it will be removed from
* the queue.
* @callback retryAlgorithm
* @param {MatrixEvent} event The event being retried.
* @param {Number} attempts The number of failed attempts. This will always be
* >= 1.
* @param {MatrixError} err The most recent error message received when trying
* to send this event.
* @return {Number} The number of milliseconds to wait before trying again. If
* this is 0, the request will be immediately retried. If this is
* <code>-1</code>, the event will be marked as
* {@link module:models/event.EventStatus.NOT_SENT} and will not be retried.
*/
/**
* The queuing algorithm to apply to events. This function must be idempotent as
* it may be called multiple times with the same event. All queues created are
* serviced in a FIFO manner. To send the event ASAP, return <code>null</code>
* which will not put this event in a queue. Events that fail to send that form
* part of a queue will be removed from the queue and the next event in the
* queue will be sent.
* @callback queueAlgorithm
* @param {MatrixEvent} event The event to be sent.
* @return {string} The name of the queue to put the event into. If a queue with
* this name does not exist, it will be created. If this is <code>null</code>,
* the event is not put into a queue and will be sent concurrently.
*/
/**
* The function to invoke to process (send) events in the queue.
* @callback processFn
* @param {MatrixEvent} event The event to send.
* @return {Promise} Resolved/rejected depending on the outcome of the request.
*/

View File

@@ -30,7 +30,7 @@ import * as utils from "./utils";
import { IDeferred } from "./utils"; import { IDeferred } from "./utils";
import { Filter } from "./filter"; import { Filter } from "./filter";
import { EventTimeline } from "./models/event-timeline"; import { EventTimeline } from "./models/event-timeline";
import { PushProcessor } from "./pushprocessor"; import { IRulesets, PushProcessor } from "./pushprocessor";
import { logger } from './logger'; import { logger } from './logger';
import { InvalidStoreError } from './errors'; import { InvalidStoreError } from './errors';
import { IStoredClientOpts, MatrixClient, PendingEventOrdering } from "./client"; import { IStoredClientOpts, MatrixClient, PendingEventOrdering } from "./client";
@@ -52,6 +52,7 @@ import {
import { MatrixEvent } from "./models/event"; import { MatrixEvent } from "./models/event";
import { MatrixError } from "./http-api"; import { MatrixError } from "./http-api";
import { ISavedSync } from "./store"; import { ISavedSync } from "./store";
import { EventType } from "./@types/event";
const DEBUG = true; const DEBUG = true;
@@ -1066,9 +1067,9 @@ export class SyncApi {
// honour push rules that were previously cached. Base rules // honour push rules that were previously cached. Base rules
// will be updated when we receive push rules via getPushRules // will be updated when we receive push rules via getPushRules
// (see sync) before syncing over the network. // (see sync) before syncing over the network.
if (accountDataEvent.getType() === 'm.push_rules') { if (accountDataEvent.getType() === EventType.PushRules) {
const rules = accountDataEvent.getContent(); const rules = accountDataEvent.getContent();
client.pushRules = PushProcessor.rewriteDefaultRules(rules); client.pushRules = PushProcessor.rewriteDefaultRules(rules as IRulesets);
} }
const prevEvent = prevEventsMap[accountDataEvent.getId()]; const prevEvent = prevEventsMap[accountDataEvent.getId()];
client.emit("accountData", accountDataEvent, prevEvent); client.emit("accountData", accountDataEvent, prevEvent);

View File

@@ -398,7 +398,7 @@ export function escapeRegExp(string: string): string {
return string.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); return string.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
} }
export function globToRegexp(glob: string, extended: any): string { export function globToRegexp(glob: string, extended?: any): string {
extended = typeof(extended) === 'boolean' ? extended : true; extended = typeof(extended) === 'boolean' ? extended : true;
// From // From
// https://github.com/matrix-org/synapse/blob/abbee6b29be80a77e05730707602f3bbfc3f38cb/synapse/push/__init__.py#L132 // https://github.com/matrix-org/synapse/blob/abbee6b29be80a77e05730707602f3bbfc3f38cb/synapse/push/__init__.py#L132
@@ -443,7 +443,7 @@ export interface IDeferred<T> {
} }
// Returns a Deferred // Returns a Deferred
export function defer<T>(): IDeferred<T> { export function defer<T = void>(): IDeferred<T> {
let resolve; let resolve;
let reject; let reject;