Commit 671a0b579ef2b6850311b3ba8c3cdfffb9a34747
1 parent
d1c6a907
events debug mode rate limits added
Showing
5 changed files
with
159 additions
and
40 deletions
... | ... | @@ -36,6 +36,7 @@ import org.springframework.stereotype.Component; |
36 | 36 | import org.thingsboard.rule.engine.api.MailService; |
37 | 37 | import org.thingsboard.rule.engine.api.RuleChainTransactionService; |
38 | 38 | import org.thingsboard.server.actors.service.ActorService; |
39 | +import org.thingsboard.server.actors.tenant.DebugTbRateLimits; | |
39 | 40 | import org.thingsboard.server.common.data.DataConstants; |
40 | 41 | import org.thingsboard.server.common.data.Event; |
41 | 42 | import org.thingsboard.server.common.data.id.EntityId; |
... | ... | @@ -43,6 +44,7 @@ import org.thingsboard.server.common.data.id.TenantId; |
43 | 44 | import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
44 | 45 | import org.thingsboard.server.common.msg.TbMsg; |
45 | 46 | import org.thingsboard.server.common.msg.cluster.ServerAddress; |
47 | +import org.thingsboard.server.common.msg.tools.TbRateLimits; | |
46 | 48 | import org.thingsboard.server.common.transport.auth.DeviceAuthService; |
47 | 49 | import org.thingsboard.server.dao.alarm.AlarmService; |
48 | 50 | import org.thingsboard.server.dao.asset.AssetService; |
... | ... | @@ -84,6 +86,8 @@ import java.io.IOException; |
84 | 86 | import java.io.PrintWriter; |
85 | 87 | import java.io.StringWriter; |
86 | 88 | import java.util.Optional; |
89 | +import java.util.concurrent.ConcurrentHashMap; | |
90 | +import java.util.concurrent.ConcurrentMap; | |
87 | 91 | |
88 | 92 | @Slf4j |
89 | 93 | @Component |
... | ... | @@ -92,6 +96,12 @@ public class ActorSystemContext { |
92 | 96 | |
93 | 97 | protected final ObjectMapper mapper = new ObjectMapper(); |
94 | 98 | |
99 | + private final ConcurrentMap<TenantId, DebugTbRateLimits> debugPerTenantLimits = new ConcurrentHashMap<>(); | |
100 | + | |
101 | + public ConcurrentMap<TenantId, DebugTbRateLimits> getDebugPerTenantLimits() { | |
102 | + return debugPerTenantLimits; | |
103 | + } | |
104 | + | |
95 | 105 | @Getter |
96 | 106 | @Setter |
97 | 107 | private ActorService actorService; |
... | ... | @@ -291,6 +301,14 @@ public class ActorSystemContext { |
291 | 301 | @Getter |
292 | 302 | private long sessionReportTimeout; |
293 | 303 | |
304 | + @Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.enabled}") | |
305 | + @Getter | |
306 | + private boolean debugPerTenantEnabled; | |
307 | + | |
308 | + @Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.configuration}") | |
309 | + @Getter | |
310 | + private String debugPerTenantLimitsConfiguration; | |
311 | + | |
294 | 312 | @Getter |
295 | 313 | @Setter |
296 | 314 | private ActorSystem actorSystem; |
... | ... | @@ -318,8 +336,6 @@ public class ActorSystemContext { |
318 | 336 | @Getter |
319 | 337 | private CassandraBufferedRateExecutor cassandraBufferedRateExecutor; |
320 | 338 | |
321 | - | |
322 | - | |
323 | 339 | public ActorSystemContext() { |
324 | 340 | config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load()); |
325 | 341 | } |
... | ... | @@ -392,46 +408,97 @@ public class ActorSystemContext { |
392 | 408 | } |
393 | 409 | |
394 | 410 | private void persistDebugAsync(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) { |
395 | - try { | |
396 | - Event event = new Event(); | |
397 | - event.setTenantId(tenantId); | |
398 | - event.setEntityId(entityId); | |
399 | - event.setType(DataConstants.DEBUG_RULE_NODE); | |
400 | - | |
401 | - String metadata = mapper.writeValueAsString(tbMsg.getMetaData().getData()); | |
402 | - | |
403 | - ObjectNode node = mapper.createObjectNode() | |
404 | - .put("type", type) | |
405 | - .put("server", getServerAddress()) | |
406 | - .put("entityId", tbMsg.getOriginator().getId().toString()) | |
407 | - .put("entityName", tbMsg.getOriginator().getEntityType().name()) | |
408 | - .put("msgId", tbMsg.getId().toString()) | |
409 | - .put("msgType", tbMsg.getType()) | |
410 | - .put("dataType", tbMsg.getDataType().name()) | |
411 | - .put("relationType", relationType) | |
412 | - .put("data", tbMsg.getData()) | |
413 | - .put("metadata", metadata); | |
414 | - | |
415 | - if (error != null) { | |
416 | - node = node.put("error", toString(error)); | |
411 | + if (checkLimits(tenantId, tbMsg, error)) { | |
412 | + try { | |
413 | + Event event = new Event(); | |
414 | + event.setTenantId(tenantId); | |
415 | + event.setEntityId(entityId); | |
416 | + event.setType(DataConstants.DEBUG_RULE_NODE); | |
417 | + | |
418 | + String metadata = mapper.writeValueAsString(tbMsg.getMetaData().getData()); | |
419 | + | |
420 | + ObjectNode node = mapper.createObjectNode() | |
421 | + .put("type", type) | |
422 | + .put("server", getServerAddress()) | |
423 | + .put("entityId", tbMsg.getOriginator().getId().toString()) | |
424 | + .put("entityName", tbMsg.getOriginator().getEntityType().name()) | |
425 | + .put("msgId", tbMsg.getId().toString()) | |
426 | + .put("msgType", tbMsg.getType()) | |
427 | + .put("dataType", tbMsg.getDataType().name()) | |
428 | + .put("relationType", relationType) | |
429 | + .put("data", tbMsg.getData()) | |
430 | + .put("metadata", metadata); | |
431 | + | |
432 | + if (error != null) { | |
433 | + node = node.put("error", toString(error)); | |
434 | + } | |
435 | + | |
436 | + event.setBody(node); | |
437 | + ListenableFuture<Event> future = eventService.saveAsync(event); | |
438 | + Futures.addCallback(future, new FutureCallback<Event>() { | |
439 | + @Override | |
440 | + public void onSuccess(@Nullable Event event) { | |
441 | + | |
442 | + } | |
443 | + | |
444 | + @Override | |
445 | + public void onFailure(Throwable th) { | |
446 | + log.error("Could not save debug Event for Node", th); | |
447 | + } | |
448 | + }); | |
449 | + } catch (IOException ex) { | |
450 | + log.warn("Failed to persist rule node debug message", ex); | |
417 | 451 | } |
452 | + } | |
453 | + } | |
418 | 454 | |
419 | - event.setBody(node); | |
420 | - ListenableFuture<Event> future = eventService.saveAsync(event); | |
421 | - Futures.addCallback(future, new FutureCallback<Event>() { | |
422 | - @Override | |
423 | - public void onSuccess(@Nullable Event event) { | |
455 | + private boolean checkLimits(TenantId tenantId, TbMsg tbMsg, Throwable error) { | |
456 | + if (debugPerTenantEnabled) { | |
457 | + DebugTbRateLimits debugTbRateLimits = debugPerTenantLimits.computeIfAbsent(tenantId, id -> | |
458 | + new DebugTbRateLimits(new TbRateLimits(debugPerTenantLimitsConfiguration), false)); | |
424 | 459 | |
460 | + if (!debugTbRateLimits.getTbRateLimits().tryConsume()) { | |
461 | + if (!debugTbRateLimits.isRuleChainEventSaved()) { | |
462 | + persistRuleChainDebugModeEvent(tenantId, tbMsg.getRuleChainId(), error); | |
463 | + debugTbRateLimits.setRuleChainEventSaved(true); | |
425 | 464 | } |
426 | - | |
427 | - @Override | |
428 | - public void onFailure(Throwable th) { | |
429 | - log.error("Could not save debug Event for Node", th); | |
465 | + if (log.isTraceEnabled()) { | |
466 | + log.trace("[{}] Tenant level debug mode rate limit detected: {}", tenantId, tbMsg); | |
430 | 467 | } |
431 | - }); | |
432 | - } catch (IOException ex) { | |
433 | - log.warn("Failed to persist rule node debug message", ex); | |
468 | + return false; | |
469 | + } | |
470 | + } | |
471 | + return true; | |
472 | + } | |
473 | + | |
474 | + private void persistRuleChainDebugModeEvent(TenantId tenantId, EntityId entityId, Throwable error) { | |
475 | + Event event = new Event(); | |
476 | + event.setTenantId(tenantId); | |
477 | + event.setEntityId(entityId); | |
478 | + event.setType(DataConstants.DEBUG_RULE_CHAIN); | |
479 | + | |
480 | + ObjectNode node = mapper.createObjectNode() | |
481 | + //todo: what fields are needed here? | |
482 | + .put("server", getServerAddress()) | |
483 | + .put("message", "Reached debug mode rate limit!"); | |
484 | + | |
485 | + if (error != null) { | |
486 | + node = node.put("error", toString(error)); | |
434 | 487 | } |
488 | + | |
489 | + event.setBody(node); | |
490 | + ListenableFuture<Event> future = eventService.saveAsync(event); | |
491 | + Futures.addCallback(future, new FutureCallback<Event>() { | |
492 | + @Override | |
493 | + public void onSuccess(@Nullable Event event) { | |
494 | + | |
495 | + } | |
496 | + | |
497 | + @Override | |
498 | + public void onFailure(Throwable th) { | |
499 | + log.error("Could not save debug Event for Rule Chain", th); | |
500 | + } | |
501 | + }); | |
435 | 502 | } |
436 | 503 | |
437 | 504 | public static Exception toException(Throwable error) { | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2019 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.actors.tenant; | |
17 | + | |
18 | +import lombok.AllArgsConstructor; | |
19 | +import lombok.Data; | |
20 | +import org.thingsboard.server.common.msg.tools.TbRateLimits; | |
21 | + | |
22 | +@Data | |
23 | +@AllArgsConstructor | |
24 | +public class DebugTbRateLimits { | |
25 | + | |
26 | + private TbRateLimits tbRateLimits; | |
27 | + private boolean ruleChainEventSaved; | |
28 | + | |
29 | +} | ... | ... |
... | ... | @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; |
22 | 22 | import com.fasterxml.jackson.databind.node.ObjectNode; |
23 | 23 | import lombok.extern.slf4j.Slf4j; |
24 | 24 | import org.springframework.beans.factory.annotation.Autowired; |
25 | +import org.springframework.beans.factory.annotation.Value; | |
25 | 26 | import org.springframework.http.HttpStatus; |
26 | 27 | import org.springframework.security.access.prepost.PreAuthorize; |
27 | 28 | import org.springframework.util.StringUtils; |
... | ... | @@ -34,6 +35,8 @@ import org.springframework.web.bind.annotation.ResponseBody; |
34 | 35 | import org.springframework.web.bind.annotation.ResponseStatus; |
35 | 36 | import org.springframework.web.bind.annotation.RestController; |
36 | 37 | import org.thingsboard.rule.engine.api.ScriptEngine; |
38 | +import org.thingsboard.server.actors.ActorSystemContext; | |
39 | +import org.thingsboard.server.actors.tenant.DebugTbRateLimits; | |
37 | 40 | import org.thingsboard.server.common.data.DataConstants; |
38 | 41 | import org.thingsboard.server.common.data.EntityType; |
39 | 42 | import org.thingsboard.server.common.data.Event; |
... | ... | @@ -56,10 +59,10 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; |
56 | 59 | import org.thingsboard.server.service.security.permission.Operation; |
57 | 60 | import org.thingsboard.server.service.security.permission.Resource; |
58 | 61 | |
59 | -import java.util.HashSet; | |
60 | 62 | import java.util.List; |
61 | 63 | import java.util.Map; |
62 | 64 | import java.util.Set; |
65 | +import java.util.concurrent.ConcurrentMap; | |
63 | 66 | import java.util.stream.Collectors; |
64 | 67 | |
65 | 68 | @Slf4j |
... | ... | @@ -78,6 +81,12 @@ public class RuleChainController extends BaseController { |
78 | 81 | @Autowired |
79 | 82 | private JsInvokeService jsInvokeService; |
80 | 83 | |
84 | + @Autowired(required = false) | |
85 | + private ActorSystemContext actorContext; | |
86 | + | |
87 | + @Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.enabled}") | |
88 | + private boolean debugPerTenantEnabled; | |
89 | + | |
81 | 90 | @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") |
82 | 91 | @RequestMapping(value = "/ruleChain/{ruleChainId}", method = RequestMethod.GET) |
83 | 92 | @ResponseBody |
... | ... | @@ -182,8 +191,17 @@ public class RuleChainController extends BaseController { |
182 | 191 | @ResponseBody |
183 | 192 | public RuleChainMetaData saveRuleChainMetaData(@RequestBody RuleChainMetaData ruleChainMetaData) throws ThingsboardException { |
184 | 193 | try { |
194 | + TenantId tenantId = getTenantId(); | |
195 | + if (debugPerTenantEnabled) { | |
196 | + ConcurrentMap<TenantId, DebugTbRateLimits> debugPerTenantLimits = actorContext.getDebugPerTenantLimits(); | |
197 | + DebugTbRateLimits debugTbRateLimits = debugPerTenantLimits.getOrDefault(tenantId, null); | |
198 | + if (debugTbRateLimits != null) { | |
199 | + debugPerTenantLimits.remove(tenantId, debugTbRateLimits); | |
200 | + } | |
201 | + } | |
202 | + | |
185 | 203 | RuleChain ruleChain = checkRuleChain(ruleChainMetaData.getRuleChainId(), Operation.WRITE); |
186 | - RuleChainMetaData savedRuleChainMetaData = checkNotNull(ruleChainService.saveRuleChainMetaData(getTenantId(), ruleChainMetaData)); | |
204 | + RuleChainMetaData savedRuleChainMetaData = checkNotNull(ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetaData)); | |
187 | 205 | |
188 | 206 | actorService.onEntityStateChange(ruleChain.getTenantId(), ruleChain.getId(), ComponentLifecycleEvent.UPDATED); |
189 | 207 | |
... | ... | @@ -236,7 +254,7 @@ public class RuleChainController extends BaseController { |
236 | 254 | referencingRuleChainIds.remove(ruleChain.getId()); |
237 | 255 | |
238 | 256 | referencingRuleChainIds.forEach(referencingRuleChainId -> |
239 | - actorService.onEntityStateChange(ruleChain.getTenantId(), referencingRuleChainId, ComponentLifecycleEvent.UPDATED)); | |
257 | + actorService.onEntityStateChange(ruleChain.getTenantId(), referencingRuleChainId, ComponentLifecycleEvent.UPDATED)); | |
240 | 258 | |
241 | 259 | actorService.onEntityStateChange(ruleChain.getTenantId(), ruleChain.getId(), ComponentLifecycleEvent.DELETED); |
242 | 260 | |
... | ... | @@ -291,7 +309,8 @@ public class RuleChainController extends BaseController { |
291 | 309 | |
292 | 310 | String data = inputParams.get("msg").asText(); |
293 | 311 | JsonNode metadataJson = inputParams.get("metadata"); |
294 | - Map<String, String> metadata = objectMapper.convertValue(metadataJson, new TypeReference<Map<String, String>>() {}); | |
312 | + Map<String, String> metadata = objectMapper.convertValue(metadataJson, new TypeReference<Map<String, String>>() { | |
313 | + }); | |
295 | 314 | String msgType = inputParams.get("msgType").asText(); |
296 | 315 | String output = ""; |
297 | 316 | String errorText = ""; | ... | ... |
... | ... | @@ -210,6 +210,9 @@ actors: |
210 | 210 | chain: |
211 | 211 | # Errors for particular actor are persisted once per specified amount of milliseconds |
212 | 212 | error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}" |
213 | + debug_mode_rate_limits_per_tenant: | |
214 | + enabled: "${ACTORS_RULE_CHAIN_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}" | |
215 | + configuration: "${ACTORS_RULE_CHAIN_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:1000:3600}" | |
213 | 216 | node: |
214 | 217 | # Errors for particular actor are persisted once per specified amount of milliseconds |
215 | 218 | error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" | ... | ... |
... | ... | @@ -38,6 +38,7 @@ public class DataConstants { |
38 | 38 | public static final String LC_EVENT = "LC_EVENT"; |
39 | 39 | public static final String STATS = "STATS"; |
40 | 40 | public static final String DEBUG_RULE_NODE = "DEBUG_RULE_NODE"; |
41 | + public static final String DEBUG_RULE_CHAIN = "DEBUG_RULE_CHAIN"; | |
41 | 42 | |
42 | 43 | public static final String ONEWAY = "ONEWAY"; |
43 | 44 | public static final String TWOWAY = "TWOWAY"; | ... | ... |