Commit f89b30777e6bad9c96e0cfae8010002b04b00fc1

Authored by Viacheslav Klimov
Committed by Andrew Shvayka
1 parent 9900cc3d

Initial implementation for per-customer api usage stats

Showing 46 changed files with 541 additions and 294 deletions
... ... @@ -106,7 +106,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
106 106 int ruleNodeCount = tbMsg.getAndIncrementRuleNodeCounter();
107 107 int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration().getMaxRuleNodeExecsPerMessage();
108 108 if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) {
109   - apiUsageClient.report(tenantId, ApiUsageRecordKey.RE_EXEC_COUNT);
  109 + apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT);
110 110 if (ruleNode.isDebugMode()) {
111 111 systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), "Self");
112 112 }
... ... @@ -127,7 +127,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
127 127 int ruleNodeCount = tbMsg.getAndIncrementRuleNodeCounter();
128 128 int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration().getMaxRuleNodeExecsPerMessage();
129 129 if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) {
130   - apiUsageClient.report(tenantId, ApiUsageRecordKey.RE_EXEC_COUNT);
  130 + apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT);
131 131 if (ruleNode.isDebugMode()) {
132 132 systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
133 133 }
... ...
... ... @@ -926,7 +926,7 @@ public abstract class BaseController {
926 926 tenantId = ((HasTenantId) entity).getTenantId();
927 927 }
928 928 }
929   - tbClusterService.pushMsgToRuleEngine(tenantId, entityId, tbMsg, null);
  929 + tbClusterService.pushMsgToRuleEngine(tenantId, customerId, entityId, tbMsg, null);
930 930 } catch (Exception e) {
931 931 log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e);
932 932 }
... ...
... ... @@ -646,7 +646,7 @@ public class DeviceController extends BaseController {
646 646 String data = entityToStr(assignedDevice);
647 647 if (data != null) {
648 648 TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_ASSIGNED_FROM_TENANT, assignedDevice.getId(), getMetaDataForAssignedFrom(currentTenant), TbMsgDataType.JSON, data);
649   - tbClusterService.pushMsgToRuleEngine(newTenantId, assignedDevice.getId(), tbMsg, null);
  649 + tbClusterService.pushMsgToRuleEngine(newTenantId, assignedDevice.getCustomerId(), assignedDevice.getId(), tbMsg, null);
650 650 }
651 651 }
652 652
... ...
... ... @@ -119,7 +119,7 @@ public class RpcController extends BaseController {
119 119 expTime,
120 120 body
121 121 );
122   - deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse));
  122 + deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse), currentUser);
123 123 }
124 124
125 125 @Override
... ...
... ... @@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.EntityType;
48 48 import org.thingsboard.server.common.data.TenantProfile;
49 49 import org.thingsboard.server.common.data.audit.ActionType;
50 50 import org.thingsboard.server.common.data.exception.ThingsboardException;
  51 +import org.thingsboard.server.common.data.id.CustomerId;
51 52 import org.thingsboard.server.common.data.id.DeviceId;
52 53 import org.thingsboard.server.common.data.id.EntityId;
53 54 import org.thingsboard.server.common.data.id.EntityIdFactory;
... ... @@ -449,7 +450,7 @@ public class TelemetryController extends BaseController {
449 450 TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
450 451 tenantTtl = TimeUnit.DAYS.toSeconds(((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration()).getDefaultStorageTtlDays());
451 452 }
452   - tsSubService.saveAndNotify(tenantId, entityId, entries, tenantTtl, new FutureCallback<Void>() {
  453 + tsSubService.saveAndNotify(tenantId, user.getCustomerId(), entityId, entries, tenantTtl, new FutureCallback<Void>() {
453 454 @Override
454 455 public void onSuccess(@Nullable Void tmp) {
455 456 logTelemetryUpdated(user, entityId, entries, null);
... ...
  1 +package org.thingsboard.server.service.apiusage;
  2 +
  3 +import lombok.Getter;
  4 +import org.springframework.data.util.Pair;
  5 +import org.thingsboard.server.common.data.ApiFeature;
  6 +import org.thingsboard.server.common.data.ApiUsageRecordKey;
  7 +import org.thingsboard.server.common.data.ApiUsageState;
  8 +import org.thingsboard.server.common.data.ApiUsageStateValue;
  9 +import org.thingsboard.server.common.data.EntityType;
  10 +import org.thingsboard.server.common.data.id.EntityId;
  11 +import org.thingsboard.server.common.data.id.TenantId;
  12 +import org.thingsboard.server.common.msg.tools.SchedulerUtils;
  13 +
  14 +import java.util.Arrays;
  15 +import java.util.HashMap;
  16 +import java.util.HashSet;
  17 +import java.util.Map;
  18 +import java.util.Set;
  19 +import java.util.concurrent.ConcurrentHashMap;
  20 +
  21 +public abstract class BaseApiUsageState {
  22 + private final Map<ApiUsageRecordKey, Long> currentCycleValues = new ConcurrentHashMap<>();
  23 + private final Map<ApiUsageRecordKey, Long> currentHourValues = new ConcurrentHashMap<>();
  24 +
  25 + @Getter
  26 + private final ApiUsageState apiUsageState;
  27 + @Getter
  28 + private volatile long currentCycleTs;
  29 + @Getter
  30 + private volatile long nextCycleTs;
  31 + @Getter
  32 + private volatile long currentHourTs;
  33 +
  34 + public BaseApiUsageState(ApiUsageState apiUsageState) {
  35 + this.apiUsageState = apiUsageState;
  36 + this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth();
  37 + this.nextCycleTs = SchedulerUtils.getStartOfNextMonth();
  38 + this.currentHourTs = SchedulerUtils.getStartOfCurrentHour();
  39 + }
  40 +
  41 + public void put(ApiUsageRecordKey key, Long value) {
  42 + currentCycleValues.put(key, value);
  43 + }
  44 +
  45 + public void putHourly(ApiUsageRecordKey key, Long value) {
  46 + currentHourValues.put(key, value);
  47 + }
  48 +
  49 + public long add(ApiUsageRecordKey key, long value) {
  50 + long result = currentCycleValues.getOrDefault(key, 0L) + value;
  51 + currentCycleValues.put(key, result);
  52 + return result;
  53 + }
  54 +
  55 + public long get(ApiUsageRecordKey key) {
  56 + return currentCycleValues.getOrDefault(key, 0L);
  57 + }
  58 +
  59 + public long addToHourly(ApiUsageRecordKey key, long value) {
  60 + long result = currentHourValues.getOrDefault(key, 0L) + value;
  61 + currentHourValues.put(key, result);
  62 + return result;
  63 + }
  64 +
  65 + public void setHour(long currentHourTs) {
  66 + this.currentHourTs = currentHourTs;
  67 + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
  68 + currentHourValues.put(key, 0L);
  69 + }
  70 + }
  71 +
  72 + public void setCycles(long currentCycleTs, long nextCycleTs) {
  73 + this.currentCycleTs = currentCycleTs;
  74 + this.nextCycleTs = nextCycleTs;
  75 + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
  76 + currentCycleValues.put(key, 0L);
  77 + }
  78 + }
  79 +
  80 + public ApiUsageStateValue getFeatureValue(ApiFeature feature) {
  81 + switch (feature) {
  82 + case TRANSPORT:
  83 + return apiUsageState.getTransportState();
  84 + case RE:
  85 + return apiUsageState.getReExecState();
  86 + case DB:
  87 + return apiUsageState.getDbStorageState();
  88 + case JS:
  89 + return apiUsageState.getJsExecState();
  90 + case EMAIL:
  91 + return apiUsageState.getEmailExecState();
  92 + case SMS:
  93 + return apiUsageState.getSmsExecState();
  94 + default:
  95 + return ApiUsageStateValue.ENABLED;
  96 + }
  97 + }
  98 +
  99 + public boolean setFeatureValue(ApiFeature feature, ApiUsageStateValue value) {
  100 + ApiUsageStateValue currentValue = getFeatureValue(feature);
  101 + switch (feature) {
  102 + case TRANSPORT:
  103 + apiUsageState.setTransportState(value);
  104 + break;
  105 + case RE:
  106 + apiUsageState.setReExecState(value);
  107 + break;
  108 + case DB:
  109 + apiUsageState.setDbStorageState(value);
  110 + break;
  111 + case JS:
  112 + apiUsageState.setJsExecState(value);
  113 + break;
  114 + case EMAIL:
  115 + apiUsageState.setEmailExecState(value);
  116 + break;
  117 + case SMS:
  118 + apiUsageState.setSmsExecState(value);
  119 + break;
  120 + }
  121 + return !currentValue.equals(value);
  122 + }
  123 +
  124 + public Map<ApiFeature, ApiUsageStateValue> checkStateUpdatedDueToThresholds() {
  125 + return checkStateUpdatedDueToThreshold(new HashSet<>(Arrays.asList(ApiFeature.values())));
  126 + }
  127 +
  128 + public Map<ApiFeature, ApiUsageStateValue> checkStateUpdatedDueToThreshold(Set<ApiFeature> features) {
  129 + Map<ApiFeature, ApiUsageStateValue> result = new HashMap<>();
  130 + for (ApiFeature feature : features) {
  131 + Pair<ApiFeature, ApiUsageStateValue> tmp = checkStateUpdatedDueToThreshold(feature);
  132 + if (tmp != null) {
  133 + result.put(tmp.getFirst(), tmp.getSecond());
  134 + }
  135 + }
  136 + return result;
  137 + }
  138 +
  139 + public abstract Pair<ApiFeature, ApiUsageStateValue> checkStateUpdatedDueToThreshold(ApiFeature feature);
  140 +
  141 + public abstract EntityType getEntityType();
  142 +
  143 + public TenantId getTenantId() {
  144 + return getApiUsageState().getTenantId();
  145 + }
  146 +
  147 + public EntityId getEntityId() {
  148 + return getApiUsageState().getEntityId();
  149 + }
  150 +}
... ...
  1 +package org.thingsboard.server.service.apiusage;
  2 +
  3 +import org.springframework.data.util.Pair;
  4 +import org.thingsboard.server.common.data.ApiFeature;
  5 +import org.thingsboard.server.common.data.ApiUsageState;
  6 +import org.thingsboard.server.common.data.ApiUsageStateValue;
  7 +import org.thingsboard.server.common.data.EntityType;
  8 +
  9 +public class CustomerApiUsageState extends BaseApiUsageState {
  10 + public CustomerApiUsageState(ApiUsageState apiUsageState) {
  11 + super(apiUsageState);
  12 + }
  13 +
  14 + @Override
  15 + public Pair<ApiFeature, ApiUsageStateValue> checkStateUpdatedDueToThreshold(ApiFeature feature) {
  16 + ApiUsageStateValue featureValue = ApiUsageStateValue.ENABLED;
  17 + return setFeatureValue(feature, featureValue) ? Pair.of(feature, featureValue) : null;
  18 + }
  19 +
  20 + @Override
  21 + public EntityType getEntityType() {
  22 + return EntityType.CUSTOMER;
  23 + }
  24 +}
... ...
... ... @@ -29,10 +29,13 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey;
29 29 import org.thingsboard.server.common.data.ApiUsageState;
30 30 import org.thingsboard.server.common.data.ApiUsageStateMailMessage;
31 31 import org.thingsboard.server.common.data.ApiUsageStateValue;
  32 +import org.thingsboard.server.common.data.EntityType;
32 33 import org.thingsboard.server.common.data.Tenant;
33 34 import org.thingsboard.server.common.data.TenantProfile;
34 35 import org.thingsboard.server.common.data.exception.ThingsboardException;
35 36 import org.thingsboard.server.common.data.id.ApiUsageStateId;
  37 +import org.thingsboard.server.common.data.id.CustomerId;
  38 +import org.thingsboard.server.common.data.id.EntityId;
36 39 import org.thingsboard.server.common.data.id.TenantId;
37 40 import org.thingsboard.server.common.data.id.TenantProfileId;
38 41 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
... ... @@ -66,6 +69,7 @@ import java.util.Arrays;
66 69 import java.util.HashSet;
67 70 import java.util.List;
68 71 import java.util.Map;
  72 +import java.util.Optional;
69 73 import java.util.Set;
70 74 import java.util.UUID;
71 75 import java.util.concurrent.ConcurrentHashMap;
... ... @@ -106,9 +110,9 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
106 110 private InternalTelemetryService tsWsService;
107 111
108 112 // Tenants that should be processed on this server
109   - private final Map<TenantId, TenantApiUsageState> myTenantStates = new ConcurrentHashMap<>();
  113 + private final Map<EntityId, BaseApiUsageState> myUsageStates = new ConcurrentHashMap<>();
110 114 // Tenants that should be processed on other servers
111   - private final Map<TenantId, ApiUsageState> otherTenantStates = new ConcurrentHashMap<>();
  115 + private final Map<EntityId, ApiUsageState> otherUsageStates = new ConcurrentHashMap<>();
112 116
113 117 @Value("${usage.stats.report.enabled:true}")
114 118 private boolean enabled;
... ... @@ -151,60 +155,73 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
151 155 public void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
152 156 ToUsageStatsServiceMsg statsMsg = msg.getValue();
153 157 TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
154   -
155   - if (tenantProfileCache.get(tenantId) == null) {
156   - return;
  158 + CustomerId customerId;
  159 + if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) {
  160 + customerId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB()));
  161 + } else {
  162 + customerId = new CustomerId(EntityId.NULL_UUID);
157 163 }
158 164
159   - TenantApiUsageState tenantState;
  165 + processEntityUsageStats(tenantId, customerId.isNullUid() ? tenantId : customerId, statsMsg.getValuesList());
  166 + callback.onSuccess();
  167 + }
  168 +
  169 + private void processEntityUsageStats(TenantId tenantId, EntityId entityId, List<UsageStatsKVProto> values) {
  170 + if (tenantProfileCache.get(tenantId) == null) return;
  171 +
  172 + BaseApiUsageState usageState;
160 173 List<TsKvEntry> updatedEntries;
161 174 Map<ApiFeature, ApiUsageStateValue> result;
  175 +
162 176 updateLock.lock();
163 177 try {
164   - tenantState = getOrFetchState(tenantId);
165   - long ts = tenantState.getCurrentCycleTs();
166   - long hourTs = tenantState.getCurrentHourTs();
  178 + usageState = getOrFetchState(tenantId, entityId);
  179 + long ts = usageState.getCurrentCycleTs();
  180 + long hourTs = usageState.getCurrentHourTs();
167 181 long newHourTs = SchedulerUtils.getStartOfCurrentHour();
168 182 if (newHourTs != hourTs) {
169   - tenantState.setHour(newHourTs);
  183 + usageState.setHour(newHourTs);
170 184 }
171 185 updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length);
172 186 Set<ApiFeature> apiFeatures = new HashSet<>();
173   - for (UsageStatsKVProto kvProto : statsMsg.getValuesList()) {
  187 + for (UsageStatsKVProto kvProto : values) {
174 188 ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey());
175   - long newValue = tenantState.add(recordKey, kvProto.getValue());
  189 + long newValue = usageState.add(recordKey, kvProto.getValue());
176 190 updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue)));
177   - long newHourlyValue = tenantState.addToHourly(recordKey, kvProto.getValue());
  191 + long newHourlyValue = usageState.addToHourly(recordKey, kvProto.getValue());
