import * as ApiUtils from "ApiUtils";
import * as AsyncValueUtils from "Utils/AsyncValueUtils";
import * as CancellableResultUtils from "Utils/CancellableResultUtils";
import * as EventLogger from "EventLogger";
import * as JsonUtils from "JsonUtils";
import * as QueryUtils from "QueryUtils";
import * as Tunnel from "ApiContracts/subtrace/tunnel/tunnel";
import { AsyncValue } from "Utils/AsyncValue";
import { CancellableResult } from "Utils/CancellableResult";
import { SubtraceEventKind } from "SubtraceEventKind";
import { Deferred } from "Deferred";
import { QueryResult } from "QueryResult";
import { ClickhouseQueryResult } from "ClickhouseResult";
import { WebSocketCloseCode } from "WebSocketCloseCode";

/**
 * Singleton that owns the tunnel WebSocket used to evaluate SQL queries.
 *
 * Queries are sent with {@link QueryManager.queueEvaluation}; results arrive asynchronously on
 * the socket and are delivered through the callback registered with
 * {@link QueryManager.setOnQueryEvaluated}.
 */
export class QueryManager implements Disposable {
  // We only maintain one websocket open at a time per instance of the dashboard.
  // If the user switches namespaces, we close the current open websocket and
  // open a fresh one.

  // Controller of the most recent connection attempt. It doubles as the identity of that
  // attempt: an attempt may only touch `cachedWebSocketResult` while it is still the one
  // referenced here.
  private controller: AbortController | undefined;
  private currentNamespaceId: string | undefined;
  private cachedWebSocketResult: AsyncValue<CancellableResult<WebSocket>>;
  private onQueryEvaluated: ((queryId: string, queryResult: QueryResult) => void) | undefined;
  private static instance: QueryManager | undefined;

  private constructor() {
    this.cachedWebSocketResult = AsyncValueUtils.uninitialized();
  }

  /** Aborts any in-flight connection attempt and closes the open websocket, if any. */
  public [Symbol.dispose](): void {
    this.cleanupWebSocket(WebSocketCloseCode.NormalClosure, "cleanupWebSocket: Disposing QueryManager instance");
  }

  /** Returns the shared instance, creating it on first use. */
  public static getInstance(): QueryManager {
    if (this.instance === undefined) {
      this.instance = new QueryManager();
    }

    return this.instance;
  }

  /**
   * Sends a SQL statement over the tunnel websocket for the given namespace, opening the
   * websocket first if necessary.
   *
   * @param namespaceId Namespace whose tunnel should evaluate the query.
   * @param sqlStatement SQL text to evaluate.
   * @param queryId Identifier sent with the query; results are reported to the
   *   `onQueryEvaluated` callback under the `tunnelQueryId` carried by the reply.
   * @returns Success once the message has been handed to the websocket (not once the query has
   *   been evaluated), or the failure/cancellation produced while establishing the connection.
   */
  public async queueEvaluation(
    namespaceId: string,
    sqlStatement: string,
    queryId: string,
  ): Promise<CancellableResult<undefined>> {
    const result: CancellableResult<WebSocket> = await this.initializeWebSocket(namespaceId);

    if (CancellableResultUtils.isFailure(result) || CancellableResultUtils.isCancelled(result)) {
      return result;
    }

    const sqlQueryMessage: ArrayBuffer = Tunnel.Select.encode({
      sqlStatement,
      tunnelQueryId: queryId,
    }).finish();
    const webSocket: WebSocket = result.value;

    webSocket.send(sqlQueryMessage);
    return CancellableResultUtils.success(undefined);
  }

  /** Registers the callback invoked for every successfully decoded query result. */
  public setOnQueryEvaluated(onQueryEvaluated: (queryId: string, queryResult: QueryResult) => void): void {
    this.onQueryEvaluated = onQueryEvaluated;
  }

  /**
   * Tears down the current connection: aborts the in-flight connection attempt (if any) and
   * closes the cached websocket when one has been established.
   */
  private cleanupWebSocket(code: WebSocketCloseCode, reason: string): void {
    this.controller?.abort();
    if (
      AsyncValueUtils.isFulfilled(this.cachedWebSocketResult) &&
      CancellableResultUtils.isSuccess(this.cachedWebSocketResult.value)
    ) {
      this.cachedWebSocketResult.value.value.close(code, reason);
    }
  }

