Commit 830d02ef6a360ba93cca3c1629f5313303b4bac1

Authored by Andrii Shvaika
1 parent 2e148f6b

Fix for HTTP and CoAP device activity

@@ -37,6 +37,7 @@ import org.thingsboard.server.gen.transport.TransportProtos; @@ -37,6 +37,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
37 import java.lang.reflect.Field; 37 import java.lang.reflect.Field;
38 import java.util.List; 38 import java.util.List;
39 import java.util.Optional; 39 import java.util.Optional;
  40 +import java.util.Set;
40 import java.util.UUID; 41 import java.util.UUID;
41 import java.util.concurrent.ConcurrentHashMap; 42 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap; 43 import java.util.concurrent.ConcurrentMap;
@@ -55,6 +56,8 @@ public class CoapTransportResource extends CoapResource { @@ -55,6 +56,8 @@ public class CoapTransportResource extends CoapResource {
55 private final Field observerField; 56 private final Field observerField;
56 private final long timeout; 57 private final long timeout;
57 private final ConcurrentMap<String, TransportProtos.SessionInfoProto> tokenToSessionIdMap = new ConcurrentHashMap<>(); 58 private final ConcurrentMap<String, TransportProtos.SessionInfoProto> tokenToSessionIdMap = new ConcurrentHashMap<>();
  59 + private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet();
  60 + private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet();
58 61
59 public CoapTransportResource(CoapTransportContext context, String name) { 62 public CoapTransportResource(CoapTransportContext context, String name) {
60 super(name); 63 super(name);
@@ -149,11 +152,13 @@ public class CoapTransportResource extends CoapResource { @@ -149,11 +152,13 @@ public class CoapTransportResource extends CoapResource {
149 transportService.process(sessionInfo, 152 transportService.process(sessionInfo,
150 transportContext.getAdaptor().convertToPostAttributes(sessionId, request), 153 transportContext.getAdaptor().convertToPostAttributes(sessionId, request),
151 new CoapOkCallback(exchange)); 154 new CoapOkCallback(exchange));
  155 + reportActivity(sessionId, sessionInfo);
152 break; 156 break;
153 case POST_TELEMETRY_REQUEST: 157 case POST_TELEMETRY_REQUEST:
154 transportService.process(sessionInfo, 158 transportService.process(sessionInfo,
155 transportContext.getAdaptor().convertToPostTelemetry(sessionId, request), 159 transportContext.getAdaptor().convertToPostTelemetry(sessionId, request),
156 new CoapOkCallback(exchange)); 160 new CoapOkCallback(exchange));
  161 + reportActivity(sessionId, sessionInfo);
157 break; 162 break;
158 case CLAIM_REQUEST: 163 case CLAIM_REQUEST:
159 transportService.process(sessionInfo, 164 transportService.process(sessionInfo,
@@ -161,6 +166,7 @@ public class CoapTransportResource extends CoapResource { @@ -161,6 +166,7 @@ public class CoapTransportResource extends CoapResource {
161 new CoapOkCallback(exchange)); 166 new CoapOkCallback(exchange));
162 break; 167 break;
163 case SUBSCRIBE_ATTRIBUTES_REQUEST: 168 case SUBSCRIBE_ATTRIBUTES_REQUEST:
  169 + attributeSubscriptions.add(sessionId);
164 advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced), 170 advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced),
165 registerAsyncCoapSession(exchange, request, sessionInfo, sessionId))); 171 registerAsyncCoapSession(exchange, request, sessionInfo, sessionId)));
166 transportService.process(sessionInfo, 172 transportService.process(sessionInfo,
@@ -168,6 +174,7 @@ public class CoapTransportResource extends CoapResource { @@ -168,6 +174,7 @@ public class CoapTransportResource extends CoapResource {
168 new CoapNoOpCallback(exchange)); 174 new CoapNoOpCallback(exchange));
169 break; 175 break;
170 case UNSUBSCRIBE_ATTRIBUTES_REQUEST: 176 case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
  177 + attributeSubscriptions.remove(sessionId);
171 TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(request); 178 TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(request);
172 if (attrSession != null) { 179 if (attrSession != null) {
173 transportService.process(attrSession, 180 transportService.process(attrSession,
@@ -177,6 +184,7 @@ public class CoapTransportResource extends CoapResource { @@ -177,6 +184,7 @@ public class CoapTransportResource extends CoapResource {
177 } 184 }
178 break; 185 break;
179 case SUBSCRIBE_RPC_COMMANDS_REQUEST: 186 case SUBSCRIBE_RPC_COMMANDS_REQUEST:
  187 + rpcSubscriptions.add(sessionId);
180 advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced), 188 advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced),
181 registerAsyncCoapSession(exchange, request, sessionInfo, sessionId))); 189 registerAsyncCoapSession(exchange, request, sessionInfo, sessionId)));
182 transportService.process(sessionInfo, 190 transportService.process(sessionInfo,
@@ -184,13 +192,13 @@ public class CoapTransportResource extends CoapResource { @@ -184,13 +192,13 @@ public class CoapTransportResource extends CoapResource {
184 new CoapNoOpCallback(exchange)); 192 new CoapNoOpCallback(exchange));
185 break; 193 break;
186 case UNSUBSCRIBE_RPC_COMMANDS_REQUEST: 194 case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
  195 + rpcSubscriptions.remove(sessionId);
187 TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(request); 196 TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(request);
188 if (rpcSession != null) { 197 if (rpcSession != null) {
189 transportService.process(rpcSession, 198 transportService.process(rpcSession,
190 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), 199 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),
191 new CoapOkCallback(exchange)); 200 new CoapOkCallback(exchange));
192 - transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);  
193 - transportService.deregisterSession(rpcSession); 201 + closeAndDeregister(sessionInfo);
194 } 202 }
195 break; 203 break;
196 case TO_DEVICE_RPC_RESPONSE: 204 case TO_DEVICE_RPC_RESPONSE:
@@ -221,6 +229,14 @@ public class CoapTransportResource extends CoapResource { @@ -221,6 +229,14 @@ public class CoapTransportResource extends CoapResource {
221 })); 229 }));
222 } 230 }
223 231
  232 + private void reportActivity(UUID sessionId, TransportProtos.SessionInfoProto sessionInfo) {
  233 + transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
  234 + .setAttributeSubscription(attributeSubscriptions.contains(sessionId))
  235 + .setRpcSubscription(rpcSubscriptions.contains(sessionId))
  236 + .setLastActivityTime(System.currentTimeMillis())
  237 + .build(), TransportServiceCallback.EMPTY);
  238 + }
  239 +
