Commit e9bf5bae29318d6b0a63dd104d365a522b24508d

Authored by Andrii Shvaika
1 parent f181beec

Rate limit improvements

... ... @@ -50,4 +50,8 @@ public class TbRateLimits {
50 50 return bucket.tryConsume(1);
51 51 }
52 52
  53 + public boolean tryConsume(long number) {
  54 + return bucket.tryConsume(number);
  55 + }
  56 +
53 57 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.util;
  17 +
  18 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  19 +
  20 +import java.lang.annotation.Retention;
  21 +import java.lang.annotation.RetentionPolicy;
  22 +
  23 +@Retention(RetentionPolicy.RUNTIME)
  24 +@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'")
  25 +public @interface TbTransportComponent {
  26 +}
... ...
... ... @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.DeviceTransportType;
20 20 import org.thingsboard.server.common.data.id.DeviceProfileId;
21 21 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
22 22 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
  23 +import org.thingsboard.server.common.transport.limits.TransportRateLimitType;
23 24 import org.thingsboard.server.gen.transport.TransportProtos;
24 25 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
25 26 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
... ... @@ -69,6 +70,8 @@ public interface TransportService {
69 70
70 71 boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback);
71 72
  73 + boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback, int dataPoints, TransportRateLimitType... limits);
  74 +
72 75 void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);
73 76
74 77 void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
... ...
... ... @@ -16,7 +16,6 @@
16 16 package org.thingsboard.server.common.transport.limits;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19   -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
20 19 import org.springframework.stereotype.Service;
21 20 import org.thingsboard.server.common.data.TenantProfile;
22 21 import org.thingsboard.server.common.data.TenantProfileData;
... ... @@ -24,12 +23,13 @@ import org.thingsboard.server.common.data.id.DeviceId;
24 23 import org.thingsboard.server.common.data.id.TenantId;
25 24 import org.thingsboard.server.common.transport.TransportTenantProfileCache;
26 25 import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult;
  26 +import org.thingsboard.server.queue.util.TbTransportComponent;
27 27
28 28 import java.util.concurrent.ConcurrentHashMap;
29 29 import java.util.concurrent.ConcurrentMap;
30 30
31 31 @Service
32   -@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'")
  32 +@TbTransportComponent
