import { Backoff } from '@twilio/operation-retrier';
import { SyncError } from './utils/syncerror';
import log from './utils/logger';

import { SyncEntity } from './entity';
import { Configuration, Network } from './interfaces/services';

import { TransportUnavailableError } from 'twilsock';

/**
 * A data container used by the Subscriptions class to track subscribed entities' local
 * representations and their state.
 */
class SubscribedEntity {
  // The local representation that receives events, state changes and failure reports.
  private readonly localObject: SyncEntity;
  // Correlation id of the request currently in flight for this entity; null when none is pending.
  pendingCorrelationId: number | null;
  // Action of the request currently in flight for this entity; null when none is pending.
  pendingAction: Action;
  // Error payload recorded by markAsFailed(); stays unset until a failure is recorded.
  // Note that reset() deliberately leaves this value untouched.
  rejectedWithError: any;
  // Number of times the request for this entity had to be re-sent; zeroed by reset().
  retryCount: number;

  // True once complete() has been called and until the next reset().
  private established: boolean;

  /**
   * @param entity the local object all getters and notifications are delegated to.
   */
  constructor(entity: SyncEntity) {
    this.localObject = entity;
    this.pendingCorrelationId = null;
    this.pendingAction = null;
    this.established = false;
    this.retryCount = 0;
  }

  get sid(): string {
    return this.localObject.sid;
  }

  get type(): string {
    return this.localObject.type;
  }

  get lastEventId(): number {
    return this.localObject.lastEventId;
  }

  // below properties are specific to Insights only
  get indexName(): string {
    return this.localObject.indexName;
  }

  get queryString(): string {
    return this.localObject.queryString;
  }

  get isEstablished(): boolean {
    return this.established;
  }

  /**
   * Forwards an incoming event to the local object.
   */
  update(event, isStrictlyOrdered) {
    this.localObject._update(event, isStrictlyOrdered);
  }

  /**
   * Records which request (action + correlation id) is currently in flight for this entity.
   * Passing (null, null) clears the pending marker.
   */
  updatePending(action: Action, correlationId: number | null) {
    this.pendingAction = action;
    this.pendingCorrelationId = correlationId;
  }

  /**
   * Returns the entity to its initial, non-established state: clears the pending marker and
   * the retry counter and reports the 'none' subscription state to the local object.
   * A previously recorded rejection (rejectedWithError) is kept.
   */
  reset() {
    this.updatePending(null, null);
    this.retryCount = 0;
    this.established = false;
    this.setSubscriptionState('none');
  }

  /**
   * Marks the entity as rejected and reports the failure to the local object as a SyncError.
   *
   * @param message the failure notification; its `error` member is expected to carry
   *      `message`, `status` and `code`. If the notification carries no error payload at all,
   *      a placeholder error is recorded instead of throwing, so that the entity is still
   *      marked as rejected and the failure is still reported.
   */
  markAsFailed(message) {
    const error = (message && message.error) || {message: 'unknown error'};
    this.rejectedWithError = error;
    this.updatePending(null, null);
    this.localObject.reportFailure(
      new SyncError(`Failed to subscribe on service events: ${error.message}`, error.status, error.code));
  }

  /**
   * Marks the subscription as established, clears the pending marker and advances the local
   * object's last event id to the given value.
   */
  complete(eventId: number): void {
    this.updatePending(null, null);
    this.established = true;
    this.localObject._advanceLastEventId(eventId);
  }

  /**
   * Forwards the new subscription state to the local object.
   */
  setSubscriptionState(newState) {
    this.localObject._setSubscriptionState(newState);
  }
}

// Collaborators the Subscriptions manager is constructed with.
interface SubscriptionsServices {
  // Read by Subscriptions for `backoffConfig` and `subscriptionsUri`.
  config: Configuration;
  // Used by Subscriptions to POST subscription requests.
  network: Network;
}

// Kind of subscription request. `null` stands for "no action" (nothing pending / nothing to do).
type Action = 'establish' | 'cancel' | null;

// A batch of entities that should all receive the same action in a single request.
interface PokeBatch {
  action: Action;
  subscriptions: SubscribedEntity[];
}

