import { Observable, Subject, Subscription } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

export class Subscriptions {
    private terminator = new Subject<void>();
    private subscriptions: Array<Subscription> = [];

    /**
     * Add a managed subscription for an observable. The subscription will be active until it completes, or until cancel() is called.
     * If cancel() is called before the observable completes, the completion block (if any) will not be called.
     */
    public add<T>(observable: Observable<T>,
                  next?: (value: T) => void, error?: (error: any) => void, complete?: () => void) {
        const subscription = observable.subscribe({next, error, complete});
        this.subscriptions.push(subscription);
    }

    /**
     * Wrap an observable so that it completes when these subscriptions are cancelled (if it has not already completed)
     */
    public limit<T>(observable: Observable<T>): Observable<T> {
        return observable.pipe(takeUntil(this.terminator));
    }

    /**
     * Cancel all outstanding subscriptions and complete limited observables.
     */
    public cancel() {
        this.subscriptions.forEach(s => s.unsubscribe());
        this.subscriptions = [];
        this.terminator.next();
    }
}
