Commit 42dd386182aa2772fa6d4d51a32eb3c9aeb17610

Authored by YevhenBondarenko
2 parents f7aa8df6 9d14a389

Merge branch 'feature/usage-records' of https://github.com/thingsboard/thingsboa…

…rd into feature/usage-records
@@ -40,8 +40,6 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; @@ -40,8 +40,6 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
40 import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; 40 import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
41 import org.thingsboard.server.common.msg.queue.RuleEngineException; 41 import org.thingsboard.server.common.msg.queue.RuleEngineException;
42 import org.thingsboard.server.common.msg.queue.ServiceType; 42 import org.thingsboard.server.common.msg.queue.ServiceType;
43 -import org.thingsboard.server.dao.model.ModelConstants;  
44 -import org.thingsboard.server.dao.tenant.TenantProfileService;  
45 import org.thingsboard.server.dao.tenant.TenantService; 43 import org.thingsboard.server.dao.tenant.TenantService;
46 import org.thingsboard.server.service.profile.TbTenantProfileCache; 44 import org.thingsboard.server.service.profile.TbTenantProfileCache;
47 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; 45 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
@@ -150,15 +148,12 @@ public class AppActor extends ContextAwareActor { @@ -150,15 +148,12 @@ public class AppActor extends ContextAwareActor {
150 private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { 148 private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
151 TbActorRef target = null; 149 TbActorRef target = null;
152 if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) { 150 if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
153 - if (msg.getEntityId().getEntityType() == EntityType.TENANT_PROFILE) {  
154 - tenantProfileCache.evict(new TenantProfileId(msg.getEntityId().getId()));  
155 - } else { 151 + if (!EntityType.TENANT_PROFILE.equals(msg.getEntityId().getEntityType())) {
156 log.warn("Message has system tenant id: {}", msg); 152 log.warn("Message has system tenant id: {}", msg);
157 } 153 }
158 } else { 154 } else {
159 - if (msg.getEntityId().getEntityType() == EntityType.TENANT) { 155 + if (EntityType.TENANT.equals(msg.getEntityId().getEntityType())) {
160 TenantId tenantId = new TenantId(msg.getEntityId().getId()); 156 TenantId tenantId = new TenantId(msg.getEntityId().getId());
161 - tenantProfileCache.evict(tenantId);  
162 if (msg.getEvent() == ComponentLifecycleEvent.DELETED) { 157 if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
163 log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg); 158 log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg);
164 deletedTenants.add(tenantId); 159 deletedTenants.add(tenantId);
@@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey;
22 import org.thingsboard.server.common.data.ApiUsageState; 22 import org.thingsboard.server.common.data.ApiUsageState;
23 import org.thingsboard.server.common.data.TenantProfile; 23 import org.thingsboard.server.common.data.TenantProfile;
24 import org.thingsboard.server.common.data.id.TenantId; 24 import org.thingsboard.server.common.data.id.TenantId;
  25 +import org.thingsboard.server.common.data.id.TenantProfileId;
25 import org.thingsboard.server.common.data.kv.BasicTsKvEntry; 26 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
26 import org.thingsboard.server.common.data.kv.LongDataEntry; 27 import org.thingsboard.server.common.data.kv.LongDataEntry;
27 import org.thingsboard.server.common.data.kv.TsKvEntry; 28 import org.thingsboard.server.common.data.kv.TsKvEntry;
@@ -87,6 +88,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { @@ -87,6 +88,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
87 TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); 88 TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
88 TenantApiUsageState tenantState; 89 TenantApiUsageState tenantState;
89 List<TsKvEntry> updatedEntries; 90 List<TsKvEntry> updatedEntries;
  91 + boolean stateUpdated = false;
90 updateLock.lock(); 92 updateLock.lock();
91 try { 93 try {
92 tenantState = getOrFetchState(tenantId); 94 tenantState = getOrFetchState(tenantId);
@@ -101,13 +103,21 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { @@ -101,13 +103,21 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
101 ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey()); 103 ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey());
102 long newValue = tenantState.add(recordKey, kvProto.getValue()); 104 long newValue = tenantState.add(recordKey, kvProto.getValue());
103 updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.name(), newValue))); 105 updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.name(), newValue)));
104 - newValue = tenantState.addToHourly(recordKey, kvProto.getValue());  
105 - updatedEntries.add(new BasicTsKvEntry(hourTs, new LongDataEntry(HOURLY + recordKey.name(), newValue))); 106 + long newHourlyValue = tenantState.addToHourly(recordKey, kvProto.getValue());
  107 + updatedEntries.add(new BasicTsKvEntry(hourTs, new LongDataEntry(HOURLY + recordKey.name(), newHourlyValue)));
  108 + stateUpdated |= tenantState.checkStateUpdatedDueToThreshold(recordKey);
106 } 109 }
107 } finally { 110 } finally {
108 updateLock.unlock(); 111 updateLock.unlock();
109 } 112 }
110 - tsService.save(tenantId, tenantState.getId(), updatedEntries, 0L); 113 + tsService.save(tenantId, tenantState.getApiUsageState().getId(), updatedEntries, 0L);
  114 + if (stateUpdated) {
  115 + // Save new state into the database;
  116 + apiUsageStateService.update(tenantState.getApiUsageState());
  117 + //TODO: clear cache on cluster repartition.
  118 + //TODO: update profiles on tenant and profile updates.
  119 + //TODO: broadcast to everyone notifications about enabled/disabled features.
  120 + }
111 } 121 }
112 122
113 @Override 123 @Override
@@ -116,12 +126,26 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { @@ -116,12 +126,26 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
116 } 126 }
117 127
118 @Override 128 @Override
119 - public void onAddedToAllowList(TenantId tenantId) {  
120 - 129 + public void onTenantProfileUpdate(TenantProfileId tenantProfileId) {
  130 + TenantProfile tenantProfile = tenantProfileCache.get(tenantProfileId);
  131 + updateLock.lock();
  132 + try {
  133 + tenantStates.values().forEach(state -> {
  134 + if (tenantProfile.getId().equals(state.getTenantProfileId())) {
  135 + state.setTenantProfileData(tenantProfile.getProfileData());
  136 + if (state.checkStateUpdatedDueToThresholds()) {
  137 + apiUsageStateService.update(state.getApiUsageState());
  138 + //TODO: send notification to cluster;
  139 + }
  140 + }
  141 + });
  142 + } finally {
  143 + updateLock.unlock();
  144 + }
121 } 145 }
122 146
123 @Override 147 @Override
124 - public void onAddedToDenyList(TenantId tenantId) { 148 + public void onTenantUpdate(TenantId tenantId) {
125 149
126 } 150 }
127 151
@@ -150,7 +174,8 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { @@ -150,7 +174,8 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
150 dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId); 174 dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId);
151 } 175 }
152 } 176 }
153 - tenantState = new TenantApiUsageState(dbStateEntity.getId()); 177 + TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
  178 + tenantState = new TenantApiUsageState(tenantProfile, dbStateEntity);
