// Vendor
import { from, reduce } from '@reactivex/ix-es2015-esm/asynciterable';
import {
  buffer,
  concatMap,
  flatMap,
} from '@reactivex/ix-es2015-esm/asynciterable/operators';

// Internal
import { UpsertCustomer } from 'types/generated/UpsertCustomer';
import { Publisher } from '../publisher';
import { BatchProcessResponse } from '../../infrastructure/http-publisher';
import {
  EntityWithId,
  PublisherService,
  PublishResult,
} from './publisher-service';
import { PublishAdapter } from './publish-adapter';

const accumulatePublishedResults = <PIn, POut extends EntityWithId>(
  currentAcc: PublishResult<POut>,
  results: BatchProcessResponse,
  customersMap: Map<string, PIn>,
  adapter: (input: PIn) => POut
): PublishResult<POut> => {
  const { withErrors, successfully } = currentAcc;
  return {
    successfully: successfully.concat(
      results.successful.map((id) => adapter(customersMap.get(id) as PIn))
    ),
    withErrors: withErrors.concat(
      results.rejected.map((entityError) => ({
        entity: adapter(customersMap.get(entityError.id) as PIn),
        error: entityError.error,
      }))
    ),
  };
};

export class BatchPublisherService<
  PIn extends EntityWithId,
  POut extends EntityWithId
> implements PublisherService<PIn, POut>
{
  publisher: Publisher<UpsertCustomer[], BatchProcessResponse>;

  constructor(publisher: Publisher<UpsertCustomer[], BatchProcessResponse>) {
    this.publisher = publisher;
  }

  async publish(
    customers: AsyncGenerator<PIn[]>,
    concurrency: number,
    onPublishedCustomers: (customerIds: string[]) => void,
    publishAdapter: PublishAdapter<PIn, POut>
  ): Promise<PublishResult<POut>> {
    const processBatch = (
      batch: PIn[]
    ): Promise<{ result: BatchProcessResponse; batch: PIn[] }> =>
      this.publisher
        .publish(batch.map(publishAdapter.convert))
        .catch(
          (e) =>
            ({
              successful: [],
              rejected: batch.map((cust) => ({
                id: cust.id,
                error: {
                  kind: 'batch-process-error',
                  message: `Batch processing error: ${e.message}`,
                },
              })),
            } as BatchProcessResponse)
        )
        .then((results) => {
          // Notify published customers
          onPublishedCustomers(
            results.successful.concat(results.rejected.map((rej) => rej.id))
          );
          return { result: results, batch };
        });

    const result: PublishResult<POut> = {
      withErrors: [],
      successfully: [],
    };

    const mappedSource = from(customers)
      .pipe(buffer(concurrency))
      .pipe(
        concatMap((customerBatches) =>
          from(Promise.all(customerBatches.map((batch) => processBatch(batch))))
        )
      )
      .pipe(flatMap((results) => from(results)));

    return reduce(mappedSource, {
      seed: result,
      callback: (acc, next) => {
        const customersMap = new Map(
          next.batch.flatMap((b) => b).map((cust) => [cust.id, cust])
        );
        const updatedResults = accumulatePublishedResults(
          acc,
          next.result,
          customersMap,
          publishAdapter.convert
        );
        return {
          successfully: updatedResults.successfully,
          withErrors: updatedResults.withErrors,
        };
      },
    });
  }
}
