Commit a87956ebfd3d0765224feb1cbac843638a29dbbd

Authored by Andrii Shvaika
1 parent 8385d18c

Improvements to Storage Days calculation

... ... @@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.Customer;
39 39 import org.thingsboard.server.common.data.DataConstants;
40 40 import org.thingsboard.server.common.data.Device;
41 41 import org.thingsboard.server.common.data.DeviceProfile;
  42 +import org.thingsboard.server.common.data.TenantProfile;
42 43 import org.thingsboard.server.common.data.alarm.Alarm;
43 44 import org.thingsboard.server.common.data.asset.Asset;
44 45 import org.thingsboard.server.common.data.id.DeviceId;
... ... @@ -511,13 +512,24 @@ class DefaultTbContext implements TbContext {
511 512 }
512 513
513 514 @Override
  515 + public void addTenantProfileListener(Consumer<TenantProfile> listener) {
  516 + mainCtx.getTenantProfileCache().addListener(getTenantId(), getSelfId(), listener);
  517 + }
  518 +
  519 + @Override
514 520 public void addDeviceProfileListeners(Consumer<DeviceProfile> profileListener, BiConsumer<DeviceId, DeviceProfile> deviceListener) {
515 521 mainCtx.getDeviceProfileCache().addListener(getTenantId(), getSelfId(), profileListener, deviceListener);
516 522 }
517 523
518 524 @Override
519   - public void removeProfileListener() {
  525 + public void removeListeners() {
520 526 mainCtx.getDeviceProfileCache().removeListener(getTenantId(), getSelfId());
  527 + mainCtx.getTenantProfileCache().removeListener(getTenantId(), getSelfId());
  528 + }
  529 +
  530 + @Override
  531 + public TenantProfile getTenantProfile() {
  532 + return mainCtx.getTenantProfileCache().get(getTenantId());
521 533 }
522 534
523 535 private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) {
... ...
... ... @@ -45,6 +45,7 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
45 45 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
46 46 import org.thingsboard.server.common.data.DataConstants;
47 47 import org.thingsboard.server.common.data.EntityType;
  48 +import org.thingsboard.server.common.data.TenantProfile;
48 49 import org.thingsboard.server.common.data.audit.ActionType;
49 50 import org.thingsboard.server.common.data.exception.ThingsboardException;
50 51 import org.thingsboard.server.common.data.id.DeviceId;
... ... @@ -69,6 +70,7 @@ import org.thingsboard.server.common.data.kv.LongDataEntry;
69 70 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
70 71 import org.thingsboard.server.common.data.kv.StringDataEntry;
71 72 import org.thingsboard.server.common.data.kv.TsKvEntry;
  73 +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
72 74 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
73 75 import org.thingsboard.server.dao.timeseries.TimeseriesService;
74 76 import org.thingsboard.server.queue.util.TbCoreComponent;
... ... @@ -93,6 +95,7 @@ import java.util.Map;
93 95 import java.util.Set;
94 96 import java.util.concurrent.ExecutorService;
95 97 import java.util.concurrent.Executors;
  98 +import java.util.concurrent.TimeUnit;
96 99 import java.util.stream.Collectors;
97 100
98 101 /**
... ... @@ -205,7 +208,7 @@ public class TelemetryController extends BaseController {
205 208 @RequestParam(name = "interval", defaultValue = "0") Long interval,
206 209 @RequestParam(name = "limit", defaultValue = "100") Integer limit,
207 210 @RequestParam(name = "agg", defaultValue = "NONE") String aggStr,
208   - @RequestParam(name= "orderBy", defaultValue = "DESC") String orderBy,
  211 + @RequestParam(name = "orderBy", defaultValue = "DESC") String orderBy,
209 212 @RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException {
210 213 return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr,
211 214 (result, tenantId, entityId) -> {
... ... @@ -392,7 +395,7 @@ public class TelemetryController extends BaseController {
392 395 if (attributes.isEmpty()) {
393 396 return getImmediateDeferredResult("No attributes data found in request body!", HttpStatus.BAD_REQUEST);
394 397 }
395   - for (AttributeKvEntry attributeKvEntry: attributes) {
  398 + for (AttributeKvEntry attributeKvEntry : attributes) {
396 399 if (attributeKvEntry.getKey().isEmpty() || attributeKvEntry.getKey().trim().length() == 0) {
397 400 return getImmediateDeferredResult("Key cannot be empty or contains only spaces", HttpStatus.BAD_REQUEST);
398 401 }
... ... @@ -440,9 +443,13 @@ public class TelemetryController extends BaseController {
440 443 if (entries.isEmpty()) {
441 444 return getImmediateDeferredResult("No timeseries data found in request body!", HttpStatus.BAD_REQUEST);
442 445 }
443   - SecurityUser user = getCurrentUser();
444 446 return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_TELEMETRY, entityIdSrc, (result, tenantId, entityId) -> {
445   - tsSubService.saveAndNotify(tenantId, entityId, entries, ttl, new FutureCallback<Void>() {
  447 + long tenantTtl = ttl;
  448 + if (!TenantId.SYS_TENANT_ID.equals(tenantId) && tenantTtl == 0) {
  449 + TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
  450 + tenantTtl = TimeUnit.DAYS.toSeconds(((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration()).getDefaultStorageTtlDays());
  451 + }
  452 + tsSubService.saveAndNotify(tenantId, entityId, entries, tenantTtl, new FutureCallback<Void>() {
446 453 @Override
447 454 public void onSuccess(@Nullable Void tmp) {
448 455 result.setResult(new ResponseEntity(HttpStatus.OK));
... ...
... ... @@ -521,27 +521,27 @@ public class DefaultDeviceStateService implements DeviceStateService {
521 521
522 522 private void save(DeviceId deviceId, String key, long value) {
523 523 if (persistToTelemetry) {
524   - tsSubService.saveAndNotify(
  524 + tsSubService.saveAndNotifyInternal(
525 525 TenantId.SYS_TENANT_ID, deviceId,
526 526 Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(key, value))),
527   - new AttributeSaveCallback(deviceId, key, value));
  527 + new AttributeSaveCallback<>(deviceId, key, value));
528 528 } else {
529   - tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
  529 + tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback<>(deviceId, key, value));
530 530 }
531 531 }
532 532
533 533 private void save(DeviceId deviceId, String key, boolean value) {
534 534 if (persistToTelemetry) {
535   - tsSubService.saveAndNotify(
  535 + tsSubService.saveAndNotifyInternal(
536 536 TenantId.SYS_TENANT_ID, deviceId,
537 537 Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))),
538   - new AttributeSaveCallback(deviceId, key, value));
  538 + new AttributeSaveCallback<>(deviceId, key, value));
539 539 } else {
540   - tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
  540 + tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback<>(deviceId, key, value));
541 541 }
542 542 }
543 543
544   - private static class AttributeSaveCallback implements FutureCallback<Void> {
  544 + private static class AttributeSaveCallback<T> implements FutureCallback<T> {
545 545 private final DeviceId deviceId;
546 546 private final String key;
547 547 private final Object value;
... ... @@ -553,7 +553,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
553 553 }
554 554
555 555 @Override
556   - public void onSuccess(@Nullable Void result) {
  556 + public void onSuccess(@Nullable T result) {
557 557 log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value);
558 558 }
559 559
... ...
... ... @@ -25,6 +25,7 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
25 25 import org.thingsboard.server.common.data.ApiUsageRecordKey;
26 26 import org.thingsboard.server.common.data.EntityType;
27 27 import org.thingsboard.server.common.data.EntityView;
  28 +import org.thingsboard.server.common.data.TenantProfile;
28 29 import org.thingsboard.server.common.data.id.EntityId;
29 30 import org.thingsboard.server.common.data.id.TenantId;
30 31 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
... ... @@ -34,13 +35,14 @@ import org.thingsboard.server.common.data.kv.DoubleDataEntry;
34 35 import org.thingsboard.server.common.data.kv.LongDataEntry;
35 36 import org.thingsboard.server.common.data.kv.StringDataEntry;
36 37 import org.thingsboard.server.common.data.kv.TsKvEntry;
  38 +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
37 39 import org.thingsboard.server.common.msg.queue.ServiceType;
38 40 import org.thingsboard.server.common.msg.queue.TbCallback;
39 41 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
40 42 import org.thingsboard.server.dao.attributes.AttributesService;
41 43 import org.thingsboard.server.dao.entityview.EntityViewService;
  44 +import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
42 45 import org.thingsboard.server.dao.timeseries.TimeseriesService;
43   -import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
44 46 import org.thingsboard.server.gen.transport.TransportProtos;
45 47 import org.thingsboard.server.queue.discovery.PartitionService;
46 48 import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
... ... @@ -119,11 +121,12 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
119 121 @Override
120 122 public void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
121 123 checkInternalEntity(entityId);
122   - if (apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
  124 + boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
  125 + if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
123 126 saveAndNotifyInternal(tenantId, entityId, ts, ttl, new FutureCallback<Integer>() {
124 127 @Override
125 128 public void onSuccess(Integer result) {
126   - if (result != null && result > 0) {
  129 + if (!sysTenant && result != null && result > 0) {
127 130 apiUsageClient.report(tenantId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
128 131 }
129 132 callback.onSuccess(null);
... ... @@ -134,7 +137,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
134 137 callback.onFailure(t);
135 138 }
136 139 });
137   - } else{
  140 + } else {
138 141 callback.onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
139 142 }
140 143 }
... ...
... ... @@ -16,9 +16,12 @@
16 16 package org.thingsboard.server.dao.tenant;
17 17
18 18 import org.thingsboard.server.common.data.TenantProfile;
  19 +import org.thingsboard.server.common.data.id.EntityId;
19 20 import org.thingsboard.server.common.data.id.TenantId;
20 21 import org.thingsboard.server.common.data.id.TenantProfileId;
21 22
  23 +import java.util.function.Consumer;
  24 +
22 25 public interface TbTenantProfileCache {
23 26
24 27 TenantProfile get(TenantId tenantId);
... ... @@ -31,4 +34,8 @@ public interface TbTenantProfileCache {
31 34
32 35 void evict(TenantId id);
33 36
  37 + void addListener(TenantId tenantId, EntityId listenerId, Consumer<TenantProfile> profileListener);
  38 +
  39 + void removeListener(TenantId tenantId, EntityId listenerId);
  40 +
34 41 }
... ...
... ... @@ -19,16 +19,15 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.stereotype.Service;
20 20 import org.thingsboard.server.common.data.Tenant;
21 21 import org.thingsboard.server.common.data.TenantProfile;
  22 +import org.thingsboard.server.common.data.id.EntityId;
22 23 import org.thingsboard.server.common.data.id.TenantId;
23 24 import org.thingsboard.server.common.data.id.TenantProfileId;
24   -import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
25   -import org.thingsboard.server.dao.tenant.TenantProfileService;
26   -import org.thingsboard.server.dao.tenant.TenantService;
27 25
28 26 import java.util.concurrent.ConcurrentHashMap;
29 27 import java.util.concurrent.ConcurrentMap;
30 28 import java.util.concurrent.locks.Lock;
31 29 import java.util.concurrent.locks.ReentrantLock;
  30 +import java.util.function.Consumer;
32 31
33 32 @Service
34 33 @Slf4j
... ... @@ -40,6 +39,7 @@ public class DefaultTbTenantProfileCache implements TbTenantProfileCache {
40 39
41 40 private final ConcurrentMap<TenantProfileId, TenantProfile> tenantProfilesMap = new ConcurrentHashMap<>();
42 41 private final ConcurrentMap<TenantId, TenantProfileId> tenantsMap = new ConcurrentHashMap<>();
  42 + private final ConcurrentMap<TenantId, ConcurrentMap<EntityId, Consumer<TenantProfile>>> profileListeners = new ConcurrentHashMap<>();
43 43
44 44 public DefaultTbTenantProfileCache(TenantProfileService tenantProfileService, TenantService tenantService) {
45 45 this.tenantProfileService = tenantProfileService;
... ... @@ -85,17 +85,56 @@ public class DefaultTbTenantProfileCache implements TbTenantProfileCache {
85 85 public void put(TenantProfile profile) {
86 86 if (profile.getId() != null) {
87 87 tenantProfilesMap.put(profile.getId(), profile);
  88 + notifyTenantListeners(profile);
88 89 }
89 90 }
90 91
91 92 @Override
92 93 public void evict(TenantProfileId profileId) {
93 94 tenantProfilesMap.remove(profileId);
  95 + notifyTenantListeners(get(profileId));
  96 + }
  97 +
  98 + public void notifyTenantListeners(TenantProfile tenantProfile) {
  99 + if (tenantProfile != null) {
  100 + tenantsMap.forEach(((tenantId, tenantProfileId) -> {
  101 + if (tenantProfileId.equals(tenantProfile.getId())) {
  102 + ConcurrentMap<EntityId, Consumer<TenantProfile>> tenantListeners = profileListeners.get(tenantId);
  103 + if (tenantListeners != null) {
  104 + tenantListeners.forEach((id, listener) -> listener.accept(tenantProfile));
  105 + }
  106 + }
  107 + }));
  108 + }
94 109 }
95 110
96 111 @Override
97 112 public void evict(TenantId tenantId) {
98 113 tenantsMap.remove(tenantId);
  114 + TenantProfile tenantProfile = get(tenantId);
  115 + if (tenantProfile != null) {
  116 + ConcurrentMap<EntityId, Consumer<TenantProfile>> tenantListeners = profileListeners.get(tenantId);
  117 + if (tenantListeners != null) {
  118 + tenantListeners.forEach((id, listener) -> listener.accept(tenantProfile));
  119 + }
  120 + }
  121 + }
  122 +
  123 + @Override
  124 + public void addListener(TenantId tenantId, EntityId listenerId, Consumer<TenantProfile> profileListener) {
  125 + //Force cache of the tenant id.
  126 + get(tenantId);
  127 + if (profileListener != null) {
  128 + profileListeners.computeIfAbsent(tenantId, id -> new ConcurrentHashMap<>()).put(listenerId, profileListener);
  129 + }
  130 + }
  131 +
  132 + @Override
  133 + public void removeListener(TenantId tenantId, EntityId listenerId) {
  134 + ConcurrentMap<EntityId, Consumer<TenantProfile>> tenantListeners = profileListeners.get(tenantId);
  135 + if (tenantListeners != null) {
  136 + tenantListeners.remove(listenerId);
  137 + }
99 138 }
100 139
101 140 }
... ...
... ... @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey;
23 23 import org.thingsboard.server.common.data.Customer;
24 24 import org.thingsboard.server.common.data.Device;
25 25 import org.thingsboard.server.common.data.DeviceProfile;
  26 +import org.thingsboard.server.common.data.TenantProfile;
26 27 import org.thingsboard.server.common.data.alarm.Alarm;
27 28 import org.thingsboard.server.common.data.asset.Asset;
28 29 import org.thingsboard.server.common.data.id.DeviceId;
... ... @@ -237,7 +238,11 @@ public interface TbContext {
237 238
238 239 void clearRuleNodeStates();
239 240
  241 + void addTenantProfileListener(Consumer<TenantProfile> listener);
  242 +
240 243 void addDeviceProfileListeners(Consumer<DeviceProfile> listener, BiConsumer<DeviceId, DeviceProfile> deviceListener);
241 244
242   - void removeProfileListener();
  245 + void removeListeners();
  246 +
  247 + TenantProfile getTenantProfile();
243 248 }
... ...
... ... @@ -149,7 +149,7 @@ public class TbDeviceProfileNode implements TbNode {
149 149
150 150 @Override
151 151 public void destroy() {
152   - ctx.removeProfileListener();
  152 + ctx.removeListeners();
153 153 deviceStates.clear();
154 154 }
155 155
... ...
... ... @@ -24,6 +24,7 @@ import org.thingsboard.rule.engine.api.TbContext;
24 24 import org.thingsboard.rule.engine.api.TbNode;
25 25 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
26 26 import org.thingsboard.rule.engine.api.TbNodeException;
  27 +import org.thingsboard.server.common.data.TenantProfile;
27 28 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
28 29 import org.thingsboard.server.common.data.kv.KvEntry;
29 30 import org.thingsboard.server.common.data.kv.TsKvEntry;
... ... @@ -31,10 +32,12 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
31 32 import org.thingsboard.server.common.msg.TbMsg;
32 33 import org.thingsboard.server.common.msg.session.SessionMsgType;
33 34 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  35 +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
34 36
35 37 import java.util.ArrayList;
36 38 import java.util.List;
37 39 import java.util.Map;
  40 +import java.util.concurrent.TimeUnit;
38 41
39 42 @Slf4j
40 43 @RuleNode(
... ... @@ -50,12 +53,20 @@ import java.util.Map;
50 53 public class TbMsgTimeseriesNode implements TbNode {
51 54
52 55 private TbMsgTimeseriesNodeConfiguration config;
  56 + private TbContext ctx;
53 57 private long tenantProfileDefaultStorageTtl;
54 58
55 59 @Override
56 60 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
57 61 this.config = TbNodeUtils.convert(configuration, TbMsgTimeseriesNodeConfiguration.class);
  62 + this.ctx = ctx;
  63 + ctx.addTenantProfileListener(this::onTenantProfileUpdate);
  64 + onTenantProfileUpdate(ctx.getTenantProfile());
  65 + }
58 66
  67 + void onTenantProfileUpdate(TenantProfile tenantProfile) {
  68 + DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
  69 + tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays());
59 70 }
60 71
61 72 @Override
... ... @@ -101,6 +112,7 @@ public class TbMsgTimeseriesNode implements TbNode {
101 112
102 113 @Override
103 114 public void destroy() {
  115 + ctx.removeListeners();
104 116 }
105 117
106 118 }
... ...