// Lodash
import _ from 'lodash';

// Reactive X
import {
  BehaviorSubject,
  concat,
  interval,
  isObservable,
  merge,
  MonoTypeOperatorFunction,
  NEVER,
  Observable,
  ObservedValueOf,
  OperatorFunction,
  ReplaySubject,
  Subject,
  throwError,
} from 'rxjs';
import {
  catchError,
  concatMap,
  delay,
  distinctUntilChanged,
  distinctUntilKeyChanged,
  filter,
  finalize,
  map,
  retryWhen,
  scan,
  switchMap,
  take,
  takeUntil,
  takeWhile,
  tap,
} from 'rxjs/operators';


/**
 * Invokes the given callback for every value emitted by the source observable.
 * The emitted values are passed on unchanged.
 * @param callback The callback to invoke with each emitted value.
 * @returns A RxJS operator function.
 */
export function onNext<T>(callback: (value: T) => void): MonoTypeOperatorFunction<T> {
  return (source$: Observable<T>) => {
    // Only observe 'next' notifications; errors and completion are left untouched
    const nextObserver = { next: (emitted: T) => callback(emitted) };

    return source$.pipe(tap(nextObserver));
  };
}

/**
 * Invokes the given callback whenever the source observable errors.
 * The error itself is still passed on to downstream subscribers.
 * @param callback The callback to invoke with the error.
 * @returns A RxJS operator function.
 */
export function onError<T>(callback: (error: any) => void): MonoTypeOperatorFunction<T> {
  return (source$: Observable<T>) => {
    // Only observe 'error' notifications; values and completion are left untouched
    const errorObserver = { error: (cause: any) => callback(cause) };

    return source$.pipe(tap(errorObserver));
  };
}

/**
 * Invokes the given callback whenever the source observable completes.
 * The completion itself is still passed on to downstream subscribers.
 * @param callback The callback to invoke on completion.
 * @returns A RxJS operator function.
 */
export function onComplete<T>(callback: () => void): MonoTypeOperatorFunction<T> {
  return (source$: Observable<T>) => {
    // Only observe 'complete' notifications; values and errors are left untouched
    const completeObserver = { complete: () => callback() };

    return source$.pipe(tap(completeObserver));
  };
}

/**
 * Invokes the given callback whenever a subscription to the source observable closes,
 * telling it why the subscription closed:
 * - 'complete' if the source completed,
 * - 'error' if the source errored,
 * - 'unsubscribe' if the subscriber unsubscribed before the source terminated.
 * @param callback The callback to invoke with the reason for closing.
 * @returns A RxJS operator function.
 */
export function onClose<T>(
  callback: (reason: 'error' | 'complete' | 'unsubscribe') => void,
): MonoTypeOperatorFunction<T> {
  return (source$: Observable<T>) =>
    new Observable<T>((subscriber) => {
      // The reason is tracked per subscription. Keeping it outside of the subscribe
      // function would share it between all subscriptions (e.g. on re-subscription),
      // so a later unsubscribe would wrongly be reported as 'complete' or 'error'.
      let reason: 'error' | 'complete' | 'unsubscribe' = 'unsubscribe';

      return source$
        .pipe(
          tap({
            error: () => (reason = 'error'),
            complete: () => (reason = 'complete'),
          }),
          // Runs on teardown; unless a terminal notification was seen, it was an unsubscribe
          finalize(() => callback(reason)),
        )
        .subscribe(subscriber);
    });
}

/**
 * Passes on the values of the source observable until either the source observable
 * or the notifier completes, whichever happens first. Values emitted by the notifier
 * are ignored; errors of the source observable or the notifier are passed on.
 * @param notifier$ The notifier whose completion completes the resulting observable.
 * @returns A RxJS operator function.
 */
export function takeDuring<T>(notifier$: Observable<any>) {
  // Unique marker appended to both streams to signal their completion.
  // It is an immutable constant, so no state is shared between subscriptions
  // (a shared completion subject would end all subscriptions once one of them completed).
  const completed = Symbol('takeDuring.completed');

  return (source$: Observable<T>) => {
    return merge(
      // Pass on the source values, then signal that the source completed
      concat(source$, [completed]),
      // Don't emit values by the notifier, only signal that it completed
      concat(notifier$.pipe(filter(() => false)), [completed]),
    ).pipe(
      // Complete on the first completion marker (the marker itself is not emitted)
      takeWhile((value) => value !== completed),
    );
  };
}

