import { Injectable, OnDestroy } from "@angular/core";
import {
  HubConnection,
  HubConnectionBuilder,
  HubConnectionState,
} from "@microsoft/signalr";
import { ReplaySubject, Subject } from "rxjs";
import { filter, map } from "rxjs/operators";
import { SubSink } from "subsink";

@Injectable({
  providedIn: "root",
})
export class SignalRBuilderService
  extends HubConnectionBuilder
  implements OnDestroy {
  private readonly listeners$$: Subject<any> = new Subject<any>();
  private readonly signalRHandlers$$: ReplaySubject<string> = new ReplaySubject<string>();
  private readonly subSink = new SubSink();
  private hubConnection: HubConnection;

  constructor() {
    super();
  }

  public async start() {
    this.buildConnection();

    if (this.hubConnection.state === HubConnectionState.Disconnected) {
      await this.hubConnection.start();
    }
    return Promise.resolve();
  }

  public async stop() {
    if (
      this.hubConnection &&
      this.hubConnection.state === HubConnectionState.Connected
    ) {
      await this.hubConnection.stop();
    }
    return Promise.resolve();
  }

  public isConnected() {
    return this.hubConnection?.state === HubConnectionState.Connected;
  }

  private buildConnection() {
    if (!this.hubConnection) {
      this.hubConnection = this.build();
      this.subSink.sink = this.signalRHandlers$$.subscribe((name) => {
        this.hubConnection.on(name, (data) => {
          this.listeners$$.next({ name, data });
        });
      });
    }
  }

  public addListener<T>(name: string) {
    this.signalRHandlers$$.next(name);

    return this.listeners$$.pipe(
      filter((value) => value.name === name),
      map((value) => value.data as T)
    );
  }

  ngOnDestroy(): void {
    this.subSink.unsubscribe();
  }
}
