// Vendor
import { groupBy } from 'lodash';
import { pipe } from 'fp-ts/lib/function';
import { fromNullable, getOrElse, map } from 'fp-ts/lib/Option';
import { delay } from 'fp-ts/lib/Task';
import { Collection, Dexie } from 'dexie';
import { v4 } from 'uuid';

// Internal
import { Semigroup } from 'fp-ts/lib/Semigroup';
import { EntityWithId } from 'flatfile-import/application/publisher/publisher-service';
import { Option } from 'fp-ts/Option';
import { DataAggregator } from '../application/data-aggregator';
import { FlatfileEntity, LocalDatabase } from './db';

interface RetryConfig {
  retries: number;
  delay: number;
}

async function* aggregateDataInBatches<E>(
  fetchBatch: (offset: number, limit: number) => Promise<E[]>,
  getDataTotalSize: () => Promise<number>,
  batchSize: number
) {
  const total = await getDataTotalSize();
  let currentPulled = 0;
  while (currentPulled < total) {
    try {
      // eslint-disable-next-line no-await-in-loop
      yield await fetchBatch(currentPulled, batchSize);
    } catch (e) {
      // eslint-disable-next-line no-console
      console.error(
        `Batch [${currentPulled}, ${
          currentPulled + batchSize
        }] is going to be discard because error: ${e}.`
      );
      yield [];
    } finally {
      // We move the pointer even if it fails, so we can continue pulling values
      currentPulled += batchSize;
    }
  }
}

type ConcatAllFn<E> = (a: E[]) => Option<E>;

export class PersistedAggregator<E extends EntityWithId>
  implements DataAggregator<E>
{
  db: LocalDatabase<E & FlatfileEntity>;

  processId: string;

  semigroup: Semigroup<E>;

  concatAllFn: ConcatAllFn<E>;

  constructor(
    db: LocalDatabase<E & FlatfileEntity>,
    semigroup: Semigroup<E>,
    concatAllFn: (l1: E[]) => Option<E>
  ) {
    this.db = db;
    this.semigroup = semigroup;
    this.concatAllFn = concatAllFn;
    // We will start with an UUID in case we don't set the Flatfile session one
    this.processId = v4();
  }

  setProcessId(processId: string) {
    this.processId = processId;
  }

  clean(): Promise<void> {
    return this.queryProcessIdEntities()
      .delete()
      .then(() => {});
  }

  async add(entity: E): Promise<void> {
    const newCustomer = { processId: this.processId, ...entity };
    return this.db.transaction('rw', this.db.entities, async () => {
      try {
        const existingCustomer = await this.db.entities.get([
          entity.id,
          this.processId,
        ]);
        const updatedCustomer: E & FlatfileEntity = pipe(
          fromNullable(existingCustomer),
          map((eCust) => this.semigroup.concat(eCust, entity)),
          map((cust) => ({ ...cust, processId: this.processId })),
          getOrElse(() => newCustomer)
        );
        await this.db.entities.put(updatedCustomer, [
          entity.id,
          this.processId,
        ]);
      } catch (e) {
        // eslint-disable-next-line no-console
        console.error('Local customer persistence failed with error:', e);
      }
    });
  }

  private readonly queryProcessIdEntities = (): Collection<
    E & FlatfileEntity,
    [string, string]
  > =>
    this.db.entities
      .where('[id+processId]')
      .between([Dexie.minKey, this.processId], [Dexie.maxKey, this.processId]);

  size = (): Promise<number> => this.queryProcessIdEntities().count();

  aggregate(batchSize: number): AsyncGenerator<(E & FlatfileEntity)[]> {
    const retryConfig = { retries: 2, delay: 200 };
    // Get customers fn with retry logic
    const getEntities = (
      offset: number,
      limit: number,
      retryConfig: RetryConfig
    ): Promise<(E & FlatfileEntity)[]> =>
      this.queryProcessIdEntities()
        .offset(offset)
        .limit(limit)
        .toArray()
        .catch((e) =>
          retryConfig.retries > 0
            ? delay(retryConfig.delay)(() =>
                getEntities(offset, limit, {
                  retries: retryConfig.retries - 1,
                  delay: retryConfig.delay,
                })
              )()
            : Promise.reject(e)
        );
    return aggregateDataInBatches(
      (offset, limit) => getEntities(offset, limit, retryConfig),
      this.size,
      batchSize
    );
  }

  addBulk(entities: E[]): Promise<void> {
    if (entities.length === 0) {
      return Promise.resolve();
    }
    // first aggregate in entities
    const firstGroupedEntities = groupBy(entities, 'id');
    const uniqCustomerIds = Object.keys(firstGroupedEntities);
    const getKeys: [string, string][] = uniqCustomerIds.map((key) => [
      key,
      this.processId,
    ]);
    const firstAggregatedCustomers = uniqCustomerIds.reduce((acc, key) => {
      const optEntity = this.concatAllFn(firstGroupedEntities[key]);
      return pipe(
        optEntity,
        map((ent) => acc.concat([ent])),
        getOrElse(() => acc)
      );
    }, [] as E[]);

    const inboundGroupedEntities = groupBy(firstAggregatedCustomers, 'id');
    const flatfileEntities = firstAggregatedCustomers.map((cust) => ({
      ...cust,
      processId: this.processId,
    }));

    return this.db
      .transaction('rw', this.db.entities, async () => {
        try {
          // Get the customers to update
          const fetchedEntities = (
            await this.db.entities.bulkGet(getKeys)
          ).filter((cust) => cust !== undefined) as (E & FlatfileEntity)[];
          const fetchedIdsMap = fetchedEntities.reduce(
            (acc, next) => acc.set(next.id, true),
            new Map<string, boolean>()
          );
          const customersToAdd: (E & FlatfileEntity)[] =
            flatfileEntities.filter((cust) => !fetchedIdsMap.get(cust.id));

          // Aggregate the customers to update
          const customersToUpdate = fetchedEntities.reduce((acc, next) => {
            const c = inboundGroupedEntities[next.id].reduce(
              (accG, nextG) => this.semigroup.concat(accG, nextG),
              next
            );
            return acc.concat([{ ...c, processId: this.processId }]);
          }, [] as (E & FlatfileEntity)[]);

          const customersToPersist = customersToAdd.concat(customersToUpdate);
          // Persist the customers
          await this.db.entities.bulkPut(customersToPersist);
        } catch (e) {
          // eslint-disable-next-line no-console
          console.error(e);
          throw e;
        }
      })
      .then(() => {});
  }
}
