import Bottleneck from 'bottleneck';
import { useEffect } from 'react';

import { asyncTaskDao } from '../../../data/dao/asyncTaskDao';
import { asyncTaskUpdatedEventStream } from '../../../data/events/asyncTaskUpdatedEventStream';
import { AsyncTask, AsyncTaskStatus } from '../../../domain/objects/AsyncTask';
import { AsyncTaskUpdatedEvent } from '../../../domain/objects/AsyncTaskUpdatedEvent';
import { executeAsyncTask } from '../../../logic/asyncTask/executeAsyncTask';
import { UnexpectedCodePathError } from '../../../utils/errors/UnexpectedCodePathError';
import { log } from '../../../utils/log';
import { sleep } from '../../../utils/sleep';

const bottleneck = new Bottleneck({ maxConcurrent: 3 });

const queueAsyncTaskExecutionBehindBottleneck = async ({
  task,
}: {
  task: AsyncTask;
}) => {
  await bottleneck.schedule(async () => {
    await executeAsyncTask({ externalId: task.externalId });
  });
};

const queueAsyncTaskExecutionBehindBottleneckForAllPersistedQueuedTasks =
  async () => {
    const dbQueuedTasks: AsyncTask[] = await asyncTaskDao.findAllByStatus({
      status: AsyncTaskStatus.QUEUED,
    });
    const dbAttemptedTasks: AsyncTask[] = await asyncTaskDao.findAllByStatus({
      status: AsyncTaskStatus.ATTEMPTED,
    });
    const tasks = [...dbQueuedTasks, ...dbAttemptedTasks];
    log.info(
      'queueing all db queued async tasks into in-memory execution queue',
      { tasks, count: tasks.length },
    );
    await Promise.all(
      tasks.map((task) => queueAsyncTaskExecutionBehindBottleneck({ task })),
    );
    log.info(
      'queued all db queued async tasks into in-memory execution queue',
      { tasks, count: tasks.length },
    );
  };

const onAsyncTaskUpdatedEvent = async (event: AsyncTaskUpdatedEvent) => {
  // if we wern't told that this task was queued, we're not interested
  if (event.asyncTaskStatus !== AsyncTaskStatus.QUEUED) return;

  // if this task's state isn't still queued, we're not interested
  await sleep(300);
  const task: AsyncTask | null = await asyncTaskDao.findByUnique({
    externalId: event.asyncTaskExternalId,
  });
  if (!task)
    throw new UnexpectedCodePathError(
      'consumed asyncTaskUpdatedEvent whos asyncTaskExternalId did not reference a real task',
      { event, task },
    );
  if (task.status !== AsyncTaskStatus.QUEUED) return;

  // since the reported task is still in queued status, we should queue it into our in-memory execution queue
  await queueAsyncTaskExecutionBehindBottleneck({ task });
};

/**
 * the async task provider
 * - loads up all queued async-tasks on mount
 * - subscribes to async-task updated events to continue adding newly ready events
 */
export const AsyncTaskProvider = () => {
  useEffect(() => {
    // on load, queue into the in-memory execution queue from those tasks queued in db
    void queueAsyncTaskExecutionBehindBottleneckForAllPersistedQueuedTasks();

    // also, subscribe to async task updated events, to queue into the in-memory execution queue when new tasks are saved to db
    asyncTaskUpdatedEventStream.subscribe({
      consumer: onAsyncTaskUpdatedEvent,
    });
  }, []);
  return null;
};
