Commit 02e2c14fc959156cd2a8b406f33c6af3e875c7f1

Authored by Igor Kulikov
1 parent d1552a13

Cleanup

Showing 17 changed files with 1 additions and 928 deletions
@@ -257,7 +257,7 @@ actors: @@ -257,7 +257,7 @@ actors:
257 # Errors for particular actor are persisted once per specified amount of milliseconds 257 # Errors for particular actor are persisted once per specified amount of milliseconds
258 error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" 258 error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
259 queue: 259 queue:
260 - # Message queue type (memory or db) 260 + # Message queue type
261 type: "${ACTORS_RULE_QUEUE_TYPE:memory}" 261 type: "${ACTORS_RULE_QUEUE_TYPE:memory}"
262 # Message queue maximum size (per tenant) 262 # Message queue maximum size (per tenant)
263 max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}" 263 max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}"
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db;  
17 -  
18 -import lombok.Data;  
19 -import lombok.EqualsAndHashCode;  
20 -  
21 -import java.util.UUID;  
22 -  
23 -@Data  
24 -@EqualsAndHashCode  
25 -public class MsgAck {  
26 -  
27 - private final UUID msgId;  
28 - private final UUID nodeId;  
29 - private final long clusteredPartition;  
30 - private final long tsPartition;  
31 -  
32 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db;  
17 -  
18 -import org.springframework.stereotype.Component;  
19 -import org.thingsboard.server.common.msg.TbMsg;  
20 -import org.thingsboard.server.dao.queue.db.MsgAck;  
21 -  
22 -import java.util.Collection;  
23 -import java.util.List;  
24 -import java.util.Set;  
25 -import java.util.UUID;  
26 -import java.util.stream.Collectors;  
27 -  
28 -@Component  
29 -public class UnprocessedMsgFilter {  
30 -  
31 - public Collection<TbMsg> filter(List<TbMsg> msgs, List<MsgAck> acks) {  
32 - Set<UUID> processedIds = acks.stream().map(MsgAck::getMsgId).collect(Collectors.toSet());  
33 - return msgs.stream().filter(i -> !processedIds.contains(i.getId())).collect(Collectors.toList());  
34 - }  
35 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql;  
17 -  
18 -import com.datastax.driver.core.utils.UUIDs;  
19 -import com.google.common.collect.Lists;  
20 -import com.google.common.util.concurrent.Futures;  
21 -import com.google.common.util.concurrent.ListenableFuture;  
22 -import lombok.extern.slf4j.Slf4j;  
23 -import org.springframework.beans.factory.annotation.Autowired;  
24 -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;  
25 -import org.springframework.stereotype.Component;  
26 -import org.thingsboard.server.common.data.id.TenantId;  
27 -import org.thingsboard.server.common.msg.TbMsg;  
28 -import org.thingsboard.server.dao.queue.MsgQueue;  
29 -import org.thingsboard.server.dao.queue.db.MsgAck;  
30 -import org.thingsboard.server.dao.queue.db.UnprocessedMsgFilter;  
31 -import org.thingsboard.server.dao.queue.db.repository.AckRepository;  
32 -import org.thingsboard.server.dao.queue.db.repository.MsgRepository;  
33 -import org.thingsboard.server.dao.util.NoSqlDao;  
34 -  
35 -import java.util.List;  
36 -import java.util.UUID;  
37 -  
38 -@Component  
39 -@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "db")  
40 -@Slf4j  
41 -@NoSqlDao  
42 -public class CassandraMsgQueue implements MsgQueue {  
43 -  
44 - @Autowired  
45 - private MsgRepository msgRepository;  
46 - @Autowired  
47 - private AckRepository ackRepository;  
48 - @Autowired  
49 - private UnprocessedMsgFilter unprocessedMsgFilter;  
50 - @Autowired  
51 - private QueuePartitioner queuePartitioner;  
52 -  
53 - @Override  
54 - public ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {  
55 - long msgTime = getMsgTime(msg);  
56 - long tsPartition = queuePartitioner.getPartition(msgTime);  
57 - return msgRepository.save(msg, nodeId, clusterPartition, tsPartition, msgTime);  
58 - }  
59 -  
60 - @Override  
61 - public ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {  
62 - long tsPartition = queuePartitioner.getPartition(getMsgTime(msg));  
63 - MsgAck ack = new MsgAck(msg.getId(), nodeId, clusterPartition, tsPartition);  
64 - return ackRepository.ack(ack);  
65 - }  
66 -  
67 - @Override  
68 - public Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition) {  
69 - List<TbMsg> unprocessedMsgs = Lists.newArrayList();  
70 - for (Long tsPartition : queuePartitioner.findUnprocessedPartitions(nodeId, clusterPartition)) {  
71 - List<TbMsg> msgs = msgRepository.findMsgs(nodeId, clusterPartition, tsPartition);  
72 - List<MsgAck> acks = ackRepository.findAcks(nodeId, clusterPartition, tsPartition);  
73 - unprocessedMsgs.addAll(unprocessedMsgFilter.filter(msgs, acks));  
74 - }  
75 - return unprocessedMsgs;  
76 - }  
77 -  
78 - @Override  
79 - public ListenableFuture<Void> cleanUp(TenantId tenantId) {  
80 - return Futures.immediateFuture(null);  
81 - }  
82 -  
83 - private long getMsgTime(TbMsg msg) {  
84 - return UUIDs.unixTimestamp(msg.getId());  
85 - }  
86 -  
87 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql;  
17 -  
18 -import com.google.common.collect.Lists;  
19 -import lombok.extern.slf4j.Slf4j;  
20 -import org.springframework.beans.factory.annotation.Value;  
21 -import org.springframework.stereotype.Component;  
22 -import org.thingsboard.server.dao.queue.db.repository.ProcessedPartitionRepository;  
23 -import org.thingsboard.server.dao.timeseries.TsPartitionDate;  
24 -import org.thingsboard.server.dao.util.NoSqlDao;  
25 -  
26 -import java.time.Clock;  
27 -import java.time.Instant;  
28 -import java.time.LocalDateTime;  
29 -import java.time.ZoneOffset;  
30 -import java.util.List;  
31 -import java.util.Optional;  
32 -import java.util.UUID;  
33 -import java.util.concurrent.TimeUnit;  
34 -  
35 -@Component  
36 -@Slf4j  
37 -@NoSqlDao  
38 -public class QueuePartitioner {  
39 -  
40 - private final TsPartitionDate tsFormat;  
41 - private ProcessedPartitionRepository processedPartitionRepository;  
42 - private Clock clock = Clock.systemUTC();  
43 -  
44 - public QueuePartitioner(@Value("${cassandra.queue.partitioning}") String partitioning,  
45 - ProcessedPartitionRepository processedPartitionRepository) {  
46 - this.processedPartitionRepository = processedPartitionRepository;  
47 - Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);  
48 - if (partition.isPresent()) {  
49 - tsFormat = partition.get();  
50 - } else {  
51 - log.warn("Incorrect configuration of partitioning {}", partitioning);  
52 - throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");  
53 - }  
54 - }  
55 -  
56 - public long getPartition(long ts) {  
57 - //TODO: use TsPartitionDate.truncateTo?  
58 - LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);  
59 - return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();  
60 - }  
61 -  
62 - public List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {  
63 - Optional<Long> lastPartitionOption = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);  
64 - long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7));  
65 - List<Long> unprocessedPartitions = Lists.newArrayList();  
66 -  
67 - LocalDateTime current = LocalDateTime.ofInstant(Instant.ofEpochMilli(lastPartition), ZoneOffset.UTC);  
68 - LocalDateTime end = LocalDateTime.ofInstant(Instant.now(clock), ZoneOffset.UTC)  
69 - .plus(1L, tsFormat.getTruncateUnit());  
70 -  
71 - while (current.isBefore(end)) {  
72 - current = current.plus(1L, tsFormat.getTruncateUnit());  
73 - unprocessedPartitions.add(tsFormat.truncatedTo(current).toInstant(ZoneOffset.UTC).toEpochMilli());  
74 - }  
75 -  
76 - return unprocessedPartitions;  
77 - }  
78 -  
79 - public void setClock(Clock clock) {  
80 - this.clock = clock;  
81 - }  
82 -  
83 - public void checkProcessedPartitions() {  
84 - //todo-vp: we need to implement this  
85 - }  
86 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -import com.datastax.driver.core.BoundStatement;  
19 -import com.datastax.driver.core.PreparedStatement;  
20 -import com.datastax.driver.core.ResultSet;  
21 -import com.datastax.driver.core.ResultSetFuture;  
22 -import com.datastax.driver.core.Row;  
23 -import com.google.common.base.Function;  
24 -import com.google.common.util.concurrent.Futures;  
25 -import com.google.common.util.concurrent.ListenableFuture;  
26 -import org.springframework.beans.factory.annotation.Value;  
27 -import org.springframework.stereotype.Component;  
28 -import org.thingsboard.server.dao.nosql.CassandraAbstractDao;  
29 -import org.thingsboard.server.dao.queue.db.MsgAck;  
30 -import org.thingsboard.server.dao.queue.db.repository.AckRepository;  
31 -import org.thingsboard.server.dao.util.NoSqlDao;  
32 -  
33 -import java.util.ArrayList;  
34 -import java.util.List;  
35 -import java.util.UUID;  
36 -  
37 -@Component  
38 -@NoSqlDao  
39 -public class CassandraAckRepository extends CassandraAbstractDao implements AckRepository {  
40 -  
41 - @Value("${cassandra.queue.ack.ttl}")  
42 - private int ackQueueTtl;  
43 -  
44 - @Override  
45 - public ListenableFuture<Void> ack(MsgAck msgAck) {  
46 - String insert = "INSERT INTO msg_ack_queue (node_id, cluster_partition, ts_partition, msg_id) VALUES (?, ?, ?, ?) USING TTL ?";  
47 - PreparedStatement statement = prepare(insert);  
48 - BoundStatement boundStatement = statement.bind(msgAck.getNodeId(), msgAck.getClusteredPartition(),  
49 - msgAck.getTsPartition(), msgAck.getMsgId(), ackQueueTtl);  
50 - ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);  
51 - return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);  
52 - }  
53 -  
54 - @Override  
55 - public List<MsgAck> findAcks(UUID nodeId, long clusterPartition, long tsPartition) {  
56 - String select = "SELECT msg_id FROM msg_ack_queue WHERE " +  
57 - "node_id = ? AND cluster_partition = ? AND ts_partition = ?";  
58 - PreparedStatement statement = prepare(select);  
59 - BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition);  
60 - ResultSet rows = executeRead(boundStatement);  
61 - List<MsgAck> msgs = new ArrayList<>();  
62 - for (Row row : rows) {  
63 - msgs.add(new MsgAck(row.getUUID("msg_id"), nodeId, clusterPartition, tsPartition));  
64 - }  
65 - return msgs;  
66 - }  
67 -  
68 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -import com.datastax.driver.core.BoundStatement;  
19 -import com.datastax.driver.core.PreparedStatement;  
20 -import com.datastax.driver.core.ResultSet;  
21 -import com.datastax.driver.core.ResultSetFuture;  
22 -import com.datastax.driver.core.Row;  
23 -import com.google.common.base.Function;  
24 -import com.google.common.util.concurrent.Futures;  
25 -import com.google.common.util.concurrent.ListenableFuture;  
26 -import org.springframework.beans.factory.annotation.Value;  
27 -import org.springframework.stereotype.Component;  
28 -import org.thingsboard.server.common.msg.TbMsg;  
29 -import org.thingsboard.server.dao.nosql.CassandraAbstractDao;  
30 -import org.thingsboard.server.dao.queue.db.repository.MsgRepository;  
31 -import org.thingsboard.server.dao.util.NoSqlDao;  
32 -  
33 -import java.util.ArrayList;  
34 -import java.util.List;  
35 -import java.util.UUID;  
36 -  
37 -@Component  
38 -@NoSqlDao  
39 -public class CassandraMsgRepository extends CassandraAbstractDao implements MsgRepository {  
40 -  
41 - @Value("${cassandra.queue.msg.ttl}")  
42 - private int msqQueueTtl;  
43 -  
44 - @Override  
45 - public ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusterPartition, long tsPartition, long msgTs) {  
46 - String insert = "INSERT INTO msg_queue (node_id, cluster_partition, ts_partition, ts, msg) VALUES (?, ?, ?, ?, ?) USING TTL ?";  
47 - PreparedStatement statement = prepare(insert);  
48 - BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition, msgTs, TbMsg.toBytes(msg), msqQueueTtl);  
49 - ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);  
50 - return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);  
51 - }  
52 -  
53 - @Override  
54 - public List<TbMsg> findMsgs(UUID nodeId, long clusterPartition, long tsPartition) {  
55 - String select = "SELECT node_id, cluster_partition, ts_partition, ts, msg FROM msg_queue WHERE " +  
56 - "node_id = ? AND cluster_partition = ? AND ts_partition = ?";  
57 - PreparedStatement statement = prepare(select);  
58 - BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition);  
59 - ResultSet rows = executeRead(boundStatement);  
60 - List<TbMsg> msgs = new ArrayList<>();  
61 - for (Row row : rows) {  
62 - msgs.add(TbMsg.fromBytes(row.getBytes("msg")));  
63 - }  
64 - return msgs;  
65 - }  
66 -  
67 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -import com.datastax.driver.core.BoundStatement;  
19 -import com.datastax.driver.core.PreparedStatement;  
20 -import com.datastax.driver.core.ResultSet;  
21 -import com.datastax.driver.core.ResultSetFuture;  
22 -import com.datastax.driver.core.Row;  
23 -import com.google.common.base.Function;  
24 -import com.google.common.util.concurrent.Futures;  
25 -import com.google.common.util.concurrent.ListenableFuture;  
26 -import org.springframework.beans.factory.annotation.Value;  
27 -import org.springframework.stereotype.Component;  
28 -import org.thingsboard.server.dao.nosql.CassandraAbstractDao;  
29 -import org.thingsboard.server.dao.queue.db.repository.ProcessedPartitionRepository;  
30 -import org.thingsboard.server.dao.util.NoSqlDao;  
31 -  
32 -import java.util.Optional;  
33 -import java.util.UUID;  
34 -  
35 -@Component  
36 -@NoSqlDao  
37 -public class CassandraProcessedPartitionRepository extends CassandraAbstractDao implements ProcessedPartitionRepository {  
38 -  
39 - @Value("${cassandra.queue.partitions.ttl}")  
40 - private int partitionsTtl;  
41 -  
42 - @Override  
43 - public ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusterPartition, long tsPartition) {  
44 - String insert = "INSERT INTO processed_msg_partitions (node_id, cluster_partition, ts_partition) VALUES (?, ?, ?) USING TTL ?";  
45 - PreparedStatement prepared = prepare(insert);  
46 - BoundStatement boundStatement = prepared.bind(nodeId, clusterPartition, tsPartition, partitionsTtl);  
47 - ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);  
48 - return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);  
49 - }  
50 -  
51 - @Override  
52 - public Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash) {  
53 - String select = "SELECT ts_partition FROM processed_msg_partitions WHERE " +  
54 - "node_id = ? AND cluster_partition = ?";  
55 - PreparedStatement prepared = prepare(select);  
56 - BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash);  
57 - Row row = executeRead(boundStatement).one();  
58 - if (row == null) {  
59 - return Optional.empty();  
60 - }  
61 -  
62 - return Optional.of(row.getLong("ts_partition"));  
63 - }  
64 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.repository;  
17 -  
18 -import com.google.common.util.concurrent.ListenableFuture;  
19 -import org.thingsboard.server.dao.queue.db.MsgAck;  
20 -  
21 -import java.util.List;  
22 -import java.util.UUID;  
23 -  
24 -public interface AckRepository {  
25 -  
26 - ListenableFuture<Void> ack(MsgAck msgAck);  
27 -  
28 - List<MsgAck> findAcks(UUID nodeId, long clusterPartition, long tsPartition);  
29 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.repository;  
17 -  
18 -import com.google.common.util.concurrent.ListenableFuture;  
19 -import org.thingsboard.server.common.msg.TbMsg;  
20 -  
21 -import java.util.List;  
22 -import java.util.UUID;  
23 -  
24 -public interface MsgRepository {  
25 -  
26 - ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusterPartition, long tsPartition, long msgTs);  
27 -  
28 - List<TbMsg> findMsgs(UUID nodeId, long clusterPartition, long tsPartition);  
29 -  
30 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.repository;  
17 -  
18 -import com.google.common.util.concurrent.ListenableFuture;  
19 -  
20 -import java.util.Optional;  
21 -import java.util.UUID;  
22 -  
23 -public interface ProcessedPartitionRepository {  
24 -  
25 - ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusteredHash, long partition);  
26 -  
27 - Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash);  
28 -  
29 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.sql;  
17 -  
18 -//@todo-vp: implement  
19 -public class SqlMsgQueue {  
20 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql;  
17 -  
18 -  
19 -import org.junit.Before;  
20 -import org.junit.Test;  
21 -import org.junit.runner.RunWith;  
22 -import org.mockito.Mock;  
23 -import org.mockito.runners.MockitoJUnitRunner;  
24 -import org.thingsboard.server.dao.queue.db.repository.ProcessedPartitionRepository;  
25 -  
26 -import java.time.Clock;  
27 -import java.time.Instant;  
28 -import java.time.ZoneOffset;  
29 -import java.time.temporal.ChronoUnit;  
30 -import java.util.List;  
31 -import java.util.Optional;  
32 -import java.util.UUID;  
33 -  
34 -import static org.junit.Assert.assertEquals;  
35 -import static org.mockito.Mockito.when;  
36 -  
37 -@RunWith(MockitoJUnitRunner.class)  
38 -public class QueuePartitionerTest {  
39 -  
40 - private QueuePartitioner queuePartitioner;  
41 -  
42 - @Mock  
43 - private ProcessedPartitionRepository partitionRepo;  
44 -  
45 - private Instant startInstant;  
46 - private Instant endInstant;  
47 -  
48 - @Before  
49 - public void init() {  
50 - queuePartitioner = new QueuePartitioner("MINUTES", partitionRepo);  
51 - startInstant = Instant.now();  
52 - endInstant = startInstant.plus(2, ChronoUnit.MINUTES);  
53 - queuePartitioner.setClock(Clock.fixed(endInstant, ZoneOffset.UTC));  
54 - }  
55 -  
56 - @Test  
57 - public void partitionCalculated() {  
58 - long time = 1519390191425L;  
59 - long partition = queuePartitioner.getPartition(time);  
60 - assertEquals(1519390140000L, partition);  
61 - }  
62 -  
63 - @Test  
64 - public void unprocessedPartitionsReturned() {  
65 - UUID nodeId = UUID.randomUUID();  
66 - long clusteredHash = 101L;  
67 - when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.of(startInstant.toEpochMilli()));  
68 - List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);  
69 - assertEquals(3, actual.size());  
70 - }  
71 -  
72 - @Test  
73 - public void defaultShiftUsedIfNoPartitionWasProcessed() {  
74 - UUID nodeId = UUID.randomUUID();  
75 - long clusteredHash = 101L;  
76 - when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.empty());  
77 - List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);  
78 - assertEquals(10083, actual.size());  
79 - }  
80 -  
81 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql;  
17 -  
18 -import com.google.common.collect.Lists;  
19 -import org.junit.Test;  
20 -import org.thingsboard.server.common.msg.TbMsg;  
21 -import org.thingsboard.server.dao.queue.db.MsgAck;  
22 -import org.thingsboard.server.dao.queue.db.UnprocessedMsgFilter;  
23 -  
24 -import java.util.Collection;  
25 -import java.util.List;  
26 -import java.util.UUID;  
27 -  
28 -import static org.junit.Assert.assertEquals;  
29 -  
30 -public class UnprocessedMsgFilterTest {  
31 -  
32 - private UnprocessedMsgFilter msgFilter = new UnprocessedMsgFilter();  
33 -  
34 - @Test  
35 - public void acknowledgedMsgsAreFilteredOut() {  
36 - UUID id1 = UUID.randomUUID();  
37 - UUID id2 = UUID.randomUUID();  
38 - TbMsg msg1 = new TbMsg(id1, "T", null, null, null, null, null, null, 0L);  
39 - TbMsg msg2 = new TbMsg(id2, "T", null, null, null, null, null, null, 0L);  
40 - List<TbMsg> msgs = Lists.newArrayList(msg1, msg2);  
41 - List<MsgAck> acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L));  
42 - Collection<TbMsg> actual = msgFilter.filter(msgs, acks);  
43 - assertEquals(1, actual.size());  
44 - assertEquals(msg1, actual.iterator().next());  
45 - }  
46 -  
47 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -import com.datastax.driver.core.utils.UUIDs;  
19 -import com.google.common.collect.Lists;  
20 -import com.google.common.util.concurrent.ListenableFuture;  
21 -import org.junit.Test;  
22 -import org.springframework.beans.factory.annotation.Autowired;  
23 -import org.springframework.test.util.ReflectionTestUtils;  
24 -import org.thingsboard.server.dao.service.AbstractServiceTest;  
25 -import org.thingsboard.server.dao.service.DaoNoSqlTest;  
26 -import org.thingsboard.server.dao.queue.db.MsgAck;  
27 -  
28 -import java.util.List;  
29 -import java.util.UUID;  
30 -import java.util.concurrent.ExecutionException;  
31 -import java.util.concurrent.TimeUnit;  
32 -  
33 -import static org.junit.Assert.assertEquals;  
34 -import static org.junit.Assert.assertTrue;  
35 -  
36 -@DaoNoSqlTest  
37 -public class CassandraAckRepositoryTest extends AbstractServiceTest {  
38 -  
39 - @Autowired  
40 - private CassandraAckRepository ackRepository;  
41 -  
42 - @Test  
43 - public void acksInPartitionCouldBeFound() {  
44 - UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");  
45 -  
46 - List<MsgAck> extectedAcks = Lists.newArrayList(  
47 - new MsgAck(UUID.fromString("bebaeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L),  
48 - new MsgAck(UUID.fromString("12baeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L)  
49 - );  
50 -  
51 - List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 101L, 300L);  
52 - assertEquals(extectedAcks, actualAcks);  
53 - }  
54 -  
55 - @Test  
56 - public void ackCanBeSavedAndRead() throws ExecutionException, InterruptedException {  
57 - UUID msgId = UUIDs.timeBased();  
58 - UUID nodeId = UUIDs.timeBased();  
59 - MsgAck ack = new MsgAck(msgId, nodeId, 10L, 20L);  
60 - ListenableFuture<Void> future = ackRepository.ack(ack);  
61 - future.get();  
62 - List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 10L, 20L);  
63 - assertEquals(1, actualAcks.size());  
64 - assertEquals(ack, actualAcks.get(0));  
65 - }  
66 -  
67 - @Test  
68 - public void expiredAcksAreNotReturned() throws ExecutionException, InterruptedException {  
69 - ReflectionTestUtils.setField(ackRepository, "ackQueueTtl", 1);  
70 - UUID msgId = UUIDs.timeBased();  
71 - UUID nodeId = UUIDs.timeBased();  
72 - MsgAck ack = new MsgAck(msgId, nodeId, 30L, 40L);  
73 - ListenableFuture<Void> future = ackRepository.ack(ack);  
74 - future.get();  
75 - List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 30L, 40L);  
76 - assertEquals(1, actualAcks.size());  
77 - TimeUnit.SECONDS.sleep(2);  
78 - assertTrue(ackRepository.findAcks(nodeId, 30L, 40L).isEmpty());  
79 - }  
80 -  
81 -  
82 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -//import static org.junit.jupiter.api.Assertions.*;  
19 -  
20 -import com.datastax.driver.core.utils.UUIDs;  
21 -import com.google.common.util.concurrent.ListenableFuture;  
22 -import org.junit.Test;  
23 -import org.springframework.beans.factory.annotation.Autowired;  
24 -import org.springframework.test.util.ReflectionTestUtils;  
25 -import org.thingsboard.server.common.data.id.DeviceId;  
26 -import org.thingsboard.server.common.data.id.RuleChainId;  
27 -import org.thingsboard.server.common.data.id.RuleNodeId;  
28 -import org.thingsboard.server.common.msg.TbMsg;  
29 -import org.thingsboard.server.common.msg.TbMsgDataType;  
30 -import org.thingsboard.server.common.msg.TbMsgMetaData;  
31 -import org.thingsboard.server.dao.service.AbstractServiceTest;  
32 -import org.thingsboard.server.dao.service.DaoNoSqlTest;  
33 -  
34 -import java.util.List;  
35 -import java.util.UUID;  
36 -import java.util.concurrent.ExecutionException;  
37 -import java.util.concurrent.TimeUnit;  
38 -  
39 -import static org.junit.Assert.assertEquals;  
40 -import static org.junit.Assert.assertTrue;  
41 -  
42 -@DaoNoSqlTest  
43 -public class CassandraMsgRepositoryTest extends AbstractServiceTest {  
44 -  
45 - @Autowired  
46 - private CassandraMsgRepository msgRepository;  
47 -  
48 - @Test  
49 - public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {  
50 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",  
51 - new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);  
52 - UUID nodeId = UUIDs.timeBased();  
53 - ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);  
54 - future.get();  
55 - List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 1L, 1L);  
56 - assertEquals(1, msgs.size());  
57 - }  
58 -  
59 - @Test  
60 - public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {  
61 - ReflectionTestUtils.setField(msgRepository, "msqQueueTtl", 1);  
62 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",  
63 - new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);  
64 - UUID nodeId = UUIDs.timeBased();  
65 - ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);  
66 - future.get();  
67 - TimeUnit.SECONDS.sleep(2);  
68 - assertTrue(msgRepository.findMsgs(nodeId, 2L, 2L).isEmpty());  
69 - }  
70 -  
71 - @Test  
72 - public void protoBufConverterWorkAsExpected() throws ExecutionException, InterruptedException {  
73 - TbMsgMetaData metaData = new TbMsgMetaData();  
74 - metaData.putValue("key", "value");  
75 - String dataStr = "someContent";  
76 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr,  
77 - new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);  
78 - UUID nodeId = UUIDs.timeBased();  
79 - ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);  
80 - future.get();  
81 - List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 1L, 1L);  
82 - assertEquals(1, msgs.size());  
83 - assertEquals(msg, msgs.get(0));  
84 - }  
85 -  
86 -  
87 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -import com.datastax.driver.core.utils.UUIDs;  
19 -import com.google.common.util.concurrent.Futures;  
20 -import com.google.common.util.concurrent.ListenableFuture;  
21 -import org.junit.Test;  
22 -import org.springframework.beans.factory.annotation.Autowired;  
23 -import org.springframework.test.util.ReflectionTestUtils;  
24 -import org.thingsboard.server.dao.service.AbstractServiceTest;  
25 -import org.thingsboard.server.dao.service.DaoNoSqlTest;  
26 -  
27 -import java.util.List;  
28 -import java.util.Optional;  
29 -import java.util.UUID;  
30 -import java.util.concurrent.ExecutionException;  
31 -import java.util.concurrent.TimeUnit;  
32 -  
33 -import static org.junit.Assert.assertEquals;  
34 -import static org.junit.Assert.assertFalse;  
35 -import static org.junit.Assert.assertTrue;  
36 -  
37 -@DaoNoSqlTest  
38 -public class CassandraProcessedPartitionRepositoryTest extends AbstractServiceTest {  
39 -  
40 - @Autowired  
41 - private CassandraProcessedPartitionRepository partitionRepository;  
42 -  
43 - @Test  
44 - public void lastProcessedPartitionCouldBeFound() {  
45 - UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");  
46 - Optional<Long> lastProcessedPartition = partitionRepository.findLastProcessedPartition(nodeId, 101L);  
47 - assertTrue(lastProcessedPartition.isPresent());  
48 - assertEquals((Long) 777L, lastProcessedPartition.get());  
49 - }  
50 -  
51 - @Test  
52 - public void highestProcessedPartitionReturned() throws ExecutionException, InterruptedException {  
53 - UUID nodeId = UUIDs.timeBased();  
54 - ListenableFuture<Void> future1 = partitionRepository.partitionProcessed(nodeId, 303L, 100L);  
55 - ListenableFuture<Void> future2 = partitionRepository.partitionProcessed(nodeId, 303L, 200L);  
56 - ListenableFuture<Void> future3 = partitionRepository.partitionProcessed(nodeId, 303L, 10L);  
57 - ListenableFuture<List<Void>> allFutures = Futures.allAsList(future1, future2, future3);  
58 - allFutures.get();  
59 - Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 303L);  
60 - assertTrue(actual.isPresent());  
61 - assertEquals((Long) 200L, actual.get());  
62 - }  
63 -  
64 - @Test  
65 - public void expiredPartitionsAreNotReturned() throws ExecutionException, InterruptedException {  
66 - ReflectionTestUtils.setField(partitionRepository, "partitionsTtl", 1);  
67 - UUID nodeId = UUIDs.timeBased();  
68 - ListenableFuture<Void> future = partitionRepository.partitionProcessed(nodeId, 404L, 10L);  
69 - future.get();  
70 - Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 404L);  
71 - assertEquals((Long) 10L, actual.get());  
72 - TimeUnit.SECONDS.sleep(2);  
73 - assertFalse(partitionRepository.findLastProcessedPartition(nodeId, 404L).isPresent());  
74 - }  
75 -  
76 - @Test  
77 - public void ifNoPartitionsWereProcessedEmptyResultReturned() {  
78 - UUID nodeId = UUIDs.timeBased();  
79 - Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 505L);  
80 - assertFalse(actual.isPresent());  
81 - }  
82 -  
83 -}