Commit 356d4ff26cc6101b66df347e8516857dc42716f7

Authored by Andrii Shvaika
1 parent d3f519a2

Profile Node improvements in cluster mode

... ... @@ -544,7 +544,11 @@ public class ActorSystemContext {
544 544
545 545 public void scheduleMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs) {
546 546 log.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
547   - getScheduler().schedule(() -> ctx.tell(msg), delayInMs, TimeUnit.MILLISECONDS);
  547 + if (delayInMs > 0) {
  548 + getScheduler().schedule(() -> ctx.tell(msg), delayInMs, TimeUnit.MILLISECONDS);
  549 + } else {
  550 + ctx.tell(msg);
  551 + }
548 552 }
549 553
550 554 }
... ...
... ... @@ -35,6 +35,7 @@ import org.thingsboard.server.actors.TbActorRef;
35 35 import org.thingsboard.server.common.data.Customer;
36 36 import org.thingsboard.server.common.data.DataConstants;
37 37 import org.thingsboard.server.common.data.Device;
  38 +import org.thingsboard.server.common.data.DeviceProfile;
38 39 import org.thingsboard.server.common.data.alarm.Alarm;
39 40 import org.thingsboard.server.common.data.asset.Asset;
40 41 import org.thingsboard.server.common.data.id.EntityId;
... ... @@ -477,6 +478,16 @@ class DefaultTbContext implements TbContext {
477 478 mainCtx.getRuleNodeStateService().removeByRuleNodeId(getTenantId(), getSelfId());
478 479 }
479 480
  481 + @Override
  482 + public void addProfileListener(Consumer<DeviceProfile> listener) {
  483 + mainCtx.getDeviceProfileCache().addListener(getTenantId(), getSelfId(), listener);
  484 + }
  485 +
  486 + @Override
  487 + public void removeProfileListener() {
  488 + mainCtx.getDeviceProfileCache().removeListener(getTenantId(), getSelfId());
  489 + }
  490 +
