Commit 24f5872adf3f50f36629ef0d5fb3b791c92be838

Authored by Andrii Shvaika
1 parent 2e694e1d

Queue interfaces with no implementation

@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.server.queue.provider; 16 package org.thingsboard.server.queue.provider;
17 17
18 import org.thingsboard.server.gen.js.JsInvokeProtos; 18 import org.thingsboard.server.gen.js.JsInvokeProtos;
  19 +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
19 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 20 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
20 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 21 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
21 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 22 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@@ -33,7 +34,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -33,7 +34,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
33 * Responsible for initialization of various Producers and Consumers used by TB Core Node. 34 * Responsible for initialization of various Producers and Consumers used by TB Core Node.
34 * Implementation Depends on the queue queue.type from yml or TB_QUEUE_TYPE environment variable 35 * Implementation Depends on the queue queue.type from yml or TB_QUEUE_TYPE environment variable
35 */ 36 */
36 -public interface TbCoreQueueFactory { 37 +public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory {
37 38
38 /** 39 /**
39 * Used to push messages to instances of TB Transport Service 40 * Used to push messages to instances of TB Transport Service
@@ -78,6 +79,16 @@ public interface TbCoreQueueFactory { @@ -78,6 +79,16 @@ public interface TbCoreQueueFactory {
78 TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer(); 79 TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer();
79 80
80 /** 81 /**
  82 + * Used to consume messages about usage statistics by TB Core Service
  83 + *
  84 + * @return
  85 + */
  86 + default TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
  87 + //TODO: implement
  88 + return null;
  89 + }
  90 +
  91 + /**
81 * Used to consume high priority messages by TB Core Service 92 * Used to consume high priority messages by TB Core Service
82 * 93 *
83 * @return 94 * @return
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.queue.provider; 16 package org.thingsboard.server.queue.provider;
17 17
  18 +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
18 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 19 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
19 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 20 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
20 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 21 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@@ -63,4 +64,14 @@ public interface TbQueueProducerProvider { @@ -63,4 +64,14 @@ public interface TbQueueProducerProvider {
63 */ 64 */
64 TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer(); 65 TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer();
65 66
  67 + /**
  68 + * Used to push messages to other instances of TB Core Service
  69 + *
  70 + * @return
  71 + */
  72 + default TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
  73 + //TODO: implement
  74 + return null;
  75 + }
  76 +
