import { Injectable } from '@angular/core';
import { QuoteDto } from './shell-communication/dtos/quote-dto.class';
import { MessageBusService } from './message-bus.service';
import { ToastrService } from 'ngx-toastr';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { LastQuoteCacheBackendService } from './last-quote-cache-backend-service.class';
import { ResubscribeQuotesDto } from './shell-communication/dtos/resubscribe-quotes-dto.interface';
import { convertCashSettledTickerToNormal, delay, isCashSettledOptionTicker, isNullOrUndefined, isTruthy } from './utils';
import { GreeksDto, ResubscribeOptionStrategiesDto, StrategyPriceDto } from './shell-communication/shell-dto-protocol';
import { ChangeMarketDataSubscriptionShell, ChangeOptionStrategiesSubscriptionShell, MarketDataRequestSpec } from './shell-communication/shell-operations-protocol';
import {SessionService} from "./authentication/session-service.service";


/**
 * Caches the most recent quote, greeks and strategy-price messages seen on the
 * message bus and keeps per-ticker / per-strategy subscriber counts so that the
 * backend is only asked to subscribe on the first request and to unsubscribe on
 * the last release.
 *
 * Ticker (un)subscribe requests are not sent immediately: they are appended to
 * queues that a timer-driven processor drains once per
 * `QUEUE_PROCESSOR_INTERVAL_MS`, sending one combined command to the backend.
 * Strategy subscription changes are sent to the backend right away.
 *
 * Every ticker passed in is first run through `fixTicker`, which replaces a
 * cash-settled option ticker with the result of `convertCashSettledTickerToNormal`.
 */
@Injectable({ providedIn: 'root' })
export class LastQuoteCacheService {

   /** Delay between two runs of the ticker subscription queue processor. */
   private static readonly QUEUE_PROCESSOR_INTERVAL_MS = 1000;

   /** Delay between two cache look-ups in `getLastQuoteWithAwait`. */
   private static readonly QUOTE_POLL_INTERVAL_MS = 250;

   /** Maximum number of cache look-ups performed by `getLastQuoteWithAwait`. */
   private static readonly QUOTE_POLL_MAX_ATTEMPTS = 21;

   constructor(
      private readonly _backendService: LastQuoteCacheBackendService,
      private readonly _messageBus: MessageBusService,
      private readonly _toastr: ToastrService,
      private readonly _sessionService: SessionService,
   ) {
   }

   // All caches are initialised eagerly so that the getters are safe to call
   // before init() has run.
   private _quotesCache: Record<string, QuoteDto> = {};
   private _greeksCache: Record<string, GreeksDto> = {};
   private _strategyPriceCache: Record<string, StrategyPriceDto> = {};

   /** Number of active subscribers per ticker (entries are always >= 1). */
   private _ticker2subscribers: Record<string, number> = {};

   /** Number of active subscribers per strategy code (entries are always >= 1). */
   private _strategy2subscribers: Record<string, number> = {};

   /** Emits when init() is called again, ending the previous bus subscriptions. */
   private _unsubscriber: Subject<void>;

   private _tickersSubscribeQueue: string[] = [];
   private _tickersUnsubscribeQueue: string[] = [];

   /** Tickers whose backend subscription must be re-sent without touching their counts. */
   private _tickersResubscribeQueue: string[] = [];

   /** Handle of the pending queue-processor timer. */
   private _tickersQueueProcessorInterval: number;

   /**
    * (Re)initialises the service: ends the previous message-bus subscriptions,
    * clears all caches and subscriber counts, subscribes to the bus topics,
    * arms the queue processor timer and publishes a `ServiceInitialized`
    * message containing the elapsed time.
    *
    * The ticker subscribe / unsubscribe queues are intentionally left as they are.
    *
    * @returns an already-resolved promise.
    */
   init(): Promise<any> {

      const start = Date.now();

      if (this._unsubscriber) {
         this._unsubscriber.next();
         this._unsubscriber.complete();
      }

      this._unsubscriber = new Subject<void>();

      this._quotesCache = {};
      this._greeksCache = {};
      this._strategyPriceCache = {};
      this._ticker2subscribers = {};
      this._strategy2subscribers = {};

      // Counts were just wiped, so there is nothing left to re-send.
      this._tickersResubscribeQueue.length = 0;

      this._messageBus
         .of<QuoteDto[]>('QuoteDto')
         .pipe(takeUntil(this._unsubscriber))
         .subscribe(x => this.onQuoteMessage(x.payload));

      this._messageBus
         .of<GreeksDto>('GreeksDto')
         .pipe(takeUntil(this._unsubscriber))
         .subscribe(x => this.onGreeksMessage(x.payload));

      this._messageBus
         .of<StrategyPriceDto[]>('StrategyPriceDto')
         .pipe(takeUntil(this._unsubscriber))
         .subscribe(x => this.onStrategyPriceMessage(x.payload));

      this._messageBus
         .of<ResubscribeQuotesDto>('ResubscribeQuotesDto')
         .pipe(takeUntil(this._unsubscriber))
         .subscribe(x => this.onResubscribeQuotes(x.payload));

      this._messageBus
         .of<ResubscribeOptionStrategiesDto>('ResubscribeOptionStrategiesDto')
         .pipe(takeUntil(this._unsubscriber))
         .subscribe(x => this.onResubscribeOptionStrategies(x.payload));

      console.info(`last-quote-cache|Initialized`);

      // Also cancels the timer armed by a previous init(), if any.
      this.setQueueProcessorTimeout();

      const end = Date.now();

      this._messageBus.publish({
         topic: 'ServiceInitialized',
         payload: {
            time: end - start,
            name: 'LastQuoteCache'
         }
      });

      return Promise.resolve();
   }

