Commit 6686003c5a2530385e573c087fcbfc2200db46cc

Authored by Dima Landiak
1 parent c3011e07

transaction development

... ... @@ -125,7 +125,7 @@ class DefaultTbContext implements TbContext {
125 125
126 126 @Override
127 127 public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
128   - return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId());
  128 + return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), origMsg.getDataType(), data, origMsg.getTransactionData(), origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId());
129 129 }
130 130
131 131 @Override
... ...
... ... @@ -279,6 +279,9 @@ public class TelemetryController extends BaseController {
279 279 deleteFromTs = 0L;
280 280 deleteToTs = System.currentTimeMillis();
281 281 } else {
  282 + if (startTs == null || endTs == null) {
  283 + return getImmediateDeferredResult("StartTs and endTs could not be empty when deleteAllDataForKeys equals [false]", HttpStatus.BAD_REQUEST);
  284 + }
282 285 deleteFromTs = startTs;
283 286 deleteToTs = endTs;
284 287 }
... ...
... ... @@ -15,40 +15,92 @@
15 15 */
16 16 package org.thingsboard.server.service.transaction;
17 17
18   -import com.google.common.util.concurrent.FutureCallback;
19 18 import lombok.extern.slf4j.Slf4j;
20 19 import org.springframework.beans.factory.annotation.Value;
21 20 import org.springframework.stereotype.Service;
22 21 import org.thingsboard.rule.engine.api.RuleChainTransactionService;
  22 +import org.thingsboard.rule.engine.api.TbContext;
23 23 import org.thingsboard.server.common.data.id.EntityId;
24 24 import org.thingsboard.server.common.msg.TbMsg;
25 25
26   -import java.util.Queue;
  26 +import java.util.concurrent.BlockingQueue;
27 27 import java.util.concurrent.ConcurrentHashMap;
28 28 import java.util.concurrent.ConcurrentMap;
29 29 import java.util.concurrent.LinkedBlockingQueue;
  30 +import java.util.concurrent.locks.Lock;
  31 +import java.util.concurrent.locks.ReentrantLock;
  32 +import java.util.function.Consumer;
