import {
    asMaybeString,
    asString,
    type SerializedQuery,
    type BigTableQueryContinuation,
    type BigTableQueryResult,
} from "@glide/computation-model-types";
import { nativeTableRowDeletedColumnName, rowIndexColumnName, type TableColumn } from "@glide/type-schema";
import { areConditionsTrueForRow } from "../query-eval";
import { applySort } from "../query-sort";
import { evaluateGroupBy, type RowColumnValueGetter } from "../aggregates";
import { asMaybeBoolean, asMaybeNumber } from "@glide/common-core/dist/js/type-conversions";

/**
 * Evaluates a serialized query against an in-memory list of rows.
 *
 * The pipeline runs in this order:
 *  1. keep rows for which `areConditionsTrueForRow` holds,
 *  2. if `query.search` is set, keep rows where any searched column contains
 *     the needle (case-insensitive substring match),
 *  3. sort via `applySort`,
 *  4. either drop rows flagged as deleted (no `query.rowVersion`) or keep only
 *     rows whose `$rowVersion` is newer than `query.rowVersion`,
 *  5. strip the `$rowVersion` property from every row,
 *  6. skip past the row identified by `continuation` and apply `query.limit`,
 *  7. if `query.groupBy` is set, aggregate the remaining rows.
 *
 * @param query The query to evaluate.
 * @param columns Column definitions passed through to condition evaluation,
 *   sorting and grouping.
 * @param rows The full set of rows to query. This array is not modified.
 * @param getColumnValue Reads the value of a named column from a row.
 * @param continuation Position returned by a previous call; evaluation resumes
 *   after the row it identifies. If that row is not found in the results,
 *   evaluation starts from the first row.
 * @returns An `Error` if `query.rowVersion` is combined with a group-by, a
 *   search, or conditions. Otherwise the query result, where `numRowsInTable`
 *   is the number of input rows (before any filtering). A continuation is
 *   only returned for non-group-by queries whose page is exactly
 *   `query.limit` rows long.
 */
export function evaluateQueryForRows<T extends Record<string, any>>(
    query: SerializedQuery,
    columns: readonly TableColumn[],
    rows: readonly T[],
    getColumnValue: RowColumnValueGetter<T>,
    continuation: BigTableQueryContinuation | undefined
): Error | BigTableQueryResult<T> {
    const isGroupBy = query.groupBy !== undefined;

    // A zero limit can never yield rows, so skip all evaluation and hand the
    // caller's continuation straight back.
    if (query.limit === 0) {
        return {
            isGroupBy,
            rows: [],
            version: undefined,
            numRowsInTable: rows.length,
            continuation,
            tableVersions: {},
        };
    }

    let results = rows.filter(row => areConditionsTrueForRow(query, cn => getColumnValue(row, cn), columns));

    if (query.search !== undefined) {
        const { columnNames } = query.search;
        const needle = query.search.needle.toLowerCase();
        // Search narrows the rows that already passed the conditions; it must
        // not start over from the unfiltered `rows`.
        results = results.filter(
            row =>
                columnNames.some(cn => asMaybeString(getColumnValue(row, cn))?.toLowerCase().includes(needle)) === true
        );
    }

    if (query.sort !== undefined) {
        applySort(columns, query.sort, results);
    }

    const wantVersion = query.rowVersion;
    if (wantVersion === undefined) {
        // Regular query: hide rows flagged as deleted. The column value is
        // converted first and the converted value is compared, so that a flag
        // which is not the literal `true` is still honoured.
        results = results.filter(
            row => asMaybeBoolean(getColumnValue(row, nativeTableRowDeletedColumnName)) !== true
        );
    } else {
        const isComplex = isGroupBy || query.search !== undefined || query.conditions.length > 0;
        if (isComplex) return new Error("Row version not compatible with complex queries");
        results = results.filter(row => {
            const actualVersion = asMaybeNumber(getColumnValue(row, "$rowVersion"));
            // Rows without a readable version are always included.
            if (actualVersion === undefined) return true;
            return actualVersion > wantVersion;
        });
    }

    // Row version is internal and shouldn't be returned
    results = results.map(({ $rowVersion, ...rest }) => rest as T);

    // FIXME: Figure out how continuations will work with aggregation
    let startIndex = 0;
    if (continuation !== undefined) {
        // Resume right after the row the continuation points at. `findIndex`
        // yields -1 when that row is missing, which makes us start at 0.
        startIndex =
            results.findIndex(
                r =>
                    getColumnValue(r, rowIndexColumnName) === continuation.rowIndex &&
                    (query.sort.length === 0 ||
                        continuation.sortKeys.every((k, i) => getColumnValue(r, query.sort[i].columnName) === k))
            ) + 1;
    }

    if (query.limit !== undefined || startIndex > 0) {
        // Without a limit, return everything after the continuation point
        // (an `undefined` end makes `slice` run to the end of the array).
        const endIndex = query.limit === undefined ? undefined : startIndex + query.limit;
        results = results.slice(startIndex, endIndex);
    }

    if (query.groupBy === undefined) {
        // Only read inside the branch below, where `results` is non-empty
        // because a limit of 0 was handled at the top.
        const lastRow = results[results.length - 1];
        return {
            isGroupBy: false,
            rows: results,
            version: undefined,
            numRowsInTable: rows.length,
            // A full page means there may be more rows, so describe where the
            // next call should resume.
            continuation:
                results.length === query.limit
                    ? {
                          rowIndex: asString(getColumnValue(lastRow, rowIndexColumnName)),
                          sortKeys: query.sort.map(s => getColumnValue(lastRow, s.columnName)),
                      }
                    : undefined,
            tableVersions: {},
        };
    }

    const groupByResults = evaluateGroupBy(query.groupBy, columns, results, getColumnValue, v => v.toDocumentData());

    return {
        isGroupBy: true,
        rows: groupByResults,
        version: undefined,
        numRowsInTable: rows.length,
        // We don't do continuations for aggregations (yet)
        continuation: undefined,
        tableVersions: {},
    };
}
