Commit 0e0ab6ca3982d7e20c47267e1967010ad9e607f7

Authored by Andrii Shvaika
1 parent 1f6c0c54

Improvements to API State

Showing 25 changed files with 310 additions and 89 deletions
  1 +package org.thingsboard.server.service.apiusage;
  2 +
  3 +public enum ApiFeature {
  4 + TRANSPORT, DB, RE, JS
  5 +}
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.service.apiusage;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Value;
  20 +import org.springframework.data.util.Pair;
20 21 import org.springframework.stereotype.Service;
21 22 import org.thingsboard.server.common.data.ApiUsageRecordKey;
22 23 import org.thingsboard.server.common.data.ApiUsageState;
... ... @@ -33,15 +34,18 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
33 34 import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
34 35 import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
35 36 import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
  37 +import org.thingsboard.server.queue.TbQueueCallback;
36 38 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
37 39 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
38 40 import org.thingsboard.server.queue.discovery.PartitionService;
39 41 import org.thingsboard.server.queue.scheduler.SchedulerComponent;
40 42 import org.thingsboard.server.queue.util.TbCoreComponent;
41 43 import org.thingsboard.server.service.profile.TbTenantProfileCache;
  44 +import org.thingsboard.server.service.queue.TbClusterService;
42 45
43 46 import javax.annotation.PostConstruct;
44 47 import java.util.ArrayList;
  48 +import java.util.HashMap;
45 49 import java.util.List;
46 50 import java.util.Map;
47 51 import java.util.UUID;
... ... @@ -57,12 +61,17 @@ import java.util.concurrent.locks.ReentrantLock;
57 61 public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
58 62
59 63 public static final String HOURLY = "HOURLY_";
  64 + private final TbClusterService clusterService;
60 65 private final PartitionService partitionService;
61 66 private final ApiUsageStateService apiUsageStateService;
62 67 private final TimeseriesService tsService;
63 68 private final SchedulerComponent scheduler;
64 69 private final TbTenantProfileCache tenantProfileCache;
65   - private final Map<TenantId, TenantApiUsageState> tenantStates = new ConcurrentHashMap<>();
  70 +
  71 + // Tenants that should be processed on this server
  72 + private final Map<TenantId, TenantApiUsageState> myTenantStates = new ConcurrentHashMap<>();
  73 + // Tenants that should be processed on other servers
  74 + private final Map<TenantId, ApiUsageState> otherTenantStates = new ConcurrentHashMap<>();
66 75
67 76 @Value("${usage.stats.report.enabled:true}")
68 77 private boolean enabled;
... ... @@ -72,7 +81,13 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
72 81
73 82 private final Lock updateLock = new ReentrantLock();
74 83
75   - public DefaultTbApiUsageStateService(PartitionService partitionService, ApiUsageStateService apiUsageStateService, TimeseriesService tsService, SchedulerComponent scheduler, TbTenantProfileCache tenantProfileCache) {
  84 + public DefaultTbApiUsageStateService(TbClusterService clusterService,
  85 + PartitionService partitionService,
  86 + ApiUsageStateService apiUsageStateService,
  87 + TimeseriesService tsService,
  88 + SchedulerComponent scheduler,
  89 + TbTenantProfileCache tenantProfileCache) {
  90 + this.clusterService = clusterService;
76 91 this.partitionService = partitionService;
77 92 this.apiUsageStateService = apiUsageStateService;
78 93 this.tsService = tsService;
... ... @@ -93,7 +108,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
93 108 TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
94 109 TenantApiUsageState tenantState;
95 110 List<TsKvEntry> updatedEntries;
96   - boolean stateUpdated = false;
  111 + Map<ApiFeature, Boolean> result = new HashMap<>();
97 112 updateLock.lock();
98 113 try {
99 114 tenantState = getOrFetchState(tenantId);
... ... @@ -110,32 +125,49 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
110 125 updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.name(), newValue)));
111 126 long newHourlyValue = tenantState.addToHourly(recordKey, kvProto.getValue());
112 127 updatedEntries.add(new BasicTsKvEntry(hourTs, new LongDataEntry(HOURLY + recordKey.name(), newHourlyValue)));
113   - stateUpdated |= tenantState.checkStateUpdatedDueToThreshold(recordKey);
  128 + Pair<ApiFeature, Boolean> update = tenantState.checkStateUpdatedDueToThreshold(recordKey);
  129 + if (update != null) {
  130 + result.put(update.getFirst(), update.getSecond());
  131 + }
