Commit c4276ae41b4decbbc14f5bcfc352e4cc29a891ab
1 parent
73d39f40
Save activity flag for devices loaded as part of demo data
Showing
1 changed file
with
88 additions
and
1 deletions
application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java
... | ... | @@ -17,12 +17,18 @@ package org.thingsboard.server.service.install; |
17 | 17 | |
18 | 18 | import com.fasterxml.jackson.databind.ObjectMapper; |
19 | 19 | import com.fasterxml.jackson.databind.node.ObjectNode; |
20 | +import com.google.common.util.concurrent.FutureCallback; | |
21 | +import com.google.common.util.concurrent.Futures; | |
22 | +import com.google.common.util.concurrent.ListenableFuture; | |
23 | +import lombok.Getter; | |
20 | 24 | import lombok.extern.slf4j.Slf4j; |
21 | 25 | import org.springframework.beans.factory.annotation.Autowired; |
26 | +import org.springframework.beans.factory.annotation.Value; | |
22 | 27 | import org.springframework.context.annotation.Bean; |
23 | 28 | import org.springframework.context.annotation.Profile; |
24 | 29 | import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; |
25 | 30 | import org.springframework.stereotype.Service; |
31 | +import org.thingsboard.common.util.ThingsBoardThreadFactory; | |
26 | 32 | import org.thingsboard.server.common.data.AdminSettings; |
27 | 33 | import org.thingsboard.server.common.data.Customer; |
28 | 34 | import org.thingsboard.server.common.data.DataConstants; |
... | ... | @@ -51,6 +57,7 @@ import org.thingsboard.server.common.data.id.DeviceId; |
51 | 57 | import org.thingsboard.server.common.data.id.DeviceProfileId; |
52 | 58 | import org.thingsboard.server.common.data.id.TenantId; |
53 | 59 | import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; |
60 | +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; | |
54 | 61 | import org.thingsboard.server.common.data.kv.BooleanDataEntry; |
55 | 62 | import org.thingsboard.server.common.data.kv.DoubleDataEntry; |
56 | 63 | import org.thingsboard.server.common.data.kv.LongDataEntry; |
... | ... | @@ -78,12 +85,19 @@ import org.thingsboard.server.dao.rule.RuleChainService; |
78 | 85 | import org.thingsboard.server.dao.settings.AdminSettingsService; |
79 | 86 | import org.thingsboard.server.dao.tenant.TenantProfileService; |
80 | 87 | import org.thingsboard.server.dao.tenant.TenantService; |
88 | +import org.thingsboard.server.dao.timeseries.TimeseriesService; | |
81 | 89 | import org.thingsboard.server.dao.user.UserService; |
82 | 90 | import org.thingsboard.server.dao.widget.WidgetsBundleService; |
83 | 91 | |
92 | +import javax.annotation.Nullable; | |
93 | +import javax.annotation.PostConstruct; | |
94 | +import javax.annotation.PreDestroy; | |
84 | 95 | import java.util.Arrays; |
85 | 96 | import java.util.Collections; |
97 | +import java.util.List; | |
86 | 98 | import java.util.TreeMap; |
99 | +import java.util.concurrent.ExecutorService; | |
100 | +import java.util.concurrent.Executors; | |
87 | 101 | |
88 | 102 | @Service |
89 | 103 | @Profile("install") |
... | ... | @@ -93,6 +107,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { |
93 | 107 | private static final ObjectMapper objectMapper = new ObjectMapper(); |
94 | 108 | public static final String CUSTOMER_CRED = "customer"; |
95 | 109 | public static final String DEFAULT_DEVICE_TYPE = "default"; |
110 | + public static final String ACTIVITY_STATE = "active"; | |
96 | 111 | |
97 | 112 | @Autowired |
98 | 113 | private InstallScripts installScripts; |
... | ... | @@ -133,11 +148,32 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { |
133 | 148 | @Autowired |
134 | 149 | private RuleChainService ruleChainService; |
135 | 150 | |
151 | + @Autowired | |
152 | + private TimeseriesService tsService; | |
153 | + | |
154 | + @Value("${state.persistToTelemetry:false}") | |
155 | + @Getter | |
156 | + private boolean persistActivityToTelemetry; | |
157 | + | |
136 | 158 | @Bean |
137 | 159 | protected BCryptPasswordEncoder passwordEncoder() { |
138 | 160 | return new BCryptPasswordEncoder(); |
139 | 161 | } |
140 | 162 | |
163 | + private ExecutorService tsCallBackExecutor; | |
164 | + | |
165 | + @PostConstruct | |
166 | + public void initExecutor() { | |
167 | + tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sys-loader-ts-callback")); | |
168 | + } | |
169 | + | |
170 | + @PreDestroy | |
171 | + public void shutdownExecutor() { | |
172 | + if (tsCallBackExecutor != null) { | |
173 | + tsCallBackExecutor.shutdownNow(); | |
174 | + } | |
175 | + } | |
176 | + | |
141 | 177 | @Override |
142 | 178 | public void createSysAdmin() { |
143 | 179 | createUser(Authority.SYS_ADMIN, null, null, "sysadmin@thingsboard.org", "sysadmin"); |
... | ... | @@ -481,11 +517,62 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { |
481 | 517 | device.setAdditionalInfo(additionalInfo); |
482 | 518 | } |
483 | 519 | device = deviceService.saveDevice(device); |
484 | - //TODO: No access to cluster service, so we should manually update the status of device. | |
520 | + save(device.getId(), ACTIVITY_STATE, false); | |
485 | 521 | DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(TenantId.SYS_TENANT_ID, device.getId()); |
486 | 522 | deviceCredentials.setCredentialsId(accessToken); |
487 | 523 | deviceCredentialsService.updateDeviceCredentials(TenantId.SYS_TENANT_ID, deviceCredentials); |
488 | 524 | return device; |
489 | 525 | } |
490 | 526 | |
527 | + private void save(DeviceId deviceId, String key, boolean value) { | |
528 | + if (persistActivityToTelemetry) { | |
529 | + ListenableFuture<Integer> saveFuture = tsService.save( | |
530 | + TenantId.SYS_TENANT_ID, | |
531 | + deviceId, | |
532 | + Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L); | |
533 | + addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); | |
534 | + } else { | |
535 | + ListenableFuture<List<Void>> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, | |
536 | + Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value) | |
537 | + , System.currentTimeMillis()))); | |
538 | + addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); | |
539 | + } | |
540 | + } | |
541 | + | |
542 | + private static class TelemetrySaveCallback<T> implements FutureCallback<T> { | |
543 | + private final DeviceId deviceId; | |
544 | + private final String key; | |
545 | + private final Object value; | |
546 | + | |
547 | + TelemetrySaveCallback(DeviceId deviceId, String key, Object value) { | |
548 | + this.deviceId = deviceId; | |
549 | + this.key = key; | |
550 | + this.value = value; | |
551 | + } | |
552 | + | |
553 | + @Override | |
554 | + public void onSuccess(@Nullable T result) { | |
555 | + log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value); | |
556 | + } | |
557 | + | |
558 | + @Override | |
559 | + public void onFailure(Throwable t) { | |
560 | + log.warn("[{}] Failed to update attribute [{}] with value [{}]", deviceId, key, value, t); | |
561 | + } | |
562 | + } | |
563 | + | |
564 | + private <S> void addTsCallback(ListenableFuture<S> saveFuture, final FutureCallback<S> callback) { | |
565 | + Futures.addCallback(saveFuture, new FutureCallback<S>() { | |
566 | + @Override | |
567 | + public void onSuccess(@Nullable S result) { | |
568 | + callback.onSuccess(result); | |
569 | + } | |
570 | + | |
571 | + @Override | |
572 | + public void onFailure(Throwable t) { | |
573 | + callback.onFailure(t); | |
574 | + } | |
575 | + }, tsCallBackExecutor); | |
576 | + } | |
577 | + | |
491 | 578 | } | ... | ... |