// Why a full re-poke was triggered: the subscription TTL timer elapsed ('ttl'),
// or the connection was reported as connected ('reconnect').
type PokeReason = 'ttl' | 'reconnect';

/**
 * @class Subscriptions
 * @classdesc A manager which, in batches of varying size, continuously persists the
 *      subscription intent of the caller to the Sync backend until it achieves a
 *      converged state.
 */
class Subscriptions {
  // Configuration and network collaborators supplied to the constructor.
  private services: SubscriptionsServices;

  // This is always the full set of subscribables (SubscribedEntity instances) intended by
  // the client. At any point, whatever the state of these subscriptions on the server, this
  // is the intent of the user to which the SDK must converge.
  private subscriptions: Map<string, SubscribedEntity>;

  // This includes the set of subscribables (SubscribedEntity instances) for whom a request
  // has been dispatched (whether or not this particular request ultimately succeeds) to
  // establish a live subscription. Entities are removed when the corresponding "cancel"
  // request is dispatched.
  private persisted: Map<string, SubscribedEntity>;

  // For every correlation id seen on an incoming message, the local time (ms) at which the most
  // recent message carrying it arrived. Consulted by verifyPokeDelivery to decide whether the
  // responses to a poke are still trickling in or the poke has to be repeated.
  private latestPokeResponseArrivalTimestampByCorrelationId: Map<number, number>;

  // Paces request dispatch: persist() schedules a backoff, and each 'ready' event computes and
  // sends the next batch.
  private backoff: Backoff;

  // Last connection state reported through onConnectionStateChanged; no batch is sent while false.
  private isConnected: boolean = false;

  // Upper bound on the number of entities per batch; replaced by the `max_batch_size` value of a
  // poke response when that value is a positive number.
  private maxBatchSize: number = 100;

  // If the server includes a `ttl_in_s` attribute in the poke response, subscriptionTtlTimer is started for that duration
  // such that when it fires, it repokes the entire sync set (i.e., emulates a reconnect). Every reconnect resets the timer.
  // After the timer has fired, the first poke request includes a `reason: ttl` attribute in the body.
  private subscriptionTtlTimer: any | null = null;
  // Reason recorded by the latest poke(); consumed (and cleared) by the next dispatched batch.
  private pendingPokeReason: PokeReason = null;

  /**
   * @constructor
   * Prepares a new Subscriptions manager object with zero subscribed or persisted subscriptions.
   *
   * @param {SubscriptionsServices} services the collaborators of this manager:
   *      - `services.config` may include a key 'backoffConfig'; its entries are merged over
   *        (and therefore override) the default exponential backoff parameters below. It also
   *        supplies the URI subscription requests are posted to.
   *      - `services.network` is the Sync Network object used for routing requests.
   */
  constructor(services: SubscriptionsServices) {
    this.services = services;
    this.subscriptions = new Map<string, SubscribedEntity>();
    this.persisted = new Map<string, SubscribedEntity>();
    this.latestPokeResponseArrivalTimestampByCorrelationId = new Map<number, number>();

    const defaultBackoffConfig = {
      randomisationFactor: 0.2,
      initialDelay: 100,
      maxDelay: 2 * 60 * 1000
    };
    this.backoff = Backoff.exponential(Object.assign(defaultBackoffConfig, this.services.config.backoffConfig));

    // This block is triggered by #persist. Every request is executed in a series of (ideally 1)
    // backoff 'ready' event, at which point a new subscription set is calculated.
    this.backoff.on('ready', () => {
      const {action, subscriptions: subscriptionRequests} = this.getSubscriptionUpdateBatch();
      if (action) {
        // The batch is applied asynchronously; make sure an unexpected rejection is reported
        // instead of surfacing as an unhandled promise rejection.
        this.applyNewSubscriptionUpdateBatch(action, subscriptionRequests)
          .catch(e => log.error(`Unexpected failure while applying '${action}' subscription batch`, e));
      } else {
        this.backoff.reset();
        log.debug('All subscriptions resolved.');
      }
    });
  }

