Commit a7d966c7888b707c24f0d48859c8047bd895f71d

Authored by Dima Landiak
1 parent ae42bb4f

transaction fixed based on requested changes

... ... @@ -22,16 +22,15 @@ import org.springframework.beans.factory.annotation.Value;
22 22 import org.springframework.stereotype.Service;
23 23 import org.thingsboard.rule.engine.api.RuleChainTransactionService;
24 24 import org.thingsboard.rule.engine.api.TbContext;
25   -import org.thingsboard.server.common.data.EntityType;
26   -import org.thingsboard.server.common.data.id.AssetId;
27   -import org.thingsboard.server.common.data.id.DeviceId;
28 25 import org.thingsboard.server.common.data.id.EntityId;
  26 +import org.thingsboard.server.common.data.id.EntityIdFactory;
29 27 import org.thingsboard.server.common.data.id.TenantId;
30 28 import org.thingsboard.server.common.msg.TbMsg;
31 29 import org.thingsboard.server.common.msg.cluster.ServerAddress;
32 30 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
33 31 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
34 32 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
  33 +import org.thingsboard.server.service.executors.DbCallbackExecutorService;
35 34
36 35 import javax.annotation.PostConstruct;
37 36 import javax.annotation.PreDestroy;
... ... @@ -39,6 +38,7 @@ import java.util.Optional;
39 38 import java.util.Queue;
40 39 import java.util.UUID;
41 40 import java.util.concurrent.BlockingQueue;
  41 +import java.util.concurrent.Callable;
42 42 import java.util.concurrent.ConcurrentHashMap;
43 43 import java.util.concurrent.ConcurrentLinkedQueue;
44 44 import java.util.concurrent.ConcurrentMap;
... ... @@ -60,6 +60,9 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
60 60 @Autowired
61 61 private ClusterRpcService clusterRpcService;
62 62
  63 + @Autowired
  64 + private DbCallbackExecutorService callbackExecutor;
  65 +
63 66 @Value("${actors.rule.transaction.queue_size}")
64 67 private int finalQueueSize;
65 68 @Value("${actors.rule.transaction.duration}")
... ... @@ -70,12 +73,10 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
70 73 private final Queue<TbTransactionTask> timeoutQueue = new ConcurrentLinkedQueue<>();
71 74
72 75 private ExecutorService timeoutExecutor;
73   - private ExecutorService executor;
74 76
75 77 @PostConstruct
76 78 public void init() {
77 79 timeoutExecutor = Executors.newSingleThreadExecutor();
78   - executor = Executors.newSingleThreadExecutor();
79 80 executeOnTimeout();
80 81 }
81 82
... ... @@ -84,9 +85,6 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
84 85 if (timeoutExecutor != null) {
85 86 timeoutExecutor.shutdownNow();
86 87 }
87   - if (executor != null) {
88   - executor.shutdownNow();
89   - }
90 88 }
91 89
92 90 @Override
... ... @@ -96,16 +94,16 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
96 94 BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id ->
97 95 new LinkedBlockingQueue<>(finalQueueSize));
98 96
99   - TbTransactionTask task = new TbTransactionTask(msg, onStart, onEnd, onFailure);
  97 + TbTransactionTask transactionTask = new TbTransactionTask(msg, onStart, onEnd, onFailure, System.currentTimeMillis() + duration);