66 } 77 }
@@ -32,7 +32,7 @@ import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; @@ -32,7 +32,7 @@ import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
32 * Responsible for initialization of various Producers and Consumers used by TB Core Node. 32 * Responsible for initialization of various Producers and Consumers used by TB Core Node.
33 * Implementation Depends on the queue queue.type from yml or TB_QUEUE_TYPE environment variable 33 * Implementation Depends on the queue queue.type from yml or TB_QUEUE_TYPE environment variable
34 */ 34 */
35 -public interface TbRuleEngineQueueFactory { 35 +public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory {
36 36
37 /** 37 /**
38 * Used to push messages to instances of TB Transport Service 38 * Used to push messages to instances of TB Transport Service
@@ -15,17 +15,17 @@ @@ -15,17 +15,17 @@
15 */ 15 */
16 package org.thingsboard.server.queue.provider; 16 package org.thingsboard.server.queue.provider;
17 17
18 -import org.thingsboard.server.queue.TbQueueConsumer;  
19 -import org.thingsboard.server.queue.TbQueueProducer;  
20 -import org.thingsboard.server.queue.TbQueueRequestTemplate;  
21 -import org.thingsboard.server.queue.common.TbProtoQueueMsg;  
22 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 18 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
23 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 19 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
24 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 20 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
25 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; 21 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; 22 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
  23 +import org.thingsboard.server.queue.TbQueueConsumer;
  24 +import org.thingsboard.server.queue.TbQueueProducer;
  25 +import org.thingsboard.server.queue.TbQueueRequestTemplate;
  26 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
27 27
28 -public interface TbTransportQueueFactory { 28 +public interface TbTransportQueueFactory extends TbUsageStatsClientQueueFactory {
29 29
30 TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate(); 30 TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate();
31 31
@@ -30,6 +30,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider @@ -30,6 +30,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider
30 private final TbTransportQueueFactory tbQueueProvider; 30 private final TbTransportQueueFactory tbQueueProvider;
31 private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toRuleEngine; 31 private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toRuleEngine;
32 private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> toTbCore; 32 private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> toTbCore;
  33 + private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> toUsageStats;
33 34
34 public TbTransportQueueProducerProvider(TbTransportQueueFactory tbQueueProvider) { 35 public TbTransportQueueProducerProvider(TbTransportQueueFactory tbQueueProvider) {
35 this.tbQueueProvider = tbQueueProvider; 36 this.tbQueueProvider = tbQueueProvider;
@@ -39,6 +40,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider @@ -39,6 +40,7 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider
39 public void init() { 40 public void init() {
40 this.toTbCore = tbQueueProvider.createTbCoreMsgProducer(); 41 this.toTbCore = tbQueueProvider.createTbCoreMsgProducer();
41 this.toRuleEngine = tbQueueProvider.createRuleEngineMsgProducer(); 42 this.toRuleEngine = tbQueueProvider.createRuleEngineMsgProducer();
  43 + this.toUsageStats = tbQueueProvider.createToUsageStatsServiceMsgProducer();
42 } 44 }
43 45
44 @Override 46 @Override
@@ -65,4 +67,9 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider @@ -65,4 +67,9 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider
65 public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() { 67 public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() {
66 throw new RuntimeException("Not Implemented! Should not be used by Transport!"); 68 throw new RuntimeException("Not Implemented! Should not be used by Transport!");
67 } 69 }
  70 +
  71 + @Override
  72 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> getTbUsageStatsMsgProducer() {
  73 + return toUsageStats;
  74 + }
68 } 75 }
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.provider;
  17 +
  18 +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
  19 +import org.thingsboard.server.queue.TbQueueProducer;
  20 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  21 +
  22 +public interface TbUsageStatsClientQueueFactory {
  23 +
  24 + default TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
  25 + //TODO: implement
  26 + return null;
  27 + }
  28 +
  29 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.scheduler;
  17 +
  18 +import org.springframework.stereotype.Component;
  19 +import org.thingsboard.common.util.ThingsBoardThreadFactory;
  20 +
  21 +import javax.annotation.PostConstruct;
  22 +import javax.annotation.PreDestroy;
  23 +import java.util.concurrent.Callable;
  24 +import java.util.concurrent.Executors;
  25 +import java.util.concurrent.ScheduledExecutorService;
  26 +import java.util.concurrent.ScheduledFuture;
  27 +import java.util.concurrent.TimeUnit;
  28 +
  29 +@Component
  30 +public class DefaultSchedulerComponent implements SchedulerComponent{
  31 +
  32 + protected ScheduledExecutorService schedulerExecutor;
  33 +
  34 + @PostConstruct
  35 + public void init(){
  36 + this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("queue-scheduler"));
  37 + }
  38 +
  39 + @PreDestroy
  40 + public void destroy() {
  41 + if (schedulerExecutor != null) {
  42 + schedulerExecutor.shutdownNow();
  43 + }
  44 + }
  45 +
  46 + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  47 + return schedulerExecutor.schedule(command, delay, unit);
  48 + }
  49 +
  50 + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
  51 + return schedulerExecutor.schedule(callable, delay, unit);
  52 + }
  53 +
  54 + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
  55 + return schedulerExecutor.scheduleAtFixedRate(command, initialDelay, period, unit);
  56 + }
  57 +
  58 + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
  59 + return schedulerExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
  60 + }
  61 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.scheduler;
  17 +
  18 +import java.util.concurrent.Callable;
  19 +import java.util.concurrent.ScheduledFuture;
  20 +import java.util.concurrent.TimeUnit;
  21 +
  22 +public interface SchedulerComponent {
  23 +
  24 + ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
  25 +
  26 + <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
  27 +
  28 + ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
  29 +
  30 + ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
  31 +
  32 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.usagestats;
  17 +
  18 +import org.springframework.beans.factory.annotation.Value;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.data.UsageRecordKey;
  21 +import org.thingsboard.server.common.data.id.TenantId;
  22 +import org.thingsboard.server.common.msg.queue.ServiceType;
  23 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  24 +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
  25 +import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
  26 +import org.thingsboard.server.queue.TbQueueCallback;
  27 +import org.thingsboard.server.queue.TbQueueProducer;
  28 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  29 +import org.thingsboard.server.queue.discovery.PartitionService;
  30 +import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
  31 +import org.thingsboard.server.queue.scheduler.SchedulerComponent;
  32 +
  33 +import javax.annotation.PostConstruct;
  34 +import java.util.Random;
  35 +import java.util.UUID;
  36 +import java.util.concurrent.ConcurrentHashMap;
  37 +import java.util.concurrent.ConcurrentMap;
  38 +import java.util.concurrent.TimeUnit;
  39 +import java.util.concurrent.atomic.AtomicLong;
  40 +
  41 +@Component
  42 +public class DefaultTbUsageStatsClient implements TbUsageStatsClient {
  43 +
  44 + @Value("${usage.stats.report.enabled:true}")
  45 + private boolean enabled;
  46 + @Value("${usage.stats.report.interval:600}")
  47 + private int interval;
  48 +
  49 + private final ConcurrentMap<TenantId, AtomicLong>[] values = new ConcurrentMap[UsageRecordKey.values().length];
  50 + private final PartitionService partitionService;
  51 + private final SchedulerComponent scheduler;
  52 + private final TbQueueProducerProvider producerProvider;
  53 + private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> msgProducer;
  54 +
  55 + public DefaultTbUsageStatsClient(PartitionService partitionService, SchedulerComponent scheduler, TbQueueProducerProvider producerProvider) {
  56 + this.partitionService = partitionService;
  57 + this.scheduler = scheduler;
  58 + this.producerProvider = producerProvider;
  59 + }
  60 +
  61 + @PostConstruct
  62 + private void init() {
  63 + if (enabled) {
  64 + msgProducer = this.producerProvider.getTbUsageStatsMsgProducer();
  65 + for (UsageRecordKey key : UsageRecordKey.values()) {
  66 + values[key.ordinal()] = new ConcurrentHashMap<>();
  67 + }
  68 + scheduler.scheduleWithFixedDelay(this::reportStats, new Random().nextInt(interval), interval, TimeUnit.SECONDS);
  69 + }
  70 + }
  71 +
  72 + private void reportStats() {
  73 + ConcurrentMap<TenantId, ToUsageStatsServiceMsg.Builder> report = new ConcurrentHashMap<>();
  74 +
  75 + for (UsageRecordKey key : UsageRecordKey.values()) {
  76 + values[key.ordinal()].forEach(((tenantId, atomicLong) -> {
  77 + long value = atomicLong.getAndSet(0);
  78 + if (value > 0) {
  79 + ToUsageStatsServiceMsg.Builder msgBuilder = report.computeIfAbsent(tenantId, id -> {
  80 + ToUsageStatsServiceMsg.Builder msg = ToUsageStatsServiceMsg.newBuilder();
  81 + msg.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
  82 + msg.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
  83 + return msg;
  84 + });
  85 + msgBuilder.addValues(UsageStatsKVProto.newBuilder().setKey(key.name()).setValue(value).build());
  86 + }
  87 + }));
  88 + }
  89 +
  90 + report.forEach(((tenantId, builder) -> {
  91 + //TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages?
  92 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId);
  93 + msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), builder.build()), null);
  94 + }));
  95 + }
  96 +
  97 + @Override
  98 + public void report(TenantId tenantId, UsageRecordKey key, long value) {
  99 + if (enabled) {
  100 + ConcurrentMap<TenantId, AtomicLong> map = values[key.ordinal()];
  101 + AtomicLong atomicValue = map.computeIfAbsent(tenantId, id -> new AtomicLong());
  102 + atomicValue.addAndGet(value);
  103 + }
  104 + }
  105 +
  106 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.usagestats;
  17 +
  18 +import org.thingsboard.server.common.data.UsageRecordKey;
  19 +import org.thingsboard.server.common.data.id.TenantId;
  20 +
  21 +public interface TbUsageStatsClient {
  22 +
  23 + void report(TenantId tenantId, UsageRecordKey key, long value);
  24 +
  25 +
  26 +}
