Commit d9c5135f4f86aa1fec6a45ef0f2eb325f6d2c542

Authored by Igor Kulikov
1 parent abac3098

Refactor Claim device service

... ... @@ -265,7 +265,8 @@ public class ActorSystemContext {
265 265 @Getter
266 266 private SmsSenderFactory smsSenderFactory;
267 267
268   - @Autowired
  268 + @Lazy
  269 + @Autowired(required = false)
269 270 @Getter
270 271 private ClaimDevicesService claimDevicesService;
271 272
... ...
application/src/main/java/org/thingsboard/server/service/device/ClaimDevicesServiceImpl.java renamed from dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesServiceImpl.java
... ... @@ -13,12 +13,14 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.dao.device;
  16 +package org.thingsboard.server.service.device;
17 17
18 18 import com.fasterxml.jackson.databind.ObjectMapper;
  19 +import com.google.common.util.concurrent.FutureCallback;
19 20 import com.google.common.util.concurrent.Futures;
20 21 import com.google.common.util.concurrent.ListenableFuture;
21 22 import com.google.common.util.concurrent.MoreExecutors;
  23 +import com.google.common.util.concurrent.SettableFuture;
22 24 import lombok.extern.slf4j.Slf4j;
23 25 import org.springframework.beans.factory.annotation.Autowired;
24 26 import org.springframework.beans.factory.annotation.Value;
... ... @@ -26,6 +28,7 @@ import org.springframework.cache.Cache;
26 28 import org.springframework.cache.CacheManager;
27 29 import org.springframework.stereotype.Service;
28 30 import org.springframework.util.StringUtils;
  31 +import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
29 32 import org.thingsboard.server.common.data.Customer;
30 33 import org.thingsboard.server.common.data.DataConstants;
31 34 import org.thingsboard.server.common.data.Device;
... ... @@ -37,12 +40,17 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
37 40 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
38 41 import org.thingsboard.server.dao.attributes.AttributesService;
39 42 import org.thingsboard.server.dao.customer.CustomerService;
  43 +import org.thingsboard.server.dao.device.ClaimDataInfo;
  44 +import org.thingsboard.server.dao.device.ClaimDevicesService;
  45 +import org.thingsboard.server.dao.device.DeviceService;
40 46 import org.thingsboard.server.dao.device.claim.ClaimData;
41 47 import org.thingsboard.server.dao.device.claim.ClaimResponse;
42 48 import org.thingsboard.server.dao.device.claim.ClaimResult;
43 49 import org.thingsboard.server.dao.device.claim.ReclaimResult;
44 50 import org.thingsboard.server.dao.model.ModelConstants;
  51 +import org.thingsboard.server.queue.util.TbCoreComponent;
45 52
  53 +import javax.annotation.Nullable;
46 54 import java.io.IOException;
47 55 import java.util.Arrays;
48 56 import java.util.Collections;
... ... @@ -54,6 +62,7 @@ import static org.thingsboard.server.common.data.CacheConstants.CLAIM_DEVICES_CA
54 62
55 63 @Service
56 64 @Slf4j
  65 +@TbCoreComponent
57 66 public class ClaimDevicesServiceImpl implements ClaimDevicesService {
58 67
59 68 private static final String CLAIM_ATTRIBUTE_NAME = "claimingAllowed";
... ... @@ -65,6 +74,8 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService {
65 74 @Autowired
66 75 private AttributesService attributesService;
67 76 @Autowired
  77 + private RuleEngineTelemetryService telemetryService;
  78 + @Autowired
68 79 private CustomerService customerService;
69 80 @Autowired
70 81 private CacheManager cacheManager;
... ... @@ -172,10 +183,23 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService {
172 183 if (isAllowedClaimingByDefault) {
173 184 return Futures.immediateFuture(new ReclaimResult(unassignedCustomer));
174 185 }
175   - return Futures.transform(attributesService.save(
  186 + SettableFuture<ReclaimResult> result = SettableFuture.create();
  187 + telemetryService.saveAndNotify(
176 188 tenantId, device.getId(), DataConstants.SERVER_SCOPE, Collections.singletonList(
177 189 new BaseAttributeKvEntry(new BooleanDataEntry(CLAIM_ATTRIBUTE_NAME, true), System.currentTimeMillis())
178   - )), result -> new ReclaimResult(unassignedCustomer), MoreExecutors.directExecutor());
  190 + ),
  191 + new FutureCallback<>() {
  192 + @Override
  193 + public void onSuccess(@Nullable Void tmp) {
  194 + result.set(new ReclaimResult(unassignedCustomer));
  195 + }
  196 +
  197 + @Override
  198 + public void onFailure(Throwable t) {
  199 + result.setException(t);
  200 + }
  201 + });
  202 + return result;
179 203 }
180 204 cacheEviction(device.getId());
181 205 return Futures.immediateFuture(new ReclaimResult(null));
... ... @@ -198,12 +222,24 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService {
198 222 return systemDurationMs;
199 223 }
200 224
201   - private ListenableFuture<List<Void>> removeClaimingSavedData(Cache cache, ClaimDataInfo data, Device device) {
  225 + private ListenableFuture<Void> removeClaimingSavedData(Cache cache, ClaimDataInfo data, Device device) {
202 226 if (data.isFromCache()) {
203 227 cache.evict(data.getKey());
204 228 }
205   - return attributesService.removeAll(device.getTenantId(),
206   - device.getId(), DataConstants.SERVER_SCOPE, Arrays.asList(CLAIM_ATTRIBUTE_NAME, CLAIM_DATA_ATTRIBUTE_NAME));
  229 + SettableFuture<Void> result = SettableFuture.create();
  230 + telemetryService.deleteAndNotify(device.getTenantId(),
  231 + device.getId(), DataConstants.SERVER_SCOPE, Arrays.asList(CLAIM_ATTRIBUTE_NAME, CLAIM_DATA_ATTRIBUTE_NAME), new FutureCallback<>() {
  232 + @Override
  233 + public void onSuccess(@Nullable Void tmp) {
  234 + result.set(tmp);
  235 + }
  236 +
  237 + @Override
  238 + public void onFailure(Throwable t) {
  239 + result.setException(t);
  240 + }
  241 + });
  242 + return result;
207 243 }
208 244
209 245 private void cacheEviction(DeviceId deviceId) {
... ...