Commit ce6ec889833db0802fa3b7af01b02ae95873c754

Authored by Andrii Shvaika
1 parent e102b55b

Refactoring of the websocket and subscription services

Showing 31 changed files with 1541 additions and 790 deletions
... ... @@ -224,7 +224,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
224 224 }
225 225
226 226 void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
227   - boolean reportDeviceActivity = false;
  227 + boolean reportDeviceActivity = true;
228 228 TransportToDeviceActorMsg msg = wrapper.getMsg();
229 229 TbMsgCallback callback = wrapper.getCallback();
230 230 if (msg.hasSessionEvent()) {
... ... @@ -263,7 +263,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
263 263 callback.onSuccess();
264 264 }
265 265
266   - //TODO 2.5 move this as a notification to the queue;
267 266 private void reportLogicalDeviceActivity() {
268 267 systemContext.getDeviceStateService().onDeviceActivity(deviceId);
269 268 }
... ...
... ... @@ -19,6 +19,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
19 19 import org.thingsboard.server.common.data.id.EntityId;
20 20 import org.thingsboard.server.common.data.id.TenantId;
21 21 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
  22 +import org.thingsboard.server.common.msg.TbActorMsg;
22 23 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
23 24 import org.thingsboard.server.common.transport.SessionMsgProcessor;
24 25
... ... @@ -26,7 +27,7 @@ public interface ActorService extends SessionMsgProcessor {
26 27
27 28 void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
28 29
29   - void onMsg(SendToClusterMsg msg);
  30 + void onMsg(TbActorMsg msg);
30 31
31 32 void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
32 33
... ...
... ... @@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
37 37 import org.thingsboard.server.common.data.id.EntityId;
38 38 import org.thingsboard.server.common.data.id.TenantId;
39 39 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
  40 +import org.thingsboard.server.common.msg.TbActorMsg;
40 41 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
41 42 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
42 43 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
... ... @@ -108,7 +109,7 @@ public class DefaultActorService implements ActorService {
108 109 }
109 110
110 111 @Override
111   - public void onMsg(SendToClusterMsg msg) {
  112 + public void onMsg(TbActorMsg msg) {
112 113 appActor.tell(msg, ActorRef.noSender());
113 114 }
114 115
... ...
... ... @@ -24,6 +24,7 @@ import org.springframework.context.event.EventListener;
24 24 import org.springframework.scheduling.annotation.Scheduled;
25 25 import org.springframework.stereotype.Service;
26 26 import org.thingsboard.common.util.ThingsBoardThreadFactory;
  27 +import org.thingsboard.server.common.data.id.TenantId;
27 28 import org.thingsboard.server.queue.TbQueueConsumer;
28 29 import org.thingsboard.server.actors.ActorSystemContext;
29 30 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
... ... @@ -34,6 +35,10 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
34 35 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
35 36 import org.thingsboard.server.queue.provider.TbCoreQueueProvider;
36 37 import org.thingsboard.server.service.state.DeviceStateService;
  38 +import org.thingsboard.server.service.subscription.LocalSubscriptionService;
  39 +import org.thingsboard.server.service.subscription.SubscriptionManagerService;
  40 +import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
  41 +import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
37 42 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
38 43
39 44 import javax.annotation.PostConstruct;
... ... @@ -62,15 +67,19 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService {
62 67
63 68 private final ActorSystemContext actorContext;
64 69 private final DeviceStateService stateService;
  70 + private final LocalSubscriptionService localSubscriptionService;
  71 + private final SubscriptionManagerService subscriptionManagerService;
65 72 private final TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> consumer;
66 73 private final TbCoreConsumerStats stats = new TbCoreConsumerStats();
67 74 private volatile ExecutorService mainConsumerExecutor;
68 75 private volatile boolean stopped = false;
69 76
70   - public DefaultTbCoreConsumerService(TbCoreQueueProvider tbCoreQueueProvider, ActorSystemContext actorContext, DeviceStateService stateService) {
  77 + public DefaultTbCoreConsumerService(TbCoreQueueProvider tbCoreQueueProvider, ActorSystemContext actorContext, DeviceStateService stateService, LocalSubscriptionService localSubscriptionService, SubscriptionManagerService subscriptionManagerService) {
71 78 this.consumer = tbCoreQueueProvider.getToCoreMsgConsumer();
72 79 this.actorContext = actorContext;
73 80 this.stateService = stateService;
  81 + this.localSubscriptionService = localSubscriptionService;
  82 + this.subscriptionManagerService = subscriptionManagerService;
74 83 }
75 84
76 85 @PostConstruct
... ... @@ -108,8 +117,15 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService {
108 117 } else if (toCoreMsg.hasDeviceStateServiceMsg()) {
109 118 log.trace("[{}] Forwarding message to state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
110 119 forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
  120 + } else if (toCoreMsg.hasToSubscriptionMgrMsg()) {
  121 + log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
  122 + forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
  123 + } else if (toCoreMsg.hasToLocalSubscriptionServiceMsg()) {
  124 + log.trace("[{}] Forwarding message to local subscription service {}", id, toCoreMsg.getToLocalSubscriptionServiceMsg());
  125 + forwardToLocalSubMgrService(toCoreMsg.getToLocalSubscriptionServiceMsg(), callback);
111 126 }
112 127 } catch (Throwable e) {
  128 + log.warn("[{}] Failed to process message: {}", id, msg, e);
113 129 callback.onFailure(e);
114 130 }
115 131 });
... ... @@ -126,23 +142,10 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService {
126 142 }
127 143 }
128 144 }
  145 + log.info("Tb Core Consumer stopped.");
129 146 });
130 147 }
131 148
132   - private void forwardToStateService(TransportProtos.DeviceStateServiceMsgProto deviceStateServiceMsg, TbMsgCallback callback) {
133   - if (statsEnabled) {
134   - stats.log(deviceStateServiceMsg);
135   - }
136   - stateService.onQueueMsg(deviceStateServiceMsg, callback);
137   - }
138   -
139   - private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbMsgCallback callback) {
140   - if (statsEnabled) {
141   - stats.log(toDeviceActorMsg);
142   - }
143   - actorContext.getAppActor().tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback), ActorRef.noSender());
144   - }
145   -
146 149 @Scheduled(fixedDelayString = "${queue.core.stats.print_interval_ms}")
147 150 public void printStats() {
148 151 if (statsEnabled) {
... ... @@ -161,4 +164,55 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService {
161 164 }
162 165 }
163 166
  167 + private void forwardToLocalSubMgrService(TransportProtos.LocalSubscriptionServiceMsgProto msg, TbMsgCallback callback) {
  168 + if (msg.hasSubUpdate()) {
  169 + localSubscriptionService.onSubscriptionUpdate(msg.getSubUpdate().getSessionId(), TbSubscriptionUtils.fromProto(msg.getSubUpdate()), callback);
  170 + } else {
  171 + throwNotHandled(msg, callback);
  172 + }
  173 + }
  174 +
  175 + private void forwardToSubMgrService(TransportProtos.SubscriptionMgrMsgProto msg, TbMsgCallback callback) {
  176 + if (msg.hasAttributeSub()) {
  177 + subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getAttributeSub()), callback);
  178 + } else if (msg.hasTelemetrySub()) {
  179 + subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getTelemetrySub()), callback);
  180 + } else if (msg.hasSubClose()) {
  181 + TransportProtos.TbSubscriptionCloseProto closeProto = msg.getSubClose();
  182 + subscriptionManagerService.cancelSubscription(closeProto.getSessionId(), closeProto.getSubscriptionId(), callback);
  183 + } else if (msg.hasTsUpdate()) {
  184 + TransportProtos.TbTimeSeriesUpdateProto proto = msg.getTsUpdate();
  185 + subscriptionManagerService.onTimeseriesDataUpdate(
  186 + new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
  187 + TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
  188 + TbSubscriptionUtils.toTsKvEntityList(proto.getDataList()), callback);
  189 + } else if (msg.hasAttrUpdate()) {
  190 + TransportProtos.TbAttributeUpdateProto proto = msg.getAttrUpdate();
  191 + subscriptionManagerService.onAttributesUpdate(
  192 + new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
  193 + TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
  194 + proto.getScope(), TbSubscriptionUtils.toAttributeKvList(proto.getDataList()), callback);
  195 + } else {
  196 + throwNotHandled(msg, callback);
  197 + }
  198 + }
  199 +
  200 + private void forwardToStateService(TransportProtos.DeviceStateServiceMsgProto deviceStateServiceMsg, TbMsgCallback callback) {
  201 + if (statsEnabled) {
  202 + stats.log(deviceStateServiceMsg);
  203 + }
  204 + stateService.onQueueMsg(deviceStateServiceMsg, callback);
  205 + }
  206 +
  207 + private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbMsgCallback callback) {
  208 + if (statsEnabled) {
  209 + stats.log(toDeviceActorMsg);
  210 + }
  211 + actorContext.getAppActor().tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback), ActorRef.noSender());
  212 + }
  213 +
  214 + private void throwNotHandled(Object msg, TbMsgCallback callback) {
  215 + log.warn("Message not handled: {}", msg);
  216 + callback.onFailure(new RuntimeException("Message not handled!"));
  217 + }
