Commit 9e4afaa5eeaa1ebc52166341400f7eec533b7f57

Authored by Andrew Shvayka
1 parent 0b4ec2b9

Ability to push msg to different rule chain(s) + msg ack

Showing 14 changed files with 358 additions and 17 deletions
... ... @@ -51,6 +51,9 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
51 51 case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
52 52 processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
53 53 break;
  54 + case RULE_CHAIN_TO_RULE_CHAIN_MSG:
  55 + processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
  56 + break;
54 57 default:
55 58 return false;
56 59 }
... ...
... ... @@ -180,6 +180,15 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
180 180 });
181 181 }
182 182
  183 + void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
  184 + checkActive();
  185 + if(envelope.isEnqueue()) {
  186 + putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg));
  187 + } else {
  188 + pushMsgToNode(firstNode, envelope.getMsg());
  189 + }
  190 + }
  191 +
183 192 void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
184 193 checkActive();
185 194 RuleNodeId originator = envelope.getOriginator();
... ... @@ -190,8 +199,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
190 199
191 200 TbMsg msg = envelope.getMsg();
192 201 int relationsCount = relations.size();
  202 + EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
193 203 if (relationsCount == 0) {
194   - queue.ack(msg, msg.getRuleNodeId().getId(), msg.getClusterPartition());
  204 + queue.ack(msg, ackId.getId(), msg.getClusterPartition());
195 205 } else if (relationsCount == 1) {
196 206 for (RuleNodeRelation relation : relations) {
197 207 pushToTarget(msg, relation.getOut());
... ... @@ -201,22 +211,31 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
201 211 EntityId target = relation.getOut();
202 212 switch (target.getEntityType()) {
203 213 case RULE_NODE:
204   - RuleNodeId targetId = new RuleNodeId(target.getId());
205   - RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
206   - TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
207   - putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg));
  214 + enqueueAndForwardMsgCopyToNode(msg, target);
208 215 break;
209 216 case RULE_CHAIN:
210   - parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, true), self);
  217 + enqueueAndForwardMsgCopyToChain(msg, target);
211 218 break;
212 219 }
213 220 }
214 221 //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
215   - EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
216 222 queue.ack(msg, ackId.getId(), msg.getClusterPartition());
217 223 }
218 224 }
219 225
  226 + private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target) {
  227 + RuleChainId targetRCId = new RuleChainId(target.getId());
  228 + TbMsg copyMsg = msg.copy(UUIDs.timeBased(), targetRCId, null, DEFAULT_CLUSTER_PARTITION);
  229 + parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, copyMsg, true), self);
  230 + }
  231 +
  232 + private void enqueueAndForwardMsgCopyToNode(TbMsg msg, EntityId target) {
  233 + RuleNodeId targetId = new RuleNodeId(target.getId());
  234 + RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
  235 + TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
  236 + putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg));
  237 + }
  238 +