100 98 int queueSize = queue.size();
101 99 if (queueSize >= finalQueueSize) {
102   - task.getOnFailure().accept(new RuntimeException("Queue has no space!"));
  100 + executeOnFailure(transactionTask.getOnFailure(), "Queue has no space!");
103 101 } else {
104   - addMsgToQueues(queue, task);
  102 + addMsgToQueues(queue, transactionTask);
105 103 if (queueSize == 0) {
106   - startTransactionTask(task);
  104 + executeOnSuccess(transactionTask.getOnStart(), transactionTask.getMsg());
107 105 } else {
108   - log.trace("Msg [{}] [{}] is waiting to start transaction!", msg.getId(), msg.getType());
  106 + log.trace("Msg [{}][{}] is waiting to start transaction!", msg.getId(), msg.getType());
109 107 }
110 108 }
111 109 } finally {
... ... @@ -113,64 +111,79 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
113 111 }
114 112 }
115 113
116   - private void addMsgToQueues(BlockingQueue<TbTransactionTask> queue, TbTransactionTask task) {
117   - queue.offer(task);
118   - timeoutQueue.offer(task);
  114 + private void addMsgToQueues(BlockingQueue<TbTransactionTask> queue, TbTransactionTask transactionTask) {
  115 + queue.offer(transactionTask);
  116 + timeoutQueue.offer(transactionTask);
119 117 log.trace("Added msg to queue, size: [{}]", queue.size());
120 118 }
121 119
122 120 @Override
123   - public boolean endTransaction(TbContext ctx, TbMsg msg, Consumer<Throwable> onFailure) {
124   - BlockingQueue<TbTransactionTask> queue = transactionMap.get(msg.getTransactionData().getOriginatorId());
125   -
126   - TbTransactionTask currentTask = queue.peek();
127   - if (currentTask != null) {
128   - if (currentTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) {
129   - currentTask.setIsCompleted(true);
130   - queue.remove();
131   - log.trace("Removed msg from queue, size [{}]", queue.size());
132   - currentTask.getOnEnd().accept(currentTask.getMsg());
133   -
134   - TbTransactionTask nextTask = queue.peek();
135   - if (nextTask != null) {
136   - startTransactionTask(nextTask);
  121 + public void endTransaction(TbContext ctx, TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure) {
  122 + EntityId originatorId = msg.getTransactionData().getOriginatorId();
  123 +
  124 + if (!onRemoteTransactionEndSync(ctx.getTenantId(), originatorId)) {
  125 + transactionLock.lock();
  126 + try {
  127 + BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(originatorId, id ->
  128 + new LinkedBlockingQueue<>(finalQueueSize));
  129 +
  130 + TbTransactionTask currentTransactionTask = queue.peek();
  131 + if (currentTransactionTask != null) {
  132 + if (currentTransactionTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) {
  133 + currentTransactionTask.setCompleted(true);
  134 + queue.poll();
  135 + log.trace("Removed msg from queue, size [{}]", queue.size());
  136 +
  137 + executeOnSuccess(currentTransactionTask.getOnEnd(), currentTransactionTask.getMsg());
  138 + executeOnSuccess(onSuccess, currentTransactionTask.getMsg());
  139 +
  140 + TbTransactionTask nextTransactionTask = queue.peek();
  141 + if (nextTransactionTask != null) {
  142 + executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg());
  143 + }
  144 + } else {
  145 + log.trace("Task has expired!");
  146 + executeOnFailure(onFailure, "Task has expired!");
  147 + }
  148 + } else {
  149 + log.trace("Queue is empty, previous task has expired!");
  150 + executeOnFailure(onFailure, "Queue is empty, previous task has expired!");
137 151 }
138   - } else {
139   - log.trace("Task has expired!");
140   - onFailure.accept(new RuntimeException("Task has expired!"));
141   - return true;
  152 + } finally {
  153 + transactionLock.unlock();
142 154 }
143   - } else {
144   - log.trace("Queue is empty, previous task has expired!");
145   - onFailure.accept(new RuntimeException("Queue is empty, previous task has expired!"));
146   - return true;
147 155 }
148   - return false;
149 156 }
150 157
151 158 private void executeOnTimeout() {
152 159 timeoutExecutor.submit(() -> {
153 160 while (true) {
154   - TbTransactionTask task = timeoutQueue.peek();
155   - if (task != null) {
156   - if (task.getIsCompleted()) {
  161 + TbTransactionTask transactionTask = timeoutQueue.peek();
  162 + if (transactionTask != null) {
  163 + if (transactionTask.isCompleted()) {
157 164 timeoutQueue.poll();
158 165 } else {
159   - if (System.currentTimeMillis() > task.getExpirationTime()) {
160   - log.trace("Task has expired! Deleting it...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType());
161   - timeoutQueue.poll();
162   - task.getOnFailure().accept(new RuntimeException("Task has expired!"));
163   -
164   - BlockingQueue<TbTransactionTask> queue = transactionMap.get(task.getMsg().getTransactionData().getOriginatorId());
165   - queue.poll();
  166 + if (System.currentTimeMillis() > transactionTask.getExpirationTime()) {
  167 + transactionLock.lock();
  168 + try {
  169 + log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
  170 + timeoutQueue.poll();
  171 + executeOnFailure(transactionTask.getOnFailure(), "Task has expired!");
166 172
167   - TbTransactionTask nextTask = queue.peek();
168   - if (nextTask != null) {
169   - startTransactionTask(nextTask);
  173 + BlockingQueue<TbTransactionTask> queue = transactionMap.get(transactionTask.getMsg().getTransactionData().getOriginatorId());
  174 + if (queue != null) {
  175 + queue.poll();
  176 + TbTransactionTask nextTransactionTask = queue.peek();
  177 + if (nextTransactionTask != null) {
  178 + executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg());
  179 + }
  180 + }
  181 + } finally {
  182 + transactionLock.unlock();
170 183 }
171 184 } else {
172 185 try {
173   - log.trace("Task has not expired! Continue executing...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType());
  186 + log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
174 187 TimeUnit.MILLISECONDS.sleep(duration);
175 188 } catch (InterruptedException e) {
176 189 throw new IllegalStateException("Thread interrupted", e);
... ... @@ -189,10 +202,22 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
189 202 });
190 203 }
191 204
192   - private void startTransactionTask(TbTransactionTask task) {
193   - task.setIsCompleted(false);
194   - task.setExpirationTime(System.currentTimeMillis() + duration);
195   - task.getOnStart().accept(task.getMsg());
  205 + private void executeOnFailure(Consumer<Throwable> onFailure, String exception) {
  206 + executeCallback(() -> {
  207 + onFailure.accept(new RuntimeException(exception));
  208 + return null;
  209 + });
  210 + }
  211 +
  212 + private void executeOnSuccess(Consumer<TbMsg> onSuccess, TbMsg tbMsg) {
  213 + executeCallback(() -> {
  214 + onSuccess.accept(tbMsg);
  215 + return null;
  216 + });
  217 + }
  218 +
  219 + private void executeCallback(Callable<Void> task) {
  220 + callbackExecutor.executeAsync(task);
196 221 }
197 222
198 223 @Override
... ... @@ -204,25 +229,21 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
204 229 throw new RuntimeException(e);
205 230 }
206 231 TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
207   -
208   - String entityTypeStr = proto.getEntityType();
209   - EntityId entityId;
210   - if (entityTypeStr.equals(EntityType.ASSET.name())) {
211   - entityId = new AssetId(new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
212   - } else {
213   - entityId = new DeviceId(new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
214   - }
  232 + EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
215 233 onTransactionEnd(tenantId, entityId);
216 234 }
217 235
218   - @Override
219   - public void onTransactionEnd(TenantId tenantId, EntityId entityId) {
220   - executor.submit(() -> onTransactionEndSync(tenantId, entityId));
  236 + private void onTransactionEnd(TenantId tenantId, EntityId entityId) {
  237 + callbackExecutor.executeAsync(() -> onRemoteTransactionEndSync(tenantId, entityId));
221 238 }
222 239
223   - private void onTransactionEndSync(TenantId tenantId, EntityId entityId) {
  240 + private boolean onRemoteTransactionEndSync(TenantId tenantId, EntityId entityId) {
224 241 Optional<ServerAddress> address = routingService.resolveById(entityId);
225   - address.ifPresent(serverAddress -> sendTransactionEvent(tenantId, entityId, serverAddress));
  242 + if (address.isPresent()) {
  243 + sendTransactionEvent(tenantId, entityId, address.get());
  244 + return true;
  245 + }
  246 + return false;
226 247 }
227 248
228 249 private void sendTransactionEvent(TenantId tenantId, EntityId entityId, ServerAddress address) {
... ...
... ... @@ -29,15 +29,16 @@ public final class TbTransactionTask {
29 29 private final Consumer<TbMsg> onStart;
30 30 private final Consumer<TbMsg> onEnd;
31 31 private final Consumer<Throwable> onFailure;
  32 + private final long expirationTime;
32 33
33   - private Boolean isCompleted;
34   - private Long expirationTime;
  34 + private boolean isCompleted;
35 35
36   - public TbTransactionTask(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure) {
  36 + public TbTransactionTask(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure, long expirationTime) {
37 37 this.msg = msg;
38 38 this.onStart = onStart;
39 39 this.onEnd = onEnd;
40 40 this.onFailure = onFailure;
  41 + this.expirationTime = expirationTime;
41 42 this.isCompleted = false;
42 43 }
43 44 }
... ...
... ... @@ -15,8 +15,6 @@
15 15 */
16 16 package org.thingsboard.rule.engine.api;
17 17
18   -import org.thingsboard.server.common.data.id.EntityId;
19   -import org.thingsboard.server.common.data.id.TenantId;
20 18 import org.thingsboard.server.common.msg.TbMsg;
21 19 import org.thingsboard.server.common.msg.cluster.ServerAddress;
22 20
... ... @@ -26,10 +24,8 @@ public interface RuleChainTransactionService {
26 24
27 25 void beginTransaction(TbContext ctx, TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure);
28 26
29   - boolean endTransaction(TbContext ctx, TbMsg msg, Consumer<Throwable> onFailure);
  27 + void endTransaction(TbContext ctx, TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure);
30 28
31 29 void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] bytes);
32 30
33   - void onTransactionEnd(TenantId tenantId, EntityId entityId);
34   -
35 31 }
... ...
... ... @@ -28,7 +28,6 @@ import org.thingsboard.server.common.msg.TbMsg;
28 28 import org.thingsboard.server.common.msg.TbMsgDataType;
29 29 import org.thingsboard.server.common.msg.TbMsgTransactionData;
30 30
31   -import java.util.UUID;
32 31 import java.util.concurrent.ExecutionException;
33 32
34 33 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
... ... @@ -54,19 +53,18 @@ public class TbTransactionBeginNode implements TbNode {
54 53
55 54 @Override
56 55 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
57   - log.trace("Msg enters transaction - [{}] [{}]", msg.getId(), msg.getType());
  56 + log.trace("Msg enters transaction - [{}][{}]", msg.getId(), msg.getType());
58 57
59 58 TbMsgTransactionData transactionData = new TbMsgTransactionData(msg.getId(), msg.getOriginator());
60   -
61 59 TbMsg tbMsg = new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), TbMsgDataType.JSON,
62 60 msg.getData(), transactionData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition());
63 61
64   - ctx.getRuleChainTransactionService().beginTransaction(ctx, tbMsg, onStart -> {
65   - log.trace("Transaction starting... [{}] [{}]", tbMsg.getId(), tbMsg.getType());
66   - ctx.tellNext(tbMsg, SUCCESS);
67   - }, onEnd -> log.trace("Transaction ended successfully... [{}] [{}]", tbMsg.getId(), tbMsg.getType()),
  62 + ctx.getRuleChainTransactionService().beginTransaction(ctx, tbMsg, startMsg -> {
  63 + log.trace("Transaction starting... [{}][{}]", startMsg.getId(), startMsg.getType());
  64 + ctx.tellNext(startMsg, SUCCESS);
  65 + }, endMsg -> log.trace("Transaction ended successfully... [{}][{}]", endMsg.getId(), endMsg.getType()),
68 66 throwable -> {
69   - log.error("Transaction failed! [{}] [{}]", tbMsg.getId(), tbMsg.getType(), throwable);
  67 + log.error("Transaction failed! [{}][{}]", tbMsg.getId(), tbMsg.getType(), throwable);
70 68 ctx.tellFailure(tbMsg, throwable);
71 69 });
72 70 }
... ...
... ... @@ -51,12 +51,10 @@ public class TbTransactionEndNode implements TbNode {
51 51
52 52 @Override
53 53 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
54   - ctx.getRuleChainTransactionService().onTransactionEnd(ctx.getTenantId(), msg.getTransactionData().getOriginatorId());
55   - boolean isFailed = ctx.getRuleChainTransactionService().endTransaction(ctx, msg, throwable -> ctx.tellFailure(msg, throwable));
56   - if (!isFailed) {
57   - ctx.tellNext(msg, SUCCESS);
58   - }
59   - log.trace("Msg left transaction - [{}] [{}]", msg.getId(), msg.getType());
  54 + ctx.getRuleChainTransactionService().endTransaction(ctx, msg,
  55 + successMsg -> ctx.tellNext(successMsg, SUCCESS),
  56 + throwable -> ctx.tellFailure(msg, throwable));
  57 + log.trace("Msg left transaction - [{}][{}]", msg.getId(), msg.getType());
60 58 }
61 59
62 60 @Override
... ...