154 try { 179 try {
155 List<TsKvEntry> dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getEntityId()).get(); 180 List<TsKvEntry> dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getEntityId()).get();
156 for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { 181 for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.server.service.apiusage; 16 package org.thingsboard.server.service.apiusage;
17 17
18 import org.thingsboard.server.common.data.id.TenantId; 18 import org.thingsboard.server.common.data.id.TenantId;
  19 +import org.thingsboard.server.common.data.id.TenantProfileId;
19 import org.thingsboard.server.common.msg.queue.TbCallback; 20 import org.thingsboard.server.common.msg.queue.TbCallback;
20 import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; 21 import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
21 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 22 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@@ -26,8 +27,7 @@ public interface TbApiUsageStateService { @@ -26,8 +27,7 @@ public interface TbApiUsageStateService {
26 27
27 TenantApiUsageState getApiUsageState(TenantId tenantId); 28 TenantApiUsageState getApiUsageState(TenantId tenantId);
28 29
29 - void onAddedToAllowList(TenantId tenantId);  
30 -  
31 - void onAddedToDenyList(TenantId tenantId); 30 + void onTenantProfileUpdate(TenantProfileId tenantProfileId);
32 31
  32 + void onTenantUpdate(TenantId tenantId);
33 } 33 }
@@ -16,9 +16,13 @@ @@ -16,9 +16,13 @@
16 package org.thingsboard.server.service.apiusage; 16 package org.thingsboard.server.service.apiusage;
17 17
18 import lombok.Getter; 18 import lombok.Getter;
  19 +import lombok.Setter;
19 import org.thingsboard.server.common.data.ApiUsageRecordKey; 20 import org.thingsboard.server.common.data.ApiUsageRecordKey;
20 -import org.thingsboard.server.common.data.id.ApiUsageStateId; 21 +import org.thingsboard.server.common.data.ApiUsageState;
  22 +import org.thingsboard.server.common.data.TenantProfile;
  23 +import org.thingsboard.server.common.data.TenantProfileData;
21 import org.thingsboard.server.common.data.id.EntityId; 24 import org.thingsboard.server.common.data.id.EntityId;
  25 +import org.thingsboard.server.common.data.id.TenantProfileId;
22 import org.thingsboard.server.common.msg.tools.SchedulerUtils; 26 import org.thingsboard.server.common.msg.tools.SchedulerUtils;
23 27
24 import java.util.Map; 28 import java.util.Map;
@@ -30,7 +34,13 @@ public class TenantApiUsageState { @@ -30,7 +34,13 @@ public class TenantApiUsageState {
30 private final Map<ApiUsageRecordKey, Long> currentHourValues = new ConcurrentHashMap<>(); 34 private final Map<ApiUsageRecordKey, Long> currentHourValues = new ConcurrentHashMap<>();
31 35
32 @Getter 36 @Getter
33 - private final ApiUsageStateId id; 37 + @Setter
  38 + private TenantProfileId tenantProfileId;
  39 + @Getter
  40 + @Setter
  41 + private TenantProfileData tenantProfileData;
  42 + @Getter
  43 + private final ApiUsageState apiUsageState;
34 @Getter 44 @Getter
35 private volatile long currentCycleTs; 45 private volatile long currentCycleTs;
36 @Getter 46 @Getter
@@ -38,8 +48,10 @@ public class TenantApiUsageState { @@ -38,8 +48,10 @@ public class TenantApiUsageState {
38 @Getter 48 @Getter
39 private volatile long currentHourTs; 49 private volatile long currentHourTs;
40 50
41 - public TenantApiUsageState(ApiUsageStateId id) {  
42 - this.id = id; 51 + public TenantApiUsageState(TenantProfile tenantProfile, ApiUsageState apiUsageState) {
  52 + this.tenantProfileId = tenantProfile.getId();
  53 + this.tenantProfileData = tenantProfile.getProfileData();
  54 + this.apiUsageState = apiUsageState;
43 this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth(); 55 this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth();
44 this.nextCycleTs = SchedulerUtils.getStartOfNextMonth(); 56 this.nextCycleTs = SchedulerUtils.getStartOfNextMonth();
45 this.currentHourTs = SchedulerUtils.getStartOfCurrentHour(); 57 this.currentHourTs = SchedulerUtils.getStartOfCurrentHour();
@@ -59,6 +71,10 @@ public class TenantApiUsageState { @@ -59,6 +71,10 @@ public class TenantApiUsageState {
59 return result; 71 return result;
60 } 72 }
61 73
  74 + public long get(ApiUsageRecordKey key) {
  75 + return currentCycleValues.getOrDefault(key, 0L);
  76 + }
  77 +
62 public long addToHourly(ApiUsageRecordKey key, long value) { 78 public long addToHourly(ApiUsageRecordKey key, long value) {
63 long result = currentHourValues.getOrDefault(key, 0L) + value; 79 long result = currentHourValues.getOrDefault(key, 0L) + value;
64 currentHourValues.put(key, result); 80 currentHourValues.put(key, result);
@@ -80,4 +96,103 @@ public class TenantApiUsageState { @@ -80,4 +96,103 @@ public class TenantApiUsageState {
80 } 96 }
81 } 97 }
82 98
  99 + public long getProfileThreshold(ApiUsageRecordKey key) {
  100 + Object threshold = tenantProfileData.getProperties().get(key.name());
  101 + if (threshold != null) {
  102 + if (threshold instanceof String) {
  103 + return Long.parseLong((String) threshold);
  104 + } else if (threshold instanceof Long) {
  105 + return (Long) threshold;
  106 + }
  107 + }
  108 + return 0L;
  109 + }
  110 +
  111 + public EntityId getEntityId() {
  112 + return apiUsageState.getEntityId();
  113 + }
  114 +
  115 + public boolean isTransportEnabled() {
  116 + return apiUsageState.isTransportEnabled();
  117 + }
  118 +
  119 + public boolean isDbStorageEnabled() {
  120 + return apiUsageState.isDbStorageEnabled();
  121 + }
  122 +
  123 + public boolean isRuleEngineEnabled() {
  124 + return apiUsageState.isRuleEngineEnabled();
  125 + }
  126 +
  127 + public boolean isJsExecEnabled() {
  128 + return apiUsageState.isJsExecEnabled();
  129 + }
  130 +
  131 + public void setTransportEnabled(boolean transportEnabled) {
  132 + apiUsageState.setTransportEnabled(transportEnabled);
  133 + }
  134 +
  135 + public void setDbStorageEnabled(boolean dbStorageEnabled) {
  136 + apiUsageState.setDbStorageEnabled(dbStorageEnabled);
  137 + }
  138 +
  139 + public void setRuleEngineEnabled(boolean ruleEngineEnabled) {
  140 + apiUsageState.setRuleEngineEnabled(ruleEngineEnabled);
  141 + }
  142 +
  143 + public void setJsExecEnabled(boolean jsExecEnabled) {
  144 + apiUsageState.setJsExecEnabled(jsExecEnabled);
  145 + }
  146 +
  147 + public boolean isFeatureEnabled(ApiUsageRecordKey recordKey) {
  148 + switch (recordKey) {
  149 + case MSG_COUNT:
  150 + case MSG_BYTES_COUNT:
  151 + case DP_TRANSPORT_COUNT:
  152 + return isTransportEnabled();
  153 + case RE_EXEC_COUNT:
  154 + return isRuleEngineEnabled();
  155 + case DP_STORAGE_COUNT:
  156 + return isDbStorageEnabled();
  157 + case JS_EXEC_COUNT:
  158 + return isJsExecEnabled();
  159 + default:
  160 + return true;
  161 + }
  162 + }
  163 +
  164 + public boolean setFeatureValue(ApiUsageRecordKey recordKey, boolean value) {
  165 + boolean currentValue = isFeatureEnabled(recordKey);
  166 + switch (recordKey) {
  167 + case MSG_COUNT:
  168 + case MSG_BYTES_COUNT:
  169 + case DP_TRANSPORT_COUNT:
  170 + setTransportEnabled(value);
  171 + break;
  172 + case RE_EXEC_COUNT:
  173 + setRuleEngineEnabled(value);
  174 + break;
  175 + case DP_STORAGE_COUNT:
  176 + setDbStorageEnabled(value);
  177 + break;
  178 + case JS_EXEC_COUNT:
  179 + setJsExecEnabled(value);
  180 + break;
  181 + }
  182 + return currentValue == value;
  183 + }
  184 +
  185 + public boolean checkStateUpdatedDueToThresholds() {
  186 + boolean update = false;
  187 + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
  188 + update |= checkStateUpdatedDueToThreshold(key);
  189 + }
  190 + return update;
  191 + }
  192 +
  193 + public boolean checkStateUpdatedDueToThreshold(ApiUsageRecordKey recordKey) {
  194 + long value = get(recordKey);
  195 + long threshold = getProfileThreshold(recordKey);
  196 + return setFeatureValue(recordKey, threshold == 0 || value < threshold);
  197 + }
83 } 198 }
@@ -54,6 +54,7 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory; @@ -54,6 +54,7 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
54 import org.thingsboard.server.queue.util.TbCoreComponent; 54 import org.thingsboard.server.queue.util.TbCoreComponent;
55 import org.thingsboard.server.service.apiusage.TbApiUsageStateService; 55 import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
56 import org.thingsboard.server.service.profile.TbDeviceProfileCache; 56 import org.thingsboard.server.service.profile.TbDeviceProfileCache;
  57 +import org.thingsboard.server.service.profile.TbTenantProfileCache;
57 import org.thingsboard.server.service.queue.processing.AbstractConsumerService; 58 import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
58 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; 59 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
59 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; 60 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
@@ -102,12 +103,19 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -102,12 +103,19 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
102 protected volatile ExecutorService usageStatsExecutor; 103 protected volatile ExecutorService usageStatsExecutor;
103 104
104 105
105 - public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext,  
106 - DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService,  
107 - SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService,  
108 - TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache,  
109 - TbApiUsageStateService statsService) {  
110 - super(actorContext, encodingService, deviceProfileCache, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer()); 106 + public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory,
  107 + ActorSystemContext actorContext,
  108 + DeviceStateService stateService,
  109 + TbLocalSubscriptionService localSubscriptionService,
  110 + SubscriptionManagerService subscriptionManagerService,
  111 + DataDecodingEncodingService encodingService,
  112 + TbCoreDeviceRpcService tbCoreDeviceRpcService,
  113 + StatsFactory statsFactory,
  114 + TbDeviceProfileCache deviceProfileCache,
  115 + TbApiUsageStateService statsService,
  116 + TbTenantProfileCache tenantProfileCache,
  117 + TbApiUsageStateService apiUsageStateService) {
  118 + super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, apiUsageStateService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
111 this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer(); 119 this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
112 this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer(); 120 this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer();
113 this.stateService = stateService; 121 this.stateService = stateService;
@@ -15,7 +15,6 @@ @@ -15,7 +15,6 @@
15 */ 15 */
16 package org.thingsboard.server.service.queue; 16 package org.thingsboard.server.service.queue;
17 17
18 -import com.google.protobuf.ByteString;  
19 import com.google.protobuf.ProtocolStringList; 18 import com.google.protobuf.ProtocolStringList;
20 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
21 import org.springframework.beans.factory.annotation.Value; 20 import org.springframework.beans.factory.annotation.Value;
@@ -24,10 +23,16 @@ import org.springframework.stereotype.Service; @@ -24,10 +23,16 @@ import org.springframework.stereotype.Service;
24 import org.thingsboard.rule.engine.api.RpcError; 23 import org.thingsboard.rule.engine.api.RpcError;
25 import org.thingsboard.server.actors.ActorSystemContext; 24 import org.thingsboard.server.actors.ActorSystemContext;
26 import org.thingsboard.server.common.data.id.TenantId; 25 import org.thingsboard.server.common.data.id.TenantId;
27 -import org.thingsboard.server.common.msg.TbActorMsg;  
28 import org.thingsboard.server.common.msg.TbMsg; 26 import org.thingsboard.server.common.msg.TbMsg;
29 -import org.thingsboard.server.common.msg.queue.*; 27 +import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
  28 +import org.thingsboard.server.common.msg.queue.RuleEngineException;
  29 +import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
  30 +import org.thingsboard.server.common.msg.queue.ServiceQueue;
  31 +import org.thingsboard.server.common.msg.queue.ServiceType;
  32 +import org.thingsboard.server.common.msg.queue.TbCallback;
  33 +import org.thingsboard.server.common.msg.queue.TbMsgCallback;
30 import org.thingsboard.server.common.stats.StatsFactory; 34 import org.thingsboard.server.common.stats.StatsFactory;
  35 +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
31 import org.thingsboard.server.gen.transport.TransportProtos; 36 import org.thingsboard.server.gen.transport.TransportProtos;
32 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 37 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
33 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; 38 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
@@ -38,17 +43,33 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; @@ -38,17 +43,33 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
38 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; 43 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
39 import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; 44 import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
40 import org.thingsboard.server.queue.util.TbRuleEngineComponent; 45 import org.thingsboard.server.queue.util.TbRuleEngineComponent;
41 -import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; 46 +import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
42 import org.thingsboard.server.service.profile.TbDeviceProfileCache; 47 import org.thingsboard.server.service.profile.TbDeviceProfileCache;
43 -import org.thingsboard.server.service.queue.processing.*; 48 +import org.thingsboard.server.service.profile.TbTenantProfileCache;
  49 +import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
  50 +import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision;
  51 +import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
  52 +import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy;
  53 +import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory;
  54 +import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
  55 +import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory;
44 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; 56 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
45 import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; 57 import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
46 import org.thingsboard.server.service.stats.RuleEngineStatisticsService; 58 import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
47 59
48 import javax.annotation.PostConstruct; 60 import javax.annotation.PostConstruct;
49 import javax.annotation.PreDestroy; 61 import javax.annotation.PreDestroy;
50 -import java.util.*;  
51 -import java.util.concurrent.*; 62 +import java.util.Collections;
  63 +import java.util.HashSet;
  64 +import java.util.List;
  65 +import java.util.Map;
  66 +import java.util.Set;
  67 +import java.util.UUID;
  68 +import java.util.concurrent.ConcurrentHashMap;
  69 +import java.util.concurrent.ConcurrentMap;
  70 +import java.util.concurrent.ExecutorService;
  71 +import java.util.concurrent.Executors;
  72 +import java.util.concurrent.TimeUnit;
52 73
53 @Service 74 @Service
54 @TbRuleEngineComponent 75 @TbRuleEngineComponent
@@ -79,11 +100,16 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @@ -79,11 +100,16 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
79 public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory, 100 public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory,
80 TbRuleEngineSubmitStrategyFactory submitStrategyFactory, 101 TbRuleEngineSubmitStrategyFactory submitStrategyFactory,
81 TbQueueRuleEngineSettings ruleEngineSettings, 102 TbQueueRuleEngineSettings ruleEngineSettings,
82 - TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService,  
83 - ActorSystemContext actorContext, DataDecodingEncodingService encodingService, 103 + TbRuleEngineQueueFactory tbRuleEngineQueueFactory,
  104 + RuleEngineStatisticsService statisticsService,
  105 + ActorSystemContext actorContext,
  106 + DataDecodingEncodingService encodingService,
84 TbRuleEngineDeviceRpcService tbDeviceRpcService, 107 TbRuleEngineDeviceRpcService tbDeviceRpcService,
85 - StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache) {  
86 - super(actorContext, encodingService, deviceProfileCache, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer()); 108 + StatsFactory statsFactory,
  109 + TbDeviceProfileCache deviceProfileCache,
  110 + TbTenantProfileCache tenantProfileCache,
  111 + TbApiUsageStateService apiUsageStateService) {
  112 + super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, apiUsageStateService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
87 this.statisticsService = statisticsService; 113 this.statisticsService = statisticsService;
88 this.ruleEngineSettings = ruleEngineSettings; 114 this.ruleEngineSettings = ruleEngineSettings;
89 this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory; 115 this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory;
@@ -25,6 +25,7 @@ import org.thingsboard.server.actors.ActorSystemContext; @@ -25,6 +25,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
25 import org.thingsboard.server.common.data.EntityType; 25 import org.thingsboard.server.common.data.EntityType;
26 import org.thingsboard.server.common.data.id.DeviceId; 26 import org.thingsboard.server.common.data.id.DeviceId;
27 import org.thingsboard.server.common.data.id.DeviceProfileId; 27 import org.thingsboard.server.common.data.id.DeviceProfileId;
  28 +import org.thingsboard.server.common.data.id.TenantProfileId;
28 import org.thingsboard.server.common.msg.TbActorMsg; 29 import org.thingsboard.server.common.msg.TbActorMsg;
29 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; 30 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
30 import org.thingsboard.server.common.msg.queue.ServiceType; 31 import org.thingsboard.server.common.msg.queue.ServiceType;
@@ -33,7 +34,9 @@ import org.thingsboard.server.queue.TbQueueConsumer; @@ -33,7 +34,9 @@ import org.thingsboard.server.queue.TbQueueConsumer;
33 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 34 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
34 import org.thingsboard.server.queue.discovery.PartitionChangeEvent; 35 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
35 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; 36 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
  37 +import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
36 import org.thingsboard.server.service.profile.TbDeviceProfileCache; 38 import org.thingsboard.server.service.profile.TbDeviceProfileCache;
  39 +import org.thingsboard.server.service.profile.TbTenantProfileCache;
37 import org.thingsboard.server.service.queue.TbPackCallback; 40 import org.thingsboard.server.service.queue.TbPackCallback;
38 import org.thingsboard.server.service.queue.TbPackProcessingContext; 41 import org.thingsboard.server.service.queue.TbPackProcessingContext;
39 42
@@ -59,15 +62,19 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene @@ -59,15 +62,19 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
59 62
60 protected final ActorSystemContext actorContext; 63 protected final ActorSystemContext actorContext;
61 protected final DataDecodingEncodingService encodingService; 64 protected final DataDecodingEncodingService encodingService;
  65 + protected final TbTenantProfileCache tenantProfileCache;
62 protected final TbDeviceProfileCache deviceProfileCache; 66 protected final TbDeviceProfileCache deviceProfileCache;
  67 + protected final TbApiUsageStateService apiUsageStateService;
63 68
64 protected final TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer; 69 protected final TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer;
65 70
66 public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService, 71 public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
67 - TbDeviceProfileCache deviceProfileCache, TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer) { 72 + TbTenantProfileCache tenantProfileCache, TbDeviceProfileCache deviceProfileCache, TbApiUsageStateService apiUsageStateService, TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer) {
68 this.actorContext = actorContext; 73 this.actorContext = actorContext;
69 this.encodingService = encodingService; 74 this.encodingService = encodingService;
  75 + this.tenantProfileCache = tenantProfileCache;
70 this.deviceProfileCache = deviceProfileCache; 76 this.deviceProfileCache = deviceProfileCache;
  77 + this.apiUsageStateService = apiUsageStateService;
71 this.nfConsumer = nfConsumer; 78 this.nfConsumer = nfConsumer;
72 } 79 }
73 80
@@ -143,7 +150,14 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene @@ -143,7 +150,14 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
143 TbActorMsg actorMsg = actorMsgOpt.get(); 150 TbActorMsg actorMsg = actorMsgOpt.get();
144 if (actorMsg instanceof ComponentLifecycleMsg) { 151 if (actorMsg instanceof ComponentLifecycleMsg) {
145 ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg; 152 ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg;
146 - if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { 153 + if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
  154 + TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId());
  155 + tenantProfileCache.evict(tenantProfileId);
  156 + apiUsageStateService.onTenantProfileUpdate(tenantProfileId);
  157 + } else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
  158 + tenantProfileCache.evict(componentLifecycleMsg.getTenantId());
  159 + apiUsageStateService.onTenantUpdate(componentLifecycleMsg.getTenantId());
  160 + } else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
147 deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId())); 161 deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
148 } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { 162 } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
149 deviceProfileCache.evict(new DeviceId(componentLifecycleMsg.getEntityId().getId())); 163 deviceProfileCache.evict(new DeviceId(componentLifecycleMsg.getEntityId().getId()));
@@ -21,11 +21,13 @@ import org.thingsboard.server.common.data.id.TenantId; @@ -21,11 +21,13 @@ import org.thingsboard.server.common.data.id.TenantId;
21 21
22 public interface ApiUsageStateService { 22 public interface ApiUsageStateService {
23 23
  24 + ApiUsageState createDefaultApiUsageState(TenantId id);
  25 +
  26 + ApiUsageState update(ApiUsageState apiUsageState);
  27 +
24 ApiUsageState findTenantApiUsageState(TenantId tenantId); 28 ApiUsageState findTenantApiUsageState(TenantId tenantId);
25 29
26 void deleteApiUsageStateByTenantId(TenantId tenantId); 30 void deleteApiUsageStateByTenantId(TenantId tenantId);
27 31
28 - ApiUsageState createDefaultApiUsageState(TenantId id);  
29 -  
30 ApiUsageState findApiUsageStateById(TenantId tenantId, ApiUsageStateId id); 32 ApiUsageState findApiUsageStateById(TenantId tenantId, ApiUsageStateId id);
31 } 33 }
@@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
16 package org.thingsboard.server.common.data; 16 package org.thingsboard.server.common.data;
17 17
18 import lombok.EqualsAndHashCode; 18 import lombok.EqualsAndHashCode;
  19 +import lombok.Getter;
  20 +import lombok.Setter;
