Commit 280a752fc92775ab146d31fd56a2147e72cfc17d

Authored by Andrew Shvayka
1 parent e141b560

Queue tests and ability to set size of in memory queue

... ... @@ -86,13 +86,26 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
86 86 nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
87 87 }
88 88 initRoutes(ruleChain, ruleNodeList);
89   - //TODO: read all messages from queues of the actors and push then to the corresponding node actors;
  89 + reprocess(ruleNodeList);
90 90 started = true;
91 91 } else {
92 92 onUpdate(context);
93 93 }
94 94 }
95 95
  96 + private void reprocess(List<RuleNode> ruleNodeList) {
  97 + for (RuleNode ruleNode : ruleNodeList) {
  98 + for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), 0L)) {
  99 + pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg);
  100 + }
  101 + }
  102 + if (firstNode != null) {
  103 + for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), 0L)) {
  104 + pushMsgToNode(firstNode, tbMsg);
  105 + }
  106 + }
  107 + }
  108 +
96 109 @Override
97 110 public void onUpdate(ActorContext context) throws Exception {
98 111 RuleChain ruleChain = service.findRuleChainById(entityId);
... ... @@ -117,6 +130,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
117 130 });
118 131
119 132 initRoutes(ruleChain, ruleNodeList);
  133 + reprocess(ruleNodeList);
120 134 }
121 135
122 136 @Override
... ... @@ -182,7 +196,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
182 196
183 197 void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
184 198 checkActive();
185   - if(envelope.isEnqueue()) {
  199 + if (envelope.isEnqueue()) {
186 200 putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg));
187 201 } else {
188 202 pushMsgToNode(firstNode, envelope.getMsg());
... ...
... ... @@ -302,7 +302,9 @@ spring:
302 302
303 303 rule:
304 304 queue:
305   - msg_partitioning: "${QUEUE_MSG_PARTITIONING:HOURS}"
  305 + type: "memory"
  306 + max_size: 10000
  307 +
306 308
307 309 # PostgreSQL DAO Configuration
308 310 #spring:
... ...
... ... @@ -159,4 +159,72 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
159 159 Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
160 160 }
161 161
  162 + @Test
  163 + public void testRuleChainWithOneRuleAndMsgFromQueue() throws Exception {
  164 + // Creating Rule Chain
  165 + RuleChain ruleChain = new RuleChain();
  166 + ruleChain.setName("Simple Rule Chain");
  167 + ruleChain.setTenantId(savedTenant.getId());
  168 + ruleChain.setRoot(true);
  169 + ruleChain.setDebugMode(true);
  170 + ruleChain = saveRuleChain(ruleChain);
  171 + Assert.assertNull(ruleChain.getFirstRuleNodeId());
  172 +
  173 + // Saving the device
  174 + Device device = new Device();
  175 + device.setName("My device");
  176 + device.setType("default");
  177 + device = doPost("/api/device", device, Device.class);
  178 +
  179 + attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
  180 + Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis())));
  181 +
  182 + // Pushing Message to the system
  183 + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
  184 + "CUSTOM",
  185 + device.getId(),
  186 + new TbMsgMetaData(),
  187 + "{}",
  188 + ruleChain.getId(), null, 0L);
  189 + msgQueue.put(tbMsg, ruleChain.getId().getId(), 0L);
  190 +
  191 + Thread.sleep(1000);
  192 +
  193 + RuleChainMetaData metaData = new RuleChainMetaData();
  194 + metaData.setRuleChainId(ruleChain.getId());
  195 +
  196 + RuleNode ruleNode = new RuleNode();
  197 + ruleNode.setName("Simple Rule Node");
  198 + ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
  199 + ruleNode.setDebugMode(true);
  200 + TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
  201 + configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey"));
  202 + ruleNode.setConfiguration(mapper.valueToTree(configuration));
  203 +
  204 + metaData.setNodes(Collections.singletonList(ruleNode));
  205 + metaData.setFirstNodeIndex(0);
  206 +
  207 + metaData = saveRuleChainMetaData(metaData);
  208 + Assert.assertNotNull(metaData);
  209 +
  210 + ruleChain = getRuleChain(ruleChain.getId());
  211 + Assert.assertNotNull(ruleChain.getFirstRuleNodeId());
  212 +
  213 + Thread.sleep(3000);
  214 +
  215 + TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
  216 +
  217 + Assert.assertEquals(2, events.getData().size());
  218 +
  219 + Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
  220 + Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
  221 + Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
  222 +
  223 + Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
  224 + Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
  225 + Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
  226 +
  227 + Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
  228 + }
  229 +
