Commit eb3cc332d187464bbe296ef5ee8ebb81ca5be85e

Authored by Andrii Shvaika
1 parent cb6e9127

Fix gateway/device last activity time checks

... ... @@ -49,6 +49,8 @@ message SessionInfoProto {
49 49 int64 deviceIdLSB = 7;
50 50 string deviceName = 8;
51 51 string deviceType = 9;
  52 + int64 gwSessionIdMSB = 10;
  53 + int64 gwSessionIdLSB = 11;
52 54 }
53 55
54 56 enum SessionEvent {
... ...
... ... @@ -100,7 +100,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
100 100 private volatile DeviceSessionCtx deviceSessionCtx;
101 101 private volatile GatewaySessionHandler gatewaySessionHandler;
102 102
103   - MqttTransportHandler(MqttTransportContext context,SslHandler sslHandler) {
  103 + MqttTransportHandler(MqttTransportContext context, SslHandler sslHandler) {
104 104 this.sessionId = UUID.randomUUID();
105 105 this.context = context;
106 106 this.transportService = context.getTransportService();
... ... @@ -138,32 +138,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
138 138 break;
139 139 case PUBLISH:
140 140 processPublish(ctx, (MqttPublishMessage) msg);
141   - transportService.reportActivity(sessionInfo);
142   - if (gatewaySessionHandler != null) {
143   - gatewaySessionHandler.reportActivity();
144   - }
145 141 break;
146 142 case SUBSCRIBE:
147 143 processSubscribe(ctx, (MqttSubscribeMessage) msg);
148   - transportService.reportActivity(sessionInfo);
149   - if (gatewaySessionHandler != null) {
150   - gatewaySessionHandler.reportActivity();
151   - }
152 144 break;
153 145 case UNSUBSCRIBE:
154 146 processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
155   - transportService.reportActivity(sessionInfo);
156   - if (gatewaySessionHandler != null) {
157   - gatewaySessionHandler.reportActivity();
158   - }
159 147 break;
160 148 case PINGREQ:
161 149 if (checkConnected(ctx, msg)) {
162 150 ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
163 151 transportService.reportActivity(sessionInfo);
164   - if (gatewaySessionHandler != null) {
165   - gatewaySessionHandler.reportActivity();
166   - }
167 152 }
168 153 break;
169 154 case DISCONNECT:
... ... @@ -174,7 +159,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
174 159 default:
175 160 break;
176 161 }
177   -
178 162 }
179 163
180 164 private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
... ... @@ -188,6 +172,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
188 172 if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
189 173 if (gatewaySessionHandler != null) {
190 174 handleGatewayPublishMsg(topicName, msgId, mqttMsg);
  175 + transportService.reportActivity(sessionInfo);
191 176 }
192 177 } else {
193 178 processDevicePublish(ctx, mqttMsg, topicName, msgId);
... ... @@ -244,6 +229,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
244 229 } else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
245 230 TransportProtos.ClaimDeviceMsg claimDeviceMsg = adaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
246 231 transportService.process(sessionInfo, claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));
  232 + } else {
  233 + transportService.reportActivity(sessionInfo);
247 234 }
248 235 } catch (AdaptorException e) {
249 236 log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
... ... @@ -276,6 +263,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
276 263 }
277 264 log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
278 265 List<Integer> grantedQoSList = new ArrayList<>();
  266 + boolean activityReported = false;
279 267 for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
280 268 String topic = subscription.topicName();
281 269 MqttQoS reqQoS = subscription.qualityOfService();
... ... @@ -284,11 +272,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
284 272 case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
285 273 transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
286 274 registerSubQoS(topic, grantedQoSList, reqQoS);
  275 + activityReported = true;
287 276 break;
288 277 }
289 278 case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
290 279 transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
291 280 registerSubQoS(topic, grantedQoSList, reqQoS);
  281 + activityReported = true;
292 282 break;
293 283 }
294 284 case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
... ... @@ -308,6 +298,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
308 298 grantedQoSList.add(FAILURE.value());
309 299 }
310 300 }
  301 + if (!activityReported) {
  302 + transportService.reportActivity(sessionInfo);
  303 + }
311 304 ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
312 305 }
313 306
... ... @@ -320,6 +313,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
320 313 if (!checkConnected(ctx, mqttMsg)) {
321 314 return;
322 315 }
  316 + boolean activityReported = false;