164 218 }
... ...
... ... @@ -17,6 +17,19 @@ package org.thingsboard.server.service.queue;
17 17
18 18 public interface TbMsgCallback {
19 19
  20 + TbMsgCallback EMPTY = new TbMsgCallback() {
  21 +
  22 + @Override
  23 + public void onSuccess() {
  24 +
  25 + }
  26 +
  27 + @Override
  28 + public void onFailure(Throwable t) {
  29 +
  30 + }
  31 + };
  32 +
20 33 void onSuccess();
21 34
22 35 void onFailure(Throwable t);
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -92,7 +92,6 @@ import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
92 92 */
93 93 @Service
94 94 @Slf4j
95   -//TODO: refactor to use page links as cursor and not fetch all
96 95 public class DefaultDeviceStateService implements DeviceStateService {
97 96
98 97 private static final ObjectMapper json = new ObjectMapper();
... ... @@ -150,7 +149,6 @@ public class DefaultDeviceStateService implements DeviceStateService {
150 149 private volatile boolean clusterUpdatePending = false;
151 150
152 151 private ListeningScheduledExecutorService queueExecutor;
153   - private ConcurrentMap<TenantId, Set<DeviceId>> tenantDevices = new ConcurrentHashMap<>();
154 152 private ConcurrentMap<TopicPartitionInfo, Set<DeviceId>> partitionedDevices = new ConcurrentHashMap<>();
155 153 private ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
156 154 private ConcurrentMap<DeviceId, Long> deviceLastReportedActivity = new ConcurrentHashMap<>();
... ... @@ -378,7 +376,6 @@ public class DefaultDeviceStateService implements DeviceStateService {
378 376 deviceStates.put(state.getDeviceId(), state);
379 377 }
380 378
381   - //TODO 2.5: review this method
382 379 private void updateState() {
383 380 long ts = System.currentTimeMillis();
384 381 Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
... ... @@ -439,13 +436,9 @@ public class DefaultDeviceStateService implements DeviceStateService {
439 436 deviceStates.remove(deviceId);
440 437 deviceLastReportedActivity.remove(deviceId);
441 438 deviceLastSavedActivity.remove(deviceId);
442   - Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
443   - if (deviceIds != null) {
444   - deviceIds.remove(deviceId);
445   - if (deviceIds.isEmpty()) {
446   - tenantDevices.remove(tenantId);
447   - }
448   - }
  439 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
  440 + Set<DeviceId> deviceIdSet = partitionedDevices.get(tpi);
  441 + deviceIdSet.remove(deviceId);
449 442 }
450 443
451 444 private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.subscription;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.beans.factory.annotation.Autowired;
  20 +import org.springframework.context.annotation.Lazy;
  21 +import org.springframework.context.event.EventListener;
  22 +import org.springframework.stereotype.Service;
  23 +import org.thingsboard.common.util.ThingsBoardThreadFactory;
  24 +import org.thingsboard.server.common.data.EntityType;
  25 +import org.thingsboard.server.common.data.EntityView;
  26 +import org.thingsboard.server.common.data.id.EntityId;
  27 +import org.thingsboard.server.common.data.id.EntityViewId;
  28 +import org.thingsboard.server.common.data.id.TenantId;
  29 +import org.thingsboard.server.dao.entityview.EntityViewService;
  30 +import org.thingsboard.server.gen.transport.TransportProtos;
  31 +import org.thingsboard.server.queue.TbQueueProducer;
  32 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  33 +import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent;
  34 +import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
  35 +import org.thingsboard.server.queue.discovery.PartitionService;
  36 +import org.thingsboard.server.queue.discovery.ServiceType;
  37 +import org.thingsboard.server.queue.discovery.TopicPartitionInfo;
  38 +import org.thingsboard.server.queue.provider.TbCoreQueueProvider;
  39 +import org.thingsboard.server.service.queue.TbMsgCallback;
  40 +import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
  41 +import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
  42 +
  43 +import javax.annotation.PostConstruct;
  44 +import javax.annotation.PreDestroy;
  45 +import java.util.Collections;
  46 +import java.util.HashSet;
  47 +import java.util.Map;
  48 +import java.util.Set;
  49 +import java.util.concurrent.ConcurrentHashMap;
  50 +import java.util.concurrent.ExecutorService;
  51 +import java.util.concurrent.Executors;
  52 +import java.util.stream.Collectors;
  53 +
  54 +@Slf4j
  55 +@Service
  56 +public class DefaultLocalSubscriptionService implements LocalSubscriptionService {
  57 +
  58 + private final Set<TopicPartitionInfo> currentPartitions = ConcurrentHashMap.newKeySet();
  59 + private final Map<String, Map<Integer, TbSubscription>> subscriptionsBySessionId = new ConcurrentHashMap<>();
  60 +
  61 + @Autowired
  62 + private TelemetryWebSocketService wsService;
  63 +
  64 + @Autowired
  65 + private EntityViewService entityViewService;
  66 +
  67 + @Autowired
  68 + private PartitionService partitionService;
  69 +
  70 + @Autowired
  71 + private TbCoreQueueProvider coreQueueProvider;
  72 +
  73 + @Autowired
  74 + @Lazy
  75 + private SubscriptionManagerService subscriptionManagerService;
  76 +
  77 + private ExecutorService wsCallBackExecutor;
  78 + private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> toCoreProducer;
  79 +
  80 + @PostConstruct
  81 + public void initExecutor() {
  82 + wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-sub-callback"));
  83 + toCoreProducer = coreQueueProvider.getTbCoreMsgProducer();
  84 + }
  85 +
  86 + @PreDestroy
  87 + public void shutdownExecutor() {
  88 + if (wsCallBackExecutor != null) {
  89 + wsCallBackExecutor.shutdownNow();
  90 + }
  91 + }
  92 +
  93 + @Override
  94 + @EventListener(PartitionChangeEvent.class)
  95 + public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  96 + if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceKey().getServiceType())) {
  97 + currentPartitions.clear();
  98 + currentPartitions.addAll(partitionChangeEvent.getPartitions());
  99 + }
  100 + }
  101 +
  102 + @Override
  103 + @EventListener(ClusterTopologyChangeEvent.class)
  104 + public void onApplicationEvent(ClusterTopologyChangeEvent event) {
  105 + if (event.getServiceKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getServiceType()))) {
  106 + /*
  107 + * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again.
  108 + * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart.
  109 + * Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically
  110 + * It is also cheaper then caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService.
  111 + * Even if we cache locally the list of active subscriptions by entity id, it is still time consuming operation to get them from cache
  112 + * Since number of subscriptions is usually much less then number of devices that are pushing data.
  113 + */
  114 + subscriptionsBySessionId.values().forEach(map -> map.values()
  115 + .forEach(sub -> pushSubscriptionToManagerService(sub, false)));
  116 + }
  117 + }
  118 +
  119 + //TODO 3.1: replace null callbacks with callbacks from websocket service.
  120 + @Override
  121 + public void addSubscription(TbSubscription subscription) {
  122 + EntityId entityId = subscription.getEntityId();
  123 + // Telemetry subscription on Entity Views are handled differently, because we need to allow only certain keys and time ranges;
  124 + if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW) && TbSubscriptionType.TIMESERIES.equals(subscription.getType())) {
  125 + subscription = resolveEntityViewSubscription((TbTimeseriesSubscription) subscription);
  126 + }
  127 + pushSubscriptionToManagerService(subscription, true);
  128 + registerSubscription(subscription);
  129 + }
  130 +
  131 + private void pushSubscriptionToManagerService(TbSubscription subscription, boolean pushToLocalService) {
  132 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId());
  133 + if (currentPartitions.contains(tpi)) {
  134 + // Subscription is managed on the same server;
  135 + if (pushToLocalService) {
  136 + subscriptionManagerService.addSubscription(subscription, TbMsgCallback.EMPTY);
  137 + }
  138 + } else {
  139 + // Push to the queue;
  140 + TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toNewSubscriptionProto(subscription);
  141 + toCoreProducer.send(tpi, new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg), null);
  142 + }
  143 + }
  144 +
  145 + @Override
  146 + public void onSubscriptionUpdate(String sessionId, SubscriptionUpdate update, TbMsgCallback callback) {
  147 + TbSubscription subscription = subscriptionsBySessionId
  148 + .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId());
  149 + if (subscription != null) {
  150 + switch (subscription.getType()) {
  151 + case TIMESERIES:
  152 + TbTimeseriesSubscription tsSub = (TbTimeseriesSubscription) subscription;
  153 + update.getLatestValues().forEach((key, value) -> tsSub.getKeyStates().put(key, value));
  154 + break;
  155 + case ATTRIBUTES:
  156 + TbAttributeSubscription attrSub = (TbAttributeSubscription) subscription;
  157 + update.getLatestValues().forEach((key, value) -> attrSub.getKeyStates().put(key, value));
  158 + break;
  159 + }
  160 + wsService.sendWsMsg(sessionId, update);
  161 + }
  162 + callback.onSuccess();
  163 + }
  164 +
  165 + @Override
  166 + public void cancelSubscription(String sessionId, int subscriptionId) {
  167 + log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
  168 + Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsBySessionId.get(sessionId);
  169 + if (sessionSubscriptions != null) {
  170 + TbSubscription subscription = sessionSubscriptions.remove(subscriptionId);
  171 + if (subscription != null) {
  172 + if (sessionSubscriptions.isEmpty()) {
  173 + subscriptionsBySessionId.remove(sessionId);
  174 + }
  175 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId());
  176 + if (currentPartitions.contains(tpi)) {
  177 + // Subscription is managed on the same server;
  178 + subscriptionManagerService.cancelSubscription(sessionId, subscriptionId, TbMsgCallback.EMPTY);
  179 + } else {
  180 + // Push to the queue;
  181 + TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toCloseSubscriptionProto(subscription);
  182 + toCoreProducer.send(tpi, new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg), null);
  183 + }
  184 + } else {
  185 + log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
  186 + }
  187 + } else {
  188 + log.debug("[{}] No session subscriptions found!", sessionId);
  189 + }
  190 + }
  191 +
  192 + @Override
  193 + public void cancelAllSessionSubscriptions(String sessionId) {
  194 + Map<Integer, TbSubscription> subscriptions = subscriptionsBySessionId.get(sessionId);
  195 + if (subscriptions != null) {
  196 + Set<Integer> toRemove = new HashSet<>(subscriptions.keySet());
  197 + toRemove.forEach(id -> cancelSubscription(sessionId, id));
  198 + }
  199 + }
  200 +
  201 + private TbSubscription resolveEntityViewSubscription(TbTimeseriesSubscription subscription) {
  202 + EntityView entityView = entityViewService.findEntityViewById(TenantId.SYS_TENANT_ID, new EntityViewId(subscription.getEntityId().getId()));
  203 +
  204 + Map<String, Long> keyStates;
  205 + if (subscription.isAllKeys()) {
  206 + keyStates = entityView.getKeys().getTimeseries().stream().collect(Collectors.toMap(k -> k, k -> 0L));
  207 + } else {
  208 + keyStates = subscription.getKeyStates().entrySet()
  209 + .stream().filter(entry -> entityView.getKeys().getTimeseries().contains(entry.getKey()))
  210 + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  211 + }
  212 +
  213 + return TbTimeseriesSubscription.builder()
  214 + .serviceId(subscription.getServiceId())
  215 + .sessionId(subscription.getSessionId())
  216 + .subscriptionId(subscription.getSubscriptionId())
  217 + .tenantId(subscription.getTenantId())
  218 + .entityId(entityView.getEntityId())
  219 + .startTime(entityView.getStartTimeMs())
  220 + .endTime(entityView.getEndTimeMs())
  221 + .allKeys(false)
  222 + .keyStates(keyStates).build();
  223 + }
  224 +
  225 + private void registerSubscription(TbSubscription subscription) {
  226 + Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>());
  227 + sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);
  228 + }
  229 +
  230 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.subscription;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.beans.factory.annotation.Autowired;
  20 +import org.springframework.stereotype.Service;
  21 +import org.springframework.util.StringUtils;
  22 +import org.thingsboard.common.util.DonAsynchron;
  23 +import org.thingsboard.common.util.ThingsBoardThreadFactory;
  24 +import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
  25 +import org.thingsboard.server.actors.service.ActorService;
  26 +import org.thingsboard.server.common.data.DataConstants;
  27 +import org.thingsboard.server.common.data.EntityType;
  28 +import org.thingsboard.server.common.data.id.DeviceId;
  29 +import org.thingsboard.server.common.data.id.EntityId;
  30 +import org.thingsboard.server.common.data.id.TenantId;
  31 +import org.thingsboard.server.common.data.kv.Aggregation;
  32 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  33 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
  34 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  35 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
  36 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  37 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
  38 +import org.thingsboard.server.dao.attributes.AttributesService;
  39 +import org.thingsboard.server.dao.timeseries.TimeseriesService;
  40 +import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
  41 +import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto;
  42 +import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateValueListProto;
  43 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
  44 +import org.thingsboard.server.queue.TbQueueProducer;
  45 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  46 +import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
  47 +import org.thingsboard.server.queue.discovery.PartitionService;
  48 +import org.thingsboard.server.queue.discovery.ServiceType;
  49 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  50 +import org.thingsboard.server.queue.discovery.TopicPartitionInfo;
  51 +import org.thingsboard.server.queue.provider.TbCoreQueueProvider;
  52 +import org.thingsboard.server.service.queue.TbMsgCallback;
  53 +import org.thingsboard.server.service.state.DefaultDeviceStateService;
  54 +import org.thingsboard.server.service.state.DeviceStateService;
  55 +import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
  56 +
  57 +import javax.annotation.PostConstruct;
  58 +import javax.annotation.PreDestroy;
  59 +import java.util.ArrayList;
  60 +import java.util.HashSet;
  61 +import java.util.List;
  62 +import java.util.Map;
  63 +import java.util.Objects;
  64 +import java.util.Set;
  65 +import java.util.TreeMap;
  66 +import java.util.concurrent.ConcurrentHashMap;
  67 +import java.util.concurrent.ConcurrentMap;
  68 +import java.util.concurrent.ExecutorService;
  69 +import java.util.concurrent.Executors;
  70 +import java.util.function.Function;
  71 +import java.util.function.Predicate;
  72 +
  73 +@Slf4j
  74 +@Service
  75 +public class DefaultSubscriptionManagerService implements SubscriptionManagerService {
  76 +
  77 + @Autowired
  78 + private AttributesService attrService;
  79 +
  80 + @Autowired
  81 + private TimeseriesService tsService;
  82 +
  83 + @Autowired
  84 + private PartitionService partitionService;
  85 +
  86 + @Autowired
  87 + private TbServiceInfoProvider serviceInfoProvider;
  88 +
  89 + @Autowired
  90 + private TbCoreQueueProvider coreQueueProvider;
  91 +
  92 + @Autowired
  93 + private LocalSubscriptionService localSubscriptionService;
  94 +
  95 + @Autowired
  96 + private DeviceStateService deviceStateService;
  97 +
  98 + @Autowired
  99 + private ActorService actorService;
  100 +
  101 + private final Map<EntityId, Set<TbSubscription>> subscriptionsByEntityId = new ConcurrentHashMap<>();
  102 + private final Map<String, Map<Integer, TbSubscription>> subscriptionsByWsSessionId = new ConcurrentHashMap<>();
  103 + private final ConcurrentMap<TopicPartitionInfo, Set<TbSubscription>> partitionedSubscriptions = new ConcurrentHashMap<>();
  104 + private final Set<TopicPartitionInfo> currentPartitions = ConcurrentHashMap.newKeySet();
  105 +
  106 + private ExecutorService tsCallBackExecutor;
  107 + private String serviceId;
  108 + private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> toCoreProducer;
  109 +
  110 + @PostConstruct
  111 + public void initExecutor() {
  112 + tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback"));
  113 + serviceId = serviceInfoProvider.getServiceId();
  114 + toCoreProducer = coreQueueProvider.getTbCoreMsgProducer();
  115 + }
  116 +
  117 + @PreDestroy
  118 + public void shutdownExecutor() {
  119 + if (tsCallBackExecutor != null) {
  120 + tsCallBackExecutor.shutdownNow();
  121 + }
  122 + }
  123 +
  124 + @Override
  125 + public void addSubscription(TbSubscription subscription, TbMsgCallback callback) {
  126 + log.trace("[{}][{}][{}] Registering remote subscription for entity [{}]",
  127 + subscription.getServiceId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId());
  128 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId());
  129 + if (currentPartitions.contains(tpi)) {
  130 + partitionedSubscriptions.computeIfAbsent(tpi, k -> ConcurrentHashMap.newKeySet()).add(subscription);
  131 + callback.onSuccess();
  132 + } else {
  133 + log.warn("[{}][{}] Entity belongs to external partition. Probably rebalancing is in progress. Topic: {}"
  134 + , subscription.getTenantId(), subscription.getEntityId(), tpi.getFullTopicName());
  135 + callback.onFailure(new RuntimeException("Entity belongs to external partition " + tpi.getFullTopicName() + "!"));
  136 + }
  137 + boolean newSubscription = subscriptionsByEntityId
  138 + .computeIfAbsent(subscription.getEntityId(), k -> ConcurrentHashMap.newKeySet()).add(subscription);
  139 + subscriptionsByWsSessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()).put(subscription.getSubscriptionId(), subscription);
  140 + if (newSubscription) {
  141 + switch (subscription.getType()) {
  142 + case TIMESERIES:
  143 + handleNewTelemetrySubscription((TbTimeseriesSubscription) subscription);
  144 + break;
  145 + case ATTRIBUTES:
  146 + handleNewAttributeSubscription((TbAttributeSubscription) subscription);
  147 + break;
  148 + }
  149 + }
  150 + }
  151 +
  152 + @Override
  153 + public void cancelSubscription(String sessionId, int subscriptionId, TbMsgCallback callback) {
  154 + log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
  155 + Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId);
  156 + if (sessionSubscriptions != null) {
  157 + TbSubscription subscription = sessionSubscriptions.remove(subscriptionId);
  158 + if (subscription != null) {
  159 + removeSubscriptionFromEntityMap(subscription);
  160 + removeSubscriptionFromPartitionMap(subscription);
  161 + if (sessionSubscriptions.isEmpty()) {
  162 + subscriptionsByWsSessionId.remove(sessionId);
  163 + }
  164 + } else {
  165 + log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
  166 + }
  167 + } else {
  168 + log.debug("[{}] No session subscriptions found!", sessionId);
  169 + }
  170 + callback.onSuccess();
  171 + }
  172 +
  173 + @Override
  174 + public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  175 + Set<TopicPartitionInfo> removedPartitions = new HashSet<>(currentPartitions);
  176 + removedPartitions.removeAll(partitionChangeEvent.getPartitions());
  177 +
  178 + currentPartitions.clear();
  179 + currentPartitions.addAll(partitionChangeEvent.getPartitions());
  180 +
  181 + // We no longer manage current partition of devices;
  182 + removedPartitions.forEach(partition -> {
  183 + Set<TbSubscription> subs = partitionedSubscriptions.remove(partition);
  184 + if (subs != null) {
  185 + subs.forEach(this::removeSubscriptionFromEntityMap);
  186 + }
  187 + });
  188 + }
  189 +
  190 + @Override
  191 + public void onTimeseriesDataUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, TbMsgCallback callback) {
  192 + onLocalSubUpdate(entityId,
  193 + s -> {
  194 + if (TbSubscriptionType.TIMESERIES.equals(s.getType())) {
  195 + return (TbTimeseriesSubscription) s;
  196 + } else {
  197 + return null;
  198 + }
  199 + }, s -> true, s -> {
  200 + List<TsKvEntry> subscriptionUpdate = null;
  201 + for (TsKvEntry kv : ts) {
  202 + if (isInTimeRange(s, kv.getTs()) && (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey())))) {
  203 + if (subscriptionUpdate == null) {
  204 + subscriptionUpdate = new ArrayList<>();
  205 + }
  206 + subscriptionUpdate.add(kv);
  207 + }
  208 + }
  209 + return subscriptionUpdate;
  210 + });
  211 + callback.onSuccess();
  212 + }
  213 +
  214 + @Override
  215 + public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbMsgCallback callback) {
  216 + onLocalSubUpdate(entityId,
  217 + s -> {
  218 + if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) {
  219 + return (TbAttributeSubscription) s;
  220 + } else {
  221 + return null;
  222 + }
  223 + },
  224 + s -> (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope().name())),
  225 + s -> {
  226 + List<TsKvEntry> subscriptionUpdate = null;
  227 + for (AttributeKvEntry kv : attributes) {
  228 + if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
  229 + if (subscriptionUpdate == null) {
  230 + subscriptionUpdate = new ArrayList<>();
  231 + }
  232 + subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv));
  233 + }
  234 + }
  235 + return subscriptionUpdate;
  236 + });
  237 + if (entityId.getEntityType() == EntityType.DEVICE) {
  238 + if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)) {
  239 + for (AttributeKvEntry attribute : attributes) {
  240 + if (attribute.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) {
  241 + deviceStateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L));
  242 + }
  243 + }
  244 + } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope)) {
  245 + DeviceAttributesEventNotificationMsg notificationMsg = DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
  246 + new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes));
  247 + actorService.onMsg(notificationMsg);
  248 + }
  249 + }
  250 + callback.onSuccess();
  251 + }
  252 +
  253 + private <T extends TbSubscription> void onLocalSubUpdate(EntityId entityId,
  254 + Function<TbSubscription, T> castFunction,
  255 + Predicate<T> filterFunction,
  256 + Function<T, List<TsKvEntry>> processFunction) {
  257 + Set<TbSubscription> entitySubscriptions = subscriptionsByEntityId.get(entityId);
  258 + if (entitySubscriptions != null) {
  259 + entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> {
  260 + List<TsKvEntry> subscriptionUpdate = processFunction.apply(s);
  261 + if (subscriptionUpdate != null && !subscriptionUpdate.isEmpty()) {
  262 + if (serviceId.equals(s.getServiceId())) {
  263 + SubscriptionUpdate update = new SubscriptionUpdate(s.getSubscriptionId(), subscriptionUpdate);
  264 + localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbMsgCallback.EMPTY);
  265 + } else {
  266 + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
  267 + toCoreProducer.send(tpi, toProto(s, subscriptionUpdate), null);
  268 + }
  269 + }
  270 + });
  271 + } else {
  272 + log.debug("[{}] No device subscriptions to process!", entityId);
  273 + }
  274 + }
  275 +
  276 + private boolean isInTimeRange(TbTimeseriesSubscription subscription, long kvTime) {
  277 + return (subscription.getStartTime() == 0 || subscription.getStartTime() <= kvTime)
  278 + && (subscription.getEndTime() == 0 || subscription.getEndTime() >= kvTime);
  279 + }
  280 +
  281 + private void removeSubscriptionFromEntityMap(TbSubscription sub) {
  282 + Set<TbSubscription> entitySubSet = subscriptionsByEntityId.get(sub.getEntityId());
  283 + if (entitySubSet != null) {
  284 + entitySubSet.remove(sub);
  285 + if (entitySubSet.isEmpty()) {
  286 + subscriptionsByEntityId.remove(sub.getEntityId());
  287 + }
  288 + }
  289 + }
  290 +
  291 + private void removeSubscriptionFromPartitionMap(TbSubscription sub) {
  292 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId());
  293 + Set<TbSubscription> subs = partitionedSubscriptions.get(tpi);
  294 + if (subs != null) {
  295 + subs.remove(sub);
  296 + }
  297 + }
  298 +
  299 + private void handleNewAttributeSubscription(TbAttributeSubscription subscription) {
  300 + log.trace("[{}][{}][{}] Processing remote attribute subscription for entity [{}]",
  301 + serviceId, subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId());
  302 +
  303 + final Map<String, Long> keyStates = subscription.getKeyStates();
  304 + DonAsynchron.withCallback(attrService.find(subscription.getTenantId(), subscription.getEntityId(), DataConstants.CLIENT_SCOPE, keyStates.keySet()), values -> {
  305 + List<TsKvEntry> missedUpdates = new ArrayList<>();
  306 + values.forEach(latestEntry -> {
  307 + if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) {
  308 + missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry));
  309 + }
  310 + });
  311 + if (!missedUpdates.isEmpty()) {
  312 + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId());
  313 + toCoreProducer.send(tpi, toProto(subscription, missedUpdates), null);
  314 + }
  315 + },
  316 + e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor);
  317 + }
  318 +
  319 + private void handleNewTelemetrySubscription(TbTimeseriesSubscription subscription) {
  320 + log.trace("[{}][{}][{}] Processing remote telemetry subscription for entity [{}]",
  321 + serviceId, subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId());
  322 +
  323 + long curTs = System.currentTimeMillis();
  324 + List<ReadTsKvQuery> queries = new ArrayList<>();
  325 + subscription.getKeyStates().forEach((key, value) -> {
  326 + if (curTs > value) {
  327 + long startTs = subscription.getStartTime() > 0 ? Math.max(subscription.getStartTime(), value + 1L) : (value + 1L);
  328 + long endTs = subscription.getEndTime() > 0 ? Math.min(subscription.getEndTime(), curTs) : curTs;
  329 + queries.add(new BaseReadTsKvQuery(key, startTs, endTs, 0, 1000, Aggregation.NONE));
  330 + }
  331 + });
  332 + if (!queries.isEmpty()) {
  333 + DonAsynchron.withCallback(tsService.findAll(subscription.getTenantId(), subscription.getEntityId(), queries),
  334 + missedUpdates -> {
  335 + if (missedUpdates != null && !missedUpdates.isEmpty()) {
  336 + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId());
  337 + toCoreProducer.send(tpi, toProto(subscription, missedUpdates), null);
  338 + }
  339 + },
  340 + e -> log.error("Failed to fetch missed updates.", e),
  341 + tsCallBackExecutor);
  342 + }
  343 + }
  344 +
  345 + private TbProtoQueueMsg<ToCoreMsg> toProto(TbSubscription subscription, List<TsKvEntry> updates) {
  346 + TbSubscriptionUpdateProto.Builder builder = TbSubscriptionUpdateProto.newBuilder();
  347 +
  348 + builder.setSessionId(subscription.getSessionId());
  349 + builder.setSubscriptionId(subscription.getSubscriptionId());
  350 +
  351 + Map<String, List<Object>> data = new TreeMap<>();
  352 + for (TsKvEntry tsEntry : updates) {
  353 + List<Object> values = data.computeIfAbsent(tsEntry.getKey(), k -> new ArrayList<>());
  354 + Object[] value = new Object[2];
  355 + value[0] = tsEntry.getTs();
  356 + value[1] = tsEntry.getValueAsString();
  357 + values.add(value);
  358 + }
  359 +
  360 + data.forEach((key, value) -> {
  361 + TbSubscriptionUpdateValueListProto.Builder dataBuilder = TbSubscriptionUpdateValueListProto.newBuilder();
  362 + dataBuilder.setKey(key);
  363 + value.forEach(v -> {
  364 + Object[] array = (Object[]) v;
  365 + dataBuilder.addTs((long) array[0]);
  366 + dataBuilder.addValue((String) array[1]);
  367 + });
  368 + builder.addData(dataBuilder.build());
  369 + });
  370 +
  371 + ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToLocalSubscriptionServiceMsg(
  372 + LocalSubscriptionServiceMsgProto.newBuilder().setSubUpdate(builder.build()).build())
  373 + .build();
  374 + return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg);
  375 + }
  376 +
  377 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.subscription;
  17 +
  18 +import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent;
  19 +import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
  20 +import org.thingsboard.server.service.queue.TbMsgCallback;
  21 +import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
  22 +
  23 +public interface LocalSubscriptionService {
  24 +
  25 + void addSubscription(TbSubscription subscription);
  26 +
  27 + void cancelSubscription(String sessionId, int subscriptionId);
  28 +
  29 + void cancelAllSessionSubscriptions(String sessionId);
  30 +
  31 + void onSubscriptionUpdate(String sessionId, SubscriptionUpdate update, TbMsgCallback callback);
  32 +
  33 + void onApplicationEvent(PartitionChangeEvent event);
  34 +
  35 + void onApplicationEvent(ClusterTopologyChangeEvent event);
  36 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.subscription;
  17 +
  18 +import org.springframework.context.ApplicationListener;
  19 +import org.thingsboard.server.common.data.id.EntityId;
  20 +import org.thingsboard.server.common.data.id.TenantId;
  21 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  22 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  23 +import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
  24 +import org.thingsboard.server.service.queue.TbMsgCallback;
  25 +
  26 +import java.util.List;
  27 +
  28 +public interface SubscriptionManagerService extends ApplicationListener<PartitionChangeEvent> {
  29 +
  30 + void addSubscription(TbSubscription subscription, TbMsgCallback callback);
  31 +
  32 + void cancelSubscription(String sessionId, int subscriptionId, TbMsgCallback callback);
  33 +
  34 + void onTimeseriesDataUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, TbMsgCallback callback);
  35 +
  36 + void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbMsgCallback callback);
  37 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.subscription;
  17 +
  18 +import lombok.Builder;
  19 +import lombok.Data;
  20 +import lombok.Getter;
  21 +import org.thingsboard.server.common.data.id.EntityId;
  22 +import org.thingsboard.server.common.data.id.TenantId;
  23 +
  24 +import java.util.Map;
  25 +
  26 +public class TbAttributeSubscription extends TbSubscription {
  27 +
  28 + @Getter private final boolean allKeys;
  29 + @Getter private final Map<String, Long> keyStates;
  30 + @Getter private final TbAttributeSubscriptionScope scope;
  31 +
  32 + @Builder
  33 + public TbAttributeSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId,
  34 + boolean allKeys, Map<String, Long> keyStates, TbAttributeSubscriptionScope scope) {
  35 + super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.ATTRIBUTES);
  36 + this.allKeys = allKeys;
  37 + this.keyStates = keyStates;
  38 + this.scope = scope;
  39 + }
  40 +
  41 + @Override
  42 + public boolean equals(Object o) {
  43 + return super.equals(o);
  44 + }
  45 +
  46 + @Override
  47 + public int hashCode() {
  48 + return super.hashCode();
  49 + }
  50 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.subscription;
  17 +
  18 +public enum TbAttributeSubscriptionScope {
  19 +
  20 + CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE
  21 +
  22 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.subscription;
  17 +
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.Data;
  20 +import org.thingsboard.server.common.data.id.EntityId;
  21 +import org.thingsboard.server.common.data.id.TenantId;
  22 +
  23 +import java.util.Objects;
  24 +
  25 +@Data
  26 +@AllArgsConstructor
  27 +public abstract class TbSubscription {
  28 +
  29 + private final String serviceId;
  30 + private final String sessionId;
  31 + private final int subscriptionId;
  32 + private final TenantId tenantId;
  33 + private final EntityId entityId;
  34 + private final TbSubscriptionType type;
  35 +
  36 + @Override
  37 + public boolean equals(Object o) {
  38 + if (this == o) return true;
  39 + if (o == null || getClass() != o.getClass()) return false;
  40 + TbSubscription that = (TbSubscription) o;
  41 + return subscriptionId == that.subscriptionId &&
  42 + sessionId.equals(that.sessionId) &&
  43 + tenantId.equals(that.tenantId) &&
  44 + entityId.equals(that.entityId) &&
  45 + type == that.type;
  46 + }
  47 +
  48 + @Override
  49 + public int hashCode() {
  50 + return Objects.hash(sessionId, subscriptionId, tenantId, entityId, type);
  51 + }
  52 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.subscription;
  17 +
  18 +public enum TbSubscriptionType {
  19 + TIMESERIES, ATTRIBUTES
  20 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.subscription;
  17 +
  18 +import org.thingsboard.server.common.data.id.EntityId;
  19 +import org.thingsboard.server.common.data.id.EntityIdFactory;
  20 +import org.thingsboard.server.common.data.id.TenantId;
  21 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  22 +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
  23 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  24 +import org.thingsboard.server.common.data.kv.BooleanDataEntry;
  25 +import org.thingsboard.server.common.data.kv.DataType;
  26 +import org.thingsboard.server.common.data.kv.DoubleDataEntry;
  27 +import org.thingsboard.server.common.data.kv.JsonDataEntry;
  28 +import org.thingsboard.server.common.data.kv.KvEntry;
  29 +import org.thingsboard.server.common.data.kv.LongDataEntry;
  30 +import org.thingsboard.server.common.data.kv.StringDataEntry;
  31 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  32 +import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
  33 +import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
  34 +import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto;
  35 +import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeSubscriptionProto;
  36 +import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
  37 +import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto;
  38 +import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionKetStateProto;
  39 +import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionProto;
  40 +import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto;
  41 +import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscriptionProto;
  42 +import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
  43 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
  44 +import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
  45 +import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
  46 +import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
  47 +
  48 +import java.util.ArrayList;
  49 +import java.util.HashMap;
  50 +import java.util.List;
  51 +import java.util.Map;
  52 +import java.util.TreeMap;
  53 +import java.util.UUID;
  54 +
  55 +public class TbSubscriptionUtils {
  56 +
  57 + public static ToCoreMsg toNewSubscriptionProto(TbSubscription subscription) {
  58 + SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
  59 + TbSubscriptionProto subscriptionProto = TbSubscriptionProto.newBuilder()
  60 + .setServiceId(subscription.getServiceId())
  61 + .setSessionId(subscription.getSessionId())
  62 + .setSubscriptionId(subscription.getSubscriptionId())
  63 + .setTenantIdMSB(subscription.getTenantId().getId().getMostSignificantBits())
  64 + .setTenantIdLSB(subscription.getTenantId().getId().getLeastSignificantBits())
  65 + .setEntityType(subscription.getEntityId().getEntityType().name())
  66 + .setEntityIdMSB(subscription.getEntityId().getId().getMostSignificantBits())
  67 + .setEntityIdLSB(subscription.getEntityId().getId().getLeastSignificantBits()).build();
  68 +
  69 + switch (subscription.getType()) {
  70 + case TIMESERIES:
  71 + TbTimeseriesSubscription tSub = (TbTimeseriesSubscription) subscription;
  72 + TbTimeSeriesSubscriptionProto.Builder tSubProto = TbTimeSeriesSubscriptionProto.newBuilder()
  73 + .setSub(subscriptionProto)
  74 + .setAllKeys(tSub.isAllKeys());
  75 + tSub.getKeyStates().forEach((key, value) -> tSubProto.addKeyStates(
  76 + TbSubscriptionKetStateProto.newBuilder().setKey(key).setTs(value).build()));
  77 + tSubProto.setStartTime(tSub.getStartTime());
  78 + tSubProto.setEndTime(tSub.getEndTime());
  79 + msgBuilder.setTelemetrySub(tSubProto.build());
  80 + break;
  81 + case ATTRIBUTES:
  82 + TbAttributeSubscription aSub = (TbAttributeSubscription) subscription;
  83 + TbAttributeSubscriptionProto.Builder aSubProto = TbAttributeSubscriptionProto.newBuilder()
  84 + .setSub(subscriptionProto)
  85 + .setAllKeys(aSub.isAllKeys())
  86 + .setScope(aSub.getScope().name());
  87 + aSub.getKeyStates().forEach((key, value) -> aSubProto.addKeyStates(
  88 + TbSubscriptionKetStateProto.newBuilder().setKey(key).setTs(value).build()));
  89 + msgBuilder.setAttributeSub(aSubProto.build());
  90 + break;
  91 + }
  92 + return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
  93 + }
  94 +
  95 + public static ToCoreMsg toCloseSubscriptionProto(TbSubscription subscription) {
  96 + SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
  97 + TbSubscriptionCloseProto closeProto = TbSubscriptionCloseProto.newBuilder()
  98 + .setSessionId(subscription.getSessionId())
  99 + .setSubscriptionId(subscription.getSubscriptionId()).build();
  100 + msgBuilder.setSubClose(closeProto);
  101 + return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
  102 + }
  103 +
  104 + public static TbSubscription fromProto(TbAttributeSubscriptionProto attributeSub) {
  105 + TbSubscriptionProto subProto = attributeSub.getSub();
  106 + TbAttributeSubscription.TbAttributeSubscriptionBuilder builder = TbAttributeSubscription.builder()
  107 + .serviceId(subProto.getServiceId())
  108 + .sessionId(subProto.getSessionId())
  109 + .subscriptionId(subProto.getSubscriptionId())
  110 + .entityId(EntityIdFactory.getByTypeAndUuid(subProto.getEntityType(), new UUID(subProto.getEntityIdMSB(), subProto.getEntityIdLSB())))
  111 + .tenantId(new TenantId(new UUID(subProto.getTenantIdMSB(), subProto.getTenantIdLSB())));
  112 +
  113 + builder.scope(TbAttributeSubscriptionScope.valueOf(attributeSub.getScope()));
  114 + builder.allKeys(attributeSub.getAllKeys());
  115 + Map<String, Long> keyStates = new HashMap<>();
  116 + attributeSub.getKeyStatesList().forEach(ksProto -> keyStates.put(ksProto.getKey(), ksProto.getTs()));
  117 + builder.keyStates(keyStates);
  118 + return builder.build();
  119 + }
  120 +
  121 + public static TbSubscription fromProto(TbTimeSeriesSubscriptionProto telemetrySub) {
  122 + TbSubscriptionProto subProto = telemetrySub.getSub();
  123 + TbTimeseriesSubscription.TbTimeseriesSubscriptionBuilder builder = TbTimeseriesSubscription.builder()
  124 + .serviceId(subProto.getServiceId())
  125 + .sessionId(subProto.getSessionId())
  126 + .subscriptionId(subProto.getSubscriptionId())
  127 + .entityId(EntityIdFactory.getByTypeAndUuid(subProto.getEntityType(), new UUID(subProto.getEntityIdMSB(), subProto.getEntityIdLSB())))
  128 + .tenantId(new TenantId(new UUID(subProto.getTenantIdMSB(), subProto.getTenantIdLSB())));
  129 +
  130 + builder.allKeys(telemetrySub.getAllKeys());
  131 + Map<String, Long> keyStates = new HashMap<>();
  132 + telemetrySub.getKeyStatesList().forEach(ksProto -> keyStates.put(ksProto.getKey(), ksProto.getTs()));
  133 + builder.startTime(telemetrySub.getStartTime());
  134 + builder.endTime(telemetrySub.getEndTime());
  135 + builder.keyStates(keyStates);
  136 + return builder.build();
  137 + }
  138 +
  139 + public static SubscriptionUpdate fromProto(TbSubscriptionUpdateProto proto) {
  140 + if (proto.getErrorCode() > 0) {
  141 + return new SubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
  142 + } else {
  143 + Map<String, List<Object>> data = new TreeMap<>();
  144 + proto.getDataList().forEach(v -> {
  145 + List<Object> values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>());
  146 + for (int i = 0; i < v.getTsCount(); i++) {
  147 + Object[] value = new Object[2];
  148 + value[0] = v.getTs(i);
  149 + value[1] = v.getValue(i);
  150 + values.add(value);
  151 + }
  152 + });
  153 + return new SubscriptionUpdate(proto.getSubscriptionId(), data);
  154 + }
  155 + }
  156 +
  157 + public static ToCoreMsg toTimeseriesUpdateProto(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
  158 + TbTimeSeriesUpdateProto.Builder builder = TbTimeSeriesUpdateProto.newBuilder();
  159 + builder.setEntityType(entityId.getEntityType().name());
  160 + builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
  161 + builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
  162 + builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
  163 + builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
  164 + ts.forEach(v -> builder.addData(toKeyValueProto(v.getTs(), v).build()));
  165 + SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
  166 + msgBuilder.setTsUpdate(builder);
  167 + return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
  168 + }
  169 +
  170 + public static ToCoreMsg toAttributesUpdateProto(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
  171 + TbAttributeUpdateProto.Builder builder = TbAttributeUpdateProto.newBuilder();
  172 + builder.setEntityType(entityId.getEntityType().name());
  173 + builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
  174 + builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
  175 + builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
  176 + builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
  177 + builder.setScope(scope);
  178 + attributes.forEach(v -> builder.addData(toKeyValueProto(v.getLastUpdateTs(), v).build()));
  179 +
  180 + SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
  181 + msgBuilder.setAttrUpdate(builder);
  182 + return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
  183 + }
  184 +
  185 + private static TsKvProto.Builder toKeyValueProto(long ts, KvEntry attr) {
  186 + KeyValueProto.Builder dataBuilder = KeyValueProto.newBuilder();
  187 + dataBuilder.setKey(attr.getKey());
  188 + dataBuilder.setType(KeyValueType.forNumber(attr.getDataType().ordinal()));
  189 + switch (attr.getDataType()) {
  190 + case BOOLEAN:
  191 + attr.getBooleanValue().ifPresent(dataBuilder::setBoolV);
  192 + break;
  193 + case LONG:
  194 + attr.getLongValue().ifPresent(dataBuilder::setLongV);
  195 + break;
  196 + case DOUBLE:
  197 + attr.getDoubleValue().ifPresent(dataBuilder::setDoubleV);
  198 + break;
  199 + case JSON:
  200 + attr.getJsonValue().ifPresent(dataBuilder::setJsonV);
  201 + break;
  202 + case STRING:
  203 + attr.getStrValue().ifPresent(dataBuilder::setStringV);
  204 + break;
  205 + }
  206 + return TsKvProto.newBuilder().setTs(ts).setKv(dataBuilder);
  207 + }
  208 +
  209 + public static EntityId toEntityId(String entityType, long entityIdMSB, long entityIdLSB) {
  210 + return EntityIdFactory.getByTypeAndUuid(entityType, new UUID(entityIdMSB, entityIdLSB));
  211 + }
  212 +
  213 + public static List<TsKvEntry> toTsKvEntityList(List<TsKvProto> dataList) {
  214 + List<TsKvEntry> result = new ArrayList<>(dataList.size());
  215 + dataList.forEach(proto -> result.add(new BasicTsKvEntry(proto.getTs(), getKvEntry(proto.getKv()))));
  216 + return result;
  217 + }
  218 +
  219 + public static List<AttributeKvEntry> toAttributeKvList(List<TsKvProto> dataList) {
  220 + List<AttributeKvEntry> result = new ArrayList<>(dataList.size());
  221 + dataList.forEach(proto -> result.add(new BaseAttributeKvEntry(getKvEntry(proto.getKv()), proto.getTs())));
  222 + return result;
  223 + }
  224 +
  225 + private static KvEntry getKvEntry(KeyValueProto proto) {
  226 + KvEntry entry = null;
  227 + DataType type = DataType.values()[proto.getType().getNumber()];
  228 + switch (type) {
  229 + case BOOLEAN:
  230 + entry = new BooleanDataEntry(proto.getKey(), proto.getBoolV());
  231 + break;
  232 + case LONG:
  233 + entry = new LongDataEntry(proto.getKey(), proto.getLongV());
  234 + break;
  235 + case DOUBLE:
  236 + entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleV());
  237 + break;
  238 + case STRING:
  239 + entry = new StringDataEntry(proto.getKey(), proto.getStringV());
  240 + break;
  241 + case JSON:
  242 + entry = new JsonDataEntry(proto.getKey(), proto.getJsonV());
  243 + break;
  244 + }
  245 + return entry;
  246 + }
  247 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.subscription;
  17 +
  18 +import lombok.Builder;
  19 +import lombok.Getter;
  20 +import org.thingsboard.server.common.data.id.EntityId;
  21 +import org.thingsboard.server.common.data.id.TenantId;
  22 +
  23 +import java.util.Map;
  24 +
  25 +public class TbTimeseriesSubscription extends TbSubscription {
  26 +
  27 + @Getter private final boolean allKeys;
  28 + @Getter private final Map<String, Long> keyStates;
  29 + @Getter private final long startTime;
  30 + @Getter private final long endTime;
  31 +
  32 + @Builder
  33 + public TbTimeseriesSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId,
  34 + boolean allKeys, Map<String, Long> keyStates, long startTime, long endTime) {
  35 + super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.TIMESERIES);
  36 + this.allKeys = allKeys;
  37 + this.keyStates = keyStates;
  38 + this.startTime = startTime;
  39 + this.endTime = endTime;
  40 + }
  41 +
  42 + @Override
  43 + public boolean equals(Object o) {
  44 + return super.equals(o);
  45 + }
  46 +
  47 + @Override
  48 + public int hashCode() {
  49 + return super.hashCode();
  50 + }
  51 +}
... ...
... ... @@ -18,71 +18,44 @@ package org.thingsboard.server.service.telemetry;
18 18 import com.google.common.util.concurrent.FutureCallback;
19 19 import com.google.common.util.concurrent.Futures;
20 20 import com.google.common.util.concurrent.ListenableFuture;
21   -import com.google.protobuf.InvalidProtocolBufferException;
22 21 import lombok.extern.slf4j.Slf4j;
23 22 import org.springframework.beans.factory.annotation.Autowired;
24   -import org.springframework.context.annotation.Lazy;
  23 +import org.springframework.context.event.EventListener;
25 24 import org.springframework.stereotype.Service;
26   -import org.springframework.util.StringUtils;
27   -import org.thingsboard.common.util.DonAsynchron;
28 25 import org.thingsboard.common.util.ThingsBoardThreadFactory;
29   -import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
30   -import org.thingsboard.server.actors.service.ActorService;
31   -import org.thingsboard.server.common.data.DataConstants;
32   -import org.thingsboard.server.common.data.EntityType;
33   -import org.thingsboard.server.common.data.EntityView;
34   -import org.thingsboard.server.common.data.id.DeviceId;
35 26 import org.thingsboard.server.common.data.id.EntityId;
36   -import org.thingsboard.server.common.data.id.EntityIdFactory;
37   -import org.thingsboard.server.common.data.id.EntityViewId;
38 27 import org.thingsboard.server.common.data.id.TenantId;
39   -import org.thingsboard.server.common.data.kv.Aggregation;
40 28 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
41 29 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
42   -import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
43   -import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
44 30 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
45   -import org.thingsboard.server.common.data.kv.DataType;
46 31 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
47   -import org.thingsboard.server.common.data.kv.JsonDataEntry;
48   -import org.thingsboard.server.common.data.kv.KvEntry;
49 32 import org.thingsboard.server.common.data.kv.LongDataEntry;
50   -import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
51 33 import org.thingsboard.server.common.data.kv.StringDataEntry;
52 34 import org.thingsboard.server.common.data.kv.TsKvEntry;
53   -import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
54   -import org.thingsboard.server.common.msg.cluster.ServerAddress;
55 35 import org.thingsboard.server.dao.attributes.AttributesService;
56   -import org.thingsboard.server.dao.entityview.EntityViewService;
57 36 import org.thingsboard.server.dao.timeseries.TimeseriesService;
58   -import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
59   -import org.thingsboard.server.service.state.DefaultDeviceStateService;
60   -import org.thingsboard.server.service.state.DeviceStateService;
61   -import org.thingsboard.server.service.telemetry.sub.Subscription;
62   -import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
63   -import org.thingsboard.server.service.telemetry.sub.SubscriptionState;
64   -import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
  37 +import org.thingsboard.server.gen.transport.TransportProtos;
  38 +import org.thingsboard.server.queue.TbQueueProducer;
  39 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  40 +import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
  41 +import org.thingsboard.server.queue.discovery.PartitionService;
  42 +import org.thingsboard.server.queue.discovery.ServiceType;
  43 +import org.thingsboard.server.queue.discovery.TopicPartitionInfo;
  44 +import org.thingsboard.server.queue.provider.TbCoreQueueProvider;
  45 +import org.thingsboard.server.service.queue.TbMsgCallback;
  46 +import org.thingsboard.server.service.subscription.SubscriptionManagerService;
  47 +import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
