Commit 99b19034e202611bd3410bcfa5039f125154f19a

Authored by Andrii Shvaika
1 parent 60c9e43e

Uplink notifications for PSM & eDRX for CoAP in MSA deployment

@@ -36,6 +36,12 @@ public class DataConstants { @@ -36,6 +36,12 @@ public class DataConstants {
36 public static final String ALARM_CONDITION_REPEATS = "alarmConditionRepeats"; 36 public static final String ALARM_CONDITION_REPEATS = "alarmConditionRepeats";
37 public static final String ALARM_CONDITION_DURATION = "alarmConditionDuration"; 37 public static final String ALARM_CONDITION_DURATION = "alarmConditionDuration";
38 public static final String PERSISTENT = "persistent"; 38 public static final String PERSISTENT = "persistent";
  39 + public static final String COAP_TRANSPORT_NAME = "COAP";
  40 + public static final String LWM2M_TRANSPORT_NAME = "LWM2M";
  41 + public static final String MQTT_TRANSPORT_NAME = "MQTT";
  42 + public static final String HTTP_TRANSPORT_NAME = "HTTP";
  43 + public static final String SNMP_TRANSPORT_NAME = "SNMP";
  44 +
39 45
40 public static final String[] allScopes() { 46 public static final String[] allScopes() {
41 return new String[]{CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE}; 47 return new String[]{CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE};
@@ -73,6 +73,7 @@ public class HashPartitionService implements PartitionService { @@ -73,6 +73,7 @@ public class HashPartitionService implements PartitionService {
73 73
74 private Map<String, TopicPartitionInfo> tbCoreNotificationTopics = new HashMap<>(); 74 private Map<String, TopicPartitionInfo> tbCoreNotificationTopics = new HashMap<>();
75 private Map<String, TopicPartitionInfo> tbRuleEngineNotificationTopics = new HashMap<>(); 75 private Map<String, TopicPartitionInfo> tbRuleEngineNotificationTopics = new HashMap<>();
  76 + private Map<String, List<ServiceInfo>> tbTransportServicesByType = new HashMap<>();
76 private List<ServiceInfo> currentOtherServices; 77 private List<ServiceInfo> currentOtherServices;
77 78
78 private HashFunction hashFunction; 79 private HashFunction hashFunction;
@@ -127,6 +128,7 @@ public class HashPartitionService implements PartitionService { @@ -127,6 +128,7 @@ public class HashPartitionService implements PartitionService {
127 128
128 @Override 129 @Override
129 public synchronized void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) { 130 public synchronized void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
  131 + tbTransportServicesByType.clear();
130 logServiceInfo(currentService); 132 logServiceInfo(currentService);
131 otherServices.forEach(this::logServiceInfo); 133 otherServices.forEach(this::logServiceInfo);
132 Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>(); 134 Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
@@ -229,6 +231,12 @@ public class HashPartitionService implements PartitionService { @@ -229,6 +231,12 @@ public class HashPartitionService implements PartitionService {
229 return Math.abs(hash % partitions); 231 return Math.abs(hash % partitions);
230 } 232 }
231 233
  234 + @Override
  235 + public int countTransportsByType(String type) {
  236 + var list = tbTransportServicesByType.get(type);
  237 + return list == null ? 0 : list.size();
  238 + }
  239 +
232 private Map<ServiceQueueKey, List<ServiceInfo>> getServiceKeyListMap(List<ServiceInfo> services) { 240 private Map<ServiceQueueKey, List<ServiceInfo>> getServiceKeyListMap(List<ServiceInfo> services) {
233 final Map<ServiceQueueKey, List<ServiceInfo>> currentMap = new HashMap<>(); 241 final Map<ServiceQueueKey, List<ServiceInfo>> currentMap = new HashMap<>();
234 services.forEach(serviceInfo -> { 242 services.forEach(serviceInfo -> {
@@ -332,6 +340,9 @@ public class HashPartitionService implements PartitionService { @@ -332,6 +340,9 @@ public class HashPartitionService implements PartitionService {
332 queueServiceList.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(instance); 340 queueServiceList.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(instance);
333 } 341 }
334 } 342 }
  343 + for (String transportType : instance.getTransportsList()) {
  344 + tbTransportServicesByType.computeIfAbsent(transportType, t -> new ArrayList<>()).add(instance);
  345 + }
335 } 346 }
336 347
337 private ServiceInfo resolveByPartitionIdx(List<ServiceInfo> servers, Integer partitionIdx) { 348 private ServiceInfo resolveByPartitionIdx(List<ServiceInfo> servers, Integer partitionIdx) {
@@ -59,4 +59,6 @@ public interface PartitionService { @@ -59,4 +59,6 @@ public interface PartitionService {
59 TopicPartitionInfo getNotificationsTopic(ServiceType serviceType, String serviceId); 59 TopicPartitionInfo getNotificationsTopic(ServiceType serviceType, String serviceId);
60 60
61 int resolvePartitionIndex(UUID entityId, int partitions); 61 int resolvePartitionIndex(UUID entityId, int partitions);
  62 +
  63 + int countTransportsByType(String type);
62 } 64 }
@@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired; @@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
22 import org.springframework.stereotype.Service; 22 import org.springframework.stereotype.Service;
23 import org.thingsboard.server.coapserver.CoapServerService; 23 import org.thingsboard.server.coapserver.CoapServerService;
24 import org.thingsboard.server.coapserver.TbCoapServerComponent; 24 import org.thingsboard.server.coapserver.TbCoapServerComponent;
  25 +import org.thingsboard.server.common.data.DataConstants;
25 import org.thingsboard.server.common.data.TbTransportService; 26 import org.thingsboard.server.common.data.TbTransportService;
26 import org.thingsboard.server.common.data.ota.OtaPackageType; 27 import org.thingsboard.server.common.data.ota.OtaPackageType;
27 import org.thingsboard.server.transport.coap.efento.CoapEfentoTransportResource; 28 import org.thingsboard.server.transport.coap.efento.CoapEfentoTransportResource;
@@ -72,6 +73,6 @@ public class CoapTransportService implements TbTransportService { @@ -72,6 +73,6 @@ public class CoapTransportService implements TbTransportService {
72 73
73 @Override 74 @Override
74 public String getName() { 75 public String getName() {
75 - return "COAP"; 76 + return DataConstants.COAP_TRANSPORT_NAME;
76 } 77 }
77 } 78 }
@@ -18,14 +18,13 @@ package org.thingsboard.server.transport.coap.client; @@ -18,14 +18,13 @@ package org.thingsboard.server.transport.coap.client;
18 import lombok.RequiredArgsConstructor; 18 import lombok.RequiredArgsConstructor;
19 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
20 import org.eclipse.californium.core.coap.CoAP; 20 import org.eclipse.californium.core.coap.CoAP;
21 -import org.eclipse.californium.core.coap.MediaTypeRegistry;  
22 import org.eclipse.californium.core.coap.Response; 21 import org.eclipse.californium.core.coap.Response;
23 import org.eclipse.californium.core.observe.ObserveRelation; 22 import org.eclipse.californium.core.observe.ObserveRelation;
24 import org.eclipse.californium.core.server.resources.CoapExchange; 23 import org.eclipse.californium.core.server.resources.CoapExchange;
25 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; 24 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
26 import org.springframework.stereotype.Service; 25 import org.springframework.stereotype.Service;
27 import org.thingsboard.server.coapserver.CoapServerContext; 26 import org.thingsboard.server.coapserver.CoapServerContext;
28 -import org.thingsboard.server.coapserver.TbCoapServerComponent; 27 +import org.thingsboard.server.common.data.DataConstants;
29 import org.thingsboard.server.common.data.Device; 28 import org.thingsboard.server.common.data.Device;
30 import org.thingsboard.server.common.data.DeviceProfile; 29 import org.thingsboard.server.common.data.DeviceProfile;
31 import org.thingsboard.server.common.data.DeviceTransportType; 30 import org.thingsboard.server.common.data.DeviceTransportType;
@@ -51,6 +50,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException; @@ -51,6 +50,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
51 import org.thingsboard.server.common.transport.auth.SessionInfoCreator; 50 import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
52 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; 51 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
53 import org.thingsboard.server.gen.transport.TransportProtos; 52 import org.thingsboard.server.gen.transport.TransportProtos;
  53 +import org.thingsboard.server.queue.discovery.PartitionService;
54 import org.thingsboard.server.transport.coap.CoapTransportContext; 54 import org.thingsboard.server.transport.coap.CoapTransportContext;
55 import org.thingsboard.server.transport.coap.TbCoapMessageObserver; 55 import org.thingsboard.server.transport.coap.TbCoapMessageObserver;
56 import org.thingsboard.server.transport.coap.TransportConfigurationContainer; 56 import org.thingsboard.server.transport.coap.TransportConfigurationContainer;
@@ -81,6 +81,7 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -81,6 +81,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
81 private final CoapTransportContext transportContext; 81 private final CoapTransportContext transportContext;
82 private final TransportService transportService; 82 private final TransportService transportService;
83 private final TransportDeviceProfileCache profileCache; 83 private final TransportDeviceProfileCache profileCache;
  84 + private final PartitionService partitionService;
84 private final ConcurrentMap<DeviceId, TbCoapClientState> clients = new ConcurrentHashMap<>(); 85 private final ConcurrentMap<DeviceId, TbCoapClientState> clients = new ConcurrentHashMap<>();
85 private final ConcurrentMap<String, TbCoapClientState> clientsByToken = new ConcurrentHashMap<>(); 86 private final ConcurrentMap<String, TbCoapClientState> clientsByToken = new ConcurrentHashMap<>();
86 87
@@ -214,7 +215,7 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -214,7 +215,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
214 return null; 215 return null;
215 }, timeout, TimeUnit.MILLISECONDS); 216 }, timeout, TimeUnit.MILLISECONDS);
216 client.setSleepTask(task); 217 client.setSleepTask(task);
217 - if (notifyOtherServers) { 218 + if (notifyOtherServers && partitionService.countTransportsByType(DataConstants.COAP_TRANSPORT_NAME) > 1) {
218 transportService.notifyAboutUplink(getNewSyncSession(client), TransportProtos.UplinkNotificationMsg.newBuilder().setUplinkTs(uplinkTime).build(), TransportServiceCallback.EMPTY); 219 transportService.notifyAboutUplink(getNewSyncSession(client), TransportProtos.UplinkNotificationMsg.newBuilder().setUplinkTs(uplinkTime).build(), TransportServiceCallback.EMPTY);
219 } 220 }
220 } finally { 221 } finally {
@@ -34,6 +34,7 @@ import org.springframework.web.bind.annotation.RequestMethod; @@ -34,6 +34,7 @@ import org.springframework.web.bind.annotation.RequestMethod;
34 import org.springframework.web.bind.annotation.RequestParam; 34 import org.springframework.web.bind.annotation.RequestParam;
35 import org.springframework.web.bind.annotation.RestController; 35 import org.springframework.web.bind.annotation.RestController;
36 import org.springframework.web.context.request.async.DeferredResult; 36 import org.springframework.web.context.request.async.DeferredResult;
  37 +import org.thingsboard.server.common.data.DataConstants;
37 import org.thingsboard.server.common.data.DeviceTransportType; 38 import org.thingsboard.server.common.data.DeviceTransportType;
38 import org.thingsboard.server.common.data.TbTransportService; 39 import org.thingsboard.server.common.data.TbTransportService;
39 import org.thingsboard.server.common.data.id.DeviceId; 40 import org.thingsboard.server.common.data.id.DeviceId;
@@ -436,7 +437,7 @@ public class DeviceApiController implements TbTransportService { @@ -436,7 +437,7 @@ public class DeviceApiController implements TbTransportService {
436 437
437 @Override 438 @Override
438 public String getName() { 439 public String getName() {
439 - return "HTTP"; 440 + return DataConstants.HTTP_TRANSPORT_NAME;
440 } 441 }
441 442
442 } 443 }
@@ -28,6 +28,7 @@ import org.eclipse.leshan.server.californium.registration.CaliforniumRegistratio @@ -28,6 +28,7 @@ import org.eclipse.leshan.server.californium.registration.CaliforniumRegistratio
28 import org.eclipse.leshan.server.model.LwM2mModelProvider; 28 import org.eclipse.leshan.server.model.LwM2mModelProvider;
29 import org.springframework.stereotype.Component; 29 import org.springframework.stereotype.Component;
30 import org.thingsboard.server.cache.ota.OtaPackageDataCache; 30 import org.thingsboard.server.cache.ota.OtaPackageDataCache;
  31 +import org.thingsboard.server.common.data.DataConstants;
31 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; 32 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
32 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; 33 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
33 import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MAuthorizer; 34 import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MAuthorizer;
@@ -177,7 +178,7 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService { @@ -177,7 +178,7 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService {
177 178
178 @Override 179 @Override
179 public String getName() { 180 public String getName() {
180 - return "LWM2M"; 181 + return DataConstants.LWM2M_TRANSPORT_NAME;
181 } 182 }
182 183
183 } 184 }
@@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Value; @@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Value;
28 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; 28 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
29 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; 29 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
30 import org.springframework.stereotype.Service; 30 import org.springframework.stereotype.Service;
  31 +import org.thingsboard.server.common.data.DataConstants;
31 import org.thingsboard.server.common.data.TbTransportService; 32 import org.thingsboard.server.common.data.TbTransportService;
32 33
33 import javax.annotation.PostConstruct; 34 import javax.annotation.PostConstruct;
@@ -114,6 +115,6 @@ public class MqttTransportService implements TbTransportService { @@ -114,6 +115,6 @@ public class MqttTransportService implements TbTransportService {
114 115
115 @Override 116 @Override
116 public String getName() { 117 public String getName() {
117 - return "MQTT"; 118 + return DataConstants.MQTT_TRANSPORT_NAME;
118 } 119 }
119 } 120 }
@@ -35,6 +35,7 @@ import org.snmp4j.transport.DefaultUdpTransportMapping; @@ -35,6 +35,7 @@ import org.snmp4j.transport.DefaultUdpTransportMapping;
35 import org.springframework.beans.factory.annotation.Value; 35 import org.springframework.beans.factory.annotation.Value;
36 import org.springframework.stereotype.Service; 36 import org.springframework.stereotype.Service;
37 import org.thingsboard.common.util.ThingsBoardThreadFactory; 37 import org.thingsboard.common.util.ThingsBoardThreadFactory;
  38 +import org.thingsboard.server.common.data.DataConstants;
38 import org.thingsboard.server.common.data.TbTransportService; 39 import org.thingsboard.server.common.data.TbTransportService;
39 import org.thingsboard.server.common.data.kv.DataType; 40 import org.thingsboard.server.common.data.kv.DataType;
40 import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec; 41 import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;
@@ -300,7 +301,7 @@ public class SnmpTransportService implements TbTransportService { @@ -300,7 +301,7 @@ public class SnmpTransportService implements TbTransportService {
300 301
301 @Override 302 @Override
302 public String getName() { 303 public String getName() {
303 - return "SNMP"; 304 + return DataConstants.SNMP_TRANSPORT_NAME;
304 } 305 }
305 306
306 @PreDestroy 307 @PreDestroy
@@ -573,7 +573,6 @@ public class DefaultTransportService implements TransportService { @@ -573,7 +573,6 @@ public class DefaultTransportService implements TransportService {
573 573
574 @Override 574 @Override
575 public void notifyAboutUplink(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg msg, TransportServiceCallback<Void> callback) { 575 public void notifyAboutUplink(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg msg, TransportServiceCallback<Void> callback) {
576 -  
577 if (checkLimits(sessionInfo, msg, callback)) { 576 if (checkLimits(sessionInfo, msg, callback)) {
578 reportActivityInternal(sessionInfo); 577 reportActivityInternal(sessionInfo);
579 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setUplinkNotificationMsg(msg).build(), callback); 578 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setUplinkNotificationMsg(msg).build(), callback);