// socketStore.ts (3.66 KB)
import { defineStore } from "pinia";
import { KeyBoundComponentList, SocketReceiveMessageType, SocketSendMessageType, SocketStoreType } from '@/store/external/modules/socketStore.d'
import { CreateComponentType } from "@/packages/index.d";
import { RequestContentTypeEnum } from "@/enums/external/httpEnum";

// Delimiter used to split a component's `keys` request parameter into individual key names.
const KEYS_SEPARATOR = ','

/**
 * Store that tracks websocket telemetry subscriptions.
 *
 * State:
 * - `connectionPool`: entityId -> key -> list of `{ componentId }` bound to that key.
 * - `subscribePool`: one `{ subscribeId, entityId }` entry per subscribed entity.
 * - `cacheMessage`: received messages grouped by subscription id.
 */
export const useSocketStore = defineStore({
  id: 'useSocketStore',
  state: (): SocketStoreType => ({
    connectionPool: {},
    subscribePool: [],
    cacheMessage: {}
  }),
  getters: {

  },
  actions: {
    /**
     * @description Update the connection pool: bind a component to the given keys of an entity.
     * The component is appended to a key's list even when the key is already registered,
     * unless that component is already bound to it.
     * @param entityId Entity whose keys are being subscribed.
     * @param keys Key names the component wants to receive.
     * @param componentId Id of the component to bind.
     * @returns The subscription message built for `entityId`.
     */
    updateConnectionPool(entityId: string, keys: string[], componentId: string) {
      // Own-property check so inherited names such as "constructor" are never mistaken for entities.
      if (!Object.prototype.hasOwnProperty.call(this.connectionPool, entityId)) {
        Reflect.set(this.connectionPool, entityId, {})
      }

      // Mutate the record held by the store rather than a detached copy.
      const keysRecord: Record<string, KeyBoundComponentList[]> = Reflect.get(this.connectionPool, entityId)

      keys.forEach(key => {
        const existing = Reflect.get(keysRecord, key)
        const boundComponents: KeyBoundComponentList[] = Array.isArray(existing) ? existing : []
        const isAlreadyBound = boundComponents.some(item => item.componentId === componentId)
        if (!isAlreadyBound) {
          Reflect.set(keysRecord, key, [...boundComponents, { componentId }])
        }
      })

      return this.refreshSubscribedMessage(entityId)
    },

    /**
     * @description Get the refreshed subscription message for an entity, registering the
     * entity in the subscribe pool first when it has no subscription id yet.
     * @param entityId Entity to build the message for.
     * @returns The subscription message for `entityId`.
     */
    refreshSubscribedMessage(entityId: string) {
      const existing = this.subscribePool.find(item => item.entityId === entityId)
      if (existing) return this.createMessage(existing.subscribeId!, entityId)

      // New entity: next id is one above the current maximum, or 0 for the first subscription.
      const subscribeId = this.subscribePool.length
        ? Math.max(...this.subscribePool.map(item => item.subscribeId)) + 1
        : 0
      this.subscribePool.push({ subscribeId, entityId })

      return this.createMessage(subscribeId, entityId)
    },

    /**
     * @description Create the subscription message for an entity.
     * NOTE(review): the `tsSubCmds` shape looks like the ThingsBoard telemetry websocket
     * command format — confirm against the backend API.
     * @param subscribeId Id sent as `cmdId`.
     * @param entityId Entity whose registered keys are listed in the message.
     * @returns A message with a single `tsSubCmds` entry; `keys` is empty when the entity
     * has no registered keys.
     */
    createMessage(subscribeId: number, entityId: string): SocketSendMessageType {
      // An entity missing from the pool yields an empty key list instead of a TypeError.
      const keysRecord = Reflect.get(this.connectionPool, entityId) || {}
      const keys = Object.keys(keysRecord).join(KEYS_SEPARATOR)

      return {
        tsSubCmds: [
          {
            entityType: 'DEVICE',
            entityId: entityId,
            scope: "LATEST_TELEMETRY",
            cmdId: subscribeId,
            keys
          }
        ]
      }
    },

    /**
     * @description Subscribe a component. Only components whose request content type is
     * WEB_SOCKET are handled.
     * @param targetComponent Component whose `request.requestParams.Params` supplies
     * `entityId` and the separator-delimited `keys` string.
     * @returns The subscription message, or `undefined` for non-websocket components.
     */
    subscribe(targetComponent: CreateComponentType) {
      const { id: componentId, request } = targetComponent
      const { requestContentType, requestParams } = request
      if ((requestContentType as RequestContentTypeEnum) !== RequestContentTypeEnum.WEB_SOCKET) return

      const { Params } = requestParams
      const { entityId = '', keys = '' } = Params
      // ''.split(',') is [''], so trim each name and drop empties to avoid registering a blank key.
      const keyList = keys
        .split(KEYS_SEPARATOR)
        .map((key: string) => key.trim())
        .filter((key: string) => key.length > 0)

      return this.updateConnectionPool(entityId, keyList, componentId)
    },

    /**
     * @description Cache a received message under its subscription id.
     * Messages whose `subscriptionId` is not in the subscribe pool are ignored.
     * @param message Message received from the socket.
     */
    setCacheMessage(message: SocketReceiveMessageType) {
      const { subscriptionId } = message
      const isSubscribed = this.subscribePool.some(item => item.subscribeId === subscriptionId)
      if (!isSubscribed) return

      const cached = Reflect.get(this.cacheMessage, subscriptionId) || []
      Reflect.set(this.cacheMessage, subscriptionId, [...cached, message])
    }
  }
})