114 132 }
115 133 } finally {
116 134 updateLock.unlock();
117 135 }
118 136 tsService.save(tenantId, tenantState.getApiUsageState().getId(), updatedEntries, 0L);
119   - if (stateUpdated) {
120   - // Save new state into the database;
121   - apiUsageStateService.update(tenantState.getApiUsageState());
122   - //TODO: clear cache on cluster repartition.
123   - //TODO: update profiles on tenant and profile updates.
124   - //TODO: broadcast to everyone notifications about enabled/disabled features.
  137 + if (!result.isEmpty()) {
  138 + persistAndNotify(tenantState, result);
125 139 }
126 140 }
127 141
128 142 @Override
129 143 public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
130 144 if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) {
131   - tenantStates.entrySet().removeIf(entry -> !partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition());
  145 + myTenantStates.entrySet().removeIf(entry -> !partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition());
  146 + otherTenantStates.entrySet().removeIf(entry -> partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition());
132 147 }
133 148 }
134 149
135 150 @Override
136   - public TenantApiUsageState getApiUsageState(TenantId tenantId) {
137   - //We should always get it from the map of from the database;
138   - return null;
  151 + public ApiUsageState getApiUsageState(TenantId tenantId) {
  152 + if (partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) {
  153 + TenantApiUsageState state = getOrFetchState(tenantId);
  154 + return state.getApiUsageState();
  155 + } else {
  156 + ApiUsageState state = otherTenantStates.get(tenantId);
  157 + if (state == null) {
  158 + updateLock.lock();
  159 + try {
  160 + state = otherTenantStates.get(tenantId);
  161 + if (state == null) {
  162 + state = apiUsageStateService.findTenantApiUsageState(tenantId);
  163 + otherTenantStates.put(tenantId, state);
  164 + }
  165 + } finally {
  166 + updateLock.unlock();
  167 + }
  168 + }
  169 + return state;
  170 + }
139 171 }
140 172
141 173 @Override
... ... @@ -143,7 +175,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
143 175 TenantProfile tenantProfile = tenantProfileCache.get(tenantProfileId);
144 176 updateLock.lock();
145 177 try {
146   - tenantStates.values().forEach(state -> {
  178 + myTenantStates.values().forEach(state -> {
147 179 if (tenantProfile.getId().equals(state.getTenantProfileId())) {
148 180 updateTenantState(state, tenantProfile);
149 181 }
... ... @@ -158,7 +190,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
158 190 TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
159 191 updateLock.lock();
160 192 try {
161   - TenantApiUsageState state = tenantStates.get(tenantId);
  193 + TenantApiUsageState state = myTenantStates.get(tenantId);
162 194 if (state != null && !state.getTenantProfileId().equals(tenantProfile.getId())) {
163 195 updateTenantState(state, tenantProfile);
164 196 }
... ... @@ -167,12 +199,28 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
167 199 }
168 200 }
169 201
  202 + @Override
  203 + public void onApiUsageStateUpdate(TenantId tenantId) {
  204 +
  205 + }
170 206
171 207 private void updateTenantState(TenantApiUsageState state, TenantProfile tenantProfile) {
172 208 state.setTenantProfileData(tenantProfile.getProfileData());
173   - if (state.checkStateUpdatedDueToThresholds()) {
174   - apiUsageStateService.update(state.getApiUsageState());
175   - //TODO: send notification to cluster;
  209 + Map<ApiFeature, Boolean> result = state.checkStateUpdatedDueToThresholds();
  210 + if (!result.isEmpty()) {
  211 + persistAndNotify(state, result);
  212 + }
  213 + }
  214 +
  215 + private void persistAndNotify(TenantApiUsageState state, Map<ApiFeature, Boolean> result) {
  216 + // TODO:
  217 + // 1. Broadcast to everyone notifications about enabled/disabled features.
  218 + // 2. Report rule engine and js executor metrics
  219 + // 4. UI for configuration of the thresholds
  220 + // 5. Max rule node executions per message.
  221 + apiUsageStateService.update(state.getApiUsageState());
  222 + if (result.containsKey(ApiFeature.TRANSPORT)) {
  223 + clusterService.onApiStateChange(state.getApiUsageState(), null);
176 224 }
177 225 }
178 226
... ... @@ -180,7 +228,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
180 228 updateLock.lock();
181 229 try {
182 230 long now = System.currentTimeMillis();
183   - tenantStates.values().forEach(state -> {
  231 + myTenantStates.values().forEach(state -> {
184 232 if ((state.getNextCycleTs() > now) && (state.getNextCycleTs() - now < TimeUnit.HOURS.toMillis(1))) {
185 233 state.setCycles(state.getNextCycleTs(), SchedulerUtils.getStartOfNextNextMonth());
186 234 }
... ... @@ -191,7 +239,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
191 239 }
192 240
193 241 private TenantApiUsageState getOrFetchState(TenantId tenantId) {
194   - TenantApiUsageState tenantState = tenantStates.get(tenantId);
  242 + TenantApiUsageState tenantState = myTenantStates.get(tenantId);
195 243 if (tenantState == null) {
196 244 ApiUsageState dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId);
197 245 if (dbStateEntity == null) {
... ... @@ -223,7 +271,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
223 271 }
224 272 }
225 273 }
226   - tenantStates.put(tenantId, tenantState);
  274 + myTenantStates.put(tenantId, tenantState);
227 275 } catch (InterruptedException | ExecutionException e) {
228 276 log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e);
229 277 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.service.apiusage;
17 17
18 18 import org.springframework.context.ApplicationListener;
  19 +import org.thingsboard.server.common.data.ApiUsageState;
19 20 import org.thingsboard.server.common.data.id.TenantId;
20 21 import org.thingsboard.server.common.data.id.TenantProfileId;
21 22 import org.thingsboard.server.common.msg.queue.TbCallback;
... ... @@ -27,9 +28,11 @@ public interface TbApiUsageStateService extends ApplicationListener<PartitionCha
27 28
28 29 void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback);
29 30
30   - TenantApiUsageState getApiUsageState(TenantId tenantId);
  31 + ApiUsageState getApiUsageState(TenantId tenantId);
31 32
32 33 void onTenantProfileUpdate(TenantProfileId tenantProfileId);
33 34
34 35 void onTenantUpdate(TenantId tenantId);
  36 +
  37 + void onApiUsageStateUpdate(TenantId tenantId);
35 38 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -17,14 +17,17 @@ package org.thingsboard.server.service.apiusage;
17 17
18 18 import lombok.Getter;
19 19 import lombok.Setter;
  20 +import org.springframework.data.util.Pair;
20 21 import org.thingsboard.server.common.data.ApiUsageRecordKey;
21 22 import org.thingsboard.server.common.data.ApiUsageState;
22 23 import org.thingsboard.server.common.data.TenantProfile;
23 24 import org.thingsboard.server.common.data.TenantProfileData;
24 25 import org.thingsboard.server.common.data.id.EntityId;
  26 +import org.thingsboard.server.common.data.id.TenantId;
25 27 import org.thingsboard.server.common.data.id.TenantProfileId;
26 28 import org.thingsboard.server.common.msg.tools.SchedulerUtils;
27 29
  30 +import java.util.HashMap;
28 31 import java.util.Map;
29 32 import java.util.concurrent.ConcurrentHashMap;
30 33
... ... @@ -103,11 +106,17 @@ public class TenantApiUsageState {
103 106 return Long.parseLong((String) threshold);
104 107 } else if (threshold instanceof Long) {
105 108 return (Long) threshold;
  109 + } else if (threshold instanceof Integer) {
  110 + return (Integer) threshold;
106 111 }
107 112 }
108 113 return 0L;
109 114 }
110 115
  116 + public TenantId getTenantId() {
  117 + return apiUsageState.getTenantId();
  118 + }
  119 +
111 120 public EntityId getEntityId() {
112 121 return apiUsageState.getEntityId();
113 122 }
... ... @@ -121,7 +130,7 @@ public class TenantApiUsageState {
121 130 }
122 131
123 132 public boolean isRuleEngineEnabled() {
124   - return apiUsageState.isRuleEngineEnabled();
  133 + return apiUsageState.isReExecEnabled();
125 134 }
126 135
127 136 public boolean isJsExecEnabled() {
... ... @@ -137,7 +146,7 @@ public class TenantApiUsageState {
137 146 }
138 147
139 148 public void setRuleEngineEnabled(boolean ruleEngineEnabled) {
140   - apiUsageState.setRuleEngineEnabled(ruleEngineEnabled);
  149 + apiUsageState.setReExecEnabled(ruleEngineEnabled);
141 150 }
142 151
143 152 public void setJsExecEnabled(boolean jsExecEnabled) {
... ... @@ -147,7 +156,6 @@ public class TenantApiUsageState {
147 156 public boolean isFeatureEnabled(ApiUsageRecordKey recordKey) {
148 157 switch (recordKey) {
149 158 case MSG_COUNT:
150   - case MSG_BYTES_COUNT:
151 159 case DP_TRANSPORT_COUNT:
152 160 return isTransportEnabled();
153 161 case RE_EXEC_COUNT:
... ... @@ -161,38 +169,47 @@ public class TenantApiUsageState {
161 169 }
162 170 }
163 171
164   - public boolean setFeatureValue(ApiUsageRecordKey recordKey, boolean value) {
  172 + public ApiFeature setFeatureValue(ApiUsageRecordKey recordKey, boolean value) {
  173 + ApiFeature feature = null;
165 174 boolean currentValue = isFeatureEnabled(recordKey);
166 175 switch (recordKey) {
167 176 case MSG_COUNT:
168   - case MSG_BYTES_COUNT:
169 177 case DP_TRANSPORT_COUNT:
  178 + feature = ApiFeature.TRANSPORT;
170 179 setTransportEnabled(value);
171 180 break;
172 181 case RE_EXEC_COUNT:
  182 + feature = ApiFeature.RE;
173 183 setRuleEngineEnabled(value);
174 184 break;
175 185 case DP_STORAGE_COUNT:
  186 + feature = ApiFeature.DB;
176 187 setDbStorageEnabled(value);
177 188 break;
178 189 case JS_EXEC_COUNT:
  190 + feature = ApiFeature.JS;
179 191 setJsExecEnabled(value);
180 192 break;
181 193 }
182   - return currentValue == value;
  194 + return currentValue == value ? null : feature;
183 195 }
184 196
185   - public boolean checkStateUpdatedDueToThresholds() {
186   - boolean update = false;
  197 + public Map<ApiFeature, Boolean> checkStateUpdatedDueToThresholds() {
  198 + Map<ApiFeature, Boolean> result = new HashMap<>();
187 199 for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
188   - update |= checkStateUpdatedDueToThreshold(key);
  200 + Pair<ApiFeature, Boolean> featureUpdate = checkStateUpdatedDueToThreshold(key);
  201 + if (featureUpdate != null) {
  202 + result.put(featureUpdate.getFirst(), featureUpdate.getSecond());
  203 + }
189 204 }
190   - return update;
  205 + return result;
191 206 }
192 207
193   - public boolean checkStateUpdatedDueToThreshold(ApiUsageRecordKey recordKey) {
  208 + public Pair<ApiFeature, Boolean> checkStateUpdatedDueToThreshold(ApiUsageRecordKey recordKey) {
194 209 long value = get(recordKey);
195 210 long threshold = getProfileThreshold(recordKey);
196   - return setFeatureValue(recordKey, threshold == 0 || value < threshold);
  211 + boolean featureValue = threshold == 0 || value < threshold;
  212 + ApiFeature feature = setFeatureValue(recordKey, featureValue);
  213 + return feature != null ? Pair.of(feature, featureValue) : null;
197 214 }
198 215 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value;
21 21 import org.springframework.scheduling.annotation.Scheduled;
22 22 import org.springframework.stereotype.Service;
23 23 import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
  24 +import org.thingsboard.server.common.data.ApiUsageState;
24 25 import org.thingsboard.server.common.data.DeviceProfile;
25 26 import org.thingsboard.server.common.data.EntityType;
26 27 import org.thingsboard.server.common.data.HasName;
... ... @@ -47,6 +48,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotifica
47 48 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
48 49 import org.thingsboard.server.queue.TbQueueCallback;
49 50 import org.thingsboard.server.queue.TbQueueProducer;
  51 +import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper;
50 52 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
51 53 import org.thingsboard.server.queue.discovery.PartitionService;
52 54 import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
... ... @@ -207,6 +209,12 @@ public class DefaultTbClusterService implements TbClusterService {
207 209 }
208 210
209 211 @Override
  212 + public void onApiStateChange(ApiUsageState apiUsageState, TbQueueCallback callback) {
  213 + onEntityChange(apiUsageState.getTenantId(), apiUsageState.getId(), apiUsageState, callback);
  214 + broadcast(new ComponentLifecycleMsg(apiUsageState.getTenantId(), apiUsageState.getId(), ComponentLifecycleEvent.UPDATED));
  215 + }
  216 +
  217 + @Override
210 218 public void onDeviceProfileDelete(DeviceProfile entity, TbQueueCallback callback) {
211 219 onEntityDelete(entity.getTenantId(), entity.getId(), entity.getName(), callback);
212 220 }
... ... @@ -221,13 +229,14 @@ public class DefaultTbClusterService implements TbClusterService {
221 229 onEntityDelete(TenantId.SYS_TENANT_ID, entity.getId(), entity.getName(), callback);
222 230 }
223 231
224   - public <T extends HasName> void onEntityChange(TenantId tenantId, EntityId entityid, T entity, TbQueueCallback callback) {
225   - log.trace("[{}][{}][{}] Processing [{}] change event", tenantId, entityid.getEntityType(), entityid.getId(), entity.getName());
  232 + public <T> void onEntityChange(TenantId tenantId, EntityId entityid, T entity, TbQueueCallback callback) {
  233 + String entityName = (entity instanceof HasName) ? ((HasName) entity).getName() : entity.getClass().getName();
  234 + log.trace("[{}][{}][{}] Processing [{}] change event", tenantId, entityid.getEntityType(), entityid.getId(), entityName);
226 235 TransportProtos.EntityUpdateMsg entityUpdateMsg = TransportProtos.EntityUpdateMsg.newBuilder()
227 236 .setEntityType(entityid.getEntityType().name())
228 237 .setData(ByteString.copyFrom(encodingService.encode(entity))).build();
229 238 ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setEntityUpdateMsg(entityUpdateMsg).build();
230   - broadcast(transportMsg);
  239 + broadcast(transportMsg, callback);
231 240 }
232 241
233 242 private void onEntityDelete(TenantId tenantId, EntityId entityId, String name, TbQueueCallback callback) {
... ... @@ -238,15 +247,16 @@ public class DefaultTbClusterService implements TbClusterService {
238 247 .setEntityIdLSB(entityId.getId().getLeastSignificantBits())
239 248 .build();
240 249 ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setEntityDeleteMsg(entityDeleteMsg).build();
241   - broadcast(transportMsg);
  250 + broadcast(transportMsg, callback);
242 251 }
243 252
244   - private void broadcast(ToTransportMsg transportMsg) {
  253 + private void broadcast(ToTransportMsg transportMsg, TbQueueCallback callback) {
245 254 TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer();
246 255 Set<String> tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT);
  256 + TbQueueCallback proxyCallback = callback != null ? new MultipleTbQueueCallbackWrapper(tbTransportServices.size(), callback) : null;
247 257 for (String transportServiceId : tbTransportServices) {
248 258 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId);
249   - toTransportNfProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null);
  259 + toTransportNfProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), proxyCallback);
250 260 toTransportNfs.incrementAndGet();
251 261 }
252 262 }
... ... @@ -256,7 +266,8 @@ public class DefaultTbClusterService implements TbClusterService {
256 266 TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer();
257 267 Set<String> tbRuleEngineServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE));
258 268 if (msg.getEntityId().getEntityType().equals(EntityType.TENANT)
259   - || msg.getEntityId().getEntityType().equals(EntityType.DEVICE_PROFILE)) {
  269 + || msg.getEntityId().getEntityType().equals(EntityType.DEVICE_PROFILE)
  270 + || msg.getEntityId().getEntityType().equals(EntityType.API_USAGE_STATE)) {
260 271 TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
261 272 Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
262 273 for (String serviceId : tbCoreServices) {
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.service.queue;
17 17
18 18 import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
  19 +import org.thingsboard.server.common.data.ApiUsageState;
19 20 import org.thingsboard.server.common.data.DeviceProfile;
20 21 import org.thingsboard.server.common.data.Tenant;
21 22 import org.thingsboard.server.common.data.TenantProfile;
... ... @@ -63,4 +64,6 @@ public interface TbClusterService {
63 64 void onTenantChange(Tenant tenant, TbQueueCallback callback);
64 65
65 66 void onTenantDelete(Tenant tenant, TbQueueCallback callback);
  67 +
  68 + void onApiStateChange(ApiUsageState apiUsageState, TbQueueCallback callback);
66 69 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -161,6 +161,8 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
161 161 deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
162 162 } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
163 163 deviceProfileCache.evict(new DeviceId(componentLifecycleMsg.getEntityId().getId()));
  164 + } else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
  165 + apiUsageStateService.onApiUsageStateUpdate(componentLifecycleMsg.getTenantId());
164 166 }
165 167 }
166 168 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg);
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -25,6 +25,7 @@ import com.google.protobuf.ByteString;
25 25 import lombok.extern.slf4j.Slf4j;
26 26 import org.springframework.stereotype.Service;
27 27 import org.springframework.util.StringUtils;
  28 +import org.thingsboard.server.common.data.ApiUsageState;
28 29 import org.thingsboard.server.common.data.DataConstants;
29 30 import org.thingsboard.server.common.data.Device;
30 31 import org.thingsboard.server.common.data.DeviceProfile;
... ... @@ -51,7 +52,6 @@ import org.thingsboard.server.dao.device.DeviceService;
51 52 import org.thingsboard.server.dao.device.provision.ProvisionRequest;
52 53 import org.thingsboard.server.dao.device.provision.ProvisionResponse;
53 54 import org.thingsboard.server.dao.relation.RelationService;
54   -import org.thingsboard.server.dao.tenant.TenantService;
55 55 import org.thingsboard.server.dao.util.mapping.JacksonUtil;
56 56 import org.thingsboard.server.gen.transport.TransportProtos;
57 57 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
... ... @@ -68,6 +68,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509Ce
68 68 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
69 69 import org.thingsboard.server.queue.util.TbCoreComponent;
70 70 import org.thingsboard.server.dao.device.provision.ProvisionFailedException;
  71 +import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
71 72 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
72 73 import org.thingsboard.server.service.profile.TbDeviceProfileCache;
73 74 import org.thingsboard.server.service.profile.TbTenantProfileCache;
... ... @@ -92,6 +93,7 @@ public class DefaultTransportApiService implements TransportApiService {
92 93
93 94 private final TbDeviceProfileCache deviceProfileCache;
94 95 private final TbTenantProfileCache tenantProfileCache;
  96 + private final TbApiUsageStateService apiUsageStateService;
95 97 private final DeviceService deviceService;
96 98 private final RelationService relationService;
97 99 private final DeviceCredentialsService deviceCredentialsService;
... ... @@ -104,13 +106,14 @@ public class DefaultTransportApiService implements TransportApiService {
104 106 private final ConcurrentMap<String, ReentrantLock> deviceCreationLocks = new ConcurrentHashMap<>();
105 107
106 108 public DefaultTransportApiService(TbDeviceProfileCache deviceProfileCache,
107   - TbTenantProfileCache tenantProfileCache, DeviceService deviceService,
  109 + TbTenantProfileCache tenantProfileCache, TbApiUsageStateService apiUsageStateService, DeviceService deviceService,
108 110 RelationService relationService, DeviceCredentialsService deviceCredentialsService,
109 111 DeviceStateService deviceStateService, DbCallbackExecutorService dbCallbackExecutorService,
110 112 TbClusterService tbClusterService, DataDecodingEncodingService dataDecodingEncodingService,
111 113 DeviceProvisionService deviceProvisionService) {
112 114 this.deviceProfileCache = deviceProfileCache;
113 115 this.tenantProfileCache = tenantProfileCache;
  116 + this.apiUsageStateService = apiUsageStateService;
114 117 this.deviceService = deviceService;
115 118 this.relationService = relationService;
116 119 this.deviceCredentialsService = deviceCredentialsService;
... ... @@ -316,18 +319,21 @@ public class DefaultTransportApiService implements TransportApiService {
316 319 private ListenableFuture<TransportApiResponseMsg> handle(GetEntityProfileRequestMsg requestMsg) {
317 320 EntityType entityType = EntityType.valueOf(requestMsg.getEntityType());
318 321 UUID entityUuid = new UUID(requestMsg.getEntityIdMSB(), requestMsg.getEntityIdLSB());
319   - ByteString data;
  322 + GetEntityProfileResponseMsg.Builder builder = GetEntityProfileResponseMsg.newBuilder();
320 323 if (entityType.equals(EntityType.DEVICE_PROFILE)) {
321 324 DeviceProfileId deviceProfileId = new DeviceProfileId(entityUuid);
322 325 DeviceProfile deviceProfile = deviceProfileCache.find(deviceProfileId);
323   - data = ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile));
  326 + builder.setData(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
324 327 } else if (entityType.equals(EntityType.TENANT)) {
325   - TenantProfile tenantProfile = tenantProfileCache.get(new TenantId(entityUuid));
326   - data = ByteString.copyFrom(dataDecodingEncodingService.encode(tenantProfile));
  328 + TenantId tenantId = new TenantId(entityUuid);
  329 + TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
  330 + ApiUsageState state = apiUsageStateService.getApiUsageState(tenantId);
  331 + builder.setData(ByteString.copyFrom(dataDecodingEncodingService.encode(tenantProfile)));
  332 + builder.setApiState(ByteString.copyFrom(dataDecodingEncodingService.encode(state)));
327 333 } else {
328 334 throw new RuntimeException("Invalid entity profile request: " + entityType);
329 335 }
330   - return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setEntityProfileResponseMsg(GetEntityProfileResponseMsg.newBuilder().setData(data).build()).build());
  336 + return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setEntityProfileResponseMsg(builder).build());
331 337 }
332 338
333 339 private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId, DeviceCredentials credentials) {
... ...
... ... @@ -18,7 +18,6 @@ package org.thingsboard.server.common.data;
18 18 public enum ApiUsageRecordKey {
19 19
20 20 MSG_COUNT,
21   - MSG_BYTES_COUNT,
22 21 DP_TRANSPORT_COUNT,
23 22 DP_STORAGE_COUNT,
24 23 RE_EXEC_COUNT,
... ...
... ... @@ -29,18 +29,24 @@ public class ApiUsageState extends BaseData<ApiUsageStateId> implements HasTenan
29 29
30 30 private static final long serialVersionUID = 8250339805336035966L;
31 31
32   - @Getter @Setter
  32 + @Getter
  33 + @Setter
33 34 private TenantId tenantId;
34   - @Getter @Setter
  35 + @Getter
  36 + @Setter
35 37 private EntityId entityId;
36   - @Getter @Setter
37   - private boolean transportEnabled;
38   - @Getter @Setter
39   - private boolean dbStorageEnabled;
40   - @Getter @Setter
41   - private boolean ruleEngineEnabled;
42   - @Getter @Setter
43   - private boolean jsExecEnabled;
  38 + @Getter
  39 + @Setter
  40 + private boolean transportEnabled = true;
  41 + @Getter
  42 + @Setter
  43 + private boolean dbStorageEnabled = true;
  44 + @Getter
  45 + @Setter
  46 + private boolean reExecEnabled = true;
  47 + @Getter
  48 + @Setter
  49 + private boolean jsExecEnabled = true;
44 50
45 51 public ApiUsageState() {
46 52 super();
... ... @@ -54,5 +60,9 @@ public class ApiUsageState extends BaseData<ApiUsageStateId> implements HasTenan
54 60 super(ur);
55 61 this.tenantId = ur.getTenantId();
56 62 this.entityId = ur.getEntityId();
  63 + this.transportEnabled = ur.isTransportEnabled();
  64 + this.dbStorageEnabled = ur.isDbStorageEnabled();
  65 + this.reExecEnabled = ur.isReExecEnabled();
  66 + this.jsExecEnabled = ur.isJsExecEnabled();
57 67 }
58 68 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.common;
  17 +
  18 +import org.thingsboard.server.common.msg.queue.RuleEngineException;
  19 +import org.thingsboard.server.queue.TbQueueCallback;
  20 +import org.thingsboard.server.queue.TbQueueMsgMetadata;
  21 +
  22 +import java.util.concurrent.atomic.AtomicInteger;
  23 +
  24 +public class MultipleTbQueueCallbackWrapper implements TbQueueCallback {
  25 +
  26 + private final AtomicInteger tbQueueCallbackCount;
  27 + private final TbQueueCallback callback;
  28 +
  29 + public MultipleTbQueueCallbackWrapper(int tbQueueCallbackCount, TbQueueCallback callback) {
  30 + this.tbQueueCallbackCount = new AtomicInteger(tbQueueCallbackCount);
  31 + this.callback = callback;
  32 + }
  33 +
  34 + @Override
  35 + public void onSuccess(TbQueueMsgMetadata metadata) {
  36 + if (tbQueueCallbackCount.decrementAndGet() <= 0) {
  37 + callback.onSuccess(metadata);
  38 + }
  39 + }
  40 +
  41 + @Override
  42 + public void onFailure(Throwable t) {
  43 + callback.onFailure(new RuleEngineException(t.getMessage()));
  44 + }
  45 +}
... ...
... ... @@ -186,6 +186,7 @@ message GetEntityProfileRequestMsg {
186 186 message GetEntityProfileResponseMsg {
187 187 string entityType = 1;
188 188 bytes data = 2;
  189 + bytes apiState = 3;
189 190 }
190 191
191 192 message EntityUpdateMsg {
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.DeviceTransportType;
45 45 import org.thingsboard.server.common.data.TransportPayloadType;
46 46 import org.thingsboard.server.common.data.device.profile.MqttTopics;
47 47 import org.thingsboard.server.common.msg.EncryptionUtil;
  48 +import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
48 49 import org.thingsboard.server.common.transport.SessionMsgListener;
49 50 import org.thingsboard.server.common.transport.TransportService;
50 51 import org.thingsboard.server.common.transport.TransportServiceCallback;
... ... @@ -630,7 +631,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
630 631
631 632 @Override
632 633 public void onError(Throwable e) {
633   - log.warn("[{}] Failed to submit session event", sessionId, e);
  634 + if (e instanceof TbRateLimitsException) {
  635 + log.trace("[{}] Failed to submit session event", sessionId, e);
  636 + } else {
  637 + log.warn("[{}] Failed to submit session event", sessionId, e);
  638 + }
634 639 ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
635 640 ctx.close();
636 641 }
... ...
... ... @@ -45,7 +45,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509Ce
45 45 */
46 46 public interface TransportService {
47 47
48   - GetEntityProfileResponseMsg getRoutingInfo(GetEntityProfileRequestMsg msg);
  48 + GetEntityProfileResponseMsg getEntityProfile(GetEntityProfileRequestMsg msg);
49 49
50 50 void process(DeviceTransportType transportType, ValidateDeviceTokenRequestMsg msg,
51 51 TransportServiceCallback<ValidateDeviceCredentialsResponse> callback);
... ... @@ -62,8 +62,6 @@ public interface TransportService {
62 62 void process(ProvisionDeviceRequestMsg msg,
63 63 TransportServiceCallback<ProvisionDeviceResponseMsg> callback);
64 64
65   - void onProfileUpdate(DeviceProfile deviceProfile);
66   -
67 65 boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback);
68 66
69 67 boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback, int dataPoints, TransportRateLimitType... limits);
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap;
33 33 @Slf4j
34 34 public class DefaultTransportRateLimitService implements TransportRateLimitService {
35 35
  36 + private final ConcurrentMap<TenantId, Boolean> tenantAllowed = new ConcurrentHashMap<>();
36 37 private final ConcurrentMap<TenantId, TransportRateLimit[]> perTenantLimits = new ConcurrentHashMap<>();
37 38 private final ConcurrentMap<DeviceId, TransportRateLimit[]> perDeviceLimits = new ConcurrentHashMap<>();
38 39
... ... @@ -46,6 +47,9 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
46 47
47 48 @Override
48 49 public TransportRateLimitType checkLimits(TenantId tenantId, DeviceId deviceId, int dataPoints, TransportRateLimitType... limits) {
  50 + if (!tenantAllowed.getOrDefault(tenantId, Boolean.TRUE)) {
  51 + return TransportRateLimitType.TENANT_ADDED_TO_DISABLED_LIST;
  52 + }
49 53 TransportRateLimit[] tenantLimits = getTenantRateLimits(tenantId);
50 54 TransportRateLimit[] deviceLimits = getDeviceRateLimits(tenantId, deviceId);
51 55 for (TransportRateLimitType limitType : limits) {
... ... @@ -85,6 +89,11 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
85 89 perDeviceLimits.remove(deviceId);
86 90 }
87 91
  92 + @Override
  93 + public void update(TenantId tenantId, boolean allowed) {
  94 + tenantAllowed.put(tenantId, allowed);
  95 + }
  96 +
88 97 private void mergeLimits(TenantId tenantId, TransportRateLimit[] newRateLimits) {
89 98 TransportRateLimit[] oldRateLimits = perTenantLimits.get(tenantId);
90 99 if (oldRateLimits == null) {
... ...
... ... @@ -31,4 +31,5 @@ public interface TransportRateLimitService {
31 31
32 32 void remove(DeviceId deviceId);
33 33
  34 + void update(TenantId tenantId, boolean transportEnabled);
34 35 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -19,6 +19,7 @@ import lombok.Getter;
19 19
20 20 public enum TransportRateLimitType {
21 21
  22 + TENANT_ADDED_TO_DISABLED_LIST("general.tenant.disabled", true, false),
22 23 TENANT_MAX_MSGS("transport.tenant.msg", true, true),
23 24 TENANT_TELEMETRY_MSGS("transport.tenant.telemetry", true, true),
24 25 TENANT_MAX_DATA_POINTS("transport.tenant.dataPoints", true, false),
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -26,6 +26,7 @@ import org.springframework.beans.factory.annotation.Value;
26 26 import org.springframework.stereotype.Service;
27 27 import org.thingsboard.common.util.ThingsBoardThreadFactory;
28 28 import org.thingsboard.server.common.data.ApiUsageRecordKey;
  29 +import org.thingsboard.server.common.data.ApiUsageState;
29 30 import org.thingsboard.server.common.data.DeviceProfile;
30 31 import org.thingsboard.server.common.data.DeviceTransportType;
31 32 import org.thingsboard.server.common.data.EntityType;
... ... @@ -237,7 +238,7 @@ public class DefaultTransportService implements TransportService {
237 238 }
238 239
239 240 @Override
240   - public TransportProtos.GetEntityProfileResponseMsg getRoutingInfo(TransportProtos.GetEntityProfileRequestMsg msg) {
  241 + public TransportProtos.GetEntityProfileResponseMsg getEntityProfile(TransportProtos.GetEntityProfileRequestMsg msg) {
241 242 TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
242 243 new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build());
243 244 try {
... ... @@ -641,8 +642,7 @@ public class DefaultTransportService implements TransportService {
641 642 onProfileUpdate(deviceProfile);
642 643 }
643 644 } else if (EntityType.TENANT_PROFILE.equals(entityType)) {
644   - TenantProfileUpdateResult update = tenantProfileCache.put(msg.getData());
645   - rateLimitService.update(update);
  645 + rateLimitService.update(tenantProfileCache.put(msg.getData()));
646 646 } else if (EntityType.TENANT.equals(entityType)) {
647 647 Optional<Tenant> profileOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray());
648 648 if (profileOpt.isPresent()) {
... ... @@ -652,6 +652,12 @@ public class DefaultTransportService implements TransportService {
652 652 rateLimitService.update(tenant.getId());
653 653 }
654 654 }
  655 + } else if (EntityType.API_USAGE_STATE.equals(entityType)) {
  656 + Optional<ApiUsageState> stateOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray());
  657 + if (stateOpt.isPresent()) {
  658 + ApiUsageState apiUsageState = stateOpt.get();
  659 + rateLimitService.update(apiUsageState.getTenantId(), apiUsageState.isTransportEnabled());
  660 + }
655 661 }
656 662 } else if (toSessionMsg.hasEntityDeleteMsg()) {
657 663 TransportProtos.EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg();
... ... @@ -673,8 +679,7 @@ public class DefaultTransportService implements TransportService {
673 679 }
674 680 }
675 681
676   - @Override
677   - public void onProfileUpdate(DeviceProfile deviceProfile) {
  682 + private void onProfileUpdate(DeviceProfile deviceProfile) {
678 683 long deviceProfileIdMSB = deviceProfile.getId().getId().getMostSignificantBits();
679 684 long deviceProfileIdLSB = deviceProfile.getId().getId().getLeastSignificantBits();
680 685 sessions.forEach((id, md) -> {
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -18,21 +18,19 @@ package org.thingsboard.server.common.transport.service;
18 18 import com.google.protobuf.ByteString;
19 19 import lombok.extern.slf4j.Slf4j;
20 20 import org.springframework.beans.factory.annotation.Autowired;
21   -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
22 21 import org.springframework.context.annotation.Lazy;
23 22 import org.springframework.stereotype.Component;
24   -import org.thingsboard.server.common.data.DeviceProfile;
  23 +import org.thingsboard.server.common.data.ApiUsageState;
25 24 import org.thingsboard.server.common.data.EntityType;
26 25 import org.thingsboard.server.common.data.TenantProfile;
27 26 import org.thingsboard.server.common.data.id.TenantId;
28 27 import org.thingsboard.server.common.data.id.TenantProfileId;
29 28 import org.thingsboard.server.common.transport.TransportService;
30 29 import org.thingsboard.server.common.transport.TransportTenantProfileCache;
  30 +import org.thingsboard.server.common.transport.limits.TransportRateLimitService;
31 31 import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult;
32 32 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
33 33 import org.thingsboard.server.gen.transport.TransportProtos;
34   -import org.thingsboard.server.queue.discovery.TenantRoutingInfo;
35   -import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
36 34 import org.thingsboard.server.queue.util.TbTransportComponent;
37 35
38 36 import java.util.Collections;
... ... @@ -54,10 +52,17 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil
54 52 private final ConcurrentMap<TenantProfileId, Set<TenantId>> tenantProfileIds = new ConcurrentHashMap<>();
55 53 private final DataDecodingEncodingService dataDecodingEncodingService;
56 54
  55 + private TransportRateLimitService rateLimitService;
57 56 private TransportService transportService;
58 57
59 58 @Lazy
60 59 @Autowired
  60 + public void setRateLimitService(TransportRateLimitService rateLimitService) {
  61 + this.rateLimitService = rateLimitService;
  62 + }
  63 +
  64 + @Lazy
  65 + @Autowired
61 66 public void setTransportService(TransportService transportService) {
62 67 this.transportService = transportService;
63 68 }
... ... @@ -77,7 +82,8 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil
77 82 if (profileOpt.isPresent()) {
78 83 TenantProfile newProfile = profileOpt.get();
79 84 log.trace("[{}] put: {}", newProfile.getId(), newProfile);
80   - return new TenantProfileUpdateResult(newProfile, tenantProfileIds.get(newProfile.getId()));
  85 + Set<TenantId> affectedTenants = tenantProfileIds.get(newProfile.getId());
  86 + return new TenantProfileUpdateResult(newProfile, affectedTenants != null ? affectedTenants : Collections.emptySet());
81 87 } else {
82 88 log.warn("Failed to decode profile: {}", profileBody.toString());
83 89 return new TenantProfileUpdateResult(null, Collections.emptySet());
... ... @@ -127,8 +133,8 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil
127 133 .setEntityIdMSB(tenantId.getId().getMostSignificantBits())
128 134 .setEntityIdLSB(tenantId.getId().getLeastSignificantBits())
129 135 .build();
130   - TransportProtos.GetEntityProfileResponseMsg routingInfo = transportService.getRoutingInfo(msg);
131   - Optional<TenantProfile> profileOpt = dataDecodingEncodingService.decode(routingInfo.getData().toByteArray());
  136 + TransportProtos.GetEntityProfileResponseMsg entityProfileMsg = transportService.getEntityProfile(msg);
  137 + Optional<TenantProfile> profileOpt = dataDecodingEncodingService.decode(entityProfileMsg.getData().toByteArray());
132 138 if (profileOpt.isPresent()) {
133 139 profile = profileOpt.get();
134 140 TenantProfile existingProfile = profiles.get(profile.getId());
... ... @@ -140,9 +146,11 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil
140 146 tenantProfileIds.computeIfAbsent(profile.getId(), id -> ConcurrentHashMap.newKeySet()).add(tenantId);
141 147 tenantIds.put(tenantId, profile.getId());
142 148 } else {
143   - log.warn("[{}] Can't decode tenant profile: {}", tenantId, routingInfo.getData());
  149 + log.warn("[{}] Can't decode tenant profile: {}", tenantId, entityProfileMsg.getData());
144 150 throw new RuntimeException("Can't decode tenant profile!");
145 151 }
  152 + Optional<ApiUsageState> apiStateOpt = dataDecodingEncodingService.decode(entityProfileMsg.getApiState().toByteArray());
  153 + apiStateOpt.ifPresent(apiUsageState -> rateLimitService.update(tenantId, apiUsageState.isTransportEnabled()));
146 154 }
147 155 } finally {
148 156 tenantProfileFetchLock.unlock();
... ...
... ... @@ -445,6 +445,10 @@ public class ModelConstants {
445 445 public static final String API_USAGE_STATE_TENANT_ID_COLUMN = TENANT_ID_PROPERTY;
446 446 public static final String API_USAGE_STATE_ENTITY_TYPE_COLUMN = ENTITY_TYPE_COLUMN;
447 447 public static final String API_USAGE_STATE_ENTITY_ID_COLUMN = ENTITY_ID_COLUMN;
  448 + public static final String API_USAGE_STATE_TRANSPORT_ENABLED_COLUMN = "transport_enabled";
  449 + public static final String API_USAGE_STATE_DB_STORAGE_ENABLED_COLUMN = "db_storage_enabled";
  450 + public static final String API_USAGE_STATE_RE_EXEC_ENABLED_COLUMN = "re_exec_enabled";
  451 + public static final String API_USAGE_STATE_JS_EXEC_ENABLED_COLUMN = "js_exec_enabled";
448 452
449 453 /**
450 454 * Cassandra attributes and timeseries constants.
... ...
... ... @@ -17,6 +17,8 @@ package org.thingsboard.server.dao.model.sql;
17 17
18 18 import lombok.Data;
19 19 import lombok.EqualsAndHashCode;
  20 +import lombok.Getter;
  21 +import lombok.Setter;
20 22 import org.hibernate.annotations.TypeDef;
21 23 import org.thingsboard.server.common.data.ApiUsageState;
22 24 import org.thingsboard.server.common.data.id.EntityIdFactory;
... ... @@ -51,6 +53,15 @@ public class ApiUsageStateEntity extends BaseSqlEntity<ApiUsageState> implements
51 53 @Column(name = ModelConstants.API_USAGE_STATE_ENTITY_ID_COLUMN)
52 54 private UUID entityId;
53 55
  56 + @Column(name = ModelConstants.API_USAGE_STATE_TRANSPORT_ENABLED_COLUMN)
  57 + private boolean transportEnabled = true;
  58 + @Column(name = ModelConstants.API_USAGE_STATE_DB_STORAGE_ENABLED_COLUMN)
  59 + private boolean dbStorageEnabled = true;
  60 + @Column(name = ModelConstants.API_USAGE_STATE_RE_EXEC_ENABLED_COLUMN)
  61 + private boolean reExecEnabled = true;
  62 + @Column(name = ModelConstants.API_USAGE_STATE_JS_EXEC_ENABLED_COLUMN)
  63 + private boolean jsExecEnabled = true;
  64 +
54 65 public ApiUsageStateEntity() {
55 66 }
56 67
... ... @@ -66,6 +77,10 @@ public class ApiUsageStateEntity extends BaseSqlEntity<ApiUsageState> implements
66 77 this.entityType = ur.getEntityId().getEntityType().name();
67 78 this.entityId = ur.getEntityId().getId();
68 79 }
  80 + this.transportEnabled = ur.isTransportEnabled();
  81 + this.dbStorageEnabled = ur.isDbStorageEnabled();
  82 + this.reExecEnabled = ur.isReExecEnabled();
  83 + this.jsExecEnabled = ur.isJsExecEnabled();
69 84 }
70 85
71 86 @Override
... ... @@ -78,6 +93,10 @@ public class ApiUsageStateEntity extends BaseSqlEntity<ApiUsageState> implements
78 93 if (entityId != null) {
79 94 ur.setEntityId(EntityIdFactory.getByTypeAndUuid(entityType, entityId));
80 95 }
  96 + ur.setTransportEnabled(transportEnabled);
  97 + ur.setDbStorageEnabled(dbStorageEnabled);
  98 + ur.setReExecEnabled(reExecEnabled);
  99 + ur.setJsExecEnabled(jsExecEnabled);
81 100 return ur;
82 101 }
83 102
... ...
dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateServiceImpl.java renamed from dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiApiUsageStateServiceImpl.java
... ... @@ -31,13 +31,13 @@ import static org.thingsboard.server.dao.service.Validator.validateId;
31 31
32 32 @Service
33 33 @Slf4j
34   -public class ApiApiUsageStateServiceImpl extends AbstractEntityService implements ApiUsageStateService {
  34 +public class ApiUsageStateServiceImpl extends AbstractEntityService implements ApiUsageStateService {
35 35 public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
36 36
37 37 private final ApiUsageStateDao apiUsageStateDao;
38 38 private final TenantDao tenantDao;
39 39
40   - public ApiApiUsageStateServiceImpl(TenantDao tenantDao, ApiUsageStateDao apiUsageStateDao) {
  40 + public ApiUsageStateServiceImpl(TenantDao tenantDao, ApiUsageStateDao apiUsageStateDao) {
41 41 this.tenantDao = tenantDao;
42 42 this.apiUsageStateDao = apiUsageStateDao;
43 43 }
... ...
... ... @@ -411,5 +411,9 @@ CREATE TABLE IF NOT EXISTS api_usage_state (
411 411 tenant_id uuid,
412 412 entity_type varchar(32),
413 413 entity_id uuid,
  414 + transport_enabled boolean,
  415 + db_storage_enabled boolean,
  416 + re_exec_enabled boolean,
  417 + js_exec_enabled boolean,
414 418 CONSTRAINT api_usage_state_unq_key UNIQUE (tenant_id, entity_id)
415 419 );
... ...
... ... @@ -437,6 +437,10 @@ CREATE TABLE IF NOT EXISTS api_usage_state (
437 437 tenant_id uuid,
438 438 entity_type varchar(32),
439 439 entity_id uuid,
  440 + transport_enabled boolean,
  441 + db_storage_enabled boolean,
  442 + re_exec_enabled boolean,
  443 + js_exec_enabled boolean,
440 444 CONSTRAINT api_usage_state_unq_key UNIQUE (tenant_id, entity_id)
441 445 );
442 446
... ...
... ... @@ -48,4 +48,17 @@ public abstract class BaseApiUsageStateServiceTest extends AbstractServiceTest {
48 48 Assert.assertNotNull(apiUsageState);
49 49 }
50 50
  51 + @Test
  52 + public void testUpdateApiUsageState(){
  53 + ApiUsageState apiUsageState = apiUsageStateService.findTenantApiUsageState(tenantId);
  54 + Assert.assertNotNull(apiUsageState);
  55 + Assert.assertTrue(apiUsageState.isTransportEnabled());
  56 + apiUsageState.setTransportEnabled(false);
  57 + apiUsageState = apiUsageStateService.update(apiUsageState);
  58 + Assert.assertNotNull(apiUsageState);
  59 + apiUsageState = apiUsageStateService.findTenantApiUsageState(tenantId);
  60 + Assert.assertNotNull(apiUsageState);
  61 + Assert.assertFalse(apiUsageState.isTransportEnabled());
  62 + }
  63 +
51 64 }
... ...