Commit b38b47f4093b547502c3e47a72f7f88526e45032

Authored by Andrii Shvaika
1 parent 0b83ae8b

JS Invoke Service stats

Showing 31 changed files with 209 additions and 534 deletions
... ... @@ -312,7 +312,7 @@ class DefaultTbContext implements TbContext {
312 312
313 313 @Override
314 314 public ScriptEngine createJsScriptEngine(String script, String... argNames) {
315   - return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), nodeCtx.getSelf().getId(), script, argNames);
  315 + return new RuleNodeJsScriptEngine(getTenantId(), mainCtx.getJsSandbox(), nodeCtx.getSelf().getId(), script, argNames);
316 316 }
317 317
318 318 @Override
... ...
... ... @@ -347,7 +347,7 @@ public class RuleChainController extends BaseController {
347 347 String errorText = "";
348 348 ScriptEngine engine = null;
349 349 try {
350   - engine = new RuleNodeJsScriptEngine(jsInvokeService, getCurrentUser().getId(), script, argNames);
  350 + engine = new RuleNodeJsScriptEngine(getTenantId(), jsInvokeService, getCurrentUser().getId(), script, argNames);
351 351 TbMsg inMsg = TbMsg.newMsg(msgType, null, new TbMsgMetaData(metadata), TbMsgDataType.JSON, data);
352 352 switch (scriptType) {
353 353 case "update":
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.apiusage;
2 17
3 18 public enum ApiFeature {
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -21,6 +21,7 @@ import org.springframework.data.util.Pair;
21 21 import org.thingsboard.server.common.data.ApiUsageRecordKey;
22 22 import org.thingsboard.server.common.data.ApiUsageState;
23 23 import org.thingsboard.server.common.data.TenantProfile;
  24 +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
24 25 import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
25 26 import org.thingsboard.server.common.data.id.EntityId;
26 27 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -100,15 +101,18 @@ public class TenantApiUsageState {
100 101 }
101 102
102 103 public long getProfileThreshold(ApiUsageRecordKey key) {
103   - Object threshold = tenantProfileData.getProperties().get(key.name());
104   - if (threshold != null) {
105   - if (threshold instanceof String) {
106   - return Long.parseLong((String) threshold);
107   - } else if (threshold instanceof Long) {
108   - return (Long) threshold;
109   - } else if (threshold instanceof Integer) {
110   - return (Integer) threshold;
111   - }
  104 + DefaultTenantProfileConfiguration config = (DefaultTenantProfileConfiguration) tenantProfileData.getConfiguration();
  105 + switch (key) {
  106 + case TRANSPORT_MSG_COUNT:
  107 + return config.getMaxTransportMessages();
  108 + case TRANSPORT_DP_COUNT:
  109 + return config.getMaxTransportDataPoints();
  110 + case JS_EXEC_COUNT:
  111 + return config.getMaxJSExecutions();
  112 + case RE_EXEC_COUNT:
  113 + return config.getMaxREExecutions();
  114 + case STORAGE_DP_COUNT:
  115 + return config.getMaxDPStorageDays();
112 116 }
113 117 return 0L;
114 118 }
... ... @@ -155,12 +159,12 @@ public class TenantApiUsageState {
155 159
156 160 public boolean isFeatureEnabled(ApiUsageRecordKey recordKey) {
157 161 switch (recordKey) {
158   - case MSG_COUNT:
159   - case DP_TRANSPORT_COUNT:
  162 + case TRANSPORT_MSG_COUNT:
  163 + case TRANSPORT_DP_COUNT:
160 164 return isTransportEnabled();
161 165 case RE_EXEC_COUNT:
162 166 return isRuleEngineEnabled();
163   - case DP_STORAGE_COUNT:
  167 + case STORAGE_DP_COUNT:
164 168 return isDbStorageEnabled();
165 169 case JS_EXEC_COUNT:
166 170 return isJsExecEnabled();
... ... @@ -173,8 +177,8 @@ public class TenantApiUsageState {
173 177 ApiFeature feature = null;
174 178 boolean currentValue = isFeatureEnabled(recordKey);
175 179 switch (recordKey) {
176   - case MSG_COUNT:
177   - case DP_TRANSPORT_COUNT:
  180 + case TRANSPORT_MSG_COUNT:
  181 + case TRANSPORT_DP_COUNT:
178 182 feature = ApiFeature.TRANSPORT;
179 183 setTransportEnabled(value);
180 184 break;
... ... @@ -182,7 +186,7 @@ public class TenantApiUsageState {
182 186 feature = ApiFeature.RE;
183 187 setRuleEngineEnabled(value);
184 188 break;
185   - case DP_STORAGE_COUNT:
  189 + case STORAGE_DP_COUNT:
186 190 feature = ApiFeature.DB;
187 191 setDbStorageEnabled(value);
188 192 break;
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.EntityType;
26 26 import org.thingsboard.server.common.data.id.DeviceId;
27 27 import org.thingsboard.server.common.data.id.DeviceProfileId;
28 28 import org.thingsboard.server.common.data.id.TenantProfileId;
  29 +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
29 30 import org.thingsboard.server.common.msg.TbActorMsg;
30 31 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
31 32 import org.thingsboard.server.common.msg.queue.ServiceType;
... ... @@ -153,10 +154,14 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
153 154 if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
154 155 TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId());
155 156 tenantProfileCache.evict(tenantProfileId);
156   - apiUsageStateService.onTenantProfileUpdate(tenantProfileId);
  157 + if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) {
  158 + apiUsageStateService.onTenantProfileUpdate(tenantProfileId);
  159 + }
157 160 } else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
158 161 tenantProfileCache.evict(componentLifecycleMsg.getTenantId());
159   - apiUsageStateService.onTenantUpdate(componentLifecycleMsg.getTenantId());
  162 + if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) {
  163 + apiUsageStateService.onTenantUpdate(componentLifecycleMsg.getTenantId());
  164 + }
160 165 } else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
161 166 deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
162 167 } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
... ...
... ... @@ -19,6 +19,9 @@ import com.google.common.util.concurrent.Futures;
19 19 import com.google.common.util.concurrent.ListenableFuture;
20 20 import lombok.extern.slf4j.Slf4j;
21 21 import org.thingsboard.common.util.ThingsBoardThreadFactory;
  22 +import org.thingsboard.server.common.data.ApiUsageRecordKey;
  23 +import org.thingsboard.server.common.data.id.TenantId;
  24 +import org.thingsboard.server.queue.usagestats.TbUsageStatsClient;
22 25
23 26 import java.util.Map;
24 27 import java.util.UUID;
... ... @@ -33,10 +36,15 @@ import java.util.concurrent.atomic.AtomicInteger;
33 36 @Slf4j
34 37 public abstract class AbstractJsInvokeService implements JsInvokeService {
35 38
  39 + private final TbUsageStatsClient apiUsageStatsClient;
36 40 protected ScheduledExecutorService timeoutExecutorService;
37 41 protected Map<UUID, String> scriptIdToNameMap = new ConcurrentHashMap<>();
38 42 protected Map<UUID, BlackListInfo> blackListedFunctions = new ConcurrentHashMap<>();
39 43
  44 + protected AbstractJsInvokeService(TbUsageStatsClient apiUsageStatsClient) {
  45 + this.apiUsageStatsClient = apiUsageStatsClient;
  46 + }
  47 +
40 48 public void init(long maxRequestsTimeout) {
41 49 if (maxRequestsTimeout > 0) {
42 50 timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nashorn-js-timeout"));
... ... @@ -50,7 +58,7 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
50 58 }
51 59
52 60 @Override
53   - public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
  61 + public ListenableFuture<UUID> eval(TenantId tenantId, JsScriptType scriptType, String scriptBody, String... argNames) {
54 62 UUID scriptId = UUID.randomUUID();
55 63 String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
56 64 String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
... ... @@ -58,12 +66,13 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
58 66 }
59 67
60 68 @Override
61   - public ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args) {
  69 + public ListenableFuture<Object> invokeFunction(TenantId tenantId, UUID scriptId, Object... args) {
62 70 String functionName = scriptIdToNameMap.get(scriptId);
63 71 if (functionName == null) {
64 72 return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!"));
65 73 }
66 74 if (!isBlackListed(scriptId)) {
  75 + apiUsageStatsClient.report(tenantId, ApiUsageRecordKey.JS_EXEC_COUNT, 1);
67 76 return doInvokeFunction(scriptId, functionName, args);
68 77 } else {
69 78 return Futures.immediateFailedFuture(
... ...
... ... @@ -27,7 +27,7 @@ import lombok.extern.slf4j.Slf4j;
27 27 import org.springframework.beans.factory.annotation.Autowired;
28 28 import org.springframework.beans.factory.annotation.Value;
29 29 import org.springframework.scheduling.annotation.Scheduled;
30   -import org.thingsboard.common.util.ThingsBoardThreadFactory;
  30 +import org.thingsboard.server.queue.usagestats.TbUsageStatsClient;
31 31
32 32 import javax.annotation.PostConstruct;
33 33 import javax.annotation.PreDestroy;
... ... @@ -38,7 +38,6 @@ import java.util.UUID;
38 38 import java.util.concurrent.ExecutionException;
39 39 import java.util.concurrent.ExecutorService;
40 40 import java.util.concurrent.Executors;
41   -import java.util.concurrent.ScheduledExecutorService;
42 41 import java.util.concurrent.TimeUnit;
43 42 import java.util.concurrent.atomic.AtomicInteger;
44 43
... ... @@ -57,9 +56,8 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
57 56 private final FutureCallback<UUID> evalCallback = new JsStatCallback<>(jsEvalMsgs, jsTimeoutMsgs, jsFailedMsgs);
58 57 private final FutureCallback<Object> invokeCallback = new JsStatCallback<>(jsInvokeMsgs, jsTimeoutMsgs, jsFailedMsgs);
59 58
60   - @Autowired
61 59 @Getter
62   - private JsExecutorService jsExecutor;
  60 + private final JsExecutorService jsExecutor;
63 61
64 62 @Value("${js.local.max_requests_timeout:0}")
65 63 private long maxRequestsTimeout;
... ... @@ -67,6 +65,11 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
67 65 @Value("${js.local.stats.enabled:false}")
68 66 private boolean statsEnabled;
69 67
  68 + public AbstractNashornJsInvokeService(TbUsageStatsClient apiUsageStatsClient, JsExecutorService jsExecutor) {
  69 + super(apiUsageStatsClient);
  70 + this.jsExecutor = jsExecutor;
  71 + }
  72 +
70 73 @Scheduled(fixedDelayString = "${js.local.stats.print_interval_ms:10000}")
71 74 public void printStats() {
72 75 if (statsEnabled) {
... ...
... ... @@ -27,7 +27,7 @@ public class JsExecutorService extends AbstractListeningExecutor {
27 27
28 28 @Override
29 29 protected int getThreadPollSize() {
30   - return jsExecutorThreadPoolSize;
  30 + return Math.max(jsExecutorThreadPoolSize, 1);
31 31 }
32 32
33 33 }
... ...
... ... @@ -17,14 +17,15 @@ package org.thingsboard.server.service.script;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
19 19 import org.thingsboard.server.common.data.id.EntityId;
  20 +import org.thingsboard.server.common.data.id.TenantId;
20 21
21 22 import java.util.UUID;
22 23
23 24 public interface JsInvokeService {
24 25
25   - ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames);
  26 + ListenableFuture<UUID> eval(TenantId tenantId, JsScriptType scriptType, String scriptBody, String... argNames);
26 27
27   - ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args);
  28 + ListenableFuture<Object> invokeFunction(TenantId tenantId, UUID scriptId, Object... args);
28 29
29 30 ListenableFuture<Void> release(UUID scriptId);
30 31
... ...
... ... @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Value;
20 20 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
21 21 import org.springframework.stereotype.Service;
  22 +import org.thingsboard.server.queue.usagestats.TbUsageStatsClient;
22 23
23 24 import java.util.concurrent.TimeUnit;
24 25
... ... @@ -42,6 +43,10 @@ public class NashornJsInvokeService extends AbstractNashornJsInvokeService {
42 43 @Value("${js.local.max_black_list_duration_sec:60}")
43 44 private int maxBlackListDurationSec;
44 45
  46 + public NashornJsInvokeService(TbUsageStatsClient apiUsageStatsClient, JsExecutorService jsExecutor) {
  47 + super(apiUsageStatsClient, jsExecutor);
  48 + }
  49 +
45 50 @Override
46 51 protected boolean useJsSandbox() {
47 52 return useJsSandbox;
... ...
... ... @@ -30,6 +30,7 @@ import org.thingsboard.server.gen.js.JsInvokeProtos;
30 30 import org.thingsboard.server.queue.TbQueueRequestTemplate;
31 31 import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
32 32 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  33 +import org.thingsboard.server.queue.usagestats.TbUsageStatsClient;
33 34
34 35 import javax.annotation.Nullable;
35 36 import javax.annotation.PostConstruct;
... ... @@ -68,6 +69,10 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
68 69 private final AtomicInteger queueFailedMsgs = new AtomicInteger(0);
69 70 private final AtomicInteger queueTimeoutMsgs = new AtomicInteger(0);
70 71
  72 + public RemoteJsInvokeService(TbUsageStatsClient apiUsageStatsClient) {
  73 + super(apiUsageStatsClient);
  74 + }
  75 +
71 76 @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}")
72 77 public void printStats() {
73 78 if (statsEnabled) {
... ...
... ... @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.MoreExecutors;
25 25 import lombok.extern.slf4j.Slf4j;
26 26 import org.apache.commons.lang3.StringUtils;
27 27 import org.thingsboard.server.common.data.id.EntityId;
  28 +import org.thingsboard.server.common.data.id.TenantId;
28 29 import org.thingsboard.server.common.msg.TbMsg;
29 30 import org.thingsboard.server.common.msg.TbMsgMetaData;
30 31
... ... @@ -43,13 +44,15 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
43 44 private final JsInvokeService sandboxService;
44 45
45 46 private final UUID scriptId;
  47 + private final TenantId tenantId;
46 48 private final EntityId entityId;
47 49
48   - public RuleNodeJsScriptEngine(JsInvokeService sandboxService, EntityId entityId, String script, String... argNames) {
  50 + public RuleNodeJsScriptEngine(TenantId tenantId, JsInvokeService sandboxService, EntityId entityId, String script, String... argNames) {
  51 + this.tenantId = tenantId;
49 52 this.sandboxService = sandboxService;
50 53 this.entityId = entityId;
51 54 try {
52   - this.scriptId = this.sandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, script, argNames).get();
  55 + this.scriptId = this.sandboxService.eval(tenantId, JsScriptType.RULE_NODE_SCRIPT, script, argNames).get();
53 56 } catch (Exception e) {
54 57 Throwable t = e;
55 58 if (e instanceof ExecutionException) {
... ... @@ -203,7 +206,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
203 206 private JsonNode executeScript(TbMsg msg) throws ScriptException {
204 207 try {
205 208 String[] inArgs = prepareArgs(msg);
206   - String eval = sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
  209 + String eval = sandboxService.invokeFunction(tenantId, this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
207 210 return mapper.readTree(eval);
208 211 } catch (ExecutionException e) {
209 212 if (e.getCause() instanceof ScriptException) {
... ... @@ -220,7 +223,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
220 223
221 224 private ListenableFuture<JsonNode> executeScriptAsync(TbMsg msg) {
222 225 String[] inArgs = prepareArgs(msg);
223   - return Futures.transformAsync(sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]),
  226 + return Futures.transformAsync(sandboxService.invokeFunction(tenantId, this.scriptId, inArgs[0], inArgs[1], inArgs[2]),
224 227 o -> {
225 228 try {
226 229 return Futures.immediateFuture(mapper.readTree(o.toString()));
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
1   -/**
2   - * Copyright © 2016-2020 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.service.script;
17   -
18   -import com.datastax.oss.driver.api.core.uuid.Uuids;
19   -import com.google.common.collect.Sets;
20   -import org.junit.After;
21   -import org.junit.Before;
22   -import org.junit.Test;
23   -import org.thingsboard.rule.engine.api.ScriptEngine;
24   -import org.thingsboard.server.common.data.id.EntityId;
25   -import org.thingsboard.server.common.data.id.RuleNodeId;
26   -import org.thingsboard.server.common.msg.TbMsg;
27   -import org.thingsboard.server.common.msg.TbMsgDataType;
28   -import org.thingsboard.server.common.msg.TbMsgMetaData;
29   -
30   -import javax.script.ScriptException;
31   -import java.util.Map;
32   -import java.util.Set;
33   -import java.util.UUID;
34   -import java.util.concurrent.*;
35   -import java.util.concurrent.atomic.AtomicInteger;
36   -
37   -import static org.junit.Assert.*;
38   -
39   -public class RuleNodeJsScriptEngineTest {
40   -
41   - private ScriptEngine scriptEngine;
42   - private TestNashornJsInvokeService jsSandboxService;
43   -
44   - private EntityId ruleNodeId = new RuleNodeId(Uuids.timeBased());
45   -
46   - @Before
47   - public void beforeTest() throws Exception {
48   - jsSandboxService = new TestNashornJsInvokeService(false, 1, 100, 3);
49   - }
50   -
51   - @After
52   - public void afterTest() throws Exception {
53   - jsSandboxService.stop();
54   - }
55   -
56   - @Test
57   - public void msgCanBeUpdated() throws ScriptException {
58   - String function = "metadata.temp = metadata.temp * 10; return {metadata: metadata};";
59   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
60   -
61   - TbMsgMetaData metaData = new TbMsgMetaData();
62   - metaData.putValue("temp", "7");
63   - metaData.putValue("humidity", "99");
64   - String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
65   -
66   - TbMsg msg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON, rawJson);
67   -
68   - TbMsg actual = scriptEngine.executeUpdate(msg);
69   - assertEquals("70", actual.getMetaData().getValue("temp"));
70   - scriptEngine.destroy();
71   - }
72   -
73   - @Test
74   - public void newAttributesCanBeAddedInMsg() throws ScriptException {
75   - String function = "metadata.newAttr = metadata.humidity - msg.passed; return {metadata: metadata};";
76   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
77   - TbMsgMetaData metaData = new TbMsgMetaData();
78   - metaData.putValue("temp", "7");
79   - metaData.putValue("humidity", "99");
80   - String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
81   -
82   - TbMsg msg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON, rawJson);
83   -
84   - TbMsg actual = scriptEngine.executeUpdate(msg);
85   - assertEquals("94", actual.getMetaData().getValue("newAttr"));
86   - scriptEngine.destroy();
87   - }
88   -
89   - @Test
90   - public void payloadCanBeUpdated() throws ScriptException {
91   - String function = "msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine'; return {msg: msg};";
92   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
93   - TbMsgMetaData metaData = new TbMsgMetaData();
94   - metaData.putValue("temp", "7");
95   - metaData.putValue("humidity", "99");
96   - String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}";
97   -
98   - TbMsg msg =TbMsg.newMsg("USER", null, metaData, TbMsgDataType.JSON, rawJson);
99   -
100   - TbMsg actual = scriptEngine.executeUpdate(msg);
101   -
102   - String expectedJson = "{\"name\":\"Vit\",\"passed\":35,\"bigObj\":{\"prop\":42,\"newProp\":\"Ukraine\"}}";
103   - assertEquals(expectedJson, actual.getData());
104   - scriptEngine.destroy();
105   - }
106   -
107   - @Test
108   - public void metadataAccessibleForFilter() throws ScriptException {
109   - String function = "return metadata.humidity < 15;";
110   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
111   - TbMsgMetaData metaData = new TbMsgMetaData();
112   - metaData.putValue("temp", "7");
113   - metaData.putValue("humidity", "99");
114   - String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
115   -
116   - TbMsg msg = TbMsg.newMsg("USER", null, metaData, TbMsgDataType.JSON, rawJson);
117   - assertFalse(scriptEngine.executeFilter(msg));
118   - scriptEngine.destroy();
119   - }
120   -
121   - @Test
122   - public void dataAccessibleForFilter() throws ScriptException {
123   - String function = "return msg.passed < 15 && msg.name === 'Vit' && metadata.temp == 7 && msg.bigObj.prop == 42;";
124   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
125   - TbMsgMetaData metaData = new TbMsgMetaData();
126   - metaData.putValue("temp", "7");
127   - metaData.putValue("humidity", "99");
128   - String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
129   -
130   - TbMsg msg = TbMsg.newMsg( "USER", null, metaData,TbMsgDataType.JSON, rawJson);
131   - assertTrue(scriptEngine.executeFilter(msg));
132   - scriptEngine.destroy();
133   - }
134   -
135   - @Test
136   - public void dataAccessibleForSwitch() throws ScriptException {
137   - String jsCode = "function nextRelation(metadata, msg) {\n" +
138   - " if(msg.passed == 5 && metadata.temp == 10)\n" +
139   - " return 'one'\n" +
140   - " else\n" +
141   - " return 'two';\n" +
142   - "};\n" +
143   - "\n" +
144   - "return nextRelation(metadata, msg);";
145   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode);
146   - TbMsgMetaData metaData = new TbMsgMetaData();
147   - metaData.putValue("temp", "10");
148   - metaData.putValue("humidity", "99");
149   - String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
150   -
151   - TbMsg msg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON, rawJson);
152   - Set<String> actual = scriptEngine.executeSwitch(msg);
153   - assertEquals(Sets.newHashSet("one"), actual);
154   - scriptEngine.destroy();
155   - }
156   -
157   - @Test
158   - public void multipleRelationsReturnedFromSwitch() throws ScriptException {
159   - String jsCode = "function nextRelation(metadata, msg) {\n" +
160   - " if(msg.passed == 5 && metadata.temp == 10)\n" +
161   - " return ['three', 'one']\n" +
162   - " else\n" +
163   - " return 'two';\n" +
164   - "};\n" +
165   - "\n" +
166   - "return nextRelation(metadata, msg);";
167   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode);
168   - TbMsgMetaData metaData = new TbMsgMetaData();
169   - metaData.putValue("temp", "10");
170   - metaData.putValue("humidity", "99");
171   - String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
172   -
173   - TbMsg msg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON, rawJson);
174   - Set<String> actual = scriptEngine.executeSwitch(msg);
175   - assertEquals(Sets.newHashSet("one", "three"), actual);
176   - scriptEngine.destroy();
177   - }
178   -
179   - @Test
180   - public void concurrentReleasedCorrectly() throws InterruptedException, ExecutionException {
181   - String code = "metadata.temp = metadata.temp * 10; return {metadata: metadata};";
182   -
183   - int repeat = 1000;
184   - ExecutorService service = Executors.newFixedThreadPool(repeat);
185   - Map<UUID, Object> scriptIds = new ConcurrentHashMap<>();
186   - CountDownLatch startLatch = new CountDownLatch(repeat);
187   - CountDownLatch finishLatch = new CountDownLatch(repeat);
188   - AtomicInteger failedCount = new AtomicInteger(0);
189   -
190   - for (int i = 0; i < repeat; i++) {
191   - service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code));
192   - }
193   -
194   - finishLatch.await();
195   - assertTrue(scriptIds.size() == 1);
196   - assertTrue(failedCount.get() == 0);
197   -
198   - CountDownLatch nextStart = new CountDownLatch(repeat);
199   - CountDownLatch nextFinish = new CountDownLatch(repeat);
200   - for (int i = 0; i < repeat; i++) {
201   - service.submit(() -> runScript(nextStart, nextFinish, failedCount, scriptIds, code));
202   - }
203   -
204   - nextFinish.await();
205   - assertTrue(scriptIds.size() == 1);
206   - assertTrue(failedCount.get() == 0);
207   - service.shutdownNow();
208   - }
209   -
210   - @Test
211   - public void concurrentFailedEvaluationShouldThrowException() throws InterruptedException {
212   - String code = "metadata.temp = metadata.temp * 10; urn {metadata: metadata};";
213   -
214   - int repeat = 10000;
215   - ExecutorService service = Executors.newFixedThreadPool(repeat);
216   - Map<UUID, Object> scriptIds = new ConcurrentHashMap<>();
217   - CountDownLatch startLatch = new CountDownLatch(repeat);
218   - CountDownLatch finishLatch = new CountDownLatch(repeat);
219   - AtomicInteger failedCount = new AtomicInteger(0);
220   - for (int i = 0; i < repeat; i++) {
221   - service.submit(() -> {
222   - service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code));
223   - });
224   - }
225   -
226   - finishLatch.await();
227   - assertTrue(scriptIds.isEmpty());
228   - assertEquals(repeat, failedCount.get());
229   - service.shutdownNow();
230   - }
231   -
232   - private void runScript(CountDownLatch startLatch, CountDownLatch finishLatch, AtomicInteger failedCount,
233   - Map<UUID, Object> scriptIds, String code) {
234   - try {
235   - for (int k = 0; k < 10; k++) {
236   - startLatch.countDown();
237   - startLatch.await();
238   - UUID scriptId = jsSandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, code).get();
239   - scriptIds.put(scriptId, new Object());
240   - jsSandboxService.invokeFunction(scriptId, "{}", "{}", "TEXT").get();
241   - jsSandboxService.release(scriptId).get();
242   - }
243   - } catch (Throwable th) {
244   - failedCount.incrementAndGet();
245   - } finally {
246   - finishLatch.countDown();
247   - }
248   - }
249   -
250   -}
1   -/**
2   - * Copyright © 2016-2020 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.service.script;
17   -
18   -public class TestNashornJsInvokeService extends AbstractNashornJsInvokeService {
19   -
20   - private boolean useJsSandbox;
21   - private final int monitorThreadPoolSize;
22   - private final long maxCpuTime;
23   - private final int maxErrors;
24   -
25   - public TestNashornJsInvokeService(boolean useJsSandbox, int monitorThreadPoolSize, long maxCpuTime, int maxErrors) {
26   - this.useJsSandbox = useJsSandbox;
27   - this.monitorThreadPoolSize = monitorThreadPoolSize;
28   - this.maxCpuTime = maxCpuTime;
29   - this.maxErrors = maxErrors;
30   - init();
31   - }
32   -
33   - @Override
34   - protected boolean useJsSandbox() {
35   - return useJsSandbox;
36   - }
37   -
38   - @Override
39   - protected int getMonitorThreadPoolSize() {
40   - return monitorThreadPoolSize;
41   - }
42   -
43   - @Override
44   - protected long getMaxCpuTime() {
45   - return maxCpuTime;
46   - }
47   -
48   - @Override
49   - protected int getMaxErrors() {
50   - return maxErrors;
51   - }
52   -
53   - @Override
54   - protected long getMaxBlacklistDuration() {
55   - return 100000;
56   - }
57   -}
... ... @@ -17,9 +17,9 @@ package org.thingsboard.server.common.data;
17 17
18 18 public enum ApiUsageRecordKey {
19 19
20   - MSG_COUNT,
21   - DP_TRANSPORT_COUNT,
22   - DP_STORAGE_COUNT,
  20 + TRANSPORT_MSG_COUNT,
  21 + TRANSPORT_DP_COUNT,
  22 + STORAGE_DP_COUNT,
23 23 RE_EXEC_COUNT,
24 24 JS_EXEC_COUNT
25 25
... ...
... ... @@ -28,17 +28,4 @@ public class TenantProfileData {
28 28
29 29 private TenantProfileConfiguration configuration;
30 30
31   - @JsonIgnore
32   - private Map<String, Object> properties = new HashMap<>();
33   -
34   - @JsonAnyGetter
35   - public Map<String, Object> properties() {
36   - return this.properties;
37   - }
38   -
39   - @JsonAnySetter
40   - public void put(String name, Object value) {
41   - this.properties.put(name, value);
42   - }
43   -
44 31 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
... ... @@ -19,7 +19,6 @@ import org.thingsboard.server.common.data.DeviceProfile;
19 19 import org.thingsboard.server.common.data.DeviceTransportType;
20 20 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
21 21 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
22   -import org.thingsboard.server.common.transport.limits.TransportRateLimitType;
23 22 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
24 23 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
25 24 import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg;
... ... @@ -62,10 +61,6 @@ public interface TransportService {
62 61 void process(ProvisionDeviceRequestMsg msg,
63 62 TransportServiceCallback<ProvisionDeviceResponseMsg> callback);
64 63
65   - boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback);
66   -
67   - boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback, int dataPoints, TransportRateLimitType... limits);
68   -
69 64 void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);
70 65
71 66 void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
... ...
1   -/**
2   - * Copyright © 2016-2020 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.common.transport.limits;
17   -
18   -import lombok.extern.slf4j.Slf4j;
19   -import org.springframework.stereotype.Component;
20   -import org.springframework.util.StringUtils;
21   -import org.thingsboard.server.common.msg.tools.TbRateLimits;
22   -
23   -@Slf4j
24   -@Component
25   -public class DefaultTransportRateLimitFactory implements TransportRateLimitFactory {
26   -
27   - private static final DummyTransportRateLimit ALWAYS_TRUE = new DummyTransportRateLimit();
28   -
29   - @Override
30   - public TransportRateLimit create(TransportRateLimitType type, Object configuration) {
31   - if (!StringUtils.isEmpty(configuration)) {
32   - try {
33   - return new SimpleTransportRateLimit(new TbRateLimits(configuration.toString()), configuration.toString());
34   - } catch (Exception e) {
35   - log.warn("[{}] Failed to init rate limit with configuration: {}", type, configuration, e);
36   - return ALWAYS_TRUE;
37   - }
38   - } else {
39   - return ALWAYS_TRUE;
40   - }
41   - }
42   -
43   - @Override
44   - public TransportRateLimit createDefault(TransportRateLimitType type) {
45   - return ALWAYS_TRUE;
46   - }
47   -}
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -17,76 +17,96 @@ package org.thingsboard.server.common.transport.limits;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.stereotype.Service;
  20 +import org.springframework.util.StringUtils;
  21 +import org.thingsboard.server.common.data.EntityType;
20 22 import org.thingsboard.server.common.data.TenantProfile;
21 23 import org.thingsboard.server.common.data.id.DeviceId;
22 24 import org.thingsboard.server.common.data.id.TenantId;
  25 +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
23 26 import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
  27 +import org.thingsboard.server.common.msg.tools.TbRateLimits;
24 28 import org.thingsboard.server.common.transport.TransportTenantProfileCache;
25 29 import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult;
26 30 import org.thingsboard.server.queue.util.TbTransportComponent;
27 31
  32 +import java.util.HashSet;
  33 +import java.util.Set;
28 34 import java.util.concurrent.ConcurrentHashMap;
29 35 import java.util.concurrent.ConcurrentMap;
  36 +import java.util.function.BiConsumer;
  37 +import java.util.function.Function;
30 38
31 39 @Service
32 40 @TbTransportComponent
33 41 @Slf4j
34 42 public class DefaultTransportRateLimitService implements TransportRateLimitService {
35 43
  44 + private final static DummyTransportRateLimit ALLOW = new DummyTransportRateLimit();
36 45 private final ConcurrentMap<TenantId, Boolean> tenantAllowed = new ConcurrentHashMap<>();
37   - private final ConcurrentMap<TenantId, TransportRateLimit[]> perTenantLimits = new ConcurrentHashMap<>();
38   - private final ConcurrentMap<DeviceId, TransportRateLimit[]> perDeviceLimits = new ConcurrentHashMap<>();
  46 + private final ConcurrentMap<TenantId, Set<DeviceId>> tenantDevices = new ConcurrentHashMap<>();
  47 + private final ConcurrentMap<TenantId, EntityTransportRateLimits> perTenantLimits = new ConcurrentHashMap<>();
  48 + private final ConcurrentMap<DeviceId, EntityTransportRateLimits> perDeviceLimits = new ConcurrentHashMap<>();
39 49
40   - private final TransportRateLimitFactory rateLimitFactory;
41 50 private final TransportTenantProfileCache tenantProfileCache;
42 51
43   - public DefaultTransportRateLimitService(TransportRateLimitFactory rateLimitFactory, TransportTenantProfileCache tenantProfileCache) {
44   - this.rateLimitFactory = rateLimitFactory;
  52 + public DefaultTransportRateLimitService(TransportTenantProfileCache tenantProfileCache) {
45 53 this.tenantProfileCache = tenantProfileCache;
46 54 }
47 55
48 56 @Override
49   - public TransportRateLimitType checkLimits(TenantId tenantId, DeviceId deviceId, int dataPoints, TransportRateLimitType... limits) {
  57 + public EntityType checkLimits(TenantId tenantId, DeviceId deviceId, int dataPoints) {
50 58 if (!tenantAllowed.getOrDefault(tenantId, Boolean.TRUE)) {
51   - return TransportRateLimitType.TENANT_ADDED_TO_DISABLED_LIST;
  59 + return EntityType.TENANT;
52 60 }
53   - TransportRateLimit[] tenantLimits = getTenantRateLimits(tenantId);
54   - TransportRateLimit[] deviceLimits = getDeviceRateLimits(tenantId, deviceId);
55   - for (TransportRateLimitType limitType : limits) {
56   - TransportRateLimit rateLimit;
57   - if (limitType.isTenantLevel()) {
58   - rateLimit = tenantLimits[limitType.ordinal()];
59   - } else {
60   - rateLimit = deviceLimits[limitType.ordinal()];
61   - }
62   - if (!rateLimit.tryConsume(limitType.isMessageLevel() ? 1L : dataPoints)) {
63   - return limitType;
64   - }
  61 + if (!checkEntityRateLimit(dataPoints, getTenantRateLimits(tenantId))) {
  62 + return EntityType.TENANT;
  63 + }
  64 + if (!checkEntityRateLimit(dataPoints, getDeviceRateLimits(tenantId, deviceId))) {
  65 + return EntityType.DEVICE;
65 66 }
66 67 return null;
67 68 }
68 69
  70 + private boolean checkEntityRateLimit(int dataPoints, EntityTransportRateLimits tenantLimits) {
  71 + if (dataPoints > 0) {
  72 + return tenantLimits.getTelemetryMsgRateLimit().tryConsume() && tenantLimits.getTelemetryDataPointsRateLimit().tryConsume(dataPoints);
  73 + } else {
  74 + return tenantLimits.getRegularMsgRateLimit().tryConsume();
  75 + }
  76 + }
  77 +
69 78 @Override
70 79 public void update(TenantProfileUpdateResult update) {
71   - TransportRateLimit[] newLimits = createTransportRateLimits(update.getProfile());
  80 + EntityTransportRateLimits tenantRateLimitPrototype = createRateLimits(update.getProfile(), true);
  81 + EntityTransportRateLimits deviceRateLimitPrototype = createRateLimits(update.getProfile(), false);
72 82 for (TenantId tenantId : update.getAffectedTenants()) {
73   - mergeLimits(tenantId, newLimits);
  83 + mergeLimits(tenantId, tenantRateLimitPrototype, perTenantLimits::get, perTenantLimits::put);
  84 + tenantDevices.get(tenantId).forEach(deviceId -> {
  85 + mergeLimits(deviceId, deviceRateLimitPrototype, perDeviceLimits::get, perDeviceLimits::put);
  86 + });
74 87 }
75 88 }
76 89
77 90 @Override
78 91 public void update(TenantId tenantId) {
79   - mergeLimits(tenantId, fetchProfileAndInit(tenantId));
  92 + EntityTransportRateLimits tenantRateLimitPrototype = createRateLimits(tenantProfileCache.get(tenantId), true);
  93 + EntityTransportRateLimits deviceRateLimitPrototype = createRateLimits(tenantProfileCache.get(tenantId), false);
  94 + mergeLimits(tenantId, tenantRateLimitPrototype, perTenantLimits::get, perTenantLimits::put);
  95 + tenantDevices.get(tenantId).forEach(deviceId -> {
  96 + mergeLimits(deviceId, deviceRateLimitPrototype, perDeviceLimits::get, perDeviceLimits::put);
  97 + });
80 98 }
81 99
82 100 @Override
83 101 public void remove(TenantId tenantId) {
84 102 perTenantLimits.remove(tenantId);
  103 + tenantDevices.remove(tenantId);
85 104 }
86 105
87 106 @Override
88 107 public void remove(DeviceId deviceId) {
89 108 perDeviceLimits.remove(deviceId);
  109 + tenantDevices.values().forEach(set -> set.remove(deviceId));
90 110 }
91 111
92 112 @Override
... ... @@ -94,48 +114,66 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
94 114 tenantAllowed.put(tenantId, allowed);
95 115 }
96 116
97   - private void mergeLimits(TenantId tenantId, TransportRateLimit[] newRateLimits) {
98   - TransportRateLimit[] oldRateLimits = perTenantLimits.get(tenantId);
  117 + private <T> void mergeLimits(T deviceId, EntityTransportRateLimits newRateLimits,
  118 + Function<T, EntityTransportRateLimits> getFunction,
  119 + BiConsumer<T, EntityTransportRateLimits> putFunction) {
  120 + EntityTransportRateLimits oldRateLimits = getFunction.apply(deviceId);
99 121 if (oldRateLimits == null) {
100   - perTenantLimits.put(tenantId, newRateLimits);
  122 + putFunction.accept(deviceId, newRateLimits);
101 123 } else {
102   - for (int i = 0; i < TransportRateLimitType.values().length; i++) {
103   - TransportRateLimit newLimit = newRateLimits[i];
104   - TransportRateLimit oldLimit = oldRateLimits[i];
105   - if (newLimit != null && (oldLimit == null || !oldLimit.getConfiguration().equals(newLimit.getConfiguration()))) {
106   - oldRateLimits[i] = newLimit;
107   - }
  124 + EntityTransportRateLimits updated = merge(oldRateLimits, newRateLimits);
  125 + if (updated != null) {
  126 + putFunction.accept(deviceId, updated);
108 127 }
109 128 }
110 129 }
111 130
112   - private TransportRateLimit[] fetchProfileAndInit(TenantId tenantId) {
113   - return perTenantLimits.computeIfAbsent(tenantId, tmp -> createTransportRateLimits(tenantProfileCache.get(tenantId)));
  131 + private EntityTransportRateLimits merge(EntityTransportRateLimits oldRateLimits, EntityTransportRateLimits newRateLimits) {
  132 + boolean regularUpdate = !oldRateLimits.getRegularMsgRateLimit().getConfiguration().equals(newRateLimits.getRegularMsgRateLimit().getConfiguration());
  133 + boolean telemetryMsgRateUpdate = !oldRateLimits.getTelemetryMsgRateLimit().getConfiguration().equals(newRateLimits.getTelemetryMsgRateLimit().getConfiguration());
  134 + boolean telemetryDataPointUpdate = !oldRateLimits.getTelemetryDataPointsRateLimit().getConfiguration().equals(newRateLimits.getTelemetryDataPointsRateLimit().getConfiguration());
  135 + if (regularUpdate || telemetryMsgRateUpdate || telemetryDataPointUpdate) {
  136 + return new EntityTransportRateLimits(
  137 + regularUpdate ? newLimit(newRateLimits.getRegularMsgRateLimit().getConfiguration()) : oldRateLimits.getRegularMsgRateLimit(),
  138 + telemetryMsgRateUpdate ? newLimit(newRateLimits.getTelemetryMsgRateLimit().getConfiguration()) : oldRateLimits.getTelemetryMsgRateLimit(),
  139 + telemetryDataPointUpdate ? newLimit(newRateLimits.getTelemetryDataPointsRateLimit().getConfiguration()) : oldRateLimits.getTelemetryDataPointsRateLimit());
  140 + } else {
  141 + return null;
  142 + }
114 143 }
115 144
116   - private TransportRateLimit[] createTransportRateLimits(TenantProfile tenantProfile) {
  145 + private EntityTransportRateLimits createRateLimits(TenantProfile tenantProfile, boolean tenant) {
117 146 TenantProfileData profileData = tenantProfile.getProfileData();
118   - TransportRateLimit[] rateLimits = new TransportRateLimit[TransportRateLimitType.values().length];
119   - for (TransportRateLimitType type : TransportRateLimitType.values()) {
120   - rateLimits[type.ordinal()] = rateLimitFactory.create(type, profileData.getProperties().get(type.getConfigurationKey()));
  147 + DefaultTenantProfileConfiguration profile = (DefaultTenantProfileConfiguration) profileData.getConfiguration();
  148 + if (profile == null) {
  149 + return new EntityTransportRateLimits(ALLOW, ALLOW, ALLOW);
  150 + } else {
  151 + TransportRateLimit regularMsgRateLimit = newLimit(tenant ? profile.getTransportTenantMsgRateLimit() : profile.getTransportDeviceMsgRateLimit());
  152 + TransportRateLimit telemetryMsgRateLimit = newLimit(tenant ? profile.getTransportTenantTelemetryMsgRateLimit() : profile.getTransportDeviceTelemetryMsgRateLimit());
  153 + TransportRateLimit telemetryDpRateLimit = newLimit(tenant ? profile.getTransportTenantTelemetryDataPointsRateLimit() : profile.getTransportTenantTelemetryDataPointsRateLimit());
  154 + return new EntityTransportRateLimits(regularMsgRateLimit, telemetryMsgRateLimit, telemetryDpRateLimit);
121 155 }
122   - return rateLimits;
123 156 }
124 157
125   - private TransportRateLimit[] getTenantRateLimits(TenantId tenantId) {
126   - TransportRateLimit[] limits = perTenantLimits.get(tenantId);
  158 + private static TransportRateLimit newLimit(String config) {
  159 + return StringUtils.isEmpty(config) ? ALLOW : new SimpleTransportRateLimit(config);
  160 + }
  161 +
  162 + private EntityTransportRateLimits getTenantRateLimits(TenantId tenantId) {
  163 + EntityTransportRateLimits limits = perTenantLimits.get(tenantId);
127 164 if (limits == null) {
128   - limits = fetchProfileAndInit(tenantId);
  165 + limits = createRateLimits(tenantProfileCache.get(tenantId), true);
129 166 perTenantLimits.put(tenantId, limits);
130 167 }
131 168 return limits;
132 169 }
133 170
134   - private TransportRateLimit[] getDeviceRateLimits(TenantId tenantId, DeviceId deviceId) {
135   - TransportRateLimit[] limits = perDeviceLimits.get(deviceId);
  171 + private EntityTransportRateLimits getDeviceRateLimits(TenantId tenantId, DeviceId deviceId) {
  172 + EntityTransportRateLimits limits = perDeviceLimits.get(deviceId);
136 173 if (limits == null) {
137   - limits = fetchProfileAndInit(tenantId);
  174 + limits = createRateLimits(tenantProfileCache.get(tenantId), false);
138 175 perDeviceLimits.put(deviceId, limits);
  176 + tenantDevices.computeIfAbsent(tenantId, id -> ConcurrentHashMap.newKeySet()).add(deviceId);
139 177 }
140 178 return limits;
141 179 }
... ...
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/EntityTransportRateLimits.java renamed from common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitFactory.java
... ... @@ -15,10 +15,15 @@
15 15 */
16 16 package org.thingsboard.server.common.transport.limits;
17 17
18   -public interface TransportRateLimitFactory {
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.Data;
19 20
20   - TransportRateLimit create(TransportRateLimitType type, Object config);
  21 +@Data
  22 +@AllArgsConstructor
  23 +public class EntityTransportRateLimits {
21 24
22   - TransportRateLimit createDefault(TransportRateLimitType type);
  25 + private TransportRateLimit regularMsgRateLimit;
  26 + private TransportRateLimit telemetryMsgRateLimit;
  27 + private TransportRateLimit telemetryDataPointsRateLimit;
23 28
24 29 }
... ...
... ... @@ -26,6 +26,11 @@ public class SimpleTransportRateLimit implements TransportRateLimit {
26 26 @Getter
27 27 private final String configuration;
28 28
  29 + public SimpleTransportRateLimit(String configuration) {
  30 + this.configuration = configuration;
  31 + this.rateLimit = new TbRateLimits(configuration);
  32 + }
  33 +
29 34 @Override
30 35 public boolean tryConsume() {
31 36 return rateLimit.tryConsume();
... ...
... ... @@ -15,13 +15,14 @@
15 15 */
16 16 package org.thingsboard.server.common.transport.limits;
17 17
  18 +import org.thingsboard.server.common.data.EntityType;
18 19 import org.thingsboard.server.common.data.id.DeviceId;
19 20 import org.thingsboard.server.common.data.id.TenantId;
20 21 import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult;
21 22
22 23 public interface TransportRateLimitService {
23 24
24   - TransportRateLimitType checkLimits(TenantId tenantId, DeviceId deviceId, int dataPoints, TransportRateLimitType... limits);
  25 + EntityType checkLimits(TenantId tenantId, DeviceId deviceId, int dataPoints);
25 26
26 27 void update(TenantProfileUpdateResult update);
27 28
... ...
1   -/**
2   - * Copyright © 2016-2020 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.common.transport.limits;
17   -
18   -import lombok.Getter;
19   -
20   -public enum TransportRateLimitType {
21   -
22   - TENANT_ADDED_TO_DISABLED_LIST("general.tenant.disabled", true, false),
23   - TENANT_MAX_MSGS("transport.tenant.msg", true, true),
24   - TENANT_TELEMETRY_MSGS("transport.tenant.telemetry", true, true),
25   - TENANT_MAX_DATA_POINTS("transport.tenant.dataPoints", true, false),
26   - DEVICE_MAX_MSGS("transport.device.msg", false, true),
27   - DEVICE_TELEMETRY_MSGS("transport.device.telemetry", false, true),
28   - DEVICE_MAX_DATA_POINTS("transport.device.dataPoints", false, false);
29   -
30   - @Getter
31   - private final String configurationKey;
32   - @Getter
33   - private final boolean tenantLevel;
34   - @Getter
35   - private final boolean deviceLevel;
36   - @Getter
37   - private final boolean messageLevel;
38   - @Getter
39   - private final boolean dataPointLevel;
40   -
41   - TransportRateLimitType(String configurationKey, boolean tenantLevel, boolean messageLevel) {
42   - this.configurationKey = configurationKey;
43   - this.tenantLevel = tenantLevel;
44   - this.deviceLevel = !tenantLevel;
45   - this.messageLevel = messageLevel;
46   - this.dataPointLevel = !messageLevel;
47   - }
48   -}
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -55,8 +55,6 @@ import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGateway
55 55 import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
56 56 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
57 57 import org.thingsboard.server.common.transport.limits.TransportRateLimitService;
58   -import org.thingsboard.server.common.transport.limits.TransportRateLimitType;
59   -import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult;
60 58 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
61 59 import org.thingsboard.server.common.transport.util.JsonUtils;
62 60 import org.thingsboard.server.gen.transport.TransportProtos;
... ... @@ -364,7 +362,7 @@ public class DefaultTransportService implements TransportService {
364 362 for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
365 363 dataPoints += tsKv.getKvCount();
366 364 }
367   - if (checkLimits(sessionInfo, msg, callback, dataPoints, TELEMETRY)) {
  365 + if (checkLimits(sessionInfo, msg, callback, dataPoints)) {
368 366 reportActivityInternal(sessionInfo);
369 367 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
370 368 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
... ... @@ -385,7 +383,7 @@ public class DefaultTransportService implements TransportService {
385 383
386 384 @Override
387 385 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
388   - if (checkLimits(sessionInfo, msg, callback, msg.getKvCount(), TELEMETRY)) {
  386 + if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) {
389 387 reportActivityInternal(sessionInfo);
390 388 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
391 389 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
... ... @@ -575,31 +573,23 @@ public class DefaultTransportService implements TransportService {
575 573 sessions.remove(toSessionId(sessionInfo));
576 574 }
577 575
578   - private TransportRateLimitType[] DEFAULT = new TransportRateLimitType[]{TransportRateLimitType.TENANT_MAX_MSGS, TransportRateLimitType.DEVICE_MAX_MSGS};
579   - private TransportRateLimitType[] TELEMETRY = TransportRateLimitType.values();
580   -
581   - @Override
582   - public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
583   - return checkLimits(sessionInfo, msg, callback, 0, DEFAULT);
  576 + private boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
  577 + return checkLimits(sessionInfo, msg, callback, 0);
584 578 }
585 579
586   - @Override
587   - public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback, int dataPoints, TransportRateLimitType... limits) {
  580 + private boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback, int dataPoints) {
588 581 if (log.isTraceEnabled()) {
589 582 log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg);
590 583 }
591 584 TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
592 585 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
593 586
594   - TransportRateLimitType limit = rateLimitService.checkLimits(tenantId, deviceId, 0, limits);
595   - if (limit == null) {
  587 + EntityType rateLimitedEntityType = rateLimitService.checkLimits(tenantId, deviceId, dataPoints);
  588 + if (rateLimitedEntityType == null) {
596 589 return true;
597 590 } else {
598 591 if (callback != null) {
599   - callback.onError(new TbRateLimitsException(limit.isTenantLevel() ? EntityType.TENANT : EntityType.DEVICE));
600   - }
601   - if (log.isTraceEnabled()) {
602   - log.trace("[{}][{}] {} rateLimit detected: {}", toSessionId(sessionInfo), tenantId, limit, msg);
  592 + callback.onError(new TbRateLimitsException(rateLimitedEntityType));
603 593 }
604 594 return false;
605 595 }
... ... @@ -831,8 +821,8 @@ public class DefaultTransportService implements TransportService {
831 821 @Override
832 822 public void onSuccess(T msg) {
833 823 try {
834   - apiUsageStatsClient.report(tenantId, ApiUsageRecordKey.MSG_COUNT, 1);
835   - apiUsageStatsClient.report(tenantId, ApiUsageRecordKey.DP_TRANSPORT_COUNT, dataPoints);
  824 + apiUsageStatsClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_MSG_COUNT, 1);
  825 + apiUsageStatsClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_DP_COUNT, dataPoints);
836 826 } finally {
837 827 callback.onSuccess(msg);
838 828 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
... ... @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.id.TenantId;
29 29 import org.thingsboard.server.common.data.id.TenantProfileId;
30 30 import org.thingsboard.server.common.data.page.PageData;
31 31 import org.thingsboard.server.common.data.page.PageLink;
  32 +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
32 33 import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
33 34 import org.thingsboard.server.dao.entity.AbstractEntityService;
34 35 import org.thingsboard.server.dao.exception.DataValidationException;
... ... @@ -145,7 +146,9 @@ public class TenantProfileServiceImpl extends AbstractEntityService implements T
145 146 defaultTenantProfile = new TenantProfile();
146 147 defaultTenantProfile.setDefault(true);
147 148 defaultTenantProfile.setName("Default");
148   - defaultTenantProfile.setProfileData(new TenantProfileData());
  149 + TenantProfileData profileData = new TenantProfileData();
  150 + profileData.setConfiguration(new DefaultTenantProfileConfiguration());
  151 + defaultTenantProfile.setProfileData(profileData);
149 152 defaultTenantProfile.setDescription("Default tenant profile");
150 153 defaultTenantProfile.setIsolatedTbCore(false);
151 154 defaultTenantProfile.setIsolatedTbRuleEngine(false);
... ...
... ... @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId;
25 25 import org.thingsboard.server.common.data.id.TenantProfileId;
26 26 import org.thingsboard.server.common.data.page.PageData;
27 27 import org.thingsboard.server.common.data.page.PageLink;
  28 +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
28 29 import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
29 30 import org.thingsboard.server.dao.exception.DataValidationException;
30 31
... ... @@ -262,7 +263,9 @@ public class BaseTenantProfileServiceTest extends AbstractServiceTest {
262 263 TenantProfile tenantProfile = new TenantProfile();
263 264 tenantProfile.setName(name);
264 265 tenantProfile.setDescription(name + " Test");
265   - tenantProfile.setProfileData(new TenantProfileData());
  266 + TenantProfileData profileData = new TenantProfileData();
  267 + profileData.setConfiguration(new DefaultTenantProfileConfiguration());
  268 + tenantProfile.setProfileData(profileData);
266 269 tenantProfile.setDefault(false);
267 270 tenantProfile.setIsolatedTbCore(false);
268 271 tenantProfile.setIsolatedTbRuleEngine(false);
... ...