   /**
    * Arms the timer that runs the queue processor once and then re-arms itself.
    *
    * Any timer that is still pending is cancelled first. Without this, calling
    * init() while a processor run is awaiting the backend would leave two
    * independent timer chains running (one armed by init(), one by the
    * in-flight run's `finally`).
    */
   private setQueueProcessorTimeout() {

      if (this._tickersQueueProcessorInterval) {
         clearTimeout(this._tickersQueueProcessorInterval);
      }

      this._tickersQueueProcessorInterval = setTimeout(() => {
         this.subscriptionsQueueProcessor()
            // Keep an unexpected failure from becoming an unhandled rejection.
            .catch(e => console.error(e))
            .finally(() => this.setQueueProcessorTimeout());
      }, LastQuoteCacheService.QUEUE_PROCESSOR_INTERVAL_MS) as any;
   }

   /**
    * @returns the cached quote for the ticker, or `undefined` if none is cached.
    */
   getLastQuote(ticker: string): QuoteDto {
      return this._quotesCache[this.fixTicker(ticker)];
   }

   /**
    * Returns the cached quote for the ticker, waiting for one to arrive if the
    * cache has none.
    *
    * On a cache miss the ticker is subscribed (the subscription is NOT released
    * by this method) and the cache is polled every `QUOTE_POLL_INTERVAL_MS`
    * for at most `QUOTE_POLL_MAX_ATTEMPTS` attempts.
    *
    * @returns the quote, or `undefined` if none arrived within the polling window.
    */
   async getLastQuoteWithAwait(ticker: string): Promise<QuoteDto> {

      ticker = this.fixTicker(ticker);

      let lq = this.getLastQuote(ticker);

      if (!isNullOrUndefined(lq)) {
         return lq;
      }

      this.subscribeTicker(ticker);

      let attempt = 0;

      do {
         await delay(LastQuoteCacheService.QUOTE_POLL_INTERVAL_MS);
         lq = this.getLastQuote(ticker);
         attempt++;
      } while (isNullOrUndefined(lq) && attempt < LastQuoteCacheService.QUOTE_POLL_MAX_ATTEMPTS);

      return lq;
   }

   /**
    * @returns the cached greeks for the ticker, or `undefined` if none are cached.
    */
   getLastGreeks(ticker: string): GreeksDto {
      return this._greeksCache[this.fixTicker(ticker)];
   }

   /**
    * @returns the cached price for the strategy code, or `undefined` if none is cached.
    */
   getLastStrategyPrice(strategyCode: string): StrategyPriceDto {
      return this._strategyPriceCache[strategyCode];
   }

   /**
    * @returns every cached strategy price.
    */
   getAllLastStrategyPrices(): StrategyPriceDto[] {
      return Object.values(this._strategyPriceCache);
   }

   /**
    * @returns every cached quote.
    */
   getAllQuotes(): QuoteDto[] {
      return Object.values(this._quotesCache);
   }

   /**
    * Queues a subscription for a single ticker.
    *
    * If a quote for the ticker is already cached it is re-published once on the
    * `QuoteDto` topic (done by `subscribeTickers`).
    *
    * @returns the cached quote, or `undefined` if none is cached.
    */
   subscribeTicker(ticker: string): QuoteDto {

      ticker = this.fixTicker(ticker);

      this.subscribeTickers([ticker]);

      return this.getLastQuote(ticker);
   }

