1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-11-25 05:23:13 +03:00

Add initial group call logic

This commit is contained in:
Robert Long
2021-09-08 13:27:38 -07:00
parent 60e168806d
commit 4fe115b2c4
5 changed files with 802 additions and 6 deletions

View File

@@ -23,7 +23,7 @@ import { EventEmitter } from "events";
import { ISyncStateData, SyncApi } from "./sync";
import { EventStatus, IContent, IDecryptOptions, IEvent, MatrixEvent } from "./models/event";
import { StubStore } from "./store/stub";
import { createNewMatrixCall, MatrixCall, ConstraintsType, getUserMediaContraints } from "./webrtc/call";
import { createNewMatrixCall, MatrixCall, ConstraintsType, getUserMediaContraints, CallType } from "./webrtc/call";
import { Filter, IFilterDefinition } from "./filter";
import { CallEventHandler } from './webrtc/callEventHandler';
import * as utils from './utils';
@@ -144,6 +144,7 @@ import { IHierarchyRoom, ISpaceSummaryEvent, ISpaceSummaryRoom } from "./@types/
import { IPusher, IPusherRequest, IPushRules, PushRuleAction, PushRuleKind, RuleId } from "./@types/PushRules";
import { IThreepid } from "./@types/threepids";
import { CryptoStore } from "./crypto/store/base";
import { GroupCall, GroupCallEvent } from "./webrtc/groupCall";
export type Store = IStore;
export type SessionStore = WebStorageSessionStore;
@@ -1315,6 +1316,23 @@ export class MatrixClient extends EventEmitter {
return createNewMatrixCall(this, roomId, { invitee });
}
/**
 * Creates a new group call in the given room and re-emits its
 * GroupCallEvent events on this client.
 *
 * @param {string} roomId The room the call is to be placed in.
 * @param {CallType} type The call type, passed through to the GroupCall.
 * @param {boolean} dataChannelsEnabled Optional flag passed through to the GroupCall.
 * @param {RTCDataChannelInit} dataChannelOptions Optional data channel options
 *     passed through to the GroupCall.
 * @return {GroupCall} the newly constructed group call.
 */
public createGroupCall(
    roomId: string,
    type: CallType,
    dataChannelsEnabled?: boolean,
    dataChannelOptions?: RTCDataChannelInit,
): GroupCall {
    // Every value of the GroupCallEvent enum is forwarded through the client.
    const forwardedEvents = Object.values(GroupCallEvent);
    const call = new GroupCall(this, roomId, type, dataChannelsEnabled, dataChannelOptions);
    this.reEmitter.reEmit(call, forwardedEvents);
    return call;
}
/**
* Get the current sync state.
* @return {?SyncState} the sync state, which may be null.
@@ -6153,11 +6171,11 @@ export class MatrixClient extends EventEmitter {
public register(
username: string,
password: string,
sessionId: string,
sessionId: string | null,
auth: any,
bindThreepids: any,
guestAccessToken: string,
inhibitLogin: boolean,
bindThreepids?: any,
guestAccessToken?: string,
inhibitLogin?: boolean,
callback?: Callback,
): Promise<any> { // TODO: Types (many)
// backwards compat

View File

@@ -51,6 +51,7 @@ export {
createNewMatrixCall,
setAudioInput as setMatrixCallAudioInput,
setVideoInput as setMatrixCallVideoInput,
CallType,
} from "./webrtc/call";
// TODO: This export is temporary and is only used for the local call feed for conference calls

View File

@@ -128,6 +128,8 @@ export enum CallEvent {
FeedsChanged = 'feeds_changed',
AssertedIdentityChanged = 'asserted_identity_changed',
DataChannel = 'datachannel',
}
export enum CallErrorCode {
@@ -368,6 +370,13 @@ export class MatrixCall extends EventEmitter {
await this.placeCallWithConstraints(ConstraintsType.Video);
}
/**
 * Opens a data channel on this call's peer connection and announces it
 * through the CallEvent.DataChannel event.
 *
 * @param label Label handed to the peer connection for the new channel.
 * @param options Channel options handed to the peer connection.
 * @returns The channel created by the peer connection.
 */
public createDataChannel(label: string, options: RTCDataChannelInit) {
    logger.debug("createDataChannel");

    const channel = this.peerConn.createDataChannel(label, options);
    // Locally created channels are announced the same way as remote ones.
    this.emit(CallEvent.DataChannel, channel);

    return channel;
}
/**
 * Returns the room member stored as this call's opponent.
 *
 * @returns The value of the `opponentMember` field.
 */
