Commit 02f24ca672898bf292009a02792369b0e448b948

Authored by Andrew Shvayka
2 parents db4e1289 89ffda9a

Merged with main branch and fixed tests

Showing 37 changed files with 492 additions and 179 deletions
@@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Value; @@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Value;
23 import org.springframework.stereotype.Service; 23 import org.springframework.stereotype.Service;
24 import org.thingsboard.server.common.data.id.TenantId; 24 import org.thingsboard.server.common.data.id.TenantId;
25 import org.thingsboard.server.common.msg.TbMsg; 25 import org.thingsboard.server.common.msg.TbMsg;
  26 +import org.thingsboard.server.common.transport.quota.tenant.TenantQuotaService;
26 import org.thingsboard.server.dao.queue.MsgQueue; 27 import org.thingsboard.server.dao.queue.MsgQueue;
27 28
28 import javax.annotation.PostConstruct; 29 import javax.annotation.PostConstruct;
@@ -48,6 +49,9 @@ public class DefaultMsgQueueService implements MsgQueueService { @@ -48,6 +49,9 @@ public class DefaultMsgQueueService implements MsgQueueService {
48 @Autowired 49 @Autowired
49 private MsgQueue msgQueue; 50 private MsgQueue msgQueue;
50 51
  52 + @Autowired
  53 + private TenantQuotaService quotaService;
  54 +
51 private ScheduledExecutorService cleanupExecutor; 55 private ScheduledExecutorService cleanupExecutor;
52 56
53 private Map<TenantId, AtomicLong> pendingCountPerTenant = new ConcurrentHashMap<>(); 57 private Map<TenantId, AtomicLong> pendingCountPerTenant = new ConcurrentHashMap<>();
@@ -70,6 +74,11 @@ public class DefaultMsgQueueService implements MsgQueueService { @@ -70,6 +74,11 @@ public class DefaultMsgQueueService implements MsgQueueService {
70 74
71 @Override 75 @Override
72 public ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) { 76 public ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
  77 + if(quotaService.isQuotaExceeded(tenantId.getId().toString())) {
  78 + log.warn("Tenant TbMsg Quota exceeded for [{}:{}] . Reject", tenantId.getId());
  79 + return Futures.immediateFailedFuture(new RuntimeException("Tenant TbMsg Quota exceeded"));
  80 + }
  81 +
73 AtomicLong pendingMsgCount = pendingCountPerTenant.computeIfAbsent(tenantId, key -> new AtomicLong()); 82 AtomicLong pendingMsgCount = pendingCountPerTenant.computeIfAbsent(tenantId, key -> new AtomicLong());
74 if (pendingMsgCount.incrementAndGet() < queueMaxSize) { 83 if (pendingMsgCount.incrementAndGet() < queueMaxSize) {
75 return msgQueue.put(tenantId, msg, nodeId, clusterPartition); 84 return msgQueue.put(tenantId, msg, nodeId, clusterPartition);
@@ -131,9 +131,28 @@ quota: @@ -131,9 +131,28 @@ quota:
131 whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}" 131 whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
132 # Array of blacklist hosts 132 # Array of blacklist hosts
133 blacklist: "${QUOTA_HOST_BLACKLIST:}" 133 blacklist: "${QUOTA_HOST_BLACKLIST:}"
134 - log:  
135 - topSize: 10  
136 - intervalMin: 2 134 + log:
  135 + topSize: 10
  136 + intervalMin: 2
  137 + rule:
  138 + tenant:
  139 + # Max allowed number of API requests in interval for single tenant
  140 + limit: "${QUOTA_TENANT_LIMIT:100000}"
  141 + # Interval duration
  142 + intervalMs: "${QUOTA_TENANT_INTERVAL_MS:60000}"
  143 + # Maximum silence duration for tenant after which Tenant removed from QuotaService. Must be bigger than intervalMs
  144 + ttlMs: "${QUOTA_TENANT_TTL_MS:60000}"
  145 + # Interval for scheduled task that cleans expired records. TTL is used for expiring
  146 + cleanPeriodMs: "${QUOTA_TENANT_CLEAN_PERIOD_MS:300000}"
  147 + # Enable Host API Limits
  148 + enabled: "${QUOTA_TENANT_ENABLED:false}"
  149 + # Array of whitelist tenants
  150 + whitelist: "${QUOTA_TENANT_WHITELIST:}"
  151 + # Array of blacklist tenants
  152 + blacklist: "${QUOTA_HOST_BLACKLIST:}"
  153 + log:
  154 + topSize: 10
  155 + intervalMin: 2
137 156
138 database: 157 database:
139 type: "${DATABASE_TYPE:sql}" # cassandra OR sql 158 type: "${DATABASE_TYPE:sql}" # cassandra OR sql
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/AbstractQuotaService.java renamed from common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaService.java
@@ -15,33 +15,24 @@ @@ -15,33 +15,24 @@
15 */ 15 */
16 package org.thingsboard.server.common.transport.quota; 16 package org.thingsboard.server.common.transport.quota;
17 17
18 -import lombok.extern.slf4j.Slf4j;  
19 -import org.springframework.beans.factory.annotation.Value;  
20 -import org.springframework.stereotype.Service;  
21 -import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry;  
22 import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner; 18 import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
23 import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger; 19 import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
  20 +import org.thingsboard.server.common.transport.quota.inmemory.KeyBasedIntervalRegistry;
24 21
25 import javax.annotation.PostConstruct; 22 import javax.annotation.PostConstruct;
26 import javax.annotation.PreDestroy; 23 import javax.annotation.PreDestroy;
27 24
28 -/**  
29 - * @author Vitaliy Paromskiy  
30 - * @version 1.0  
31 - */  
32 -@Service  
33 -@Slf4j  
34 -public class HostRequestsQuotaService implements QuotaService { 25 +public class AbstractQuotaService implements QuotaService {
35 26
36 - private final HostRequestIntervalRegistry requestRegistry;  
37 - private final HostRequestLimitPolicy requestsPolicy; 27 + private final KeyBasedIntervalRegistry requestRegistry;
  28 + private final RequestLimitPolicy requestsPolicy;
38 private final IntervalRegistryCleaner registryCleaner; 29 private final IntervalRegistryCleaner registryCleaner;
39 private final IntervalRegistryLogger registryLogger; 30 private final IntervalRegistryLogger registryLogger;
40 private final boolean enabled; 31 private final boolean enabled;
41 32
42 - public HostRequestsQuotaService(HostRequestIntervalRegistry requestRegistry, HostRequestLimitPolicy requestsPolicy, 33 + public AbstractQuotaService(KeyBasedIntervalRegistry requestRegistry, RequestLimitPolicy requestsPolicy,
43 IntervalRegistryCleaner registryCleaner, IntervalRegistryLogger registryLogger, 34 IntervalRegistryCleaner registryCleaner, IntervalRegistryLogger registryLogger,
44 - @Value("${quota.host.enabled}") boolean enabled) { 35 + boolean enabled) {
45 this.requestRegistry = requestRegistry; 36 this.requestRegistry = requestRegistry;
46 this.requestsPolicy = requestsPolicy; 37 this.requestsPolicy = requestsPolicy;
47 this.registryCleaner = registryCleaner; 38 this.registryCleaner = registryCleaner;
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.quota;
  17 +
  18 +
  19 +public abstract class RequestLimitPolicy {
  20 +
  21 + private final long limit;
  22 +
  23 + public RequestLimitPolicy(long limit) {
  24 + this.limit = limit;
  25 + }
  26 +
  27 + public boolean isValid(long currentValue) {
  28 + return currentValue <= limit;
  29 + }
  30 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.quota.host;
  17 +
  18 +import org.springframework.beans.factory.annotation.Value;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
  21 +
  22 +@Component
  23 +public class HostIntervalRegistryCleaner extends IntervalRegistryCleaner {
  24 +
  25 + public HostIntervalRegistryCleaner(HostRequestIntervalRegistry intervalRegistry,
  26 + @Value("${quota.host.cleanPeriodMs}") long cleanPeriodMs) {
  27 + super(intervalRegistry, cleanPeriodMs);
  28 + }
  29 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.quota.host;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.beans.factory.annotation.Value;
  20 +import org.springframework.stereotype.Component;
  21 +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
  22 +
  23 +import java.util.Map;
  24 +import java.util.concurrent.TimeUnit;
  25 +
  26 +@Component
  27 +@Slf4j
  28 +public class HostIntervalRegistryLogger extends IntervalRegistryLogger {
  29 +
  30 + private final long logIntervalMin;
  31 +
  32 + public HostIntervalRegistryLogger(@Value("${quota.host.log.topSize}") int topSize,
  33 + @Value("${quota.host.log.intervalMin}") long logIntervalMin,
  34 + HostRequestIntervalRegistry intervalRegistry) {
  35 + super(topSize, logIntervalMin, intervalRegistry);
  36 + this.logIntervalMin = logIntervalMin;
  37 + }
  38 +
  39 + protected void log(Map<String, Long> top, int uniqHosts, long requestsCount) {
  40 + long rps = requestsCount / TimeUnit.MINUTES.toSeconds(logIntervalMin);
  41 + StringBuilder builder = new StringBuilder("Quota Statistic : ");
  42 + builder.append("uniqHosts : ").append(uniqHosts).append("; ");
  43 + builder.append("requestsCount : ").append(requestsCount).append("; ");
  44 + builder.append("RPS : ").append(rps).append(" ");
  45 + builder.append("top -> ");
  46 + for (Map.Entry<String, Long> host : top.entrySet()) {
  47 + builder.append(host.getKey()).append(" : ").append(host.getValue()).append("; ");
  48 + }
  49 +
  50 + log.info(builder.toString());
  51 + }
  52 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.quota.host;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.beans.factory.annotation.Value;
  20 +import org.springframework.stereotype.Component;
  21 +import org.thingsboard.server.common.transport.quota.inmemory.KeyBasedIntervalRegistry;
  22 +
  23 +/**
  24 + * @author Vitaliy Paromskiy
  25 + * @version 1.0
  26 + */
  27 +@Component
  28 +@Slf4j
  29 +public class HostRequestIntervalRegistry extends KeyBasedIntervalRegistry {
  30 +
  31 + public HostRequestIntervalRegistry(@Value("${quota.host.intervalMs}") long intervalDurationMs,
  32 + @Value("${quota.host.ttlMs}") long ttlMs,
  33 + @Value("${quota.host.whitelist}") String whiteList,
  34 + @Value("${quota.host.blacklist}") String blackList) {
  35 + super(intervalDurationMs, ttlMs, whiteList, blackList, "host");
  36 + }
  37 +}
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestLimitPolicy.java renamed from common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicy.java
@@ -13,26 +13,21 @@ @@ -13,26 +13,21 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.common.transport.quota; 16 +package org.thingsboard.server.common.transport.quota.host;
17 17
18 import org.springframework.beans.factory.annotation.Value; 18 import org.springframework.beans.factory.annotation.Value;
19 import org.springframework.stereotype.Component; 19 import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.transport.quota.RequestLimitPolicy;
20 21
21 /** 22 /**
22 * @author Vitaliy Paromskiy 23 * @author Vitaliy Paromskiy
23 * @version 1.0 24 * @version 1.0
24 */ 25 */
25 @Component 26 @Component
26 -public class HostRequestLimitPolicy {  
27 -  
28 - private final long limit; 27 +public class HostRequestLimitPolicy extends RequestLimitPolicy {
29 28
30 public HostRequestLimitPolicy(@Value("${quota.host.limit}") long limit) { 29 public HostRequestLimitPolicy(@Value("${quota.host.limit}") long limit) {
31 - this.limit = limit;  
32 - }  
33 -  
34 - public boolean isValid(long currentValue) {  
35 - return currentValue <= limit; 30 + super(limit);
36 } 31 }
37 32
38 } 33 }
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.quota.host;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.beans.factory.annotation.Value;
  20 +import org.springframework.stereotype.Service;
  21 +import org.thingsboard.server.common.transport.quota.AbstractQuotaService;
  22 +
  23 +/**
  24 + * @author Vitaliy Paromskiy
  25 + * @version 1.0
  26 + */
  27 +@Service
  28 +@Slf4j
  29 +public class HostRequestsQuotaService extends AbstractQuotaService {
  30 +
  31 + public HostRequestsQuotaService(HostRequestIntervalRegistry requestRegistry, HostRequestLimitPolicy requestsPolicy,
  32 + HostIntervalRegistryCleaner registryCleaner, HostIntervalRegistryLogger registryLogger,
  33 + @Value("${quota.host.enabled}") boolean enabled) {
  34 + super(requestRegistry, requestsPolicy, registryCleaner, registryLogger, enabled);
  35 + }
  36 +
  37 +}
@@ -16,10 +16,7 @@ @@ -16,10 +16,7 @@
16 package org.thingsboard.server.common.transport.quota.inmemory; 16 package org.thingsboard.server.common.transport.quota.inmemory;
17 17
18 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
19 -import org.springframework.beans.factory.annotation.Value;  
20 -import org.springframework.stereotype.Component;  
21 19
22 -import javax.annotation.PreDestroy;  
23 import java.util.concurrent.Executors; 20 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService; 21 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.TimeUnit; 22 import java.util.concurrent.TimeUnit;
@@ -28,15 +25,14 @@ import java.util.concurrent.TimeUnit; @@ -28,15 +25,14 @@ import java.util.concurrent.TimeUnit;
28 * @author Vitaliy Paromskiy 25 * @author Vitaliy Paromskiy
29 * @version 1.0 26 * @version 1.0
30 */ 27 */
31 -@Component  
32 @Slf4j 28 @Slf4j
33 -public class IntervalRegistryCleaner { 29 +public abstract class IntervalRegistryCleaner {
34 30
35 - private final HostRequestIntervalRegistry intervalRegistry; 31 + private final KeyBasedIntervalRegistry intervalRegistry;
36 private final long cleanPeriodMs; 32 private final long cleanPeriodMs;
37 private ScheduledExecutorService executor; 33 private ScheduledExecutorService executor;
38 34
39 - public IntervalRegistryCleaner(HostRequestIntervalRegistry intervalRegistry, @Value("${quota.host.cleanPeriodMs}") long cleanPeriodMs) { 35 + public IntervalRegistryCleaner(KeyBasedIntervalRegistry intervalRegistry, long cleanPeriodMs) {
40 this.intervalRegistry = intervalRegistry; 36 this.intervalRegistry = intervalRegistry;
41 this.cleanPeriodMs = cleanPeriodMs; 37 this.cleanPeriodMs = cleanPeriodMs;
42 } 38 }
@@ -17,8 +17,6 @@ package org.thingsboard.server.common.transport.quota.inmemory; @@ -17,8 +17,6 @@ package org.thingsboard.server.common.transport.quota.inmemory;
17 17
18 import com.google.common.collect.MinMaxPriorityQueue; 18 import com.google.common.collect.MinMaxPriorityQueue;
19 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
20 -import org.springframework.beans.factory.annotation.Value;  
21 -import org.springframework.stereotype.Component;  
22 20
23 import java.util.Comparator; 21 import java.util.Comparator;
24 import java.util.Map; 22 import java.util.Map;
@@ -32,17 +30,15 @@ import java.util.stream.Collectors; @@ -32,17 +30,15 @@ import java.util.stream.Collectors;
32 * @author Vitaliy Paromskiy 30 * @author Vitaliy Paromskiy
33 * @version 1.0 31 * @version 1.0
34 */ 32 */
35 -@Component  
36 @Slf4j 33 @Slf4j
37 -public class IntervalRegistryLogger { 34 +public abstract class IntervalRegistryLogger {
38 35
39 private final int topSize; 36 private final int topSize;
40 - private final HostRequestIntervalRegistry intervalRegistry; 37 + private final KeyBasedIntervalRegistry intervalRegistry;
41 private final long logIntervalMin; 38 private final long logIntervalMin;
42 private ScheduledExecutorService executor; 39 private ScheduledExecutorService executor;
43 40
44 - public IntervalRegistryLogger(@Value("${quota.log.topSize}") int topSize, @Value("${quota.log.intervalMin}") long logIntervalMin,  
45 - HostRequestIntervalRegistry intervalRegistry) { 41 + public IntervalRegistryLogger(int topSize, long logIntervalMin, KeyBasedIntervalRegistry intervalRegistry) {
46 this.topSize = topSize; 42 this.topSize = topSize;
47 this.logIntervalMin = logIntervalMin; 43 this.logIntervalMin = logIntervalMin;
48 this.intervalRegistry = intervalRegistry; 44 this.intervalRegistry = intervalRegistry;
@@ -79,17 +75,5 @@ public class IntervalRegistryLogger { @@ -79,17 +75,5 @@ public class IntervalRegistryLogger {
79 return topQueue.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); 75 return topQueue.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
80 } 76 }
81 77
82 - private void log(Map<String, Long> top, int uniqHosts, long requestsCount) {  
83 - long rps = requestsCount / TimeUnit.MINUTES.toSeconds(logIntervalMin);  
84 - StringBuilder builder = new StringBuilder("Quota Statistic : ");  
85 - builder.append("uniqHosts : ").append(uniqHosts).append("; ");  
86 - builder.append("requestsCount : ").append(requestsCount).append("; ");  
87 - builder.append("RPS : ").append(rps).append(" ");  
88 - builder.append("top -> ");  
89 - for (Map.Entry<String, Long> host : top.entrySet()) {  
90 - builder.append(host.getKey()).append(" : ").append(host.getValue()).append("; ");  
91 - }  
92 -  
93 - log.info(builder.toString());  
94 - } 78 + protected abstract void log(Map<String, Long> top, int uniqHosts, long requestsCount);
95 } 79 }
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/KeyBasedIntervalRegistry.java renamed from common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
@@ -18,22 +18,14 @@ package org.thingsboard.server.common.transport.quota.inmemory; @@ -18,22 +18,14 @@ package org.thingsboard.server.common.transport.quota.inmemory;
18 import com.google.common.collect.Sets; 18 import com.google.common.collect.Sets;
19 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
20 import org.apache.commons.lang3.StringUtils; 20 import org.apache.commons.lang3.StringUtils;
21 -import org.springframework.beans.factory.annotation.Value;  
22 -import org.springframework.stereotype.Component;  
23 21
24 -import javax.annotation.PostConstruct;  
25 import java.util.Map; 22 import java.util.Map;
26 import java.util.Set; 23 import java.util.Set;
27 import java.util.concurrent.ConcurrentHashMap; 24 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.stream.Collectors; 25 import java.util.stream.Collectors;
29 26
30 -/**  
31 - * @author Vitaliy Paromskiy  
32 - * @version 1.0  
33 - */  
34 -@Component  
35 @Slf4j 27 @Slf4j
36 -public class HostRequestIntervalRegistry { 28 +public abstract class KeyBasedIntervalRegistry {
37 29
38 private final Map<String, IntervalCount> hostCounts = new ConcurrentHashMap<>(); 30 private final Map<String, IntervalCount> hostCounts = new ConcurrentHashMap<>();
39 private final long intervalDurationMs; 31 private final long intervalDurationMs;
@@ -41,23 +33,20 @@ public class HostRequestIntervalRegistry { @@ -41,23 +33,20 @@ public class HostRequestIntervalRegistry {
41 private final Set<String> whiteList; 33 private final Set<String> whiteList;
42 private final Set<String> blackList; 34 private final Set<String> blackList;
43 35
44 - public HostRequestIntervalRegistry(@Value("${quota.host.intervalMs}") long intervalDurationMs,  
45 - @Value("${quota.host.ttlMs}") long ttlMs,  
46 - @Value("${quota.host.whitelist}") String whiteList,  
47 - @Value("${quota.host.blacklist}") String blackList) { 36 + public KeyBasedIntervalRegistry(long intervalDurationMs, long ttlMs, String whiteList, String blackList, String name) {
48 this.intervalDurationMs = intervalDurationMs; 37 this.intervalDurationMs = intervalDurationMs;
49 this.ttlMs = ttlMs; 38 this.ttlMs = ttlMs;
50 this.whiteList = Sets.newHashSet(StringUtils.split(whiteList, ',')); 39 this.whiteList = Sets.newHashSet(StringUtils.split(whiteList, ','));
51 this.blackList = Sets.newHashSet(StringUtils.split(blackList, ',')); 40 this.blackList = Sets.newHashSet(StringUtils.split(blackList, ','));
  41 + validate(name);
52 } 42 }
53 43
54 - @PostConstruct  
55 - public void init() { 44 + private void validate(String name) {
56 if (ttlMs < intervalDurationMs) { 45 if (ttlMs < intervalDurationMs) {
57 - log.warn("TTL for IntervalRegistry [{}] smaller than interval duration [{}]", ttlMs, intervalDurationMs); 46 + log.warn("TTL for {} IntervalRegistry [{}] smaller than interval duration [{}]", name, ttlMs, intervalDurationMs);
58 } 47 }
59 - log.info("Start Host Quota Service with whitelist {}", whiteList);  
60 - log.info("Start Host Quota Service with blacklist {}", blackList); 48 + log.info("Start {} KeyBasedIntervalRegistry with whitelist {}", name, whiteList);
  49 + log.info("Start {} KeyBasedIntervalRegistry with blacklist {}", name, blackList);
61 } 50 }
62 51
63 public long tick(String clientHostId) { 52 public long tick(String clientHostId) {
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.quota.tenant;
  17 +
  18 +import org.springframework.beans.factory.annotation.Value;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
  21 +
  22 +@Component
  23 +public class TenantIntervalRegistryCleaner extends IntervalRegistryCleaner {
  24 +
  25 + public TenantIntervalRegistryCleaner(TenantMsgsIntervalRegistry intervalRegistry,
  26 + @Value("${quota.rule.tenant.cleanPeriodMs}") long cleanPeriodMs) {
  27 + super(intervalRegistry, cleanPeriodMs);
  28 + }
  29 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.quota.tenant;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.beans.factory.annotation.Value;
  20 +import org.springframework.stereotype.Component;
  21 +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
  22 +
  23 +import java.util.Map;
  24 +import java.util.concurrent.TimeUnit;
  25 +
  26 +@Slf4j
  27 +@Component
  28 +public class TenantIntervalRegistryLogger extends IntervalRegistryLogger {
  29 +
  30 + private final long logIntervalMin;
  31 +
  32 + public TenantIntervalRegistryLogger(@Value("${quota.rule.tenant.log.topSize}") int topSize,
  33 + @Value("${quota.rule.tenant.log.intervalMin}") long logIntervalMin,
  34 + TenantMsgsIntervalRegistry intervalRegistry) {
  35 + super(topSize, logIntervalMin, intervalRegistry);
  36 + this.logIntervalMin = logIntervalMin;
  37 + }
  38 +
  39 + protected void log(Map<String, Long> top, int uniqHosts, long requestsCount) {
  40 + long rps = requestsCount / TimeUnit.MINUTES.toSeconds(logIntervalMin);
  41 + StringBuilder builder = new StringBuilder("Tenant Quota Statistic : ");
  42 + builder.append("uniqTenants : ").append(uniqHosts).append("; ");
  43 + builder.append("requestsCount : ").append(requestsCount).append("; ");
  44 + builder.append("RPS : ").append(rps).append(" ");
  45 + builder.append("top -> ");
  46 + for (Map.Entry<String, Long> host : top.entrySet()) {
  47 + builder.append(host.getKey()).append(" : ").append(host.getValue()).append("; ");
  48 + }
  49 +
  50 + log.info(builder.toString());
  51 + }
  52 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.quota.tenant;
  17 +
  18 +import org.springframework.beans.factory.annotation.Value;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.transport.quota.inmemory.KeyBasedIntervalRegistry;
  21 +
  22 +@Component
  23 +public class TenantMsgsIntervalRegistry extends KeyBasedIntervalRegistry {
  24 +
  25 + public TenantMsgsIntervalRegistry(@Value("${quota.rule.tenant.intervalMs}") long intervalDurationMs,
  26 + @Value("${quota.rule.tenant.ttlMs}") long ttlMs,
  27 + @Value("${quota.rule.tenant.whitelist}") String whiteList,
  28 + @Value("${quota.rule.tenant.blacklist}") String blackList) {
  29 + super(intervalDurationMs, ttlMs, whiteList, blackList, "Rule Tenant");
  30 + }
  31 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.quota.tenant;
  17 +
  18 +import org.springframework.beans.factory.annotation.Value;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.transport.quota.AbstractQuotaService;
  21 +
  22 +@Component
  23 +public class TenantQuotaService extends AbstractQuotaService {
  24 +
  25 + public TenantQuotaService(TenantMsgsIntervalRegistry requestRegistry, TenantRequestLimitPolicy requestsPolicy,
  26 + TenantIntervalRegistryCleaner registryCleaner, TenantIntervalRegistryLogger registryLogger,
  27 + @Value("${quota.rule.tenant.enabled}") boolean enabled) {
  28 + super(requestRegistry, requestsPolicy, registryCleaner, registryLogger, enabled);
  29 + }
  30 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.quota.tenant;
  17 +
  18 +import org.springframework.beans.factory.annotation.Value;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.transport.quota.RequestLimitPolicy;
  21 +
  22 +@Component
  23 +public class TenantRequestLimitPolicy extends RequestLimitPolicy {
  24 +
  25 + public TenantRequestLimitPolicy(@Value("${quota.rule.tenant.limit}") long limit) {
  26 + super(limit);
  27 + }
  28 +}
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.server.common.transport.quota; 16 package org.thingsboard.server.common.transport.quota;
17 17
18 import org.junit.Test; 18 import org.junit.Test;
  19 +import org.thingsboard.server.common.transport.quota.host.HostRequestLimitPolicy;
19 20
20 import static org.junit.Assert.assertFalse; 21 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertTrue; 22 import static org.junit.Assert.assertTrue;
@@ -17,9 +17,7 @@ package org.thingsboard.server.common.transport.quota; @@ -17,9 +17,7 @@ package org.thingsboard.server.common.transport.quota;
17 17
18 import org.junit.Before; 18 import org.junit.Before;
19 import org.junit.Test; 19 import org.junit.Test;
20 -import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry;  
21 -import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;  
22 -import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger; 20 +import org.thingsboard.server.common.transport.quota.host.*;
23 21
24 import static org.junit.Assert.assertFalse; 22 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertTrue; 23 import static org.junit.Assert.assertTrue;
@@ -35,8 +33,8 @@ public class HostRequestsQuotaServiceTest { @@ -35,8 +33,8 @@ public class HostRequestsQuotaServiceTest {
35 33
36 private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class); 34 private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class);
37 private HostRequestLimitPolicy requestsPolicy = mock(HostRequestLimitPolicy.class); 35 private HostRequestLimitPolicy requestsPolicy = mock(HostRequestLimitPolicy.class);
38 - private IntervalRegistryCleaner registryCleaner = mock(IntervalRegistryCleaner.class);  
39 - private IntervalRegistryLogger registryLogger = mock(IntervalRegistryLogger.class); 36 + private HostIntervalRegistryCleaner registryCleaner = mock(HostIntervalRegistryCleaner.class);
  37 + private HostIntervalRegistryLogger registryLogger = mock(HostIntervalRegistryLogger.class);
40 38
41 @Before 39 @Before
42 public void init() { 40 public void init() {
@@ -15,11 +15,9 @@ @@ -15,11 +15,9 @@
15 */ 15 */
16 package org.thingsboard.server.common.transport.quota.inmemory; 16 package org.thingsboard.server.common.transport.quota.inmemory;
17 17
18 -import com.google.common.collect.Sets;  
19 import org.junit.Before; 18 import org.junit.Before;
20 import org.junit.Test; 19 import org.junit.Test;
21 -  
22 -import java.util.Collections; 20 +import org.thingsboard.server.common.transport.quota.host.HostRequestIntervalRegistry;
23 21
24 import static org.junit.Assert.assertEquals; 22 import static org.junit.Assert.assertEquals;
25 23
@@ -18,6 +18,8 @@ package org.thingsboard.server.common.transport.quota.inmemory; @@ -18,6 +18,8 @@ package org.thingsboard.server.common.transport.quota.inmemory;
18 import com.google.common.collect.ImmutableMap; 18 import com.google.common.collect.ImmutableMap;
19 import org.junit.Before; 19 import org.junit.Before;
20 import org.junit.Test; 20 import org.junit.Test;
  21 +import org.thingsboard.server.common.transport.quota.host.HostIntervalRegistryLogger;
  22 +import org.thingsboard.server.common.transport.quota.host.HostRequestIntervalRegistry;
21 23
22 import java.util.Collections; 24 import java.util.Collections;
23 import java.util.Map; 25 import java.util.Map;
@@ -37,7 +39,7 @@ public class IntervalRegistryLoggerTest { @@ -37,7 +39,7 @@ public class IntervalRegistryLoggerTest {
37 39
38 @Before 40 @Before
39 public void init() { 41 public void init() {
40 - logger = new IntervalRegistryLogger(3, 10, requestRegistry); 42 + logger = new HostIntervalRegistryLogger(3, 10, requestRegistry);
41 } 43 }
42 44
43 @Test 45 @Test
dao/src/main/java/org/thingsboard/server/dao/queue/db/MsgAck.java renamed from dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/MsgAck.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.queue.db.nosql; 16 +package org.thingsboard.server.dao.queue.db;
17 17
18 import lombok.Data; 18 import lombok.Data;
19 import lombok.EqualsAndHashCode; 19 import lombok.EqualsAndHashCode;
dao/src/main/java/org/thingsboard/server/dao/queue/db/UnprocessedMsgFilter.java renamed from dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/UnprocessedMsgFilter.java
@@ -13,10 +13,11 @@ @@ -13,10 +13,11 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.queue.db.nosql; 16 +package org.thingsboard.server.dao.queue.db;
17 17
18 import org.springframework.stereotype.Component; 18 import org.springframework.stereotype.Component;
19 import org.thingsboard.server.common.msg.TbMsg; 19 import org.thingsboard.server.common.msg.TbMsg;
  20 +import org.thingsboard.server.dao.queue.db.MsgAck;
20 21
21 import java.util.Collection; 22 import java.util.Collection;
22 import java.util.List; 23 import java.util.List;
@@ -26,6 +26,8 @@ import org.springframework.stereotype.Component; @@ -26,6 +26,8 @@ import org.springframework.stereotype.Component;
26 import org.thingsboard.server.common.data.id.TenantId; 26 import org.thingsboard.server.common.data.id.TenantId;
27 import org.thingsboard.server.common.msg.TbMsg; 27 import org.thingsboard.server.common.msg.TbMsg;
28 import org.thingsboard.server.dao.queue.MsgQueue; 28 import org.thingsboard.server.dao.queue.MsgQueue;
  29 +import org.thingsboard.server.dao.queue.db.MsgAck;
  30 +import org.thingsboard.server.dao.queue.db.UnprocessedMsgFilter;
29 import org.thingsboard.server.dao.queue.db.repository.AckRepository; 31 import org.thingsboard.server.dao.queue.db.repository.AckRepository;
30 import org.thingsboard.server.dao.queue.db.repository.MsgRepository; 32 import org.thingsboard.server.dao.queue.db.repository.MsgRepository;
31 import org.thingsboard.server.dao.util.NoSqlDao; 33 import org.thingsboard.server.dao.util.NoSqlDao;
@@ -26,7 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture; @@ -26,7 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture;
26 import org.springframework.beans.factory.annotation.Value; 26 import org.springframework.beans.factory.annotation.Value;
27 import org.springframework.stereotype.Component; 27 import org.springframework.stereotype.Component;
28 import org.thingsboard.server.dao.nosql.CassandraAbstractDao; 28 import org.thingsboard.server.dao.nosql.CassandraAbstractDao;
29 -import org.thingsboard.server.dao.queue.db.nosql.MsgAck; 29 +import org.thingsboard.server.dao.queue.db.MsgAck;
30 import org.thingsboard.server.dao.queue.db.repository.AckRepository; 30 import org.thingsboard.server.dao.queue.db.repository.AckRepository;
31 import org.thingsboard.server.dao.util.NoSqlDao; 31 import org.thingsboard.server.dao.util.NoSqlDao;
32 32
@@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
16 package org.thingsboard.server.dao.queue.db.repository; 16 package org.thingsboard.server.dao.queue.db.repository;
17 17
18 import com.google.common.util.concurrent.ListenableFuture; 18 import com.google.common.util.concurrent.ListenableFuture;
19 -import org.thingsboard.server.dao.queue.db.nosql.MsgAck; 19 +import org.thingsboard.server.dao.queue.db.MsgAck;
20 20
21 import java.util.List; 21 import java.util.List;
22 import java.util.UUID; 22 import java.util.UUID;
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.server.dao.relation; 16 package org.thingsboard.server.dao.relation;
17 17
18 import com.google.common.base.Function; 18 import com.google.common.base.Function;
  19 +import com.google.common.util.concurrent.AsyncFunction;
19 import com.google.common.util.concurrent.Futures; 20 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture; 21 import com.google.common.util.concurrent.ListenableFuture;
21 import lombok.extern.slf4j.Slf4j; 22 import lombok.extern.slf4j.Slf4j;
@@ -176,97 +177,65 @@ public class BaseRelationService implements RelationService { @@ -176,97 +177,65 @@ public class BaseRelationService implements RelationService {
176 } 177 }
177 178
178 @Override 179 @Override
179 - public boolean deleteEntityRelations(EntityId entity) {  
180 - Cache cache = cacheManager.getCache(RELATIONS_CACHE);  
181 - log.trace("Executing deleteEntityRelations [{}]", entity);  
182 - validate(entity);  
183 - List<ListenableFuture<List<EntityRelation>>> inboundRelationsList = new ArrayList<>();  
184 - for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {  
185 - inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup));  
186 - }  
187 - ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList);  
188 - ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations, relations ->  
189 - getBooleans(relations, cache, true));  
190 -  
191 - ListenableFuture<Boolean> inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction());  
192 - boolean inboundDeleteResult = false; 180 + public void deleteEntityRelations(EntityId entityId) {
193 try { 181 try {
194 - inboundDeleteResult = inboundFuture.get(); 182 + deleteEntityRelationsAsync(entityId).get();
195 } catch (InterruptedException | ExecutionException e) { 183 } catch (InterruptedException | ExecutionException e) {
196 - log.error("Error deleting entity inbound relations", e);  
197 - }  
198 -  
199 - List<ListenableFuture<List<EntityRelation>>> outboundRelationsList = new ArrayList<>();  
200 - for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {  
201 - outboundRelationsList.add(relationDao.findAllByFrom(entity, typeGroup));  
202 - }  
203 - ListenableFuture<List<List<EntityRelation>>> outboundRelations = Futures.allAsList(outboundRelationsList);  
204 - Futures.transform(outboundRelations, relations -> getBooleans(relations, cache, false));  
205 -  
206 - boolean outboundDeleteResult = relationDao.deleteOutboundRelations(entity);  
207 - return inboundDeleteResult && outboundDeleteResult;  
208 - }  
209 -  
210 - private List<Boolean> getBooleans(List<List<EntityRelation>> relations, Cache cache, boolean isRemove) {  
211 - List<Boolean> results = new ArrayList<>();  
212 - for (List<EntityRelation> relationList : relations) {  
213 - relationList.forEach(relation -> checkFromDeleteSync(cache, results, relation, isRemove)); 184 + throw new RuntimeException(e);
214 } 185 }
215 - return results;  
216 - }  
217 -  
218 - private void checkFromDeleteSync(Cache cache, List<Boolean> results, EntityRelation relation, boolean isRemove) {  
219 - if (isRemove) {  
220 - results.add(relationDao.deleteRelation(relation));  
221 - }  
222 - cacheEviction(relation, cache);  
223 } 186 }
224 187
225 @Override 188 @Override
226 - public ListenableFuture<Boolean> deleteEntityRelationsAsync(EntityId entity) { 189 + public ListenableFuture<Void> deleteEntityRelationsAsync(EntityId entityId) {
227 Cache cache = cacheManager.getCache(RELATIONS_CACHE); 190 Cache cache = cacheManager.getCache(RELATIONS_CACHE);
228 - log.trace("Executing deleteEntityRelationsAsync [{}]", entity);  
229 - validate(entity); 191 + log.trace("Executing deleteEntityRelationsAsync [{}]", entityId);
  192 + validate(entityId);
230 List<ListenableFuture<List<EntityRelation>>> inboundRelationsList = new ArrayList<>(); 193 List<ListenableFuture<List<EntityRelation>>> inboundRelationsList = new ArrayList<>();
231 for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) { 194 for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
232 - inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup)); 195 + inboundRelationsList.add(relationDao.findAllByTo(entityId, typeGroup));
233 } 196 }
  197 +
234 ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList); 198 ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList);
  199 +
  200 + List<ListenableFuture<List<EntityRelation>>> outboundRelationsList = new ArrayList<>();
  201 + for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
  202 + outboundRelationsList.add(relationDao.findAllByFrom(entityId, typeGroup));
  203 + }
  204 +
  205 + ListenableFuture<List<List<EntityRelation>>> outboundRelations = Futures.allAsList(outboundRelationsList);
  206 +
235 ListenableFuture<List<Boolean>> inboundDeletions = Futures.transformAsync(inboundRelations, 207 ListenableFuture<List<Boolean>> inboundDeletions = Futures.transformAsync(inboundRelations,
236 relations -> { 208 relations -> {
237 - List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, true); 209 + List<ListenableFuture<Boolean>> results = deleteRelationGroupsAsync(relations, cache, true);
238 return Futures.allAsList(results); 210 return Futures.allAsList(results);
239 }); 211 });
240 212
241 - ListenableFuture<Boolean> inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction()); 213 + ListenableFuture<List<Boolean>> outboundDeletions = Futures.transformAsync(outboundRelations,
  214 + relations -> {
  215 + List<ListenableFuture<Boolean>> results = deleteRelationGroupsAsync(relations, cache, false);
  216 + return Futures.allAsList(results);
  217 + });
242 218
243 - List<ListenableFuture<List<EntityRelation>>> outboundRelationsList = new ArrayList<>();  
244 - for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {  
245 - outboundRelationsList.add(relationDao.findAllByFrom(entity, typeGroup));  
246 - }  
247 - ListenableFuture<List<List<EntityRelation>>> outboundRelations = Futures.allAsList(outboundRelationsList);  
248 - Futures.transformAsync(outboundRelations, relations -> {  
249 - List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, false);  
250 - return Futures.allAsList(results);  
251 - }); 219 + ListenableFuture<List<List<Boolean>>> deletionsFuture = Futures.allAsList(inboundDeletions, outboundDeletions);
252 220
253 - ListenableFuture<Boolean> outboundFuture = relationDao.deleteOutboundRelationsAsync(entity);  
254 - return Futures.transform(Futures.allAsList(Arrays.asList(inboundFuture, outboundFuture)), getListToBooleanFunction()); 221 + return Futures.transform(Futures.transformAsync(deletionsFuture, (deletions) -> relationDao.deleteOutboundRelationsAsync(entityId)), result -> null);
255 } 222 }
256 223
257 - private List<ListenableFuture<Boolean>> getListenableFutures(List<List<EntityRelation>> relations, Cache cache, boolean isRemove) { 224 + private List<ListenableFuture<Boolean>> deleteRelationGroupsAsync(List<List<EntityRelation>> relations, Cache cache, boolean deleteFromDb) {
258 List<ListenableFuture<Boolean>> results = new ArrayList<>(); 225 List<ListenableFuture<Boolean>> results = new ArrayList<>();
259 for (List<EntityRelation> relationList : relations) { 226 for (List<EntityRelation> relationList : relations) {
260 - relationList.forEach(relation -> checkFromDeleteAsync(cache, results, relation, isRemove)); 227 + relationList.forEach(relation -> results.add(deleteAsync(cache, relation, deleteFromDb)));
261 } 228 }
262 return results; 229 return results;
263 } 230 }
264 231
265 - private void checkFromDeleteAsync(Cache cache, List<ListenableFuture<Boolean>> results, EntityRelation relation, boolean isRemove) {  
266 - if (isRemove) {  
267 - results.add(relationDao.deleteRelationAsync(relation));  
268 - } 232 + private ListenableFuture<Boolean> deleteAsync(Cache cache, EntityRelation relation, boolean deleteFromDb) {
269 cacheEviction(relation, cache); 233 cacheEviction(relation, cache);
  234 + if (deleteFromDb) {
  235 + return relationDao.deleteRelationAsync(relation);
  236 + } else {
  237 + return Futures.immediateFuture(false);
  238 + }
270 } 239 }
271 240
272 private void cacheEviction(EntityRelation relation, Cache cache) { 241 private void cacheEviction(EntityRelation relation, Cache cache) {
@@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.relation.EntityRelationsQuery; @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
23 import org.thingsboard.server.common.data.relation.RelationTypeGroup; 23 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
24 24
25 import java.util.List; 25 import java.util.List;
  26 +import java.util.concurrent.ExecutionException;
26 27
27 /** 28 /**
28 * Created by ashvayka on 27.04.17. 29 * Created by ashvayka on 27.04.17.
@@ -47,9 +48,9 @@ public interface RelationService { @@ -47,9 +48,9 @@ public interface RelationService {
47 48
48 ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); 49 ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup);
49 50
50 - boolean deleteEntityRelations(EntityId entity); 51 + void deleteEntityRelations(EntityId entity);
51 52
52 - ListenableFuture<Boolean> deleteEntityRelationsAsync(EntityId entity); 53 + ListenableFuture<Void> deleteEntityRelationsAsync(EntityId entity);
53 54
54 List<EntityRelation> findByFrom(EntityId from, RelationTypeGroup typeGroup); 55 List<EntityRelation> findByFrom(EntityId from, RelationTypeGroup typeGroup);
55 56
@@ -127,39 +127,35 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple @@ -127,39 +127,35 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
127 @Override 127 @Override
128 public boolean deleteRelation(EntityRelation relation) { 128 public boolean deleteRelation(EntityRelation relation) {
129 RelationCompositeKey key = new RelationCompositeKey(relation); 129 RelationCompositeKey key = new RelationCompositeKey(relation);
130 - boolean relationExistsBeforeDelete = relationRepository.exists(key);  
131 - relationRepository.delete(key);  
132 - return relationExistsBeforeDelete; 130 + return deleteRelationIfExists(key);
133 } 131 }
134 132
135 @Override 133 @Override
136 public ListenableFuture<Boolean> deleteRelationAsync(EntityRelation relation) { 134 public ListenableFuture<Boolean> deleteRelationAsync(EntityRelation relation) {
137 RelationCompositeKey key = new RelationCompositeKey(relation); 135 RelationCompositeKey key = new RelationCompositeKey(relation);
138 return service.submit( 136 return service.submit(
139 - () -> {  
140 - boolean relationExistsBeforeDelete = relationRepository.exists(key);  
141 - relationRepository.delete(key);  
142 - return relationExistsBeforeDelete;  
143 - }); 137 + () -> deleteRelationIfExists(key));
144 } 138 }
145 139
146 @Override 140 @Override
147 public boolean deleteRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { 141 public boolean deleteRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
148 RelationCompositeKey key = getRelationCompositeKey(from, to, relationType, typeGroup); 142 RelationCompositeKey key = getRelationCompositeKey(from, to, relationType, typeGroup);
149 - boolean relationExistsBeforeDelete = relationRepository.exists(key);  
150 - relationRepository.delete(key);  
151 - return relationExistsBeforeDelete; 143 + return deleteRelationIfExists(key);
152 } 144 }
153 145
154 @Override 146 @Override
155 public ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { 147 public ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
156 RelationCompositeKey key = getRelationCompositeKey(from, to, relationType, typeGroup); 148 RelationCompositeKey key = getRelationCompositeKey(from, to, relationType, typeGroup);
157 return service.submit( 149 return service.submit(
158 - () -> {  
159 - boolean relationExistsBeforeDelete = relationRepository.exists(key);  
160 - relationRepository.delete(key);  
161 - return relationExistsBeforeDelete;  
162 - }); 150 + () -> deleteRelationIfExists(key));
  151 + }
  152 +
  153 + private boolean deleteRelationIfExists(RelationCompositeKey key) {
  154 + boolean relationExistsBeforeDelete = relationRepository.exists(key);
  155 + if (relationExistsBeforeDelete) {
  156 + relationRepository.delete(key);
  157 + }
  158 + return relationExistsBeforeDelete;
163 } 159 }
164 160
165 @Override 161 @Override
@@ -167,7 +163,9 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple @@ -167,7 +163,9 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
167 boolean relationExistsBeforeDelete = relationRepository 163 boolean relationExistsBeforeDelete = relationRepository
168 .findAllByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name()) 164 .findAllByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name())
169 .size() > 0; 165 .size() > 0;
170 - relationRepository.deleteByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name()); 166 + if (relationExistsBeforeDelete) {
  167 + relationRepository.deleteByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name());
  168 + }
171 return relationExistsBeforeDelete; 169 return relationExistsBeforeDelete;
172 } 170 }
173 171
@@ -178,7 +176,9 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple @@ -178,7 +176,9 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
178 boolean relationExistsBeforeDelete = relationRepository 176 boolean relationExistsBeforeDelete = relationRepository
179 .findAllByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name()) 177 .findAllByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name())
180 .size() > 0; 178 .size() > 0;
181 - relationRepository.deleteByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name()); 179 + if (relationExistsBeforeDelete) {
  180 + relationRepository.deleteByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name());
  181 + }
182 return relationExistsBeforeDelete; 182 return relationExistsBeforeDelete;
183 }); 183 });
184 } 184 }
@@ -18,6 +18,8 @@ package org.thingsboard.server.dao.queue.db.nosql; @@ -18,6 +18,8 @@ package org.thingsboard.server.dao.queue.db.nosql;
18 import com.google.common.collect.Lists; 18 import com.google.common.collect.Lists;
19 import org.junit.Test; 19 import org.junit.Test;
20 import org.thingsboard.server.common.msg.TbMsg; 20 import org.thingsboard.server.common.msg.TbMsg;
  21 +import org.thingsboard.server.dao.queue.db.MsgAck;
  22 +import org.thingsboard.server.dao.queue.db.UnprocessedMsgFilter;
21 23
22 import java.util.Collection; 24 import java.util.Collection;
23 import java.util.List; 25 import java.util.List;
@@ -21,9 +21,9 @@ import com.google.common.util.concurrent.ListenableFuture; @@ -21,9 +21,9 @@ import com.google.common.util.concurrent.ListenableFuture;
21 import org.junit.Test; 21 import org.junit.Test;
22 import org.springframework.beans.factory.annotation.Autowired; 22 import org.springframework.beans.factory.annotation.Autowired;
23 import org.springframework.test.util.ReflectionTestUtils; 23 import org.springframework.test.util.ReflectionTestUtils;
24 -import org.thingsboard.server.dao.queue.db.nosql.MsgAck;  
25 import org.thingsboard.server.dao.service.AbstractServiceTest; 24 import org.thingsboard.server.dao.service.AbstractServiceTest;
26 import org.thingsboard.server.dao.service.DaoNoSqlTest; 25 import org.thingsboard.server.dao.service.DaoNoSqlTest;
  26 +import org.thingsboard.server.dao.queue.db.MsgAck;
27 27
28 import java.util.List; 28 import java.util.List;
29 import java.util.UUID; 29 import java.util.UUID;
@@ -96,7 +96,7 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { @@ -96,7 +96,7 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest {
96 saveRelation(relationA); 96 saveRelation(relationA);
97 saveRelation(relationB); 97 saveRelation(relationB);
98 98
99 - Assert.assertTrue(relationService.deleteEntityRelationsAsync(childId).get()); 99 + Assert.assertNull(relationService.deleteEntityRelationsAsync(childId).get());
100 100
101 Assert.assertFalse(relationService.checkRelation(parentId, childId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON).get()); 101 Assert.assertFalse(relationService.checkRelation(parentId, childId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON).get());
102 102
@@ -30,4 +30,3 @@ redis.connection.db=0 @@ -30,4 +30,3 @@ redis.connection.db=0
30 redis.connection.password= 30 redis.connection.password=
31 31
32 rule.queue.type=memory 32 rule.queue.type=memory
33 -rule.queue.max_size=10000  
@@ -27,6 +27,7 @@ import org.springframework.stereotype.Service; @@ -27,6 +27,7 @@ import org.springframework.stereotype.Service;
27 import org.thingsboard.server.common.transport.SessionMsgProcessor; 27 import org.thingsboard.server.common.transport.SessionMsgProcessor;
28 import org.thingsboard.server.common.transport.auth.DeviceAuthService; 28 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
29 import org.thingsboard.server.common.transport.quota.QuotaService; 29 import org.thingsboard.server.common.transport.quota.QuotaService;
  30 +import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
30 import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; 31 import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
31 32
32 import javax.annotation.PostConstruct; 33 import javax.annotation.PostConstruct;
@@ -55,7 +56,7 @@ public class CoapTransportService { @@ -55,7 +56,7 @@ public class CoapTransportService {
55 private DeviceAuthService authService; 56 private DeviceAuthService authService;
56 57
57 @Autowired(required = false) 58 @Autowired(required = false)
58 - private QuotaService quotaService; 59 + private HostRequestsQuotaService quotaService;
59 60
60 61
61 @Value("${coap.bind_address}") 62 @Value("${coap.bind_address}")
@@ -50,7 +50,7 @@ import org.thingsboard.server.common.msg.session.*; @@ -50,7 +50,7 @@ import org.thingsboard.server.common.msg.session.*;
50 import org.thingsboard.server.common.transport.SessionMsgProcessor; 50 import org.thingsboard.server.common.transport.SessionMsgProcessor;
51 import org.thingsboard.server.common.transport.auth.DeviceAuthResult; 51 import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
52 import org.thingsboard.server.common.transport.auth.DeviceAuthService; 52 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
53 -import org.thingsboard.server.common.transport.quota.QuotaService; 53 +import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
54 54
55 import java.util.ArrayList; 55 import java.util.ArrayList;
56 import java.util.List; 56 import java.util.List;
@@ -134,8 +134,8 @@ public class CoapServerTest { @@ -134,8 +134,8 @@ public class CoapServerTest {
134 } 134 }
135 135
136 @Bean 136 @Bean
137 - public static QuotaService quotaService() {  
138 - return key -> false; 137 + public static HostRequestsQuotaService quotaService() {
  138 + return new HostRequestsQuotaService(null, null, null, null, false);
139 } 139 }
140 } 140 }
141 141
@@ -36,6 +36,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor; @@ -36,6 +36,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
36 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 36 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
37 import org.thingsboard.server.common.transport.auth.DeviceAuthService; 37 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
38 import org.thingsboard.server.common.transport.quota.QuotaService; 38 import org.thingsboard.server.common.transport.quota.QuotaService;
  39 +import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
39 import org.thingsboard.server.transport.http.session.HttpSessionCtx; 40 import org.thingsboard.server.transport.http.session.HttpSessionCtx;
40 41
41 import javax.servlet.http.HttpServletRequest; 42 import javax.servlet.http.HttpServletRequest;
@@ -61,7 +62,7 @@ public class DeviceApiController { @@ -61,7 +62,7 @@ public class DeviceApiController {
61 private DeviceAuthService authService; 62 private DeviceAuthService authService;
62 63
63 @Autowired(required = false) 64 @Autowired(required = false)
64 - private QuotaService quotaService; 65 + private HostRequestsQuotaService quotaService;
65 66
66 @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json") 67 @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
67 public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken, 68 public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
@@ -29,7 +29,7 @@ import org.springframework.context.ApplicationContext; @@ -29,7 +29,7 @@ import org.springframework.context.ApplicationContext;
29 import org.springframework.stereotype.Service; 29 import org.springframework.stereotype.Service;
30 import org.thingsboard.server.common.transport.SessionMsgProcessor; 30 import org.thingsboard.server.common.transport.SessionMsgProcessor;
31 import org.thingsboard.server.common.transport.auth.DeviceAuthService; 31 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
32 -import org.thingsboard.server.common.transport.quota.QuotaService; 32 +import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
33 import org.thingsboard.server.dao.device.DeviceService; 33 import org.thingsboard.server.dao.device.DeviceService;
34 import org.thingsboard.server.dao.relation.RelationService; 34 import org.thingsboard.server.dao.relation.RelationService;
35 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; 35 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
@@ -67,7 +67,7 @@ public class MqttTransportService { @@ -67,7 +67,7 @@ public class MqttTransportService {
67 private MqttSslHandlerProvider sslHandlerProvider; 67 private MqttSslHandlerProvider sslHandlerProvider;
68 68
69 @Autowired(required = false) 69 @Autowired(required = false)
70 - private QuotaService quotaService; 70 + private HostRequestsQuotaService quotaService;
71 71
72 @Value("${mqtt.bind_address}") 72 @Value("${mqtt.bind_address}")
73 private String host; 73 private String host;