import { EventEmitter, Injectable } from "@angular/core";
import { Observable, Subject } from "rxjs";
import { Centrifuge } from "centrifuge";
import { environment } from "../../../../../apps/ui/src/environments/environment";
import { LocalStorageKey } from "@common/enums/local-storage.keys";
/**
 * Application-wide wrapper around a single Centrifuge client.
 *
 * The service owns at most one client at a time. Connection state changes and
 * errors are reported through {@link CentrifugeService.stateEmitter}; every
 * publication received on any channel subscribed through
 * {@link CentrifugeService.getMessages} is forwarded to both static subjects.
 */
@Injectable({
  providedIn: "root",
})
export class CentrifugeService {
  /** Receives the `data` payload of every publication from every subscribed channel. */
  public static onNewNotification: Subject<any> = new Subject<any>();
  /** Receives the same payloads as `onNewNotification` (both are fed by each publication). */
  public static onRecordStatusUpdate: Subject<any> = new Subject<any>();
  /** Active client; undefined before `connect()` and after a "disconnected" event. */
  private client?: Centrifuge;
  /** When true, connection and subscription events are logged to the console. */
  private debug = true;

  /** True between the client's "connected" and "disconnected" events. */
  private connected = false;
  /**
   * Emits `{ type: "state", state: "connected" | "disconnected", info }` on
   * connection state changes and `{ type: "error", info }` on client errors,
   * subscription errors and unsubscriptions.
   */
  stateEmitter = new EventEmitter<any>();

  /**
   * Returns the stream of state/error events described on `stateEmitter`.
   */
  getStates(): Observable<any> {
    return this.stateEmitter;
  }

  /**
   * Creates the Centrifuge client and starts connecting.
   *
   * Does nothing when a client already exists, whether it is connected or
   * still connecting, so repeated calls cannot open duplicate connections.
   *
   * @param parameters Extra client options, spread after the defaults so they
   *   can override them (including `token`).
   */
  connect(parameters): void {
    // `connected` only flips inside the asynchronous "connected" event, so the
    // client reference is what actually tells us a connection attempt exists.
    if (this.connected || this.client) {
      return;
    }

    const client = new Centrifuge(environment.centrifugoServerUrl, {
      // sockjs: SockJS,
      token: localStorage.getItem(LocalStorageKey.AUTH),
      ...parameters,
    });
    this.client = client;

    client
      .on("connecting", (ctx) => {
        if (this.debug) {
          console.log(`connecting: ${ctx.code}, ${ctx.reason}`);
        }
      })
      .on("connected", (ctx) => {
        this.connected = true;
        if (this.debug) {
          console.log(`connected over ${ctx.transport}`);
        }
        this.stateEmitter.emit({
          type: "state",
          state: "connected",
          info: ctx,
        });
      })
      .on("disconnected", (ctx) => {
        this.connected = false;
        if (this.debug) {
          console.log(`disconnected: ${ctx.code}, ${ctx.reason}`);
        }
        this.stateEmitter.emit({
          type: "state",
          state: "disconnected",
          info: ctx,
        });
        // Drop the reference so a later connect() builds a fresh client.
        if (this.client === client) {
          this.client = undefined;
        }
      })
      .on("error", (ctx) => {
        if (this.debug) {
          console.error("Error :", ctx);
        }
        this.stateEmitter.emit({ type: "error", info: ctx });
      })
      .connect();
  }

  /**
   * Subscribes to `channel` and forwards each publication's `data` to
   * `onNewNotification` and `onRecordStatusUpdate`.
   *
   * If the client already holds a subscription for the channel, that
   * subscription is reused (and re-subscribed) instead of registering a second
   * one, so handlers are never attached twice.
   *
   * @param channel Name of the channel to subscribe to.
   * @throws Error when called while no client exists (before `connect()` or
   *   after a disconnect).
   */
  getMessages(channel: string) {
    if (!this.client) {
      throw new Error(
        "CentrifugeService.getMessages() called without an active client; call connect() first"
      );
    }

    const existing = this.client.getSubscription(channel);
    if (existing) {
      return existing.subscribe();
    }

    const subscription = this.client.newSubscription(channel);
    return subscription
      // Arrow function on purpose: the handler reads `this.debug`, which needs
      // the service instance as `this`.
      .on("publication", (ctx) => {
        if (this.debug) {
          console.log(`data received: ${ctx.data}, channel: ${ctx.channel}`);
        }
        const { data } = ctx;
        CentrifugeService.onNewNotification.next(data);
        CentrifugeService.onRecordStatusUpdate.next(data);
      })
      .on("subscribing", (ctx) => {
        if (this.debug) {
          console.log(`subscribing: ${ctx.code}, ${ctx.reason}`);
        }
      })
      .on("subscribed", (ctx) => {
        if (this.debug) {
          console.log(`subscribed channel: ${ctx.channel}, data: ${ctx.data}`);
        }
      })
      .on("unsubscribed", (ctx) => {
        if (this.debug) {
          console.log(`unsubscribed: ${ctx.code}, ${ctx.reason}`);
        }
        // NOTE(review): unsubscription is reported with type "error"; kept
        // as-is because consumers may rely on it - confirm this is intended.
        this.stateEmitter.emit({ type: "error", info: ctx });
      })
      .on("error", (ctx) => {
        if (this.debug) {
          console.log(" Subscribe error :", ctx);
        }
        this.stateEmitter.emit({ type: "error", info: ctx });
      })
      .subscribe();
  }

  /**
   * Disconnects the current client, if any. Safe to call when no client
   * exists (never connected, or already disconnected).
   */
  disconnect(): void {
    this.client?.disconnect();
  }
}