33 33 @Slf4j
34 34 public class DefaultTransportRateLimitService implements TransportRateLimitService {
35 35
... ... @@ -45,23 +45,21 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
45 45 }
46 46
47 47 @Override
48   - public TransportRateLimit getRateLimit(TenantId tenantId, TransportRateLimitType limitType) {
49   - TransportRateLimit[] limits = perTenantLimits.get(tenantId);
50   - if (limits == null) {
51   - limits = fetchProfileAndInit(tenantId);
52   - perTenantLimits.put(tenantId, limits);
53   - }
54   - return limits[limitType.ordinal()];
55   - }
56   -
57   - @Override
58   - public TransportRateLimit getRateLimit(TenantId tenantId, DeviceId deviceId, TransportRateLimitType limitType) {
59   - TransportRateLimit[] limits = perDeviceLimits.get(deviceId);
60   - if (limits == null) {
61   - limits = fetchProfileAndInit(tenantId);
62   - perDeviceLimits.put(deviceId, limits);
  48 + public TransportRateLimitType checkLimits(TenantId tenantId, DeviceId deviceId, int dataPoints, TransportRateLimitType... limits) {
  49 + TransportRateLimit[] tenantLimits = getTenantRateLimits(tenantId);
  50 + TransportRateLimit[] deviceLimits = getDeviceRateLimits(tenantId, deviceId);
  51 + for (TransportRateLimitType limitType : limits) {
  52 + TransportRateLimit rateLimit;
  53 + if (limitType.isTenantLevel()) {
  54 + rateLimit = tenantLimits[limitType.ordinal()];
  55 + } else {
  56 + rateLimit = deviceLimits[limitType.ordinal()];
  57 + }
  58 + if (!rateLimit.tryConsume(limitType.isMessageLevel() ? 1L : dataPoints)) {
  59 + return limitType;
  60 + }
63 61 }
64   - return limits[limitType.ordinal()];
  62 + return null;
65 63 }
66 64
67 65 @Override
... ... @@ -77,7 +75,17 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
77 75 mergeLimits(tenantId, fetchProfileAndInit(tenantId));
78 76 }
79 77
80   - public void mergeLimits(TenantId tenantId, TransportRateLimit[] newRateLimits) {
  78 + @Override
  79 + public void remove(TenantId tenantId) {
  80 + perTenantLimits.remove(tenantId);
  81 + }
  82 +
  83 + @Override
  84 + public void remove(DeviceId deviceId) {
  85 + perDeviceLimits.remove(deviceId);
  86 + }
  87 +
  88 + private void mergeLimits(TenantId tenantId, TransportRateLimit[] newRateLimits) {
81 89 TransportRateLimit[] oldRateLimits = perTenantLimits.get(tenantId);
82 90 if (oldRateLimits == null) {
83 91 perTenantLimits.put(tenantId, newRateLimits);
... ... @@ -92,16 +100,6 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
92 100 }
93 101 }
94 102
95   - @Override
96   - public void remove(TenantId tenantId) {
97   - perTenantLimits.remove(tenantId);
98   - }
99   -
100   - @Override
101   - public void remove(DeviceId deviceId) {
102   - perDeviceLimits.remove(deviceId);
103   - }
104   -
105 103 private TransportRateLimit[] fetchProfileAndInit(TenantId tenantId) {
106 104 return perTenantLimits.computeIfAbsent(tenantId, tmp -> createTransportRateLimits(tenantProfileCache.get(tenantId)));
107 105 }
... ... @@ -114,4 +112,22 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
114 112 }
115 113 return rateLimits;
116 114 }
  115 +
  116 + private TransportRateLimit[] getTenantRateLimits(TenantId tenantId) {
  117 + TransportRateLimit[] limits = perTenantLimits.get(tenantId);
  118 + if (limits == null) {
  119 + limits = fetchProfileAndInit(tenantId);
  120 + perTenantLimits.put(tenantId, limits);
  121 + }
  122 + return limits;
  123 + }
  124 +
  125 + private TransportRateLimit[] getDeviceRateLimits(TenantId tenantId, DeviceId deviceId) {
  126 + TransportRateLimit[] limits = perDeviceLimits.get(deviceId);
  127 + if (limits == null) {
  128 + limits = fetchProfileAndInit(tenantId);
  129 + perDeviceLimits.put(deviceId, limits);
  130 + }
  131 + return limits;
  132 + }
117 133 }
... ...
... ... @@ -23,6 +23,11 @@ public class DummyTransportRateLimit implements TransportRateLimit {
23 23 }
24 24
25 25 @Override
  26 + public boolean tryConsume(long number) {
  27 + return true;
  28 + }
  29 +
  30 + @Override
26 31 public boolean tryConsume() {
27 32 return true;
28 33 }
... ...
... ... @@ -31,4 +31,8 @@ public class SimpleTransportRateLimit implements TransportRateLimit {
31 31 return rateLimit.tryConsume();
32 32 }
33 33
  34 + @Override
  35 + public boolean tryConsume(long number) {
  36 + return number <= 0 || rateLimit.tryConsume(number);
  37 + }
