Commit 2fc39132cdc218461d20abaea2b74d3a1a8b215f

Authored by Andrii Shvaika
2 parents 2b6797e9 5b4cfbb3

Merge with develop/2.5.3

Showing 50 changed files with 989 additions and 211 deletions
... ... @@ -98,6 +98,10 @@
98 98 <artifactId>queue</artifactId>
99 99 </dependency>
100 100 <dependency>
  101 + <groupId>org.thingsboard.common</groupId>
  102 + <artifactId>stats</artifactId>
  103 + </dependency>
  104 + <dependency>
101 105 <groupId>org.thingsboard</groupId>
102 106 <artifactId>dao</artifactId>
103 107 <type>test-jar</type>
... ... @@ -309,18 +313,6 @@
309 313 <artifactId>Java-WebSocket</artifactId>
310 314 <scope>test</scope>
311 315 </dependency>
312   - <dependency>
313   - <groupId>org.springframework.boot</groupId>
314   - <artifactId>spring-boot-starter-actuator</artifactId>
315   - </dependency>
316   - <dependency>
317   - <groupId>io.micrometer</groupId>
318   - <artifactId>micrometer-core</artifactId>
319   - </dependency>
320   - <dependency>
321   - <groupId>io.micrometer</groupId>
322   - <artifactId>micrometer-registry-prometheus</artifactId>
323   - </dependency>
324 316 </dependencies>
325 317
326 318 <build>
... ...
  1 +-----BEGIN CERTIFICATE-----
  2 +MIIDdzCCAl+gAwIBAgIEAgAAuTANBgkqhkiG9w0BAQUFADBaMQswCQYDVQQGEwJJ
  3 +RTESMBAGA1UEChMJQmFsdGltb3JlMRMwEQYDVQQLEwpDeWJlclRydXN0MSIwIAYD
  4 +VQQDExlCYWx0aW1vcmUgQ3liZXJUcnVzdCBSb290MB4XDTAwMDUxMjE4NDYwMFoX
  5 +DTI1MDUxMjIzNTkwMFowWjELMAkGA1UEBhMCSUUxEjAQBgNVBAoTCUJhbHRpbW9y
  6 +ZTETMBEGA1UECxMKQ3liZXJUcnVzdDEiMCAGA1UEAxMZQmFsdGltb3JlIEN5YmVy
  7 +VHJ1c3QgUm9vdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKMEuyKr
  8 +mD1X6CZymrV51Cni4eiVgLGw41uOKymaZN+hXe2wCQVt2yguzmKiYv60iNoS6zjr
  9 +IZ3AQSsBUnuId9Mcj8e6uYi1agnnc+gRQKfRzMpijS3ljwumUNKoUMMo6vWrJYeK
  10 +mpYcqWe4PwzV9/lSEy/CG9VwcPCPwBLKBsua4dnKM3p31vjsufFoREJIE9LAwqSu
  11 +XmD+tqYF/LTdB1kC1FkYmGP1pWPgkAx9XbIGevOF6uvUA65ehD5f/xXtabz5OTZy
  12 +dc93Uk3zyZAsuT3lySNTPx8kmCFcB5kpvcY67Oduhjprl3RjM71oGDHweI12v/ye
  13 +jl0qhqdNkNwnGjkCAwEAAaNFMEMwHQYDVR0OBBYEFOWdWTCCR1jMrPoIVDaGezq1
  14 +BE3wMBIGA1UdEwEB/wQIMAYBAf8CAQMwDgYDVR0PAQH/BAQDAgEGMA0GCSqGSIb3
  15 +DQEBBQUAA4IBAQCFDF2O5G9RaEIFoN27TyclhAO992T9Ldcw46QQF+vaKSm2eT92
  16 +9hkTI7gQCvlYpNRhcL0EYWoSihfVCr3FvDB81ukMJY2GQE/szKN+OMY3EU/t3Wgx
  17 +jkzSswF07r51XgdIGn9w/xZchMB5hbgF/X++ZRGjD8ACtPhSNzkE1akxehi/oCr0
  18 +Epn3o0WC4zxe9Z2etciefC7IpJ5OCBRLbf1wbWsaY71k5h+3zvDyny67G7fyUIhz
  19 +ksLi4xaNmjICq44Y3ekQEe5+NauQrz4wlHrQMz2nZQ/1/I6eYs9HRCwBXbsdtTLS
  20 +R9I4LtD+gdwyah617jzV/OeBHRnDJELqYzmp
  21 +-----END CERTIFICATE-----
  22 +
... ...
... ... @@ -29,7 +29,8 @@ import java.util.Arrays;
29 29 @ComponentScan({"org.thingsboard.server.install",
30 30 "org.thingsboard.server.service.component",
31 31 "org.thingsboard.server.service.install",
32   - "org.thingsboard.server.dao"})
  32 + "org.thingsboard.server.dao",
  33 + "org.thingsboard.server.common.stats"})
33 34 public class ThingsboardInstallApplication {
34 35
35 36 private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
... ...
... ... @@ -131,6 +131,9 @@ class DefaultTbContext implements TbContext {
131 131 .setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
132 132 .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
133 133 .setTbMsg(TbMsg.toByteString(tbMsg)).build();
  134 + if (nodeCtx.getSelf().isDebugMode()) {
  135 + mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
  136 + }
134 137 mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(onSuccess, onFailure));
135 138 }
136 139
... ... @@ -176,10 +179,10 @@ class DefaultTbContext implements TbContext {
176 179 enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
177 180 }
178 181
179   - private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg tbMsg, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
  182 + private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
180 183 RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
181 184 RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
182   - tbMsg = TbMsg.newMsg(tbMsg, ruleChainId, ruleNodeId);
  185 + TbMsg tbMsg = TbMsg.newMsg(source, ruleChainId, ruleNodeId);
183 186 TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder()
184 187 .setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
185 188 .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
... ... @@ -188,6 +191,10 @@ class DefaultTbContext implements TbContext {
188 191 if (failureMessage != null) {
189 192 msg.setFailureMessage(failureMessage);
190 193 }
  194 + if (nodeCtx.getSelf().isDebugMode()) {
  195 + relationTypes.forEach(relationType ->
  196 + mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, relationType));
  197 + }
191 198 mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(onSuccess, onFailure));
192 199 }
193 200
... ...
... ... @@ -62,6 +62,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
62 62 @Override
63 63 public void onUpdate(TbActorCtx context) throws Exception {
64 64 RuleNode newRuleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId);
  65 + this.info = new RuleNodeInfo(entityId, ruleChainName, newRuleNode != null ? newRuleNode.getName() : "Unknown");
65 66 boolean restartRequired = state != ComponentLifecycleState.ACTIVE ||
66 67 !(ruleNode.getType().equals(newRuleNode.getType()) && ruleNode.getConfiguration().equals(newRuleNode.getConfiguration()));
67 68 this.ruleNode = newRuleNode;
... ...
... ... @@ -46,6 +46,7 @@ import org.thingsboard.server.queue.TbQueueConsumer;
46 46 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
47 47 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
48 48 import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
  49 +import org.thingsboard.server.common.stats.StatsFactory;
49 50 import org.thingsboard.server.queue.util.TbCoreComponent;
50 51 import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
51 52 import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
... ... @@ -53,7 +54,6 @@ import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
53 54 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
54 55 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
55 56 import org.thingsboard.server.service.state.DeviceStateService;
56   -import org.thingsboard.server.service.stats.StatsCounterFactory;
57 57 import org.thingsboard.server.service.subscription.SubscriptionManagerService;
58 58 import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
59 59 import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
... ... @@ -94,14 +94,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
94 94 public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext,
95 95 DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService,
96 96 SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService,
97   - TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsCounterFactory counterFactory) {
  97 + TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory) {
98 98 super(actorContext, encodingService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
99 99 this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
100 100 this.stateService = stateService;
101 101 this.localSubscriptionService = localSubscriptionService;
102 102 this.subscriptionManagerService = subscriptionManagerService;
103 103 this.tbCoreDeviceRpcService = tbCoreDeviceRpcService;
104   - this.stats = new TbCoreConsumerStats(counterFactory);
  104 + this.stats = new TbCoreConsumerStats(statsFactory);
105 105 }
106 106
107 107 @PostConstruct
... ...
... ... @@ -42,6 +42,7 @@ import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
42 42 import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
43 43 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
44 44 import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
  45 +import org.thingsboard.server.common.stats.StatsFactory;
45 46 import org.thingsboard.server.queue.util.TbRuleEngineComponent;
46 47 import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
47 48 import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
... ... @@ -54,7 +55,6 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrateg
54 55 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
55 56 import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
56 57 import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
57   -import org.thingsboard.server.service.stats.StatsCounterFactory;
58 58
59 59 import javax.annotation.PostConstruct;
60 60 import javax.annotation.PreDestroy;
... ... @@ -83,7 +83,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
83 83 @Value("${queue.rule-engine.stats.enabled:true}")
84 84 private boolean statsEnabled;
85 85
86   - private final StatsCounterFactory counterFactory;
  86 + private final StatsFactory statsFactory;
87 87 private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory;
88 88 private final TbRuleEngineProcessingStrategyFactory processingStrategyFactory;
89 89 private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory;
... ... @@ -101,7 +101,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
101 101 TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService,
102 102 ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
103 103 TbRuleEngineDeviceRpcService tbDeviceRpcService,
104   - StatsCounterFactory counterFactory) {
  104 + StatsFactory statsFactory) {
105 105 super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
106 106 this.statisticsService = statisticsService;
107 107 this.ruleEngineSettings = ruleEngineSettings;
... ... @@ -109,7 +109,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
109 109 this.submitStrategyFactory = submitStrategyFactory;
110 110 this.processingStrategyFactory = processingStrategyFactory;
111 111 this.tbDeviceRpcService = tbDeviceRpcService;
112   - this.counterFactory = counterFactory;
  112 + this.statsFactory = statsFactory;
113 113 }
114 114
115 115 @PostConstruct
... ... @@ -118,7 +118,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
118 118 for (TbRuleEngineQueueConfiguration configuration : ruleEngineSettings.getQueues()) {
119 119 consumerConfigurations.putIfAbsent(configuration.getName(), configuration);
120 120 consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
121   - consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), counterFactory));
  121 + consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), statsFactory));
122 122 }
123 123 submitExecutor = Executors.newSingleThreadExecutor();
124 124 }
... ...
... ... @@ -17,12 +17,11 @@ package org.thingsboard.server.service.queue;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.server.gen.transport.TransportProtos;
20   -import org.thingsboard.server.service.stats.StatsCounter;
21   -import org.thingsboard.server.service.stats.StatsCounterFactory;
22   -import org.thingsboard.server.service.stats.StatsType;
  20 +import org.thingsboard.server.common.stats.StatsCounter;
  21 +import org.thingsboard.server.common.stats.StatsFactory;
  22 +import org.thingsboard.server.common.stats.StatsType;
