Commit 94d0ae46d097c746d4e7f12c89e4aaabc2b43917
Merge branch 'ww' into 'main_dev'
fix(socket): socket消息未能成功取消订阅 See merge request yunteng/thingskit-view!39
Showing
7 changed files
with
94 additions
and
31 deletions
... | ... | @@ -8,6 +8,7 @@ import { setOption } from '@/packages/public/chart' |
8 | 8 | import { useChartDataSocket } from './useChartDataSocket' |
9 | 9 | import { customRequest } from '@/api/external/customRequest' |
10 | 10 | import { useFilterFn } from './useFilterFn' |
11 | +import { RequestContentTypeEnum } from '@/enums/external/httpEnum' | |
11 | 12 | |
12 | 13 | // 获取类型 |
13 | 14 | type ChartEditStoreType = typeof useChartEditStore |
... | ... | @@ -26,9 +27,6 @@ export const useChartDataFetch = ( |
26 | 27 | const vChartRef = ref<typeof VChart | null>(null) |
27 | 28 | let fetchInterval: any = 0 |
28 | 29 | |
29 | - // 数据池 | |
30 | - const { addGlobalDataInterface } = useChartDataPondFetch() | |
31 | - | |
32 | 30 | // 组件类型 |
33 | 31 | const { chartFrame } = targetComponent.chartConfig |
34 | 32 | |
... | ... | @@ -43,6 +41,7 @@ export const useChartDataFetch = ( |
43 | 41 | |
44 | 42 | const requestIntervalFn = () => { |
45 | 43 | const chartEditStore = useChartEditStore() |
44 | + if ((targetComponent.request.requestContentType as RequestContentTypeEnum) === RequestContentTypeEnum.WEB_SOCKET) return | |
46 | 45 | |
47 | 46 | // 全局数据 |
48 | 47 | const { |
... | ... | @@ -107,9 +106,9 @@ export const useChartDataFetch = ( |
107 | 106 | } catch (error) { |
108 | 107 | console.log(error) |
109 | 108 | } |
110 | - } | |
109 | + } | |
111 | 110 | |
112 | - if (isPreview()) { | |
111 | + if (isPreview()) { | |
113 | 112 | requestIntervalFn() |
114 | 113 | const { initial } = useChartDataSocket() |
115 | 114 | initial(targetComponent, useChartEditStore, updateCallback) | ... | ... |
... | ... | @@ -35,17 +35,19 @@ const getSocketInstance = (request: ExtraRequestConfigType) => { |
35 | 35 | const { requestUrl, requestOriginUrl } = request |
36 | 36 | const socketStore = useSocketStore() |
37 | 37 | const index = socketConnectionPool.findIndex(item => item.url === requestUrl) |
38 | + | |
38 | 39 | if (~index) { |
39 | 40 | return socketConnectionPool[index].ws |
40 | 41 | } |
41 | 42 | const token = getJwtToken() |
42 | 43 | const socketUrl = `${getOriginUrl(requestOriginUrl || '')}${requestUrl}?token=${token}` |
43 | 44 | |
44 | - const instance = useWebSocket(socketUrl.replace('undefined', ''), { | |
45 | + const instance = useWebSocket(socketUrl, { | |
45 | 46 | onMessage() { |
46 | 47 | const { data: originData } = instance |
47 | 48 | const value = parse(unref(originData)) as SocketReceiveMessageType |
48 | 49 | socketStore.updateComponentDataset(value) |
50 | + socketStore.unsubscribe(value, instance.send) | |
49 | 51 | }, |
50 | 52 | onDisconnected(ws, event) { |
51 | 53 | console.log('连接断开') |
... | ... | @@ -64,25 +66,28 @@ export const useChartDataSocket = () => { |
64 | 66 | const socketStore = useSocketStore() |
65 | 67 | |
66 | 68 | const initial = (targetComponent: CreateComponentType, useChartEditStore: ChartEditStoreType, updateCallback?: (...args: any) => any) => { |
67 | - const { request } = targetComponent | |
68 | 69 | |
70 | + const { request } = targetComponent | |
69 | 71 | const { requestUrl, requestContentType } = request |
70 | 72 | |
71 | 73 | if ((requestContentType as RequestContentTypeEnum) !== RequestContentTypeEnum.WEB_SOCKET) return |
72 | - | |
73 | 74 | const { send } = getSocketInstance(request!) |
74 | 75 | |
75 | 76 | onMounted(() => { |
76 | 77 | const message = socketStore.subscribe(targetComponent) |
77 | - send(JSON.stringify(message)) | |
78 | + if (!message) return | |
79 | + const { subscribeMessage } = message | |
80 | + send(JSON.stringify(subscribeMessage)) | |
78 | 81 | }) |
79 | 82 | } |
80 | 83 | |
81 | 84 | const sendMessage = async (targetComponent: CreateComponentType) => { |
82 | 85 | const { request } = unref(targetComponent) |
83 | 86 | const message = socketStore.subscribe(unref(targetComponent)) |
87 | + if (!message) return | |
88 | + const { subscribeMessage } = message | |
84 | 89 | const { send, data } = getSocketInstance(request) |
85 | - message && send(JSON.stringify(message)) | |
90 | + send(JSON.stringify(subscribeMessage)) | |
86 | 91 | return socketStore.getComponentValueByKeys(targetComponent, parse(unref(data))) |
87 | 92 | } |
88 | 93 | ... | ... |
... | ... | @@ -3,7 +3,8 @@ export enum SocketStoreEnum { |
3 | 3 | CONNECTION_POOL = 'connectionPool', |
4 | 4 | SUBSCRIBE_POOL = 'subscribePool', |
5 | 5 | CACHE_MESSAGE = 'cacheMessage', |
6 | - CURRENT_SUBSCRIBE_ID = 'currentSubscribeId' | |
6 | + CURRENT_SUBSCRIBE_ID = 'currentSubscribeId', | |
7 | + UNSUBSCRIBE_POOL = 'unsubscribePool' | |
7 | 8 | } |
8 | 9 | |
9 | 10 | export interface KeyBoundComponentList { |
... | ... | @@ -55,10 +56,15 @@ export interface SocketComponentRecord { |
55 | 56 | keys: string[] |
56 | 57 | } |
57 | 58 | |
59 | +export interface UnsubscribePoolType { | |
60 | + subscribeId: number, | |
61 | + message: SocketSendMessageType | |
62 | +} | |
63 | + | |
58 | 64 | export interface SocketStoreType { |
59 | 65 | [SocketStoreEnum.CONNECTION_POOL]: SocketConnectionPoolType, |
60 | 66 | [SocketStoreEnum.SUBSCRIBE_POOL]: SubscribePoolType[], |
61 | 67 | [SocketStoreEnum.CACHE_MESSAGE]: CacheMessageType, |
62 | - [SocketStoreEnum.CURRENT_SUBSCRIBE_ID]: number | |
63 | - | |
68 | + [SocketStoreEnum.CURRENT_SUBSCRIBE_ID]: number, | |
69 | + [SocketStoreEnum.UNSUBSCRIBE_POOL]: UnsubscribePoolType[] | |
64 | 70 | } | ... | ... |
1 | 1 | import { defineStore } from "pinia"; |
2 | -import { KeyBoundComponentList, SocketComponentRecord, SocketReceiveMessageType, SocketSendMessageItemType, SocketSendMessageType, SocketStoreType } from '@/store/external/modules/socketStore.d' | |
2 | +import { KeyBoundComponentList, SocketComponentRecord, SocketReceiveMessageType, SocketSendMessageItemType, SocketSendMessageType, SocketStoreType, UnsubscribePoolType } from '@/store/external/modules/socketStore.d' | |
3 | 3 | import { CreateComponentType } from "@/packages/index.d"; |
4 | 4 | import { RequestContentTypeEnum } from "@/enums/external/httpEnum"; |
5 | 5 | import { useChartEditStore } from "@/store/modules/chartEditStore/chartEditStore"; |
... | ... | @@ -16,7 +16,8 @@ export const useSocketStore = defineStore({ |
16 | 16 | connectionPool: {}, |
17 | 17 | subscribePool: [], |
18 | 18 | cacheMessage: {}, |
19 | - currentSubscribeId: 0 | |
19 | + currentSubscribeId: 0, | |
20 | + unsubscribePool: [] | |
20 | 21 | }), |
21 | 22 | getters: { |
22 | 23 | /** |
... | ... | @@ -39,7 +40,16 @@ export const useSocketStore = defineStore({ |
39 | 40 | }, |
40 | 41 | actions: { |
41 | 42 | getSubscribeId() { |
42 | - return this.currentSubscribeId += this.currentSubscribeId | |
43 | + return this.currentSubscribeId++ | |
44 | + }, | |
45 | + | |
46 | + setUnsubscribePool(message: UnsubscribePoolType) { | |
47 | + this.unsubscribePool.push(message) | |
48 | + }, | |
49 | + | |
50 | + removeUnsubscribePool(message: SocketReceiveMessageType) { | |
51 | + const index = this.unsubscribePool.findIndex(item => item.subscribeId === message.subscriptionId) | |
52 | + this.unsubscribePool.splice(index, 1) | |
43 | 53 | }, |
44 | 54 | |
45 | 55 | /** |
... | ... | @@ -76,12 +86,27 @@ export const useSocketStore = defineStore({ |
76 | 86 | */ |
77 | 87 | refreshSubscribedMessage(entityId: string) { |
78 | 88 | const isExist = this.subscribePool.findIndex(item => item.entityId === entityId) |
89 | + const needUnsubscribe = !!~isExist | |
90 | + | |
91 | + const newSubscribeId = this.getSubscribeId() | |
92 | + let oldSubscribeId: number | |
93 | + // 订阅设备不存在时 | |
79 | 94 | if (!~isExist) { |
80 | - const subscribeId = this.getSubscribeId() | |
81 | - this.subscribePool.push({ subscribeId, entityId }) | |
95 | + | |
96 | + this.subscribePool.push({ subscribeId: newSubscribeId, entityId }) | |
97 | + } else { | |
98 | + oldSubscribeId = this.subscribePool.findIndex(item => item.entityId === entityId) | |
99 | + this.subscribePool.splice(oldSubscribeId, 1) | |
100 | + this.subscribePool.push({ subscribeId: newSubscribeId, entityId }) | |
101 | + } | |
102 | + | |
103 | + const unSubscribeMessage = needUnsubscribe ? this.createUnSubscribeMessage(oldSubscribeId!, entityId) : null | |
104 | + const subscribeMessage = this.createMessage(newSubscribeId, entityId) | |
105 | + | |
106 | + return { | |
107 | + unSubscribeMessage, | |
108 | + subscribeMessage | |
82 | 109 | } |
83 | - const subscribeId = this.subscribePool.find(item => item.entityId === entityId)!.subscribeId! | |
84 | - return this.createMessage(subscribeId, entityId, !!~isExist) | |
85 | 110 | }, |
86 | 111 | |
87 | 112 | /** |
... | ... | @@ -90,18 +115,32 @@ export const useSocketStore = defineStore({ |
90 | 115 | * @param entityId |
91 | 116 | * @returns |
92 | 117 | */ |
93 | - createMessage(subscribeId: number, entityId: string, needUnsubscribe: boolean): SocketSendMessageType { | |
118 | + createMessage(subscribeId: number, entityId: string): SocketSendMessageType { | |
94 | 119 | const keys = Object.keys(Reflect.get(this.connectionPool, entityId)).join(',') |
95 | 120 | const messageInfo = { entityType: 'DEVICE', entityId: entityId, scope: "LATEST_TELEMETRY", cmdId: subscribeId, keys } |
96 | 121 | return { |
97 | 122 | tsSubCmds: [ |
98 | - ...(needUnsubscribe ? [{ ...messageInfo, unsubscribe: true }] as SocketSendMessageItemType[] : []), | |
99 | - messageInfo, | |
123 | + messageInfo | |
100 | 124 | ] |
101 | 125 | } |
102 | 126 | }, |
103 | 127 | |
104 | 128 | /** |
129 | + * @description 创建取消订阅的消息 | |
130 | + * @param subscribeId | |
131 | + * @param entityId | |
132 | + */ | |
133 | + createUnSubscribeMessage(subscribeId: number, entityId: string): SocketSendMessageType { | |
134 | + const keys = Object.keys(Reflect.get(this.connectionPool, entityId)).join(',') | |
135 | + const messageInfo = { entityType: 'DEVICE', entityId: entityId, scope: "LATEST_TELEMETRY", cmdId: subscribeId, keys, unsubscribe: true } as SocketSendMessageItemType | |
136 | + const message = { | |
137 | + tsSubCmds: [messageInfo] | |
138 | + } | |
139 | + this.setUnsubscribePool({ subscribeId, message }) | |
140 | + return message | |
141 | + }, | |
142 | + | |
143 | + /** | |
105 | 144 | * @description 订阅 |
106 | 145 | * @param targetComponent |
107 | 146 | */ |
... | ... | @@ -138,6 +177,7 @@ export const useSocketStore = defineStore({ |
138 | 177 | */ |
139 | 178 | getNeedUpdateComponentsIdBySubscribeId(subscribeId: number, keys: string[]) { |
140 | 179 | const entityId = this.subscribePool.find(item => item.subscribeId === subscribeId)?.entityId |
180 | + | |
141 | 181 | if (entityId) { |
142 | 182 | const keysRecord = Reflect.get(this.connectionPool, entityId) |
143 | 183 | const needUpdateComponents = keys.map(key => keysRecord[key]) |
... | ... | @@ -160,14 +200,14 @@ export const useSocketStore = defineStore({ |
160 | 200 | const { keys = '' } = Params |
161 | 201 | const targetComponentBindKeys = keys.split(KEYS_SEPARATOR) |
162 | 202 | |
163 | - const _value = cloneDeep(value) | |
203 | + const _value = cloneDeep(value) || { data: {}, latestValues: {} } | |
164 | 204 | _value.data = targetComponentBindKeys.reduce((prev, next) => { |
165 | - return { ...prev, [next]: value.data[next] } | |
205 | + return { ...prev, [next]: _value.data[next] } | |
166 | 206 | }, {}) |
167 | 207 | _value.latestValues = targetComponentBindKeys.reduce((prev, next) => { |
168 | - return { ...prev, [next]: value.latestValues[next] } | |
208 | + return { ...prev, [next]: _value.latestValues[next] } | |
169 | 209 | }, {}) |
170 | - | |
210 | + | |
171 | 211 | return _value |
172 | 212 | }, |
173 | 213 | |
... | ... | @@ -198,10 +238,23 @@ export const useSocketStore = defineStore({ |
198 | 238 | const { subscriptionId, data } = value |
199 | 239 | const keys = Object.keys(data) |
200 | 240 | const componentIds = this.getNeedUpdateComponentsIdBySubscribeId(subscriptionId, keys) |
241 | + console.log(componentIds) | |
201 | 242 | componentIds?.forEach((targetComponentId) => { |
202 | - this.updateComponentById(targetComponentId as string , value) | |
243 | + this.updateComponentById(targetComponentId as string, value) | |
203 | 244 | }) |
245 | + }, | |
204 | 246 | |
247 | + /** | |
248 | + * @description socket接受到消息后,从需要取消的订阅池中取消订阅消息 | |
249 | + */ | |
250 | + unsubscribe(message: SocketReceiveMessageType, unsubscribeFn: Fn) { | |
251 | + const { subscriptionId } = message | |
252 | + if (subscriptionId === undefined) return | |
253 | + const index = this.unsubscribePool.findIndex(item => item.subscribeId === subscriptionId) | |
254 | + if (!~index) return | |
255 | + const sendMessage = this.unsubscribePool[index].message | |
256 | + unsubscribeFn(JSON.stringify(sendMessage)) | |
257 | + this.removeUnsubscribePool(message) | |
205 | 258 | } |
206 | 259 | } |
207 | 260 | }) | ... | ... |
... | ... | @@ -114,6 +114,7 @@ const getConfigurationData = () => { |
114 | 114 | record.requestParams[RequestParamsTypeEnum.BODY] = bodyValue |
115 | 115 | record.requestInterval = unref(requestIntervalValueRef) |
116 | 116 | record.requestIntervalUnit = unref(requestIntervalUnitRef) |
117 | + record.requestContentType = unref(requestContentTypeRef) as unknown as any | |
117 | 118 | return record |
118 | 119 | } |
119 | 120 | ... | ... |
... | ... | @@ -22,8 +22,7 @@ const updateValue = (value: string) => { |
22 | 22 | <NTag type="success" style="width: fit-content;">sql</NTag> |
23 | 23 | </SettingItemBox> |
24 | 24 | <SettingItemBox name="键值"> |
25 | - <MonacoEditor v-model:modelValue="value" @update:modelValue="updateValue" width="600px" height="200px" | |
26 | - language="sql" /> | |
25 | + <MonacoEditor :modelValue="value" @update:modelValue="updateValue" width="600px" height="200px" language="sql" /> | |
27 | 26 | </SettingItemBox> |
28 | 27 | </section> |
29 | 28 | </template> | ... | ... |
... | ... | @@ -52,7 +52,7 @@ const title = ref('') |
52 | 52 | const comTitle = computed(() => { |
53 | 53 | // eslint-disable-next-line vue/no-side-effects-in-computed-properties |
54 | 54 | const newTitle = projectInfoStore.getProjectInfo.dataViewName |
55 | - setTitle(`工作空间-${newTitle}`) | |
55 | + setTitle(`工作空间-${newTitle || ''}`) | |
56 | 56 | chartEditStore.setEditCanvasConfig(EditCanvasConfigEnum.PROJECT_NAME, newTitle) |
57 | 57 | projectInfoStore.setProjectInfoByKey(ProjectInfoEnum.DATA_VIEW_NAME, newTitle) |
58 | 58 | return newTitle | ... | ... |