@@ -532,3 +532,16 @@ message ToTransportMsg { @@ -532,3 +532,16 @@ message ToTransportMsg {
532 EntityDeleteMsg entityDeleteMsg = 9; 532 EntityDeleteMsg entityDeleteMsg = 9;
533 ProvisionDeviceResponseMsg provisionResponse = 10; 533 ProvisionDeviceResponseMsg provisionResponse = 10;
534 } 534 }
  535 +
  536 +message UsageStatsKVProto{
  537 + string key = 1;
  538 + int64 value = 2;
  539 +}
  540 +
  541 +message ToUsageStatsServiceMsg {
  542 + int64 tenantIdMSB = 1;
  543 + int64 tenantIdLSB = 2;
  544 + int64 entityIdMSB = 3;
  545 + int64 entityIdLSB = 4;
  546 + repeated UsageStatsKVProto values = 5;
  547 +}
@@ -57,6 +57,7 @@ import org.thingsboard.server.gen.transport.TransportProtos; @@ -57,6 +57,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
57 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; 57 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
58 import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent; 58 import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
59 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; 59 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
  60 +import org.thingsboard.server.queue.scheduler.SchedulerComponent;
60 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; 61 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
61 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; 62 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
62 import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler; 63 import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler;
@@ -97,6 +98,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -97,6 +98,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
97 private final UUID sessionId; 98 private final UUID sessionId;
98 private final MqttTransportContext context; 99 private final MqttTransportContext context;
99 private final TransportService transportService; 100 private final TransportService transportService;
  101 + private final SchedulerComponent scheduler;
