Commit 856cc3722a9dac2f5551bd5ff9f9df8c6ccec1a6
1 parent
969a686c
Forward device messages to rule chain from device profile
Showing
14 changed files
with
261 additions
and
25 deletions
... | ... | @@ -105,6 +105,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; |
105 | 105 | import org.thingsboard.server.queue.provider.TbQueueProducerProvider; |
106 | 106 | import org.thingsboard.server.queue.util.TbCoreComponent; |
107 | 107 | import org.thingsboard.server.service.component.ComponentDiscoveryService; |
108 | +import org.thingsboard.server.service.profile.TbDeviceProfileCache; | |
108 | 109 | import org.thingsboard.server.service.queue.TbClusterService; |
109 | 110 | import org.thingsboard.server.service.security.model.SecurityUser; |
110 | 111 | import org.thingsboard.server.service.security.permission.AccessControlService; |
... | ... | @@ -210,6 +211,9 @@ public abstract class BaseController { |
210 | 211 | @Autowired |
211 | 212 | protected TbQueueProducerProvider producerProvider; |
212 | 213 | |
214 | + @Autowired | |
215 | + protected TbDeviceProfileCache deviceProfileCache; | |
216 | + | |
213 | 217 | @Value("${server.log_controller_error_stack_trace}") |
214 | 218 | @Getter |
215 | 219 | private boolean logControllerErrorStackTrace; | ... | ... |
... | ... | @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException; |
34 | 34 | import org.thingsboard.server.common.data.id.DeviceProfileId; |
35 | 35 | import org.thingsboard.server.common.data.page.PageData; |
36 | 36 | import org.thingsboard.server.common.data.page.PageLink; |
37 | +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; | |
37 | 38 | import org.thingsboard.server.queue.util.TbCoreComponent; |
38 | 39 | import org.thingsboard.server.service.security.permission.Operation; |
39 | 40 | import org.thingsboard.server.service.security.permission.Resource; |
... | ... | @@ -86,13 +87,17 @@ public class DeviceProfileController extends BaseController { |
86 | 87 | @ResponseBody |
87 | 88 | public DeviceProfile saveDeviceProfile(@RequestBody DeviceProfile deviceProfile) throws ThingsboardException { |
88 | 89 | try { |
90 | + boolean created = deviceProfile.getId() == null; | |
89 | 91 | deviceProfile.setTenantId(getTenantId()); |
90 | 92 | |
91 | 93 | checkEntity(deviceProfile.getId(), deviceProfile, Resource.DEVICE_PROFILE); |
92 | 94 | |
93 | 95 | DeviceProfile savedDeviceProfile = checkNotNull(deviceProfileService.saveDeviceProfile(deviceProfile)); |
94 | 96 | |
97 | + deviceProfileCache.put(savedDeviceProfile); | |
95 | 98 | tbClusterService.onDeviceProfileChange(savedDeviceProfile, null); |
99 | + tbClusterService.onEntityStateChange(deviceProfile.getTenantId(), savedDeviceProfile.getId(), | |
100 | + created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); | |
96 | 101 | |
97 | 102 | logEntityAction(savedDeviceProfile.getId(), savedDeviceProfile, |
98 | 103 | null, |
... | ... | @@ -115,6 +120,10 @@ public class DeviceProfileController extends BaseController { |
115 | 120 | DeviceProfileId deviceProfileId = new DeviceProfileId(toUUID(strDeviceProfileId)); |
116 | 121 | DeviceProfile deviceProfile = checkDeviceProfileId(deviceProfileId, Operation.DELETE); |
117 | 122 | deviceProfileService.deleteDeviceProfile(getTenantId(), deviceProfileId); |
123 | + deviceProfileCache.evict(deviceProfileId); | |
124 | + | |
125 | + tbClusterService.onDeviceProfileDelete(deviceProfile, null); | |
126 | + tbClusterService.onEntityStateChange(deviceProfile.getTenantId(), deviceProfile.getId(), ComponentLifecycleEvent.DELETED); | |
118 | 127 | |
119 | 128 | logEntityAction(deviceProfileId, deviceProfile, |
120 | 129 | null, |
... | ... | @@ -180,10 +189,10 @@ public class DeviceProfileController extends BaseController { |
180 | 189 | @RequestMapping(value = "/deviceProfileInfos", params = {"pageSize", "page"}, method = RequestMethod.GET) |
181 | 190 | @ResponseBody |
182 | 191 | public PageData<DeviceProfileInfo> getDeviceProfileInfos(@RequestParam int pageSize, |
183 | - @RequestParam int page, | |
184 | - @RequestParam(required = false) String textSearch, | |
185 | - @RequestParam(required = false) String sortProperty, | |
186 | - @RequestParam(required = false) String sortOrder) throws ThingsboardException { | |
192 | + @RequestParam int page, | |
193 | + @RequestParam(required = false) String textSearch, | |
194 | + @RequestParam(required = false) String sortProperty, | |
195 | + @RequestParam(required = false) String sortOrder) throws ThingsboardException { | |
187 | 196 | try { |
188 | 197 | PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); |
189 | 198 | return checkNotNull(deviceProfileService.findDeviceProfileInfos(getTenantId(), pageLink)); | ... | ... |
application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.service.profile; | |
17 | + | |
18 | +import lombok.extern.slf4j.Slf4j; | |
19 | +import org.springframework.stereotype.Service; | |
20 | +import org.thingsboard.server.common.data.Device; | |
21 | +import org.thingsboard.server.common.data.DeviceProfile; | |
22 | +import org.thingsboard.server.common.data.id.DeviceId; | |
23 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
24 | +import org.thingsboard.server.common.data.id.TenantId; | |
25 | +import org.thingsboard.server.dao.device.DeviceProfileService; | |
26 | +import org.thingsboard.server.dao.device.DeviceService; | |
27 | + | |
28 | +import java.util.concurrent.ConcurrentHashMap; | |
29 | +import java.util.concurrent.ConcurrentMap; | |
30 | +import java.util.concurrent.locks.Lock; | |
31 | +import java.util.concurrent.locks.ReentrantLock; | |
32 | + | |
33 | +@Service | |
34 | +@Slf4j | |
35 | +public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache { | |
36 | + | |
37 | + private final Lock deviceProfileFetchLock = new ReentrantLock(); | |
38 | + private final DeviceProfileService deviceProfileService; | |
39 | + private final DeviceService deviceService; | |
40 | + | |
41 | + private final ConcurrentMap<DeviceProfileId, DeviceProfile> deviceProfilesMap = new ConcurrentHashMap<>(); | |
42 | + private final ConcurrentMap<DeviceId, DeviceProfileId> devicesMap = new ConcurrentHashMap<>(); | |
43 | + | |
44 | + public DefaultTbDeviceProfileCache(DeviceProfileService deviceProfileService, DeviceService deviceService) { | |
45 | + this.deviceProfileService = deviceProfileService; | |
46 | + this.deviceService = deviceService; | |
47 | + } | |
48 | + | |
49 | + @Override | |
50 | + public DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId) { | |
51 | + DeviceProfile profile = deviceProfilesMap.get(deviceProfileId); | |
52 | + if (profile == null) { | |
53 | + deviceProfileFetchLock.lock(); | |
54 | + profile = deviceProfilesMap.get(deviceProfileId); | |
55 | + if (profile == null) { | |
56 | + try { | |
57 | + profile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId); | |
58 | + if (profile != null) { | |
59 | + deviceProfilesMap.put(deviceProfileId, profile); | |
60 | + } | |
61 | + } finally { | |
62 | + deviceProfileFetchLock.unlock(); | |
63 | + } | |
64 | + } | |
65 | + } | |
66 | + return profile; | |
67 | + } | |
68 | + | |
69 | + @Override | |
70 | + public DeviceProfile get(TenantId tenantId, DeviceId deviceId) { | |
71 | + DeviceProfileId profileId = devicesMap.get(deviceId); | |
72 | + if (profileId == null) { | |
73 | + Device device = deviceService.findDeviceById(tenantId, deviceId); | |
74 | + if (device != null) { | |
75 | + profileId = device.getDeviceProfileId(); | |
76 | + devicesMap.put(deviceId, profileId); | |
77 | + } | |
78 | + } | |
79 | + return get(tenantId, profileId); | |
80 | + } | |
81 | + | |
82 | + @Override | |
83 | + public void put(DeviceProfile profile) { | |
84 | + if (profile.getId() != null) { | |
85 | + deviceProfilesMap.put(profile.getId(), profile); | |
86 | + } | |
87 | + } | |
88 | + | |
89 | + @Override | |
90 | + public void evict(DeviceProfileId profileId) { | |
91 | + deviceProfilesMap.remove(profileId); | |
92 | + } | |
93 | + | |
94 | + @Override | |
95 | + public void evict(DeviceId deviceId) { | |
96 | + devicesMap.remove(deviceId); | |
97 | + } | |
98 | + | |
99 | +} | ... | ... |
application/src/main/java/org/thingsboard/server/service/profile/TbDeviceProfileCache.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.service.profile; | |
17 | + | |
18 | +import org.thingsboard.server.common.data.DeviceProfile; | |
19 | +import org.thingsboard.server.common.data.id.DeviceId; | |
20 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
21 | +import org.thingsboard.server.common.data.id.TenantId; | |
22 | + | |
23 | +public interface TbDeviceProfileCache { | |
24 | + | |
25 | + DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId); | |
26 | + | |
27 | + DeviceProfile get(TenantId tenantId, DeviceId deviceId); | |
28 | + | |
29 | + void put(DeviceProfile profile); | |
30 | + | |
31 | + void evict(DeviceProfileId id); | |
32 | + | |
33 | + void evict(DeviceId id); | |
34 | + | |
35 | +} | ... | ... |
... | ... | @@ -23,13 +23,17 @@ import org.springframework.stereotype.Service; |
23 | 23 | import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; |
24 | 24 | import org.thingsboard.server.common.data.DeviceProfile; |
25 | 25 | import org.thingsboard.server.common.data.EntityType; |
26 | +import org.thingsboard.server.common.data.id.DeviceId; | |
27 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
26 | 28 | import org.thingsboard.server.common.data.id.EntityId; |
29 | +import org.thingsboard.server.common.data.id.RuleChainId; | |
27 | 30 | import org.thingsboard.server.common.data.id.TenantId; |
28 | 31 | import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
29 | 32 | import org.thingsboard.server.common.msg.TbMsg; |
30 | 33 | import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
31 | 34 | import org.thingsboard.server.common.msg.queue.ServiceType; |
32 | 35 | import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
36 | +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; | |
33 | 37 | import org.thingsboard.server.gen.transport.TransportProtos; |
34 | 38 | import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; |
35 | 39 | import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; |
... | ... | @@ -42,7 +46,7 @@ import org.thingsboard.server.queue.TbQueueProducer; |
42 | 46 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
43 | 47 | import org.thingsboard.server.queue.discovery.PartitionService; |
44 | 48 | import org.thingsboard.server.queue.provider.TbQueueProducerProvider; |
45 | -import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; | |
49 | +import org.thingsboard.server.service.profile.TbDeviceProfileCache; | |
46 | 50 | import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; |
47 | 51 | |
48 | 52 | import java.util.HashSet; |
... | ... | @@ -66,11 +70,13 @@ public class DefaultTbClusterService implements TbClusterService { |
66 | 70 | private final TbQueueProducerProvider producerProvider; |
67 | 71 | private final PartitionService partitionService; |
68 | 72 | private final DataDecodingEncodingService encodingService; |
73 | + private final TbDeviceProfileCache deviceProfileCache; | |
69 | 74 | |
70 | - public DefaultTbClusterService(TbQueueProducerProvider producerProvider, PartitionService partitionService, DataDecodingEncodingService encodingService) { | |
75 | + public DefaultTbClusterService(TbQueueProducerProvider producerProvider, PartitionService partitionService, DataDecodingEncodingService encodingService, TbDeviceProfileCache deviceProfileCache) { | |
71 | 76 | this.producerProvider = producerProvider; |
72 | 77 | this.partitionService = partitionService; |
73 | 78 | this.encodingService = encodingService; |
79 | + this.deviceProfileCache = deviceProfileCache; | |
74 | 80 | } |
75 | 81 | |
76 | 82 | @Override |
... | ... | @@ -126,6 +132,12 @@ public class DefaultTbClusterService implements TbClusterService { |
126 | 132 | log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg); |
127 | 133 | return; |
128 | 134 | } |
135 | + } else { | |
136 | + if (entityId.getEntityType().equals(EntityType.DEVICE)) { | |
137 | + tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceId(entityId.getId()))); | |
138 | + } else if (entityId.getEntityType().equals(EntityType.DEVICE_PROFILE)) { | |
139 | + tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId()))); | |
140 | + } | |
129 | 141 | } |
130 | 142 | TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); |
131 | 143 | log.trace("PUSHING msg: {} to:{}", tbMsg, tpi); |
... | ... | @@ -137,6 +149,16 @@ public class DefaultTbClusterService implements TbClusterService { |
137 | 149 | toRuleEngineMsgs.incrementAndGet(); |
138 | 150 | } |
139 | 151 | |
152 | + private TbMsg transformMsg(TbMsg tbMsg, DeviceProfile deviceProfile) { | |
153 | + if (deviceProfile != null) { | |
154 | + RuleChainId targetRuleChainId = deviceProfile.getDefaultRuleChainId(); | |
155 | + if (targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId())) { | |
156 | + tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId); | |
157 | + } | |
158 | + } | |
159 | + return tbMsg; | |
160 | + } | |
161 | + | |
140 | 162 | @Override |
141 | 163 | public void pushNotificationToRuleEngine(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) { |
142 | 164 | TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId); |
... | ... | @@ -167,11 +189,27 @@ public class DefaultTbClusterService implements TbClusterService { |
167 | 189 | |
168 | 190 | @Override |
169 | 191 | public void onDeviceProfileChange(DeviceProfile deviceProfile, TbQueueCallback callback) { |
170 | - log.trace("[{}][{}] Processing device profile [{}] event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName()); | |
192 | + log.trace("[{}][{}] Processing device profile [{}] change event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName()); | |
193 | + TransportProtos.DeviceProfileUpdateMsg profileUpdateMsg = TransportProtos.DeviceProfileUpdateMsg.newBuilder() | |
194 | + .setData(ByteString.copyFrom(encodingService.encode(deviceProfile))).build(); | |
195 | + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileUpdateMsg(profileUpdateMsg).build(); | |
196 | + broadcast(transportMsg); | |
197 | + } | |
198 | + | |
199 | + @Override | |
200 | + public void onDeviceProfileDelete(DeviceProfile deviceProfile, TbQueueCallback callback) { | |
201 | + log.trace("[{}][{}] Processing device profile [{}] delete event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName()); | |
202 | + TransportProtos.DeviceProfileDeleteMsg profileDeleteMsg = TransportProtos.DeviceProfileDeleteMsg.newBuilder() | |
203 | + .setProfileIdMSB(deviceProfile.getId().getId().getMostSignificantBits()) | |
204 | + .setProfileIdLSB(deviceProfile.getId().getId().getLeastSignificantBits()) | |
205 | + .build(); | |
206 | + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileDeleteMsg(profileDeleteMsg).build(); | |
207 | + broadcast(transportMsg); | |
208 | + } | |
209 | + | |
210 | + private void broadcast(ToTransportMsg transportMsg) { | |
171 | 211 | TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer(); |
172 | 212 | Set<String> tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT); |
173 | - TransportProtos.DeviceProfileUpdateMsg profileUpdateMsg = TransportProtos.DeviceProfileUpdateMsg.newBuilder().setData(ByteString.copyFrom(encodingService.encode(deviceProfile))).build(); | |
174 | - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileUpdateMsg(profileUpdateMsg).build(); | |
175 | 213 | for (String transportServiceId : tbTransportServices) { |
176 | 214 | TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId); |
177 | 215 | toTransportNfProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null); | ... | ... |
... | ... | @@ -21,10 +21,13 @@ import org.springframework.scheduling.annotation.Scheduled; |
21 | 21 | import org.springframework.stereotype.Service; |
22 | 22 | import org.thingsboard.rule.engine.api.RpcError; |
23 | 23 | import org.thingsboard.server.actors.ActorSystemContext; |
24 | +import org.thingsboard.server.common.data.EntityType; | |
24 | 25 | import org.thingsboard.server.common.data.alarm.Alarm; |
26 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
25 | 27 | import org.thingsboard.server.common.data.id.TenantId; |
26 | 28 | import org.thingsboard.server.common.msg.MsgType; |
27 | 29 | import org.thingsboard.server.common.msg.TbActorMsg; |
30 | +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; | |
28 | 31 | import org.thingsboard.server.common.msg.queue.ServiceType; |
29 | 32 | import org.thingsboard.server.common.msg.queue.TbCallback; |
30 | 33 | import org.thingsboard.server.dao.util.mapping.JacksonUtil; |
... | ... | @@ -48,6 +51,7 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory; |
48 | 51 | import org.thingsboard.server.common.stats.StatsFactory; |
49 | 52 | import org.thingsboard.server.queue.util.TbCoreComponent; |
50 | 53 | import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; |
54 | +import org.thingsboard.server.service.profile.TbDeviceProfileCache; | |
51 | 55 | import org.thingsboard.server.service.queue.processing.AbstractConsumerService; |
52 | 56 | import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; |
53 | 57 | import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; |
... | ... | @@ -92,8 +96,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore |
92 | 96 | public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext, |
93 | 97 | DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService, |
94 | 98 | SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService, |
95 | - TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory) { | |
96 | - super(actorContext, encodingService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer()); | |
99 | + TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache) { | |
100 | + super(actorContext, encodingService, deviceProfileCache, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer()); | |
97 | 101 | this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer(); |
98 | 102 | this.stateService = stateService; |
99 | 103 | this.localSubscriptionService = localSubscriptionService; |
... | ... | @@ -211,11 +215,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore |
211 | 215 | log.trace("[{}] Forwarding message to RPC service {}", id, toCoreNotification.getFromDeviceRpcResponse()); |
212 | 216 | forwardToCoreRpcService(toCoreNotification.getFromDeviceRpcResponse(), callback); |
213 | 217 | } else if (toCoreNotification.getComponentLifecycleMsg() != null && !toCoreNotification.getComponentLifecycleMsg().isEmpty()) { |
214 | - Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray()); | |
215 | - if (actorMsg.isPresent()) { | |
216 | - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); | |
217 | - actorContext.tellWithHighPriority(actorMsg.get()); | |
218 | - } | |
218 | + handleComponentLifecycleMsg(id, toCoreNotification.getComponentLifecycleMsg()); | |
219 | 219 | callback.onSuccess(); |
220 | 220 | } |
221 | 221 | if (statsEnabled) { | ... | ... |
... | ... | @@ -15,6 +15,7 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.service.queue; |
17 | 17 | |
18 | +import com.google.protobuf.ByteString; | |
18 | 19 | import com.google.protobuf.ProtocolStringList; |
19 | 20 | import lombok.extern.slf4j.Slf4j; |
20 | 21 | import org.springframework.beans.factory.annotation.Value; |
... | ... | @@ -38,6 +39,7 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
38 | 39 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
39 | 40 | import org.thingsboard.server.queue.util.TbRuleEngineComponent; |
40 | 41 | import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; |
42 | +import org.thingsboard.server.service.profile.TbDeviceProfileCache; | |
41 | 43 | import org.thingsboard.server.service.queue.processing.*; |
42 | 44 | import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; |
43 | 45 | import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; |
... | ... | @@ -80,8 +82,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< |
80 | 82 | TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService, |
81 | 83 | ActorSystemContext actorContext, DataDecodingEncodingService encodingService, |
82 | 84 | TbRuleEngineDeviceRpcService tbDeviceRpcService, |
83 | - StatsFactory statsFactory) { | |
84 | - super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer()); | |
85 | + StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache) { | |
86 | + super(actorContext, encodingService, deviceProfileCache, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer()); | |
85 | 87 | this.statisticsService = statisticsService; |
86 | 88 | this.ruleEngineSettings = ruleEngineSettings; |
87 | 89 | this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory; |
... | ... | @@ -239,11 +241,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< |
239 | 241 | protected void handleNotification(UUID id, TbProtoQueueMsg<ToRuleEngineNotificationMsg> msg, TbCallback callback) throws Exception { |
240 | 242 | ToRuleEngineNotificationMsg nfMsg = msg.getValue(); |
241 | 243 | if (nfMsg.getComponentLifecycleMsg() != null && !nfMsg.getComponentLifecycleMsg().isEmpty()) { |
242 | - Optional<TbActorMsg> actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray()); | |
243 | - if (actorMsg.isPresent()) { | |
244 | - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); | |
245 | - actorContext.tellWithHighPriority(actorMsg.get()); | |
246 | - } | |
244 | + handleComponentLifecycleMsg(id, nfMsg.getComponentLifecycleMsg()); | |
247 | 245 | callback.onSuccess(); |
248 | 246 | } else if (nfMsg.hasFromDeviceRpcResponse()) { |
249 | 247 | TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse(); | ... | ... |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.service.queue; |
17 | 17 | |
18 | 18 | import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; |
19 | 19 | import org.thingsboard.server.common.data.DeviceProfile; |
20 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
20 | 21 | import org.thingsboard.server.common.data.id.EntityId; |
21 | 22 | import org.thingsboard.server.common.data.id.TenantId; |
22 | 23 | import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
... | ... | @@ -52,4 +53,5 @@ public interface TbClusterService { |
52 | 53 | |
53 | 54 | void onDeviceProfileChange(DeviceProfile deviceProfile, TbQueueCallback callback); |
54 | 55 | |
56 | + void onDeviceProfileDelete(DeviceProfile deviceProfileId, TbQueueCallback callback); | |
55 | 57 | } | ... | ... |
... | ... | @@ -15,23 +15,31 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.service.queue.processing; |
17 | 17 | |
18 | +import com.google.protobuf.ByteString; | |
18 | 19 | import lombok.extern.slf4j.Slf4j; |
19 | 20 | import org.springframework.boot.context.event.ApplicationReadyEvent; |
20 | 21 | import org.springframework.context.ApplicationListener; |
21 | 22 | import org.springframework.context.event.EventListener; |
22 | 23 | import org.thingsboard.common.util.ThingsBoardThreadFactory; |
23 | 24 | import org.thingsboard.server.actors.ActorSystemContext; |
25 | +import org.thingsboard.server.common.data.EntityType; | |
26 | +import org.thingsboard.server.common.data.id.DeviceId; | |
27 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
28 | +import org.thingsboard.server.common.msg.TbActorMsg; | |
29 | +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; | |
24 | 30 | import org.thingsboard.server.common.msg.queue.ServiceType; |
25 | 31 | import org.thingsboard.server.common.msg.queue.TbCallback; |
26 | 32 | import org.thingsboard.server.queue.TbQueueConsumer; |
27 | 33 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
28 | 34 | import org.thingsboard.server.queue.discovery.PartitionChangeEvent; |
29 | 35 | import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; |
36 | +import org.thingsboard.server.service.profile.TbDeviceProfileCache; | |
30 | 37 | import org.thingsboard.server.service.queue.TbPackCallback; |
31 | 38 | import org.thingsboard.server.service.queue.TbPackProcessingContext; |
32 | 39 | |
33 | 40 | import javax.annotation.PreDestroy; |
34 | 41 | import java.util.List; |
42 | +import java.util.Optional; | |
35 | 43 | import java.util.UUID; |
36 | 44 | import java.util.concurrent.ConcurrentHashMap; |
37 | 45 | import java.util.concurrent.ConcurrentMap; |
... | ... | @@ -51,12 +59,15 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene |
51 | 59 | |
52 | 60 | protected final ActorSystemContext actorContext; |
53 | 61 | protected final DataDecodingEncodingService encodingService; |
62 | + protected final TbDeviceProfileCache deviceProfileCache; | |
54 | 63 | |
55 | 64 | protected final TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer; |
56 | 65 | |
57 | - public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService, TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer) { | |
66 | + public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService, | |
67 | + TbDeviceProfileCache deviceProfileCache, TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer) { | |
58 | 68 | this.actorContext = actorContext; |
59 | 69 | this.encodingService = encodingService; |
70 | + this.deviceProfileCache = deviceProfileCache; | |
60 | 71 | this.nfConsumer = nfConsumer; |
61 | 72 | } |
62 | 73 | |
... | ... | @@ -126,6 +137,23 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene |
126 | 137 | }); |
127 | 138 | } |
128 | 139 | |
140 | + protected void handleComponentLifecycleMsg(UUID id, ByteString nfMsg) { | |
141 | + Optional<TbActorMsg> actorMsgOpt = encodingService.decode(nfMsg.toByteArray()); | |
142 | + if (actorMsgOpt.isPresent()) { | |
143 | + TbActorMsg actorMsg = actorMsgOpt.get(); | |
144 | + if (actorMsg instanceof ComponentLifecycleMsg) { | |
145 | + ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg; | |
146 | + if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { | |
147 | + deviceProfileCache.evict(new DeviceProfileId(componentLifecycleMsg.getEntityId().getId())); | |
148 | + } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { | |
149 | + deviceProfileCache.evict(new DeviceId(componentLifecycleMsg.getEntityId().getId())); | |
150 | + } | |
151 | + } | |
152 | + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg); | |
153 | + actorContext.tellWithHighPriority(actorMsg); | |
154 | + } | |
155 | + } | |
156 | + | |
129 | 157 | protected abstract void handleNotification(UUID id, TbProtoQueueMsg<N> msg, TbCallback callback) throws Exception; |
130 | 158 | |
131 | 159 | @PreDestroy | ... | ... |
... | ... | @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory; |
26 | 26 | import org.thingsboard.server.common.data.id.RuleChainId; |
27 | 27 | import org.thingsboard.server.common.data.id.RuleNodeId; |
28 | 28 | import org.thingsboard.server.common.msg.gen.MsgProtos; |
29 | +import org.thingsboard.server.common.msg.queue.RuleNodeInfo; | |
29 | 30 | import org.thingsboard.server.common.msg.queue.ServiceQueue; |
30 | 31 | import org.thingsboard.server.common.msg.queue.TbMsgCallback; |
31 | 32 | |
... | ... | @@ -84,6 +85,11 @@ public final class TbMsg implements Serializable { |
84 | 85 | data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback()); |
85 | 86 | } |
86 | 87 | |
88 | + public static TbMsg transformMsg(TbMsg origMsg, RuleChainId ruleChainId) { | |
89 | + return new TbMsg(origMsg.queueName, origMsg.id, origMsg.ts, origMsg.type, origMsg.originator, origMsg.metaData, origMsg.dataType, | |
90 | + origMsg.data, ruleChainId, null, origMsg.getCallback()); | |
91 | + } | |
92 | + | |
87 | 93 | public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { |
88 | 94 | return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), |
89 | 95 | tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY); | ... | ... |
... | ... | @@ -194,6 +194,11 @@ message DeviceProfileUpdateMsg { |
194 | 194 | bytes data = 1; |
195 | 195 | } |
196 | 196 | |
197 | +message DeviceProfileDeleteMsg { | |
198 | + int64 profileIdMSB = 1; | |
199 | + int64 profileIdLSB = 2; | |
200 | +} | |
201 | + | |
197 | 202 | message SessionCloseNotificationProto { |
198 | 203 | string message = 1; |
199 | 204 | } |
... | ... | @@ -485,4 +490,5 @@ message ToTransportMsg { |
485 | 490 | ToDeviceRpcRequestMsg toDeviceRequest = 6; |
486 | 491 | ToServerRpcResponseMsg toServerResponse = 7; |
487 | 492 | DeviceProfileUpdateMsg deviceProfileUpdateMsg = 8; |
493 | + DeviceProfileDeleteMsg deviceProfileDeleteMsg = 9; | |
488 | 494 | } | ... | ... |
... | ... | @@ -23,7 +23,6 @@ import java.util.Optional; |
23 | 23 | |
24 | 24 | public interface TransportProfileCache { |
25 | 25 | |
26 | - | |
27 | 26 | DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody); |
28 | 27 | |
29 | 28 | DeviceProfile get(DeviceProfileId id); |
... | ... | @@ -32,4 +31,6 @@ public interface TransportProfileCache { |
32 | 31 | |
33 | 32 | DeviceProfile put(ByteString profileBody); |
34 | 33 | |
34 | + void evict(DeviceProfileId id); | |
35 | + | |
35 | 36 | } | ... | ... |
... | ... | @@ -629,6 +629,11 @@ public class DefaultTransportService implements TransportService { |
629 | 629 | if (deviceProfile != null) { |
630 | 630 | onProfileUpdate(deviceProfile); |
631 | 631 | } |
632 | + } else if (toSessionMsg.hasDeviceProfileDeleteMsg()) { | |
633 | + transportProfileCache.evict(new DeviceProfileId(new UUID( | |
634 | + toSessionMsg.getDeviceProfileDeleteMsg().getProfileIdMSB(), | |
635 | + toSessionMsg.getDeviceProfileDeleteMsg().getProfileIdLSB() | |
636 | + ))); | |
632 | 637 | } else { |
633 | 638 | //TODO: should we notify the device actor about missed session? |
634 | 639 | log.debug("[{}] Missing session.", sessionId); | ... | ... |