Commit 9d045e231b2e16a61760470e99aab2050d48e872

Authored by Viacheslav Klimov
Committed by Andrew Shvayka
1 parent 62979e80

Refactor DefaultTbApiUsageClient

@@ -161,14 +161,14 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa @@ -161,14 +161,14 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
161 ToUsageStatsServiceMsg statsMsg = msg.getValue(); 161 ToUsageStatsServiceMsg statsMsg = msg.getValue();
162 162
163 TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); 163 TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
164 - EntityId initiatorId; 164 + EntityId entityId;
165 if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) { 165 if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) {
166 - initiatorId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB())); 166 + entityId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB()));
167 } else { 167 } else {
168 - initiatorId = tenantId; 168 + entityId = tenantId;
169 } 169 }
170 170
171 - processEntityUsageStats(tenantId, initiatorId, statsMsg.getValuesList()); 171 + processEntityUsageStats(tenantId, entityId, statsMsg.getValuesList());
172 callback.onSuccess(); 172 callback.onSuccess();
173 } 173 }
174 174
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.queue.usagestats; 16 package org.thingsboard.server.queue.usagestats;
17 17
  18 +import lombok.Data;
18 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
19 import org.springframework.beans.factory.annotation.Value; 20 import org.springframework.beans.factory.annotation.Value;
20 import org.springframework.stereotype.Component; 21 import org.springframework.stereotype.Component;
@@ -35,6 +36,7 @@ import org.thingsboard.server.queue.scheduler.SchedulerComponent; @@ -35,6 +36,7 @@ import org.thingsboard.server.queue.scheduler.SchedulerComponent;
35 36
36 import javax.annotation.PostConstruct; 37 import javax.annotation.PostConstruct;
37 import java.util.EnumMap; 38 import java.util.EnumMap;
  39 +import java.util.Optional;
38 import java.util.Random; 40 import java.util.Random;
39 import java.util.UUID; 41 import java.util.UUID;
40 import java.util.concurrent.ConcurrentHashMap; 42 import java.util.concurrent.ConcurrentHashMap;
@@ -51,8 +53,7 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient { @@ -51,8 +53,7 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient {
51 @Value("${usage.stats.report.interval:10}") 53 @Value("${usage.stats.report.interval:10}")
52 private int interval; 54 private int interval;
53 55
54 - private final EnumMap<ApiUsageRecordKey, ConcurrentMap<EntityId, AtomicLong>> stats = new EnumMap<>(ApiUsageRecordKey.class);  
55 - private final ConcurrentMap<EntityId, TenantId> tenants = new ConcurrentHashMap<>(); 56 + private final EnumMap<ApiUsageRecordKey, ConcurrentMap<OwnerId, AtomicLong>> stats = new EnumMap<>(ApiUsageRecordKey.class);
56 57
57 private final PartitionService partitionService; 58 private final PartitionService partitionService;
58 private final SchedulerComponent scheduler; 59 private final SchedulerComponent scheduler;
@@ -83,30 +84,25 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient { @@ -83,30 +84,25 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient {
83 } 84 }
84 85
85 private void reportStats() { 86 private void reportStats() {
86 - ConcurrentMap<EntityId, ToUsageStatsServiceMsg.Builder> report = new ConcurrentHashMap<>(); 87 + ConcurrentMap<OwnerId, ToUsageStatsServiceMsg.Builder> report = new ConcurrentHashMap<>();
87 88
88 for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { 89 for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
89 - ConcurrentMap<EntityId, AtomicLong> statsForKey = stats.get(key);  
90 - statsForKey.forEach((initiatorId, statsValue) -> { 90 + ConcurrentMap<OwnerId, AtomicLong> statsForKey = stats.get(key);
  91 + statsForKey.forEach((ownerId, statsValue) -> {
91 long value = statsValue.get(); 92 long value = statsValue.get();
92 if (value == 0) return; 93 if (value == 0) return;
93 94
94 - ToUsageStatsServiceMsg.Builder statsMsgBuilder = report.computeIfAbsent(initiatorId, id -> { 95 + ToUsageStatsServiceMsg.Builder statsMsgBuilder = report.computeIfAbsent(ownerId, id -> {
95 ToUsageStatsServiceMsg.Builder newStatsMsgBuilder = ToUsageStatsServiceMsg.newBuilder(); 96 ToUsageStatsServiceMsg.Builder newStatsMsgBuilder = ToUsageStatsServiceMsg.newBuilder();
96 97
97 - TenantId tenantId;  
98 - if (initiatorId.getEntityType() == EntityType.TENANT) {  
99 - tenantId = (TenantId) initiatorId;  
100 - } else {  
101 - tenantId = tenants.get(initiatorId);  
102 - }  
103 - 98 + TenantId tenantId = ownerId.getTenantId();
104 newStatsMsgBuilder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); 99 newStatsMsgBuilder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
105 newStatsMsgBuilder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); 100 newStatsMsgBuilder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
106 101
107 - if (initiatorId.getEntityType() == EntityType.CUSTOMER) {  
108 - newStatsMsgBuilder.setCustomerIdMSB(initiatorId.getId().getMostSignificantBits());  
109 - newStatsMsgBuilder.setCustomerIdLSB(initiatorId.getId().getLeastSignificantBits()); 102 + EntityId entityId = ownerId.getEntityId();
  103 + if (entityId != null && entityId.getEntityType() == EntityType.CUSTOMER) {
  104 + newStatsMsgBuilder.setCustomerIdMSB(entityId.getId().getMostSignificantBits());
  105 + newStatsMsgBuilder.setCustomerIdLSB(entityId.getId().getLeastSignificantBits());
110 } 106 }
111 107
112 return newStatsMsgBuilder; 108 return newStatsMsgBuilder;
@@ -117,18 +113,13 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient { @@ -117,18 +113,13 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient {
117 statsForKey.clear(); 113 statsForKey.clear();
118 } 114 }
119 115
120 - report.forEach(((initiatorId, builder) -> { 116 + report.forEach(((ownerId, statsMsg) -> {
121 //TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages? 117 //TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages?
122 118
123 - TenantId tenantId;  
124 - if (initiatorId.getEntityType() == EntityType.TENANT) {  
125 - tenantId = (TenantId) initiatorId;  
126 - } else {  
127 - tenantId = tenants.get(initiatorId);  
128 - }  
129 -  
130 - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, initiatorId).newByTopic(msgProducer.getDefaultTopic());  
131 - msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), builder.build()), null); 119 + TenantId tenantId = ownerId.getTenantId();
  120 + EntityId entityId = Optional.ofNullable(ownerId.getEntityId()).orElse(tenantId);
  121 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId).newByTopic(msgProducer.getDefaultTopic());
  122 + msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), statsMsg.build()), null);
132 })); 123 }));
133 124
134 if (!report.isEmpty()) { 125 if (!report.isEmpty()) {
@@ -139,14 +130,13 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient { @@ -139,14 +130,13 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient {
139 @Override 130 @Override
140 public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value) { 131 public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value) {
141 if (enabled) { 132 if (enabled) {
142 - ConcurrentMap<EntityId, AtomicLong> statsForKey = stats.get(key); 133 + ConcurrentMap<OwnerId, AtomicLong> statsForKey = stats.get(key);
143 134
144 - statsForKey.computeIfAbsent(tenantId, id -> new AtomicLong()).addAndGet(value);  
145 - statsForKey.computeIfAbsent(TenantId.SYS_TENANT_ID, id -> new AtomicLong()).addAndGet(value); 135 + statsForKey.computeIfAbsent(new OwnerId(tenantId), id -> new AtomicLong()).addAndGet(value);
  136 + statsForKey.computeIfAbsent(new OwnerId(TenantId.SYS_TENANT_ID), id -> new AtomicLong()).addAndGet(value);
146 137
147 if (customerId != null && !customerId.isNullUid()) { 138 if (customerId != null && !customerId.isNullUid()) {
148 - statsForKey.computeIfAbsent(customerId, id -> new AtomicLong()).addAndGet(value);  
149 - tenants.putIfAbsent(customerId, tenantId); 139 + statsForKey.computeIfAbsent(new OwnerId(tenantId, customerId), id -> new AtomicLong()).addAndGet(value);
150 } 140 }
151 } 141 }
152 } 142 }
@@ -155,4 +145,19 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient { @@ -155,4 +145,19 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient {
155 public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key) { 145 public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key) {
156 report(tenantId, customerId, key, 1); 146 report(tenantId, customerId, key, 1);
157 } 147 }
  148 +
  149 + @Data
  150 + private static class OwnerId {
  151 + private TenantId tenantId;
  152 + private EntityId entityId;
  153 +
  154 + public OwnerId(TenantId tenantId) {
  155 + this.tenantId = tenantId;
  156 + }
  157 +
  158 + public OwnerId(TenantId tenantId, EntityId entityId) {
  159 + this.tenantId = tenantId;
  160 + this.entityId = entityId;
  161 + }
  162 + }
158 } 163 }