30 33
31 34 @Service
32 35 @Slf4j
33 36 public class BaseRuleChainTransactionService implements RuleChainTransactionService {
34 37
35 38 @Value("${actors.rule.transaction.queue_size}")
36   - private int queueSize;
  39 + private int finalQueueSize;
37 40
38   - private final ConcurrentMap<EntityId, Queue<TbMsg>> transactionMap = new ConcurrentHashMap<>();
  41 + private final Lock transactionLock = new ReentrantLock();
  42 + private final ConcurrentMap<EntityId, BlockingQueue<TbTransactionTask>> transactionMap = new ConcurrentHashMap<>();
39 43
40   - @Override
41   - public void beginTransaction(EntityId entityId, FutureCallback<Void> callback) {
42   -
43   -
44   - transactionMap.computeIfAbsent(entityId, id -> new LinkedBlockingQueue<>(queueSize));
  44 + //TODO: add delete on timeout from queue -> onFailure accept
45 45
46   - log.info("[{}]", queueSize);
  46 + @Override
  47 + public void beginTransaction(TbContext ctx, TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure) {
  48 + BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id ->
  49 + new LinkedBlockingQueue<>(finalQueueSize));
  50 + transactionLock.lock();
  51 + try {
  52 + TbTransactionTask task = new TbTransactionTask(msg, onStart, onEnd, onFailure);
  53 + int queueSize = queue.size();
  54 + if (queueSize >= finalQueueSize) {
  55 + onFailure.accept(new RuntimeException("Queue has no space!"));
  56 + } else {
  57 + addMsgToQueue(queue, task, onFailure);
  58 + if (queueSize == 0) {
  59 + onStart.accept(msg);
  60 + } else {
  61 + log.info("Msg [{}] [{}] is waiting to start transaction!", msg.getId(), msg.getType());
  62 + }
  63 + }
  64 + } finally {
  65 + transactionLock.unlock();
  66 + }
  67 + }
47 68
  69 + private void addMsgToQueue(BlockingQueue<TbTransactionTask> queue, TbTransactionTask task, Consumer<Throwable> onFailure) {
  70 + try {
  71 + queue.add(task);
  72 + log.info("Added msg to queue, size: [{}]", queue.size());
  73 + } catch (Exception e) {
  74 + log.error("Error when trying to add msg [{}] to the queue", task.getMsg(), e);
  75 + onFailure.accept(e);
  76 + }
48 77 }
49 78
50 79 @Override
51   - public void endTransaction() {
  80 + public boolean endTransaction(TbContext ctx, TbMsg msg, Consumer<Throwable> onFailure) {
  81 + transactionLock.lock();
  82 + try {
  83 + BlockingQueue<TbTransactionTask> queue = transactionMap.get(msg.getTransactionData().getOriginatorId());
  84 + try {
  85 + TbTransactionTask currentTask = queue.element();
  86 + if (currentTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) {
  87 + queue.remove();
  88 + log.info("Removed msg from queue, size [{}]", queue.size());
  89 + currentTask.getOnEnd().accept(currentTask.getMsg());
52 90
  91 + TbTransactionTask nextTask = queue.peek();
  92 + if (nextTask != null) {
  93 + nextTask.getOnStart().accept(nextTask.getMsg());
  94 + }
  95 + }
  96 + } catch (Exception e) {
  97 + log.error("Queue is empty!", queue);
  98 + onFailure.accept(e);
  99 + return true;
  100 + }
  101 + } finally {
  102 + transactionLock.unlock();
  103 + }
  104 + return false;
53 105 }
54 106 }
... ...
  1 +package org.thingsboard.server.service.transaction;
  2 +
  3 +import lombok.Data;
  4 +import org.thingsboard.server.common.msg.TbMsg;
  5 +
  6 +import java.util.function.Consumer;
  7 +
  8 +@Data
  9 +public final class TbTransactionTask {
  10 +
  11 + private final TbMsg msg;
  12 + private final Consumer<TbMsg> onStart;
  13 + private final Consumer<TbMsg> onEnd;
  14 + private final Consumer<Throwable> onFailure;
  15 +
  16 +}
... ...
... ... @@ -41,6 +41,7 @@ public final class TbMsg implements Serializable {
41 41 private final TbMsgMetaData metaData;
42 42 private final TbMsgDataType dataType;
43 43 private final String data;
  44 + private final TbMsgTransactionData transactionData;
44 45
45 46 //The following fields are not persisted to DB, because they can always be recovered from the context;
46 47 private final RuleChainId ruleChainId;
... ... @@ -55,11 +56,17 @@ public final class TbMsg implements Serializable {
55 56 this.metaData = metaData;
56 57 this.data = data;
57 58 this.dataType = TbMsgDataType.JSON;
  59 + this.transactionData = null;
58 60 this.ruleChainId = ruleChainId;
59 61 this.ruleNodeId = ruleNodeId;
60 62 this.clusterPartition = clusterPartition;
61 63 }
62 64
  65 + public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
  66 + RuleChainId ruleChainId, RuleNodeId ruleNodeId, long clusterPartition) {
  67 + this(id, type, originator, metaData, dataType, data, null, ruleChainId, ruleNodeId, clusterPartition);
  68 + }
  69 +
63 70 public static ByteBuffer toBytes(TbMsg msg) {
64 71 MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
65 72 builder.setId(msg.getId().toString());
... ... @@ -82,6 +89,16 @@ public final class TbMsg implements Serializable {
82 89 builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build());
83 90 }
84 91
  92 + TbMsgTransactionData transactionData = msg.getTransactionData();
  93 + if (transactionData != null) {
  94 + MsgProtos.TbMsgTransactionDataProto.Builder transactionBuilder = MsgProtos.TbMsgTransactionDataProto.newBuilder();
  95 + transactionBuilder.setId(transactionData.getTransactionId().toString());
  96 + transactionBuilder.setEntityType(transactionData.getOriginatorId().getEntityType().name());
  97 + transactionBuilder.setEntityIdMSB(transactionData.getOriginatorId().getId().getMostSignificantBits());
  98 + transactionBuilder.setEntityIdLSB(transactionData.getOriginatorId().getId().getLeastSignificantBits());
  99 + builder.setTransactionData(transactionBuilder.build());
  100 + }
  101 +
85 102 builder.setDataType(msg.getDataType().ordinal());
86 103 builder.setData(msg.getData());
87 104 byte[] bytes = builder.build().toByteArray();
... ... @@ -92,6 +109,9 @@ public final class TbMsg implements Serializable {
92 109 try {
93 110 MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
94 111 TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
  112 + EntityId transactionEntityId = EntityIdFactory.getByTypeAndUuid(proto.getTransactionData().getEntityType(),
  113 + new UUID(proto.getTransactionData().getEntityIdMSB(), proto.getTransactionData().getEntityIdLSB()));
  114 + TbMsgTransactionData transactionData = new TbMsgTransactionData(UUID.fromString(proto.getTransactionData().getId()), transactionEntityId);
95 115 EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
96 116 RuleChainId ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB()));
97 117 RuleNodeId ruleNodeId = null;
... ... @@ -99,7 +119,7 @@ public final class TbMsg implements Serializable {
99 119 ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
100 120 }
101 121 TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
102   - return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getClusterPartition());
  122 + return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), transactionData, ruleChainId, ruleNodeId, proto.getClusterPartition());
