Commit df133e48fffee8d66c8df9338471bc5d22a9693f

Authored by YevhenBondarenko
Committed by Andrew Shvayka
1 parent 7724495c

DefaultTransportService refactored

... ... @@ -156,11 +156,15 @@ public class DefaultTbClusterService implements TbClusterService {
156 156 private TbMsg transformMsg(TbMsg tbMsg, DeviceProfile deviceProfile) {
157 157 if (deviceProfile != null) {
158 158 RuleChainId targetRuleChainId = deviceProfile.getDefaultRuleChainId();
159   - if (targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId())) {
160   - tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId);
161   - }
162 159 String targetQueueName = deviceProfile.getDefaultQueueName();
163   - if (targetQueueName != null && !targetQueueName.equals(tbMsg.getQueueName())) {
  160 + boolean isRuleChainTransform = targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId());
  161 + boolean isQueueTransform = targetQueueName != null && !targetQueueName.equals(tbMsg.getQueueName());
  162 +
  163 + if (isRuleChainTransform && isQueueTransform) {
  164 + tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId, targetQueueName);
  165 + } else if (isRuleChainTransform) {
  166 + tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId);
  167 + } else if (isQueueTransform) {
164 168 tbMsg = TbMsg.transformMsg(tbMsg, targetQueueName);
165 169 }
166 170 }
... ...
... ... @@ -95,6 +95,11 @@ public final class TbMsg implements Serializable {
95 95 origMsg.data, origMsg.getRuleChainId(), null, origMsg.getCallback());
96 96 }
97 97
  98 + public static TbMsg transformMsg(TbMsg origMsg, RuleChainId ruleChainId, String queueName) {
  99 + return new TbMsg(queueName, origMsg.id, origMsg.ts, origMsg.type, origMsg.originator, origMsg.metaData, origMsg.dataType,
  100 + origMsg.data, ruleChainId, null, origMsg.getCallback());
  101 + }
  102 +
98 103 public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
99 104 return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(),
100 105 tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
... ...
... ... @@ -23,7 +23,6 @@ import com.google.gson.JsonObject;
23 23 import com.google.protobuf.ByteString;
24 24 import lombok.extern.slf4j.Slf4j;
25 25 import org.springframework.beans.factory.annotation.Value;
26   -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
27 26 import org.springframework.stereotype.Service;
28 27 import org.thingsboard.common.util.ThingsBoardThreadFactory;
29 28 import org.thingsboard.server.common.data.DeviceProfile;
... ... @@ -53,21 +52,46 @@ import org.thingsboard.server.common.transport.TransportTenantProfileCache;
53 52 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
54 53 import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
55 54 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
56   -import org.thingsboard.server.common.transport.limits.TransportRateLimit;
57 55 import org.thingsboard.server.common.transport.limits.TransportRateLimitService;
58 56 import org.thingsboard.server.common.transport.limits.TransportRateLimitType;
59 57 import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult;
60 58 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
61 59 import org.thingsboard.server.common.transport.util.JsonUtils;
62   -import org.thingsboard.server.gen.transport.TransportProtos;
  60 +import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
  61 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
  62 +import org.thingsboard.server.gen.transport.TransportProtos.EntityDeleteMsg;
  63 +import org.thingsboard.server.gen.transport.TransportProtos.EntityUpdateMsg;
  64 +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
  65 +import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg;
  66 +import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg;
  67 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
  68 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
  69 +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
  70 +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
63 71 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg;
64 72 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
  73 +import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
  74 +import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
  75 +import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
  76 +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
  77 +import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
  78 +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
  79 +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
  80 +import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
65 81 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
  82 +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
66 83 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  84 +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
  85 +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
67 86 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
68 87 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
69 88 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
70 89 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
  90 +import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto;
  91 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCredRequestMsg;
  92 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
  93 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
  94 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