65 48
66 49 import javax.annotation.Nullable;
67 50 import javax.annotation.PostConstruct;
68 51 import javax.annotation.PreDestroy;
69   -import java.util.ArrayList;
70 52 import java.util.Collections;
71   -import java.util.HashSet;
72   -import java.util.Iterator;
73 53 import java.util.List;
74   -import java.util.Map;
75   -import java.util.Optional;
76 54 import java.util.Set;
77   -import java.util.TreeMap;
78   -import java.util.UUID;
79 55 import java.util.concurrent.ConcurrentHashMap;
80 56 import java.util.concurrent.ExecutorService;
81 57 import java.util.concurrent.Executors;
82 58 import java.util.function.Consumer;
83   -import java.util.function.Function;
84   -import java.util.function.Predicate;
85   -import java.util.stream.Collectors;
86 59
87 60 /**
88 61 * Created by ashvayka on 27.03.18.
... ... @@ -91,8 +64,7 @@ import java.util.stream.Collectors;
91 64 @Slf4j
92 65 public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptionService {
93 66
94   - @Autowired
95   - private TelemetryWebSocketService wsService;
  67 + private final Set<TopicPartitionInfo> currentPartitions = ConcurrentHashMap.newKeySet();
96 68
97 69 @Autowired
98 70 private AttributesService attrService;
... ... @@ -101,23 +73,24 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
101 73 private TimeseriesService tsService;
102 74
103 75 @Autowired
104   - private EntityViewService entityViewService;
  76 + private TbCoreQueueProvider coreQueueProvider;
105 77
106 78 @Autowired
107   - @Lazy
108   - private DeviceStateService stateService;
  79 + private PartitionService partitionService;
109 80
110 81 @Autowired
111   - @Lazy
112   - private ActorService actorService;
113   -
  82 + private SubscriptionManagerService subscriptionManagerService;
  83 +
  84 + private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> toCoreProducer;
  85 +
114 86 private ExecutorService tsCallBackExecutor;
115 87 private ExecutorService wsCallBackExecutor;
116 88
117 89 @PostConstruct
118 90 public void initExecutor() {
119   - tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback"));
120   - wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-sub-callback"));
  91 + tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-service-ts-callback"));
  92 + wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-service-ws-callback"));
  93 + toCoreProducer = coreQueueProvider.getTbCoreMsgProducer();
121 94 }
122 95
123 96 @PreDestroy
... ... @@ -130,65 +103,12 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
130 103 }
131 104 }
132 105
133   - private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new ConcurrentHashMap<>();
134   - private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new ConcurrentHashMap<>();
135   -
136   - @Override
137   - public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
138   - long startTime = 0L;
139   - long endTime = 0L;
140   - if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW) && TelemetryFeature.TIMESERIES.equals(sub.getType())) {
141   - EntityView entityView = entityViewService.findEntityViewById(TenantId.SYS_TENANT_ID, new EntityViewId(entityId.getId()));
142   - entityId = entityView.getEntityId();
143   - startTime = entityView.getStartTimeMs();
144   - endTime = entityView.getEndTimeMs();
145   - sub = getUpdatedSubscriptionState(entityId, sub, entityView);
146   - }
147   - //TODO 2.5
148   - Optional<ServerAddress> server = Optional.empty();//routingService.resolveById(entityId);
149   - Subscription subscription;
150   - if (server.isPresent()) {
151   - ServerAddress address = server.get();
152   - log.trace("[{}] Forwarding subscription [{}] for [{}] entity [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId.getEntityType().name(), entityId, address);
153   - subscription = new Subscription(sub, true, address, startTime, endTime);
154   - tellNewSubscription(address, sessionId, subscription);
155   - } else {
156   - log.trace("[{}] Registering local subscription [{}] for [{}] entity [{}]", sessionId, sub.getSubscriptionId(), entityId.getEntityType().name(), entityId);
157   - subscription = new Subscription(sub, true, null, startTime, endTime);
158   - }
159   - registerSubscription(sessionId, entityId, subscription);
160   - }
161   -
162   - private SubscriptionState getUpdatedSubscriptionState(EntityId entityId, SubscriptionState sub, EntityView entityView) {
163   - Map<String, Long> keyStates;
164   - if (sub.isAllKeys()) {
165   - keyStates = entityView.getKeys().getTimeseries().stream().collect(Collectors.toMap(k -> k, k -> 0L));
166   - } else {
167   - keyStates = sub.getKeyStates().entrySet()
168   - .stream().filter(entry -> entityView.getKeys().getTimeseries().contains(entry.getKey()))
169   - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
170   - }
171   - return new SubscriptionState(sub.getWsSessionId(), sub.getSubscriptionId(), sub.getTenantId(), entityId, sub.getType(), false, keyStates, sub.getScope());
172   - }
173   -
174 106 @Override
175   - public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
176   - cleanupLocalWsSessionSubscriptions(sessionId);
177   - }
178   -
179   - @Override
180   - public void removeSubscription(String sessionId, int subscriptionId) {
181   - log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
182   - Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId);
183   - if (sessionSubscriptions != null) {
184   - Subscription subscription = sessionSubscriptions.remove(subscriptionId);
185   - if (subscription != null) {
186   - processSubscriptionRemoval(sessionId, sessionSubscriptions, subscription);
187   - } else {
188   - log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
189   - }
190   - } else {
191   - log.debug("[{}] No session subscriptions found!", sessionId);
  107 + @EventListener(PartitionChangeEvent.class)
  108 + public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  109 + if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceKey().getServiceType())) {
  110 + currentPartitions.clear();
  111 + currentPartitions.addAll(partitionChangeEvent.getPartitions());
192 112 }
193 113 }
194 114
... ... @@ -201,14 +121,14 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
201 121 public void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
202 122 ListenableFuture<List<Void>> saveFuture = tsService.save(tenantId, entityId, ts, ttl);
203 123 addMainCallback(saveFuture, callback);
204   - addWsCallback(saveFuture, success -> onTimeseriesUpdate(entityId, ts));
  124 + addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
205 125 }
206 126
207 127 @Override
208 128 public void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
209 129 ListenableFuture<List<Void>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
210 130 addMainCallback(saveFuture, callback);
211   - addWsCallback(saveFuture, success -> onAttributesUpdate(entityId, scope, attributes));
  131 + addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes));
212 132 }
213 133
214 134 @Override
... ... @@ -235,355 +155,23 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
235 155 , System.currentTimeMillis())), callback);
236 156 }
237 157
238   - @Override
239   - public void onSharedAttributesUpdate(TenantId tenantId, DeviceId deviceId, Set<AttributeKvEntry> attributes) {
240   - DeviceAttributesEventNotificationMsg notificationMsg = DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
241   - deviceId, DataConstants.SHARED_SCOPE, new ArrayList<>(attributes));
242   - actorService.onMsg(new SendToClusterMsg(deviceId, notificationMsg));
243   - }
244   -
245   - @Override
246   - public void onNewRemoteSubscription(ServerAddress serverAddress, byte[] data) {
247   - ClusterAPIProtos.SubscriptionProto proto;
248   - try {
249   - proto = ClusterAPIProtos.SubscriptionProto.parseFrom(data);
250   - } catch (InvalidProtocolBufferException e) {
251   - throw new RuntimeException(e);
252   - }
253   - Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect(
254   - Collectors.toMap(ClusterAPIProtos.SubscriptionKetStateProto::getKey, ClusterAPIProtos.SubscriptionKetStateProto::getTs));
255   - Subscription subscription = new Subscription(
256   - new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(),
257   - new TenantId(UUID.fromString(proto.getTenantId())),
258   - EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()),
259   - TelemetryFeature.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
260   - false, new ServerAddress(serverAddress.getHost(), serverAddress.getPort(), serverAddress.getServerType()));
261   -
262   - addRemoteWsSubscription(serverAddress, proto.getSessionId(), subscription);
263   - }
264   -
265   - @Override
266   - public void onRemoteSubscriptionUpdate(ServerAddress serverAddress, byte[] data) {
267   - ClusterAPIProtos.SubscriptionUpdateProto proto;
268   - try {
269   - proto = ClusterAPIProtos.SubscriptionUpdateProto.parseFrom(data);
270   - } catch (InvalidProtocolBufferException e) {
271   - throw new RuntimeException(e);
272   - }
273   - SubscriptionUpdate update = convert(proto);
274   - String sessionId = proto.getSessionId();
275   - log.trace("[{}] Processing remote subscription onUpdate [{}]", sessionId, update);
276   - Optional<Subscription> subOpt = getSubscription(sessionId, update.getSubscriptionId());
277   - if (subOpt.isPresent()) {
278   - updateSubscriptionState(sessionId, subOpt.get(), update);
279   - wsService.sendWsMsg(sessionId, update);
280   - }
281   - }
282   -
283   - @Override
284   - public void onRemoteSubscriptionClose(ServerAddress serverAddress, byte[] data) {
285   - ClusterAPIProtos.SubscriptionCloseProto proto;
286   - try {
287   - proto = ClusterAPIProtos.SubscriptionCloseProto.parseFrom(data);
288   - } catch (InvalidProtocolBufferException e) {
289   - throw new RuntimeException(e);
290   - }
291   - removeSubscription(proto.getSessionId(), proto.getSubscriptionId());
292   - }
293   -
294   - @Override
295   - public void onRemoteSessionClose(ServerAddress serverAddress, byte[] data) {
296   - ClusterAPIProtos.SessionCloseProto proto;
297   - try {
298   - proto = ClusterAPIProtos.SessionCloseProto.parseFrom(data);
299   - } catch (InvalidProtocolBufferException e) {
300   - throw new RuntimeException(e);
301   - }
302   - cleanupRemoteWsSessionSubscriptions(proto.getSessionId());
303   - }
304   -
305   - @Override
306   - public void onRemoteAttributesUpdate(ServerAddress serverAddress, byte[] data) {
307   - ClusterAPIProtos.AttributeUpdateProto proto;
308   - try {
309   - proto = ClusterAPIProtos.AttributeUpdateProto.parseFrom(data);
310   - } catch (InvalidProtocolBufferException e) {
311   - throw new RuntimeException(e);
312   - }
313   - onAttributesUpdate(EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), proto.getScope(),
314   - proto.getDataList().stream().map(this::toAttribute).collect(Collectors.toList()));
315   - }
316   -
317   - @Override
318   - public void onRemoteTsUpdate(ServerAddress serverAddress, byte[] data) {
319   - ClusterAPIProtos.TimeseriesUpdateProto proto;
320   - try {
321   - proto = ClusterAPIProtos.TimeseriesUpdateProto.parseFrom(data);
322   - } catch (InvalidProtocolBufferException e) {
323   - throw new RuntimeException(e);
324   - }
325   - onTimeseriesUpdate(EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()),
326   - proto.getDataList().stream().map(this::toTimeseries).collect(Collectors.toList()));
327   - }
328   -
329   - @Override
330   - public void onClusterUpdate() {
331   - log.trace("Processing cluster onUpdate msg!");
332   - Iterator<Map.Entry<EntityId, Set<Subscription>>> deviceIterator = subscriptionsByEntityId.entrySet().iterator();
333   - while (deviceIterator.hasNext()) {
334   - Map.Entry<EntityId, Set<Subscription>> e = deviceIterator.next();
335   - Set<Subscription> subscriptions = e.getValue();
336   - //TODO 2.5
337   - Optional<ServerAddress> newAddressOptional = Optional.empty();// routingService.resolveById(e.getKey());
338   - if (newAddressOptional.isPresent()) {
339   - newAddressOptional.ifPresent(serverAddress -> checkSubscriptionsNewAddress(serverAddress, subscriptions));
340   - } else {
341   - checkSubscriptionsPrevAddress(subscriptions);
342   - }
343   - if (subscriptions.size() == 0) {
344   - log.trace("[{}] No more subscriptions for this device on current server.", e.getKey());
345   - deviceIterator.remove();
346   - }
347   - }
348   - }
349   -
350   - private void checkSubscriptionsNewAddress(ServerAddress newAddress, Set<Subscription> subscriptions) {
351   - Iterator<Subscription> subscriptionIterator = subscriptions.iterator();
352   - while (subscriptionIterator.hasNext()) {
353   - Subscription s = subscriptionIterator.next();
354   - if (s.isLocal()) {
355   - if (!newAddress.equals(s.getServer())) {
356   - log.trace("[{}] Local subscription is now handled on new server [{}]", s.getWsSessionId(), newAddress);
357   - s.setServer(newAddress);
358   - tellNewSubscription(newAddress, s.getWsSessionId(), s);
359   - }
360   - } else {
361   - log.trace("[{}] Remote subscription is now handled on new server address: [{}]", s.getWsSessionId(), newAddress);
362   - subscriptionIterator.remove();
363   - //TODO: onUpdate state of subscription by WsSessionId and other maps.
364   - }
365   - }
366   - }
367   -
368   - private void checkSubscriptionsPrevAddress(Set<Subscription> subscriptions) {
369   - for (Subscription s : subscriptions) {
370   - if (s.isLocal() && s.getServer() != null) {
371   - log.trace("[{}] Local subscription is no longer handled on remote server address [{}]", s.getWsSessionId(), s.getServer());
372   - s.setServer(null);
373   - } else {
374   - log.trace("[{}] Remote subscription is on up to date server address.", s.getWsSessionId());
375   - }
376   - }
377   - }
378   -
379   - private void addRemoteWsSubscription(ServerAddress address, String sessionId, Subscription subscription) {
380   - EntityId entityId = subscription.getEntityId();
381   - log.trace("[{}] Registering remote subscription [{}] for entity [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
382   - registerSubscription(sessionId, entityId, subscription);
383   - if (subscription.getType() == TelemetryFeature.ATTRIBUTES) {
384   - final Map<String, Long> keyStates = subscription.getKeyStates();
385   - DonAsynchron.withCallback(attrService.find(subscription.getSub().getTenantId(), entityId, DataConstants.CLIENT_SCOPE, keyStates.keySet()), values -> {
386   - List<TsKvEntry> missedUpdates = new ArrayList<>();
387   - values.forEach(latestEntry -> {
388   - if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) {
389   - missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry));
390   - }
391   - });
392   - if (!missedUpdates.isEmpty()) {
393   - tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
394   - }
395   - },
396   - e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor);
397   - } else if (subscription.getType() == TelemetryFeature.TIMESERIES) {
398   - long curTs = System.currentTimeMillis();
399   - List<ReadTsKvQuery> queries = new ArrayList<>();
400   - subscription.getKeyStates().entrySet().forEach(e -> {
401   - if (curTs > e.getValue()) {
402   - queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs, 0, 1000, Aggregation.NONE));
403   - } else {
404   - log.debug("[{}] Invalid subscription [{}], entityId [{}] curTs [{}]", sessionId, subscription, entityId, curTs);
405   - }
406   - });
407   - if (!queries.isEmpty()) {
408   - DonAsynchron.withCallback(tsService.findAll(subscription.getSub().getTenantId(), entityId, queries),
409   - missedUpdates -> {
410   - if (missedUpdates != null && !missedUpdates.isEmpty()) {
411   - tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
412   - }
413   - },
414   - e -> log.error("Failed to fetch missed updates.", e),
415   - tsCallBackExecutor);
416   - }
417   - }
418   - }
419   -
420   - private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
421   - //TODO 2.5
422   - Optional<ServerAddress> serverAddress = Optional.empty();//routingService.resolveById(entityId);
423   - if (!serverAddress.isPresent()) {
424   - onLocalAttributesUpdate(entityId, scope, attributes);
425   - if (entityId.getEntityType() == EntityType.DEVICE && DataConstants.SERVER_SCOPE.equalsIgnoreCase(scope)) {
426   - for (AttributeKvEntry attribute : attributes) {
427   - if (attribute.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) {
428   - stateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L));
429   - }
430   - }
431   - }
  158 + private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
  159 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
  160 + if (currentPartitions.contains(tpi)) {
  161 + subscriptionManagerService.onAttributesUpdate(tenantId, entityId, scope, attributes, TbMsgCallback.EMPTY);
432 162 } else {
433   - tellRemoteAttributesUpdate(serverAddress.get(), entityId, scope, attributes);
  163 + TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAttributesUpdateProto(tenantId, entityId, scope, attributes);
  164 + toCoreProducer.send(tpi, new TbProtoQueueMsg<>(entityId.getId(), toCoreMsg), null);
434 165 }
435 166 }
436 167
437   - private void onTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts) {
438   - //TODO 2.5
439   - Optional<ServerAddress> serverAddress = Optional.empty();//routingService.resolveById(entityId);
440   - if (!serverAddress.isPresent()) {
441   - onLocalTimeseriesUpdate(entityId, ts);
  168 + private void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
  169 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
  170 + if (currentPartitions.contains(tpi)) {
  171 + subscriptionManagerService.onTimeseriesDataUpdate(tenantId, entityId, ts, TbMsgCallback.EMPTY);
442 172 } else {
443   - tellRemoteTimeseriesUpdate(serverAddress.get(), entityId, ts);
444   - }
445   - }
446   -
447   - private void onLocalAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
448   - onLocalSubUpdate(entityId, s -> TelemetryFeature.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
449   - List<TsKvEntry> subscriptionUpdate = null;
450   - for (AttributeKvEntry kv : attributes) {
451   - if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
452   - if (subscriptionUpdate == null) {
453   - subscriptionUpdate = new ArrayList<>();
454   - }
455   - subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv));
456   - }
457   - }
458   - return subscriptionUpdate;
459   - });
460   - }
461   -
462   - private void onLocalTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts) {
463   - onLocalSubUpdate(entityId, s -> TelemetryFeature.TIMESERIES == s.getType(), s -> {
464   - List<TsKvEntry> subscriptionUpdate = null;
465   - for (TsKvEntry kv : ts) {
466   - if (isInTimeRange(s, kv.getTs()) && (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey())))) {
467   - if (subscriptionUpdate == null) {
468   - subscriptionUpdate = new ArrayList<>();
469   - }
470   - subscriptionUpdate.add(kv);
471   - }
472   - }
473   - return subscriptionUpdate;
474   - });
475   - }
476   -
477   - private boolean isInTimeRange(Subscription subscription, long kvTime) {
478   - return (subscription.getStartTime() == 0 || subscription.getStartTime() <= kvTime)
479   - && (subscription.getEndTime() == 0 || subscription.getEndTime() >= kvTime);
480   - }
481   -
482   - private void onLocalSubUpdate(EntityId entityId, Predicate<Subscription> filter, Function<Subscription, List<TsKvEntry>> f) {
483   - Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
484   - if (deviceSubscriptions != null) {
485   - deviceSubscriptions.stream().filter(filter).forEach(s -> {
486   - String sessionId = s.getWsSessionId();
487   - List<TsKvEntry> subscriptionUpdate = f.apply(s);
488   - if (subscriptionUpdate != null && !subscriptionUpdate.isEmpty()) {
489   - SubscriptionUpdate update = new SubscriptionUpdate(s.getSubscriptionId(), subscriptionUpdate);
490   - if (s.isLocal()) {
491   - updateSubscriptionState(sessionId, s, update);
492   - wsService.sendWsMsg(sessionId, update);
493   - } else {
494   - tellRemoteSubUpdate(s.getServer(), sessionId, update);
495   - }
496   - }
497   - });
498   - } else {
499   - log.debug("[{}] No device subscriptions to process!", entityId);
500   - }
501   - }
502   -
503   - private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) {
504   - log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update);
505   - update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue()));
506   - }
507   -
508   - private void registerSubscription(String sessionId, EntityId entityId, Subscription subscription) {
509   - Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.computeIfAbsent(entityId, k -> ConcurrentHashMap.newKeySet());
510   - deviceSubscriptions.add(subscription);
511   - Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.computeIfAbsent(sessionId, k -> new ConcurrentHashMap<>());
512   - sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);
513   - }
514   -
515   - private void cleanupLocalWsSessionSubscriptions(String sessionId) {
516   - cleanupWsSessionSubscriptions(sessionId, true);
517   - }
518   -
519   - private void cleanupRemoteWsSessionSubscriptions(String sessionId) {
520   - cleanupWsSessionSubscriptions(sessionId, false);
521   - }
522   -
523   - private void cleanupWsSessionSubscriptions(String sessionId, boolean localSession) {
524   - log.debug("[{}] Removing all subscriptions for particular session.", sessionId);
525   - Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId);
526   - if (sessionSubscriptions != null) {
527   - int sessionSubscriptionSize = sessionSubscriptions.size();
528   -
529   - for (Subscription subscription : sessionSubscriptions.values()) {
530   - EntityId entityId = subscription.getEntityId();
531   - Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
532   - deviceSubscriptions.remove(subscription);
533   - if (deviceSubscriptions.isEmpty()) {
534   - subscriptionsByEntityId.remove(entityId);
535   - }
536   - }
537   - subscriptionsByWsSessionId.remove(sessionId);
538   - log.debug("[{}] Removed {} subscriptions for particular session.", sessionId, sessionSubscriptionSize);
539   -
540   - if (localSession) {
541   - notifyWsSubscriptionClosed(sessionId, sessionSubscriptions);
542   - }
543   - } else {
544   - log.debug("[{}] No subscriptions found!", sessionId);
545   - }
546   - }
547   -
548   - private void notifyWsSubscriptionClosed(String sessionId, Map<Integer, Subscription> sessionSubscriptions) {
549   - Set<ServerAddress> affectedServers = new HashSet<>();
550   - for (Subscription subscription : sessionSubscriptions.values()) {
551   - if (subscription.getServer() != null) {
552   - affectedServers.add(subscription.getServer());
553   - }
554   - }
555   - for (ServerAddress address : affectedServers) {
556   - log.debug("[{}] Going to onSubscriptionUpdate [{}] server about session close event", sessionId, address);
557   - tellRemoteSessionClose(address, sessionId);
558   - }
559   - }
560   -
561   - private void processSubscriptionRemoval(String sessionId, Map<Integer, Subscription> sessionSubscriptions, Subscription subscription) {
562   - EntityId entityId = subscription.getEntityId();
563   - if (subscription.isLocal() && subscription.getServer() != null) {
564   - tellRemoteSubClose(subscription.getServer(), sessionId, subscription.getSubscriptionId());
565   - }
566   - if (sessionSubscriptions.isEmpty()) {
567   - log.debug("[{}] Removed last subscription for particular session.", sessionId);
568   - subscriptionsByWsSessionId.remove(sessionId);
569   - } else {
570   - log.debug("[{}] Removed session subscription.", sessionId);
571   - }
572   - Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
573   - if (deviceSubscriptions != null) {
574   - boolean result = deviceSubscriptions.remove(subscription);
575   - if (result) {
576   - if (deviceSubscriptions.size() == 0) {
577   - log.debug("[{}] Removed last subscription for particular device.", sessionId);
578   - subscriptionsByEntityId.remove(entityId);
579   - } else {
580   - log.debug("[{}] Removed device subscription.", sessionId);
581   - }
582   - } else {
583   - log.debug("[{}] Subscription not found!", sessionId);
584   - }
585   - } else {
586   - log.debug("[{}] No device subscriptions found!", sessionId);
  173 + TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toTimeseriesUpdateProto(tenantId, entityId, ts);
  174 + toCoreProducer.send(tpi, new TbProtoQueueMsg<>(entityId.getId(), toCoreMsg), null);
587 175 }
588 176 }
589 177
... ... @@ -613,167 +201,4 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
613 201 }
614 202 }, wsCallBackExecutor);
615 203 }
616   -
617   - private void tellNewSubscription(ServerAddress address, String sessionId, Subscription sub) {
618   - ClusterAPIProtos.SubscriptionProto.Builder builder = ClusterAPIProtos.SubscriptionProto.newBuilder();
619   - builder.setSessionId(sessionId);
620   - builder.setSubscriptionId(sub.getSubscriptionId());
621   - builder.setTenantId(sub.getSub().getTenantId().getId().toString());
622   - builder.setEntityType(sub.getEntityId().getEntityType().name());
623   - builder.setEntityId(sub.getEntityId().getId().toString());
624   - builder.setType(sub.getType().name());
625   - builder.setAllKeys(sub.isAllKeys());
626   - if (sub.getScope() != null) {
627   - builder.setScope(sub.getScope());
628   - }
629   - sub.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(
630   - ClusterAPIProtos.SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
631   - //TODO 2.5
632   -// rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE, builder.build().toByteArray());
633   - }
634   -
635   - private void tellRemoteSubUpdate(ServerAddress address, String sessionId, SubscriptionUpdate update) {
636   - ClusterAPIProtos.SubscriptionUpdateProto.Builder builder = ClusterAPIProtos.SubscriptionUpdateProto.newBuilder();
637   - builder.setSessionId(sessionId);
638   - builder.setSubscriptionId(update.getSubscriptionId());
639   - builder.setErrorCode(update.getErrorCode());
640   - if (update.getErrorMsg() != null) {
641   - builder.setErrorMsg(update.getErrorMsg());
642   - }
643   - update.getData().entrySet().forEach(
644   - e -> {
645   - ClusterAPIProtos.SubscriptionUpdateValueListProto.Builder dataBuilder = ClusterAPIProtos.SubscriptionUpdateValueListProto.newBuilder();
646   -
647   - dataBuilder.setKey(e.getKey());
648   - e.getValue().forEach(v -> {
649   - Object[] array = (Object[]) v;
650   - dataBuilder.addTs((long) array[0]);
651   - dataBuilder.addValue((String) array[1]);
652   - });
653   -
654   - builder.addData(dataBuilder.build());
655   - }
656   - );
657   - //TODO 2.5
658   -// rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE, builder.build().toByteArray());
659   - }
660   -
661   - private void tellRemoteAttributesUpdate(ServerAddress address, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
662   - ClusterAPIProtos.AttributeUpdateProto.Builder builder = ClusterAPIProtos.AttributeUpdateProto.newBuilder();
663   - builder.setEntityId(entityId.getId().toString());
664   - builder.setEntityType(entityId.getEntityType().name());
665   - builder.setScope(scope);
666   - attributes.forEach(v -> builder.addData(toKeyValueProto(v.getLastUpdateTs(), v).build()));
667   - //TODO 2.5
668   -// rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE, builder.build().toByteArray());
669   - }
670   -
671   - private void tellRemoteTimeseriesUpdate(ServerAddress address, EntityId entityId, List<TsKvEntry> ts) {
672   - ClusterAPIProtos.TimeseriesUpdateProto.Builder builder = ClusterAPIProtos.TimeseriesUpdateProto.newBuilder();
673   - builder.setEntityId(entityId.getId().toString());
674   - builder.setEntityType(entityId.getEntityType().name());
675   - ts.forEach(v -> builder.addData(toKeyValueProto(v.getTs(), v).build()));
676   - //TODO 2.5
677   -// rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE, builder.build().toByteArray());
678   - }
679   -
680   - private void tellRemoteSessionClose(ServerAddress address, String sessionId) {
681   - ClusterAPIProtos.SessionCloseProto proto = ClusterAPIProtos.SessionCloseProto.newBuilder().setSessionId(sessionId).build();
682   - //TODO 2.5
683   -// rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE, proto.toByteArray());
684   - }
685   -
686   - private void tellRemoteSubClose(ServerAddress address, String sessionId, int subscriptionId) {
687   - ClusterAPIProtos.SubscriptionCloseProto proto = ClusterAPIProtos.SubscriptionCloseProto.newBuilder().setSessionId(sessionId).setSubscriptionId(subscriptionId).build();
688   - //TODO 2.5
689   -// rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE, proto.toByteArray());
690   - }
691   -
692   - private ClusterAPIProtos.KeyValueProto.Builder toKeyValueProto(long ts, KvEntry attr) {
693   - ClusterAPIProtos.KeyValueProto.Builder dataBuilder = ClusterAPIProtos.KeyValueProto.newBuilder();
694   - dataBuilder.setKey(attr.getKey());
695   - dataBuilder.setTs(ts);
696   - dataBuilder.setValueType(attr.getDataType().ordinal());
697   - switch (attr.getDataType()) {
698   - case BOOLEAN:
699   - Optional<Boolean> booleanValue = attr.getBooleanValue();
700   - booleanValue.ifPresent(dataBuilder::setBoolValue);
701   - break;
702   - case LONG:
703   - Optional<Long> longValue = attr.getLongValue();
704   - longValue.ifPresent(dataBuilder::setLongValue);
705   - break;
706   - case DOUBLE:
707   - Optional<Double> doubleValue = attr.getDoubleValue();
708   - doubleValue.ifPresent(dataBuilder::setDoubleValue);
709   - break;
710   - case JSON:
711   - Optional<String> jsonValue = attr.getJsonValue();
712   - jsonValue.ifPresent(dataBuilder::setJsonValue);
713   - break;
714   - case STRING:
715   - Optional<String> stringValue = attr.getStrValue();
716   - stringValue.ifPresent(dataBuilder::setStrValue);
717   - break;
718   - }
719   - return dataBuilder;
720   - }
721   -
722   - private AttributeKvEntry toAttribute(ClusterAPIProtos.KeyValueProto proto) {
723   - return new BaseAttributeKvEntry(getKvEntry(proto), proto.getTs());
724   - }
725   -
726   - private TsKvEntry toTimeseries(ClusterAPIProtos.KeyValueProto proto) {
727   - return new BasicTsKvEntry(proto.getTs(), getKvEntry(proto));
728   - }
729   -
730   - private KvEntry getKvEntry(ClusterAPIProtos.KeyValueProto proto) {
731   - KvEntry entry = null;
732   - DataType type = DataType.values()[proto.getValueType()];
733   - switch (type) {
734   - case BOOLEAN:
735   - entry = new BooleanDataEntry(proto.getKey(), proto.getBoolValue());
736   - break;
737   - case LONG:
738   - entry = new LongDataEntry(proto.getKey(), proto.getLongValue());
739   - break;
740   - case DOUBLE:
741   - entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleValue());
742   - break;
743   - case STRING:
744   - entry = new StringDataEntry(proto.getKey(), proto.getStrValue());
745   - break;
746   - case JSON:
747   - entry = new JsonDataEntry(proto.getKey(), proto.getJsonValue());
748   - break;
749   - }
750   - return entry;
751   - }
752   -
753   - private SubscriptionUpdate convert(ClusterAPIProtos.SubscriptionUpdateProto proto) {
754   - if (proto.getErrorCode() > 0) {
755   - return new SubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
756   - } else {
757   - Map<String, List<Object>> data = new TreeMap<>();
758   - proto.getDataList().forEach(v -> {
759   - List<Object> values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>());
760   - for (int i = 0; i < v.getTsCount(); i++) {
761   - Object[] value = new Object[2];
762   - value[0] = v.getTs(i);
763   - value[1] = v.getValue(i);
764   - values.add(value);
765   - }
766   - });
767   - return new SubscriptionUpdate(proto.getSubscriptionId(), data);
768   - }
769   - }
770   -
771   - private Optional<Subscription> getSubscription(String sessionId, int subscriptionId) {
772   - Subscription state = null;
773   - Map<Integer, Subscription> subMap = subscriptionsByWsSessionId.get(sessionId);
774   - if (subMap != null) {
775   - state = subMap.get(subscriptionId);
776   - }
777   - return Optional.ofNullable(state);
778   - }
779 204 }
... ...
... ... @@ -42,24 +42,25 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
42 42 import org.thingsboard.server.dao.attributes.AttributesService;
43 43 import org.thingsboard.server.dao.timeseries.TimeseriesService;
44 44 import org.thingsboard.server.dao.util.TenantRateLimitException;
  45 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
45 46 import org.thingsboard.server.service.security.AccessValidator;
46 47 import org.thingsboard.server.service.security.ValidationCallback;
47 48 import org.thingsboard.server.service.security.ValidationResult;
48 49 import org.thingsboard.server.service.security.ValidationResultCode;
49 50 import org.thingsboard.server.service.security.model.UserPrincipal;
50 51 import org.thingsboard.server.service.security.permission.Operation;
  52 +import org.thingsboard.server.service.subscription.LocalSubscriptionService;
  53 +import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
  54 +import org.thingsboard.server.service.subscription.TbAttributeSubscription;
  55 +import org.thingsboard.server.service.subscription.TbTimeseriesSubscription;
51 56 import org.thingsboard.server.service.telemetry.cmd.AttributesSubscriptionCmd;
52 57 import org.thingsboard.server.service.telemetry.cmd.GetHistoryCmd;
53 58 import org.thingsboard.server.service.telemetry.cmd.SubscriptionCmd;
54 59 import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmd;
55 60 import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
56 61 import org.thingsboard.server.service.telemetry.cmd.TimeseriesSubscriptionCmd;
57   -import org.thingsboard.server.service.telemetry.exception.AccessDeniedException;
58   -import org.thingsboard.server.service.telemetry.exception.EntityNotFoundException;
59   -import org.thingsboard.server.service.telemetry.exception.InternalErrorException;
60 62 import org.thingsboard.server.service.telemetry.exception.UnauthorizedException;
61 63 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
62   -import org.thingsboard.server.service.telemetry.sub.SubscriptionState;
63 64 import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
64 65
65 66 import javax.annotation.Nullable;
... ... @@ -70,7 +71,6 @@ import java.util.ArrayList;
70 71 import java.util.Collections;
71 72 import java.util.HashMap;
72 73 import java.util.HashSet;
73   -import java.util.Iterator;
74 74 import java.util.List;
75 75 import java.util.Map;
76 76 import java.util.Optional;
... ... @@ -98,7 +98,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
98 98 private final ConcurrentMap<String, WsSessionMetaData> wsSessionsMap = new ConcurrentHashMap<>();
99 99
100 100 @Autowired
101   - private TelemetrySubscriptionService subscriptionManager;
  101 + private LocalSubscriptionService subService;
102 102
103 103 @Autowired
104 104 private TelemetryWebSocketMsgEndpoint msgEndpoint;
... ... @@ -112,6 +112,9 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
112 112 @Autowired
113 113 private TimeseriesService tsService;
114 114
  115 + @Autowired
  116 + private TbServiceInfoProvider serviceInfoProvider;
  117 +
115 118 @Value("${server.ws.limits.max_subscriptions_per_tenant:0}")
116 119 private int maxSubscriptionsPerTenant;
117 120 @Value("${server.ws.limits.max_subscriptions_per_customer:0}")
... ... @@ -127,9 +130,11 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
127 130 private ConcurrentMap<UserId, Set<String>> publicUserSubscriptionsMap = new ConcurrentHashMap<>();
128 131
129 132 private ExecutorService executor;
  133 + private String serviceId;
130 134
131 135 @PostConstruct
132 136 public void initExecutor() {
  137 + serviceId = serviceInfoProvider.getServiceId();
133 138 executor = Executors.newWorkStealingPool(50);
134 139 }
135 140
... ... @@ -153,7 +158,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
153 158 break;
154 159 case CLOSED:
155 160 wsSessionsMap.remove(sessionId);
156   - subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId);
  161 + subService.cancelAllSessionSubscriptions(sessionId);
157 162 processSessionClose(sessionRef);
158 163 break;
159 164 }
... ... @@ -334,8 +339,16 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
334 339 keys.forEach(key -> subState.put(key, 0L));
335 340 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
336 341
337   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), sessionRef.getSecurityCtx().getTenantId(), entityId, TelemetryFeature.ATTRIBUTES, false, subState, cmd.getScope());
338   - subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
  342 + TbAttributeSubscription sub = TbAttributeSubscription.builder()
  343 + .serviceId(serviceId)
  344 + .sessionId(sessionId)
  345 + .subscriptionId(cmd.getCmdId())
  346 + .tenantId(sessionRef.getSecurityCtx().getTenantId())
  347 + .entityId(entityId)
  348 + .allKeys(false)
  349 + .keyStates(subState)
  350 + .scope(TbAttributeSubscriptionScope.valueOf(cmd.getScope())).build();
  351 + subService.addSubscription(sub);
339 352 }
340 353
341 354 @Override
... ... @@ -421,8 +434,16 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
421 434 Map<String, Long> subState = new HashMap<>(attributesData.size());
422 435 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
423 436
424   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), sessionRef.getSecurityCtx().getTenantId(), entityId, TelemetryFeature.ATTRIBUTES, true, subState, cmd.getScope());
425   - subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
  437 + TbAttributeSubscription sub = TbAttributeSubscription.builder()
  438 + .serviceId(serviceId)
  439 + .sessionId(sessionId)
  440 + .subscriptionId(cmd.getCmdId())
  441 + .tenantId(sessionRef.getSecurityCtx().getTenantId())
  442 + .entityId(entityId)
  443 + .allKeys(true)
  444 + .keyStates(subState)
  445 + .scope(TbAttributeSubscriptionScope.valueOf(cmd.getScope())).build();
  446 + subService.addSubscription(sub);
426 447 }
427 448
428 449 @Override
... ... @@ -494,8 +515,16 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
494 515 sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
495 516 Map<String, Long> subState = new HashMap<>(data.size());
496 517 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
497   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), sessionRef.getSecurityCtx().getTenantId(), entityId, TelemetryFeature.TIMESERIES, true, subState, cmd.getScope());
498   - subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
  518 +
  519 + TbTimeseriesSubscription sub = TbTimeseriesSubscription.builder()
  520 + .serviceId(serviceId)
  521 + .sessionId(sessionId)
  522 + .subscriptionId(cmd.getCmdId())
  523 + .tenantId(sessionRef.getSecurityCtx().getTenantId())
  524 + .entityId(entityId)
  525 + .allKeys(true)
  526 + .keyStates(subState).build();
  527 + subService.addSubscription(sub);
499 528 }
500 529
501 530 @Override
... ... @@ -520,12 +549,19 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
520 549 @Override
521 550 public void onSuccess(List<TsKvEntry> data) {
522 551 sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
523   -
524 552 Map<String, Long> subState = new HashMap<>(keys.size());
525 553 keys.forEach(key -> subState.put(key, startTs));
526 554 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
527   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), sessionRef.getSecurityCtx().getTenantId(), entityId, TelemetryFeature.TIMESERIES, false, subState, cmd.getScope());
528   - subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
  555 +
  556 + TbTimeseriesSubscription sub = TbTimeseriesSubscription.builder()
  557 + .serviceId(serviceId)
  558 + .sessionId(sessionId)
  559 + .subscriptionId(cmd.getCmdId())
  560 + .tenantId(sessionRef.getSecurityCtx().getTenantId())
  561 + .entityId(entityId)
  562 + .allKeys(false)
  563 + .keyStates(subState).build();
  564 + subService.addSubscription(sub);
529 565 }
530 566
531 567 @Override
... ... @@ -544,9 +580,9 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
544 580
545 581 private void unsubscribe(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
546 582 if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
547   - subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId);
  583 + subService.cancelAllSessionSubscriptions(sessionId);
548 584 } else {
549   - subscriptionManager.removeSubscription(sessionId, cmd.getCmdId());
  585 + subService.cancelSubscription(sessionId, cmd.getCmdId());
550 586 }
551 587 }
552 588
... ...
... ... @@ -15,34 +15,17 @@
15 15 */
16 16 package org.thingsboard.server.service.telemetry;
17 17
  18 +import org.springframework.context.ApplicationListener;
18 19 import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
19 20 import org.thingsboard.server.common.data.id.EntityId;
20 21 import org.thingsboard.server.common.data.id.TenantId;
21 22 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  23 +import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
22 24 import org.thingsboard.server.service.telemetry.sub.SubscriptionState;
23 25
24 26 /**
25 27 * Created by ashvayka on 27.03.18.
26 28 */
27   -public interface TelemetrySubscriptionService extends RuleEngineTelemetryService {
  29 +public interface TelemetrySubscriptionService extends RuleEngineTelemetryService, ApplicationListener<PartitionChangeEvent> {
28 30
29   - void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
30   -
31   - void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId);
32   -
33   - void removeSubscription(String sessionId, int cmdId);
34   -
35   - void onNewRemoteSubscription(ServerAddress serverAddress, byte[] data);
36   -
37   - void onRemoteSubscriptionUpdate(ServerAddress serverAddress, byte[] bytes);
38   -
39   - void onRemoteSubscriptionClose(ServerAddress serverAddress, byte[] bytes);
40   -
41   - void onRemoteSessionClose(ServerAddress serverAddress, byte[] bytes);
42   -
43   - void onRemoteAttributesUpdate(ServerAddress serverAddress, byte[] bytes);
44   -
45   - void onRemoteTsUpdate(ServerAddress serverAddress, byte[] bytes);
46   -
47   - void onClusterUpdate();
48 31 }
... ...
... ... @@ -75,7 +75,7 @@ public class SubscriptionUpdate {
75 75 if (data == null) {
76 76 return Collections.emptyMap();
77 77 } else {
78   - return data.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> {
  78 + return data.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
79 79 List<Object> data = e.getValue();
80 80 Object[] latest = (Object[]) data.get(data.size() - 1);
81 81 return (long) latest[0];
... ...
... ... @@ -64,69 +64,7 @@ enum MessageType {
64 64 }
65 65
66 66 // Messages related to CLUSTER_TELEMETRY_MESSAGE
67   -message SubscriptionProto {
68   - string sessionId = 1;
69   - int32 subscriptionId = 2;
70   - string entityType = 3;
71   - string tenantId = 4;
72   - string entityId = 5;
73   - string type = 6;
74   - bool allKeys = 7;
75   - repeated SubscriptionKetStateProto keyStates = 8;
76   - string scope = 9;
77   -}
78   -
79   -message SubscriptionUpdateProto {
80   - string sessionId = 1;
81   - int32 subscriptionId = 2;
82   - int32 errorCode = 3;
83   - string errorMsg = 4;
84   - repeated SubscriptionUpdateValueListProto data = 5;
85   -}
86   -
87   -message AttributeUpdateProto {
88   - string entityType = 1;
89   - string entityId = 2;
90   - string scope = 3;
91   - repeated KeyValueProto data = 4;
92   -}
93   -
94   -message TimeseriesUpdateProto {
95   - string entityType = 1;
96   - string entityId = 2;
97   - repeated KeyValueProto data = 4;
98   -}
99   -
100   -message SessionCloseProto {
101   - string sessionId = 1;
102   -}
103   -
104   -message SubscriptionCloseProto {
105   - string sessionId = 1;
106   - int32 subscriptionId = 2;
107   -}
108   -
109   -message SubscriptionKetStateProto {
110   - string key = 1;
111   - int64 ts = 2;
112   -}
113   -
114   -message SubscriptionUpdateValueListProto {
115   - string key = 1;
116   - repeated int64 ts = 2;
117   - repeated string value = 3;
118   -}
119 67
120   -message KeyValueProto {
121   - string key = 1;
122   - int64 ts = 2;
123   - int32 valueType = 3;
124   - string strValue = 4;
125   - int64 longValue = 5;
126   - double doubleValue = 6;
127   - bool boolValue = 7;
128   - string jsonValue = 8;
129   -}
130 68
131 69 message FromDeviceRPCResponseProto {
132 70 int64 requestIdMSB = 1;
... ...
... ... @@ -412,7 +412,7 @@ audit-log:
412 412 state:
413 413 defaultInactivityTimeoutInSec: "${DEFAULT_INACTIVITY_TIMEOUT:10}"
414 414 defaultStateCheckIntervalInSec: "${DEFAULT_STATE_CHECK_INTERVAL:10}"
415   - persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}"
  415 + persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:true}"
416 416
417 417 js:
418 418 evaluator: "${JS_EVALUATOR:local}" # local/remote
... ...
... ... @@ -32,6 +32,10 @@ public class BaseAttributeKvEntry implements AttributeKvEntry {
32 32 this.lastUpdateTs = lastUpdateTs;
33 33 }
34 34
  35 + public BaseAttributeKvEntry(long lastUpdateTs, KvEntry kv) {
  36 + this(kv, lastUpdateTs);
  37 + }
  38 +
35 39 @Override
36 40 public long getLastUpdateTs() {
37 41 return lastUpdateTs;
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.discovery;
  17 +
  18 +import lombok.Getter;
  19 +import org.springframework.context.ApplicationEvent;
  20 +
  21 +import java.util.Set;
  22 +
  23 +
  24 +public class ClusterTopologyChangeEvent extends ApplicationEvent {
  25 +
  26 + @Getter
  27 + private final Set<ServiceKey> serviceKeys;
  28 +
  29 + public ClusterTopologyChangeEvent(Object source, Set<ServiceKey> serviceKeys) {
  30 + super(source);
  31 + this.serviceKeys = serviceKeys;
  32 + }
  33 +}
... ...
... ... @@ -66,6 +66,10 @@ public class ConsistentHashPartitionService implements PartitionService {
66 66 //TODO: Fetch this from the database, together with size of partitions for each service for each tenant.
67 67 private ConcurrentMap<TenantId, Set<ServiceType>> isolatedTenants = new ConcurrentHashMap<>();
68 68
  69 + private Map<String, TopicPartitionInfo> tbCoreNotificationTopics = new HashMap<>();
  70 + private Map<String, TopicPartitionInfo> tbRuleEngineNotificationTopics = new HashMap<>();
  71 + private List<ServiceInfo> currentOtherServices;
  72 +
69 73 private HashFunction hashFunction;
70 74
71 75 public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider, ApplicationEventPublisher applicationEventPublisher) {
... ... @@ -85,12 +89,11 @@ public class ConsistentHashPartitionService implements PartitionService {
85 89 @Override
86 90 public List<TopicPartitionInfo> getCurrentPartitions(ServiceType serviceType) {
87 91 ServiceInfo currentService = serviceInfoProvider.getServiceInfo();
88   - TenantId tenantId = getTenantId(currentService);
  92 + TenantId tenantId = getSystemOrIsolatedTenantId(currentService);
89 93 ServiceKey serviceKey = new ServiceKey(serviceType, tenantId);
90 94 List<Integer> partitions = myPartitions.get(serviceKey);
91 95 List<TopicPartitionInfo> topicPartitions = new ArrayList<>();
92 96 for (Integer partition : partitions) {
93   -
94 97 TopicPartitionInfo.TopicPartitionInfoBuilder tpi = TopicPartitionInfo.builder();
95 98 tpi.topic(partitionTopics.get(serviceType));
96 99 tpi.partition(partition);
... ... @@ -112,34 +115,16 @@ public class ConsistentHashPartitionService implements PartitionService {
112 115 return buildTopicPartitionInfo(serviceType, tenantId, partition);
113 116 }
114 117
115   - private TopicPartitionInfo buildTopicPartitionInfo(ServiceKey serviceKey, int partition) {
116   - return buildTopicPartitionInfo(serviceKey.getServiceType(), serviceKey.getTenantId(), partition);
117   - }
118   -
119   - private TopicPartitionInfo buildTopicPartitionInfo(ServiceType serviceType, TenantId tenantId, int partition) {
120   - boolean isolated = isolatedTenants.get(tenantId) != null && isolatedTenants.get(tenantId).contains(serviceType);
121   - TopicPartitionInfo.TopicPartitionInfoBuilder tpi = TopicPartitionInfo.builder();
122   - tpi.topic(partitionTopics.get(serviceType));
123   - tpi.partition(partition);
124   - if (isolated) {
125   - tpi.tenantId(tenantId);
126   - }
127   - return tpi.build();
128   - }
129   -
130 118 @Override
131 119 public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
132 120 logServiceInfo(currentService);
133 121 otherServices.forEach(this::logServiceInfo);
134 122
135   - Map<ServiceType, ConsistentHashCircle<ServiceInfo>> newCircles = new HashMap<>(ServiceType.values().length);
136   - for (ServiceType serverType : ServiceType.values()) {
137   - newCircles.put(serverType, new ConsistentHashCircle<>());
138   - }
139   - addNode(newCircles, currentService);
  123 + Map<ServiceKey, ConsistentHashCircle<ServiceInfo>> circles = new HashMap<>();
  124 + addNode(circles, currentService);
140 125 for (ServiceInfo other : otherServices) {
141   - addNode(newCircles, other);
142   - TenantId tenantId = getTenantId(other);
  126 + TenantId tenantId = getSystemOrIsolatedTenantId(other);
  127 + addNode(circles, other);
143 128 if (!tenantId.isNullUid()) {
144 129 isolatedTenants.putIfAbsent(tenantId, new HashSet<>());
145 130 for (String serviceType : other.getServiceTypesList()) {
... ... @@ -149,12 +134,14 @@ public class ConsistentHashPartitionService implements PartitionService {
149 134 }
150 135 }
151 136 ConcurrentMap<ServiceKey, List<Integer>> oldPartitions = myPartitions;
  137 + TenantId myTenantId = getSystemOrIsolatedTenantId(currentService);
152 138 myPartitions = new ConcurrentHashMap<>();
153 139 partitionSizes.forEach((type, size) -> {
  140 + ServiceKey myServiceKey = new ServiceKey(type, myTenantId);
154 141 for (int i = 0; i < size; i++) {
155   - ServiceInfo serviceInfo = resolveByPartitionIdx(newCircles.get(type), i);
  142 + ServiceInfo serviceInfo = resolveByPartitionIdx(circles.get(myServiceKey), i);
156 143 if (currentService.equals(serviceInfo)) {
157   - ServiceKey serviceKey = new ServiceKey(type, getTenantId(serviceInfo));
  144 + ServiceKey serviceKey = new ServiceKey(type, getSystemOrIsolatedTenantId(serviceInfo));
158 145 myPartitions.computeIfAbsent(serviceKey, key -> new ArrayList<>()).add(i);
159 146 }
160 147 }
... ... @@ -165,13 +152,81 @@ public class ConsistentHashPartitionService implements PartitionService {
165 152 Set<TopicPartitionInfo> tpiList = partitions.stream()
166 153 .map(partition -> buildTopicPartitionInfo(serviceKey, partition))
167 154 .collect(Collectors.toSet());
  155 + // Adding notifications topic for every @TopicPartitionInfo list
  156 + tpiList.add(getNotificationsTopic(serviceKey.getServiceType(), serviceInfoProvider.getServiceId()));
168 157 applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceKey, tpiList));
169 158 }
  159 +
170 160 });
  161 +
  162 + if (currentOtherServices == null) {
  163 + currentOtherServices = new ArrayList<>(otherServices);
  164 + } else {
  165 + Set<ServiceKey> changes = new HashSet<>();
  166 + Map<ServiceKey, List<ServiceInfo>> currentMap = getServiceKeyListMap(currentOtherServices);
  167 + Map<ServiceKey, List<ServiceInfo>> newMap = getServiceKeyListMap(otherServices);
  168 + currentOtherServices = otherServices;
  169 + currentMap.forEach((key, list) -> {
  170 + if (!list.equals(newMap.get(key))) {
  171 + changes.add(key);
  172 +
  173 + }
  174 + });
  175 + currentMap.keySet().forEach(newMap::remove);
  176 + changes.addAll(newMap.keySet());
  177 + if (!changes.isEmpty()) {
  178 + applicationEventPublisher.publishEvent(new ClusterTopologyChangeEvent(this, changes));
  179 + }
  180 + }
  181 + }
  182 +
  183 + private Map<ServiceKey, List<ServiceInfo>> getServiceKeyListMap(List<ServiceInfo> services) {
  184 + final Map<ServiceKey, List<ServiceInfo>> currentMap = new HashMap<>();
  185 + services.forEach(serviceInfo -> {
  186 + for (String serviceTypeStr : serviceInfo.getServiceTypesList()) {
  187 + ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase());
  188 + ServiceKey serviceKey = new ServiceKey(serviceType, getSystemOrIsolatedTenantId(serviceInfo));
  189 + currentMap.computeIfAbsent(serviceKey, key -> new ArrayList<>()).add(serviceInfo);
  190 + }
  191 + });
  192 + return currentMap;
  193 + }
  194 +
  195 + @Override
  196 + public TopicPartitionInfo getNotificationsTopic(ServiceType serviceType, String serviceId) {
  197 + switch (serviceType) {
  198 + case TB_CORE:
  199 + return tbCoreNotificationTopics.computeIfAbsent(serviceId,
  200 + id -> buildTopicPartitionInfo(serviceType, serviceId));
  201 + case TB_RULE_ENGINE:
  202 + return tbRuleEngineNotificationTopics.computeIfAbsent(serviceId,
  203 + id -> buildTopicPartitionInfo(serviceType, serviceId));
  204 + default:
  205 + return buildTopicPartitionInfo(serviceType, serviceId);
  206 + }
  207 + }
  208 +
  209 + private TopicPartitionInfo buildTopicPartitionInfo(ServiceType serviceType, String serviceId) {
  210 + return new TopicPartitionInfo(serviceType.name().toLowerCase() + "." + serviceId, null, null);
  211 + }
  212 +
  213 + private TopicPartitionInfo buildTopicPartitionInfo(ServiceKey serviceKey, int partition) {
  214 + return buildTopicPartitionInfo(serviceKey.getServiceType(), serviceKey.getTenantId(), partition);
  215 + }
  216 +
  217 + private TopicPartitionInfo buildTopicPartitionInfo(ServiceType serviceType, TenantId tenantId, int partition) {
  218 + boolean isolated = isolatedTenants.get(tenantId) != null && isolatedTenants.get(tenantId).contains(serviceType);
  219 + TopicPartitionInfo.TopicPartitionInfoBuilder tpi = TopicPartitionInfo.builder();
  220 + tpi.topic(partitionTopics.get(serviceType));
  221 + tpi.partition(partition);
  222 + if (isolated) {
  223 + tpi.tenantId(tenantId);
  224 + }
  225 + return tpi.build();
171 226 }
172 227
173 228 private void logServiceInfo(TransportProtos.ServiceInfo server) {
174   - TenantId tenantId = getTenantId(server);
  229 + TenantId tenantId = getSystemOrIsolatedTenantId(server);
175 230 if (tenantId.isNullUid()) {
176 231 log.info("[{}] Found common server: [{}]", server.getServiceId(), server.getServiceTypesList());
177 232 } else {
... ... @@ -179,21 +234,23 @@ public class ConsistentHashPartitionService implements PartitionService {
179 234 }
180 235 }
181 236
182   - private TenantId getTenantId(TransportProtos.ServiceInfo serviceInfo) {
  237 + private TenantId getSystemOrIsolatedTenantId(TransportProtos.ServiceInfo serviceInfo) {
183 238 return new TenantId(new UUID(serviceInfo.getTenantIdMSB(), serviceInfo.getTenantIdLSB()));
184 239 }
185 240
186   - private void addNode(Map<ServiceType, ConsistentHashCircle<ServiceInfo>> circles, ServiceInfo instance) {
  241 + private void addNode(Map<ServiceKey, ConsistentHashCircle<ServiceInfo>> circles, ServiceInfo instance) {
  242 + TenantId tenantId = getSystemOrIsolatedTenantId(instance);
187 243 for (String serviceTypeStr : instance.getServiceTypesList()) {
188 244 ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase());
  245 + ServiceKey serviceKey = new ServiceKey(serviceType, tenantId);
189 246 for (int i = 0; i < virtualNodesSize; i++) {
190   - circles.get(serviceType).put(hash(instance, i).asLong(), instance);
  247 + circles.computeIfAbsent(serviceKey, key -> new ConsistentHashCircle<>()).put(hash(instance, i).asLong(), instance);
191 248 }
192 249 }
193 250 }
194 251
195 252 private ServiceInfo resolveByPartitionIdx(ConsistentHashCircle<ServiceInfo> circle, Integer partitionIdx) {
196   - if (circle.isEmpty()) {
  253 + if (circle == null || circle.isEmpty()) {
197 254 return null;
198 255 }
199 256 Long hash = hashFunction.newHasher().putInt(partitionIdx).hash().asLong();
... ...
... ... @@ -36,4 +36,13 @@ public interface PartitionService {
36 36 * @param otherServices - all other discovered services {@link org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo}
37 37 */
38 38 void recalculatePartitions(TransportProtos.ServiceInfo currentService, List<TransportProtos.ServiceInfo> otherServices);
  39 +
  40 + /**
  41 + * Each Service should start a consumer for messages that target individual service instance based on serviceId.
  42 + * This topic is likely to have single partition, and is always assigned to the service.
  43 + * @param tbCore
  44 + * @param serviceId
  45 + * @return
  46 + */
  47 + TopicPartitionInfo getNotificationsTopic(ServiceType tbCore, String serviceId);
39 48 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
... ... @@ -240,6 +240,79 @@ message DeviceActorToTransportMsg {
240 240 }
241 241
242 242 /**
  243 + * TB Core Data Structures
  244 + */
  245 +
  246 +message TbSubscriptionProto {
  247 + string serviceId = 1;
  248 + string sessionId = 2;
  249 + int32 subscriptionId = 3;
  250 + string entityType = 4;
  251 + int64 tenantIdMSB = 5;
  252 + int64 tenantIdLSB = 6;
  253 + int64 entityIdMSB = 7;
  254 + int64 entityIdLSB = 8;
  255 +}
  256 +
  257 +message TbTimeSeriesSubscriptionProto {
  258 + TbSubscriptionProto sub = 1;
  259 + bool allKeys = 2;
  260 + repeated TbSubscriptionKetStateProto keyStates = 3;
  261 + int64 startTime = 4;
  262 + int64 endTime = 5;
  263 +}
  264 +
  265 +message TbAttributeSubscriptionProto {
  266 + TbSubscriptionProto sub = 1;
  267 + bool allKeys = 2;
  268 + repeated TbSubscriptionKetStateProto keyStates = 3;
  269 + string scope = 4;
  270 +}
  271 +
  272 +message TbSubscriptionUpdateProto {
  273 + string sessionId = 1;
  274 + int32 subscriptionId = 2;
  275 + int32 errorCode = 3;
  276 + string errorMsg = 4;
  277 + repeated TbSubscriptionUpdateValueListProto data = 5;
  278 +}
  279 +
  280 +message TbAttributeUpdateProto {
  281 + string entityType = 1;
  282 + int64 entityIdMSB = 2;
  283 + int64 entityIdLSB = 3;
  284 + int64 tenantIdMSB = 4;
  285 + int64 tenantIdLSB = 5;
  286 + string scope = 6;
  287 + repeated TsKvProto data = 7;
  288 +}
  289 +
  290 +message TbTimeSeriesUpdateProto {
  291 + string entityType = 1;
  292 + int64 entityIdMSB = 2;
  293 + int64 entityIdLSB = 3;
  294 + int64 tenantIdMSB = 4;
  295 + int64 tenantIdLSB = 5;
  296 + repeated TsKvProto data = 6;
  297 +}
  298 +
  299 +message TbSubscriptionCloseProto {
  300 + string sessionId = 1;
  301 + int32 subscriptionId = 2;
  302 +}
  303 +
  304 +message TbSubscriptionKetStateProto {
  305 + string key = 1;
  306 + int64 ts = 2;
  307 +}
  308 +
  309 +message TbSubscriptionUpdateValueListProto {
  310 + string key = 1;
  311 + repeated int64 ts = 2;
  312 + repeated string value = 3;
  313 +}
  314 +
  315 +/**
243 316 * TB Core to TB Core messages
244 317 */
245 318
... ... @@ -253,27 +326,41 @@ message DeviceStateServiceMsgProto {
253 326 bool deleted = 7;
254 327 }
255 328
  329 +message SubscriptionMgrMsgProto {
  330 + TbTimeSeriesSubscriptionProto telemetrySub = 1;
  331 + TbAttributeSubscriptionProto attributeSub = 2;
  332 + TbSubscriptionCloseProto subClose = 3;
  333 + TbTimeSeriesUpdateProto tsUpdate = 4;
  334 + TbAttributeUpdateProto attrUpdate = 5;
  335 +}
  336 +
  337 +message LocalSubscriptionServiceMsgProto {
  338 + TbSubscriptionUpdateProto subUpdate = 1;
  339 +}
  340 +
256 341 /**
257 342 * Main messages;
258 343 */
259 344
260 345 /* Request from Transport Service to ThingsBoard Core Service */
261 346 message TransportApiRequestMsg {
262   - ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1;
263   - ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2;
264   - GetOrCreateDeviceFromGatewayRequestMsg getOrCreateDeviceRequestMsg = 3;
  347 + ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1;
  348 + ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2;
  349 + GetOrCreateDeviceFromGatewayRequestMsg getOrCreateDeviceRequestMsg = 3;
265 350 }
266 351
267 352 /* Response from ThingsBoard Core Service to Transport Service */
268 353 message TransportApiResponseMsg {
269   - ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1;
270   - GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2;
  354 + ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1;
  355 + GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2;
271 356 }
272 357
273 358 /* Messages that are handled by ThingsBoard Core Service */
274 359 message ToCoreMsg {
275 360 TransportToDeviceActorMsg toDeviceActorMsg = 1;
276 361 DeviceStateServiceMsgProto deviceStateServiceMsg = 2;
  362 + SubscriptionMgrMsgProto toSubscriptionMgrMsg = 3;
  363 + LocalSubscriptionServiceMsgProto toLocalSubscriptionServiceMsg = 4;
277 364 }
278 365
279 366 /* Messages that are handled by ThingsBoard RuleEngine Service */
... ...
... ... @@ -136,6 +136,7 @@ public class DefaultTransportService implements TransportService {
136 136 log.warn("Failed to process the notification.", e);
137 137 }
138 138 });
  139 + transportNotificationsConsumer.commit();
139 140 } catch (Exception e) {
140 141 log.warn("Failed to obtain messages from queue.", e);
141 142 try {
... ...
... ... @@ -44,6 +44,4 @@ public interface RuleEngineTelemetryService {
44 44
45 45 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback);
46 46
47   - void onSharedAttributesUpdate(TenantId tenantId, DeviceId deviceId, Set<AttributeKvEntry> attributes);
48   -
49 47 }
... ...
... ... @@ -65,9 +65,6 @@ public class TbMsgAttributesNode implements TbNode {
65 65 String src = msg.getData();
66 66 Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src));
67 67 ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg));
68   - if (msg.getOriginator().getEntityType() == EntityType.DEVICE && DataConstants.SHARED_SCOPE.equals(config.getScope())) {
69   - ctx.getTelemetryService().onSharedAttributesUpdate(ctx.getTenantId(), new DeviceId(msg.getOriginator().getId()), attributes);
70   - }
71 68 }
72 69
73 70 @Override
... ...