  /**
   * Works out the next batch of work needed to converge the persisted set towards the intended
   * set. Pending 'establish' requests take precedence; 'cancel' requests are only considered
   * when there is nothing left to establish.
   *
   * @return the batch to send, or `{action: null, subscriptions: null}` when nothing is left to do.
   */
  private getSubscriptionUpdateBatch(): PokeBatch {
    // Collects (at most `limit`) entities of `source` that have no counterpart in `excluded`,
    // skipping those that already have `action` pending and those that were rejected.
    const pickMissing = (source: Map<string, SubscribedEntity>, excluded: Map<string, SubscribedEntity>,
                         action: Action, limit): SubscribedEntity[] => {
      const picked = [];
      for (const [key, candidate] of source) {
        if (excluded.get(key)) {
          continue;
        }
        if (candidate.pendingAction === action || candidate.rejectedWithError) {
          continue;
        }
        picked.push(candidate);
        if (limit && picked.length >= limit) {
          break;
        }
      }
      return picked;
    };

    const toEstablish = pickMissing(this.subscriptions, this.persisted, 'establish', this.maxBatchSize);
    if (toEstablish.length > 0) {
      return {action: 'establish', subscriptions: toEstablish};
    }

    const toCancel = pickMissing(this.persisted, this.subscriptions, 'cancel', this.maxBatchSize);
    if (toCancel.length > 0) {
      return {action: 'cancel', subscriptions: toCancel};
    }

    return {action: null, subscriptions: null};
  }

  // Requests another convergence round: starts a backoff on the shared Backoff instance, whose
  // 'ready' handler (registered in the constructor) computes and dispatches the next batch.
  private persist() {
    this.backoff.backoff();
  }

  /**
   * Sends one batch of 'establish' or 'cancel' requests to the service and processes the
   * response envelope (batch size limit, subscription TTL, estimated delivery time).
   *
   * Does nothing except resetting the backoff while the connection is not ready. On a request
   * failure the affected entities are rolled back and, unless the transport is unavailable,
   * another attempt is scheduled.
   *
   * @param action the action shared by every request in the batch.
   * @param requests the entities to apply the action to.
   */
  private async applyNewSubscriptionUpdateBatch(action, requests: SubscribedEntity[]) {
    if (!this.isConnected) {
      log.debug(`Twilsock connection (required for subscription) not ready; waiting…`);
      this.backoff.reset();
      return;
    }

    // Keeping in mind that events may begin flowing _before_ we receive the response
    requests = this.processLocalActions(action, requests);

    // The dispatch timestamp doubles as the correlation id of this attempt
    const correlationId = new Date().getTime();
    for (const subscribed of requests) {
      this.recordActionAttemptOn(subscribed, action, correlationId);
    }

    // The poke reason is reported once, with the first batch dispatched after the poke
    const reason: PokeReason = this.pendingPokeReason;
    this.pendingPokeReason = null;

    // Accepts numbers and numeric strings that are finite and strictly positive
    const isPositiveNumber = (value: any): boolean =>
      !isNaN(parseFloat(value)) && isFinite(value) && value > 0;

    // Send this batch to the service
    try {
      const response = await this.request(action, correlationId, reason, requests);

      const newMaxBatchSize = response.body.max_batch_size;
      if (!isNaN(parseInt(newMaxBatchSize)) && isFinite(newMaxBatchSize) && newMaxBatchSize > 0) {
        // Coerce, so that a numeric string sent by the server never ends up in a number field
        this.maxBatchSize = Number(newMaxBatchSize);
      }

      // Only (re)arm the TTL timer when none is running; poke() clears it
      if (!this.subscriptionTtlTimer) {
        const subscriptionTtlInS = response.body.ttl_in_s;
        if (isPositiveNumber(subscriptionTtlInS)) {
          this.subscriptionTtlTimer = setTimeout(() => this.onSubscriptionTtlElapsed(), subscriptionTtlInS * 1000);
        }
      }

      if (action === 'establish') {
        const estimatedDeliveryInMs = response.body.estimated_delivery_in_ms;
        if (isPositiveNumber(estimatedDeliveryInMs)) {
          setTimeout(() => this.verifyPokeDelivery(correlationId, estimatedDeliveryInMs, requests), estimatedDeliveryInMs);
        } else {
          log.error(`Invalid timeout: ${estimatedDeliveryInMs}`);
        }
        // Entities already resolved by an early notification no longer carry this correlation id
        requests.filter(r => r.pendingCorrelationId === correlationId)
          .forEach(r => r.setSubscriptionState('response_in_flight'));
      }
      this.backoff.reset();
    } catch (e) {
      // Roll back only the entities that still belong to this attempt. An entity whose pending
      // correlation id has changed in the meantime was already resolved by a notification, reset
      // by a poke, or picked up by a newer request; rolling it back would clobber that state.
      requests
        .filter(r => r.pendingCorrelationId === correlationId)
        .forEach(r => this.recordActionFailureOn(r, action));

      if (e instanceof TransportUnavailableError) {
        log.debug(`Twilsock connection (required for subscription) not ready (c:${correlationId}); waiting…`);
        this.backoff.reset();
      } else {
        log.debug(`Failed an attempt to ${action} subscriptions (c:${correlationId}); retrying`, e);
        this.persist();
      }
    }
  }