   /**
    * Queues subscriptions for the given tickers and re-publishes, on the
    * `QuoteDto` topic, whatever quotes are already cached for them.
    *
    * NOTE: the `tickers` array is normalised in place (see `fixTickers`).
    *
    * @returns the already cached quotes keyed by `quote.ticker`; an empty
    *          object when there is nothing valid to subscribe to.
    */
   subscribeTickers(tickers: string[]): {[ix: string]: QuoteDto} {

      if (isNullOrUndefined(tickers)) {
         return {};
      }

      this.fixTickers(tickers);

      const validTickers = tickers.filter(t => isTruthy(t));

      if (validTickers.length === 0) {
         return {};
      }

      this._tickersSubscribeQueue.push(...validTickers);

      return this.publishCachedQuotes(validTickers);
   }

   /**
    * Queues the release of one subscription for a single ticker.
    */
   unsubscribeTicker(ticker: string) {
      ticker = this.fixTicker(ticker);
      this.unsubscribeTickers([ticker]);
   }

   /**
    * Queues the release of one subscription for each of the given tickers.
    *
    * NOTE: the `tickers` array is normalised in place (see `fixTickers`).
    */
   unsubscribeTickers(tickers: string[]) {

      if (isNullOrUndefined(tickers)) {
         return;
      }

      this.fixTickers(tickers);

      const validTickers = tickers.filter(t => isTruthy(t));

      if (validTickers.length === 0) {
         return;
      }

      this._tickersUnsubscribeQueue.push(...validTickers);
   }

   /**
    * Moves a consumer from one set of strategy codes to another: codes only
    * present in `alreadySubscribed` are released, codes only present in
    * `toSubscribe` are acquired. The backend call is fire-and-forget.
    */
   subscribeStrategyCodesDiff(alreadySubscribed: string[], toSubscribe: string[]) {

      const current = alreadySubscribed || [];
      const wanted = toSubscribe || [];

      const currentSet = new Set(current);
      const wantedSet = new Set(wanted);

      const unsubscribe = current.filter(x => !isNullOrUndefined(x) && !wantedSet.has(x));

      const subscribe = wanted.filter(x => !isNullOrUndefined(x) && !currentSet.has(x));

      // Failures are handled inside; nothing to await here.
      void this.changeOptionStrategiesSubscription(unsubscribe, subscribe);
   }

   /**
    * Moves a consumer from one set of tickers to another: tickers only present
    * in `alreadySubscribed` are queued for release, tickers only present in
    * `toSubscribe` are queued for subscription.
    *
    * NOTE: both arrays are normalised in place (see `fixTickers`).
    */
   subscribeTickersDiff(alreadySubscribed: string[], toSubscribe: string[]) {

      const current = alreadySubscribed || [];
      const wanted = toSubscribe || [];

      this.fixTickers(current);
      this.fixTickers(wanted);

      const currentSet = new Set(current);
      const wantedSet = new Set(wanted);

      const subscribe = wanted.filter(x => !currentSet.has(x));
      const unsubscribe = current.filter(x => !wantedSet.has(x));

      this.unsubscribeTickers(unsubscribe);
      this.subscribeTickers(subscribe);
   }

   /**
    * Subscribes the ticker unless it already has a subscriber or is already
    * waiting in the subscribe queue.
    */
   subscribeIfNotYet(ticker: string) {

      ticker = this.fixTicker(ticker);

      if (this._ticker2subscribers[ticker] > 0) {
         return;
      }

      if (this._tickersSubscribeQueue.includes(ticker)) {
         return;
      }

      this.subscribeTicker(ticker);
   }

   /**
    * Applies strategy subscriber-count changes and forwards the net effect to
    * the backend.
    *
    * A code is sent as "unsubscribe" only when its last subscriber is released
    * (its cached price is dropped as well), and as "subscribe" only when it
    * gains its first subscriber.
    */
   private async changeOptionStrategiesSubscription(unsubscribe: string[], subscribe: string[]): Promise<void> {

      const filteredUnsub: string[] = [];

      unsubscribe.forEach(strategyCode => {

         if (!isTruthy(strategyCode)) {
            return;
         }

         const listenersCount = (this._strategy2subscribers[strategyCode] || 0) - 1;

         if (listenersCount <= 0) {
            delete this._strategy2subscribers[strategyCode];
            delete this._strategyPriceCache[strategyCode];
            filteredUnsub.push(strategyCode);
         } else {
            this._strategy2subscribers[strategyCode] = listenersCount;
         }
      });

      const filteredSub: string[] = [];

      subscribe.forEach(strategyCode => {

         if (!isTruthy(strategyCode)) {
            return;
         }

         const listenersCount = (this._strategy2subscribers[strategyCode] || 0) + 1;

         if (listenersCount === 1) {
            filteredSub.push(strategyCode);
         }

         this._strategy2subscribers[strategyCode] = listenersCount;
      });

      if (filteredSub.length === 0 && filteredUnsub.length === 0) {
         return;
      }

      // A code that ends up subscribed must not be unsubscribed in the same command.
      const netUnsub = filteredUnsub.filter(x => !filteredSub.includes(x));

      await this.sendOptionStrategiesSubscriptionChange(netUnsub, filteredSub);
   }

