import { Observable, Unsubscribable } from 'rxjs';

/**
 * Lifecycle phase of an observed stream (used for both the source and the suppressor).
 */
enum State {
    /** The stream has neither errored nor completed yet. */
    Running = 0,
    /** The stream terminated with an error. */
    Error = 1,
    /** The stream completed normally. */
    Completed = 2,
}

/**
 * Shared no-op subscription handle, returned as teardown logic when there is nothing left to unsubscribe from.
 */
const FINISHED: Unsubscribable = {
    unsubscribe: (): void => undefined
};

/**
 * Returns an operator whose output Observable withholds all values while the provided `suppressor` observable has
 * most recently emitted a true value. The most recent value emitted by the source (if any) while output is
 * suppressed is propagated when the suppressor later emits false.
 *
 * The suppression state is initially false. A true value must be emitted by the suppressor before suppression starts.
 *
 * Errors from either the source or the suppressor are propagated to the output, halting further output.
 *
 * If the suppressor completes while suppressing, the output completes immediately, since no more values can ever be
 * propagated. If the suppressor completes while not suppressing, output from the source continues as normal.
 *
 * If the source completes while suppressing, completion of the output is deferred until the suppressor either emits
 * false (releasing the last suppressed value, if any, first) or completes.
 *
 * Unsubscribing from the output unsubscribes from both the source and the suppressor.
 *
 * Examples:
 *     SUPPRESSOR      T          F
 *     SOURCE       A    B  C  D     E  <COMPLETE>
 *     OUTPUT       A             D  E  <COMPLETE>
 *
 *     SUPPRESSOR      T          <COMPLETE>
 *     SOURCE       A    B  C  D
 *     OUTPUT       A             <COMPLETE>
 *
 *     SUPPRESSOR      T          F     <COMPLETE>
 *     SOURCE       A    B  C  D     E              F  G  H  <COMPLETE>
 *     OUTPUT       A             D  E              F  G  H  <COMPLETE>
 *
 * @param suppressor Observable whose latest boolean value decides whether source values are withheld.
 * @returns A function that maps a source Observable to the suppressed output Observable.
 */
export function suppressWith(suppressor: Observable<boolean>) {
    return function <T>(source: Observable<T>): Observable<T> {
        return new Observable<T>(subscriber => {
            let isSuppressing = false;
            // Only meaningful while `hasSuppressedValue` is true; the separate flag lets null/undefined be valid values.
            let suppressedValue: T | null = null;
            let hasSuppressedValue = false;
            let suppressorState: State = State.Running;
            let sourceState: State = State.Running;
            let sourceSubscription: Unsubscribable | null = null;

            // Compares through a function call so the compiler does not narrow `suppressorState` to its initial
            // value. The state *can* change before the checks below run, because `suppressor` may be a "cold"
            // observable that emits everything synchronously on subscription.
            function hasState(state: State, candidate: State): boolean {
                return state === candidate;
            }

            const suppressorSubscription = suppressor.subscribe({
                next: (value) => {
                    const wasSuppressing = isSuppressing;
                    // Update the flag before emitting so that anything triggered re-entrantly by the downstream
                    // consumer observes the new suppression state.
                    isSuppressing = value;

                    if (wasSuppressing && !value) {
                        // Stop suppressing and propagate the most recent suppressed value (if any). The bookkeeping
                        // is cleared *before* emitting so a re-entrant source value is not wiped out afterwards.
                        if (hasSuppressedValue) {
                            const released = suppressedValue as T;
                            hasSuppressedValue = false;
                            suppressedValue = null;
                            subscriber.next(released);
                        }
                        // The source finished while we were suppressing; its completion was deferred until now.
                        if (sourceState === State.Completed) {
                            subscriber.complete();
                        }
                    }
                    else if (!wasSuppressing && value) {
                        // Start suppressing with a clean slate.
                        hasSuppressedValue = false;
                        suppressedValue = null;
                    }
                },
                error: (e) => {
                    suppressorState = State.Error;

                    // Since the error terminates the output, the source subscription is no longer needed.
                    if (sourceSubscription !== null) {
                        sourceSubscription.unsubscribe();
                    }
                    // Propagate the error unless the output already failed because of the source. A source that
                    // completed while suppressed has NOT terminated the output yet, so the error must still go out.
                    if (sourceState !== State.Error) {
                        subscriber.error(e);
                    }
                },
                complete: () => {
                    suppressorState = State.Completed;
                    if (isSuppressing) {
                        // Suppression can never be lifted again, so nothing more can be emitted: finish the output
                        // right away and stop listening to the source.
                        if (sourceSubscription !== null) {
                            sourceSubscription.unsubscribe();
                        }
                        subscriber.complete();
                    }
                }
            });

            if (hasState(suppressorState, State.Error)
                || (isSuppressing && hasState(suppressorState, State.Completed))) {
                // The suppressor terminated synchronously during subscription and the observer above has already
                // errored or completed the output. There is no reason to subscribe to the source at all.
                return FINISHED;
            }

            sourceSubscription = source.subscribe({
                next: (value: T) => {
                    if (isSuppressing) {
                        // Track the most recent value from the source, so it can be output when suppression ends.
                        suppressedValue = value;
                        hasSuppressedValue = true;
                    }
                    else {
                        // If not suppressing, values go straight through to the subscriber.
                        subscriber.next(value);
                    }
                },
                error: (e) => {
                    sourceState = State.Error;

                    // There will be nothing left to do, so drop the subscription on the suppressor.
                    suppressorSubscription.unsubscribe();

                    // Propagate the error.
                    subscriber.error(e);
                },
                complete: () => {
                    sourceState = State.Completed;
                    if (!isSuppressing) {
                        suppressorSubscription.unsubscribe();
                        subscriber.complete();
                    }
                    // Otherwise completion is deferred: the suppressor observer finishes the output when it either
                    // emits false (after releasing the last suppressed value) or completes.
                }
            });

            // Teardown must release BOTH inner subscriptions; otherwise unsubscribing from the output would leave
            // the suppressor subscribed indefinitely.
            return () => {
                suppressorSubscription.unsubscribe();
                if (sourceSubscription !== null) {
                    sourceSubscription.unsubscribe();
                }
            };
        });
    };
}