/**
 * Runs the given observable for the lifetime of the source observable.
 * No emissions by the given observable are passed on, but its errors are.
 * The source observable is subscribed to exactly once per subscription.
 * @param observable The observable to 'run in the background'.
 * @returns A RxJS operator function.
 */
export function inBackground<T>(observable: Observable<any>): MonoTypeOperatorFunction<T> {
  // Unique marker appended to the source to signal its completion (immutable, so stateless)
  const sourceCompleted = Symbol('inBackground.sourceCompleted');

  return (source$: Observable<T>) => {
    return merge(
      // Pass on the source values, then signal that the source completed
      concat(source$, [sourceCompleted]),
      // Keep the background observable subscribed, but drop everything it emits
      observable.pipe(filter(() => false)),
    ).pipe(
      // Completing on the marker also unsubscribes from the background observable
      takeWhile((value) => value !== sourceCompleted),
    ) as Observable<T>;
  };
}

/**
 * Behaves like the 'switchMap' operator, but remembers projections which were canceled
 * (unsubscribed) before they terminated. If a projection errors while at least one such
 * canceled projection is remembered, the most recently remembered one is subscribed to again
 * as a fallback instead of propagating the error. A fallback which errors is dropped and the
 * next most recent one is tried. The error is only propagated once no canceled projections
 * are left. Any projection which completes clears the remembered projections.
 * @param project The projection to map to.
 * @returns A RxJS operator function.
 */
export function fallbackMap<SourceValue, Projection extends Observable<any>>(
  project: (value: SourceValue, index: number) => Projection,
): OperatorFunction<SourceValue, ObservedValueOf<Projection>> {
  return (source$: Observable<SourceValue>) => {
    // Feeds projections to retry back into the 'switchMap' below.
    // NOTE(review): both the subject and the queue are created once per operator application,
    // so they are shared by all subscriptions to the resulting observable.
    const fallback$: Subject<Projection> = new Subject();
    // Canceled projections, ordered from least recent to most recent
    let unfinishedProjections: Projection[] = [];

    return merge(
      // Tag every projection with its origin, as fallbacks are treated differently below
      fallback$.pipe(map((projection) => ({ projection, projectionOrigin: 'fallback' }))),
      source$.pipe(map((value, index) => ({ projection: project(value, index), projectionOrigin: 'source' }))),
    ).pipe(
      switchMap((projectionWithOrigin) => {
        const { projection, projectionOrigin } = projectionWithOrigin;

        return projection.pipe(
          onComplete(() => {
            // On success clear queue of unfinished projections
            unfinishedProjections = [];
          }),
          onError(() => {
            // Remove unfinished projections if they errored during fallback.
            // This runs before the 'catchError' below, so the failed fallback
            // is already gone when the next fallback is chosen.
            if (projectionOrigin !== 'fallback') return;
            unfinishedProjections = unfinishedProjections.slice(0, -1);
          }),
          onClose((reason) => {
            // Only consider unfinished projections
            if (reason !== 'unsubscribe') return;

            // Don't re-add unfinished projections
            if (projectionOrigin === 'fallback') return;

            // Add to queue of unfinished projections
            unfinishedProjections.push(projection);
          }),
          catchError((error: any) => {
            // Propagate error if no more unfinished projections left
            if (unfinishedProjections.length < 1) return throwError(() => error);

            // Fallback to the most recent unfinished projection
            const latestUnfinishedProjection = unfinishedProjections[unfinishedProjections.length - 1];
            fallback$.next(latestUnfinishedProjection);

            // Replace the failed projection by an observable which neither emits nor terminates
            return NEVER;
          }),
        );
      }),
    );
  };
}

/**
 * Buffers any emissions of the source observable until the notifier emits.
 * All buffered values are then emitted immediately and in sequence.
 * From then on any following emissions of the source observable are just passed on.
 * Only the first emission of the notifier is considered.
 * @param notifier$ The observable signaling when to stop buffering.
 * @returns A RxJS operator function.
 */
export function bufferUntil<T>(notifier$: Observable<any>): MonoTypeOperatorFunction<T> {
  return (source$: Observable<T>) => {
    // Flipped to false once the notifier has emitted
    let shouldBuffer: boolean = true;

    // Replays every value it received (no buffer size limit is given).
    // NOTE(review): the source is subscribed to right here, i.e. when the operator is applied
    // and not when the resulting observable is subscribed to. The subscription is not kept,
    // so it only ends through 'takeWhile' or when the source terminates.
    const buffer$ = new ReplaySubject<T>();
    // 'takeWhile' is inclusive (second argument), so the first source value arriving after the
    // notifier emitted is still passed into the buffer, which completes right afterwards.
    source$.pipe(takeWhile(() => shouldBuffer, true)).subscribe(buffer$);

    return notifier$.pipe(
      take(1),
      switchMap(() => {
        shouldBuffer = false;
        // Replay the buffer first; once it has completed continue with the source itself.
        // NOTE(review): this subscribes to the source a second time, which presumably
        // assumes a hot (shared) source - confirm against callers.
        return concat(buffer$, source$);
      }),
    );
  };
}

/**
 * Retries the source observable up to a given limit of retries and waits
 * for the provided period of time before each of them.
 * If the abort condition is fulfilled for an error, this operator stops retrying
 * and passes that error on. The same happens once the limit of retries is reached.
 * @param count The limit of retries.
 * @param wait The time to wait before each retry in milliseconds.
 * @param abortCondition The condition on which to stop retrying, checked for every error.
 * @returns A RxJS operator function.
 */
export function retryUnless<T>(
  count: number,
  wait: number,
  abortCondition: (error: any) => boolean,
): MonoTypeOperatorFunction<T> {
  return retryWhen<T>((errors) =>
    errors.pipe(
      // The accumulator counts the retries which were already granted
      scan((retries: number, error: any) => {
        // Error immediately if the abort condition is fulfilled
        // or if the maximum amount of retries was reached.
        if (abortCondition(error) || retries >= count) {
          throw error;
        }

        // Otherwise grant another retry.
        return retries + 1;
      }, 0),
      // Wait the specified number of milliseconds before retrying.
      delay(wait),
    ),
  );
}

/**
 * At first the given initial value is emitted immediately
 * (or as soon as the given initial value stream emits its first value).
 * For any update emitted by the source observable, the update is applied
 * and emitted immediately using the provided 'applyUpdate' function.
 * The actual update is then performed using the provided 'performUpdate' function.
 * If this update does not complete successfully, the value present before applying
 * the update is reverted to (by emitting it).
 * If the update succeeds the next update in queue is applied and performed the same way.
 * If the first update fails, this operator reverts to the provided initial value.
 * The resulting observable completes when the source observable completes.
 * @param initialValue The initial value or value stream.
 * @param applyUpdate Function which applies the given update locally.
 * @param performUpdate Function which applies the given update remotely.
 * @returns A RxJS operator function.
 */
export function optimisticUpdate<UpdateType, ValueType>(
  initialValue: ValueType | Observable<ValueType>,
  applyUpdate: (currentValue: ValueType, update: UpdateType) => ValueType,
  performUpdate: (currentValue: ValueType, update: UpdateType) => Observable<ValueType>,
): OperatorFunction<UpdateType, ValueType> {
  return (source$: Observable<UpdateType>) => {
    // Holds the current value and replays it to anyone subscribing later
    const value$: Subject<ValueType> = new ReplaySubject(1);

    if (isObservable(initialValue)) {
      // Only the first value of the stream is used, unless the source completes before.
      // NOTE(review): this subscription is made when the operator is applied,
      // not when the resulting observable is subscribed to.
      initialValue.pipe(take(1), takeDuring(source$)).subscribe((value) => value$.next(value));
    } else {
      value$.next(initialValue);
    }

    const update$ = source$.pipe(
      // Updates are processed one after another in the order they were emitted
      concatMap((update: UpdateType) => {
        return value$.pipe(
          // Read the current value once (this waits if there is no value yet)
          take(1),
          switchMap((valueBeforeUpdate: ValueType) => {
            // Optimistic update
            value$.next(applyUpdate(valueBeforeUpdate, update));

            // Actual update (the values it emits are not written to 'value$')
            return performUpdate(valueBeforeUpdate, update).pipe(
              onClose((reason) => {
                // Revert to original value if update does not complete successfully
                // (i.e. if it errored or was unsubscribed from)
                if (reason === 'complete') return;
                value$.next(valueBeforeUpdate);
              }),
            );
          }),
        );
      }),
    );

    // Emit the values while the updates run in the background, until the source completes.
    // NOTE(review): 'source$' is subscribed to separately for the initial value stream,
    // for 'update$' and for 'takeDuring' here, which presumably assumes a hot (shared)
    // source - confirm against callers.
    return value$.pipe(inBackground(update$), takeDuring(source$));
  };
}

