Commit 010ee3c5b4be1b6017478f743fdb0e1ae0f63765

Authored by xp.Huang
2 parents 12b15833 8ceddb8a

Merge branch 'cherry-pick-333ca4c9' into 'master'

#Based on modifications of the ThingsBoard Community Edition.

See merge request yunteng/thingskit!474
... ... @@ -18,7 +18,9 @@
18 18 * Source: ThingsBoard Community Edition
19 19 * Modifications made by: Chengdu Yunteng Wuzhou Technology Co., Ltd
20 20 * Modification date: 2022-09-20
21   - * Description of changes:Add device upper and lower limit log records
  21 + * Description of changes:
  22 + * 1、Add device upper and lower limit log records
  23 + * 2、merged https://github.com/thingsboard/thingsboard/pull/11536
22 24 */
23 25 package org.thingsboard.server.service.state;
24 26
... ... @@ -385,14 +387,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
385 387 if (proto.getAdded()) {
386 388 Futures.addCallback(fetchDeviceState(device), new FutureCallback<>() {
387 389 @Override
388   - public void onSuccess(@Nullable DeviceStateData state) {
  390 + public void onSuccess(DeviceStateData state) {
389 391 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId());
390   - if (addDeviceUsingState(tpi, state)) {
391   - save(deviceId, ACTIVITY_STATE, false);
  392 + Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
  393 + boolean isMyPartition = deviceIds != null;
  394 + if (isMyPartition) {
  395 + deviceIds.add(state.getDeviceId());
  396 + initializeActivityState(deviceId, state);
392 397 callback.onSuccess();
393 398 } else {
394   - log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}"
395   - , tenantId, deviceId, tpi.getFullTopicName());
  399 + log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}", tenantId, deviceId, tpi.getFullTopicName());
396 400 callback.onFailure(new RuntimeException("Device belongs to external partition " + tpi.getFullTopicName() + "!"));
397 401 }
398 402 }
... ... @@ -423,6 +427,21 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
423 427 }
424 428 }
425 429
  430 + private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
  431 + cleanupEntity(deviceId);
  432 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
  433 + Set<DeviceId> deviceIdSet = partitionedEntities.get(tpi);
  434 + if (deviceIdSet != null) {
  435 + deviceIdSet.remove(deviceId);
  436 + }
  437 + }
  438 +
  439 + private void initializeActivityState(DeviceId deviceId, DeviceStateData fetchedState) {
  440 + DeviceStateData cachedState = deviceStates.putIfAbsent(fetchedState.getDeviceId(), fetchedState);
  441 + boolean activityState = Objects.requireNonNullElse(cachedState, fetchedState).getState().isActive();
  442 + save(deviceId, ACTIVITY_STATE, activityState);
  443 + }
  444 +
426 445 @Override
427 446 protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
428 447 var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();
... ... @@ -459,10 +478,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
459 478 }
460 479 if (devicePackFutureHolder.future == null || !devicePackFutureHolder.future.isCancelled()) {
461 480 for (var state : states) {
462   - if (!addDeviceUsingState(entry.getKey(), state)) {
463   - return;
  481 + TopicPartitionInfo tpi = entry.getKey();
  482 + Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
  483 + boolean isMyPartition = deviceIds != null;
  484 + if (isMyPartition) {
  485 + deviceIds.add(state.getDeviceId());
  486 + deviceStates.putIfAbsent(state.getDeviceId(), state);
  487 + checkAndUpdateState(state.getDeviceId(), state);
  488 + } else {
  489 + log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());
464 490 }
465   - checkAndUpdateState(state.getDeviceId(), state);
466 491 }
467 492 log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size());
468 493 }
... ... @@ -519,18 +544,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
519 544 }
520 545 }
521 546
522   - private boolean addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) {
523   - Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
524   - if (deviceIds != null) {
525   - deviceIds.add(state.getDeviceId());
526   - deviceStates.putIfAbsent(state.getDeviceId(), state);
527   - return true;
528   - } else {
529   - log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());
530   - return false;
531   - }
532   - }
533   -
534 547 void checkStates() {
535 548 try {
536 549 final long ts = getCurrentTimeMillis();
... ... @@ -671,15 +684,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
671 684 return cleanup;
672 685 }
673 686
674   - private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
675   - cleanupEntity(deviceId);
676   - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
677   - Set<DeviceId> deviceIdSet = partitionedEntities.get(tpi);
678   - if (deviceIdSet != null) {
679   - deviceIdSet.remove(deviceId);
680   - }
681   - }
682   -
683 687 @Override
684 688 protected void cleanupEntityOnPartitionRemoval(DeviceId deviceId) {
685 689 cleanupEntity(deviceId);
... ...