diff --git a/src/client.js b/src/client.js index dcf327b0c..d4f149870 100644 --- a/src/client.js +++ b/src/client.js @@ -33,6 +33,7 @@ import {EventTimeline} from "./models/event-timeline"; import {SearchResult} from "./models/search-result"; import {StubStore} from "./store/stub"; import {createNewMatrixCall} from "./webrtc/call"; +import {CallEventHandler} from './webrtc/callEventHandler'; import * as utils from './utils'; import {sleep} from './utils'; import { @@ -336,17 +337,15 @@ export function MatrixClient(opts) { } this.clientRunning = false; - this.callList = { - // callId: MatrixCall - }; - // try constructing a MatrixCall to see if we are running in an environment // which has WebRTC. If we are, listen for and handle m.call.* events. const call = createNewMatrixCall(this); this._supportsVoip = false; if (call) { - setupCallEventHandler(this); + this._callEventHandler = new CallEventHandler(this); this._supportsVoip = true; + } else { + this._callEventHandler = null; } this._syncingRetry = null; this._syncApi = null; @@ -5115,6 +5114,11 @@ MatrixClient.prototype.stopClient = function() { if (this._peekSync) { this._peekSync.stopPeeking(); } + if (this._callEventHandler) { + this._callEventHandler.stop(); + this._callEventHandler = null; + } + global.clearTimeout(this._checkTurnServersTimeoutID); if (this._clientWellKnownIntervalID !== undefined) { global.clearInterval(this._clientWellKnownIntervalID); @@ -5324,231 +5328,6 @@ async function(roomId, eventId, relationType, eventType, opts = {}) { }; }; -function setupCallEventHandler(client) { - const candidatesByCall = { - // callId: [Candidate] - }; - - // The sync code always emits one event at a time, so it will patiently - // wait for us to finish processing a call invite before delivering the - // next event, even if that next event is a hangup. We therefore accumulate - // all our call events and then process them on the 'sync' event, ie. - // each time a sync has completed. This way, we can avoid emitting incoming - // call events if we get both the invite and answer/hangup in the same sync. - // This happens quite often, eg. replaying sync from storage, catchup sync - // after loading and after we've been offline for a bit. - let callEventBuffer = []; - function evaluateEventBuffer() { - if (client.getSyncState() === "SYNCING") { - // don't process any events until they are all decrypted - if (callEventBuffer.some((e) => e.isBeingDecrypted())) return; - - const ignoreCallIds = {}; // Set - // inspect the buffer and mark all calls which have been answered - // or hung up before passing them to the call event handler. - for (let i = callEventBuffer.length - 1; i >= 0; i--) { - const ev = callEventBuffer[i]; - if (ev.getType() === "m.call.answer" || - ev.getType() === "m.call.hangup") { - ignoreCallIds[ev.getContent().call_id] = "yep"; - } - } - // now loop through the buffer chronologically and inject them - callEventBuffer.forEach(function(e) { - if ( - e.getType() === "m.call.invite" && - ignoreCallIds[e.getContent().call_id] - ) { - // This call has previously been answered or hung up: ignore it - return; - } - try { - callEventHandler(e); - } catch (e) { - logger.error("Caught exception handling call event", e); - } - }); - callEventBuffer = []; - } - } - client.on("sync", evaluateEventBuffer); - - function onEvent(event) { - // any call events or ones that might be once they're decrypted - if (event.getType().indexOf("m.call.") === 0 || event.isBeingDecrypted()) { - // queue up for processing once all events from this sync have been - // processed (see above). - callEventBuffer.push(event); - } - - if (event.isBeingDecrypted() || event.isDecryptionFailure()) { - // add an event listener for once the event is decrypted. - event.once("Event.decrypted", () => { - if (event.getType().indexOf("m.call.") === -1) return; - - if (callEventBuffer.includes(event)) { - // we were waiting for that event to decrypt, so recheck the buffer - evaluateEventBuffer(); - } else { - // This one wasn't buffered so just run the event handler for it - // straight away - try { - callEventHandler(event); - } catch (e) { - logger.error("Caught exception handling call event", e); - } - } - }); - } - } - client.on("event", onEvent); - - function callEventHandler(event) { - const content = event.getContent(); - let call = content.call_id ? client.callList[content.call_id] : undefined; - let i; - //console.info("RECV %s content=%s", event.getType(), JSON.stringify(content)); - - if (event.getType() === "m.call.invite") { - if (event.getSender() === client.credentials.userId) { - return; // ignore invites you send - } - - // XXX: age is always wrong for events from a stored sync so this doesn't - // really work. getLocalAge works by comparing the event's timestamp to the - // local system clock so is probably worse (ie. if your clock was over a minute - // fast, you wouldn't be able to receive any calls at all). - if (event.getAge() > content.lifetime) { - return; // expired call - } - - if (call && call.state === "ended") { - return; // stale/old invite event - } - if (call) { - logger.log( - "WARN: Already have a MatrixCall with id %s but got an " + - "invite. Clobbering.", - content.call_id, - ); - } - - call = createNewMatrixCall(client, event.getRoomId(), { - forceTURN: client._forceTURN, - }); - if (!call) { - logger.log( - "Incoming call ID " + content.call_id + " but this client " + - "doesn't support WebRTC", - ); - // don't hang up the call: there could be other clients - // connected that do support WebRTC and declining the - // the call on their behalf would be really annoying. - return; - } - - call.callId = content.call_id; - call.initWithInvite(event); - client.callList[call.callId] = call; - - // if we stashed candidate events for that call ID, play them back now - if (candidatesByCall[call.callId]) { - for (i = 0; i < candidatesByCall[call.callId].length; i++) { - call.gotRemoteIceCandidate( - candidatesByCall[call.callId][i], - ); - } - } - - // Were we trying to call that user (room)? - let existingCall; - const existingCalls = utils.values(client.callList); - for (i = 0; i < existingCalls.length; ++i) { - const thisCall = existingCalls[i]; - if (call.roomId === thisCall.roomId && - thisCall.direction === 'outbound' && - (["wait_local_media", "create_offer", "invite_sent"].indexOf( - thisCall.state) !== -1)) { - existingCall = thisCall; - break; - } - } - - if (existingCall) { - // If we've only got to wait_local_media or create_offer and - // we've got an invite, pick the incoming call because we know - // we haven't sent our invite yet otherwise, pick whichever - // call has the lowest call ID (by string comparison) - if (existingCall.state === 'wait_local_media' || - existingCall.state === 'create_offer' || - existingCall.callId > call.callId) { - logger.log( - "Glare detected: answering incoming call " + call.callId + - " and canceling outgoing call " + existingCall.callId, - ); - existingCall.replacedBy(call); - call.answer(); - } else { - logger.log( - "Glare detected: rejecting incoming call " + call.callId + - " and keeping outgoing call " + existingCall.callId, - ); - call.hangup(); - } - } else { - client.emit("Call.incoming", call); - } - } else if (event.getType() === 'm.call.answer') { - if (!call) { - return; - } - if (event.getSender() === client.credentials.userId) { - if (call.state === 'ringing') { - call.onAnsweredElsewhere(content); - } - } else { - call.receivedAnswer(content); - } - } else if (event.getType() === 'm.call.candidates') { - if (event.getSender() === client.credentials.userId) { - return; - } - if (!call) { - // store the candidates; we may get a call eventually. - if (!candidatesByCall[content.call_id]) { - candidatesByCall[content.call_id] = []; - } - candidatesByCall[content.call_id] = candidatesByCall[ - content.call_id - ].concat(content.candidates); - } else { - for (i = 0; i < content.candidates.length; i++) { - call.gotRemoteIceCandidate(content.candidates[i]); - } - } - } else if (event.getType() === 'm.call.hangup') { - // Note that we also observe our own hangups here so we can see - // if we've already rejected a call that would otherwise be valid - if (!call) { - // if not live, store the fact that the call has ended because - // we're probably getting events backwards so - // the hangup will come before the invite - call = createNewMatrixCall(client, event.getRoomId()); - if (call) { - call.callId = content.call_id; - call.initWithHangup(event); - client.callList[content.call_id] = call; - } - } else { - if (call.state !== 'ended') { - call.onHangupReceived(content); - delete client.callList[content.call_id]; - } - } - } - } -} - function checkTurnServers(client) { if (!client._supportsVoip) { return; diff --git a/src/webrtc/call.ts b/src/webrtc/call.ts index 2d468e684..25c2b3fb8 100644 --- a/src/webrtc/call.ts +++ b/src/webrtc/call.ts @@ -45,10 +45,10 @@ import MatrixEvent from "../models/event" */ interface CallOpts { - roomId: string, - client: any, // Fix when client is TSified - forceTURN: boolean, - turnServers: Array, + roomId?: string, + client?: any, // Fix when client is TSified + forceTURN?: boolean, + turnServers?: Array, } interface TurnServer { @@ -58,7 +58,7 @@ interface TurnServer { ttl?: number, } -enum CallState { +export enum CallState { Fledgling = 'fledgling', InviteSent = 'invite_sent', WaitLocalMedia = 'wait_local_media', @@ -70,17 +70,17 @@ enum CallState { Ended = 'ended', } -enum CallType { +export enum CallType { Voice = 'voice', Video = 'video', } -enum CallDirection { +export enum CallDirection { Inbound = 'inbound', Outbound = 'outbound', } -enum CallParty { +export enum CallParty { Local = 'local', Remote = 'remote', } @@ -91,7 +91,7 @@ enum MediaQueueId { LocalVideo = 'local_video', } -enum CallErrorCode { +export enum CallErrorCode { /** An error code when the local client failed to create an offer. */ LocalOfferFailed = 'local_offer_failed', /** @@ -433,7 +433,7 @@ export class MatrixCall extends EventEmitter { this.type = CallType.Voice; } - if (event.getAge()) { + if (event.getLocalAge()) { setTimeout(() => { if (this.state == CallState.Ringing) { logger.debug("Call invite has expired. Hanging up."); @@ -445,7 +445,7 @@ export class MatrixCall extends EventEmitter { } this.emit("hangup"); } - }, this.msg.lifetime - event.getAge()); + }, this.msg.lifetime - event.getLocalAge()); } } @@ -1154,7 +1154,8 @@ export class MatrixCall extends EventEmitter { private async placeCallWithConstraints(constraints: MediaStreamConstraints) { logger.log("Getting user media with constraints", constraints); - this.client.callList[this.callId] = this; + // XXX Find a better way to do this + this.client._callEventHandler.calls.set(this.callId, this); this.setState(CallState.WaitLocalMedia); this.direction = CallDirection.Outbound; this.config = constraints; diff --git a/src/webrtc/callEventHandler.ts b/src/webrtc/callEventHandler.ts new file mode 100644 index 000000000..bf01f0a98 --- /dev/null +++ b/src/webrtc/callEventHandler.ts @@ -0,0 +1,256 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import MatrixEvent from '../models/event'; +import {logger} from '../logger'; +import { createNewMatrixCall, MatrixCall, CallErrorCode, CallState, CallDirection } from './call'; +import { EventType } from '../@types/event'; +import { MatrixClient } from '../client'; + +// Don't ring unless we'd be ringing for at least 3 seconds: the user needs some +// time to press the 'accept' button +const RING_GRACE_PERIOD = 3000; + +export class CallEventHandler { + client: MatrixClient; + calls: Map; + callEventBuffer: MatrixEvent[]; + candidatesByCall: Map>; + + constructor(client: MatrixClient) { + this.client = client; + this.calls = new Map(); + // The sync code always emits one event at a time, so it will patiently + // wait for us to finish processing a call invite before delivering the + // next event, even if that next event is a hangup. We therefore accumulate + // all our call events and then process them on the 'sync' event, ie. + // each time a sync has completed. This way, we can avoid emitting incoming + // call events if we get both the invite and answer/hangup in the same sync. + // This happens quite often, eg. replaying sync from storage, catchup sync + // after loading and after we've been offline for a bit. + this.callEventBuffer = []; + this.candidatesByCall = new Map>(); + this.client.on("sync", this.evaluateEventBuffer); + this.client.on("event", this.onEvent); + } + + public stop() { + this.client.removeEventListener("sync", this.evaluateEventBuffer); + this.client.removeEventListener("event", this.onEvent); + } + + private evaluateEventBuffer = () => { + if (this.client.getSyncState() === "SYNCING") { + // don't process any events until they are all decrypted + if (this.callEventBuffer.some((e) => e.isBeingDecrypted())) return; + + const ignoreCallIds = new Set(); + // inspect the buffer and mark all calls which have been answered + // or hung up before passing them to the call event handler. + for (const ev of this.callEventBuffer) { + if (ev.getType() === EventType.CallAnswer || + ev.getType() === EventType.CallHangup) { + ignoreCallIds.add(ev.getContent().call_id); + } + } + // now loop through the buffer chronologically and inject them + for (const e of this.callEventBuffer) { + if ( + e.getType() === EventType.CallInvite && + ignoreCallIds.has(e.getContent().call_id) + ) { + // This call has previously been answered or hung up: ignore it + continue; + } + try { + this.handleCallEvent(e); + } catch (e) { + logger.error("Caught exception handling call event", e); + } + } + this.callEventBuffer = []; + } + } + + private onEvent = (event: MatrixEvent) => { + // any call events or ones that might be once they're decrypted + if (event.getType().indexOf("m.call.") === 0 || event.isBeingDecrypted()) { + // queue up for processing once all events from this sync have been + // processed (see above). + this.callEventBuffer.push(event); + } + + if (event.isBeingDecrypted() || event.isDecryptionFailure()) { + // add an event listener for once the event is decrypted. + event.once("Event.decrypted", () => { + if (event.getType().indexOf("m.call.") === -1) return; + + if (this.callEventBuffer.includes(event)) { + // we were waiting for that event to decrypt, so recheck the buffer + this.evaluateEventBuffer(); + } else { + // This one wasn't buffered so just run the event handler for it + // straight away + try { + this.handleCallEvent(event); + } catch (e) { + logger.error("Caught exception handling call event", e); + } + } + }); + } + } + + private handleCallEvent(event: MatrixEvent) { + const content = event.getContent(); + let call = content.call_id ? this.calls.get(content.call_id) : undefined; + //console.info("RECV %s content=%s", event.getType(), JSON.stringify(content)); + + if (event.getType() === EventType.CallInvite) { + if (event.getSender() === this.client.credentials.userId) { + return; // ignore invites you send + } + + if (event.getLocalAge() > content.lifetime - RING_GRACE_PERIOD) { + return; // expired call + } + + if (call && call.state === CallState.Ended) { + return; // stale/old invite event + } + if (call) { + logger.log( + `WARN: Already have a MatrixCall with id ${content.call_id} but got an ` + + `invite. Clobbering.`, + ); + } + + call = createNewMatrixCall(this.client, event.getRoomId(), { + forceTURN: this.client._forceTURN, + }); + if (!call) { + logger.log( + "Incoming call ID " + content.call_id + " but this client " + + "doesn't support WebRTC", + ); + // don't hang up the call: there could be other clients + // connected that do support WebRTC and declining the + // the call on their behalf would be really annoying. + return; + } + + call.callId = content.call_id; + call.initWithInvite(event); + this.calls.set(call.callId, call); + + // if we stashed candidate events for that call ID, play them back now + if (this.candidatesByCall.get(call.callId)) { + for (const cand of this.candidatesByCall.get(call.callId)) { + call.gotRemoteIceCandidate(cand); + } + } + + // Were we trying to call that user (room)? + let existingCall; + for (const thisCall of this.calls.values()) { + const isCalling = [CallState.WaitLocalMedia, CallState.CreateOffer, CallState.InviteSent].includes( + thisCall.state, + ); + + if ( + call.roomId === thisCall.roomId && + thisCall.direction === CallDirection.Outbound && + isCalling + ) { + existingCall = thisCall; + break; + } + } + + if (existingCall) { + // If we've only got to wait_local_media or create_offer and + // we've got an invite, pick the incoming call because we know + // we haven't sent our invite yet otherwise, pick whichever + // call has the lowest call ID (by string comparison) + if (existingCall.state === CallState.WaitLocalMedia || + existingCall.state === CallState.CreateOffer || + existingCall.callId > call.callId) { + logger.log( + "Glare detected: answering incoming call " + call.callId + + " and canceling outgoing call " + existingCall.callId, + ); + existingCall.replacedBy(call); + call.answer(); + } else { + logger.log( + "Glare detected: rejecting incoming call " + call.callId + + " and keeping outgoing call " + existingCall.callId, + ); + call.hangup(CallErrorCode.Replaced, true); + } + } else { + this.client.emit("Call.incoming", call); + } + } else if (event.getType() === EventType.CallAnswer) { + if (!call) { + return; + } + if (event.getSender() === this.client.credentials.userId) { + if (call.state === CallState.Ringing) { + call.onAnsweredElsewhere(content); + } + } else { + call.receivedAnswer(content); + } + } else if (event.getType() === EventType.CallCandidates) { + if (event.getSender() === this.client.credentials.userId) { + return; + } + if (!call) { + // store the candidates; we may get a call eventually. + if (!this.candidatesByCall.has(content.call_id)) { + this.candidatesByCall.set(content.call_id, []); + } + this.candidatesByCall.set(content.call_id, this.candidatesByCall.get( + content.call_id, + ).concat(content.candidates)); + } else { + for (const cand of content.candidates) { + call.gotRemoteIceCandidate(cand); + } + } + } else if (event.getType() === EventType.CallHangup) { + // Note that we also observe our own hangups here so we can see + // if we've already rejected a call that would otherwise be valid + if (!call) { + // if not live, store the fact that the call has ended because + // we're probably getting events backwards so + // the hangup will come before the invite + call = createNewMatrixCall(this.client, event.getRoomId()); + if (call) { + call.callId = content.call_id; + call.initWithHangup(event); + this.calls.set(content.call_id, call); + } + } else { + if (call.state !== CallState.Ended) { + call.onHangupReceived(content); + this.calls.delete(content.call_id); + } + } + } + } +}