public getOpponentMember(): RoomMember {
return this.opponentMember;
}
@@ -1525,6 +1534,10 @@ export class MatrixCall extends EventEmitter {
stream.addEventListener("removetrack", () => this.deleteFeedByStream(stream));
};
/**
 * Handler for a data channel event: forwards the channel carried by the
 * event as a CallEvent.DataChannel event on this call.
 */
private onDataChannel = ({ channel }: RTCDataChannelEvent): void => {
    this.emit(CallEvent.DataChannel, channel);
};
/**
* This method removes all video/rtx codecs from screensharing video
* transceivers. This is necessary since they can cause problems. Without
@@ -1883,6 +1896,7 @@ export class MatrixCall extends EventEmitter {
pc.addEventListener('icegatheringstatechange', this.onIceGatheringStateChange);
pc.addEventListener('track', this.onTrack);
pc.addEventListener('negotiationneeded', this.onNegotiationNeeded);
pc.addEventListener('datachannel', this.onDataChannel);
return pc;
}

View File

@@ -35,7 +35,7 @@ export class CallFeed extends EventEmitter {
private analyser: AnalyserNode;
private frequencyBinCount: Float32Array;
private speakingThreshold = SPEAKING_THRESHOLD;
private speaking = false;
public speaking = false;
private volumeLooperTimeout: number;
constructor(

763
src/webrtc/groupCall.ts Normal file
View File

@@ -0,0 +1,763 @@
import EventEmitter from "events";
import { CallFeed, CallFeedEvent } from "./callFeed";
import { MatrixClient } from "../client";
import { randomString } from "../randomstring";
import { CallErrorCode, CallEvent, CallType, MatrixCall } from "./call";
import { RoomMember } from "../models/room-member";
import { SDPStreamMetadataPurpose } from "./callEventTypes";
import { Room } from "../models/room";
import { logger } from "../logger";
/**
 * Named events for a GroupCall. Each member's value is the string event name.
 */