71 95 import org.thingsboard.server.queue.TbQueueCallback;
72 96 import org.thingsboard.server.queue.TbQueueConsumer;
73 97 import org.thingsboard.server.queue.TbQueueMsgMetadata;
... ... @@ -237,14 +261,14 @@ public class DefaultTransportService implements TransportService {
237 261 }
238 262
239 263 @Override
240   - public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
241   - sessions.putIfAbsent(toSessionId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
  264 + public void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
  265 + sessions.putIfAbsent(toSessionId(sessionInfo), new SessionMetaData(sessionInfo, SessionType.ASYNC, listener));
242 266 }
243 267
244 268 @Override
245   - public TransportProtos.GetEntityProfileResponseMsg getRoutingInfo(TransportProtos.GetEntityProfileRequestMsg msg) {
246   - TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
247   - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build());
  269 + public GetEntityProfileResponseMsg getRoutingInfo(GetEntityProfileRequestMsg msg) {
  270 + TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
  271 + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build());
248 272 try {
249 273 TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
250 274 return response.getValue().getEntityProfileResponseMsg();
... ... @@ -254,7 +278,7 @@ public class DefaultTransportService implements TransportService {
254 278 }
255 279
256 280 @Override
257   - public void process(DeviceTransportType transportType, TransportProtos.ValidateDeviceTokenRequestMsg msg,
  281 + public void process(DeviceTransportType transportType, ValidateDeviceTokenRequestMsg msg,
258 282 TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
259 283 log.trace("Processing msg: {}", msg);
260 284 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(),
... ... @@ -263,7 +287,7 @@ public class DefaultTransportService implements TransportService {
263 287 }
264 288
265 289 @Override
266   - public void process(DeviceTransportType transportType, TransportProtos.ValidateBasicMqttCredRequestMsg msg,
  290 + public void process(DeviceTransportType transportType, ValidateBasicMqttCredRequestMsg msg,
267 291 TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
268 292 log.trace("Processing msg: {}", msg);
269 293 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(),
... ... @@ -272,7 +296,7 @@ public class DefaultTransportService implements TransportService {
272 296 }
273 297
274 298 @Override
275   - public void process(DeviceTransportType transportType, TransportProtos.ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
  299 + public void process(DeviceTransportType transportType, ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
276 300 log.trace("Processing msg: {}", msg);
277 301 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build());
278 302 doProcess(transportType, protoMsg, callback);
... ... @@ -281,7 +305,7 @@ public class DefaultTransportService implements TransportService {
281 305 private void doProcess(DeviceTransportType transportType, TbProtoQueueMsg<TransportApiRequestMsg> protoMsg,
282 306 TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
283 307 ListenableFuture<ValidateDeviceCredentialsResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
284   - TransportProtos.ValidateDeviceCredentialsResponseMsg msg = tmp.getValue().getValidateCredResponseMsg();
  308 + ValidateDeviceCredentialsResponseMsg msg = tmp.getValue().getValidateCredResponseMsg();
285 309 ValidateDeviceCredentialsResponse.ValidateDeviceCredentialsResponseBuilder result = ValidateDeviceCredentialsResponse.builder();
286 310 if (msg.hasDeviceInfo()) {
287 311 result.credentials(msg.getCredentialsBody());
... ... @@ -304,11 +328,11 @@ public class DefaultTransportService implements TransportService {
304 328 }
305 329
306 330 @Override
307   - public void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback) {
  331 + public void process(GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback) {
308 332 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(requestMsg).build());
309 333 log.trace("Processing msg: {}", requestMsg);
310 334 ListenableFuture<GetOrCreateDeviceFromGatewayResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
311   - TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg msg = tmp.getValue().getGetOrCreateDeviceResponseMsg();
  335 + GetOrCreateDeviceFromGatewayResponseMsg msg = tmp.getValue().getGetOrCreateDeviceResponseMsg();
312 336 GetOrCreateDeviceFromGatewayResponse.GetOrCreateDeviceFromGatewayResponseBuilder result = GetOrCreateDeviceFromGatewayResponse.builder();
313 337 if (msg.hasDeviceInfo()) {
314 338 TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo());
... ... @@ -323,7 +347,7 @@ public class DefaultTransportService implements TransportService {
323 347 AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
324 348 }
325 349
326   - private TransportDeviceInfo getTransportDeviceInfo(TransportProtos.DeviceInfoProto di) {
  350 + private TransportDeviceInfo getTransportDeviceInfo(DeviceInfoProto di) {
327 351 TransportDeviceInfo tdi = new TransportDeviceInfo();
328 352 tdi.setTenantId(new TenantId(new UUID(di.getTenantIdMSB(), di.getTenantIdLSB())));
329 353 tdi.setDeviceId(new DeviceId(new UUID(di.getDeviceIdMSB(), di.getDeviceIdLSB())));
... ... @@ -345,7 +369,7 @@ public class DefaultTransportService implements TransportService {
345 369 }
346 370
347 371 @Override
348   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
  372 + public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
349 373 if (log.isTraceEnabled()) {
350 374 log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg);
351 375 }
... ... @@ -354,7 +378,7 @@ public class DefaultTransportService implements TransportService {
354 378 }
355 379
356 380 @Override
357   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
  381 + public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
358 382 if (checkLimits(sessionInfo, msg, callback)) {
359 383 reportActivityInternal(sessionInfo);
360 384 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
... ... @@ -363,9 +387,9 @@ public class DefaultTransportService implements TransportService {
363 387 }
364 388
365 389 @Override
366   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
  390 + public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
367 391 int dataPoints = 0;
368   - for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
  392 + for (TsKvListProto tsKv : msg.getTsKvListList()) {
369 393 dataPoints += tsKv.getKvCount();
370 394 }
371 395 if (checkLimits(sessionInfo, msg, callback, dataPoints, TELEMETRY)) {
... ... @@ -373,22 +397,19 @@ public class DefaultTransportService implements TransportService {
373 397 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
374 398 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
375 399 MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), callback);
376   - for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
  400 + for (TsKvListProto tsKv : msg.getTsKvListList()) {
377 401 TbMsgMetaData metaData = new TbMsgMetaData();
378 402 metaData.putValue("deviceName", sessionInfo.getDeviceName());
379 403 metaData.putValue("deviceType", sessionInfo.getDeviceType());
380 404 metaData.putValue("ts", tsKv.getTs() + "");
381 405 JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
382   - RuleChainId ruleChainId = resolveRuleChainId(sessionInfo);
383   - TbMsg tbMsg = TbMsg.newMsg(ServiceQueue.MAIN, SessionMsgType.POST_TELEMETRY_REQUEST.name(),
384   - deviceId, metaData, gson.toJson(json), ruleChainId, null);
385   - sendToRuleEngine(tenantId, tbMsg, packCallback);
  406 + sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_TELEMETRY_REQUEST, packCallback);
386 407 }
387 408 }
388 409 }
389 410
390 411 @Override
391   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
  412 + public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
392 413 if (checkLimits(sessionInfo, msg, callback, msg.getKvCount(), TELEMETRY)) {
393 414 reportActivityInternal(sessionInfo);
394 415 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
... ... @@ -398,15 +419,12 @@ public class DefaultTransportService implements TransportService {
398 419 metaData.putValue("deviceName", sessionInfo.getDeviceName());
399 420 metaData.putValue("deviceType", sessionInfo.getDeviceType());
400 421 metaData.putValue("notifyDevice", "false");
401   - RuleChainId ruleChainId = resolveRuleChainId(sessionInfo);
402   - TbMsg tbMsg = TbMsg.newMsg(ServiceQueue.MAIN, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(),
403   - deviceId, metaData, gson.toJson(json), ruleChainId, null);
404   - sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback));
  422 + sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, new TransportTbQueueCallback(callback));
405 423 }
406 424 }
407 425
408 426 @Override
409   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
  427 + public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
410 428 if (checkLimits(sessionInfo, msg, callback)) {
411 429 reportActivityInternal(sessionInfo);
412 430 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
... ... @@ -415,7 +433,7 @@ public class DefaultTransportService implements TransportService {
415 433 }
416 434
417 435 @Override
418   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
  436 + public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
419 437 if (checkLimits(sessionInfo, msg, callback)) {
420 438 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
421 439 sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
... ... @@ -425,7 +443,7 @@ public class DefaultTransportService implements TransportService {
425 443 }
426 444
427 445 @Override
428   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
  446 + public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
429 447 if (checkLimits(sessionInfo, msg, callback)) {
430 448 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
431 449 sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
... ... @@ -435,7 +453,7 @@ public class DefaultTransportService implements TransportService {
435 453 }
436 454
437 455 @Override
438   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
  456 + public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
439 457 if (checkLimits(sessionInfo, msg, callback)) {
440 458 reportActivityInternal(sessionInfo);
441 459 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
... ... @@ -450,13 +468,13 @@ public class DefaultTransportService implements TransportService {
450 468 if (md != null) {
451 469 SessionMsgListener listener = md.getListener();
452 470 transportCallbackExecutor.submit(() -> {
453   - TransportProtos.ToServerRpcResponseMsg responseMsg =
454   - TransportProtos.ToServerRpcResponseMsg.newBuilder()
  471 + ToServerRpcResponseMsg responseMsg =
  472 + ToServerRpcResponseMsg.newBuilder()
455 473 .setRequestId(data.getRequestId())
456 474 .setError("timeout").build();
457 475 listener.onToServerRpcResponse(responseMsg);
458 476 });
459   - if (md.getSessionType() == TransportProtos.SessionType.SYNC) {
  477 + if (md.getSessionType() == SessionType.SYNC) {
460 478 deregisterSession(md.getSessionInfo());
461 479 }
462 480 } else {
... ... @@ -466,7 +484,7 @@ public class DefaultTransportService implements TransportService {
466 484 }
467 485
468 486 @Override
469   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
  487 + public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
470 488 if (checkLimits(sessionInfo, msg, callback)) {
471 489 reportActivityInternal(sessionInfo);
472 490 UUID sessionId = toSessionId(sessionInfo);
... ... @@ -482,10 +500,9 @@ public class DefaultTransportService implements TransportService {
482 500 metaData.putValue("requestId", Integer.toString(msg.getRequestId()));
483 501 metaData.putValue("serviceId", serviceInfoProvider.getServiceId());
484 502 metaData.putValue("sessionId", sessionId.toString());
485   - RuleChainId ruleChainId = resolveRuleChainId(sessionInfo);
486   - TbMsg tbMsg = TbMsg.newMsg(ServiceQueue.MAIN, SessionMsgType.TO_SERVER_RPC_REQUEST.name(),
487   - deviceId, metaData, gson.toJson(json), ruleChainId, null);
488   - sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback));
  503 +
  504 + sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData,
  505 + SessionMsgType.TO_SERVER_RPC_REQUEST, new TransportTbQueueCallback(callback));
489 506 String requestId = sessionId + "-" + msg.getRequestId();
490 507 toServerRpcPendingMap.put(requestId, new RpcRequestMetadata(sessionId, msg.getRequestId()));
491 508 schedulerExecutor.schedule(() -> processTimeout(requestId), clientSideRpcTimeout, TimeUnit.MILLISECONDS);
... ... @@ -493,7 +510,7 @@ public class DefaultTransportService implements TransportService {
493 510 }
494 511
495 512 @Override
496   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
  513 + public void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
497 514 if (checkLimits(sessionInfo, msg, callback)) {
498 515 reportActivityInternal(sessionInfo);
499 516 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
... ... @@ -502,11 +519,11 @@ public class DefaultTransportService implements TransportService {
502 519 }
503 520
504 521 @Override
505   - public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
  522 + public void reportActivity(SessionInfoProto sessionInfo) {
506 523 reportActivityInternal(sessionInfo);
507 524 }
508 525
509   - private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
  526 + private SessionMetaData reportActivityInternal(SessionInfoProto sessionInfo) {
510 527 UUID sessionId = toSessionId(sessionInfo);
511 528 SessionMetaData sessionMetaData = sessions.get(sessionId);
512 529 if (sessionMetaData != null) {
... ... @@ -519,7 +536,7 @@ public class DefaultTransportService implements TransportService {
519 536 long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
520 537 sessions.forEach((uuid, sessionMD) -> {
521 538 long lastActivityTime = sessionMD.getLastActivityTime();
522   - TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
  539 + SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
523 540 if (sessionInfo.getGwSessionIdMSB() > 0 &&
524 541 sessionInfo.getGwSessionIdLSB() > 0) {
525 542 SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()));
... ... @@ -531,13 +548,13 @@ public class DefaultTransportService implements TransportService {
531 548 if (log.isDebugEnabled()) {
532 549 log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
533 550 }
534   - process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
  551 + process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
535 552 sessions.remove(uuid);
536   - sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
  553 + sessionMD.getListener().onRemoteSessionCloseCommand(SessionCloseNotificationProto.getDefaultInstance());
537 554 } else {
538 555 if (lastActivityTime > sessionMD.getLastReportedActivityTime()) {
539 556 final long lastActivityTimeFinal = lastActivityTime;
540   - process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
  557 + process(sessionInfo, SubscriptionInfoProto.newBuilder()
541 558 .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
542 559 .setRpcSubscription(sessionMD.isSubscribedToRPC())
543 560 .setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>() {
... ... @@ -557,12 +574,12 @@ public class DefaultTransportService implements TransportService {
557 574 }
558 575
559 576 @Override
560   - public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) {
561   - SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener);
  577 + public void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) {
  578 + SessionMetaData currentSession = new SessionMetaData(sessionInfo, SessionType.SYNC, listener);
562 579 sessions.putIfAbsent(toSessionId(sessionInfo), currentSession);
563 580
564 581 ScheduledFuture executorFuture = schedulerExecutor.schedule(() -> {
565   - listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
  582 + listener.onRemoteSessionCloseCommand(SessionCloseNotificationProto.getDefaultInstance());
566 583 deregisterSession(sessionInfo);
567 584 }, timeout, TimeUnit.MILLISECONDS);
568 585
... ... @@ -570,7 +587,7 @@ public class DefaultTransportService implements TransportService {
570 587 }
571 588
572 589 @Override
573   - public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) {
  590 + public void deregisterSession(SessionInfoProto sessionInfo) {
574 591 SessionMetaData currentSession = sessions.get(toSessionId(sessionInfo));
575 592 if (currentSession != null && currentSession.hasScheduledFuture()) {
576 593 log.debug("Stopping scheduler to avoid resending response if request has been ack.");
... ... @@ -583,12 +600,12 @@ public class DefaultTransportService implements TransportService {
583 600 private TransportRateLimitType[] TELEMETRY = TransportRateLimitType.values();
584 601
585 602 @Override
586   - public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
  603 + public boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
587 604 return checkLimits(sessionInfo, msg, callback, 0, DEFAULT);
588 605 }
589 606
590 607 @Override
591   - public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback, int dataPoints, TransportRateLimitType... limits) {
  608 + public boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback, int dataPoints, TransportRateLimitType... limits) {
592 609 if (log.isTraceEnabled()) {
593 610 log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg);
594 611 }
... ... @@ -609,7 +626,7 @@ public class DefaultTransportService implements TransportService {
609 626 }
610 627 }
611 628
612   - protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) {
  629 + protected void processToTransportMsg(ToTransportMsg toSessionMsg) {
613 630 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
614 631 SessionMetaData md = sessions.get(sessionId);
615 632 if (md != null) {
... ... @@ -633,12 +650,12 @@ public class DefaultTransportService implements TransportService {
633 650 listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
634 651 }
635 652 });
636   - if (md.getSessionType() == TransportProtos.SessionType.SYNC) {
  653 + if (md.getSessionType() == SessionType.SYNC) {
637 654 deregisterSession(md.getSessionInfo());
638 655 }
639 656 } else {
640 657 if (toSessionMsg.hasEntityUpdateMsg()) {
641   - TransportProtos.EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg();
  658 + EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg();
642 659 EntityType entityType = EntityType.valueOf(msg.getEntityType());
643 660 if (EntityType.DEVICE_PROFILE.equals(entityType)) {
644 661 DeviceProfile deviceProfile = deviceProfileCache.put(msg.getData());
... ... @@ -659,7 +676,7 @@ public class DefaultTransportService implements TransportService {
659 676 }
660 677 }
661 678 } else if (toSessionMsg.hasEntityDeleteMsg()) {
662   - TransportProtos.EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg();
  679 + EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg();
663 680 EntityType entityType = EntityType.valueOf(msg.getEntityType());
664 681 UUID entityUuid = new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB());
665 682 if (EntityType.DEVICE_PROFILE.equals(entityType)) {
... ... @@ -690,29 +707,29 @@ public class DefaultTransportService implements TransportService {
690 707 });
691 708 }
692 709
693   - protected UUID toSessionId(TransportProtos.SessionInfoProto sessionInfo) {
  710 + protected UUID toSessionId(SessionInfoProto sessionInfo) {
694 711 return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
695 712 }
696 713
697   - protected UUID getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
  714 + protected UUID getRoutingKey(SessionInfoProto sessionInfo) {
698 715 return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB());
699 716 }
700 717
701   - protected TenantId getTenantId(TransportProtos.SessionInfoProto sessionInfo) {
  718 + protected TenantId getTenantId(SessionInfoProto sessionInfo) {
702 719 return new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
703 720 }
704 721
705   - protected DeviceId getDeviceId(TransportProtos.SessionInfoProto sessionInfo) {
  722 + protected DeviceId getDeviceId(SessionInfoProto sessionInfo) {
706 723 return new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
707 724 }
708 725
709   - public static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) {
710   - return TransportProtos.SessionEventMsg.newBuilder()
711   - .setSessionType(TransportProtos.SessionType.ASYNC)
  726 + public static SessionEventMsg getSessionEventMsg(SessionEvent event) {
  727 + return SessionEventMsg.newBuilder()
  728 + .setSessionType(SessionType.ASYNC)
712 729 .setEvent(event).build();
713 730 }
714 731
715   - protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
  732 + protected void sendToDeviceActor(SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
716 733 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
717 734 if (log.isTraceEnabled()) {
718 735 log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg);
... ... @@ -740,17 +757,25 @@ public class DefaultTransportService implements TransportService {
740 757 ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
741 758 }
742 759
743   - private RuleChainId resolveRuleChainId(TransportProtos.SessionInfoProto sessionInfo) {
  760 + protected void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, SessionInfoProto sessionInfo, JsonObject json,
  761 + TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) {
744 762 DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));
745 763 DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);
746 764 RuleChainId ruleChainId;
  765 + String queueName;
  766 +
747 767 if (deviceProfile == null) {
748 768 log.warn("[{}] Device profile is null!", deviceProfileId);
749 769 ruleChainId = null;
  770 + queueName = ServiceQueue.MAIN;
750 771 } else {
751 772 ruleChainId = deviceProfile.getDefaultRuleChainId();
  773 + String defaultQueueName = deviceProfile.getDefaultQueueName();
  774 + queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;
752 775 }
753   - return ruleChainId;
  776 +
  777 + TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, metaData, gson.toJson(json), ruleChainId, null);
  778 + sendToRuleEngine(tenantId, tbMsg, callback);
754 779 }
755 780
756 781 private class TransportTbQueueCallback implements TbQueueCallback {
... ...