  /**
   * Decompresses (when needed) and parses a tunnel payload into a `QueryResult`, logging
   * size and timing metrics along the way.
   *
   * @param data Raw payload bytes as received from the tunnel.
   * @param mode Compression applied to `data`.
   * @throws Error when `mode` is not a supported compression mode.
   */
  private async decodeTunnelResult(data: Uint8Array, mode: Tunnel.CompressionMode): Promise<QueryResult> {
    const start: number = performance.now();

    const blob = new Blob([data]);
    let stream: ReadableStream<string>;
    switch (mode) {
      case Tunnel.CompressionMode.COMPRESSION_NONE:
        stream = blob.stream().pipeThrough(new TextDecoderStream());
        break;
      case Tunnel.CompressionMode.COMPRESSION_GZIP:
        stream = blob.stream().pipeThrough(new DecompressionStream("gzip")).pipeThrough(new TextDecoderStream());
        break;

      default:
        throw new Error(`unknown compression mode ${mode}`);
    }

    // Drain the decoded stream into a single string before parsing.
    let text: string = "";
    const reader: ReadableStreamDefaultReader<string> = stream.getReader();
    let result: ReadableStreamReadResult<string>;
    while (!(result = await reader.read()).done) {
      text += result.value;
    }
    const json = JsonUtils.parse<ClickhouseQueryResult>(text);
    const timeTakenInMilliseconds: number = performance.now() - start;

    EventLogger.logEvent(SubtraceEventKind.TunnelResultDecoded, {
      compression_mode: Tunnel.compressionModeToJSON(mode),
      wire_data_size_bytes: data.length,
      text_size_bytes: text.length,
      compression_ratio: (text.length / data.length).toFixed(1),
      number_of_cols: json.meta.length,
      number_of_rows: json.data.length,
      time_taken_ns: Math.round(timeTakenInMilliseconds * 1e6),
      throughput_bytes_per_second: Math.round(text.length / (timeTakenInMilliseconds / 1e3)),
    });

    return {
      meta: json.meta,
      // Rows arrive as positional arrays; key each cell by its column name.
      data: json.data.map((row) => Object.fromEntries(json.meta.map((column, index) => [column.name, row[index]]))),
      ...(json.rows_before_limit_at_least !== undefined &&
      json.rows_before_limit_at_least > QueryUtils.GRID_QUERY_RESULT_MAX_ROWS
        ? {
            wasRowLimitExceeded: true,
            actualNumberOfRows: json.rows_before_limit_at_least,
          }
        : { wasRowLimitExceeded: false }),
    };
  }