export enum GroupCallEvent {
ActiveSpeakerChanged = "active_speaker_changed",
}
// State event type used to mark a room as a conference room; its content
// carries an `active` flag.
const CONF_ROOM = "me.robertlong.conf";
// Key inside a user's m.room.member content that holds their conference
// participant info (`sessionId` and `expiresAt`).
const CONF_PARTICIPANT = "me.robertlong.conf.participant";
// Interval, in ms, between presence announcements. The advertised expiry is
// twice this value.
const PARTICIPANT_TIMEOUT = 1000 * 15;
// Volume threshold applied to usermedia feeds; also the floor used when
// averaging volume samples for active speaker selection.
const SPEAKING_THRESHOLD = -80;
// Interval, in ms, between active speaker evaluations.
const ACTIVE_SPEAKER_INTERVAL = 1000;
// Number of volume samples kept per participant for active speaker selection.
const ACTIVE_SPEAKER_SAMPLES = 8;
export class GroupCallParticipant extends EventEmitter {
public feeds: CallFeed[] = [];
public activeSpeaker: boolean;
public activeSpeakerSamples: number[];
public dataChannel?: RTCDataChannel;
/**
 * @param groupCall The group call this participant belongs to.
 * @param member The room member this participant represents.
 * @param sessionId The participant's session id (see note on the parameter).
 * @param call The 1:1 call to this participant, if one already exists. The
 *     local participant is created without a call.
 */
constructor(
    private groupCall: GroupCall,
    public member: RoomMember,
    // The session id is used to re-initiate calls if the user's participant
    // session id has changed
    public sessionId: string,
    public call?: MatrixCall,
) {
    super();

    // Start out as "not the active speaker" instead of leaving the flag undefined.
    this.activeSpeaker = false;
    this.activeSpeakerSamples = Array(ACTIVE_SPEAKER_SAMPLES).fill(
        -Infinity,
    );

    if (this.call) {
        this.call.on(CallEvent.State, this.onCallStateChanged);
        this.call.on(CallEvent.FeedsChanged, this.onCallFeedsChanged);
        this.call.on(CallEvent.Replaced, this.onCallReplaced);
        this.call.on(CallEvent.Hangup, this.onCallHangup);
        // Keep the listener set identical to the one installed by replaceCall();
        // without this, data channels on the initial call were never picked up.
        this.call.on(CallEvent.DataChannel, this.onCallDataChannel);
    }
}
/**
 * Swaps the call backing this participant.
 *
 * Any existing call is hung up with CallErrorCode.Replaced and has this
 * participant's listeners detached. The new call's opponent member and the
 * given session id are then adopted, the active speaker flag is reset and
 * the listeners are attached to the new call.
 *
 * @param call The call that takes over for this participant.
 * @param sessionId The session id to record for this participant.
 */
public replaceCall(call: MatrixCall, sessionId: string) {
    // Single table of (event, handler) pairs used for both detach and attach.
    const bindings = [
        [CallEvent.State, this.onCallStateChanged],
        [CallEvent.FeedsChanged, this.onCallFeedsChanged],
        [CallEvent.Replaced, this.onCallReplaced],
        [CallEvent.Hangup, this.onCallHangup],
        [CallEvent.DataChannel, this.onCallDataChannel],
    ] as const;

    if (this.call) {
        this.call.hangup(CallErrorCode.Replaced, false);
        for (const [event, handler] of bindings) {
            this.call.removeListener(event, handler);
        }
    }

    this.call = call;
    this.member = call.getOpponentMember();
    this.activeSpeaker = false;
    this.sessionId = sessionId;

    for (const [event, handler] of bindings) {
        this.call.on(event, handler);
    }
}
/**
 * The first of this participant's feeds whose purpose is Usermedia, or
 * undefined when there is none.
 */
public get usermediaFeed() {
    for (const candidate of this.feeds) {
        if (candidate.purpose === SDPStreamMetadataPurpose.Usermedia) {
            return candidate;
        }
    }
    return undefined;
}
/**
 * The stream of the usermedia feed, or undefined when this participant has
 * no usermedia feed.
 */
public get usermediaStream(): MediaStream {
    const feed = this.usermediaFeed;
    return feed ? feed.stream : undefined;
}
/**
 * @returns The usermedia feed's audio mute state; true when the participant
 *     has no usermedia feed.
 */
public isAudioMuted(): boolean {
    const usermedia = this.usermediaFeed;
    return usermedia ? usermedia.isAudioMuted() : true;
}
/**
 * @returns The usermedia feed's video mute state; true when the participant
 *     has no usermedia feed.
 */
public isVideoMuted(): boolean {
    const usermedia = this.usermediaFeed;
    return usermedia ? usermedia.isVideoMuted() : true;
}
/**
 * Handler for the call's State event.
 *
 * Brings the call's microphone and video mute state in line with the group
 * call's local participant (only when the call has a local usermedia
 * stream), then reports the new state on the group call as "debugstate".
 */
private onCallStateChanged = (state) => {
    const { call, groupCall } = this;

    const wantAudioMuted = groupCall.localParticipant.isAudioMuted();
    const audioOutOfSync = call.localUsermediaStream && call.isMicrophoneMuted() !== wantAudioMuted;
    if (audioOutOfSync) {
        call.setMicrophoneMuted(wantAudioMuted);
    }

    const wantVideoMuted = groupCall.localParticipant.isVideoMuted();
    const videoOutOfSync = call.localUsermediaStream && call.isLocalVideoMuted() !== wantVideoMuted;
    if (videoOutOfSync) {
        call.setLocalVideoMuted(wantVideoMuted);
    }

    groupCall.emit("debugstate", this.member.userId, this.call.callId, state);
};
/**
 * Handler for the call's FeedsChanged event: re-synchronises this
 * participant's feed list with the call's remote feeds.
 *
 * Feeds that were already tracked are kept as they are (their listeners are
 * already attached), feeds that are new go through addCallFeed(), and feeds
 * that disappeared have this participant's listeners detached.
 *
 * Previously, already-known feeds were skipped after the list had been reset,
 * which silently dropped every retained feed from `this.feeds`.
 */
onCallFeedsChanged = () => {
    const previousFeeds = this.feeds;
    const currentFeeds = this.call.getRemoteFeeds();

    // Stop listening to feeds the call no longer exposes so they cannot keep
    // feeding volume samples or mute changes into this participant.
    for (const feed of previousFeeds) {
        if (currentFeeds.includes(feed)) {
            continue;
        }
        feed.removeListener(CallFeedEvent.Speaking, this.onCallFeedSpeaking);
        feed.removeListener(CallFeedEvent.VolumeChanged, this.onCallFeedVolumeChanged);
        feed.removeListener(CallFeedEvent.MuteStateChanged, this.onCallFeedMuteStateChanged);
    }

    this.feeds = [];

    for (const feed of currentFeeds) {
        if (previousFeeds.includes(feed)) {
            // Already wired up by an earlier addCallFeed(); just keep it.
            this.feeds.push(feed);
        } else {
            this.addCallFeed(feed);
        }
    }
};
/**
 * Handler for the call's Replaced event: adopts the replacement call while
 * keeping the current session id, then notifies the group call with "call"
 * and "participants_changed".
 */
onCallReplaced = (replacement) => {
    // TODO: Should we always reuse the sessionId?
    const { sessionId, groupCall } = this;
    this.replaceCall(replacement, sessionId);

    groupCall.emit("call", replacement);
    groupCall.emit("participants_changed");
};
/**
 * Handler for the call's Hangup event.
 *
 * Hangups with the reason "replaced" are ignored. Otherwise this participant
 * is removed from the group call's participant list; if it was the active
 * speaker and others remain, the first remaining participant becomes the
 * active speaker. Finishes by emitting "participants_changed".
 */
onCallHangup = () => {
    if (this.call.hangupReason === "replaced") {
        return;
    }

    const { groupCall } = this;
    const index = groupCall.participants.indexOf(this);

    // Not (or no longer) part of the group call: nothing to clean up.
    if (index === -1) {
        return;
    }

    groupCall.participants.splice(index, 1);

    const wasActiveSpeaker = groupCall.activeSpeaker === this;
    if (wasActiveSpeaker && groupCall.participants.length > 0) {
        const successor = groupCall.participants[0];
        groupCall.activeSpeaker = successor;
        successor.activeSpeaker = true;
    }

    groupCall.emit("participants_changed");
};
/**
 * Adds a feed to this participant.
 *
 * Usermedia feeds additionally get the speaking threshold applied, volume
 * measurement switched on, and this participant's speaking / volume / mute
 * listeners attached, followed by an initial mute state notification.
 *
 * @param callFeed The feed to add.
 */
addCallFeed(callFeed: CallFeed) {
    // Register the feed before anything reads the participant's mute state.
    // isAudioMuted()/isVideoMuted() look the usermedia feed up in this.feeds,
    // so pushing last made the initial notification always report "muted"
    // for a participant's first usermedia feed.
    this.feeds.push(callFeed);

    if (callFeed.purpose === SDPStreamMetadataPurpose.Usermedia) {
        callFeed.setSpeakingThreshold(SPEAKING_THRESHOLD);
        callFeed.measureVolumeActivity(true);
        callFeed.on(CallFeedEvent.Speaking, this.onCallFeedSpeaking);
        callFeed.on(
            CallFeedEvent.VolumeChanged,
            this.onCallFeedVolumeChanged,
        );
        callFeed.on(
            CallFeedEvent.MuteStateChanged,
            this.onCallFeedMuteStateChanged,
        );
        this.onCallFeedMuteStateChanged(
            this.isAudioMuted(),
            this.isVideoMuted(),
        );
    }
}
/**
 * Handler for a feed's Speaking event: re-emits it on this participant as
 * "speaking".
 *
 * The flag received from the feed is forwarded as the event argument so
 * listeners can tell which way the state changed. Listeners that ignore the
 * argument are unaffected.
 */
onCallFeedSpeaking = (speaking: boolean) => {
    this.emit("speaking", speaking);
};
/**
 * Handler for a feed's VolumeChanged event: pushes the new reading into the
 * sample window (dropping the oldest reading) and re-emits it as
 * "volume_changed".
 */
onCallFeedVolumeChanged = (maxVolume: number) => {
    const samples = this.activeSpeakerSamples;

    // Fixed-size sliding window: drop the oldest, append the newest.
    samples.shift();
    samples.push(maxVolume);

    this.emit("volume_changed", maxVolume);
};
/**
 * Handler for a feed's MuteStateChanged event.
 *
 * When audio is muted the sample window is reset to all -Infinity. The
 * change is always re-emitted as "mute_state_changed".
 */
onCallFeedMuteStateChanged = (audioMuted: boolean, videoMuted: boolean) => {
    if (audioMuted) {
        // Discard readings collected while the participant was audible.
        this.activeSpeakerSamples = Array.from(
            { length: ACTIVE_SPEAKER_SAMPLES },
            () => -Infinity,
        );
    }

    this.emit("mute_state_changed", audioMuted, videoMuted);
};
/**
 * Handler for the call's DataChannel event: stores the channel on this
 * participant and emits "datachannel" (without arguments).
 */
onCallDataChannel = (channel: RTCDataChannel) => {
    this.dataChannel = channel;
    this.emit("datachannel");
};
}
export class GroupCall extends EventEmitter {
public entered = false;
public activeSpeaker: GroupCallParticipant;
public localParticipant: GroupCallParticipant;
public participants: GroupCallParticipant[] = [];
public room: Room;
private speakerMap: Map<RoomMember, number[]> = new Map();
private presenceLoopTimeout?: number;
private activeSpeakerLoopTimeout: number;
/**
 * @param client The client used to look up the room and to talk to the server.
 * @param roomId Id of the room the group call takes place in.
 * @param type The call type; selects video or audio streams and calls.
 * @param dataChannelsEnabled When truthy, a data channel is created on
 *     outgoing calls once they have been placed.
 * @param dataChannelOptions Options passed along when creating data channels.
 */
constructor(
    private client: MatrixClient,
    roomId: string,
    public type: CallType,
    private dataChannelsEnabled?: boolean,
    private dataChannelOptions?: RTCDataChannelInit,
) {
    super();

    // Resolve the room once up front; everything else works off this.room.
    this.room = client.getRoom(roomId);
}
/**
 * Creates the local participant if it does not exist yet.
 *
 * Acquires a local video or audio stream depending on the call type, wraps
 * it in a usermedia CallFeed and attaches that feed to a new participant
 * with a random 16 character session id.
 *
 * @returns The (existing or newly created) local participant.
 */
async initLocalParticipant() {
    if (this.localParticipant) {
        return this.localParticipant;
    }

    const stream = this.type === CallType.Video
        ? await this.client.getLocalVideoStream()
        : await this.client.getLocalAudioStream();

    const userId = this.client.getUserId();

    const feed = new CallFeed(
        stream,
        userId,
        SDPStreamMetadataPurpose.Usermedia,
        this.client,
        this.room.roomId,
        false,
        false,
    );

    const member = this.room.getMember(userId);
    const participant = new GroupCallParticipant(this, member, randomString(16));

    this.localParticipant = participant;
    participant.addCallFeed(feed);

    return participant;
}
/**
 * Enters the group call.
 *
 * Ensures the local participant exists, marks the room as a conference room
 * if it is not already, starts the presence and active speaker loops, adopts
 * calls that are already known to the client, sets up participants for the
 * current room members and subscribes to member and incoming call events.
 * Emits "entered" and "participants_changed".
 *
 * Calling enter() while already entered is a no-op. Previously a second call
 * added the local participant again and doubled the listeners and timers.
 */
async enter() {
    if (this.entered) {
        return;
    }

    if (!this.localParticipant) {
        await this.initLocalParticipant();
    }

    // Another enter() may have finished while we were waiting for the stream.
    if (this.entered) {
        return;
    }

    // Ensure that this room is marked as a conference room so clients can react appropriately
    const activeConf = this.room.currentState
        .getStateEvents(CONF_ROOM, "")
        ?.getContent()?.active;

    if (!activeConf) {
        // Fire and forget, but don't leave the rejection unhandled.
        this.sendStateEventWithRetry(
            this.room.roomId,
            CONF_ROOM,
            { active: true },
            "",
        ).catch((error) => {
            logger.warn("Failed to mark room as a conference room", error);
        });
    }

    this.activeSpeaker = this.localParticipant;
    this.participants.push(this.localParticipant);

    // Announce to the other room members that we have entered the room.
    // Continue doing so every PARTICIPANT_TIMEOUT ms
    this.onPresenceLoop();

    this.entered = true;

    this.processInitialCalls();

    // Set up participants for the members currently in the room.
    // Other members will be picked up by the RoomState.members event.
    const initialMembers = this.room.getMembers();

    for (const member of initialMembers) {
        this.onMemberChanged(member);
    }

    this.client.on("RoomState.members", this.onRoomStateMembers);
    this.client.on("Call.incoming", this.onIncomingCall);

    this.emit("entered");
    this.emit("participants_changed");
    this.onActiveSpeakerLoop();
}
/**
 * Leaves the group call.
 *
 * Always drops the local participant and stops the local media stream. If
 * the call had been entered, it also clears our participant info from our
 * member state event, hangs up every participant call, resets the
 * participant and speaker state, stops both loops, unsubscribes from client
 * events and emits "participants_changed" followed by "left".
 */
leave() {
    this.localParticipant = null;
    this.client.stopLocalMediaStream();

    if (!this.entered) {
        return;
    }

    const userId = this.client.getUserId();
    const currentMemberState = this.room.currentState.getStateEvents(
        "m.room.member",
        userId,
    );

    // Same member content as before, with the participant info nulled out.
    const clearedContent = {
        ...currentMemberState.getContent(),
        [CONF_PARTICIPANT]: null,
    };
    this.sendStateEventWithRetry(
        this.room.roomId,
        "m.room.member",
        clearedContent,
        userId,
    );

    this.participants.forEach((participant) => {
        if (participant.call) {
            participant.call.hangup(CallErrorCode.UserHangup, false);
        }
    });

    this.entered = false;
    this.participants = [];
    this.activeSpeaker = null;
    this.speakerMap.clear();

    clearTimeout(this.presenceLoopTimeout);
    clearTimeout(this.activeSpeakerLoopTimeout);

    this.client.removeListener("RoomState.members", this.onRoomStateMembers);
    this.client.removeListener("Call.incoming", this.onIncomingCall);

    this.emit("participants_changed");
    this.emit("left");
}
/**
 * @returns The local participant's video mute state; true when there is no
 *     local participant.
 */
isLocalVideoMuted() {
    const local = this.localParticipant;
    return local ? local.isVideoMuted() : true;
}
/**
 * @returns The local participant's audio mute state; true when there is no
 *     local participant.
 */
isMicrophoneMuted() {
    const local = this.localParticipant;
    return local ? local.isAudioMuted() : true;
}
/**
 * Mutes or unmutes the microphone for the whole group call.
 *
 * Toggles every audio track on the local participant's feeds, brings every
 * participant call that has a local usermedia stream in line, then emits
 * "participants_changed" and "audio_mute_state_changed".
 *
 * @param muted True to mute, false to unmute.
 */
setMicrophoneMuted(muted) {
    const local = this.localParticipant;

    if (local) {
        local.feeds.forEach((feed) => {
            feed.stream.getTracks().forEach((track) => {
                if (track.kind === "audio") {
                    track.enabled = !muted;
                }
            });
        });
    }

    this.participants.forEach(({ call }) => {
        // Only touch calls whose mute state actually differs.
        const outOfSync = call && call.localUsermediaStream && call.isMicrophoneMuted() !== muted;
        if (outOfSync) {
            call.setMicrophoneMuted(muted);
        }
    });

    this.emit("participants_changed");
    this.emit("audio_mute_state_changed");
}
/**
 * Mutes or unmutes local video for the whole group call.
 *
 * Toggles every video track on the local participant's feeds, brings every
 * participant call that has a local usermedia stream in line, then emits
 * "participants_changed" and "video_mute_state_changed".
 *
 * @param muted True to mute, false to unmute.
 */
setLocalVideoMuted(muted) {
    const local = this.localParticipant;

    if (local) {
        local.feeds.forEach((feed) => {
            feed.stream.getTracks().forEach((track) => {
                if (track.kind === "video") {
                    track.enabled = !muted;
                }
            });
        });
    }

    this.participants.forEach(({ call }) => {
        // Only touch calls whose mute state actually differs.
        const outOfSync = call && call.localUsermediaStream && call.isLocalVideoMuted() !== muted;
        if (outOfSync) {
            call.setLocalVideoMuted(muted);
        }
    });

    this.emit("participants_changed");
    this.emit("video_mute_state_changed");
}
/**
 * The local participant's usermedia feed, or undefined when there is no
 * local participant.
 */
public get localUsermediaFeed(): CallFeed {
    const local = this.localParticipant;
    return local ? local.usermediaFeed : undefined;
}
/**
 * The local participant's usermedia stream, or undefined when there is no
 * local participant.
 */
public get localUsermediaStream(): MediaStream {
    const local = this.localParticipant;
    return local ? local.usermediaStream : undefined;
}
/**
 * Call presence
 */

/**
 * One iteration of the presence loop; reschedules itself every
 * PARTICIPANT_TIMEOUT ms.
 *
 * Publishes our own participant info (session id and an expiry of twice the
 * loop interval) in our member state event, then hangs up the call of every
 * remote participant whose participant info is missing, malformed or expired.
 *
 * Member state lookups are null-safe: an exception here would escape the
 * timer callback before the next iteration is scheduled and silently stop
 * presence announcements.
 */
onPresenceLoop = () => {
    const userId = this.client.getUserId();
    const currentMemberState = this.room.currentState.getStateEvents(
        "m.room.member",
        userId,
    );

    this.sendStateEventWithRetry(
        this.room.roomId,
        "m.room.member",
        {
            ...currentMemberState?.getContent(),
            [CONF_PARTICIPANT]: {
                sessionId: this.localParticipant.sessionId,
                expiresAt: new Date().getTime() + PARTICIPANT_TIMEOUT * 2,
            },
        },
        userId,
    ).catch((error) => {
        // The next iteration announces again, so logging is enough here.
        logger.warn("Failed to send group call presence", error);
    });

    const now = new Date().getTime();

    for (const participant of this.participants) {
        if (participant === this.localParticipant) {
            continue;
        }

        const participantInfo = this.room.currentState
            .getStateEvents("m.room.member", participant.member.userId)
            ?.getContent()?.[CONF_PARTICIPANT];

        // Info without an expiresAt is treated as not expired.
        const inactive =
            !participantInfo ||
            typeof participantInfo !== "object" ||
            (participantInfo.expiresAt && participantInfo.expiresAt < now);

        if (inactive) {
            this.emit(
                "debugstate",
                participant.member.userId,
                null,
                "inactive",
            );

            if (participant.call) {
                // NOTE: This should remove the participant on the next tick
                // since matrix-js-sdk awaits a promise before firing user_hangup
                participant.call.hangup(CallErrorCode.UserHangup, false);
            }
        }
    }

    this.presenceLoopTimeout = setTimeout(
        this.onPresenceLoop,
        PARTICIPANT_TIMEOUT,
    );
};
/**
 * Call Setup
 *
 * There are two different paths for calls to be created:
 * 1. Incoming calls triggered by the Call.incoming event.
 * 2. Outgoing calls to the initial members of a room or new members
 * as they are observed by the RoomState.members event.
 */

/**
 * Runs every call currently held by the client's call event handler through
 * the incoming call path.
 */
processInitialCalls() {
    for (const existingCall of this.client.callEventHandler.calls.values()) {
        this.onIncomingCall(existingCall);
    }
}
/**
 * Handles an incoming call (also used for calls that already existed when
 * the group call was entered).
 *
 * Calls for other rooms and calls that are no longer ringing are ignored.
 * Calls from users without valid participant info in their member state are
 * rejected. Otherwise the call is attached to the caller's existing
 * participant, or a new participant is created, and the call is answered.
 * Emits "call" and "participants_changed".
 *
 * @param call The incoming call.
 */
onIncomingCall = (call: MatrixCall) => {
    // The incoming calls may be for another room, which we will ignore.
    if (call.roomId !== this.room.roomId) {
        return;
    }

    if (call.state !== "ringing") {
        logger.warn("Incoming call no longer in ringing state. Ignoring.");
        return;
    }

    const opponentMember = call.getOpponentMember();

    // Either lookup may come back empty; treat that the same as "not a
    // participant" instead of throwing inside the event handler.
    const participantInfo = opponentMember
        ? this.room.currentState
            .getStateEvents("m.room.member", opponentMember.userId)
            ?.getContent()?.[CONF_PARTICIPANT]
        : undefined;

    if (!participantInfo || typeof participantInfo !== "object") {
        call.reject();
        return;
    }

    const { sessionId } = participantInfo;

    // Check if the user calling has an existing participant and use this call instead.
    const existingParticipant = this.participants.find(
        (participant) => participant.member.userId === opponentMember.userId,
    );

    if (existingParticipant) {
        // This also fires the hangup event and triggers those side-effects
        existingParticipant.replaceCall(call, sessionId);
    } else {
        this.participants.push(
            new GroupCallParticipant(this, opponentMember, sessionId, call),
        );
    }

    call.answer();

    this.emit("call", call);
    this.emit("participants_changed");
};
/**
 * Listener for the client's "RoomState.members" event. Only the third
 * argument (the member) is used; it is handed to onMemberChanged().
 */
onRoomStateMembers = (_ev, _roomState, changedMember) => {
    this.onMemberChanged(changedMember);
};
/**
 * Reacts to a room member that may have joined the group call or changed
 * their participant session.
 *
 * Members of other rooms, our own member, members without valid participant
 * info and members whose participant info has expired are ignored. For an
 * already known participant nothing happens unless the session id changed,
 * in which case the old call is hung up. An outgoing call is then placed if
 * we are the side responsible for calling (see below). Emits
 * "participants_changed" after starting a call and "call" once it is placed.
 *
 * @param member The room member to process.
 */
onMemberChanged = (member) => {
    // The member events may be received for another room, which we will ignore.
    if (member.roomId !== this.room.roomId) {
        return;
    }

    // Don't process your own member.
    const localUserId = this.client.getUserId();

    if (member.userId === localUserId) {
        return;
    }

    // Get the latest member participant state event. The lookup can come back
    // empty, so the chain is null-safe.
    const memberStateContent = this.room.currentState
        .getStateEvents("m.room.member", member.userId)
        ?.getContent();

    if (!memberStateContent) {
        return;
    }

    const participantInfo = memberStateContent[CONF_PARTICIPANT];

    if (!participantInfo || typeof participantInfo !== "object") {
        return;
    }

    const { expiresAt, sessionId } = participantInfo;

    // If the participant state has expired, ignore this user.
    const now = new Date().getTime();

    if (expiresAt < now) {
        this.emit("debugstate", member.userId, null, "inactive");
        return;
    }

    // If there is an existing participant for this member check the session id.
    // If the session id changed then we can hang up the old call and start a new one.
    // Otherwise, ignore the member change event because we already have an active participant.
    let participant = this.participants.find(
        (p) => p.member.userId === member.userId,
    );

    if (participant) {
        if (participant.sessionId === sessionId) {
            return;
        }

        this.emit("debugstate", member.userId, null, "inactive");
        // `call` is optional on a participant, so don't assume it is set.
        participant.call?.hangup(CallErrorCode.Replaced, false);
    }

    // Only initiate a call with a user whose userId is lexicographically
    // greater than our own. A user with a lower userId is expected to call us.
    if (member.userId < localUserId) {
        this.emit("debugstate", member.userId, null, "waiting for invite");
        return;
    }

    const call = this.client.createCall(this.room.roomId, member.userId);

    if (!call) {
        // NOTE(review): createCall appears able to return null when calling is
        // unsupported — confirm against MatrixClient.createCall.
        logger.warn("Unable to create a call for group call member " + member.userId);
        return;
    }

    if (participant) {
        participant.replaceCall(call, sessionId);
    } else {
        participant = new GroupCallParticipant(
            this,
            member,
            sessionId,
            call,
        );
        // TODO: Should we wait until the call has been answered to push the participant?
        // Or do we hide the participant until their stream is live?
        // Does hiding a participant without a stream present a privacy problem because
        // a participant without a stream can still listen in on other user's streams?
        this.participants.push(participant);
    }

    const callPromise = this.type === CallType.Video
        ? call.placeVideoCall()
        : call.placeVoiceCall();

    callPromise
        .then(() => {
            if (this.dataChannelsEnabled) {
                call.createDataChannel("datachannel", this.dataChannelOptions);
            }

            this.emit("call", call);
        })
        .catch((error) => {
            // Don't leave a failed call placement as an unhandled rejection.
            logger.warn("Failed to place group call to " + member.userId, error);
        });

    this.emit("participants_changed");
};
/**
 * One iteration of the active speaker loop; reschedules itself every
 * ACTIVE_SPEAKER_INTERVAL ms.
 *
 * Averages each participant's volume samples (each sample floored at
 * SPEAKING_THRESHOLD) and picks the participant with the highest average.
 * If that average is above the threshold and the participant is not already
 * the active speaker, the active speaker flags are moved over and
 * GroupCallEvent.ActiveSpeakerChanged and "participants_changed" are emitted.
 *
 * The candidate is tracked as a GroupCallParticipant. Previously the
 * participant's RoomMember was stored instead, so the comparison with
 * `this.activeSpeaker` never matched and `this.activeSpeaker` ended up
 * holding a RoomMember.
 */
onActiveSpeakerLoop = () => {
    let topAvg: number | undefined;
    let nextActiveSpeaker: GroupCallParticipant | undefined;

    for (const participant of this.participants) {
        let total = 0;

        for (const volume of participant.activeSpeakerSamples) {
            total += Math.max(volume, SPEAKING_THRESHOLD);
        }

        const avg = total / ACTIVE_SPEAKER_SAMPLES;

        // Compare against undefined explicitly: an average of 0 is a valid value.
        if (topAvg === undefined || avg > topAvg) {
            topAvg = avg;
            nextActiveSpeaker = participant;
        }
    }

    if (
        nextActiveSpeaker &&
        topAvg !== undefined &&
        topAvg > SPEAKING_THRESHOLD &&
        this.activeSpeaker !== nextActiveSpeaker
    ) {
        if (this.activeSpeaker) {
            this.activeSpeaker.activeSpeaker = false;
        }
        nextActiveSpeaker.activeSpeaker = true;
        this.activeSpeaker = nextActiveSpeaker;
        this.emit(GroupCallEvent.ActiveSpeakerChanged, nextActiveSpeaker);
        this.emit("participants_changed");
    }

    this.activeSpeakerLoopTimeout = setTimeout(
        this.onActiveSpeakerLoop,
        ACTIVE_SPEAKER_INTERVAL,
    );
};
/**
 * Utils
 */

// TODO: move this elsewhere or get rid of the retry logic. Do we need it?
/**
 * Sends a state event through the client, retrying on failure.
 *
 * After a failed attempt it waits 5 ms and tries again. Once `maxAttempts`
 * retries have failed as well, the last error is rethrown.
 *
 * @param roomId Room to send the state event to.
 * @param eventType Type of the state event.
 * @param content Content of the state event.
 * @param stateKey State key of the state event.
 * @param callback Optional callback passed through to the client.
 * @param maxAttempts Number of retries after the first attempt.
 * @returns A promise for the result of the client's sendStateEvent call.
 */
sendStateEventWithRetry(
    roomId,
    eventType,
    content,
    stateKey,
    callback = undefined,
    maxAttempts = 5,
) {
    const pause = () => new Promise<void>((resolve) => setTimeout(resolve, 5));

    const sendUntilDone = async () => {
        for (let attempt = 0; ; attempt++) {
            try {
                return await this.client.sendStateEvent(
                    roomId,
                    eventType,
                    content,
                    stateKey,
                    callback,
                );
            } catch (error) {
                // Out of retries: surface the most recent failure.
                if (attempt >= maxAttempts) {
                    throw error;
                }

                await pause();
            }
        }
    };

    return sendUntilDone();
}
}