23 23
24 24 import java.util.*;
25   -import java.util.concurrent.atomic.AtomicInteger;
26 25
27 26 @Slf4j
28 27 public class TbCoreConsumerStats {
... ... @@ -53,20 +52,20 @@ public class TbCoreConsumerStats {
53 52
54 53 private final List<StatsCounter> counters = new ArrayList<>();
55 54
56   - public TbCoreConsumerStats(StatsCounterFactory counterFactory) {
  55 + public TbCoreConsumerStats(StatsFactory statsFactory) {
57 56 String statsKey = StatsType.CORE.getName();
58 57
59   - this.totalCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS);
60   - this.sessionEventCounter = counterFactory.createStatsCounter(statsKey, SESSION_EVENTS);
61   - this.getAttributesCounter = counterFactory.createStatsCounter(statsKey, GET_ATTRIBUTE);
62   - this.subscribeToAttributesCounter = counterFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES);
63   - this.subscribeToRPCCounter = counterFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES);
64   - this.toDeviceRPCCallResponseCounter = counterFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES);
65   - this.subscriptionInfoCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO);
66   - this.claimDeviceCounter = counterFactory.createStatsCounter(statsKey, DEVICE_CLAIMS);
67   - this.deviceStateCounter = counterFactory.createStatsCounter(statsKey, DEVICE_STATES);
68   - this.subscriptionMsgCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS);
69   - this.toCoreNotificationsCounter = counterFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS);
  58 + this.totalCounter = statsFactory.createStatsCounter(statsKey, TOTAL_MSGS);
  59 + this.sessionEventCounter = statsFactory.createStatsCounter(statsKey, SESSION_EVENTS);
  60 + this.getAttributesCounter = statsFactory.createStatsCounter(statsKey, GET_ATTRIBUTE);
  61 + this.subscribeToAttributesCounter = statsFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES);
  62 + this.subscribeToRPCCounter = statsFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES);
  63 + this.toDeviceRPCCallResponseCounter = statsFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES);
  64 + this.subscriptionInfoCounter = statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO);
  65 + this.claimDeviceCounter = statsFactory.createStatsCounter(statsKey, DEVICE_CLAIMS);
  66 + this.deviceStateCounter = statsFactory.createStatsCounter(statsKey, DEVICE_STATES);
  67 + this.subscriptionMsgCounter = statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS);
  68 + this.toCoreNotificationsCounter = statsFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS);
70 69
71 70
72 71 counters.add(totalCounter);
... ...
... ... @@ -15,21 +15,19 @@
15 15 */
16 16 package org.thingsboard.server.service.queue;
17 17
18   -import lombok.Data;
19 18 import lombok.extern.slf4j.Slf4j;
20 19 import org.thingsboard.server.common.data.id.TenantId;
21 20 import org.thingsboard.server.common.msg.queue.RuleEngineException;
22 21 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
23 22 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  23 +import org.thingsboard.server.common.stats.StatsFactory;
24 24 import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
25   -import org.thingsboard.server.service.stats.StatsCounter;
26   -import org.thingsboard.server.service.stats.StatsCounterFactory;
27   -import org.thingsboard.server.service.stats.StatsType;
  25 +import org.thingsboard.server.common.stats.StatsCounter;
  26 +import org.thingsboard.server.common.stats.StatsType;
28 27
29 28 import java.util.*;
30 29 import java.util.concurrent.ConcurrentHashMap;
31 30 import java.util.concurrent.ConcurrentMap;
32   -import java.util.concurrent.atomic.AtomicInteger;
33 31
34 32 @Slf4j
35 33 public class TbRuleEngineConsumerStats {
... ... @@ -60,18 +58,18 @@ public class TbRuleEngineConsumerStats {
60 58
61 59 private final String queueName;
62 60
63   - public TbRuleEngineConsumerStats(String queueName, StatsCounterFactory counterFactory) {
  61 + public TbRuleEngineConsumerStats(String queueName, StatsFactory statsFactory) {
64 62 this.queueName = queueName;
65 63
66 64 String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName;
67   - this.totalMsgCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS);
68   - this.successMsgCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_MSGS);
69   - this.timeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TIMEOUT_MSGS);
70   - this.failedMsgCounter = counterFactory.createStatsCounter(statsKey, FAILED_MSGS);
71   - this.tmpTimeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_TIMEOUT);
72   - this.tmpFailedMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_FAILED);
73   - this.successIterationsCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_ITERATIONS);
74   - this.failedIterationsCounter = counterFactory.createStatsCounter(statsKey, FAILED_ITERATIONS);
  65 + this.totalMsgCounter = statsFactory.createStatsCounter(statsKey, TOTAL_MSGS);
  66 + this.successMsgCounter = statsFactory.createStatsCounter(statsKey, SUCCESSFUL_MSGS);
  67 + this.timeoutMsgCounter = statsFactory.createStatsCounter(statsKey, TIMEOUT_MSGS);
  68 + this.failedMsgCounter = statsFactory.createStatsCounter(statsKey, FAILED_MSGS);
  69 + this.tmpTimeoutMsgCounter = statsFactory.createStatsCounter(statsKey, TMP_TIMEOUT);
  70 + this.tmpFailedMsgCounter = statsFactory.createStatsCounter(statsKey, TMP_FAILED);
  71 + this.successIterationsCounter = statsFactory.createStatsCounter(statsKey, SUCCESSFUL_ITERATIONS);
  72 + this.failedIterationsCounter = statsFactory.createStatsCounter(statsKey, FAILED_ITERATIONS);
75 73
76 74 counters.add(totalMsgCounter);
77 75 counters.add(successMsgCounter);
... ...
... ... @@ -52,10 +52,10 @@ public class RawAccessJwtToken implements JwtToken, Serializable {
52 52 try {
53 53 return Jwts.parser().setSigningKey(signingKey).parseClaimsJws(this.token);
54 54 } catch (UnsupportedJwtException | MalformedJwtException | IllegalArgumentException | SignatureException ex) {
55   - log.error("Invalid JWT Token", ex);
  55 + log.debug("Invalid JWT Token", ex);
56 56 throw new BadCredentialsException("Invalid JWT token: ", ex);
57 57 } catch (ExpiredJwtException expiredEx) {
58   - log.info("JWT Token is expired", expiredEx);
  58 + log.debug("JWT Token is expired", expiredEx);
59 59 throw new JwtExpiredTokenException(this, "JWT Token expired", expiredEx);
60 60 }
61 61 }
... ...
... ... @@ -18,6 +18,9 @@ package org.thingsboard.server.service.stats;
18 18 import org.springframework.beans.factory.annotation.Autowired;
19 19 import org.springframework.stereotype.Service;
20 20 import org.thingsboard.server.actors.JsInvokeStats;
  21 +import org.thingsboard.server.common.stats.StatsCounter;
  22 +import org.thingsboard.server.common.stats.StatsFactory;
  23 +import org.thingsboard.server.common.stats.StatsType;
