import pLimit from "p-limit";
import { allSettled } from "./all-settled";
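// Older Node versions do not have a native AbortController, so it had to be
// imported; thankfully the imported implementation also works fine in the browser.
// NOTE(review): no such import follows in this file, yet AbortController is used
// below as a global -- confirm that every target runtime provides it.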

/**
 * Outcome of a task submitted through `ConcurrencyLimiterWithBackpressure.runSync`.
 *
 * - `"timeout"`: the task was never started (the limiter's `run` reported
 *   that it did not run the task).
 * - `"success"`: the task ran to completion; `result` is the value it produced.
 *
 * Discriminate on `kind` before reading `result`.
 */
type ConcurrencyLimitResult<T> = { kind: "timeout" } | { kind: "success"; result: T };

/**
 * Creates a `ConcurrencyLimiterWithBackpressure`, hands it to `f`, and does not
 * settle until the limiter has been drained with `finish()`.
 *
 * The drain happens in a `finally`, so it runs whether `f` fulfils or rejects;
 * if `f` rejects, that rejection is propagated after the drain completes.
 *
 * @param maxConcurrency - Limit passed to the limiter's constructor.
 * @param f - Callback that submits work to the limiter. Its resolved value is ignored.
 */
export async function withBackpressureConcurrencyLimiter(
    maxConcurrency: number,
    f: (limiter: ConcurrencyLimiterWithBackpressure) => Promise<unknown>
): Promise<void> {
    const backpressureLimiter = new ConcurrencyLimiterWithBackpressure(maxConcurrency);

    try {
        await f(backpressureLimiter);
    } finally {
        // Always wait for work that was already started, even when `f` failed.
        await backpressureLimiter.finish();
    }
}

/**
 * Applies the async function `f` to every element of `arr`, scheduling the
 * calls through a `pLimit(maxConcurrency)` limiter.
 *
 * @param arr - Items to transform; not mutated.
 * @param maxConcurrency - Concurrency value handed to `pLimit`.
 * @param f - Async transform invoked once per item.
 * @returns The transformed values, in the same order as `arr`.
 * @throws Rejects as soon as any mapped promise rejects (`Promise.all` semantics).
 */
export async function mapConcurrently<T, U>(
    arr: readonly T[],
    maxConcurrency: number,
    f: (x: T) => Promise<U>
): Promise<U[]> {
    const throttle = pLimit(maxConcurrency);

    return await Promise.all(
        arr.map(async item =>
            // Each call is queued on the limiter, which decides when it may start.
            throttle(async () => await f(item))
        )
    );
}

/**
 * Like `mapConcurrently`, but drops every `undefined` result from the output.
 *
 * @param arr - Items to transform; not mutated.
 * @param maxConcurrency - Concurrency value handed to `pLimit`.
 * @param f - Async transform; resolve with `undefined` to omit an item.
 * @returns The defined results, in input order with the `undefined` ones removed.
 * @throws Rejects as soon as any mapped promise rejects (`Promise.all` semantics).
 */
export async function mapFilterUndefinedConcurrently<T, U>(
    arr: readonly T[],
    maxConcurrency: number,
    f: (x: T) => Promise<U | undefined>
): Promise<U[]> {
    const throttle = pLimit(maxConcurrency);

    const settled = await Promise.all(
        arr.map(async item => throttle(async () => await f(item)))
    );

    // Only strict `undefined` is removed; other falsy values (0, "", null, false) are kept.
    const defined = settled.filter(value => value !== undefined);
    return defined as U[];
}

/**
 * Invokes `f` for every element of `items` through a backpressure limiter
 * created with `maxConcurrency`, and resolves after the limiter has been drained.
 *
 * @param items - Any iterable; it is consumed lazily, one element per free slot.
 * @param maxConcurrency - Limit passed to `withBackpressureConcurrencyLimiter`.
 * @param f - Async callback run per item; its resolved value is ignored.
 *
 * NOTE(review): how a rejection from `f` surfaces is determined by
 * `ConcurrencyLimiterWithBackpressure` -- verify before relying on it.
 */
export async function concurrentForEach<T>(
    items: Iterable<T>,
    maxConcurrency: number,
    f: (x: T) => Promise<unknown>
): Promise<void> {
    await withBackpressureConcurrencyLimiter(maxConcurrency, async backpressureLimiter => {
        await backpressureLimiter.forEach(items, f);
    });
}

/**
 * Runs async tasks with a cap on how many are tracked as in flight at once.
 *
 * "Backpressure" here means the submitter waits: `run` keeps no queue of
 * pending tasks. It suspends its caller until fewer than `limit` tasks are
 * tracked, and only then starts the new task.
 */
export class ConcurrencyLimiterWithBackpressure {
    // Promises of tasks that were started and have not settled yet. Each entry
    // removes itself from this set when its task settles (see `run`).
    private readonly promises = new Set<Promise<unknown>>();

    /**
     * @param limit - `run` waits for as long as `promises.size >= limit`.
     */
    constructor(private readonly limit: number) {}

    /**
     * Waits for a free slot, then starts `f` WITHOUT waiting for it to finish.
     *
     * @param f - Task to start once fewer than `limit` tasks are tracked.
     * @param abortSignal - Optional signal that cancels the wait for a slot.
     *   It is no longer observed once the wait is over.
     * @returns `true` if `f` was invoked; `false` if `abortSignal` was already
     *   aborted, or was aborted during the wait, in which case `f` is never called.
     *
     * NOTE(review): the wait uses `Promise.race` over the tracked task promises,
     * and those reject when their task rejects. A failure of an earlier task can
     * therefore make this call reject with that task's error -- confirm callers
     * expect this.
     */
    public async run(f: () => Promise<unknown>, abortSignal?: AbortSignal): Promise<boolean> {
        // We allocate yet another abort controller to prevent memory leaks.
        // The AbortSignal we are given may be shared across multiple invocations,
        // so we have to take care to remove our event listener when we're done.
        // If we didn't, even though our invocation is long finished, our event
        // listener would still be strongly bound into the AbortSignal's event
        // listener set.
        //
        // Unfortunately this doesn't compose well with Promises; if we didn't do
        // this we'd have to launder the Promise's resolve() into the outer scope,
        // which is extremely weird. So instead we just forward the AbortSignal
        // into our own AbortController, using a function that is easily added
        // to and removed from the given AbortSignal's event listener.
        const abortController = new AbortController();
        const forwardAbortSignal = () => {
            abortController.abort();
        };

        if (abortSignal !== undefined) {
            // Already aborted: give up before registering any listener.
            if (abortSignal.aborted) return false;
            abortSignal.addEventListener("abort", forwardAbortSignal);
        }

        // Promise that resolves when our private controller is aborted; it is
        // raced against the in-flight tasks below so an abort can end the wait.
        const subSignal = abortController.signal;
        const subSignalPromise = new Promise<void>(resolve => {
            if (subSignal.aborted) return resolve();
            subSignal.addEventListener("abort", () => resolve());
        });

        try {
            // Re-check after every wake-up: another caller may have taken the
            // slot that just freed up, so the set can still be full.
            while (this.promises.size >= this.limit) {
                const waitlist = [subSignalPromise, ...this.promises];
                await Promise.race(waitlist);
                if (subSignal.aborted) return false;
            }
        } finally {
            // Detach from the (possibly shared) caller signal on every exit path,
            // including the early `return false` and a rejected race.
            if (abortSignal !== undefined) {
                abortSignal.removeEventListener("abort", forwardAbortSignal);
            }
        }
        // Handshake between the task and the bookkeeping below. The async
        // function expression starts executing synchronously, so if `f()` throws
        // before returning a promise, its `finally` runs before we reach
        // `this.promises.add(...)`. `finished` records that case so we never
        // track a promise that has nothing left to remove it. Otherwise
        // `await f()` suspends first, `added` becomes true, and the `finally`
        // removes `called` from the set once the task settles.
        let added = false;
        let finished = false;

        const called = (async (onAdded: () => void) => {
            try {
                return await f();
            } finally {
                if (added) {
                    onAdded();
                } else {
                    finished = true;
                }
            }
            // ESLint flagged a direct reference to `called` in here as
            // use-before-assignment, although it was not one in practice.
            // So instead we refer to `called` indirectly: the caller passes in
            // a function that closes over `called`, which ESLint accepts as
            // definitely assigned by the time it is invoked.
        })(() => this.promises.delete(called));

        // NOTE(review): when `finished` is already true here, `called` is a
        // rejected promise that is neither tracked nor awaited -- confirm that
        // an unobserved rejection is acceptable in that case.
        if (!finished) {
            this.promises.add(called);
            added = true;
        }
        return true;
    }

    /**
     * Runs `f` under the limiter and reports its outcome to the caller.
     *
     * Unlike `run`, when the task is started the returned promise settles only
     * after `f` itself has settled.
     *
     * @param f - Task whose result is wanted.
     * @param abortSignal - Forwarded to `run`; cancels the wait for a slot.
     * @returns `{ kind: "success", result }` when `f` fulfils, or
     *   `{ kind: "timeout" }` when `run` reports that `f` was not started.
     * @throws Rejects with the error thrown by `f`, or with an error thrown by
     *   `run` itself.
     */
    public async runSync<T>(f: () => Promise<T>, abortSignal?: AbortSignal): Promise<ConcurrencyLimitResult<T>> {
        // The executor is async, so every path inside it is wrapped in try/catch:
        // an error escaping the executor would not reject the outer promise.
        return await new Promise<ConcurrencyLimitResult<T>>(async (resolve, reject) => {
            // Ensures the outer promise is settled at most once.
            let resolved = false;
            try {
                // The wrapper handed to `run` never rejects: it routes both the
                // result and any error of `f` to the outer promise instead.
                const didRun = await this.run(async () => {
                    try {
                        const result = await f();
                        if (!resolved) {
                            resolved = true;
                            resolve({ kind: "success", result });
                        }
                    } catch (e: unknown) {
                        if (!resolved) {
                            resolved = true;
                            reject(e);
                        }
                    }
                }, abortSignal);
                // `run` returned false: the task was never started.
                if (!didRun && !resolved) {
                    resolved = true;
                    resolve({ kind: "timeout" });
                }
            } catch (e: unknown) {
                // `run` itself rejected (see the review note on `run`).
                if (!resolved) {
                    resolved = true;
                    reject(e);
                }
            }
        });
    }

    /** Returns `true` while at least one started task is still being tracked. */
    public hasOutstandingWork(): boolean {
        return this.promises.size > 0;
    }

    /**
     * Waits until no tracked tasks remain.
     *
     * The check is repeated after each wait, so tasks added while waiting are
     * awaited as well. Presumably `allSettled` (from `./all-settled`) waits for
     * every promise without rejecting -- confirm against that module.
     */
    public async finish(): Promise<void> {
        while (this.hasOutstandingWork()) {
            // Snapshot the set: entries delete themselves as their tasks settle.
            await allSettled(Array.from(this.promises));
        }
    }

    /**
     * Submits `f(x)` for each item through `run`, one submission at a time.
     *
     * Each iteration waits only for a free slot, not for the task to complete,
     * so this can return while tasks are still in flight; call `finish()` to
     * wait for them. The boolean returned by `run` is ignored.
     *
     * @param items - Iterable consumed lazily, one element per submission.
     * @param f - Async callback run per item.
     */
    public async forEach<T>(items: Iterable<T>, f: (x: T) => Promise<unknown>): Promise<void> {
        for (const x of items) {
            await this.run(async () => await f(x));
        }
    }
}
