Commit acd945fc885949929106b7539da1598b835bcad7

Authored by DK
2 parents e4a599a8 b80bca25

Merge branch 'master' of https://github.com/thingsboard/thingsboard into simplif…

…y-localizations-management
@@ -111,6 +111,7 @@ public class AppActor extends RuleChainManagerActor { @@ -111,6 +111,7 @@ public class AppActor extends RuleChainManagerActor {
111 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: 111 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
112 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: 112 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
113 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: 113 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
  114 + case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
114 onToDeviceActorMsg((TenantAwareMsg) msg); 115 onToDeviceActorMsg((TenantAwareMsg) msg);
115 break; 116 break;
116 case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG: 117 case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
@@ -29,11 +29,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress; @@ -29,11 +29,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
29 import org.thingsboard.server.gen.cluster.ClusterAPIProtos; 29 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
30 import org.thingsboard.server.service.cluster.discovery.ServerInstance; 30 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
31 31
32 -import java.util.HashMap;  
33 -import java.util.LinkedList;  
34 -import java.util.Map;  
35 -import java.util.Queue;  
36 -import java.util.UUID; 32 +import java.util.*;
37 33
38 /** 34 /**
39 * @author Andrew Shvayka 35 * @author Andrew Shvayka
@@ -88,7 +84,17 @@ public class RpcManagerActor extends ContextAwareActor { @@ -88,7 +84,17 @@ public class RpcManagerActor extends ContextAwareActor {
88 84
89 private void onMsg(RpcBroadcastMsg msg) { 85 private void onMsg(RpcBroadcastMsg msg) {
90 log.debug("Forwarding msg to session actors {}", msg); 86 log.debug("Forwarding msg to session actors {}", msg);
91 - sessionActors.keySet().forEach(address -> onMsg(msg.getMsg())); 87 + sessionActors.keySet().forEach(address -> {
  88 + ClusterAPIProtos.ClusterMessage msgWithServerAddress = msg.getMsg()
  89 + .toBuilder()
  90 + .setServerAddress(ClusterAPIProtos.ServerAddress
  91 + .newBuilder()
  92 + .setHost(address.getHost())
  93 + .setPort(address.getPort())
  94 + .build())
  95 + .build();
  96 + onMsg(msgWithServerAddress);
  97 + });
92 pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg())); 98 pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg()));
93 } 99 }
94 100
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors.ruleChain;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.server.common.data.id.RuleChainId;
  20 +import org.thingsboard.server.common.data.id.TenantId;
  21 +import org.thingsboard.server.common.msg.MsgType;
  22 +import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
  23 +import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
  24 +
  25 +/**
  26 + * Created by ashvayka on 19.03.18.
  27 + */
  28 +@Data
  29 +final class RemoteToRuleChainTellNextMsg extends RuleNodeToRuleChainTellNextMsg implements TenantAwareMsg, RuleChainAwareMsg {
  30 +
  31 + private final TenantId tenantId;
  32 + private final RuleChainId ruleChainId;
  33 +
  34 + public RemoteToRuleChainTellNextMsg(RuleNodeToRuleChainTellNextMsg original, TenantId tenantId, RuleChainId ruleChainId) {
  35 + super(original.getOriginator(), original.getRelationTypes(), original.getMsg());
  36 + this.tenantId = tenantId;
  37 + this.ruleChainId = ruleChainId;
  38 + }
  39 +
  40 + @Override
  41 + public MsgType getMsgType() {
  42 + return MsgType.REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG;
  43 + }
  44 +
  45 +}
@@ -49,6 +49,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe @@ -49,6 +49,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
49 processor.onDeviceActorToRuleEngineMsg((DeviceActorToRuleEngineMsg) msg); 49 processor.onDeviceActorToRuleEngineMsg((DeviceActorToRuleEngineMsg) msg);
50 break; 50 break;
51 case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG: 51 case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
  52 + case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
52 processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg); 53 processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
53 break; 54 break;
54 case RULE_CHAIN_TO_RULE_CHAIN_MSG: 55 case RULE_CHAIN_TO_RULE_CHAIN_MSG:
@@ -20,6 +20,9 @@ import akka.actor.ActorRef; @@ -20,6 +20,9 @@ import akka.actor.ActorRef;
20 import akka.actor.Props; 20 import akka.actor.Props;
21 import akka.event.LoggingAdapter; 21 import akka.event.LoggingAdapter;
22 import com.datastax.driver.core.utils.UUIDs; 22 import com.datastax.driver.core.utils.UUIDs;
  23 +
  24 +import java.util.Optional;
  25 +
23 import org.thingsboard.server.actors.ActorSystemContext; 26 import org.thingsboard.server.actors.ActorSystemContext;
24 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg; 27 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
25 import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg; 28 import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg;
@@ -37,6 +40,7 @@ import org.thingsboard.server.common.data.rule.RuleChain; @@ -37,6 +40,7 @@ import org.thingsboard.server.common.data.rule.RuleChain;
37 import org.thingsboard.server.common.data.rule.RuleNode; 40 import org.thingsboard.server.common.data.rule.RuleNode;
38 import org.thingsboard.server.common.msg.TbMsg; 41 import org.thingsboard.server.common.msg.TbMsg;
39 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; 42 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
  43 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
40 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; 44 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
41 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; 45 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
42 import org.thingsboard.server.dao.rule.RuleChainService; 46 import org.thingsboard.server.dao.rule.RuleChainService;
@@ -217,16 +221,36 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -217,16 +221,36 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
217 221
218 void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) { 222 void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
219 checkActive(); 223 checkActive();
220 - RuleNodeId originator = envelope.getOriginator();  
221 - List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()  
222 - .filter(r -> contains(envelope.getRelationTypes(), r.getType()))  
223 - .collect(Collectors.toList()); 224 + TbMsg msg = envelope.getMsg();
  225 + EntityId originatorEntityId = msg.getOriginator();
  226 + Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(originatorEntityId);
  227 +
  228 + if (address.isPresent()) {
  229 + onRemoteTellNext(address.get(), envelope);
  230 + } else {
  231 + onLocalTellNext(envelope);
  232 + }
  233 + }
  234 +
  235 + private void onRemoteTellNext(ServerAddress serverAddress, RuleNodeToRuleChainTellNextMsg envelope) {
  236 + TbMsg msg = envelope.getMsg();
  237 + logger.debug("Forwarding [{}] msg to remote server [{}] due to changed originator id: [{}]", msg.getId(), serverAddress, msg.getOriginator());
  238 + envelope = new RemoteToRuleChainTellNextMsg(envelope, tenantId, entityId);
  239 + systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(serverAddress, envelope));
  240 + }
