import { BehaviorSubject, from, Observable, of } from 'rxjs';
import { bufferTime, catchError, filter, map, mergeMap, scan, takeWhile } from 'rxjs/operators';

const concurrencyNumber = 5;

class BatchItemResult<U> {
  itemId: string;
  result: U;
  error: U;

  constructor(itemId: any, result: U, error: U) {
    this.itemId = itemId;
    this.result = result;
    this.error = error;
  }
}

class BatchItem<T, U> {
  line: number;
  item: T;
  result: U;
  error: U;

  constructor(item: T, line: number) {
    this.line = line;
    this.item = item;
  }

  mapResult(result: U) {
    this.result = result;
    return this;
  }

  mapError(error: U) {
    this.error = error;
    return this;
  }
}

type BatchHandler<T, U> = (item: T) => Observable<U>;

class BatchRequest {
  indices: number[];
  totalItemCount: number;

  constructor(indices: number[], totalItemCount: number) {
    this.indices = indices || [];
    this.totalItemCount = totalItemCount;
  }
}

class BatchResult<U> {
  itemsRemainingCount: number;
  itemsProcessingCount: number;
  results: Map<number, BatchItemResult<U>> = new Map<number, BatchItemResult<U>>();
  lastOffset: number;
  totalItemCount: number;
  maxConcurrency: number;

  constructor(concurrency: number, totalItemCount: number) {
    this.itemsProcessingCount = concurrency;
    this.itemsRemainingCount = totalItemCount;
    this.maxConcurrency = concurrency;
    this.totalItemCount = totalItemCount;
    this.lastOffset = concurrency;
  }

  hasReceived(item: BatchItem<any, U>) {
    this.results.set(item.line, new BatchItemResult<U>(item.item.id, item.result, item.error));
    this.itemsProcessingCount--;
    this.itemsRemainingCount--;
  }

  nextBatchRequest(): BatchRequest {
    const nextOffsetToProcess = [];
    for (
      let i = this.lastOffset;
      i <
      Math.min(
        this.lastOffset + (this.maxConcurrency - this.itemsProcessingCount),
        this.totalItemCount
      );
      i++
    ) {
      this.lastOffset++;
      this.itemsProcessingCount++;
      nextOffsetToProcess.push(i);
    }
    return new BatchRequest(nextOffsetToProcess, this.totalItemCount);
  }
}

function handleBatch<T extends { line?: number }, U>(
  data: Array<T>,
  handler: BatchHandler<T, U>,
  req: BatchRequest
): Observable<BatchItem<T, U>> {
  const slice: Array<BatchItem<T, U>> = data
    .map((item: T, i: number) => new BatchItem<T, U>(item, item.line || i))
    .filter(
      (item: BatchItem<T, U>, i: number) => req.indices.some((index: number) => index === i)
      // req.indices.findIndex((index: number) => index === i) >= 0
    );

  return of(slice).pipe(
    // If our batch is the last one, our array will be empty. We will complete our controller automatically then (and it will discard the following steps)
    takeWhile(batchItems => batchItems.length > 0),
    // We are emitting a batch of N items. Transform it into N emissions
    mergeMap(batchItems => from(batchItems)),
    // Do N requests in parallel (concurrency)
    mergeMap(
      batchItem =>
        handler(batchItem.item).pipe(
          map(r => batchItem.mapResult(r)),
          catchError(err => {
            batchItem.mapError(err);
            return of(batchItem);
          })
        ),
      concurrencyNumber
    )
  );
}

export default function processItemsInBatch<T, U>(
  data: Array<T>,
  handler: BatchHandler<T, U>,
  concurrency: number
): Observable<BatchResult<U>> {
  // We initialize your controller with the first batch

  const controller = new BehaviorSubject<BatchRequest>(
    new BatchRequest(Array.from(Array(concurrency).keys()), data.length)
  );

  return controller.pipe(
    // Retrieve our next batch: a set of N (N=batchSize) items to process
    mergeMap(batch => handleBatch<T, U>(data, handler, batch)),
    // Fetch results in buffers of 300ms
    bufferTime(300, undefined, concurrency),
    // Discard all buffers which are empty... If your requests are too long ^^
    filter(b => b.length > 0),
    // Scan will accumulate the queries of your batches (it may arrive at any time) and will launch the subsequent requests
    // For example: you ask for 3 concurrent queries : it will wait for 3 queries to be done before calling the next batch
    scan((result: BatchResult<U>, items: BatchItem<T, U>[]) => {
      items.forEach((it: BatchItem<T, U>) => {
        result.hasReceived(it);
      });
      // Here, we are emitting the next value (we are telling "thanks. my batch is done, take the next one please")
      controller.next(result.nextBatchRequest());
      return result;
    }, new BatchResult<U>(concurrency, data.length))
  );
}