480 491 private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) {
481 492 TbMsgMetaData metaData = new TbMsgMetaData();
482 493 metaData.putValue("ruleNodeId", ruleNodeId.toString());
... ...
... ... @@ -94,7 +94,6 @@ public class DeviceProfileController extends BaseController {
94 94
95 95 DeviceProfile savedDeviceProfile = checkNotNull(deviceProfileService.saveDeviceProfile(deviceProfile));
96 96
97   - deviceProfileCache.put(savedDeviceProfile);
98 97 tbClusterService.onDeviceProfileChange(savedDeviceProfile, null);
99 98 tbClusterService.onEntityStateChange(deviceProfile.getTenantId(), savedDeviceProfile.getId(),
100 99 created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
... ... @@ -120,7 +119,6 @@ public class DeviceProfileController extends BaseController {
120 119 DeviceProfileId deviceProfileId = new DeviceProfileId(toUUID(strDeviceProfileId));
121 120 DeviceProfile deviceProfile = checkDeviceProfileId(deviceProfileId, Operation.DELETE);
122 121 deviceProfileService.deleteDeviceProfile(getTenantId(), deviceProfileId);
123   - deviceProfileCache.evict(deviceProfileId);
124 122
125 123 tbClusterService.onDeviceProfileDelete(deviceProfile, null);
126 124 tbClusterService.onEntityStateChange(deviceProfile.getTenantId(), deviceProfile.getId(), ComponentLifecycleEvent.DELETED);
... ...
... ... @@ -21,14 +21,17 @@ import org.thingsboard.server.common.data.Device;
21 21 import org.thingsboard.server.common.data.DeviceProfile;
22 22 import org.thingsboard.server.common.data.id.DeviceId;
23 23 import org.thingsboard.server.common.data.id.DeviceProfileId;
  24 +import org.thingsboard.server.common.data.id.EntityId;
24 25 import org.thingsboard.server.common.data.id.TenantId;
25 26 import org.thingsboard.server.dao.device.DeviceProfileService;
26 27 import org.thingsboard.server.dao.device.DeviceService;
27 28
  29 +import java.util.Map;
28 30 import java.util.concurrent.ConcurrentHashMap;
29 31 import java.util.concurrent.ConcurrentMap;
30 32 import java.util.concurrent.locks.Lock;
31 33 import java.util.concurrent.locks.ReentrantLock;
  34 +import java.util.function.Consumer;
32 35
33 36 @Service
34 37 @Slf4j
... ... @@ -40,6 +43,7 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache {
40 43
41 44 private final ConcurrentMap<DeviceProfileId, DeviceProfile> deviceProfilesMap = new ConcurrentHashMap<>();
42 45 private final ConcurrentMap<DeviceId, DeviceProfileId> devicesMap = new ConcurrentHashMap<>();
  46 + private final ConcurrentMap<TenantId, ConcurrentMap<EntityId, Consumer<DeviceProfile>>> listeners = new ConcurrentHashMap<>();
43 47
44 48 public DefaultTbDeviceProfileCache(DeviceProfileService deviceProfileService, DeviceService deviceService) {
45 49 this.deviceProfileService = deviceProfileService;
... ... @@ -50,19 +54,21 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache {
50 54 public DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId) {
51 55 DeviceProfile profile = deviceProfilesMap.get(deviceProfileId);
52 56 if (profile == null) {
53   - profile = deviceProfilesMap.get(deviceProfileId);
54   - if (profile == null) {
55   - deviceProfileFetchLock.lock();
56   - try {
  57 + deviceProfileFetchLock.lock();
  58 + try {
  59 + profile = deviceProfilesMap.get(deviceProfileId);
  60 + if (profile == null) {
57 61 profile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId);
58 62 if (profile != null) {
59 63 deviceProfilesMap.put(deviceProfileId, profile);
  64 + log.info("[{}] Fetch device profile into cache: {}", profile.getId(), profile);
60 65 }
61   - } finally {
62   - deviceProfileFetchLock.unlock();
63 66 }
  67 + } finally {
  68 + deviceProfileFetchLock.unlock();
64 69 }
65 70 }
  71 + log.trace("[{}] Found device profile in cache: {}", deviceProfileId, profile);
66 72 return profile;
67 73 }
68 74
... ... @@ -85,12 +91,19 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache {
85 91 public void put(DeviceProfile profile) {
86 92 if (profile.getId() != null) {
87 93 deviceProfilesMap.put(profile.getId(), profile);
  94 + log.info("[{}] pushed device profile to cache: {}", profile.getId(), profile);
  95 + notifyListeners(profile);
88 96 }
89 97 }
90 98
91 99 @Override
92   - public void evict(DeviceProfileId profileId) {
93   - deviceProfilesMap.remove(profileId);
  100 + public void evict(TenantId tenantId, DeviceProfileId profileId) {
  101 + DeviceProfile oldProfile = deviceProfilesMap.remove(profileId);
  102 + log.info("[{}] evict device profile from cache: {}", profileId, oldProfile);
  103 + DeviceProfile newProfile = get(tenantId, profileId);
  104 + if (newProfile != null) {
  105 + notifyListeners(newProfile);
  106 + }
94 107 }
95 108
96 109 @Override
... ... @@ -98,4 +111,24 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache {
98 111 devicesMap.remove(deviceId);
99 112 }
100 113
  114 + @Override
  115 + public void addListener(TenantId tenantId, EntityId listenerId, Consumer<DeviceProfile> listener) {
  116 + listeners.computeIfAbsent(tenantId, id -> new ConcurrentHashMap<>()).put(listenerId, listener);
  117 + }
  118 +
  119 + @Override
  120 + public void removeListener(TenantId tenantId, EntityId listenerId) {
  121 + ConcurrentMap<EntityId, Consumer<DeviceProfile>> tenantListeners = listeners.get(tenantId);
  122 + if (tenantListeners != null) {
  123 + tenantListeners.remove(listenerId);
  124 + }
  125 + }
  126 +
  127 + private void notifyListeners(DeviceProfile profile) {
  128 + ConcurrentMap<EntityId, Consumer<DeviceProfile>> tenantListeners = listeners.get(profile.getTenantId());
  129 + if (tenantListeners != null) {
  130 + tenantListeners.forEach((id, listener) -> listener.accept(profile));
  131 + }
  132 + }
  133 +
101 134 }
... ...
... ... @@ -55,17 +55,17 @@ public class DefaultTbTenantProfileCache implements TbTenantProfileCache {
55 55 public TenantProfile get(TenantProfileId tenantProfileId) {
56 56 TenantProfile profile = tenantProfilesMap.get(tenantProfileId);
57 57 if (profile == null) {
58   - profile = tenantProfilesMap.get(tenantProfileId);
59   - if (profile == null) {
60   - tenantProfileFetchLock.lock();
61   - try {
  58 + tenantProfileFetchLock.lock();
  59 + try {
  60 + profile = tenantProfilesMap.get(tenantProfileId);
  61 + if (profile == null) {
62 62 profile = tenantProfileService.findTenantProfileById(TenantId.SYS_TENANT_ID, tenantProfileId);
63 63 if (profile != null) {
64 64 tenantProfilesMap.put(tenantProfileId, profile);
65 65 }
66   - } finally {
67   - tenantProfileFetchLock.unlock();
68 66 }
  67 + } finally {
  68 + tenantProfileFetchLock.unlock();
69 69 }
70 70 }
71 71 return profile;
... ...
... ... @@ -19,12 +19,13 @@ import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
19 19 import org.thingsboard.server.common.data.DeviceProfile;
20 20 import org.thingsboard.server.common.data.id.DeviceId;
21 21 import org.thingsboard.server.common.data.id.DeviceProfileId;
  22 +import org.thingsboard.server.common.data.id.TenantId;
22 23
23 24 public interface TbDeviceProfileCache extends RuleEngineDeviceProfileCache {
24 25
25 26 void put(DeviceProfile profile);
26 27
27   - void evict(DeviceProfileId id);
  28 + void evict(TenantId tenantId, DeviceProfileId id);
28 29
29 30 void evict(DeviceId id);
30 31
... ...
... ... @@ -15,11 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.service.profile;
17 17
18   -import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
19   -import org.thingsboard.server.common.data.DeviceProfile;
20 18 import org.thingsboard.server.common.data.TenantProfile;
21   -import org.thingsboard.server.common.data.id.DeviceId;
22   -import org.thingsboard.server.common.data.id.DeviceProfileId;
23 19 import org.thingsboard.server.common.data.id.TenantId;
24 20 import org.thingsboard.server.common.data.id.TenantProfileId;
25 21
... ...
... ... @@ -221,7 +221,8 @@ public class DefaultTbClusterService implements TbClusterService {
221 221 byte[] msgBytes = encodingService.encode(msg);
222 222 TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer();
223 223 Set<String> tbRuleEngineServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE));
224   - if (msg.getEntityId().getEntityType().equals(EntityType.TENANT)) {
  224 + if (msg.getEntityId().getEntityType().equals(EntityType.TENANT)
  225 + || msg.getEntityId().getEntityType().equals(EntityType.DEVICE_PROFILE)) {
225 226 TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
226 227 Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
227 228 for (String serviceId : tbCoreServices) {
... ...
... ... @@ -144,7 +144,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
144 144 if (actorMsg instanceof ComponentLifecycleMsg) {
145 145 ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg;
146 146 if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
147   - deviceProfileCache.evict(new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
  147 + deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
148 148 } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
149 149 deviceProfileCache.evict(new DeviceId(componentLifecycleMsg.getEntityId().getId()));
150 150 }
... ...
... ... @@ -18,8 +18,11 @@ package org.thingsboard.rule.engine.api;
18 18 import org.thingsboard.server.common.data.DeviceProfile;
19 19 import org.thingsboard.server.common.data.id.DeviceId;
20 20 import org.thingsboard.server.common.data.id.DeviceProfileId;
  21 +import org.thingsboard.server.common.data.id.EntityId;
21 22 import org.thingsboard.server.common.data.id.TenantId;
22 23
  24 +import java.util.function.Consumer;
  25 +
23 26 /**
24 27 * Created by ashvayka on 02.04.18.
25 28 */
... ... @@ -29,4 +32,8 @@ public interface RuleEngineDeviceProfileCache {
29 32
30 33 DeviceProfile get(TenantId tenantId, DeviceId deviceId);
31 34
  35 + void addListener(TenantId tenantId, EntityId listenerId, Consumer<DeviceProfile> listener);
  36 +
  37 + void removeListener(TenantId tenantId, EntityId listenerId);
  38 +
32 39 }
... ...
... ... @@ -20,6 +20,7 @@ import org.springframework.data.redis.core.RedisTemplate;
20 20 import org.thingsboard.common.util.ListeningExecutor;
21 21 import org.thingsboard.server.common.data.Customer;
22 22 import org.thingsboard.server.common.data.Device;
  23 +import org.thingsboard.server.common.data.DeviceProfile;
23 24 import org.thingsboard.server.common.data.alarm.Alarm;
24 25 import org.thingsboard.server.common.data.asset.Asset;
25 26 import org.thingsboard.server.common.data.id.EntityId;
... ... @@ -223,4 +224,8 @@ public interface TbContext {
223 224 RuleNodeState saveRuleNodeState(RuleNodeState state);
224 225
225 226 void clearRuleNodeStates();
  227 +
  228 + void addProfileListener(Consumer<DeviceProfile> listener);
  229 +
  230 + void removeProfileListener();
226 231 }
... ...
... ... @@ -39,6 +39,7 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
39 39 import org.thingsboard.server.dao.util.mapping.JacksonUtil;
40 40
41 41 import java.util.Map;
  42 +import java.util.UUID;
42 43 import java.util.concurrent.ConcurrentHashMap;
43 44 import java.util.concurrent.ExecutionException;
44 45 import java.util.concurrent.TimeUnit;
... ... @@ -57,16 +58,20 @@ import java.util.concurrent.TimeUnit;
57 58 )
58 59 public class TbDeviceProfileNode implements TbNode {
59 60 private static final String PERIODIC_MSG_TYPE = "TbDeviceProfilePeriodicMsg";
  61 + private static final String PROFILE_UPDATE_MSG_TYPE = "TbDeviceProfileUpdateMsg";
60 62
61 63 private TbDeviceProfileNodeConfiguration config;
62 64 private RuleEngineDeviceProfileCache cache;
  65 + private TbContext ctx;
63 66 private final Map<DeviceId, DeviceState> deviceStates = new ConcurrentHashMap<>();
64 67
65 68 @Override
66 69 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
67 70 this.config = TbNodeUtils.convert(configuration, TbDeviceProfileNodeConfiguration.class);
68 71 this.cache = ctx.getDeviceProfileCache();
  72 + this.ctx = ctx;
69 73 scheduleAlarmHarvesting(ctx);
  74 + ctx.addProfileListener(this::onProfileUpdate);
70 75 if (config.isFetchAlarmRulesStateOnStart()) {
71 76 log.info("[{}] Fetching alarm rule state", ctx.getSelfId());
72 77 int fetchCount = 0;
... ... @@ -95,15 +100,14 @@ public class TbDeviceProfileNode implements TbNode {
95 100 }
96 101 }
97 102
98   - /**
99   - * TODO: Dynamic values evaluation;
100   - */
101 103 @Override
102 104 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
103 105 EntityType originatorType = msg.getOriginator().getEntityType();
104 106 if (msg.getType().equals(PERIODIC_MSG_TYPE)) {
105 107 scheduleAlarmHarvesting(ctx);
106 108 harvestAlarms(ctx, System.currentTimeMillis());
  109 + } else if (msg.getType().equals(PROFILE_UPDATE_MSG_TYPE)) {
  110 + updateProfile(ctx, new DeviceProfileId(UUID.fromString(msg.getData())));
107 111 } else {
108 112 if (EntityType.DEVICE.equals(originatorType)) {
109 113 DeviceId deviceId = new DeviceId(msg.getOriginator().getId());
... ... @@ -119,36 +123,12 @@ public class TbDeviceProfileNode implements TbNode {
119 123 ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!"));
120 124 }
121 125 }
122   - } else if (EntityType.DEVICE_PROFILE.equals(originatorType)) {
123   - log.info("[{}] Received device profile update notification: {}", ctx.getSelfId(), msg.getData());
124   - if (msg.getType().equals("ENTITY_UPDATED")) {
125   - DeviceProfile deviceProfile = JacksonUtil.fromString(msg.getData(), DeviceProfile.class);
126   - if (deviceProfile != null) {
127   - for (DeviceState state : deviceStates.values()) {
128   - if (deviceProfile.getId().equals(state.getProfileId())) {
129   - state.updateProfile(ctx, deviceProfile);
130   - }
131   - }
132   - }
133   - }
134   - ctx.tellSuccess(msg);
135 126 } else {
136 127 ctx.tellSuccess(msg);
137 128 }
138 129 }
139 130 }
140 131
141   - public void invalidateDeviceProfileCache(DeviceId deviceId, String deviceJson) {
142   - DeviceState deviceState = deviceStates.get(deviceId);
143   - if (deviceState != null) {
144   - DeviceProfileId currentProfileId = deviceState.getProfileId();
145   - Device device = JacksonUtil.fromString(deviceJson, Device.class);
146   - if (!currentProfileId.equals(device.getDeviceProfileId())) {
147   - deviceStates.remove(deviceId);
148   - }
149   - }
150   - }
151   -
152 132 @Override
153 133 public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
154 134 // Cleanup the cache for all entities that are no longer assigned to current server partitions
... ... @@ -157,6 +137,7 @@ public class TbDeviceProfileNode implements TbNode {
157 137
158 138 @Override
159 139 public void destroy() {
  140 + ctx.removeProfileListener();
160 141 deviceStates.clear();
161 142 }
162 143
... ... @@ -183,4 +164,33 @@ public class TbDeviceProfileNode implements TbNode {
183 164 }
184 165 }
185 166
  167 + protected void updateProfile(TbContext ctx, DeviceProfileId deviceProfileId) throws ExecutionException, InterruptedException {
  168 + DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceProfileId);
  169 + if (deviceProfile != null) {
  170 + log.info("[{}] Received device profile update notification: {}", ctx.getSelfId(), deviceProfile);
  171 + for (DeviceState state : deviceStates.values()) {
  172 + if (deviceProfile.getId().equals(state.getProfileId())) {
  173 + state.updateProfile(ctx, deviceProfile);
  174 + }
  175 + }
  176 + } else {
  177 + log.info("[{}] Received stale profile update notification: [{}]", ctx.getSelfId(), deviceProfileId);
  178 + }
  179 + }
  180 +
  181 + protected void onProfileUpdate(DeviceProfile profile) {
  182 + ctx.tellSelf(TbMsg.newMsg(PROFILE_UPDATE_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, profile.getId().getId().toString()), 0L);
  183 + }
  184 +
  185 + protected void invalidateDeviceProfileCache(DeviceId deviceId, String deviceJson) {
  186 + DeviceState deviceState = deviceStates.get(deviceId);
  187 + if (deviceState != null) {
  188 + DeviceProfileId currentProfileId = deviceState.getProfileId();
  189 + Device device = JacksonUtil.fromString(deviceJson, Device.class);
  190 + if (!currentProfileId.equals(device.getDeviceProfileId())) {
  191 + deviceStates.remove(deviceId);
  192 + }
  193 + }
  194 + }
  195 +
186 196 }
... ...