Commit d0db49a2cb826a7b31760ab2809b7450bae6988d
1 parent
908bb8be
Fixed assign rule chain. Added sync of related rule chains
Showing
3 changed files
with
48 additions
and
0 deletions
... | ... | @@ -18,9 +18,11 @@ package org.thingsboard.server.service.edge; |
18 | 18 | import com.fasterxml.jackson.core.JsonProcessingException; |
19 | 19 | import com.fasterxml.jackson.databind.JsonNode; |
20 | 20 | import com.fasterxml.jackson.databind.ObjectMapper; |
21 | +import com.google.common.util.concurrent.FutureCallback; | |
21 | 22 | import com.google.common.util.concurrent.Futures; |
22 | 23 | import com.google.common.util.concurrent.ListenableFuture; |
23 | 24 | import lombok.extern.slf4j.Slf4j; |
25 | +import org.checkerframework.checker.nullness.qual.Nullable; | |
24 | 26 | import org.springframework.beans.factory.annotation.Autowired; |
25 | 27 | import org.springframework.stereotype.Service; |
26 | 28 | import org.thingsboard.server.common.data.EntityType; |
... | ... | @@ -46,12 +48,15 @@ import org.thingsboard.server.common.data.page.TimePageData; |
46 | 48 | import org.thingsboard.server.common.data.page.TimePageLink; |
47 | 49 | import org.thingsboard.server.common.data.relation.EntityRelation; |
48 | 50 | import org.thingsboard.server.common.data.relation.RelationTypeGroup; |
51 | +import org.thingsboard.server.common.data.rule.RuleChain; | |
52 | +import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo; | |
49 | 53 | import org.thingsboard.server.common.msg.queue.TbCallback; |
50 | 54 | import org.thingsboard.server.dao.alarm.AlarmService; |
51 | 55 | import org.thingsboard.server.dao.edge.EdgeEventService; |
52 | 56 | import org.thingsboard.server.dao.edge.EdgeService; |
53 | 57 | import org.thingsboard.server.dao.model.ModelConstants; |
54 | 58 | import org.thingsboard.server.dao.relation.RelationService; |
59 | +import org.thingsboard.server.dao.rule.RuleChainService; | |
55 | 60 | import org.thingsboard.server.dao.user.UserService; |
56 | 61 | import org.thingsboard.server.gen.transport.TransportProtos; |
57 | 62 | import org.thingsboard.server.queue.util.TbCoreComponent; |
... | ... | @@ -87,6 +92,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { |
87 | 92 | private UserService userService; |
88 | 93 | |
89 | 94 | @Autowired |
95 | + private RuleChainService ruleChainService; | |
96 | + | |
97 | + @Autowired | |
90 | 98 | private RelationService relationService; |
91 | 99 | |
92 | 100 | @Autowired |
... | ... | @@ -223,6 +231,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { |
223 | 231 | case UNASSIGNED_FROM_EDGE: |
224 | 232 | EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); |
225 | 233 | saveEdgeEvent(tenantId, edgeId, edgeEventType, edgeEventActionType, entityId, null); |
234 | + if (edgeEventType.equals(EdgeEventType.RULE_CHAIN)) { | |
235 | + updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId); | |
236 | + } | |
226 | 237 | break; |
227 | 238 | case RELATIONS_DELETED: |
228 | 239 | // TODO: voba - add support for relations deleted |
... | ... | @@ -230,6 +241,40 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { |
230 | 241 | } |
231 | 242 | } |
232 | 243 | |
244 | + private void updateDependentRuleChains(TenantId tenantId, RuleChainId processingRuleChainId, EdgeId edgeId) { | |
245 | + ListenableFuture<TimePageData<RuleChain>> future = ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edgeId, new TimePageLink(Integer.MAX_VALUE)); | |
246 | + Futures.addCallback(future, new FutureCallback<TimePageData<RuleChain>>() { | |
247 | + @Override | |
248 | + public void onSuccess(@Nullable TimePageData<RuleChain> pageData) { | |
249 | + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { | |
250 | + for (RuleChain ruleChain : pageData.getData()) { | |
251 | + if (!ruleChain.getId().equals(processingRuleChainId)) { | |
252 | + List<RuleChainConnectionInfo> connectionInfos = | |
253 | + ruleChainService.loadRuleChainMetaData(ruleChain.getTenantId(), ruleChain.getId()).getRuleChainConnections(); | |
254 | + if (connectionInfos != null && !connectionInfos.isEmpty()) { | |
255 | + for (RuleChainConnectionInfo connectionInfo : connectionInfos) { | |
256 | + if (connectionInfo.getTargetRuleChainId().equals(processingRuleChainId)) { | |
257 | + saveEdgeEvent(tenantId, | |
258 | + edgeId, | |
259 | + EdgeEventType.RULE_CHAIN_METADATA, | |
260 | + ActionType.UPDATED, | |
261 | + ruleChain.getId(), | |
262 | + null); | |
263 | + } | |
264 | + } | |
265 | + } | |
266 | + } | |
267 | + } | |
268 | + } | |
269 | + } | |
270 | + | |
271 | + @Override | |
272 | + public void onFailure(Throwable t) { | |
273 | + log.error("Exception during updating dependent rule chains on sync!", t); | |
274 | + } | |
275 | + }, dbCallbackExecutorService); | |
276 | + } | |
277 | + | |
233 | 278 | private void processAlarm(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { |
234 | 279 | AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); |
235 | 280 | ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); | ... | ... |