Commit 8ceddb8ac76b300055686012e88ecba6bdf93058

Authored by xp.Huang
1 parent 12b15833

#Based on modifications of the ThingsBoard Community Edition.

fix: 同步thingsboard3.8.1设备状态的逻辑


(cherry picked from commit 333ca4c93424fba4cf1d44aa3c57fe8a50f3a2e6)
@@ -18,7 +18,9 @@ @@ -18,7 +18,9 @@
18 * Source: ThingsBoard Community Edition 18 * Source: ThingsBoard Community Edition
19 * Modifications made by: Chengdu Yunteng Wuzhou Technology Co., Ltd 19 * Modifications made by: Chengdu Yunteng Wuzhou Technology Co., Ltd
20 * Modification date: 2022-09-20 20 * Modification date: 2022-09-20
21 - * Description of changes:Add device upper and lower limit log records 21 + * Description of changes:
  22 + * 1、Add device upper and lower limit log records
  23 + * 2、merged https://github.com/thingsboard/thingsboard/pull/11536
22 */ 24 */
23 package org.thingsboard.server.service.state; 25 package org.thingsboard.server.service.state;
24 26
@@ -385,14 +387,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev @@ -385,14 +387,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
385 if (proto.getAdded()) { 387 if (proto.getAdded()) {
386 Futures.addCallback(fetchDeviceState(device), new FutureCallback<>() { 388 Futures.addCallback(fetchDeviceState(device), new FutureCallback<>() {
387 @Override 389 @Override
388 - public void onSuccess(@Nullable DeviceStateData state) { 390 + public void onSuccess(DeviceStateData state) {
389 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId()); 391 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId());
390 - if (addDeviceUsingState(tpi, state)) {  
391 - save(deviceId, ACTIVITY_STATE, false); 392 + Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
  393 + boolean isMyPartition = deviceIds != null;
  394 + if (isMyPartition) {
  395 + deviceIds.add(state.getDeviceId());
  396 + initializeActivityState(deviceId, state);
392 callback.onSuccess(); 397 callback.onSuccess();
393 } else { 398 } else {
394 - log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}"  
395 - , tenantId, deviceId, tpi.getFullTopicName()); 399 + log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}", tenantId, deviceId, tpi.getFullTopicName());
396 callback.onFailure(new RuntimeException("Device belongs to external partition " + tpi.getFullTopicName() + "!")); 400 callback.onFailure(new RuntimeException("Device belongs to external partition " + tpi.getFullTopicName() + "!"));
397 } 401 }
398 } 402 }
@@ -423,6 +427,21 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev @@ -423,6 +427,21 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
423 } 427 }
424 } 428 }
425 429
  430 + private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
  431 + cleanupEntity(deviceId);
  432 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
  433 + Set<DeviceId> deviceIdSet = partitionedEntities.get(tpi);
  434 + if (deviceIdSet != null) {
  435 + deviceIdSet.remove(deviceId);
  436 + }
  437 + }
  438 +
  439 + private void initializeActivityState(DeviceId deviceId, DeviceStateData fetchedState) {
  440 + DeviceStateData cachedState = deviceStates.putIfAbsent(fetchedState.getDeviceId(), fetchedState);
  441 + boolean activityState = Objects.requireNonNullElse(cachedState, fetchedState).getState().isActive();
  442 + save(deviceId, ACTIVITY_STATE, activityState);
  443 + }
  444 +
426 @Override 445 @Override
427 protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) { 446 protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
428 var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>(); 447 var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();
@@ -459,10 +478,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev @@ -459,10 +478,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
459 } 478 }
460 if (devicePackFutureHolder.future == null || !devicePackFutureHolder.future.isCancelled()) { 479 if (devicePackFutureHolder.future == null || !devicePackFutureHolder.future.isCancelled()) {
461 for (var state : states) { 480 for (var state : states) {
462 - if (!addDeviceUsingState(entry.getKey(), state)) {  
463 - return; 481 + TopicPartitionInfo tpi = entry.getKey();
  482 + Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
  483 + boolean isMyPartition = deviceIds != null;
  484 + if (isMyPartition) {
  485 + deviceIds.add(state.getDeviceId());
  486 + deviceStates.putIfAbsent(state.getDeviceId(), state);
  487 + checkAndUpdateState(state.getDeviceId(), state);
  488 + } else {
  489 + log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());
464 } 490 }
465 - checkAndUpdateState(state.getDeviceId(), state);  
466 } 491 }
467 log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size()); 492 log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size());
468 } 493 }
@@ -519,18 +544,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev @@ -519,18 +544,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
519 } 544 }
520 } 545 }
521 546
522 - private boolean addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) {  
523 - Set<DeviceId> deviceIds = partitionedEntities.get(tpi);  
524 - if (deviceIds != null) {  
525 - deviceIds.add(state.getDeviceId());  
526 - deviceStates.putIfAbsent(state.getDeviceId(), state);  
527 - return true;  
528 - } else {  
529 - log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());  
530 - return false;  
531 - }  
532 - }  
533 -  
534 void checkStates() { 547 void checkStates() {
535 try { 548 try {
536 final long ts = getCurrentTimeMillis(); 549 final long ts = getCurrentTimeMillis();
@@ -671,15 +684,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev @@ -671,15 +684,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
671 return cleanup; 684 return cleanup;
672 } 685 }
673 686
674 - private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {  
675 - cleanupEntity(deviceId);  
676 - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);  
677 - Set<DeviceId> deviceIdSet = partitionedEntities.get(tpi);  
678 - if (deviceIdSet != null) {  
679 - deviceIdSet.remove(deviceId);  
680 - }  
681 - }  
682 -  
683 @Override 687 @Override
684 protected void cleanupEntityOnPartitionRemoval(DeviceId deviceId) { 688 protected void cleanupEntityOnPartitionRemoval(DeviceId deviceId) {
685 cleanupEntity(deviceId); 689 cleanupEntity(deviceId);