Commit dda61383933cac9aa6821a77ff9b19291e69db9f

Authored by Andrew Shvayka
Committed by GitHub
2 parents 96d20b07 a098bf0b

Merge pull request #5328 from ViacheslavKlimov/bulk-import-improvements

Concurrent bulk import processing
@@ -133,7 +133,7 @@ public class AssetController extends BaseController { @@ -133,7 +133,7 @@ public class AssetController extends BaseController {
133 133
134 Asset savedAsset = checkNotNull(assetService.saveAsset(asset)); 134 Asset savedAsset = checkNotNull(assetService.saveAsset(asset));
135 135
136 - onAssetCreatedOrUpdated(savedAsset, asset.getId() != null); 136 + onAssetCreatedOrUpdated(savedAsset, asset.getId() != null, getCurrentUser());
137 137
138 return savedAsset; 138 return savedAsset;
139 } catch (Exception e) { 139 } catch (Exception e) {
@@ -143,9 +143,9 @@ public class AssetController extends BaseController { @@ -143,9 +143,9 @@ public class AssetController extends BaseController {
143 } 143 }
144 } 144 }
145 145
146 - private void onAssetCreatedOrUpdated(Asset asset, boolean updated) { 146 + private void onAssetCreatedOrUpdated(Asset asset, boolean updated, SecurityUser user) {
147 try { 147 try {
148 - logEntityAction(asset.getId(), asset, 148 + logEntityAction(user, asset.getId(), asset,
149 asset.getCustomerId(), 149 asset.getCustomerId(),
150 updated ? ActionType.UPDATED : ActionType.ADDED, null); 150 updated ? ActionType.UPDATED : ActionType.ADDED, null);
151 } catch (ThingsboardException e) { 151 } catch (ThingsboardException e) {
@@ -648,8 +648,9 @@ public class AssetController extends BaseController { @@ -648,8 +648,9 @@ public class AssetController extends BaseController {
648 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") 648 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
649 @PostMapping("/asset/bulk_import") 649 @PostMapping("/asset/bulk_import")
650 public BulkImportResult<Asset> processAssetsBulkImport(@RequestBody BulkImportRequest request) throws Exception { 650 public BulkImportResult<Asset> processAssetsBulkImport(@RequestBody BulkImportRequest request) throws Exception {
651 - return assetBulkImportService.processBulkImport(request, getCurrentUser(), importedAssetInfo -> {  
652 - onAssetCreatedOrUpdated(importedAssetInfo.getEntity(), importedAssetInfo.isUpdated()); 651 + SecurityUser user = getCurrentUser();
  652 + return assetBulkImportService.processBulkImport(request, user, importedAssetInfo -> {
  653 + onAssetCreatedOrUpdated(importedAssetInfo.getEntity(), importedAssetInfo.isUpdated(), user);
653 }); 654 });
654 } 655 }
655 656
@@ -161,7 +161,7 @@ public class DeviceController extends BaseController { @@ -161,7 +161,7 @@ public class DeviceController extends BaseController {
161 161
162 Device savedDevice = checkNotNull(deviceService.saveDeviceWithAccessToken(device, accessToken)); 162 Device savedDevice = checkNotNull(deviceService.saveDeviceWithAccessToken(device, accessToken));
163 163
164 - onDeviceCreatedOrUpdated(savedDevice, oldDevice, !created); 164 + onDeviceCreatedOrUpdated(savedDevice, oldDevice, !created, getCurrentUser());
165 165
166 return savedDevice; 166 return savedDevice;
167 } catch (Exception e) { 167 } catch (Exception e) {
@@ -172,11 +172,11 @@ public class DeviceController extends BaseController { @@ -172,11 +172,11 @@ public class DeviceController extends BaseController {
172 172
173 } 173 }
174 174
175 - private void onDeviceCreatedOrUpdated(Device savedDevice, Device oldDevice, boolean updated) { 175 + private void onDeviceCreatedOrUpdated(Device savedDevice, Device oldDevice, boolean updated, SecurityUser user) {
176 tbClusterService.onDeviceUpdated(savedDevice, oldDevice); 176 tbClusterService.onDeviceUpdated(savedDevice, oldDevice);
177 177
178 try { 178 try {
179 - logEntityAction(savedDevice.getId(), savedDevice, 179 + logEntityAction(user, savedDevice.getId(), savedDevice,
180 savedDevice.getCustomerId(), 180 savedDevice.getCustomerId(),
181 updated ? ActionType.UPDATED : ActionType.ADDED, null); 181 updated ? ActionType.UPDATED : ActionType.ADDED, null);
182 } catch (ThingsboardException e) { 182 } catch (ThingsboardException e) {
@@ -941,8 +941,9 @@ public class DeviceController extends BaseController { @@ -941,8 +941,9 @@ public class DeviceController extends BaseController {
941 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") 941 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
942 @PostMapping("/device/bulk_import") 942 @PostMapping("/device/bulk_import")
943 public BulkImportResult<Device> processDevicesBulkImport(@RequestBody BulkImportRequest request) throws Exception { 943 public BulkImportResult<Device> processDevicesBulkImport(@RequestBody BulkImportRequest request) throws Exception {
944 - return deviceBulkImportService.processBulkImport(request, getCurrentUser(), importedDeviceInfo -> {  
945 - onDeviceCreatedOrUpdated(importedDeviceInfo.getEntity(), importedDeviceInfo.getOldEntity(), importedDeviceInfo.isUpdated()); 944 + SecurityUser user = getCurrentUser();
  945 + return deviceBulkImportService.processBulkImport(request, user, importedDeviceInfo -> {
  946 + onDeviceCreatedOrUpdated(importedDeviceInfo.getEntity(), importedDeviceInfo.getOldEntity(), importedDeviceInfo.isUpdated(), user);
946 }); 947 });
947 } 948 }
948 949
@@ -140,7 +140,7 @@ public class EdgeController extends BaseController { @@ -140,7 +140,7 @@ public class EdgeController extends BaseController {
140 edge.getId(), edge); 140 edge.getId(), edge);
141 141
142 Edge savedEdge = checkNotNull(edgeService.saveEdge(edge, true)); 142 Edge savedEdge = checkNotNull(edgeService.saveEdge(edge, true));
143 - onEdgeCreatedOrUpdated(tenantId, savedEdge, edgeTemplateRootRuleChain, !created); 143 + onEdgeCreatedOrUpdated(tenantId, savedEdge, edgeTemplateRootRuleChain, !created, getCurrentUser());
144 144
145 return savedEdge; 145 return savedEdge;
146 } catch (Exception e) { 146 } catch (Exception e) {
@@ -150,7 +150,7 @@ public class EdgeController extends BaseController { @@ -150,7 +150,7 @@ public class EdgeController extends BaseController {
150 } 150 }
151 } 151 }
152 152
153 - private void onEdgeCreatedOrUpdated(TenantId tenantId, Edge edge, RuleChain edgeTemplateRootRuleChain, boolean updated) throws IOException, ThingsboardException { 153 + private void onEdgeCreatedOrUpdated(TenantId tenantId, Edge edge, RuleChain edgeTemplateRootRuleChain, boolean updated, SecurityUser user) throws IOException, ThingsboardException {
154 if (!updated) { 154 if (!updated) {
155 ruleChainService.assignRuleChainToEdge(tenantId, edgeTemplateRootRuleChain.getId(), edge.getId()); 155 ruleChainService.assignRuleChainToEdge(tenantId, edgeTemplateRootRuleChain.getId(), edge.getId());
156 edgeNotificationService.setEdgeRootRuleChain(tenantId, edge, edgeTemplateRootRuleChain.getId()); 156 edgeNotificationService.setEdgeRootRuleChain(tenantId, edge, edgeTemplateRootRuleChain.getId());
@@ -160,7 +160,7 @@ public class EdgeController extends BaseController { @@ -160,7 +160,7 @@ public class EdgeController extends BaseController {
160 tbClusterService.broadcastEntityStateChangeEvent(edge.getTenantId(), edge.getId(), 160 tbClusterService.broadcastEntityStateChangeEvent(edge.getTenantId(), edge.getId(),
161 updated ? ComponentLifecycleEvent.UPDATED : ComponentLifecycleEvent.CREATED); 161 updated ? ComponentLifecycleEvent.UPDATED : ComponentLifecycleEvent.CREATED);
162 162
163 - logEntityAction(edge.getId(), edge, null, updated ? ActionType.UPDATED : ActionType.ADDED, null); 163 + logEntityAction(user, edge.getId(), edge, null, updated ? ActionType.UPDATED : ActionType.ADDED, null);
164 } 164 }
165 165
166 @PreAuthorize("hasAuthority('TENANT_ADMIN')") 166 @PreAuthorize("hasAuthority('TENANT_ADMIN')")
@@ -586,7 +586,7 @@ public class EdgeController extends BaseController { @@ -586,7 +586,7 @@ public class EdgeController extends BaseController {
586 586
587 return edgeBulkImportService.processBulkImport(request, user, importedAssetInfo -> { 587 return edgeBulkImportService.processBulkImport(request, user, importedAssetInfo -> {
588 try { 588 try {
589 - onEdgeCreatedOrUpdated(user.getTenantId(), importedAssetInfo.getEntity(), edgeTemplateRootRuleChain, importedAssetInfo.isUpdated()); 589 + onEdgeCreatedOrUpdated(user.getTenantId(), importedAssetInfo.getEntity(), edgeTemplateRootRuleChain, importedAssetInfo.isUpdated(), user);
590 } catch (Exception e) { 590 } catch (Exception e) {
591 throw new RuntimeException(e); 591 throw new RuntimeException(e);
592 } 592 }
@@ -63,6 +63,8 @@ import java.util.Map; @@ -63,6 +63,8 @@ import java.util.Map;
63 import java.util.Objects; 63 import java.util.Objects;
64 import java.util.Optional; 64 import java.util.Optional;
65 import java.util.Set; 65 import java.util.Set;
  66 +import java.util.concurrent.locks.Lock;
  67 +import java.util.concurrent.locks.ReentrantLock;
66 68
67 @Service 69 @Service
68 @TbCoreComponent 70 @TbCoreComponent
@@ -71,6 +73,8 @@ public class DeviceBulkImportService extends AbstractBulkImportService<Device> { @@ -71,6 +73,8 @@ public class DeviceBulkImportService extends AbstractBulkImportService<Device> {
71 protected final DeviceCredentialsService deviceCredentialsService; 73 protected final DeviceCredentialsService deviceCredentialsService;
72 protected final DeviceProfileService deviceProfileService; 74 protected final DeviceProfileService deviceProfileService;
73 75
  76 + private final Lock findOrCreateDeviceProfileLock = new ReentrantLock();
  77 +
74 public DeviceBulkImportService(TelemetrySubscriptionService tsSubscriptionService, TbTenantProfileCache tenantProfileCache, 78 public DeviceBulkImportService(TelemetrySubscriptionService tsSubscriptionService, TbTenantProfileCache tenantProfileCache,
75 AccessControlService accessControlService, AccessValidator accessValidator, 79 AccessControlService accessControlService, AccessValidator accessValidator,
76 EntityActionService entityActionService, TbClusterService clusterService, 80 EntityActionService entityActionService, TbClusterService clusterService,
@@ -106,9 +110,13 @@ public class DeviceBulkImportService extends AbstractBulkImportService<Device> { @@ -106,9 +110,13 @@ public class DeviceBulkImportService extends AbstractBulkImportService<Device> {
106 throw new DeviceCredentialsValidationException("Invalid device credentials: " + e.getMessage()); 110 throw new DeviceCredentialsValidationException("Invalid device credentials: " + e.getMessage());
107 } 111 }
108 112
  113 + DeviceProfile deviceProfile;
109 if (deviceCredentials.getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) { 114 if (deviceCredentials.getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) {
110 - setUpLwM2mDeviceProfile(user.getTenantId(), device); 115 + deviceProfile = setUpLwM2mDeviceProfile(user.getTenantId(), device);
  116 + } else {
  117 + deviceProfile = deviceProfileService.findOrCreateDeviceProfile(user.getTenantId(), device.getType());
111 } 118 }
  119 + device.setDeviceProfileId(deviceProfile.getId());
112 120
113 device = deviceService.saveDeviceWithCredentials(device, deviceCredentials); 121 device = deviceService.saveDeviceWithCredentials(device, deviceCredentials);
114 122
@@ -215,36 +223,43 @@ public class DeviceBulkImportService extends AbstractBulkImportService<Device> { @@ -215,36 +223,43 @@ public class DeviceBulkImportService extends AbstractBulkImportService<Device> {
215 credentials.setCredentialsValue(lwm2mCredentials.toString()); 223 credentials.setCredentialsValue(lwm2mCredentials.toString());
216 } 224 }
217 225
218 - private void setUpLwM2mDeviceProfile(TenantId tenantId, Device device) { 226 + private DeviceProfile setUpLwM2mDeviceProfile(TenantId tenantId, Device device) {
219 DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileByName(tenantId, device.getType()); 227 DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileByName(tenantId, device.getType());
220 if (deviceProfile != null) { 228 if (deviceProfile != null) {
221 if (deviceProfile.getTransportType() != DeviceTransportType.LWM2M) { 229 if (deviceProfile.getTransportType() != DeviceTransportType.LWM2M) {
222 deviceProfile.setTransportType(DeviceTransportType.LWM2M); 230 deviceProfile.setTransportType(DeviceTransportType.LWM2M);
223 deviceProfile.getProfileData().setTransportConfiguration(new Lwm2mDeviceProfileTransportConfiguration()); 231 deviceProfile.getProfileData().setTransportConfiguration(new Lwm2mDeviceProfileTransportConfiguration());
224 deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); 232 deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile);
225 - device.setDeviceProfileId(deviceProfile.getId());  
226 } 233 }
227 } else { 234 } else {
228 - deviceProfile = new DeviceProfile();  
229 - deviceProfile.setTenantId(tenantId);  
230 - deviceProfile.setType(DeviceProfileType.DEFAULT);  
231 - deviceProfile.setName(device.getType());  
232 - deviceProfile.setTransportType(DeviceTransportType.LWM2M);  
233 - deviceProfile.setProvisionType(DeviceProfileProvisionType.DISABLED); 235 + findOrCreateDeviceProfileLock.lock();
  236 + try {
  237 + deviceProfile = deviceProfileService.findDeviceProfileByName(tenantId, device.getType());
  238 + if (deviceProfile == null) {
  239 + deviceProfile = new DeviceProfile();
  240 + deviceProfile.setTenantId(tenantId);
  241 + deviceProfile.setType(DeviceProfileType.DEFAULT);
  242 + deviceProfile.setName(device.getType());
  243 + deviceProfile.setTransportType(DeviceTransportType.LWM2M);
  244 + deviceProfile.setProvisionType(DeviceProfileProvisionType.DISABLED);
234 245
235 - DeviceProfileData deviceProfileData = new DeviceProfileData();  
236 - DefaultDeviceProfileConfiguration configuration = new DefaultDeviceProfileConfiguration();  
237 - DeviceProfileTransportConfiguration transportConfiguration = new Lwm2mDeviceProfileTransportConfiguration();  
238 - DisabledDeviceProfileProvisionConfiguration provisionConfiguration = new DisabledDeviceProfileProvisionConfiguration(null); 246 + DeviceProfileData deviceProfileData = new DeviceProfileData();
  247 + DefaultDeviceProfileConfiguration configuration = new DefaultDeviceProfileConfiguration();
  248 + DeviceProfileTransportConfiguration transportConfiguration = new Lwm2mDeviceProfileTransportConfiguration();
  249 + DisabledDeviceProfileProvisionConfiguration provisionConfiguration = new DisabledDeviceProfileProvisionConfiguration(null);
239 250
240 - deviceProfileData.setConfiguration(configuration);  
241 - deviceProfileData.setTransportConfiguration(transportConfiguration);  
242 - deviceProfileData.setProvisionConfiguration(provisionConfiguration);  
243 - deviceProfile.setProfileData(deviceProfileData); 251 + deviceProfileData.setConfiguration(configuration);
  252 + deviceProfileData.setTransportConfiguration(transportConfiguration);
  253 + deviceProfileData.setProvisionConfiguration(provisionConfiguration);
  254 + deviceProfile.setProfileData(deviceProfileData);
244 255
245 - deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile);  
246 - device.setDeviceProfileId(deviceProfile.getId()); 256 + deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile);
  257 + }
  258 + } finally {
  259 + findOrCreateDeviceProfileLock.unlock();
  260 + }
247 } 261 }
  262 + return deviceProfile;
248 } 263 }
249 264
250 private void setValues(ObjectNode objectNode, Map<BulkImportColumnType, String> data, Collection<BulkImportColumnType> columns) { 265 private void setValues(ObjectNode objectNode, Map<BulkImportColumnType, String> data, Collection<BulkImportColumnType> columns) {
@@ -22,6 +22,9 @@ import lombok.Data; @@ -22,6 +22,9 @@ import lombok.Data;
22 import lombok.RequiredArgsConstructor; 22 import lombok.RequiredArgsConstructor;
23 import lombok.SneakyThrows; 23 import lombok.SneakyThrows;
24 import org.apache.commons.lang3.StringUtils; 24 import org.apache.commons.lang3.StringUtils;
  25 +import org.apache.commons.lang3.exception.ExceptionUtils;
  26 +import org.thingsboard.common.util.DonAsynchron;
  27 +import org.thingsboard.common.util.ThingsBoardThreadFactory;
25 import org.thingsboard.server.cluster.TbClusterService; 28 import org.thingsboard.server.cluster.TbClusterService;
26 import org.thingsboard.server.common.data.BaseData; 29 import org.thingsboard.server.common.data.BaseData;
27 import org.thingsboard.server.common.data.TenantProfile; 30 import org.thingsboard.server.common.data.TenantProfile;
@@ -47,11 +50,16 @@ import org.thingsboard.server.utils.CsvUtils; @@ -47,11 +50,16 @@ import org.thingsboard.server.utils.CsvUtils;
47 import org.thingsboard.server.utils.TypeCastUtil; 50 import org.thingsboard.server.utils.TypeCastUtil;
48 51
49 import javax.annotation.Nullable; 52 import javax.annotation.Nullable;
  53 +import javax.annotation.PostConstruct;
  54 +import javax.annotation.PreDestroy;
50 import java.util.ArrayList; 55 import java.util.ArrayList;
51 import java.util.Arrays; 56 import java.util.Arrays;
52 import java.util.LinkedHashMap; 57 import java.util.LinkedHashMap;
53 import java.util.List; 58 import java.util.List;
54 import java.util.Map; 59 import java.util.Map;
  60 +import java.util.concurrent.CountDownLatch;
  61 +import java.util.concurrent.LinkedBlockingQueue;
  62 +import java.util.concurrent.ThreadPoolExecutor;
55 import java.util.concurrent.TimeUnit; 63 import java.util.concurrent.TimeUnit;
56 import java.util.concurrent.atomic.AtomicInteger; 64 import java.util.concurrent.atomic.AtomicInteger;
57 import java.util.function.Consumer; 65 import java.util.function.Consumer;
@@ -67,39 +75,49 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent @@ -67,39 +75,49 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent
67 protected final EntityActionService entityActionService; 75 protected final EntityActionService entityActionService;
68 protected final TbClusterService clusterService; 76 protected final TbClusterService clusterService;
69 77
70 - public final BulkImportResult<E> processBulkImport(BulkImportRequest request, SecurityUser user, Consumer<ImportedEntityInfo<E>> onEntityImported) throws Exception {  
71 - BulkImportResult<E> result = new BulkImportResult<>(); 78 + private static ThreadPoolExecutor executor;
72 79
73 - AtomicInteger i = new AtomicInteger(0);  
74 - if (request.getMapping().getHeader()) {  
75 - i.incrementAndGet(); 80 + @PostConstruct
  81 + private void initExecutor() {
  82 + if (executor == null) {
  83 + executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
  84 + 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(150_000),
  85 + ThingsBoardThreadFactory.forName("bulk-import"), new ThreadPoolExecutor.CallerRunsPolicy());
  86 + executor.allowCoreThreadTimeOut(true);
76 } 87 }
  88 + }
77 89
78 - parseData(request).forEach(entityData -> {  
79 - i.incrementAndGet();  
80 - try {  
81 - ImportedEntityInfo<E> importedEntityInfo = saveEntity(request, entityData.getFields(), user);  
82 - onEntityImported.accept(importedEntityInfo); 90 + public final BulkImportResult<E> processBulkImport(BulkImportRequest request, SecurityUser user, Consumer<ImportedEntityInfo<E>> onEntityImported) throws Exception {
  91 + List<EntityData> entitiesData = parseData(request);
83 92
84 - E entity = importedEntityInfo.getEntity(); 93 + BulkImportResult<E> result = new BulkImportResult<>();
  94 + CountDownLatch completionLatch = new CountDownLatch(entitiesData.size());
85 95
86 - saveKvs(user, entity, entityData.getKvs()); 96 + entitiesData.forEach(entityData -> DonAsynchron.submit(() -> {
  97 + ImportedEntityInfo<E> importedEntityInfo = saveEntity(request, entityData.getFields(), user);
  98 + E entity = importedEntityInfo.getEntity();
87 99
88 - if (importedEntityInfo.getRelatedError() != null) {  
89 - throw new RuntimeException(importedEntityInfo.getRelatedError());  
90 - } 100 + onEntityImported.accept(importedEntityInfo);
  101 + saveKvs(user, entity, entityData.getKvs());
91 102
92 - if (importedEntityInfo.isUpdated()) {  
93 - result.setUpdated(result.getUpdated() + 1);  
94 - } else {  
95 - result.setCreated(result.getCreated() + 1);  
96 - }  
97 - } catch (Exception e) {  
98 - result.setErrors(result.getErrors() + 1);  
99 - result.getErrorsList().add(String.format("Line %d: %s", i.get(), e.getMessage()));  
100 - }  
101 - }); 103 + return importedEntityInfo;
  104 + },
  105 + importedEntityInfo -> {
  106 + if (importedEntityInfo.isUpdated()) {
  107 + result.getUpdated().incrementAndGet();
  108 + } else {
  109 + result.getCreated().incrementAndGet();
  110 + }
  111 + completionLatch.countDown();
  112 + },
  113 + throwable -> {
  114 + result.getErrors().incrementAndGet();
  115 + result.getErrorsList().add(String.format("Line %d: %s", entityData.getLineNumber(), ExceptionUtils.getRootCauseMessage(throwable)));
  116 + completionLatch.countDown();
  117 + },
  118 + executor));
102 119
  120 + completionLatch.await();
103 return result; 121 return result;
104 } 122 }
105 123
@@ -186,8 +204,11 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent @@ -186,8 +204,11 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent
186 204
187 private List<EntityData> parseData(BulkImportRequest request) throws Exception { 205 private List<EntityData> parseData(BulkImportRequest request) throws Exception {
188 List<List<String>> records = CsvUtils.parseCsv(request.getFile(), request.getMapping().getDelimiter()); 206 List<List<String>> records = CsvUtils.parseCsv(request.getFile(), request.getMapping().getDelimiter());
  207 + AtomicInteger linesCounter = new AtomicInteger(0);
  208 +
189 if (request.getMapping().getHeader()) { 209 if (request.getMapping().getHeader()) {
190 records.remove(0); 210 records.remove(0);
  211 + linesCounter.incrementAndGet();
191 } 212 }
192 213
193 List<ColumnMapping> columnsMappings = request.getMapping().getColumns(); 214 List<ColumnMapping> columnsMappings = request.getMapping().getColumns();
@@ -205,15 +226,24 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent @@ -205,15 +226,24 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent
205 entityData.getKvs().put(entry.getKey(), new ParsedValue(castResult.getValue(), castResult.getKey())); 226 entityData.getKvs().put(entry.getKey(), new ParsedValue(castResult.getValue(), castResult.getKey()));
206 } 227 }
207 }); 228 });
  229 + entityData.setLineNumber(linesCounter.incrementAndGet());
208 return entityData; 230 return entityData;
209 }) 231 })
210 .collect(Collectors.toList()); 232 .collect(Collectors.toList());
211 } 233 }
212 234
  235 + @PreDestroy
  236 + private void shutdownExecutor() {
  237 + if (!executor.isTerminating()) {
  238 + executor.shutdown();
  239 + }
  240 + }
  241 +
213 @Data 242 @Data
214 protected static class EntityData { 243 protected static class EntityData {
215 private final Map<BulkImportColumnType, String> fields = new LinkedHashMap<>(); 244 private final Map<BulkImportColumnType, String> fields = new LinkedHashMap<>();
216 private final Map<ColumnMapping, ParsedValue> kvs = new LinkedHashMap<>(); 245 private final Map<ColumnMapping, ParsedValue> kvs = new LinkedHashMap<>();
  246 + private int lineNumber;
217 } 247 }
218 248
219 @Data 249 @Data
@@ -17,14 +17,14 @@ package org.thingsboard.server.service.importing; @@ -17,14 +17,14 @@ package org.thingsboard.server.service.importing;
17 17
18 import lombok.Data; 18 import lombok.Data;
19 19
20 -import java.util.LinkedList;  
21 -import java.util.List; 20 +import java.util.Collection;
  21 +import java.util.concurrent.ConcurrentLinkedDeque;
  22 +import java.util.concurrent.atomic.AtomicInteger;
22 23
23 @Data 24 @Data
24 public class BulkImportResult<E> { 25 public class BulkImportResult<E> {
25 - private int created = 0;  
26 - private int updated = 0;  
27 - private int errors = 0;  
28 - private List<String> errorsList = new LinkedList<>();  
29 - 26 + private AtomicInteger created = new AtomicInteger();
  27 + private AtomicInteger updated = new AtomicInteger();
  28 + private AtomicInteger errors = new AtomicInteger();
  29 + private Collection<String> errorsList = new ConcurrentLinkedDeque<>();
30 } 30 }
@@ -22,5 +22,4 @@ public class ImportedEntityInfo<E> { @@ -22,5 +22,4 @@ public class ImportedEntityInfo<E> {
22 private E entity; 22 private E entity;
23 private boolean isUpdated; 23 private boolean isUpdated;
24 private E oldEntity; 24 private E oldEntity;
25 - private String relatedError;  
26 } 25 }
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures; @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture; 20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors; 21 import com.google.common.util.concurrent.MoreExecutors;
22 22
  23 +import java.util.concurrent.Callable;
23 import java.util.concurrent.Executor; 24 import java.util.concurrent.Executor;
24 import java.util.function.Consumer; 25 import java.util.function.Consumer;
25 26
@@ -53,4 +54,15 @@ public class DonAsynchron { @@ -53,4 +54,15 @@ public class DonAsynchron {
53 Futures.addCallback(future, callback, MoreExecutors.directExecutor()); 54 Futures.addCallback(future, callback, MoreExecutors.directExecutor());
54 } 55 }
55 } 56 }
  57 +
  58 + public static <T> ListenableFuture<T> submit(Callable<T> task, Consumer<T> onSuccess, Consumer<Throwable> onFailure, Executor executor) {
  59 + return submit(task, onSuccess, onFailure, executor, null);
  60 + }
  61 +
  62 + public static <T> ListenableFuture<T> submit(Callable<T> task, Consumer<T> onSuccess, Consumer<Throwable> onFailure, Executor executor, Executor callbackExecutor) {
  63 + ListenableFuture<T> future = Futures.submit(task, executor);
  64 + withCallback(future, onSuccess, onFailure, callbackExecutor);
  65 + return future;
  66 + }
  67 +
56 } 68 }
@@ -35,6 +35,8 @@ public interface DeviceCredentialsDao extends Dao<DeviceCredentials> { @@ -35,6 +35,8 @@ public interface DeviceCredentialsDao extends Dao<DeviceCredentials> {
35 */ 35 */
36 DeviceCredentials save(TenantId tenantId, DeviceCredentials deviceCredentials); 36 DeviceCredentials save(TenantId tenantId, DeviceCredentials deviceCredentials);
37 37
  38 + DeviceCredentials saveAndFlush(TenantId tenantId, DeviceCredentials deviceCredentials);
  39 +
38 /** 40 /**
39 * Find device credentials by device id. 41 * Find device credentials by device id.
40 * 42 *
@@ -96,7 +96,7 @@ public class DeviceCredentialsServiceImpl extends AbstractEntityService implemen @@ -96,7 +96,7 @@ public class DeviceCredentialsServiceImpl extends AbstractEntityService implemen
96 log.trace("Executing updateDeviceCredentials [{}]", deviceCredentials); 96 log.trace("Executing updateDeviceCredentials [{}]", deviceCredentials);
97 credentialsValidator.validate(deviceCredentials, id -> tenantId); 97 credentialsValidator.validate(deviceCredentials, id -> tenantId);
98 try { 98 try {
99 - return deviceCredentialsDao.save(tenantId, deviceCredentials); 99 + return deviceCredentialsDao.saveAndFlush(tenantId, deviceCredentials);
100 } catch (Exception t) { 100 } catch (Exception t) {
101 ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); 101 ConstraintViolationException e = extractConstraintViolationException(t).orElse(null);
102 if (e != null && e.getConstraintName() != null 102 if (e != null && e.getConstraintName() != null
@@ -30,6 +30,8 @@ public interface DeviceProfileDao extends Dao<DeviceProfile> { @@ -30,6 +30,8 @@ public interface DeviceProfileDao extends Dao<DeviceProfile> {
30 30
31 DeviceProfile save(TenantId tenantId, DeviceProfile deviceProfile); 31 DeviceProfile save(TenantId tenantId, DeviceProfile deviceProfile);
32 32
  33 + DeviceProfile saveAndFlush(TenantId tenantId, DeviceProfile deviceProfile);
  34 +
33 PageData<DeviceProfile> findDeviceProfiles(TenantId tenantId, PageLink pageLink); 35 PageData<DeviceProfile> findDeviceProfiles(TenantId tenantId, PageLink pageLink);
34 36
35 PageData<DeviceProfileInfo> findDeviceProfileInfos(TenantId tenantId, PageLink pageLink, String transportType); 37 PageData<DeviceProfileInfo> findDeviceProfileInfos(TenantId tenantId, PageLink pageLink, String transportType);
@@ -167,7 +167,7 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D @@ -167,7 +167,7 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D
167 } 167 }
168 DeviceProfile savedDeviceProfile; 168 DeviceProfile savedDeviceProfile;
169 try { 169 try {
170 - savedDeviceProfile = deviceProfileDao.save(deviceProfile.getTenantId(), deviceProfile); 170 + savedDeviceProfile = deviceProfileDao.saveAndFlush(deviceProfile.getTenantId(), deviceProfile);
171 } catch (Exception t) { 171 } catch (Exception t) {
172 ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); 172 ConstraintViolationException e = extractConstraintViolationException(t).orElse(null);
173 if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("device_profile_name_unq_key")) { 173 if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("device_profile_name_unq_key")) {
@@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.dao.sql.device; 16 package org.thingsboard.server.dao.sql.device;
17 17
18 -import org.springframework.data.repository.CrudRepository; 18 +import org.springframework.data.jpa.repository.JpaRepository;
19 import org.thingsboard.server.dao.model.sql.DeviceCredentialsEntity; 19 import org.thingsboard.server.dao.model.sql.DeviceCredentialsEntity;
20 20
21 import java.util.UUID; 21 import java.util.UUID;
@@ -23,7 +23,7 @@ import java.util.UUID; @@ -23,7 +23,7 @@ import java.util.UUID;
23 /** 23 /**
24 * Created by Valerii Sosliuk on 5/6/2017. 24 * Created by Valerii Sosliuk on 5/6/2017.
25 */ 25 */
26 -public interface DeviceCredentialsRepository extends CrudRepository<DeviceCredentialsEntity, UUID> { 26 +public interface DeviceCredentialsRepository extends JpaRepository<DeviceCredentialsEntity, UUID> {
27 27
28 DeviceCredentialsEntity findByDeviceId(UUID deviceId); 28 DeviceCredentialsEntity findByDeviceId(UUID deviceId);
29 29
@@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.device; @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.device;
17 17
18 import org.springframework.data.domain.Page; 18 import org.springframework.data.domain.Page;
19 import org.springframework.data.domain.Pageable; 19 import org.springframework.data.domain.Pageable;
  20 +import org.springframework.data.jpa.repository.JpaRepository;
20 import org.springframework.data.jpa.repository.Query; 21 import org.springframework.data.jpa.repository.Query;
21 import org.springframework.data.repository.PagingAndSortingRepository; 22 import org.springframework.data.repository.PagingAndSortingRepository;
22 import org.springframework.data.repository.query.Param; 23 import org.springframework.data.repository.query.Param;
@@ -26,7 +27,7 @@ import org.thingsboard.server.dao.model.sql.DeviceProfileEntity; @@ -26,7 +27,7 @@ import org.thingsboard.server.dao.model.sql.DeviceProfileEntity;
26 27
27 import java.util.UUID; 28 import java.util.UUID;
28 29
29 -public interface DeviceProfileRepository extends PagingAndSortingRepository<DeviceProfileEntity, UUID> { 30 +public interface DeviceProfileRepository extends JpaRepository<DeviceProfileEntity, UUID> {
30 31
31 @Query("SELECT new org.thingsboard.server.common.data.DeviceProfileInfo(d.id, d.name, d.image, d.defaultDashboardId, d.type, d.transportType) " + 32 @Query("SELECT new org.thingsboard.server.common.data.DeviceProfileInfo(d.id, d.name, d.image, d.defaultDashboardId, d.type, d.transportType) " +
32 "FROM DeviceProfileEntity d " + 33 "FROM DeviceProfileEntity d " +
@@ -18,6 +18,7 @@ package org.thingsboard.server.dao.sql.device; @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.sql.device;
18 import org.springframework.beans.factory.annotation.Autowired; 18 import org.springframework.beans.factory.annotation.Autowired;
19 import org.springframework.data.repository.CrudRepository; 19 import org.springframework.data.repository.CrudRepository;
20 import org.springframework.stereotype.Component; 20 import org.springframework.stereotype.Component;
  21 +import org.springframework.transaction.annotation.Transactional;
21 import org.thingsboard.server.common.data.id.TenantId; 22 import org.thingsboard.server.common.data.id.TenantId;
22 import org.thingsboard.server.common.data.security.DeviceCredentials; 23 import org.thingsboard.server.common.data.security.DeviceCredentials;
23 import org.thingsboard.server.dao.DaoUtil; 24 import org.thingsboard.server.dao.DaoUtil;
@@ -46,6 +47,14 @@ public class JpaDeviceCredentialsDao extends JpaAbstractDao<DeviceCredentialsEnt @@ -46,6 +47,14 @@ public class JpaDeviceCredentialsDao extends JpaAbstractDao<DeviceCredentialsEnt
46 return deviceCredentialsRepository; 47 return deviceCredentialsRepository;
47 } 48 }
48 49
  50 + @Transactional
  51 + @Override
  52 + public DeviceCredentials saveAndFlush(TenantId tenantId, DeviceCredentials deviceCredentials) {
  53 + DeviceCredentials result = save(tenantId, deviceCredentials);
  54 + deviceCredentialsRepository.flush();
  55 + return result;
  56 + }
  57 +
49 @Override 58 @Override
50 public DeviceCredentials findByDeviceId(TenantId tenantId, UUID deviceId) { 59 public DeviceCredentials findByDeviceId(TenantId tenantId, UUID deviceId) {
51 return DaoUtil.getData(deviceCredentialsRepository.findByDeviceId(deviceId)); 60 return DaoUtil.getData(deviceCredentialsRepository.findByDeviceId(deviceId));
@@ -19,6 +19,7 @@ import org.apache.commons.lang3.StringUtils; @@ -19,6 +19,7 @@ import org.apache.commons.lang3.StringUtils;
19 import org.springframework.beans.factory.annotation.Autowired; 19 import org.springframework.beans.factory.annotation.Autowired;
20 import org.springframework.data.repository.CrudRepository; 20 import org.springframework.data.repository.CrudRepository;
21 import org.springframework.stereotype.Component; 21 import org.springframework.stereotype.Component;
  22 +import org.springframework.transaction.annotation.Transactional;
22 import org.thingsboard.server.common.data.DeviceProfile; 23 import org.thingsboard.server.common.data.DeviceProfile;
23 import org.thingsboard.server.common.data.DeviceProfileInfo; 24 import org.thingsboard.server.common.data.DeviceProfileInfo;
24 import org.thingsboard.server.common.data.DeviceTransportType; 25 import org.thingsboard.server.common.data.DeviceTransportType;
@@ -54,6 +55,14 @@ public class JpaDeviceProfileDao extends JpaAbstractSearchTextDao<DeviceProfileE @@ -54,6 +55,14 @@ public class JpaDeviceProfileDao extends JpaAbstractSearchTextDao<DeviceProfileE
54 return deviceProfileRepository.findDeviceProfileInfoById(deviceProfileId); 55 return deviceProfileRepository.findDeviceProfileInfoById(deviceProfileId);
55 } 56 }
56 57
  58 + @Transactional
  59 + @Override
  60 + public DeviceProfile saveAndFlush(TenantId tenantId, DeviceProfile deviceProfile) {
  61 + DeviceProfile result = save(tenantId, deviceProfile);
  62 + deviceProfileRepository.flush();
  63 + return result;
  64 + }
  65 +
57 @Override 66 @Override
58 public PageData<DeviceProfile> findDeviceProfiles(TenantId tenantId, PageLink pageLink) { 67 public PageData<DeviceProfile> findDeviceProfiles(TenantId tenantId, PageLink pageLink) {
59 return DaoUtil.toPageData( 68 return DaoUtil.toPageData(