Commit 6c83c00115c3900051ab3d486407c62b50d4130b

Authored by xp.Huang
1 parent 80c8e60f

fix: 完善任务中心的命令下发

... ... @@ -518,4 +518,13 @@ public class TkDeviceController extends BaseController {
518 518 return ResponseEntity.ok(
519 519 tkdeviceService.getDeviceAttributes(deviceProfileId, tenantId, dataType));
520 520 }
  521 +
  522 + @PostMapping("get/devices")
  523 + @ApiOperation("通过设备ID列表获取设备信息")
  524 + public ResponseResult<List<DeviceDTO>> findDeviceInfoByTbDeviceIds(
  525 + @RequestBody List<String> tbDeviceIds) throws ThingsboardException {
  526 + return ResponseResult.success(
  527 + tkdeviceService.findDeviceInfoByTbDeviceIds(
  528 + tbDeviceIds, getCurrentUser().getCurrentTenantId()));
  529 + }
521 530 }
... ...
... ... @@ -14,6 +14,7 @@ import org.thingsboard.server.common.data.yunteng.common.UpdateGroup;
14 14 import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException;
15 15 import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage;
16 16 import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
  17 +import org.thingsboard.server.common.data.yunteng.dto.task.TaskImmediateExecuteDTO;
17 18 import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO;
18 19 import org.thingsboard.server.common.data.yunteng.enums.OrderTypeEnum;
19 20 import org.thingsboard.server.common.data.yunteng.enums.TargetTypeEnum;
... ... @@ -126,6 +127,18 @@ public class TkTaskCenterController extends BaseController {
126 127 taskId, tbDeviceId, getCurrentUser().getCurrentTenantId(), allow));
127 128 }
128 129
  130 + @PostMapping("/immediate/execute")
  131 + @PreAuthorize(
  132 + "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:immediate:execute'})")
  133 + @ApiOperation(value = "立即执行任务中心任务")
  134 + public ResponseResult<Boolean> immediateExecute(
  135 + @Validated @RequestBody TaskImmediateExecuteDTO immediateExecuteDTO)
  136 + throws SchedulerException, ThingsboardException {
  137 + return ResponseResult.success(
  138 + tkTaskCenterService.immediateExecute(
  139 + immediateExecuteDTO, getCurrentUser().getCurrentTenantId()));
  140 + }
  141 +