220 239 private void pushToTarget(TbMsg msg, EntityId target) {
221 240 switch (target.getEntityType()) {
222 241 case RULE_NODE:
... ...
... ... @@ -26,7 +26,7 @@ import org.thingsboard.server.common.msg.TbMsg;
26 26 * Created by ashvayka on 19.03.18.
27 27 */
28 28 @Data
29   -final class RuleChainToRuleChainMsg implements TbActorMsg {
  29 +public final class RuleChainToRuleChainMsg implements TbActorMsg {
30 30
31 31 private final RuleChainId target;
32 32 private final RuleChainId source;
... ...
... ... @@ -22,6 +22,7 @@ import org.thingsboard.server.actors.device.DeviceActor;
22 22 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
23 23 import org.thingsboard.server.actors.plugin.PluginTerminationMsg;
24 24 import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
  25 +import org.thingsboard.server.actors.ruleChain.RuleChainToRuleChainMsg;
25 26 import org.thingsboard.server.actors.service.ContextBasedCreator;
26 27 import org.thingsboard.server.actors.service.DefaultActorService;
27 28 import org.thingsboard.server.actors.shared.plugin.TenantPluginManager;
... ... @@ -83,6 +84,9 @@ public class TenantActor extends RuleChainManagerActor {
83 84 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
84 85 onToDeviceActorMsg((DeviceAwareMsg) msg);
85 86 break;
  87 + case RULE_CHAIN_TO_RULE_CHAIN_MSG:
  88 + onRuleChainMsg((RuleChainToRuleChainMsg) msg);
  89 + break;
86 90 default:
87 91 return false;
88 92 }
... ... @@ -103,6 +107,11 @@ public class TenantActor extends RuleChainManagerActor {
103 107 ruleChainManager.getRootChainActor().tell(msg, self());
104 108 }
105 109
  110 + private void onRuleChainMsg(RuleChainToRuleChainMsg msg) {
  111 + ruleChainManager.getOrCreateActor(context(), msg.getTarget()).tell(msg, self());
  112 + }
  113 +
  114 +
106 115 private void onToDeviceActorMsg(DeviceAwareMsg msg) {
107 116 getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender());
108 117 }
... ...
... ... @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.page.TimePageData;
27 27 import org.thingsboard.server.common.data.page.TimePageLink;
28 28 import org.thingsboard.server.common.data.rule.RuleChain;
29 29 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
  30 +import org.thingsboard.server.dao.queue.MsgQueue;
30 31 import org.thingsboard.server.dao.rule.RuleChainService;
31 32
32 33 import java.io.IOException;
... ... @@ -39,6 +40,9 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
39 40 @Autowired
40 41 protected RuleChainService ruleChainService;
41 42
  43 + @Autowired
  44 + protected MsgQueue msgQueue;
  45 +
42 46 protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
43 47 return doPost("/api/ruleChain", ruleChain, RuleChain.class);
44 48 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.rules;
  17 +
  18 +import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
  19 +import org.junit.ClassRule;
  20 +import org.junit.extensions.cpsuite.ClasspathSuite;
  21 +import org.junit.runner.RunWith;
  22 +import org.thingsboard.server.dao.CustomCassandraCQLUnit;
  23 +import org.thingsboard.server.dao.CustomSqlUnit;
  24 +
  25 +import java.util.Arrays;
  26 +
  27 +@RunWith(ClasspathSuite.class)
  28 +@ClasspathSuite.ClassnameFilters({
  29 + "org.thingsboard.server.rules.flow.nosql.*Test",
  30 + "org.thingsboard.server.rules.lifecycle.nosql.*Test"
  31 +})
  32 +public class RuleEngineNoSqlTestSuite {
  33 +
  34 + @ClassRule
  35 + public static CustomCassandraCQLUnit cassandraUnit =
  36 + new CustomCassandraCQLUnit(
  37 + Arrays.asList(
  38 + new ClassPathCQLDataSet("cassandra/schema.cql", false, false),
  39 + new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)),
  40 + "cassandra-test.yaml", 30000l);
  41 +
  42 +}
... ...
... ... @@ -24,8 +24,8 @@ import java.util.Arrays;
24 24
25 25 @RunWith(ClasspathSuite.class)
26 26 @ClasspathSuite.ClassnameFilters({
27   - "org.thingsboard.server.rules.flow.*Test",
28   - "org.thingsboard.server.rules.lifecycle.*Test"})
  27 + "org.thingsboard.server.rules.flow.sql.*Test",
  28 + "org.thingsboard.server.rules.lifecycle.sql.*Test"})
29 29 public class RuleEngineSqlTestSuite {
30 30
31 31 @ClassRule
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.rules.flow;
17 17
18 18 import com.datastax.driver.core.utils.UUIDs;
19 19 import com.fasterxml.jackson.databind.JsonNode;
  20 +import com.google.common.collect.Lists;
20 21 import lombok.Data;
21 22 import lombok.extern.slf4j.Slf4j;
22 23 import org.junit.After;
... ... @@ -45,6 +46,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
45 46 import java.io.IOException;
46 47 import java.util.Arrays;
47 48 import java.util.Collections;
  49 +import java.util.List;
48 50
49 51 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
50 52
... ... @@ -186,6 +188,129 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
186 188
187 189 Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
188 190 Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
  191 +
  192 + List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(ruleChain.getId().getId(), 0L));
  193 + Assert.assertEquals(0, unAckMsgList.size());
  194 + }
  195 +
  196 + @Test
  197 + public void testTwoRuleChainsWithTwoRules() throws Exception {
  198 + // Creating Rule Chain
  199 + RuleChain rootRuleChain = new RuleChain();
  200 + rootRuleChain.setName("Root Rule Chain");
  201 + rootRuleChain.setTenantId(savedTenant.getId());
  202 + rootRuleChain.setRoot(true);
  203 + rootRuleChain.setDebugMode(true);
  204 + rootRuleChain = saveRuleChain(rootRuleChain);
  205 + Assert.assertNull(rootRuleChain.getFirstRuleNodeId());
  206 +
  207 + // Creating Rule Chain
  208 + RuleChain secondaryRuleChain = new RuleChain();
  209 + secondaryRuleChain.setName("Secondary Rule Chain");
  210 + secondaryRuleChain.setTenantId(savedTenant.getId());
  211 + secondaryRuleChain.setRoot(false);
  212 + secondaryRuleChain.setDebugMode(true);
  213 + secondaryRuleChain = saveRuleChain(secondaryRuleChain);
  214 + Assert.assertNull(secondaryRuleChain.getFirstRuleNodeId());
  215 +
  216 + RuleChainMetaData rootMetaData = new RuleChainMetaData();
  217 + rootMetaData.setRuleChainId(rootRuleChain.getId());
  218 +
  219 + RuleNode ruleNode1 = new RuleNode();
  220 + ruleNode1.setName("Simple Rule Node 1");
  221 + ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
  222 + ruleNode1.setDebugMode(true);
  223 + TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration();
  224 + configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1"));
  225 + ruleNode1.setConfiguration(mapper.valueToTree(configuration1));
  226 +
  227 + rootMetaData.setNodes(Collections.singletonList(ruleNode1));
  228 + rootMetaData.setFirstNodeIndex(0);
  229 + rootMetaData.addRuleChainConnectionInfo(0, secondaryRuleChain.getId(), "Success", mapper.createObjectNode());
  230 + rootMetaData = saveRuleChainMetaData(rootMetaData);
  231 + Assert.assertNotNull(rootMetaData);
  232 +
  233 + rootRuleChain = getRuleChain(rootRuleChain.getId());
  234 + Assert.assertNotNull(rootRuleChain.getFirstRuleNodeId());
  235 +
  236 +
  237 + RuleChainMetaData secondaryMetaData = new RuleChainMetaData();
  238 + secondaryMetaData.setRuleChainId(secondaryRuleChain.getId());
  239 +
  240 + RuleNode ruleNode2 = new RuleNode();
  241 + ruleNode2.setName("Simple Rule Node 2");
  242 + ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
  243 + ruleNode2.setDebugMode(true);
  244 + TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
  245 + configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));
  246 + ruleNode2.setConfiguration(mapper.valueToTree(configuration2));
  247 +
  248 + secondaryMetaData.setNodes(Collections.singletonList(ruleNode2));
  249 + secondaryMetaData.setFirstNodeIndex(0);
  250 + secondaryMetaData = saveRuleChainMetaData(secondaryMetaData);
  251 + Assert.assertNotNull(secondaryMetaData);
  252 +
  253 + // Saving the device
  254 + Device device = new Device();
  255 + device.setName("My device");
  256 + device.setType("default");
  257 + device = doPost("/api/device", device, Device.class);
  258 +
  259 + attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
  260 + Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey1", "serverAttributeValue1"), System.currentTimeMillis())));
  261 + attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
  262 + Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey2", "serverAttributeValue2"), System.currentTimeMillis())));
  263 +
  264 +
  265 + Thread.sleep(1000);
  266 +
  267 + // Pushing Message to the system
  268 + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
  269 + "CUSTOM",
  270 + device.getId(),
  271 + new TbMsgMetaData(),
  272 + "{}", null, null, 0L);
  273 + actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
  274 +
  275 + Thread.sleep(3000);
  276 +
  277 + TimePageData<Event> events = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
  278 +
  279 + Assert.assertEquals(2, events.getData().size());
  280 +
  281 + Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
  282 + Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), inEvent.getEntityId());
  283 + Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
  284 +
  285 + Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
  286 + Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), outEvent.getEntityId());
  287 + Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
  288 +
  289 + Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
  290 +
  291 + RuleChain finalRuleChain = rootRuleChain;
  292 + RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
  293 +
  294 + events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
  295 +
  296 + Assert.assertEquals(2, events.getData().size());
  297 +
  298 + inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
  299 + Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
  300 + Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
  301 +
  302 + outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
  303 + Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
  304 + Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
  305 +
  306 + Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
  307 + Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
  308 +
  309 + List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(rootRuleChain.getId().getId(), 0L));
  310 + Assert.assertEquals(0, unAckMsgList.size());
  311 +
  312 + unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(secondaryRuleChain.getId().getId(), 0L));
  313 + Assert.assertEquals(0, unAckMsgList.size());