21 24
22 25 import javax.annotation.PostConstruct;
23 26
... ... @@ -32,14 +35,14 @@ public class DefaultJsInvokeStats implements JsInvokeStats {
32 35 private StatsCounter failuresCounter;
33 36
34 37 @Autowired
35   - private StatsCounterFactory counterFactory;
  38 + private StatsFactory statsFactory;
36 39
37 40 @PostConstruct
38 41 public void init() {
39 42 String key = StatsType.JS_INVOKE.getName();
40   - this.requestsCounter = counterFactory.createStatsCounter(key, REQUESTS);
41   - this.responsesCounter = counterFactory.createStatsCounter(key, RESPONSES);
42   - this.failuresCounter = counterFactory.createStatsCounter(key, FAILURES);
  43 + this.requestsCounter = statsFactory.createStatsCounter(key, REQUESTS);
  44 + this.responsesCounter = statsFactory.createStatsCounter(key, RESPONSES);
  45 + this.failuresCounter = statsFactory.createStatsCounter(key, FAILURES);
43 46 }
44 47
45 48 @Override
... ...
... ... @@ -20,6 +20,9 @@ import org.springframework.beans.factory.annotation.Value;
20 20 import org.springframework.boot.context.event.ApplicationReadyEvent;
21 21 import org.springframework.context.event.EventListener;
22 22 import org.springframework.stereotype.Service;
  23 +import org.thingsboard.server.common.stats.MessagesStats;
  24 +import org.thingsboard.server.common.stats.StatsFactory;
  25 +import org.thingsboard.server.common.stats.StatsType;
23 26 import org.thingsboard.server.queue.TbQueueConsumer;
24 27 import org.thingsboard.server.queue.TbQueueProducer;
25 28 import org.thingsboard.server.queue.TbQueueResponseTemplate;
... ... @@ -29,10 +32,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
29 32 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
30 33 import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
31 34 import org.thingsboard.server.queue.util.TbCoreComponent;
32   -import org.thingsboard.server.service.stats.DefaultQueueStats;
33   -import org.thingsboard.server.service.stats.StatsCounter;
34   -import org.thingsboard.server.service.stats.StatsCounterFactory;
35   -import org.thingsboard.server.service.stats.StatsType;
36 35
37 36 import javax.annotation.PostConstruct;
38 37 import javax.annotation.PreDestroy;
... ... @@ -45,13 +44,9 @@ import java.util.concurrent.*;
45 44 @Service
46 45 @TbCoreComponent
47 46 public class TbCoreTransportApiService {
48   - private static final String TOTAL_MSGS = "totalMsgs";
49   - private static final String SUCCESSFUL_MSGS = "successfulMsgs";
50   - private static final String FAILED_MSGS = "failedMsgs";
51   -
52 47 private final TbCoreQueueFactory tbCoreQueueFactory;
53 48 private final TransportApiService transportApiService;
54   - private final StatsCounterFactory counterFactory;
  49 + private final StatsFactory statsFactory;
55 50
56 51 @Value("${queue.transport_api.max_pending_requests:10000}")
57 52 private int maxPendingRequests;
... ... @@ -66,10 +61,10 @@ public class TbCoreTransportApiService {
66 61 private TbQueueResponseTemplate<TbProtoQueueMsg<TransportApiRequestMsg>,
67 62 TbProtoQueueMsg<TransportApiResponseMsg>> transportApiTemplate;
68 63
69   - public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService, StatsCounterFactory counterFactory) {
  64 + public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService, StatsFactory statsFactory) {
70 65 this.tbCoreQueueFactory = tbCoreQueueFactory;
71 66 this.transportApiService = transportApiService;
72   - this.counterFactory = counterFactory;
  67 + this.statsFactory = statsFactory;
73 68 }
74 69
75 70 @PostConstruct
... ... @@ -79,10 +74,7 @@ public class TbCoreTransportApiService {
79 74 TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer();
80 75
81 76 String key = StatsType.TRANSPORT.getName();
82   - StatsCounter totalCounter = counterFactory.createStatsCounter(key, TOTAL_MSGS);
83   - StatsCounter successfulCounter = counterFactory.createStatsCounter(key, SUCCESSFUL_MSGS);
84   - StatsCounter failedCounter = counterFactory.createStatsCounter(key, FAILED_MSGS);
85   - DefaultQueueStats queueStats = new DefaultQueueStats(totalCounter, successfulCounter, failedCounter);
  77 + MessagesStats queueStats = statsFactory.createMessagesStats(key);
86 78
87 79 DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder
88 80 <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> builder = DefaultTbQueueResponseTemplate.builder();
... ...
... ... @@ -178,6 +178,8 @@ database:
178 178 ts_max_intervals: "${DATABASE_TS_MAX_INTERVALS:700}" # Max number of DB queries generated by single API call to fetch telemetry records
179 179 ts:
180 180 type: "${DATABASE_TS_TYPE:sql}" # cassandra, sql, or timescale (for hybrid mode, DATABASE_TS_TYPE value should be cassandra, or timescale)
  181 + latest_ts:
  182 + type: "${DATABASE_TS_TYPE:sql}" # cassandra, sql, or timescale (for hybrid mode, DATABASE_TS_TYPE value should be cassandra, or timescale)
181 183
182 184 # note: timescale works only with postgreSQL database for DATABASE_ENTITIES_TYPE.
183 185
... ... @@ -756,7 +758,7 @@ queue:
756 758 transport:
757 759 # For high priority notifications that require minimum latency and processing time
758 760 notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
759   - poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
  761 + poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
760 762
761 763 service:
762 764 type: "${TB_SERVICE_TYPE:monolith}" # monolith or tb-core or tb-rule-engine
... ...
... ... @@ -41,6 +41,7 @@
41 41 <module>queue</module>
42 42 <module>transport</module>
43 43 <module>dao-api</module>
  44 + <module>stats</module>
44 45 </modules>
45 46
46 47 </project>
... ...
... ... @@ -49,6 +49,10 @@
49 49 <artifactId>message</artifactId>
50 50 </dependency>
51 51 <dependency>
  52 + <groupId>org.thingsboard.common</groupId>
  53 + <artifactId>stats</artifactId>
  54 + </dependency>
  55 + <dependency>
52 56 <groupId>org.apache.kafka</groupId>
53 57 <artifactId>kafka-clients</artifactId>
54 58 </dependency>
... ... @@ -112,6 +116,7 @@
112 116 <groupId>org.apache.curator</groupId>
113 117 <artifactId>curator-recipes</artifactId>
114 118 </dependency>
  119 +
115 120 <dependency>
116 121 <groupId>junit</groupId>
117 122 <artifactId>junit</artifactId>
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.queue;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
  19 +import org.thingsboard.server.common.stats.MessagesStats;
19 20
20 21 public interface TbQueueRequestTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> {
21 22
... ... @@ -25,4 +26,5 @@ public interface TbQueueRequestTemplate<Request extends TbQueueMsg, Response ext
25 26
26 27 void stop();
27 28
  29 + void setMessagesStats(MessagesStats messagesStats);
28 30 }
... ...
... ... @@ -28,6 +28,7 @@ import org.thingsboard.server.queue.TbQueueMsg;
28 28 import org.thingsboard.server.queue.TbQueueMsgMetadata;
29 29 import org.thingsboard.server.queue.TbQueueProducer;
30 30 import org.thingsboard.server.queue.TbQueueRequestTemplate;
  31 +import org.thingsboard.server.common.stats.MessagesStats;
31 32
32 33 import java.util.List;
33 34 import java.util.UUID;
... ... @@ -54,6 +55,8 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
54 55 private volatile long tickSize = 0L;
55 56 private volatile boolean stopped = false;
56 57
  58 + private MessagesStats messagesStats;
  59 +
57 60 @Builder
58 61 public DefaultTbQueueRequestTemplate(TbQueueAdmin queueAdmin,
59 62 TbQueueProducer<Request> requestTemplate,
... ... @@ -154,6 +157,11 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
154 157 }
155 158
156 159 @Override
  160 + public void setMessagesStats(MessagesStats messagesStats) {
  161 + this.messagesStats = messagesStats;
  162 + }
  163 +
  164 + @Override
157 165 public ListenableFuture<Response> send(Request request) {
158 166 if (tickSize > maxPendingRequests) {
159 167 return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
... ... @@ -166,14 +174,23 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
166 174 ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
167 175 pendingRequests.putIfAbsent(requestId, responseMetaData);
168 176 log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
  177 + if (messagesStats != null) {
  178 + messagesStats.incrementTotal();
  179 + }
169 180 requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
170 181 @Override
171 182 public void onSuccess(TbQueueMsgMetadata metadata) {
  183 + if (messagesStats != null) {
  184 + messagesStats.incrementSuccessful();
  185 + }
172 186 log.trace("[{}] Request sent: {}", requestId, metadata);
173 187 }
174 188
175 189 @Override
176 190 public void onFailure(Throwable t) {
  191 + if (messagesStats != null) {
  192 + messagesStats.incrementFailed();
  193 + }
177 194 pendingRequests.remove(requestId);
178 195 future.setException(t);
179 196 }
... ...
... ... @@ -23,7 +23,7 @@ import org.thingsboard.server.queue.TbQueueHandler;
23 23 import org.thingsboard.server.queue.TbQueueMsg;
24 24 import org.thingsboard.server.queue.TbQueueProducer;
25 25 import org.thingsboard.server.queue.TbQueueResponseTemplate;
26   -import org.thingsboard.server.queue.stats.QueueStats;
  26 +import org.thingsboard.server.common.stats.MessagesStats;
27 27
28 28 import java.util.List;
29 29 import java.util.UUID;
... ... @@ -45,7 +45,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
45 45 private final ExecutorService loopExecutor;
46 46 private final ScheduledExecutorService timeoutExecutor;
47 47 private final ExecutorService callbackExecutor;
48   - private final QueueStats stats;
  48 + private final MessagesStats stats;
49 49 private final int maxPendingRequests;
50 50 private final long requestTimeout;
51 51
... ... @@ -61,7 +61,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
61 61 long requestTimeout,
62 62 int maxPendingRequests,
63 63 ExecutorService executor,
64   - QueueStats stats) {
  64 + MessagesStats stats) {
65 65 this.requestTemplate = requestTemplate;
66 66 this.responseTemplate = responseTemplate;
67 67 this.pendingRequests = new ConcurrentHashMap<>();
... ...
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<!--
  3 +
  4 + Copyright © 2016-2020 The Thingsboard Authors
  5 +
  6 + Licensed under the Apache License, Version 2.0 (the "License");
  7 + you may not use this file except in compliance with the License.
  8 + You may obtain a copy of the License at
  9 +
  10 + http://www.apache.org/licenses/LICENSE-2.0
  11 +
  12 + Unless required by applicable law or agreed to in writing, software
  13 + distributed under the License is distributed on an "AS IS" BASIS,
  14 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15 + See the License for the specific language governing permissions and
  16 + limitations under the License.
  17 +
  18 +-->
  19 +<project xmlns="http://maven.apache.org/POM/4.0.0"
  20 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  21 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  22 + <modelVersion>4.0.0</modelVersion>
  23 + <parent>
  24 + <groupId>org.thingsboard</groupId>
  25 + <version>3.1.0-SNAPSHOT</version>
  26 + <artifactId>common</artifactId>
  27 + </parent>
  28 + <groupId>org.thingsboard.common</groupId>
  29 + <artifactId>stats</artifactId>
  30 + <packaging>jar</packaging>
  31 +
  32 + <name>Thingsboard Server Stats</name>
  33 + <url>https://thingsboard.io</url>
  34 +
  35 + <properties>
  36 + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  37 + <main.dir>${basedir}/../..</main.dir>
  38 + </properties>
  39 +
  40 + <dependencies>
  41 + <dependency>
  42 + <groupId>com.google.guava</groupId>
  43 + <artifactId>guava</artifactId>
  44 + <scope>provided</scope>
  45 + </dependency>
  46 + <dependency>
  47 + <groupId>org.slf4j</groupId>
  48 + <artifactId>slf4j-api</artifactId>
  49 + </dependency>
  50 + <dependency>
  51 + <groupId>org.slf4j</groupId>
  52 + <artifactId>log4j-over-slf4j</artifactId>
  53 + </dependency>
  54 + <dependency>
  55 + <groupId>ch.qos.logback</groupId>
  56 + <artifactId>logback-core</artifactId>
  57 + </dependency>
  58 + <dependency>
  59 + <groupId>ch.qos.logback</groupId>
  60 + <artifactId>logback-classic</artifactId>
  61 + </dependency>
  62 + <dependency>
  63 + <groupId>org.springframework.boot</groupId>
  64 + <artifactId>spring-boot-starter-actuator</artifactId>
  65 + </dependency>
  66 + <dependency>
  67 + <groupId>io.micrometer</groupId>
  68 + <artifactId>micrometer-core</artifactId>
  69 + </dependency>
  70 + <dependency>
  71 + <groupId>io.micrometer</groupId>
  72 + <artifactId>micrometer-registry-prometheus</artifactId>
  73 + </dependency>
  74 +
  75 + <dependency>
  76 + <groupId>junit</groupId>
  77 + <artifactId>junit</artifactId>
  78 + <scope>test</scope>
  79 + </dependency>
  80 + <dependency>
  81 + <groupId>org.mockito</groupId>
  82 + <artifactId>mockito-all</artifactId>
  83 + <scope>test</scope>
  84 + </dependency>
  85 + </dependencies>
  86 +
  87 + <build>
  88 + <plugins>
  89 + </plugins>
  90 + </build>
  91 +
  92 +</project>
\ No newline at end of file
... ...
common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultCounter.java renamed from application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java
... ... @@ -13,21 +13,19 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.service.stats;
  16 +package org.thingsboard.server.common.stats;
17 17
18 18 import io.micrometer.core.instrument.Counter;
19 19
20 20 import java.util.concurrent.atomic.AtomicInteger;
21 21
22   -public class StatsCounter {
  22 +public class DefaultCounter {
23 23 private final AtomicInteger aiCounter;
24 24 private final Counter micrometerCounter;
25   - private final String name;
26 25
27   - public StatsCounter(AtomicInteger aiCounter, Counter micrometerCounter, String name) {
  26 + public DefaultCounter(AtomicInteger aiCounter, Counter micrometerCounter) {
28 27 this.aiCounter = aiCounter;
29 28 this.micrometerCounter = micrometerCounter;
30   - this.name = name;
31 29 }
32 30
33 31 public void increment() {
... ... @@ -47,8 +45,4 @@ public class StatsCounter {
47 45 aiCounter.addAndGet(delta);
48 46 micrometerCounter.increment(delta);
49 47 }
50   -
51   - public String getName() {
52   - return name;
53   - }
54 48 }
... ...
common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultMessagesStats.java renamed from application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java
... ... @@ -13,16 +13,14 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.service.stats;
  16 +package org.thingsboard.server.common.stats;
17 17
18   -import org.thingsboard.server.queue.stats.QueueStats;
19   -
20   -public class DefaultQueueStats implements QueueStats {
  18 +public class DefaultMessagesStats implements MessagesStats {
21 19 private final StatsCounter totalCounter;
22 20 private final StatsCounter successfulCounter;
23 21 private final StatsCounter failedCounter;
24 22
25   - public DefaultQueueStats(StatsCounter totalCounter, StatsCounter successfulCounter, StatsCounter failedCounter) {
  23 + public DefaultMessagesStats(StatsCounter totalCounter, StatsCounter successfulCounter, StatsCounter failedCounter) {
26 24 this.totalCounter = totalCounter;
27 25 this.successfulCounter = successfulCounter;
28 26 this.failedCounter = failedCounter;
... ... @@ -42,4 +40,26 @@ public class DefaultQueueStats implements QueueStats {
42 40 public void incrementFailed(int amount) {
43 41 failedCounter.add(amount);
44 42 }
  43 +
  44 + @Override
  45 + public int getTotal() {
  46 + return totalCounter.get();
  47 + }
  48 +
  49 + @Override
  50 + public int getSuccessful() {
  51 + return successfulCounter.get();
  52 + }
  53 +
  54 + @Override
  55 + public int getFailed() {
  56 + return failedCounter.get();
  57 + }
  58 +
  59 + @Override
  60 + public void reset() {
  61 + totalCounter.clear();
  62 + successfulCounter.clear();
  63 + failedCounter.clear();
  64 + }
45 65 }
... ...
common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java renamed from application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java
... ... @@ -13,19 +13,23 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.service.stats;
  16 +package org.thingsboard.server.common.stats;
17 17
18 18 import io.micrometer.core.instrument.Counter;
19 19 import io.micrometer.core.instrument.MeterRegistry;
  20 +import io.micrometer.core.instrument.Tags;
20 21 import org.springframework.beans.factory.annotation.Autowired;
21 22 import org.springframework.beans.factory.annotation.Value;
22 23 import org.springframework.stereotype.Service;
23   -import org.thingsboard.server.service.metrics.StubCounter;
24 24
25 25 import java.util.concurrent.atomic.AtomicInteger;
26 26
27 27 @Service
28   -public class StatsCounterFactory {
  28 +public class DefaultStatsFactory implements StatsFactory {
  29 + private static final String TOTAL_MSGS = "totalMsgs";
  30 + private static final String SUCCESSFUL_MSGS = "successfulMsgs";
  31 + private static final String FAILED_MSGS = "failedMsgs";
  32 +
29 33 private static final String STATS_NAME_TAG = "statsName";
30 34
31 35 private static final Counter STUB_COUNTER = new StubCounter();
... ... @@ -33,9 +37,10 @@ public class StatsCounterFactory {
33 37 @Autowired
34 38 private MeterRegistry meterRegistry;
35 39
36   - @Value("${metrics.enabled}")
  40 + @Value("${metrics.enabled:false}")
37 41 private Boolean metricsEnabled;
38 42
  43 + @Override
39 44 public StatsCounter createStatsCounter(String key, String statsName) {
40 45 return new StatsCounter(
41 46 new AtomicInteger(0),
... ... @@ -45,4 +50,42 @@ public class StatsCounterFactory {
45 50 statsName
46 51 );
47 52 }
  53 +
  54 + @Override
  55 + public DefaultCounter createDefaultCounter(String key, String... tags) {
  56 + return new DefaultCounter(
  57 + new AtomicInteger(0),
  58 + metricsEnabled ?
  59 + meterRegistry.counter(key, tags)
  60 + : STUB_COUNTER
  61 + );
  62 + }
  63 +
  64 + @Override
  65 + public <T extends Number> T createGauge(String key, T number, String... tags) {
  66 + return meterRegistry.gauge(key, Tags.of(tags), number);
  67 + }
  68 +
  69 + @Override
  70 + public MessagesStats createMessagesStats(String key) {
  71 + StatsCounter totalCounter = createStatsCounter(key, TOTAL_MSGS);
  72 + StatsCounter successfulCounter = createStatsCounter(key, SUCCESSFUL_MSGS);
  73 + StatsCounter failedCounter = createStatsCounter(key, FAILED_MSGS);
  74 + return new DefaultMessagesStats(totalCounter, successfulCounter, failedCounter);
  75 + }
  76 +
  77 + private static class StubCounter implements Counter {
  78 + @Override
  79 + public void increment(double amount) {}
  80 +
  81 + @Override
  82 + public double count() {
  83 + return 0;
  84 + }
  85 +
  86 + @Override
  87 + public Id getId() {
  88 + return null;
  89 + }
  90 + }
48 91 }
... ...
common/stats/src/main/java/org/thingsboard/server/common/stats/MessagesStats.java renamed from common/queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java
... ... @@ -13,9 +13,9 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.queue.stats;
  16 +package org.thingsboard.server.common.stats;
17 17
18   -public interface QueueStats {
  18 +public interface MessagesStats {
19 19 default void incrementTotal() {
20 20 incrementTotal(1);
21 21 }
... ... @@ -28,10 +28,17 @@ public interface QueueStats {
28 28
29 29 void incrementSuccessful(int amount);
30 30
31   -
32 31 default void incrementFailed() {
33 32 incrementFailed(1);
34 33 }
35 34
36 35 void incrementFailed(int amount);
  36 +
  37 + int getTotal();
  38 +
  39 + int getSuccessful();
  40 +
  41 + int getFailed();
  42 +
  43 + void reset();
37 44 }
... ...
common/stats/src/main/java/org/thingsboard/server/common/stats/StatsCounter.java renamed from application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java
... ... @@ -13,21 +13,21 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.service.metrics;
  16 +package org.thingsboard.server.common.stats;
17 17
18 18 import io.micrometer.core.instrument.Counter;
19 19
20   -public class StubCounter implements Counter {
21   - @Override
22   - public void increment(double amount) {}
  20 +import java.util.concurrent.atomic.AtomicInteger;
23 21
24   - @Override
25   - public double count() {
26   - return 0;
  22 +public class StatsCounter extends DefaultCounter {
  23 + private final String name;
  24 +
  25 + public StatsCounter(AtomicInteger aiCounter, Counter micrometerCounter, String name) {
  26 + super(aiCounter, micrometerCounter);
  27 + this.name = name;
27 28 }
28 29
29   - @Override
30   - public Id getId() {
31   - return null;
  30 + public String getName() {
  31 + return name;
32 32 }
33 33 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.stats;
  17 +
  18 +public interface StatsFactory {
  19 + StatsCounter createStatsCounter(String key, String statsName);
  20 +
  21 + DefaultCounter createDefaultCounter(String key, String... tags);
  22 +
  23 + <T extends Number> T createGauge(String key, T number, String... tags);
  24 +
  25 + MessagesStats createMessagesStats(String key);
  26 +}
... ...
common/stats/src/main/java/org/thingsboard/server/common/stats/StatsType.java renamed from application/src/main/java/org/thingsboard/server/service/stats/StatsType.java
... ... @@ -13,10 +13,10 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.service.stats;
  16 +package org.thingsboard.server.common.stats;
17 17
18 18 public enum StatsType {
19   - RULE_ENGINE("ruleEngine"), CORE("core"), TRANSPORT("transport"), JS_INVOKE("jsInvoke");
  19 + RULE_ENGINE("ruleEngine"), CORE("core"), TRANSPORT("transport"), JS_INVOKE("jsInvoke"), RATE_EXECUTOR("rateExecutor");
20 20
21 21 private String name;
22 22
... ...
... ... @@ -42,6 +42,10 @@
42 42 </dependency>
43 43 <dependency>
44 44 <groupId>org.thingsboard.common</groupId>
  45 + <artifactId>stats</artifactId>
  46 + </dependency>
  47 + <dependency>
  48 + <groupId>org.thingsboard.common</groupId>
45 49 <artifactId>data</artifactId>
46 50 </dependency>
47 51 <dependency>
... ...
... ... @@ -55,6 +55,9 @@ import org.thingsboard.server.queue.discovery.PartitionService;
55 55 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
56 56 import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
57 57 import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
  58 +import org.thingsboard.server.common.stats.MessagesStats;
  59 +import org.thingsboard.server.common.stats.StatsFactory;
  60 +import org.thingsboard.server.common.stats.StatsType;
58 61
59 62 import javax.annotation.PostConstruct;
60 63 import javax.annotation.PreDestroy;
... ... @@ -101,12 +104,17 @@ public class DefaultTransportService implements TransportService {
101 104 private final TbQueueProducerProvider producerProvider;
102 105 private final PartitionService partitionService;
103 106 private final TbServiceInfoProvider serviceInfoProvider;
  107 + private final StatsFactory statsFactory;
104 108
105 109 protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate;
106 110 protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
107 111 protected TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> tbCoreMsgProducer;
108 112 protected TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> transportNotificationsConsumer;
109 113
  114 + protected MessagesStats ruleEngineProducerStats;
  115 + protected MessagesStats tbCoreProducerStats;
  116 + protected MessagesStats transportApiStats;
  117 +
110 118 protected ScheduledExecutorService schedulerExecutor;
111 119 protected ExecutorService transportCallbackExecutor;
112 120
... ... @@ -119,11 +127,12 @@ public class DefaultTransportService implements TransportService {
119 127 private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer"));
120 128 private volatile boolean stopped = false;
121 129
122   - public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService) {
  130 + public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService, StatsFactory statsFactory) {
123 131 this.serviceInfoProvider = serviceInfoProvider;
124 132 this.queueProvider = queueProvider;
125 133 this.producerProvider = producerProvider;
126 134 this.partitionService = partitionService;
  135 + this.statsFactory = statsFactory;
127 136 }
128 137
129 138 @PostConstruct
... ... @@ -133,10 +142,14 @@ public class DefaultTransportService implements TransportService {
133 142 new TbRateLimits(perTenantLimitsConf);
134 143 new TbRateLimits(perDevicesLimitsConf);
135 144 }
  145 + this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer");
  146 + this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer");
  147 + this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer");
136 148 this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));
137 149 this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
138 150 this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
139 151 transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate();
  152 + transportApiRequestTemplate.setMessagesStats(transportApiStats);
140 153 ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
141 154 tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
142 155 transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer();
... ... @@ -557,10 +570,14 @@ public class DefaultTransportService implements TransportService {
557 570 if (log.isTraceEnabled()) {
558 571 log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg);
559 572 }
  573 + TransportTbQueueCallback transportTbQueueCallback = callback != null ?
  574 + new TransportTbQueueCallback(callback) : null;
  575 + tbCoreProducerStats.incrementTotal();
  576 + StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats);
560 577 tbCoreMsgProducer.send(tpi,
561 578 new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
562   - ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?
563   - new TransportTbQueueCallback(callback) : null);
  579 + ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()),
  580 + wrappedCallback);
564 581 }
565 582
566 583 protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
... ... @@ -571,7 +588,9 @@ public class DefaultTransportService implements TransportService {
571 588 ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg))
572 589 .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
573 590 .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
574   - ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
  591 + ruleEngineProducerStats.incrementTotal();
  592 + StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats);
  593 + ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
575 594 }
576 595
577 596 private class TransportTbQueueCallback implements TbQueueCallback {
... ... @@ -592,6 +611,30 @@ public class DefaultTransportService implements TransportService {
592 611 }
593 612 }
594 613
  614 + private class StatsCallback implements TbQueueCallback {
  615 + private final TbQueueCallback callback;
  616 + private final MessagesStats stats;
  617 +
  618 + private StatsCallback(TbQueueCallback callback, MessagesStats stats) {
  619 + this.callback = callback;
  620 + this.stats = stats;
  621 + }
  622 +
  623 + @Override
  624 + public void onSuccess(TbQueueMsgMetadata metadata) {
  625 + stats.incrementSuccessful();
  626 + if (callback != null)
  627 + callback.onSuccess(metadata);
  628 + }
  629 +
  630 + @Override
  631 + public void onFailure(Throwable t) {
  632 + stats.incrementFailed();
  633 + if (callback != null)
  634 + callback.onFailure(t);
  635 + }
  636 + }
  637 +
595 638 private class MsgPackCallback implements TbQueueCallback {
596 639 private final AtomicInteger msgCount;
597 640 private final TransportServiceCallback<Void> callback;
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.common.util;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +
  20 +import javax.crypto.Mac;
  21 +import javax.crypto.spec.SecretKeySpec;
  22 +import java.io.IOException;
  23 +import java.net.URLEncoder;
  24 +import java.nio.charset.StandardCharsets;
  25 +import java.nio.file.Files;
  26 +import java.nio.file.Path;
  27 +import java.nio.file.Paths;
  28 +import java.util.Base64;
  29 +
  30 +@Slf4j
  31 +public final class AzureIotHubUtil {
  32 + private static final String BASE_DIR_PATH = System.getProperty("user.dir");
  33 + private static final String APP_DIR = "application";
  34 + private static final String SRC_DIR = "src";
  35 + private static final String MAIN_DIR = "main";
  36 + private static final String DATA_DIR = "data";
  37 + private static final String CERTS_DIR = "certs";
  38 + private static final String AZURE_DIR = "azure";
  39 + private static final String FILE_NAME = "BaltimoreCyberTrustRoot.crt.pem";
  40 +
  41 + private static final Path FULL_FILE_PATH;
  42 +
  43 + static {
  44 + if (BASE_DIR_PATH.endsWith("bin")) {
  45 + FULL_FILE_PATH = Paths.get(BASE_DIR_PATH.replaceAll("bin$", ""), DATA_DIR, CERTS_DIR, AZURE_DIR, FILE_NAME);
  46 + } else if (BASE_DIR_PATH.endsWith("conf")) {
  47 + FULL_FILE_PATH = Paths.get(BASE_DIR_PATH.replaceAll("conf$", ""), DATA_DIR, CERTS_DIR, AZURE_DIR, FILE_NAME);
  48 + } else {
  49 + FULL_FILE_PATH = Paths.get(BASE_DIR_PATH, APP_DIR, SRC_DIR, MAIN_DIR, DATA_DIR, CERTS_DIR, AZURE_DIR, FILE_NAME);
  50 + }
  51 + }
  52 +
  53 + private static final long SAS_TOKEN_VALID_SECS = 365 * 24 * 60 * 60;
  54 + private static final long ONE_SECOND_IN_MILLISECONDS = 1000;
  55 +
  56 + private static final String SAS_TOKEN_FORMAT = "SharedAccessSignature sr=%s&sig=%s&se=%s";
  57 +
  58 + private static final String USERNAME_FORMAT = "%s/%s/?api-version=2018-06-30";
  59 +
  60 + private AzureIotHubUtil() {
  61 + }
  62 +
  63 + public static String buildUsername(String host, String deviceId) {
  64 + return String.format(USERNAME_FORMAT, host, deviceId);
  65 + }
  66 +
  67 + public static String buildSasToken(String host, String sasKey) {
  68 + try {
  69 + final String targetUri = URLEncoder.encode(host.toLowerCase(), "UTF-8");
  70 + final long expiryTime = buildExpiresOn();
  71 + String toSign = targetUri + "\n" + expiryTime;
  72 + byte[] keyBytes = Base64.getDecoder().decode(sasKey.getBytes(StandardCharsets.UTF_8));
  73 + SecretKeySpec signingKey = new SecretKeySpec(keyBytes, "HmacSHA256");
  74 + Mac mac = Mac.getInstance("HmacSHA256");
  75 + mac.init(signingKey);
  76 + byte[] rawHmac = mac.doFinal(toSign.getBytes(StandardCharsets.UTF_8));
  77 + String signature = URLEncoder.encode(Base64.getEncoder().encodeToString(rawHmac), "UTF-8");
  78 + return String.format(SAS_TOKEN_FORMAT, targetUri, signature, expiryTime);
  79 + } catch (Exception e) {
  80 + throw new RuntimeException("Failed to build SAS token!!!", e);
  81 + }
  82 + }
  83 +
  84 + private static long buildExpiresOn() {
  85 + long expiresOnDate = System.currentTimeMillis();
  86 + expiresOnDate += SAS_TOKEN_VALID_SECS * ONE_SECOND_IN_MILLISECONDS;
  87 + return expiresOnDate / ONE_SECOND_IN_MILLISECONDS;
  88 + }
  89 +
  90 + public static String getDefaultCaCert() {
  91 + try {
  92 + return new String(Files.readAllBytes(FULL_FILE_PATH));
  93 + } catch (IOException e) {
  94 + log.error("Failed to load Default CaCert file!!! [{}]", FULL_FILE_PATH.toString());
  95 + throw new RuntimeException("Failed to load Default CaCert file!!!");
  96 + }
  97 + }
  98 +
  99 +}
... ...
... ... @@ -45,6 +45,10 @@
45 45 </dependency>
46 46 <dependency>
47 47 <groupId>org.thingsboard.common</groupId>
  48 + <artifactId>stats</artifactId>
  49 + </dependency>
  50 + <dependency>
  51 + <groupId>org.thingsboard.common</groupId>
48 52 <artifactId>dao-api</artifactId>
49 53 </dependency>
50 54 <dependency>
... ...
... ... @@ -23,6 +23,9 @@ import org.springframework.beans.factory.annotation.Value;
23 23 import org.springframework.scheduling.annotation.Scheduled;
24 24 import org.springframework.stereotype.Component;
25 25 import org.thingsboard.server.common.data.id.TenantId;
  26 +import org.thingsboard.server.common.stats.DefaultCounter;
  27 +import org.thingsboard.server.common.stats.StatsCounter;
  28 +import org.thingsboard.server.common.stats.StatsFactory;
26 29 import org.thingsboard.server.dao.entity.EntityService;
27 30 import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
28 31 import org.thingsboard.server.dao.util.AsyncTaskContext;
... ... @@ -56,48 +59,58 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
56 59 @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
57 60 @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
58 61 @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames,
59   - @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq) {
60   - super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq);
  62 + @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
  63 + @Autowired StatsFactory statsFactory) {
  64 + super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory);
61 65 this.printTenantNames = printTenantNames;
62 66 }
63 67
64 68 @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
65 69 public void printStats() {
66 70 int queueSize = getQueueSize();
67   - int totalAddedValue = totalAdded.getAndSet(0);
68   - int totalLaunchedValue = totalLaunched.getAndSet(0);
69   - int totalReleasedValue = totalReleased.getAndSet(0);
70   - int totalFailedValue = totalFailed.getAndSet(0);
71   - int totalExpiredValue = totalExpired.getAndSet(0);
72   - int totalRejectedValue = totalRejected.getAndSet(0);
73   - int totalRateLimitedValue = totalRateLimited.getAndSet(0);
74   - int rateLimitedTenantsValue = rateLimitedTenants.size();
75   - int concurrencyLevelValue = concurrencyLevel.get();
76   - if (queueSize > 0 || totalAddedValue > 0 || totalLaunchedValue > 0 || totalReleasedValue > 0 ||
77   - totalFailedValue > 0 || totalExpiredValue > 0 || totalRejectedValue > 0 || totalRateLimitedValue > 0 || rateLimitedTenantsValue > 0
78   - || concurrencyLevelValue > 0) {
79   - log.info("Permits queueSize [{}] totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] " +
80   - "totalRateLimited [{}] totalRateLimitedTenants [{}] currBuffer [{}] ",
81   - queueSize, totalAddedValue, totalLaunchedValue, totalReleasedValue,
82   - totalFailedValue, totalExpiredValue, totalRejectedValue, totalRateLimitedValue, rateLimitedTenantsValue, concurrencyLevelValue);
  71 + int rateLimitedTenantsCount = (int) stats.getRateLimitedTenants().values().stream()
  72 + .filter(defaultCounter -> defaultCounter.get() > 0)
  73 + .count();
  74 +
  75 + if (queueSize > 0
  76 + || rateLimitedTenantsCount > 0
  77 + || concurrencyLevel.get() > 0
  78 + || stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0)
  79 + ) {
  80 + StringBuilder statsBuilder = new StringBuilder();
  81 +
  82 + statsBuilder.append("queueSize").append(" = [").append(queueSize).append("] ");
  83 + stats.getStatsCounters().forEach(counter -> {
  84 + statsBuilder.append(counter.getName()).append(" = [").append(counter.get()).append("] ");
  85 + });
  86 + statsBuilder.append("totalRateLimitedTenants").append(" = [").append(rateLimitedTenantsCount).append("] ");
  87 + statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] ");
  88 +
  89 + stats.getStatsCounters().forEach(StatsCounter::clear);
  90 + log.info("Permits {}", statsBuilder);
83 91 }
84 92
85   - rateLimitedTenants.forEach(((tenantId, counter) -> {
86   - if (printTenantNames) {
87   - String name = tenantNamesCache.computeIfAbsent(tenantId, tId -> {
88   - try {
89   - return entityService.fetchEntityNameAsync(TenantId.SYS_TENANT_ID, tenantId).get();
90   - } catch (Exception e) {
91   - log.error("[{}] Failed to get tenant name", tenantId, e);
92   - return "N/A";
  93 + stats.getRateLimitedTenants().entrySet().stream()
  94 + .filter(entry -> entry.getValue().get() > 0)
  95 + .forEach(entry -> {
  96 + TenantId tenantId = entry.getKey();
  97 + DefaultCounter counter = entry.getValue();
  98 + int rateLimitedRequests = counter.get();
  99 + counter.clear();
  100 + if (printTenantNames) {
  101 + String name = tenantNamesCache.computeIfAbsent(tenantId, tId -> {
  102 + try {
  103 + return entityService.fetchEntityNameAsync(TenantId.SYS_TENANT_ID, tenantId).get();
  104 + } catch (Exception e) {
  105 + log.error("[{}] Failed to get tenant name", tenantId, e);
  106 + return "N/A";
  107 + }
  108 + });
  109 + log.info("[{}][{}] Rate limited requests: {}", tenantId, name, rateLimitedRequests);
  110 + } else {
  111 + log.info("[{}] Rate limited requests: {}", tenantId, rateLimitedRequests);
93 112 }
94 113 });
95   - log.info("[{}][{}] Rate limited requests: {}", tenantId, name, counter);
96   - } else {
97   - log.info("[{}] Rate limited requests: {}", tenantId, counter);
98   - }
99   - }));
100   - rateLimitedTenants.clear();
101 114 }
102 115
103 116 @PreDestroy
... ...
... ... @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture;
19 19 import com.google.common.util.concurrent.SettableFuture;
20 20 import lombok.extern.slf4j.Slf4j;
21 21 import org.thingsboard.common.util.ThingsBoardThreadFactory;
  22 +import org.thingsboard.server.common.stats.MessagesStats;
22 23
23 24 import java.util.ArrayList;
24 25 import java.util.List;
... ... @@ -27,7 +28,6 @@ import java.util.concurrent.ExecutorService;
27 28 import java.util.concurrent.Executors;
28 29 import java.util.concurrent.LinkedBlockingQueue;
29 30 import java.util.concurrent.TimeUnit;
30   -import java.util.concurrent.atomic.AtomicInteger;
31 31 import java.util.function.Consumer;
32 32 import java.util.stream.Collectors;
33 33
... ... @@ -35,15 +35,14 @@ import java.util.stream.Collectors;
35 35 public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
36 36
37 37 private final BlockingQueue<TbSqlQueueElement<E>> queue = new LinkedBlockingQueue<>();
38   - private final AtomicInteger addedCount = new AtomicInteger();
39   - private final AtomicInteger savedCount = new AtomicInteger();
40   - private final AtomicInteger failedCount = new AtomicInteger();
41 38 private final TbSqlBlockingQueueParams params;
42 39
43 40 private ExecutorService executor;
  41 + private final MessagesStats stats;
44 42
45   - public TbSqlBlockingQueue(TbSqlBlockingQueueParams params) {
  43 + public TbSqlBlockingQueue(TbSqlBlockingQueueParams params, MessagesStats stats) {
46 44 this.params = params;
  45 + this.stats = stats;
47 46 }
48 47
49 48 @Override
... ... @@ -68,7 +67,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
68 67 log.debug("[{}] Going to save {} entities", logName, entities.size());
69 68 saveFunction.accept(entities.stream().map(TbSqlQueueElement::getEntity).collect(Collectors.toList()));
70 69 entities.forEach(v -> v.getFuture().set(null));
71   - savedCount.addAndGet(entities.size());
  70 + stats.incrementSuccessful(entities.size());
72 71 if (!fullPack) {
73 72 long remainingDelay = maxDelay - (System.currentTimeMillis() - currentTs);
74 73 if (remainingDelay > 0) {
... ... @@ -76,7 +75,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
76 75 }
77 76 }
78 77 } catch (Exception e) {
79   - failedCount.addAndGet(entities.size());
  78 + stats.incrementFailed(entities.size());
80 79 entities.forEach(entityFutureWrapper -> entityFutureWrapper.getFuture().setException(e));
81 80 if (e instanceof InterruptedException) {
82 81 log.info("[{}] Queue polling was interrupted", logName);
... ... @@ -91,9 +90,10 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
91 90 });
92 91
93 92 logExecutor.scheduleAtFixedRate(() -> {
94   - if (queue.size() > 0 || addedCount.get() > 0 || savedCount.get() > 0 || failedCount.get() > 0) {
  93 + if (queue.size() > 0 || stats.getTotal() > 0 || stats.getSuccessful() > 0 || stats.getFailed() > 0) {
95 94 log.info("Queue-{} [{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", index,
96   - params.getLogName(), queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0));
  95 + params.getLogName(), queue.size(), stats.getTotal(), stats.getSuccessful(), stats.getFailed());
  96 + stats.reset();
97 97 }
98 98 }, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS);
99 99 }
... ... @@ -109,7 +109,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
109 109 public ListenableFuture<Void> add(E element) {
110 110 SettableFuture<Void> future = SettableFuture.create();
111 111 queue.add(new TbSqlQueueElement<>(future, element));
112   - addedCount.incrementAndGet();
  112 + stats.incrementTotal();
113 113 return future;
114 114 }
115 115 }
... ...
... ... @@ -18,6 +18,8 @@ package org.thingsboard.server.dao.sql;
18 18 import lombok.Builder;
19 19 import lombok.Data;
20 20 import lombok.extern.slf4j.Slf4j;
  21 +import org.thingsboard.server.common.stats.MessagesStats;
  22 +import org.thingsboard.server.common.stats.StatsFactory;
21 23
22 24 @Slf4j
23 25 @Data
... ... @@ -28,4 +30,5 @@ public class TbSqlBlockingQueueParams {
28 30 private final int batchSize;
29 31 private final long maxDelay;
30 32 private final long statsPrintIntervalMs;
  33 + private final String statsNamePrefix;
31 34 }
... ...
... ... @@ -18,6 +18,8 @@ package org.thingsboard.server.dao.sql;
18 18 import com.google.common.util.concurrent.ListenableFuture;
19 19 import lombok.Data;
20 20 import lombok.extern.slf4j.Slf4j;
  21 +import org.thingsboard.server.common.stats.MessagesStats;
  22 +import org.thingsboard.server.common.stats.StatsFactory;
21 23
22 24 import java.util.List;
23 25 import java.util.concurrent.CopyOnWriteArrayList;
... ... @@ -32,10 +34,12 @@ public class TbSqlBlockingQueueWrapper<E> {
32 34 private ScheduledLogExecutorComponent logExecutor;
33 35 private final Function<E, Integer> hashCodeFunction;
34 36 private final int maxThreads;
  37 + private final StatsFactory statsFactory;
35 38
36 39 public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction) {
37 40 for (int i = 0; i < maxThreads; i++) {
38   - TbSqlBlockingQueue<E> queue = new TbSqlBlockingQueue<>(params);
  41 + MessagesStats stats = statsFactory.createMessagesStats(params.getStatsNamePrefix() + ".queue." + i);
  42 + TbSqlBlockingQueue<E> queue = new TbSqlBlockingQueue<>(params, stats);
39 43 queues.add(queue);
40 44 queue.init(logExecutor, saveFunction, i);
41 45 }
... ...
... ... @@ -25,6 +25,7 @@ import org.springframework.stereotype.Component;
25 25 import org.thingsboard.server.common.data.id.EntityId;
26 26 import org.thingsboard.server.common.data.id.TenantId;
27 27 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  28 +import org.thingsboard.server.common.stats.StatsFactory;
28 29 import org.thingsboard.server.dao.DaoUtil;
29 30 import org.thingsboard.server.dao.attributes.AttributesDao;
30 31 import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey;
... ... @@ -57,6 +58,9 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
57 58 @Autowired
58 59 private AttributeKvInsertRepository attributeKvInsertRepository;
59 60
  61 + @Autowired
  62 + private StatsFactory statsFactory;
  63 +
60 64 @Value("${sql.attributes.batch_size:1000}")
61 65 private int batchSize;
62 66
... ... @@ -78,10 +82,11 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
78 82 .batchSize(batchSize)
79 83 .maxDelay(maxDelay)
80 84 .statsPrintIntervalMs(statsPrintIntervalMs)
  85 + .statsNamePrefix("attributes")
81 86 .build();
82 87
83 88 Function<AttributeKvEntity, Integer> hashcodeFunction = entity -> entity.getId().getEntityId().hashCode();
84   - queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads);
  89 + queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads, statsFactory);
85 90 queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v));
86 91 }
87 92
... ...
... ... @@ -30,8 +30,10 @@ import org.thingsboard.server.common.data.kv.Aggregation;
30 30 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
31 31 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
32 32 import org.thingsboard.server.common.data.kv.TsKvEntry;
  33 +import org.thingsboard.server.common.stats.StatsFactory;
33 34 import org.thingsboard.server.dao.DaoUtil;
34 35 import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
  36 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
35 37 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
36 38 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
37 39 import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
... ... @@ -57,6 +59,8 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
57 59 protected InsertTsRepository<TsKvEntity> insertRepository;
58 60
59 61 protected TbSqlBlockingQueueWrapper<TsKvEntity> tsQueue;
  62 + @Autowired
  63 + private StatsFactory statsFactory;
60 64
61 65 @PostConstruct
62 66 protected void init() {
... ... @@ -66,10 +70,11 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
66 70 .batchSize(tsBatchSize)
67 71 .maxDelay(tsMaxDelay)
68 72 .statsPrintIntervalMs(tsStatsPrintIntervalMs)
  73 + .statsNamePrefix("ts")
69 74 .build();
70 75
71 76 Function<TsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();
72   - tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, tsBatchThreads);
  77 + tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, tsBatchThreads, statsFactory);
73 78 tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
74 79 }
75 80
... ...
... ... @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
34 34 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
35 35 import org.thingsboard.server.common.data.kv.StringDataEntry;
36 36 import org.thingsboard.server.common.data.kv.TsKvEntry;
  37 +import org.thingsboard.server.common.stats.StatsFactory;
37 38 import org.thingsboard.server.dao.DaoUtil;
38 39 import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionary;
39 40 import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionaryCompositeKey;
... ... @@ -102,6 +103,9 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
102 103 @Autowired
103 104 protected ScheduledLogExecutorComponent logExecutor;
104 105
  106 + @Autowired
  107 + private StatsFactory statsFactory;
  108 +
105 109 @Value("${sql.ts.batch_size:1000}")
106 110 protected int tsBatchSize;
107 111
... ... @@ -124,10 +128,11 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
124 128 .batchSize(tsLatestBatchSize)
125 129 .maxDelay(tsLatestMaxDelay)
126 130 .statsPrintIntervalMs(tsLatestStatsPrintIntervalMs)
  131 + .statsNamePrefix("ts.latest")
127 132 .build();
128 133
129 134 java.util.function.Function<TsKvLatestEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();
130   - tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads);
  135 + tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads, statsFactory);
131 136
132 137 tsLatestQueue.init(logExecutor, v -> {
133 138 Map<TsKey, TsKvLatestEntity> trueLatest = new HashMap<>();
... ...
... ... @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.kv.Aggregation;
31 31 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
32 32 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
33 33 import org.thingsboard.server.common.data.kv.TsKvEntry;
  34 +import org.thingsboard.server.common.stats.StatsFactory;
34 35 import org.thingsboard.server.dao.DaoUtil;
35 36 import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
36 37 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
... ... @@ -62,6 +63,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
62 63 private AggregationRepository aggregationRepository;
63 64
64 65 @Autowired
  66 + private StatsFactory statsFactory;
  67 +
  68 + @Autowired
65 69 protected InsertTsRepository<TimescaleTsKvEntity> insertRepository;
66 70
67 71 protected TbSqlBlockingQueueWrapper<TimescaleTsKvEntity> tsQueue;
... ... @@ -74,10 +78,11 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
74 78 .batchSize(tsBatchSize)
75 79 .maxDelay(tsMaxDelay)
76 80 .statsPrintIntervalMs(tsStatsPrintIntervalMs)
  81 + .statsNamePrefix("ts.timescale")
77 82 .build();
78 83
79 84 Function<TimescaleTsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();
80   - tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, timescaleBatchThreads);
  85 + tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, timescaleBatchThreads, statsFactory);
81 86
82 87 tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
83 88 }
... ...
... ... @@ -30,6 +30,8 @@ import com.google.common.util.concurrent.SettableFuture;
30 30 import lombok.extern.slf4j.Slf4j;
31 31 import org.thingsboard.common.util.ThingsBoardThreadFactory;
32 32 import org.thingsboard.server.common.data.id.TenantId;
  33 +import org.thingsboard.server.common.stats.StatsFactory;
  34 +import org.thingsboard.server.common.stats.StatsType;
33 35 import org.thingsboard.server.common.msg.tools.TbRateLimits;
34 36 import org.thingsboard.server.dao.nosql.CassandraStatementTask;
35 37
... ... @@ -53,6 +55,8 @@ import java.util.regex.Matcher;
53 55 @Slf4j
54 56 public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extends ListenableFuture<V>, V> implements BufferedRateExecutor<T, F> {
55 57
  58 + public static final String CONCURRENCY_LEVEL = "currBuffer";
  59 +
56 60 private final long maxWaitTime;
57 61 private final long pollMs;
58 62 private final BlockingQueue<AsyncTaskContext<T, V>> queue;
... ... @@ -64,20 +68,14 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
64 68 private final boolean perTenantLimitsEnabled;
65 69 private final String perTenantLimitsConfiguration;
66 70 private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
67   - protected final ConcurrentMap<TenantId, AtomicInteger> rateLimitedTenants = new ConcurrentHashMap<>();
68 71
69   - protected final AtomicInteger concurrencyLevel = new AtomicInteger();
70   - protected final AtomicInteger totalAdded = new AtomicInteger();
71   - protected final AtomicInteger totalLaunched = new AtomicInteger();
72   - protected final AtomicInteger totalReleased = new AtomicInteger();
73   - protected final AtomicInteger totalFailed = new AtomicInteger();
74   - protected final AtomicInteger totalExpired = new AtomicInteger();
75   - protected final AtomicInteger totalRejected = new AtomicInteger();
76   - protected final AtomicInteger totalRateLimited = new AtomicInteger();
77   - protected final AtomicInteger printQueriesIdx = new AtomicInteger();
  72 + private final AtomicInteger printQueriesIdx = new AtomicInteger(0);
  73 +
  74 + protected final AtomicInteger concurrencyLevel;
  75 + protected final BufferedRateExecutorStats stats;
78 76
79 77 public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs,
80   - boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration, int printQueriesFreq) {
  78 + boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration, int printQueriesFreq, StatsFactory statsFactory) {
81 79 this.maxWaitTime = maxWaitTime;
82 80 this.pollMs = pollMs;
83 81 this.concurrencyLimit = concurrencyLimit;
... ... @@ -88,6 +86,10 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
88 86 this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nosql-timeout"));
89 87 this.perTenantLimitsEnabled = perTenantLimitsEnabled;
90 88 this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;
  89 + this.stats = new BufferedRateExecutorStats(statsFactory);
  90 + String concurrencyLevelKey = StatsType.RATE_EXECUTOR.getName() + "." + CONCURRENCY_LEVEL;
  91 + this.concurrencyLevel = statsFactory.createGauge(concurrencyLevelKey, new AtomicInteger(0));
  92 +
91 93 for (int i = 0; i < dispatcherThreads; i++) {
92 94 dispatcherExecutor.submit(this::dispatch);
93 95 }
... ... @@ -104,8 +106,8 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
104 106 } else if (!task.getTenantId().isNullUid()) {
105 107 TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(task.getTenantId(), id -> new TbRateLimits(perTenantLimitsConfiguration));
106 108 if (!rateLimits.tryConsume()) {
107   - rateLimitedTenants.computeIfAbsent(task.getTenantId(), tId -> new AtomicInteger(0)).incrementAndGet();
108   - totalRateLimited.incrementAndGet();
  109 + stats.incrementRateLimitedTenant(task.getTenantId());
  110 + stats.getTotalRateLimited().increment();
109 111 settableFuture.setException(new TenantRateLimitException());
110 112 perTenantLimitReached = true;
111 113 }
... ... @@ -113,10 +115,10 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
113 115 }
114 116 if (!perTenantLimitReached) {
115 117 try {
116   - totalAdded.incrementAndGet();
  118 + stats.getTotalAdded().increment();
117 119 queue.add(new AsyncTaskContext<>(UUID.randomUUID(), task, settableFuture, System.currentTimeMillis()));
118 120 } catch (IllegalStateException e) {
119   - totalRejected.incrementAndGet();
  121 + stats.getTotalRejected().increment();
120 122 settableFuture.setException(e);
121 123 }
122 124 }
... ... @@ -161,14 +163,14 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
161 163 concurrencyLevel.incrementAndGet();
162 164 long timeout = finalTaskCtx.getCreateTime() + maxWaitTime - System.currentTimeMillis();
163 165 if (timeout > 0) {
164   - totalLaunched.incrementAndGet();
  166 + stats.getTotalLaunched().increment();
165 167 ListenableFuture<V> result = execute(finalTaskCtx);
166 168 result = Futures.withTimeout(result, timeout, TimeUnit.MILLISECONDS, timeoutExecutor);
167 169 Futures.addCallback(result, new FutureCallback<V>() {
168 170 @Override
169 171 public void onSuccess(@Nullable V result) {
170 172 logTask("Releasing", finalTaskCtx);
171   - totalReleased.incrementAndGet();
  173 + stats.getTotalReleased().increment();
172 174 concurrencyLevel.decrementAndGet();
173 175 finalTaskCtx.getFuture().set(result);
174 176 }
... ... @@ -180,7 +182,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
180 182 } else {
181 183 logTask("Failed", finalTaskCtx);
182 184 }
183   - totalFailed.incrementAndGet();
  185 + stats.getTotalFailed().increment();
184 186 concurrencyLevel.decrementAndGet();
185 187 finalTaskCtx.getFuture().setException(t);
186 188 log.debug("[{}] Failed to execute task: {}", finalTaskCtx.getId(), finalTaskCtx.getTask(), t);
... ... @@ -188,7 +190,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
188 190 }, callbackExecutor);
189 191 } else {
190 192 logTask("Expired Before Execution", finalTaskCtx);
191   - totalExpired.incrementAndGet();
  193 + stats.getTotalExpired().increment();
192 194 concurrencyLevel.decrementAndGet();
193 195 taskCtx.getFuture().setException(new TimeoutException());
194 196 }
... ... @@ -200,7 +202,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
200 202 } catch (Throwable e) {
201 203 if (taskCtx != null) {
202 204 log.debug("[{}] Failed to execute task: {}", taskCtx.getId(), taskCtx, e);
203   - totalFailed.incrementAndGet();
  205 + stats.getTotalFailed().increment();
204 206 concurrencyLevel.decrementAndGet();
205 207 } else {
206 208 log.debug("Failed to queue task:", e);
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import lombok.Getter;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.thingsboard.server.common.data.id.TenantId;
  21 +import org.thingsboard.server.common.stats.DefaultCounter;
  22 +import org.thingsboard.server.common.stats.StatsCounter;
  23 +import org.thingsboard.server.common.stats.StatsFactory;
  24 +import org.thingsboard.server.common.stats.StatsType;
  25 +
  26 +import java.util.ArrayList;
  27 +import java.util.List;
  28 +import java.util.concurrent.ConcurrentHashMap;
  29 +import java.util.concurrent.ConcurrentMap;
  30 +
  31 +@Slf4j
  32 +@Getter
  33 +public class BufferedRateExecutorStats {
  34 + private static final String TENANT_ID_TAG = "tenantId";
  35 +
  36 +
  37 + private static final String TOTAL_ADDED = "totalAdded";
  38 + private static final String TOTAL_LAUNCHED = "totalLaunched";
  39 + private static final String TOTAL_RELEASED = "totalReleased";
  40 + private static final String TOTAL_FAILED = "totalFailed";
  41 + private static final String TOTAL_EXPIRED = "totalExpired";
  42 + private static final String TOTAL_REJECTED = "totalRejected";
  43 + private static final String TOTAL_RATE_LIMITED = "totalRateLimited";
  44 +
  45 + private final StatsFactory statsFactory;
  46 +
  47 + private final ConcurrentMap<TenantId, DefaultCounter> rateLimitedTenants = new ConcurrentHashMap<>();
  48 +
  49 + private final List<StatsCounter> statsCounters = new ArrayList<>();
  50 +
  51 + private final StatsCounter totalAdded;
  52 + private final StatsCounter totalLaunched;
  53 + private final StatsCounter totalReleased;
  54 + private final StatsCounter totalFailed;
  55 + private final StatsCounter totalExpired;
  56 + private final StatsCounter totalRejected;
  57 + private final StatsCounter totalRateLimited;
  58 +
  59 + public BufferedRateExecutorStats(StatsFactory statsFactory) {
  60 + this.statsFactory = statsFactory;
  61 +
  62 + String key = StatsType.RATE_EXECUTOR.getName();
  63 +
  64 + this.totalAdded = statsFactory.createStatsCounter(key, TOTAL_ADDED);
  65 + this.totalLaunched = statsFactory.createStatsCounter(key, TOTAL_LAUNCHED);
  66 + this.totalReleased = statsFactory.createStatsCounter(key, TOTAL_RELEASED);
  67 + this.totalFailed = statsFactory.createStatsCounter(key, TOTAL_FAILED);
  68 + this.totalExpired = statsFactory.createStatsCounter(key, TOTAL_EXPIRED);
  69 + this.totalRejected = statsFactory.createStatsCounter(key, TOTAL_REJECTED);
  70 + this.totalRateLimited = statsFactory.createStatsCounter(key, TOTAL_RATE_LIMITED);
  71 +
  72 + this.statsCounters.add(totalAdded);
  73 + this.statsCounters.add(totalLaunched);
  74 + this.statsCounters.add(totalReleased);
  75 + this.statsCounters.add(totalFailed);
  76 + this.statsCounters.add(totalExpired);
  77 + this.statsCounters.add(totalRejected);
  78 + this.statsCounters.add(totalRateLimited);
  79 + }
  80 +
  81 + public void incrementRateLimitedTenant(TenantId tenantId){
  82 + rateLimitedTenants.computeIfAbsent(tenantId,
  83 + tId -> {
  84 + String key = StatsType.RATE_EXECUTOR.getName() + ".tenant";
  85 + return statsFactory.createDefaultCounter(key, TENANT_ID_TAG, tId.toString());
  86 + }
  87 + )
  88 + .increment();
  89 + }
  90 +}
... ...
... ... @@ -843,6 +843,11 @@
843 843 <version>${project.version}</version>
844 844 </dependency>
845 845 <dependency>
  846 + <groupId>org.thingsboard.common</groupId>
  847 + <artifactId>stats</artifactId>
  848 + <version>${project.version}</version>
  849 + </dependency>
  850 + <dependency>
846 851 <groupId>org.thingsboard</groupId>
847 852 <artifactId>tools</artifactId>
848 853 <version>${project.version}</version>
... ...
... ... @@ -16,8 +16,6 @@
16 16 package org.thingsboard.rule.engine.mqtt;
17 17
18 18 import io.netty.buffer.Unpooled;
19   -import io.netty.channel.EventLoopGroup;
20   -import io.netty.channel.nio.NioEventLoopGroup;
21 19 import io.netty.handler.codec.mqtt.MqttQoS;
22 20 import io.netty.handler.ssl.SslContext;
23 21 import io.netty.handler.ssl.SslContextBuilder;
... ... @@ -36,7 +34,6 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
36 34 import javax.net.ssl.SSLException;
37 35 import java.nio.charset.Charset;
38 36 import java.util.Optional;
39   -import java.util.concurrent.ExecutionException;
40 37 import java.util.concurrent.TimeUnit;
41 38 import java.util.concurrent.TimeoutException;
42 39
... ... @@ -57,14 +54,14 @@ public class TbMqttNode implements TbNode {
57 54
58 55 private static final String ERROR = "error";
59 56
60   - private TbMqttNodeConfiguration config;
  57 + protected TbMqttNodeConfiguration mqttNodeConfiguration;
61 58
62   - private MqttClient mqttClient;
  59 + protected MqttClient mqttClient;
63 60
64 61 @Override
65 62 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
66 63 try {
67   - this.config = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
  64 + this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
68 65 this.mqttClient = initClient(ctx);
69 66 } catch (Exception e) {
70 67 throw new TbNodeException(e);
... ... @@ -73,7 +70,7 @@ public class TbMqttNode implements TbNode {
73 70
74 71 @Override
75 72 public void onMsg(TbContext ctx, TbMsg msg) {
76   - String topic = TbNodeUtils.processPattern(this.config.getTopicPattern(), msg.getMetaData());
  73 + String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg.getMetaData());
77 74 this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
78 75 .addListener(future -> {
79 76 if (future.isSuccess()) {
... ... @@ -99,38 +96,38 @@ public class TbMqttNode implements TbNode {
99 96 }
100 97 }
101 98
102   - private MqttClient initClient(TbContext ctx) throws Exception {
  99 + protected MqttClient initClient(TbContext ctx) throws Exception {
103 100 Optional<SslContext> sslContextOpt = initSslContext();
104 101 MqttClientConfig config = sslContextOpt.isPresent() ? new MqttClientConfig(sslContextOpt.get()) : new MqttClientConfig();
105   - if (!StringUtils.isEmpty(this.config.getClientId())) {
106   - config.setClientId(this.config.getClientId());
  102 + if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) {
  103 + config.setClientId(this.mqttNodeConfiguration.getClientId());
107 104 }
108   - config.setCleanSession(this.config.isCleanSession());
109   - this.config.getCredentials().configure(config);
  105 + config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());
  106 + this.mqttNodeConfiguration.getCredentials().configure(config);
110 107 MqttClient client = MqttClient.create(config, null);
111 108 client.setEventLoop(ctx.getSharedEventLoop());
112   - Future<MqttConnectResult> connectFuture = client.connect(this.config.getHost(), this.config.getPort());
  109 + Future<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
113 110 MqttConnectResult result;
114 111 try {
115   - result = connectFuture.get(this.config.getConnectTimeoutSec(), TimeUnit.SECONDS);
  112 + result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS);
116 113 } catch (TimeoutException ex) {
117 114 connectFuture.cancel(true);
118 115 client.disconnect();
119   - String hostPort = this.config.getHost() + ":" + this.config.getPort();
  116 + String hostPort = this.mqttNodeConfiguration.getHost() + ":" + this.mqttNodeConfiguration.getPort();
120 117 throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s.", hostPort));
121 118 }
122 119 if (!result.isSuccess()) {
123 120 connectFuture.cancel(true);
124 121 client.disconnect();
125   - String hostPort = this.config.getHost() + ":" + this.config.getPort();
  122 + String hostPort = this.mqttNodeConfiguration.getHost() + ":" + this.mqttNodeConfiguration.getPort();
126 123 throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s. Result code is: %s", hostPort, result.getReturnCode()));
127 124 }
128 125 return client;
129 126 }
130 127
131 128 private Optional<SslContext> initSslContext() throws SSLException {
132   - Optional<SslContext> result = this.config.getCredentials().initSslContext();
133   - if (this.config.isSsl() && !result.isPresent()) {
  129 + Optional<SslContext> result = this.mqttNodeConfiguration.getCredentials().initSslContext();
  130 + if (this.mqttNodeConfiguration.isSsl() && !result.isPresent()) {
134 131 result = Optional.of(SslContextBuilder.forClient().build());
135 132 }
136 133 return result;
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.mqtt.azure;
  17 +
  18 +import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
  19 +import io.netty.handler.ssl.ClientAuth;
  20 +import io.netty.handler.ssl.SslContext;
  21 +import io.netty.handler.ssl.SslContextBuilder;
  22 +import lombok.Data;
  23 +import lombok.extern.slf4j.Slf4j;
  24 +import org.apache.commons.codec.binary.Base64;
  25 +import org.bouncycastle.jce.provider.BouncyCastleProvider;
  26 +import org.thingsboard.common.util.AzureIotHubUtil;
  27 +import org.thingsboard.mqtt.MqttClientConfig;
  28 +import org.thingsboard.rule.engine.mqtt.credentials.MqttClientCredentials;
  29 +
  30 +import javax.net.ssl.TrustManagerFactory;
  31 +import java.io.ByteArrayInputStream;
  32 +import java.security.KeyStore;
  33 +import java.security.Security;
  34 +import java.security.cert.CertificateFactory;
  35 +import java.security.cert.X509Certificate;
  36 +import java.util.Optional;
  37 +
  38 +@Data
  39 +@Slf4j
  40 +@JsonIgnoreProperties(ignoreUnknown = true)
  41 +public class AzureIotHubSasCredentials implements MqttClientCredentials {
  42 + private String sasKey;
  43 + private String caCert;
  44 +
  45 + @Override
  46 + public Optional<SslContext> initSslContext() {
  47 + try {
  48 + Security.addProvider(new BouncyCastleProvider());
  49 + if (caCert == null || caCert.isEmpty()) {
  50 + caCert = AzureIotHubUtil.getDefaultCaCert();
  51 + }
  52 + return Optional.of(SslContextBuilder.forClient()
  53 + .trustManager(createAndInitTrustManagerFactory())
  54 + .clientAuth(ClientAuth.REQUIRE)
  55 + .build());
  56 + } catch (Exception e) {
  57 + log.error("[{}] Creating TLS factory failed!", caCert, e);
  58 + throw new RuntimeException("Creating TLS factory failed!", e);
  59 + }
  60 + }
  61 +
  62 + @Override
  63 + public void configure(MqttClientConfig config) {
  64 + }
  65 +
  66 + private TrustManagerFactory createAndInitTrustManagerFactory() throws Exception {
  67 + X509Certificate caCertHolder;
  68 + caCertHolder = readCertFile(caCert);
  69 +
  70 + KeyStore caKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
  71 + caKeyStore.load(null, null);
  72 + caKeyStore.setCertificateEntry("caCert-cert", caCertHolder);
  73 +
  74 + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
  75 + trustManagerFactory.init(caKeyStore);
  76 + return trustManagerFactory;
  77 + }
  78 +
  79 + private X509Certificate readCertFile(String fileContent) throws Exception {
  80 + X509Certificate certificate = null;
  81 + if (fileContent != null && !fileContent.trim().isEmpty()) {
  82 + fileContent = fileContent.replace("-----BEGIN CERTIFICATE-----", "")
  83 + .replace("-----END CERTIFICATE-----", "")
  84 + .replaceAll("\\s", "");
  85 + byte[] decoded = Base64.decodeBase64(fileContent);
  86 + CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
  87 + certificate = (X509Certificate) certFactory.generateCertificate(new ByteArrayInputStream(decoded));
  88 + }
  89 + return certificate;
  90 + }
  91 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.mqtt.azure;
  17 +
  18 +import io.netty.handler.codec.mqtt.MqttVersion;
  19 +import io.netty.handler.ssl.SslContext;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.thingsboard.common.util.AzureIotHubUtil;
  22 +import org.thingsboard.mqtt.MqttClientConfig;
  23 +import org.thingsboard.rule.engine.api.RuleNode;
  24 +import org.thingsboard.rule.engine.api.TbContext;
  25 +import org.thingsboard.rule.engine.api.TbNodeConfiguration;
  26 +import org.thingsboard.rule.engine.api.TbNodeException;
  27 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
  28 +import org.thingsboard.rule.engine.mqtt.TbMqttNode;
  29 +import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
  30 +import org.thingsboard.rule.engine.mqtt.credentials.CertPemClientCredentials;
  31 +import org.thingsboard.rule.engine.mqtt.credentials.MqttClientCredentials;
  32 +import org.thingsboard.server.common.data.plugin.ComponentType;
  33 +
  34 +import java.util.Optional;
  35 +
  36 +@Slf4j
  37 +@RuleNode(
  38 + type = ComponentType.EXTERNAL,
  39 + name = "azure iot hub",
  40 + configClazz = TbAzureIotHubNodeConfiguration.class,
  41 + nodeDescription = "Publish messages to the Azure IoT Hub",
  42 + nodeDetails = "Will publish message payload to the Azure IoT Hub with QoS <b>AT_LEAST_ONCE</b>.",
  43 + uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
  44 + configDirective = "tbActionNodeAzureIotHubConfig"
  45 +)
  46 +public class TbAzureIotHubNode extends TbMqttNode {
  47 + @Override
  48 + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  49 + try {
  50 + this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
  51 + mqttNodeConfiguration.setPort(8883);
  52 + mqttNodeConfiguration.setCleanSession(true);
  53 + MqttClientCredentials credentials = mqttNodeConfiguration.getCredentials();
  54 + mqttNodeConfiguration.setCredentials(new MqttClientCredentials() {
  55 + @Override
  56 + public Optional<SslContext> initSslContext() {
  57 + if (credentials instanceof AzureIotHubSasCredentials) {
  58 + AzureIotHubSasCredentials sasCredentials = (AzureIotHubSasCredentials) credentials;
  59 + if (sasCredentials.getCaCert() == null || sasCredentials.getCaCert().isEmpty()) {
  60 + sasCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert());
  61 + }
  62 + } else if (credentials instanceof CertPemClientCredentials) {
  63 + CertPemClientCredentials pemCredentials = (CertPemClientCredentials) credentials;
  64 + if (pemCredentials.getCaCert() == null || pemCredentials.getCaCert().isEmpty()) {
  65 + pemCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert());
  66 + }
  67 + }
  68 + return credentials.initSslContext();
  69 + }
  70 +
  71 + @Override
  72 + public void configure(MqttClientConfig config) {
  73 + config.setProtocolVersion(MqttVersion.MQTT_3_1_1);
  74 + config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId()));
  75 + if (credentials instanceof AzureIotHubSasCredentials) {
  76 + AzureIotHubSasCredentials sasCredentials = (AzureIotHubSasCredentials) credentials;
  77 + config.setPassword(AzureIotHubUtil.buildSasToken(mqttNodeConfiguration.getHost(), sasCredentials.getSasKey()));
  78 + }
  79 + }
  80 + });
  81 +
  82 + this.mqttClient = initClient(ctx);
  83 + } catch (Exception e) {
  84 + throw new TbNodeException(e);
  85 + } }
  86 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.mqtt.azure;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.rule.engine.api.NodeConfiguration;
  20 +import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
  21 +import org.thingsboard.rule.engine.mqtt.credentials.AnonymousCredentials;
  22 +import org.thingsboard.rule.engine.mqtt.credentials.MqttClientCredentials;
  23 +
  24 +@Data
  25 +public class TbAzureIotHubNodeConfiguration extends TbMqttNodeConfiguration {
  26 +
  27 + @Override
  28 + public TbAzureIotHubNodeConfiguration defaultConfiguration() {
  29 + TbAzureIotHubNodeConfiguration configuration = new TbAzureIotHubNodeConfiguration();
  30 + configuration.setTopicPattern("devices/<device_id>/messages/events/");
  31 + configuration.setHost("<iot-hub-name>.azure-devices.net");
  32 + configuration.setPort(8883);
  33 + configuration.setConnectTimeoutSec(10);
  34 + configuration.setCleanSession(true);
  35 + configuration.setSsl(true);
  36 + configuration.setCredentials(new AzureIotHubSasCredentials());
  37 + return configuration;
  38 + }
  39 +
  40 +}
... ...
... ... @@ -166,7 +166,7 @@ public class CertPemClientCredentials implements MqttClientCredentials {
166 166
167 167 private KeySpec getKeySpec(byte[] encodedKey) throws Exception {
168 168 KeySpec keySpec;
169   - if (password == null) {
  169 + if (password == null || password.isEmpty()) {
170 170 keySpec = new PKCS8EncodedKeySpec(encodedKey);
171 171 } else {
172 172 PBEKeySpec pbeKeySpec = new PBEKeySpec(password.toCharArray());
... ...
... ... @@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
19 19 import com.fasterxml.jackson.annotation.JsonTypeInfo;
20 20 import io.netty.handler.ssl.SslContext;
21 21 import org.thingsboard.mqtt.MqttClientConfig;
  22 +import org.thingsboard.rule.engine.mqtt.azure.AzureIotHubSasCredentials;
22 23
23 24 import java.util.Optional;
24 25
... ... @@ -29,6 +30,7 @@ import java.util.Optional;
29 30 @JsonSubTypes({
30 31 @JsonSubTypes.Type(value = AnonymousCredentials.class, name = "anonymous"),
31 32 @JsonSubTypes.Type(value = BasicCredentials.class, name = "basic"),
  33 + @JsonSubTypes.Type(value = AzureIotHubSasCredentials.class, name = "sas"),
32 34 @JsonSubTypes.Type(value = CertPemClientCredentials.class, name = "cert.PEM")})
33 35 public interface MqttClientCredentials {
34 36
... ...
... ... @@ -14,8 +14,16 @@
14 14 # limitations under the License.
15 15 #
16 16
17   -spring.main.web-environment: false
18   -spring.main.web-application-type: none
  17 +# If you enabled process metrics you should also enable 'web-environment'.
  18 +spring.main.web-environment: "${WEB_APPLICATION_ENABLE:false}"
  19 +# If you enabled process metrics you should set 'web-application-type' to 'servlet' value.
  20 +spring.main.web-application-type: "${WEB_APPLICATION_TYPE:none}"
  21 +
  22 +server:
  23 + # Server bind address (has no effect if web-environment is disabled).
  24 + address: "${HTTP_BIND_ADDRESS:0.0.0.0}"
  25 + # Server bind port (has no effect if web-environment is disabled).
  26 + port: "${HTTP_BIND_PORT:8083}"
19 27
20 28 # Zookeeper connection parameters. Used for service discovery.
21 29 zk:
... ... @@ -205,10 +213,22 @@ queue:
205 213 transport:
206 214 # For high priority notifications that require minimum latency and processing time
207 215 notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
208   - poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
  216 + poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
209 217
210 218 service:
211 219 type: "${TB_SERVICE_TYPE:tb-transport}"
212 220 # Unique id for this service (autogenerated if empty)
213 221 id: "${TB_SERVICE_ID:}"
214   - tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
\ No newline at end of file
  222 + tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
  223 +
  224 +
  225 +metrics:
  226 + # Enable/disable actuator metrics.
  227 + enabled: "${METRICS_ENABLED:false}"
  228 +
  229 +management:
  230 + endpoints:
  231 + web:
  232 + exposure:
  233 + # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
  234 + include: '${METRICS_ENDPOINTS_EXPOSE:info}'
\ No newline at end of file
... ...
... ... @@ -206,10 +206,22 @@ queue:
206 206 transport:
207 207 # For high priority notifications that require minimum latency and processing time
208 208 notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
209   - poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
  209 + poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
210 210
211 211 service:
212 212 type: "${TB_SERVICE_TYPE:tb-transport}"
213 213 # Unique id for this service (autogenerated if empty)
214 214 id: "${TB_SERVICE_ID:}"
215   - tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
\ No newline at end of file
  215 + tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
  216 +
  217 +
  218 +metrics:
  219 + # Enable/disable actuator metrics.
  220 + enabled: "${METRICS_ENABLED:false}"
  221 +
  222 +management:
  223 + endpoints:
  224 + web:
  225 + exposure:
  226 + # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
  227 + include: '${METRICS_ENDPOINTS_EXPOSE:info}'
\ No newline at end of file
... ...
... ... @@ -14,8 +14,16 @@
14 14 # limitations under the License.
15 15 #
16 16
17   -spring.main.web-environment: false
18   -spring.main.web-application-type: none
  17 +# If you enabled process metrics you should also enable 'web-environment'.
  18 +spring.main.web-environment: "${WEB_APPLICATION_ENABLE:false}"
  19 +# If you enabled process metrics you should set 'web-application-type' to 'servlet' value.
  20 +spring.main.web-application-type: "${WEB_APPLICATION_TYPE:none}"
  21 +
  22 +server:
  23 + # Server bind address (has no effect if web-environment is disabled).
  24 + address: "${HTTP_BIND_ADDRESS:0.0.0.0}"
  25 + # Server bind port (has no effect if web-environment is disabled).
  26 + port: "${HTTP_BIND_PORT:8083}"
19 27
20 28 # Zookeeper connection parameters. Used for service discovery.
21 29 zk:
... ... @@ -226,10 +234,21 @@ queue:
226 234 transport:
227 235 # For high priority notifications that require minimum latency and processing time
228 236 notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
229   - poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
  237 + poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
230 238
231 239 service:
232 240 type: "${TB_SERVICE_TYPE:tb-transport}"
233 241 # Unique id for this service (autogenerated if empty)
234 242 id: "${TB_SERVICE_ID:}"
235   - tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
\ No newline at end of file
  243 + tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
  244 +
  245 +metrics:
  246 + # Enable/disable actuator metrics.
  247 + enabled: "${METRICS_ENABLED:false}"
  248 +
  249 +management:
  250 + endpoints:
  251 + web:
  252 + exposure:
  253 + # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
  254 + include: '${METRICS_ENDPOINTS_EXPOSE:info}'
\ No newline at end of file
... ...