19 import lombok.ToString; 21 import lombok.ToString;
20 import org.thingsboard.server.common.data.id.EntityId; 22 import org.thingsboard.server.common.data.id.EntityId;
21 import org.thingsboard.server.common.data.id.TenantId; 23 import org.thingsboard.server.common.data.id.TenantId;
@@ -27,8 +29,18 @@ public class ApiUsageState extends BaseData<ApiUsageStateId> implements HasTenan @@ -27,8 +29,18 @@ public class ApiUsageState extends BaseData<ApiUsageStateId> implements HasTenan
27 29
28 private static final long serialVersionUID = 8250339805336035966L; 30 private static final long serialVersionUID = 8250339805336035966L;
29 31
  32 + @Getter @Setter
30 private TenantId tenantId; 33 private TenantId tenantId;
  34 + @Getter @Setter
31 private EntityId entityId; 35 private EntityId entityId;
  36 + @Getter @Setter
  37 + private boolean transportEnabled;
  38 + @Getter @Setter
  39 + private boolean dbStorageEnabled;
  40 + @Getter @Setter
  41 + private boolean ruleEngineEnabled;
  42 + @Getter @Setter
  43 + private boolean jsExecEnabled;
32 44
33 public ApiUsageState() { 45 public ApiUsageState() {
34 super(); 46 super();
@@ -43,22 +55,4 @@ public class ApiUsageState extends BaseData<ApiUsageStateId> implements HasTenan @@ -43,22 +55,4 @@ public class ApiUsageState extends BaseData<ApiUsageStateId> implements HasTenan
43 this.tenantId = ur.getTenantId(); 55 this.tenantId = ur.getTenantId();
44 this.entityId = ur.getEntityId(); 56 this.entityId = ur.getEntityId();
45 } 57 }
46 -  
47 - @Override  
48 - public TenantId getTenantId() {  
49 - return tenantId;  
50 - }  
51 -  
52 - public void setTenantId(TenantId tenantId) {  
53 - this.tenantId = tenantId;  
54 - }  
55 -  
56 - public EntityId getEntityId() {  
57 - return entityId;  
58 - }  
59 -  
60 - public void setEntityId(EntityId entityId) {  
61 - this.entityId = entityId;  
62 - }  
63 -  
64 } 58 }
@@ -61,6 +61,14 @@ public class ApiApiUsageStateServiceImpl extends AbstractEntityService implement @@ -61,6 +61,14 @@ public class ApiApiUsageStateServiceImpl extends AbstractEntityService implement
61 } 61 }
62 62
63 @Override 63 @Override
  64 + public ApiUsageState update(ApiUsageState apiUsageState) {
  65 + log.trace("Executing save [{}]", apiUsageState.getTenantId());
  66 + validateId(apiUsageState.getTenantId(), INCORRECT_TENANT_ID + apiUsageState.getTenantId());
  67 + validateId(apiUsageState.getId(), "Can't save new usage state. Only update is allowed!");
  68 + return apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState);
  69 + }
  70 +
  71 + @Override
64 public ApiUsageState findTenantApiUsageState(TenantId tenantId) { 72 public ApiUsageState findTenantApiUsageState(TenantId tenantId) {
65 log.trace("Executing findTenantUsageRecord, tenantId [{}]", tenantId); 73 log.trace("Executing findTenantUsageRecord, tenantId [{}]", tenantId);
66 validateId(tenantId, INCORRECT_TENANT_ID + tenantId); 74 validateId(tenantId, INCORRECT_TENANT_ID + tenantId);