34 38 }
... ...
... ... @@ -21,4 +21,6 @@ public interface TransportRateLimit {
21 21
22 22 boolean tryConsume();
23 23
  24 + boolean tryConsume(long number);
  25 +
24 26 }
... ...
... ... @@ -21,9 +21,7 @@ import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult
21 21
22 22 public interface TransportRateLimitService {
23 23
24   - TransportRateLimit getRateLimit(TenantId tenantId, TransportRateLimitType limit);
25   -
26   - TransportRateLimit getRateLimit(TenantId tenantId, DeviceId deviceId, TransportRateLimitType limit);
  24 + TransportRateLimitType checkLimits(TenantId tenantId, DeviceId deviceId, int dataPoints, TransportRateLimitType... limits);
27 25
28 26 void update(TenantProfileUpdateResult update);
29 27
... ...
... ... @@ -19,15 +19,29 @@ import lombok.Getter;
19 19
20 20 public enum TransportRateLimitType {
21 21
22   - TENANT_MAX_MSGS("transport.tenant.max.msg"),
23   - TENANT_MAX_DATA_POINTS("transport.tenant.max.dataPoints"),
24   - DEVICE_MAX_MSGS("transport.device.max.msg"),
25   - DEVICE_MAX_DATA_POINTS("transport.device.max.dataPoints");
  22 + TENANT_MAX_MSGS("transport.tenant.msg", true, true),
  23 + TENANT_TELEMETRY_MSGS("transport.tenant.telemetry", true, true),
  24 + TENANT_MAX_DATA_POINTS("transport.tenant.dataPoints", true, false),
  25 + DEVICE_MAX_MSGS("transport.device.msg", false, true),
  26 + DEVICE_TELEMETRY_MSGS("transport.device.telemetry", false, true),
  27 + DEVICE_MAX_DATA_POINTS("transport.device.dataPoints", false, false);
26 28
27 29 @Getter
28 30 private final String configurationKey;
  31 + @Getter
  32 + private final boolean tenantLevel;
  33 + @Getter
  34 + private final boolean deviceLevel;
  35 + @Getter
  36 + private final boolean messageLevel;
  37 + @Getter
  38 + private final boolean dataPointLevel;
29 39
30   - TransportRateLimitType(String configurationKey) {
  40 + TransportRateLimitType(String configurationKey, boolean tenantLevel, boolean messageLevel) {
31 41 this.configurationKey = configurationKey;
  42 + this.tenantLevel = tenantLevel;
  43 + this.deviceLevel = !tenantLevel;
  44 + this.messageLevel = messageLevel;
  45 + this.dataPointLevel = !messageLevel;
32 46 }
33 47 }
... ...
... ... @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.DeviceProfile;
23 23 import org.thingsboard.server.common.data.id.DeviceProfileId;
24 24 import org.thingsboard.server.common.transport.TransportDeviceProfileCache;
25 25 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
  26 +import org.thingsboard.server.queue.util.TbTransportComponent;
26 27
27 28 import java.util.Optional;
28 29 import java.util.concurrent.ConcurrentHashMap;
... ... @@ -30,7 +31,7 @@ import java.util.concurrent.ConcurrentMap;
30 31
31 32 @Slf4j
32 33 @Component
33   -@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'")
  34 +@TbTransportComponent
34 35 public class DefaultTransportDeviceProfileCache implements TransportDeviceProfileCache {
35 36
36 37 private final ConcurrentMap<DeviceProfileId, DeviceProfile> deviceProfiles = new ConcurrentHashMap<>();
... ...
... ... @@ -79,6 +79,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
79 79 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
80 80 import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
81 81 import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
  82 +import org.thingsboard.server.queue.util.TbTransportComponent;
82 83
83 84 import javax.annotation.PostConstruct;
84 85 import javax.annotation.PreDestroy;
... ... @@ -103,7 +104,7 @@ import java.util.concurrent.atomic.AtomicInteger;
103 104 */
104 105 @Slf4j
105 106 @Service
106   -@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'")
  107 +@TbTransportComponent
107 108 public class DefaultTransportService implements TransportService {
108 109
109 110 @Value("${transport.sessions.inactivity_timeout}")
... ... @@ -363,7 +364,11 @@ public class DefaultTransportService implements TransportService {
363 364
364 365 @Override
365 366 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
366   - if (checkLimits(sessionInfo, msg, callback)) {
  367 + int dataPoints = 0;
  368 + for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
  369 + dataPoints += tsKv.getKvCount();
  370 + }
  371 + if (checkLimits(sessionInfo, msg, callback, dataPoints, TELEMETRY)) {
367 372 reportActivityInternal(sessionInfo);
368 373 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
369 374 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
... ... @@ -384,7 +389,7 @@ public class DefaultTransportService implements TransportService {
384 389
385 390 @Override
386 391 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
387   - if (checkLimits(sessionInfo, msg, callback)) {
  392 + if (checkLimits(sessionInfo, msg, callback, msg.getKvCount(), TELEMETRY)) {
388 393 reportActivityInternal(sessionInfo);
389 394 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
390 395 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
... ... @@ -574,37 +579,34 @@ public class DefaultTransportService implements TransportService {
574 579 sessions.remove(toSessionId(sessionInfo));
575 580 }
576 581
  582 + private TransportRateLimitType[] DEFAULT = new TransportRateLimitType[]{TransportRateLimitType.TENANT_MAX_MSGS, TransportRateLimitType.DEVICE_MAX_MSGS};
  583 + private TransportRateLimitType[] TELEMETRY = TransportRateLimitType.values();
  584 +
577 585 @Override
578 586 public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
  587 + return checkLimits(sessionInfo, msg, callback, 0, DEFAULT);
  588 + }
  589 +
  590 + @Override
  591 + public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback, int dataPoints, TransportRateLimitType... limits) {
579 592 if (log.isTraceEnabled()) {
580 593 log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg);
581 594 }
582 595 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
583   -
584   - TransportRateLimit tenantRateLimit = rateLimitService.getRateLimit(tenantId, TransportRateLimitType.TENANT_MAX_MSGS);
585   -
586   - if (!tenantRateLimit.tryConsume()) {
587   - if (callback != null) {
588   - callback.onError(new TbRateLimitsException(EntityType.TENANT));
589   - }
590   - if (log.isTraceEnabled()) {
591   - log.trace("[{}][{}] Tenant level rate limit detected: {}", toSessionId(sessionInfo), tenantId, msg);
592   - }
593   - return false;
594   - }
595 596 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
596   - TransportRateLimit deviceRateLimit = rateLimitService.getRateLimit(tenantId, deviceId, TransportRateLimitType.DEVICE_MAX_MSGS);
597   - if (!deviceRateLimit.tryConsume()) {
  597 +
  598 + TransportRateLimitType limit = rateLimitService.checkLimits(tenantId, deviceId, 0, limits);
  599 + if (limit == null) {
  600 + return true;
  601 + } else {
598 602 if (callback != null) {
599   - callback.onError(new TbRateLimitsException(EntityType.DEVICE));
  603 + callback.onError(new TbRateLimitsException(limit.isTenantLevel() ? EntityType.TENANT : EntityType.DEVICE));
600 604 }
601 605 if (log.isTraceEnabled()) {
602   - log.trace("[{}][{}] Device level rate limit detected: {}", toSessionId(sessionInfo), deviceId, msg);
  606 + log.trace("[{}][{}] {} rateLimit detected: {}", toSessionId(sessionInfo), tenantId, limit, msg);
603 607 }
604 608 return false;
605 609 }
606   -
607   - return true;
608 610 }
609 611
610 612 protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) {
... ...
... ... @@ -33,6 +33,7 @@ import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
33 33 import org.thingsboard.server.gen.transport.TransportProtos;
34 34 import org.thingsboard.server.queue.discovery.TenantRoutingInfo;
35 35 import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
  36 +import org.thingsboard.server.queue.util.TbTransportComponent;
36 37
37 38 import java.util.Collections;
38 39 import java.util.Optional;
... ... @@ -43,7 +44,7 @@ import java.util.concurrent.locks.Lock;
43 44 import java.util.concurrent.locks.ReentrantLock;
44 45
45 46 @Component
46   -@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'")
  47 +@TbTransportComponent
47 48 @Slf4j
48 49 public class DefaultTransportTenantProfileCache implements TransportTenantProfileCache {
49 50
... ...