103 123 } catch (InvalidProtocolBufferException e) {
104 124 throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
105 125 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.msg;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.server.common.data.id.EntityId;
  20 +
  21 +import java.io.Serializable;
  22 +import java.util.UUID;
  23 +
  24 +@Data
  25 +public final class TbMsgTransactionData implements Serializable {
  26 +
  27 + private final UUID transactionId;
  28 + private final EntityId originatorId;
  29 +
  30 +}
... ...
... ... @@ -23,6 +23,13 @@ message TbMsgMetaDataProto {
23 23 map<string, string> data = 1;
24 24 }
25 25
  26 +message TbMsgTransactionDataProto {
  27 + string id = 1;
  28 + string entityType = 2;
  29 + int64 entityIdMSB = 3;
  30 + int64 entityIdLSB = 4;
  31 +}
  32 +
26 33 message TbMsgProto {
27 34 string id = 1;
28 35 string type = 2;
... ... @@ -39,7 +46,9 @@ message TbMsgProto {
39 46
40 47 TbMsgMetaDataProto metaData = 11;
41 48
42   - int32 dataType = 12;
43   - string data = 13;
  49 + TbMsgTransactionDataProto transactionData = 12;
  50 +
  51 + int32 dataType = 13;
  52 + string data = 14;
44 53
45 54 }
\ No newline at end of file
... ...
... ... @@ -15,13 +15,15 @@
15 15 */
16 16 package org.thingsboard.rule.engine.api;
17 17
18   -import com.google.common.util.concurrent.FutureCallback;
19   -import org.thingsboard.server.common.data.id.EntityId;
  18 +import org.thingsboard.server.common.msg.TbMsg;
  19 +
  20 +import java.util.concurrent.Callable;
  21 +import java.util.function.Consumer;
20 22
21 23 public interface RuleChainTransactionService {
22 24
23   - void beginTransaction(EntityId entityId, FutureCallback<Void> callback);
  25 + void beginTransaction(TbContext ctx, TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure);
24 26
25   - void endTransaction();
  27 + boolean endTransaction(TbContext ctx, TbMsg msg, Consumer<Throwable> onFailure);
26 28
27 29 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -15,7 +15,6 @@
15 15 */
16 16 package org.thingsboard.rule.engine.transaction;
17 17
18   -import com.google.common.util.concurrent.FutureCallback;
19 18 import lombok.extern.slf4j.Slf4j;
20 19 import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
21 20 import org.thingsboard.rule.engine.api.RuleNode;
... ... @@ -26,9 +25,14 @@ import org.thingsboard.rule.engine.api.TbNodeException;
26 25 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
27 26 import org.thingsboard.server.common.data.plugin.ComponentType;
28 27 import org.thingsboard.server.common.msg.TbMsg;
  28 +import org.thingsboard.server.common.msg.TbMsgDataType;
  29 +import org.thingsboard.server.common.msg.TbMsgTransactionData;
29 30
  31 +import java.util.UUID;
30 32 import java.util.concurrent.ExecutionException;
31 33
  34 +import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
  35 +
32 36 @Slf4j
33 37 @RuleNode(
34 38 type = ComponentType.ACTION,
... ... @@ -50,9 +54,21 @@ public class TbTransactionBeginNode implements TbNode {
50 54
51 55 @Override
52 56 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
53   - FutureCallback<Void> callback = null;
  57 + log.info("Msg in - [{}] [{}]", msg.getId(), msg.getType());
  58 +
  59 + TbMsgTransactionData transactionData = new TbMsgTransactionData(UUID.randomUUID(), msg.getOriginator());
  60 +
  61 + TbMsg tbMsg = new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), TbMsgDataType.JSON,
  62 + msg.getData(), transactionData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition());
54 63
55   - ctx.getRuleChainTransactionService().beginTransaction(msg.getOriginator(), callback);
  64 + ctx.getRuleChainTransactionService().beginTransaction(ctx, tbMsg, onStart -> {
  65 + log.info("Transaction starting... [{}] [{}]", tbMsg.getId(), tbMsg.getType());
  66 + ctx.tellNext(tbMsg, SUCCESS);
  67 + }, onEnd -> log.info("Transaction ended successfully... [{}] [{}]", tbMsg.getId(), tbMsg.getType()),
  68 + throwable -> {
  69 + log.error("Transaction failed due to queue size restriction! [{}] [{}]", tbMsg.getId(), tbMsg.getType(), throwable);
  70 + ctx.tellFailure(tbMsg, throwable);
  71 + });
56 72 }
57 73
58 74 @Override
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -17,19 +17,19 @@ package org.thingsboard.rule.engine.transaction;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
20   -import org.thingsboard.rule.engine.api.RuleChainTransactionService;
21 20 import org.thingsboard.rule.engine.api.RuleNode;
22 21 import org.thingsboard.rule.engine.api.TbContext;
23 22 import org.thingsboard.rule.engine.api.TbNode;
24 23 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
25 24 import org.thingsboard.rule.engine.api.TbNodeException;
26 25 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
27   -import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration;
28 26 import org.thingsboard.server.common.data.plugin.ComponentType;
29 27 import org.thingsboard.server.common.msg.TbMsg;
30 28
31 29 import java.util.concurrent.ExecutionException;
32 30
  31 +import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
  32 +
33 33 @Slf4j
34 34 @RuleNode(
35 35 type = ComponentType.ACTION,
... ... @@ -51,7 +51,11 @@ public class TbTransactionEndNode implements TbNode {
51 51
52 52 @Override
53 53 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
54   -
  54 + boolean isFailed = ctx.getRuleChainTransactionService().endTransaction(ctx, msg, throwable -> ctx.tellFailure(msg, throwable));
  55 + if (!isFailed) {
  56 + ctx.tellNext(msg, SUCCESS);
  57 + }
  58 + log.info("Msg out - [{}] [{}]", msg.getId(), msg.getType());
55 59 }
56 60
57 61 @Override
... ...