Commit 7b0b5e72ecc8d8c94c8b60238c67ee6e703d029d
Committed by
GitHub
Merge pull request #440 from thingsboard/feature/api-limits
Api Quota
Showing
23 changed files
with
1062 additions
and
69 deletions
... | ... | @@ -111,6 +111,27 @@ coap: |
111 | 111 | adaptor: "${COAP_ADAPTOR_NAME:JsonCoapAdaptor}" |
112 | 112 | timeout: "${COAP_TIMEOUT:10000}" |
113 | 113 | |
114 | +#Quota parameters | |
115 | +quota: | |
116 | + host: | |
117 | + # Max allowed number of API requests in interval for single host | |
118 | + limit: "${QUOTA_HOST_LIMIT:10000}" | |
119 | + # Interval duration | |
120 | + intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}" | |
121 | + # Maximum silence duration for host after which Host removed from QuotaService. Must be bigger than intervalMs | |
122 | + ttlMs: "${QUOTA_HOST_TTL_MS:60000}" | |
123 | + # Interval for scheduled task that cleans expired records. TTL is used for expiring | |
124 | + cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}" | |
125 | + # Enable Host API Limits | |
126 | + enabled: "${QUOTA_HOST_ENABLED:false}" | |
127 | + # Array of whitelist hosts | |
128 | + whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}" | |
129 | + # Array of blacklist hosts | |
130 | + blacklist: "${QUOTA_HOST_BLACKLIST:}" | |
131 | + log: | |
132 | + topSize: 10 | |
133 | + intervalMin: 2 | |
134 | + | |
114 | 135 | database: |
115 | 136 | type: "${DATABASE_TYPE:sql}" # cassandra OR sql |
116 | 137 | ... | ... |
... | ... | @@ -74,6 +74,18 @@ |
74 | 74 | <artifactId>mockito-all</artifactId> |
75 | 75 | <scope>test</scope> |
76 | 76 | </dependency> |
77 | + <dependency> | |
78 | + <groupId>org.springframework</groupId> | |
79 | + <artifactId>spring-context</artifactId> | |
80 | + </dependency> | |
81 | + <dependency> | |
82 | + <groupId>com.google.guava</groupId> | |
83 | + <artifactId>guava</artifactId> | |
84 | + </dependency> | |
85 | + <dependency> | |
86 | + <groupId>org.apache.commons</groupId> | |
87 | + <artifactId>commons-lang3</artifactId> | |
88 | + </dependency> | |
77 | 89 | </dependencies> |
78 | 90 | |
79 | 91 | </project> | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota; | |
17 | + | |
18 | +/** | |
19 | + * @author Vitaliy Paromskiy | |
20 | + * @version 1.0 | |
21 | + */ | |
22 | +public final class Clock { | |
23 | + | |
24 | + private static long time = 0L; | |
25 | + | |
26 | + private Clock() { | |
27 | + } | |
28 | + | |
29 | + | |
30 | + public static long millis() { | |
31 | + return time == 0 ? System.currentTimeMillis() : time; | |
32 | + } | |
33 | + | |
34 | + public static void setMillis(long millis) { | |
35 | + time = millis; | |
36 | + } | |
37 | + | |
38 | + public static void shift(long delta) { | |
39 | + time += delta; | |
40 | + } | |
41 | + | |
42 | + public static void reset() { | |
43 | + time = 0; | |
44 | + } | |
45 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota; | |
17 | + | |
18 | +import org.springframework.beans.factory.annotation.Value; | |
19 | +import org.springframework.stereotype.Component; | |
20 | + | |
21 | +/** | |
22 | + * @author Vitaliy Paromskiy | |
23 | + * @version 1.0 | |
24 | + */ | |
25 | +@Component | |
26 | +public class HostRequestLimitPolicy { | |
27 | + | |
28 | + private final long limit; | |
29 | + | |
30 | + public HostRequestLimitPolicy(@Value("${quota.host.limit}") long limit) { | |
31 | + this.limit = limit; | |
32 | + } | |
33 | + | |
34 | + public boolean isValid(long currentValue) { | |
35 | + return currentValue <= limit; | |
36 | + } | |
37 | + | |
38 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota; | |
17 | + | |
18 | +import lombok.extern.slf4j.Slf4j; | |
19 | +import org.springframework.beans.factory.annotation.Value; | |
20 | +import org.springframework.stereotype.Service; | |
21 | +import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry; | |
22 | +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner; | |
23 | +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger; | |
24 | + | |
25 | +import javax.annotation.PostConstruct; | |
26 | +import javax.annotation.PreDestroy; | |
27 | + | |
28 | +/** | |
29 | + * @author Vitaliy Paromskiy | |
30 | + * @version 1.0 | |
31 | + */ | |
32 | +@Service | |
33 | +@Slf4j | |
34 | +public class HostRequestsQuotaService implements QuotaService { | |
35 | + | |
36 | + private final HostRequestIntervalRegistry requestRegistry; | |
37 | + private final HostRequestLimitPolicy requestsPolicy; | |
38 | + private final IntervalRegistryCleaner registryCleaner; | |
39 | + private final IntervalRegistryLogger registryLogger; | |
40 | + private final boolean enabled; | |
41 | + | |
42 | + public HostRequestsQuotaService(HostRequestIntervalRegistry requestRegistry, HostRequestLimitPolicy requestsPolicy, | |
43 | + IntervalRegistryCleaner registryCleaner, IntervalRegistryLogger registryLogger, | |
44 | + @Value("${quota.host.enabled}") boolean enabled) { | |
45 | + this.requestRegistry = requestRegistry; | |
46 | + this.requestsPolicy = requestsPolicy; | |
47 | + this.registryCleaner = registryCleaner; | |
48 | + this.registryLogger = registryLogger; | |
49 | + this.enabled = enabled; | |
50 | + } | |
51 | + | |
52 | + @PostConstruct | |
53 | + public void init() { | |
54 | + if (enabled) { | |
55 | + registryCleaner.schedule(); | |
56 | + registryLogger.schedule(); | |
57 | + } | |
58 | + } | |
59 | + | |
60 | + @PreDestroy | |
61 | + public void close() { | |
62 | + if (enabled) { | |
63 | + registryCleaner.stop(); | |
64 | + registryLogger.stop(); | |
65 | + } | |
66 | + } | |
67 | + | |
68 | + @Override | |
69 | + public boolean isQuotaExceeded(String key) { | |
70 | + if (enabled) { | |
71 | + long count = requestRegistry.tick(key); | |
72 | + return !requestsPolicy.isValid(count); | |
73 | + } | |
74 | + return false; | |
75 | + } | |
76 | +} | ... | ... |
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota; | |
17 | + | |
18 | +/** | |
19 | + * @author Vitaliy Paromskiy | |
20 | + * @version 1.0 | |
21 | + */ | |
22 | +public interface QuotaService { | |
23 | + | |
24 | + boolean isQuotaExceeded(String key); | |
25 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota.inmemory; | |
17 | + | |
18 | +import com.google.common.collect.Sets; | |
19 | +import lombok.extern.slf4j.Slf4j; | |
20 | +import org.apache.commons.lang3.StringUtils; | |
21 | +import org.springframework.beans.factory.annotation.Value; | |
22 | +import org.springframework.stereotype.Component; | |
23 | + | |
24 | +import javax.annotation.PostConstruct; | |
25 | +import java.util.Map; | |
26 | +import java.util.Set; | |
27 | +import java.util.concurrent.ConcurrentHashMap; | |
28 | +import java.util.stream.Collectors; | |
29 | + | |
30 | +/** | |
31 | + * @author Vitaliy Paromskiy | |
32 | + * @version 1.0 | |
33 | + */ | |
34 | +@Component | |
35 | +@Slf4j | |
36 | +public class HostRequestIntervalRegistry { | |
37 | + | |
38 | + private final Map<String, IntervalCount> hostCounts = new ConcurrentHashMap<>(); | |
39 | + private final long intervalDurationMs; | |
40 | + private final long ttlMs; | |
41 | + private final Set<String> whiteList; | |
42 | + private final Set<String> blackList; | |
43 | + | |
44 | + public HostRequestIntervalRegistry(@Value("${quota.host.intervalMs}") long intervalDurationMs, | |
45 | + @Value("${quota.host.ttlMs}") long ttlMs, | |
46 | + @Value("${quota.host.whitelist}") String whiteList, | |
47 | + @Value("${quota.host.blacklist}") String blackList) { | |
48 | + this.intervalDurationMs = intervalDurationMs; | |
49 | + this.ttlMs = ttlMs; | |
50 | + this.whiteList = Sets.newHashSet(StringUtils.split(whiteList, ',')); | |
51 | + this.blackList = Sets.newHashSet(StringUtils.split(blackList, ',')); | |
52 | + } | |
53 | + | |
54 | + @PostConstruct | |
55 | + public void init() { | |
56 | + if (ttlMs < intervalDurationMs) { | |
57 | + log.warn("TTL for IntervalRegistry [{}] smaller than interval duration [{}]", ttlMs, intervalDurationMs); | |
58 | + } | |
59 | + log.info("Start Host Quota Service with whitelist {}", whiteList); | |
60 | + log.info("Start Host Quota Service with blacklist {}", blackList); | |
61 | + } | |
62 | + | |
63 | + public long tick(String clientHostId) { | |
64 | + if (whiteList.contains(clientHostId)) { | |
65 | + return 0; | |
66 | + } else if (blackList.contains(clientHostId)) { | |
67 | + return Long.MAX_VALUE; | |
68 | + } | |
69 | + IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs)); | |
70 | + return intervalCount.resetIfExpiredAndTick(); | |
71 | + } | |
72 | + | |
73 | + public void clean() { | |
74 | + hostCounts.entrySet().removeIf(entry -> entry.getValue().silenceDuration() > ttlMs); | |
75 | + } | |
76 | + | |
77 | + public Map<String, Long> getContent() { | |
78 | + return hostCounts.entrySet().stream() | |
79 | + .collect(Collectors.toMap( | |
80 | + Map.Entry::getKey, | |
81 | + interval -> interval.getValue().getCount())); | |
82 | + } | |
83 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota.inmemory; | |
17 | + | |
18 | + | |
19 | +import org.thingsboard.server.common.transport.quota.Clock; | |
20 | + | |
21 | +import java.util.concurrent.atomic.LongAdder; | |
22 | + | |
23 | +/** | |
24 | + * @author Vitaliy Paromskiy | |
25 | + * @version 1.0 | |
26 | + */ | |
27 | +public class IntervalCount { | |
28 | + | |
29 | + private final LongAdder adder = new LongAdder(); | |
30 | + private final long intervalDurationMs; | |
31 | + private volatile long startTime; | |
32 | + private volatile long lastTickTime; | |
33 | + | |
34 | + public IntervalCount(long intervalDurationMs) { | |
35 | + this.intervalDurationMs = intervalDurationMs; | |
36 | + startTime = Clock.millis(); | |
37 | + } | |
38 | + | |
39 | + public long resetIfExpiredAndTick() { | |
40 | + if (isExpired()) { | |
41 | + reset(); | |
42 | + } | |
43 | + tick(); | |
44 | + return adder.sum(); | |
45 | + } | |
46 | + | |
47 | + public long silenceDuration() { | |
48 | + return Clock.millis() - lastTickTime; | |
49 | + } | |
50 | + | |
51 | + public long getCount() { | |
52 | + return adder.sum(); | |
53 | + } | |
54 | + | |
55 | + private void tick() { | |
56 | + adder.add(1); | |
57 | + lastTickTime = Clock.millis(); | |
58 | + } | |
59 | + | |
60 | + private void reset() { | |
61 | + adder.reset(); | |
62 | + startTime = Clock.millis(); | |
63 | + } | |
64 | + | |
65 | + private boolean isExpired() { | |
66 | + return (Clock.millis() - startTime) > intervalDurationMs; | |
67 | + } | |
68 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota.inmemory; | |
17 | + | |
18 | +import lombok.extern.slf4j.Slf4j; | |
19 | +import org.springframework.beans.factory.annotation.Value; | |
20 | +import org.springframework.stereotype.Component; | |
21 | + | |
22 | +import javax.annotation.PreDestroy; | |
23 | +import java.util.concurrent.Executors; | |
24 | +import java.util.concurrent.ScheduledExecutorService; | |
25 | +import java.util.concurrent.TimeUnit; | |
26 | + | |
27 | +/** | |
28 | + * @author Vitaliy Paromskiy | |
29 | + * @version 1.0 | |
30 | + */ | |
31 | +@Component | |
32 | +@Slf4j | |
33 | +public class IntervalRegistryCleaner { | |
34 | + | |
35 | + private final HostRequestIntervalRegistry intervalRegistry; | |
36 | + private final long cleanPeriodMs; | |
37 | + private ScheduledExecutorService executor; | |
38 | + | |
39 | + public IntervalRegistryCleaner(HostRequestIntervalRegistry intervalRegistry, @Value("${quota.host.cleanPeriodMs}") long cleanPeriodMs) { | |
40 | + this.intervalRegistry = intervalRegistry; | |
41 | + this.cleanPeriodMs = cleanPeriodMs; | |
42 | + } | |
43 | + | |
44 | + public void schedule() { | |
45 | + if (executor != null) { | |
46 | + throw new IllegalStateException("Registry Cleaner already scheduled"); | |
47 | + } | |
48 | + executor = Executors.newSingleThreadScheduledExecutor(); | |
49 | + executor.scheduleAtFixedRate(this::clean, cleanPeriodMs, cleanPeriodMs, TimeUnit.MILLISECONDS); | |
50 | + } | |
51 | + | |
52 | + public void stop() { | |
53 | + if (executor != null) { | |
54 | + executor.shutdown(); | |
55 | + } | |
56 | + } | |
57 | + | |
58 | + public void clean() { | |
59 | + try { | |
60 | + intervalRegistry.clean(); | |
61 | + } catch (RuntimeException ex) { | |
62 | + log.error("Could not clear Interval Registry", ex); | |
63 | + } | |
64 | + } | |
65 | + | |
66 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota.inmemory; | |
17 | + | |
18 | +import com.google.common.collect.MinMaxPriorityQueue; | |
19 | +import lombok.extern.slf4j.Slf4j; | |
20 | +import org.springframework.beans.factory.annotation.Value; | |
21 | +import org.springframework.stereotype.Component; | |
22 | + | |
23 | +import java.util.Comparator; | |
24 | +import java.util.Map; | |
25 | +import java.util.concurrent.Executors; | |
26 | +import java.util.concurrent.ScheduledExecutorService; | |
27 | +import java.util.concurrent.TimeUnit; | |
28 | +import java.util.function.Function; | |
29 | +import java.util.stream.Collectors; | |
30 | + | |
31 | +/** | |
32 | + * @author Vitaliy Paromskiy | |
33 | + * @version 1.0 | |
34 | + */ | |
35 | +@Component | |
36 | +@Slf4j | |
37 | +public class IntervalRegistryLogger { | |
38 | + | |
39 | + private final int topSize; | |
40 | + private final HostRequestIntervalRegistry intervalRegistry; | |
41 | + private final long logIntervalMin; | |
42 | + private ScheduledExecutorService executor; | |
43 | + | |
44 | + public IntervalRegistryLogger(@Value("${quota.log.topSize}") int topSize, @Value("${quota.log.intervalMin}") long logIntervalMin, | |
45 | + HostRequestIntervalRegistry intervalRegistry) { | |
46 | + this.topSize = topSize; | |
47 | + this.logIntervalMin = logIntervalMin; | |
48 | + this.intervalRegistry = intervalRegistry; | |
49 | + } | |
50 | + | |
51 | + public void schedule() { | |
52 | + if (executor != null) { | |
53 | + throw new IllegalStateException("Registry Cleaner already scheduled"); | |
54 | + } | |
55 | + executor = Executors.newSingleThreadScheduledExecutor(); | |
56 | + executor.scheduleAtFixedRate(this::logStatistic, logIntervalMin, logIntervalMin, TimeUnit.MINUTES); | |
57 | + } | |
58 | + | |
59 | + public void stop() { | |
60 | + if (executor != null) { | |
61 | + executor.shutdown(); | |
62 | + } | |
63 | + } | |
64 | + | |
65 | + public void logStatistic() { | |
66 | + Map<String, Long> registryContent = intervalRegistry.getContent(); | |
67 | + int uniqHosts = registryContent.size(); | |
68 | + long requestsCount = registryContent.values().stream().mapToLong(i -> i).sum(); | |
69 | + Map<String, Long> top = getTopElements(registryContent); | |
70 | + log(top, uniqHosts, requestsCount); | |
71 | + } | |
72 | + | |
73 | + protected Map<String, Long> getTopElements(Map<String, Long> countMap) { | |
74 | + MinMaxPriorityQueue<Map.Entry<String, Long>> topQueue = MinMaxPriorityQueue | |
75 | + .orderedBy(Comparator.comparing((Function<Map.Entry<String, Long>, Long>) Map.Entry::getValue).reversed()) | |
76 | + .maximumSize(topSize) | |
77 | + .create(countMap.entrySet()); | |
78 | + | |
79 | + return topQueue.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); | |
80 | + } | |
81 | + | |
82 | + private void log(Map<String, Long> top, int uniqHosts, long requestsCount) { | |
83 | + long rps = requestsCount / TimeUnit.MINUTES.toSeconds(logIntervalMin); | |
84 | + StringBuilder builder = new StringBuilder("Quota Statistic : "); | |
85 | + builder.append("uniqHosts : ").append(uniqHosts).append("; "); | |
86 | + builder.append("requestsCount : ").append(requestsCount).append("; "); | |
87 | + builder.append("RPS : ").append(rps).append(" "); | |
88 | + builder.append("top -> "); | |
89 | + for (Map.Entry<String, Long> host : top.entrySet()) { | |
90 | + builder.append(host.getKey()).append(" : ").append(host.getValue()).append("; "); | |
91 | + } | |
92 | + | |
93 | + log.info(builder.toString()); | |
94 | + } | |
95 | +} | ... | ... |
common/transport/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota; | |
17 | + | |
18 | +import org.junit.After; | |
19 | +import org.junit.Before; | |
20 | +import org.junit.Test; | |
21 | + | |
22 | +import static org.junit.Assert.*; | |
23 | + | |
24 | +/** | |
25 | + * @author Vitaliy Paromskiy | |
26 | + * @version 1.0 | |
27 | + */ | |
28 | +public class ClockTest { | |
29 | + | |
30 | + @Before | |
31 | + public void init() { | |
32 | + Clock.reset(); | |
33 | + } | |
34 | + | |
35 | + @After | |
36 | + public void clear() { | |
37 | + Clock.reset(); | |
38 | + } | |
39 | + | |
40 | + @Test | |
41 | + public void defaultClockUseSystemTime() { | |
42 | + assertFalse(Clock.millis() > System.currentTimeMillis()); | |
43 | + } | |
44 | + | |
45 | + @Test | |
46 | + public void timeCanBeSet() { | |
47 | + Clock.setMillis(100L); | |
48 | + assertEquals(100L, Clock.millis()); | |
49 | + } | |
50 | + | |
51 | + @Test | |
52 | + public void clockCanBeReseted() { | |
53 | + Clock.setMillis(100L); | |
54 | + assertEquals(100L, Clock.millis()); | |
55 | + Clock.reset(); | |
56 | + assertFalse(Clock.millis() > System.currentTimeMillis()); | |
57 | + } | |
58 | + | |
59 | + @Test | |
60 | + public void timeIsShifted() { | |
61 | + Clock.setMillis(100L); | |
62 | + Clock.shift(50L); | |
63 | + assertEquals(150L, Clock.millis()); | |
64 | + } | |
65 | + | |
66 | +} | |
\ No newline at end of file | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota; | |
17 | + | |
18 | +import org.junit.Test; | |
19 | + | |
20 | +import static org.junit.Assert.assertFalse; | |
21 | +import static org.junit.Assert.assertTrue; | |
22 | + | |
23 | +/** | |
24 | + * @author Vitaliy Paromskiy | |
25 | + * @version 1.0 | |
26 | + */ | |
27 | +public class HostRequestLimitPolicyTest { | |
28 | + | |
29 | + private HostRequestLimitPolicy limitPolicy = new HostRequestLimitPolicy(10L); | |
30 | + | |
31 | + @Test | |
32 | + public void ifCurrentValueLessThenLimitItIsValid() { | |
33 | + assertTrue(limitPolicy.isValid(9)); | |
34 | + } | |
35 | + | |
36 | + @Test | |
37 | + public void ifCurrentValueEqualsToLimitItIsValid() { | |
38 | + assertTrue(limitPolicy.isValid(10)); | |
39 | + } | |
40 | + | |
41 | + @Test | |
42 | + public void ifCurrentValueGreaterThenLimitItIsValid() { | |
43 | + assertFalse(limitPolicy.isValid(11)); | |
44 | + } | |
45 | + | |
46 | +} | |
\ No newline at end of file | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota; | |
17 | + | |
18 | +import org.junit.Before; | |
19 | +import org.junit.Test; | |
20 | +import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry; | |
21 | +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner; | |
22 | +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger; | |
23 | + | |
24 | +import static org.junit.Assert.assertFalse; | |
25 | +import static org.junit.Assert.assertTrue; | |
26 | +import static org.mockito.Mockito.*; | |
27 | + | |
28 | +/** | |
29 | + * @author Vitaliy Paromskiy | |
30 | + * @version 1.0 | |
31 | + */ | |
32 | +public class HostRequestsQuotaServiceTest { | |
33 | + | |
34 | + private HostRequestsQuotaService quotaService; | |
35 | + | |
36 | + private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class); | |
37 | + private HostRequestLimitPolicy requestsPolicy = mock(HostRequestLimitPolicy.class); | |
38 | + private IntervalRegistryCleaner registryCleaner = mock(IntervalRegistryCleaner.class); | |
39 | + private IntervalRegistryLogger registryLogger = mock(IntervalRegistryLogger.class); | |
40 | + | |
41 | + @Before | |
42 | + public void init() { | |
43 | + quotaService = new HostRequestsQuotaService(requestRegistry, requestsPolicy, registryCleaner, registryLogger, true); | |
44 | + } | |
45 | + | |
46 | + @Test | |
47 | + public void quotaExceededIfRequestCountBiggerThanAllowed() { | |
48 | + when(requestRegistry.tick("key")).thenReturn(10L); | |
49 | + when(requestsPolicy.isValid(10L)).thenReturn(false); | |
50 | + | |
51 | + assertTrue(quotaService.isQuotaExceeded("key")); | |
52 | + | |
53 | + verify(requestRegistry).tick("key"); | |
54 | + verify(requestsPolicy).isValid(10L); | |
55 | + verifyNoMoreInteractions(requestRegistry, requestsPolicy); | |
56 | + } | |
57 | + | |
58 | + @Test | |
59 | + public void quotaNotExceededIfRequestCountLessThanAllowed() { | |
60 | + when(requestRegistry.tick("key")).thenReturn(10L); | |
61 | + when(requestsPolicy.isValid(10L)).thenReturn(true); | |
62 | + | |
63 | + assertFalse(quotaService.isQuotaExceeded("key")); | |
64 | + | |
65 | + verify(requestRegistry).tick("key"); | |
66 | + verify(requestsPolicy).isValid(10L); | |
67 | + verifyNoMoreInteractions(requestRegistry, requestsPolicy); | |
68 | + } | |
69 | + | |
70 | + @Test | |
71 | + public void serviceCanBeDisabled() { | |
72 | + quotaService = new HostRequestsQuotaService(requestRegistry, requestsPolicy, registryCleaner, registryLogger, false); | |
73 | + assertFalse(quotaService.isQuotaExceeded("key")); | |
74 | + verifyNoMoreInteractions(requestRegistry, requestsPolicy); | |
75 | + } | |
76 | +} | |
\ No newline at end of file | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota.inmemory; | |
17 | + | |
18 | +import com.google.common.collect.Sets; | |
19 | +import org.junit.Before; | |
20 | +import org.junit.Test; | |
21 | + | |
22 | +import java.util.Collections; | |
23 | + | |
24 | +import static org.junit.Assert.assertEquals; | |
25 | + | |
26 | +/** | |
27 | + * @author Vitaliy Paromskiy | |
28 | + * @version 1.0 | |
29 | + */ | |
30 | +public class HostRequestIntervalRegistryTest { | |
31 | + | |
32 | + private HostRequestIntervalRegistry registry; | |
33 | + | |
34 | + @Before | |
35 | + public void init() { | |
36 | + registry = new HostRequestIntervalRegistry(10000L, 100L,"g1,g2", "b1"); | |
37 | + } | |
38 | + | |
39 | + @Test | |
40 | + public void newHostCreateNewInterval() { | |
41 | + assertEquals(1L, registry.tick("host1")); | |
42 | + } | |
43 | + | |
44 | + @Test | |
45 | + public void existingHostUpdated() { | |
46 | + registry.tick("aaa"); | |
47 | + assertEquals(1L, registry.tick("bbb")); | |
48 | + assertEquals(2L, registry.tick("aaa")); | |
49 | + } | |
50 | + | |
51 | + @Test | |
52 | + public void expiredIntervalsCleaned() throws InterruptedException { | |
53 | + registry.tick("aaa"); | |
54 | + Thread.sleep(150L); | |
55 | + registry.tick("bbb"); | |
56 | + registry.clean(); | |
57 | + assertEquals(1L, registry.tick("aaa")); | |
58 | + assertEquals(2L, registry.tick("bbb")); | |
59 | + } | |
60 | + | |
61 | + @Test | |
62 | + public void domainFromWhitelistNotCounted(){ | |
63 | + assertEquals(0L, registry.tick("g1")); | |
64 | + assertEquals(0L, registry.tick("g1")); | |
65 | + assertEquals(0L, registry.tick("g2")); | |
66 | + } | |
67 | + | |
68 | + @Test | |
69 | + public void domainFromBlackListReturnMaxValue(){ | |
70 | + assertEquals(Long.MAX_VALUE, registry.tick("b1")); | |
71 | + assertEquals(Long.MAX_VALUE, registry.tick("b1")); | |
72 | + } | |
73 | + | |
74 | + @Test | |
75 | + public void emptyWhitelistParsedOk(){ | |
76 | + registry = new HostRequestIntervalRegistry(10000L, 100L,"", "b1"); | |
77 | + assertEquals(1L, registry.tick("aaa")); | |
78 | + } | |
79 | + | |
80 | + @Test | |
81 | + public void emptyBlacklistParsedOk(){ | |
82 | + registry = new HostRequestIntervalRegistry(10000L, 100L,"", ""); | |
83 | + assertEquals(1L, registry.tick("aaa")); | |
84 | + } | |
85 | +} | |
\ No newline at end of file | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota.inmemory; | |
17 | + | |
18 | +import org.junit.After; | |
19 | +import org.junit.Before; | |
20 | +import org.junit.Test; | |
21 | +import org.thingsboard.server.common.transport.quota.Clock; | |
22 | + | |
23 | +import static org.junit.Assert.assertEquals; | |
24 | + | |
25 | +/** | |
26 | + * @author Vitaliy Paromskiy | |
27 | + * @version 1.0 | |
28 | + */ | |
29 | +public class IntervalCountTest { | |
30 | + | |
31 | + @Before | |
32 | + public void init() { | |
33 | + Clock.setMillis(1000L); | |
34 | + } | |
35 | + | |
36 | + @After | |
37 | + public void clear() { | |
38 | + Clock.reset(); | |
39 | + } | |
40 | + | |
41 | + @Test | |
42 | + public void ticksInSameIntervalAreSummed() { | |
43 | + IntervalCount intervalCount = new IntervalCount(100L); | |
44 | + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); | |
45 | + Clock.shift(100); | |
46 | + assertEquals(2L, intervalCount.resetIfExpiredAndTick()); | |
47 | + } | |
48 | + | |
49 | + @Test | |
50 | + public void oldDataCleanedWhenIntervalExpired() { | |
51 | + IntervalCount intervalCount = new IntervalCount(100L); | |
52 | + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); | |
53 | + Clock.shift(101); | |
54 | + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); | |
55 | + } | |
56 | + | |
57 | + @Test | |
58 | + public void silenceDurationCalculatedFromLastTick() { | |
59 | + IntervalCount intervalCount = new IntervalCount(100L); | |
60 | + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); | |
61 | + Clock.shift(10L); | |
62 | + assertEquals(10L, intervalCount.silenceDuration()); | |
63 | + } | |
64 | + | |
65 | +} | |
\ No newline at end of file | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2017 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.quota.inmemory; | |
17 | + | |
18 | +import com.google.common.collect.ImmutableMap; | |
19 | +import org.junit.Before; | |
20 | +import org.junit.Test; | |
21 | + | |
22 | +import java.util.Collections; | |
23 | +import java.util.Map; | |
24 | + | |
25 | +import static org.junit.Assert.assertEquals; | |
26 | +import static org.mockito.Mockito.mock; | |
27 | + | |
28 | +/** | |
29 | + * @author Vitaliy Paromskiy | |
30 | + * @version 1.0 | |
31 | + */ | |
32 | +public class IntervalRegistryLoggerTest { | |
33 | + | |
34 | + private IntervalRegistryLogger logger; | |
35 | + | |
36 | + private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class); | |
37 | + | |
38 | + @Before | |
39 | + public void init() { | |
40 | + logger = new IntervalRegistryLogger(3, 10, requestRegistry); | |
41 | + } | |
42 | + | |
43 | + @Test | |
44 | + public void onlyMaxHostsCollected() { | |
45 | + Map<String, Long> map = ImmutableMap.of("a", 8L, "b", 3L, "c", 1L, "d", 3L); | |
46 | + Map<String, Long> actual = logger.getTopElements(map); | |
47 | + Map<String, Long> expected = ImmutableMap.of("a", 8L, "b", 3L, "d", 3L); | |
48 | + | |
49 | + assertEquals(expected, actual); | |
50 | + } | |
51 | + | |
52 | + @Test | |
53 | + public void emptyMapProcessedCorrectly() { | |
54 | + Map<String, Long> map = Collections.emptyMap(); | |
55 | + Map<String, Long> actual = logger.getTopElements(map); | |
56 | + Map<String, Long> expected = Collections.emptyMap(); | |
57 | + | |
58 | + assertEquals(expected, actual); | |
59 | + } | |
60 | + | |
61 | +} | |
\ No newline at end of file | ... | ... |
... | ... | @@ -34,6 +34,7 @@ import org.thingsboard.server.common.msg.session.*; |
34 | 34 | import org.thingsboard.server.common.transport.SessionMsgProcessor; |
35 | 35 | import org.thingsboard.server.common.transport.adaptor.AdaptorException; |
36 | 36 | import org.thingsboard.server.common.transport.auth.DeviceAuthService; |
37 | +import org.thingsboard.server.common.transport.quota.QuotaService; | |
37 | 38 | import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; |
38 | 39 | import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy; |
39 | 40 | import org.thingsboard.server.transport.coap.session.CoapSessionCtx; |
... | ... | @@ -51,13 +52,16 @@ public class CoapTransportResource extends CoapResource { |
51 | 52 | private final CoapTransportAdaptor adaptor; |
52 | 53 | private final SessionMsgProcessor processor; |
53 | 54 | private final DeviceAuthService authService; |
55 | + private final QuotaService quotaService; | |
54 | 56 | private final Field observerField; |
55 | 57 | private final long timeout; |
56 | 58 | |
57 | - public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name, long timeout) { | |
59 | + public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name, | |
60 | + long timeout, QuotaService quotaService) { | |
58 | 61 | super(name); |
59 | 62 | this.processor = processor; |
60 | 63 | this.authService = authService; |
64 | + this.quotaService = quotaService; | |
61 | 65 | this.adaptor = adaptor; |
62 | 66 | this.timeout = timeout; |
63 | 67 | // This is important to turn off existing observable logic in |
... | ... | @@ -70,6 +74,12 @@ public class CoapTransportResource extends CoapResource { |
70 | 74 | |
71 | 75 | @Override |
72 | 76 | public void handleGET(CoapExchange exchange) { |
77 | + if(quotaService.isQuotaExceeded(exchange.getSourceAddress().getHostAddress())) { | |
78 | + log.warn("COAP Quota exceeded for [{}:{}] . Disconnect", exchange.getSourceAddress().getHostAddress(), exchange.getSourcePort()); | |
79 | + exchange.respond(ResponseCode.BAD_REQUEST); | |
80 | + return; | |
81 | + } | |
82 | + | |
73 | 83 | Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest()); |
74 | 84 | if (!featureType.isPresent()) { |
75 | 85 | log.trace("Missing feature type parameter"); | ... | ... |
... | ... | @@ -15,25 +15,25 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.transport.coap; |
17 | 17 | |
18 | -import java.net.InetAddress; | |
19 | -import java.net.InetSocketAddress; | |
20 | -import java.net.UnknownHostException; | |
21 | - | |
22 | -import javax.annotation.PostConstruct; | |
23 | -import javax.annotation.PreDestroy; | |
24 | - | |
25 | 18 | import lombok.extern.slf4j.Slf4j; |
26 | 19 | import org.eclipse.californium.core.CoapResource; |
27 | 20 | import org.eclipse.californium.core.CoapServer; |
28 | 21 | import org.eclipse.californium.core.network.CoapEndpoint; |
29 | -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | |
30 | -import org.thingsboard.server.common.transport.SessionMsgProcessor; | |
31 | -import org.thingsboard.server.common.transport.auth.DeviceAuthService; | |
32 | -import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; | |
33 | 22 | import org.springframework.beans.factory.annotation.Autowired; |
34 | 23 | import org.springframework.beans.factory.annotation.Value; |
24 | +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | |
35 | 25 | import org.springframework.context.ApplicationContext; |
36 | 26 | import org.springframework.stereotype.Service; |
27 | +import org.thingsboard.server.common.transport.SessionMsgProcessor; | |
28 | +import org.thingsboard.server.common.transport.auth.DeviceAuthService; | |
29 | +import org.thingsboard.server.common.transport.quota.QuotaService; | |
30 | +import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; | |
31 | + | |
32 | +import javax.annotation.PostConstruct; | |
33 | +import javax.annotation.PreDestroy; | |
34 | +import java.net.InetAddress; | |
35 | +import java.net.InetSocketAddress; | |
36 | +import java.net.UnknownHostException; | |
37 | 37 | |
38 | 38 | @Service("CoapTransportService") |
39 | 39 | @ConditionalOnProperty(prefix = "coap", value = "enabled", havingValue = "true", matchIfMissing = true) |
... | ... | @@ -54,6 +54,9 @@ public class CoapTransportService { |
54 | 54 | @Autowired(required = false) |
55 | 55 | private DeviceAuthService authService; |
56 | 56 | |
57 | + @Autowired(required = false) | |
58 | + private QuotaService quotaService; | |
59 | + | |
57 | 60 | |
58 | 61 | @Value("${coap.bind_address}") |
59 | 62 | private String host; |
... | ... | @@ -83,7 +86,7 @@ public class CoapTransportService { |
83 | 86 | |
84 | 87 | private void createResources() { |
85 | 88 | CoapResource api = new CoapResource(API); |
86 | - api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout)); | |
89 | + api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout, quotaService)); | |
87 | 90 | server.add(api); |
88 | 91 | } |
89 | 92 | ... | ... |
... | ... | @@ -50,6 +50,7 @@ import org.thingsboard.server.common.msg.session.*; |
50 | 50 | import org.thingsboard.server.common.transport.SessionMsgProcessor; |
51 | 51 | import org.thingsboard.server.common.transport.auth.DeviceAuthResult; |
52 | 52 | import org.thingsboard.server.common.transport.auth.DeviceAuthService; |
53 | +import org.thingsboard.server.common.transport.quota.QuotaService; | |
53 | 54 | |
54 | 55 | import java.util.ArrayList; |
55 | 56 | import java.util.List; |
... | ... | @@ -131,6 +132,11 @@ public class CoapServerTest { |
131 | 132 | } |
132 | 133 | }; |
133 | 134 | } |
135 | + | |
136 | + @Bean | |
137 | + public static QuotaService quotaService() { | |
138 | + return key -> false; | |
139 | + } | |
134 | 140 | } |
135 | 141 | |
136 | 142 | @Autowired | ... | ... |
... | ... | @@ -35,10 +35,11 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg; |
35 | 35 | import org.thingsboard.server.common.transport.SessionMsgProcessor; |
36 | 36 | import org.thingsboard.server.common.transport.adaptor.JsonConverter; |
37 | 37 | import org.thingsboard.server.common.transport.auth.DeviceAuthService; |
38 | +import org.thingsboard.server.common.transport.quota.QuotaService; | |
38 | 39 | import org.thingsboard.server.transport.http.session.HttpSessionCtx; |
39 | 40 | |
41 | +import javax.servlet.http.HttpServletRequest; | |
40 | 42 | import java.util.Arrays; |
41 | -import java.util.Collections; | |
42 | 43 | import java.util.HashSet; |
43 | 44 | import java.util.Set; |
44 | 45 | |
... | ... | @@ -59,11 +60,18 @@ public class DeviceApiController { |
59 | 60 | @Autowired(required = false) |
60 | 61 | private DeviceAuthService authService; |
61 | 62 | |
63 | + @Autowired(required = false) | |
64 | + private QuotaService quotaService; | |
65 | + | |
62 | 66 | @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json") |
63 | 67 | public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken, |
64 | 68 | @RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys, |
65 | - @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys) { | |
69 | + @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys, | |
70 | + HttpServletRequest httpRequest) { | |
66 | 71 | DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>(); |
72 | + if (quotaExceeded(httpRequest, responseWriter)) { | |
73 | + return responseWriter; | |
74 | + } | |
67 | 75 | HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); |
68 | 76 | if (ctx.login(new DeviceTokenCredentials(deviceToken))) { |
69 | 77 | GetAttributesRequest request; |
... | ... | @@ -84,8 +92,11 @@ public class DeviceApiController { |
84 | 92 | |
85 | 93 | @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.POST) |
86 | 94 | public DeferredResult<ResponseEntity> postDeviceAttributes(@PathVariable("deviceToken") String deviceToken, |
87 | - @RequestBody String json) { | |
95 | + @RequestBody String json, HttpServletRequest request) { | |
88 | 96 | DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>(); |
97 | + if (quotaExceeded(request, responseWriter)) { | |
98 | + return responseWriter; | |
99 | + } | |
89 | 100 | HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); |
90 | 101 | if (ctx.login(new DeviceTokenCredentials(deviceToken))) { |
91 | 102 | try { |
... | ... | @@ -101,8 +112,11 @@ public class DeviceApiController { |
101 | 112 | |
102 | 113 | @RequestMapping(value = "/{deviceToken}/telemetry", method = RequestMethod.POST) |
103 | 114 | public DeferredResult<ResponseEntity> postTelemetry(@PathVariable("deviceToken") String deviceToken, |
104 | - @RequestBody String json) { | |
115 | + @RequestBody String json, HttpServletRequest request) { | |
105 | 116 | DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>(); |
117 | + if (quotaExceeded(request, responseWriter)) { | |
118 | + return responseWriter; | |
119 | + } | |
106 | 120 | HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); |
107 | 121 | if (ctx.login(new DeviceTokenCredentials(deviceToken))) { |
108 | 122 | try { |
... | ... | @@ -118,15 +132,20 @@ public class DeviceApiController { |
118 | 132 | |
119 | 133 | @RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json") |
120 | 134 | public DeferredResult<ResponseEntity> subscribeToCommands(@PathVariable("deviceToken") String deviceToken, |
121 | - @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout) { | |
122 | - return subscribe(deviceToken, timeout, new RpcSubscribeMsg()); | |
135 | + @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, | |
136 | + HttpServletRequest request) { | |
137 | + | |
138 | + return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request); | |
123 | 139 | } |
124 | 140 | |
125 | 141 | @RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST) |
126 | 142 | public DeferredResult<ResponseEntity> replyToCommand(@PathVariable("deviceToken") String deviceToken, |
127 | 143 | @PathVariable("requestId") Integer requestId, |
128 | - @RequestBody String json) { | |
144 | + @RequestBody String json, HttpServletRequest request) { | |
129 | 145 | DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>(); |
146 | + if (quotaExceeded(request, responseWriter)) { | |
147 | + return responseWriter; | |
148 | + } | |
130 | 149 | HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); |
131 | 150 | if (ctx.login(new DeviceTokenCredentials(deviceToken))) { |
132 | 151 | try { |
... | ... | @@ -143,8 +162,11 @@ public class DeviceApiController { |
143 | 162 | |
144 | 163 | @RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.POST) |
145 | 164 | public DeferredResult<ResponseEntity> postRpcRequest(@PathVariable("deviceToken") String deviceToken, |
146 | - @RequestBody String json) { | |
165 | + @RequestBody String json, HttpServletRequest httpRequest) { | |
147 | 166 | DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>(); |
167 | + if (quotaExceeded(httpRequest, responseWriter)) { | |
168 | + return responseWriter; | |
169 | + } | |
148 | 170 | HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); |
149 | 171 | if (ctx.login(new DeviceTokenCredentials(deviceToken))) { |
150 | 172 | try { |
... | ... | @@ -163,12 +185,17 @@ public class DeviceApiController { |
163 | 185 | |
164 | 186 | @RequestMapping(value = "/{deviceToken}/attributes/updates", method = RequestMethod.GET, produces = "application/json") |
165 | 187 | public DeferredResult<ResponseEntity> subscribeToAttributes(@PathVariable("deviceToken") String deviceToken, |
166 | - @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout) { | |
167 | - return subscribe(deviceToken, timeout, new AttributesSubscribeMsg()); | |
188 | + @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, | |
189 | + HttpServletRequest httpRequest) { | |
190 | + | |
191 | + return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest); | |
168 | 192 | } |
169 | 193 | |
170 | - private DeferredResult<ResponseEntity> subscribe(String deviceToken, long timeout, FromDeviceMsg msg) { | |
194 | + private DeferredResult<ResponseEntity> subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) { | |
171 | 195 | DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>(); |
196 | + if (quotaExceeded(httpRequest, responseWriter)) { | |
197 | + return responseWriter; | |
198 | + } | |
172 | 199 | HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout); |
173 | 200 | if (ctx.login(new DeviceTokenCredentials(deviceToken))) { |
174 | 201 | try { |
... | ... | @@ -195,4 +222,13 @@ public class DeviceApiController { |
195 | 222 | processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg)); |
196 | 223 | } |
197 | 224 | |
225 | + private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) { | |
226 | + if (quotaService.isQuotaExceeded(request.getRemoteAddr())) { | |
227 | + log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr()); | |
228 | + responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED)); | |
229 | + return true; | |
230 | + } | |
231 | + return false; | |
232 | + } | |
233 | + | |
198 | 234 | } | ... | ... |
... | ... | @@ -16,7 +16,6 @@ |
16 | 16 | package org.thingsboard.server.transport.mqtt; |
17 | 17 | |
18 | 18 | import com.fasterxml.jackson.databind.JsonNode; |
19 | -import io.netty.channel.Channel; | |
20 | 19 | import io.netty.channel.ChannelHandlerContext; |
21 | 20 | import io.netty.channel.ChannelInboundHandlerAdapter; |
22 | 21 | import io.netty.handler.codec.mqtt.*; |
... | ... | @@ -36,18 +35,18 @@ import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; |
36 | 35 | import org.thingsboard.server.common.transport.SessionMsgProcessor; |
37 | 36 | import org.thingsboard.server.common.transport.adaptor.AdaptorException; |
38 | 37 | import org.thingsboard.server.common.transport.auth.DeviceAuthService; |
38 | +import org.thingsboard.server.common.transport.quota.QuotaService; | |
39 | 39 | import org.thingsboard.server.dao.EncryptionUtil; |
40 | 40 | import org.thingsboard.server.dao.device.DeviceService; |
41 | 41 | import org.thingsboard.server.dao.relation.RelationService; |
42 | 42 | import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; |
43 | -import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; | |
44 | 43 | import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; |
44 | +import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; | |
45 | 45 | import org.thingsboard.server.transport.mqtt.util.SslUtil; |
46 | 46 | |
47 | 47 | import javax.net.ssl.SSLPeerUnverifiedException; |
48 | 48 | import javax.security.cert.X509Certificate; |
49 | 49 | import java.net.InetSocketAddress; |
50 | -import java.net.SocketAddress; | |
51 | 50 | import java.util.ArrayList; |
52 | 51 | import java.util.List; |
53 | 52 | |
... | ... | @@ -72,13 +71,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
72 | 71 | private final DeviceService deviceService; |
73 | 72 | private final DeviceAuthService authService; |
74 | 73 | private final RelationService relationService; |
74 | + private final QuotaService quotaService; | |
75 | 75 | private final SslHandler sslHandler; |
76 | 76 | private volatile boolean connected; |
77 | 77 | private volatile InetSocketAddress address; |
78 | 78 | private volatile GatewaySessionCtx gatewaySessionCtx; |
79 | 79 | |
80 | 80 | public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, |
81 | - MqttTransportAdaptor adaptor, SslHandler sslHandler) { | |
81 | + MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) { | |
82 | 82 | this.processor = processor; |
83 | 83 | this.deviceService = deviceService; |
84 | 84 | this.relationService = relationService; |
... | ... | @@ -87,6 +87,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
87 | 87 | this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor); |
88 | 88 | this.sessionId = deviceSessionCtx.getSessionId().toUidStr(); |
89 | 89 | this.sslHandler = sslHandler; |
90 | + this.quotaService = quotaService; | |
90 | 91 | } |
91 | 92 | |
92 | 93 | @Override |
... | ... | @@ -102,35 +103,43 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
102 | 103 | if (msg.fixedHeader() == null) { |
103 | 104 | log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort()); |
104 | 105 | processDisconnect(ctx); |
105 | - } else { | |
106 | - deviceSessionCtx.setChannel(ctx); | |
107 | - switch (msg.fixedHeader().messageType()) { | |
108 | - case CONNECT: | |
109 | - processConnect(ctx, (MqttConnectMessage) msg); | |
110 | - break; | |
111 | - case PUBLISH: | |
112 | - processPublish(ctx, (MqttPublishMessage) msg); | |
113 | - break; | |
114 | - case SUBSCRIBE: | |
115 | - processSubscribe(ctx, (MqttSubscribeMessage) msg); | |
116 | - break; | |
117 | - case UNSUBSCRIBE: | |
118 | - processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); | |
119 | - break; | |
120 | - case PINGREQ: | |
121 | - if (checkConnected(ctx)) { | |
122 | - ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); | |
123 | - } | |
124 | - break; | |
125 | - case DISCONNECT: | |
126 | - if (checkConnected(ctx)) { | |
127 | - processDisconnect(ctx); | |
128 | - } | |
129 | - break; | |
130 | - default: | |
131 | - break; | |
132 | - } | |
106 | + return; | |
107 | + } | |
108 | + | |
109 | + if (quotaService.isQuotaExceeded(address.getHostName())) { | |
110 | + log.warn("MQTT Quota exceeded for [{}:{}] . Disconnect", address.getHostName(), address.getPort()); | |
111 | + processDisconnect(ctx); | |
112 | + return; | |
133 | 113 | } |
114 | + | |
115 | + deviceSessionCtx.setChannel(ctx); | |
116 | + switch (msg.fixedHeader().messageType()) { | |
117 | + case CONNECT: | |
118 | + processConnect(ctx, (MqttConnectMessage) msg); | |
119 | + break; | |
120 | + case PUBLISH: | |
121 | + processPublish(ctx, (MqttPublishMessage) msg); | |
122 | + break; | |
123 | + case SUBSCRIBE: | |
124 | + processSubscribe(ctx, (MqttSubscribeMessage) msg); | |
125 | + break; | |
126 | + case UNSUBSCRIBE: | |
127 | + processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); | |
128 | + break; | |
129 | + case PINGREQ: | |
130 | + if (checkConnected(ctx)) { | |
131 | + ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); | |
132 | + } | |
133 | + break; | |
134 | + case DISCONNECT: | |
135 | + if (checkConnected(ctx)) { | |
136 | + processDisconnect(ctx); | |
137 | + } | |
138 | + break; | |
139 | + default: | |
140 | + break; | |
141 | + } | |
142 | + | |
134 | 143 | } |
135 | 144 | |
136 | 145 | private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) { | ... | ... |
... | ... | @@ -15,27 +15,19 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.transport.mqtt; |
17 | 17 | |
18 | -import io.netty.buffer.ByteBufAllocator; | |
19 | -import io.netty.channel.ChannelHandler; | |
20 | 18 | import io.netty.channel.ChannelInitializer; |
21 | 19 | import io.netty.channel.ChannelPipeline; |
22 | 20 | import io.netty.channel.socket.SocketChannel; |
23 | 21 | import io.netty.handler.codec.mqtt.MqttDecoder; |
24 | 22 | import io.netty.handler.codec.mqtt.MqttEncoder; |
25 | -import io.netty.handler.ssl.SslContext; | |
26 | -import io.netty.handler.ssl.SslContextBuilder; | |
27 | 23 | import io.netty.handler.ssl.SslHandler; |
28 | -import io.netty.handler.ssl.util.SelfSignedCertificate; | |
29 | -import org.springframework.beans.factory.annotation.Value; | |
30 | 24 | import org.thingsboard.server.common.transport.SessionMsgProcessor; |
31 | 25 | import org.thingsboard.server.common.transport.auth.DeviceAuthService; |
26 | +import org.thingsboard.server.common.transport.quota.QuotaService; | |
32 | 27 | import org.thingsboard.server.dao.device.DeviceService; |
33 | 28 | import org.thingsboard.server.dao.relation.RelationService; |
34 | 29 | import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; |
35 | 30 | |
36 | -import javax.net.ssl.SSLException; | |
37 | -import java.security.cert.CertificateException; | |
38 | - | |
39 | 31 | /** |
40 | 32 | * @author Andrew Shvayka |
41 | 33 | */ |
... | ... | @@ -49,16 +41,18 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha |
49 | 41 | private final RelationService relationService; |
50 | 42 | private final MqttTransportAdaptor adaptor; |
51 | 43 | private final MqttSslHandlerProvider sslHandlerProvider; |
44 | + private final QuotaService quotaService; | |
52 | 45 | |
53 | 46 | public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, |
54 | - MqttTransportAdaptor adaptor, | |
55 | - MqttSslHandlerProvider sslHandlerProvider) { | |
47 | + MqttTransportAdaptor adaptor, MqttSslHandlerProvider sslHandlerProvider, | |
48 | + QuotaService quotaService) { | |
56 | 49 | this.processor = processor; |
57 | 50 | this.deviceService = deviceService; |
58 | 51 | this.authService = authService; |
59 | 52 | this.relationService = relationService; |
60 | 53 | this.adaptor = adaptor; |
61 | 54 | this.sslHandlerProvider = sslHandlerProvider; |
55 | + this.quotaService = quotaService; | |
62 | 56 | } |
63 | 57 | |
64 | 58 | @Override |
... | ... | @@ -72,7 +66,9 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha |
72 | 66 | pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE)); |
73 | 67 | pipeline.addLast("encoder", MqttEncoder.INSTANCE); |
74 | 68 | |
75 | - MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService, adaptor, sslHandler); | |
69 | + MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService, | |
70 | + adaptor, sslHandler, quotaService); | |
71 | + | |
76 | 72 | pipeline.addLast(handler); |
77 | 73 | ch.closeFuture().addListener(handler); |
78 | 74 | } | ... | ... |
... | ... | @@ -29,6 +29,7 @@ import org.springframework.context.ApplicationContext; |
29 | 29 | import org.springframework.stereotype.Service; |
30 | 30 | import org.thingsboard.server.common.transport.SessionMsgProcessor; |
31 | 31 | import org.thingsboard.server.common.transport.auth.DeviceAuthService; |
32 | +import org.thingsboard.server.common.transport.quota.QuotaService; | |
32 | 33 | import org.thingsboard.server.dao.device.DeviceService; |
33 | 34 | import org.thingsboard.server.dao.relation.RelationService; |
34 | 35 | import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; |
... | ... | @@ -65,6 +66,9 @@ public class MqttTransportService { |
65 | 66 | @Autowired(required = false) |
66 | 67 | private MqttSslHandlerProvider sslHandlerProvider; |
67 | 68 | |
69 | + @Autowired(required = false) | |
70 | + private QuotaService quotaService; | |
71 | + | |
68 | 72 | @Value("${mqtt.bind_address}") |
69 | 73 | private String host; |
70 | 74 | @Value("${mqtt.bind_port}") |
... | ... | @@ -101,7 +105,8 @@ public class MqttTransportService { |
101 | 105 | ServerBootstrap b = new ServerBootstrap(); |
102 | 106 | b.group(bossGroup, workerGroup) |
103 | 107 | .channel(NioServerSocketChannel.class) |
104 | - .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, adaptor, sslHandlerProvider)); | |
108 | + .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, | |
109 | + adaptor, sslHandlerProvider, quotaService)); | |
105 | 110 | |
106 | 111 | serverChannel = b.bind(host, port).sync().channel(); |
107 | 112 | log.info("Mqtt transport started!"); | ... | ... |