Commit 63bac58ae84d31f3976ab447c088741fbfc50aeb

Authored by Andrii Shvaika
1 parent 2e122d86

Implementation of statistics counters

Showing 23 changed files with 186 additions and 378 deletions
... ... @@ -161,10 +161,6 @@ public class ActorSystemContext {
161 161
162 162 @Autowired
163 163 @Getter
164   - private TbQueueProducerProvider producerProvider;
165   -
166   - @Autowired
167   - @Getter
168 164 private TimeseriesService tsService;
169 165
170 166 @Autowired
... ...
... ... @@ -17,10 +17,8 @@ package org.thingsboard.server.actors.ruleChain;
17 17
18 18 import akka.actor.ActorRef;
19 19 import com.datastax.driver.core.ResultSetFuture;
20   -import com.datastax.driver.core.utils.UUIDs;
21 20 import com.fasterxml.jackson.core.JsonProcessingException;
22 21 import com.fasterxml.jackson.databind.ObjectMapper;
23   -import com.fasterxml.jackson.databind.node.ObjectNode;
24 22 import io.netty.channel.EventLoopGroup;
25 23 import lombok.extern.slf4j.Slf4j;
26 24 import org.springframework.data.redis.core.RedisTemplate;
... ... @@ -36,7 +34,6 @@ import org.thingsboard.server.actors.ActorSystemContext;
36 34 import org.thingsboard.server.common.data.Customer;
37 35 import org.thingsboard.server.common.data.DataConstants;
38 36 import org.thingsboard.server.common.data.Device;
39   -import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
40 37 import org.thingsboard.server.common.data.alarm.Alarm;
41 38 import org.thingsboard.server.common.data.asset.Asset;
42 39 import org.thingsboard.server.common.data.id.EntityId;
... ... @@ -45,8 +42,8 @@ import org.thingsboard.server.common.data.id.RuleNodeId;
45 42 import org.thingsboard.server.common.data.id.TenantId;
46 43 import org.thingsboard.server.common.data.rule.RuleNode;
47 44 import org.thingsboard.server.common.msg.TbMsg;
48   -import org.thingsboard.server.common.msg.TbMsgDataType;
49 45 import org.thingsboard.server.common.msg.TbMsgMetaData;
  46 +import org.thingsboard.server.common.msg.queue.ServiceType;
50 47 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
51 48 import org.thingsboard.server.dao.alarm.AlarmService;
52 49 import org.thingsboard.server.dao.asset.AssetService;
... ... @@ -62,11 +59,9 @@ import org.thingsboard.server.dao.rule.RuleChainService;
62 59 import org.thingsboard.server.dao.tenant.TenantService;
63 60 import org.thingsboard.server.dao.timeseries.TimeseriesService;
64 61 import org.thingsboard.server.dao.user.UserService;
65   -import org.thingsboard.server.common.msg.queue.ServiceType;
66 62 import org.thingsboard.server.gen.transport.TransportProtos;
67 63 import org.thingsboard.server.queue.TbQueueCallback;
68 64 import org.thingsboard.server.queue.TbQueueMsgMetadata;
69   -import org.thingsboard.server.queue.common.TbProtoQueueMsg;
70 65 import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
71 66 import scala.concurrent.duration.Duration;
72 67
... ... @@ -136,7 +131,7 @@ class DefaultTbContext implements TbContext {
136 131 .setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
137 132 .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
138 133 .setTbMsg(TbMsg.toByteString(tbMsg)).build();
139   - mainCtx.getProducerProvider().getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), new SimpleTbQueueCallback(onSuccess, onFailure));
  134 + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(onSuccess, onFailure));
140 135 }
141 136
142 137 @Override
... ... @@ -193,7 +188,7 @@ class DefaultTbContext implements TbContext {
193 188 if (failureMessage != null) {
194 189 msg.setFailureMessage(failureMessage);
195 190 }
196   - mainCtx.getProducerProvider().getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg.build()), new SimpleTbQueueCallback(onSuccess, onFailure));
  191 + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(onSuccess, onFailure));
197 192 }
198 193
199 194 @Override
... ...
... ... @@ -44,10 +44,9 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
44 44 import org.thingsboard.server.dao.rule.RuleChainService;
45 45 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
46 46 import org.thingsboard.server.queue.TbQueueCallback;
47   -import org.thingsboard.server.queue.TbQueueProducer;
48 47 import org.thingsboard.server.queue.common.MultipleTbQueueTbMsgCallbackWrapper;
49   -import org.thingsboard.server.queue.common.TbProtoQueueMsg;
50 48 import org.thingsboard.server.queue.common.TbQueueTbMsgCallbackWrapper;
  49 +import org.thingsboard.server.service.queue.TbClusterService;
51 50
52 51 import java.util.ArrayList;
53 52 import java.util.Collections;
... ... @@ -68,7 +67,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
68 67 private final Map<RuleNodeId, RuleNodeCtx> nodeActors;
69 68 private final Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes;
70 69 private final RuleChainService service;
71   - private final TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> producer;
  70 + private final TbClusterService clusterService;
72 71 private String ruleChainName;
73 72
74 73 private RuleNodeId firstId;
... ... @@ -84,7 +83,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
84 83 this.nodeActors = new HashMap<>();
85 84 this.nodeRoutes = new HashMap<>();
86 85 this.service = systemContext.getRuleChainService();
87   - this.producer = systemContext.getProducerProvider().getRuleEngineMsgProducer();
  86 + this.clusterService = systemContext.getClusterService();
88 87 }
89 88
90 89 @Override
... ... @@ -255,7 +254,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
255 254 msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
256 255 } else {
257 256 log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
258   - //TODO 2.5: Introduce our own RuleEngineFailureException to track what is wrong
259 257 msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
260 258 }
261 259 } else {
... ... @@ -311,7 +309,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
311 309 .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
312 310 .setTbMsg(TbMsg.toByteString(newMsg))
313 311 .build();
314   - producer.send(tpi, new TbProtoQueueMsg<>(newMsg.getId(), toQueueMsg), callbackWrapper);
  312 + clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper);