  /**
   * Checks whether the notifications for a previously sent 'establish' batch are still arriving.
   * If nothing carrying the batch's correlation id has been received for at least
   * `estimatedDeliveryInMs`, every entity still pending under that correlation id is released
   * for another attempt; otherwise the check is re-scheduled for the remainder of the window.
   *
   * @param correlationId correlation id of the batch being verified.
   * @param estimatedDeliveryInMs delivery estimate reported by the service for that batch.
   * @param requests the entities that were part of the batch.
   */
  private verifyPokeDelivery(correlationId: number, estimatedDeliveryInMs: number, requests: SubscribedEntity[]) {
    const lastArrival = this.latestPokeResponseArrivalTimestampByCorrelationId.get(correlationId);

    // Without any recorded arrival the whole estimate is considered to have passed in silence
    let quietForMs = estimatedDeliveryInMs;
    if (lastArrival) {
      quietForMs = new Date().getTime() - lastArrival;
    }

    if (!(quietForMs >= estimatedDeliveryInMs)) {
      // The poke responses are probably in transit and we should wait for them
      const remainingMs = estimatedDeliveryInMs - quietForMs;
      setTimeout(() => this.verifyPokeDelivery(correlationId, estimatedDeliveryInMs, requests), remainingMs);
      return;
    }

    // We haven't received _any_ responses from that poke request for the duration of
    // estimated_delivery_in_ms: poke again
    const unanswered = requests.filter(candidate => candidate.pendingCorrelationId === correlationId);
    for (const entity of unanswered) {
      entity.updatePending(null, null);
      entity.retryCount++;
      this.persisted.delete(entity.sid);
    }
    this.persist();
    this.latestPokeResponseArrivalTimestampByCorrelationId.delete(correlationId);
  }

  // Filters the batch before it is sent: for a 'cancel' batch, entities that were rejected are
  // left out; any other batch is returned untouched.
  private processLocalActions(action, requests) {
    return action === 'cancel'
      ? requests.filter(candidate => !candidate.rejectedWithError)
      : requests;
  }

  // Book-keeping performed right before a request is dispatched: flags the entity as having a
  // request in flight and records the pending action under the given correlation id.
  private recordActionAttemptOn(attemptedSubscription: SubscribedEntity, action, correlationId) {
    attemptedSubscription.setSubscriptionState('request_in_flight');

    if (action !== 'establish') {
      // cancel: only an entity that is currently tracked as persisted can have a pending cancel
      const tracked = this.persisted.get(attemptedSubscription.sid);
      if (tracked) {
        tracked.updatePending(action, correlationId);
      }
      return;
    }

    // establish: the entity counts as persisted from the moment the request is dispatched
    this.persisted.set(attemptedSubscription.sid, attemptedSubscription);
    attemptedSubscription.updatePending(action, correlationId);
  }

