import { Observable } from "rxjs";
import { decorateTopic, hasTopic } from "utils/mqtt_utils";
import {
  MQTT_PUBLISH,
  MQTT_SUBSCRIBE,
  MQTT_UNSUBSCRIBE,
  mqttDataFailure,
  mqttDataReceived,
  mqttPublishFailure,
  mqttPublishSuccess,
  mqttSubscribeSuccess,
  mqttUnsubscribeSuccess
} from "ducks/mqtt";

export const mqttSubscribeEpic = (
  action$,
  store,
  { mqttUtils, authSelectors }
) =>
  action$
    .ofType(MQTT_SUBSCRIBE)
    .mergeMap(({ payload: { topic, metadata } }) => {
      const unsubscribe$ = action$
        .ofType(MQTT_UNSUBSCRIBE)
        .filter(hasTopic(topic))
        .map(action => mqttUnsubscribeSuccess({ topic, metadata }))
        .take(1);

      const data$ = mqttUtils
        .subscribe$(
          decorateTopic(topic, authSelectors.getEnv(store.getState()))
        )
        .map(data => mqttDataReceived({ topic, data, metadata }))
        .startWith(mqttSubscribeSuccess({ topic, metadata }))
        .catch(error =>
          Observable.of(mqttDataFailure({ topic, metadata, error }))
        )
        .takeUntil(unsubscribe$);

      return Observable.merge(data$, unsubscribe$);
    });

export const mqttPublishEpic = (action$, store, { mqttUtils, authSelectors }) =>
  action$.ofType(MQTT_PUBLISH).mergeMap(({ payload: { topic, data } }) =>
    mqttUtils
      .publish$(
        decorateTopic(topic, authSelectors.getEnv(store.getState())),
        data
      )
      .map(() => mqttPublishSuccess({ topic, data }))
      .catch(error => Observable.of(mqttPublishFailure({ topic, data, error })))
  );