   /**
    * Sends a strategy subscription change to the backend; failures are
    * reported through `reportSubscriptionError` and never re-thrown.
    */
   private async sendOptionStrategiesSubscriptionChange(unsubscribe: string[], subscribe: string[]): Promise<void> {

      const cmd = new ChangeOptionStrategiesSubscriptionShell(unsubscribe, subscribe);

      try {
         await this._backendService.changeOptionStrategiesSubscription(cmd);
      } catch (e) {
         this.reportSubscriptionError(
            e,
            '"Option Strategies Subscription Change" operation completed with errors',
            'Option Strategies'
         );
      }
   }

   /**
    * Logs a failed subscription change and shows an error toast, unless the
    * error text contains 'invalid session', in which case it is ignored.
    *
    * The text is taken from the error itself when it is a string, otherwise
    * from its string `message` property. Errors carrying no text are reported.
    */
   private reportSubscriptionError(error: unknown, toastMessage: string, toastTitle: string): void {

      let text = '';

      if (typeof error === 'string') {
         text = error;
      } else if (error !== null && error !== undefined) {
         const candidate = (error as { message?: unknown }).message;
         if (typeof candidate === 'string') {
            text = candidate;
         }
      }

      if (text.indexOf('invalid session') >= 0) {
         return;
      }

      console.error(error);
      this._toastr.error(toastMessage, toastTitle);
   }

   /**
    * Re-publishes, on the `QuoteDto` topic, the cached quotes of the given tickers.
    *
    * @returns the cached quotes keyed by `quote.ticker`.
    */
   private publishCachedQuotes(tickers: string[]): {[ix: string]: QuoteDto} {

      const cached: {[ix: string]: QuoteDto} = {};

      tickers.forEach(ticker => {
         const quote = this.getLastQuote(ticker);
         if (isTruthy(quote)) {
            cached[quote.ticker] = quote;
         }
      });

      const values = Object.values(cached);

      if (values.length > 0) {
         this._messageBus.publishAsync({topic: 'QuoteDto', payload: values});
      }

      return cached;
   }

   /** Stores each incoming quote under its `ticker`. */
   private onQuoteMessage(quotes: QuoteDto[]): void {

      // An exception here would propagate into the bus subscription, so guard
      // against a malformed payload.
      if (!Array.isArray(quotes)) {
         return;
      }

      quotes.forEach(quote => {
         if (quote) {
            this._quotesCache[quote.ticker] = quote;
         }
      });
   }

   /** Stores the incoming greeks under their `ticker`. */
   private onGreeksMessage(greek: GreeksDto) {

      if (!greek) {
         return;
      }

      this._greeksCache[greek.ticker] = greek;
   }

   /** Stores each incoming strategy price under its `strategyCode`. */
   private onStrategyPriceMessage(dtos: StrategyPriceDto[]): void {

      if (!Array.isArray(dtos)) {
         return;
      }

      dtos.forEach(dto => {
         if (dto) {
            this._strategyPriceCache[dto.strategyCode] = dto;
         }
      });
   }

   /**
    * Drains the ticker queues, updates the subscriber counts and sends the net
    * change to the backend as a single command.
    *
    * The unsubscribe queue is processed before the subscribe queue; a ticker
    * that was both released and newly subscribed in the same run is only sent
    * as a subscribe.
    */
   private async subscriptionsQueueProcessor(): Promise<void> {

      const unsubQueue = this.processUnsubscribeQueue();
      const subQueue = this.processSubscribeQueue();
      const resubQueue = this.processResubscribeQueue(subQueue);

      const subscribing = new Set(subQueue);

      const netUnsubscribe = unsubQueue
         .filter(x => !subscribing.has(x))
         .map(x => ({ ticker: x } as MarketDataRequestSpec));

      const netSubscribe = subQueue
         .concat(resubQueue)
         .map(x => ({ ticker: x } as MarketDataRequestSpec));

      if (netUnsubscribe.length === 0 && netSubscribe.length === 0) {
         return;
      }

      try {

         const cmd = new ChangeMarketDataSubscriptionShell(netUnsubscribe, netSubscribe);

         await this._backendService.changeMarketDataSubscription(cmd);

      } catch (e) {
         this.reportSubscriptionError(
            e,
            '"Market Data Subscription Change" operation completed with errors',
            'Market Data'
         );
      }
   }

