// socketStore.ts 9.42 KB
import { defineStore } from "pinia";
import { KeyBoundComponentList, SocketComponentRecord, SocketReceiveMessageType, SocketSendMessageItemType, SocketSendMessageType, SocketStoreType, UnsubscribePoolType } from '@/store/external/modules/socketStore.d'
import { CreateComponentType } from "@/packages/index.d";
import { RequestContentTypeEnum } from "@/enums/external/httpEnum";
import { useChartEditStore } from "@/store/modules/chartEditStore/chartEditStore";
import { pinia } from '@/store'
import { cloneDeep } from "lodash";
import { useFilterFn } from "@/hooks/external/useFilterFn";


// Delimiter used to split a component's joined `keys` request parameter into individual keys.
const KEYS_SEPARATOR = ','
// Chart edit store resolved against the shared pinia instance when this module is evaluated;
// the socket store reads the component list from it and writes updated components back to it.
const chartEditStore = useChartEditStore(pinia)
export const useSocketStore = defineStore({
  id: 'useSocketStore',
  // Initial state of the socket store.
  state: (): SocketStoreType => ({
    // entityId -> key -> list of `{ componentId }` entries bound to that key
    connectionPool: {},
    // current subscriptions, stored as `{ subscribeId, entityId }` entries
    subscribePool: [],
    // received messages grouped by subscription id
    cacheMessage: {},
    // next id handed out by `getSubscribeId`
    currentSubscribeId: 0,
    // `{ subscribeId, message }` entries queued by `createUnSubscribeMessage`
    unsubscribePool: []
  }),
  getters: {
    /**
     * @description Collects every component whose request content type is WebSocket,
     * together with the keys configured in its request params.
     * @returns one record per socket component: its id and its keys split on KEYS_SEPARATOR
     */
    getSocketComponentsRecord(): SocketComponentRecord[] {
      return chartEditStore.getComponentList
        .filter(item => (item.request.requestContentType as RequestContentTypeEnum) === RequestContentTypeEnum.WEB_SOCKET)
        .map(({ id, request }) => {
          // Default to an empty string, as `subscribe` and `getComponentValueByKeys` do,
          // so a single component without configured keys cannot make the whole getter throw.
          const { keys = '' } = request.requestParams.Params
          return {
            componentId: id,
            keys: keys.split(KEYS_SEPARATOR)
          }
        })
    },
  },
  actions: {
    /**
     * @description Hands out the current subscribe id and advances the counter by one.
     * @returns the counter value from before the increment
     */
    getSubscribeId() {
      const issuedId = this.currentSubscribeId
      this.currentSubscribeId = issuedId + 1
      return issuedId
    },

    /**
     * @description Appends an entry to the unsubscribe pool.
     * @param message entry to append
     */
    setUnsubscribePool(message: UnsubscribePoolType) {
      const { unsubscribePool } = this
      unsubscribePool.push(message)
    },

    /**
     * @description Removes from the unsubscribe pool the entry whose `subscribeId`
     * equals the message's `subscriptionId`. Does nothing when no entry matches.
     * @param message received socket message
     */
    removeUnsubscribePool(message: SocketReceiveMessageType) {
      const index = this.unsubscribePool.findIndex(item => item.subscribeId === message.subscriptionId)
      // findIndex yields -1 on a miss, and splice(-1, 1) would drop the last (unrelated) entry.
      if (index === -1) return
      this.unsubscribePool.splice(index, 1)
    },

    /**
     * @description Registers a component under each of the given keys of an entity in the
     * connection pool, then rebuilds the subscription messages for that entity.
     * @param entityId entity the keys belong to
     * @param keys keys the component is bound to
     * @param componentId component to register
     * @returns the result of `refreshSubscribedMessage` for the entity
     */
    updateConnectionPool(entityId: string, keys: string[], componentId: string) {
      if (!Reflect.has(this.connectionPool, entityId)) {
        Reflect.set(this.connectionPool, entityId, {})
      }
      const keysRecord: Record<string, KeyBoundComponentList[]> = Reflect.get(this.connectionPool, entityId)

      keys.forEach(key => {
        const boundComponents: KeyBoundComponentList[] = Reflect.get(keysRecord, key) || []
        // Always register the component, including under a key other components already use;
        // only skip when this very component is already listed, to avoid duplicates.
        const alreadyBound = boundComponents.some(item => item.componentId === componentId)
        if (!alreadyBound) {
          Reflect.set(keysRecord, key, [...boundComponents, { componentId }] as KeyBoundComponentList[])
        }
      })
      return this.refreshSubscribedMessage(entityId)
    },

    /**
     * @description Issues a fresh subscribe id for an entity and builds the messages needed
     * to switch over to it. When the entity already has an entry in the subscribe pool, that
     * entry is replaced and an unsubscribe message for the previous id is built as well.
     * @param entityId entity to (re)subscribe
     * @returns `subscribeMessage` for the new id, and `unSubscribeMessage` for the previous
     * id (null when the entity had no earlier subscription)
     */
    refreshSubscribedMessage(entityId: string) {
      const existedIndex = this.subscribePool.findIndex(item => item.entityId === entityId)
      const newSubscribeId = this.getSubscribeId()

      let unSubscribeMessage: SocketSendMessageType | null = null
      if (existedIndex !== -1) {
        // Use the previous subscription's id, not its position in the pool:
        // the unsubscribe command must carry the cmdId that was originally subscribed.
        const oldSubscribeId = this.subscribePool[existedIndex].subscribeId
        this.subscribePool.splice(existedIndex, 1)
        unSubscribeMessage = this.createUnSubscribeMessage(oldSubscribeId, entityId)
      }
      this.subscribePool.push({ subscribeId: newSubscribeId, entityId })

      const subscribeMessage = this.createMessage(newSubscribeId, entityId)

      return {
        unSubscribeMessage,
        subscribeMessage
      }
    },

    /**
     * @description Builds the subscribe command for an entity, covering every key
     * currently recorded for it in the connection pool.
     * @param subscribeId id sent as the command's `cmdId`
     * @param entityId entity whose keys are subscribed
     * @returns message holding a single `tsSubCmds` entry
     */
    createMessage(subscribeId: number, entityId: string): SocketSendMessageType {
      const boundKeys = Object.keys(Reflect.get(this.connectionPool, entityId))
      const command = {
        entityType: 'DEVICE',
        entityId,
        scope: "LATEST_TELEMETRY",
        cmdId: subscribeId,
        keys: boundKeys.join(',')
      }
      return { tsSubCmds: [command] }
    },

    /**
     * @description Builds the unsubscribe command for an entity and queues it in the
     * unsubscribe pool under the given subscribe id.
     * @param subscribeId id sent as the command's `cmdId` and used as the pool entry's id
     * @param entityId entity whose keys are unsubscribed
     * @returns the message that was queued
     */
    createUnSubscribeMessage(subscribeId: number, entityId: string): SocketSendMessageType {
      const boundKeys = Object.keys(Reflect.get(this.connectionPool, entityId))
      const command = {
        entityType: 'DEVICE',
        entityId,
        scope: "LATEST_TELEMETRY",
        cmdId: subscribeId,
        keys: boundKeys.join(','),
        unsubscribe: true
      } as SocketSendMessageItemType
      const message = { tsSubCmds: [command] }
      this.setUnsubscribePool({ subscribeId, message })
      return message
    },

    /**
     * @description Subscribes a component: when its request content type is WebSocket,
     * its entity id and keys are registered in the connection pool.
     * @param targetComponent component to subscribe
     * @returns the messages produced by `updateConnectionPool`, or undefined for
     * components that are not WebSocket-based
     */
    subscribe(targetComponent: CreateComponentType) {
      const { id: componentId, request } = targetComponent
      const isSocketRequest = (request.requestContentType as RequestContentTypeEnum) === RequestContentTypeEnum.WEB_SOCKET
      if (!isSocketRequest) return undefined

      const { entityId = '', keys = '' } = request.requestParams.Params
      const boundKeys = keys.split(KEYS_SEPARATOR)
      return this.updateConnectionPool(entityId, boundKeys, componentId)
    },

    /**
     * @description Appends a received message to the cache list of its subscription.
     * Messages whose subscription id is not in the subscribe pool are ignored.
     * @param message received socket message
     */
    setCacheMessage(message: SocketReceiveMessageType) {
      const { subscriptionId } = message
      const isSubscribed = this.subscribePool.some(item => item.subscribeId === subscriptionId)
      if (!isSubscribed) return

      // Make sure a list exists for this subscription before appending to it.
      if (!Reflect.get(this.cacheMessage, subscriptionId)) {
        Reflect.set(this.cacheMessage, subscriptionId, [])
      }
      const cachedMessages = Reflect.get(this.cacheMessage, subscriptionId)
      Reflect.set(this.cacheMessage, subscriptionId, [...cachedMessages, message])
    },

    /**
     * @description Resolves which components must be refreshed for a subscription:
     * looks up the subscription's entity, then gathers the components bound to any of the
     * given keys of that entity.
     * @param subscribeId subscription the update belongs to
     * @param keys keys whose bound components are wanted
     * @returns de-duplicated component ids, or undefined when the subscription has no
     * (truthy) entity id in the subscribe pool
     */
    getNeedUpdateComponentsIdBySubscribeId(subscribeId: number, keys: string[]) {
      const entityId = this.subscribePool.find(item => item.subscribeId === subscribeId)?.entityId
      if (!entityId) return undefined

      const keysRecord: Record<string, KeyBoundComponentList[]> | undefined = Reflect.get(this.connectionPool, entityId)
      // A key may have no bound components (or the entity no record at all);
      // treat that as an empty list instead of spreading undefined, which throws.
      const ids = keys
        .flatMap(key => keysRecord?.[key] ?? [])
        .map(item => item.componentId)
      return [...new Set(ids)]
    },

    /**
     * @description Produces a deep copy of a received message whose `data` and
     * `latestValues` contain only the keys the component is bound to.
     * @param targetComponent component whose request params supply the bound keys
     * @param value received socket message; it is cloned, not modified
     * @returns the narrowed copy
     */
    getComponentValueByKeys(targetComponent: CreateComponentType, value: SocketReceiveMessageType) {
      const { keys = '' } = targetComponent.request.requestParams.Params
      const boundKeys = keys.split(KEYS_SEPARATOR)

      const _value = cloneDeep(value) || { data: {}, latestValues: {} }
      // Each right-hand side is built from the un-narrowed field before it is reassigned.
      _value.data = Object.fromEntries(boundKeys.map(key => [key, _value.data[key]]))
      _value.latestValues = Object.fromEntries(boundKeys.map(key => [key, _value.latestValues[key]]))

      return _value
    },

    /**
     * @description Applies a received message to one socket component: narrows the message
     * to the component's keys, runs the component's filter over it, and writes the outcome
     * into a cloned component's `option.dataset` before updating the component list.
     * Does nothing when the id does not resolve to a socket component in the list.
     * @param id id of the component to update
     * @param value received socket message
     */
    updateComponentById(id: string, value: SocketReceiveMessageType) {
      const targetComponent = this.getSocketComponentsRecord.find(item => item.componentId === id)
      // NOTE(review): fetchTargetIndex is defined elsewhere and its handling of an undefined id
      // is not visible here, so never call it without a resolved component id.
      if (!targetComponent) return

      const targetComponentIndex = chartEditStore.fetchTargetIndex(targetComponent.componentId)
      const target = chartEditStore.componentList[targetComponentIndex] as CreateComponentType | undefined
      // An index that does not point at a component would otherwise make the destructuring below throw.
      if (!target) return

      // Work on a copy so the stored component is only replaced through updateComponentList.
      const _target = cloneDeep(target)
      const { filter } = _target
      const _value = this.getComponentValueByKeys(target, value)
      const { value: filterValue, reason, flag } = useFilterFn(filter, _value)
      // On filter failure the reason is stored as the dataset instead of the filtered value.
      _target.option.dataset = flag ? filterValue : reason
      // TODO: values that did not change are still written again
      chartEditStore.updateComponentList(targetComponentIndex, _target)
    },

    /**
     * @description Pushes a received message to every component bound to one of the keys
     * present in the message's `data`.
     * @param value received socket message
     */
    updateComponentDataset(value: SocketReceiveMessageType) {
      const { subscriptionId, data } = value
      // NOTE(review): whether `data` can be missing is not visible here; Object.keys(undefined)
      // throws, so fall back to an empty object, which simply updates nothing.
      const keys = Object.keys(data ?? {})
      const componentIds = this.getNeedUpdateComponentsIdBySubscribeId(subscriptionId, keys)
      componentIds?.forEach((targetComponentId) => {
        this.updateComponentById(targetComponentId as string, value)
      })
    },

    /**
     * @description After a socket message arrives, sends the queued unsubscribe command whose
     * id matches the message's subscription id and removes it from the unsubscribe pool.
     * @param message received socket message
     * @param unsubscribeFn called with the JSON-serialized unsubscribe command
     */
    unsubscribe(message: SocketReceiveMessageType, unsubscribeFn: Fn) {
      const { subscriptionId } = message
      if (subscriptionId === undefined) return

      const pending = this.unsubscribePool.find(item => item.subscribeId === subscriptionId)
      if (!pending) return

      unsubscribeFn(JSON.stringify(pending.message))
      this.removeUnsubscribePool(message)
    }
  }
})