Commit b7718b91d4bffd0901a2ce94dfd02174b135ee1e
Merge branch 'develop/3.2' of github.com:thingsboard/thingsboard into develop/3.2
Showing
6 changed files
with
78 additions
and
41 deletions
... | ... | @@ -243,6 +243,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
243 | 243 | } |
244 | 244 | } |
245 | 245 | |
246 | + | |
247 | + | |
246 | 248 | private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) { |
247 | 249 | return new TransportServiceCallback<Void>() { |
248 | 250 | @Override | ... | ... |
... | ... | @@ -34,7 +34,7 @@ public class JacksonUtil { |
34 | 34 | return fromValue != null ? OBJECT_MAPPER.convertValue(fromValue, toValueType) : null; |
35 | 35 | } catch (IllegalArgumentException e) { |
36 | 36 | throw new IllegalArgumentException("The given object value: " |
37 | - + fromValue + " cannot be converted to " + toValueType); | |
37 | + + fromValue + " cannot be converted to " + toValueType, e); | |
38 | 38 | } |
39 | 39 | } |
40 | 40 | |
... | ... | @@ -43,7 +43,7 @@ public class JacksonUtil { |
43 | 43 | return string != null ? OBJECT_MAPPER.readValue(string, clazz) : null; |
44 | 44 | } catch (IOException e) { |
45 | 45 | throw new IllegalArgumentException("The given string value: " |
46 | - + string + " cannot be transformed to Json object"); | |
46 | + + string + " cannot be transformed to Json object", e); | |
47 | 47 | } |
48 | 48 | } |
49 | 49 | |
... | ... | @@ -52,7 +52,7 @@ public class JacksonUtil { |
52 | 52 | return value != null ? OBJECT_MAPPER.writeValueAsString(value) : null; |
53 | 53 | } catch (JsonProcessingException e) { |
54 | 54 | throw new IllegalArgumentException("The given Json object value: " |
55 | - + value + " cannot be transformed to a String"); | |
55 | + + value + " cannot be transformed to a String", e); | |
56 | 56 | } |
57 | 57 | } |
58 | 58 | |
... | ... | @@ -71,7 +71,7 @@ public class JacksonUtil { |
71 | 71 | return fromString(toString(value), (Class<T>) value.getClass()); |
72 | 72 | } |
73 | 73 | |
74 | - public static JsonNode valueToTree(Alarm alarm) { | |
74 | + public static <T> JsonNode valueToTree(T alarm) { | |
75 | 75 | return OBJECT_MAPPER.valueToTree(alarm); |
76 | 76 | } |
77 | 77 | } | ... | ... |
... | ... | @@ -47,16 +47,14 @@ import java.util.concurrent.ExecutionException; |
47 | 47 | class DeviceProfileAlarmState { |
48 | 48 | |
49 | 49 | private final EntityId originator; |
50 | - private final DeviceProfileAlarm alarmDefinition; | |
50 | + private DeviceProfileAlarm alarmDefinition; | |
51 | 51 | private volatile Map<AlarmSeverity, AlarmRule> createRulesSortedBySeverityDesc; |
52 | 52 | private volatile Alarm currentAlarm; |
53 | 53 | private volatile boolean initialFetchDone; |
54 | 54 | |
55 | 55 | public DeviceProfileAlarmState(EntityId originator, DeviceProfileAlarm alarmDefinition) { |
56 | 56 | this.originator = originator; |
57 | - this.alarmDefinition = alarmDefinition; | |
58 | - this.createRulesSortedBySeverityDesc = new TreeMap<>(Comparator.comparingInt(AlarmSeverity::ordinal)); | |
59 | - this.createRulesSortedBySeverityDesc.putAll(alarmDefinition.getCreateRules()); | |
57 | + this.updateState(alarmDefinition); | |
60 | 58 | } |
61 | 59 | |
62 | 60 | public void process(TbContext ctx, TbMsg msg, DeviceDataSnapshot data) throws ExecutionException, InterruptedException { |
... | ... | @@ -111,6 +109,12 @@ class DeviceProfileAlarmState { |
111 | 109 | ctx.tellNext(newMsg, relationType); |
112 | 110 | } |
113 | 111 | |
112 | + public void updateState(DeviceProfileAlarm alarm) { | |
113 | + this.alarmDefinition = alarm; | |
114 | + this.createRulesSortedBySeverityDesc = new TreeMap<>(Comparator.comparingInt(AlarmSeverity::ordinal)); | |
115 | + this.createRulesSortedBySeverityDesc.putAll(alarmDefinition.getCreateRules()); | |
116 | + } | |
117 | + | |
114 | 118 | private TbAlarmResult calculateAlarmResult(TbContext ctx, AlarmSeverity severity) { |
115 | 119 | if (currentAlarm != null) { |
116 | 120 | currentAlarm.setEndTs(System.currentTimeMillis()); | ... | ... |
... | ... | @@ -20,6 +20,7 @@ import lombok.Getter; |
20 | 20 | import org.thingsboard.server.common.data.DeviceProfile; |
21 | 21 | import org.thingsboard.server.common.data.device.profile.AlarmRule; |
22 | 22 | import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm; |
23 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
23 | 24 | import org.thingsboard.server.common.data.query.EntityKey; |
24 | 25 | import org.thingsboard.server.common.data.query.KeyFilter; |
25 | 26 | |
... | ... | @@ -55,4 +56,8 @@ class DeviceProfileState { |
55 | 56 | } |
56 | 57 | } |
57 | 58 | } |
59 | + | |
60 | + public DeviceProfileId getProfileId() { | |
61 | + return deviceProfile.getId(); | |
62 | + } | |
58 | 63 | } | ... | ... |
... | ... | @@ -20,8 +20,10 @@ import org.thingsboard.rule.engine.api.TbContext; |
20 | 20 | import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode; |
21 | 21 | import org.thingsboard.server.common.data.DataConstants; |
22 | 22 | import org.thingsboard.server.common.data.Device; |
23 | +import org.thingsboard.server.common.data.DeviceProfile; | |
23 | 24 | import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm; |
24 | 25 | import org.thingsboard.server.common.data.id.DeviceId; |
26 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
25 | 27 | import org.thingsboard.server.common.data.id.EntityId; |
26 | 28 | import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
27 | 29 | import org.thingsboard.server.common.data.kv.KvEntry; |
... | ... | @@ -40,20 +42,44 @@ import java.util.Set; |
40 | 42 | import java.util.concurrent.ConcurrentHashMap; |
41 | 43 | import java.util.concurrent.ConcurrentMap; |
42 | 44 | import java.util.concurrent.ExecutionException; |
45 | +import java.util.stream.Collectors; | |
43 | 46 | |
44 | 47 | class DeviceState { |
45 | 48 | |
49 | + private final DeviceId deviceId; | |
46 | 50 | private DeviceProfileState deviceProfile; |
47 | 51 | private DeviceDataSnapshot latestValues; |
48 | 52 | private final ConcurrentMap<String, DeviceProfileAlarmState> alarmStates = new ConcurrentHashMap<>(); |
49 | 53 | |
50 | - public DeviceState(DeviceProfileState deviceProfile) { | |
54 | + public DeviceState(DeviceId deviceId, DeviceProfileState deviceProfile) { | |
55 | + this.deviceId = deviceId; | |
51 | 56 | this.deviceProfile = deviceProfile; |
52 | 57 | } |
53 | 58 | |
59 | + public void updateProfile(TbContext ctx, DeviceProfile deviceProfile) throws ExecutionException, InterruptedException { | |
60 | + Set<EntityKey> oldKeys = this.deviceProfile.getEntityKeys(); | |
61 | + this.deviceProfile.updateDeviceProfile(deviceProfile); | |
62 | + if (latestValues != null) { | |
63 | + Set<EntityKey> keysToFetch = new HashSet<>(this.deviceProfile.getEntityKeys()); | |
64 | + keysToFetch.removeAll(oldKeys); | |
65 | + if (!keysToFetch.isEmpty()) { | |
66 | + addEntityKeysToSnapshot(ctx, deviceId, keysToFetch, latestValues); | |
67 | + } | |
68 | + } | |
69 | + Set<String> newAlarmStateIds = this.deviceProfile.getAlarmSettings().stream().map(DeviceProfileAlarm::getId).collect(Collectors.toSet()); | |
70 | + alarmStates.keySet().removeIf(id -> !newAlarmStateIds.contains(id)); | |
71 | + for (DeviceProfileAlarm alarm : this.deviceProfile.getAlarmSettings()) { | |
72 | + if (alarmStates.containsKey(alarm.getId())) { | |
73 | + alarmStates.get(alarm.getId()).updateState(alarm); | |
74 | + } else { | |
75 | + alarmStates.putIfAbsent(alarm.getId(), new DeviceProfileAlarmState(deviceId, alarm)); | |
76 | + } | |
77 | + } | |
78 | + } | |
79 | + | |
54 | 80 | public void process(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { |
55 | 81 | if (latestValues == null) { |
56 | - latestValues = fetchLatestValues(ctx, msg.getOriginator()); | |
82 | + latestValues = fetchLatestValues(ctx, deviceId); | |
57 | 83 | } |
58 | 84 | if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { |
59 | 85 | processTelemetry(ctx, msg); |
... | ... | @@ -69,7 +95,7 @@ class DeviceState { |
69 | 95 | List<KvEntry> data = entry.getValue(); |
70 | 96 | latestValues = merge(latestValues, ts, data); |
71 | 97 | for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) { |
72 | - DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(msg.getOriginator(), alarm)); | |
98 | + DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm)); | |
73 | 99 | alarmState.process(ctx, msg, latestValues); |
74 | 100 | } |
75 | 101 | } |
... | ... | @@ -85,8 +111,13 @@ class DeviceState { |
85 | 111 | } |
86 | 112 | |
87 | 113 | private DeviceDataSnapshot fetchLatestValues(TbContext ctx, EntityId originator) throws ExecutionException, InterruptedException { |
88 | - DeviceDataSnapshot result = new DeviceDataSnapshot(deviceProfile.getEntityKeys()); | |
114 | + Set<EntityKey> entityKeysToFetch = deviceProfile.getEntityKeys(); | |
115 | + DeviceDataSnapshot result = new DeviceDataSnapshot(entityKeysToFetch); | |
116 | + addEntityKeysToSnapshot(ctx, originator, entityKeysToFetch, result); | |
117 | + return result; | |
118 | + } | |
89 | 119 | |
120 | + private void addEntityKeysToSnapshot(TbContext ctx, EntityId originator, Set<EntityKey> entityKeysToFetch, DeviceDataSnapshot result) throws InterruptedException, ExecutionException { | |
90 | 121 | Set<String> serverAttributeKeys = new HashSet<>(); |
91 | 122 | Set<String> clientAttributeKeys = new HashSet<>(); |
92 | 123 | Set<String> sharedAttributeKeys = new HashSet<>(); |
... | ... | @@ -94,7 +125,7 @@ class DeviceState { |
94 | 125 | Set<String> latestTsKeys = new HashSet<>(); |
95 | 126 | |
96 | 127 | Device device = null; |
97 | - for (EntityKey entityKey : deviceProfile.getEntityKeys()) { | |
128 | + for (EntityKey entityKey : entityKeysToFetch) { | |
98 | 129 | String key = entityKey.getKey(); |
99 | 130 | switch (entityKey.getType()) { |
100 | 131 | case SERVER_ATTRIBUTE: |
... | ... | @@ -159,8 +190,6 @@ class DeviceState { |
159 | 190 | addToSnapshot(result, commonAttributeKeys, |
160 | 191 | ctx.getAttributesService().find(ctx.getTenantId(), originator, DataConstants.SERVER_SCOPE, serverAttributeKeys).get()); |
161 | 192 | } |
162 | - | |
163 | - return result; | |
164 | 193 | } |
165 | 194 | |
166 | 195 | private void addToSnapshot(DeviceDataSnapshot snapshot, Set<String> commonAttributeKeys, List<AttributeKvEntry> data) { |
... | ... | @@ -192,4 +221,8 @@ class DeviceState { |
192 | 221 | } |
193 | 222 | } |
194 | 223 | |
224 | + public DeviceProfileId getProfileId() { | |
225 | + return deviceProfile.getProfileId(); | |
226 | + } | |
227 | + | |
195 | 228 | } | ... | ... |
... | ... | @@ -16,15 +16,6 @@ |
16 | 16 | package org.thingsboard.rule.engine.profile; |
17 | 17 | |
18 | 18 | import lombok.extern.slf4j.Slf4j; |
19 | -import org.apache.commons.lang3.BooleanUtils; | |
20 | -import org.apache.kafka.clients.producer.KafkaProducer; | |
21 | -import org.apache.kafka.clients.producer.Producer; | |
22 | -import org.apache.kafka.clients.producer.ProducerConfig; | |
23 | -import org.apache.kafka.clients.producer.ProducerRecord; | |
24 | -import org.apache.kafka.clients.producer.RecordMetadata; | |
25 | -import org.apache.kafka.common.header.Headers; | |
26 | -import org.apache.kafka.common.header.internals.RecordHeader; | |
27 | -import org.apache.kafka.common.header.internals.RecordHeaders; | |
28 | 19 | import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; |
29 | 20 | import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache; |
30 | 21 | import org.thingsboard.rule.engine.api.RuleNode; |
... | ... | @@ -32,22 +23,14 @@ import org.thingsboard.rule.engine.api.TbContext; |
32 | 23 | import org.thingsboard.rule.engine.api.TbNode; |
33 | 24 | import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
34 | 25 | import org.thingsboard.rule.engine.api.TbNodeException; |
35 | -import org.thingsboard.rule.engine.api.TbRelationTypes; | |
36 | -import org.thingsboard.rule.engine.api.util.TbNodeUtils; | |
37 | -import org.thingsboard.rule.engine.kafka.TbKafkaNodeConfiguration; | |
38 | 26 | import org.thingsboard.server.common.data.DeviceProfile; |
39 | 27 | import org.thingsboard.server.common.data.EntityType; |
40 | 28 | import org.thingsboard.server.common.data.id.DeviceId; |
41 | -import org.thingsboard.server.common.data.id.EntityId; | |
42 | 29 | import org.thingsboard.server.common.data.plugin.ComponentType; |
43 | 30 | import org.thingsboard.server.common.msg.TbMsg; |
44 | -import org.thingsboard.server.common.msg.TbMsgMetaData; | |
31 | +import org.thingsboard.server.dao.util.mapping.JacksonUtil; | |
45 | 32 | |
46 | -import java.nio.charset.Charset; | |
47 | -import java.nio.charset.StandardCharsets; | |
48 | -import java.util.HashMap; | |
49 | 33 | import java.util.Map; |
50 | -import java.util.Properties; | |
51 | 34 | import java.util.concurrent.ConcurrentHashMap; |
52 | 35 | import java.util.concurrent.ExecutionException; |
53 | 36 | |
... | ... | @@ -76,7 +59,6 @@ public class TbDeviceProfileNode implements TbNode { |
76 | 59 | /** |
77 | 60 | * TODO: |
78 | 61 | * 1. Duration in the alarm conditions; |
79 | - * 2. Update of the Profile (rules); | |
80 | 62 | * 3. Update of the Device attributes (client, server and shared); |
81 | 63 | * 4. Dynamic values evaluation; |
82 | 64 | */ |
... | ... | @@ -86,26 +68,37 @@ public class TbDeviceProfileNode implements TbNode { |
86 | 68 | EntityType originatorType = msg.getOriginator().getEntityType(); |
87 | 69 | if (EntityType.DEVICE.equals(originatorType)) { |
88 | 70 | DeviceId deviceId = new DeviceId(msg.getOriginator().getId()); |
89 | - DeviceState deviceState = getOrCreateDeviceState(ctx, msg, deviceId); | |
90 | - if (deviceState != null) { | |
91 | - deviceState.process(ctx, msg); | |
71 | + if (msg.getType().equals("ENTITY_UPDATED")) { | |
72 | + //TODO: handle if device profile id has changed. | |
92 | 73 | } else { |
93 | - ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!")); | |
74 | + DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId); | |
75 | + if (deviceState != null) { | |
76 | + deviceState.process(ctx, msg); | |
77 | + } else { | |
78 | + ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!")); | |
79 | + } | |
94 | 80 | } |
95 | 81 | } else if (EntityType.DEVICE_PROFILE.equals(originatorType)) { |
96 | - //TODO: check that the profile rule set was changed. If yes - invalidate the rules. | |
82 | + if (msg.getType().equals("ENTITY_UPDATED")) { | |
83 | + DeviceProfile deviceProfile = JacksonUtil.fromString(msg.getData(), DeviceProfile.class); | |
84 | + for (DeviceState state : deviceStates.values()) { | |
85 | + if (deviceProfile.getId().equals(state.getProfileId())) { | |
86 | + state.updateProfile(ctx, deviceProfile); | |
87 | + } | |
88 | + } | |
89 | + } | |
97 | 90 | ctx.tellSuccess(msg); |
98 | 91 | } else { |
99 | 92 | ctx.tellSuccess(msg); |
100 | 93 | } |
101 | 94 | } |
102 | 95 | |
103 | - private DeviceState getOrCreateDeviceState(TbContext ctx, TbMsg msg, DeviceId deviceId) { | |
96 | + private DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId) { | |
104 | 97 | DeviceState deviceState = deviceStates.get(deviceId); |
105 | 98 | if (deviceState == null) { |
106 | 99 | DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId); |
107 | 100 | if (deviceProfile != null) { |
108 | - deviceState = new DeviceState(new DeviceProfileState(deviceProfile)); | |
101 | + deviceState = new DeviceState(deviceId, new DeviceProfileState(deviceProfile)); | |
109 | 102 | deviceStates.put(deviceId, deviceState); |
110 | 103 | } |
111 | 104 | } | ... | ... |