224 241
  242 + private void onLocalTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
225 TbMsg msg = envelope.getMsg(); 243 TbMsg msg = envelope.getMsg();
  244 + RuleNodeId originatorNodeId = envelope.getOriginator();
  245 + List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
  246 + .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
  247 + .collect(Collectors.toList());
226 int relationsCount = relations.size(); 248 int relationsCount = relations.size();
227 EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId(); 249 EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
228 if (relationsCount == 0) { 250 if (relationsCount == 0) {
229 - queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition()); 251 + if (ackId != null) {
  252 + queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
  253 + }
230 } else if (relationsCount == 1) { 254 } else if (relationsCount == 1) {
231 for (RuleNodeRelation relation : relations) { 255 for (RuleNodeRelation relation : relations) {
232 pushToTarget(msg, relation.getOut(), relation.getType()); 256 pushToTarget(msg, relation.getOut(), relation.getType());
@@ -244,7 +268,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -244,7 +268,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
244 } 268 }
245 } 269 }
246 //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues. 270 //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
247 - queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition()); 271 + if (ackId != null) {
  272 + queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
  273 + }
248 } 274 }
249 } 275 }
250 276
@@ -20,12 +20,13 @@ import org.thingsboard.server.common.data.id.RuleChainId; @@ -20,12 +20,13 @@ import org.thingsboard.server.common.data.id.RuleChainId;
20 import org.thingsboard.server.common.msg.MsgType; 20 import org.thingsboard.server.common.msg.MsgType;
21 import org.thingsboard.server.common.msg.TbActorMsg; 21 import org.thingsboard.server.common.msg.TbActorMsg;
22 import org.thingsboard.server.common.msg.TbMsg; 22 import org.thingsboard.server.common.msg.TbMsg;
  23 +import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