178 192 updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue)));
179 193 apiFeatures.add(recordKey.getApiFeature());
180 194 }
181   - result = tenantState.checkStateUpdatedDueToThreshold(apiFeatures);
  195 + result = usageState.checkStateUpdatedDueToThreshold(apiFeatures);
182 196 } finally {
183 197 updateLock.unlock();
184 198 }
185   - tsWsService.saveAndNotifyInternal(tenantId, tenantState.getApiUsageState().getId(), updatedEntries, VOID_CALLBACK);
  199 + tsWsService.saveAndNotifyInternal(tenantId, usageState.getApiUsageState().getId(), updatedEntries, VOID_CALLBACK);
186 200 if (!result.isEmpty()) {
187   - persistAndNotify(tenantState, result);
  201 + persistAndNotify(usageState, result);
188 202 }
189   - callback.onSuccess();
190 203 }
191 204
192 205 @Override
193 206 protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
194 207 if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) {
195   - myTenantStates.entrySet().removeIf(entry -> !partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition());
196   - otherTenantStates.entrySet().removeIf(entry -> partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition());
  208 + myUsageStates.entrySet().removeIf(entry -> {
  209 + return !partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition();
  210 + });
  211 + otherUsageStates.entrySet().removeIf(entry -> {
  212 + return partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition();
  213 + });
197 214 initStatesFromDataBase();
198 215 }
199 216 }
200 217
201 218 @Override
202 219 public ApiUsageState getApiUsageState(TenantId tenantId) {
203   - TenantApiUsageState tenantState = myTenantStates.get(tenantId);
  220 + TenantApiUsageState tenantState = (TenantApiUsageState) myUsageStates.get(tenantId);
204 221 if (tenantState != null) {
205 222 return tenantState.getApiUsageState();
206 223 } else {
207   - ApiUsageState state = otherTenantStates.get(tenantId);
  224 + ApiUsageState state = otherUsageStates.get(tenantId);
208 225 if (state != null) {
209 226 return state;
210 227 } else {
... ... @@ -213,11 +230,11 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
213 230 } else {
214 231 updateLock.lock();
215 232 try {
216   - state = otherTenantStates.get(tenantId);
  233 + state = otherUsageStates.get(tenantId);
217 234 if (state == null) {
218 235 state = apiUsageStateService.findTenantApiUsageState(tenantId);
219 236 if (state != null) {
220   - otherTenantStates.put(tenantId, state);
  237 + otherUsageStates.put(tenantId, state);
221 238 }
222 239 }
223 240 } finally {
... ... @@ -231,7 +248,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
231 248
232 249 @Override
233 250 public void onApiUsageStateUpdate(TenantId tenantId) {
234   - otherTenantStates.remove(tenantId);
  251 + otherUsageStates.remove(tenantId);
235 252 }
236 253
237 254 @Override
... ... @@ -240,11 +257,14 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
240 257 TenantProfile tenantProfile = tenantProfileCache.get(tenantProfileId);
241 258 updateLock.lock();
242 259 try {
243   - myTenantStates.values().forEach(state -> {
244   - if (tenantProfile.getId().equals(state.getTenantProfileId())) {
245   - updateTenantState(state, tenantProfile);
246   - }
247   - });
  260 + myUsageStates.values().stream()
  261 + .filter(state -> state.getEntityType() == EntityType.TENANT)
  262 + .map(state -> (TenantApiUsageState) state)
  263 + .forEach(state -> {
  264 + if (tenantProfile.getId().equals(state.getTenantProfileId())) {
  265 + updateTenantState(state, tenantProfile);
  266 + }
  267 + });
248 268 } finally {
249 269 updateLock.unlock();
250 270 }
... ... @@ -256,7 +276,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
256 276 TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
257 277 updateLock.lock();
258 278 try {
259   - TenantApiUsageState state = myTenantStates.get(tenantId);
  279 + TenantApiUsageState state = (TenantApiUsageState) myUsageStates.get(tenantId);
260 280 if (state != null && !state.getTenantProfileId().equals(tenantProfile.getId())) {
261 281 updateTenantState(state, tenantProfile);
262 282 }
... ... @@ -293,8 +313,8 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
293 313 }
294 314 }
295 315
296   - private void persistAndNotify(TenantApiUsageState state, Map<ApiFeature, ApiUsageStateValue> result) {
297   - log.info("[{}] Detected update of the API state: {}", state.getTenantId(), result);
  316 + private void persistAndNotify(BaseApiUsageState state, Map<ApiFeature, ApiUsageStateValue> result) {
  317 + log.info("[{}] Detected update of the API state for {}: {}", state.getEntityId(), state.getEntityType(), result);
298 318 apiUsageStateService.update(state.getApiUsageState());
299 319 clusterService.onApiStateChange(state.getApiUsageState(), null);
300 320 long ts = System.currentTimeMillis();
... ... @@ -302,20 +322,21 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
302 322 result.forEach(((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name())))));
303 323 tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), stateTelemetry, VOID_CALLBACK);
304 324
305   - String email = tenantService.findTenantById(state.getTenantId()).getEmail();
306   -
307   - if (StringUtils.isNotEmpty(email)) {
308   - result.forEach((apiFeature, stateValue) -> {
309   - mailExecutor.submit(() -> {
310   - try {
311   - mailService.sendApiFeatureStateEmail(apiFeature, stateValue, email, createStateMailMessage(state, apiFeature, stateValue));
312   - } catch (ThingsboardException e) {
313   - log.warn("[{}] Can't send update of the API state to tenant with provided email [{}]", state.getTenantId(), email, e);
314   - }
  325 + if (state.getEntityType() == EntityType.TENANT) {
  326 + String email = tenantService.findTenantById(state.getTenantId()).getEmail();
  327 + if (StringUtils.isNotEmpty(email)) {
  328 + result.forEach((apiFeature, stateValue) -> {
  329 + mailExecutor.submit(() -> {
  330 + try {
  331 + mailService.sendApiFeatureStateEmail(apiFeature, stateValue, email, createStateMailMessage((TenantApiUsageState) state, apiFeature, stateValue));
  332 + } catch (ThingsboardException e) {
  333 + log.warn("[{}] Can't send update of the API state to tenant with provided email [{}]", state.getTenantId(), email, e);
  334 + }
  335 + });
315 336 });
316   - });
317   - } else {
318   - log.warn("[{}] Can't send update of the API state to tenant with empty email!", state.getTenantId());
  337 + } else {
  338 + log.warn("[{}] Can't send update of the API state to tenant with empty email!", state.getTenantId());
  339 + }
319 340 }
320 341 }
321 342
... ... @@ -350,12 +371,13 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
350 371 updateLock.lock();
351 372 try {
352 373 long now = System.currentTimeMillis();
353   - myTenantStates.values().forEach(state -> {
  374 + myUsageStates.values().forEach(state -> {
354 375 if ((state.getNextCycleTs() < now) && (now - state.getNextCycleTs() < TimeUnit.HOURS.toMillis(1))) {
  376 + // FIXME
355 377 TenantId tenantId = state.getTenantId();
356 378 state.setCycles(state.getNextCycleTs(), SchedulerUtils.getStartOfNextNextMonth());
357 379 saveNewCounts(state, Arrays.asList(ApiUsageRecordKey.values()));
358   - updateTenantState(state, tenantProfileCache.get(tenantId));
  380 + updateTenantState((TenantApiUsageState) state, tenantProfileCache.get(tenantId));
359 381 }
360 382 });
361 383 } finally {
... ... @@ -363,7 +385,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
363 385 }
364 386 }
365 387
366   - private void saveNewCounts(TenantApiUsageState state, List<ApiUsageRecordKey> keys) {
  388 + private void saveNewCounts(BaseApiUsageState state, List<ApiUsageRecordKey> keys) {
367 389 List<TsKvEntry> counts = keys.stream()
368 390 .map(key -> new BasicTsKvEntry(state.getCurrentCycleTs(), new LongDataEntry(key.getApiCountKey(), 0L)))
369 391 .collect(Collectors.toList());
... ... @@ -371,13 +393,74 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
371 393 tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), counts, VOID_CALLBACK);
372 394 }
373 395
  396 + private BaseApiUsageState getOrFetchState(TenantId tenantId, EntityId entityId) {
  397 + BaseApiUsageState state = myUsageStates.get(entityId);
  398 + if (state != null) {
  399 + return state;
  400 + }
  401 +
  402 + ApiUsageState storedState = Optional.ofNullable(apiUsageStateService.findApiUsageStateByEntityId(entityId))
  403 + .orElseGet(() -> {
  404 + try {
  405 + return apiUsageStateService.createDefaultApiUsageState(tenantId, entityId);
  406 + } catch (Exception e) {
  407 + return apiUsageStateService.findApiUsageStateByEntityId(entityId);
  408 + }
  409 + });
  410 +
  411 + switch (entityId.getEntityType()) {
  412 + case TENANT:
  413 + TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
  414 + state = new TenantApiUsageState(tenantProfile, storedState);
  415 + break;
  416 + case CUSTOMER:
  417 + default:
  418 + state = new CustomerApiUsageState(storedState);
  419 + break;
  420 + }
  421 +
  422 + List<ApiUsageRecordKey> newCounts = new ArrayList<>();
  423 + try {
  424 + List<TsKvEntry> dbValues = tsService.findAllLatest(tenantId, storedState.getId()).get();
  425 + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
  426 + boolean cycleEntryFound = false;
  427 + boolean hourlyEntryFound = false;
  428 + for (TsKvEntry tsKvEntry : dbValues) {
  429 + if (tsKvEntry.getKey().equals(key.getApiCountKey())) {
  430 + cycleEntryFound = true;
  431 +
  432 + boolean oldCount = tsKvEntry.getTs() == state.getCurrentCycleTs();
  433 + state.put(key, oldCount ? tsKvEntry.getLongValue().get() : 0L);
  434 +
  435 + if (!oldCount) {
  436 + newCounts.add(key);
  437 + }
  438 + } else if (tsKvEntry.getKey().equals(key.getApiCountKey() + HOURLY)) {
  439 + hourlyEntryFound = true;
  440 + state.putHourly(key, tsKvEntry.getTs() == state.getCurrentHourTs() ? tsKvEntry.getLongValue().get() : 0L);
  441 + }
  442 + if (cycleEntryFound && hourlyEntryFound) {
  443 + break;
  444 + }
  445 + }
  446 + }
  447 + log.debug("[{}] Initialized state: {}", entityId, storedState);
  448 + myUsageStates.put(entityId, state);
  449 + saveNewCounts(state, newCounts);
  450 + } catch (InterruptedException | ExecutionException e) {
  451 + log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e);
  452 + }
  453 +
  454 + return state;
  455 + }
  456 +
374 457 private TenantApiUsageState getOrFetchState(TenantId tenantId) {
375   - TenantApiUsageState tenantState = myTenantStates.get(tenantId);
  458 + TenantApiUsageState tenantState = (TenantApiUsageState) myUsageStates.get(tenantId);
376 459 if (tenantState == null) {
377 460 ApiUsageState dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId);
378 461 if (dbStateEntity == null) {
379 462 try {
380   - dbStateEntity = apiUsageStateService.createDefaultApiUsageState(tenantId);
  463 + dbStateEntity = apiUsageStateService.createDefaultApiUsageState(tenantId, null);
381 464 } catch (Exception e) {
382 465 dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId);
383 466 }
... ... @@ -410,7 +493,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
410 493 }
411 494 }
412 495 log.debug("[{}] Initialized state: {}", tenantId, dbStateEntity);
413   - myTenantStates.put(tenantId, tenantState);
  496 + myUsageStates.put(tenantId, tenantState);