224 private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(Request request) { 240 private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(Request request) {
225 String token = request.getSource().getHostAddress() + ":" + request.getSourcePort() + ":" + request.getTokenString(); 241 String token = request.getSource().getHostAddress() + ":" + request.getSourcePort() + ":" + request.getTokenString();
226 return tokenToSessionIdMap.remove(token); 242 return tokenToSessionIdMap.remove(token);
@@ -438,6 +454,9 @@ public class CoapTransportResource extends CoapResource { @@ -438,6 +454,9 @@ public class CoapTransportResource extends CoapResource {
438 private void closeAndDeregister(TransportProtos.SessionInfoProto session) { 454 private void closeAndDeregister(TransportProtos.SessionInfoProto session) {
439 transportService.process(session, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); 455 transportService.process(session, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
440 transportService.deregisterSession(session); 456 transportService.deregisterSession(session);
  457 + UUID sessionId = new UUID(session.getSessionIdMSB(), session.getSessionIdLSB());
  458 + rpcSubscriptions.remove(sessionId);
  459 + attributeSubscriptions.remove(sessionId);
441 } 460 }
442 461
443 } 462 }
@@ -36,6 +36,7 @@ import org.thingsboard.server.common.transport.TransportContext; @@ -36,6 +36,7 @@ import org.thingsboard.server.common.transport.TransportContext;
36 import org.thingsboard.server.common.transport.TransportService; 36 import org.thingsboard.server.common.transport.TransportService;
37 import org.thingsboard.server.common.transport.TransportServiceCallback; 37 import org.thingsboard.server.common.transport.TransportServiceCallback;
38 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 38 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  39 +import org.thingsboard.server.gen.transport.TransportProtos;
39 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; 40 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
40 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; 41 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
41 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; 42 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
@@ -102,6 +103,7 @@ public class DeviceApiController { @@ -102,6 +103,7 @@ public class DeviceApiController {
102 TransportService transportService = transportContext.getTransportService(); 103 TransportService transportService = transportContext.getTransportService();
103 transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)), 104 transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)),
104 new HttpOkCallback(responseWriter)); 105 new HttpOkCallback(responseWriter));
  106 + reportActivity(sessionInfo);
105 })); 107 }));
106 return responseWriter; 108 return responseWriter;
107 } 109 }
@@ -115,6 +117,7 @@ public class DeviceApiController { @@ -115,6 +117,7 @@ public class DeviceApiController {
115 TransportService transportService = transportContext.getTransportService(); 117 TransportService transportService = transportContext.getTransportService();
116 transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)), 118 transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)),
117 new HttpOkCallback(responseWriter)); 119 new HttpOkCallback(responseWriter));
  120 + reportActivity(sessionInfo);
118 })); 121 }));
119 return responseWriter; 122 return responseWriter;
120 } 123 }
@@ -274,7 +277,6 @@ public class DeviceApiController { @@ -274,7 +277,6 @@ public class DeviceApiController {
274 } 277 }
275 } 278 }
276 279
277 -  
278 private static class HttpSessionListener implements SessionMsgListener { 280 private static class HttpSessionListener implements SessionMsgListener {
279 281
280 private final DeferredResult<ResponseEntity> responseWriter; 282 private final DeferredResult<ResponseEntity> responseWriter;
@@ -308,4 +310,13 @@ public class DeviceApiController { @@ -308,4 +310,13 @@ public class DeviceApiController {
308 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); 310 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
309 } 311 }
310 } 312 }
  313 +
  314 + private void reportActivity(SessionInfoProto sessionInfo) {
  315 + transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
  316 + .setAttributeSubscription(false)
  317 + .setRpcSubscription(false)
  318 + .setLastActivityTime(System.currentTimeMillis())
  319 + .build(), TransportServiceCallback.EMPTY);
  320 + }
  321 +
311 } 322 }
@@ -20,7 +20,20 @@ package org.thingsboard.server.common.transport; @@ -20,7 +20,20 @@ package org.thingsboard.server.common.transport;
20 */ 20 */
21 public interface TransportServiceCallback<T> { 21 public interface TransportServiceCallback<T> {
22 22
  23 + TransportServiceCallback<Void> EMPTY = new TransportServiceCallback<Void>() {
  24 + @Override
  25 + public void onSuccess(Void msg) {
  26 +
  27 + }
  28 +
  29 + @Override
  30 + public void onError(Throwable e) {
  31 +
  32 + }
  33 + };
  34 +
23 void onSuccess(T msg); 35 void onSuccess(T msg);
  36 +
24 void onError(Throwable e); 37 void onError(Throwable e);
25 38
26 } 39 }