Commit 2638d1eaf7122927e04ec0bfd23e75a7c2563aa6

Authored by Igor Kulikov
1 parent f135b268

Remove Msg queue. Fix tests.

... ... @@ -26,7 +26,6 @@ import org.thingsboard.server.common.data.id.TenantId;
26 26 import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
27 27 import org.thingsboard.server.common.msg.TbMsg;
28 28 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
29   -import org.thingsboard.server.service.queue.MsgQueueService;
30 29
31 30 import javax.annotation.Nullable;
32 31 import java.util.function.Consumer;
... ...
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.service.queue;
17   -
18   -import com.google.common.util.concurrent.Futures;
19   -import com.google.common.util.concurrent.ListenableFuture;
20   -import lombok.extern.slf4j.Slf4j;
21   -import org.springframework.beans.factory.annotation.Autowired;
22   -import org.springframework.beans.factory.annotation.Value;
23   -import org.springframework.stereotype.Service;
24   -import org.thingsboard.server.common.data.id.TenantId;
25   -import org.thingsboard.server.common.msg.TbMsg;
26   -import org.thingsboard.server.common.transport.quota.tenant.TenantQuotaService;
27   -import org.thingsboard.server.dao.queue.MsgQueue;
28   -
29   -import javax.annotation.PostConstruct;
30   -import javax.annotation.PreDestroy;
31   -import java.util.Map;
32   -import java.util.UUID;
33   -import java.util.concurrent.ConcurrentHashMap;
34   -import java.util.concurrent.Executors;
35   -import java.util.concurrent.ScheduledExecutorService;
36   -import java.util.concurrent.TimeUnit;
37   -import java.util.concurrent.atomic.AtomicLong;
38   -
39   -@Service
40   -@Slf4j
41   -public class DefaultMsgQueueService implements MsgQueueService {
42   -
43   - @Value("${actors.rule.queue.max_size}")
44   - private long queueMaxSize;
45   -
46   - @Value("${actors.rule.queue.cleanup_period}")
47   - private long queueCleanUpPeriod;
48   -
49   - @Autowired
50   - private MsgQueue msgQueue;
51   -
52   - @Autowired(required = false)
53   - private TenantQuotaService quotaService;
54   -
55   - private ScheduledExecutorService cleanupExecutor;
56   -
57   - private Map<TenantId, AtomicLong> pendingCountPerTenant = new ConcurrentHashMap<>();
58   -
59   - @PostConstruct
60   - public void init() {
61   - if (queueCleanUpPeriod > 0) {
62   - cleanupExecutor = Executors.newSingleThreadScheduledExecutor();
63   - cleanupExecutor.scheduleAtFixedRate(() -> cleanup(),
64   - queueCleanUpPeriod, queueCleanUpPeriod, TimeUnit.SECONDS);
65   - }
66   - }
67   -
68   - @PreDestroy
69   - public void stop() {
70   - if (cleanupExecutor != null) {
71   - cleanupExecutor.shutdownNow();
72   - }
73   - }
74   -
75   - @Override
76   - public ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
77   - if(quotaService != null && quotaService.isQuotaExceeded(tenantId.getId().toString())) {
78   - log.warn("Tenant TbMsg Quota exceeded for [{}:{}] . Reject", tenantId.getId());
79   - return Futures.immediateFailedFuture(new RuntimeException("Tenant TbMsg Quota exceeded"));
80   - }
81   -
82   - AtomicLong pendingMsgCount = pendingCountPerTenant.computeIfAbsent(tenantId, key -> new AtomicLong());
83   - if (pendingMsgCount.incrementAndGet() < queueMaxSize) {
84   - return msgQueue.put(tenantId, msg, nodeId, clusterPartition);
85   - } else {
86   - pendingMsgCount.decrementAndGet();
87   - return Futures.immediateFailedFuture(new RuntimeException("Message queue is full!"));
88   - }
89   - }
90   -
91   - @Override
92   - public ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
93   - ListenableFuture<Void> result = msgQueue.ack(tenantId, msg, nodeId, clusterPartition);
94   - AtomicLong pendingMsgCount = pendingCountPerTenant.computeIfAbsent(tenantId, key -> new AtomicLong());
95   - pendingMsgCount.decrementAndGet();
96   - return result;
97   - }
98   -
99   - @Override
100   - public Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition) {
101   - return msgQueue.findUnprocessed(tenantId, nodeId, clusterPartition);
102   - }
103   -
104   - private void cleanup() {
105   - pendingCountPerTenant.forEach((tenantId, pendingMsgCount) -> {
106   - pendingMsgCount.set(0);
107   - msgQueue.cleanUp(tenantId);
108   - });
109   - }
110   -
111   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.service.queue;
17   -
18   -import com.google.common.util.concurrent.ListenableFuture;
19   -import org.thingsboard.server.common.data.id.TenantId;
20   -import org.thingsboard.server.common.msg.TbMsg;
21   -
22   -import java.util.UUID;
23   -
24   -public interface MsgQueueService {
25   -
26   - ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
27   -
28   - ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
29   -
30   - Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition);
31   -
32   -}
... ... @@ -182,12 +182,6 @@ cassandra:
182 182 permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
183 183 rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
184 184
185   - queue:
186   - msg.ttl: 604800 # 7 days
187   - ack.ttl: 604800 # 7 days
188   - partitions.ttl: 604800 # 7 days
189   - partitioning: "HOURS"
190   -
191 185 # SQL configuration parameters
192 186 sql:
193 187 # Specify executor service type used to perform timeseries insert tasks: SINGLE FIXED CACHED
... ... @@ -221,13 +215,6 @@ actors:
221 215 node:
222 216 # Errors for particular actor are persisted once per specified amount of milliseconds
223 217 error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
224   - queue:
225   - # Message queue type
226   - type: "${ACTORS_RULE_QUEUE_TYPE:memory}"
227   - # Message queue maximum size (per tenant)
228   - max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}"
229   - # Message queue cleanup period in seconds
230   - cleanup_period: "${ACTORS_RULE_QUEUE_CLEANUP_PERIOD:3600}"
231 218 statistics:
232 219 # Enable/disable actor statistics
233 220 enabled: "${ACTORS_STATISTICS_ENABLED:true}"
... ...
... ... @@ -27,9 +27,7 @@ import org.thingsboard.server.common.data.page.TimePageData;
27 27 import org.thingsboard.server.common.data.page.TimePageLink;
28 28 import org.thingsboard.server.common.data.rule.RuleChain;
29 29 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
30   -import org.thingsboard.server.dao.queue.MsgQueue;
31 30 import org.thingsboard.server.dao.rule.RuleChainService;
32   -import org.thingsboard.server.service.queue.MsgQueueService;
33 31
34 32 import java.io.IOException;
35 33 import java.util.function.Predicate;
... ... @@ -42,9 +40,6 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
42 40 @Autowired
43 41 protected RuleChainService ruleChainService;
44 42
45   - @Autowired
46   - protected MsgQueueService msgQueueService;
47   -
48 43 protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
49 44 return doPost("/api/ruleChain", ruleChain, RuleChain.class);
50 45 }
... ...
... ... @@ -191,9 +191,6 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
191 191
192 192 Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
193 193 Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
194   -
195   - List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), ruleChain.getId().getId(), 0L));
196   - Assert.assertEquals(0, unAckMsgList.size());
197 194 }
198 195
199 196 @Test
... ... @@ -311,12 +308,6 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
311 308
312 309 Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
313 310 Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
314   -
315   - List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), rootRuleChain.getId().getId(), 0L));
316   - Assert.assertEquals(0, unAckMsgList.size());
317   -
318   - unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), secondaryRuleChain.getId().getId(), 0L));
319   - Assert.assertEquals(0, unAckMsgList.size());
320 311 }
321 312
322 313 }
... ...
... ... @@ -162,73 +162,4 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
162 162 Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
163 163 }
164 164
165   - @Test
166   - public void testRuleChainWithOneRuleAndMsgFromQueue() throws Exception {
167   - // Creating Rule Chain
168   - RuleChain ruleChain = new RuleChain();
169   - ruleChain.setName("Simple Rule Chain");
170   - ruleChain.setTenantId(savedTenant.getId());
171   - ruleChain.setRoot(true);
172   - ruleChain.setDebugMode(true);
173   - ruleChain = saveRuleChain(ruleChain);
174   - Assert.assertNull(ruleChain.getFirstRuleNodeId());
175   -
176   - // Saving the device
177   - Device device = new Device();
178   - device.setName("My device");
179   - device.setType("default");
180   - device = doPost("/api/device", device, Device.class);
181   -
182   - attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
183   - Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis())));
184   -
185   - // Pushing Message to the system
186   - TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
187   - "CUSTOM",
188   - device.getId(),
189   - new TbMsgMetaData(),
190   - "{}",
191   - ruleChain.getId(), null, 0L);
192   - msgQueueService.put(device.getTenantId(), tbMsg, ruleChain.getId().getId(), 0L);
193   -
194   - Thread.sleep(1000);
195   -
196   - RuleChainMetaData metaData = new RuleChainMetaData();
197   - metaData.setRuleChainId(ruleChain.getId());
198   -
199   - RuleNode ruleNode = new RuleNode();
200   - ruleNode.setName("Simple Rule Node");
201   - ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
202   - ruleNode.setDebugMode(true);
203   - TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
204   - configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey"));
205   - ruleNode.setConfiguration(mapper.valueToTree(configuration));
206   -
207   - metaData.setNodes(Collections.singletonList(ruleNode));
208   - metaData.setFirstNodeIndex(0);
209   -
210   - metaData = saveRuleChainMetaData(metaData);
211   - Assert.assertNotNull(metaData);
212   -
213   - ruleChain = getRuleChain(ruleChain.getId());
214   - Assert.assertNotNull(ruleChain.getFirstRuleNodeId());
215   -
216   - Thread.sleep(3000);
217   -
218   - TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
219   - List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
220   -
221   - Assert.assertEquals(2, events.size());
222   -
223   - Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
224   - Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
225   - Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
226   -
227   - Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
228   - Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
229   - Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
230   -
231   - Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
232   - }
233   -
234 165 }
... ...
... ... @@ -57,7 +57,7 @@ public abstract class BaseHttpDeviceApiTest extends AbstractControllerTest {
57 57 @Test
58 58 public void testGetAttributes() throws Exception {
59 59 doGetAsync("/api/v1/" + "WRONG_TOKEN" + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isUnauthorized());
60   - doGetAsync("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isNotFound());
  60 + doGetAsync("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isOk());
61 61
62 62 Map<String, String> attrMap = new HashMap<>();
63 63 attrMap.put("keyA", "valueA");
... ...
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.dao.queue;
17   -
18   -import com.google.common.util.concurrent.ListenableFuture;
19   -import org.thingsboard.server.common.data.id.TenantId;
20   -import org.thingsboard.server.common.msg.TbMsg;
21   -
22   -import java.util.UUID;
23   -
24   -public interface MsgQueue {
25   -
26   - ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
27   -
28   - ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
29   -
30   - Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition);
31   -
32   - ListenableFuture<Void> cleanUp(TenantId tenantId);
33   -
34   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.dao.queue;
17   -
18   -import com.datastax.driver.core.Cluster;
19   -import com.datastax.driver.core.HostDistance;
20   -import com.datastax.driver.core.PoolingOptions;
21   -import com.datastax.driver.core.Session;
22   -import com.datastax.driver.core.utils.UUIDs;
23   -import com.google.common.util.concurrent.FutureCallback;
24   -import com.google.common.util.concurrent.Futures;
25   -import com.google.common.util.concurrent.ListenableFuture;
26   -import lombok.extern.slf4j.Slf4j;
27   -import org.springframework.beans.factory.annotation.Autowired;
28   -import org.springframework.boot.CommandLineRunner;
29   -import org.springframework.boot.SpringApplication;
30   -import org.springframework.context.annotation.Bean;
31   -import org.thingsboard.server.common.data.id.EntityId;
32   -import org.thingsboard.server.common.data.id.RuleChainId;
33   -import org.thingsboard.server.common.data.id.RuleNodeId;
34   -import org.thingsboard.server.common.data.id.TenantId;
35   -import org.thingsboard.server.common.msg.TbMsg;
36   -import org.thingsboard.server.common.msg.TbMsgDataType;
37   -import org.thingsboard.server.common.msg.TbMsgMetaData;
38   -
39   -import javax.annotation.Nullable;
40   -import java.net.InetSocketAddress;
41   -import java.util.UUID;
42   -import java.util.concurrent.CountDownLatch;
43   -import java.util.concurrent.ExecutorService;
44   -import java.util.concurrent.Executors;
45   -import java.util.concurrent.TimeUnit;
46   -import java.util.concurrent.atomic.AtomicLong;
47   -
48   -//@SpringBootApplication
49   -//@EnableAutoConfiguration
50   -//@ComponentScan({"org.thingsboard.rule.engine"})
51   -//@PropertySource("classpath:processing-pipeline.properties")
52   -@Slf4j
53   -public class QueueBenchmark implements CommandLineRunner {
54   -
55   - public static void main(String[] args) {
56   - try {
57   - SpringApplication.run(QueueBenchmark.class, args);
58   - } catch (Throwable th) {
59   - th.printStackTrace();
60   - System.exit(0);
61   - }
62   - }
63   -
64   - @Autowired
65   - private MsgQueue msgQueue;
66   -
67   - @Override
68   - public void run(String... strings) throws Exception {
69   - System.out.println("It works + " + msgQueue);
70   -
71   -
72   - long start = System.currentTimeMillis();
73   - int msgCount = 10000000;
74   - AtomicLong count = new AtomicLong(0);
75   - ExecutorService service = Executors.newFixedThreadPool(100);
76   -
77   - CountDownLatch latch = new CountDownLatch(msgCount);
78   - for (int i = 0; i < msgCount; i++) {
79   - service.submit(() -> {
80   - boolean isFinished = false;
81   - while (!isFinished) {
82   - try {
83   - TbMsg msg = randomMsg();
84   - UUID nodeId = UUIDs.timeBased();
85   - ListenableFuture<Void> put = msgQueue.put(new TenantId(EntityId.NULL_UUID), msg, nodeId, 100L);
86   -// ListenableFuture<Void> put = msgQueue.ack(msg, nodeId, 100L);
87   - Futures.addCallback(put, new FutureCallback<Void>() {
88   - @Override
89   - public void onSuccess(@Nullable Void result) {
90   - latch.countDown();
91   - }
92   -
93   - @Override
94   - public void onFailure(Throwable t) {
95   -// t.printStackTrace();
96   - System.out.println("onFailure, because:" + t.getMessage());
97   - latch.countDown();
98   - }
99   - });
100   - isFinished = true;
101   - } catch (Throwable th) {
102   -// th.printStackTrace();
103   - System.out.println("Repeat query, because:" + th.getMessage());
104   -// latch.countDown();
105   - }
106   - }
107   - });
108   - }
109   -
110   - long prev = 0L;
111   - while (latch.getCount() != 0) {
112   - TimeUnit.SECONDS.sleep(1);
113   - long curr = latch.getCount();
114   - long rps = prev - curr;
115   - prev = curr;
116   - System.out.println("rps = " + rps);
117   - }
118   -
119   - long end = System.currentTimeMillis();
120   - System.out.println("final rps = " + (msgCount / (end - start) * 1000));
121   -
122   - System.out.println("Finished");
123   -
124   - }
125   -
126   - private TbMsg randomMsg() {
127   - TbMsgMetaData metaData = new TbMsgMetaData();
128   - metaData.putValue("key", "value");
129   - String dataStr = "someContent";
130   - return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr, new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
131   - }
132   -
133   - @Bean
134   - public Session session() {
135   - Cluster thingsboard = Cluster.builder()
136   - .addContactPointsWithPorts(new InetSocketAddress("127.0.0.1", 9042))
137   - .withClusterName("thingsboard")
138   -// .withSocketOptions(socketOpts.getOpts())
139   - .withPoolingOptions(new PoolingOptions()
140   - .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
141   - .setMaxRequestsPerConnection(HostDistance.REMOTE, 32768)).build();
142   -
143   - Session session = thingsboard.connect("thingsboard");
144   - return session;
145   - }
146   -
147   - @Bean
148   - public int defaultTtl() {
149   - return 6000;
150   - }
151   -
152   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.dao.queue.memory;
17   -
18   -import lombok.Data;
19   -
20   -import java.util.UUID;
21   -
22   -/**
23   - * Created by ashvayka on 30.04.18.
24   - */
25   -@Data
26   -public final class InMemoryMsgKey {
27   - final UUID nodeId;
28   - final long clusterPartition;
29   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.dao.queue.memory;
17   -
18   -import com.google.common.util.concurrent.ListenableFuture;
19   -import com.google.common.util.concurrent.ListeningExecutorService;
20   -import com.google.common.util.concurrent.MoreExecutors;
21   -import lombok.extern.slf4j.Slf4j;
22   -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
23   -import org.springframework.stereotype.Component;
24   -import org.thingsboard.server.common.data.id.TenantId;
25   -import org.thingsboard.server.common.msg.TbMsg;
26   -import org.thingsboard.server.dao.queue.MsgQueue;
27   -
28   -import javax.annotation.PostConstruct;
29   -import javax.annotation.PreDestroy;
30   -import java.util.ArrayList;
31   -import java.util.Collections;
32   -import java.util.HashMap;
33   -import java.util.List;
34   -import java.util.Map;
35   -import java.util.UUID;
36   -import java.util.concurrent.ExecutionException;
37   -import java.util.concurrent.Executors;
38   -
39   -/**
40   - * Created by ashvayka on 27.04.18.
41   - */
42   -@Component
43   -@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
44   -@Slf4j
45   -public class InMemoryMsgQueue implements MsgQueue {
46   -
47   - private ListeningExecutorService queueExecutor;
48   - private Map<TenantId, Map<InMemoryMsgKey, Map<UUID, TbMsg>>> data = new HashMap<>();
49   -
50   - @PostConstruct
51   - public void init() {
52   - // Should be always single threaded due to absence of locks.
53   - queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
54   - }
55   -
56   - @PreDestroy
57   - public void stop() {
58   - if (queueExecutor != null) {
59   - queueExecutor.shutdownNow();
60   - }
61   - }
62   -
63   - @Override
64   - public ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
65   - return queueExecutor.submit(() -> {
66   - data.computeIfAbsent(tenantId, key -> new HashMap<>()).
67   - computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg);
68   - return null;
69   - });
70   - }
71   -
72   - @Override
73   - public ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
74   - return queueExecutor.submit(() -> {
75   - Map<InMemoryMsgKey, Map<UUID, TbMsg>> tenantMap = data.get(tenantId);
76   - if (tenantMap != null) {
77   - InMemoryMsgKey key = new InMemoryMsgKey(nodeId, clusterPartition);
78   - Map<UUID, TbMsg> map = tenantMap.get(key);
79   - if (map != null) {
80   - map.remove(msg.getId());
81   - if (map.isEmpty()) {
82   - tenantMap.remove(key);
83   - }
84   - }
85   - if (tenantMap.isEmpty()) {
86   - data.remove(tenantId);
87   - }
88   - }
89   - return null;
90   - });
91   - }
92   -
93   - @Override
94   - public Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition) {
95   - ListenableFuture<List<TbMsg>> list = queueExecutor.submit(() -> {
96   - Map<InMemoryMsgKey, Map<UUID, TbMsg>> tenantMap = data.get(tenantId);
97   - if (tenantMap != null) {
98   - InMemoryMsgKey key = new InMemoryMsgKey(nodeId, clusterPartition);
99   - Map<UUID, TbMsg> map = tenantMap.get(key);
100   - if (map != null) {
101   - return new ArrayList<>(map.values());
102   - } else {
103   - return Collections.emptyList();
104   - }
105   - } else {
106   - return Collections.emptyList();
107   - }
108   - });
109   - try {
110   - return list.get();
111   - } catch (InterruptedException | ExecutionException e) {
112   - throw new RuntimeException(e);
113   - }
114   - }
115   -
116   - @Override
117   - public ListenableFuture<Void> cleanUp(TenantId tenantId) {
118   - return queueExecutor.submit(() -> {
119   - data.remove(tenantId);
120   - return null;
121   - });
122   - }
123   -}
1 1 database.entities.type=cassandra
2 2 database.ts.type=cassandra
3   -
4   -cassandra.queue.partitioning=HOURS
5   -cassandra.queue.ack.ttl=3600
6   -cassandra.queue.msg.ttl=3600
7   -cassandra.queue.partitions.ttl=3600
\ No newline at end of file
... ...