129 142 private TkTaskCenterDTO saveOrUpdate(TkTaskCenterDTO tkTaskCenterDTO)
130 143 throws ThingsboardException, SchedulerException {
131 144 tkTaskCenterDTO.setTenantId(getCurrentUser().getCurrentTenantId());
... ...
1 1 package org.thingsboard.server.utils.yunteng;
2 2
3 3 import com.fasterxml.jackson.databind.JsonNode;
  4 +import com.fasterxml.jackson.databind.node.ObjectNode;
4 5 import com.google.common.util.concurrent.FutureCallback;
5 6 import com.google.common.util.concurrent.Futures;
6 7 import com.google.common.util.concurrent.ListenableFuture;
... ... @@ -16,13 +17,18 @@ import org.thingsboard.server.common.data.id.DeviceId;
16 17 import org.thingsboard.server.common.data.id.TenantId;
17 18 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
18 19 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
  20 +import org.thingsboard.server.common.data.yunteng.constant.ModelConstants;
19 21 import org.thingsboard.server.common.data.yunteng.core.cache.CacheUtils;
20 22 import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO;
21 23 import org.thingsboard.server.common.data.yunteng.dto.task.TargetContentDTO;
  24 +import org.thingsboard.server.common.data.yunteng.dto.task.TaskImmediateExecuteDTO;
  25 +import org.thingsboard.server.common.data.yunteng.dto.task.TkDeviceTaskCenterDTO;
22 26 import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO;
  27 +import org.thingsboard.server.common.data.yunteng.enums.CmdTypeEnum;
23 28 import org.thingsboard.server.common.data.yunteng.enums.TargetTypeEnum;
24 29 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
25 30 import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
  31 +import org.thingsboard.server.dao.yunteng.service.TkDeviceTaskCenterService;
26 32 import org.thingsboard.server.dao.yunteng.service.TkTaskCenterService;
27 33 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
28 34 import org.thingsboard.server.service.security.model.SecurityUser;
... ... @@ -44,6 +50,8 @@ public class RpcCommandTask {
44 50 private final TbCoreDeviceRpcService tbCoreDeviceRpcService;
45 51 private final TkDeviceService tkDeviceService;
46 52 private final CacheUtils cacheUtils;
  53 + private final TkDeviceTaskCenterService tkDeviceTaskCenterService;
  54 + private static final String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
47 55
48 56 public void process(String taskCenterId) {
49 57 // 通过任务中心ID查询执行命令及执行对象
... ... @@ -59,24 +67,34 @@ public class RpcCommandTask {
59 67 TargetContentDTO targetContent = tkTaskCenterDTO.getExecuteTarget();
60 68 JsonNode cmdJsonNode = tkTaskCenterDTO.getExecuteContent().getPushContent();
61 69 SecurityUser securityUser = new SecurityUser();
  70 + String taskCenterId = tkTaskCenterDTO.getId();
  71 +
  72 + String immediateExecuteKey = FastIotConstants.CacheConfigKey.TASK_IMMEDIATE_EXECUTE + "_" + taskCenterId;
  73 + Optional<TaskImmediateExecuteDTO> immediateExecuteDTO = cacheUtils.get(cacheName,immediateExecuteKey);
  74 + List<String> data = targetContent.getData();
  75 + //如果从缓存里面获取了任务的立即执行,则以缓存里面的数据作为执行依据
  76 + if(immediateExecuteDTO.isPresent()){
  77 + TaskImmediateExecuteDTO immediateExecute = immediateExecuteDTO.get();
  78 + data = immediateExecute.getTargetIds();
  79 + targetType = immediateExecute.getExecuteTarget();
  80 + }
  81 +
62 82 // send cmd
63   - if (targetContent.getData() != null && null != cmdJsonNode) {
  83 + if (data != null && null != cmdJsonNode) {
64 84 String tenantId = tkTaskCenterDTO.getTenantId();
65   - String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
66 85 String key =
67   - FastIotConstants.CacheConfigKey.TASK_CENTER_EXECUTE_TIME
68   - + "_"
69   - + tkTaskCenterDTO.getId();
  86 + FastIotConstants.CacheConfigKey.TASK_CENTER_EXECUTE_TIME + "_" + taskCenterId;
70 87 cacheUtils.put(
71 88 cacheName,
72 89 key,
73 90 LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
74 91 if (targetType.equals(TargetTypeEnum.DEVICES)) {
75   - for (String deviceId : targetContent.getData()) {
76   - sendRpcCommand(cmdJsonNode, tenantId, deviceId, securityUser);
  92 + for (String deviceId : data) {
  93 + sendRpcCommand(cmdJsonNode, tenantId, deviceId, securityUser, taskCenterId);
77 94 }
78 95 } else {
79   - sendRpcCommandByProducts(targetContent, cmdJsonNode, tenantId, securityUser);
  96 + sendRpcCommandByProducts(
  97 + targetContent, cmdJsonNode, tenantId, securityUser, taskCenterId);
80 98 }
81 99 }
82 100 }
... ... @@ -92,7 +110,8 @@ public class RpcCommandTask {
92 110 TargetContentDTO targetContent,
93 111 JsonNode cmdJsonNode,
94 112 String tenantId,
95   - SecurityUser securityUser) {
  113 + SecurityUser securityUser,
  114 + String taskCenterId) {
96 115 for (String deviceProfileId : targetContent.getData()) {
97 116 Futures.addCallback(
98 117 tkDeviceService.findDeviceListByDeviceProfileId(deviceProfileId, tenantId),
... ... @@ -112,7 +131,8 @@ public class RpcCommandTask {
112 131 }
113 132 }
114 133 if (needSendCommand) {
115   - sendRpcCommand(cmdJsonNode, tenantId, dto.getTbDeviceId(), securityUser);
  134 + sendRpcCommand(
  135 + cmdJsonNode, tenantId, dto.getTbDeviceId(), securityUser, taskCenterId);
116 136 }
117 137 }
118 138 }
... ... @@ -126,11 +146,18 @@ public class RpcCommandTask {
126 146 }
127 147
128 148 private void sendRpcCommand(
129   - JsonNode cmdJsonNode, String tenantId, String originateId, SecurityUser securityUser) {
  149 + JsonNode cmdJsonNode,
  150 + String tenantId,
  151 + String originateId,
  152 + SecurityUser securityUser,
  153 + String taskCenterId) {
130 154 JsonNode rpcCommand = cmdJsonNode.get(FastIotConstants.RPC_COMMAND);
131 155 ToDeviceRpcRequestBody body =
132 156 new ToDeviceRpcRequestBody("methodThingskit", JacksonUtil.toString(rpcCommand));
133 157 DeviceId deviceId = new DeviceId(UUID.fromString(originateId));
  158 + ObjectNode objectNode = JacksonUtil.newObjectNode();
  159 + objectNode.put(
  160 + ModelConstants.TablePropertyMapping.COMMAND_TYPE, CmdTypeEnum.TASK_CENTER.ordinal());
134 161 ToDeviceRpcRequest request =
135 162 new ToDeviceRpcRequest(
136 163 UUID.randomUUID(),
... ... @@ -141,12 +168,26 @@ public class RpcCommandTask {
141 168 body,
142 169 true,
143 170 null,
144   - null);
  171 + JacksonUtil.toString(objectNode));
145 172 tbCoreDeviceRpcService.processRestApiRpcRequest(
146 173 request,
147 174 fromDeviceRpcResponse -> {
148 175 log.trace("Device renamed RPC with id: [{}] ", request.getId());
149 176 },
150 177 securityUser);
  178 + // 更新设备与任务中心的执行时间
  179 + TkDeviceTaskCenterDTO tkDeviceTaskCenterDTO =
  180 + tkDeviceTaskCenterService.findInfoByDeviceIdAndTaskCenterId(
  181 + tenantId, originateId, taskCenterId);
  182 + if (null != tkDeviceTaskCenterDTO) {
  183 + LocalDateTime localDateTime = LocalDateTime.now();
  184 + Long executeTime = localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
  185 + tkDeviceTaskCenterDTO.setLastExecuteTime(localDateTime);
  186 + tkDeviceTaskCenterService.saveOrUpdate(tkDeviceTaskCenterDTO);
  187 + // 将设备执行时间存入缓存
  188 + String key =
  189 + FastIotConstants.CacheConfigKey.TASK_CENTER_DEVICE_EXECUTE_TIME + "_" + originateId;
  190 + cacheUtils.put(cacheName, key, executeTime);
  191 + }
151 192 }
152 193 }
... ...
... ... @@ -20,7 +20,6 @@ public interface FastIotConstants {
20 20 Pattern CHINA_MOBILE_PATTERN = Pattern.compile(MOBILE);
21 21 String DEFAULT_DELIMITER = "#";
22 22 String RPC_COMMAND = "rpcCommand";
23   -
24 23 interface CacheConfigKey {
25 24 String CACHE_CONFIG_KEY = "yunTengIotCache";
26 25 String USER_PERMISSION_PREFIX = "userPermissionFor_";
... ... @@ -29,6 +28,8 @@ public interface FastIotConstants {
29 28 String PUBLIC_ID = "publicId_";
30 29 String TASK_CENTER_INFOS = "taskCenterInfos";
31 30 String TASK_CENTER_EXECUTE_TIME = "taskCenterExecuteTime";
  31 + String TASK_CENTER_DEVICE_EXECUTE_TIME = "taskCenterDeviceExecuteTime";
  32 + String TASK_IMMEDIATE_EXECUTE = "taskImmediateExecute";
32 33 }
33 34
34 35 interface TBCacheConfig {
... ...
  1 +package org.thingsboard.server.common.data.yunteng.dto.task;
  2 +
  3 +import io.swagger.annotations.ApiModelProperty;
  4 +import lombok.Data;
  5 +import org.thingsboard.server.common.data.yunteng.enums.TargetTypeEnum;
  6 +
  7 +import javax.validation.constraints.NotEmpty;
  8 +import javax.validation.constraints.NotNull;
  9 +import java.io.Serializable;
  10 +import java.util.List;
  11 +
  12 +@Data
  13 +public class TaskImmediateExecuteDTO implements Serializable {
  14 +
  15 + private static final long serialVersionUID = -6249425147049039746L;
  16 +
  17 + @NotEmpty(message = "任务中心ID不能为空或空字符串")
  18 + @ApiModelProperty(value = "任务中心任务ID")
  19 + private String id;
  20 +
  21 + @ApiModelProperty(value = "执行类型:设备/产品")
  22 + private TargetTypeEnum executeTarget = TargetTypeEnum.DEVICES;
  23 +
  24 + @NotEmpty(message = "任务中心ID不能为空或空字符串")
  25 + @ApiModelProperty(value = "corn表达式")
  26 + private String cronExpression;
  27 +
  28 + @NotEmpty(message = "任务中心名称不能为空或空字符串")
  29 + @ApiModelProperty(value = "任务中心名称")
  30 + private String name;
  31 +
  32 + @NotNull(message = "执行列表不能为空")
  33 + @ApiModelProperty(value = "执行列表")
  34 + private List<String> targetIds;
  35 +}
... ...
... ... @@ -2,8 +2,12 @@ package org.thingsboard.server.common.data.yunteng.dto.task;
2 2
3 3 import io.swagger.annotations.ApiModelProperty;
4 4 import lombok.Data;
  5 +import lombok.EqualsAndHashCode;
5 6 import org.thingsboard.server.common.data.yunteng.dto.BaseDTO;
6 7
  8 +import java.time.LocalDateTime;
  9 +
  10 +@EqualsAndHashCode(callSuper = true)
7 11 @Data
8 12 public class TkDeviceTaskCenterDTO extends BaseDTO {
9 13 @ApiModelProperty(value = "设备ID")
... ... @@ -15,6 +19,9 @@ public class TkDeviceTaskCenterDTO extends BaseDTO {
15 19 @ApiModelProperty(value = "是否允许执行:0不执行 1允许执行")
16 20 private Integer allowState;
17 21
  22 + @ApiModelProperty(value = "最新执行时间")
  23 + private LocalDateTime lastExecuteTime;
  24 +
18 25 @ApiModelProperty(value = "租户ID")
19 26 private String tenantId;
20 27 }
... ...
... ... @@ -7,5 +7,7 @@ public enum CmdTypeEnum {
7 7 //自定义
8 8 DIY,
9 9 //服务
10   - SERVICE
  10 + SERVICE,
  11 + //任务中心
  12 + TASK_CENTER
11 13 }
... ...
... ... @@ -5,15 +5,14 @@ import lombok.Data;
5 5 import lombok.EqualsAndHashCode;
6 6 import org.thingsboard.server.common.data.yunteng.constant.ModelConstants;
7 7
  8 +import java.time.LocalDateTime;
  9 +
8 10 @EqualsAndHashCode(callSuper = true)
9 11 @Data
10 12 @TableName(value = ModelConstants.Table.TK_DEVICE_TASK_CENTER_NAME)
11 13 public class TkDeviceTaskCenterEntity extends TenantBaseEntity {
12   -
13   - private static final long serialVersionUID = -5425391296429906856L;
14 14 private String tbDeviceId;
15   -
16 15 private String taskCenterId;
17   -
  16 + private LocalDateTime lastExecuteTime;
18 17 private Integer allowState;
19 18 }
... ...
... ... @@ -18,6 +18,7 @@ import org.thingsboard.server.common.data.yunteng.core.cache.CacheUtils;
18 18 import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException;
19 19 import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage;
20 20 import org.thingsboard.server.common.data.yunteng.dto.*;
  21 +import org.thingsboard.server.common.data.yunteng.dto.task.TkDeviceTaskCenterDTO;
21 22 import org.thingsboard.server.common.data.yunteng.enums.JobGroupEnum;
22 23 import org.thingsboard.server.common.data.yunteng.enums.StatusEnum;
23 24 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
... ... @@ -29,6 +30,7 @@ import org.thingsboard.server.dao.yunteng.mapper.SysJobMapper;
29 30 import org.thingsboard.server.dao.yunteng.service.*;
30 31
31 32 import javax.annotation.PostConstruct;
  33 +import java.time.ZoneOffset;
32 34 import java.util.*;
33 35 import java.util.stream.Collectors;
34 36
... ... @@ -40,6 +42,8 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE
40 42 private final Scheduler scheduler;
41 43 private final SysJobLogServiceImpl sysJobLogService;
42 44 private final CacheUtils cacheUtils;
  45 + private final TkDeviceTaskCenterService tkDeviceTaskCenterService;
  46 + private static final String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
43 47 /** 项目启动时,初始化定时器 主要是防止手动修改数据库导致未同步到定时任务处理(注:不能手动修改数据库ID和任务组名,否则会导致脏数据) */
44 48 @PostConstruct
45 49 public void init() throws SchedulerException {
... ... @@ -49,12 +53,13 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE
49 53 new ArrayList<>(baseMapper.selectList(new LambdaQueryWrapper<>()));
50 54 List<SysJobEntity> enableJobs = new ArrayList<>();
51 55 for (SysJobEntity job : jobList) {
52   - if (Objects.equals(job.getStatus(),StatusEnum.ENABLE.getIndex())) {
  56 + if (Objects.equals(job.getStatus(), StatusEnum.ENABLE.getIndex())) {
53 57 enableJobs.add(job);
54 58 }
55 59 if (Objects.equals(job.getJobGroup(), JobGroupEnum.TASK_CENTER.name())) {
56   - // 查询任务中心的最新执行时间放在缓存里面
57   - queryJobLastExecuteTime(job.getSourceId(), job.getId(), job.getJobGroup());
  60 + // 查询设备任务中心映射表的最新执行时间放入缓存
  61 + queryDeviceTaskCenterLastExecuteTime(
  62 + job.getTenantId(), job.getSourceId(), job.getId(), job.getJobGroup());
58 63 }
59 64 }
60 65 for (SysJobEntity job : enableJobs) {
... ... @@ -261,6 +266,7 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE
261 266 SysJobEntity job = jobDTO.getEntity(SysJobEntity.class);
262 267 baseMapper.insert(job);
263 268 ScheduleUtils.createScheduleJob(scheduler, job);
  269 + jobDTO.setId(job.getId());
264 270 } else {
265 271 SysJobEntity job = baseMapper.selectById(jobDTO.getId());
266 272 SysJobEntity newJob = jobDTO.getEntity(SysJobEntity.class);
... ... @@ -318,7 +324,6 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE
318 324 new FutureCallback<>() {
319 325 @Override
320 326 public void onSuccess(@Nullable Long executeTime) {
321   - String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
322 327 String key = FastIotConstants.CacheConfigKey.TASK_CENTER_EXECUTE_TIME + "_" + sourceId;
323 328 cacheUtils.put(cacheName, key, executeTime);
324 329 }
... ... @@ -328,4 +333,38 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE
328 333 },
329 334 MoreExecutors.directExecutor());
330 335 }
  336 +
  337 + private void queryDeviceTaskCenterLastExecuteTime(
  338 + String tenantId, String sourceId, String jobId, String JobGroup) {
  339 + Futures.addCallback(
  340 + tkDeviceTaskCenterService.findInfosByTaskCenterId(tenantId, sourceId),
  341 + new FutureCallback<>() {
  342 + @Override
  343 + public void onSuccess(@Nullable List<TkDeviceTaskCenterDTO> tkDeviceTaskCenterDTOS) {
  344 + if (null != tkDeviceTaskCenterDTOS && !tkDeviceTaskCenterDTOS.isEmpty()) {
  345 + // 任务还存在,才查询任务中心的最新执行时间放入缓存
  346 + queryJobLastExecuteTime(sourceId, jobId, JobGroup);
  347 + for (TkDeviceTaskCenterDTO tkDeviceTaskCenterDTO : tkDeviceTaskCenterDTOS) {
  348 + String key =
  349 + FastIotConstants.CacheConfigKey.TASK_CENTER_DEVICE_EXECUTE_TIME
  350 + + "_"
  351 + + tkDeviceTaskCenterDTO.getTbDeviceId();
  352 + if (null != tkDeviceTaskCenterDTO.getLastExecuteTime()) {
  353 + cacheUtils.put(
  354 + cacheName,
  355 + key,
  356 + tkDeviceTaskCenterDTO
  357 + .getLastExecuteTime()
  358 + .toInstant(ZoneOffset.of("+8"))
  359 + .toEpochMilli());
  360 + }
  361 + }
  362 + }
  363 + }
  364 +
  365 + @Override
  366 + public void onFailure(@NotNull Throwable throwable) {}
  367 + },
  368 + MoreExecutors.directExecutor());
  369 + }
331 370 }
... ...
... ... @@ -723,4 +723,19 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev
723 723 .map(list -> list.stream().map(TkDeviceEntity::getTbDeviceId).collect(Collectors.toList()))
724 724 .orElse(null);
725 725 }
  726 +
  727 + @Override
  728 + public List<DeviceDTO> findDeviceInfoByTbDeviceIds(List<String> tbDeviceIds, String tenantId) {
  729 + if (StringUtils.isEmpty(tenantId) || null == tbDeviceIds || tbDeviceIds.isEmpty()) {
  730 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  731 + }
  732 + return baseMapper
  733 + .selectList(
  734 + new LambdaQueryWrapper<TkDeviceEntity>()
  735 + .eq(TkDeviceEntity::getTenantId, tenantId)
  736 + .in(TkDeviceEntity::getTbDeviceId, tbDeviceIds))
  737 + .stream()
  738 + .map(entity -> entity.getDTO(DeviceDTO.class))
  739 + .collect(Collectors.toList());
  740 + }
726 741 }
... ...
1 1 package org.thingsboard.server.dao.yunteng.impl;
2 2
3 3 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  4 +import com.google.common.util.concurrent.Futures;
  5 +import com.google.common.util.concurrent.ListenableFuture;
4 6 import lombok.RequiredArgsConstructor;
  7 +import lombok.extern.slf4j.Slf4j;
5 8 import org.springframework.stereotype.Service;
6 9 import org.springframework.transaction.annotation.Transactional;
7 10 import org.thingsboard.server.common.data.StringUtils;
8 11 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
9   -import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
  12 +import org.thingsboard.server.common.data.yunteng.core.cache.CacheUtils;
  13 +import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException;
  14 +import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage;
10 15 import org.thingsboard.server.common.data.yunteng.dto.task.TkDeviceTaskCenterDTO;
11 16 import org.thingsboard.server.dao.yunteng.entities.TkDeviceTaskCenterEntity;
12 17 import org.thingsboard.server.dao.yunteng.mapper.TkDeviceTaskCenterMapper;
13 18 import org.thingsboard.server.dao.yunteng.service.AbstractBaseService;
14 19 import org.thingsboard.server.dao.yunteng.service.TkDeviceTaskCenterService;
  20 +
15 21 import java.util.List;
  22 +import java.util.Optional;
  23 +import java.util.stream.Collectors;
16 24
17 25 @Service
18 26 @RequiredArgsConstructor
  27 +@Slf4j
19 28 public class TkDeviceTaskCenterServiceImpl
20 29 extends AbstractBaseService<TkDeviceTaskCenterMapper, TkDeviceTaskCenterEntity>
21 30 implements TkDeviceTaskCenterService {
  31 + private final CacheUtils cacheUtils;
  32 +
22 33 @Override
23 34 @Transactional
24 35 public TkDeviceTaskCenterDTO saveOrUpdate(TkDeviceTaskCenterDTO tkDeviceTaskCenterDTO) {
... ... @@ -34,11 +45,6 @@ public class TkDeviceTaskCenterServiceImpl
34 45
35 46 @Override
36 47 @Transactional
37   - public boolean delete(DeleteDTO delete) {
38   - return baseMapper.deleteBatchIds(delete.getIds()) > FastIotConstants.MagicNumber.ZERO;
39   - }
40   -
41   - @Override
42 48 public boolean updateStateByTbDeviceIdAndTaskCenterId(
43 49 String tenantId, String taskCenterId, String tbDeviceId, Integer state) {
44 50 TkDeviceTaskCenterEntity entity =
... ... @@ -55,7 +61,19 @@ public class TkDeviceTaskCenterServiceImpl
55 61 }
56 62
57 63 @Override
  64 + @Transactional
58 65 public boolean deleteByTaskCenterId(String tenantId, String taskCenterId) {
  66 + List<TkDeviceTaskCenterDTO> list = getInfosByTaskCenterId(tenantId, taskCenterId);
  67 + if (null != list && !list.isEmpty()) {
  68 + for (TkDeviceTaskCenterDTO tkDeviceTaskCenterDTO : list) {
  69 + String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
  70 + String key =
  71 + FastIotConstants.CacheConfigKey.TASK_CENTER_DEVICE_EXECUTE_TIME
  72 + + "_"
  73 + + tkDeviceTaskCenterDTO.getTbDeviceId();
  74 + cacheUtils.invalidate(cacheName, key);
  75 + }
  76 + }
59 77 return baseMapper.delete(
60 78 new LambdaQueryWrapper<TkDeviceTaskCenterEntity>()
61 79 .eq(TkDeviceTaskCenterEntity::getTenantId, tenantId)
... ... @@ -68,4 +86,47 @@ public class TkDeviceTaskCenterServiceImpl
68 86 public boolean insertBatch(List<TkDeviceTaskCenterEntity> list) {
69 87 return insertBatch(list, list.size());
70 88 }
  89 +
  90 + @Override
  91 + public TkDeviceTaskCenterDTO findInfoByDeviceIdAndTaskCenterId(
  92 + String tenantId, String tbDeviceId, String taskCenterId) {
  93 + if (StringUtils.isEmpty(taskCenterId)
  94 + || StringUtils.isEmpty(tbDeviceId)
  95 + || StringUtils.isEmpty(tenantId)) {
  96 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  97 + }
  98 + TkDeviceTaskCenterEntity entity =
  99 + baseMapper.selectOne(
  100 + new LambdaQueryWrapper<TkDeviceTaskCenterEntity>()
  101 + .eq(TkDeviceTaskCenterEntity::getTenantId, tenantId)
  102 + .eq(TkDeviceTaskCenterEntity::getTbDeviceId, tbDeviceId)
  103 + .eq(TkDeviceTaskCenterEntity::getTaskCenterId, taskCenterId));
  104 + return Optional.ofNullable(entity)
  105 + .map(obj -> obj.getDTO(TkDeviceTaskCenterDTO.class))
  106 + .orElse(null);
  107 + }
  108 +
  109 + @Override
  110 + public ListenableFuture<List<TkDeviceTaskCenterDTO>> findInfosByTaskCenterId(
  111 + String tenantId, String taskCenterId) {
  112 + return Futures.immediateFuture(getInfosByTaskCenterId(tenantId, taskCenterId));
  113 + }
  114 +
  115 + @Override
  116 + public List<TkDeviceTaskCenterDTO> getInfosByTaskCenterId(String tenantId, String taskCenterId) {
  117 + if (StringUtils.isEmpty(taskCenterId) || StringUtils.isEmpty(tenantId)) {
  118 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  119 + }
  120 + List<TkDeviceTaskCenterEntity> entityList =
  121 + baseMapper.selectList(
  122 + new LambdaQueryWrapper<TkDeviceTaskCenterEntity>()
  123 + .eq(TkDeviceTaskCenterEntity::getTenantId, tenantId)
  124 + .eq(TkDeviceTaskCenterEntity::getTaskCenterId, taskCenterId));
  125 + if (null != entityList && !entityList.isEmpty()) {
  126 + return entityList.stream()
  127 + .map(entity -> entity.getDTO(TkDeviceTaskCenterDTO.class))
  128 + .collect(Collectors.toList());
  129 + }
  130 + return null;
  131 + }
71 132 }
... ...
... ... @@ -42,6 +42,7 @@ public class TkTaskCenterServiceImpl
42 42 private final TkDeviceService tkDeviceService;
43 43 private final CacheUtils cacheUtils;
44 44 private final TkDeviceTaskCenterService tkDeviceTaskCenterService;
  45 + private static final String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
45 46
46 47 @Override
47 48 public TkPageData<TkTaskCenterDTO> taskCenterPage(Map<String, Object> queryMap, String tenantId) {
... ... @@ -50,17 +51,29 @@ public class TkTaskCenterServiceImpl
50 51 getPage(queryMap, FastIotConstants.DefaultOrder.CREATE_TIME, false);
51 52 IPage<TkTaskCenterDTO> iPage = baseMapper.getPageData(page, queryMap);
52 53 if (!iPage.getRecords().isEmpty()) {
53   - iPage.setRecords(iPage.getRecords().stream()
54   - .map(
55   - obj -> {
56   - String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
57   - String key =
58   - FastIotConstants.CacheConfigKey.TASK_CENTER_EXECUTE_TIME + "_" + obj.getId();
59   - Optional<Long> lastExecuteTime = cacheUtils.get(cacheName, key);
60   - lastExecuteTime.ifPresent(obj::setLastExecuteTime);
61   - return obj;
62   - })
63   - .collect(Collectors.toList()));
  54 + iPage.setRecords(
  55 + iPage.getRecords().stream()
  56 + .map(
  57 + obj -> {
  58 + // 任务执行时间
  59 + String key =
  60 + FastIotConstants.CacheConfigKey.TASK_CENTER_EXECUTE_TIME
  61 + + "_"
  62 + + obj.getId();
  63 +
  64 + // 如果通过设备查询分页,以设备执行时间为准
  65 + String tbDeviceId = (String) queryMap.get("tbDeviceId");
  66 + if (null != tbDeviceId) {
  67 + key =
  68 + FastIotConstants.CacheConfigKey.TASK_CENTER_DEVICE_EXECUTE_TIME
  69 + + "_"
  70 + + tbDeviceId;
  71 + }
  72 + Optional<Long> lastExecuteTime = cacheUtils.get(cacheName, key);
  73 + lastExecuteTime.ifPresent(obj::setLastExecuteTime);
  74 + return obj;
  75 + })
  76 + .collect(Collectors.toList()));
64 77 }
65 78 return new TkPageData<>(iPage.getRecords(), iPage.getTotal());
66 79 }
... ... @@ -85,7 +98,8 @@ public class TkTaskCenterServiceImpl
85 98 JacksonUtil.convertValue(tkTaskCenterDTO.getExecuteTarget(), JsonNode.class));
86 99 entity.setExecuteTime(
87 100 JacksonUtil.convertValue(tkTaskCenterDTO.getExecuteTime(), JsonNode.class));
88   - if (StringUtils.isEmpty(tkTaskCenterDTO.getId())) {
  101 + boolean isUpdate = !StringUtils.isEmpty(tkTaskCenterDTO.getId());
  102 + if (!isUpdate) {
89 103 entity.setState(FastIotConstants.StateValue.DISABLE);
90 104 baseMapper.insert(entity);
91 105 } else {
... ... @@ -99,7 +113,7 @@ public class TkTaskCenterServiceImpl
99 113 baseMapper.updateById(entity);
100 114 updateTaskCenterCache(entity);
101 115 }
102   - saveOrUpdateDeviceTaskCenter(tkTaskCenterDTO);
  116 + saveOrUpdateDeviceTaskCenter(entity, isUpdate);
103 117 return tkTaskCenterDTO;
104 118 }
105 119
... ... @@ -125,18 +139,20 @@ public class TkTaskCenterServiceImpl
125 139 }
126 140 sourceIds.add(entity.getId());
127 141 }
  142 + for (String taskCenterId : sourceIds) {
  143 + tkDeviceTaskCenterService.deleteByTaskCenterId(tenantId, taskCenterId);
  144 + }
128 145 int result = baseMapper.deleteBatchIds(sourceIds);
129 146 if (result > FastIotConstants.MagicNumber.ZERO) {
  147 + // 移除任务中心缓存
130 148 for (String taskCenterId : sourceIds) {
131   - tkDeviceTaskCenterService.deleteByTaskCenterId(tenantId, taskCenterId);
  149 + String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
  150 + cacheUtils.invalidate(
  151 + cacheName,
  152 + FastIotConstants.CacheConfigKey.TASK_CENTER_EXECUTE_TIME + "_" + taskCenterId);
132 153 }
133 154 return tkSysJobService.deleteJobs(sourceIds, JobGroupEnum.TASK_CENTER.name(), tenantId);
134 155 }
135   - // 移除任务中心缓存
136   - for (String taskCenterId : sourceIds) {
137   - String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
138   - cacheUtils.invalidate(cacheName, taskCenterId);
139   - }
140 156 return false;
141 157 }
142 158
... ... @@ -211,7 +227,7 @@ public class TkTaskCenterServiceImpl
211 227 String sourceId = entity.getId();
212 228 SysJobDTO queryJob = tkSysJobService.findSysJobBySourceId(sourceId);
213 229 TaskExecuteTimeDTO executeTime =
214   - JacksonUtil.convertValue(entity.getExecuteTime(), TaskExecuteTimeDTO.class);
  230 + JacksonUtil.convertValue(entity.getExecuteTime(), TaskExecuteTimeDTO.class);
215 231 if (null == executeTime || StringUtils.isEmpty(executeTime.getCron())) {
216 232 throw new TkDataValidationException(ErrorMessage.CRON_INVALID.getMessage());
217 233 }
... ... @@ -276,6 +292,29 @@ public class TkTaskCenterServiceImpl
276 292 return result;
277 293 }
278 294
  295 + @Override
  296 + public boolean immediateExecute(TaskImmediateExecuteDTO immediateExecuteDTO, String tenantId)
  297 + throws SchedulerException {
  298 + // 放入缓存
  299 + String taskCenterId = immediateExecuteDTO.getId();
  300 + String key = FastIotConstants.CacheConfigKey.TASK_IMMEDIATE_EXECUTE + "_" + taskCenterId;
  301 + cacheUtils.put(cacheName, key, immediateExecuteDTO);
  302 + // 直接调用任务中心任务执行
  303 + SysJobDTO sysJobDTO = tkSysJobService.findSysJobBySourceId(taskCenterId);
  304 + if (null == sysJobDTO) {
  305 + sysJobDTO = new SysJobDTO();
  306 + sysJobDTO.setSourceId(taskCenterId);
  307 + sysJobDTO.setInvokeTarget("rpcCommandTask.process('" + taskCenterId + "')");
  308 + sysJobDTO.setJobGroup(JobGroupEnum.TASK_CENTER.name());
  309 + sysJobDTO.setTenantId(tenantId);
  310 + sysJobDTO.setCronExpression(immediateExecuteDTO.getCronExpression());
  311 + sysJobDTO.setJobName(immediateExecuteDTO.getName());
  312 + sysJobDTO.setStatus(StatusEnum.DISABLE.getIndex());
  313 + tkSysJobService.saveOrUpdateJob(sysJobDTO);
  314 + }
  315 + return tkSysJobService.run(sysJobDTO);
  316 + }
  317 +
279 318 private void updateTaskCenterCache(TkTaskCenterEntity entity) {
280 319 String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
281 320 Optional<TkTaskCenterEntity> entityCache = cacheUtils.get(cacheName, entity.getId());
... ... @@ -284,13 +323,15 @@ public class TkTaskCenterServiceImpl
284 323 }
285 324 }
286 325
287   - private void saveOrUpdateDeviceTaskCenter(TkTaskCenterDTO tkTaskCenter) {
288   - String taskCenterId = tkTaskCenter.getId();
289   - String tenantId = tkTaskCenter.getTenantId();
  326 + private void saveOrUpdateDeviceTaskCenter(TkTaskCenterEntity entity, boolean isUpdate) {
  327 + String taskCenterId = entity.getId();
  328 + String tenantId = entity.getTenantId();
290 329 // 判断是按产品执行还是按设备执行
291   - List<String> data = tkTaskCenter.getExecuteTarget().getData();
292   - boolean isUpdate = !StringUtils.isEmpty(taskCenterId);
293   - if (Objects.equals(tkTaskCenter.getTargetType(), TargetTypeEnum.DEVICES)) {
  330 + List<String> data =
  331 + Objects.requireNonNull(
  332 + JacksonUtil.convertValue(entity.getExecuteTarget(), TargetContentDTO.class))
  333 + .getData();
  334 + if (Objects.equals(entity.getTargetType(), TargetTypeEnum.DEVICES)) {
294 335 processDeviceTaskCenterMapping(isUpdate, tenantId, taskCenterId, data);
295 336 } else {
296 337 for (String key : data) {
... ...
... ... @@ -88,13 +88,14 @@ public interface TkDeviceService extends BaseService<TkDeviceEntity> {
88 88 *
89 89 * @param tbDeviceId TB设备主键
90 90 * @param created 告警状态:0正常,1告警
  91 + * @return true成功 false失败
91 92 */
92 93 boolean freshAlarmStatus(EntityId tbDeviceId, Integer created);
93 94
94 95 /**
95 96 * 自动生成设备SN
96 97 *
97   - * @return
  98 + * @return 设备SN
98 99 */
99 100 String generateSn();
100 101
... ... @@ -103,7 +104,7 @@ public interface TkDeviceService extends BaseService<TkDeviceEntity> {
103 104 *
104 105 * @param deviceId 平台设备ID
105 106 * @param tenantId 租户ID
106   - * @return
  107 + * @return 场景联动ID
107 108 */
108 109 String otherUsing(String deviceId, String tenantId);
109 110
... ... @@ -134,7 +135,7 @@ public interface TkDeviceService extends BaseService<TkDeviceEntity> {
134 135 * @param tenantId 租户ID
135 136 * @param masterId 网关设备的TB_ID
136 137 * @param deviceCode 网关子设备的标识符,例如:485协议的从设备地址码
137   - * @return
  138 + * @return 从设备信息
138 139 */
139 140 DeviceDTO findSlaveDevice(String tenantId, String masterId, String deviceCode);
140 141
... ... @@ -145,7 +146,7 @@ public interface TkDeviceService extends BaseService<TkDeviceEntity> {
145 146 * @param customerId 客户ID
146 147 * @param organizationId 组织ID
147 148 * @param deviceIds 设备ID
148   - * @return
  149 + * @return 遥测属性列表
149 150 */
150 151 List<String> findDeviceKeys(
151 152 String tenantId, String customerId, String organizationId, List<String> deviceIds);
... ... @@ -156,7 +157,7 @@ public interface TkDeviceService extends BaseService<TkDeviceEntity> {
156 157 * @param slaveId 网关子设备TB平台的ID
157 158 * @param slaveName 网关子设备TB平台的名称
158 159 * @param gatewayId 网关设备TB平台的ID
159   - * @return
  160 + * @return true成功 false失败
160 161 */
161 162 boolean saveSlaveDevice(
162 163 String slaveId, String slaveName, String tbProfileId, String gatewayId, Long createTime);
... ... @@ -212,4 +213,12 @@ public interface TkDeviceService extends BaseService<TkDeviceEntity> {
212 213 * @return id集合
213 214 */
214 215 List<String> findTbDeviceIdsByDeviceProfileId(String deviceProfileId,String tenantId);
  216 +
  217 + /**
  218 + * 根据TB设备ID列表获取设备信息
  219 + * @param tbDeviceIds id列表
  220 + * @param tenantId 租户ID
  221 + * @return 设备列表
  222 + */
  223 + List<DeviceDTO> findDeviceInfoByTbDeviceIds(List<String> tbDeviceIds,String tenantId);
215 224 }
... ...
1 1 package org.thingsboard.server.dao.yunteng.service;
2 2
3   -import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
  3 +import com.google.common.util.concurrent.ListenableFuture;
4 4 import org.thingsboard.server.common.data.yunteng.dto.task.TkDeviceTaskCenterDTO;
5 5 import org.thingsboard.server.dao.yunteng.entities.TkDeviceTaskCenterEntity;
6 6 import java.util.List;
... ... @@ -15,30 +15,25 @@ public interface TkDeviceTaskCenterService {
15 15 TkDeviceTaskCenterDTO saveOrUpdate(TkDeviceTaskCenterDTO tkDeviceTaskCenterDTO);
16 16
17 17 /**
18   - * 删除设备与任务中心的映射关系
19   - *
20   - * @param delete 删除参数
21   - * @return true成功 false失败
22   - */
23   - boolean delete(DeleteDTO delete);
24   -
25   - /**
26 18 * 根据任务中心ID和设备ID更新状态
  19 + *
27 20 * @param tenantId 租户ID
28 21 * @param taskCenterId 任务重ID
29 22 * @param tbDeviceId 设备ID
30 23 * @param state 状态
31 24 * @return true成功 false失败
32 25 */
33   - boolean updateStateByTbDeviceIdAndTaskCenterId(String tenantId,String taskCenterId,String tbDeviceId,Integer state);
  26 + boolean updateStateByTbDeviceIdAndTaskCenterId(
  27 + String tenantId, String taskCenterId, String tbDeviceId, Integer state);
34 28
35 29 /**
36 30 * 通过任务ID删除所有的映射关系
  31 + *
37 32 * @param tenantId 租户ID
38 33 * @param taskCenterId 任务ID
39 34 * @return true成功 false失败
40 35 */
41   - boolean deleteByTaskCenterId(String tenantId,String taskCenterId);
  36 + boolean deleteByTaskCenterId(String tenantId, String taskCenterId);
42 37
43 38 /**
44 39 * 批量插入
... ... @@ -47,4 +42,27 @@ public interface TkDeviceTaskCenterService {
47 42 * @return true成功 false失败
48 43 */
49 44 boolean insertBatch(List<TkDeviceTaskCenterEntity> list);
  45 +
  46 + /**
  47 + * 根据任务中心ID和设备ID查询设备与任务中心映射关系
  48 + *
  49 + * @param tenantId 租户ID
  50 + * @param tbDeviceId 设备ID
  51 + * @param taskCenterId 任务中心ID
  52 + * @return 设备与任务中心映射关系
  53 + */
  54 + TkDeviceTaskCenterDTO findInfoByDeviceIdAndTaskCenterId(
  55 + String tenantId, String tbDeviceId, String taskCenterId);
  56 +
  57 + /**
  58 + * 根据任务中心的ID查询设备与任务中心的映射列表
  59 + *
  60 + * @param tenantId 租户ID
  61 + * @param taskCenterId 任务中心ID
  62 + * @return 设备与任务中心映射关系列表
  63 + */
  64 + ListenableFuture<List<TkDeviceTaskCenterDTO>> findInfosByTaskCenterId(
  65 + String tenantId, String taskCenterId);
  66 +
  67 + List<TkDeviceTaskCenterDTO> getInfosByTaskCenterId(String tenantId,String taskCenterId);
50 68 }
... ...
... ... @@ -3,6 +3,7 @@ package org.thingsboard.server.dao.yunteng.service;
3 3 import com.google.common.util.concurrent.ListenableFuture;
4 4 import org.quartz.SchedulerException;
5 5 import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
  6 +import org.thingsboard.server.common.data.yunteng.dto.task.TaskImmediateExecuteDTO;
6 7 import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO;
7 8 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
8 9 import java.util.Map;
... ... @@ -67,4 +68,14 @@ public interface TkTaskCenterService {
67 68 * @return true成功 false失败
68 69 */
69 70 boolean cancelOrAllowExecute(String id, String tbDeviceId, String tenantId, boolean isAllow);
  71 +
  72 + /**
  73 + * 立即执行任务
  74 + *
  75 + * @param immediateExecuteDTO 执行信息
  76 + * @param tenantId 租户ID
  77 + * @return ture成功 false失败
  78 + */
  79 + boolean immediateExecute(TaskImmediateExecuteDTO immediateExecuteDTO, String tenantId)
  80 + throws SchedulerException;
70 81 }
... ...