Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions src/room/PCTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ export default class PCTransport extends EventEmitter {

private log = log;

private iceLog = log;

private loggerOptions: LoggerOptions;

private ddExtID = 0;
Expand Down Expand Up @@ -87,8 +89,9 @@ export default class PCTransport extends EventEmitter {

constructor(config?: RTCConfiguration, loggerOptions: LoggerOptions = {}) {
super();
this.log = getLogger(loggerOptions.loggerName ?? LoggerNames.PCTransport);
this.loggerOptions = loggerOptions;
this.log = getLogger(loggerOptions.loggerName ?? LoggerNames.PCTransport, () => this.logContext);
this.iceLog = getLogger(LoggerNames.ICE, () => this.logContext);
this.config = config;
this._pc = this.createPC();
this.offerLock = new Mutex();
Expand All @@ -99,24 +102,33 @@ export default class PCTransport extends EventEmitter {

pc.onicecandidate = (ev) => {
if (!ev.candidate) return;
this.iceLog.debug('local ICE candidate gathered', { candidate: ev.candidate.candidate });
this.onIceCandidate?.(ev.candidate);
};
pc.onicecandidateerror = (ev) => {
this.iceLog.debug('ICE candidate error', { event: ev });
this.onIceCandidateError?.(ev);
};

pc.oniceconnectionstatechange = () => {
this.iceLog.debug(`ICE connection state: ${pc.iceConnectionState}`);
this.onIceConnectionStateChange?.(pc.iceConnectionState);
};

pc.onsignalingstatechange = () => {
this.log.debug(`signaling state: ${pc.signalingState}`);
this.onSignalingStatechange?.(pc.signalingState);
};

pc.onconnectionstatechange = () => {
this.log.debug(`connection state: ${pc.connectionState}`);
this.onConnectionStateChange?.(pc.connectionState);
};
pc.ondatachannel = (ev) => {
this.log.debug('data channel opened by peer', {
label: ev.channel.label,
id: ev.channel.id,
});
this.onDataChannel?.(ev);
};
pc.ontrack = (ev) => {
Expand All @@ -142,6 +154,9 @@ export default class PCTransport extends EventEmitter {
if (this.pc.remoteDescription && !this.restartingIce) {
return this.pc.addIceCandidate(candidate);
}
this.iceLog.debug('queuing remote ICE candidate until remote description applied', {
pendingCount: this.pendingCandidates.length + 1,
});
this.pendingCandidates.push(candidate);
}

Expand All @@ -153,7 +168,6 @@ export default class PCTransport extends EventEmitter {
offerId !== this.latestOfferId
) {
this.log.warn('ignoring answer for old offer', {
...this.logContext,
offerId,
latestOfferId: this.latestOfferId,
});
Expand All @@ -172,7 +186,7 @@ export default class PCTransport extends EventEmitter {
sdpParsed.media.forEach((media) => {
ensureIPAddrMatchVersion(media);
});
this.log.debug('setting pending initial offer before processing answer', this.logContext);
this.log.debug('setting pending initial offer before processing answer');
await this.setMungedSDP(initialOffer, write(sdpParsed));
}
const sdpParsed = parse(sd.sdp ?? '');
Expand Down Expand Up @@ -230,6 +244,11 @@ export default class PCTransport extends EventEmitter {
}
await this.setMungedSDP(sd, mungedSDP, true);

if (this.pendingCandidates.length > 0) {
this.iceLog.debug('flushing queued ICE candidates', {
count: this.pendingCandidates.length,
});
}
this.pendingCandidates.forEach((candidate) => {
this.pc.addIceCandidate(candidate);
});
Expand Down Expand Up @@ -271,10 +290,7 @@ export default class PCTransport extends EventEmitter {
const unlock = await this.offerLock.lock();
try {
if (this.pc.signalingState !== 'stable') {
this.log.warn(
'signaling state is not stable, cannot create initial offer',
this.logContext,
);
this.log.warn('signaling state is not stable, cannot create initial offer');
return;
}
const offerId = this.latestOfferId + 1;
Expand All @@ -301,7 +317,7 @@ export default class PCTransport extends EventEmitter {
}

if (options?.iceRestart) {
this.log.debug('restarting ICE', this.logContext);
this.iceLog.debug('restarting ICE');
this.restartingIce = true;
}

Expand All @@ -318,21 +334,21 @@ export default class PCTransport extends EventEmitter {
await this._pc.setRemoteDescription(currentSD);
} else {
this.renegotiate = true;
this.log.debug('requesting renegotiation', { ...this.logContext });
this.log.debug('requesting renegotiation');
return;
}
} else if (!this._pc || this._pc.signalingState === 'closed') {
this.log.warn('could not createOffer with closed peer connection', this.logContext);
this.log.warn('could not createOffer with closed peer connection');
return;
}

// actually negotiate
this.log.debug('starting to negotiate', this.logContext);
this.log.debug('starting to negotiate');
// increase the offer id at the start to ensure the offer is always > 0 so that we can use 0 as a default value for legacy behavior
const offerId = this.latestOfferId + 1;
this.latestOfferId = offerId;
const offer = await this.pc.createOffer(options);
this.log.debug('original offer', { sdp: offer.sdp, ...this.logContext });
this.log.debug('original offer', { sdp: offer.sdp });

const sdpParsed = parse(offer.sdp ?? '');
sdpParsed.media.forEach((media) => {
Expand Down Expand Up @@ -386,7 +402,6 @@ export default class PCTransport extends EventEmitter {
});
if (this.latestOfferId > offerId) {
this.log.warn('latestOfferId mismatch', {
...this.logContext,
latestOfferId: this.latestOfferId,
offerId,
});
Expand Down Expand Up @@ -523,6 +538,7 @@ export default class PCTransport extends EventEmitter {
if (!this._pc) {
return;
}
this.log.debug('closing peer connection');
this.pendingInitialOffer = undefined;
this._pc.close();
this._pc.onconnectionstatechange = null;
Expand All @@ -544,10 +560,7 @@ export default class PCTransport extends EventEmitter {
if (munged) {
sd.sdp = munged;
try {
this.log.debug(
`setting munged ${remote ? 'remote' : 'local'} description`,
this.logContext,
);
this.log.debug(`setting munged ${remote ? 'remote' : 'local'} description`);
if (remote) {
await this.pc.setRemoteDescription(sd);
} else {
Expand All @@ -556,7 +569,6 @@ export default class PCTransport extends EventEmitter {
return;
} catch (e) {
this.log.warn(`not able to set ${sd.type}, falling back to unmodified sdp`, {
...this.logContext,
error: e,
mungedSdp: munged,
originalSdp,
Expand Down Expand Up @@ -589,7 +601,7 @@ export default class PCTransport extends EventEmitter {
if (!remote && this.pc.remoteDescription) {
fields.remoteSdp = this.pc.remoteDescription;
}
this.log.error(`unable to set ${sd.type}`, { ...this.logContext, fields });
this.log.error(`unable to set ${sd.type}`, { fields });
throw new NegotiationError(msg);
}
}
Expand Down
16 changes: 10 additions & 6 deletions src/room/PCTransportManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ export class PCTransportManager {

private log = log;

private iceLog = log;

private loggerOptions: LoggerOptions;

private _mode: PCMode;
Expand All @@ -73,8 +75,9 @@ export class PCTransportManager {
}

constructor(mode: PCMode, loggerOptions: LoggerOptions, rtcConfig?: RTCConfiguration) {
this.log = getLogger(loggerOptions.loggerName ?? LoggerNames.PCManager);
this.loggerOptions = loggerOptions;
this.log = getLogger(loggerOptions.loggerName ?? LoggerNames.PCManager, () => this.logContext);
this.iceLog = getLogger(LoggerNames.ICE, () => this.logContext);

this.isPublisherConnectionRequired = mode !== 'subscriber-primary';
this.isSubscriberConnectionRequired = mode === 'subscriber-primary';
Expand Down Expand Up @@ -150,7 +153,7 @@ export class PCTransportManager {
publisher.removeTrack(sender);
}
} catch (e) {
this.log.warn('could not removeTrack', { ...this.logContext, error: e });
this.log.warn('could not removeTrack', { error: e });
}
}
}
Expand All @@ -159,6 +162,7 @@ export class PCTransportManager {
}

async triggerIceRestart() {
this.iceLog.info('triggering ICE restart');
if (this.subscriber) {
this.subscriber.restartingIce = true;
}
Expand All @@ -169,6 +173,7 @@ export class PCTransportManager {
}

async addIceCandidate(candidate: RTCIceCandidateInit, target: SignalTarget) {
this.iceLog.debug('adding remote ICE candidate', { target, candidate });
if (target === SignalTarget.PUBLISHER) {
await this.publisher.addIceCandidate(candidate);
} else {
Expand All @@ -178,7 +183,6 @@ export class PCTransportManager {

async createSubscriberAnswerFromOffer(sd: RTCSessionDescriptionInit, offerId: number) {
this.log.debug('received server offer', {
...this.logContext,
RTCSdpType: sd.type,
sdp: sd.sdp,
signalingState: this.subscriber?.getSignallingState().toString(),
Expand All @@ -199,6 +203,7 @@ export class PCTransportManager {
}

updateConfiguration(config: RTCConfiguration, iceRestart?: boolean) {
this.log.debug('updating rtc configuration', { iceRestart });
this.publisher.setConfiguration(config);
this.subscriber?.setConfiguration(config);
if (iceRestart) {
Expand All @@ -214,7 +219,7 @@ export class PCTransportManager {
this.publisher.getConnectionState() !== 'connected' &&
this.publisher.getConnectionState() !== 'connecting'
) {
this.log.debug('negotiation required, start negotiating', this.logContext);
this.log.debug('negotiation required, start negotiating');
this.publisher.negotiate();
}
await Promise.all(
Expand Down Expand Up @@ -350,7 +355,6 @@ export class PCTransportManager {
`pc state change: from ${PCTransportState[previousState]} to ${
PCTransportState[this.state]
}`,
this.logContext,
);
this.onStateChange?.(
this.state,
Expand All @@ -372,7 +376,7 @@ export class PCTransportManager {

return new Promise<void>(async (resolve, reject) => {
const abortHandler = () => {
this.log.warn('abort transport connection', this.logContext);
this.log.warn('abort transport connection');
CriticalTimers.clearTimeout(connectTimeout);

reject(ConnectionError.cancelled('room connection has been cancelled'));
Expand Down
6 changes: 4 additions & 2 deletions src/room/RegionUrlProvider.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { Mutex } from '@livekit/mutex';
import type { RegionInfo, RegionSettings } from '@livekit/protocol';
import log from '../logger';
import { LoggerNames, getLogger } from '../logger';

const log = getLogger(LoggerNames.Region);
import { ConnectionError, ConnectionErrorReason } from './errors';
import { extractMaxAgeFromRequestHeaders, isCloud } from './utils';

Expand Down Expand Up @@ -229,7 +231,7 @@ export class RegionUrlProvider {
if (regionsLeft.length > 0) {
const nextRegion = regionsLeft[0];
this.attemptedRegions.push(nextRegion);
log.debug(`next region: ${nextRegion.region}`);
log.info(`switching to region: ${nextRegion.region}`, { region: nextRegion.region });
return nextRegion.url;
} else {
return null;
Expand Down
1 change: 1 addition & 0 deletions src/room/data-track/incoming/IncomingDataTrackManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () =>
return;
}
case 'pending': {
log.debug(`data track subscription activated`, { sid, handle: assignedHandle });
const pipeline = new IncomingDataTrackPipeline({
info: descriptor.info,
publisherIdentity: descriptor.publisherIdentity,
Expand Down
2 changes: 2 additions & 0 deletions src/room/data-track/outgoing/OutgoingDataTrackManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,13 @@ export default class OutgoingDataTrackManager extends (EventEmitter as new () =>
case 'pending': {
if (result.type === 'ok') {
const info = result.data;
log.debug(`SFU accepted publish request for handle ${handle}`, { sid: info.sid });
const e2eeManager = info.usesE2ee ? this.e2eeManager : null;
this.descriptors.set(info.pubHandle, Descriptor.active(info, e2eeManager));

descriptor.completionFuture.resolve?.();
} else {
log.debug(`SFU rejected publish request for handle ${handle}`, { error: result.error });
descriptor.completionFuture.reject?.(result.error);
}
return;
Expand Down
5 changes: 5 additions & 0 deletions src/room/participant/Participant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ export default class Participant extends (EventEmitter as new () => TypedEmitter
}

protected addTrackPublication(publication: TrackPublication) {
this.log.debug(`adding track publication`, {
trackSid: publication.trackSid,
source: publication.source,
kind: publication.kind,
});
// forward publication driven events
publication.on(TrackEvent.Muted, () => {
this.emit(ParticipantEvent.TrackMuted, publication);
Expand Down
16 changes: 10 additions & 6 deletions src/room/track/Track.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ export abstract class Track<

/** @internal */
setStreamState(value: Track.StreamState) {
if (this._streamState !== value) {
this.log.debug(`stream state changed: ${this._streamState} -> ${value}`);
}
this._streamState = value;
}

Expand Down Expand Up @@ -89,8 +92,8 @@ export abstract class Track<
loggerOptions: LoggerOptions = {},
) {
super();
this.log = getLogger(loggerOptions.loggerName ?? LoggerNames.Track);
this.loggerContextCb = loggerOptions.loggerContextCb;
this.log = getLogger(loggerOptions.loggerName ?? LoggerNames.Track, () => this.logContext);

this.setMaxListeners(100);
this.kind = kind;
Expand Down Expand Up @@ -184,11 +187,11 @@ export abstract class Track<
this.emit(hasAudio ? TrackEvent.AudioPlaybackFailed : TrackEvent.VideoPlaybackFailed, e);
} else if (e.name === 'AbortError') {
// commonly triggered by another `play` request, only log for debugging purposes
log.debug(
this.log.debug(
`${hasAudio ? 'audio' : 'video'} playback aborted, likely due to new play request`,
);
} else {
log.warn(`could not playback ${hasAudio ? 'audio' : 'video'}`, e);
this.log.warn(`could not playback ${hasAudio ? 'audio' : 'video'}`, { error: e });
}
// If audio playback isn't allowed make sure we still play back the video
if (
Expand Down Expand Up @@ -251,6 +254,7 @@ export abstract class Track<
}

stop() {
this.log.debug('stopping track');
this.stopMonitor();
this._mediaStreamTrack.stop();
}
Expand Down Expand Up @@ -278,12 +282,12 @@ export abstract class Track<

/** @internal */
updateLoggerOptions(loggerOptions: LoggerOptions) {
if (loggerOptions.loggerName) {
this.log = getLogger(loggerOptions.loggerName);
}
if (loggerOptions.loggerContextCb) {
this.loggerContextCb = loggerOptions.loggerContextCb;
}
if (loggerOptions.loggerName) {
this.log = getLogger(loggerOptions.loggerName, () => this.logContext);
}
}

private recycleElement(element: HTMLMediaElement) {
Expand Down