import { Injectable } from '@angular/core';
import {map, Observable, ReplaySubject} from "rxjs";
import * as Stomp from "@stomp/stompjs";
import {StompSubscription} from "@stomp/stompjs";

/**
 * Contract of the WebSocket service: it is responsible for initializing
 * connections and keeps track of all connected contexts (their references
 * are kept until disconnect).
 */
interface IWebSocketService {
  /**
   * 1. If a connection for the provided endpoint already exists, return it.
   * 2. Otherwise create a new connection object established with the
   *    provided URL. The object is stored for future reuse until
   *    disconnect() is invoked.
   *
   * Implementation note: use the newest Stomp, but without SockJS.
   *
   * @param endpoint URL of the WebSocket endpoint to connect to.
   * @returns Observable of the connection associated with the endpoint.
   */
  establish(endpoint: string): Observable<WebSocketConnection>;
}

/**
 * Contract of a single WebSocket connection, which keeps track of all
 * subscriptions opened on its topics.
 */
interface IWebSocketConnection {
  /**
   * Disconnect from the WebSocket server. May be called at any time.
   * Implementations must close all pending subscriptions before closing
   * the WebSocket connection itself.
   */
  disconnect(): void;

  /**
   * Listen for events from the topic. When the holder unsubscribes, the
   * underlying STOMP subscription is stopped too.
   *
   * Implementations hold a table of all pending subscriptions. An entry is
   * removed only when its subscription is unsubscribed, and the table is
   * used to close every remaining subscription on disconnect().
   *
   * @param topic destination to subscribe to.
   * @returns Observable emitting the raw body of each received message.
   */
  subscribe(topic: string): Observable<string>;

  /**
   * Same as subscribe(), but each message body is mapped through
   * JSON.parse() to an object of the requested type.
   *
   * @param topic destination to subscribe to.
   * @returns Observable emitting each message body parsed as T.
   */
  subscribeForObject<T>(topic: string): Observable<T>;
}
/**
 * Creates STOMP-over-WebSocket connections and caches one per endpoint, so
 * repeated establish() calls for the same URL share a single client.
 */
@Injectable({
  providedIn: 'root'
})
export class WebSocketService implements IWebSocketService {

  /** Pending or established connections, keyed by endpoint URL. */
  private readonly connections = new Map<string, ReplaySubject<WebSocketConnection>>();
  constructor() {}

  /**
   * Returns the connection for the given endpoint, creating and activating
   * a new STOMP client when none is cached yet.
   *
   * The returned observable emits the connection each time the client
   * reports a successful connect and replays the latest emission to late
   * subscribers. If activation throws, the error is pushed to the
   * observable and the endpoint is dropped from the cache so that a later
   * call can retry instead of receiving the errored subject again.
   *
   * @param endpoint broker URL of the WebSocket endpoint.
   * @returns Observable of the connection associated with the endpoint.
   */
  establish(endpoint: string): Observable<WebSocketConnection> {
    const cached = this.connections.get(endpoint);
    if (cached) {
      return cached;
    }

    const connectionSubject = new ReplaySubject<WebSocketConnection>(1);
    this.connections.set(endpoint, connectionSubject);

    const connection = new WebSocketConnection();
    connection.client = new Stomp.Client({
      brokerURL: endpoint,
      // debug: (str) => {
      //   console.log(str);
      // },
      reconnectDelay: 10000,
      heartbeatIncoming: 10000,
      heartbeatOutgoing: 10000,
      onConnect: () => {
        connectionSubject.next(connection);
      },
      // Per the stompjs docs this fires after a graceful disconnect (e.g.
      // deactivate() on a connected client). The connection must not be
      // reused after that, so forget it.
      onDisconnect: () => {
        this.evict(endpoint, connectionSubject);
      },
      onStompError: (frame) => {
        console.log("Broker reported error: " + frame.headers["message"]);
        console.log("Additional details: " + frame.body);
      },
    });

    try {
      connection.client.activate();
    } catch (e) {
      // Do not keep a permanently errored subject in the cache.
      this.evict(endpoint, connectionSubject);
      connectionSubject.error(e);
    }
    return connectionSubject;
  }

  /**
   * Removes the cache entry for the endpoint, but only if it still refers to
   * the given subject (a newer connection for the same endpoint is kept).
   */
  private evict(endpoint: string, subject: ReplaySubject<WebSocketConnection>): void {
    if (this.connections.get(endpoint) === subject) {
      this.connections.delete(endpoint);
    }
  }
}

/**
 * A single STOMP connection together with the table of subscriptions that
 * are currently open on its topics.
 */
export class WebSocketConnection implements IWebSocketConnection {
  /** STOMP client backing this connection; assigned by the creator after construction. */
  client: Stomp.Client;
  /** Subscriptions opened through subscribe() that have not been closed yet. */
  subscriptions: StompSubscription[] = [];

  constructor() {}

  /**
   * Closes every pending subscription and then deactivates the client.
   */
  disconnect(): void {
    // Take a snapshot and empty the table in one step. Clearing the array
    // while iterating over it would stop after the first element.
    const pending = this.subscriptions.splice(0, this.subscriptions.length);
    if (this.client.connected) {
      pending.forEach((sub) => this.client.unsubscribe(sub.id));
    }
    void this.client
      .deactivate()
      .catch((err: unknown) => console.error("Failed to deactivate STOMP client", err));
  }

  /**
   * Listens for messages on the topic. The STOMP subscription is opened when
   * the returned observable is subscribed to and closed when that
   * subscription is unsubscribed.
   *
   * @param topic destination to subscribe to.
   * @returns Observable emitting the body of each received message.
   */
  subscribe(topic: string): Observable<string> {
    return new Observable<string>((observer) => {
      const subscription = this.client.subscribe(topic, (msg) =>
        observer.next(msg.body)
      );
      this.subscriptions.push(subscription);
      return () => {
        const index = this.subscriptions.indexOf(subscription);
        if (index === -1) {
          // Already closed by disconnect(); nothing left to release.
          return;
        }
        // Remove exactly this entry; splice() without a delete count would
        // also drop every subscription registered after it.
        this.subscriptions.splice(index, 1);
        if (this.client.connected) {
          this.client.unsubscribe(subscription.id);
        }
      };
    });
  }

  /**
   * Same as subscribe(), but each message body is parsed with JSON.parse().
   * The parsed value is not validated against T.
   *
   * @param topic destination to subscribe to.
   * @returns Observable emitting each message body parsed as T.
   */
  subscribeForObject<T>(
    topic: string
  ): Observable<T> {
    return this.subscribe(topic).pipe(
      map((msg) => JSON.parse(msg) as T)
    );
  }
}
