Commit ca8f04e1e970f319515e9d08c409355dd6c4189c
Merge remote-tracking branch 'upstream/master' into feature/dashboard/widget-select-preview
Showing
100 changed files
with
499 additions
and
264 deletions
Too many changes to show.
To preserve performance only 100 of 187 files are displayed.
... | ... | @@ -15,11 +15,10 @@ |
15 | 15 | # |
16 | 16 | |
17 | 17 | export JAVA_OPTS="$JAVA_OPTS -Dplatform=@pkg.platform@ -Dinstall.data_dir=@pkg.installFolder@/data" |
18 | -export JAVA_OPTS="$JAVA_OPTS -Xloggc:@pkg.logFolder@/gc.log -XX:+IgnoreUnrecognizedVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails -XX:+PrintGCDateStamps" | |
19 | -export JAVA_OPTS="$JAVA_OPTS -XX:+PrintHeapAtGC -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10" | |
20 | -export JAVA_OPTS="$JAVA_OPTS -XX:GCLogFileSize=10M -XX:-UseBiasedLocking -XX:+UseTLAB -XX:+ResizeTLAB -XX:+PerfDisableSharedMem -XX:+UseCondCardMark" | |
21 | -export JAVA_OPTS="$JAVA_OPTS -XX:CMSWaitDuration=10000 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+CMSParallelInitialMarkEnabled" | |
22 | -export JAVA_OPTS="$JAVA_OPTS -XX:+CMSEdenChunksRecordAlways -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly" | |
18 | +export JAVA_OPTS="$JAVA_OPTS -Xlog:gc*,heap*,age*,safepoint=debug:file=@pkg.logFolder@/gc.log:time,uptime,level,tags:filecount=10,filesize=10M" | |
19 | +export JAVA_OPTS="$JAVA_OPTS -XX:+IgnoreUnrecognizedVMOptions -XX:+HeapDumpOnOutOfMemoryError" | |
20 | +export JAVA_OPTS="$JAVA_OPTS -XX:-UseBiasedLocking -XX:+UseTLAB -XX:+ResizeTLAB -XX:+PerfDisableSharedMem -XX:+UseCondCardMark" | |
21 | +export JAVA_OPTS="$JAVA_OPTS -XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:+UseStringDeduplication -XX:+ParallelRefProcEnabled -XX:MaxTenuringThreshold=10" | |
23 | 22 | export LOG_FILENAME=${pkg.name}.out |
24 | 23 | export LOADER_PATH=${pkg.installFolder}/conf,${pkg.installFolder}/extensions |
25 | 24 | export SQL_DATA_FOLDER=${pkg.installFolder}/data/sql | ... | ... |
... | ... | @@ -134,12 +134,12 @@ public class AppActor extends ContextAwareActor { |
134 | 134 | |
135 | 135 | private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) { |
136 | 136 | if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) { |
137 | - msg.getTbMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!")); | |
137 | + msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!")); | |
138 | 138 | } else { |
139 | 139 | if (!deletedTenants.contains(msg.getTenantId())) { |
140 | 140 | getOrCreateTenantActor(msg.getTenantId()).tell(msg); |
141 | 141 | } else { |
142 | - msg.getTbMsg().getCallback().onSuccess(); | |
142 | + msg.getMsg().getCallback().onSuccess(); | |
143 | 143 | } |
144 | 144 | } |
145 | 145 | } | ... | ... |
... | ... | @@ -19,7 +19,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; |
19 | 19 | import com.fasterxml.jackson.databind.ObjectMapper; |
20 | 20 | import io.netty.channel.EventLoopGroup; |
21 | 21 | import lombok.extern.slf4j.Slf4j; |
22 | -import org.springframework.data.redis.core.RedisTemplate; | |
23 | 22 | import org.springframework.util.StringUtils; |
24 | 23 | import org.thingsboard.common.util.ListeningExecutor; |
25 | 24 | import org.thingsboard.rule.engine.api.MailService; |
... | ... | @@ -34,7 +33,6 @@ import org.thingsboard.rule.engine.api.TbRelationTypes; |
34 | 33 | import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; |
35 | 34 | import org.thingsboard.server.actors.ActorSystemContext; |
36 | 35 | import org.thingsboard.server.actors.TbActorRef; |
37 | -import org.thingsboard.server.common.data.ApiUsageRecordKey; | |
38 | 36 | import org.thingsboard.server.common.data.Customer; |
39 | 37 | import org.thingsboard.server.common.data.DataConstants; |
40 | 38 | import org.thingsboard.server.common.data.Device; |
... | ... | @@ -90,10 +88,12 @@ class DefaultTbContext implements TbContext { |
90 | 88 | public final static ObjectMapper mapper = new ObjectMapper(); |
91 | 89 | |
92 | 90 | private final ActorSystemContext mainCtx; |
91 | + private final String ruleChainName; | |
93 | 92 | private final RuleNodeCtx nodeCtx; |
94 | 93 | |
95 | - public DefaultTbContext(ActorSystemContext mainCtx, RuleNodeCtx nodeCtx) { | |
94 | + public DefaultTbContext(ActorSystemContext mainCtx, String ruleChainName, RuleNodeCtx nodeCtx) { | |
96 | 95 | this.mainCtx = mainCtx; |
96 | + this.ruleChainName = ruleChainName; | |
97 | 97 | this.nodeCtx = nodeCtx; |
98 | 98 | } |
99 | 99 | |
... | ... | @@ -117,13 +117,13 @@ class DefaultTbContext implements TbContext { |
117 | 117 | relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th)); |
118 | 118 | } |
119 | 119 | msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId()); |
120 | - nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null)); | |
120 | + nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null)); | |
121 | 121 | } |
122 | 122 | |
123 | 123 | @Override |
124 | 124 | public void tellSelf(TbMsg msg, long delayMs) { |
125 | 125 | //TODO: add persistence layer |
126 | - scheduleMsgWithDelay(new RuleNodeToSelfMsg(msg), delayMs, nodeCtx.getSelfActor()); | |
126 | + scheduleMsgWithDelay(new RuleNodeToSelfMsg(this, msg), delayMs, nodeCtx.getSelfActor()); | |
127 | 127 | } |
128 | 128 | |
129 | 129 | @Override |
... | ... | @@ -254,7 +254,8 @@ class DefaultTbContext implements TbContext { |
254 | 254 | } else { |
255 | 255 | failureMessage = null; |
256 | 256 | } |
257 | - nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE), | |
257 | + nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), | |
258 | + nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE), | |
258 | 259 | msg, failureMessage)); |
259 | 260 | } |
260 | 261 | |
... | ... | @@ -302,6 +303,16 @@ class DefaultTbContext implements TbContext { |
302 | 303 | } |
303 | 304 | |
304 | 305 | @Override |
306 | + public RuleNode getSelf() { | |
307 | + return nodeCtx.getSelf(); | |
308 | + } | |
309 | + | |
310 | + @Override | |
311 | + public String getRuleChainName() { | |
312 | + return ruleChainName; | |
313 | + } | |
314 | + | |
315 | + @Override | |
305 | 316 | public TenantId getTenantId() { |
306 | 317 | return nodeCtx.getTenantId(); |
307 | 318 | } |
... | ... | @@ -476,11 +487,6 @@ class DefaultTbContext implements TbContext { |
476 | 487 | } |
477 | 488 | |
478 | 489 | @Override |
479 | - public RedisTemplate<String, Object> getRedisTemplate() { | |
480 | - return mainCtx.getRedisTemplate(); | |
481 | - } | |
482 | - | |
483 | - @Override | |
484 | 490 | public PageData<RuleNodeState> findRuleNodeStates(PageLink pageLink) { |
485 | 491 | if (log.isDebugEnabled()) { |
486 | 492 | log.debug("[{}][{}] Fetch Rule Node States.", getTenantId(), getSelfId()); | ... | ... |
... | ... | @@ -23,7 +23,6 @@ import org.thingsboard.server.actors.TbActorRef; |
23 | 23 | import org.thingsboard.server.actors.TbEntityActorId; |
24 | 24 | import org.thingsboard.server.actors.service.DefaultActorService; |
25 | 25 | import org.thingsboard.server.actors.shared.ComponentMsgProcessor; |
26 | -import org.thingsboard.server.common.data.ApiUsageRecordKey; | |
27 | 26 | import org.thingsboard.server.common.data.EntityType; |
28 | 27 | import org.thingsboard.server.common.data.id.EntityId; |
29 | 28 | import org.thingsboard.server.common.data.id.RuleChainId; |
... | ... | @@ -197,11 +196,11 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
197 | 196 | } |
198 | 197 | |
199 | 198 | void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) { |
200 | - TbMsg msg = envelope.getTbMsg(); | |
199 | + TbMsg msg = envelope.getMsg(); | |
201 | 200 | log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg); |
202 | 201 | if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) { |
203 | 202 | try { |
204 | - checkActive(envelope.getTbMsg()); | |
203 | + checkActive(envelope.getMsg()); | |
205 | 204 | RuleNodeId targetId = msg.getRuleNodeId(); |
206 | 205 | RuleNodeCtx targetCtx; |
207 | 206 | if (targetId == null) { |
... | ... | @@ -218,12 +217,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
218 | 217 | msg.getCallback().onSuccess(); |
219 | 218 | } |
220 | 219 | } catch (RuleNodeException rne) { |
221 | - envelope.getTbMsg().getCallback().onFailure(rne); | |
220 | + envelope.getMsg().getCallback().onFailure(rne); | |
222 | 221 | } catch (Exception e) { |
223 | - envelope.getTbMsg().getCallback().onFailure(new RuleEngineException(e.getMessage())); | |
222 | + envelope.getMsg().getCallback().onFailure(new RuleEngineException(e.getMessage())); | |
224 | 223 | } |
225 | 224 | } else { |
226 | - onTellNext(envelope.getTbMsg(), envelope.getTbMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage()); | |
225 | + onTellNext(envelope.getMsg(), envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage()); | |
227 | 226 | } |
228 | 227 | } |
229 | 228 | |
... | ... | @@ -335,7 +334,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
335 | 334 | |
336 | 335 | private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) { |
337 | 336 | if (nodeCtx != null) { |
338 | - nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg, fromRelationType)); | |
337 | + nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType)); | |
339 | 338 | } else { |
340 | 339 | log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName); |
341 | 340 | msg.getCallback().onFailure(new RuleEngineException("Rule Node CTX is empty")); | ... | ... |
... | ... | @@ -15,24 +15,44 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.actors.ruleChain; |
17 | 17 | |
18 | -import lombok.Data; | |
18 | +import lombok.EqualsAndHashCode; | |
19 | +import lombok.Getter; | |
20 | +import lombok.ToString; | |
19 | 21 | import org.thingsboard.server.common.data.id.RuleChainId; |
20 | 22 | import org.thingsboard.server.common.msg.MsgType; |
21 | -import org.thingsboard.server.common.msg.TbActorMsg; | |
23 | +import org.thingsboard.server.common.msg.TbActorStopReason; | |
22 | 24 | import org.thingsboard.server.common.msg.TbMsg; |
25 | +import org.thingsboard.server.common.msg.TbRuleEngineActorMsg; | |
23 | 26 | import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg; |
27 | +import org.thingsboard.server.common.msg.queue.RuleEngineException; | |
24 | 28 | |
25 | 29 | /** |
26 | 30 | * Created by ashvayka on 19.03.18. |
27 | 31 | */ |
28 | -@Data | |
29 | -public final class RuleChainToRuleChainMsg implements TbActorMsg, RuleChainAwareMsg { | |
32 | +@EqualsAndHashCode(callSuper = true) | |
33 | +@ToString | |
34 | +public final class RuleChainToRuleChainMsg extends TbRuleEngineActorMsg implements RuleChainAwareMsg { | |
30 | 35 | |
36 | + @Getter | |
31 | 37 | private final RuleChainId target; |
38 | + @Getter | |
32 | 39 | private final RuleChainId source; |
33 | - private final TbMsg msg; | |
40 | + @Getter | |
34 | 41 | private final String fromRelationType; |
35 | 42 | |
43 | + public RuleChainToRuleChainMsg(RuleChainId target, RuleChainId source, TbMsg tbMsg, String fromRelationType) { | |
44 | + super(tbMsg); | |
45 | + this.target = target; | |
46 | + this.source = source; | |
47 | + this.fromRelationType = fromRelationType; | |
48 | + } | |
49 | + | |
50 | + @Override | |
51 | + public void onTbActorStopped(TbActorStopReason reason) { | |
52 | + String message = reason == TbActorStopReason.STOPPED ? String.format("Rule chain [%s] stopped", target.getId()) : String.format("Failed to initialize rule chain [%s]!", target.getId()); | |
53 | + msg.getCallback().onFailure(new RuleEngineException(message)); | |
54 | + } | |
55 | + | |
36 | 56 | @Override |
37 | 57 | public RuleChainId getRuleChainId() { |
38 | 58 | return target; | ... | ... |
... | ... | @@ -15,22 +15,28 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.actors.ruleChain; |
17 | 17 | |
18 | -import lombok.Data; | |
18 | +import lombok.EqualsAndHashCode; | |
19 | +import lombok.Getter; | |
20 | +import lombok.ToString; | |
19 | 21 | import org.thingsboard.rule.engine.api.TbContext; |
20 | 22 | import org.thingsboard.server.common.msg.MsgType; |
21 | -import org.thingsboard.server.common.msg.TbActorMsg; | |
22 | 23 | import org.thingsboard.server.common.msg.TbMsg; |
23 | 24 | |
24 | 25 | /** |
25 | 26 | * Created by ashvayka on 19.03.18. |
26 | 27 | */ |
27 | -@Data | |
28 | -final class RuleChainToRuleNodeMsg implements TbActorMsg { | |
28 | +@EqualsAndHashCode(callSuper = true) | |
29 | +@ToString | |
30 | +final class RuleChainToRuleNodeMsg extends TbToRuleNodeActorMsg { | |
29 | 31 | |
30 | - private final TbContext ctx; | |
31 | - private final TbMsg msg; | |
32 | + @Getter | |
32 | 33 | private final String fromRelationType; |
33 | 34 | |
35 | + public RuleChainToRuleNodeMsg(TbContext ctx, TbMsg tbMsg, String fromRelationType) { | |
36 | + super(ctx, tbMsg); | |
37 | + this.fromRelationType = fromRelationType; | |
38 | + } | |
39 | + | |
34 | 40 | @Override |
35 | 41 | public MsgType getMsgType() { |
36 | 42 | return MsgType.RULE_CHAIN_TO_RULE_MSG; | ... | ... |
... | ... | @@ -59,9 +59,6 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa |
59 | 59 | case RULE_CHAIN_TO_RULE_MSG: |
60 | 60 | onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg); |
61 | 61 | break; |
62 | - case RULE_TO_SELF_ERROR_MSG: | |
63 | - onRuleNodeToSelfErrorMsg((RuleNodeToSelfErrorMsg) msg); | |
64 | - break; | |
65 | 62 | case RULE_TO_SELF_MSG: |
66 | 63 | onRuleNodeToSelfMsg((RuleNodeToSelfMsg) msg); |
67 | 64 | break; |
... | ... | @@ -101,10 +98,6 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa |
101 | 98 | } |
102 | 99 | } |
103 | 100 | |
104 | - private void onRuleNodeToSelfErrorMsg(RuleNodeToSelfErrorMsg msg) { | |
105 | - logAndPersist("onRuleMsg", ActorSystemContext.toException(msg.getError())); | |
106 | - } | |
107 | - | |
108 | 101 | public static class ActorCreator extends ContextBasedCreator { |
109 | 102 | |
110 | 103 | private final TenantId tenantId; | ... | ... |
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
... | ... | @@ -54,7 +54,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
54 | 54 | this.ruleChainName = ruleChainName; |
55 | 55 | this.self = self; |
56 | 56 | this.ruleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId); |
57 | - this.defaultCtx = new DefaultTbContext(systemContext, new RuleNodeCtx(tenantId, parent, self, ruleNode)); | |
57 | + this.defaultCtx = new DefaultTbContext(systemContext, ruleChainName, new RuleNodeCtx(tenantId, parent, self, ruleNode)); | |
58 | 58 | this.info = new RuleNodeInfo(ruleNodeId, ruleChainName, ruleNode != null ? ruleNode.getName() : "Unknown"); |
59 | 59 | } |
60 | 60 | |
... | ... | @@ -147,7 +147,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
147 | 147 | TbNode tbNode = null; |
148 | 148 | if (ruleNode != null) { |
149 | 149 | Class<?> componentClazz = Class.forName(ruleNode.getType()); |
150 | - tbNode = (TbNode) (componentClazz.newInstance()); | |
150 | + tbNode = (TbNode) (componentClazz.getDeclaredConstructor().newInstance()); | |
151 | 151 | tbNode.init(defaultCtx, new TbNodeConfiguration(ruleNode.getConfiguration())); |
152 | 152 | } |
153 | 153 | return tbNode; | ... | ... |
... | ... | @@ -15,11 +15,16 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.actors.ruleChain; |
17 | 17 | |
18 | -import lombok.Data; | |
18 | +import lombok.EqualsAndHashCode; | |
19 | +import lombok.Getter; | |
20 | +import lombok.ToString; | |
21 | +import org.thingsboard.server.common.data.id.RuleChainId; | |
19 | 22 | import org.thingsboard.server.common.data.id.RuleNodeId; |
20 | 23 | import org.thingsboard.server.common.msg.MsgType; |
21 | -import org.thingsboard.server.common.msg.TbActorMsg; | |
24 | +import org.thingsboard.server.common.msg.TbActorStopReason; | |
22 | 25 | import org.thingsboard.server.common.msg.TbMsg; |
26 | +import org.thingsboard.server.common.msg.TbRuleEngineActorMsg; | |
27 | +import org.thingsboard.server.common.msg.queue.RuleEngineException; | |
23 | 28 | |
24 | 29 | import java.io.Serializable; |
25 | 30 | import java.util.Set; |
... | ... | @@ -27,15 +32,34 @@ import java.util.Set; |
27 | 32 | /** |
28 | 33 | * Created by ashvayka on 19.03.18. |
29 | 34 | */ |
30 | -@Data | |
31 | -class RuleNodeToRuleChainTellNextMsg implements TbActorMsg, Serializable { | |
35 | +@EqualsAndHashCode(callSuper = true) | |
36 | +@ToString | |
37 | +class RuleNodeToRuleChainTellNextMsg extends TbRuleEngineActorMsg implements Serializable { | |
32 | 38 | |
33 | 39 | private static final long serialVersionUID = 4577026446412871820L; |
40 | + @Getter | |
41 | + private final RuleChainId ruleChainId; | |
42 | + @Getter | |
34 | 43 | private final RuleNodeId originator; |
44 | + @Getter | |
35 | 45 | private final Set<String> relationTypes; |
36 | - private final TbMsg msg; | |
46 | + @Getter | |
37 | 47 | private final String failureMessage; |
38 | 48 | |
49 | + public RuleNodeToRuleChainTellNextMsg(RuleChainId ruleChainId, RuleNodeId originator, Set<String> relationTypes, TbMsg tbMsg, String failureMessage) { | |
50 | + super(tbMsg); | |
51 | + this.ruleChainId = ruleChainId; | |
52 | + this.originator = originator; | |
53 | + this.relationTypes = relationTypes; | |
54 | + this.failureMessage = failureMessage; | |
55 | + } | |
56 | + | |
57 | + @Override | |
58 | + public void onTbActorStopped(TbActorStopReason reason) { | |
59 | + String message = reason == TbActorStopReason.STOPPED ? String.format("Rule chain [%s] stopped", ruleChainId.getId()) : String.format("Failed to initialize rule chain [%s]!", ruleChainId.getId()); | |
60 | + msg.getCallback().onFailure(new RuleEngineException(message)); | |
61 | + } | |
62 | + | |
39 | 63 | @Override |
40 | 64 | public MsgType getMsgType() { |
41 | 65 | return MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG; | ... | ... |
... | ... | @@ -15,18 +15,25 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.actors.ruleChain; |
17 | 17 | |
18 | -import lombok.Data; | |
18 | +import lombok.EqualsAndHashCode; | |
19 | +import lombok.ToString; | |
20 | +import org.thingsboard.rule.engine.api.TbContext; | |
19 | 21 | import org.thingsboard.server.common.msg.MsgType; |
20 | -import org.thingsboard.server.common.msg.TbActorMsg; | |
22 | +import org.thingsboard.server.common.msg.TbActorStopReason; | |
21 | 23 | import org.thingsboard.server.common.msg.TbMsg; |
24 | +import org.thingsboard.server.common.msg.TbRuleEngineActorMsg; | |
25 | +import org.thingsboard.server.common.msg.queue.RuleNodeException; | |
22 | 26 | |
23 | 27 | /** |
24 | 28 | * Created by ashvayka on 19.03.18. |
25 | 29 | */ |
26 | -@Data | |
27 | -final class RuleNodeToSelfMsg implements TbActorMsg { | |
30 | +@EqualsAndHashCode(callSuper = true) | |
31 | +@ToString | |
32 | +final class RuleNodeToSelfMsg extends TbToRuleNodeActorMsg { | |
28 | 33 | |
29 | - private final TbMsg msg; | |
34 | + public RuleNodeToSelfMsg(TbContext ctx, TbMsg tbMsg) { | |
35 | + super(ctx, tbMsg); | |
36 | + } | |
30 | 37 | |
31 | 38 | @Override |
32 | 39 | public MsgType getMsgType() { | ... | ... |
application/src/main/java/org/thingsboard/server/actors/ruleChain/TbToRuleNodeActorMsg.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.actors.ruleChain; | |
17 | + | |
18 | +import lombok.EqualsAndHashCode; | |
19 | +import lombok.Getter; | |
20 | +import org.thingsboard.rule.engine.api.TbContext; | |
21 | +import org.thingsboard.server.common.msg.TbActorStopReason; | |
22 | +import org.thingsboard.server.common.msg.TbMsg; | |
23 | +import org.thingsboard.server.common.msg.TbRuleEngineActorMsg; | |
24 | +import org.thingsboard.server.common.msg.queue.RuleNodeException; | |
25 | + | |
26 | +@EqualsAndHashCode(callSuper = true) | |
27 | +public abstract class TbToRuleNodeActorMsg extends TbRuleEngineActorMsg { | |
28 | + | |
29 | + @Getter | |
30 | + private final TbContext ctx; | |
31 | + | |
32 | + public TbToRuleNodeActorMsg(TbContext ctx, TbMsg tbMsg) { | |
33 | + super(tbMsg); | |
34 | + this.ctx = ctx; | |
35 | + } | |
36 | + | |
37 | + @Override | |
38 | + public void onTbActorStopped(TbActorStopReason reason) { | |
39 | + String message = reason == TbActorStopReason.STOPPED ? "Rule node stopped" : "Failed to initialize rule node!"; | |
40 | + msg.getCallback().onFailure(new RuleNodeException(message, ctx.getRuleChainName(), ctx.getSelf())); | |
41 | + } | |
42 | +} | ... | ... |
... | ... | @@ -28,10 +28,10 @@ import org.thingsboard.server.common.msg.TbActorMsg; |
28 | 28 | @ToString |
29 | 29 | public final class StatsPersistMsg implements TbActorMsg { |
30 | 30 | |
31 | - private long messagesProcessed; | |
32 | - private long errorsOccurred; | |
33 | - private TenantId tenantId; | |
34 | - private EntityId entityId; | |
31 | + private final long messagesProcessed; | |
32 | + private final long errorsOccurred; | |
33 | + private final TenantId tenantId; | |
34 | + private final EntityId entityId; | |
35 | 35 | |
36 | 36 | @Override |
37 | 37 | public MsgType getMsgType() { | ... | ... |
... | ... | @@ -18,7 +18,7 @@ package org.thingsboard.server.actors.stats; |
18 | 18 | import org.thingsboard.server.common.msg.MsgType; |
19 | 19 | import org.thingsboard.server.common.msg.TbActorMsg; |
20 | 20 | |
21 | -public final class StatsPersistTick implements TbActorMsg{ | |
21 | +public final class StatsPersistTick implements TbActorMsg { | |
22 | 22 | @Override |
23 | 23 | public MsgType getMsgType() { |
24 | 24 | return MsgType.STATS_PERSIST_TICK_MSG; | ... | ... |
... | ... | @@ -119,7 +119,7 @@ public class TenantActor extends RuleChainManagerActor { |
119 | 119 | log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg); |
120 | 120 | if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) { |
121 | 121 | QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg; |
122 | - queueMsg.getTbMsg().getCallback().onSuccess(); | |
122 | + queueMsg.getMsg().getCallback().onSuccess(); | |
123 | 123 | } else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) { |
124 | 124 | TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg; |
125 | 125 | transportMsg.getCallback().onSuccess(); |
... | ... | @@ -177,7 +177,7 @@ public class TenantActor extends RuleChainManagerActor { |
177 | 177 | log.warn("RECEIVED INVALID MESSAGE: {}", msg); |
178 | 178 | return; |
179 | 179 | } |
180 | - TbMsg tbMsg = msg.getTbMsg(); | |
180 | + TbMsg tbMsg = msg.getMsg(); | |
181 | 181 | if (apiUsageState.isReExecEnabled()) { |
182 | 182 | if (tbMsg.getRuleChainId() == null) { |
183 | 183 | if (getRootChainActor() != null) { | ... | ... |
... | ... | @@ -91,6 +91,7 @@ public class CustomOAuth2AuthorizationRequestResolver implements OAuth2Authoriza |
91 | 91 | return action; |
92 | 92 | } |
93 | 93 | |
94 | + @SuppressWarnings("deprecation") | |
94 | 95 | private OAuth2AuthorizationRequest resolve(HttpServletRequest request, String registrationId, String redirectUriAction) { |
95 | 96 | if (registrationId == null) { |
96 | 97 | return null; | ... | ... |
... | ... | @@ -127,7 +127,7 @@ public class ThingsboardSecurityConfiguration extends WebSecurityConfigurerAdapt |
127 | 127 | } |
128 | 128 | |
129 | 129 | protected JwtTokenAuthenticationProcessingFilter buildJwtTokenAuthenticationProcessingFilter() throws Exception { |
130 | - List<String> pathsToSkip = new ArrayList(Arrays.asList(NON_TOKEN_BASED_AUTH_ENTRY_POINTS)); | |
130 | + List<String> pathsToSkip = new ArrayList<>(Arrays.asList(NON_TOKEN_BASED_AUTH_ENTRY_POINTS)); | |
131 | 131 | pathsToSkip.addAll(Arrays.asList(WS_TOKEN_BASED_AUTH_ENTRY_POINT, TOKEN_REFRESH_ENTRY_POINT, FORM_BASED_LOGIN_ENTRY_POINT, |
132 | 132 | PUBLIC_LOGIN_ENTRY_POINT, DEVICE_API_ENTRY_POINT, WEBJARS_ENTRY_POINT)); |
133 | 133 | SkipPathRequestMatcher matcher = new SkipPathRequestMatcher(pathsToSkip, TOKEN_BASED_AUTH_ENTRY_POINT); | ... | ... |
... | ... | @@ -645,6 +645,7 @@ public abstract class BaseController { |
645 | 645 | return ruleNode; |
646 | 646 | } |
647 | 647 | |
648 | + @SuppressWarnings("unchecked") | |
648 | 649 | protected <I extends EntityId> I emptyId(EntityType entityType) { |
649 | 650 | return (I) EntityIdFactory.getByTypeAndUuid(entityType, ModelConstants.NULL_UUID); |
650 | 651 | } |
... | ... | @@ -759,6 +760,7 @@ public abstract class BaseController { |
759 | 760 | entityNode = json.createObjectNode(); |
760 | 761 | if (actionType == ActionType.ATTRIBUTES_UPDATED) { |
761 | 762 | String scope = extractParameter(String.class, 0, additionalInfo); |
763 | + @SuppressWarnings("unchecked") | |
762 | 764 | List<AttributeKvEntry> attributes = extractParameter(List.class, 1, additionalInfo); |
763 | 765 | metaData.putValue("scope", scope); |
764 | 766 | if (attributes != null) { |
... | ... | @@ -768,6 +770,7 @@ public abstract class BaseController { |
768 | 770 | } |
769 | 771 | } else if (actionType == ActionType.ATTRIBUTES_DELETED) { |
770 | 772 | String scope = extractParameter(String.class, 0, additionalInfo); |
773 | + @SuppressWarnings("unchecked") | |
771 | 774 | List<String> keys = extractParameter(List.class, 1, additionalInfo); |
772 | 775 | metaData.putValue("scope", scope); |
773 | 776 | ArrayNode attrsArrayNode = entityNode.putArray("attributes"); |
... | ... | @@ -775,9 +778,11 @@ public abstract class BaseController { |
775 | 778 | keys.forEach(attrsArrayNode::add); |
776 | 779 | } |
777 | 780 | } else if (actionType == ActionType.TIMESERIES_UPDATED) { |
781 | + @SuppressWarnings("unchecked") | |
778 | 782 | List<TsKvEntry> timeseries = extractParameter(List.class, 0, additionalInfo); |
779 | 783 | addTimeseries(entityNode, timeseries); |
780 | 784 | } else if (actionType == ActionType.TIMESERIES_DELETED) { |
785 | + @SuppressWarnings("unchecked") | |
781 | 786 | List<String> keys = extractParameter(List.class, 0, additionalInfo); |
782 | 787 | if (keys != null) { |
783 | 788 | ArrayNode timeseriesArrayNode = entityNode.putArray("timeseries"); | ... | ... |
... | ... | @@ -63,7 +63,7 @@ import java.util.List; |
63 | 63 | import java.util.concurrent.ExecutionException; |
64 | 64 | import java.util.stream.Collectors; |
65 | 65 | |
66 | -import static org.apache.commons.lang.StringUtils.isBlank; | |
66 | +import static org.apache.commons.lang3.StringUtils.isBlank; | |
67 | 67 | import static org.thingsboard.server.controller.CustomerController.CUSTOMER_ID; |
68 | 68 | |
69 | 69 | /** | ... | ... |
... | ... | @@ -24,6 +24,7 @@ import org.springframework.beans.factory.annotation.Value; |
24 | 24 | import org.springframework.beans.factory.config.BeanDefinition; |
25 | 25 | import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider; |
26 | 26 | import org.springframework.core.env.Environment; |
27 | +import org.springframework.core.env.Profiles; | |
27 | 28 | import org.springframework.core.type.filter.AnnotationTypeFilter; |
28 | 29 | import org.springframework.stereotype.Service; |
29 | 30 | import org.thingsboard.rule.engine.api.NodeConfiguration; |
... | ... | @@ -69,7 +70,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe |
69 | 70 | private ObjectMapper mapper = new ObjectMapper(); |
70 | 71 | |
71 | 72 | private boolean isInstall() { |
72 | - return environment.acceptsProfiles("install"); | |
73 | + return environment.acceptsProfiles(Profiles.of("install")); | |
73 | 74 | } |
74 | 75 | |
75 | 76 | @PostConstruct |
... | ... | @@ -185,7 +186,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe |
185 | 186 | nodeDefinition.setRelationTypes(getRelationTypesWithFailureRelation(nodeAnnotation)); |
186 | 187 | nodeDefinition.setCustomRelations(nodeAnnotation.customRelations()); |
187 | 188 | Class<? extends NodeConfiguration> configClazz = nodeAnnotation.configClazz(); |
188 | - NodeConfiguration config = configClazz.newInstance(); | |
189 | + NodeConfiguration config = configClazz.getDeclaredConstructor().newInstance(); | |
189 | 190 | NodeConfiguration defaultConfiguration = config.defaultConfiguration(); |
190 | 191 | nodeDefinition.setDefaultConfiguration(mapper.valueToTree(defaultConfiguration)); |
191 | 192 | nodeDefinition.setUiResources(nodeAnnotation.uiResources()); | ... | ... |
... | ... | @@ -20,7 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode; |
20 | 20 | import com.fasterxml.jackson.databind.node.ObjectNode; |
21 | 21 | import com.google.common.util.concurrent.ListenableFuture; |
22 | 22 | import lombok.extern.slf4j.Slf4j; |
23 | -import org.apache.commons.lang.RandomStringUtils; | |
23 | +import org.apache.commons.lang3.RandomStringUtils; | |
24 | 24 | import org.springframework.beans.factory.annotation.Autowired; |
25 | 25 | import org.springframework.stereotype.Service; |
26 | 26 | import org.springframework.util.StringUtils; | ... | ... |
... | ... | @@ -146,17 +146,17 @@ public class CassandraDbHelper { |
146 | 146 | if (row.isNull(index)) { |
147 | 147 | return null; |
148 | 148 | } else if (type.getProtocolCode() == ProtocolConstants.DataType.DOUBLE) { |
149 | - str = new Double(row.getDouble(index)).toString(); | |
149 | + str = Double.valueOf(row.getDouble(index)).toString(); | |
150 | 150 | } else if (type.getProtocolCode() == ProtocolConstants.DataType.INT) { |
151 | - str = new Integer(row.getInt(index)).toString(); | |
151 | + str = Integer.valueOf(row.getInt(index)).toString(); | |
152 | 152 | } else if (type.getProtocolCode() == ProtocolConstants.DataType.BIGINT) { |
153 | - str = new Long(row.getLong(index)).toString(); | |
153 | + str = Long.valueOf(row.getLong(index)).toString(); | |
154 | 154 | } else if (type.getProtocolCode() == ProtocolConstants.DataType.UUID) { |
155 | 155 | str = row.getUuid(index).toString(); |
156 | 156 | } else if (type.getProtocolCode() == ProtocolConstants.DataType.TIMEUUID) { |
157 | 157 | str = row.getUuid(index).toString(); |
158 | 158 | } else if (type.getProtocolCode() == ProtocolConstants.DataType.FLOAT) { |
159 | - str = new Float(row.getFloat(index)).toString(); | |
159 | + str = Float.valueOf(row.getFloat(index)).toString(); | |
160 | 160 | } else if (type.getProtocolCode() == ProtocolConstants.DataType.TIMESTAMP) { |
161 | 161 | str = ""+row.getInstant(index).toEpochMilli(); |
162 | 162 | } else { | ... | ... |
... | ... | @@ -153,7 +153,8 @@ public class CassandraToSqlColumn { |
153 | 153 | sqlInsertStatement.setBoolean(this.sqlIndex, Boolean.parseBoolean(value)); |
154 | 154 | break; |
155 | 155 | case ENUM_TO_INT: |
156 | - Enum enumVal = Enum.valueOf(this.enumClass, value); | |
156 | + @SuppressWarnings("unchecked") | |
157 | + Enum<?> enumVal = Enum.valueOf(this.enumClass, value); | |
157 | 158 | int intValue = enumVal.ordinal(); |
158 | 159 | sqlInsertStatement.setInt(this.sqlIndex, intValue); |
159 | 160 | break; | ... | ... |
... | ... | @@ -57,7 +57,7 @@ import java.util.List; |
57 | 57 | import java.util.concurrent.ExecutionException; |
58 | 58 | import java.util.stream.Collectors; |
59 | 59 | |
60 | -import static org.apache.commons.lang.StringUtils.isBlank; | |
60 | +import static org.apache.commons.lang3.StringUtils.isBlank; | |
61 | 61 | import static org.thingsboard.server.service.install.DatabaseHelper.objectMapper; |
62 | 62 | |
63 | 63 | @Service | ... | ... |
... | ... | @@ -206,7 +206,7 @@ public class DefaultEntityQueryService implements EntityQueryService { |
206 | 206 | addItemsToArrayNode(json.putArray("entityTypes"), types); |
207 | 207 | addItemsToArrayNode(json.putArray("timeseries"), timeseriesKeys); |
208 | 208 | addItemsToArrayNode(json.putArray("attribute"), attributesKeys); |
209 | - response.setResult(new ResponseEntity(json, HttpStatus.OK)); | |
209 | + response.setResult(new ResponseEntity<>(json, HttpStatus.OK)); | |
210 | 210 | } |
211 | 211 | |
212 | 212 | private void replyWithEmptyResponse(DeferredResult<ResponseEntity> response) { | ... | ... |
... | ... | @@ -181,7 +181,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< |
181 | 181 | new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) : |
182 | 182 | new TbMsgPackCallback(id, tenantId, ctx); |
183 | 183 | try { |
184 | - if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) { | |
184 | + if (!toRuleEngineMsg.getTbMsg().isEmpty()) { | |
185 | 185 | forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback); |
186 | 186 | } else { |
187 | 187 | callback.onSuccess(); |
... | ... | @@ -209,6 +209,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< |
209 | 209 | if (statsEnabled) { |
210 | 210 | stats.log(result, decision.isCommit()); |
211 | 211 | } |
212 | + | |
213 | + ctx.cleanup(); | |
214 | + | |
212 | 215 | if (decision.isCommit()) { |
213 | 216 | submitStrategy.stop(); |
214 | 217 | break; | ... | ... |
... | ... | @@ -147,4 +147,10 @@ public class TbMsgPackProcessingContext { |
147 | 147 | .forEach(info -> log.info("[{}][{}] execution count: {}. {}", queueName, info.getRuleNodeId(), info.getExecutionCount(), info.getLabel())); |
148 | 148 | } |
149 | 149 | } |
150 | + | |
151 | + public void cleanup() { | |
152 | + pendingMap.clear(); | |
153 | + successMap.clear(); | |
154 | + failedMap.clear(); | |
155 | + } | |
150 | 156 | } | ... | ... |
... | ... | @@ -31,6 +31,8 @@ import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; |
31 | 31 | @RequiredArgsConstructor |
32 | 32 | public class ToDeviceRpcRequestActorMsg implements ToDeviceActorNotificationMsg { |
33 | 33 | |
34 | + private static final long serialVersionUID = -8592877558138716589L; | |
35 | + | |
34 | 36 | @Getter |
35 | 37 | private final String serviceId; |
36 | 38 | @Getter | ... | ... |
... | ... | @@ -21,7 +21,6 @@ import com.google.common.util.concurrent.ListenableFuture; |
21 | 21 | import com.google.common.util.concurrent.MoreExecutors; |
22 | 22 | import delight.nashornsandbox.NashornSandbox; |
23 | 23 | import delight.nashornsandbox.NashornSandboxes; |
24 | -import jdk.nashorn.api.scripting.NashornScriptEngineFactory; | |
25 | 24 | import lombok.Getter; |
26 | 25 | import lombok.extern.slf4j.Slf4j; |
27 | 26 | import org.springframework.beans.factory.annotation.Value; |
... | ... | @@ -33,6 +32,7 @@ import javax.annotation.PostConstruct; |
33 | 32 | import javax.annotation.PreDestroy; |
34 | 33 | import javax.script.Invocable; |
35 | 34 | import javax.script.ScriptEngine; |
35 | +import javax.script.ScriptEngineManager; | |
36 | 36 | import javax.script.ScriptException; |
37 | 37 | import java.util.UUID; |
38 | 38 | import java.util.concurrent.ExecutionException; |
... | ... | @@ -97,8 +97,8 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer |
97 | 97 | sandbox.allowLoadFunctions(true); |
98 | 98 | sandbox.setMaxPreparedStatements(30); |
99 | 99 | } else { |
100 | - NashornScriptEngineFactory factory = new NashornScriptEngineFactory(); | |
101 | - engine = factory.getScriptEngine(new String[]{"--no-java"}); | |
100 | + ScriptEngineManager factory = new ScriptEngineManager(); | |
101 | + engine = factory.getEngineByName("nashorn"); | |
102 | 102 | } |
103 | 103 | } |
104 | 104 | ... | ... |
... | ... | @@ -29,7 +29,7 @@ public class SkipPathRequestMatcher implements RequestMatcher { |
29 | 29 | private RequestMatcher processingMatcher; |
30 | 30 | |
31 | 31 | public SkipPathRequestMatcher(List<String> pathsToSkip, String processingPath) { |
32 | - Assert.notNull(pathsToSkip); | |
32 | + Assert.notNull(pathsToSkip, "List of paths to skip is required."); | |
33 | 33 | List<RequestMatcher> m = pathsToSkip.stream().map(path -> new AntPathRequestMatcher(path)).collect(Collectors.toList()); |
34 | 34 | matchers = new OrRequestMatcher(m); |
35 | 35 | processingMatcher = new AntPathRequestMatcher(processingPath); | ... | ... |
... | ... | @@ -100,6 +100,7 @@ public class JwtTokenFactory { |
100 | 100 | Jws<Claims> jwsClaims = rawAccessToken.parseClaims(settings.getTokenSigningKey()); |
101 | 101 | Claims claims = jwsClaims.getBody(); |
102 | 102 | String subject = claims.getSubject(); |
103 | + @SuppressWarnings("unchecked") | |
103 | 104 | List<String> scopes = claims.get(SCOPES, List.class); |
104 | 105 | if (scopes == null || scopes.isEmpty()) { |
105 | 106 | throw new IllegalArgumentException("JWT Token doesn't have any scopes"); |
... | ... | @@ -155,6 +156,7 @@ public class JwtTokenFactory { |
155 | 156 | Jws<Claims> jwsClaims = rawAccessToken.parseClaims(settings.getTokenSigningKey()); |
156 | 157 | Claims claims = jwsClaims.getBody(); |
157 | 158 | String subject = claims.getSubject(); |
159 | + @SuppressWarnings("unchecked") | |
158 | 160 | List<String> scopes = claims.get(SCOPES, List.class); |
159 | 161 | if (scopes == null || scopes.isEmpty()) { |
160 | 162 | throw new IllegalArgumentException("Refresh Token doesn't have any scopes"); | ... | ... |
... | ... | @@ -47,6 +47,7 @@ public class CustomerUserPermissions extends AbstractPermissions { |
47 | 47 | Operation.READ_ATTRIBUTES, Operation.READ_TELEMETRY, Operation.RPC_CALL, Operation.CLAIM_DEVICES) { |
48 | 48 | |
49 | 49 | @Override |
50 | + @SuppressWarnings("unchecked") | |
50 | 51 | public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) { |
51 | 52 | |
52 | 53 | if (!super.hasPermission(user, operation, entityId, entity)) { |
... | ... | @@ -69,6 +70,7 @@ public class CustomerUserPermissions extends AbstractPermissions { |
69 | 70 | new PermissionChecker.GenericPermissionChecker(Operation.READ, Operation.READ_ATTRIBUTES, Operation.READ_TELEMETRY) { |
70 | 71 | |
71 | 72 | @Override |
73 | + @SuppressWarnings("unchecked") | |
72 | 74 | public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) { |
73 | 75 | if (!super.hasPermission(user, operation, entityId, entity)) { |
74 | 76 | return false; |
... | ... | @@ -119,6 +121,7 @@ public class CustomerUserPermissions extends AbstractPermissions { |
119 | 121 | private static final PermissionChecker widgetsPermissionChecker = new PermissionChecker.GenericPermissionChecker(Operation.READ) { |
120 | 122 | |
121 | 123 | @Override |
124 | + @SuppressWarnings("unchecked") | |
122 | 125 | public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) { |
123 | 126 | if (!super.hasPermission(user, operation, entityId, entity)) { |
124 | 127 | return false; | ... | ... |
... | ... | @@ -56,6 +56,7 @@ public class DefaultAccessControlService implements AccessControlService { |
56 | 56 | } |
57 | 57 | |
58 | 58 | @Override |
59 | + @SuppressWarnings("unchecked") | |
59 | 60 | public <I extends EntityId, T extends HasTenantId> void checkPermission(SecurityUser user, Resource resource, |
60 | 61 | Operation operation, I entityId, T entity) throws ThingsboardException { |
61 | 62 | PermissionChecker permissionChecker = getPermissionChecker(user.getAuthority(), resource); | ... | ... |
... | ... | @@ -59,6 +59,7 @@ public class TenantAdminPermissions extends AbstractPermissions { |
59 | 59 | new PermissionChecker.GenericPermissionChecker(Operation.READ, Operation.READ_ATTRIBUTES, Operation.READ_TELEMETRY) { |
60 | 60 | |
61 | 61 | @Override |
62 | + @SuppressWarnings("unchecked") | |
62 | 63 | public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) { |
63 | 64 | if (!super.hasPermission(user, operation, entityId, entity)) { |
64 | 65 | return false; | ... | ... |
... | ... | @@ -15,8 +15,8 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.service.security.system; |
17 | 17 | |
18 | +import com.fasterxml.jackson.core.type.TypeReference; | |
18 | 19 | import com.fasterxml.jackson.databind.JsonNode; |
19 | -import com.fasterxml.jackson.databind.ObjectMapper; | |
20 | 20 | import com.fasterxml.jackson.databind.node.ObjectNode; |
21 | 21 | import lombok.extern.slf4j.Slf4j; |
22 | 22 | import org.apache.commons.lang3.StringUtils; |
... | ... | @@ -49,6 +49,7 @@ import org.thingsboard.server.dao.exception.DataValidationException; |
49 | 49 | import org.thingsboard.server.dao.settings.AdminSettingsService; |
50 | 50 | import org.thingsboard.server.dao.user.UserService; |
51 | 51 | import org.thingsboard.server.dao.user.UserServiceImpl; |
52 | +import org.thingsboard.server.dao.util.mapping.JacksonUtil; | |
52 | 53 | import org.thingsboard.server.service.security.exception.UserPasswordExpiredException; |
53 | 54 | import org.thingsboard.server.utils.MiscUtils; |
54 | 55 | |
... | ... | @@ -65,8 +66,6 @@ import static org.thingsboard.server.common.data.CacheConstants.SECURITY_SETTING |
65 | 66 | @Slf4j |
66 | 67 | public class DefaultSystemSecurityService implements SystemSecurityService { |
67 | 68 | |
68 | - private static final ObjectMapper objectMapper = new ObjectMapper(); | |
69 | - | |
70 | 69 | @Autowired |
71 | 70 | private AdminSettingsService adminSettingsService; |
72 | 71 | |
... | ... | @@ -89,7 +88,7 @@ public class DefaultSystemSecurityService implements SystemSecurityService { |
89 | 88 | AdminSettings adminSettings = adminSettingsService.findAdminSettingsByKey(tenantId, "securitySettings"); |
90 | 89 | if (adminSettings != null) { |
91 | 90 | try { |
92 | - securitySettings = objectMapper.treeToValue(adminSettings.getJsonValue(), SecuritySettings.class); | |
91 | + securitySettings = JacksonUtil.convertValue(adminSettings.getJsonValue(), SecuritySettings.class); | |
93 | 92 | } catch (Exception e) { |
94 | 93 | throw new RuntimeException("Failed to load security settings!", e); |
95 | 94 | } |
... | ... | @@ -109,10 +108,10 @@ public class DefaultSystemSecurityService implements SystemSecurityService { |
109 | 108 | adminSettings = new AdminSettings(); |
110 | 109 | adminSettings.setKey("securitySettings"); |
111 | 110 | } |
112 | - adminSettings.setJsonValue(objectMapper.valueToTree(securitySettings)); | |
111 | + adminSettings.setJsonValue(JacksonUtil.valueToTree(securitySettings)); | |
113 | 112 | AdminSettings savedAdminSettings = adminSettingsService.saveAdminSettings(tenantId, adminSettings); |
114 | 113 | try { |
115 | - return objectMapper.treeToValue(savedAdminSettings.getJsonValue(), SecuritySettings.class); | |
114 | + return JacksonUtil.convertValue(savedAdminSettings.getJsonValue(), SecuritySettings.class); | |
116 | 115 | } catch (Exception e) { |
117 | 116 | throw new RuntimeException("Failed to load security settings!", e); |
118 | 117 | } |
... | ... | @@ -189,7 +188,7 @@ public class DefaultSystemSecurityService implements SystemSecurityService { |
189 | 188 | JsonNode additionalInfo = user.getAdditionalInfo(); |
190 | 189 | if (additionalInfo instanceof ObjectNode && additionalInfo.has(UserServiceImpl.USER_PASSWORD_HISTORY)) { |
191 | 190 | JsonNode userPasswordHistoryJson = additionalInfo.get(UserServiceImpl.USER_PASSWORD_HISTORY); |
192 | - Map<String, String> userPasswordHistoryMap = objectMapper.convertValue(userPasswordHistoryJson, Map.class); | |
191 | + Map<String, String> userPasswordHistoryMap = JacksonUtil.convertValue(userPasswordHistoryJson, new TypeReference<>() {}); | |
193 | 192 | for (Map.Entry<String, String> entry : userPasswordHistoryMap.entrySet()) { |
194 | 193 | if (encoder.matches(password, entry.getValue()) && Long.parseLong(entry.getKey()) > passwordReuseFrequencyTs) { |
195 | 194 | throw new DataValidationException("Password was already used for the last " + passwordPolicy.getPasswordReuseFrequencyDays() + " days"); | ... | ... |
... | ... | @@ -318,6 +318,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc |
318 | 318 | return ctx; |
319 | 319 | } |
320 | 320 | |
321 | + @SuppressWarnings("unchecked") | |
321 | 322 | private <T extends TbAbstractDataSubCtx> T getSubCtx(String sessionId, int cmdId) { |
322 | 323 | Map<Integer, TbAbstractDataSubCtx> sessionSubs = subscriptionsBySessionId.get(sessionId); |
323 | 324 | if (sessionSubs != null) { | ... | ... |
... | ... | @@ -123,6 +123,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer |
123 | 123 | } |
124 | 124 | |
125 | 125 | @Override |
126 | + @SuppressWarnings("unchecked") | |
126 | 127 | public void onSubscriptionUpdate(String sessionId, TelemetrySubscriptionUpdate update, TbCallback callback) { |
127 | 128 | TbSubscription subscription = subscriptionsBySessionId |
128 | 129 | .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId()); |
... | ... | @@ -143,6 +144,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer |
143 | 144 | } |
144 | 145 | |
145 | 146 | @Override |
147 | + @SuppressWarnings("unchecked") | |
146 | 148 | public void onSubscriptionUpdate(String sessionId, AlarmSubscriptionUpdate update, TbCallback callback) { |
147 | 149 | TbSubscription subscription = subscriptionsBySessionId |
148 | 150 | .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId()); | ... | ... |
... | ... | @@ -264,6 +264,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends |
264 | 264 | }, MoreExecutors.directExecutor()); |
265 | 265 | } |
266 | 266 | |
267 | + @SuppressWarnings("unchecked") | |
267 | 268 | private void updateDynamicValuesByKey(DynamicValueKeySub sub, TsValue tsValue) { |
268 | 269 | DynamicValueKey dvk = sub.getKey(); |
269 | 270 | switch (dvk.getPredicateType()) { |
... | ... | @@ -285,6 +286,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends |
285 | 286 | } |
286 | 287 | } |
287 | 288 | |
289 | + @SuppressWarnings("unchecked") | |
288 | 290 | private void registerDynamicValues(KeyFilterPredicate predicate) { |
289 | 291 | switch (predicate.getType()) { |
290 | 292 | case STRING: | ... | ... |
... | ... | @@ -34,6 +34,8 @@ import java.util.UUID; |
34 | 34 | @Data |
35 | 35 | public class TransportToDeviceActorMsgWrapper implements TbActorMsg, DeviceAwareMsg, TenantAwareMsg, Serializable { |
36 | 36 | |
37 | + private static final long serialVersionUID = 7191333353202935941L; | |
38 | + | |
37 | 39 | private final TenantId tenantId; |
38 | 40 | private final DeviceId deviceId; |
39 | 41 | private final TransportToDeviceActorMsg msg; | ... | ... |
... | ... | @@ -375,6 +375,10 @@ public abstract class AbstractWebTest { |
375 | 375 | return readResponse(doGetAsync(urlTemplate, urlVariables).andExpect(status().isOk()), responseClass); |
376 | 376 | } |
377 | 377 | |
378 | + protected <T> T doGetAsyncTyped(String urlTemplate, TypeReference<T> responseType, Object... urlVariables) throws Exception { | |
379 | + return readResponse(doGetAsync(urlTemplate, urlVariables).andExpect(status().isOk()), responseType); | |
380 | + } | |
381 | + | |
378 | 382 | protected ResultActions doGetAsync(String urlTemplate, Object... urlVariables) throws Exception { |
379 | 383 | MockHttpServletRequestBuilder getRequest; |
380 | 384 | getRequest = get(urlTemplate, urlVariables); | ... | ... |
... | ... | @@ -347,8 +347,8 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
347 | 347 | |
348 | 348 | Thread.sleep(1000); |
349 | 349 | |
350 | - List<Map<String, Object>> values = doGetAsync("/api/plugins/telemetry/ENTITY_VIEW/" + savedView.getId().getId().toString() + | |
351 | - "/values/attributes?keys=" + String.join(",", actualAttributesSet), List.class); | |
350 | + List<Map<String, Object>> values = doGetAsyncTyped("/api/plugins/telemetry/ENTITY_VIEW/" + savedView.getId().getId().toString() + | |
351 | + "/values/attributes?keys=" + String.join(",", actualAttributesSet), new TypeReference<>() {}); | |
352 | 352 | |
353 | 353 | assertEquals("value1", getValue(values, "caKey1")); |
354 | 354 | assertEquals(true, getValue(values, "caKey2")); |
... | ... | @@ -364,8 +364,8 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
364 | 364 | Set<String> expectedActualAttributesSet = new HashSet<>(Arrays.asList("caKey1", "caKey2", "caKey3", "caKey4")); |
365 | 365 | assertTrue(actualAttributesSet.containsAll(expectedActualAttributesSet)); |
366 | 366 | |
367 | - List<Map<String, Object>> valueTelemetryOfDevices = doGetAsync("/api/plugins/telemetry/DEVICE/" + testDevice.getId().getId().toString() + | |
368 | - "/values/attributes?keys=" + String.join(",", actualAttributesSet), List.class); | |
367 | + List<Map<String, Object>> valueTelemetryOfDevices = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + testDevice.getId().getId().toString() + | |
368 | + "/values/attributes?keys=" + String.join(",", actualAttributesSet), new TypeReference<>() {}); | |
369 | 369 | |
370 | 370 | EntityView view = new EntityView(); |
371 | 371 | view.setEntityId(testDevice.getId()); |
... | ... | @@ -379,8 +379,8 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
379 | 379 | |
380 | 380 | Thread.sleep(1000); |
381 | 381 | |
382 | - List<Map<String, Object>> values = doGetAsync("/api/plugins/telemetry/ENTITY_VIEW/" + savedView.getId().getId().toString() + | |
383 | - "/values/attributes?keys=" + String.join(",", actualAttributesSet), List.class); | |
382 | + List<Map<String, Object>> values = doGetAsyncTyped("/api/plugins/telemetry/ENTITY_VIEW/" + savedView.getId().getId().toString() + | |
383 | + "/values/attributes?keys=" + String.join(",", actualAttributesSet), new TypeReference<>() {}); | |
384 | 384 | assertEquals(0, values.size()); |
385 | 385 | } |
386 | 386 | |
... | ... | @@ -449,12 +449,12 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
449 | 449 | } |
450 | 450 | |
451 | 451 | private Set<String> getTelemetryKeys(String type, String id) throws Exception { |
452 | - return new HashSet<>(doGetAsync("/api/plugins/telemetry/" + type + "/" + id + "/keys/timeseries", List.class)); | |
452 | + return new HashSet<>(doGetAsyncTyped("/api/plugins/telemetry/" + type + "/" + id + "/keys/timeseries", new TypeReference<>() {})); | |
453 | 453 | } |
454 | 454 | |
455 | 455 | private Map<String, List<Map<String, String>>> getTelemetryValues(String type, String id, Set<String> keys, Long startTs, Long endTs) throws Exception { |
456 | - return doGetAsync("/api/plugins/telemetry/" + type + "/" + id + | |
457 | - "/values/timeseries?keys=" + String.join(",", keys) + "&startTs=" + startTs + "&endTs=" + endTs, Map.class); | |
456 | + return doGetAsyncTyped("/api/plugins/telemetry/" + type + "/" + id + | |
457 | + "/values/timeseries?keys=" + String.join(",", keys) + "&startTs=" + startTs + "&endTs=" + endTs, new TypeReference<>() {}); | |
458 | 458 | } |
459 | 459 | |
460 | 460 | private Set<String> getAttributesByKeys(String stringKV) throws Exception { |
... | ... | @@ -479,7 +479,7 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
479 | 479 | client.publish("v1/devices/me/attributes", message); |
480 | 480 | Thread.sleep(1000); |
481 | 481 | client.disconnect(); |
482 | - return new HashSet<>(doGetAsync("/api/plugins/telemetry/DEVICE/" + viewDeviceId + "/keys/attributes", List.class)); | |
482 | + return new HashSet<>(doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + viewDeviceId + "/keys/attributes", new TypeReference<>() {})); | |
483 | 483 | } |
484 | 484 | |
485 | 485 | private Object getValue(List<Map<String, Object>> values, String stringValue) { | ... | ... |
... | ... | @@ -16,6 +16,7 @@ |
16 | 16 | package org.thingsboard.server.mqtt.telemetry.attributes; |
17 | 17 | |
18 | 18 | import com.fasterxml.jackson.core.JsonProcessingException; |
19 | +import com.fasterxml.jackson.core.type.TypeReference; | |
19 | 20 | import lombok.extern.slf4j.Slf4j; |
20 | 21 | import org.eclipse.paho.client.mqttv3.MqttAsyncClient; |
21 | 22 | import org.junit.After; |
... | ... | @@ -80,7 +81,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt |
80 | 81 | |
81 | 82 | List<String> actualKeys = null; |
82 | 83 | while (start <= end) { |
83 | - actualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/attributes/CLIENT_SCOPE", List.class); | |
84 | + actualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/attributes/CLIENT_SCOPE", new TypeReference<>() {}); | |
84 | 85 | if (actualKeys.size() == expectedKeys.size()) { |
85 | 86 | break; |
86 | 87 | } |
... | ... | @@ -96,7 +97,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt |
96 | 97 | assertEquals(expectedKeySet, actualKeySet); |
97 | 98 | |
98 | 99 | String getAttributesValuesUrl = getAttributesValuesUrl(deviceId, actualKeySet); |
99 | - List<Map<String, Object>> values = doGetAsync(getAttributesValuesUrl, List.class); | |
100 | + List<Map<String, Object>> values = doGetAsyncTyped(getAttributesValuesUrl, new TypeReference<>() {}); | |
100 | 101 | assertAttributesValues(values, expectedKeySet); |
101 | 102 | String deleteAttributesUrl = "/api/plugins/telemetry/DEVICE/" + deviceId + "/CLIENT_SCOPE?keys=" + String.join(",", actualKeySet); |
102 | 103 | doDelete(deleteAttributesUrl); |
... | ... | @@ -121,10 +122,10 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt |
121 | 122 | |
122 | 123 | Thread.sleep(2000); |
123 | 124 | |
124 | - List<String> firstDeviceActualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + firstDevice.getId() + "/keys/attributes/CLIENT_SCOPE", List.class); | |
125 | + List<String> firstDeviceActualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + firstDevice.getId() + "/keys/attributes/CLIENT_SCOPE", new TypeReference<>() {}); | |
125 | 126 | Set<String> firstDeviceActualKeySet = new HashSet<>(firstDeviceActualKeys); |
126 | 127 | |
127 | - List<String> secondDeviceActualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + secondDevice.getId() + "/keys/attributes/CLIENT_SCOPE", List.class); | |
128 | + List<String> secondDeviceActualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + secondDevice.getId() + "/keys/attributes/CLIENT_SCOPE", new TypeReference<>() {}); | |
128 | 129 | Set<String> secondDeviceActualKeySet = new HashSet<>(secondDeviceActualKeys); |
129 | 130 | |
130 | 131 | Set<String> expectedKeySet = new HashSet<>(expectedKeys); |
... | ... | @@ -135,14 +136,15 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt |
135 | 136 | String getAttributesValuesUrlFirstDevice = getAttributesValuesUrl(firstDevice.getId(), firstDeviceActualKeySet); |
136 | 137 | String getAttributesValuesUrlSecondDevice = getAttributesValuesUrl(firstDevice.getId(), secondDeviceActualKeySet); |
137 | 138 | |
138 | - List<Map<String, Object>> firstDeviceValues = doGetAsync(getAttributesValuesUrlFirstDevice, List.class); | |
139 | - List<Map<String, Object>> secondDeviceValues = doGetAsync(getAttributesValuesUrlSecondDevice, List.class); | |
139 | + List<Map<String, Object>> firstDeviceValues = doGetAsyncTyped(getAttributesValuesUrlFirstDevice, new TypeReference<>() {}); | |
140 | + List<Map<String, Object>> secondDeviceValues = doGetAsyncTyped(getAttributesValuesUrlSecondDevice, new TypeReference<>() {}); | |
140 | 141 | |
141 | 142 | assertAttributesValues(firstDeviceValues, expectedKeySet); |
142 | 143 | assertAttributesValues(secondDeviceValues, expectedKeySet); |
143 | 144 | |
144 | 145 | } |
145 | 146 | |
147 | + @SuppressWarnings("unchecked") | |
146 | 148 | protected void assertAttributesValues(List<Map<String, Object>> deviceValues, Set<String> expectedKeySet) throws JsonProcessingException { |
147 | 149 | for (Map<String, Object> map : deviceValues) { |
148 | 150 | String key = (String) map.get("key"); | ... | ... |
... | ... | @@ -15,6 +15,7 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.mqtt.telemetry.timeseries; |
17 | 17 | |
18 | +import com.fasterxml.jackson.core.type.TypeReference; | |
18 | 19 | import io.netty.handler.codec.mqtt.MqttQoS; |
19 | 20 | import lombok.extern.slf4j.Slf4j; |
20 | 21 | import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
... | ... | @@ -25,6 +26,7 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; |
25 | 26 | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
26 | 27 | import org.junit.After; |
27 | 28 | import org.junit.Before; |
29 | +import org.junit.Ignore; | |
28 | 30 | import org.junit.Test; |
29 | 31 | import org.thingsboard.server.common.data.Device; |
30 | 32 | import org.thingsboard.server.common.data.device.profile.MqttTopics; |
... | ... | @@ -107,7 +109,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt |
107 | 109 | |
108 | 110 | List<String> actualKeys = null; |
109 | 111 | while (start <= end) { |
110 | - actualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/timeseries", List.class); | |
112 | + actualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/timeseries", new TypeReference<>() {}); | |
111 | 113 | if (actualKeys.size() == expectedKeys.size()) { |
112 | 114 | break; |
113 | 115 | } |
... | ... | @@ -129,13 +131,13 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt |
129 | 131 | } |
130 | 132 | start = System.currentTimeMillis(); |
131 | 133 | end = System.currentTimeMillis() + 5000; |
132 | - Map<String, List<Map<String, String>>> values = null; | |
134 | + Map<String, List<Map<String, Object>>> values = null; | |
133 | 135 | while (start <= end) { |
134 | - values = doGetAsync(getTelemetryValuesUrl, Map.class); | |
136 | + values = doGetAsyncTyped(getTelemetryValuesUrl, new TypeReference<>() {}); | |
135 | 137 | boolean valid = values.size() == expectedKeys.size(); |
136 | 138 | if (valid) { |
137 | 139 | for (String key : expectedKeys) { |
138 | - List<Map<String, String>> tsValues = values.get(key); | |
140 | + List<Map<String, Object>> tsValues = values.get(key); | |
139 | 141 | if (tsValues != null && tsValues.size() > 0) { |
140 | 142 | Object ts = tsValues.get(0).get("ts"); |
141 | 143 | if (ts == null) { |
... | ... | @@ -181,10 +183,10 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt |
181 | 183 | |
182 | 184 | Thread.sleep(2000); |
183 | 185 | |
184 | - List<String> firstDeviceActualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + firstDevice.getId() + "/keys/timeseries", List.class); | |
186 | + List<String> firstDeviceActualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + firstDevice.getId() + "/keys/timeseries", new TypeReference<>() {}); | |
185 | 187 | Set<String> firstDeviceActualKeySet = new HashSet<>(firstDeviceActualKeys); |
186 | 188 | |
187 | - List<String> secondDeviceActualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + secondDevice.getId() + "/keys/timeseries", List.class); | |
189 | + List<String> secondDeviceActualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + secondDevice.getId() + "/keys/timeseries", new TypeReference<>() {}); | |
188 | 190 | Set<String> secondDeviceActualKeySet = new HashSet<>(secondDeviceActualKeys); |
189 | 191 | |
190 | 192 | Set<String> expectedKeySet = new HashSet<>(expectedKeys); |
... | ... | @@ -195,8 +197,8 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt |
195 | 197 | String getTelemetryValuesUrlFirstDevice = getTelemetryValuesUrl(firstDevice.getId(), firstDeviceActualKeySet); |
196 | 198 | String getTelemetryValuesUrlSecondDevice = getTelemetryValuesUrl(firstDevice.getId(), secondDeviceActualKeySet); |
197 | 199 | |
198 | - Map<String, List<Map<String, String>>> firstDeviceValues = doGetAsync(getTelemetryValuesUrlFirstDevice, Map.class); | |
199 | - Map<String, List<Map<String, String>>> secondDeviceValues = doGetAsync(getTelemetryValuesUrlSecondDevice, Map.class); | |
200 | + Map<String, List<Map<String, Object>>> firstDeviceValues = doGetAsyncTyped(getTelemetryValuesUrlFirstDevice, new TypeReference<>() {}); | |
201 | + Map<String, List<Map<String, Object>>> secondDeviceValues = doGetAsyncTyped(getTelemetryValuesUrlSecondDevice, new TypeReference<>() {}); | |
200 | 202 | |
201 | 203 | assertGatewayDeviceData(firstDeviceValues, expectedKeys); |
202 | 204 | assertGatewayDeviceData(secondDeviceValues, expectedKeys); |
... | ... | @@ -212,7 +214,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt |
212 | 214 | return "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/timeseries?startTs=0&endTs=25000&keys=" + String.join(",", actualKeySet); |
213 | 215 | } |
214 | 216 | |
215 | - private void assertGatewayDeviceData(Map<String, List<Map<String, String>>> deviceValues, List<String> expectedKeys) { | |
217 | + private void assertGatewayDeviceData(Map<String, List<Map<String, Object>>> deviceValues, List<String> expectedKeys) { | |
216 | 218 | |
217 | 219 | assertEquals(2, deviceValues.get(expectedKeys.get(0)).size()); |
218 | 220 | assertEquals(2, deviceValues.get(expectedKeys.get(1)).size()); |
... | ... | @@ -228,11 +230,11 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt |
228 | 230 | |
229 | 231 | } |
230 | 232 | |
231 | - private void assertValues(Map<String, List<Map<String, String>>> deviceValues, int arrayIndex) { | |
232 | - for (Map.Entry<String, List<Map<String, String>>> entry : deviceValues.entrySet()) { | |
233 | + private void assertValues(Map<String, List<Map<String, Object>>> deviceValues, int arrayIndex) { | |
234 | + for (Map.Entry<String, List<Map<String, Object>>> entry : deviceValues.entrySet()) { | |
233 | 235 | String key = entry.getKey(); |
234 | - List<Map<String, String>> tsKv = entry.getValue(); | |
235 | - String value = tsKv.get(arrayIndex).get("value"); | |
236 | + List<Map<String, Object>> tsKv = entry.getValue(); | |
237 | + String value = (String) tsKv.get(arrayIndex).get("value"); | |
236 | 238 | switch (key) { |
237 | 239 | case "key1": |
238 | 240 | assertEquals("value1", value); |
... | ... | @@ -253,7 +255,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt |
253 | 255 | } |
254 | 256 | } |
255 | 257 | |
256 | - private void assertTs(Map<String, List<Map<String, String>>> deviceValues, List<String> expectedKeys, int ts, int arrayIndex) { | |
258 | + private void assertTs(Map<String, List<Map<String, Object>>> deviceValues, List<String> expectedKeys, int ts, int arrayIndex) { | |
257 | 259 | assertEquals(ts, deviceValues.get(expectedKeys.get(0)).get(arrayIndex).get("ts")); |
258 | 260 | assertEquals(ts, deviceValues.get(expectedKeys.get(1)).get(arrayIndex).get("ts")); |
259 | 261 | assertEquals(ts, deviceValues.get(expectedKeys.get(2)).get(arrayIndex).get("ts")); | ... | ... |
... | ... | @@ -21,12 +21,11 @@ import org.junit.Assert; |
21 | 21 | import org.junit.Before; |
22 | 22 | import org.junit.Test; |
23 | 23 | import org.junit.runner.RunWith; |
24 | -import org.mockito.runners.MockitoJUnitRunner; | |
24 | +import org.mockito.junit.MockitoJUnitRunner; | |
25 | 25 | import org.springframework.context.ApplicationEventPublisher; |
26 | 26 | import org.springframework.test.util.ReflectionTestUtils; |
27 | 27 | import org.thingsboard.server.common.data.id.DeviceId; |
28 | 28 | import org.thingsboard.server.common.data.id.TenantId; |
29 | -import org.thingsboard.server.common.msg.queue.ServiceQueue; | |
30 | 29 | import org.thingsboard.server.queue.discovery.HashPartitionService; |
31 | 30 | import org.thingsboard.server.common.msg.queue.ServiceType; |
32 | 31 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | ... | ... |
... | ... | @@ -20,7 +20,7 @@ import org.junit.Assert; |
20 | 20 | import org.junit.Test; |
21 | 21 | import org.junit.runner.RunWith; |
22 | 22 | import org.mockito.Mockito; |
23 | -import org.mockito.runners.MockitoJUnitRunner; | |
23 | +import org.mockito.junit.MockitoJUnitRunner; | |
24 | 24 | import org.thingsboard.server.gen.transport.TransportProtos; |
25 | 25 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
26 | 26 | import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; | ... | ... |
... | ... | @@ -20,7 +20,7 @@ import lombok.extern.slf4j.Slf4j; |
20 | 20 | import org.junit.Test; |
21 | 21 | import org.junit.runner.RunWith; |
22 | 22 | import org.mockito.Mockito; |
23 | -import org.mockito.runners.MockitoJUnitRunner; | |
23 | +import org.mockito.junit.MockitoJUnitRunner; | |
24 | 24 | import org.thingsboard.server.utils.EventDeduplicationExecutor; |
25 | 25 | |
26 | 26 | import java.util.concurrent.ExecutorService; | ... | ... |
... | ... | @@ -30,7 +30,7 @@ public interface TbActor { |
30 | 30 | } |
31 | 31 | |
32 | 32 | default InitFailureStrategy onInitFailure(int attempt, Throwable t) { |
33 | - return InitFailureStrategy.retryWithDelay(5000 * attempt); | |
33 | + return InitFailureStrategy.retryWithDelay(5000L * attempt); | |
34 | 34 | } |
35 | 35 | |
36 | 36 | default ProcessFailureStrategy onProcessFailure(Throwable t) { | ... | ... |
... | ... | @@ -17,6 +17,8 @@ package org.thingsboard.server.actors; |
17 | 17 | |
18 | 18 | public class TbActorException extends Exception { |
19 | 19 | |
20 | + private static final long serialVersionUID = 8209771144711980882L; | |
21 | + | |
20 | 22 | public TbActorException(String message, Throwable cause) { |
21 | 23 | super(message, cause); |
22 | 24 | } | ... | ... |
... | ... | @@ -18,6 +18,7 @@ package org.thingsboard.server.actors; |
18 | 18 | import lombok.Data; |
19 | 19 | import lombok.extern.slf4j.Slf4j; |
20 | 20 | import org.thingsboard.server.common.msg.TbActorMsg; |
21 | +import org.thingsboard.server.common.msg.TbActorStopReason; | |
21 | 22 | |
22 | 23 | import java.util.List; |
23 | 24 | import java.util.concurrent.ConcurrentLinkedQueue; |
... | ... | @@ -49,6 +50,7 @@ public final class TbActorMailbox implements TbActorCtx { |
49 | 50 | private final AtomicBoolean busy = new AtomicBoolean(FREE); |
50 | 51 | private final AtomicBoolean ready = new AtomicBoolean(NOT_READY); |
51 | 52 | private final AtomicBoolean destroyInProgress = new AtomicBoolean(); |
53 | + private volatile TbActorStopReason stopReason; | |
52 | 54 | |
53 | 55 | public void initActor() { |
54 | 56 | dispatcher.getExecutor().execute(() -> tryInit(1)); |
... | ... | @@ -70,6 +72,7 @@ public final class TbActorMailbox implements TbActorCtx { |
70 | 72 | InitFailureStrategy strategy = actor.onInitFailure(attempt, t); |
71 | 73 | if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) { |
72 | 74 | log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t); |
75 | + stopReason = TbActorStopReason.INIT_FAILED; | |
73 | 76 | system.stop(selfId); |
74 | 77 | } else if (strategy.getRetryDelay() > 0) { |
75 | 78 | log.info("[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", selfId, attempt, strategy.getRetryDelay()); |
... | ... | @@ -84,12 +87,16 @@ public final class TbActorMailbox implements TbActorCtx { |
84 | 87 | } |
85 | 88 | |
86 | 89 | private void enqueue(TbActorMsg msg, boolean highPriority) { |
87 | - if (highPriority) { | |
88 | - highPriorityMsgs.add(msg); | |
90 | + if (!destroyInProgress.get()) { | |
91 | + if (highPriority) { | |
92 | + highPriorityMsgs.add(msg); | |
93 | + } else { | |
94 | + normalPriorityMsgs.add(msg); | |
95 | + } | |
96 | + tryProcessQueue(true); | |
89 | 97 | } else { |
90 | - normalPriorityMsgs.add(msg); | |
98 | + msg.onTbActorStopped(stopReason); | |
91 | 99 | } |
92 | - tryProcessQueue(true); | |
93 | 100 | } |
94 | 101 | |
95 | 102 | private void tryProcessQueue(boolean newMsg) { |
... | ... | @@ -180,11 +187,16 @@ public final class TbActorMailbox implements TbActorCtx { |
180 | 187 | } |
181 | 188 | |
182 | 189 | public void destroy() { |
190 | + if (stopReason == null) { | |
191 | + stopReason = TbActorStopReason.STOPPED; | |
192 | + } | |
183 | 193 | destroyInProgress.set(true); |
184 | 194 | dispatcher.getExecutor().execute(() -> { |
185 | 195 | try { |
186 | 196 | ready.set(NOT_READY); |
187 | 197 | actor.destroy(); |
198 | + highPriorityMsgs.forEach(msg -> msg.onTbActorStopped(stopReason)); | |
199 | + normalPriorityMsgs.forEach(msg -> msg.onTbActorStopped(stopReason)); | |
188 | 200 | } catch (Throwable t) { |
189 | 201 | log.warn("[{}] Failed to destroy actor: {}", selfId, t); |
190 | 202 | } | ... | ... |
... | ... | @@ -21,7 +21,7 @@ import org.junit.Assert; |
21 | 21 | import org.junit.Before; |
22 | 22 | import org.junit.Test; |
23 | 23 | import org.junit.runner.RunWith; |
24 | -import org.mockito.runners.MockitoJUnitRunner; | |
24 | +import org.mockito.junit.MockitoJUnitRunner; | |
25 | 25 | import org.thingsboard.server.common.data.id.DeviceId; |
26 | 26 | |
27 | 27 | import java.util.ArrayList; | ... | ... |
... | ... | @@ -49,6 +49,10 @@ |
49 | 49 | <artifactId>guava</artifactId> |
50 | 50 | </dependency> |
51 | 51 | <dependency> |
52 | + <groupId>javax.annotation</groupId> | |
53 | + <artifactId>javax.annotation-api</artifactId> | |
54 | + </dependency> | |
55 | + <dependency> | |
52 | 56 | <groupId>com.github.fge</groupId> |
53 | 57 | <artifactId>json-schema-validator</artifactId> |
54 | 58 | </dependency> |
... | ... | @@ -99,7 +103,7 @@ |
99 | 103 | </dependency> |
100 | 104 | <dependency> |
101 | 105 | <groupId>org.mockito</groupId> |
102 | - <artifactId>mockito-all</artifactId> | |
106 | + <artifactId>mockito-core</artifactId> | |
103 | 107 | <scope>test</scope> |
104 | 108 | </dependency> |
105 | 109 | </dependencies> | ... | ... |
... | ... | @@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; |
23 | 23 | import org.springframework.beans.factory.annotation.Autowired; |
24 | 24 | import org.springframework.beans.factory.annotation.Value; |
25 | 25 | import org.springframework.core.env.Environment; |
26 | +import org.springframework.core.env.Profiles; | |
26 | 27 | import org.thingsboard.server.dao.cassandra.guava.GuavaSession; |
27 | 28 | import org.thingsboard.server.dao.cassandra.guava.GuavaSessionBuilder; |
28 | 29 | import org.thingsboard.server.dao.cassandra.guava.GuavaSessionUtils; |
... | ... | @@ -77,7 +78,7 @@ public abstract class AbstractCassandraCluster { |
77 | 78 | } |
78 | 79 | |
79 | 80 | private boolean isInstall() { |
80 | - return environment.acceptsProfiles("install"); | |
81 | + return environment.acceptsProfiles(Profiles.of("install")); | |
81 | 82 | } |
82 | 83 | |
83 | 84 | private void initSession() { | ... | ... |
... | ... | @@ -18,38 +18,25 @@ package org.thingsboard.server.dao.cassandra.guava; |
18 | 18 | import com.datastax.oss.driver.api.core.CqlSession; |
19 | 19 | import com.datastax.oss.driver.api.core.config.DriverConfigLoader; |
20 | 20 | import com.datastax.oss.driver.api.core.context.DriverContext; |
21 | -import com.datastax.oss.driver.api.core.metadata.Node; | |
22 | -import com.datastax.oss.driver.api.core.metadata.NodeStateListener; | |
23 | -import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener; | |
21 | +import com.datastax.oss.driver.api.core.session.ProgrammaticArguments; | |
24 | 22 | import com.datastax.oss.driver.api.core.session.SessionBuilder; |
25 | -import com.datastax.oss.driver.api.core.tracker.RequestTracker; | |
26 | -import com.datastax.oss.driver.api.core.type.codec.TypeCodec; | |
27 | 23 | import edu.umd.cs.findbugs.annotations.NonNull; |
28 | -import java.util.List; | |
29 | -import java.util.Map; | |
30 | -import java.util.function.Predicate; | |
31 | 24 | |
32 | 25 | public class GuavaSessionBuilder extends SessionBuilder<GuavaSessionBuilder, GuavaSession> { |
33 | 26 | |
34 | 27 | @Override |
35 | 28 | protected DriverContext buildContext( |
36 | 29 | DriverConfigLoader configLoader, |
37 | - List<TypeCodec<?>> typeCodecs, | |
38 | - NodeStateListener nodeStateListener, | |
39 | - SchemaChangeListener schemaChangeListener, | |
40 | - RequestTracker requestTracker, | |
41 | - Map<String, String> localDatacenters, | |
42 | - Map<String, Predicate<Node>> nodeFilters, | |
43 | - ClassLoader classLoader) { | |
30 | + ProgrammaticArguments programmaticArguments) { | |
44 | 31 | return new GuavaDriverContext( |
45 | 32 | configLoader, |
46 | - typeCodecs, | |
47 | - nodeStateListener, | |
48 | - schemaChangeListener, | |
49 | - requestTracker, | |
50 | - localDatacenters, | |
51 | - nodeFilters, | |
52 | - classLoader); | |
33 | + programmaticArguments.getTypeCodecs(), | |
34 | + programmaticArguments.getNodeStateListener(), | |
35 | + programmaticArguments.getSchemaChangeListener(), | |
36 | + programmaticArguments.getRequestTracker(), | |
37 | + programmaticArguments.getLocalDatacenters(), | |
38 | + programmaticArguments.getNodeFilters(), | |
39 | + programmaticArguments.getClassLoader()); | |
53 | 40 | } |
54 | 41 | |
55 | 42 | @Override | ... | ... |
common/dao-api/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java
renamed from
dao/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java
... | ... | @@ -16,6 +16,7 @@ |
16 | 16 | package org.thingsboard.server.dao.util.mapping; |
17 | 17 | |
18 | 18 | import com.fasterxml.jackson.core.JsonProcessingException; |
19 | +import com.fasterxml.jackson.core.type.TypeReference; | |
19 | 20 | import com.fasterxml.jackson.databind.JsonNode; |
20 | 21 | import com.fasterxml.jackson.databind.ObjectMapper; |
21 | 22 | import com.fasterxml.jackson.databind.node.ObjectNode; |
... | ... | @@ -38,6 +39,15 @@ public class JacksonUtil { |
38 | 39 | } |
39 | 40 | } |
40 | 41 | |
42 | + public static <T> T convertValue(Object fromValue, TypeReference<T> toValueTypeRef) { | |
43 | + try { | |
44 | + return fromValue != null ? OBJECT_MAPPER.convertValue(fromValue, toValueTypeRef) : null; | |
45 | + } catch (IllegalArgumentException e) { | |
46 | + throw new IllegalArgumentException("The given object value: " | |
47 | + + fromValue + " cannot be converted to " + toValueTypeRef, e); | |
48 | + } | |
49 | + } | |
50 | + | |
41 | 51 | public static <T> T fromString(String string, Class<T> clazz) { |
42 | 52 | try { |
43 | 53 | return string != null ? OBJECT_MAPPER.readValue(string, clazz) : null; |
... | ... | @@ -72,7 +82,9 @@ public class JacksonUtil { |
72 | 82 | } |
73 | 83 | |
74 | 84 | public static <T> T clone(T value) { |
75 | - return fromString(toString(value), (Class<T>) value.getClass()); | |
85 | + @SuppressWarnings("unchecked") | |
86 | + Class<T> valueClass = (Class<T>) value.getClass(); | |
87 | + return fromString(toString(value), valueClass); | |
76 | 88 | } |
77 | 89 | |
78 | 90 | public static <T> JsonNode valueToTree(T value) { | ... | ... |
... | ... | @@ -19,7 +19,7 @@ import com.datastax.oss.driver.api.core.uuid.Uuids; |
19 | 19 | import org.junit.Assert; |
20 | 20 | import org.junit.Test; |
21 | 21 | import org.junit.runner.RunWith; |
22 | -import org.mockito.runners.MockitoJUnitRunner; | |
22 | +import org.mockito.junit.MockitoJUnitRunner; | |
23 | 23 | |
24 | 24 | import java.util.ArrayList; |
25 | 25 | import java.util.Arrays; | ... | ... |
... | ... | @@ -67,11 +67,6 @@ public enum MsgType { |
67 | 67 | REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG, |
68 | 68 | |
69 | 69 | /** |
70 | - * Message that is sent by RuleActor implementation to RuleActor itself to log the error. | |
71 | - */ | |
72 | - RULE_TO_SELF_ERROR_MSG, | |
73 | - | |
74 | - /** | |
75 | 70 | * Message that is sent by RuleActor implementation to RuleActor itself to process the message. |
76 | 71 | */ |
77 | 72 | RULE_TO_SELF_MSG, | ... | ... |
... | ... | @@ -22,4 +22,12 @@ public interface TbActorMsg { |
22 | 22 | |
23 | 23 | MsgType getMsgType(); |
24 | 24 | |
25 | + /** | |
26 | + * Executed when the target TbActor is stopped or destroyed. | |
27 | + * For example, rule node failed to initialize or removed from rule chain. | |
28 | + * Implementation should cleanup the resources. | |
29 | + */ | |
30 | + default void onTbActorStopped(TbActorStopReason reason) { | |
31 | + } | |
32 | + | |
25 | 33 | } | ... | ... |
common/message/src/main/java/org/thingsboard/server/common/msg/TbActorStopReason.java
renamed from
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfErrorMsg.java
... | ... | @@ -13,25 +13,10 @@ |
13 | 13 | * See the License for the specific language governing permissions and |
14 | 14 | * limitations under the License. |
15 | 15 | */ |
16 | -package org.thingsboard.server.actors.ruleChain; | |
16 | +package org.thingsboard.server.common.msg; | |
17 | 17 | |
18 | -import lombok.Data; | |
19 | -import org.thingsboard.server.common.msg.MsgType; | |
20 | -import org.thingsboard.server.common.msg.TbActorMsg; | |
21 | -import org.thingsboard.server.common.msg.TbMsg; | |
18 | +public enum TbActorStopReason { | |
22 | 19 | |
23 | -/** | |
24 | - * Created by ashvayka on 19.03.18. | |
25 | - */ | |
26 | -@Data | |
27 | -final class RuleNodeToSelfErrorMsg implements TbActorMsg { | |
28 | - | |
29 | - private final TbMsg msg; | |
30 | - private final Throwable error; | |
31 | - | |
32 | - @Override | |
33 | - public MsgType getMsgType() { | |
34 | - return MsgType.RULE_TO_SELF_ERROR_MSG; | |
35 | - } | |
20 | + INIT_FAILED, STOPPED | |
36 | 21 | |
37 | 22 | } | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.msg; | |
17 | + | |
18 | +import lombok.EqualsAndHashCode; | |
19 | +import lombok.Getter; | |
20 | + | |
21 | +@EqualsAndHashCode | |
22 | +public abstract class TbRuleEngineActorMsg implements TbActorMsg { | |
23 | + | |
24 | + @Getter | |
25 | + protected final TbMsg msg; | |
26 | + | |
27 | + public TbRuleEngineActorMsg(TbMsg msg) { | |
28 | + this.msg = msg; | |
29 | + } | |
30 | +} | ... | ... |
... | ... | @@ -15,32 +15,58 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.common.msg.queue; |
17 | 17 | |
18 | -import lombok.Data; | |
18 | +import lombok.EqualsAndHashCode; | |
19 | +import lombok.Getter; | |
20 | +import lombok.ToString; | |
19 | 21 | import org.thingsboard.server.common.data.id.TenantId; |
20 | 22 | import org.thingsboard.server.common.msg.MsgType; |
21 | -import org.thingsboard.server.common.msg.TbActorMsg; | |
23 | +import org.thingsboard.server.common.msg.TbActorStopReason; | |
22 | 24 | import org.thingsboard.server.common.msg.TbMsg; |
25 | +import org.thingsboard.server.common.msg.TbRuleEngineActorMsg; | |
23 | 26 | |
24 | -import java.io.Serializable; | |
25 | 27 | import java.util.Set; |
26 | 28 | |
27 | 29 | /** |
28 | 30 | * Created by ashvayka on 15.03.18. |
29 | 31 | */ |
30 | -@Data | |
31 | -public final class QueueToRuleEngineMsg implements TbActorMsg { | |
32 | +@ToString | |
33 | +@EqualsAndHashCode(callSuper = true) | |
34 | +public final class QueueToRuleEngineMsg extends TbRuleEngineActorMsg { | |
32 | 35 | |
36 | + @Getter | |
33 | 37 | private final TenantId tenantId; |
34 | - private final TbMsg tbMsg; | |
38 | + @Getter | |
35 | 39 | private final Set<String> relationTypes; |
40 | + @Getter | |
36 | 41 | private final String failureMessage; |
37 | 42 | |
43 | + public QueueToRuleEngineMsg(TenantId tenantId, TbMsg tbMsg, Set<String> relationTypes, String failureMessage) { | |
44 | + super(tbMsg); | |
45 | + this.tenantId = tenantId; | |
46 | + this.relationTypes = relationTypes; | |
47 | + this.failureMessage = failureMessage; | |
48 | + } | |
49 | + | |
38 | 50 | @Override |
39 | 51 | public MsgType getMsgType() { |
40 | 52 | return MsgType.QUEUE_TO_RULE_ENGINE_MSG; |
41 | 53 | } |
42 | 54 | |
55 | + @Override | |
56 | + public void onTbActorStopped(TbActorStopReason reason) { | |
57 | + String message; | |
58 | + if (msg.getRuleChainId() != null) { | |
59 | + message = reason == TbActorStopReason.STOPPED ? | |
60 | + String.format("Rule chain [%s] stopped", msg.getRuleChainId().getId()) : | |
61 | + String.format("Failed to initialize rule chain [%s]!", msg.getRuleChainId().getId()); | |
62 | + } else { | |
63 | + message = reason == TbActorStopReason.STOPPED ? "Rule chain stopped" : "Failed to initialize rule chain!"; | |
64 | + } | |
65 | + msg.getCallback().onFailure(new RuleEngineException(message)); | |
66 | + } | |
67 | + | |
43 | 68 | public boolean isTellNext() { |
44 | 69 | return relationTypes != null && !relationTypes.isEmpty(); |
45 | 70 | } |
71 | + | |
46 | 72 | } | ... | ... |
... | ... | @@ -24,6 +24,9 @@ import org.thingsboard.server.common.data.rule.RuleNode; |
24 | 24 | |
25 | 25 | @Slf4j |
26 | 26 | public class RuleNodeException extends RuleEngineException { |
27 | + | |
28 | + private static final long serialVersionUID = -1776681087370749776L; | |
29 | + | |
27 | 30 | @Getter |
28 | 31 | private final String ruleChainName; |
29 | 32 | @Getter |
... | ... | @@ -33,6 +36,7 @@ public class RuleNodeException extends RuleEngineException { |
33 | 36 | @Getter |
34 | 37 | private final RuleNodeId ruleNodeId; |
35 | 38 | |
39 | + | |
36 | 40 | public RuleNodeException(String message, String ruleChainName, RuleNode ruleNode) { |
37 | 41 | super(message); |
38 | 42 | this.ruleChainName = ruleChainName; | ... | ... |
... | ... | @@ -154,6 +154,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> extends Abstract |
154 | 154 | } |
155 | 155 | |
156 | 156 | private <V> CompletableFuture<List<V>> fromList(List<CompletableFuture<V>> futures) { |
157 | + @SuppressWarnings("unchecked") | |
157 | 158 | CompletableFuture<Collection<V>>[] arrayFuture = new CompletableFuture[futures.size()]; |
158 | 159 | futures.toArray(arrayFuture); |
159 | 160 | ... | ... |
... | ... | @@ -22,6 +22,10 @@ import org.apache.kafka.clients.CommonClientConfigs; |
22 | 22 | import org.apache.kafka.clients.admin.AdminClientConfig; |
23 | 23 | import org.apache.kafka.clients.consumer.ConsumerConfig; |
24 | 24 | import org.apache.kafka.clients.producer.ProducerConfig; |
25 | +import org.apache.kafka.common.serialization.ByteArrayDeserializer; | |
26 | +import org.apache.kafka.common.serialization.ByteArraySerializer; | |
27 | +import org.apache.kafka.common.serialization.StringDeserializer; | |
28 | +import org.apache.kafka.common.serialization.StringSerializer; | |
25 | 29 | import org.springframework.beans.factory.annotation.Value; |
26 | 30 | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
27 | 31 | import org.springframework.boot.context.properties.ConfigurationProperties; |
... | ... | @@ -107,8 +111,8 @@ public class TbKafkaSettings { |
107 | 111 | props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes); |
108 | 112 | props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs); |
109 | 113 | |
110 | - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); | |
111 | - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); | |
114 | + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | |
115 | + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); | |
112 | 116 | return props; |
113 | 117 | } |
114 | 118 | |
... | ... | @@ -120,8 +124,8 @@ public class TbKafkaSettings { |
120 | 124 | props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); |
121 | 125 | props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); |
122 | 126 | props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); |
123 | - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); | |
124 | - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); | |
127 | + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | |
128 | + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); | |
125 | 129 | return props; |
126 | 130 | } |
127 | 131 | ... | ... |
... | ... | @@ -60,6 +60,7 @@ public final class InMemoryStorage { |
60 | 60 | public <T extends TbQueueMsg> List<T> get(String topic) throws InterruptedException { |
61 | 61 | if (storage.containsKey(topic)) { |
62 | 62 | List<T> entities; |
63 | + @SuppressWarnings("unchecked") | |
63 | 64 | T first = (T) storage.get(topic).poll(); |
64 | 65 | if (first != null) { |
65 | 66 | entities = new ArrayList<>(); |
... | ... | @@ -67,7 +68,9 @@ public final class InMemoryStorage { |
67 | 68 | List<TbQueueMsg> otherList = new ArrayList<>(); |
68 | 69 | storage.get(topic).drainTo(otherList, 999); |
69 | 70 | for (TbQueueMsg other : otherList) { |
70 | - entities.add((T) other); | |
71 | + @SuppressWarnings("unchecked") | |
72 | + T entity = (T) other; | |
73 | + entities.add(entity); | |
71 | 74 | } |
72 | 75 | } else { |
73 | 76 | entities = Collections.emptyList(); | ... | ... |
... | ... | @@ -64,6 +64,7 @@ public class InMemoryTbQueueConsumer<T extends TbQueueMsg> implements TbQueueCon |
64 | 64 | @Override |
65 | 65 | public List<T> poll(long durationInMillis) { |
66 | 66 | if (subscribed) { |
67 | + @SuppressWarnings("unchecked") | |
67 | 68 | List<T> messages = partitions |
68 | 69 | .stream() |
69 | 70 | .map(tpi -> { | ... | ... |
... | ... | @@ -47,6 +47,7 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient { |
47 | 47 | @Value("${usage.stats.report.interval:10}") |
48 | 48 | private int interval; |
49 | 49 | |
50 | + @SuppressWarnings("unchecked") | |
50 | 51 | private final ConcurrentMap<TenantId, AtomicLong>[] values = new ConcurrentMap[ApiUsageRecordKey.values().length]; |
51 | 52 | private final PartitionService partitionService; |
52 | 53 | private final SchedulerComponent scheduler; | ... | ... |
... | ... | @@ -79,7 +79,7 @@ |
79 | 79 | </dependency> |
80 | 80 | <dependency> |
81 | 81 | <groupId>org.mockito</groupId> |
82 | - <artifactId>mockito-all</artifactId> | |
82 | + <artifactId>mockito-core</artifactId> | |
83 | 83 | <scope>test</scope> |
84 | 84 | </dependency> |
85 | 85 | </dependencies> |
... | ... | @@ -89,4 +89,4 @@ |
89 | 89 | </plugins> |
90 | 90 | </build> |
91 | 91 | |
92 | -</project> | |
\ No newline at end of file | ||
92 | +</project> | ... | ... |
... | ... | @@ -45,6 +45,7 @@ import java.io.IOException; |
45 | 45 | import java.io.InputStream; |
46 | 46 | import java.net.URL; |
47 | 47 | import java.security.KeyStore; |
48 | +import java.security.cert.CertificateEncodingException; | |
48 | 49 | import java.security.cert.CertificateException; |
49 | 50 | import java.security.cert.X509Certificate; |
50 | 51 | import java.util.concurrent.CountDownLatch; |
... | ... | @@ -154,7 +155,7 @@ public class MqttSslHandlerProvider { |
154 | 155 | String credentialsBody = null; |
155 | 156 | for (X509Certificate cert : chain) { |
156 | 157 | try { |
157 | - String strCert = SslUtil.getX509CertificateString(cert); | |
158 | + String strCert = SslUtil.getCertificateString(cert); | |
158 | 159 | String sha3Hash = EncryptionUtil.getSha3Hash(strCert); |
159 | 160 | final String[] credentialsBodyHolder = new String[1]; |
160 | 161 | CountDownLatch latch = new CountDownLatch(1); |
... | ... | @@ -179,7 +180,7 @@ public class MqttSslHandlerProvider { |
179 | 180 | credentialsBody = credentialsBodyHolder[0]; |
180 | 181 | break; |
181 | 182 | } |
182 | - } catch (InterruptedException | IOException e) { | |
183 | + } catch (InterruptedException | CertificateEncodingException e) { | |
183 | 184 | log.error(e.getMessage(), e); |
184 | 185 | } |
185 | 186 | } | ... | ... |
... | ... | @@ -35,6 +35,7 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage; |
35 | 35 | import io.netty.handler.codec.mqtt.MqttTopicSubscription; |
36 | 36 | import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; |
37 | 37 | import io.netty.handler.ssl.SslHandler; |
38 | +import io.netty.util.CharsetUtil; | |
38 | 39 | import io.netty.util.ReferenceCountUtil; |
39 | 40 | import io.netty.util.concurrent.Future; |
40 | 41 | import io.netty.util.concurrent.GenericFutureListener; |
... | ... | @@ -68,7 +69,8 @@ import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; |
68 | 69 | import org.thingsboard.server.transport.mqtt.util.SslUtil; |
69 | 70 | |
70 | 71 | import javax.net.ssl.SSLPeerUnverifiedException; |
71 | -import javax.security.cert.X509Certificate; | |
72 | +import java.security.cert.Certificate; | |
73 | +import java.security.cert.X509Certificate; | |
72 | 74 | import java.io.IOException; |
73 | 75 | import java.net.InetSocketAddress; |
74 | 76 | import java.util.ArrayList; |
... | ... | @@ -315,7 +317,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
315 | 317 | } |
316 | 318 | |
317 | 319 | private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) { |
318 | - return new TransportServiceCallback<Void>() { | |
320 | + return new TransportServiceCallback<>() { | |
319 | 321 | @Override |
320 | 322 | public void onSuccess(Void dummy) { |
321 | 323 | log.trace("[{}] Published msg: {}", sessionId, msg); |
... | ... | @@ -482,12 +484,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
482 | 484 | if (userName != null) { |
483 | 485 | request.setUserName(userName); |
484 | 486 | } |
485 | - String password = connectMessage.payload().password(); | |
486 | - if (password != null) { | |
487 | + byte[] passwordBytes = connectMessage.payload().passwordInBytes(); | |
488 | + if (passwordBytes != null) { | |
489 | + String password = new String(passwordBytes, CharsetUtil.UTF_8); | |
487 | 490 | request.setPassword(password); |
488 | 491 | } |
489 | 492 | transportService.process(DeviceTransportType.MQTT, request.build(), |
490 | - new TransportServiceCallback<ValidateDeviceCredentialsResponse>() { | |
493 | + new TransportServiceCallback<>() { | |
491 | 494 | @Override |
492 | 495 | public void onSuccess(ValidateDeviceCredentialsResponse msg) { |
493 | 496 | onValidateDeviceResponse(msg, ctx, connectMessage); |
... | ... | @@ -507,10 +510,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
507 | 510 | if (!context.isSkipValidityCheckForClientCert()) { |
508 | 511 | cert.checkValidity(); |
509 | 512 | } |
510 | - String strCert = SslUtil.getX509CertificateString(cert); | |
513 | + String strCert = SslUtil.getCertificateString(cert); | |
511 | 514 | String sha3Hash = EncryptionUtil.getSha3Hash(strCert); |
512 | 515 | transportService.process(DeviceTransportType.MQTT, ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(), |
513 | - new TransportServiceCallback<ValidateDeviceCredentialsResponse>() { | |
516 | + new TransportServiceCallback<>() { | |
514 | 517 | @Override |
515 | 518 | public void onSuccess(ValidateDeviceCredentialsResponse msg) { |
516 | 519 | onValidateDeviceResponse(msg, ctx, connectMessage); |
... | ... | @@ -531,9 +534,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
531 | 534 | |
532 | 535 | private X509Certificate getX509Certificate() { |
533 | 536 | try { |
534 | - X509Certificate[] certChain = sslHandler.engine().getSession().getPeerCertificateChain(); | |
537 | + Certificate[] certChain = sslHandler.engine().getSession().getPeerCertificates(); | |
535 | 538 | if (certChain.length > 0) { |
536 | - return certChain[0]; | |
539 | + return (X509Certificate) certChain[0]; | |
537 | 540 | } |
538 | 541 | } catch (SSLPeerUnverifiedException e) { |
539 | 542 | log.warn(e.getMessage()); | ... | ... |
... | ... | @@ -20,8 +20,8 @@ import org.springframework.util.Base64Utils; |
20 | 20 | import org.thingsboard.server.common.msg.EncryptionUtil; |
21 | 21 | |
22 | 22 | import java.io.IOException; |
23 | +import java.security.cert.Certificate; | |
23 | 24 | import java.security.cert.CertificateEncodingException; |
24 | -import java.security.cert.X509Certificate; | |
25 | 25 | |
26 | 26 | /** |
27 | 27 | * @author Valerii Sosliuk |
... | ... | @@ -32,15 +32,8 @@ public class SslUtil { |
32 | 32 | private SslUtil() { |
33 | 33 | } |
34 | 34 | |
35 | - public static String getX509CertificateString(X509Certificate cert) | |
36 | - throws CertificateEncodingException, IOException { | |
37 | - Base64Utils.encodeToString(cert.getEncoded()); | |
38 | - return EncryptionUtil.trimNewLines(Base64Utils.encodeToString(cert.getEncoded())); | |
39 | - } | |
40 | - | |
41 | - public static String getX509CertificateString(javax.security.cert.X509Certificate cert) | |
42 | - throws javax.security.cert.CertificateEncodingException, IOException { | |
43 | - Base64Utils.encodeToString(cert.getEncoded()); | |
35 | + public static String getCertificateString(Certificate cert) | |
36 | + throws CertificateEncodingException { | |
44 | 37 | return EncryptionUtil.trimNewLines(Base64Utils.encodeToString(cert.getEncoded())); |
45 | 38 | } |
46 | 39 | } | ... | ... |
... | ... | @@ -17,7 +17,7 @@ package org.thingsboard.server.transport.mqtt.util; |
17 | 17 | |
18 | 18 | import org.junit.Test; |
19 | 19 | import org.junit.runner.RunWith; |
20 | -import org.mockito.runners.MockitoJUnitRunner; | |
20 | +import org.mockito.junit.MockitoJUnitRunner; | |
21 | 21 | |
22 | 22 | import javax.script.ScriptException; |
23 | 23 | ... | ... |
... | ... | @@ -32,6 +32,7 @@ public class ProtoWithFSTService implements DataDecodingEncodingService { |
32 | 32 | @Override |
33 | 33 | public <T> Optional<T> decode(byte[] byteArray) { |
34 | 34 | try { |
35 | + @SuppressWarnings("unchecked") | |
35 | 36 | T msg = (T) config.asObject(byteArray); |
36 | 37 | return Optional.of(msg); |
37 | 38 | } catch (IllegalArgumentException e) { | ... | ... |
... | ... | @@ -42,6 +42,10 @@ |
42 | 42 | <scope>provided</scope> |
43 | 43 | </dependency> |
44 | 44 | <dependency> |
45 | + <groupId>javax.annotation</groupId> | |
46 | + <artifactId>javax.annotation-api</artifactId> | |
47 | + </dependency> | |
48 | + <dependency> | |
45 | 49 | <groupId>org.slf4j</groupId> |
46 | 50 | <artifactId>slf4j-api</artifactId> |
47 | 51 | </dependency> |
... | ... | @@ -64,7 +68,7 @@ |
64 | 68 | </dependency> |
65 | 69 | <dependency> |
66 | 70 | <groupId>org.mockito</groupId> |
67 | - <artifactId>mockito-all</artifactId> | |
71 | + <artifactId>mockito-core</artifactId> | |
68 | 72 | <scope>test</scope> |
69 | 73 | </dependency> |
70 | 74 | </dependencies> | ... | ... |
... | ... | @@ -40,11 +40,11 @@ public abstract class DaoUtil { |
40 | 40 | |
41 | 41 | public static <T> PageData<T> toPageData(Page<? extends ToData<T>> page) { |
42 | 42 | List<T> data = convertDataList(page.getContent()); |
43 | - return new PageData(data, page.getTotalPages(), page.getTotalElements(), page.hasNext()); | |
43 | + return new PageData<>(data, page.getTotalPages(), page.getTotalElements(), page.hasNext()); | |
44 | 44 | } |
45 | 45 | |
46 | 46 | public static <T> PageData<T> pageToPageData(Page<T> page) { |
47 | - return new PageData(page.getContent(), page.getTotalPages(), page.getTotalElements(), page.hasNext()); | |
47 | + return new PageData<>(page.getContent(), page.getTotalPages(), page.getTotalElements(), page.hasNext()); | |
48 | 48 | } |
49 | 49 | |
50 | 50 | public static Pageable toPageable(PageLink pageLink) { | ... | ... |
... | ... | @@ -306,7 +306,7 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ |
306 | 306 | )); |
307 | 307 | } |
308 | 308 | return Futures.transform(Futures.successfulAsList(alarmFutures), |
309 | - alarmInfos -> new PageData(alarmInfos, alarms.getTotalPages(), alarms.getTotalElements(), | |
309 | + alarmInfos -> new PageData<>(alarmInfos, alarms.getTotalPages(), alarms.getTotalElements(), | |
310 | 310 | alarms.hasNext()), MoreExecutors.directExecutor()); |
311 | 311 | } |
312 | 312 | return Futures.immediateFuture(alarms); | ... | ... |
... | ... | @@ -190,6 +190,7 @@ public class AuditLogServiceImpl implements AuditLogService { |
190 | 190 | case ATTRIBUTES_UPDATED: |
191 | 191 | actionData.put("entityId", entityId.toString()); |
192 | 192 | String scope = extractParameter(String.class, 0, additionalInfo); |
193 | + @SuppressWarnings("unchecked") | |
193 | 194 | List<AttributeKvEntry> attributes = extractParameter(List.class, 1, additionalInfo); |
194 | 195 | actionData.put("scope", scope); |
195 | 196 | ObjectNode attrsNode = JacksonUtil.newObjectNode(); |
... | ... | @@ -205,6 +206,7 @@ public class AuditLogServiceImpl implements AuditLogService { |
205 | 206 | actionData.put("entityId", entityId.toString()); |
206 | 207 | scope = extractParameter(String.class, 0, additionalInfo); |
207 | 208 | actionData.put("scope", scope); |
209 | + @SuppressWarnings("unchecked") | |
208 | 210 | List<String> keys = extractParameter(List.class, 1, additionalInfo); |
209 | 211 | ArrayNode attrsArrayNode = actionData.putArray("attributes"); |
210 | 212 | if (keys != null) { |
... | ... | @@ -267,6 +269,7 @@ public class AuditLogServiceImpl implements AuditLogService { |
267 | 269 | break; |
268 | 270 | case TIMESERIES_UPDATED: |
269 | 271 | actionData.put("entityId", entityId.toString()); |
272 | + @SuppressWarnings("unchecked") | |
270 | 273 | List<TsKvEntry> updatedTimeseries = extractParameter(List.class, 0, additionalInfo); |
271 | 274 | if (updatedTimeseries != null) { |
272 | 275 | ArrayNode result = actionData.putArray("timeseries"); |
... | ... | @@ -283,6 +286,7 @@ public class AuditLogServiceImpl implements AuditLogService { |
283 | 286 | break; |
284 | 287 | case TIMESERIES_DELETED: |
285 | 288 | actionData.put("entityId", entityId.toString()); |
289 | + @SuppressWarnings("unchecked") | |
286 | 290 | List<String> timeseriesKeys = extractParameter(List.class, 0, additionalInfo); |
287 | 291 | if (timeseriesKeys != null) { |
288 | 292 | ArrayNode timeseriesArrayNode = actionData.putArray("timeseries"); | ... | ... |
... | ... | @@ -36,22 +36,22 @@ public class DummyAuditLogServiceImpl implements AuditLogService { |
36 | 36 | |
37 | 37 | @Override |
38 | 38 | public PageData<AuditLog> findAuditLogsByTenantIdAndCustomerId(TenantId tenantId, CustomerId customerId, List<ActionType> actionTypes, TimePageLink pageLink) { |
39 | - return new PageData(); | |
39 | + return new PageData<>(); | |
40 | 40 | } |
41 | 41 | |
42 | 42 | @Override |
43 | 43 | public PageData<AuditLog> findAuditLogsByTenantIdAndUserId(TenantId tenantId, UserId userId, List<ActionType> actionTypes, TimePageLink pageLink) { |
44 | - return new PageData(); | |
44 | + return new PageData<>(); | |
45 | 45 | } |
46 | 46 | |
47 | 47 | @Override |
48 | 48 | public PageData<AuditLog> findAuditLogsByTenantIdAndEntityId(TenantId tenantId, EntityId entityId, List<ActionType> actionTypes, TimePageLink pageLink) { |
49 | - return new PageData(); | |
49 | + return new PageData<>(); | |
50 | 50 | } |
51 | 51 | |
52 | 52 | @Override |
53 | 53 | public PageData<AuditLog> findAuditLogsByTenantId(TenantId tenantId, List<ActionType> actionTypes, TimePageLink pageLink) { |
54 | - return new PageData(); | |
54 | + return new PageData<>(); | |
55 | 55 | } |
56 | 56 | |
57 | 57 | @Override | ... | ... |
... | ... | @@ -203,6 +203,8 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe |
203 | 203 | } catch (Exception t) { |
204 | 204 | ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); |
205 | 205 | if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("device_name_unq_key")) { |
206 | + // remove device from cache in case null value cached in the distributed redis. | |
207 | + removeDeviceFromCache(device.getTenantId(), device.getName()); | |
206 | 208 | throw new DataValidationException("Device with such name already exists!"); |
207 | 209 | } else { |
208 | 210 | throw t; |
... | ... | @@ -281,13 +283,17 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe |
281 | 283 | } |
282 | 284 | deleteEntityRelations(tenantId, deviceId); |
283 | 285 | |
286 | + removeDeviceFromCache(tenantId, device.getName()); | |
287 | + | |
288 | + deviceDao.removeById(tenantId, deviceId.getId()); | |
289 | + } | |
290 | + | |
291 | + private void removeDeviceFromCache(TenantId tenantId, String name) { | |
284 | 292 | List<Object> list = new ArrayList<>(); |
285 | - list.add(device.getTenantId()); | |
286 | - list.add(device.getName()); | |
293 | + list.add(tenantId); | |
294 | + list.add(name); | |
287 | 295 | Cache cache = cacheManager.getCache(DEVICE_CACHE); |
288 | 296 | cache.evict(list); |
289 | - | |
290 | - deviceDao.removeById(tenantId, deviceId.getId()); | |
291 | 297 | } |
292 | 298 | |
293 | 299 | @Override | ... | ... |
... | ... | @@ -275,6 +275,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti |
275 | 275 | tenantIdAndEntityId.add(entityId); |
276 | 276 | |
277 | 277 | Cache cache = cacheManager.getCache(ENTITY_VIEW_CACHE); |
278 | + @SuppressWarnings("unchecked") | |
278 | 279 | List<EntityView> fromCache = cache.get(tenantIdAndEntityId, List.class); |
279 | 280 | if (fromCache != null) { |
280 | 281 | return Futures.immediateFuture(fromCache); | ... | ... |
... | ... | @@ -53,7 +53,7 @@ public class HybridClientRegistrationRepository implements ClientRegistrationRep |
53 | 53 | .userNameAttributeName(localClientRegistration.getUserNameAttributeName()) |
54 | 54 | .jwkSetUri(localClientRegistration.getJwkSetUri()) |
55 | 55 | .clientAuthenticationMethod(new ClientAuthenticationMethod(localClientRegistration.getClientAuthenticationMethod())) |
56 | - .redirectUriTemplate(defaultRedirectUriTemplate) | |
56 | + .redirectUri(defaultRedirectUriTemplate) | |
57 | 57 | .build(); |
58 | 58 | } |
59 | 59 | } | ... | ... |
... | ... | @@ -301,6 +301,7 @@ public class BaseRelationService implements RelationService { |
301 | 301 | fromAndTypeGroup.add(EntitySearchDirection.FROM.name()); |
302 | 302 | |
303 | 303 | Cache cache = cacheManager.getCache(RELATIONS_CACHE); |
304 | + @SuppressWarnings("unchecked") | |
304 | 305 | List<EntityRelation> fromCache = cache.get(fromAndTypeGroup, List.class); |
305 | 306 | if (fromCache != null) { |
306 | 307 | return Futures.immediateFuture(fromCache); |
... | ... | @@ -382,6 +383,7 @@ public class BaseRelationService implements RelationService { |
382 | 383 | toAndTypeGroup.add(EntitySearchDirection.TO.name()); |
383 | 384 | |
384 | 385 | Cache cache = cacheManager.getCache(RELATIONS_CACHE); |
386 | + @SuppressWarnings("unchecked") | |
385 | 387 | List<EntityRelation> fromCache = cache.get(toAndTypeGroup, List.class); |
386 | 388 | if (fromCache != null) { |
387 | 389 | return Futures.immediateFuture(fromCache); | ... | ... |
... | ... | @@ -45,12 +45,12 @@ public class JpaDashboardInfoDao extends JpaAbstractSearchTextDao<DashboardInfoE |
45 | 45 | private DashboardInfoRepository dashboardInfoRepository; |
46 | 46 | |
47 | 47 | @Override |
48 | - protected Class getEntityClass() { | |
48 | + protected Class<DashboardInfoEntity> getEntityClass() { | |
49 | 49 | return DashboardInfoEntity.class; |
50 | 50 | } |
51 | 51 | |
52 | 52 | @Override |
53 | - protected CrudRepository getCrudRepository() { | |
53 | + protected CrudRepository<DashboardInfoEntity, UUID> getCrudRepository() { | |
54 | 54 | return dashboardInfoRepository; |
55 | 55 | } |
56 | 56 | ... | ... |
... | ... | @@ -15,7 +15,7 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.dao.sql.device; |
17 | 17 | |
18 | -import org.apache.commons.lang.StringUtils; | |
18 | +import org.apache.commons.lang3.StringUtils; | |
19 | 19 | import org.springframework.beans.factory.annotation.Autowired; |
20 | 20 | import org.springframework.data.repository.CrudRepository; |
21 | 21 | import org.springframework.stereotype.Component; | ... | ... |
... | ... | @@ -39,12 +39,12 @@ public class JpaRuleChainDao extends JpaAbstractSearchTextDao<RuleChainEntity, R |
39 | 39 | private RuleChainRepository ruleChainRepository; |
40 | 40 | |
41 | 41 | @Override |
42 | - protected Class getEntityClass() { | |
42 | + protected Class<RuleChainEntity> getEntityClass() { | |
43 | 43 | return RuleChainEntity.class; |
44 | 44 | } |
45 | 45 | |
46 | 46 | @Override |
47 | - protected CrudRepository getCrudRepository() { | |
47 | + protected CrudRepository<RuleChainEntity, UUID> getCrudRepository() { | |
48 | 48 | return ruleChainRepository; |
49 | 49 | } |
50 | 50 | ... | ... |
... | ... | @@ -24,6 +24,8 @@ import org.thingsboard.server.dao.model.sql.RuleNodeEntity; |
24 | 24 | import org.thingsboard.server.dao.rule.RuleNodeDao; |
25 | 25 | import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; |
26 | 26 | |
27 | +import java.util.UUID; | |
28 | + | |
27 | 29 | @Slf4j |
28 | 30 | @Component |
29 | 31 | public class JpaRuleNodeDao extends JpaAbstractSearchTextDao<RuleNodeEntity, RuleNode> implements RuleNodeDao { |
... | ... | @@ -32,12 +34,12 @@ public class JpaRuleNodeDao extends JpaAbstractSearchTextDao<RuleNodeEntity, Rul |
32 | 34 | private RuleNodeRepository ruleNodeRepository; |
33 | 35 | |
34 | 36 | @Override |
35 | - protected Class getEntityClass() { | |
37 | + protected Class<RuleNodeEntity> getEntityClass() { | |
36 | 38 | return RuleNodeEntity.class; |
37 | 39 | } |
38 | 40 | |
39 | 41 | @Override |
40 | - protected CrudRepository getCrudRepository() { | |
42 | + protected CrudRepository<RuleNodeEntity, UUID> getCrudRepository() { | |
41 | 43 | return ruleNodeRepository; |
42 | 44 | } |
43 | 45 | ... | ... |
... | ... | @@ -39,12 +39,12 @@ public class JpaRuleNodeStateDao extends JpaAbstractDao<RuleNodeStateEntity, Rul |
39 | 39 | private RuleNodeStateRepository ruleNodeStateRepository; |
40 | 40 | |
41 | 41 | @Override |
42 | - protected Class getEntityClass() { | |
42 | + protected Class<RuleNodeStateEntity> getEntityClass() { | |
43 | 43 | return RuleNodeStateEntity.class; |
44 | 44 | } |
45 | 45 | |
46 | 46 | @Override |
47 | - protected CrudRepository getCrudRepository() { | |
47 | + protected CrudRepository<RuleNodeStateEntity, UUID> getCrudRepository() { | |
48 | 48 | return ruleNodeStateRepository; |
49 | 49 | } |
50 | 50 | ... | ... |
... | ... | @@ -18,6 +18,8 @@ package org.thingsboard.server.dao.sql.rule; |
18 | 18 | import org.springframework.data.repository.CrudRepository; |
19 | 19 | import org.thingsboard.server.dao.model.sql.RuleNodeEntity; |
20 | 20 | |
21 | -public interface RuleNodeRepository extends CrudRepository<RuleNodeEntity, String> { | |
21 | +import java.util.UUID; | |
22 | + | |
23 | +public interface RuleNodeRepository extends CrudRepository<RuleNodeEntity, UUID> { | |
22 | 24 | |
23 | 25 | } | ... | ... |
... | ... | @@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j; |
32 | 32 | import org.springframework.beans.factory.annotation.Autowired; |
33 | 33 | import org.springframework.beans.factory.annotation.Value; |
34 | 34 | import org.springframework.core.env.Environment; |
35 | +import org.springframework.core.env.Profiles; | |
35 | 36 | import org.springframework.stereotype.Component; |
36 | 37 | import org.thingsboard.server.common.data.id.EntityId; |
37 | 38 | import org.thingsboard.server.common.data.id.TenantId; |
... | ... | @@ -108,7 +109,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD |
108 | 109 | private PreparedStatement deletePartitionStmt; |
109 | 110 | |
110 | 111 | private boolean isInstall() { |
111 | - return environment.acceptsProfiles("install"); | |
112 | + return environment.acceptsProfiles(Profiles.of("install")); | |
112 | 113 | } |
113 | 114 | |
114 | 115 | @PostConstruct | ... | ... |
... | ... | @@ -15,8 +15,8 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.dao.user; |
17 | 17 | |
18 | +import com.fasterxml.jackson.core.type.TypeReference; | |
18 | 19 | import com.fasterxml.jackson.databind.JsonNode; |
19 | -import com.fasterxml.jackson.databind.ObjectMapper; | |
20 | 20 | import com.fasterxml.jackson.databind.node.ObjectNode; |
21 | 21 | import com.google.common.util.concurrent.ListenableFuture; |
22 | 22 | import lombok.extern.slf4j.Slf4j; |
... | ... | @@ -48,6 +48,7 @@ import org.thingsboard.server.dao.service.DataValidator; |
48 | 48 | import org.thingsboard.server.dao.service.PaginatedRemover; |
49 | 49 | import org.thingsboard.server.dao.tenant.TbTenantProfileCache; |
50 | 50 | import org.thingsboard.server.dao.tenant.TenantDao; |
51 | +import org.thingsboard.server.dao.util.mapping.JacksonUtil; | |
51 | 52 | |
52 | 53 | import java.util.HashMap; |
53 | 54 | import java.util.Map; |
... | ... | @@ -71,8 +72,6 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic |
71 | 72 | |
72 | 73 | private static final String USER_CREDENTIALS_ENABLED = "userCredentialsEnabled"; |
73 | 74 | |
74 | - private static final ObjectMapper objectMapper = new ObjectMapper(); | |
75 | - | |
76 | 75 | @Value("${security.user_login_case_sensitive:true}") |
77 | 76 | private boolean userLoginCaseSensitive; |
78 | 77 | |
... | ... | @@ -279,7 +278,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic |
279 | 278 | User user = findUserById(tenantId, userId); |
280 | 279 | JsonNode additionalInfo = user.getAdditionalInfo(); |
281 | 280 | if (!(additionalInfo instanceof ObjectNode)) { |
282 | - additionalInfo = objectMapper.createObjectNode(); | |
281 | + additionalInfo = JacksonUtil.newObjectNode(); | |
283 | 282 | } |
284 | 283 | ((ObjectNode) additionalInfo).put(USER_CREDENTIALS_ENABLED, enabled); |
285 | 284 | user.setAdditionalInfo(additionalInfo); |
... | ... | @@ -302,7 +301,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic |
302 | 301 | private void setLastLoginTs(User user) { |
303 | 302 | JsonNode additionalInfo = user.getAdditionalInfo(); |
304 | 303 | if (!(additionalInfo instanceof ObjectNode)) { |
305 | - additionalInfo = objectMapper.createObjectNode(); | |
304 | + additionalInfo = JacksonUtil.newObjectNode(); | |
306 | 305 | } |
307 | 306 | ((ObjectNode) additionalInfo).put(LAST_LOGIN_TS, System.currentTimeMillis()); |
308 | 307 | user.setAdditionalInfo(additionalInfo); |
... | ... | @@ -311,7 +310,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic |
311 | 310 | private void resetFailedLoginAttempts(User user) { |
312 | 311 | JsonNode additionalInfo = user.getAdditionalInfo(); |
313 | 312 | if (!(additionalInfo instanceof ObjectNode)) { |
314 | - additionalInfo = objectMapper.createObjectNode(); | |
313 | + additionalInfo = JacksonUtil.newObjectNode(); | |
315 | 314 | } |
316 | 315 | ((ObjectNode) additionalInfo).put(FAILED_LOGIN_ATTEMPTS, 0); |
317 | 316 | user.setAdditionalInfo(additionalInfo); |
... | ... | @@ -329,7 +328,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic |
329 | 328 | private int increaseFailedLoginAttempts(User user) { |
330 | 329 | JsonNode additionalInfo = user.getAdditionalInfo(); |
331 | 330 | if (!(additionalInfo instanceof ObjectNode)) { |
332 | - additionalInfo = objectMapper.createObjectNode(); | |
331 | + additionalInfo = JacksonUtil.newObjectNode(); | |
333 | 332 | } |
334 | 333 | int failedLoginAttempts = 0; |
335 | 334 | if (additionalInfo.has(FAILED_LOGIN_ATTEMPTS)) { |
... | ... | @@ -353,26 +352,30 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic |
353 | 352 | private void updatePasswordHistory(User user, UserCredentials userCredentials) { |
354 | 353 | JsonNode additionalInfo = user.getAdditionalInfo(); |
355 | 354 | if (!(additionalInfo instanceof ObjectNode)) { |
356 | - additionalInfo = objectMapper.createObjectNode(); | |
355 | + additionalInfo = JacksonUtil.newObjectNode(); | |
357 | 356 | } |
357 | + Map<String, String> userPasswordHistoryMap = null; | |
358 | + JsonNode userPasswordHistoryJson; | |
358 | 359 | if (additionalInfo.has(USER_PASSWORD_HISTORY)) { |
359 | - JsonNode userPasswordHistoryJson = additionalInfo.get(USER_PASSWORD_HISTORY); | |
360 | - Map<String, String> userPasswordHistoryMap = objectMapper.convertValue(userPasswordHistoryJson, Map.class); | |
360 | + userPasswordHistoryJson = additionalInfo.get(USER_PASSWORD_HISTORY); | |
361 | + userPasswordHistoryMap = JacksonUtil.convertValue(userPasswordHistoryJson, new TypeReference<>(){}); | |
362 | + } | |
363 | + if (userPasswordHistoryMap != null) { | |
361 | 364 | userPasswordHistoryMap.put(Long.toString(System.currentTimeMillis()), userCredentials.getPassword()); |
362 | - userPasswordHistoryJson = objectMapper.valueToTree(userPasswordHistoryMap); | |
365 | + userPasswordHistoryJson = JacksonUtil.valueToTree(userPasswordHistoryMap); | |
363 | 366 | ((ObjectNode) additionalInfo).replace(USER_PASSWORD_HISTORY, userPasswordHistoryJson); |
364 | 367 | } else { |
365 | - Map<String, String> userPasswordHistoryMap = new HashMap<>(); | |
368 | + userPasswordHistoryMap = new HashMap<>(); | |
366 | 369 | userPasswordHistoryMap.put(Long.toString(System.currentTimeMillis()), userCredentials.getPassword()); |
367 | - JsonNode userPasswordHistoryJson = objectMapper.valueToTree(userPasswordHistoryMap); | |
370 | + userPasswordHistoryJson = JacksonUtil.valueToTree(userPasswordHistoryMap); | |
368 | 371 | ((ObjectNode) additionalInfo).set(USER_PASSWORD_HISTORY, userPasswordHistoryJson); |
369 | 372 | } |
370 | 373 | user.setAdditionalInfo(additionalInfo); |
371 | 374 | saveUser(user); |
372 | 375 | } |
373 | 376 | |
374 | - private DataValidator<User> userValidator = | |
375 | - new DataValidator<User>() { | |
377 | + private final DataValidator<User> userValidator = | |
378 | + new DataValidator<>() { | |
376 | 379 | @Override |
377 | 380 | protected void validateCreate(TenantId tenantId, User user) { |
378 | 381 | if (!user.getTenantId().getId().equals(ModelConstants.NULL_UUID)) { |
... | ... | @@ -452,8 +455,8 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic |
452 | 455 | } |
453 | 456 | }; |
454 | 457 | |
455 | - private DataValidator<UserCredentials> userCredentialsValidator = | |
456 | - new DataValidator<UserCredentials>() { | |
458 | + private final DataValidator<UserCredentials> userCredentialsValidator = | |
459 | + new DataValidator<>() { | |
457 | 460 | |
458 | 461 | @Override |
459 | 462 | protected void validateCreate(TenantId tenantId, UserCredentials userCredentials) { |
... | ... | @@ -484,7 +487,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic |
484 | 487 | } |
485 | 488 | }; |
486 | 489 | |
487 | - private PaginatedRemover<TenantId, User> tenantAdminsRemover = new PaginatedRemover<TenantId, User>() { | |
490 | + private final PaginatedRemover<TenantId, User> tenantAdminsRemover = new PaginatedRemover<>() { | |
488 | 491 | @Override |
489 | 492 | protected PageData<User> findEntities(TenantId tenantId, TenantId id, PageLink pageLink) { |
490 | 493 | return userDao.findTenantAdmins(id.getId(), pageLink); |
... | ... | @@ -496,7 +499,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic |
496 | 499 | } |
497 | 500 | }; |
498 | 501 | |
499 | - private PaginatedRemover<CustomerId, User> customerUsersRemover = new PaginatedRemover<CustomerId, User>() { | |
502 | + private final PaginatedRemover<CustomerId, User> customerUsersRemover = new PaginatedRemover<>() { | |
500 | 503 | @Override |
501 | 504 | protected PageData<User> findEntities(TenantId tenantId, CustomerId id, PageLink pageLink) { |
502 | 505 | return userDao.findCustomerUsers(tenantId.getId(), id.getId(), pageLink); | ... | ... |