import { Injectable, NgZone, OnDestroy } from '@angular/core';
import { HttpClient, HttpResponseBase } from '@angular/common/http';
import { ConversationsLiveDataApi } from '@api/conversations-api/services/interfaces/conversations-live-data.api';
import { Observable, Subject } from 'rxjs';
import { ContactResponse } from '@api/contacts-api/models/responses/contact.response';
import { apiConversationsCommon } from '@environments/environment';
import { PushMessagePayload } from '@api/conversations-api/models/interfaces/push-message-payload.interface';
import { PushPayloadMessageType } from '@api/conversations-api/models/enums/push-payload-message-type.enum';
import { JsonSerializer } from '@core/serializer/json.serializer';
import { UnreadConversationResponse } from '@api/conversations-api/models/responses/unread-conversation.response';
import { IncomingCallResponse } from '@api/conversations-api/models/responses/incoming-call.response';
import { OutgoingCallResponse } from '@api/conversations-api/models/responses/outgoing-call.response';
import { NotificationResponse } from '@api/conversations-api/models/responses/notification.response';
import { takeUntil } from 'rxjs/operators';
import Timeout = NodeJS.Timeout;

@Injectable({
  providedIn: 'root',
})
export class ConversationsLiveDataApiService implements ConversationsLiveDataApi, OnDestroy {
  private readonly ALL_TOPICS_SUBSCRIPTION = '*';
  private readonly MAX_RECONNECT_ATTEMPTS = 20;
  private readonly RECONNECT_INITIAL_DELAY_MILLISECONDS = 1000;
  private readonly RECONNECT_EXPONENTIAL_BACKOFF_MULTIPLIER = 2;
  private readonly MAX_RECONNECT_DELAY_MILLISECONDS = 60 * 1000;
  private contactUpdated$ = new Subject<ContactResponse>();
  private unreadConversations$ = new Subject<UnreadConversationResponse[]>();
  private incomingCall$ = new Subject<IncomingCallResponse>();
  private outgoingCall$ = new Subject<OutgoingCallResponse>();
  private notificationsUpdated$ = new Subject<void>();
  private newNotification$ = new Subject<NotificationResponse>();
  private connectionShutdown$ = new Subject<void>();
  private eventSource?: EventSource;
  private reconnectAttempt = 0;
  private reconnectDelay = this.RECONNECT_INITIAL_DELAY_MILLISECONDS;
  private reconnectTimeout?: Timeout;

  public constructor(private http: HttpClient,
                     private zone: NgZone,
  ) {
  }

  public contactUpdated(): Observable<ContactResponse> {
    return this.contactUpdated$.asObservable();
  }

  public unreadConversations(): Observable<UnreadConversationResponse[]> {
    return this.unreadConversations$.asObservable();
  }

  public incomingCall(): Observable<IncomingCallResponse> {
    return this.incomingCall$.asObservable();
  }

  public outgoingCall(): Observable<OutgoingCallResponse> {
    return this.outgoingCall$.asObservable();
  }

  public notificationsUpdated(): Observable<void> {
    return this.notificationsUpdated$.asObservable();
  }

  public newNotification(): Observable<NotificationResponse> {
    return this.newNotification$.asObservable();
  }

  public openConnection(): void {
    this.discoverHub().pipe(
      takeUntil(this.connectionShutdown$),
    ).subscribe(
      (response) => {
        const hubUrl = this.extractHubUrl(response);
        const hub = new URL(hubUrl);
        hub.searchParams.append('topic', this.ALL_TOPICS_SUBSCRIPTION);

        this.eventSource = new EventSource(hub.toString(), { withCredentials: true });
        this.eventSource.onmessage = (event) => this.onEventSourceMessage(event);
        this.eventSource.onopen = () => this.onEventSourceConnected();
        this.eventSource.onerror = () => this.onEventSourceError();
      }, () => this.onEventSourceError());
  }

  private reconnect(): void {
    this.reconnectDelay *= this.RECONNECT_EXPONENTIAL_BACKOFF_MULTIPLIER;
    if (this.reconnectDelay > this.MAX_RECONNECT_DELAY_MILLISECONDS) {
      this.reconnectDelay = this.MAX_RECONNECT_DELAY_MILLISECONDS;
    }
    this.reconnectTimeout = setTimeout(() => {
      this.openConnection();
    }, this.reconnectDelay);

    this.reconnectAttempt++;
  }

