Commit c3011e07b8907ce2eae1aae7849735ddaf90950f

Authored by Dima Landiak
1 parent a4faa317

transaction rule nodes init

@@ -34,6 +34,7 @@ import org.springframework.beans.factory.annotation.Value; @@ -34,6 +34,7 @@ import org.springframework.beans.factory.annotation.Value;
34 import org.springframework.context.annotation.Lazy; 34 import org.springframework.context.annotation.Lazy;
35 import org.springframework.stereotype.Component; 35 import org.springframework.stereotype.Component;
36 import org.thingsboard.rule.engine.api.MailService; 36 import org.thingsboard.rule.engine.api.MailService;
  37 +import org.thingsboard.rule.engine.api.RuleChainTransactionService;
37 import org.thingsboard.server.actors.service.ActorService; 38 import org.thingsboard.server.actors.service.ActorService;
38 import org.thingsboard.server.common.data.DataConstants; 39 import org.thingsboard.server.common.data.DataConstants;
39 import org.thingsboard.server.common.data.Event; 40 import org.thingsboard.server.common.data.Event;
@@ -222,6 +223,11 @@ public class ActorSystemContext { @@ -222,6 +223,11 @@ public class ActorSystemContext {
222 @Getter 223 @Getter
223 private RuleEngineTransportService ruleEngineTransportService; 224 private RuleEngineTransportService ruleEngineTransportService;
224 225
  226 + @Lazy
  227 + @Autowired
  228 + @Getter
  229 + private RuleChainTransactionService ruleChainTransactionService;
  230 +
225 @Value("${cluster.partition_id}") 231 @Value("${cluster.partition_id}")
226 @Getter 232 @Getter
227 private long queuePartitionId; 233 private long queuePartitionId;
@@ -20,6 +20,7 @@ import com.datastax.driver.core.utils.UUIDs; @@ -20,6 +20,7 @@ import com.datastax.driver.core.utils.UUIDs;
20 import org.springframework.util.StringUtils; 20 import org.springframework.util.StringUtils;
21 import org.thingsboard.rule.engine.api.ListeningExecutor; 21 import org.thingsboard.rule.engine.api.ListeningExecutor;
22 import org.thingsboard.rule.engine.api.MailService; 22 import org.thingsboard.rule.engine.api.MailService;
  23 +import org.thingsboard.rule.engine.api.RuleChainTransactionService;
23 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; 24 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
24 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse; 25 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse;
25 import org.thingsboard.rule.engine.api.RuleEngineRpcService; 26 import org.thingsboard.rule.engine.api.RuleEngineRpcService;
@@ -233,6 +234,11 @@ class DefaultTbContext implements TbContext { @@ -233,6 +234,11 @@ class DefaultTbContext implements TbContext {
233 } 234 }
234 235
235 @Override 236 @Override
  237 + public RuleChainTransactionService getRuleChainTransactionService() {
  238 + return mainCtx.getRuleChainTransactionService();
  239 + }
  240 +
  241 + @Override
236 public MailService getMailService() { 242 public MailService getMailService() {
237 if (mainCtx.isAllowSystemMailService()) { 243 if (mainCtx.isAllowSystemMailService()) {
238 return mainCtx.getMailService(); 244 return mainCtx.getMailService();
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.transaction;
  17 +
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.beans.factory.annotation.Value;
  21 +import org.springframework.stereotype.Service;
  22 +import org.thingsboard.rule.engine.api.RuleChainTransactionService;
  23 +import org.thingsboard.server.common.data.id.EntityId;
  24 +import org.thingsboard.server.common.msg.TbMsg;
  25 +
  26 +import java.util.Queue;
  27 +import java.util.concurrent.ConcurrentHashMap;
  28 +import java.util.concurrent.ConcurrentMap;
  29 +import java.util.concurrent.LinkedBlockingQueue;
  30 +
  31 +@Service
  32 +@Slf4j
  33 +public class BaseRuleChainTransactionService implements RuleChainTransactionService {
  34 +
  35 + @Value("${actors.rule.transaction.queue_size}")
  36 + private int queueSize;
  37 +
  38 + private final ConcurrentMap<EntityId, Queue<TbMsg>> transactionMap = new ConcurrentHashMap<>();
  39 +
  40 + @Override
  41 + public void beginTransaction(EntityId entityId, FutureCallback<Void> callback) {
  42 +
  43 +
  44 + transactionMap.computeIfAbsent(entityId, id -> new LinkedBlockingQueue<>(queueSize));
  45 +
  46 + log.info("[{}]", queueSize);
  47 +
  48 + }
  49 +
  50 + @Override
  51 + public void endTransaction() {
  52 +
  53 + }
  54 +}
@@ -212,6 +212,9 @@ actors: @@ -212,6 +212,9 @@ actors:
212 node: 212 node:
213 # Errors for particular actor are persisted once per specified amount of milliseconds 213 # Errors for particular actor are persisted once per specified amount of milliseconds
214 error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" 214 error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
  215 + transaction:
  216 + # Size of queues which store messages for transaction rule nodes
  217 + queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:10}"
215 statistics: 218 statistics:
216 # Enable/disable actor statistics 219 # Enable/disable actor statistics
217 enabled: "${ACTORS_STATISTICS_ENABLED:true}" 220 enabled: "${ACTORS_STATISTICS_ENABLED:true}"
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.api;
  17 +
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import org.thingsboard.server.common.data.id.EntityId;
  20 +
  21 +public interface RuleChainTransactionService {
  22 +
  23 + void beginTransaction(EntityId entityId, FutureCallback<Void> callback);
  24 +
  25 + void endTransaction();
  26 +
  27 +}
@@ -103,4 +103,6 @@ public interface TbContext { @@ -103,4 +103,6 @@ public interface TbContext {
103 ScriptEngine createJsScriptEngine(String script, String... argNames); 103 ScriptEngine createJsScriptEngine(String script, String... argNames);
104 104
105 String getNodeId(); 105 String getNodeId();
  106 +
  107 + RuleChainTransactionService getRuleChainTransactionService();
106 } 108 }
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.transaction;
  17 +
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
  21 +import org.thingsboard.rule.engine.api.RuleNode;
  22 +import org.thingsboard.rule.engine.api.TbContext;
  23 +import org.thingsboard.rule.engine.api.TbNode;
  24 +import org.thingsboard.rule.engine.api.TbNodeConfiguration;
  25 +import org.thingsboard.rule.engine.api.TbNodeException;
  26 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
  27 +import org.thingsboard.server.common.data.plugin.ComponentType;
  28 +import org.thingsboard.server.common.msg.TbMsg;
  29 +
  30 +import java.util.concurrent.ExecutionException;
  31 +
  32 +@Slf4j
  33 +@RuleNode(
  34 + type = ComponentType.ACTION,
  35 + name = "transaction start",
  36 + configClazz = EmptyNodeConfiguration.class,
  37 + nodeDescription = "Something",
  38 + nodeDetails = "Something more",
  39 + uiResources = {"static/rulenode/rulenode-core-config.js"},
  40 + configDirective = ("tbNodeEmptyConfig")
  41 +)
  42 +public class TbTransactionBeginNode implements TbNode {
  43 +
  44 + private EmptyNodeConfiguration config;
  45 +
  46 + @Override
  47 + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  48 + this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
  49 + }
  50 +
  51 + @Override
  52 + public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
  53 + FutureCallback<Void> callback = null;
  54 +
  55 + ctx.getRuleChainTransactionService().beginTransaction(msg.getOriginator(), callback);
  56 + }
  57 +
  58 + @Override
  59 + public void destroy() {
  60 +
  61 + }
  62 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.transaction;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
  20 +import org.thingsboard.rule.engine.api.RuleChainTransactionService;
  21 +import org.thingsboard.rule.engine.api.RuleNode;
  22 +import org.thingsboard.rule.engine.api.TbContext;
  23 +import org.thingsboard.rule.engine.api.TbNode;
  24 +import org.thingsboard.rule.engine.api.TbNodeConfiguration;
  25 +import org.thingsboard.rule.engine.api.TbNodeException;
  26 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
  27 +import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration;
  28 +import org.thingsboard.server.common.data.plugin.ComponentType;
  29 +import org.thingsboard.server.common.msg.TbMsg;
  30 +
  31 +import java.util.concurrent.ExecutionException;
  32 +
  33 +@Slf4j
  34 +@RuleNode(
  35 + type = ComponentType.ACTION,
  36 + name = "transaction end",
  37 + configClazz = EmptyNodeConfiguration.class,
  38 + nodeDescription = "Something",
  39 + nodeDetails = "Something more",
  40 + uiResources = {"static/rulenode/rulenode-core-config.js"},
  41 + configDirective = ("tbNodeEmptyConfig")
  42 +)
  43 +public class TbTransactionEndNode implements TbNode {
  44 +
  45 + private EmptyNodeConfiguration config;
  46 +
  47 + @Override
  48 + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  49 + this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
  50 + }
  51 +
  52 + @Override
  53 + public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
  54 +
  55 + }
  56 +
  57 + @Override
  58 + public void destroy() {
  59 +
  60 + }
  61 +}