Commit 9e2bfa40492c5ac28c25ab82e798c8225f8c5b10
1 parent
e17de056
Perfomance improvement for tenant state load
Showing
1 changed file
with
26 additions
and
12 deletions
application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java
... | ... | @@ -72,6 +72,7 @@ import java.util.concurrent.ConcurrentHashMap; |
72 | 72 | import java.util.concurrent.ExecutionException; |
73 | 73 | import java.util.concurrent.ExecutorService; |
74 | 74 | import java.util.concurrent.Executors; |
75 | +import java.util.concurrent.Future; | |
75 | 76 | import java.util.concurrent.TimeUnit; |
76 | 77 | import java.util.concurrent.locks.Lock; |
77 | 78 | import java.util.concurrent.locks.ReentrantLock; |
... | ... | @@ -421,20 +422,33 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
421 | 422 | private void initStatesFromDataBase() { |
422 | 423 | try { |
423 | 424 | log.info("Initializing tenant states."); |
424 | - PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); | |
425 | - for (Tenant tenant : tenantIterator) { | |
426 | - if (!myTenantStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) { | |
427 | - log.debug("[{}] Initializing tenant state.", tenant.getId()); | |
428 | - updateLock.lock(); | |
429 | - try { | |
430 | - updateTenantState(getOrFetchState(tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); | |
431 | - log.debug("[{}] Initialized tenant state.", tenant.getId()); | |
432 | - } catch (Exception e) { | |
433 | - log.warn("[{}] Failed to initialize tenant API state", tenant.getId(), e); | |
434 | - } finally { | |
435 | - updateLock.unlock(); | |
425 | + updateLock.lock(); | |
426 | + try { | |
427 | + ExecutorService tmpInitExecutor = Executors.newWorkStealingPool(20); | |
428 | + try { | |
429 | + PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); | |
430 | + List<Future<?>> futures = new ArrayList<>(); | |
431 | + for (Tenant tenant : tenantIterator) { | |
432 | + if (!myTenantStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) { | |
433 | + log.debug("[{}] Initializing tenant state.", tenant.getId()); | |
434 | + futures.add(tmpInitExecutor.submit(() -> { | |
435 | + try { | |
436 | + updateTenantState(getOrFetchState(tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); | |
437 | + log.debug("[{}] Initialized tenant state.", tenant.getId()); | |
438 | + } catch (Exception e) { | |
439 | + log.warn("[{}] Failed to initialize tenant API state", tenant.getId(), e); | |
440 | + } | |
441 | + })); | |
442 | + } | |
443 | + } | |
444 | + for (Future<?> future : futures) { | |
445 | + future.get(); | |
436 | 446 | } |
447 | + } finally { | |
448 | + tmpInitExecutor.shutdownNow(); | |
437 | 449 | } |
450 | + } finally { | |
451 | + updateLock.unlock(); | |
438 | 452 | } |
439 | 453 | log.info("Initialized tenant states."); |
440 | 454 | } catch (Exception e) { | ... | ... |