  // Rolls back the book-keeping of a failed request: the entity returns to the 'none' state with
  // nothing pending, and a failed 'establish' additionally stops counting as persisted.
  private recordActionFailureOn(attemptedSubscription: SubscribedEntity, action) {
    attemptedSubscription.setSubscriptionState('none');
    attemptedSubscription.updatePending(null, null);

    if (action !== 'establish') {
      return;
    }
    this.persisted.delete(attemptedSubscription.sid);
  }

  /**
   * Builds the request body for a batch and posts it to the subscriptions URI.
   *
   * @param action the action applied to every entity of the batch.
   * @param correlationId correlation id to tag the request with.
   * @param reason poke reason; only 'ttl' is included in the request body.
   * @param objects the entities of the batch.
   * @return whatever the network service's post() returns.
   */
  private request(action, correlationId, reason: PokeReason, objects: SubscribedEntity[]) {
    // Replay position and query details are only sent when establishing a subscription
    const isEstablish = action === 'establish';
    const requests = objects.map(entity => {
      return {
        object_sid: entity.sid,
        object_type: entity.type,
        last_event_id: isEstablish ? entity.lastEventId : undefined,
        index_name: isEstablish ? entity.indexName : undefined,
        query_string: isEstablish ? entity.queryString : undefined,
      };
    });

    let retriedRequests = 0;
    for (const entity of objects) {
      if (entity.retryCount > 0) {
        retriedRequests += 1;
      }
    }

    log.debug(`Attempting '${action}' request (c:${correlationId}):`, requests);

    const requestBody: any = {
      event_protocol_version: 3,
      action,
      correlation_id: correlationId,
      retried_requests: retriedRequests,
      ttl_in_s: -1,
      requests
    };
    if (reason === 'ttl') {
      requestBody.reason = reason;
    }

    const {network, config} = this.services;
    return network.post(config.subscriptionsUri, requestBody);
  }

  /**
   * Establishes intent to be subscribed to this entity. That subscription will be effected
   * asynchronously.
   * If a subscription for the given sid already exists and the provided entity reports the same
   * last event id, the existing subscription is kept as is. Otherwise the subscription is
   * (re)created from scratch, replacing any previous one.
   *
   * @param {String} sid should be a well-formed SID, uniquely identifying a single instance of a Sync entity.
   * @param {Object} entity should represent the (singular) local representation of this entity.
   *      Incoming events and modifications to the entity will be directed at the _update() function
   *      of this provided reference.
   *
   * @return undefined
   */
  add(sid: string, entity: SyncEntity): void {
    log.debug(`Establishing intent to subscribe to ${sid}`);

    const current = this.subscriptions.get(sid);
    // If last event id is the same as before - we're fine
    const isUnchanged = current && entity && current.lastEventId === entity.lastEventId;
    if (!isUnchanged) {
      this.persisted.delete(sid);
      this.subscriptions.set(sid, new SubscribedEntity(entity));
      this.persist();
    }
  }

  /**
   * Establishes the caller's intent to no longer be subscribed to this entity. Following this
   * call, no further events shall be routed to the local representation of the entity, even
   * though a server-side subscription may take more time to actually terminate.
   *
   * @param {string} sid should be any well-formed SID, uniquely identifying a Sync entity.
   *      This call only has meaningful effect if that entity is subscribed at the
   *      time of call. Otherwise it only logs the attempt.
   *
   * @return undefined
   */
  remove(sid: string): void {
    log.debug(`Establishing intent to unsubscribe from ${sid}`);

    const wasSubscribed = this.subscriptions.delete(sid);
    if (!wasSubscribed) {
      return;
    }
    this.persist();
  }

