import AWS from "aws-sdk/global";
import AWSMqtt from "aws-mqtt-client";
import { pipe, pathOr, startsWith, is } from "ramda";
import { Observable, defer } from "rxjs";
import { manifest } from "api/aws/manifest";
import { refreshToken$, refreshTokenLock } from "api/utils/refresh_token";
import { traceLog } from "api/utils/trace";
import shouldRefreshToken from "api/utils/should_refresh_token";

let mqttClient;
let subscriptions = [];

const handleMessage = (topic, message) => {
  for (const s of subscriptions) {
    if (
      (s.topic.endsWith("#") && topic.startsWith(s.topic.slice(0, -1))) ||
      s.topic === topic
    ) {
      s.handler(JSON.parse(message.toString()));
    }
  }
};

const authorize = () => {
  return AWS.config.credentials
    .getPromise()
    .then(() => AWS.config.credentials)
    .catch(err => {
      if (shouldRefreshToken(err)) {
        traceLog("[mqtt] refreshing token");
        return refreshTokenLock
          .singleton(defer(() => refreshToken$()))
          .toPromise()
          .then(() => AWS.config.credentials);
      } else {
        return Promise.reject(err);
      }
    });
};

export const getIotEndpoint = ({ IotEndpoint, IotEndpointATS }) =>
  IotEndpointATS || IotEndpoint;

const connect = () => {
  return new Promise((resolve, reject) => {
    if (mqttClient && mqttClient.connected) {
      resolve(mqttClient);
    } else {
      const createClient = creds => {
        const currentCreds = creds;

        mqttClient = new AWSMqtt({
          accessKeyId: creds.accessKeyId,
          secretAccessKey: creds.secretAccessKey,
          sessionToken: creds.sessionToken,
          endpointAddress: getIotEndpoint(manifest),
          region: AWS.config.region,
          reconnectPeriod: 5000
        });

        mqttClient._originalReconnect = mqttClient._reconnect;
        mqttClient._reconnect = () => {
          authorize().then((creds, err) => {
            if (err) reject(err);
            if (
              creds &&
              currentCreds &&
              creds.sessionToken !== currentCreds.sessionToken
            ) {
              mqttClient.end();
              createClient(creds);
            } else {
              mqttClient._originalReconnect();
            }
          });
        };

        mqttClient.on("connect", () => {
          for (const s of subscriptions) {
            mqttClient.subscribe(s.topic);
          }
          resolve(mqttClient);
        });

        mqttClient.on("message", handleMessage);
      };
      authorize().then(creds => createClient(creds));
    }
  });
};

const _badTopicCheck = (type, topic) => {
  if (topic === undefined) {
    console.error(`Topic was undefined for ${type}`); // eslint-disable-line
    return true;
  } else {
    return false;
  }
};

const _subscribe = (client, topic) => {
  if (_badTopicCheck("sub", topic)) return;
  client.subscribe(
    topic,
    {
      qos: 0
    },
    err => {
      if (err) {
        console.error("Unable to subscribe", err); // eslint-disable-line
      }
    }
  );
};

export const subscribe = (topic, handler) => {
  if (_badTopicCheck("sub", topic)) return;
  if (!subscriptions.find(s => s.topic === topic)) {
    subscriptions.push({
      topic,
      handler
    });
    connect().then(client => _subscribe(client, topic));
  }
};

export const unsubscribe = topic => {
  if (_badTopicCheck("unsub", topic)) return;
  const remaining = subscriptions.filter(s => s.topic !== topic);
  if (remaining.length < subscriptions.length) {
    subscriptions = remaining;
    connect().then(client => {
      client.unsubscribe(topic);
    });
  }
};

export const publish = (topic, data) => {
  if (_badTopicCheck("pub", topic)) return;
  return connect().then(client => {
    const payload = is(Object, data) ? JSON.stringify(data) : data;
    client.publish(topic, payload);
  });
};

export const eventTopic = domainPath => `event${domainPath}#`;

export const thingUpdateTopic = (domainPath, thingName, type) => {
  if (domainPath === undefined) return undefined;
  return `thing-update${type ? "-" + type : ""}${domainPath}${thingName}`;
};

export const hasTopic = topic =>
  pipe(pathOr("", ["payload", "topic"]), startsWith(topic));

export const hasThingUpdateTopic = hasTopic("thing-update");
export const hasEventTopic = hasTopic("event");

export const decorateTopic = (topic, env) => {
  if (_badTopicCheck("decorate topic", topic)) return;
  // if we are not on the prod stack we want the topic to look like
  // envName-topic when we subscribe or unsubscribe
  const topicEnvPrefix = env && env.toLowerCase() !== "prod" ? `${env}-` : "";
  return `${topicEnvPrefix}${topic}`;
};

export const client = (env, domainPath, thingName) => {
  // thingNameDomainPathType
  if (thingName === undefined) return;
  const topics = ["update", "delta"].map(type => ({
    id: type,
    topic: decorateTopic(
      thingUpdateTopic(
        domainPath,
        thingName,
        type === "update" ? undefined : type
      ),
      env
    )
  }));
  return connect().then(() => ({
    subscribe: callback =>
      subscribe(topics[1].topic, delta => {
        callback({
          delta
        });
      }),
    publish: data => publish(topics[0].topic, data),
    unsubscribe: () => {
      topics.forEach(({ topic }) => unsubscribe(topic));
    },
    INTERVAL: 1000
  }));
};

export const subscribe$ = topic =>
  Observable.create(subject => {
    const handler = data => subject.next(data);
    subscribe(topic, handler);
    return () => unsubscribe(topic);
  });

export const publish$ = (topic, data) =>
  Observable.fromPromise(publish(topic, data));