100 private final SslHandler sslHandler; 102 private final SslHandler sslHandler;
101 private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; 103 private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
102 104
@@ -108,6 +110,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -108,6 +110,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
108 this.sessionId = UUID.randomUUID(); 110 this.sessionId = UUID.randomUUID();
109 this.context = context; 111 this.context = context;
110 this.transportService = context.getTransportService(); 112 this.transportService = context.getTransportService();
  113 + this.scheduler = context.getScheduler();
111 this.sslHandler = sslHandler; 114 this.sslHandler = sslHandler;
112 this.mqttQoSMap = new ConcurrentHashMap<>(); 115 this.mqttQoSMap = new ConcurrentHashMap<>();
113 this.deviceSessionCtx = new DeviceSessionCtx(sessionId, mqttQoSMap, context); 116 this.deviceSessionCtx = new DeviceSessionCtx(sessionId, mqttQoSMap, context);
@@ -333,7 +336,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -333,7 +336,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
333 } else { 336 } else {
334 deviceSessionCtx.getContext().getProtoMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); 337 deviceSessionCtx.getContext().getProtoMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
335 } 338 }
336 - transportService.getSchedulerExecutor().schedule(() -> processDisconnect(ctx), 60, TimeUnit.SECONDS); 339 + scheduler.schedule(() -> processDisconnect(ctx), 60, TimeUnit.SECONDS);
337 } catch (Exception e) { 340 } catch (Exception e) {
338 log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); 341 log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
339 } 342 }
@@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired; @@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired;
23 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; 23 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
24 import org.springframework.stereotype.Service; 24 import org.springframework.stereotype.Service;
25 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; 25 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  26 +import org.thingsboard.server.queue.scheduler.SchedulerComponent;
26 27
27 import javax.annotation.PostConstruct; 28 import javax.annotation.PostConstruct;
28 import javax.annotation.PreDestroy; 29 import javax.annotation.PreDestroy;
@@ -44,6 +45,8 @@ public abstract class TransportContext { @@ -44,6 +45,8 @@ public abstract class TransportContext {
44 private TransportService transportService; 45 private TransportService transportService;
45 @Autowired 46 @Autowired
46 private TbServiceInfoProvider serviceInfoProvider; 47 private TbServiceInfoProvider serviceInfoProvider;
  48 + @Autowired
  49 + private SchedulerComponent scheduler;
47 50
48 @Getter 51 @Getter
49 private ExecutorService executor; 52 private ExecutorService executor;
@@ -17,16 +17,14 @@ package org.thingsboard.server.common.transport; @@ -17,16 +17,14 @@ package org.thingsboard.server.common.transport;
17 17
18 import org.thingsboard.server.common.data.DeviceProfile; 18 import org.thingsboard.server.common.data.DeviceProfile;
19 import org.thingsboard.server.common.data.DeviceTransportType; 19 import org.thingsboard.server.common.data.DeviceTransportType;
20 -import org.thingsboard.server.common.data.id.DeviceProfileId;  
21 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; 20 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
22 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; 21 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
23 import org.thingsboard.server.common.transport.limits.TransportRateLimitType; 22 import org.thingsboard.server.common.transport.limits.TransportRateLimitType;
24 -import org.thingsboard.server.gen.transport.TransportProtos;  
25 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; 23 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
27 -import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;  
28 import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg;
29 import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg; 26 import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg;
  27 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
30 import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; 28 import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
31 import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; 29 import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
32 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; 30 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg;
@@ -42,8 +40,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCre @@ -42,8 +40,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCre
42 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; 40 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
43 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; 41 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
44 42
45 -import java.util.concurrent.ScheduledExecutorService;  
46 -  
47 /** 43 /**
48 * Created by ashvayka on 04.10.18. 44 * Created by ashvayka on 04.10.18.
49 */ 45 */
@@ -92,8 +88,6 @@ public interface TransportService { @@ -92,8 +88,6 @@ public interface TransportService {
92 88
93 void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback); 89 void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback);
94 90
95 - ScheduledExecutorService getSchedulerExecutor();  
96 -  
97 void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener); 91 void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
98 92
99 void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout); 93 void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout);
@@ -23,7 +23,6 @@ import com.google.gson.JsonObject; @@ -23,7 +23,6 @@ import com.google.gson.JsonObject;
23 import com.google.protobuf.ByteString; 23 import com.google.protobuf.ByteString;
24 import lombok.extern.slf4j.Slf4j; 24 import lombok.extern.slf4j.Slf4j;
25 import org.springframework.beans.factory.annotation.Value; 25 import org.springframework.beans.factory.annotation.Value;
26 -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;  
27 import org.springframework.stereotype.Service; 26 import org.springframework.stereotype.Service;
28 import org.thingsboard.common.util.ThingsBoardThreadFactory; 27 import org.thingsboard.common.util.ThingsBoardThreadFactory;
29 import org.thingsboard.server.common.data.DeviceProfile; 28 import org.thingsboard.server.common.data.DeviceProfile;
@@ -53,7 +52,6 @@ import org.thingsboard.server.common.transport.TransportTenantProfileCache; @@ -53,7 +52,6 @@ import org.thingsboard.server.common.transport.TransportTenantProfileCache;
53 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; 52 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
54 import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; 53 import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
55 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; 54 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
56 -import org.thingsboard.server.common.transport.limits.TransportRateLimit;  
57 import org.thingsboard.server.common.transport.limits.TransportRateLimitService; 55 import org.thingsboard.server.common.transport.limits.TransportRateLimitService;
58 import org.thingsboard.server.common.transport.limits.TransportRateLimitType; 56 import org.thingsboard.server.common.transport.limits.TransportRateLimitType;
59 import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; 57 import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult;
@@ -79,6 +77,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; @@ -79,6 +77,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
79 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; 77 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
80 import org.thingsboard.server.queue.provider.TbQueueProducerProvider; 78 import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
81 import org.thingsboard.server.queue.provider.TbTransportQueueFactory; 79 import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
  80 +import org.thingsboard.server.queue.scheduler.SchedulerComponent;
