import { BehaviorSubject, Observable, Subject } from 'rxjs';
import { filter } from 'rxjs/operators';
import SocketIo from 'socket.io-client';
import { filterChannels } from '.';
import { Config, Job } from '../state';
import {
  IAssets,
  IConfig,
  IFeedbackParams,
  IMessage,
  ISocketResponse,
  SocketMessage,
  SocketResponseType,
} from '../types';

export class WebsocketSubject extends Subject<SocketMessage> {
  private assetsSubject: BehaviorSubject<IAssets>;
  private configSubject: BehaviorSubject<Config>;
  private socket: SocketIOClient.Socket;
  private sentJobSubject: Subject<Job>;
  private editorId?: string;

  constructor(uri: string, clientId: string, onConnect: () => void) {
    super();
    this.assetsSubject = new BehaviorSubject<IAssets>({
      icon: '',
      logo: '',
    });
    this.configSubject = new BehaviorSubject<Config>(new Config());
    this.sentJobSubject = new Subject<Job>();
    this.socket = SocketIo(uri, {
      autoConnect: false,
      query: {
        clientId,
      },
      randomizationFactor: 0.8,
      reconnectionDelay: 2000,
      reconnectionDelayMax: 600000,
      timeout: 600000,
    });
    this.socket.on('reconnect_attempt', (attemptNumber: number) => {
      this.socket.io.opts.query = {
        clientId,
        editorId: this.editorId,
        reconnecting: true,
      };
    });
    this.socket.on('connect', () => {
      onConnect();
      this.socket.on(`${this.socket.id}:editorId`, (editorId: string) => {
        this.editorId = editorId;
      });
      this.addListenerForAssetsChannel();
      this.addListenerForConfigChannel();
      this.addListenerForChannel(SocketResponseType.Corrections, msg => msg);
      this.addListenerForChannel(SocketResponseType.Mongo, msg => msg);
      this.addListenerForChannel(SocketResponseType.Tokenized, msg => msg);
      this.addListenerForChannel(SocketResponseType.Unchanged, msg => msg);
    });
  }

  public connect() {
    this.socket.connect();
  }

  public sendJob(job: Job): void {
    this.socket.emit(`${this.socket.id}:jobs`, job.getMessageJSON());
    this.sentJobSubject.next(job);
  }

  public sendFeedback(feedbackParams: IFeedbackParams): void {
    this.socket.emit(
      `${this.socket.id}:feedback`,
      JSON.stringify(feedbackParams),
    );
  }

  public get assets$(): Observable<IAssets> {
    return this.assetsSubject.asObservable();
  }

  public get config$(): Observable<Config> {
    return this.configSubject.asObservable();
  }

  public sentJobObservable(editorId: string): Observable<Job> {
    return this.sentJobSubject
      .asObservable()
      .pipe(filter(job => job.editorId === editorId));
  }

  public messagesFor(editorId: string): Observable<SocketMessage> {
    return this.asObservable().pipe(
      filterChannels([
        SocketResponseType.Corrections,
        SocketResponseType.Tokenized,
        SocketResponseType.Unchanged,
        SocketResponseType.Mongo,
      ]),
      filter(m => Job.parseKey(m.msg.key!).editorId === editorId),
    );
  }

  public createEditablesInMongoFor(key: string): void {
    this.socket.emit(`${this.socket.id}:editable`, `${this.socket.id}:${key}`);
  }

  private addListenerForAssetsChannel() {
    this.socket.on(
      `${this.socket.id}:${SocketResponseType.Assets}`,
      (msg: IAssets) => {
        this.assetsSubject.next(msg);
      },
    );
  }

  private addListenerForConfigChannel() {
    this.socket.on(
      `${this.socket.id}:${SocketResponseType.Config}`,
      (msg: Partial<IConfig>) => {
        this.configSubject.next(this.configSubject.getValue().merge(msg));
      },
    );
  }

  private addListenerForChannel<C extends SocketResponseType>(
    channel: C,
    mapper: (msg: any) => ISocketResponse<C>,
  ): void {
    this.socket.on(`${this.socket.id}:${channel}`, (msg: any) => {
      const nextValue: IMessage<C> = { channel, msg: mapper(msg) };
      this.next(nextValue);
    });
  }
}