323 317 log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
324 318 for (String topicName : mqttMsg.payload().topics()) {
325 319 mqttQoSMap.remove(new MqttTopicMatcher(topicName));
... ... @@ -327,10 +321,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
327 321 switch (topicName) {
328 322 case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
329 323 transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null);
  324 + activityReported = true;
330 325 break;
331 326 }
332 327 case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
333 328 transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null);
  329 + activityReported = true;
334 330 break;
335 331 }
336 332 }
... ... @@ -338,6 +334,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
338 334 log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
339 335 }
340 336 }
  337 + if (!activityReported) {
  338 + transportService.reportActivity(sessionInfo);
  339 + }
341 340 ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
342 341 }
343 342
... ...
... ... @@ -46,6 +46,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
46 46 .setTenantIdLSB(deviceInfo.getTenantIdLSB())
47 47 .setDeviceName(deviceInfo.getDeviceName())
48 48 .setDeviceType(deviceInfo.getDeviceType())
  49 + .setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits())
  50 + .setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits())
49 51 .build();
50 52 setDeviceInfo(deviceInfo);
51 53 }
... ...
... ... @@ -410,7 +410,7 @@ public class GatewaySessionHandler {
410 410 return deviceSessionCtx.nextMsgId();
411 411 }
412 412
413   - public void reportActivity() {
414   - devices.forEach((id, deviceCtx) -> transportService.reportActivity(deviceCtx.getSessionInfo()));
  413 + public UUID getSessionId() {
  414 + return sessionId;
415 415 }
416 416 }
... ...
... ... @@ -379,6 +379,7 @@ public class DefaultTransportService implements TransportService {
379 379 @Override
380 380 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
381 381 if (checkLimits(sessionInfo, msg, callback)) {
  382 + reportActivityInternal(sessionInfo);
382 383 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
383 384 .setClaimDevice(msg).build(), callback);
384 385 }
... ... @@ -401,23 +402,32 @@ public class DefaultTransportService implements TransportService {
401 402 private void checkInactivityAndReportActivity() {
402 403 long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
403 404 sessions.forEach((uuid, sessionMD) -> {
404   - if (sessionMD.getLastActivityTime() < expTime) {
  405 + long lastActivityTime = sessionMD.getLastActivityTime();
  406 + TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
  407 + if (sessionInfo.getGwSessionIdMSB() > 0 &&
  408 + sessionInfo.getGwSessionIdLSB() > 0) {
  409 + SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()));
  410 + if (gwMetaData != null) {
  411 + lastActivityTime = Math.max(gwMetaData.getLastActivityTime(), lastActivityTime);
  412 + }
  413 + }
  414 + if (lastActivityTime < expTime) {
405 415 if (log.isDebugEnabled()) {
406   - log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionMD.getSessionInfo()), sessionMD.getLastActivityTime());
  416 + log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
407 417 }
408   - process(sessionMD.getSessionInfo(), getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
  418 + process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
409 419 sessions.remove(uuid);
410 420 sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
411 421 } else {
412   - if (sessionMD.getLastActivityTime() > sessionMD.getLastReportedActivityTime()) {
413   - final long lastActivityTime = sessionMD.getLastActivityTime();
414   - process(sessionMD.getSessionInfo(), TransportProtos.SubscriptionInfoProto.newBuilder()
  422 + if (lastActivityTime > sessionMD.getLastReportedActivityTime()) {
  423 + final long lastActivityTimeFinal = lastActivityTime;
  424 + process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
415 425 .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
416 426 .setRpcSubscription(sessionMD.isSubscribedToRPC())
417   - .setLastActivityTime(sessionMD.getLastActivityTime()).build(), new TransportServiceCallback<Void>() {
  427 + .setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>() {
418 428 @Override
419 429 public void onSuccess(Void msg) {
420   - sessionMD.setLastReportedActivityTime(lastActivityTime);
  430 + sessionMD.setLastReportedActivityTime(lastActivityTimeFinal);
421 431 }
422 432
423 433 @Override
... ...
... ... @@ -305,6 +305,7 @@
305 305 </properties>
306 306 <excludes>
307 307 <exclude>**/.env</exclude>
  308 + <exclude>**/*.env</exclude>
308 309 <exclude>**/.eslintrc</exclude>
309 310 <exclude>**/.babelrc</exclude>
310 311 <exclude>**/.jshintrc</exclude>
... ...