  public shutDownConnection(): void {
    this.connectionShutdown$.next();
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = undefined;
    }
    if (this.reconnectTimeout) {
      clearTimeout(this.reconnectTimeout);
    }
  }

  private onEventSourceConnected(): void {
    this.reconnectAttempt = 0;
    this.reconnectDelay = this.RECONNECT_INITIAL_DELAY_MILLISECONDS;
  }

  private onEventSourceError(): void {
    if (this.eventSource && this.eventSource.readyState !== EventSource.CLOSED) {
      this.shutDownConnection();
    }

    if (this.reconnectAttempt < this.MAX_RECONNECT_ATTEMPTS) {
      this.reconnect();
    } else {
      this.onEventSourceClosedPermanently();
    }
  }

  private onEventSourceClosedPermanently(): void {
    throw new Error('Połączenie z serwerem PUSH zamknięte permanentnie. Nie nastąpi więcej prób wznowienia połączenia.');
  }

  private onEventSourceMessage(event: MessageEvent): void {
    const payload: PushMessagePayload = JSON.parse(event.data);

    switch (payload.type) {
      case PushPayloadMessageType.CONTACT_UPDATED:
        return this.onContactUpdateMessage(JsonSerializer.parse(payload.data, ContactResponse));
      case PushPayloadMessageType.UNREAD_CONVERSATIONS:
        return this.onUnreadConversationsMessage(JsonSerializer.parseArray(payload.data, UnreadConversationResponse));
      case PushPayloadMessageType.INCOMING_CALL:
        return this.onIncomingCallMessage(JsonSerializer.parse(payload.data, IncomingCallResponse));
      case PushPayloadMessageType.OUTGOING_CALL:
        return this.onOutgoingCallMessage(JsonSerializer.parse(payload.data, OutgoingCallResponse));
      case PushPayloadMessageType.NEW_NOTIFICATION:
        return this.onNewNotificationMessage(JsonSerializer.parse(payload.data, NotificationResponse));
      case PushPayloadMessageType.NOTIFICATIONS_UPDATED:
        return this.onNotificationsUpdatedMessage();
      default:
        return;
    }
  }

  private onContactUpdateMessage(contact: ContactResponse): void {
    this.zone.run(() => {
      this.contactUpdated$.next(contact);
    });
  }

  private onUnreadConversationsMessage(unreadConversations: UnreadConversationResponse[]): void {
    this.zone.run(() => {
      this.unreadConversations$.next(unreadConversations);
    });
  }

  private onIncomingCallMessage(incomingCallResponse: IncomingCallResponse): void {
    this.zone.run(() => {
      this.incomingCall$.next(incomingCallResponse);
    });
  }

  private onOutgoingCallMessage(outgoingCallResponse: OutgoingCallResponse): void {
    this.zone.run(() => {
      this.outgoingCall$.next(outgoingCallResponse);
    });
  }

  private onNewNotificationMessage(notificationResponse: NotificationResponse): void {
    this.zone.run(() => {
      this.newNotification$.next(notificationResponse);
    });
  }

  private onNotificationsUpdatedMessage(): void {
    this.zone.run(() => {
      this.notificationsUpdated$.next();
    });
  }

  private discoverHub(): Observable<HttpResponseBase> {
    const url = this.grabEndpointUrl(apiConversationsCommon.LIVE_DATA.DISCOVER);

    return this.http.get(url, { observe: 'response', withCredentials: true });
  }

  private extractHubUrl(response: HttpResponseBase): string {
    const linkHeader = response.headers.get('Link');
    if (!linkHeader) {
      throw new Error('Odpowiedź z serwera nie zawiera nagłówka Link z konfiguracją HUB.');
    }

    const matches = linkHeader.match(/<([^>]+)>;\s+rel=(?:mercure|"[^"]*mercure[^"]*")/);
    if (!matches || !matches[1]) {
      throw new Error('Odpowiedź z serwera zawiera nieprawidłową konfigurację HUB w nagłówku Link.');
    }

    return matches[1];
  }

  private grabEndpointUrl(endpoint: string): string {
    return [apiConversationsCommon.API_HOST_URL, endpoint].join('');
  }

  public ngOnDestroy(): void {
    this.shutDownConnection();
  }
}
