import { Observable } from 'rxjs';
import { map, share } from 'rxjs/operators';
import { CleanSubscriber } from '../util/subscriber';
import { ContextService } from './context.service';
import { Inject, Injectable, OnDestroy } from '@angular/core';
import { RxStompService } from '@stomp/ng2-stompjs';
import { StompConfig } from '@stomp/stompjs';
import { IonstackModuleConfig, MODULE_CONFIG } from '../ionstack.config';
import { IonstackService } from './ionstack.service';
import { Context } from '../model/context';

/**
 * Application-wide wrapper around {@link RxStompService}.
 *
 * The broker connection is opened lazily by the first active topic
 * subscription and deactivated again once no topic is being watched.
 * Subscribers of the same broker destination share one underlying watch.
 */
@Injectable({
  providedIn: 'root'
})
export class WebsocketService implements OnDestroy {
  /** Matches a JSON media type, with or without parameters such as `;charset=UTF-8`. */
  private static readonly JSON_CONTENT_TYPE = /^\s*application\/json\s*(?:;|$)/i;

  private sub = new CleanSubscriber();
  /** Account id carried by the most recent context emission. */
  private userId: number | undefined;
  /** True while this service has configured and activated the STOMP client. */
  private started = false;
  /** Shared observables, keyed by the full broker destination (prefix + destination). */
  private observers = new Map<string, Observable<unknown>>();
  /** Number of broker watches currently open through this service. */
  private activeWatches = 0;

  constructor(
    private stompService: RxStompService,
    @Inject(MODULE_CONFIG) private ionstackModuleConfig: IonstackModuleConfig,
    private ionstackService: IonstackService,
    private contextService: ContextService,
  ) {
    this.sub.subscribe<Context>(this.contextService.context, ctx => {
      const uId = ctx.account?.id;
      const accountChanged = this.userId !== uId;
      // Remember the account so that only a real change triggers a reconnect.
      this.userId = uId;
      if (!this.started) {
        // Nothing is connected yet; useConnexion() reads the token when it starts.
        return;
      }
      // Keep the credentials used by any later (re)connect in line with the context.
      // NOTE(review): assumes Context exposes the same `token` that getCurrentContext() returns.
      this.stompService.stompClient.configure({connectHeaders: this.buildConnectHeaders(ctx.token)});
      if (accountChanged) {
        // Drop the socket; the client's reconnect logic re-opens it with the new headers.
        this.stompService.stompClient.forceDisconnect();
      }
    });
  }

  /** Releases every registered subscription and deactivates the STOMP client if it is active. */
  ngOnDestroy(): void {
    this.sub.unsubscribeAll();
    this.started = false;
    if (this.stompService.stompClient.active) {
      void this.stompService.stompClient.deactivate();
    }
  }

  /** @returns whether this service has activated the STOMP client. */
  isStarted(): boolean {
    return this.started;
  }

  /**
   * Publishes a message to `'/app' + destination`.
   *
   * @param destination destination path, appended to the `/app` prefix
   * @param data `Uint8Array` is sent as a binary body, a string is sent as-is,
   *             anything else is serialised with `JSON.stringify` (defaults to `{}`)
   */
  send(destination: string, data: unknown = {}): void {
    const target = '/app' + destination;
    if (data instanceof Uint8Array) {
      this.stompService.publish({destination: target, binaryBody: data});
      return;
    }
    const body = typeof data === 'string' ? data : JSON.stringify(data);
    this.stompService.publish({destination: target, body});
  }

  /** Builds the STOMP CONNECT headers; empty when there is no token so no stale header survives. */
  private buildConnectHeaders(token: string | null | undefined): {[key: string]: string} {
    return token ? {Authorization: 'Bearer ' + token} : {};
  }

  /**
   * Configures and activates the STOMP client if this has not been done yet.
   * Does nothing outside the browser or when no broker URL is configured.
   */
  private useConnexion(): void {
    if (this.started || !this.ionstackService.isBrowser() || !this.ionstackModuleConfig.websocketBrokerUrl) {
      return;
    }
    this.started = true;
    const config: StompConfig = {
      brokerURL: this.ionstackModuleConfig.websocketBrokerUrl,
      reconnectDelay: 10000,
      // Always set, so a header from a previous session is overwritten.
      connectHeaders: this.buildConnectHeaders(this.contextService.getCurrentContext()?.token),
    };
    this.stompService.stompClient.configure(config);
    this.stompService.activate();
  }

  /**
   * Returns a shared observable of the messages received on `prefix + destination`.
   *
   * The broker watch is opened when the first subscriber arrives and closed
   * when the last one leaves. Frames whose `content-type` is JSON are parsed,
   * binary frames are emitted as their binary body, and any other frame is
   * emitted as its text body.
   *
   * @param destination destination path, appended to `prefix`
   * @param prefix broker prefix, `/topic` by default
   * @returns an observable shared by every caller asking for the same full destination
   */
  subscribe<T>(destination: string, prefix = '/topic'): Observable<T> {
    // Key on the full destination so '/topic/x' and '/user/topic/x' never collide.
    const topic = prefix + destination;
    const cached = this.observers.get(topic);
    if (cached) {
      // The element type is the caller's promise about the payload; it cannot be checked here.
      return cached as Observable<T>;
    }

    const shared: Observable<T> = new Observable<T>(observer => {
      this.useConnexion();
      this.activeWatches++;
      const watch = this.stompService.watch({destination: topic}).pipe(map((msg): T => {
        if (WebsocketService.JSON_CONTENT_TYPE.test(msg.headers?.['content-type'] ?? '')) {
          return JSON.parse(msg.body);
        }
        if (msg.isBinaryBody) {
          return msg.binaryBody as unknown as T;
        }
        // Plain text frame: hand the raw body over instead of emitting undefined.
        return msg.body as unknown as T;
      })).subscribe(observer);
      // Presumably lets ngOnDestroy's unsubscribeAll() close the watch as well.
      this.sub.registerSub(watch);
      return () => {
        watch.unsubscribe();
        this.activeWatches--;
        // Only evict the cache entry if it is still this observable.
        if (this.observers.get(topic) === shared) {
          this.observers.delete(topic);
        }
        // Close the connection once nothing is being watched any more.
        if (this.activeWatches <= 0) {
          this.activeWatches = 0;
          this.started = false;
          void this.stompService.deactivate();
        }
      };
    }).pipe(share());

    this.observers.set(topic, shared);
    return shared;
  }

  /**
   * Same as {@link subscribe}, using the `/user/topic` prefix.
   *
   * @param destination destination path, appended to `/user/topic`
   */
  userSubscribe<T = unknown>(destination: string): Observable<T> {
    return this.subscribe<T>(destination, '/user/topic');
  }

}