162 230 }
... ...
... ... @@ -30,6 +30,7 @@ import java.time.ZoneOffset;
30 30 import java.util.List;
31 31 import java.util.Optional;
32 32 import java.util.UUID;
  33 +import java.util.concurrent.TimeUnit;
33 34
34 35 @Component
35 36 @Slf4j
... ... @@ -60,7 +61,7 @@ public class QueuePartitioner {
60 61
61 62 public List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {
62 63 Optional<Long> lastPartitionOption = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);
63   - long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 100);
  64 + long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7));
64 65 List<Long> unprocessedPartitions = Lists.newArrayList();
65 66
66 67 LocalDateTime current = LocalDateTime.ofInstant(Instant.ofEpochMilli(lastPartition), ZoneOffset.UTC);
... ...
... ... @@ -15,10 +15,14 @@
15 15 */
16 16 package org.thingsboard.server.dao.sql.queue;
17 17
  18 +import com.google.common.util.concurrent.Futures;
18 19 import com.google.common.util.concurrent.ListenableFuture;
19 20 import com.google.common.util.concurrent.ListeningExecutorService;
20 21 import com.google.common.util.concurrent.MoreExecutors;
  22 +import lombok.Getter;
21 23 import lombok.extern.slf4j.Slf4j;
  24 +import org.springframework.beans.factory.annotation.Value;
  25 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
22 26 import org.springframework.stereotype.Component;
23 27 import org.thingsboard.server.common.msg.TbMsg;
24 28 import org.thingsboard.server.dao.queue.MsgQueue;
... ... @@ -40,13 +44,17 @@ import java.util.concurrent.atomic.AtomicLong;
40 44 * Created by ashvayka on 27.04.18.
41 45 */
42 46 @Component
  47 +//@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
43 48 @Slf4j
44 49 @SqlDao
45 50 public class InMemoryMsgQueue implements MsgQueue {
46 51
  52 + @Value("${rule.queue.max_size}")
  53 + @Getter
  54 + private long maxSize;
  55 +
47 56 private ListeningExecutorService queueExecutor;
48   - //TODO:
49   - private AtomicLong pendingMsgCount;
  57 + private AtomicLong pendingMsgCount = new AtomicLong();
50 58 private Map<InMemoryMsgKey, Map<UUID, TbMsg>> data = new HashMap<>();
51 59
52 60 @PostConstruct
... ... @@ -64,10 +72,15 @@ public class InMemoryMsgQueue implements MsgQueue {
64 72
65 73 @Override
66 74 public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
67   - return queueExecutor.submit(() -> {
68   - data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg);
69   - return null;
70   - });
  75 + if (pendingMsgCount.get() < maxSize) {
  76 + return queueExecutor.submit(() -> {
  77 + data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg);
  78 + pendingMsgCount.incrementAndGet();
  79 + return null;
  80 + });
  81 + } else {
  82 + return Futures.immediateFailedFuture(new RuntimeException("Message queue is full!"));
  83 + }
71 84 }
72 85
73 86 @Override
... ... @@ -76,14 +89,15 @@ public class InMemoryMsgQueue implements MsgQueue {
76 89 InMemoryMsgKey key = new InMemoryMsgKey(nodeId, clusterPartition);
77 90 Map<UUID, TbMsg> map = data.get(key);
78 91 if (map != null) {
79   - map.remove(msg.getId());
  92 + if (map.remove(msg.getId()) != null) {
  93 + pendingMsgCount.decrementAndGet();
  94 + }
80 95 if (map.isEmpty()) {
81 96 data.remove(key);
82 97 }
83 98 }
84 99 return null;
85 100 });
86   -
87 101 }
88 102
89 103 @Override
... ...
... ... @@ -75,7 +75,7 @@ public class QueuePartitionerTest {
75 75 long clusteredHash = 101L;
76 76 when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.empty());
77 77 List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);
78   - assertEquals(1011, actual.size());
  78 + assertEquals(10083, actual.size());