315 313 }
316 314
317 315 private boolean contains(Set<String> relationTypes, String type) {
... ...
... ... @@ -97,96 +97,4 @@ public class DefaultActorService implements ActorService {
97 97 }
98 98 }
99 99
100   - @Value("${cluster.stats.enabled:false}")
101   - private boolean statsEnabled;
102   -
103   - //TODO 2.5
104   - private final AtomicInteger sentClusterMsgs = new AtomicInteger(0);
105   - private final AtomicInteger receivedClusterMsgs = new AtomicInteger(0);
106   -
107   -
108   - @Scheduled(fixedDelayString = "${cluster.stats.print_interval_ms}")
109   - public void printStats() {
110   - if (statsEnabled) {
111   - int sent = sentClusterMsgs.getAndSet(0);
112   - int received = receivedClusterMsgs.getAndSet(0);
113   - if (sent > 0 || received > 0) {
114   - log.info("Cluster msgs sent [{}] received [{}]", sent, received);
115   - }
116   - }
117   - }
118   -
119   - //TODO 2.5
120   -// @Override
121   -// public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) {
122   -// if (statsEnabled) {
123   -// receivedClusterMsgs.incrementAndGet();
124   -// }
125   -// ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort(), source.getServerType());
126   -// if (log.isDebugEnabled()) {
127   -// log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress);
128   -// log.info("MSG: {}", msg);
129   -// }
130   -// switch (msg.getMessageType()) {
131   -// case CLUSTER_ACTOR_MESSAGE:
132   -// java.util.Optional<TbActorMsg> decodedMsg = actorContext.getEncodingService()
133   -// .decode(msg.getPayload().toByteArray());
134   -// if (decodedMsg.isPresent()) {
135   -// appActor.tell(decodedMsg.get(), ActorRef.noSender());
136   -// } else {
137   -// log.error("Error during decoding cluster proto message");
138   -// }
139   -// break;
140   -// case TO_ALL_NODES_MSG:
141   -// //TODO
142   -// break;
143   -// case CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE:
144   -// actorContext.getTsSubService().onNewRemoteSubscription(serverAddress, msg.getPayload().toByteArray());
145   -// break;
146   -// case CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE:
147   -// actorContext.getTsSubService().onRemoteSubscriptionUpdate(serverAddress, msg.getPayload().toByteArray());
148   -// break;
149   -// case CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE:
150   -// actorContext.getTsSubService().onRemoteSubscriptionClose(serverAddress, msg.getPayload().toByteArray());
151   -// break;
152   -// case CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE:
153   -// actorContext.getTsSubService().onRemoteSessionClose(serverAddress, msg.getPayload().toByteArray());
154   -// break;
155   -// case CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE:
156   -// actorContext.getTsSubService().onRemoteAttributesUpdate(serverAddress, msg.getPayload().toByteArray());
157   -// break;
158   -// case CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE:
159   -// actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray());
160   -// break;
161   -// case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE:
162   -// actorContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromRemoteServer(serverAddress, msg.getPayload().toByteArray());
163   -// break;
164   -// case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE:
165   -// actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray());
166   -// break;
167   -// case CLUSTER_TRANSACTION_SERVICE_MESSAGE:
168   -// actorContext.getRuleChainTransactionService().onRemoteTransactionMsg(serverAddress, msg.getPayload().toByteArray());
169   -// break;
170   -// }
171   -// }
172   -// @Override
173   -// public void onSendMsg(ClusterAPIProtos.ClusterMessage msg) {
174   -// if (statsEnabled) {
175   -// sentClusterMsgs.incrementAndGet();
176   -// }
177   -// rpcManagerActor.tell(msg, ActorRef.noSender());
178   -// }
179   -//
180   -// @Override
181   -// public void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg) {
182   -// if (statsEnabled) {
183   -// sentClusterMsgs.incrementAndGet();
184   -// }
185   -// rpcManagerActor.tell(msg, ActorRef.noSender());
186   -// }
187   -// @Override
188   -// public void onBroadcastMsg(RpcBroadcastMsg msg) {
189   -// rpcManagerActor.tell(msg, ActorRef.noSender());
190   -// }
191   -
192 100 }
... ...
... ... @@ -15,7 +15,6 @@
15 15 */
16 16 package org.thingsboard.server.controller;
17 17
18   -import com.datastax.driver.core.utils.UUIDs;
19 18 import com.fasterxml.jackson.databind.ObjectMapper;
20 19 import com.fasterxml.jackson.databind.node.ArrayNode;
21 20 import com.fasterxml.jackson.databind.node.ObjectNode;
... ... @@ -668,7 +667,7 @@ public abstract class BaseController {
668 667 }
669 668 }
670 669 TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode));
671   - tbClusterService.onToRuleEngineMsg(user.getTenantId(), entityId, tbMsg);
  670 + tbClusterService.pushMsgToRuleEngine(user.getTenantId(), entityId, tbMsg, null);
672 671 } catch (Exception e) {
673 672 log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e);
674 673 }
... ...
... ... @@ -99,8 +99,8 @@ public class DeviceController extends BaseController {
99 99
100 100 Device savedDevice = checkNotNull(deviceService.saveDeviceWithAccessToken(device, accessToken));
101 101
102   - tbClusterService.onToCoreMsg(new DeviceNameOrTypeUpdateMsg(savedDevice.getTenantId(),
103   - savedDevice.getId(), savedDevice.getName(), savedDevice.getType()));
  102 + tbClusterService.pushMsgToCore(new DeviceNameOrTypeUpdateMsg(savedDevice.getTenantId(),
  103 + savedDevice.getId(), savedDevice.getName(), savedDevice.getType()), null);
104 104
105 105 logEntityAction(savedDevice.getId(), savedDevice,
106 106 savedDevice.getCustomerId(),
... ... @@ -254,7 +254,7 @@ public class DeviceController extends BaseController {
254 254 Device device = checkDeviceId(deviceCredentials.getDeviceId(), Operation.WRITE_CREDENTIALS);
255 255 DeviceCredentials result = checkNotNull(deviceCredentialsService.updateDeviceCredentials(getCurrentUser().getTenantId(), deviceCredentials));
256 256
257   - tbClusterService.onToCoreMsg(new DeviceCredentialsUpdateNotificationMsg(getCurrentUser().getTenantId(), deviceCredentials.getDeviceId()));
  257 + tbClusterService.pushMsgToCore(new DeviceCredentialsUpdateNotificationMsg(getCurrentUser().getTenantId(), deviceCredentials.getDeviceId()), null);
258 258
259 259 logEntityAction(device.getId(), device,
260 260 device.getCustomerId(),
... ...
... ... @@ -364,8 +364,8 @@ public class TelemetryController extends BaseController {
364 364 DeviceId deviceId = new DeviceId(entityId.getId());
365 365 Set<AttributeKey> keysToNotify = new HashSet<>();
366 366 keys.forEach(key -> keysToNotify.add(new AttributeKey(scope, key)));
367   - tbClusterService.onToCoreMsg(DeviceAttributesEventNotificationMsg.onDelete(
368   - user.getTenantId(), deviceId, keysToNotify));
  367 + tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
  368 + user.getTenantId(), deviceId, keysToNotify), null);
369 369 }
370 370 result.setResult(new ResponseEntity<>(HttpStatus.OK));
371 371 }
... ... @@ -399,8 +399,8 @@ public class TelemetryController extends BaseController {
399 399 logAttributesUpdated(user, entityId, scope, attributes, null);
400 400 if (entityId.getEntityType() == EntityType.DEVICE) {
401 401 DeviceId deviceId = new DeviceId(entityId.getId());
402   - tbClusterService.onToCoreMsg(DeviceAttributesEventNotificationMsg.onUpdate(
403   - user.getTenantId(), deviceId, scope, attributes));
  402 + tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(
  403 + user.getTenantId(), deviceId, scope, attributes), null);
404 404 }
405 405 result.setResult(new ResponseEntity(HttpStatus.OK));
406 406 }
... ...
... ... @@ -17,6 +17,8 @@ package org.thingsboard.server.service.queue;
17 17
18 18 import com.google.protobuf.ByteString;
19 19 import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.beans.factory.annotation.Value;
  21 +import org.springframework.scheduling.annotation.Scheduled;
20 22 import org.springframework.stereotype.Service;
21 23 import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
22 24 import org.thingsboard.server.common.data.EntityType;
... ... @@ -27,7 +29,13 @@ import org.thingsboard.server.common.msg.TbMsg;
27 29 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
28 30 import org.thingsboard.server.common.msg.queue.ServiceType;
29 31 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
30   -import org.thingsboard.server.gen.transport.TransportProtos.*;
  32 +import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
  33 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
  34 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  35 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  36 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
  37 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  38 +import org.thingsboard.server.queue.TbQueueCallback;
31 39 import org.thingsboard.server.queue.TbQueueProducer;
32 40 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
33 41 import org.thingsboard.server.queue.discovery.PartitionService;
... ... @@ -38,12 +46,22 @@ import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
38 46 import java.util.HashSet;
39 47 import java.util.Set;
40 48 import java.util.UUID;
  49 +import java.util.concurrent.atomic.AtomicInteger;
41 50
42 51 @Service
43 52 @Slf4j
44 53 public class DefaultTbClusterService implements TbClusterService {
45 54
46   - protected TbQueueProducerProvider producerProvider;
  55 + @Value("${cluster.stats.enabled:false}")
  56 + private boolean statsEnabled;
  57 +
  58 + private final AtomicInteger toCoreMsgs = new AtomicInteger(0);
  59 + private final AtomicInteger toCoreNfs = new AtomicInteger(0);
  60 + private final AtomicInteger toRuleEngineMsgs = new AtomicInteger(0);
  61 + private final AtomicInteger toRuleEngineNfs = new AtomicInteger(0);
  62 + private final AtomicInteger toTransportNfs = new AtomicInteger(0);
  63 +
  64 + private final TbQueueProducerProvider producerProvider;
47 65 private final PartitionService partitionService;
48 66 private final DataDecodingEncodingService encodingService;
49 67
... ... @@ -54,33 +72,29 @@ public class DefaultTbClusterService implements TbClusterService {
54 72 }
55 73
56 74 @Override
57   - public void onToRuleEngineMsg(TenantId tenantId, EntityId entityId, TbMsg tbMsg) {
58   - if (tenantId.isNullUid()) {
59   - if (entityId.getEntityType().equals(EntityType.TENANT)) {
60   - tenantId = new TenantId(entityId.getId());
61   - } else {
62   - log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);
63   - return;
64   - }
65   - }
66   - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
67   - ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
68   - .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
69   - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
70   - .setTbMsg(TbMsg.toByteString(tbMsg)).build();
71   - producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), null);
  75 + public void pushMsgToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, TbQueueCallback callback) {
  76 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
  77 + producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback);
  78 + toCoreMsgs.incrementAndGet();
  79 + }
  80 +
  81 + @Override
  82 + public void pushMsgToCore(TopicPartitionInfo tpi, UUID msgId, ToCoreMsg msg, TbQueueCallback callback) {
  83 + producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback);
  84 + toCoreMsgs.incrementAndGet();
72 85 }
73 86
74 87 @Override
75   - public void onToCoreMsg(ToDeviceActorNotificationMsg msg) {
  88 + public void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback) {
76 89 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), msg.getDeviceId());
77 90 byte[] msgBytes = encodingService.encode(msg);
78 91 ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotificationMsg(ByteString.copyFrom(msgBytes)).build();
79   - producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), null);
  92 + producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback);
  93 + toCoreMsgs.incrementAndGet();
80 94 }
81 95
82 96 @Override
83   - public void onToCoreMsg(String serviceId, FromDeviceRpcResponse response) {
  97 + public void pushNotificationToCore(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) {
84 98 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
85 99 FromDeviceRPCResponseProto.Builder builder = FromDeviceRPCResponseProto.newBuilder()
86 100 .setRequestIdMSB(response.getId().getMostSignificantBits())
... ... @@ -88,11 +102,37 @@ public class DefaultTbClusterService implements TbClusterService {
88 102 .setError(response.getError().isPresent() ? response.getError().get().ordinal() : -1);
89 103 response.getResponse().ifPresent(builder::setResponse);
90 104 ToCoreNotificationMsg msg = ToCoreNotificationMsg.newBuilder().setFromDeviceRpcResponse(builder).build();
91   - producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(response.getId(), msg), null);
  105 + producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(response.getId(), msg), callback);
  106 + toCoreNfs.incrementAndGet();
  107 + }
  108 +
  109 + @Override
  110 + public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) {
  111 + producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback);
  112 + toRuleEngineMsgs.incrementAndGet();
92 113 }
93 114
94 115 @Override
95   - public void onToRuleEngineMsg(String serviceId, FromDeviceRpcResponse response) {
  116 + public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
  117 + if (tenantId.isNullUid()) {
  118 + if (entityId.getEntityType().equals(EntityType.TENANT)) {
  119 + tenantId = new TenantId(entityId.getId());
  120 + } else {
  121 + log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);
  122 + return;
  123 + }
  124 + }
  125 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
  126 + ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
  127 + .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
  128 + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
  129 + .setTbMsg(TbMsg.toByteString(tbMsg)).build();
  130 + producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
  131 + toRuleEngineMsgs.incrementAndGet();
  132 + }
  133 +
  134 + @Override
  135 + public void pushNotificationToRuleEngine(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) {
96 136 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId);
97 137 FromDeviceRPCResponseProto.Builder builder = FromDeviceRPCResponseProto.newBuilder()
98 138 .setRequestIdMSB(response.getId().getMostSignificantBits())
... ... @@ -100,13 +140,15 @@ public class DefaultTbClusterService implements TbClusterService {
100 140 .setError(response.getError().isPresent() ? response.getError().get().ordinal() : -1);
101 141 response.getResponse().ifPresent(builder::setResponse);
102 142 ToRuleEngineNotificationMsg msg = ToRuleEngineNotificationMsg.newBuilder().setFromDeviceRpcResponse(builder).build();
103   - producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(response.getId(), msg), null);
  143 + producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(response.getId(), msg), callback);
  144 + toRuleEngineNfs.incrementAndGet();
104 145 }
105 146
106 147 @Override
107   - public void onToTransportMsg(String serviceId, ToTransportMsg response) {
  148 + public void pushNotificationToTransport(String serviceId, ToTransportMsg response, TbQueueCallback callback) {
108 149 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceId);
109   - producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), response), null);
  150 + producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), response), callback);
  151 + toTransportNfs.incrementAndGet();
110 152 }
111 153
112 154 @Override
... ... @@ -120,12 +162,13 @@ public class DefaultTbClusterService implements TbClusterService {
120 162 TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer();
121 163 Set<String> tbRuleEngineServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE));
122 164 if (msg.getEntityId().getEntityType().equals(EntityType.TENANT)) {
123   - TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreProducer = producerProvider.getTbCoreNotificationsMsgProducer();
  165 + TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
124 166 Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
125 167 for (String serviceId : tbCoreServices) {
126 168 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
127 169 ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build();
128   - toCoreProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toCoreMsg), null);
  170 + toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toCoreMsg), null);
  171 + toCoreNfs.incrementAndGet();
129 172 }
130 173 // No need to push notifications twice
131 174 tbRuleEngineServices.removeAll(tbCoreServices);
... ... @@ -134,6 +177,22 @@ public class DefaultTbClusterService implements TbClusterService {
134 177 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId);
135 178 ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build();
136 179 toRuleEngineProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg), null);
  180 + toRuleEngineNfs.incrementAndGet();
  181 + }
  182 + }
  183 +
  184 + @Scheduled(fixedDelayString = "${cluster.stats.print_interval_ms}")
  185 + public void printStats() {
  186 + if (statsEnabled) {
  187 + int toCoreMsgCnt = toCoreMsgs.getAndSet(0);
  188 + int toCoreNfsCnt = toCoreNfs.getAndSet(0);
  189 + int toRuleEngineMsgsCnt = toRuleEngineMsgs.getAndSet(0);
  190 + int toRuleEngineNfsCnt = toRuleEngineNfs.getAndSet(0);
  191 + int toTransportNfsCnt = toTransportNfs.getAndSet(0);
  192 + if (toCoreMsgCnt > 0 || toCoreNfsCnt > 0 || toRuleEngineMsgsCnt > 0 || toRuleEngineNfsCnt > 0 || toTransportNfsCnt > 0) {
  193 + log.info("To TbCore: [{}] messages [{}] notifications; To TbRuleEngine: [{}] messages [{}] notifications; To Transport: [{}] notifications",
  194 + toCoreMsgCnt, toCoreNfsCnt, toRuleEngineMsgsCnt, toRuleEngineNfsCnt, toTransportNfsCnt);
  195 + }
137 196 }
138 197 }
139 198 }
... ...
... ... @@ -183,21 +183,24 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
183 183
184 184 @Override
185 185 protected void handleNotification(UUID id, TbProtoQueueMsg<ToCoreNotificationMsg> msg, TbCallback callback) {
186   - ToCoreNotificationMsg toCoreMsg = msg.getValue();
187   - if (toCoreMsg.hasToLocalSubscriptionServiceMsg()) {
188   - log.trace("[{}] Forwarding message to local subscription service {}", id, toCoreMsg.getToLocalSubscriptionServiceMsg());
189   - forwardToLocalSubMgrService(toCoreMsg.getToLocalSubscriptionServiceMsg(), callback);
190   - } else if (toCoreMsg.hasFromDeviceRpcResponse()) {
191   - log.trace("[{}] Forwarding message to RPC service {}", id, toCoreMsg.getFromDeviceRpcResponse());
192   - forwardToCoreRpcService(toCoreMsg.getFromDeviceRpcResponse(), callback);
193   - } else if (toCoreMsg.getComponentLifecycleMsg() != null && !toCoreMsg.getComponentLifecycleMsg().isEmpty()) {
194   - Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getComponentLifecycleMsg().toByteArray());
  186 + ToCoreNotificationMsg toCoreNotification = msg.getValue();
  187 + if (toCoreNotification.hasToLocalSubscriptionServiceMsg()) {
  188 + log.trace("[{}] Forwarding message to local subscription service {}", id, toCoreNotification.getToLocalSubscriptionServiceMsg());
  189 + forwardToLocalSubMgrService(toCoreNotification.getToLocalSubscriptionServiceMsg(), callback);
  190 + } else if (toCoreNotification.hasFromDeviceRpcResponse()) {
  191 + log.trace("[{}] Forwarding message to RPC service {}", id, toCoreNotification.getFromDeviceRpcResponse());
  192 + forwardToCoreRpcService(toCoreNotification.getFromDeviceRpcResponse(), callback);
  193 + } else if (toCoreNotification.getComponentLifecycleMsg() != null && !toCoreNotification.getComponentLifecycleMsg().isEmpty()) {
  194 + Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray());
195 195 if (actorMsg.isPresent()) {
196 196 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
197 197 actorContext.tell(actorMsg.get(), ActorRef.noSender());
198 198 }
199 199 callback.onSuccess();
200 200 }
  201 + if (statsEnabled) {
  202 + stats.log(toCoreNotification);
  203 + }
201 204 }
202 205
203 206 private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbCallback callback) {
... ... @@ -246,6 +249,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
246 249 } else {
247 250 throwNotHandled(msg, callback);
248 251 }
  252 + if (statsEnabled) {
  253 + stats.log(msg);
  254 + }
249 255 }
250 256
251 257 private void forwardToStateService(DeviceStateServiceMsgProto deviceStateServiceMsg, TbCallback callback) {
... ...
... ... @@ -20,20 +20,32 @@ import org.thingsboard.server.common.data.id.EntityId;
20 20 import org.thingsboard.server.common.data.id.TenantId;
21 21 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
22 22 import org.thingsboard.server.common.msg.TbMsg;
  23 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  24 +import org.thingsboard.server.gen.transport.TransportProtos;
  25 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
23 26 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  27 +import org.thingsboard.server.queue.TbQueueCallback;
24 28 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
25 29
  30 +import java.util.UUID;
  31 +
26 32 public interface TbClusterService {
27 33
28   - void onToRuleEngineMsg(TenantId tenantId, EntityId entityId, TbMsg msg);
  34 + void pushMsgToCore(TopicPartitionInfo tpi, UUID msgKey, ToCoreMsg msg, TbQueueCallback callback);
  35 +
  36 + void pushMsgToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, TbQueueCallback callback);
  37 +
  38 + void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback);
  39 +
  40 + void pushNotificationToCore(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback);
29 41
30   - void onToCoreMsg(ToDeviceActorNotificationMsg msg);
  42 + void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, TransportProtos.ToRuleEngineMsg msg, TbQueueCallback callback);
31 43
32   - void onToCoreMsg(String targetServiceId, FromDeviceRpcResponse response);
  44 + void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback);
33 45
34   - void onToRuleEngineMsg(String targetServiceId, FromDeviceRpcResponse response);
  46 + void pushNotificationToRuleEngine(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback);
35 47
36   - void onToTransportMsg(String targetServiceId, ToTransportMsg response);
  48 + void pushNotificationToTransport(String targetServiceId, ToTransportMsg response, TbQueueCallback callback);
37 49
38 50 void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
39 51
... ...
... ... @@ -32,6 +32,10 @@ public class TbCoreConsumerStats {
32 32 private final AtomicInteger subscriptionInfoCounter = new AtomicInteger(0);
33 33 private final AtomicInteger claimDeviceCounter = new AtomicInteger(0);
34 34
  35 + private final AtomicInteger deviceStateCounter = new AtomicInteger(0);
  36 + private final AtomicInteger subscriptionMsgCounter = new AtomicInteger(0);
  37 + private final AtomicInteger toCoreNotificationsCounter = new AtomicInteger(0);
  38 +
35 39 public void log(TransportProtos.TransportToDeviceActorMsg msg) {
36 40 totalCounter.incrementAndGet();
37 41 if (msg.hasSessionEvent()) {
... ... @@ -57,18 +61,31 @@ public class TbCoreConsumerStats {
57 61 }
58 62 }
59 63
60   - public void log(TransportProtos.DeviceStateServiceMsgProto deviceStateServiceMsg) {
61   - //TODO 2.5
  64 + public void log(TransportProtos.DeviceStateServiceMsgProto msg) {
  65 + totalCounter.incrementAndGet();
  66 + deviceStateCounter.incrementAndGet();
  67 + }
  68 +
  69 + public void log(TransportProtos.SubscriptionMgrMsgProto msg) {
  70 + totalCounter.incrementAndGet();
  71 + subscriptionMsgCounter.incrementAndGet();
  72 + }
  73 +
  74 + public void log(TransportProtos.ToCoreNotificationMsg msg) {
  75 + totalCounter.incrementAndGet();
  76 + toCoreNotificationsCounter.incrementAndGet();
62 77 }
63 78
64 79 public void printStats() {
65 80 int total = totalCounter.getAndSet(0);
66 81 if (total > 0) {
67   - log.info("Transport total [{}] sessionEvents [{}] getAttr [{}] subToAttr [{}] subToRpc [{}] toDevRpc [{}] subInfo [{}] claimDevice [{}]",
  82 + log.info("Transport total [{}] sessionEvents [{}] getAttr [{}] subToAttr [{}] subToRpc [{}] toDevRpc [{}] subInfo [{}] claimDevice [{}]" +
  83 + " deviceState [{}] subMgr [{}] coreNfs [{}]",
68 84 total, sessionEventCounter.getAndSet(0),
69 85 getAttributesCounter.getAndSet(0), subscribeToAttributesCounter.getAndSet(0),
70 86 subscribeToRPCCounter.getAndSet(0), toDeviceRPCCallResponseCounter.getAndSet(0),
71   - subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0));
  87 + subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0)
  88 + , deviceStateCounter.getAndSet(0), subscriptionMsgCounter.getAndSet(0), toCoreNotificationsCounter.getAndSet(0));
72 89 }
73 90 }
74 91
... ...
... ... @@ -16,7 +16,6 @@
16 16 package org.thingsboard.server.service.rpc;
17 17
18 18 import akka.actor.ActorRef;
19   -import com.datastax.driver.core.utils.UUIDs;
20 19 import com.fasterxml.jackson.core.JsonProcessingException;
21 20 import com.fasterxml.jackson.databind.ObjectMapper;
22 21 import com.fasterxml.jackson.databind.node.ObjectNode;
... ... @@ -147,7 +146,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
147 146 log.warn("Failed to find tbCoreRpcService for local service. Possible duplication of serviceIds.");
148 147 }
149 148 } else {
150   - clusterService.onToRuleEngineMsg(originServiceId, response);
  149 + clusterService.pushNotificationToRuleEngine(originServiceId, response, null);
151 150 }
152 151 }
153 152
... ... @@ -170,7 +169,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
170 169
171 170 try {
172 171 TbMsg tbMsg = TbMsg.newMsg(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode));
173   - clusterService.onToRuleEngineMsg(msg.getTenantId(), msg.getDeviceId(), tbMsg);
  172 + clusterService.pushMsgToRuleEngine(msg.getTenantId(), msg.getDeviceId(), tbMsg, null);
174 173 } catch (JsonProcessingException e) {
175 174 throw new RuntimeException(e);
176 175 }
... ...
... ... @@ -22,7 +22,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
22 22 import org.thingsboard.rule.engine.api.RpcError;
23 23 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
24 24 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse;
25   -import org.thingsboard.server.common.data.id.DeviceId;
26 25 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
27 26 import org.thingsboard.server.common.msg.queue.ServiceType;
28 27 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
... ... @@ -96,7 +95,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
96 95 .setSessionIdLSB(sessionId.getLeastSignificantBits())
97 96 .setToServerResponse(responseMsg)
98 97 .build();
99   - clusterService.onToTransportMsg(serviceId, msg);
  98 + clusterService.pushNotificationToTransport(serviceId, msg, null);
100 99 }
101 100
102 101 @Override
... ... @@ -148,7 +147,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
148 147 }
149 148 } else {
150 149 log.trace("[{}] Forwarding msg {} to queue actor!", msg.getDeviceId(), msg);
151   - clusterService.onToCoreMsg(rpcMsg);
  150 + clusterService.pushMsgToCore(rpcMsg, null);
152 151 }
153 152 }
154 153
... ... @@ -160,7 +159,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
160 159 log.warn("Failed to find tbCoreRpcService for local service. Possible duplication of serviceIds.");
161 160 }
162 161 } else {
163   - clusterService.onToCoreMsg(originServiceId, response);
  162 + clusterService.pushNotificationToCore(originServiceId, response, null);
164 163 }
165 164 }
166 165
... ...
... ... @@ -29,7 +29,6 @@ import org.springframework.beans.factory.annotation.Value;
29 29 import org.springframework.stereotype.Service;
30 30 import org.springframework.util.StringUtils;
31 31 import org.thingsboard.common.util.ThingsBoardThreadFactory;
32   -import org.thingsboard.server.queue.common.TbProtoQueueMsg;
33 32 import org.thingsboard.server.common.data.DataConstants;
34 33 import org.thingsboard.server.common.data.Device;
35 34 import org.thingsboard.server.common.data.Tenant;
... ... @@ -56,8 +55,8 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
56 55 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
57 56 import org.thingsboard.server.gen.transport.TransportProtos;
58 57 import org.thingsboard.server.common.msg.queue.TbCallback;
59   -import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
60 58 import org.thingsboard.server.queue.util.TbCoreComponent;
  59 +import org.thingsboard.server.service.queue.TbClusterService;
61 60 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
62 61
63 62 import javax.annotation.Nullable;
... ... @@ -106,7 +105,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
106 105 private final DeviceService deviceService;
107 106 private final AttributesService attributesService;
108 107 private final TimeseriesService tsService;
109   - private final TbQueueProducerProvider producerProvider;
  108 + private final TbClusterService clusterService;
110 109 private final PartitionService partitionService;
111 110
112 111 private TelemetrySubscriptionService tsSubService;
... ... @@ -137,12 +136,12 @@ public class DefaultDeviceStateService implements DeviceStateService {
137 136
138 137 public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService,
139 138 AttributesService attributesService, TimeseriesService tsService,
140   - TbQueueProducerProvider producerProvider, PartitionService partitionService) {
  139 + TbClusterService clusterService, PartitionService partitionService) {
141 140 this.tenantService = tenantService;
142 141 this.deviceService = deviceService;
143 142 this.attributesService = attributesService;
144 143 this.tsService = tsService;
145   - this.producerProvider = producerProvider;
  144 + this.clusterService = clusterService;
146 145 this.partitionService = partitionService;
147 146 }
148 147
... ... @@ -413,8 +412,6 @@ public class DefaultDeviceStateService implements DeviceStateService {
413 412 }
414 413
415 414 private void sendDeviceEvent(TenantId tenantId, DeviceId deviceId, boolean added, boolean updated, boolean deleted) {
416   - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
417   - log.trace("[{}][{}] Device is monitored on partition: {}", tenantId, deviceId, tpi);
418 415 TransportProtos.DeviceStateServiceMsgProto.Builder builder = TransportProtos.DeviceStateServiceMsgProto.newBuilder();
419 416 builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
420 417 builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
... ... @@ -424,8 +421,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
424 421 builder.setUpdated(updated);
425 422 builder.setDeleted(deleted);
426 423 TransportProtos.DeviceStateServiceMsgProto msg = builder.build();
427   - producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(deviceId.getId(),
428   - TransportProtos.ToCoreMsg.newBuilder().setDeviceStateServiceMsg(msg).build()), null);
  424 + clusterService.pushMsgToCore(tenantId, deviceId, TransportProtos.ToCoreMsg.newBuilder().setDeviceStateServiceMsg(msg).build(), null);
429 425 }
430 426
431 427 private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
... ... @@ -497,12 +493,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
497 493 try {
498 494 TbMsg tbMsg = TbMsg.newMsg(msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON
499 495 , json.writeValueAsString(state));
500   - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, stateData.getTenantId(), stateData.getDeviceId());
501   - TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
502   - .setTenantIdMSB(stateData.getTenantId().getId().getMostSignificantBits())
503   - .setTenantIdLSB(stateData.getTenantId().getId().getLeastSignificantBits())
504   - .setTbMsg(TbMsg.toByteString(tbMsg)).build();
505   - producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), null);
  496 + clusterService.pushMsgToRuleEngine(stateData.getTenantId(), stateData.getDeviceId(), tbMsg, null);
506 497 } catch (Exception e) {
507 498 log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
508 499 }
... ...
... ... @@ -245,8 +245,9 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
245 245 }
246 246 }
247 247 } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope)) {
248   - clusterService.onToCoreMsg(DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
249   - new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes)));
  248 + clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
  249 + new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes))
  250 + , null);
250 251 }
251 252 }
252 253 callback.onSuccess();
... ...
... ... @@ -28,16 +28,14 @@ import org.thingsboard.server.common.data.id.EntityViewId;
28 28 import org.thingsboard.server.common.data.id.TenantId;
29 29 import org.thingsboard.server.dao.entityview.EntityViewService;
30 30 import org.thingsboard.server.gen.transport.TransportProtos;
31   -import org.thingsboard.server.queue.TbQueueProducer;
32   -import org.thingsboard.server.queue.common.TbProtoQueueMsg;
33 31 import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent;
34 32 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
35 33 import org.thingsboard.server.queue.discovery.PartitionService;
36 34 import org.thingsboard.server.common.msg.queue.ServiceType;
37 35 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
38 36 import org.thingsboard.server.common.msg.queue.TbCallback;
39   -import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
40 37 import org.thingsboard.server.queue.util.TbCoreComponent;
  38 +import org.thingsboard.server.service.queue.TbClusterService;
41 39 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
42 40 import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
43 41
... ... @@ -70,19 +68,17 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
70 68 private PartitionService partitionService;
71 69
72 70 @Autowired
73   - private TbQueueProducerProvider producerProvider;
  71 + private TbClusterService clusterService;
74 72
75 73 @Autowired
76 74 @Lazy
77 75 private SubscriptionManagerService subscriptionManagerService;
78 76
79 77 private ExecutorService wsCallBackExecutor;
80   - private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> toCoreProducer;
81 78
82 79 @PostConstruct
83 80 public void initExecutor() {
84 81 wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-sub-callback"));
85   - toCoreProducer = producerProvider.getTbCoreMsgProducer();
86 82 }
87 83
88 84 @PreDestroy
... ... @@ -140,7 +136,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
140 136 } else {
141 137 // Push to the queue;
142 138 TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toNewSubscriptionProto(subscription);
143   - toCoreProducer.send(tpi, new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg), null);
  139 + clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null);
144 140 }
145 141 }
146 142
... ... @@ -181,7 +177,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
181 177 } else {
182 178 // Push to the queue;
183 179 TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toCloseSubscriptionProto(subscription);
184   - toCoreProducer.send(tpi, new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg), null);
  180 + clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null);
