import { io } from "socket.io-client";
import { toAuthUserObj, uid, waitForCondition } from "../util/algorithm";
import { authSignal } from "../util/signal";
import { CloudRunWithoutCache } from "./config";
import { AccountStore } from "../constants/Account";

const webSocketStore = { channelSubscribers: [] };

function globalMessageHandler(message) {
  for (const s of webSocketStore.channelSubscribers) {
    if (s.channel === message.channel) {
      s.fire(message);
    }
  }
}

async function waitForWebSocketConnection() {
  const params = {timeout: 30000, label: 'wait for websocket connection'};
  const condition = () => webSocketStore.socket && webSocketStore.subscribe;
  await waitForCondition(condition, params);
}

async function subscribeToChannel(channel, listener) {
  await waitForWebSocketConnection();
  if (webSocketStore.socket && webSocketStore.subscribe) {
    console.log("subscribe to channel...", channel)
    webSocketStore.subscribe(channel, listener);
  } else {
    console.log(
      `Failed to subscriber to channel: ${channel}. Websocket not connected!`
    );
  }
}

async function unsubscribeToChannel(channel, listener) {
  await waitForWebSocketConnection();
  if (webSocketStore.socket && webSocketStore.unsubscribe) {
    webSocketStore.unsubscribe(channel, listener);
  } else {
    console.log(
      `Failed to unsubscriber to channel: ${channel}. Websocket not connected!`
    );
  }
}

function startWebSocketConnection(url, token) {
  const socket = io(url, {
    auth: {
      token: token, // Replace with the actual token
    },
    transports: ["websocket"],
  });
  webSocketStore.socket = socket;
  webSocketStore.subscribe = (channel, listener) => {
    webSocketStore.channelSubscribers.push({
      channel,
      fire: listener,
    });
    socket.emit("subscribe", channel);
  };
  webSocketStore.unsubscribe = (channel, listener) => {
    webSocketStore.channelSubscribers =
      webSocketStore.channelSubscribers.filter((s) => s.fire !== listener);
    socket.emit("unsubscribe", channel);
  };

  socket.on("connect", () => {
    console.log("Connected to WebSocket server");
    // Remove any pre-existing listeners to prevent duplicates
    socket.off("message");

    // Listen for messages
    socket.on("message", (data) => {
      console.log("Received message:", data.message);
      globalMessageHandler(data);
    });

    for (const s of webSocketStore.channelSubscribers) {
      socket.emit("subscribe", s.channel);
    }
  });
}

const startNotification = async (userId, handler) => {
  try {
    const token = await generateAccessToken();
    const url = AccountStore.LIVECONNECT_URL;
    console.log("startNotification()", {url, token})
    startWebSocketConnection(url, token);
  } catch (e) {
    console.log("startNotification error", e);
  }
};

const trigger = async ({ subject, id, params }) => {
  globalMessageHandler({
    channel: subject,
    timestamp: id,
    params,
  })
};

const SUBSCRIPTIONS = {
  subscription: null,
  listeners: {}
};

const subscribe = async (subject, handle) => {
  if (handle) {
    try {
      let map = SUBSCRIPTIONS.listeners[subject];
      if (!map) {
        map = {};
        SUBSCRIPTIONS.listeners[subject] = map;
      }
      const key = uid();
      const listener = (messageObj) => {
        const {channel, message, timestamp} = messageObj;
        const id = timestamp;
        if (message) {
          const authUser = authSignal.authUser;
          const authUserObj = toAuthUserObj(authUser);
          const { userId, subject, isJob, params } = message;
          console.log("handle message", {channel, subject}, messageObj, authUserObj)
          if (isJob) {
            handle(id, params);
          } else {
            if (userId && authUserObj?.uid && authUserObj.uid !== userId) {
              handle(id, params);
            }
          }
        }
      }
      map[key] = listener;
      subscribeToChannel(subject, listener);
      return key;
    } catch (error) {
      // ignore this error
    }
  }
};

const unsubscribe = async (subject, key) => {
  try {

    let map = SUBSCRIPTIONS.listeners[subject];
    if (!map) {
      map = {};
      SUBSCRIPTIONS.listeners[subject] = map;
    }
    const listener = map[key];
    if (listener) {
      unsubscribeToChannel(subject, listener);
      map[key] = undefined;
    }
  } catch (error) {
    console.error("system unsubcribe failed", error);
  }
};

const notify = async (subject, params) => {
  try {
    const param = { subject, params };
    await CloudRunWithoutCache("notification_publish", param);
  } catch (error) {
    console.error("system import failed", error);
  }
};

const apiNotify = async (subject, isJob, params) => {
  try {
    const param = { subject, isJob, params };
    await CloudRunWithoutCache("notification_publish", param);
  } catch (error) {
    console.error("system import failed", error);
  }
};

const generateAccessToken = async () => {
  try {
    const param = {};
    return await CloudRunWithoutCache(
      "notification_generateAccessToken",
      param
    );
  } catch (error) {
    console.error("system import failed", error);
  }
};

const notification = {
  startNotification,
  subscribe,
  unsubscribe,
  trigger,
  notify,
  apiNotify,
};

export default notification;