82 import org.thingsboard.server.queue.util.TbTransportComponent; 81 import org.thingsboard.server.queue.util.TbTransportComponent;
83 82
84 import javax.annotation.PostConstruct; 83 import javax.annotation.PostConstruct;
@@ -94,7 +93,6 @@ import java.util.concurrent.ConcurrentMap; @@ -94,7 +93,6 @@ import java.util.concurrent.ConcurrentMap;
94 import java.util.concurrent.ExecutionException; 93 import java.util.concurrent.ExecutionException;
95 import java.util.concurrent.ExecutorService; 94 import java.util.concurrent.ExecutorService;
96 import java.util.concurrent.Executors; 95 import java.util.concurrent.Executors;
97 -import java.util.concurrent.ScheduledExecutorService;  
98 import java.util.concurrent.ScheduledFuture; 96 import java.util.concurrent.ScheduledFuture;
99 import java.util.concurrent.TimeUnit; 97 import java.util.concurrent.TimeUnit;
100 import java.util.concurrent.atomic.AtomicInteger; 98 import java.util.concurrent.atomic.AtomicInteger;
@@ -126,6 +124,7 @@ public class DefaultTransportService implements TransportService { @@ -126,6 +124,7 @@ public class DefaultTransportService implements TransportService {
126 private final TransportTenantProfileCache tenantProfileCache; 124 private final TransportTenantProfileCache tenantProfileCache;
127 private final TransportRateLimitService rateLimitService; 125 private final TransportRateLimitService rateLimitService;
128 private final DataDecodingEncodingService dataDecodingEncodingService; 126 private final DataDecodingEncodingService dataDecodingEncodingService;
  127 + private final SchedulerComponent scheduler;
129 128
130 protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate; 129 protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate;
131 protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer; 130 protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
@@ -136,7 +135,6 @@ public class DefaultTransportService implements TransportService { @@ -136,7 +135,6 @@ public class DefaultTransportService implements TransportService {
136 protected MessagesStats tbCoreProducerStats; 135 protected MessagesStats tbCoreProducerStats;
137 protected MessagesStats transportApiStats; 136 protected MessagesStats transportApiStats;
138 137
139 - protected ScheduledExecutorService schedulerExecutor;  
140 protected ExecutorService transportCallbackExecutor; 138 protected ExecutorService transportCallbackExecutor;
141 private ExecutorService mainConsumerExecutor; 139 private ExecutorService mainConsumerExecutor;
142 140
@@ -152,7 +150,7 @@ public class DefaultTransportService implements TransportService { @@ -152,7 +150,7 @@ public class DefaultTransportService implements TransportService {
152 StatsFactory statsFactory, 150 StatsFactory statsFactory,
153 TransportDeviceProfileCache deviceProfileCache, 151 TransportDeviceProfileCache deviceProfileCache,
154 TransportTenantProfileCache tenantProfileCache, 152 TransportTenantProfileCache tenantProfileCache,
155 - TransportRateLimitService rateLimitService, DataDecodingEncodingService dataDecodingEncodingService) { 153 + TransportRateLimitService rateLimitService, DataDecodingEncodingService dataDecodingEncodingService, SchedulerComponent scheduler) {
156 this.serviceInfoProvider = serviceInfoProvider; 154 this.serviceInfoProvider = serviceInfoProvider;
157 this.queueProvider = queueProvider; 155 this.queueProvider = queueProvider;
158 this.producerProvider = producerProvider; 156 this.producerProvider = producerProvider;
@@ -162,6 +160,7 @@ public class DefaultTransportService implements TransportService { @@ -162,6 +160,7 @@ public class DefaultTransportService implements TransportService {
162 this.tenantProfileCache = tenantProfileCache; 160 this.tenantProfileCache = tenantProfileCache;
163 this.rateLimitService = rateLimitService; 161 this.rateLimitService = rateLimitService;
164 this.dataDecodingEncodingService = dataDecodingEncodingService; 162 this.dataDecodingEncodingService = dataDecodingEncodingService;
  163 + this.scheduler = scheduler;
165 } 164 }
166 165
167 @PostConstruct 166 @PostConstruct
@@ -169,9 +168,8 @@ public class DefaultTransportService implements TransportService { @@ -169,9 +168,8 @@ public class DefaultTransportService implements TransportService {
169 this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer"); 168 this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer");
170 this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer"); 169 this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer");
171 this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer"); 170 this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer");
172 - this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));  
173 this.transportCallbackExecutor = Executors.newWorkStealingPool(20); 171 this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
174 - this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); 172 + this.scheduler.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
175 transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate(); 173 transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate();
176 transportApiRequestTemplate.setMessagesStats(transportApiStats); 174 transportApiRequestTemplate.setMessagesStats(transportApiStats);
177 ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer(); 175 ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
@@ -217,9 +215,6 @@ public class DefaultTransportService implements TransportService { @@ -217,9 +215,6 @@ public class DefaultTransportService implements TransportService {
217 if (transportNotificationsConsumer != null) { 215 if (transportNotificationsConsumer != null) {
218 transportNotificationsConsumer.unsubscribe(); 216 transportNotificationsConsumer.unsubscribe();
219 } 217 }
220 - if (schedulerExecutor != null) {  
221 - schedulerExecutor.shutdownNow();  
222 - }  
223 if (transportCallbackExecutor != null) { 218 if (transportCallbackExecutor != null) {
224 transportCallbackExecutor.shutdownNow(); 219 transportCallbackExecutor.shutdownNow();
225 } 220 }
@@ -232,11 +227,6 @@ public class DefaultTransportService implements TransportService { @@ -232,11 +227,6 @@ public class DefaultTransportService implements TransportService {
232 } 227 }
233 228
234 @Override 229 @Override
235 - public ScheduledExecutorService getSchedulerExecutor() {  
236 - return this.schedulerExecutor;  
237 - }  
238 -  
239 - @Override  
240 public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) { 230 public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
241 sessions.putIfAbsent(toSessionId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener)); 231 sessions.putIfAbsent(toSessionId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
242 } 232 }
@@ -488,7 +478,7 @@ public class DefaultTransportService implements TransportService { @@ -488,7 +478,7 @@ public class DefaultTransportService implements TransportService {
488 sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback)); 478 sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback));
489 String requestId = sessionId + "-" + msg.getRequestId(); 479 String requestId = sessionId + "-" + msg.getRequestId();
490 toServerRpcPendingMap.put(requestId, new RpcRequestMetadata(sessionId, msg.getRequestId())); 480 toServerRpcPendingMap.put(requestId, new RpcRequestMetadata(sessionId, msg.getRequestId()));
491 - schedulerExecutor.schedule(() -> processTimeout(requestId), clientSideRpcTimeout, TimeUnit.MILLISECONDS); 481 + scheduler.schedule(() -> processTimeout(requestId), clientSideRpcTimeout, TimeUnit.MILLISECONDS);
492 } 482 }
493 } 483 }
494 484
@@ -561,7 +551,7 @@ public class DefaultTransportService implements TransportService { @@ -561,7 +551,7 @@ public class DefaultTransportService implements TransportService {
561 SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener); 551 SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener);
562 sessions.putIfAbsent(toSessionId(sessionInfo), currentSession); 552 sessions.putIfAbsent(toSessionId(sessionInfo), currentSession);
563 553
564 - ScheduledFuture executorFuture = schedulerExecutor.schedule(() -> { 554 + ScheduledFuture executorFuture = scheduler.schedule(() -> {
565 listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance()); 555 listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
566 deregisterSession(sessionInfo); 556 deregisterSession(sessionInfo);
567 }, timeout, TimeUnit.MILLISECONDS); 557 }, timeout, TimeUnit.MILLISECONDS);