import { Hostname, parseDates } from "@kvix/shared";
import { Observable } from "rxjs/internal/Observable";
import { merge } from "rxjs/internal/observable/merge";
import { finalize } from "rxjs/internal/operators/finalize";
import { Subject } from "rxjs/internal/Subject";
import io from "socket.io-client";
import { Log } from "./utils/logging";

/**
 * Base class wrapping a single socket.io client connection.
 *
 * Subclasses send messages through the protected `emit` helper, while any
 * consumer can listen for server events as RxJS streams via `on`.
 */
export abstract class Socket {
  /** The socket.io client created by `connect()`; `null` until then. */
  instance: SocketIOClient.Socket | null = null;

  /**
   * Opens a websocket-only socket.io connection to the URL returned by
   * `Hostname.socket.default()` and stores it in `instance`.
   *
   * If a socket from an earlier call is still held, it is disconnected first
   * so that repeated calls do not leave an orphaned live connection behind.
   */
  connect() {
    if (this.instance) {
      this.instance.disconnect();
    }

    const options: SocketIOClient.ConnectOpts = {
      // Do not fallback to long-polling since we scale horizontally
      transports: ["websocket"],
      path: "/live",
    };

    const url = Hostname.socket.default();
    this.instance = io(url, options);

    // Expose the underlying manager on the global object (handy when poking
    // at the connection from the console). Guarded so that environments
    // without a `window` global do not throw a ReferenceError.
    if (typeof window !== "undefined") {
      (window as any).io = this.instance.io;
    }
  }

  /**
   * Disconnects the current socket, if there is one. The `instance` reference
   * is kept as-is.
   */
  disconnect() {
    if (this.instance) {
      this.instance.disconnect();
    }
  }

  /**
   * Emits `event` to the server and waits for its acknowledgement.
   *
   * @param event   Name of the event to emit.
   * @param payload Optional data to send. Only a missing (`undefined`)
   *                payload is replaced by `null`; falsy values such as `0`,
   *                `""` and `false` are sent unchanged.
   * @returns A promise that resolves when the acknowledgement callback is
   *          invoked without an error, and rejects with the error otherwise.
   *          It also rejects with `Error("No available socket!")` when
   *          `connect()` has not been called. The promise only settles once
   *          the acknowledgement callback runs.
   */
  protected emit(event: string, payload?: any): Promise<void> {
    return new Promise((resolve, reject) => {
      if (!this.instance) {
        // Throwing inside the executor rejects the returned promise.
        throw new Error("No available socket!");
      }

      Log.socket.send(event, payload);

      // `payload || null` would also swallow 0, "" and false.
      const body = payload === undefined ? null : payload;

      this.instance.emit(event, body, (error?: any) => {
        return error ? reject(error) : resolve();
      });
    });
  }

  /**
   * Returns a stream of the payloads received for one or more server events.
   *
   * For an array of names the individual streams are merged. For a single
   * name, one listener is shared by all subscribers of the returned
   * observable: it is attached to the socket when the first subscriber
   * arrives and removed again when the last one unsubscribes. The socket that
   * was current when `on()` was called is the one that is listened to, even
   * if `connect()` later replaces `instance`.
   *
   * Each received payload is passed through `parseDates` (presumably
   * converting date values in place — confirm against `@kvix/shared`) and
   * logged before being delivered.
   *
   * @param event Event name, or list of event names, to listen for.
   * @returns An observable emitting every payload received while subscribed.
   * @throws Error("Socket is not started!") when called with a single event
   *         name before `connect()`.
   */
  on<T>(event: string | string[]): Observable<T> {
    if (event instanceof Array) {
      return merge(...event.map((name) => this.on<T>(name)));
    }

    if (!this.instance) {
      throw new Error("Socket is not started!");
    }

    // Capture the socket and the narrowed event name so that the closures
    // below keep talking to the socket the listener was registered on.
    const socket = this.instance;
    const name: string = event;

    const stream = new Subject<T>();
    let subscribers = 0;

    const emitter = (data: T) => {
      parseDates(data);

      Log.socket.recieve(name, data);
      stream.next(data);
    };

    return new Observable<T>((subscriber) => {
      // Attach the socket listener lazily, once, for the first subscriber.
      if (subscribers === 0) {
        socket.on(name, emitter);
      }
      subscribers += 1;

      return stream.subscribe(subscriber);
    }).pipe(
      finalize(() => {
        // Runs once per subscription; only the last one out detaches the
        // shared listener so remaining subscribers keep receiving events.
        subscribers -= 1;
        if (subscribers === 0) {
          socket.removeListener(name, emitter);
        }
      })
    );
  }
}