189 314 }
190 315
191 316 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.rules.flow.nosql;
  17 +
  18 +import org.thingsboard.server.dao.service.DaoNoSqlTest;
  19 +import org.thingsboard.server.rules.flow.AbstractRuleEngineFlowIntegrationTest;
  20 +
  21 +/**
  22 + * Created by Valerii Sosliuk on 8/22/2017.
  23 + */
  24 +@DaoNoSqlTest
  25 +public class RuleEngineFlowNoSqlIntegrationTest extends AbstractRuleEngineFlowIntegrationTest {
  26 +}
... ...
application/src/test/java/org/thingsboard/server/rules/flow/sql/RuleEngineFlowSqlIntegrationTest.java renamed from application/src/test/java/org/thingsboard/server/rules/flow/RuleEngineFlowSqlIntegrationTest.java
... ... @@ -13,10 +13,11 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.rules.flow;
  16 +package org.thingsboard.server.rules.flow.sql;
17 17
18 18 import org.thingsboard.server.dao.service.DaoSqlTest;
19 19 import org.thingsboard.server.mqtt.rpc.AbstractMqttServerSideRpcIntegrationTest;
  20 +import org.thingsboard.server.rules.flow.AbstractRuleEngineFlowIntegrationTest;
20 21
21 22 /**
22 23 * Created by Valerii Sosliuk on 8/22/2017.
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.rules.lifecycle.nosql;
  17 +
  18 +import org.thingsboard.server.dao.service.DaoNoSqlTest;
  19 +import org.thingsboard.server.rules.lifecycle.AbstractRuleEngineLifecycleIntegrationTest;
  20 +
  21 +/**
  22 + * Created by Valerii Sosliuk on 8/22/2017.
  23 + */
  24 +@DaoNoSqlTest
  25 +public class RuleEngineLifecycleNoSqlIntegrationTest extends AbstractRuleEngineLifecycleIntegrationTest {
  26 +}
... ...
application/src/test/java/org/thingsboard/server/rules/lifecycle/sql/RuleEngineLifecycleSqlIntegrationTest.java renamed from application/src/test/java/org/thingsboard/server/rules/lifecycle/RuleEngineLifecycleSqlIntegrationTest.java
... ... @@ -13,10 +13,11 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.rules.lifecycle;
  16 +package org.thingsboard.server.rules.lifecycle.sql;
17 17
18 18 import org.thingsboard.server.dao.service.DaoSqlTest;
19 19 import org.thingsboard.server.rules.flow.AbstractRuleEngineFlowIntegrationTest;
  20 +import org.thingsboard.server.rules.lifecycle.AbstractRuleEngineLifecycleIntegrationTest;