23 24
24 /** 25 /**
25 * Created by ashvayka on 19.03.18. 26 * Created by ashvayka on 19.03.18.
26 */ 27 */
27 @Data 28 @Data
28 -public final class RuleChainToRuleChainMsg implements TbActorMsg { 29 +public final class RuleChainToRuleChainMsg implements TbActorMsg, RuleChainAwareMsg {
29 30
30 private final RuleChainId target; 31 private final RuleChainId target;
31 private final RuleChainId source; 32 private final RuleChainId source;
@@ -34,6 +35,11 @@ public final class RuleChainToRuleChainMsg implements TbActorMsg { @@ -34,6 +35,11 @@ public final class RuleChainToRuleChainMsg implements TbActorMsg {
34 private final boolean enqueue; 35 private final boolean enqueue;
35 36
36 @Override 37 @Override
  38 + public RuleChainId getRuleChainId() {
  39 + return target;
  40 + }
  41 +
  42 + @Override
37 public MsgType getMsgType() { 43 public MsgType getMsgType() {
38 return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG; 44 return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG;
39 } 45 }
@@ -27,7 +27,7 @@ import java.util.Set; @@ -27,7 +27,7 @@ import java.util.Set;
27 * Created by ashvayka on 19.03.18. 27 * Created by ashvayka on 19.03.18.
28 */ 28 */
29 @Data 29 @Data
30 -final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg { 30 +class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
31 31
32 private final RuleNodeId originator; 32 private final RuleNodeId originator;
33 private final Set<String> relationTypes; 33 private final Set<String> relationTypes;
@@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId; @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId;
35 import org.thingsboard.server.common.data.rule.RuleChain; 35 import org.thingsboard.server.common.data.rule.RuleChain;
36 import org.thingsboard.server.common.msg.TbActorMsg; 36 import org.thingsboard.server.common.msg.TbActorMsg;
37 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; 37 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
  38 +import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
38 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; 39 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
39 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; 40 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
40 import scala.concurrent.duration.Duration; 41 import scala.concurrent.duration.Duration;
@@ -94,7 +95,8 @@ public class TenantActor extends RuleChainManagerActor { @@ -94,7 +95,8 @@ public class TenantActor extends RuleChainManagerActor {
94 onToDeviceActorMsg((DeviceAwareMsg) msg); 95 onToDeviceActorMsg((DeviceAwareMsg) msg);
95 break; 96 break;
96 case RULE_CHAIN_TO_RULE_CHAIN_MSG: 97 case RULE_CHAIN_TO_RULE_CHAIN_MSG:
97 - onRuleChainMsg((RuleChainToRuleChainMsg) msg); 98 + case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
  99 + onRuleChainMsg((RuleChainAwareMsg) msg);
98 break; 100 break;
99 default: 101 default:
100 return false; 102 return false;
@@ -120,8 +122,8 @@ public class TenantActor extends RuleChainManagerActor { @@ -120,8 +122,8 @@ public class TenantActor extends RuleChainManagerActor {
120 else logger.info("[{}] No Root Chain", msg); 122 else logger.info("[{}] No Root Chain", msg);
121 } 123 }
122 124
123 - private void onRuleChainMsg(RuleChainToRuleChainMsg msg) {  
124 - ruleChainManager.getOrCreateActor(context(), msg.getTarget()).tell(msg, self()); 125 + private void onRuleChainMsg(RuleChainAwareMsg msg) {
  126 + ruleChainManager.getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self());
125 } 127 }
126 128
127 129
@@ -63,6 +63,11 @@ public enum MsgType { @@ -63,6 +63,11 @@ public enum MsgType {
63 RULE_TO_RULE_CHAIN_TELL_NEXT_MSG, 63 RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
64 64
65 /** 65 /**
  66 + * Message forwarded from original rule chain to remote rule chain due to change in the cluster structure or originator entity of the TbMsg.
  67 + */
  68 + REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG,
  69 +
  70 + /**
66 * Message that is sent by RuleActor implementation to RuleActor itself to log the error. 71 * Message that is sent by RuleActor implementation to RuleActor itself to log the error.
67 */ 72 */
68 RULE_TO_SELF_ERROR_MSG, 73 RULE_TO_SELF_ERROR_MSG,
@@ -101,6 +106,10 @@ public enum MsgType { @@ -101,6 +106,10 @@ public enum MsgType {
101 /** 106 /**
102 * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue. 107 * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
103 */ 108 */
104 - RULE_ENGINE_QUEUE_PUT_ACK_MSG, ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG; 109 + RULE_ENGINE_QUEUE_PUT_ACK_MSG,
  110 + ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG,
  111 + TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
  112 + SESSION_TIMEOUT_MSG,
  113 + SESSION_CTRL_MSG;
105 114
106 } 115 }
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.msg.aware;
  17 +
  18 +import org.thingsboard.server.common.data.id.RuleChainId;
  19 +
  20 +public interface RuleChainAwareMsg {
  21 +
  22 + RuleChainId getRuleChainId();
  23 +
  24 +}
  1 +#
  2 +# Copyright © 2016-2018 The Thingsboard Authors
  3 +#
  4 +# Licensed under the Apache License, Version 2.0 (the "License");
  5 +# you may not use this file except in compliance with the License.
  6 +# You may obtain a copy of the License at
  7 +#
  8 +# http://www.apache.org/licenses/LICENSE-2.0
  9 +#
  10 +# Unless required by applicable law or agreed to in writing, software
  11 +# distributed under the License is distributed on an "AS IS" BASIS,
  12 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 +# See the License for the specific language governing permissions and
  14 +# limitations under the License.
  15 +#
  16 +
  17 +FROM openjdk:8-jre
  18 +
  19 +ADD upgrade.sh /upgrade.sh
  20 +ADD thingsboard.deb /thingsboard.deb
  21 +
  22 +RUN apt-get update \
  23 + && apt-get install -y nmap \
  24 + && chmod +x /upgrade.sh
  1 +VERSION=2.0.3
  2 +PROJECT=thingsboard
  3 +APP=cassandra-upgrade
  4 +
  5 +build:
  6 + cp ../../application/target/thingsboard.deb .
  7 + docker build --pull -t ${PROJECT}/${APP}:${VERSION} -t ${PROJECT}/${APP}:latest .
  8 + rm thingsboard.deb
  9 +
  10 +push: build
  11 + docker push ${PROJECT}/${APP}:${VERSION}
  12 + docker push ${PROJECT}/${APP}:latest
  1 +#!/bin/bash
  2 +#
  3 +# Copyright © 2016-2018 The Thingsboard Authors
  4 +#
  5 +# Licensed under the Apache License, Version 2.0 (the "License");
  6 +# you may not use this file except in compliance with the License.
  7 +# You may obtain a copy of the License at
  8 +#
  9 +# http://www.apache.org/licenses/LICENSE-2.0
  10 +#
  11 +# Unless required by applicable law or agreed to in writing, software
  12 +# distributed under the License is distributed on an "AS IS" BASIS,
  13 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 +# See the License for the specific language governing permissions and
  15 +# limitations under the License.
  16 +#
  17 +
  18 +
  19 +dpkg -i /thingsboard.deb
  20 +
  21 +until nmap $CASSANDRA_HOST -p $CASSANDRA_PORT | grep "$CASSANDRA_PORT/tcp open"
  22 +do
  23 + echo "Wait for cassandra db to start..."
  24 + sleep 10
  25 +done
  26 +
  27 +echo "Upgrading 'Thingsboard' schema..."
  28 +/usr/share/thingsboard/bin/install/upgrade.sh --fromVersion=$UPGRADE_FROM_VERSION
  1 +#
  2 +# Copyright © 2016-2018 The Thingsboard Authors
  3 +#
  4 +# Licensed under the Apache License, Version 2.0 (the "License");
  5 +# you may not use this file except in compliance with the License.
  6 +# You may obtain a copy of the License at
  7 +#
  8 +# http://www.apache.org/licenses/LICENSE-2.0
  9 +#
  10 +# Unless required by applicable law or agreed to in writing, software
  11 +# distributed under the License is distributed on an "AS IS" BASIS,
  12 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 +# See the License for the specific language governing permissions and
  14 +# limitations under the License.
  15 +#
  16 +
  17 +apiVersion: v1
  18 +kind: Pod
  19 +metadata:
  20 + name: cassandra-upgrade
  21 +spec:
  22 + containers:
  23 + - name: cassandra-upgrade
  24 + imagePullPolicy: Always
  25 + image: thingsboard/cassandra-upgrade:2.0.3
  26 + env:
  27 + - name: ADD_DEMO_DATA
  28 + value: "true"
  29 + - name : CASSANDRA_HOST
  30 + value: "cassandra-headless"
  31 + - name : CASSANDRA_PORT
  32 + value: "9042"
  33 + - name : DATABASE_TYPE
  34 + value: "cassandra"
  35 + - name : CASSANDRA_URL
  36 + value: "cassandra-headless:9042"
  37 + - name : UPGRADE_FROM_VERSION
  38 + value: "1.4.0"
  39 + command:
  40 + - sh
  41 + - -c
  42 + - /upgrade.sh
  43 + restartPolicy: Never
@@ -47,11 +47,12 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; @@ -47,11 +47,12 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
47 47
48 public class TbMsgGeneratorNode implements TbNode { 48 public class TbMsgGeneratorNode implements TbNode {
49 49
50 - public static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg"; 50 + private static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg";
51 51
52 private TbMsgGeneratorNodeConfiguration config; 52 private TbMsgGeneratorNodeConfiguration config;
53 private ScriptEngine jsEngine; 53 private ScriptEngine jsEngine;
54 private long delay; 54 private long delay;
  55 + private long lastScheduledTs;
55 private EntityId originatorId; 56 private EntityId originatorId;
56 private UUID nextTickId; 57 private UUID nextTickId;
57 private TbMsg prevMsg; 58 private TbMsg prevMsg;
@@ -66,28 +67,40 @@ public class TbMsgGeneratorNode implements TbNode { @@ -66,28 +67,40 @@ public class TbMsgGeneratorNode implements TbNode {
66 originatorId = ctx.getSelfId(); 67 originatorId = ctx.getSelfId();
67 } 68 }
68 this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType"); 69 this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType");
69 - sentTickMsg(ctx); 70 + scheduleTickMsg(ctx);
70 } 71 }
71 72
72 @Override 73 @Override
73 public void onMsg(TbContext ctx, TbMsg msg) { 74 public void onMsg(TbContext ctx, TbMsg msg) {
74 if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { 75 if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
75 withCallback(generate(ctx), 76 withCallback(generate(ctx),
76 - m -> {ctx.tellNext(m, SUCCESS); sentTickMsg(ctx);},  
77 - t -> {ctx.tellFailure(msg, t); sentTickMsg(ctx);}); 77 + m -> {
  78 + ctx.tellNext(m, SUCCESS);
  79 + scheduleTickMsg(ctx);
  80 + },
  81 + t -> {
  82 + ctx.tellFailure(msg, t);
  83 + scheduleTickMsg(ctx);
  84 + });
78 } 85 }
79 } 86 }
80 87
81 - private void sentTickMsg(TbContext ctx) { 88 + private void scheduleTickMsg(TbContext ctx) {
  89 + long curTs = System.currentTimeMillis();
  90 + if (lastScheduledTs == 0L) {
  91 + lastScheduledTs = curTs;
  92 + }
  93 + lastScheduledTs = lastScheduledTs + delay;
  94 + long curDelay = Math.max(0L, (lastScheduledTs - curTs));
82 TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); 95 TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
83 nextTickId = tickMsg.getId(); 96 nextTickId = tickMsg.getId();
84 - ctx.tellSelf(tickMsg, delay); 97 + ctx.tellSelf(tickMsg, curDelay);
85 } 98 }
86 99
87 private ListenableFuture<TbMsg> generate(TbContext ctx) { 100 private ListenableFuture<TbMsg> generate(TbContext ctx) {
88 return ctx.getJsExecutor().executeAsync(() -> { 101 return ctx.getJsExecutor().executeAsync(() -> {
89 if (prevMsg == null) { 102 if (prevMsg == null) {
90 - prevMsg = ctx.newMsg( "", originatorId, new TbMsgMetaData(), "{}"); 103 + prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}");
91 } 104 }
92 TbMsg generated = jsEngine.executeGenerate(prevMsg); 105 TbMsg generated = jsEngine.executeGenerate(prevMsg);
93 prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData()); 106 prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData());