import { CollectionViewer, DataSource } from '@angular/cdk/collections';
import { CdkVirtualScrollViewport } from '@angular/cdk/scrolling';
import { applyFilter } from '@nestjs-query/core/dist/src/helpers/filter.helpers';
import { applySort } from '@nestjs-query/core/dist/src/helpers/query.helpers';
import uniqBy from 'lodash/uniqBy';
import { BehaviorSubject, combineLatest, merge, Observable, Subject } from 'rxjs';
import {
  debounceTime,
  map,
  mergeMap,
  scan,
  shareReplay,
  startWith,
  switchMap,
  takeUntil,
  tap,
  filter as rxjsfilter,
  bufferTime,
  take,
  skip
} from 'rxjs/operators';
import { DocumentFilter, Document, PageInfo, DocumentSort } from 'src/app/graphql/frontend-data-graphql';

import { DocumentService } from './document.service';

/**
 * Construction options for `DocumentDataSource`.
 *
 * The observable inputs drive the data source; the optional flags are forwarded
 * to the document query or tune how live updates are handled.
 */
interface IDocumentDataSourceOptions {
  /** Stream of filters; used for the paged query, the live subscriptions and the total count. */
  filter: Observable<DocumentFilter>;
  /** Stream of sort definitions; used for the paged query and for sorting the accumulated list. */
  sorting: Observable<DocumentSort[]>;
  /** Service used to fetch pages, count documents and subscribe to created/updated/deleted events. */
  documentService: DocumentService;
  /** Optional stream of the virtual-scroll viewport; when given, scrolling can trigger loading the next page. */
  viewport?: Observable<CdkVirtualScrollViewport>;
  /** Notifier that terminates the streams built by the data source (used with `takeUntil`). */
  destroy: Observable<any>;
  /** Number of documents requested per page. Default: 25. */
  pageSize?: number;
  /** Forwarded to the document query as `resolveParent`. Default: true. */
  resolveParent?: boolean;
  /** Forwarded to the document query as `resolveRoot`. Default: true. */
  resolveRoot?: boolean;
  /** Forwarded to the document query as `withValue`. Default: false. */
  withValue?: boolean;
  /** Forwarded to the document query as `withPredictionSource`. Default: false. */
  withPredictionSource?: boolean;
  /**
   * When true, the first buffered batch of `updatedOneDocument$` events is dropped
   * (once per emitted filter value). Default: false.
   */
  skipLatestUpdate?: boolean;
}

/**
 * CDK `DataSource` exposing a cursor-paginated, live-updating list of documents.
 *
 * The list is built from four change sources that are merged and folded into a
 * single array:
 *  - pages fetched through `documentService.documents` (infinite scrolling),
 *  - `createdDocument` subscription events,
 *  - `updatedOneDocument$` events (re-checked against the current filter),
 *  - `deletedOneDocument` subscription events.
 *
 * Whenever the (debounced) filter or sorting changes, the cursor is reset and
 * the accumulated list starts over from an empty array.
 */
export class DocumentDataSource extends DataSource<Document> {
  /** Emits `true` when a page load is triggered and `false` once the page response arrives. */
  public loading$ = new BehaviorSubject<boolean>(true);

  /** Not read or written anywhere else inside this class. */
  archiveMode = false;
  /** Fires when a fetched page reports that there is no next page; also stops further page loads for the active query. */
  private complete$ = new Subject<void>();
  /** Fired and completed by `disconnect()`. NOTE(review): no stream in this class listens to it. */
  private disconnect$ = new Subject<void>();
  /** End cursor of the last fetched page; `null` means "start from the first page". */
  private cursor: string | null = null;
  /** Trigger for loading a page. Being a `BehaviorSubject`, it fires once immediately for every new subscriber. */
  private _reload$ = new BehaviorSubject<boolean>(true);

  /** Accumulated, sorted document list (shared, last value replayed). */
  data$: Observable<Document[]>;
  /** Total number of documents matching the current filter, re-queried after list changes. */
  count$: Observable<number>;
  /** Emits the `pageInfo` of every fetched page. */
  pageInfo$ = new Subject<PageInfo>();
  /** Last scrolled index reported by the viewport. */
  scrollIndex$ = new BehaviorSubject<number>(0);

  /**
   * Wires up all streams. Nothing is fetched until `data$` / `count$` (or `connect()`) is
   * subscribed; only the optional viewport listener is subscribed eagerly.
   *
   * @param options streams, service and flags configuring the data source
   */
  constructor(options: IDocumentDataSourceOptions) {
    super();

    // Debounced [filter, sorting] pairs; every new pair restarts pagination from the first page.
    const filterOrSortUpdates$ = combineLatest([options.filter, options.sorting]).pipe(
      debounceTime(500),
      tap(() => {
        this.cursor = null;
      }),
      takeUntil(options.destroy),
      shareReplay({
        bufferSize: 1,
        refCount: true
      })
    );

    // Track scrolling of the (optional) viewport and load the next page when needed.
    // A single switchMap chain replaces the former nested subscribe: a re-emitted viewport
    // tears down the listener of the previous one, and everything ends on `destroy`.
    options.viewport
      ?.pipe(
        // the stream may emit a falsy value while no viewport is available
        rxjsfilter(viewport => !!viewport),
        switchMap(viewport =>
          filterOrSortUpdates$.pipe(
            switchMap(() => viewport.scrolledIndexChange),
            // ignore the first index change reported after subscribing
            skip(1),
            tap(idx => {
              this.scrollIndex$.next(idx);
              this.refetchIfNecessary(viewport);
            })
          )
        ),
        takeUntil(options.destroy)
      )
      .subscribe();

    // insertions from infinite scrolling
    const scrollUpdate$ = filterOrSortUpdates$.pipe(
      mergeMap(([filter, sorting]) => {
        return this._reload$.pipe(
          tap(() => this.loading$.next(true)),
          // stop reacting to reload triggers once the last page has been loaded
          takeUntil(this.complete$),
          switchMap(() => {
            return options.documentService.documents
              .fetch({
                filter,
                paging: { after: this.cursor, first: options.pageSize ?? 25 },
                sorting: sorting,
                resolveParent: options.resolveParent ?? true,
                resolveRoot: options.resolveRoot ?? true,
                withValue: options.withValue ?? false,
                withPredictionSource: options.withPredictionSource ?? false
              })
              .pipe(
                tap(() => {
                  this.loading$.next(false);
                })
              );
          })
        );
      }),
      map(res => {
        const pageInfo = res.data.documents.pageInfo;
        this.pageInfo$.next(pageInfo);
        if (!pageInfo.hasNextPage) {
          this.complete$.next();
        } else {
          // remember where the next page starts
          this.cursor = pageInfo.endCursor;
        }
        return {
          remove: [],
          insert: res.data.documents.edges.map(e => e.node as Document),
          updateOrInsert: []
        };
      }),
      takeUntil(options.destroy)
    );

    // insertion changes
    const created$ = options.filter.pipe(
      switchMap(filter =>
        options.documentService.createdDocument
          .subscribe({
            filter
          })
          .pipe(
            takeUntil(options.destroy),
            map(res => res.data!.createdDocument as Document),
            // collect events in 500ms windows and drop empty windows
            bufferTime(500),
            rxjsfilter(res => res.length > 0),
            map(res => ({
              updateOrInsert: res,
              remove: [],
              insert: []
            }))
          )
      ),
      takeUntil(options.destroy)
    );

    // update-change detected
    const updated$ = options.filter.pipe(
      switchMap(filter => {
        return options.documentService.updatedOneDocument$.pipe(
          takeUntil(options.destroy),
          bufferTime(500),
          rxjsfilter(res => res.length > 0),
          map(subscriptionUpdates => {
            // Make sure we only process the last update (in a sequence) for each document
            const lastUpdatesPerDocument = uniqBy(subscriptionUpdates.reverse(), e => e.id);
            // An updated document that no longer matches the filter has to leave the list.
            return {
              updateOrInsert: lastUpdatesPerDocument.filter(u => applyFilter(u, filter as any)),
              remove: lastUpdatesPerDocument.filter(u => !applyFilter(u, filter as any)),
              insert: []
            };
          }),
          skip(options.skipLatestUpdate ? 1 : 0)
        );
      }),
      takeUntil(options.destroy)
    );

    // deletion changes
    const deleted$ = options.filter.pipe(
      switchMap(filter =>
        options.documentService.deletedOneDocument.subscribe({ filter }).pipe(
          takeUntil(options.destroy),
          map(res => res.data!.deletedOneDocument as Document),
          bufferTime(500),
          rxjsfilter(res => res.length > 0),
          map(res => ({
            updateOrInsert: [],
            remove: res,
            insert: []
          }))
        )
      ),
      takeUntil(options.destroy)
    );

    // data source: fold every change batch into the accumulated list; a new
    // filter/sorting pair discards the accumulator and starts from an empty list
    this.data$ = filterOrSortUpdates$.pipe(
      switchMap(([, sorting]) =>
        merge(scrollUpdate$, created$, updated$, deleted$).pipe(
          scan((acc: any, batch) => {
            const { remove, updateOrInsert, insert } = batch;
            const removals = remove ?? [];
            const updatedList: Document[] = [...acc];

            // handle inserts and updateOrInsert
            for (const updatedDocument of [...updateOrInsert, ...insert]) {
              const idx = updatedList.findIndex(d => d.id === updatedDocument.id);
              if (idx === -1) {
                // add to list if not found
                updatedList.push(updatedDocument as Document);
              } else if (updatedList[idx].updated <= updatedDocument.updated!) {
                // exchange in list, keeping fields the incoming document does not carry
                updatedList.splice(idx, 1, { ...updatedList[idx], ...updatedDocument } as Document);
              }
              // Otherwise the list already holds a newer version of this document: the stale
              // copy is ignored (appending it would show the same document twice).
            }

            // handle removals
            for (const removedDocument of removals) {
              const idx = updatedList.findIndex(d => d.id === removedDocument.id);
              if (idx > -1) {
                updatedList.splice(idx, 1);
              }
            }

            // Removing rows may leave the viewport under-filled, so check whether another page is needed.
            if (options.viewport && removals.length > 0)
              options.viewport
                .pipe(take(1), takeUntil(options.destroy))
                .subscribe(viewport => (viewport ? this.refetchIfNecessary(viewport) : null));

            const sortedUpdatedList = sorting ? applySort(updatedList, sorting as any) : updatedList;

            return sortedUpdatedList;
          }, [])
        )
      ),
      takeUntil(options.destroy),
      shareReplay({
        bufferSize: 1,
        refCount: true
      })
    );

    // Re-query the total count (debounced) whenever the list changes. switchMap drops a
    // still-running count request when a newer one starts, so a slow, outdated response
    // cannot overwrite a fresher count.
    this.count$ = options.filter.pipe(
      switchMap(filter => {
        return this.data$.pipe(
          startWith([]),
          debounceTime(500),
          switchMap(() => options.documentService.countDocuments.fetch({ filter }).pipe(map(res => res.data.documents.totalCount)))
        );
      }),
      takeUntil(options.destroy)
    );
  }

  /**
   * Triggers loading of the next page when the viewport has rendered the last available
   * item or is scrolled to within 500px of the bottom.
   *
   * @param viewport the virtual-scroll viewport to inspect
   */
  refetchIfNecessary(viewport: CdkVirtualScrollViewport): void {
    const total = viewport.getDataLength();
    const { end } = viewport.getRenderedRange();

    const bottomScrollBufferPx = viewport.measureScrollOffset('bottom');
    const loadTheNextPage = bottomScrollBufferPx <= 500;
    if (end === total || loadTheNextPage) {
      this._reload$.next(true);
    }
  }

  /**
   * @returns an observable that emits each time a fetched page reports that no further page exists
   */
  completed(): Observable<void> {
    return this.complete$.asObservable();
  }

  /**
   * Called by the consuming CDK component to obtain the rendered data.
   *
   * @param collectionViewer unused
   * @returns the accumulated document list, starting with an empty array
   */
  connect(collectionViewer: CollectionViewer): Observable<Document[]> {
    return this.data$.pipe(startWith([]));
  }

  /** Signals and completes the internal disconnect notifier. */
  disconnect(): void {
    this.disconnect$.next();
    this.disconnect$.complete();
  }
}