   /**
    * Drains the unsubscribe queue and decrements the subscriber counts.
    *
    * @returns the tickers whose last subscriber was released; their cached
    *          quote and greeks are dropped.
    */
   private processUnsubscribeQueue(): string[] {

      if (this._tickersUnsubscribeQueue.length === 0) {
         return [];
      }

      const queueCopy = this._tickersUnsubscribeQueue.slice();

      this._tickersUnsubscribeQueue.length = 0;

      const netTickersToUnsubscribe: string[] = [];

      queueCopy.forEach(ticker => {

         const subscribersCount = (this._ticker2subscribers[ticker] || 0) - 1;

         if (subscribersCount <= 0) {
            delete this._ticker2subscribers[ticker];
            delete this._quotesCache[ticker];
            delete this._greeksCache[ticker];
            netTickersToUnsubscribe.push(ticker);
         } else {
            this._ticker2subscribers[ticker] = subscribersCount;
         }
      });

      return netTickersToUnsubscribe;
   }

   /**
    * Drains the subscribe queue and increments the subscriber counts.
    *
    * @returns the tickers that gained their first subscriber.
    */
   private processSubscribeQueue(): string[] {

      if (this._tickersSubscribeQueue.length === 0) {
         return [];
      }

      const queueCopy = this._tickersSubscribeQueue.slice();

      this._tickersSubscribeQueue.length = 0;

      const newTickersToSubscribe: string[] = [];

      queueCopy.forEach(ticker => {

         const subscribersCount = this._ticker2subscribers[ticker] || 0;

         // Already subscribed: only the counter changes.
         if (subscribersCount > 0) {
            this._ticker2subscribers[ticker] = subscribersCount + 1;
            return;
         }

         // First subscriber: register it and request the subscription.
         this._ticker2subscribers[ticker] = 1;
         newTickersToSubscribe.push(ticker);
      });

      return newTickersToSubscribe;
   }

   /**
    * Drains the resubscribe queue.
    *
    * @param alreadySubscribing tickers that are being subscribed in this run anyway.
    * @returns the distinct tickers that still have subscribers and are not in
    *          `alreadySubscribing`.
    */
   private processResubscribeQueue(alreadySubscribing: string[]): string[] {

      if (this._tickersResubscribeQueue.length === 0) {
         return [];
      }

      const pending = new Set(this._tickersResubscribeQueue);

      this._tickersResubscribeQueue.length = 0;

      const skip = new Set(alreadySubscribing);

      return Array.from(pending)
         .filter(ticker => this._ticker2subscribers[ticker] > 0 && !skip.has(ticker));
   }

   /**
    * Handles a `ResubscribeQuotesDto` message: schedules every ticker that
    * currently has subscribers to be sent to the backend again on the next
    * queue-processor run, and re-publishes their cached quotes.
    *
    * Subscriber counts are left untouched so that later unsubscribe calls from
    * existing consumers still balance.
    */
   private onResubscribeQuotes(msg: ResubscribeQuotesDto): void {

      const tickers = Object.keys(this._ticker2subscribers);

      if (tickers.length === 0) {
         return;
      }

      this._tickersResubscribeQueue.push(...tickers);

      this.publishCachedQuotes(tickers);
   }

   /**
    * Handles a `ResubscribeOptionStrategiesDto` message: sends every strategy
    * code that currently has subscribers to the backend again.
    *
    * Subscriber counts are left untouched so that later unsubscribe calls from
    * existing consumers still balance.
    */
   private onResubscribeOptionStrategies(msg: ResubscribeOptionStrategiesDto): void {

      const strategies = Object.keys(this._strategy2subscribers);

      if (strategies.length === 0) {
         return;
      }

      // Failures are handled inside; nothing to await here.
      void this.sendOptionStrategiesSubscriptionChange([], strategies);
   }

   /**
    * Normalises every element of `tickers` IN PLACE with `fixTicker`.
    */
   private fixTickers(tickers: string[]) {
      for (let i = 0; i < tickers.length; i++) {
         tickers[i] = this.fixTicker(tickers[i]);
      }
   }

   /**
    * @returns the ticker converted by `convertCashSettledTickerToNormal` when
    *          `isCashSettledOptionTicker` reports it as cash-settled, otherwise
    *          the ticker unchanged.
    */
   private fixTicker(ticker: string): string {
      if (isCashSettledOptionTicker(ticker)) {
         return convertCashSettledTickerToNormal(ticker);
      }

      return ticker;
   }
}
