Commit d40bc488980abc04b60523ef83901f6a92e30932

Authored by Andrii Shvaika
1 parent c52c9a16

Ability to log number of active MQTT connections

... ... @@ -702,6 +702,9 @@ transport:
702 702 parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}"
703 703 # to configure SNMP to work over UDP or TCP
704 704 underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}"
  705 + stats:
  706 + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}"
  707 + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}"
705 708
706 709 # Edges parameters
707 710 edges:
... ...
... ... @@ -31,6 +31,7 @@ import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
31 31 import javax.annotation.PostConstruct;
32 32 import javax.annotation.PreDestroy;
33 33 import java.util.concurrent.ExecutorService;
  34 +import java.util.concurrent.atomic.AtomicInteger;
34 35
35 36 /**
36 37 * Created by ashvayka on 04.10.18.
... ... @@ -71,4 +72,19 @@ public class MqttTransportContext extends TransportContext {
71 72 @Getter
72 73 @Value("${transport.mqtt.timeout:10000}")
73 74 private long timeout;
  75 +
  76 + private final AtomicInteger connectionsCounter = new AtomicInteger();
  77 +
  78 + @PostConstruct
  79 + public void init() {
  80 + transportService.createGaugeStats("openConnections", connectionsCounter);
  81 + }
  82 +
  83 + public void channelRegistered() {
  84 + connectionsCounter.incrementAndGet();
  85 + }
  86 +
  87 + public void channelUnregistered() {
  88 + connectionsCounter.decrementAndGet();
  89 + }
74 90 }
... ...
... ... @@ -147,6 +147,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
147 147 }
148 148
149 149 @Override
  150 + public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  151 + super.channelRegistered(ctx);
  152 + context.channelRegistered();
  153 + }
  154 +
  155 + @Override
  156 + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  157 + super.channelUnregistered(ctx);
  158 + context.channelUnregistered();
  159 + }
  160 +
  161 + @Override
150 162 public void channelRead(ChannelHandlerContext ctx, Object msg) {
151 163 log.trace("[{}] Processing msg: {}", sessionId, msg);
152 164 try {
... ...
... ... @@ -39,7 +39,7 @@ public abstract class TransportContext {
39 39 protected final ObjectMapper mapper = new ObjectMapper();
40 40
41 41 @Autowired
42   - private TransportService transportService;
  42 + protected TransportService transportService;
43 43
44 44 @Autowired
45 45 private TbServiceInfoProvider serviceInfoProvider;
... ...
... ... @@ -58,6 +58,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenR
58 58 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
59 59
60 60 import java.util.concurrent.ExecutorService;
  61 +import java.util.concurrent.atomic.AtomicInteger;
61 62
62 63 /**
63 64 * Created by ashvayka on 04.10.18.
... ... @@ -138,4 +139,6 @@ public interface TransportService {
138 139 ExecutorService getCallbackExecutor();
139 140
140 141 boolean hasSession(SessionInfoProto sessionInfo);
  142 +
  143 + void createGaugeStats(String openConnections, AtomicInteger connectionsCounter);
141 144 }
... ...
... ... @@ -21,9 +21,11 @@ import com.google.common.util.concurrent.MoreExecutors;
21 21 import com.google.gson.Gson;
22 22 import com.google.gson.JsonObject;
23 23 import com.google.protobuf.ByteString;
  24 +import lombok.Getter;
24 25 import lombok.extern.slf4j.Slf4j;
25 26 import org.springframework.beans.factory.annotation.Value;
26 27 import org.springframework.context.ApplicationEventPublisher;
  28 +import org.springframework.scheduling.annotation.Scheduled;
27 29 import org.springframework.stereotype.Service;
28 30 import org.thingsboard.common.util.ThingsBoardExecutors;
29 31 import org.thingsboard.common.util.ThingsBoardThreadFactory;
... ... @@ -96,6 +98,7 @@ import javax.annotation.PostConstruct;
96 98 import javax.annotation.PreDestroy;
97 99 import java.util.Collections;
98 100 import java.util.HashSet;
  101 +import java.util.LinkedHashMap;
99 102 import java.util.List;
100 103 import java.util.Map;
101 104 import java.util.Optional;
... ... @@ -110,6 +113,7 @@ import java.util.concurrent.Executors;
110 113 import java.util.concurrent.ScheduledFuture;
111 114 import java.util.concurrent.TimeUnit;
112 115 import java.util.concurrent.atomic.AtomicInteger;
  116 +import java.util.stream.Collectors;
113 117
114 118 /**
115 119 * Created by ashvayka on 17.10.18.
... ... @@ -135,6 +139,10 @@ public class DefaultTransportService implements TransportService {
135 139 private long clientSideRpcTimeout;
136 140 @Value("${queue.transport.poll_interval}")
137 141 private int notificationsPollDuration;
  142 + @Value("${transport.stats.enabled:false}")
  143 + private boolean statsEnabled;
  144 +
  145 + private final Map<String, Number> statsMap = new LinkedHashMap<>();
138 146
139 147 private final Gson gson = new Gson();
140 148 private final TbTransportQueueFactory queueProvider;
... ... @@ -1167,4 +1175,19 @@ public class DefaultTransportService implements TransportService {
1167 1175 public boolean hasSession(TransportProtos.SessionInfoProto sessionInfo) {
1168 1176 return sessions.containsKey(toSessionId(sessionInfo));
1169 1177 }
  1178 +
  1179 + @Override
  1180 + public void createGaugeStats(String statsName, AtomicInteger number) {
  1181 + statsFactory.createGauge(StatsType.TRANSPORT + "." + statsName, number);
  1182 + statsMap.put(statsName, number);
  1183 + }
  1184 +
  1185 + @Scheduled(fixedDelayString = "${transport.stats.print-interval-ms:60000}")
  1186 + public void printStats() {
  1187 + if (statsEnabled) {
  1188 + String values = statsMap.entrySet().stream()
  1189 + .map(kv -> kv.getKey() + " [" + kv.getValue() + "]").collect(Collectors.joining(", "));
  1190 + log.info("Transport Stats: {}", values);
  1191 + }
  1192 + }
1170 1193 }
... ...
... ... @@ -122,6 +122,9 @@ transport:
122 122 log:
123 123 enabled: "${TB_TRANSPORT_LOG_ENABLED:true}"
124 124 max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}"
  125 + stats:
  126 + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}"
  127 + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}"
125 128
126 129 queue:
127 130 type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
... ...
... ... @@ -94,6 +94,9 @@ transport:
94 94 log:
95 95 enabled: "${TB_TRANSPORT_LOG_ENABLED:true}"
96 96 max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}"
  97 + stats:
  98 + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}"
  99 + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}"
97 100
98 101 queue:
99 102 type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
... ...
... ... @@ -147,6 +147,9 @@ transport:
147 147 paging_transmission_window: "${LWM2M_PAGING_TRANSMISSION_WINDOW:10000}"
148 148 # Use redis for Security and Registration stores
149 149 redis.enabled: "${LWM2M_REDIS_ENABLED:false}"
  150 + stats:
  151 + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}"
  152 + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}"
150 153
151 154 queue:
152 155 type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
... ...
... ... @@ -127,6 +127,9 @@ transport:
127 127 log:
128 128 enabled: "${TB_TRANSPORT_LOG_ENABLED:true}"
129 129 max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}"
  130 + stats:
  131 + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}"
  132 + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}"
130 133
131 134
132 135 queue:
... ...
... ... @@ -59,6 +59,9 @@ transport:
59 59 log:
60 60 enabled: "${TB_TRANSPORT_LOG_ENABLED:true}"
61 61 max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}"
  62 + stats:
  63 + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}"
  64 + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}"
62 65
63 66 queue:
64 67 type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
... ...