79 79 }
80 80
81 81 }
\ No newline at end of file
... ...
... ... @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
21 21 import org.junit.Before;
22 22 import org.junit.Test;
23 23 import org.springframework.beans.factory.annotation.Autowired;
  24 +import org.springframework.test.util.ReflectionTestUtils;
24 25 import org.thingsboard.server.dao.service.AbstractServiceTest;
25 26 import org.thingsboard.server.dao.service.DaoNoSqlTest;
26 27 import org.thingsboard.server.dao.service.queue.cassandra.MsgAck;
... ... @@ -66,6 +67,7 @@ public class CassandraAckRepositoryTest extends AbstractServiceTest {
66 67
67 68 @Test
68 69 public void expiredAcksAreNotReturned() throws ExecutionException, InterruptedException {
  70 + ReflectionTestUtils.setField(ackRepository, "ackQueueTtl", 1);
69 71 UUID msgId = UUIDs.timeBased();
70 72 UUID nodeId = UUIDs.timeBased();
71 73 MsgAck ack = new MsgAck(msgId, nodeId, 30L, 40L);
... ...
... ... @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture;
22 22 import org.junit.Before;
23 23 import org.junit.Test;
24 24 import org.springframework.beans.factory.annotation.Autowired;
  25 +import org.springframework.test.util.ReflectionTestUtils;
25 26 import org.thingsboard.server.common.data.id.DeviceId;
26 27 import org.thingsboard.server.common.data.id.RuleChainId;
27 28 import org.thingsboard.server.common.data.id.RuleNodeId;
... ... @@ -58,6 +59,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
58 59
59 60 @Test
60 61 public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
  62 + ReflectionTestUtils.setField(msgRepository, "msqQueueTtl", 1);
61 63 TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",
62 64 new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
63 65 UUID nodeId = UUIDs.timeBased();
... ...
... ... @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
21 21 import org.junit.Before;
22 22 import org.junit.Test;
23 23 import org.springframework.beans.factory.annotation.Autowired;
  24 +import org.springframework.test.util.ReflectionTestUtils;
24 25 import org.thingsboard.server.dao.service.AbstractServiceTest;
25 26 import org.thingsboard.server.dao.service.DaoNoSqlTest;
26 27
... ... @@ -61,6 +62,7 @@ public class CassandraProcessedPartitionRepositoryTest extends AbstractServiceTe
61 62
62 63 @Test
63 64 public void expiredPartitionsAreNotReturned() throws ExecutionException, InterruptedException {
  65 + ReflectionTestUtils.setField(partitionRepository, "partitionsTtl", 1);
64 66 UUID nodeId = UUIDs.timeBased();
65 67 ListenableFuture<Void> future = partitionRepository.partitionProcessed(nodeId, 404L, 10L);
66 68 future.get();
... ...
... ... @@ -28,3 +28,6 @@ redis.connection.host=localhost
28 28 redis.connection.port=6379
29 29 redis.connection.db=0
30 30 redis.connection.password=
  31 +
  32 +rule.queue.type=memory
  33 +rule.queue.max_size=10000
\ No newline at end of file
... ...
1 1 database.type=cassandra
2 2
3 3 cassandra.queue.partitioning=HOURS
4   -cassandra.queue.ack.ttl=1
5   -cassandra.queue.msg.ttl=1
6   -cassandra.queue.partitions.ttl=1
\ No newline at end of file
  4 +cassandra.queue.ack.ttl=3600
  5 +cassandra.queue.msg.ttl=3600
  6 +cassandra.queue.partitions.ttl=3600
\ No newline at end of file
... ...