Commit 2211cff1e9e35e3a98a5d4fa4cf52e1a0c6193c4

Authored by YevhenBondarenko
Committed by Andrew Shvayka
1 parent a0d39a3e

revert DefaultTransportService

@@ -57,41 +57,15 @@ import org.thingsboard.server.common.transport.limits.TransportRateLimitType; @@ -57,41 +57,15 @@ import org.thingsboard.server.common.transport.limits.TransportRateLimitType;
57 import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; 57 import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult;
58 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; 58 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
59 import org.thingsboard.server.common.transport.util.JsonUtils; 59 import org.thingsboard.server.common.transport.util.JsonUtils;
60 -import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;  
61 -import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;  
62 -import org.thingsboard.server.gen.transport.TransportProtos.EntityDeleteMsg;  
63 -import org.thingsboard.server.gen.transport.TransportProtos.EntityUpdateMsg;  
64 -import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;  
65 -import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg;  
66 -import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg;  
67 -import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;  
68 -import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;  
69 -import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;  
70 -import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; 60 +import org.thingsboard.server.gen.transport.TransportProtos;
71 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; 61 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg;
72 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; 62 import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
73 -import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;  
74 -import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;  
75 -import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;  
76 -import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;  
77 -import org.thingsboard.server.gen.transport.TransportProtos.SessionType;  
78 -import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;  
79 -import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;  
80 -import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;  
81 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 63 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
82 -import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;  
83 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 64 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
84 -import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;  
85 -import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;  
86 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 65 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
87 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; 66 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
88 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; 67 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
89 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; 68 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
90 -import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto;  
91 -import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCredRequestMsg;  
92 -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;  
93 -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;  
94 -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;  
95 import org.thingsboard.server.queue.TbQueueCallback; 69 import org.thingsboard.server.queue.TbQueueCallback;
96 import org.thingsboard.server.queue.TbQueueConsumer; 70 import org.thingsboard.server.queue.TbQueueConsumer;
97 import org.thingsboard.server.queue.TbQueueMsgMetadata; 71 import org.thingsboard.server.queue.TbQueueMsgMetadata;
@@ -261,14 +235,14 @@ public class DefaultTransportService implements TransportService { @@ -261,14 +235,14 @@ public class DefaultTransportService implements TransportService {
261 } 235 }
262 236
263 @Override 237 @Override
264 - public void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {  
265 - sessions.putIfAbsent(toSessionId(sessionInfo), new SessionMetaData(sessionInfo, SessionType.ASYNC, listener)); 238 + public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
  239 + sessions.putIfAbsent(toSessionId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
266 } 240 }
267 241
268 @Override 242 @Override
269 - public GetEntityProfileResponseMsg getRoutingInfo(GetEntityProfileRequestMsg msg) {  
270 - TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =  
271 - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build()); 243 + public TransportProtos.GetEntityProfileResponseMsg getRoutingInfo(TransportProtos.GetEntityProfileRequestMsg msg) {
  244 + TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
  245 + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build());
