Commit a29e456ba41a333fc00a0d51b8bfb2fd0c36dbf2

Authored by ww
1 parent e45d77c3

fix(socket): socket消息未能成功取消订阅

@@ -8,6 +8,7 @@ import { setOption } from '@/packages/public/chart' @@ -8,6 +8,7 @@ import { setOption } from '@/packages/public/chart'
8 import { useChartDataSocket } from './useChartDataSocket' 8 import { useChartDataSocket } from './useChartDataSocket'
9 import { customRequest } from '@/api/external/customRequest' 9 import { customRequest } from '@/api/external/customRequest'
10 import { useFilterFn } from './useFilterFn' 10 import { useFilterFn } from './useFilterFn'
  11 +import { RequestContentTypeEnum } from '@/enums/external/httpEnum'
11 12
12 // 获取类型 13 // 获取类型
13 type ChartEditStoreType = typeof useChartEditStore 14 type ChartEditStoreType = typeof useChartEditStore
@@ -26,9 +27,6 @@ export const useChartDataFetch = ( @@ -26,9 +27,6 @@ export const useChartDataFetch = (
26 const vChartRef = ref<typeof VChart | null>(null) 27 const vChartRef = ref<typeof VChart | null>(null)
27 let fetchInterval: any = 0 28 let fetchInterval: any = 0
28 29
29 - // 数据池  
30 - const { addGlobalDataInterface } = useChartDataPondFetch()  
31 -  
32 // 组件类型 30 // 组件类型
33 const { chartFrame } = targetComponent.chartConfig 31 const { chartFrame } = targetComponent.chartConfig
34 32
@@ -43,6 +41,7 @@ export const useChartDataFetch = ( @@ -43,6 +41,7 @@ export const useChartDataFetch = (
43 41
44 const requestIntervalFn = () => { 42 const requestIntervalFn = () => {
45 const chartEditStore = useChartEditStore() 43 const chartEditStore = useChartEditStore()
  44 + if ((targetComponent.request.requestContentType as RequestContentTypeEnum) === RequestContentTypeEnum.WEB_SOCKET) return
46 45
47 // 全局数据 46 // 全局数据
48 const { 47 const {
@@ -107,9 +106,9 @@ export const useChartDataFetch = ( @@ -107,9 +106,9 @@ export const useChartDataFetch = (
107 } catch (error) { 106 } catch (error) {
108 console.log(error) 107 console.log(error)
109 } 108 }
110 - } 109 + }
111 110
112 - if (isPreview()) { 111 + if (isPreview()) {
113 requestIntervalFn() 112 requestIntervalFn()
114 const { initial } = useChartDataSocket() 113 const { initial } = useChartDataSocket()
115 initial(targetComponent, useChartEditStore, updateCallback) 114 initial(targetComponent, useChartEditStore, updateCallback)
@@ -35,17 +35,19 @@ const getSocketInstance = (request: ExtraRequestConfigType) => { @@ -35,17 +35,19 @@ const getSocketInstance = (request: ExtraRequestConfigType) => {
35 const { requestUrl, requestOriginUrl } = request 35 const { requestUrl, requestOriginUrl } = request
36 const socketStore = useSocketStore() 36 const socketStore = useSocketStore()
37 const index = socketConnectionPool.findIndex(item => item.url === requestUrl) 37 const index = socketConnectionPool.findIndex(item => item.url === requestUrl)
  38 +
38 if (~index) { 39 if (~index) {
39 return socketConnectionPool[index].ws 40 return socketConnectionPool[index].ws
40 } 41 }
41 const token = getJwtToken() 42 const token = getJwtToken()
42 const socketUrl = `${getOriginUrl(requestOriginUrl || '')}${requestUrl}?token=${token}` 43 const socketUrl = `${getOriginUrl(requestOriginUrl || '')}${requestUrl}?token=${token}`
43 44
44 - const instance = useWebSocket(socketUrl.replace('undefined', ''), { 45 + const instance = useWebSocket(socketUrl, {
45 onMessage() { 46 onMessage() {
46 const { data: originData } = instance 47 const { data: originData } = instance
47 const value = parse(unref(originData)) as SocketReceiveMessageType 48 const value = parse(unref(originData)) as SocketReceiveMessageType
48 socketStore.updateComponentDataset(value) 49 socketStore.updateComponentDataset(value)
  50 + socketStore.unsubscribe(value, instance.send)
49 }, 51 },
50 onDisconnected(ws, event) { 52 onDisconnected(ws, event) {
51 console.log('连接断开') 53 console.log('连接断开')
@@ -64,25 +66,28 @@ export const useChartDataSocket = () => { @@ -64,25 +66,28 @@ export const useChartDataSocket = () => {
64 const socketStore = useSocketStore() 66 const socketStore = useSocketStore()
65 67
66 const initial = (targetComponent: CreateComponentType, useChartEditStore: ChartEditStoreType, updateCallback?: (...args: any) => any) => { 68 const initial = (targetComponent: CreateComponentType, useChartEditStore: ChartEditStoreType, updateCallback?: (...args: any) => any) => {
67 - const { request } = targetComponent  
68 69
  70 + const { request } = targetComponent
69 const { requestUrl, requestContentType } = request 71 const { requestUrl, requestContentType } = request
70 72
71 if ((requestContentType as RequestContentTypeEnum) !== RequestContentTypeEnum.WEB_SOCKET) return 73 if ((requestContentType as RequestContentTypeEnum) !== RequestContentTypeEnum.WEB_SOCKET) return
72 -  
73 const { send } = getSocketInstance(request!) 74 const { send } = getSocketInstance(request!)
74 75
75 onMounted(() => { 76 onMounted(() => {
76 const message = socketStore.subscribe(targetComponent) 77 const message = socketStore.subscribe(targetComponent)
77 - send(JSON.stringify(message)) 78 + if (!message) return
  79 + const { subscribeMessage } = message
  80 + send(JSON.stringify(subscribeMessage))
78 }) 81 })
79 } 82 }
80 83
81 const sendMessage = async (targetComponent: CreateComponentType) => { 84 const sendMessage = async (targetComponent: CreateComponentType) => {
82 const { request } = unref(targetComponent) 85 const { request } = unref(targetComponent)
83 const message = socketStore.subscribe(unref(targetComponent)) 86 const message = socketStore.subscribe(unref(targetComponent))
  87 + if (!message) return
  88 + const { subscribeMessage } = message
84 const { send, data } = getSocketInstance(request) 89 const { send, data } = getSocketInstance(request)
85 - message && send(JSON.stringify(message)) 90 + send(JSON.stringify(subscribeMessage))
86 return socketStore.getComponentValueByKeys(targetComponent, parse(unref(data))) 91 return socketStore.getComponentValueByKeys(targetComponent, parse(unref(data)))
87 } 92 }
88 93
@@ -3,7 +3,8 @@ export enum SocketStoreEnum { @@ -3,7 +3,8 @@ export enum SocketStoreEnum {
3 CONNECTION_POOL = 'connectionPool', 3 CONNECTION_POOL = 'connectionPool',
4 SUBSCRIBE_POOL = 'subscribePool', 4 SUBSCRIBE_POOL = 'subscribePool',
5 CACHE_MESSAGE = 'cacheMessage', 5 CACHE_MESSAGE = 'cacheMessage',
6 - CURRENT_SUBSCRIBE_ID = 'currentSubscribeId' 6 + CURRENT_SUBSCRIBE_ID = 'currentSubscribeId',
  7 + UNSUBSCRIBE_POOL = 'unsubscribePool'
7 } 8 }
8 9
9 export interface KeyBoundComponentList { 10 export interface KeyBoundComponentList {
@@ -55,10 +56,15 @@ export interface SocketComponentRecord { @@ -55,10 +56,15 @@ export interface SocketComponentRecord {
55 keys: string[] 56 keys: string[]
56 } 57 }
57 58
  59 +export interface UnsubscribePoolType {
  60 + subscribeId: number,
  61 + message: SocketSendMessageType
  62 +}
  63 +
58 export interface SocketStoreType { 64 export interface SocketStoreType {
59 [SocketStoreEnum.CONNECTION_POOL]: SocketConnectionPoolType, 65 [SocketStoreEnum.CONNECTION_POOL]: SocketConnectionPoolType,
60 [SocketStoreEnum.SUBSCRIBE_POOL]: SubscribePoolType[], 66 [SocketStoreEnum.SUBSCRIBE_POOL]: SubscribePoolType[],
61 [SocketStoreEnum.CACHE_MESSAGE]: CacheMessageType, 67 [SocketStoreEnum.CACHE_MESSAGE]: CacheMessageType,
62 - [SocketStoreEnum.CURRENT_SUBSCRIBE_ID]: number  
63 - 68 + [SocketStoreEnum.CURRENT_SUBSCRIBE_ID]: number,
  69 + [SocketStoreEnum.UNSUBSCRIBE_POOL]: UnsubscribePoolType[]
64 } 70 }
1 import { defineStore } from "pinia"; 1 import { defineStore } from "pinia";
2 -import { KeyBoundComponentList, SocketComponentRecord, SocketReceiveMessageType, SocketSendMessageItemType, SocketSendMessageType, SocketStoreType } from '@/store/external/modules/socketStore.d' 2 +import { KeyBoundComponentList, SocketComponentRecord, SocketReceiveMessageType, SocketSendMessageItemType, SocketSendMessageType, SocketStoreType, UnsubscribePoolType } from '@/store/external/modules/socketStore.d'
3 import { CreateComponentType } from "@/packages/index.d"; 3 import { CreateComponentType } from "@/packages/index.d";
4 import { RequestContentTypeEnum } from "@/enums/external/httpEnum"; 4 import { RequestContentTypeEnum } from "@/enums/external/httpEnum";
5 import { useChartEditStore } from "@/store/modules/chartEditStore/chartEditStore"; 5 import { useChartEditStore } from "@/store/modules/chartEditStore/chartEditStore";
@@ -16,7 +16,8 @@ export const useSocketStore = defineStore({ @@ -16,7 +16,8 @@ export const useSocketStore = defineStore({
16 connectionPool: {}, 16 connectionPool: {},
17 subscribePool: [], 17 subscribePool: [],
18 cacheMessage: {}, 18 cacheMessage: {},
19 - currentSubscribeId: 0 19 + currentSubscribeId: 0,
  20 + unsubscribePool: []
20 }), 21 }),
21 getters: { 22 getters: {
22 /** 23 /**
@@ -39,7 +40,16 @@ export const useSocketStore = defineStore({ @@ -39,7 +40,16 @@ export const useSocketStore = defineStore({
39 }, 40 },
40 actions: { 41 actions: {
41 getSubscribeId() { 42 getSubscribeId() {
42 - return this.currentSubscribeId += this.currentSubscribeId 43 + return this.currentSubscribeId++
  44 + },
  45 +
  46 + setUnsubscribePool(message: UnsubscribePoolType) {
  47 + this.unsubscribePool.push(message)
  48 + },
  49 +
  50 + removeUnsubscribePool(message: SocketReceiveMessageType) {
  51 + const index = this.unsubscribePool.findIndex(item => item.subscribeId === message.subscriptionId)
  52 + this.unsubscribePool.splice(index, 1)
43 }, 53 },
44 54
45 /** 55 /**
@@ -76,12 +86,27 @@ export const useSocketStore = defineStore({ @@ -76,12 +86,27 @@ export const useSocketStore = defineStore({
76 */ 86 */
77 refreshSubscribedMessage(entityId: string) { 87 refreshSubscribedMessage(entityId: string) {
78 const isExist = this.subscribePool.findIndex(item => item.entityId === entityId) 88 const isExist = this.subscribePool.findIndex(item => item.entityId === entityId)
  89 + const needUnsubscribe = !!~isExist
  90 +
  91 + const newSubscribeId = this.getSubscribeId()
  92 + let oldSubscribeId: number
  93 + // 订阅设备不存在时
79 if (!~isExist) { 94 if (!~isExist) {
80 - const subscribeId = this.getSubscribeId()  
81 - this.subscribePool.push({ subscribeId, entityId }) 95 +
  96 + this.subscribePool.push({ subscribeId: newSubscribeId, entityId })
  97 + } else {
  98 + oldSubscribeId = this.subscribePool.findIndex(item => item.entityId === entityId)
  99 + this.subscribePool.splice(oldSubscribeId, 1)
  100 + this.subscribePool.push({ subscribeId: newSubscribeId, entityId })
  101 + }
  102 +
  103 + const unSubscribeMessage = needUnsubscribe ? this.createUnSubscribeMessage(oldSubscribeId!, entityId) : null
  104 + const subscribeMessage = this.createMessage(newSubscribeId, entityId)
  105 +
  106 + return {
  107 + unSubscribeMessage,
  108 + subscribeMessage
82 } 109 }
83 - const subscribeId = this.subscribePool.find(item => item.entityId === entityId)!.subscribeId!  
84 - return this.createMessage(subscribeId, entityId, !!~isExist)  
85 }, 110 },
86 111
87 /** 112 /**
@@ -90,18 +115,32 @@ export const useSocketStore = defineStore({ @@ -90,18 +115,32 @@ export const useSocketStore = defineStore({
90 * @param entityId 115 * @param entityId
91 * @returns 116 * @returns
92 */ 117 */
93 - createMessage(subscribeId: number, entityId: string, needUnsubscribe: boolean): SocketSendMessageType { 118 + createMessage(subscribeId: number, entityId: string): SocketSendMessageType {
94 const keys = Object.keys(Reflect.get(this.connectionPool, entityId)).join(',') 119 const keys = Object.keys(Reflect.get(this.connectionPool, entityId)).join(',')
95 const messageInfo = { entityType: 'DEVICE', entityId: entityId, scope: "LATEST_TELEMETRY", cmdId: subscribeId, keys } 120 const messageInfo = { entityType: 'DEVICE', entityId: entityId, scope: "LATEST_TELEMETRY", cmdId: subscribeId, keys }
96 return { 121 return {
97 tsSubCmds: [ 122 tsSubCmds: [
98 - ...(needUnsubscribe ? [{ ...messageInfo, unsubscribe: true }] as SocketSendMessageItemType[] : []),  
99 - messageInfo, 123 + messageInfo
100 ] 124 ]
101 } 125 }
102 }, 126 },
103 127
104 /** 128 /**
  129 + * @description 创建取消订阅的消息
  130 + * @param subscribeId
  131 + * @param entityId
  132 + */
  133 + createUnSubscribeMessage(subscribeId: number, entityId: string): SocketSendMessageType {
  134 + const keys = Object.keys(Reflect.get(this.connectionPool, entityId)).join(',')
  135 + const messageInfo = { entityType: 'DEVICE', entityId: entityId, scope: "LATEST_TELEMETRY", cmdId: subscribeId, keys, unsubscribe: true } as SocketSendMessageItemType
  136 + const message = {
  137 + tsSubCmds: [messageInfo]
  138 + }
  139 + this.setUnsubscribePool({ subscribeId, message })
  140 + return message
  141 + },
  142 +
  143 + /**
105 * @description 订阅 144 * @description 订阅
106 * @param targetComponent 145 * @param targetComponent
107 */ 146 */
@@ -138,6 +177,7 @@ export const useSocketStore = defineStore({ @@ -138,6 +177,7 @@ export const useSocketStore = defineStore({
138 */ 177 */
139 getNeedUpdateComponentsIdBySubscribeId(subscribeId: number, keys: string[]) { 178 getNeedUpdateComponentsIdBySubscribeId(subscribeId: number, keys: string[]) {
140 const entityId = this.subscribePool.find(item => item.subscribeId === subscribeId)?.entityId 179 const entityId = this.subscribePool.find(item => item.subscribeId === subscribeId)?.entityId
  180 +
141 if (entityId) { 181 if (entityId) {
142 const keysRecord = Reflect.get(this.connectionPool, entityId) 182 const keysRecord = Reflect.get(this.connectionPool, entityId)
143 const needUpdateComponents = keys.map(key => keysRecord[key]) 183 const needUpdateComponents = keys.map(key => keysRecord[key])
@@ -160,14 +200,14 @@ export const useSocketStore = defineStore({ @@ -160,14 +200,14 @@ export const useSocketStore = defineStore({
160 const { keys = '' } = Params 200 const { keys = '' } = Params
161 const targetComponentBindKeys = keys.split(KEYS_SEPARATOR) 201 const targetComponentBindKeys = keys.split(KEYS_SEPARATOR)
162 202
163 - const _value = cloneDeep(value) 203 + const _value = cloneDeep(value) || { data: {}, latestValues: {} }
164 _value.data = targetComponentBindKeys.reduce((prev, next) => { 204 _value.data = targetComponentBindKeys.reduce((prev, next) => {
165 - return { ...prev, [next]: value.data[next] } 205 + return { ...prev, [next]: _value.data[next] }
166 }, {}) 206 }, {})
167 _value.latestValues = targetComponentBindKeys.reduce((prev, next) => { 207 _value.latestValues = targetComponentBindKeys.reduce((prev, next) => {
168 - return { ...prev, [next]: value.latestValues[next] } 208 + return { ...prev, [next]: _value.latestValues[next] }
169 }, {}) 209 }, {})
170 - 210 +
171 return _value 211 return _value
172 }, 212 },
173 213
@@ -198,10 +238,23 @@ export const useSocketStore = defineStore({ @@ -198,10 +238,23 @@ export const useSocketStore = defineStore({
198 const { subscriptionId, data } = value 238 const { subscriptionId, data } = value
199 const keys = Object.keys(data) 239 const keys = Object.keys(data)
200 const componentIds = this.getNeedUpdateComponentsIdBySubscribeId(subscriptionId, keys) 240 const componentIds = this.getNeedUpdateComponentsIdBySubscribeId(subscriptionId, keys)
  241 + console.log(componentIds)
201 componentIds?.forEach((targetComponentId) => { 242 componentIds?.forEach((targetComponentId) => {
202 - this.updateComponentById(targetComponentId as string , value) 243 + this.updateComponentById(targetComponentId as string, value)
203 }) 244 })
  245 + },
204 246
  247 + /**
  248 + * @description socket接受到消息后,从需要取消的订阅池中取消订阅消息
  249 + */
  250 + unsubscribe(message: SocketReceiveMessageType, unsubscribeFn: Fn) {
  251 + const { subscriptionId } = message
  252 + if (subscriptionId === undefined) return
  253 + const index = this.unsubscribePool.findIndex(item => item.subscribeId === subscriptionId)
  254 + if (!~index) return
  255 + const sendMessage = this.unsubscribePool[index].message
  256 + unsubscribeFn(JSON.stringify(sendMessage))
  257 + this.removeUnsubscribePool(message)
205 } 258 }
206 } 259 }
207 }) 260 })
@@ -114,6 +114,7 @@ const getConfigurationData = () => { @@ -114,6 +114,7 @@ const getConfigurationData = () => {
114 record.requestParams[RequestParamsTypeEnum.BODY] = bodyValue 114 record.requestParams[RequestParamsTypeEnum.BODY] = bodyValue
115 record.requestInterval = unref(requestIntervalValueRef) 115 record.requestInterval = unref(requestIntervalValueRef)
116 record.requestIntervalUnit = unref(requestIntervalUnitRef) 116 record.requestIntervalUnit = unref(requestIntervalUnitRef)
  117 + record.requestContentType = unref(requestContentTypeRef) as unknown as any
117 return record 118 return record
118 } 119 }
119 120
@@ -52,7 +52,7 @@ const title = ref('') @@ -52,7 +52,7 @@ const title = ref('')
52 const comTitle = computed(() => { 52 const comTitle = computed(() => {
53 // eslint-disable-next-line vue/no-side-effects-in-computed-properties 53 // eslint-disable-next-line vue/no-side-effects-in-computed-properties
54 const newTitle = projectInfoStore.getProjectInfo.dataViewName 54 const newTitle = projectInfoStore.getProjectInfo.dataViewName
55 - setTitle(`工作空间-${newTitle}`) 55 + setTitle(`工作空间-${newTitle || ''}`)
56 chartEditStore.setEditCanvasConfig(EditCanvasConfigEnum.PROJECT_NAME, newTitle) 56 chartEditStore.setEditCanvasConfig(EditCanvasConfigEnum.PROJECT_NAME, newTitle)
57 projectInfoStore.setProjectInfoByKey(ProjectInfoEnum.DATA_VIEW_NAME, newTitle) 57 projectInfoStore.setProjectInfoByKey(ProjectInfoEnum.DATA_VIEW_NAME, newTitle)
58 return newTitle 58 return newTitle