20 21
21 22 /**
22 23 * Created by Valerii Sosliuk on 8/22/2017.
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.sql.queue;
  17 +
  18 +import lombok.Data;
  19 +
  20 +import java.util.UUID;
  21 +
  22 +/**
  23 + * Created by ashvayka on 30.04.18.
  24 + */
  25 +@Data
  26 +public final class InMemoryMsgKey {
  27 + final UUID nodeId;
  28 + final long clusterPartition;
  29 +}
... ...
dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java renamed from dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java
... ... @@ -15,16 +15,26 @@
15 15 */
16 16 package org.thingsboard.server.dao.sql.queue;
17 17
18   -import com.google.common.util.concurrent.Futures;
19 18 import com.google.common.util.concurrent.ListenableFuture;
  19 +import com.google.common.util.concurrent.ListeningExecutorService;
  20 +import com.google.common.util.concurrent.MoreExecutors;
20 21 import lombok.extern.slf4j.Slf4j;
21 22 import org.springframework.stereotype.Component;
22 23 import org.thingsboard.server.common.msg.TbMsg;
23 24 import org.thingsboard.server.dao.queue.MsgQueue;
24 25 import org.thingsboard.server.dao.util.SqlDao;
25 26
  27 +import javax.annotation.PostConstruct;
  28 +import javax.annotation.PreDestroy;
  29 +import java.util.ArrayList;
26 30 import java.util.Collections;
  31 +import java.util.HashMap;
  32 +import java.util.List;
  33 +import java.util.Map;
27 34 import java.util.UUID;
  35 +import java.util.concurrent.ExecutionException;
  36 +import java.util.concurrent.Executors;
  37 +import java.util.concurrent.atomic.AtomicLong;
28 38
29 39 /**
30 40 * Created by ashvayka on 27.04.18.
... ... @@ -32,19 +42,65 @@ import java.util.UUID;
32 42 @Component
33 43 @Slf4j
34 44 @SqlDao
35   -public class DummySqlMsgQueue implements MsgQueue {
  45 +public class InMemoryMsgQueue implements MsgQueue {
  46 +
  47 + private ListeningExecutorService queueExecutor;
  48 + //TODO:
  49 + private AtomicLong pendingMsgCount;
  50 + private Map<InMemoryMsgKey, Map<UUID, TbMsg>> data = new HashMap<>();
  51 +
  52 + @PostConstruct
  53 + public void init() {
  54 + // Should be always single threaded due to absence of locks.
  55 + queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
  56 + }
  57 +
  58 + @PreDestroy
  59 + public void stop() {
  60 + if (queueExecutor == null) {
  61 + queueExecutor.shutdownNow();
  62 + }
  63 + }
  64 +
36 65 @Override
37 66 public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
38   - return Futures.immediateFuture(null);
  67 + return queueExecutor.submit(() -> {
  68 + data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg);
  69 + return null;
  70 + });
39 71 }
40 72
41 73 @Override
42 74 public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusterPartition) {
43   - return Futures.immediateFuture(null);
  75 + return queueExecutor.submit(() -> {
  76 + InMemoryMsgKey key = new InMemoryMsgKey(nodeId, clusterPartition);
  77 + Map<UUID, TbMsg> map = data.get(key);
  78 + if (map != null) {
  79 + map.remove(msg.getId());
  80 + if (map.isEmpty()) {
  81 + data.remove(key);
  82 + }
  83 + }
  84 + return null;
  85 + });
  86 +
44 87 }
45 88
46 89 @Override
47 90 public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusterPartition) {
48   - return Collections.emptyList();
  91 + ListenableFuture<List<TbMsg>> list = queueExecutor.submit(() -> {
  92 + InMemoryMsgKey key = new InMemoryMsgKey(nodeId, clusterPartition);
  93 + Map<UUID, TbMsg> map = data.get(key);
  94 + if (map != null) {
  95 + return new ArrayList<>(map.values());
  96 + } else {
  97 + return Collections.emptyList();
  98 + }
  99 + });
  100 + try {
  101 + return list.get();
  102 + } catch (InterruptedException | ExecutionException e) {
  103 + throw new RuntimeException(e);
  104 + }
49 105 }
50 106 }
... ...