414 497 saveNewCounts(tenantState, newCounts);
415 498 } catch (InterruptedException | ExecutionException e) {
416 499 log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e);
... ... @@ -429,7 +512,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
429 512 PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024);
430 513 List<Future<?>> futures = new ArrayList<>();
431 514 for (Tenant tenant : tenantIterator) {
432   - if (!myTenantStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) {
  515 + if (!myUsageStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) {
433 516 log.debug("[{}] Initializing tenant state.", tenant.getId());
434 517 futures.add(tmpInitExecutor.submit(() -> {
435 518 try {
... ...
... ... @@ -22,85 +22,23 @@ import org.thingsboard.server.common.data.ApiFeature;
22 22 import org.thingsboard.server.common.data.ApiUsageRecordKey;
23 23 import org.thingsboard.server.common.data.ApiUsageState;
24 24 import org.thingsboard.server.common.data.ApiUsageStateValue;
  25 +import org.thingsboard.server.common.data.EntityType;
25 26 import org.thingsboard.server.common.data.TenantProfile;
26   -import org.thingsboard.server.common.data.id.TenantId;
27 27 import org.thingsboard.server.common.data.id.TenantProfileId;
28 28 import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
29   -import org.thingsboard.server.common.msg.tools.SchedulerUtils;
30   -
31   -import java.util.Arrays;
32   -import java.util.HashMap;
33   -import java.util.HashSet;
34   -import java.util.Map;
35   -import java.util.Set;
36   -import java.util.concurrent.ConcurrentHashMap;
37   -
38   -public class TenantApiUsageState {
39   -
40   - private final Map<ApiUsageRecordKey, Long> currentCycleValues = new ConcurrentHashMap<>();
41   - private final Map<ApiUsageRecordKey, Long> currentHourValues = new ConcurrentHashMap<>();
42 29
  30 +public class TenantApiUsageState extends BaseApiUsageState {
43 31 @Getter
44 32 @Setter
45 33 private TenantProfileId tenantProfileId;
46 34 @Getter
47 35 @Setter
48 36 private TenantProfileData tenantProfileData;
49   - @Getter
50   - private final ApiUsageState apiUsageState;
51   - @Getter
52   - private volatile long currentCycleTs;
53   - @Getter
54   - private volatile long nextCycleTs;
55   - @Getter
56   - private volatile long currentHourTs;
57 37
58 38 public TenantApiUsageState(TenantProfile tenantProfile, ApiUsageState apiUsageState) {
  39 + super(apiUsageState);
59 40 this.tenantProfileId = tenantProfile.getId();
60 41 this.tenantProfileData = tenantProfile.getProfileData();
61   - this.apiUsageState = apiUsageState;
62   - this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth();
63   - this.nextCycleTs = SchedulerUtils.getStartOfNextMonth();
64   - this.currentHourTs = SchedulerUtils.getStartOfCurrentHour();
65   - }
66   -
67   - public void put(ApiUsageRecordKey key, Long value) {
68   - currentCycleValues.put(key, value);
69   - }
70   -
71   - public void putHourly(ApiUsageRecordKey key, Long value) {
72   - currentHourValues.put(key, value);
73   - }
74   -
75   - public long add(ApiUsageRecordKey key, long value) {
76   - long result = currentCycleValues.getOrDefault(key, 0L) + value;
77   - currentCycleValues.put(key, result);
78   - return result;
79   - }
80   -
81   - public long get(ApiUsageRecordKey key) {
82   - return currentCycleValues.getOrDefault(key, 0L);
83   - }
84   -
85   - public long addToHourly(ApiUsageRecordKey key, long value) {
86   - long result = currentHourValues.getOrDefault(key, 0L) + value;
87   - currentHourValues.put(key, result);
88   - return result;
89   - }
90   -
91   - public void setHour(long currentHourTs) {
92   - this.currentHourTs = currentHourTs;
93   - for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
94   - currentHourValues.put(key, 0L);
95   - }
96   - }
97   -
98   - public void setCycles(long currentCycleTs, long nextCycleTs) {
99   - this.currentCycleTs = currentCycleTs;
100   - this.nextCycleTs = nextCycleTs;
101   - for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
102   - currentCycleValues.put(key, 0L);
103   - }
104 42 }
105 43
106 44 public long getProfileThreshold(ApiUsageRecordKey key) {
... ... @@ -111,69 +49,7 @@ public class TenantApiUsageState {
111 49 return tenantProfileData.getConfiguration().getWarnThreshold(key);
112 50 }
113 51
114   - public TenantId getTenantId() {
115   - return apiUsageState.getTenantId();
116   - }
117   -
118   - public ApiUsageStateValue getFeatureValue(ApiFeature feature) {
119   - switch (feature) {
120   - case TRANSPORT:
121   - return apiUsageState.getTransportState();
122   - case RE:
123   - return apiUsageState.getReExecState();
124   - case DB:
125   - return apiUsageState.getDbStorageState();
126   - case JS:
127   - return apiUsageState.getJsExecState();
128   - case EMAIL:
129   - return apiUsageState.getEmailExecState();
130   - case SMS:
131   - return apiUsageState.getSmsExecState();
132   - default:
133   - return ApiUsageStateValue.ENABLED;
134   - }
135   - }
136   -
137   - public boolean setFeatureValue(ApiFeature feature, ApiUsageStateValue value) {
138   - ApiUsageStateValue currentValue = getFeatureValue(feature);
139   - switch (feature) {
140   - case TRANSPORT:
141   - apiUsageState.setTransportState(value);
142   - break;
143   - case RE:
144   - apiUsageState.setReExecState(value);
145   - break;
146   - case DB:
147   - apiUsageState.setDbStorageState(value);
148   - break;
149   - case JS:
150   - apiUsageState.setJsExecState(value);
151   - break;
152   - case EMAIL:
153   - apiUsageState.setEmailExecState(value);
154   - break;
155   - case SMS:
156   - apiUsageState.setSmsExecState(value);
157   - break;
158   - }
159   - return !currentValue.equals(value);
160   - }
161   -
162   - public Map<ApiFeature, ApiUsageStateValue> checkStateUpdatedDueToThresholds() {
163   - return checkStateUpdatedDueToThreshold(new HashSet<>(Arrays.asList(ApiFeature.values())));
164   - }
165   -
166   - public Map<ApiFeature, ApiUsageStateValue> checkStateUpdatedDueToThreshold(Set<ApiFeature> features) {
167   - Map<ApiFeature, ApiUsageStateValue> result = new HashMap<>();
168   - for (ApiFeature feature : features) {
169   - Pair<ApiFeature, ApiUsageStateValue> tmp = checkStateUpdatedDueToThreshold(feature);
170   - if (tmp != null) {
171   - result.put(tmp.getFirst(), tmp.getSecond());
172   - }
173   - }
174   - return result;
175   - }
176   -
  52 + @Override
177 53 public Pair<ApiFeature, ApiUsageStateValue> checkStateUpdatedDueToThreshold(ApiFeature feature) {
178 54 ApiUsageStateValue featureValue = ApiUsageStateValue.ENABLED;
179 55 for (ApiUsageRecordKey recordKey : ApiUsageRecordKey.getKeys(feature)) {
... ... @@ -193,4 +69,9 @@ public class TenantApiUsageState {
193 69 return setFeatureValue(feature, featureValue) ? Pair.of(feature, featureValue) : null;
194 70 }
195 71
  72 + @Override
  73 + public EntityType getEntityType() {
  74 + return EntityType.TENANT;
  75 + }
  76 +
196 77 }
... ...
... ... @@ -920,7 +920,7 @@ public final class EdgeGrpcSession implements Closeable {
920 920 try {
921 921 if (uplinkMsg.getEntityDataCount() > 0) {
922 922 for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) {
923   - result.addAll(ctx.getTelemetryProcessor().onTelemetryUpdate(edge.getTenantId(), entityData));
  923 + result.addAll(ctx.getTelemetryProcessor().onTelemetryUpdate(edge.getTenantId(), edge.getCustomerId(), entityData));
924 924 }
925 925 }
926 926 if (uplinkMsg.getDeviceUpdateMsgCount() > 0) {
... ...
... ... @@ -233,7 +233,7 @@ public class DeviceProcessor extends BaseProcessor {
233 233 ObjectNode entityNode = mapper.valueToTree(device);
234 234 TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId,
235 235 getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, mapper.writeValueAsString(entityNode));
236   - tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() {
  236 + tbClusterService.pushMsgToRuleEngine(tenantId, device.getCustomerId(), deviceId, tbMsg, new TbQueueCallback() {
237 237 @Override
238 238 public void onSuccess(TbQueueMsgMetadata metadata) {
239 239 log.debug("Successfully send ENTITY_CREATED EVENT to rule engine [{}]", device);
... ...
... ... @@ -70,7 +70,7 @@ public class TelemetryProcessor extends BaseProcessor {
70 70
71 71 private final Gson gson = new Gson();
72 72
73   - public List<ListenableFuture<Void>> onTelemetryUpdate(TenantId tenantId, EntityDataProto entityData) {
  73 + public List<ListenableFuture<Void>> onTelemetryUpdate(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) {
74 74 log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData);
75 75 List<ListenableFuture<Void>> result = new ArrayList<>();
76 76 EntityId entityId = constructEntityId(entityData);
... ... @@ -80,14 +80,14 @@ public class TelemetryProcessor extends BaseProcessor {
80 80 TbMsgMetaData metaData = new TbMsgMetaData();
81 81 metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE);
82 82 if (entityData.hasPostAttributesMsg()) {
83   - result.add(processPostAttributes(tenantId, entityId, entityData.getPostAttributesMsg(), metaData));
  83 + result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData));
84 84 }
85 85 if (entityData.hasAttributesUpdatedMsg()) {
86 86 metaData.putValue("scope", entityData.getPostAttributeScope());
87   - result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
  87 + result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
88 88 }
89 89 if (entityData.hasPostTelemetryMsg()) {
90   - result.add(processPostTelemetry(tenantId, entityId, entityData.getPostTelemetryMsg(), metaData));
  90 + result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
91 91 }
92 92 }
93 93 if (entityData.hasAttributeDeleteMsg()) {
... ... @@ -148,7 +148,7 @@ public class TelemetryProcessor extends BaseProcessor {
148 148 }
149 149 }
150 150
151   - private ListenableFuture<Void> processPostTelemetry(TenantId tenantId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) {
  151 + private ListenableFuture<Void> processPostTelemetry(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) {
152 152 SettableFuture<Void> futureToSet = SettableFuture.create();
153 153 for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
154 154 JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
... ... @@ -157,7 +157,7 @@ public class TelemetryProcessor extends BaseProcessor {
157 157 String queueName = defaultQueueAndRuleChain.getKey();
158 158 RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
159 159 TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json), ruleChainId, null);
160   - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
  160 + tbClusterService.pushMsgToRuleEngine(tenantId, customerId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
161 161 @Override
162 162 public void onSuccess(TbQueueMsgMetadata metadata) {
163 163 futureToSet.set(null);
... ... @@ -173,14 +173,14 @@ public class TelemetryProcessor extends BaseProcessor {
173 173 return futureToSet;
174 174 }
175 175
176   - private ListenableFuture<Void> processPostAttributes(TenantId tenantId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
  176 + private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
177 177 SettableFuture<Void> futureToSet = SettableFuture.create();
178 178 JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
179 179 Pair<String, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
180 180 String queueName = defaultQueueAndRuleChain.getKey();
181 181 RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
182 182 TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json), ruleChainId, null);
183   - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
  183 + tbClusterService.pushMsgToRuleEngine(tenantId, customerId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
184 184 @Override
185 185 public void onSuccess(TbQueueMsgMetadata metadata) {
186 186 futureToSet.set(null);
... ... @@ -195,7 +195,7 @@ public class TelemetryProcessor extends BaseProcessor {
195 195 return futureToSet;
196 196 }
197 197
198   - private ListenableFuture<Void> processAttributesUpdate(TenantId tenantId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
  198 + private ListenableFuture<Void> processAttributesUpdate(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
199 199 SettableFuture<Void> futureToSet = SettableFuture.create();
200 200 JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
201 201 Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(json);
... ... @@ -207,7 +207,7 @@ public class TelemetryProcessor extends BaseProcessor {
207 207 String queueName = defaultQueueAndRuleChain.getKey();
208 208 RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
209 209 TbMsg tbMsg = TbMsg.newMsg(queueName, DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json), ruleChainId, null);
210   - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
  210 + tbClusterService.pushMsgToRuleEngine(tenantId, customerId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
211 211 @Override
212 212 public void onSuccess(TbQueueMsgMetadata metadata) {
213 213 futureToSet.set(null);
... ...
... ... @@ -390,7 +390,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
390 390 pageData = tenantService.findTenants(pageLink);
391 391 for (Tenant tenant : pageData.getData()) {
392 392 try {
393   - apiUsageStateService.createDefaultApiUsageState(tenant.getId());
  393 + apiUsageStateService.createDefaultApiUsageState(tenant.getId(), null);
394 394 } catch (Exception e) {
395 395 }
396 396 List<EntitySubtype> deviceTypes = deviceService.findDeviceTypesByTenantId(tenant.getId()).get();
... ...
... ... @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.ApiUsageStateMailMessage;
36 36 import org.thingsboard.server.common.data.ApiUsageStateValue;
37 37 import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
38 38 import org.thingsboard.server.common.data.exception.ThingsboardException;
  39 +import org.thingsboard.server.common.data.id.CustomerId;
39 40 import org.thingsboard.server.common.data.id.EntityId;
40 41 import org.thingsboard.server.common.data.id.TenantId;
41 42 import org.thingsboard.server.dao.exception.IncorrectParameterException;
... ... @@ -233,7 +234,7 @@ public class DefaultMailService implements MailService {
233 234 }
234 235
235 236 @Override
236   - public void send(TenantId tenantId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException {
  237 + public void send(TenantId tenantId, CustomerId customerId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException {
237 238 if (apiUsageStateService.getApiUsageState(tenantId).isEmailSendEnabled()) {
238 239 MimeMessage mailMsg = mailSender.createMimeMessage();
239 240 MimeMessageHelper helper = new MimeMessageHelper(mailMsg, "UTF-8");
... ... @@ -248,7 +249,7 @@ public class DefaultMailService implements MailService {
248 249 helper.setSubject(subject);
249 250 helper.setText(body);
250 251 mailSender.send(helper.getMimeMessage());
251   - apiUsageClient.report(tenantId, ApiUsageRecordKey.EMAIL_EXEC_COUNT, 1);
  252 + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.EMAIL_EXEC_COUNT, 1);
252 253 } else {
253 254 throw new RuntimeException("Email sending is disabled due to API limits!");
254 255 }
... ...
... ... @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.HasName;
29 29 import org.thingsboard.server.common.data.TbResource;
30 30 import org.thingsboard.server.common.data.Tenant;
31 31 import org.thingsboard.server.common.data.TenantProfile;
  32 +import org.thingsboard.server.common.data.id.CustomerId;
32 33 import org.thingsboard.server.common.data.id.DeviceId;
33 34 import org.thingsboard.server.common.data.id.DeviceProfileId;
34 35 import org.thingsboard.server.common.data.id.EdgeId;
... ... @@ -133,7 +134,12 @@ public class DefaultTbClusterService implements TbClusterService {
133 134 }
134 135
135 136 @Override
136   - public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
  137 + public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback) {
  138 + pushMsgToRuleEngine(tenantId, null, entityId, msg, callback);
  139 + }
  140 +
  141 + @Override
  142 + public void pushMsgToRuleEngine(TenantId tenantId, CustomerId customerId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
137 143 if (tenantId.isNullUid()) {
138 144 if (entityId.getEntityType().equals(EntityType.TENANT)) {
139 145 tenantId = new TenantId(entityId.getId());
... ... @@ -148,6 +154,7 @@ public class DefaultTbClusterService implements TbClusterService {
148 154 tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId())));
149 155 }
150 156 }
  157 + tbMsg.setCustomerId(customerId);
151 158 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId);
152 159 log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
153 160 ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
... ...
... ... @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.DeviceProfile;
22 22 import org.thingsboard.server.common.data.TbResource;
23 23 import org.thingsboard.server.common.data.Tenant;
24 24 import org.thingsboard.server.common.data.TenantProfile;
  25 +import org.thingsboard.server.common.data.id.CustomerId;
25 26 import org.thingsboard.server.common.data.id.EdgeId;
26 27 import org.thingsboard.server.common.data.id.EntityId;
27 28 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -50,6 +51,8 @@ public interface TbClusterService {
50 51
51 52 void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback);
52 53
  54 + void pushMsgToRuleEngine(TenantId tenantId, CustomerId customerId, EntityId entityId, TbMsg msg, TbQueueCallback callback);
  55 +
53 56 void pushNotificationToRuleEngine(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback);
54 57
55 58 void pushNotificationToTransport(String targetServiceId, ToTransportMsg response, TbQueueCallback callback);
... ...
... ... @@ -34,6 +34,7 @@ import org.thingsboard.server.dao.device.DeviceService;
34 34 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
35 35 import org.thingsboard.server.queue.util.TbCoreComponent;
36 36 import org.thingsboard.server.service.queue.TbClusterService;
  37 +import org.thingsboard.server.service.security.model.SecurityUser;
37 38
38 39 import javax.annotation.PostConstruct;
39 40 import javax.annotation.PreDestroy;
... ... @@ -95,11 +96,11 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
95 96 }
96 97
97 98 @Override
98   - public void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
  99 + public void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer, SecurityUser currentUser) {
99 100 log.trace("[{}][{}] Processing REST API call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
100 101 UUID requestId = request.getId();
101 102 localToRuleEngineRpcRequests.put(requestId, responseConsumer);
102   - sendRpcRequestToRuleEngine(request);
  103 + sendRpcRequestToRuleEngine(request, currentUser);
103 104 scheduleToRuleEngineTimeout(request, requestId);
104 105 }
105 106
... ... @@ -149,7 +150,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
149 150 }
150 151 }
151 152
152   - private void sendRpcRequestToRuleEngine(ToDeviceRpcRequest msg) {
  153 + private void sendRpcRequestToRuleEngine(ToDeviceRpcRequest msg, SecurityUser currentUser) {
153 154 ObjectNode entityNode = json.createObjectNode();
154 155 TbMsgMetaData metaData = new TbMsgMetaData();
155 156 metaData.putValue("requestUUID", msg.getId().toString());
... ... @@ -168,7 +169,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
168 169
169 170 try {
170 171 TbMsg tbMsg = TbMsg.newMsg(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode));
171   - clusterService.pushMsgToRuleEngine(msg.getTenantId(), msg.getDeviceId(), tbMsg, null);
  172 + clusterService.pushMsgToRuleEngine(msg.getTenantId(), currentUser.getCustomerId(), msg.getDeviceId(), tbMsg, null);
172 173 } catch (JsonProcessingException e) {
173 174 throw new RuntimeException(e);
174 175 }
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.service.rpc;
17 17
18 18 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
  19 +import org.thingsboard.server.service.security.model.SecurityUser;
19 20
20 21 import java.util.function.Consumer;
21 22
... ... @@ -27,11 +28,11 @@ public interface TbCoreDeviceRpcService {
27 28 /**
28 29 * Handles REST API calls that contain RPC requests to Device and pushes them to Rule Engine.
29 30 * Schedules the timeout for the RPC call based on the {@link ToDeviceRpcRequest}
30   - *
31   - * @param request the RPC request
  31 + * @param request the RPC request
32 32 * @param responseConsumer the consumer of the RPC response
  33 + * @param currentUser
33 34 */
34   - void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
  35 + void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer, SecurityUser currentUser);
35 36
36 37 /**
37 38 * Handles the RPC response from the Rule Engine.
... ...
... ... @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture;
20 20 import lombok.extern.slf4j.Slf4j;
21 21 import org.thingsboard.common.util.ThingsBoardThreadFactory;
22 22 import org.thingsboard.server.common.data.ApiUsageRecordKey;
  23 +import org.thingsboard.server.common.data.id.CustomerId;
23 24 import org.thingsboard.server.common.data.id.TenantId;
24 25 import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
25 26 import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
... ... @@ -73,14 +74,14 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
73 74 }
74 75
75 76 @Override
76   - public ListenableFuture<Object> invokeFunction(TenantId tenantId, UUID scriptId, Object... args) {
  77 + public ListenableFuture<Object> invokeFunction(TenantId tenantId, CustomerId customerId, UUID scriptId, Object... args) {
77 78 if (apiUsageStateService.getApiUsageState(tenantId).isJsExecEnabled()) {
78 79 String functionName = scriptIdToNameMap.get(scriptId);
79 80 if (functionName == null) {
80 81 return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!"));
81 82 }
82 83 if (!isDisabled(scriptId)) {
83   - apiUsageClient.report(tenantId, ApiUsageRecordKey.JS_EXEC_COUNT, 1);
  84 + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.JS_EXEC_COUNT, 1);
84 85 return doInvokeFunction(scriptId, functionName, args);
85 86 } else {
86 87 return Futures.immediateFailedFuture(
... ...
... ... @@ -16,7 +16,7 @@
16 16 package org.thingsboard.server.service.script;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
19   -import org.thingsboard.server.common.data.id.EntityId;
  19 +import org.thingsboard.server.common.data.id.CustomerId;
20 20 import org.thingsboard.server.common.data.id.TenantId;
21 21
22 22 import java.util.UUID;
... ... @@ -25,7 +25,7 @@ public interface JsInvokeService {
25 25
26 26 ListenableFuture<UUID> eval(TenantId tenantId, JsScriptType scriptType, String scriptBody, String... argNames);
27 27
28   - ListenableFuture<Object> invokeFunction(TenantId tenantId, UUID scriptId, Object... args);
  28 + ListenableFuture<Object> invokeFunction(TenantId tenantId, CustomerId customerId, UUID scriptId, Object... args);
29 29
30 30 ListenableFuture<Void> release(UUID scriptId);
31 31
... ...
... ... @@ -218,7 +218,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
218 218 private JsonNode executeScript(TbMsg msg) throws ScriptException {
219 219 try {
220 220 String[] inArgs = prepareArgs(msg);
221   - String eval = sandboxService.invokeFunction(tenantId, this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
  221 + String eval = sandboxService.invokeFunction(tenantId, msg.getCustomerId(), this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
222 222 return mapper.readTree(eval);
223 223 } catch (ExecutionException e) {
224 224 if (e.getCause() instanceof ScriptException) {
... ... @@ -235,7 +235,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
235 235
236 236 private ListenableFuture<JsonNode> executeScriptAsync(TbMsg msg) {
237 237 String[] inArgs = prepareArgs(msg);
238   - return Futures.transformAsync(sandboxService.invokeFunction(tenantId, this.scriptId, inArgs[0], inArgs[1], inArgs[2]),
  238 + return Futures.transformAsync(sandboxService.invokeFunction(tenantId, msg.getCustomerId(), this.scriptId, inArgs[0], inArgs[1], inArgs[2]),
239 239 o -> {
240 240 try {
241 241 return Futures.immediateFuture(mapper.readTree(o.toString()));
... ...
... ... @@ -22,6 +22,7 @@ import org.springframework.stereotype.Service;
22 22 import org.thingsboard.rule.engine.api.SmsService;
23 23 import org.thingsboard.rule.engine.api.sms.SmsSender;
24 24 import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
  25 +import org.thingsboard.server.common.data.id.CustomerId;
25 26 import org.thingsboard.server.common.data.sms.config.SmsProviderConfiguration;
26 27 import org.thingsboard.server.common.data.sms.config.TestSmsRequest;
27 28 import org.thingsboard.server.common.data.AdminSettings;
... ... @@ -94,7 +95,7 @@ public class DefaultSmsService implements SmsService {
94 95 }
95 96
96 97 @Override
97   - public void sendSms(TenantId tenantId, String[] numbersTo, String message) throws ThingsboardException {
  98 + public void sendSms(TenantId tenantId, CustomerId customerId, String[] numbersTo, String message) throws ThingsboardException {
98 99 if (apiUsageStateService.getApiUsageState(tenantId).isSmsSendEnabled()) {
99 100 int smsCount = 0;
100 101 try {
... ... @@ -103,7 +104,7 @@ public class DefaultSmsService implements SmsService {
103 104 }
104 105 } finally {
105 106 if (smsCount > 0) {
106   - apiUsageClient.report(tenantId, ApiUsageRecordKey.SMS_EXEC_COUNT, smsCount);
  107 + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.SMS_EXEC_COUNT, smsCount);
107 108 }
108 109 }
109 110 } else {
... ...
... ... @@ -468,6 +468,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
468 468 md.putValue("deviceName", device.getName());
469 469 md.putValue("deviceType", device.getType());
470 470 return DeviceStateData.builder()
  471 + .customerId(device.getCustomerId())
471 472 .tenantId(device.getTenantId())
472 473 .deviceId(device.getId())
473 474 .deviceCreationTime(device.getCreatedTime())
... ... @@ -508,7 +509,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
508 509 md.putValue(DataConstants.SCOPE, SERVER_SCOPE);
509 510 }
510 511 TbMsg tbMsg = TbMsg.newMsg(msgType, stateData.getDeviceId(), md, TbMsgDataType.JSON, data);
511   - clusterService.pushMsgToRuleEngine(stateData.getTenantId(), stateData.getDeviceId(), tbMsg, null);
  512 + clusterService.pushMsgToRuleEngine(stateData.getTenantId(), stateData.getCustomerId(), stateData.getDeviceId(), tbMsg, null);
512 513 } catch (Exception e) {
513 514 log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
514 515 }
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.service.state;
17 17
18 18 import lombok.Builder;
19 19 import lombok.Data;
  20 +import org.thingsboard.server.common.data.id.CustomerId;
20 21 import org.thingsboard.server.common.data.id.DeviceId;
21 22 import org.thingsboard.server.common.data.id.TenantId;
22 23 import org.thingsboard.server.common.msg.TbMsgMetaData;
... ... @@ -29,6 +30,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
29 30 class DeviceStateData {
30 31
31 32 private final TenantId tenantId;
  33 + private final CustomerId customerId;
32 34 private final DeviceId deviceId;
33 35 private final long deviceCreationTime;
34 36 private TbMsgMetaData metaData;
... ...
... ... @@ -25,7 +25,7 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
25 25 import org.thingsboard.server.common.data.ApiUsageRecordKey;
26 26 import org.thingsboard.server.common.data.EntityType;
27 27 import org.thingsboard.server.common.data.EntityView;
28   -import org.thingsboard.server.common.data.TenantProfile;
  28 +import org.thingsboard.server.common.data.id.CustomerId;
29 29 import org.thingsboard.server.common.data.id.EntityId;
30 30 import org.thingsboard.server.common.data.id.TenantId;
31 31 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
... ... @@ -35,13 +35,11 @@ import org.thingsboard.server.common.data.kv.DoubleDataEntry;
35 35 import org.thingsboard.server.common.data.kv.LongDataEntry;
36 36 import org.thingsboard.server.common.data.kv.StringDataEntry;
37 37 import org.thingsboard.server.common.data.kv.TsKvEntry;
38   -import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
39 38 import org.thingsboard.server.common.msg.queue.ServiceType;
40 39 import org.thingsboard.server.common.msg.queue.TbCallback;
41 40 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
42 41 import org.thingsboard.server.dao.attributes.AttributesService;
43 42 import org.thingsboard.server.dao.entityview.EntityViewService;
44   -import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
45 43 import org.thingsboard.server.dao.timeseries.TimeseriesService;
46 44 import org.thingsboard.server.gen.transport.TransportProtos;
47 45 import org.thingsboard.server.queue.discovery.PartitionService;
... ... @@ -115,11 +113,11 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
115 113
116 114 @Override
117 115 public void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
118   - saveAndNotify(tenantId, entityId, ts, 0L, callback);
  116 + saveAndNotify(tenantId, null, entityId, ts, 0L, callback);
119 117 }
120 118
121 119 @Override
122   - public void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
  120 + public void saveAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
123 121 checkInternalEntity(entityId);
124 122 boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
125 123 if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
... ... @@ -127,7 +125,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
127 125 @Override
128 126 public void onSuccess(Integer result) {
129 127 if (!sysTenant && result != null && result > 0) {
130   - apiUsageClient.report(tenantId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
  128 + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
131 129 }
132 130 callback.onSuccess(null);
133 131 }
... ...
... ... @@ -81,6 +81,7 @@ import org.thingsboard.server.service.profile.TbDeviceProfileCache;
81 81 import org.thingsboard.server.service.queue.TbClusterService;
82 82 import org.thingsboard.server.service.state.DeviceStateService;
83 83
  84 +import java.util.Optional;
84 85 import java.util.UUID;
85 86 import java.util.concurrent.ConcurrentHashMap;
86 87 import java.util.concurrent.ConcurrentMap;
... ... @@ -273,7 +274,7 @@ public class DefaultTransportApiService implements TransportApiService {
273 274 DeviceId deviceId = device.getId();
274 275 ObjectNode entityNode = mapper.valueToTree(device);
275 276 TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode));
276   - tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null);
  277 + tbClusterService.pushMsgToRuleEngine(tenantId, customerId, deviceId, tbMsg, null);
277 278 }
278 279 GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder()
279 280 .setDeviceInfo(getDeviceInfoProto(device));
... ... @@ -411,6 +412,8 @@ public class DefaultTransportApiService implements TransportApiService {
411 412 return DeviceInfoProto.newBuilder()
412 413 .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
413 414 .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
  415 + .setCustomerIdMSB(device.getCustomerId().getId().getMostSignificantBits())
  416 + .setCustomerIdLSB(device.getCustomerId().getId().getLeastSignificantBits())
414 417 .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
415 418 .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
416 419 .setDeviceName(device.getName())
... ...
... ... @@ -21,12 +21,12 @@ import org.checkerframework.checker.nullness.qual.Nullable;
21 21 import org.junit.After;
22 22 import org.junit.Assert;
23 23 import org.junit.Before;
24   -import org.junit.Ignore;
25 24 import org.junit.Test;
26 25 import org.springframework.beans.factory.annotation.Autowired;
27 26 import org.thingsboard.server.common.data.Device;
28 27 import org.thingsboard.server.common.data.Tenant;
29 28 import org.thingsboard.server.common.data.User;
  29 +import org.thingsboard.server.common.data.id.CustomerId;
30 30 import org.thingsboard.server.common.data.kv.Aggregation;
31 31 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
32 32 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
... ... @@ -750,7 +750,7 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
750 750
751 751 private void sendTelemetry(Device device, List<TsKvEntry> tsData) throws InterruptedException {
752 752 CountDownLatch latch = new CountDownLatch(1);
753   - tsService.saveAndNotify(device.getTenantId(), device.getId(), tsData, 0, new FutureCallback<Void>() {
  753 + tsService.saveAndNotify(device.getTenantId(), null, device.getId(), tsData, 0, new FutureCallback<Void>() {
754 754 @Override
755 755 public void onSuccess(@Nullable Void result) {
756 756 latch.countDown();
... ...
... ... @@ -17,16 +17,19 @@ package org.thingsboard.server.dao.usagerecord;
17 17
18 18 import org.thingsboard.server.common.data.ApiUsageState;
19 19 import org.thingsboard.server.common.data.id.ApiUsageStateId;
  20 +import org.thingsboard.server.common.data.id.EntityId;
20 21 import org.thingsboard.server.common.data.id.TenantId;
21 22
22 23 public interface ApiUsageStateService {
23 24
24   - ApiUsageState createDefaultApiUsageState(TenantId id);
  25 + ApiUsageState createDefaultApiUsageState(TenantId id, EntityId entityId);
25 26
26 27 ApiUsageState update(ApiUsageState apiUsageState);
27 28
28 29 ApiUsageState findTenantApiUsageState(TenantId tenantId);
29 30
  31 + ApiUsageState findApiUsageStateByEntityId(EntityId entityId);
  32 +
30 33 void deleteApiUsageStateByTenantId(TenantId tenantId);
31 34
32 35 ApiUsageState findApiUsageStateById(TenantId tenantId, ApiUsageStateId id);
... ...
... ... @@ -23,6 +23,7 @@ import lombok.Builder;
23 23 import lombok.Data;
24 24 import lombok.Getter;
25 25 import lombok.extern.slf4j.Slf4j;
  26 +import org.thingsboard.server.common.data.id.CustomerId;
26 27 import org.thingsboard.server.common.data.id.EntityId;
27 28 import org.thingsboard.server.common.data.id.EntityIdFactory;
28 29 import org.thingsboard.server.common.data.id.RuleChainId;
... ... @@ -47,6 +48,7 @@ public final class TbMsg implements Serializable {
47 48 private final long ts;
48 49 private final String type;
49 50 private final EntityId originator;
  51 + private CustomerId customerId;
50 52 private final TbMsgMetaData metaData;
51 53 private final TbMsgDataType dataType;
52 54 private final String data;
... ...
... ... @@ -19,6 +19,9 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Value;
20 20 import org.springframework.stereotype.Component;
21 21 import org.thingsboard.server.common.data.ApiUsageRecordKey;
  22 +import org.thingsboard.server.common.data.EntityType;
  23 +import org.thingsboard.server.common.data.id.CustomerId;
  24 +import org.thingsboard.server.common.data.id.EntityId;
22 25 import org.thingsboard.server.common.data.id.TenantId;
23 26 import org.thingsboard.server.common.msg.queue.ServiceType;
24 27 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
... ... @@ -31,6 +34,7 @@ import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
31 34 import org.thingsboard.server.queue.scheduler.SchedulerComponent;
32 35
33 36 import javax.annotation.PostConstruct;
  37 +import java.util.EnumMap;
34 38 import java.util.Random;
35 39 import java.util.UUID;
36 40 import java.util.concurrent.ConcurrentHashMap;
... ... @@ -47,8 +51,9 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient {
47 51 @Value("${usage.stats.report.interval:10}")
48 52 private int interval;
49 53
50   - @SuppressWarnings("unchecked")
51   - private final ConcurrentMap<TenantId, AtomicLong>[] values = new ConcurrentMap[ApiUsageRecordKey.values().length];
  54 + private final EnumMap<ApiUsageRecordKey, ConcurrentMap<EntityId, AtomicLong>> stats = new EnumMap<>(ApiUsageRecordKey.class);
  55 + private final ConcurrentMap<EntityId, TenantId> tenants = new ConcurrentHashMap<>();
  56 +
52 57 private final PartitionService partitionService;
53 58 private final SchedulerComponent scheduler;
54 59 private final TbQueueProducerProvider producerProvider;
... ... @@ -65,55 +70,88 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient {
65 70 if (enabled) {
66 71 msgProducer = this.producerProvider.getTbUsageStatsMsgProducer();
67 72 for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
68   - values[key.ordinal()] = new ConcurrentHashMap<>();
  73 + stats.put(key, new ConcurrentHashMap<>());
69 74 }
70   - scheduler.scheduleWithFixedDelay(this::reportStats, new Random().nextInt(interval), interval, TimeUnit.SECONDS);
  75 + scheduler.scheduleWithFixedDelay(() -> {
  76 + try {
  77 + reportStats();
  78 + } catch (Exception e) {
  79 + log.warn("Failed to report statistics: ", e);
  80 + }
  81 + }, new Random().nextInt(interval), interval, TimeUnit.SECONDS);
71 82 }
72 83 }
73 84
74 85 private void reportStats() {
75   - try {
76   - ConcurrentMap<TenantId, ToUsageStatsServiceMsg.Builder> report = new ConcurrentHashMap<>();
  86 + ConcurrentMap<EntityId, ToUsageStatsServiceMsg.Builder> report = new ConcurrentHashMap<>();
77 87
78   - for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
79   - values[key.ordinal()].forEach(((tenantId, atomicLong) -> {
80   - long value = atomicLong.getAndSet(0);
81   - if (value > 0) {
82   - ToUsageStatsServiceMsg.Builder msgBuilder = report.computeIfAbsent(tenantId, id -> {
83   - ToUsageStatsServiceMsg.Builder msg = ToUsageStatsServiceMsg.newBuilder();
84   - msg.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
85   - msg.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
86   - return msg;
87   - });
88   - msgBuilder.addValues(UsageStatsKVProto.newBuilder().setKey(key.name()).setValue(value).build());
  88 + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
  89 + ConcurrentMap<EntityId, AtomicLong> statsForKey = stats.get(key);
  90 + statsForKey.forEach((initiatorId, statsValue) -> {
  91 + long value = statsValue.get();
  92 + if (value == 0) return;
  93 +
  94 + ToUsageStatsServiceMsg.Builder statsMsgBuilder = report.computeIfAbsent(initiatorId, id -> {
  95 + ToUsageStatsServiceMsg.Builder newStatsMsgBuilder = ToUsageStatsServiceMsg.newBuilder();
  96 +
  97 + TenantId tenantId;
  98 + if (initiatorId.getEntityType() == EntityType.TENANT) {
  99 + tenantId = (TenantId) initiatorId;
  100 + } else {
  101 + tenantId = tenants.get(initiatorId);
89 102 }
90   - }));
91   - }
92 103
93   - report.forEach(((tenantId, builder) -> {
94   - //TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages?
95   - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).newByTopic(msgProducer.getDefaultTopic());
96   - msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), builder.build()), null);
97   - }));
98   - if (!report.isEmpty()) {
99   - log.info("Report statistics for: {} tenants", report.size());
  104 + newStatsMsgBuilder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
  105 + newStatsMsgBuilder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
  106 +
  107 + if (initiatorId.getEntityType() == EntityType.CUSTOMER) {
  108 + newStatsMsgBuilder.setCustomerIdMSB(initiatorId.getId().getMostSignificantBits());
  109 + newStatsMsgBuilder.setCustomerIdLSB(initiatorId.getId().getLeastSignificantBits());
  110 + }
  111 +
  112 + return newStatsMsgBuilder;
  113 + });
  114 +
  115 + statsMsgBuilder.addValues(UsageStatsKVProto.newBuilder().setKey(key.name()).setValue(value).build());
  116 + });
  117 + statsForKey.clear();
  118 + }
  119 +
  120 + report.forEach(((initiatorId, builder) -> {
  121 + //TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages?
  122 +
  123 + TenantId tenantId;
  124 + if (initiatorId.getEntityType() == EntityType.TENANT) {
  125 + tenantId = (TenantId) initiatorId;
  126 + } else {
  127 + tenantId = tenants.get(initiatorId);
100 128 }
101   - } catch (Exception e) {
102   - log.warn("Failed to report statistics: ", e);
  129 +
  130 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, initiatorId).newByTopic(msgProducer.getDefaultTopic());
  131 + msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), builder.build()), null);
  132 + }));
  133 +
  134 + if (!report.isEmpty()) {
  135 + log.info("Reporting API usage statistics for {} tenants and customers", report.size());
103 136 }
104 137 }
105 138
106 139 @Override
107   - public void report(TenantId tenantId, ApiUsageRecordKey key, long value) {
  140 + public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value) {
108 141 if (enabled) {
109   - ConcurrentMap<TenantId, AtomicLong> map = values[key.ordinal()];
110   - AtomicLong atomicValue = map.computeIfAbsent(tenantId, id -> new AtomicLong());
111   - atomicValue.addAndGet(value);
  142 + ConcurrentMap<EntityId, AtomicLong> statsForKey = stats.get(key);
  143 +
  144 + statsForKey.computeIfAbsent(tenantId, id -> new AtomicLong()).addAndGet(value);
  145 +
  146 + if (customerId != null && !customerId.isNullUid()) {
  147 + statsForKey.computeIfAbsent(customerId, id -> new AtomicLong()).addAndGet(value);
  148 + tenants.putIfAbsent(customerId, tenantId);
  149 + }
112 150 }
113 151 }
114 152
115 153 @Override
116   - public void report(TenantId tenantId, ApiUsageRecordKey key) {
117   - report(tenantId, key, 1L);
  154 + public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key) {
  155 + report(tenantId, customerId, key, 1);
118 156 }
119 157 }
... ...
... ... @@ -16,12 +16,13 @@
16 16 package org.thingsboard.server.queue.usagestats;
17 17
18 18 import org.thingsboard.server.common.data.ApiUsageRecordKey;
  19 +import org.thingsboard.server.common.data.id.CustomerId;
19 20 import org.thingsboard.server.common.data.id.TenantId;
20 21
21 22 public interface TbApiUsageClient {
22 23
23   - void report(TenantId tenantId, ApiUsageRecordKey key, long value);
  24 + void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value);
24 25
25   - void report(TenantId tenantId, ApiUsageRecordKey key);
  26 + void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key);
26 27
27 28 }
... ...
... ... @@ -53,6 +53,8 @@ message SessionInfoProto {
53 53 int64 gwSessionIdLSB = 11;
54 54 int64 deviceProfileIdMSB = 12;
55 55 int64 deviceProfileIdLSB = 13;
  56 + int64 customerIdMSB = 14;
  57 + int64 customerIdLSB = 15;
56 58 }
57 59
58 60 enum SessionEvent {
... ... @@ -110,6 +112,8 @@ message DeviceInfoProto {
110 112 string additionalInfo = 7;
111 113 int64 deviceProfileIdMSB = 8;
112 114 int64 deviceProfileIdLSB = 9;
  115 + int64 customerIdMSB = 10;
  116 + int64 customerIdLSB = 11;
113 117 }
114 118
115 119 /**
... ... @@ -638,4 +642,6 @@ message ToUsageStatsServiceMsg {
638 642 int64 entityIdMSB = 3;
639 643 int64 entityIdLSB = 4;
640 644 repeated UsageStatsKVProto values = 5;
  645 + int64 customerIdMSB = 6;
  646 + int64 customerIdLSB = 7;
641 647 }
... ...
... ... @@ -40,6 +40,8 @@ public class SessionInfoCreator {
40 40 .setDeviceIdLSB(msg.getDeviceInfo().getDeviceId().getId().getLeastSignificantBits())
41 41 .setTenantIdMSB(msg.getDeviceInfo().getTenantId().getId().getMostSignificantBits())
42 42 .setTenantIdLSB(msg.getDeviceInfo().getTenantId().getId().getLeastSignificantBits())
  43 + .setCustomerIdMSB(msg.getDeviceInfo().getCustomerId().getId().getMostSignificantBits())
  44 + .setCustomerIdLSB(msg.getDeviceInfo().getCustomerId().getId().getLeastSignificantBits())
43 45 .setDeviceName(msg.getDeviceInfo().getDeviceName())
44 46 .setDeviceType(msg.getDeviceInfo().getDeviceType())
45 47 .setDeviceProfileIdMSB(msg.getDeviceInfo().getDeviceProfileId().getId().getMostSignificantBits())
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.common.transport.auth;
17 17
18 18 import lombok.Data;
  19 +import org.thingsboard.server.common.data.id.CustomerId;
19 20 import org.thingsboard.server.common.data.id.DeviceId;
20 21 import org.thingsboard.server.common.data.id.DeviceProfileId;
21 22 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -24,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId;
24 25 public class TransportDeviceInfo {
25 26
26 27 private TenantId tenantId;
  28 + private CustomerId customerId;
27 29 private DeviceProfileId deviceProfileId;
28 30 private DeviceId deviceId;
29 31 private String deviceName;
... ...
... ... @@ -33,8 +33,10 @@ import org.thingsboard.server.common.data.DeviceTransportType;
33 33 import org.thingsboard.server.common.data.EntityType;
34 34 import org.thingsboard.server.common.data.ResourceType;
35 35 import org.thingsboard.server.common.data.Tenant;
  36 +import org.thingsboard.server.common.data.id.CustomerId;
36 37 import org.thingsboard.server.common.data.id.DeviceId;
37 38 import org.thingsboard.server.common.data.id.DeviceProfileId;
  39 +import org.thingsboard.server.common.data.id.EntityId;
38 40 import org.thingsboard.server.common.data.id.RuleChainId;
39 41 import org.thingsboard.server.common.data.id.TenantId;
40 42 import org.thingsboard.server.common.data.id.TenantProfileId;
... ... @@ -359,6 +361,7 @@ public class DefaultTransportService implements TransportService {
359 361 private TransportDeviceInfo getTransportDeviceInfo(TransportProtos.DeviceInfoProto di) {
360 362 TransportDeviceInfo tdi = new TransportDeviceInfo();
361 363 tdi.setTenantId(new TenantId(new UUID(di.getTenantIdMSB(), di.getTenantIdLSB())));
  364 + tdi.setCustomerId(new CustomerId(new UUID(di.getCustomerIdMSB(), di.getCustomerIdLSB())));
362 365 tdi.setDeviceId(new DeviceId(new UUID(di.getDeviceIdMSB(), di.getDeviceIdLSB())));
363 366 tdi.setDeviceProfileId(new DeviceProfileId(new UUID(di.getDeviceProfileIdMSB(), di.getDeviceProfileIdLSB())));
364 367 tdi.setAdditionalInfo(di.getAdditionalInfo());
... ... @@ -404,9 +407,9 @@ public class DefaultTransportService implements TransportService {
404 407 }
405 408 if (checkLimits(sessionInfo, msg, callback, dataPoints)) {
406 409 reportActivityInternal(sessionInfo);
407   - TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
  410 + TenantId tenantId = getTenantId(sessionInfo);
408 411 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
409   - MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), new ApiStatsProxyCallback<>(tenantId, dataPoints, callback));
  412 + MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), new ApiStatsProxyCallback<>(tenantId, getCustomerId(sessionInfo), dataPoints, callback));
410 413 for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
411 414 TbMsgMetaData metaData = new TbMsgMetaData();
412 415 metaData.putValue("deviceName", sessionInfo.getDeviceName());
... ... @@ -423,7 +426,7 @@ public class DefaultTransportService implements TransportService {
423 426 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
424 427 if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) {
425 428 reportActivityInternal(sessionInfo);
426   - TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
  429 + TenantId tenantId = getTenantId(sessionInfo);
427 430 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
428 431 JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
429 432 TbMsgMetaData metaData = new TbMsgMetaData();
... ... @@ -431,7 +434,7 @@ public class DefaultTransportService implements TransportService {
431 434 metaData.putValue("deviceType", sessionInfo.getDeviceType());
432 435 metaData.putValue("notifyDevice", "false");
433 436 sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST,
434   - new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, msg.getKvList().size(), callback)));
  437 + new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, getCustomerId(sessionInfo), msg.getKvList().size(), callback)));
435 438 }
436 439 }
437 440
... ... @@ -440,7 +443,7 @@ public class DefaultTransportService implements TransportService {
440 443 if (checkLimits(sessionInfo, msg, callback)) {
441 444 reportActivityInternal(sessionInfo);
442 445 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
443   - .setGetAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback));
  446 + .setGetAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback));
444 447 }
445 448 }
446 449
... ... @@ -449,8 +452,8 @@ public class DefaultTransportService implements TransportService {
449 452 if (checkLimits(sessionInfo, msg, callback)) {
450 453 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
451 454 sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
452   - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
453   - .setSubscribeToAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback));
  455 + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(),
  456 + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback));
454 457 }
455 458 }
456 459
... ... @@ -459,8 +462,8 @@ public class DefaultTransportService implements TransportService {
459 462 if (checkLimits(sessionInfo, msg, callback)) {
460 463 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
461 464 sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
462   - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
463   - .setSubscribeToRPC(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback));
  465 + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(),
  466 + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback));
464 467 }
465 468 }
466 469
... ... @@ -468,8 +471,8 @@ public class DefaultTransportService implements TransportService {
468 471 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
469 472 if (checkLimits(sessionInfo, msg, callback)) {
470 473 reportActivityInternal(sessionInfo);
471   - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
472   - .setToDeviceRPCCallResponse(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback));
  474 + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(),
  475 + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback));
473 476 }
474 477 }
475 478
... ... @@ -797,6 +800,16 @@ public class DefaultTransportService implements TransportService {
797 800 return new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
798 801 }
799 802
  803 + protected CustomerId getCustomerId(TransportProtos.SessionInfoProto sessionInfo) {
  804 + long msb = sessionInfo.getCustomerIdMSB();
  805 + long lsb = sessionInfo.getCustomerIdLSB();
  806 + if (msb != 0 && lsb != 0) {
  807 + return new CustomerId(new UUID(msb, lsb));
  808 + } else {
  809 + return new CustomerId(EntityId.NULL_UUID);
  810 + }
  811 + }
  812 +
800 813 protected DeviceId getDeviceId(TransportProtos.SessionInfoProto sessionInfo) {
801 814 return new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
802 815 }
... ... @@ -922,11 +935,13 @@ public class DefaultTransportService implements TransportService {
922 935
923 936 private class ApiStatsProxyCallback<T> implements TransportServiceCallback<T> {
924 937 private final TenantId tenantId;
  938 + private final CustomerId customerId;
925 939 private final int dataPoints;
926 940 private final TransportServiceCallback<T> callback;
927 941
928   - public ApiStatsProxyCallback(TenantId tenantId, int dataPoints, TransportServiceCallback<T> callback) {
  942 + public ApiStatsProxyCallback(TenantId tenantId, CustomerId customerId, int dataPoints, TransportServiceCallback<T> callback) {
929 943 this.tenantId = tenantId;
  944 + this.customerId = customerId;
930 945 this.dataPoints = dataPoints;
931 946 this.callback = callback;
932 947 }
... ... @@ -934,8 +949,8 @@ public class DefaultTransportService implements TransportService {
934 949 @Override
935 950 public void onSuccess(T msg) {
936 951 try {
937   - apiUsageClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_MSG_COUNT, 1);
938   - apiUsageClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_DP_COUNT, dataPoints);
  952 + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.TRANSPORT_MSG_COUNT, 1);
  953 + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.TRANSPORT_DP_COUNT, dataPoints);
939 954 } finally {
940 955 callback.onSuccess(msg);
941 956 }
... ...
... ... @@ -33,6 +33,8 @@ public interface ApiUsageStateRepository extends CrudRepository<ApiUsageStateEnt
33 33 "AND ur.entityId = :tenantId AND ur.entityType = 'TENANT' ")
34 34 ApiUsageStateEntity findByTenantId(@Param("tenantId") UUID tenantId);
35 35
  36 + ApiUsageStateEntity findByEntityIdAndEntityType(UUID entityId, String entityType);
  37 +
36 38 @Transactional
37 39 @Modifying
38 40 @Query("DELETE FROM ApiUsageStateEntity ur WHERE ur.tenantId = :tenantId")
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.sql.usagerecord;
18 18 import org.springframework.data.repository.CrudRepository;
19 19 import org.springframework.stereotype.Component;
20 20 import org.thingsboard.server.common.data.ApiUsageState;
  21 +import org.thingsboard.server.common.data.id.EntityId;
21 22 import org.thingsboard.server.common.data.id.TenantId;
22 23 import org.thingsboard.server.dao.DaoUtil;
23 24 import org.thingsboard.server.dao.model.sql.ApiUsageStateEntity;
... ... @@ -54,6 +55,11 @@ public class JpaApiUsageStateDao extends JpaAbstractDao<ApiUsageStateEntity, Api
54 55 }
55 56
56 57 @Override
  58 + public ApiUsageState findApiUsageStateByEntityId(EntityId entityId) {
  59 + return DaoUtil.getData(apiUsageStateRepository.findByEntityIdAndEntityType(entityId.getId(), entityId.getEntityType().name()));
  60 + }
  61 +
  62 + @Override
57 63 public void deleteApiUsageStateByTenantId(TenantId tenantId) {
58 64 apiUsageStateRepository.deleteApiUsageStateByTenantId(tenantId.getId());
59 65 }
... ...
... ... @@ -125,7 +125,7 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
125 125 Tenant savedTenant = tenantDao.save(tenant.getId(), tenant);
126 126 if (tenant.getId() == null) {
127 127 deviceProfileService.createDefaultDeviceProfile(savedTenant.getId());
128   - apiUsageStateService.createDefaultApiUsageState(savedTenant.getId());
  128 + apiUsageStateService.createDefaultApiUsageState(savedTenant.getId(), null);
129 129 }
130 130 return savedTenant;
131 131 }
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.dao.usagerecord;
17 17
18 18 import org.thingsboard.server.common.data.ApiUsageState;
  19 +import org.thingsboard.server.common.data.id.EntityId;
19 20 import org.thingsboard.server.common.data.id.TenantId;
20 21 import org.thingsboard.server.dao.Dao;
21 22
... ... @@ -39,6 +40,8 @@ public interface ApiUsageStateDao extends Dao<ApiUsageState> {
39 40 */
40 41 ApiUsageState findTenantApiUsageState(UUID tenantId);
41 42
  43 + ApiUsageState findApiUsageStateByEntityId(EntityId entityId);
  44 +
42 45 /**
43 46 * Delete usage record by tenantId.
44 47 *
... ...
... ... @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.EntityType;
25 25 import org.thingsboard.server.common.data.Tenant;
26 26 import org.thingsboard.server.common.data.TenantProfile;
27 27 import org.thingsboard.server.common.data.id.ApiUsageStateId;
  28 +import org.thingsboard.server.common.data.id.EntityId;
28 29 import org.thingsboard.server.common.data.id.TenantId;
29 30 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
30 31 import org.thingsboard.server.common.data.kv.LongDataEntry;
... ... @@ -68,12 +69,12 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A
68 69 }
69 70
70 71 @Override
71   - public ApiUsageState createDefaultApiUsageState(TenantId tenantId) {
  72 + public ApiUsageState createDefaultApiUsageState(TenantId tenantId, EntityId entityId) {
72 73 log.trace("Executing createDefaultUsageRecord [{}]", tenantId);
73 74 validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
74 75 ApiUsageState apiUsageState = new ApiUsageState();
75 76 apiUsageState.setTenantId(tenantId);
76   - apiUsageState.setEntityId(tenantId);
  77 + apiUsageState.setEntityId(entityId);
77 78 apiUsageState.setTransportState(ApiUsageStateValue.ENABLED);
78 79 apiUsageState.setReExecState(ApiUsageStateValue.ENABLED);
79 80 apiUsageState.setJsExecState(ApiUsageStateValue.ENABLED);
... ... @@ -87,6 +88,7 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A
87 88 Tenant tenant = tenantDao.findById(tenantId, tenantId.getId());
88 89 TenantProfile tenantProfile = tenantProfileDao.findById(tenantId, tenant.getTenantProfileId().getId());
89 90 TenantProfileConfiguration configuration = tenantProfile.getProfileData().getConfiguration();
  91 +
90 92 List<TsKvEntry> apiUsageStates = new ArrayList<>();
91 93 apiUsageStates.add(new BasicTsKvEntry(saved.getCreatedTime(),
92 94 new StringDataEntry(ApiFeature.TRANSPORT.getApiStateKey(), ApiUsageStateValue.ENABLED.name())));
... ... @@ -127,6 +129,12 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A
127 129 }
128 130
129 131 @Override
  132 + public ApiUsageState findApiUsageStateByEntityId(EntityId entityId) {
  133 + validateId(entityId.getId(), "Invalid entity id");
  134 + return apiUsageStateDao.findApiUsageStateByEntityId(entityId);
  135 + }
  136 +
  137 + @Override
130 138 public ApiUsageState findApiUsageStateById(TenantId tenantId, ApiUsageStateId id) {
131 139 log.trace("Executing findApiUsageStateById, tenantId [{}], apiUsageStateId [{}]", tenantId, id);
132 140 validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
... ... @@ -148,10 +156,8 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A
148 156 }
149 157 if (apiUsageState.getEntityId() == null) {
150 158 throw new DataValidationException("UsageRecord should be assigned to entity!");
151   - } else if (!EntityType.TENANT.equals(apiUsageState.getEntityId().getEntityType())) {
152   - throw new DataValidationException("Only Tenant Usage Records are supported!");
153   - } else if (!apiUsageState.getTenantId().getId().equals(apiUsageState.getEntityId().getId())) {
154   - throw new DataValidationException("Can't assign one Usage Record to multiple tenants!");
  159 + } else if (apiUsageState.getEntityId().getEntityType() != EntityType.TENANT && apiUsageState.getEntityId().getEntityType() != EntityType.CUSTOMER) {
  160 + throw new DataValidationException("Only Tenant and Customer Usage Records are supported!");
155 161 }
156 162 }
157 163 };
... ...
... ... @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.ApiFeature;
20 20 import org.thingsboard.server.common.data.ApiUsageStateMailMessage;
21 21 import org.thingsboard.server.common.data.ApiUsageStateValue;
22 22 import org.thingsboard.server.common.data.exception.ThingsboardException;
  23 +import org.thingsboard.server.common.data.id.CustomerId;
23 24 import org.thingsboard.server.common.data.id.TenantId;
24 25
25 26 import javax.mail.MessagingException;
... ... @@ -40,7 +41,7 @@ public interface MailService {
40 41
41 42 void sendPasswordWasResetEmail(String loginLink, String email) throws ThingsboardException;
42 43
43   - void send(TenantId tenantId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException;
  44 + void send(TenantId tenantId, CustomerId customerId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException;
44 45
45 46 void sendAccountLockoutEmail( String lockoutEmail, String email, Integer maxFailedLoginAttempts) throws ThingsboardException;
46 47
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.rule.engine.api;
17 17
18 18 import com.google.common.util.concurrent.FutureCallback;
  19 +import org.thingsboard.server.common.data.id.CustomerId;
19 20 import org.thingsboard.server.common.data.id.EntityId;
20 21 import org.thingsboard.server.common.data.id.TenantId;
21 22 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
... ... @@ -31,7 +32,7 @@ public interface RuleEngineTelemetryService {
31 32
32 33 void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
33 34
34   - void saveAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
  35 + void saveAndNotify(TenantId tenantId, CustomerId id, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
35 36
36 37 void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
37 38
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.rule.engine.api;
17 17
  18 +import org.thingsboard.server.common.data.id.CustomerId;
18 19 import org.thingsboard.server.common.data.sms.config.TestSmsRequest;
19 20 import org.thingsboard.server.common.data.exception.ThingsboardException;
20 21 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -23,7 +24,7 @@ public interface SmsService {
23 24
24 25 void updateSmsConfiguration();
25 26
26   - void sendSms(TenantId tenantId, String[] numbersTo, String message) throws ThingsboardException;;
  27 + void sendSms(TenantId tenantId, CustomerId customerId, String[] numbersTo, String message) throws ThingsboardException;;
27 28
28 29 void sendTestSms(TestSmsRequest testSmsRequest) throws ThingsboardException;
29 30
... ...
... ... @@ -26,7 +26,6 @@ import org.thingsboard.rule.engine.api.TbNode;
26 26 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
27 27 import org.thingsboard.rule.engine.api.TbNodeException;
28 28 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
29   -import org.thingsboard.server.common.data.ApiUsageRecordKey;
30 29 import org.thingsboard.server.common.data.plugin.ComponentType;
31 30 import org.thingsboard.server.common.msg.TbMsg;
32 31
... ... @@ -76,7 +75,7 @@ public class TbSendEmailNode implements TbNode {
76 75 validateType(msg.getType());
77 76 EmailPojo email = getEmail(msg);
78 77 withCallback(ctx.getMailExecutor().executeAsync(() -> {
79   - sendEmail(ctx, email);
  78 + sendEmail(ctx, msg, email);
80 79 return null;
81 80 }),
82 81 ok -> ctx.tellSuccess(msg),
... ... @@ -86,9 +85,9 @@ public class TbSendEmailNode implements TbNode {
86 85 }
87 86 }
88 87
89   - private void sendEmail(TbContext ctx, EmailPojo email) throws Exception {
  88 + private void sendEmail(TbContext ctx, TbMsg msg, EmailPojo email) throws Exception {
90 89 if (this.config.isUseSystemSmtpSettings()) {
91   - ctx.getMailService().send(ctx.getTenantId(), email.getFrom(), email.getTo(), email.getCc(),
  90 + ctx.getMailService().send(ctx.getTenantId(), msg.getCustomerId(), email.getFrom(), email.getTo(), email.getCc(),
92 91 email.getBcc(), email.getSubject(), email.getBody());
93 92 } else {
94 93 MimeMessage mailMsg = mailSender.createMimeMessage();
... ...
... ... @@ -76,7 +76,7 @@ public class TbSendSmsNode implements TbNode {
76 76 String message = TbNodeUtils.processPattern(this.config.getSmsMessageTemplate(), msg);
77 77 String[] numbersToList = numbersTo.split(",");
78 78 if (this.config.isUseSystemSmsSettings()) {
79   - ctx.getSmsService().sendSms(ctx.getTenantId(), numbersToList, message);
  79 + ctx.getSmsService().sendSms(ctx.getTenantId(), msg.getCustomerId(), numbersToList, message);
80 80 } else {
81 81 for (String numberTo : numbersToList) {
82 82 this.smsSender.sendSms(numberTo, message);
... ...
... ... @@ -25,6 +25,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
25 25 import org.thingsboard.rule.engine.api.TbNodeException;
26 26 import org.thingsboard.server.common.data.TenantProfile;
27 27 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
  28 +import org.thingsboard.server.common.data.id.CustomerId;
28 29 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
29 30 import org.thingsboard.server.common.data.kv.KvEntry;
30 31 import org.thingsboard.server.common.data.kv.TsKvEntry;
... ... @@ -93,7 +94,7 @@ public class TbMsgTimeseriesNode implements TbNode {
93 94 if (ttl == 0L) {
94 95 ttl = tenantProfileDefaultStorageTtl;
95 96 }
96   - ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
  97 + ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getCustomerId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
97 98 }
98 99
99 100 public static long getTs(TbMsg msg) {
... ...