  /**
   * Returns an open websocket for `namespaceId`, reusing the cached one when possible.
   *
   * Switching namespaces tears down the previous connection (or connection attempt) first.
   * Concurrent callers for the same namespace share a single pending attempt. The returned
   * promise always settles: with success once the socket opens, with cancelled when the attempt
   * is aborted, or with failure when the tunnel cannot be created or the socket closes before
   * opening.
   */
  private async initializeWebSocket(namespaceId: string): Promise<CancellableResult<WebSocket>> {
    if (this.currentNamespaceId !== namespaceId) {
      this.cleanupWebSocket(WebSocketCloseCode.NormalClosure, "cleanupWebSocket: Namespace switched");
      this.currentNamespaceId = namespaceId;
      this.cachedWebSocketResult = AsyncValueUtils.uninitialized();
    }

    if (AsyncValueUtils.isFulfilled(this.cachedWebSocketResult)) {
      if (CancellableResultUtils.isSuccess(this.cachedWebSocketResult.value)) {
        const webSocket: WebSocket = this.cachedWebSocketResult.value.value;
        if (webSocket.readyState === WebSocket.OPEN) {
          return this.cachedWebSocketResult.value;
        }
      }
    }

    if (AsyncValueUtils.isPending(this.cachedWebSocketResult)) {
      return this.cachedWebSocketResult.promise;
    }

    const deferredResult: Deferred<CancellableResult<WebSocket>> = new Deferred();
    this.cachedWebSocketResult = AsyncValueUtils.pending(deferredResult.promise);

    // Publish the controller so that cleanupWebSocket can actually abort this attempt.
    const controller: AbortController = new AbortController();
    this.controller = controller;

    // Resolves this attempt for its waiters. The shared cache is only updated while this
    // attempt is still the current one, so a superseded attempt can never overwrite the state
    // of the connection that replaced it.
    let hasSettled: boolean = false;
    const settle = (result: CancellableResult<WebSocket>): CancellableResult<WebSocket> => {
      hasSettled = true;
      deferredResult.resolve(result);
      if (this.controller === controller) {
        this.cachedWebSocketResult = AsyncValueUtils.fulfilled(result);
      }
      return result;
    };

    let endpoint: string;

    try {
      const createTunnelRequest: Tunnel.Create_Request = {
        namespaceId,
        role: Tunnel.Role.SELECT,
      };

      const response: Response = await ApiUtils.post("/api/CreateTunnel", createTunnelRequest, {
        signal: controller.signal,
        subtraceTags: { namespace_id: namespaceId },
      });
      await ApiUtils.assertStatus(response, 200);

      const data: Tunnel.Create_Response = await response.json();
      endpoint = data.endpoint;
    } catch (error: unknown) {
      if (controller.signal.aborted) {
        // Nothing to log: the attempt was aborted on purpose (namespace switch or dispose).
        return settle(CancellableResultUtils.cancelled());
      }

      const errorToLog: Error = error instanceof Error ? error : new Error(JsonUtils.stringify(error));
      EventLogger.logEvent(SubtraceEventKind.CreateTunnelFailure, {
        error_message: errorToLog.message,
        error_name: errorToLog.name,
        error_stack: errorToLog.stack ?? "",
      });

      return settle(CancellableResultUtils.failure(errorToLog));
    }

    if (controller.signal.aborted) {
      // Aborted after the request completed; do not open a socket nobody is waiting for.
      return settle(CancellableResultUtils.cancelled());
    }

    const webSocket: WebSocket = new WebSocket(endpoint);
    webSocket.binaryType = "arraybuffer";

    // cleanupWebSocket only closes sockets that are already cached as open, so a socket that is
    // still connecting when the attempt is aborted has to be closed here.
    controller.signal.addEventListener(
      "abort",
      (): void => {
        if (webSocket.readyState === WebSocket.CONNECTING) {
          webSocket.close();
        }
      },
      { once: true },
    );

    webSocket.onopen = (): void => {
      settle(CancellableResultUtils.success(webSocket));
    };

    webSocket.onclose = (event: CloseEvent): void => {
      if (!hasSettled) {
        // The socket closed without ever opening. Settle the attempt so that callers awaiting
        // it are not left hanging on a promise that would otherwise never resolve.
        if (controller.signal.aborted) {
          settle(CancellableResultUtils.cancelled());
          return;
        }

        settle(
          CancellableResultUtils.failure(
            new Error(`WebSocket closed before the connection was established (code ${event.code})`),
          ),
        );
      }

      this.onWebSocketClose(event, this.controller === controller);
    };
    webSocket.onmessage = (event): Promise<void> => this.onWebSocketMessage(event);

    return deferredResult.promise;
  }

  /**
   * Handles a websocket close event.
   *
   * @param event Close event reported by the websocket.
   * @param isCurrentConnection Whether the closed socket belongs to the most recent connection
   *   attempt. Only then is the cached connection reset; a late close from a replaced socket
   *   must not discard the connection that superseded it.
   */
  private onWebSocketClose(event: CloseEvent, isCurrentConnection: boolean): void {
    if (event.code === WebSocketCloseCode.NormalClosure) {
      // Do nothing, these are situations where we expect the WebSocket to close,
      // and stay closed.
      return;
    }

    // We don't usually expect the websocket to close like this, so log
    // information around why this happened and reset the websocket.
    EventLogger.logEvent(SubtraceEventKind.TunnelWebSocketClosed, {
      code: event.code,
      reason: event.reason,
      was_clean: event.wasClean,
    });

    if (isCurrentConnection) {
      this.cachedWebSocketResult = AsyncValueUtils.uninitialized();
    }
  }

  /**
   * Decodes an incoming tunnel message and forwards the query result to the registered
   * `onQueryEvaluated` callback.
   *
   * Tunnel and ClickHouse errors are logged and the callback is not invoked for them.
   */
  private async onWebSocketMessage(message: MessageEvent): Promise<void> {
    const result: Tunnel.Result = Tunnel.Result.decode(new Uint8Array(message.data));
    if (result.tunnelError) {
      EventLogger.logEvent(SubtraceEventKind.TunnelError, { error: result.tunnelError });
      return;
    }

    if (result.clickhouseError) {
      EventLogger.logEvent(SubtraceEventKind.ClickhouseError, { error: result.clickhouseError });
      return;
    }

    const queryResult: QueryResult = await this.decodeTunnelResult(result.compressedData, result.compressionMode);
    this.onQueryEvaluated?.(result.tunnelQueryId, queryResult);
  }
}