185 181 }
186 182 } else {
187 183 log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
... ...
... ... @@ -32,17 +32,15 @@ import org.thingsboard.server.common.data.kv.DoubleDataEntry;
32 32 import org.thingsboard.server.common.data.kv.LongDataEntry;
33 33 import org.thingsboard.server.common.data.kv.StringDataEntry;
34 34 import org.thingsboard.server.common.data.kv.TsKvEntry;
  35 +import org.thingsboard.server.common.msg.queue.ServiceType;
  36 +import org.thingsboard.server.common.msg.queue.TbCallback;
  37 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
35 38 import org.thingsboard.server.dao.attributes.AttributesService;
36 39 import org.thingsboard.server.dao.timeseries.TimeseriesService;
37 40 import org.thingsboard.server.gen.transport.TransportProtos;
38   -import org.thingsboard.server.queue.TbQueueProducer;
39   -import org.thingsboard.server.queue.common.TbProtoQueueMsg;
40 41 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
41 42 import org.thingsboard.server.queue.discovery.PartitionService;
42   -import org.thingsboard.server.common.msg.queue.ServiceType;
43   -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
44   -import org.thingsboard.server.common.msg.queue.TbCallback;
45   -import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
  43 +import org.thingsboard.server.service.queue.TbClusterService;
46 44 import org.thingsboard.server.service.subscription.SubscriptionManagerService;
47 45 import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
48 46
... ... @@ -69,22 +67,20 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
69 67
70 68 private final AttributesService attrService;
71 69 private final TimeseriesService tsService;
72   - private final TbQueueProducerProvider producerProvider;
  70 + private final TbClusterService clusterService;
73 71 private final PartitionService partitionService;
74 72 private Optional<SubscriptionManagerService> subscriptionManagerService;
75 73
76   - private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> toCoreProducer;
77   -
78 74 private ExecutorService tsCallBackExecutor;
79 75 private ExecutorService wsCallBackExecutor;
80 76
81 77 public DefaultTelemetrySubscriptionService(AttributesService attrService,
82 78 TimeseriesService tsService,
83   - TbQueueProducerProvider producerProvider,
  79 + TbClusterService clusterService,
84 80 PartitionService partitionService) {
85 81 this.attrService = attrService;
86 82 this.tsService = tsService;
87   - this.producerProvider = producerProvider;
  83 + this.clusterService = clusterService;
88 84 this.partitionService = partitionService;
89 85 }
90 86
... ... @@ -97,7 +93,6 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
97 93 public void initExecutor() {
98 94 tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-service-ts-callback"));
99 95 wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-service-ws-callback"));
100   - toCoreProducer = producerProvider.getTbCoreMsgProducer();
101 96 }
102 97
103 98 @PreDestroy
... ... @@ -172,7 +167,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
172 167 }
173 168 } else {
174 169 TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAttributesUpdateProto(tenantId, entityId, scope, attributes);
175   - toCoreProducer.send(tpi, new TbProtoQueueMsg<>(entityId.getId(), toCoreMsg), null);
  170 + clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
176 171 }
177 172 }
178 173
... ... @@ -186,7 +181,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
186 181 }
187 182 } else {
188 183 TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toTimeseriesUpdateProto(tenantId, entityId, ts);
189   - toCoreProducer.send(tpi, new TbProtoQueueMsg<>(entityId.getId(), toCoreMsg), null);
  184 + clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
190 185 }
191 186 }
192 187
... ...
... ... @@ -23,13 +23,11 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
23 23
24 24 import javax.annotation.PostConstruct;
25 25
26   -//TODO 2.5 Maybe remove this service if it is not used.
27 26 @Service
28 27 @ConditionalOnExpression("'${service.type:null}'=='tb-transport'")
29 28 public class TbTransportQueueProducerProvider implements TbQueueProducerProvider {
30 29
31 30 private final TbTransportQueueFactory tbQueueProvider;
32   - private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> toTransport;
33 31 private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toRuleEngine;
34 32 private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> toTbCore;
35 33
... ...
... ... @@ -211,7 +211,7 @@ public interface TbContext {
211 211
212 212 ResultSetFuture submitCassandraTask(CassandraStatementTask task);
213 213
214   - //TODO 2.5: - need to remove this.
  214 + @Deprecated
215 215 RedisTemplate<String, Object> getRedisTemplate();
216 216
217 217 }
... ...
... ... @@ -75,7 +75,6 @@ public class TbMsgCountNode implements TbNode {
75 75 TbMsgMetaData metaData = new TbMsgMetaData();
76 76 metaData.putValue("delta", Long.toString(System.currentTimeMillis() - lastScheduledTs + delay));
77 77
78   - //TODO 2.5: Callback?
79 78 TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), metaData, gson.toJson(telemetryJson));
80 79 ctx.enqueueForTellNext(tbMsg, SUCCESS);
81 80 scheduleTickMsg(ctx);
... ...
... ... @@ -23,7 +23,6 @@ import lombok.extern.slf4j.Slf4j;
23 23 import org.springframework.http.HttpEntity;
24 24 import org.springframework.http.HttpHeaders;
25 25 import org.springframework.http.HttpMethod;
26   -import org.springframework.http.HttpStatus;
27 26 import org.springframework.http.ResponseEntity;
28 27 import org.springframework.http.client.Netty4ClientHttpRequestFactory;
29 28 import org.springframework.util.concurrent.ListenableFuture;
... ... @@ -84,7 +83,7 @@ class TbHttpClient {
84 83 }
85 84 }
86 85
87   - void processMessage(TbContext ctx, TbMsg msg, TbRedisQueueProcessor queueProcessor) {
  86 + void processMessage(TbContext ctx, TbMsg msg) {
88 87 String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg.getMetaData());
89 88 HttpHeaders headers = prepareHeaders(msg.getMetaData());
90 89 HttpMethod method = HttpMethod.valueOf(config.getRequestMethod());
... ... @@ -95,13 +94,6 @@ class TbHttpClient {
95 94 future.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
96 95 @Override
97 96 public void onFailure(Throwable throwable) {
98   - if (config.isUseRedisQueueForMsgPersistence()) {
99   - if (throwable instanceof HttpClientErrorException) {
100   - processHttpClientError(((HttpClientErrorException) throwable).getStatusCode(), msg, queueProcessor);
101   - } else {
102   - queueProcessor.pushOnFailure(msg);
103   - }
104   - }
105 97 TbMsg next = processException(ctx, msg, throwable);
106 98 ctx.tellFailure(next, throwable);
107 99 }
... ... @@ -109,15 +101,9 @@ class TbHttpClient {
109 101 @Override
110 102 public void onSuccess(ResponseEntity<String> responseEntity) {
111 103 if (responseEntity.getStatusCode().is2xxSuccessful()) {
112   - if (config.isUseRedisQueueForMsgPersistence()) {
113   - queueProcessor.resetCounter();
114   - }
115 104 TbMsg next = processResponse(ctx, msg, responseEntity);
116 105 ctx.tellSuccess(next);
117 106 } else {
118   - if (config.isUseRedisQueueForMsgPersistence()) {
119   - processHttpClientError(responseEntity.getStatusCode(), msg, queueProcessor);
120   - }
121 107 TbMsg next = processFailureResponse(ctx, msg, responseEntity);
122 108 ctx.tellNext(next, TbRelationTypes.FAILURE);
123 109 }
... ... @@ -183,11 +169,4 @@ class TbHttpClient {
183 169 }
184 170 }
185 171
186   - private void processHttpClientError(HttpStatus statusCode, TbMsg msg, TbRedisQueueProcessor queueProcessor) {
187   - if (statusCode.is4xxClientError()) {
188   - log.warn("[{}] Client error during message delivering!", msg);
189   - } else {
190   - queueProcessor.pushOnFailure(msg);
191   - }
192   - }
193 172 }
... ...
1   -/**
2   - * Copyright © 2016-2020 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.rule.engine.rest;
17   -
18   -import lombok.Data;
19   -import lombok.extern.slf4j.Slf4j;
20   -import org.springframework.data.redis.core.ListOperations;
21   -import org.thingsboard.rule.engine.api.TbContext;
22   -import org.thingsboard.server.common.msg.TbMsg;
23   -
24   -import java.util.List;
25   -import java.util.concurrent.ExecutorService;
26   -import java.util.concurrent.Executors;
27   -import java.util.concurrent.Future;
28   -import java.util.concurrent.TimeUnit;
29   -import java.util.concurrent.atomic.AtomicInteger;
30   -
31   -@Data
32   -@Slf4j
33   -class TbRedisQueueProcessor {
34   -
35   - private static final int MAX_QUEUE_SIZE = Integer.MAX_VALUE;
36   -
37   - private final TbContext ctx;
38   - private final TbHttpClient httpClient;
39   - private final ExecutorService executor;
40   - private final ListOperations<String, Object> listOperations;
41   - private final String redisKey;
42   - private final boolean trimQueue;
43   - private final int maxQueueSize;
44   -
45   - private AtomicInteger failuresCounter;
46   - private Future future;
47   -
48   - TbRedisQueueProcessor(TbContext ctx, TbHttpClient httpClient, boolean trimQueue, int maxQueueSize) {
49   - this.ctx = ctx;
50   - this.httpClient = httpClient;
51   - this.executor = Executors.newSingleThreadExecutor();
52   - this.listOperations = ctx.getRedisTemplate().opsForList();
53   - this.redisKey = constructRedisKey();
54   - this.trimQueue = trimQueue;
55   - this.maxQueueSize = maxQueueSize;
56   - init();
57   - }
58   -
59   - private void init() {
60   - failuresCounter = new AtomicInteger(0);
61   - future = executor.submit(() -> {
62   - while (true) {
63   - if (failuresCounter.get() != 0 && failuresCounter.get() % 50 == 0) {
64   - sleep("Target HTTP server is down...", 3);
65   - }
66   - if (listOperations.size(redisKey) > 0) {
67   - List<Object> list = listOperations.range(redisKey, -10, -1);
68   - list.forEach(obj -> {
69   - //TODO 2.5: Callback?
70   - TbMsg msg = TbMsg.fromBytes((byte[]) obj, null);
71   - log.debug("Trying to send the message: {}", msg);
72   - listOperations.remove(redisKey, -1, obj);
73   - httpClient.processMessage(ctx, msg, this);
74   - });
75   - } else {
76   - sleep("Queue is empty, waiting for tasks!", 1);
77   - }
78   - }
79   - });
80   - }
81   -
82   - void destroy() {
83   - if (future != null) {
84   - future.cancel(true);
85   - }
86   - if (executor != null) {
87   - executor.shutdownNow();
88   - }
89   - }
90   -
91   - void push(TbMsg msg) {
92   - listOperations.leftPush(redisKey, TbMsg.toByteArray(msg));
93   - if (trimQueue) {
94   - listOperations.trim(redisKey, 0, validateMaxQueueSize());
95   - }
96   - }
97   -
98   - void pushOnFailure(TbMsg msg) {
99   - listOperations.rightPush(redisKey, TbMsg.toByteArray(msg));
100   - failuresCounter.incrementAndGet();
101   - }
102   -
103   - void resetCounter() {
104   - failuresCounter.set(0);
105   - }
106   -
107   - private String constructRedisKey() {
108   - return ctx.getServiceId() + ctx.getSelfId();
109   - }
110   -
111   - private int validateMaxQueueSize() {
112   - if (maxQueueSize != 0) {
113   - return maxQueueSize;
114   - }
115   - return MAX_QUEUE_SIZE;
116   - }
117   -
118   - private void sleep(String logMessage, int sleepSeconds) {
119   - try {
120   - log.debug(logMessage);
121   - TimeUnit.SECONDS.sleep(sleepSeconds);
122   - } catch (InterruptedException e) {
123   - throw new IllegalStateException("Thread interrupted!", e);
124   - }
125   - }
126   -}
... ... @@ -25,8 +25,6 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
25 25 import org.thingsboard.server.common.data.plugin.ComponentType;
26 26 import org.thingsboard.server.common.msg.TbMsg;
27 27
28   -import java.util.concurrent.ExecutionException;
29   -
30 28 @Slf4j
31 29 @RuleNode(
32 30 type = ComponentType.EXTERNAL,
... ... @@ -47,7 +45,6 @@ public class TbRestApiCallNode implements TbNode {
47 45
48 46 private boolean useRedisQueueForMsgPersistence;
49 47 private TbHttpClient httpClient;
50   - private TbRedisQueueProcessor queueProcessor;
51 48
52 49 @Override
53 50 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
... ... @@ -55,20 +52,13 @@ public class TbRestApiCallNode implements TbNode {
55 52 httpClient = new TbHttpClient(config);
56 53 useRedisQueueForMsgPersistence = config.isUseRedisQueueForMsgPersistence();
57 54 if (useRedisQueueForMsgPersistence) {
58   - if (ctx.getRedisTemplate() == null) {
59   - throw new RuntimeException("Redis cache type must be used!");
60   - }
61   - queueProcessor = new TbRedisQueueProcessor(ctx, httpClient, config.isTrimQueue(), config.getMaxQueueSize());
  55 + log.warn("[{}][{}] Usage of Redis Template is deprecated starting 2.5 and will have no affect", ctx.getTenantId(), ctx.getSelfId());
62 56 }
63 57 }
64 58
65 59 @Override
66   - public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
67   - if (useRedisQueueForMsgPersistence) {
68   - queueProcessor.push(msg);
69   - } else {
70   - httpClient.processMessage(ctx, msg, null);
71   - }
  60 + public void onMsg(TbContext ctx, TbMsg msg) {
  61 + httpClient.processMessage(ctx, msg);
72 62 }
73 63
74 64 @Override
... ... @@ -76,9 +66,6 @@ public class TbRestApiCallNode implements TbNode {
76 66 if (this.httpClient != null) {
77 67 this.httpClient.destroy();
78 68 }
79   - if (this.queueProcessor != null) {
80   - this.queueProcessor.destroy();
81   - }
82 69 }
83 70
84 71 }
... ...