  /**
   * The point of ingestion for remote incoming messages (e.g. new data was written to a map
   * to which we are subscribed).
   *
   * Subscription life-cycle notifications ('subscription_established', 'subscription_canceled',
   * 'subscription_failed') are routed to their dedicated handlers. Data events, recognised by
   * their 'map_', 'list_', 'document_', 'stream_' or 'live_query_' prefix, are routed to the
   * subscribed entity they refer to. Anything else, including a message whose event type is
   * missing or not a string, is logged and dropped.
   *
   * @param {object} message is the full, unaltered body of the incoming notification.
   * @param {boolean} isStrictlyOrdered whether the caller considers the message strictly
   *      ordered. For LiveQuery events this value is ignored and derived from the message itself.
   *
   * @return undefined
   */
  acceptMessage(message: any, isStrictlyOrdered: boolean): void {
    log.trace('Subscriptions received', message);
    if (message.correlation_id) {
      this.latestPokeResponseArrivalTimestampByCorrelationId.set(message.correlation_id, new Date().getTime());
    }

    const eventType = message.event_type;
    switch (eventType) {
      case 'subscription_established':
        this.applySubscriptionEstablishedMessage(message.event, message.correlation_id);
        return;
      case 'subscription_canceled':
        this.applySubscriptionCancelledMessage(message.event, message.correlation_id);
        return;
      case 'subscription_failed':
        this.applySubscriptionFailedMessage(message.event, message.correlation_id);
        return;
      default:
        break;
    }

    // Only strings can be matched; anything else is handled as an unknown message type
    const prefixMatch = typeof eventType === 'string'
      ? eventType.match(/^(map|list|document|stream|live_query)_/)
      : null;
    if (!prefixMatch) {
      log.debug(`Dropping unknown message type ${message.event_type}`);
      return;
    }

    // A data event without a body yields no sid and is consequently ignored downstream
    const event = message.event || {};
    let typedSid;
    switch (prefixMatch[1]) {
      case 'map':
        typedSid = event.map_sid;
        break;
      case 'list':
        typedSid = event.list_sid;
        break;
      case 'document':
        typedSid = event.document_sid;
        break;
      case 'stream':
        typedSid = event.stream_sid;
        break;
      case 'live_query':
        typedSid = event.query_id;
        // hack to mark replay events for LiveQuery as strictly ordered, due to lack of special type of notification for them
        // (normally only replay events would have `twilio.sync.event` type, but LiveQuery non-replay events were also assigned
        // to this type in legacy clients, which we have to support now; hence a hack).
        // The caller's value is explicitly overridden due to code in router.ts does not know about LiveQueries
        isStrictlyOrdered = message.strictly_ordered === true;
        break;
      default:
        typedSid = undefined;
    }

    this.applyEventToSubscribedEntity(typedSid, message, isStrictlyOrdered);
  }

  // Handles a 'subscription_established' notification. It is only honoured when the entity is
  // tracked as persisted and still pending under the notification's correlation id. An
  // interrupted replay releases the entity for another attempt, a completed replay marks the
  // subscription as established. In every case another convergence round is requested.
  private applySubscriptionEstablishedMessage(message, correlationId) {
    const sid = message.object_sid;
    const tracked = this.persisted.get(message.object_sid);
    const isCurrent = tracked && tracked.pendingCorrelationId === correlationId;

    if (!isCurrent) {
      log.debug(`Late message for ${message.object_sid} (c:${correlationId}) dropped.`);
    } else {
      switch (message.replay_status) {
        case 'interrupted':
          log.debug(`Event Replay for subscription to ${sid} (c:${correlationId}) interrupted; continuing eagerly.`);
          tracked.updatePending(null, null);
          this.persisted.delete(tracked.sid);
          this.backoff.reset();
          break;
        case 'completed':
          log.debug(`Event Replay for subscription to ${sid} (c:${correlationId}) completed. Subscription is ready.`);
          tracked.complete(message.last_event_id);
          this.persisted.set(message.object_sid, tracked);
          tracked.setSubscriptionState('established');
          this.backoff.reset();
          break;
        default:
          // any other replay status leaves the entity untouched
          break;
      }
    }

    this.persist();
  }

  // Handles a 'subscription_canceled' notification: when it answers the cancel request that is
  // currently pending for the entity, the entity stops being tracked as persisted. In every case
  // another convergence round is requested.
  private applySubscriptionCancelledMessage(message, correlationId) {
    const tracked = this.persisted.get(message.object_sid);
    const isCurrent = tracked && tracked.pendingCorrelationId === correlationId;

    if (!isCurrent) {
      log.debug(`Late message for ${message.object_sid} (c:${correlationId}) dropped.`);
    } else {
      tracked.updatePending(null, null);
      tracked.setSubscriptionState('none');
      this.persisted.delete(message.object_sid);
    }

    this.persist();
  }