/**
 * Creates a behavior subject which starts with the given initial value
 * and is subscribed to the given observable as its observer.
 * Everything the observable emits (including errors and completion)
 * is therefore forwarded to the subject.
 * @param initialValue The value the subject holds until the observable emits.
 * @param observable The observable feeding the subject.
 * @returns The created behavior subject.
 */
export function createBehaviorSubject<T>(initialValue: T, observable: Observable<T>): BehaviorSubject<T> {
  const subject$ = new BehaviorSubject<T>(initialValue);

  // The subscription is intentionally not returned, exactly like before
  observable.subscribe(subject$);

  return subject$;
}

/**
 * Filters out emissions whose projected value is strictly equal ('===')
 * to the projected value of the previously passed on emission.
 * The projection is evaluated once per emission.
 * @param project The projection function.
 * @returns A RxJS operator function.
 */
export function distinctUntilProjectionChanged<T>(project: (value: T) => any): MonoTypeOperatorFunction<T> {
  // Projected values are compared by identity
  const isSameProjection = (previousKey: any, currentKey: any): boolean => previousKey === currentKey;

  return (source$: Observable<T>) => {
    // Use the projection as key selector, so the original values are passed on as they are
    return source$.pipe(distinctUntilChanged(isSameProjection, project));
  };
}

/**
 * Filters out emissions which are deeply equal (according to lodash's 'isEqual')
 * to the previously passed on emission.
 * @returns A RxJS operator function.
 */
export function distinctUntilDeepChanged<T>(): MonoTypeOperatorFunction<T> {
  const isDeeplyEqual = (previous: T, current: T): boolean => _.isEqual(previous, current);

  return (source$: Observable<T>) => source$.pipe(distinctUntilChanged(isDeeplyEqual));
}

/**
 * Controller exposing a stream of restart requests.
 * It is consumed by 'createRangedIncrementingObservable' to (re)start counting.
 */
export class IncrementingObservableController {
  /** Emits whenever a restart was requested. */
  private readonly restartSubject: Subject<void> = new Subject<void>();

  /** Requests a restart by emitting on the restart stream. */
  restartObservable(): void {
    this.restartSubject.next();
  }

  /**
   * Provides the restart requests as an observable,
   * hiding the underlying subject from consumers.
   * @returns An observable emitting on every restart request.
   */
  getRestartObservable(): Observable<void> {
    const restartRequest$ = this.restartSubject.asObservable();

    return restartRequest$;
  }
}

/**
 * Creates an observable which counts up from 0 in steps of 'stepSize' every time the
 * controller requests a restart. A run in progress is canceled by the next restart.
 * Each run emits 0, stepSize, 2 * stepSize, ... for as long as the value does not
 * exceed 'maxValue'. The time between two emissions is 'duration' divided by the
 * number of steps (maxValue / stepSize, rounded up).
 * Nothing is emitted before the controller requests the first restart.
 * @param parameters The upper bound, the increment per emission and the duration in milliseconds.
 * @param controller The controller whose restart requests (re)start the counting.
 * @returns An observable emitting the incrementing values.
 */
export function createRangedIncrementingObservable(parameters: {
  maxValue: number, stepSize: number, duration: number
}, controller: IncrementingObservableController): Observable<number> {
  const { maxValue: upperBound, stepSize: increment, duration: totalDuration } = parameters;
  const stepCount = Math.ceil(upperBound / increment);

  // Builds a fresh counting run; invoked once per restart request
  const startRun = (): Observable<number> => {
    const period = totalDuration / stepCount;

    return interval(period).pipe(
      map((tick) => tick * increment),
      takeWhile((current) => current <= upperBound),
    );
  };

  const restartRequest$ = controller.getRestartObservable();

  return restartRequest$.pipe(switchMap(() => startRun()));
}

