Showing
4 changed files
with
20 additions
and
13 deletions
@@ -251,7 +251,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc | @@ -251,7 +251,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc | ||
251 | ctx.cancelTasks(); | 251 | ctx.cancelTasks(); |
252 | ctx.clearEntitySubscriptions(); | 252 | ctx.clearEntitySubscriptions(); |
253 | if (entities.isEmpty()) { | 253 | if (entities.isEmpty()) { |
254 | - AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(), new PageData<>(), null, false); | 254 | + AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(), new PageData<>(), null, 0, 0); |
255 | wsService.sendWsMsg(ctx.getSessionId(), update); | 255 | wsService.sendWsMsg(ctx.getSessionId(), update); |
256 | } else { | 256 | } else { |
257 | ctx.fetchAlarms(); | 257 | ctx.fetchAlarms(); |
@@ -88,7 +88,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { | @@ -88,7 +88,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { | ||
88 | 88 | ||
89 | public void fetchAlarms() { | 89 | public void fetchAlarms() { |
90 | AlarmDataUpdate update; | 90 | AlarmDataUpdate update; |
91 | - if(!entitiesMap.isEmpty()) { | 91 | + if (!entitiesMap.isEmpty()) { |
92 | long start = System.currentTimeMillis(); | 92 | long start = System.currentTimeMillis(); |
93 | PageData<AlarmData> alarms = alarmService.findAlarmDataByQueryForEntities(getTenantId(), getCustomerId(), | 93 | PageData<AlarmData> alarms = alarmService.findAlarmDataByQueryForEntities(getTenantId(), getCustomerId(), |
94 | query, getOrderedEntityIds()); | 94 | query, getOrderedEntityIds()); |
@@ -96,9 +96,9 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { | @@ -96,9 +96,9 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { | ||
96 | stats.getAlarmQueryInvocationCnt().incrementAndGet(); | 96 | stats.getAlarmQueryInvocationCnt().incrementAndGet(); |
97 | stats.getAlarmQueryTimeSpent().addAndGet(end - start); | 97 | stats.getAlarmQueryTimeSpent().addAndGet(end - start); |
98 | alarms = setAndMergeAlarmsData(alarms); | 98 | alarms = setAndMergeAlarmsData(alarms); |
99 | - update = new AlarmDataUpdate(cmdId, alarms, null, tooManyEntities); | 99 | + update = new AlarmDataUpdate(cmdId, alarms, null, maxEntitiesPerAlarmSubscription, data.getTotalElements()); |
100 | } else { | 100 | } else { |
101 | - update = new AlarmDataUpdate(cmdId, new PageData<>(), null, false); | 101 | + update = new AlarmDataUpdate(cmdId, new PageData<>(), null, maxEntitiesPerAlarmSubscription, data.getTotalElements()); |
102 | } | 102 | } |
103 | wsService.sendWsMsg(getSessionId(), update); | 103 | wsService.sendWsMsg(getSessionId(), update); |
104 | } | 104 | } |
@@ -178,7 +178,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { | @@ -178,7 +178,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { | ||
178 | alarm.getLatest().computeIfAbsent(keyType, tmp -> new HashMap<>()).putAll(latestUpdate); | 178 | alarm.getLatest().computeIfAbsent(keyType, tmp -> new HashMap<>()).putAll(latestUpdate); |
179 | return alarm; | 179 | return alarm; |
180 | }).collect(Collectors.toList()); | 180 | }).collect(Collectors.toList()); |
181 | - wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, update, tooManyEntities)); | 181 | + wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, update, maxEntitiesPerAlarmSubscription, data.getTotalElements())); |
182 | } else { | 182 | } else { |
183 | log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate); | 183 | log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate); |
184 | } | 184 | } |
@@ -201,7 +201,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { | @@ -201,7 +201,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { | ||
201 | AlarmData updated = new AlarmData(alarm, current.getOriginatorName(), current.getEntityId()); | 201 | AlarmData updated = new AlarmData(alarm, current.getOriginatorName(), current.getEntityId()); |
202 | updated.getLatest().putAll(current.getLatest()); | 202 | updated.getLatest().putAll(current.getLatest()); |
203 | alarmsMap.put(alarmId, updated); | 203 | alarmsMap.put(alarmId, updated); |
204 | - wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated), tooManyEntities)); | 204 | + wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated), maxEntitiesPerAlarmSubscription, data.getTotalElements())); |
205 | } else { | 205 | } else { |
206 | fetchAlarms(); | 206 | fetchAlarms(); |
207 | } | 207 | } |
@@ -108,6 +108,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi | @@ -108,6 +108,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi | ||
108 | private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!"; | 108 | private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!"; |
109 | private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!"; | 109 | private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!"; |
110 | private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!"; | 110 | private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!"; |
111 | + private static final String FAILED_TO_PARSE_WS_COMMAND = "Failed to parse websocket command!"; | ||
111 | 112 | ||
112 | private final ConcurrentMap<String, WsSessionMetaData> wsSessionsMap = new ConcurrentHashMap<>(); | 113 | private final ConcurrentMap<String, WsSessionMetaData> wsSessionsMap = new ConcurrentHashMap<>(); |
113 | 114 | ||
@@ -224,8 +225,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi | @@ -224,8 +225,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi | ||
224 | } | 225 | } |
225 | } catch (IOException e) { | 226 | } catch (IOException e) { |
226 | log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e); | 227 | log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e); |
227 | - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.INTERNAL_ERROR, SESSION_META_DATA_NOT_FOUND); | ||
228 | - sendWsMsg(sessionRef, update); | 228 | + sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.BAD_REQUEST, FAILED_TO_PARSE_WS_COMMAND)); |
229 | } | 229 | } |
230 | } | 230 | } |
231 | 231 |
@@ -17,6 +17,7 @@ package org.thingsboard.server.service.telemetry.cmd.v2; | @@ -17,6 +17,7 @@ package org.thingsboard.server.service.telemetry.cmd.v2; | ||
17 | 17 | ||
18 | import com.fasterxml.jackson.annotation.JsonCreator; | 18 | import com.fasterxml.jackson.annotation.JsonCreator; |
19 | import com.fasterxml.jackson.annotation.JsonProperty; | 19 | import com.fasterxml.jackson.annotation.JsonProperty; |
20 | +import lombok.Getter; | ||
20 | import lombok.NoArgsConstructor; | 21 | import lombok.NoArgsConstructor; |
21 | import org.thingsboard.server.common.data.page.PageData; | 22 | import org.thingsboard.server.common.data.page.PageData; |
22 | import org.thingsboard.server.common.data.query.AlarmData; | 23 | import org.thingsboard.server.common.data.query.AlarmData; |
@@ -27,11 +28,15 @@ import java.util.List; | @@ -27,11 +28,15 @@ import java.util.List; | ||
27 | 28 | ||
28 | public class AlarmDataUpdate extends DataUpdate<AlarmData> { | 29 | public class AlarmDataUpdate extends DataUpdate<AlarmData> { |
29 | 30 | ||
30 | - private boolean tooManyEntities; | 31 | + @Getter |
32 | + private long allowedEntities; | ||
33 | + @Getter | ||
34 | + private long totalEntities; | ||
31 | 35 | ||
32 | - public AlarmDataUpdate(int cmdId, PageData<AlarmData> data, List<AlarmData> update, boolean tooManyEntities) { | 36 | + public AlarmDataUpdate(int cmdId, PageData<AlarmData> data, List<AlarmData> update, long allowedEntities, long totalEntities) { |
33 | super(cmdId, data, update, SubscriptionErrorCode.NO_ERROR.getCode(), null); | 37 | super(cmdId, data, update, SubscriptionErrorCode.NO_ERROR.getCode(), null); |
34 | - this.tooManyEntities = tooManyEntities; | 38 | + this.allowedEntities = allowedEntities; |
39 | + this.totalEntities = totalEntities; | ||
35 | } | 40 | } |
36 | 41 | ||
37 | public AlarmDataUpdate(int cmdId, int errorCode, String errorMsg) { | 42 | public AlarmDataUpdate(int cmdId, int errorCode, String errorMsg) { |
@@ -49,8 +54,10 @@ public class AlarmDataUpdate extends DataUpdate<AlarmData> { | @@ -49,8 +54,10 @@ public class AlarmDataUpdate extends DataUpdate<AlarmData> { | ||
49 | @JsonProperty("update") List<AlarmData> update, | 54 | @JsonProperty("update") List<AlarmData> update, |
50 | @JsonProperty("errorCode") int errorCode, | 55 | @JsonProperty("errorCode") int errorCode, |
51 | @JsonProperty("errorMsg") String errorMsg, | 56 | @JsonProperty("errorMsg") String errorMsg, |
52 | - @JsonProperty("tooManyEntities") boolean tooManyEntities) { | 57 | + @JsonProperty("allowedEntities") long allowedEntities, |
58 | + @JsonProperty("totalEntities") long totalEntities) { | ||
53 | super(cmdId, data, update, errorCode, errorMsg); | 59 | super(cmdId, data, update, errorCode, errorMsg); |
54 | - this.tooManyEntities = tooManyEntities; | 60 | + this.allowedEntities = allowedEntities; |
61 | + this.totalEntities = totalEntities; | ||
55 | } | 62 | } |
56 | } | 63 | } |