  // Handles a 'subscription_failed' notification for an entity tracked as persisted. If the
  // entity is still intended and the notification answers its pending request, the entity is
  // marked as failed; if the entity is no longer intended, it simply stops being tracked.
  // In every case another convergence round is requested.
  private applySubscriptionFailedMessage(message, correlationId) {
    const sid = message.object_sid;
    const tracked = this.persisted.get(sid);

    if (tracked) {
      const stillIntended = this.subscriptions.get(sid);
      if (!stillIntended) {
        this.persisted.delete(sid);
        tracked.setSubscriptionState('none');
      } else if (tracked.pendingCorrelationId === correlationId) {
        log.error(`Failed to subscribe on ${tracked.sid}`, message.error);
        tracked.markAsFailed(message);
        tracked.setSubscriptionState('none');
      }
    }

    this.persist();
  }

  // Routes a data event to the entity subscribed under `sid`. Events without a sid are ignored
  // silently; events for a sid without a subscription are logged and dropped.
  private applyEventToSubscribedEntity(sid: string, message: any, isStrictlyOrdered: boolean): void {
    if (!sid) {
      return;
    }

    // Looking for subscription descriptor to check if poke has been completed
    if (!isStrictlyOrdered) {
      const tracked = this.persisted.get(sid);
      isStrictlyOrdered = tracked && tracked.isEstablished;
    }

    // Still searching for subscriptionIntents. User could remove subscription already
    const target = this.subscriptions.get(sid);
    if (!target) {
      log.debug(`Message dropped for SID '${sid}', for which there is no subscription.`);
      return;
    }

    // The entity receives the event body, tagged with the notification's event type
    message.event.type = message.event_type;
    target.update(message.event, isStrictlyOrdered);
  }

  // Records the new connection state; becoming connected triggers a full re-poke with the
  // 'reconnect' reason.
  onConnectionStateChanged(isConnected: boolean) {
    this.isConnected = isConnected;
    if (!isConnected) {
      return;
    }
    this.poke('reconnect');
  }

  // Fired by the subscription TTL timer: re-pokes with the 'ttl' reason, but only while connected.
  private onSubscriptionTtlElapsed() {
    if (!this.isConnected) {
      return;
    }
    this.poke('ttl');
  }

  /**
   * Prompts a playback of any missed changes made to any subscribed object. This method
   * should be invoked whenever the connectivity layer has experienced cross-cutting
   * delivery failures that would affect the entire local sync set. Any tangible result
   * of this operation will result in calls to the _update() function of subscribed
   * Sync entities.
   *
   * Every persisted entity is reset and forgotten, except for rejected ones, which stay
   * tracked as persisted. The running TTL timer, if any, is cancelled.
   *
   * @param reason why the playback is triggered; remembered for the next dispatched batch.
   */
  private poke(reason: PokeReason) {
    log.debug(`Triggering event replay for all subscriptions, reason=${reason}`);
    this.pendingPokeReason = reason;

    if (this.subscriptionTtlTimer) {
      clearTimeout(this.subscriptionTtlTimer);
      this.subscriptionTtlTimer = null;
    }

    // Reset everything that was persisted, remembering the rejected entities on the way
    const rejected: SubscribedEntity[] = [];
    this.persisted.forEach(entity => {
      entity.reset();
      if (entity.rejectedWithError) {
        rejected.push(entity);
      }
    });

    // Start over with only the rejected entities still tracked
    this.persisted.clear();
    rejected.forEach(entity => {
      this.persisted.set(entity.sid, entity);
    });

    this.persist();
  }

  /**
   * Stops all communication, clears any subscription intent, and returns.
   *
   * Besides cancelling the pending backoff, this also cancels the subscription TTL timer, so
   * that no re-poke is triggered (and no timer is left running) after the manager was shut down.
   */
  shutdown() {
    this.backoff.reset();
    if (this.subscriptionTtlTimer) {
      clearTimeout(this.subscriptionTtlTimer);
      this.subscriptionTtlTimer = null;
    }
    this.subscriptions.clear();
  }
}

export { SubscriptionsServices, Subscriptions };
