diff --git a/packages/observable/src/observable.spec.ts b/packages/observable/src/observable.spec.ts index dcde2b2845..2593a9b5eb 100644 --- a/packages/observable/src/observable.spec.ts +++ b/packages/observable/src/observable.spec.ts @@ -358,10 +358,10 @@ describe('Observable', () => { for await (const value of source) { results.push(value); } - } catch (err: any) { + } catch (err: unknown) { thrownError = err; } - + expect(thrownError instanceof Error).to.be.true; expect(thrownError?.message).to.equal('wee'); expect(results).to.deep.equal([1, 2]); }); diff --git a/packages/observable/src/observable.ts b/packages/observable/src/observable.ts index 31f8403d15..232c8b6bab 100644 --- a/packages/observable/src/observable.ts +++ b/packages/observable/src/observable.ts @@ -25,11 +25,11 @@ export class UnsubscriptionError extends Error { * @deprecated Internal implementation detail. Do not construct error instances. * Cannot be tagged as internal: https://github.com/ReactiveX/rxjs/issues/6269 */ - constructor(public errors: any[]) { + constructor(public errors: unknown[]) { super( errors ? `${errors.length} errors occurred during unsubscription: - ${errors.map((err, i) => `${i + 1}) ${err.toString()}`).join('\n ')}` + ${errors.map((err, i) => `${i + 1}) ${err}`).join('\n ')}` : '' ); this.name = 'UnsubscriptionError'; @@ -76,7 +76,7 @@ export class Subscription implements SubscriptionLike { * started when the Subscription was created. */ unsubscribe(): void { - let errors: any[] | undefined; + let errors: unknown[] | undefined; if (!this.closed) { this.closed = true; @@ -213,7 +213,7 @@ export interface SubscriberOverrides { * the destination's `error` method. * @param err An error that has been thrown by the source observable. */ - error?: (err: any) => void; + error?: (err: unknown) => void; /** * If provided, this function will be called whenever the {@link Subscriber}'s * `complete` method is called. If an error is thrown within this function, it @@ -245,7 +245,7 @@ export class Subscriber extends Subscription implements Observer { /** @internal */ protected readonly _nextOverride: ((value: T) => void) | null = null; /** @internal */ - protected readonly _errorOverride: ((err: any) => void) | null = null; + protected readonly _errorOverride: ((err: unknown) => void) | null = null; /** @internal */ protected readonly _completeOverride: (() => void) | null = null; /** @internal */ @@ -259,7 +259,7 @@ export class Subscriber extends Subscription implements Observer { /** * @internal */ - constructor(destination: Subscriber | Partial> | ((value: any) => void) | null, overrides: SubscriberOverrides); + constructor(destination: Subscriber | Partial> | ((value: unknown) => void) | null, overrides: SubscriberOverrides); /** * Creates an instance of an RxJS Subscriber. This is the workhorse of the library. @@ -303,7 +303,7 @@ export class Subscriber extends Subscription implements Observer { // Automatically chain subscriptions together here. // if destination appears to be one of our subscriptions, we'll chain it. - if (hasAddAndUnsubscribe(destination)) { + if (typeof destination === "object" && hasAddAndUnsubscribe(destination)) { destination.add(this); } } @@ -328,7 +328,7 @@ export class Subscriber extends Subscription implements Observer { * the Observable has experienced an error condition. * @param err The `error` exception. */ - error(err?: any): void { + error(err?: unknown): void { if (this.isStopped) { handleStoppedNotification(errorNotification(err), this); } else { @@ -363,7 +363,7 @@ export class Subscriber extends Subscription implements Observer { this.destination.next(value); } - protected _error(err: any): void { + protected _error(err: unknown): void { try { this.destination.error(err); } finally { @@ -404,7 +404,7 @@ export interface GlobalConfig { * we do not want errors thrown in this user-configured handler to interfere with the * behavior of the library. */ - onUnhandledError: ((err: any) => void) | null; + onUnhandledError: ((err: unknown) => void) | null; /** * A registration point for notifications that cannot be sent to subscribers because they @@ -416,7 +416,7 @@ export interface GlobalConfig { * we do not want errors thrown in this user-configured handler to interfere with the * behavior of the library. */ - onStoppedNotification: ((notification: ObservableNotification, subscriber: Subscriber) => void) | null; + onStoppedNotification: ((notification: ObservableNotification, subscriber: Subscriber) => void) | null; } function overrideNext(this: Subscriber, value: T): void { @@ -427,7 +427,7 @@ function overrideNext(this: Subscriber, value: T): void { } } -function overrideError(this: Subscriber, err: any): void { +function overrideError(this: Subscriber, err: unknown): void { try { this._errorOverride!(err); } catch (error) { @@ -461,7 +461,7 @@ class ConsumerObserver implements Observer { } } - error(err: any): void { + error(err: unknown): void { const { partialObserver } = this; if (partialObserver.error) { try { @@ -487,7 +487,7 @@ class ConsumerObserver implements Observer { } function createSafeObserver(observerOrNext?: Partial> | ((value: T) => void) | null): Observer { - return new ConsumerObserver(!observerOrNext || isFunction(observerOrNext) ? { next: observerOrNext ?? undefined } : observerOrNext); + return new ConsumerObserver(!observerOrNext || typeof observerOrNext === 'function' ? { next: observerOrNext ?? undefined } : observerOrNext); } /** @@ -495,13 +495,13 @@ function createSafeObserver(observerOrNext?: Partial> | ((value: * @param notification The notification being sent. * @param subscriber The stopped subscriber. */ -function handleStoppedNotification(notification: ObservableNotification, subscriber: Subscriber) { +function handleStoppedNotification(notification: ObservableNotification, subscriber: Subscriber) { const { onStoppedNotification } = config; onStoppedNotification && setTimeout(() => onStoppedNotification(notification, subscriber)); } -function hasAddAndUnsubscribe(value: any): value is Subscription { - return value && isFunction(value.unsubscribe) && isFunction(value.add); +function hasAddAndUnsubscribe(value: object | null): value is Subscription { + return value !== null && hasMethod(value, 'unsubscribe') && hasMethod(value, 'add'); } export interface OperateConfig extends SubscriberOverrides { @@ -773,7 +773,7 @@ export class Observable implements Subscribable { } /** @internal */ - protected _subscribe(_subscriber: Subscriber): TeardownLogic { + protected _subscribe(_subscriber: Subscriber): TeardownLogic { return; } @@ -845,7 +845,7 @@ export class Observable implements Subscribable { op7: UnaryFunction, op8: UnaryFunction, op9: UnaryFunction, - ...operations: OperatorFunction[] + ...operations: OperatorFunction[] ): Observable; pipe( op1: UnaryFunction, A>, @@ -857,7 +857,7 @@ export class Observable implements Subscribable { op7: UnaryFunction, op8: UnaryFunction, op9: UnaryFunction, - ...operations: UnaryFunction[] + ...operations: UnaryFunction[] ): unknown; /** @@ -880,8 +880,8 @@ export class Observable implements Subscribable { * @return The Observable result of all the operators having been called * in the order they were passed in. */ - pipe(...operations: UnaryFunction[]): unknown { - return operations.reduce(pipeReducer, this as any); + pipe(...operations: UnaryFunction[]): unknown { + return operations.reduce(pipeReducer, this); } /** @@ -1007,7 +1007,7 @@ export class Observable implements Subscribable { } } -function pipeReducer(prev: any, fn: UnaryFunction) { +function pipeReducer(prev: unknown, fn: UnaryFunction) { return fn(prev); } @@ -1020,7 +1020,7 @@ function pipeReducer(prev: any, fn: UnaryFunction) { * * @param err the error to report */ -export function reportUnhandledError(err: any) { +export function reportUnhandledError(err: unknown) { setTimeout(() => { const { onUnhandledError } = config; if (onUnhandledError) { @@ -1103,13 +1103,15 @@ export function reportUnhandledError(err: any) { * an Array, an iterable, async iterable, or an array-like object to be converted. */ -export function from>(input: O): Observable>; +export function from>(input: O): Observable>; export function from(input: ObservableInput): Observable { const type = getObservableInputType(input); switch (type) { case ObservableInputType.Own: return input as Observable; case ObservableInputType.InteropObservable: + if (!isInteropObservable(input)) + throw new Error('unreachable error for type-narrowing'); return fromInteropObservable(input); case ObservableInputType.ArrayLike: return fromArrayLike(input as ArrayLike); @@ -1128,14 +1130,10 @@ export function from(input: ObservableInput): Observable { * Creates an RxJS Observable from an object that implements `Symbol.observable`. * @param obj An object that properly implements `Symbol.observable`. */ -function fromInteropObservable(obj: any) { +function fromInteropObservable(obj: InteropObservable) { return new Observable((subscriber: Subscriber) => { - const obs = obj[Symbol.observable ?? '@@observable'](); - if (isFunction(obs.subscribe)) { - return obs.subscribe(subscriber); - } - // Should be caught by observable subscribe function error handling. - throw new TypeError('Provided object does not correctly implement Symbol.observable'); + const obs = Reflect.get(obj, Symbol.observable ?? '@@observable')(); + return obs.subscribe(subscriber); }); } @@ -1162,7 +1160,7 @@ export function fromPromise(promise: PromiseLike) { subscriber.complete(); } }, - (err: any) => subscriber.error(err) + (err: unknown) => subscriber.error(err) ) .then(null, reportUnhandledError); }); @@ -1239,6 +1237,11 @@ export enum ObservableInputType { } export function getObservableInputType(input: unknown): ObservableInputType { + if (typeof input !== 'object' || input === null) { + throw new TypeError( + `You provided '${input}' where a stream was expected. You can provide an Observable, Promise, ReadableStream, Array, AsyncIterable, or Iterable.` + ); + } if (input instanceof Observable) { return ObservableInputType.Own; } @@ -1261,9 +1264,7 @@ export function getObservableInputType(input: unknown): ObservableInputType { return ObservableInputType.ReadableStreamLike; } throw new TypeError( - `You provided ${ - input !== null && typeof input === 'object' ? 'an invalid object' : `'${input}'` - } where a stream was expected. You can provide an Observable, Promise, ReadableStream, Array, AsyncIterable, or Iterable.` + `You provided an invalid object where a stream was expected. You can provide an Observable, Promise, ReadableStream, Array, AsyncIterable, or Iterable.` ); } @@ -1271,12 +1272,16 @@ export function getObservableInputType(input: unknown): ObservableInputType { * Returns true if the object is a function. * @param value The value to check */ -export function isFunction(value: any): value is (...args: any[]) => any { +export function isFunction(value: unknown): value is (...args: unknown[]) => unknown { return typeof value === 'function'; } -function isAsyncIterable(obj: any): obj is AsyncIterable { - return Symbol.asyncIterator && isFunction(obj?.[Symbol.asyncIterator]); +function hasMethod(value: object, method: string | symbol): boolean { + return isFunction(Reflect.get(value, method)); +} + +function isAsyncIterable(obj: object): obj is AsyncIterable { + return Symbol.asyncIterator && hasMethod(obj, Symbol.asyncIterator); } export async function* readableStreamLikeToAsyncGenerator(readableStream: ReadableStreamLike): AsyncGenerator { @@ -1294,42 +1299,42 @@ export async function* readableStreamLikeToAsyncGenerator(readableStream: Rea } } -function isReadableStreamLike(obj: any): obj is ReadableStreamLike { +function isReadableStreamLike(obj: object): obj is ReadableStreamLike { // We don't want to use instanceof checks because they would return // false for instances from another Realm, like an