272 try { 246 try {
273 TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get(); 247 TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
274 return response.getValue().getEntityProfileResponseMsg(); 248 return response.getValue().getEntityProfileResponseMsg();
@@ -278,7 +252,7 @@ public class DefaultTransportService implements TransportService { @@ -278,7 +252,7 @@ public class DefaultTransportService implements TransportService {
278 } 252 }
279 253
280 @Override 254 @Override
281 - public void process(DeviceTransportType transportType, ValidateDeviceTokenRequestMsg msg, 255 + public void process(DeviceTransportType transportType, TransportProtos.ValidateDeviceTokenRequestMsg msg,
282 TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) { 256 TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
283 log.trace("Processing msg: {}", msg); 257 log.trace("Processing msg: {}", msg);
284 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), 258 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(),
@@ -287,7 +261,7 @@ public class DefaultTransportService implements TransportService { @@ -287,7 +261,7 @@ public class DefaultTransportService implements TransportService {
287 } 261 }
288 262
289 @Override 263 @Override
290 - public void process(DeviceTransportType transportType, ValidateBasicMqttCredRequestMsg msg, 264 + public void process(DeviceTransportType transportType, TransportProtos.ValidateBasicMqttCredRequestMsg msg,
291 TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) { 265 TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
292 log.trace("Processing msg: {}", msg); 266 log.trace("Processing msg: {}", msg);
293 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), 267 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(),
@@ -296,7 +270,7 @@ public class DefaultTransportService implements TransportService { @@ -296,7 +270,7 @@ public class DefaultTransportService implements TransportService {
296 } 270 }
297 271
298 @Override 272 @Override
299 - public void process(DeviceTransportType transportType, ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) { 273 + public void process(DeviceTransportType transportType, TransportProtos.ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
300 log.trace("Processing msg: {}", msg); 274 log.trace("Processing msg: {}", msg);
301 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()); 275 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build());
302 doProcess(transportType, protoMsg, callback); 276 doProcess(transportType, protoMsg, callback);
@@ -305,7 +279,7 @@ public class DefaultTransportService implements TransportService { @@ -305,7 +279,7 @@ public class DefaultTransportService implements TransportService {
305 private void doProcess(DeviceTransportType transportType, TbProtoQueueMsg<TransportApiRequestMsg> protoMsg, 279 private void doProcess(DeviceTransportType transportType, TbProtoQueueMsg<TransportApiRequestMsg> protoMsg,
306 TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) { 280 TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
307 ListenableFuture<ValidateDeviceCredentialsResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> { 281 ListenableFuture<ValidateDeviceCredentialsResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
308 - ValidateDeviceCredentialsResponseMsg msg = tmp.getValue().getValidateCredResponseMsg(); 282 + TransportProtos.ValidateDeviceCredentialsResponseMsg msg = tmp.getValue().getValidateCredResponseMsg();
309 ValidateDeviceCredentialsResponse.ValidateDeviceCredentialsResponseBuilder result = ValidateDeviceCredentialsResponse.builder(); 283 ValidateDeviceCredentialsResponse.ValidateDeviceCredentialsResponseBuilder result = ValidateDeviceCredentialsResponse.builder();
310 if (msg.hasDeviceInfo()) { 284 if (msg.hasDeviceInfo()) {
311 result.credentials(msg.getCredentialsBody()); 285 result.credentials(msg.getCredentialsBody());
@@ -328,11 +302,11 @@ public class DefaultTransportService implements TransportService { @@ -328,11 +302,11 @@ public class DefaultTransportService implements TransportService {
328 } 302 }
329 303
330 @Override 304 @Override
331 - public void process(GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback) { 305 + public void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback) {
332 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(requestMsg).build()); 306 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(requestMsg).build());
333 log.trace("Processing msg: {}", requestMsg); 307 log.trace("Processing msg: {}", requestMsg);
334 ListenableFuture<GetOrCreateDeviceFromGatewayResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> { 308 ListenableFuture<GetOrCreateDeviceFromGatewayResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
335 - GetOrCreateDeviceFromGatewayResponseMsg msg = tmp.getValue().getGetOrCreateDeviceResponseMsg(); 309 + TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg msg = tmp.getValue().getGetOrCreateDeviceResponseMsg();
336 GetOrCreateDeviceFromGatewayResponse.GetOrCreateDeviceFromGatewayResponseBuilder result = GetOrCreateDeviceFromGatewayResponse.builder(); 310 GetOrCreateDeviceFromGatewayResponse.GetOrCreateDeviceFromGatewayResponseBuilder result = GetOrCreateDeviceFromGatewayResponse.builder();
337 if (msg.hasDeviceInfo()) { 311 if (msg.hasDeviceInfo()) {
338 TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo()); 312 TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo());
@@ -347,7 +321,7 @@ public class DefaultTransportService implements TransportService { @@ -347,7 +321,7 @@ public class DefaultTransportService implements TransportService {
347 AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor); 321 AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
348 } 322 }
349 323
350 - private TransportDeviceInfo getTransportDeviceInfo(DeviceInfoProto di) { 324 + private TransportDeviceInfo getTransportDeviceInfo(TransportProtos.DeviceInfoProto di) {
351 TransportDeviceInfo tdi = new TransportDeviceInfo(); 325 TransportDeviceInfo tdi = new TransportDeviceInfo();
352 tdi.setTenantId(new TenantId(new UUID(di.getTenantIdMSB(), di.getTenantIdLSB()))); 326 tdi.setTenantId(new TenantId(new UUID(di.getTenantIdMSB(), di.getTenantIdLSB())));
353 tdi.setDeviceId(new DeviceId(new UUID(di.getDeviceIdMSB(), di.getDeviceIdLSB()))); 327 tdi.setDeviceId(new DeviceId(new UUID(di.getDeviceIdMSB(), di.getDeviceIdLSB())));
@@ -369,7 +343,7 @@ public class DefaultTransportService implements TransportService { @@ -369,7 +343,7 @@ public class DefaultTransportService implements TransportService {
369 } 343 }
370 344
371 @Override 345 @Override
372 - public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) { 346 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
373 if (log.isTraceEnabled()) { 347 if (log.isTraceEnabled()) {
374 log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg); 348 log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg);
375 } 349 }
@@ -378,7 +352,7 @@ public class DefaultTransportService implements TransportService { @@ -378,7 +352,7 @@ public class DefaultTransportService implements TransportService {
378 } 352 }
379 353
380 @Override 354 @Override
381 - public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) { 355 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
382 if (checkLimits(sessionInfo, msg, callback)) { 356 if (checkLimits(sessionInfo, msg, callback)) {
383 reportActivityInternal(sessionInfo); 357 reportActivityInternal(sessionInfo);
384 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 358 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
@@ -387,9 +361,9 @@ public class DefaultTransportService implements TransportService { @@ -387,9 +361,9 @@ public class DefaultTransportService implements TransportService {
387 } 361 }
388 362
389 @Override 363 @Override
390 - public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) { 364 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
391 int dataPoints = 0; 365 int dataPoints = 0;
392 - for (TsKvListProto tsKv : msg.getTsKvListList()) { 366 + for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
393 dataPoints += tsKv.getKvCount(); 367 dataPoints += tsKv.getKvCount();
394 } 368 }
395 if (checkLimits(sessionInfo, msg, callback, dataPoints, TELEMETRY)) { 369 if (checkLimits(sessionInfo, msg, callback, dataPoints, TELEMETRY)) {
@@ -397,19 +371,20 @@ public class DefaultTransportService implements TransportService { @@ -397,19 +371,20 @@ public class DefaultTransportService implements TransportService {
397 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); 371 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
398 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); 372 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
399 MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), callback); 373 MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), callback);
400 - for (TsKvListProto tsKv : msg.getTsKvListList()) { 374 + for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
401 TbMsgMetaData metaData = new TbMsgMetaData(); 375 TbMsgMetaData metaData = new TbMsgMetaData();
402 metaData.putValue("deviceName", sessionInfo.getDeviceName()); 376 metaData.putValue("deviceName", sessionInfo.getDeviceName());
403 metaData.putValue("deviceType", sessionInfo.getDeviceType()); 377 metaData.putValue("deviceType", sessionInfo.getDeviceType());
404 metaData.putValue("ts", tsKv.getTs() + ""); 378 metaData.putValue("ts", tsKv.getTs() + "");
405 JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); 379 JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
406 sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_TELEMETRY_REQUEST, packCallback); 380 sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_TELEMETRY_REQUEST, packCallback);
  381 +
407 } 382 }
408 } 383 }
409 } 384 }
410 385
411 @Override 386 @Override
412 - public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) { 387 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
413 if (checkLimits(sessionInfo, msg, callback, msg.getKvCount(), TELEMETRY)) { 388 if (checkLimits(sessionInfo, msg, callback, msg.getKvCount(), TELEMETRY)) {
414 reportActivityInternal(sessionInfo); 389 reportActivityInternal(sessionInfo);
415 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); 390 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
@@ -424,7 +399,7 @@ public class DefaultTransportService implements TransportService { @@ -424,7 +399,7 @@ public class DefaultTransportService implements TransportService {
424 } 399 }
425 400
426 @Override 401 @Override
427 - public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) { 402 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
428 if (checkLimits(sessionInfo, msg, callback)) { 403 if (checkLimits(sessionInfo, msg, callback)) {
429 reportActivityInternal(sessionInfo); 404 reportActivityInternal(sessionInfo);
430 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 405 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
@@ -433,7 +408,7 @@ public class DefaultTransportService implements TransportService { @@ -433,7 +408,7 @@ public class DefaultTransportService implements TransportService {
433 } 408 }
434 409
435 @Override 410 @Override
436 - public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) { 411 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
437 if (checkLimits(sessionInfo, msg, callback)) { 412 if (checkLimits(sessionInfo, msg, callback)) {
438 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); 413 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
439 sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); 414 sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
@@ -443,7 +418,7 @@ public class DefaultTransportService implements TransportService { @@ -443,7 +418,7 @@ public class DefaultTransportService implements TransportService {
443 } 418 }
444 419
445 @Override 420 @Override
446 - public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) { 421 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
447 if (checkLimits(sessionInfo, msg, callback)) { 422 if (checkLimits(sessionInfo, msg, callback)) {
448 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); 423 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
449 sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); 424 sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
@@ -453,7 +428,7 @@ public class DefaultTransportService implements TransportService { @@ -453,7 +428,7 @@ public class DefaultTransportService implements TransportService {
453 } 428 }
454 429
455 @Override 430 @Override
456 - public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) { 431 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
457 if (checkLimits(sessionInfo, msg, callback)) { 432 if (checkLimits(sessionInfo, msg, callback)) {
458 reportActivityInternal(sessionInfo); 433 reportActivityInternal(sessionInfo);
459 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 434 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
@@ -468,13 +443,13 @@ public class DefaultTransportService implements TransportService { @@ -468,13 +443,13 @@ public class DefaultTransportService implements TransportService {
468 if (md != null) { 443 if (md != null) {
469 SessionMsgListener listener = md.getListener(); 444 SessionMsgListener listener = md.getListener();
470 transportCallbackExecutor.submit(() -> { 445 transportCallbackExecutor.submit(() -> {
471 - ToServerRpcResponseMsg responseMsg =  
472 - ToServerRpcResponseMsg.newBuilder() 446 + TransportProtos.ToServerRpcResponseMsg responseMsg =
  447 + TransportProtos.ToServerRpcResponseMsg.newBuilder()
473 .setRequestId(data.getRequestId()) 448 .setRequestId(data.getRequestId())
474 .setError("timeout").build(); 449 .setError("timeout").build();
475 listener.onToServerRpcResponse(responseMsg); 450 listener.onToServerRpcResponse(responseMsg);
476 }); 451 });
477 - if (md.getSessionType() == SessionType.SYNC) { 452 + if (md.getSessionType() == TransportProtos.SessionType.SYNC) {
478 deregisterSession(md.getSessionInfo()); 453 deregisterSession(md.getSessionInfo());
479 } 454 }
480 } else { 455 } else {
@@ -484,7 +459,7 @@ public class DefaultTransportService implements TransportService { @@ -484,7 +459,7 @@ public class DefaultTransportService implements TransportService {
484 } 459 }
485 460
486 @Override 461 @Override
487 - public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) { 462 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
488 if (checkLimits(sessionInfo, msg, callback)) { 463 if (checkLimits(sessionInfo, msg, callback)) {
489 reportActivityInternal(sessionInfo); 464 reportActivityInternal(sessionInfo);
490 UUID sessionId = toSessionId(sessionInfo); 465 UUID sessionId = toSessionId(sessionInfo);
@@ -500,7 +475,6 @@ public class DefaultTransportService implements TransportService { @@ -500,7 +475,6 @@ public class DefaultTransportService implements TransportService {
500 metaData.putValue("requestId", Integer.toString(msg.getRequestId())); 475 metaData.putValue("requestId", Integer.toString(msg.getRequestId()));
501 metaData.putValue("serviceId", serviceInfoProvider.getServiceId()); 476 metaData.putValue("serviceId", serviceInfoProvider.getServiceId());
502 metaData.putValue("sessionId", sessionId.toString()); 477 metaData.putValue("sessionId", sessionId.toString());
503 -  
504 sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, 478 sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData,
505 SessionMsgType.TO_SERVER_RPC_REQUEST, new TransportTbQueueCallback(callback)); 479 SessionMsgType.TO_SERVER_RPC_REQUEST, new TransportTbQueueCallback(callback));
506 String requestId = sessionId + "-" + msg.getRequestId(); 480 String requestId = sessionId + "-" + msg.getRequestId();
@@ -510,7 +484,7 @@ public class DefaultTransportService implements TransportService { @@ -510,7 +484,7 @@ public class DefaultTransportService implements TransportService {
510 } 484 }
511 485
512 @Override 486 @Override
513 - public void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) { 487 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
514 if (checkLimits(sessionInfo, msg, callback)) { 488 if (checkLimits(sessionInfo, msg, callback)) {
515 reportActivityInternal(sessionInfo); 489 reportActivityInternal(sessionInfo);
516 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 490 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
@@ -519,11 +493,11 @@ public class DefaultTransportService implements TransportService { @@ -519,11 +493,11 @@ public class DefaultTransportService implements TransportService {
519 } 493 }
520 494
521 @Override 495 @Override
522 - public void reportActivity(SessionInfoProto sessionInfo) { 496 + public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
523 reportActivityInternal(sessionInfo); 497 reportActivityInternal(sessionInfo);
524 } 498 }
525 499
526 - private SessionMetaData reportActivityInternal(SessionInfoProto sessionInfo) { 500 + private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
527 UUID sessionId = toSessionId(sessionInfo); 501 UUID sessionId = toSessionId(sessionInfo);
528 SessionMetaData sessionMetaData = sessions.get(sessionId); 502 SessionMetaData sessionMetaData = sessions.get(sessionId);
529 if (sessionMetaData != null) { 503 if (sessionMetaData != null) {
@@ -536,7 +510,7 @@ public class DefaultTransportService implements TransportService { @@ -536,7 +510,7 @@ public class DefaultTransportService implements TransportService {
536 long expTime = System.currentTimeMillis() - sessionInactivityTimeout; 510 long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
537 sessions.forEach((uuid, sessionMD) -> { 511 sessions.forEach((uuid, sessionMD) -> {
538 long lastActivityTime = sessionMD.getLastActivityTime(); 512 long lastActivityTime = sessionMD.getLastActivityTime();
539 - SessionInfoProto sessionInfo = sessionMD.getSessionInfo(); 513 + TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
540 if (sessionInfo.getGwSessionIdMSB() > 0 && 514 if (sessionInfo.getGwSessionIdMSB() > 0 &&
541 sessionInfo.getGwSessionIdLSB() > 0) { 515 sessionInfo.getGwSessionIdLSB() > 0) {
542 SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB())); 516 SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()));
@@ -548,13 +522,13 @@ public class DefaultTransportService implements TransportService { @@ -548,13 +522,13 @@ public class DefaultTransportService implements TransportService {
548 if (log.isDebugEnabled()) { 522 if (log.isDebugEnabled()) {
549 log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime); 523 log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
550 } 524 }
551 - process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null); 525 + process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
552 sessions.remove(uuid); 526 sessions.remove(uuid);
553 - sessionMD.getListener().onRemoteSessionCloseCommand(SessionCloseNotificationProto.getDefaultInstance()); 527 + sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
554 } else { 528 } else {
555 if (lastActivityTime > sessionMD.getLastReportedActivityTime()) { 529 if (lastActivityTime > sessionMD.getLastReportedActivityTime()) {
556 final long lastActivityTimeFinal = lastActivityTime; 530 final long lastActivityTimeFinal = lastActivityTime;
557 - process(sessionInfo, SubscriptionInfoProto.newBuilder() 531 + process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
558 .setAttributeSubscription(sessionMD.isSubscribedToAttributes()) 532 .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
559 .setRpcSubscription(sessionMD.isSubscribedToRPC()) 533 .setRpcSubscription(sessionMD.isSubscribedToRPC())
560 .setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>() { 534 .setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>() {
@@ -574,12 +548,12 @@ public class DefaultTransportService implements TransportService { @@ -574,12 +548,12 @@ public class DefaultTransportService implements TransportService {
574 } 548 }
575 549
576 @Override 550 @Override
577 - public void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) {  
578 - SessionMetaData currentSession = new SessionMetaData(sessionInfo, SessionType.SYNC, listener); 551 + public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) {
  552 + SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener);
579 sessions.putIfAbsent(toSessionId(sessionInfo), currentSession); 553 sessions.putIfAbsent(toSessionId(sessionInfo), currentSession);
580 554
581 ScheduledFuture executorFuture = schedulerExecutor.schedule(() -> { 555 ScheduledFuture executorFuture = schedulerExecutor.schedule(() -> {
582 - listener.onRemoteSessionCloseCommand(SessionCloseNotificationProto.getDefaultInstance()); 556 + listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
583 deregisterSession(sessionInfo); 557 deregisterSession(sessionInfo);
584 }, timeout, TimeUnit.MILLISECONDS); 558 }, timeout, TimeUnit.MILLISECONDS);
585 559
@@ -587,7 +561,7 @@ public class DefaultTransportService implements TransportService { @@ -587,7 +561,7 @@ public class DefaultTransportService implements TransportService {
587 } 561 }
588 562
589 @Override 563 @Override
590 - public void deregisterSession(SessionInfoProto sessionInfo) { 564 + public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) {
591 SessionMetaData currentSession = sessions.get(toSessionId(sessionInfo)); 565 SessionMetaData currentSession = sessions.get(toSessionId(sessionInfo));
592 if (currentSession != null && currentSession.hasScheduledFuture()) { 566 if (currentSession != null && currentSession.hasScheduledFuture()) {
593 log.debug("Stopping scheduler to avoid resending response if request has been ack."); 567 log.debug("Stopping scheduler to avoid resending response if request has been ack.");
@@ -600,12 +574,12 @@ public class DefaultTransportService implements TransportService { @@ -600,12 +574,12 @@ public class DefaultTransportService implements TransportService {
600 private TransportRateLimitType[] TELEMETRY = TransportRateLimitType.values(); 574 private TransportRateLimitType[] TELEMETRY = TransportRateLimitType.values();
601 575
602 @Override 576 @Override
603 - public boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) { 577 + public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
604 return checkLimits(sessionInfo, msg, callback, 0, DEFAULT); 578 return checkLimits(sessionInfo, msg, callback, 0, DEFAULT);
605 } 579 }
606 580
607 @Override 581 @Override
608 - public boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback, int dataPoints, TransportRateLimitType... limits) { 582 + public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback, int dataPoints, TransportRateLimitType... limits) {
609 if (log.isTraceEnabled()) { 583 if (log.isTraceEnabled()) {
610 log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg); 584 log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg);
611 } 585 }
@@ -626,7 +600,7 @@ public class DefaultTransportService implements TransportService { @@ -626,7 +600,7 @@ public class DefaultTransportService implements TransportService {
626 } 600 }
627 } 601 }
628 602
629 - protected void processToTransportMsg(ToTransportMsg toSessionMsg) { 603 + protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) {
630 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); 604 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
631 SessionMetaData md = sessions.get(sessionId); 605 SessionMetaData md = sessions.get(sessionId);
632 if (md != null) { 606 if (md != null) {
@@ -650,12 +624,12 @@ public class DefaultTransportService implements TransportService { @@ -650,12 +624,12 @@ public class DefaultTransportService implements TransportService {
650 listener.onToServerRpcResponse(toSessionMsg.getToServerResponse()); 624 listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
651 } 625 }
652 }); 626 });
653 - if (md.getSessionType() == SessionType.SYNC) { 627 + if (md.getSessionType() == TransportProtos.SessionType.SYNC) {
654 deregisterSession(md.getSessionInfo()); 628 deregisterSession(md.getSessionInfo());
655 } 629 }
656 } else { 630 } else {
657 if (toSessionMsg.hasEntityUpdateMsg()) { 631 if (toSessionMsg.hasEntityUpdateMsg()) {
658 - EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg(); 632 + TransportProtos.EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg();
659 EntityType entityType = EntityType.valueOf(msg.getEntityType()); 633 EntityType entityType = EntityType.valueOf(msg.getEntityType());
660 if (EntityType.DEVICE_PROFILE.equals(entityType)) { 634 if (EntityType.DEVICE_PROFILE.equals(entityType)) {
661 DeviceProfile deviceProfile = deviceProfileCache.put(msg.getData()); 635 DeviceProfile deviceProfile = deviceProfileCache.put(msg.getData());
@@ -676,7 +650,7 @@ public class DefaultTransportService implements TransportService { @@ -676,7 +650,7 @@ public class DefaultTransportService implements TransportService {
676 } 650 }
677 } 651 }
678 } else if (toSessionMsg.hasEntityDeleteMsg()) { 652 } else if (toSessionMsg.hasEntityDeleteMsg()) {
679 - EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg(); 653 + TransportProtos.EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg();
680 EntityType entityType = EntityType.valueOf(msg.getEntityType()); 654 EntityType entityType = EntityType.valueOf(msg.getEntityType());
681 UUID entityUuid = new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()); 655 UUID entityUuid = new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB());
682 if (EntityType.DEVICE_PROFILE.equals(entityType)) { 656 if (EntityType.DEVICE_PROFILE.equals(entityType)) {
@@ -707,29 +681,29 @@ public class DefaultTransportService implements TransportService { @@ -707,29 +681,29 @@ public class DefaultTransportService implements TransportService {
707 }); 681 });
708 } 682 }
709 683
710 - protected UUID toSessionId(SessionInfoProto sessionInfo) { 684 + protected UUID toSessionId(TransportProtos.SessionInfoProto sessionInfo) {
711 return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); 685 return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
712 } 686 }
713 687
714 - protected UUID getRoutingKey(SessionInfoProto sessionInfo) { 688 + protected UUID getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
715 return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()); 689 return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB());
716 } 690 }
717 691
718 - protected TenantId getTenantId(SessionInfoProto sessionInfo) { 692 + protected TenantId getTenantId(TransportProtos.SessionInfoProto sessionInfo) {
719 return new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); 693 return new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
720 } 694 }
721 695
722 - protected DeviceId getDeviceId(SessionInfoProto sessionInfo) { 696 + protected DeviceId getDeviceId(TransportProtos.SessionInfoProto sessionInfo) {
723 return new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); 697 return new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
724 } 698 }
725 699
726 - public static SessionEventMsg getSessionEventMsg(SessionEvent event) {  
727 - return SessionEventMsg.newBuilder()  
728 - .setSessionType(SessionType.ASYNC) 700 + public static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) {
  701 + return TransportProtos.SessionEventMsg.newBuilder()
  702 + .setSessionType(TransportProtos.SessionType.ASYNC)
729 .setEvent(event).build(); 703 .setEvent(event).build();
730 } 704 }
731 705
732 - protected void sendToDeviceActor(SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) { 706 + protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
733 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo)); 707 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
734 if (log.isTraceEnabled()) { 708 if (log.isTraceEnabled()) {
735 log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg); 709 log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg);