Commit 588d13c3931fc6ad8f42f4252626311a493c80ee

Authored by xp.Huang
1 parent a56d897a

feat: 增加设备与任务中心的映射表,通过分页返回最后执行时间

Showing 20 changed files with 490 additions and 127 deletions
@@ -5,7 +5,6 @@ import io.swagger.annotations.ApiOperation; @@ -5,7 +5,6 @@ import io.swagger.annotations.ApiOperation;
5 import lombok.RequiredArgsConstructor; 5 import lombok.RequiredArgsConstructor;
6 import org.apache.commons.lang3.StringUtils; 6 import org.apache.commons.lang3.StringUtils;
7 import org.quartz.SchedulerException; 7 import org.quartz.SchedulerException;
8 -import org.springframework.security.access.prepost.PreAuthorize;  
9 import org.springframework.validation.annotation.Validated; 8 import org.springframework.validation.annotation.Validated;
10 import org.springframework.web.bind.annotation.*; 9 import org.springframework.web.bind.annotation.*;
11 import org.thingsboard.server.common.data.exception.ThingsboardException; 10 import org.thingsboard.server.common.data.exception.ThingsboardException;
@@ -37,13 +36,14 @@ public class TkTaskCenterController extends BaseController { @@ -37,13 +36,14 @@ public class TkTaskCenterController extends BaseController {
37 @GetMapping( 36 @GetMapping(
38 name = "page", 37 name = "page",
39 params = {PAGE_SIZE, PAGE}) 38 params = {PAGE_SIZE, PAGE})
40 - public TkPageData<TkTaskCenterDTO> pageDevice( 39 + public TkPageData<TkTaskCenterDTO> page(
41 @RequestParam(PAGE_SIZE) int pageSize, 40 @RequestParam(PAGE_SIZE) int pageSize,
42 @RequestParam(PAGE) int page, 41 @RequestParam(PAGE) int page,
43 @RequestParam(value = "state", required = false) Integer state, 42 @RequestParam(value = "state", required = false) Integer state,
44 @RequestParam(value = "targetType", required = false) TargetTypeEnum targetType, 43 @RequestParam(value = "targetType", required = false) TargetTypeEnum targetType,
45 @RequestParam(value = ORDER_FILED, required = false) String orderBy, 44 @RequestParam(value = ORDER_FILED, required = false) String orderBy,
46 - @RequestParam(value = ORDER_TYPE, required = false) OrderTypeEnum orderType) 45 + @RequestParam(value = ORDER_TYPE, required = false) OrderTypeEnum orderType,
  46 + @RequestParam(value = "tbDeviceId", required = false) String tbDeviceId)
47 throws ThingsboardException { 47 throws ThingsboardException {
48 HashMap<String, Object> queryMap = new HashMap<>(); 48 HashMap<String, Object> queryMap = new HashMap<>();
49 queryMap.put(PAGE_SIZE, pageSize); 49 queryMap.put(PAGE_SIZE, pageSize);
@@ -52,13 +52,17 @@ public class TkTaskCenterController extends BaseController { @@ -52,13 +52,17 @@ public class TkTaskCenterController extends BaseController {
52 queryMap.put(ORDER_TYPE, orderType); 52 queryMap.put(ORDER_TYPE, orderType);
53 queryMap.put("state", state); 53 queryMap.put("state", state);
54 queryMap.put("targetType", targetType); 54 queryMap.put("targetType", targetType);
  55 + if (!StringUtils.isEmpty(tbDeviceId)) {
  56 + queryMap.put("tbDeviceId", tbDeviceId);
  57 + }
55 return tkTaskCenterService.taskCenterPage(queryMap, getCurrentUser().getCurrentTenantId()); 58 return tkTaskCenterService.taskCenterPage(queryMap, getCurrentUser().getCurrentTenantId());
56 } 59 }
57 60
58 @PostMapping("/add") 61 @PostMapping("/add")
59 @ApiOperation(value = "新增任务中心") 62 @ApiOperation(value = "新增任务中心")
60 -// @PreAuthorize(  
61 -// "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:add:post'})") 63 + // @PreAuthorize(
  64 + //
  65 + // "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:add:post'})")
62 public ResponseResult<TkTaskCenterDTO> save( 66 public ResponseResult<TkTaskCenterDTO> save(
63 @RequestBody @Validated(AddGroup.class) TkTaskCenterDTO taskCenter) 67 @RequestBody @Validated(AddGroup.class) TkTaskCenterDTO taskCenter)
64 throws ThingsboardException, SchedulerException { 68 throws ThingsboardException, SchedulerException {
@@ -69,8 +73,9 @@ public class TkTaskCenterController extends BaseController { @@ -69,8 +73,9 @@ public class TkTaskCenterController extends BaseController {
69 } 73 }
70 74
71 @PutMapping("/update") 75 @PutMapping("/update")
72 -// @PreAuthorize(  
73 -// "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:update:update'})") 76 + // @PreAuthorize(
  77 + //
  78 + // "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:update:update'})")
74 @ApiOperation(value = "编辑任务中心") 79 @ApiOperation(value = "编辑任务中心")
75 public ResponseResult<TkTaskCenterDTO> update( 80 public ResponseResult<TkTaskCenterDTO> update(
76 @RequestBody @Validated(UpdateGroup.class) TkTaskCenterDTO taskCenter) 81 @RequestBody @Validated(UpdateGroup.class) TkTaskCenterDTO taskCenter)
@@ -82,8 +87,9 @@ public class TkTaskCenterController extends BaseController { @@ -82,8 +87,9 @@ public class TkTaskCenterController extends BaseController {
82 } 87 }
83 88
84 @PutMapping("/{id}/update/{state}") 89 @PutMapping("/{id}/update/{state}")
85 -// @PreAuthorize(  
86 -// "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:update:update'})") 90 + // @PreAuthorize(
  91 + //
  92 + // "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:update:update'})")
87 @ApiOperation(value = "更新状态") 93 @ApiOperation(value = "更新状态")
88 public ResponseResult<Boolean> updateState( 94 public ResponseResult<Boolean> updateState(
89 @PathVariable("id") String id, @PathVariable("state") Integer state) 95 @PathVariable("id") String id, @PathVariable("state") Integer state)
@@ -96,8 +102,8 @@ public class TkTaskCenterController extends BaseController { @@ -96,8 +102,8 @@ public class TkTaskCenterController extends BaseController {
96 } 102 }
97 103
98 @DeleteMapping 104 @DeleteMapping
99 -// @PreAuthorize(  
100 -// "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:delete'})") 105 + // @PreAuthorize(
  106 + // "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:delete'})")
101 @ApiOperation(value = "删除任务中心") 107 @ApiOperation(value = "删除任务中心")
102 public ResponseResult<Boolean> deleteTaskCenter(@RequestBody DeleteDTO deleteDTO) 108 public ResponseResult<Boolean> deleteTaskCenter(@RequestBody DeleteDTO deleteDTO)
103 throws ThingsboardException, SchedulerException { 109 throws ThingsboardException, SchedulerException {
@@ -105,18 +111,22 @@ public class TkTaskCenterController extends BaseController { @@ -105,18 +111,22 @@ public class TkTaskCenterController extends BaseController {
105 return ResponseResult.success(tkTaskCenterService.deleteTaskCenter(deleteDTO)); 111 return ResponseResult.success(tkTaskCenterService.deleteTaskCenter(deleteDTO));
106 } 112 }
107 113
108 - @PutMapping("/{id}/cancel/{tbDeviceId}")  
109 -// @PreAuthorize(  
110 -// "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:cancel'})")  
111 - @ApiOperation(value = "设备取消任务执行")  
112 - public ResponseResult<Boolean> cancelExecute(  
113 - @PathVariable("id") String taskId, @PathVariable("tbDeviceId") String tbDeviceId)throws ThingsboardException {  
114 - if(StringUtils.isEmpty(taskId) || StringUtils.isEmpty(tbDeviceId)){ 114 + @PutMapping("/{id}/update/{tbDeviceId}/{allow}")
  115 + // @PreAuthorize(
  116 + //
  117 + // "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:cancel'})")
  118 + @ApiOperation(value = "设备更新任务执行")
  119 + public ResponseResult<Boolean> cancelOrAllowExecute(
  120 + @PathVariable("id") String taskId,
  121 + @PathVariable("tbDeviceId") String tbDeviceId,
  122 + @PathVariable("allow") boolean allow)
  123 + throws ThingsboardException {
  124 + if (StringUtils.isEmpty(taskId) || StringUtils.isEmpty(tbDeviceId)) {
115 throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage()); 125 throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
116 } 126 }
117 return ResponseResult.success( 127 return ResponseResult.success(
118 - tkTaskCenterService.cancelExecute(  
119 - taskId, tbDeviceId, getCurrentUser().getCurrentTenantId())); 128 + tkTaskCenterService.cancelOrAllowExecute(
  129 + taskId, tbDeviceId, getCurrentUser().getCurrentTenantId(), allow));
120 } 130 }
121 131
122 private TkTaskCenterDTO saveOrUpdate(TkTaskCenterDTO tkTaskCenterDTO) 132 private TkTaskCenterDTO saveOrUpdate(TkTaskCenterDTO tkTaskCenterDTO)
@@ -16,6 +16,7 @@ import org.thingsboard.server.common.data.id.DeviceId; @@ -16,6 +16,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
16 import org.thingsboard.server.common.data.id.TenantId; 16 import org.thingsboard.server.common.data.id.TenantId;
17 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; 17 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
18 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants; 18 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
  19 +import org.thingsboard.server.common.data.yunteng.core.cache.CacheUtils;
19 import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO; 20 import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO;
20 import org.thingsboard.server.common.data.yunteng.dto.task.TargetContentDTO; 21 import org.thingsboard.server.common.data.yunteng.dto.task.TargetContentDTO;
21 import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO; 22 import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO;
@@ -25,6 +26,8 @@ import org.thingsboard.server.dao.yunteng.service.TkDeviceService; @@ -25,6 +26,8 @@ import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
25 import org.thingsboard.server.dao.yunteng.service.TkTaskCenterService; 26 import org.thingsboard.server.dao.yunteng.service.TkTaskCenterService;
26 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; 27 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
27 import org.thingsboard.server.service.security.model.SecurityUser; 28 import org.thingsboard.server.service.security.model.SecurityUser;
  29 +import java.time.LocalDateTime;
  30 +import java.time.ZoneOffset;
28 import java.util.*; 31 import java.util.*;
29 32
30 @Component("rpcCommandTask") 33 @Component("rpcCommandTask")
@@ -40,6 +43,8 @@ public class RpcCommandTask { @@ -40,6 +43,8 @@ public class RpcCommandTask {
40 private final TkTaskCenterService tkTaskCenterService; 43 private final TkTaskCenterService tkTaskCenterService;
41 private final TbCoreDeviceRpcService tbCoreDeviceRpcService; 44 private final TbCoreDeviceRpcService tbCoreDeviceRpcService;
42 private final TkDeviceService tkDeviceService; 45 private final TkDeviceService tkDeviceService;
  46 + private final CacheUtils cacheUtils;
  47 +
43 public void process(String taskCenterId) { 48 public void process(String taskCenterId) {
44 // 通过任务中心ID查询执行命令及执行对象 49 // 通过任务中心ID查询执行命令及执行对象
45 ListenableFuture<TkTaskCenterDTO> future = 50 ListenableFuture<TkTaskCenterDTO> future =
@@ -57,6 +62,15 @@ public class RpcCommandTask { @@ -57,6 +62,15 @@ public class RpcCommandTask {
57 // send cmd 62 // send cmd
58 if (targetContent.getData() != null && null != cmdJsonNode) { 63 if (targetContent.getData() != null && null != cmdJsonNode) {
59 String tenantId = tkTaskCenterDTO.getTenantId(); 64 String tenantId = tkTaskCenterDTO.getTenantId();
  65 + String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
  66 + String key =
  67 + FastIotConstants.CacheConfigKey.TASK_CENTER_EXECUTE_TIME
  68 + + "_"
  69 + + tkTaskCenterDTO.getId();
  70 + cacheUtils.put(
  71 + cacheName,
  72 + key,
  73 + LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
60 if (targetType.equals(TargetTypeEnum.DEVICES)) { 74 if (targetType.equals(TargetTypeEnum.DEVICES)) {
61 for (String deviceId : targetContent.getData()) { 75 for (String deviceId : targetContent.getData()) {
62 sendRpcCommand(cmdJsonNode, tenantId, deviceId, securityUser); 76 sendRpcCommand(cmdJsonNode, tenantId, deviceId, securityUser);
@@ -82,7 +96,7 @@ public class RpcCommandTask { @@ -82,7 +96,7 @@ public class RpcCommandTask {
82 for (String deviceProfileId : targetContent.getData()) { 96 for (String deviceProfileId : targetContent.getData()) {
83 Futures.addCallback( 97 Futures.addCallback(
84 tkDeviceService.findDeviceListByDeviceProfileId(deviceProfileId, tenantId), 98 tkDeviceService.findDeviceListByDeviceProfileId(deviceProfileId, tenantId),
85 - new FutureCallback<List<DeviceDTO>>() { 99 + new FutureCallback<>() {
86 @Override 100 @Override
87 public void onSuccess(@Nullable List<DeviceDTO> deviceDTOS) { 101 public void onSuccess(@Nullable List<DeviceDTO> deviceDTOS) {
88 if (null != deviceDTOS && !deviceDTOS.isEmpty()) { 102 if (null != deviceDTOS && !deviceDTOS.isEmpty()) {
@@ -125,7 +139,7 @@ public class RpcCommandTask { @@ -125,7 +139,7 @@ public class RpcCommandTask {
125 true, 139 true,
126 System.currentTimeMillis() + Math.max(minTimeout, defaultTimeout), 140 System.currentTimeMillis() + Math.max(minTimeout, defaultTimeout),
127 body, 141 body,
128 - false, 142 + true,
129 null, 143 null,
130 null); 144 null);
131 tbCoreDeviceRpcService.processRestApiRpcRequest( 145 tbCoreDeviceRpcService.processRestApiRpcRequest(
@@ -27,7 +27,8 @@ public interface FastIotConstants { @@ -27,7 +27,8 @@ public interface FastIotConstants {
27 String MOBILE_LOGIN_SMS_CODE = "mobileLoginSmsCode"; 27 String MOBILE_LOGIN_SMS_CODE = "mobileLoginSmsCode";
28 String AREA = "thingsArea"; 28 String AREA = "thingsArea";
29 String PUBLIC_ID = "publicId_"; 29 String PUBLIC_ID = "publicId_";
30 - String TASK_CENTER_INFOS = "taskCenterInfos_"; 30 + String TASK_CENTER_INFOS = "taskCenterInfos";
  31 + String TASK_CENTER_EXECUTE_TIME = "taskCenterExecuteTime";
31 } 32 }
32 33
33 interface TBCacheConfig { 34 interface TBCacheConfig {
@@ -120,6 +120,8 @@ public final class ModelConstants { @@ -120,6 +120,8 @@ public final class ModelConstants {
120 public static final String TK_DEVICE_STATE_LOG = "tk_device_state_log"; 120 public static final String TK_DEVICE_STATE_LOG = "tk_device_state_log";
121 /** 任务中心表 */ 121 /** 任务中心表 */
122 public static final String TK_TASK_CENTER_NAME = "tk_task_center"; 122 public static final String TK_TASK_CENTER_NAME = "tk_task_center";
  123 + /** 设备与任务中心映射表 */
  124 + public static final String TK_DEVICE_TASK_CENTER_NAME = "tk_device_task_center";
123 } 125 }
124 126
125 public static class TableFields { 127 public static class TableFields {
@@ -19,6 +19,6 @@ public class TargetContentDTO { @@ -19,6 +19,6 @@ public class TargetContentDTO {
19 @ApiModelProperty(value = "设备配置ID") 19 @ApiModelProperty(value = "设备配置ID")
20 String deviceProfileId; 20 String deviceProfileId;
21 21
22 - @ApiModelProperty(value = "取消执行的数据:只有目标类型是产品时才会有值,且List<String>里面为tbDeviceId") 22 + @ApiModelProperty(value = "取消执行的数据:key为产品ID即deviceProfileId,value为List<String>里面为tbDeviceId")
23 Map<String, List<String>> cancelExecuteDevices; 23 Map<String, List<String>> cancelExecuteDevices;
24 } 24 }
  1 +package org.thingsboard.server.common.data.yunteng.dto.task;
  2 +
  3 +import io.swagger.annotations.ApiModelProperty;
  4 +import lombok.Data;
  5 +import org.thingsboard.server.common.data.yunteng.dto.BaseDTO;
  6 +
  7 +@Data
  8 +public class TkDeviceTaskCenterDTO extends BaseDTO {
  9 + @ApiModelProperty(value = "设备ID")
  10 + private String tbDeviceId;
  11 +
  12 + @ApiModelProperty(value = "任务中心ID")
  13 + private String taskCenterId;
  14 +
  15 + @ApiModelProperty(value = "是否允许执行:0不执行 1允许执行")
  16 + private Integer state;
  17 +
  18 + @ApiModelProperty(value = "租户ID")
  19 + private String tenantId;
  20 +}
@@ -33,9 +33,14 @@ public class TkTaskCenterDTO extends TenantDTO { @@ -33,9 +33,14 @@ public class TkTaskCenterDTO extends TenantDTO {
33 @NotNull(message = "执行时间不能为空", groups = AddGroup.class) 33 @NotNull(message = "执行时间不能为空", groups = AddGroup.class)
34 private TaskExecuteTimeDTO executeTime; 34 private TaskExecuteTimeDTO executeTime;
35 35
36 - @ApiModelProperty(value = "0禁用 1启用") 36 + @ApiModelProperty(value = "状态:0禁用 1启用")
37 private Integer state = FastIotConstants.StateValue.DISABLE; 37 private Integer state = FastIotConstants.StateValue.DISABLE;
38 38
  39 + @ApiModelProperty(value = "最后执行时间")
  40 + private Long lastExecuteTime;
  41 +
  42 + private TkDeviceTaskCenterDTO tkDeviceTaskCenter;
  43 +
39 @ApiModelProperty(value = "备注") 44 @ApiModelProperty(value = "备注")
40 private String remark; 45 private String remark;
41 } 46 }
  1 +package org.thingsboard.server.dao.yunteng.entities;
  2 +
  3 +import com.baomidou.mybatisplus.annotation.TableName;
  4 +import lombok.Data;
  5 +import lombok.EqualsAndHashCode;
  6 +import org.thingsboard.server.common.data.yunteng.constant.ModelConstants;
  7 +
  8 +@EqualsAndHashCode(callSuper = true)
  9 +@Data
  10 +@TableName(value = ModelConstants.Table.TK_DEVICE_TASK_CENTER_NAME)
  11 +public class TkDeviceTaskCenterEntity extends TenantBaseEntity {
  12 +
  13 + private static final long serialVersionUID = -5425391296429906856L;
  14 + private String tbDeviceId;
  15 +
  16 + private String taskCenterId;
  17 +
  18 + private Integer state;
  19 +}
@@ -2,6 +2,8 @@ package org.thingsboard.server.dao.yunteng.impl; @@ -2,6 +2,8 @@ package org.thingsboard.server.dao.yunteng.impl;
2 2
3 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; 3 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
4 import com.baomidou.mybatisplus.core.metadata.IPage; 4 import com.baomidou.mybatisplus.core.metadata.IPage;
  5 +import com.google.common.util.concurrent.Futures;
  6 +import com.google.common.util.concurrent.ListenableFuture;
5 import lombok.RequiredArgsConstructor; 7 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j; 8 import lombok.extern.slf4j.Slf4j;
7 import org.apache.commons.lang3.StringUtils; 9 import org.apache.commons.lang3.StringUtils;
@@ -17,6 +19,7 @@ import org.thingsboard.server.dao.yunteng.entities.SysJobLogEntity; @@ -17,6 +19,7 @@ import org.thingsboard.server.dao.yunteng.entities.SysJobLogEntity;
17 import org.thingsboard.server.dao.yunteng.mapper.SysJobLogMapper; 19 import org.thingsboard.server.dao.yunteng.mapper.SysJobLogMapper;
18 import org.thingsboard.server.dao.yunteng.service.AbstractBaseService; 20 import org.thingsboard.server.dao.yunteng.service.AbstractBaseService;
19 import org.thingsboard.server.dao.yunteng.service.TkSysJobLogService; 21 import org.thingsboard.server.dao.yunteng.service.TkSysJobLogService;
  22 +import java.time.ZoneOffset;
20 import java.util.Map; 23 import java.util.Map;
21 import java.util.Optional; 24 import java.util.Optional;
22 25
@@ -61,9 +64,7 @@ public class SysJobLogServiceImpl extends AbstractBaseService<SysJobLogMapper, S @@ -61,9 +64,7 @@ public class SysJobLogServiceImpl extends AbstractBaseService<SysJobLogMapper, S
61 } 64 }
62 }) 65 })
63 .orElseThrow( 66 .orElseThrow(
64 - () -> {  
65 - throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());  
66 - }); 67 + () -> new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage()));
67 } 68 }
68 69
69 @Override 70 @Override
@@ -77,9 +78,7 @@ public class SysJobLogServiceImpl extends AbstractBaseService<SysJobLogMapper, S @@ -77,9 +78,7 @@ public class SysJobLogServiceImpl extends AbstractBaseService<SysJobLogMapper, S
77 Optional.ofNullable(sysJobLog) 78 Optional.ofNullable(sysJobLog)
78 .map(job -> baseMapper.updateById(jobLog)) 79 .map(job -> baseMapper.updateById(jobLog))
79 .orElseThrow( 80 .orElseThrow(
80 - () -> {  
81 - throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());  
82 - }); 81 + () -> new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage()));
83 } 82 }
84 return jobLogDTO; 83 return jobLogDTO;
85 } 84 }
@@ -109,4 +108,21 @@ public class SysJobLogServiceImpl extends AbstractBaseService<SysJobLogMapper, S @@ -109,4 +108,21 @@ public class SysJobLogServiceImpl extends AbstractBaseService<SysJobLogMapper, S
109 } 108 }
110 return result; 109 return result;
111 } 110 }
  111 +
  112 + @Override
  113 + public ListenableFuture<Long> findLastLogEndTimeByJobId(String jobId, String jobGroup) {
  114 + SysJobLogEntity entity =
  115 + baseMapper.selectOne(
  116 + new LambdaQueryWrapper<SysJobLogEntity>()
  117 + .eq(SysJobLogEntity::getJobId, jobId)
  118 + .eq(SysJobLogEntity::getJobGroup, jobGroup)
  119 + .orderBy(true, false, SysJobLogEntity::getEndTime)
  120 + .last("limit 1"));
  121 + return Optional.ofNullable(entity)
  122 + .map(
  123 + obj ->
  124 + Futures.immediateFuture(
  125 + obj.getEndTime().toInstant(ZoneOffset.of("+8")).toEpochMilli()))
  126 + .orElse(Futures.immediateFuture(null));
  127 + }
112 } 128 }
@@ -2,17 +2,23 @@ package org.thingsboard.server.dao.yunteng.impl; @@ -2,17 +2,23 @@ package org.thingsboard.server.dao.yunteng.impl;
2 2
3 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; 3 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
4 import com.baomidou.mybatisplus.core.metadata.IPage; 4 import com.baomidou.mybatisplus.core.metadata.IPage;
  5 +import com.google.common.util.concurrent.FutureCallback;
5 import com.google.common.util.concurrent.Futures; 6 import com.google.common.util.concurrent.Futures;
6 import com.google.common.util.concurrent.ListenableFuture; 7 import com.google.common.util.concurrent.ListenableFuture;
  8 +import com.google.common.util.concurrent.MoreExecutors;
7 import lombok.RequiredArgsConstructor; 9 import lombok.RequiredArgsConstructor;
8 import org.apache.commons.lang3.StringUtils; 10 import org.apache.commons.lang3.StringUtils;
  11 +import org.checkerframework.checker.nullness.qual.Nullable;
  12 +import org.jetbrains.annotations.NotNull;
9 import org.quartz.*; 13 import org.quartz.*;
10 import org.springframework.stereotype.Service; 14 import org.springframework.stereotype.Service;
11 import org.springframework.transaction.annotation.Transactional; 15 import org.springframework.transaction.annotation.Transactional;
12 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants; 16 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
  17 +import org.thingsboard.server.common.data.yunteng.core.cache.CacheUtils;
13 import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException; 18 import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException;
14 import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage; 19 import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage;
15 import org.thingsboard.server.common.data.yunteng.dto.*; 20 import org.thingsboard.server.common.data.yunteng.dto.*;
  21 +import org.thingsboard.server.common.data.yunteng.enums.JobGroupEnum;
16 import org.thingsboard.server.common.data.yunteng.enums.StatusEnum; 22 import org.thingsboard.server.common.data.yunteng.enums.StatusEnum;
17 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData; 23 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
18 import org.thingsboard.server.dao.util.yunteng.CronUtils; 24 import org.thingsboard.server.dao.util.yunteng.CronUtils;
@@ -32,24 +38,31 @@ import java.util.stream.Collectors; @@ -32,24 +38,31 @@ import java.util.stream.Collectors;
32 public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobEntity> 38 public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobEntity>
33 implements TkSysJobService { 39 implements TkSysJobService {
34 private final Scheduler scheduler; 40 private final Scheduler scheduler;
35 - 41 + private final SysJobLogServiceImpl sysJobLogService;
  42 + private final CacheUtils cacheUtils;
36 /** 项目启动时,初始化定时器 主要是防止手动修改数据库导致未同步到定时任务处理(注:不能手动修改数据库ID和任务组名,否则会导致脏数据) */ 43 /** 项目启动时,初始化定时器 主要是防止手动修改数据库导致未同步到定时任务处理(注:不能手动修改数据库ID和任务组名,否则会导致脏数据) */
37 @PostConstruct 44 @PostConstruct
38 public void init() throws SchedulerException { 45 public void init() throws SchedulerException {
39 scheduler.clear(); 46 scheduler.clear();
40 - List<SysJobEntity> jobList = new ArrayList<>();  
41 try { 47 try {
42 - jobList.addAll(  
43 - baseMapper.selectList(  
44 - new LambdaQueryWrapper<SysJobEntity>()  
45 - .eq(SysJobEntity::getStatus, StatusEnum.ENABLE.getIndex()))); 48 + List<SysJobEntity> jobList =
  49 + new ArrayList<>(baseMapper.selectList(new LambdaQueryWrapper<>()));
  50 + List<SysJobEntity> enableJobs = new ArrayList<>();
  51 + for (SysJobEntity job : jobList) {
  52 + if (Objects.equals(job.getStatus(),StatusEnum.ENABLE.getIndex())) {
  53 + enableJobs.add(job);
  54 + }
  55 + if (Objects.equals(job.getJobGroup(), JobGroupEnum.TASK_CENTER.name())) {
  56 + // 查询任务中心的最新执行时间放在缓存里面
  57 + queryJobLastExecuteTime(job.getSourceId(), job.getId(), job.getJobGroup());
  58 + }
  59 + }
  60 + for (SysJobEntity job : enableJobs) {
  61 + ScheduleUtils.createScheduleJob(scheduler, job);
  62 + }
46 } catch (Exception e) { 63 } catch (Exception e) {
47 // TODO: 兼容ThingsboardInstallApplication。执行数据库脚本前,会跑表不存在异常。 64 // TODO: 兼容ThingsboardInstallApplication。执行数据库脚本前,会跑表不存在异常。
48 } 65 }
49 -  
50 - for (SysJobEntity job : jobList) {  
51 - ScheduleUtils.createScheduleJob(scheduler, job);  
52 - }  
53 } 66 }
54 67
55 @Override 68 @Override
@@ -297,4 +310,22 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE @@ -297,4 +310,22 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE
297 .map(obj -> obj.getDTO(SysJobDTO.class)) 310 .map(obj -> obj.getDTO(SysJobDTO.class))
298 .orElse(null); 311 .orElse(null);
299 } 312 }
  313 +
  314 + private void queryJobLastExecuteTime(String sourceId, String jobId, String jobGroup) {
  315 + ListenableFuture<Long> future = sysJobLogService.findLastLogEndTimeByJobId(jobId, jobGroup);
  316 + Futures.addCallback(
  317 + future,
  318 + new FutureCallback<>() {
  319 + @Override
  320 + public void onSuccess(@Nullable Long executeTime) {
  321 + String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
  322 + String key = FastIotConstants.CacheConfigKey.TASK_CENTER_EXECUTE_TIME + "_" + sourceId;
  323 + cacheUtils.put(cacheName, key, executeTime);
  324 + }
  325 +
  326 + @Override
  327 + public void onFailure(@NotNull Throwable throwable) {}
  328 + },
  329 + MoreExecutors.directExecutor());
  330 + }
300 } 331 }
@@ -613,9 +613,7 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev @@ -613,9 +613,7 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev
613 return Optional.ofNullable(device) 613 return Optional.ofNullable(device)
614 .map(obj -> obj.getDTO(DeviceDTO.class)) 614 .map(obj -> obj.getDTO(DeviceDTO.class))
615 .orElseThrow( 615 .orElseThrow(
616 - () -> {  
617 - throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());  
618 - }); 616 + () -> new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage()));
619 } 617 }
620 618
621 @Override 619 @Override
@@ -710,4 +708,19 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev @@ -710,4 +708,19 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev
710 .collect(Collectors.toList()))) 708 .collect(Collectors.toList())))
711 .orElse(Futures.immediateFuture(devices)); 709 .orElse(Futures.immediateFuture(devices));
712 } 710 }
  711 +
  712 + @Override
  713 + public List<String> findTbDeviceIdsByDeviceProfileId(String deviceProfileId, String tenantId) {
  714 + if (StringUtils.isEmpty(deviceProfileId) || StringUtils.isEmpty(tenantId)) {
  715 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  716 + }
  717 + return Optional.ofNullable(
  718 + baseMapper.selectList(
  719 + new LambdaQueryWrapper<TkDeviceEntity>()
  720 + .eq(TkDeviceEntity::getTenantId, tenantId)
  721 + .eq(TkDeviceEntity::getDeviceProfileId, deviceProfileId)
  722 + .select(TkDeviceEntity::getTbDeviceId)))
  723 + .map(list -> list.stream().map(TkDeviceEntity::getTbDeviceId).collect(Collectors.toList()))
  724 + .orElse(null);
  725 + }
713 } 726 }
  1 +package org.thingsboard.server.dao.yunteng.impl;
  2 +
  3 +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  4 +import lombok.RequiredArgsConstructor;
  5 +import org.springframework.stereotype.Service;
  6 +import org.springframework.transaction.annotation.Transactional;
  7 +import org.thingsboard.server.common.data.StringUtils;
  8 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
  9 +import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
  10 +import org.thingsboard.server.common.data.yunteng.dto.task.TkDeviceTaskCenterDTO;
  11 +import org.thingsboard.server.dao.yunteng.entities.TkDeviceTaskCenterEntity;
  12 +import org.thingsboard.server.dao.yunteng.mapper.TkDeviceTaskCenterMapper;
  13 +import org.thingsboard.server.dao.yunteng.service.AbstractBaseService;
  14 +import org.thingsboard.server.dao.yunteng.service.TkDeviceTaskCenterService;
  15 +import java.util.List;
  16 +
  17 +@Service
  18 +@RequiredArgsConstructor
  19 +public class TkDeviceTaskCenterServiceImpl
  20 + extends AbstractBaseService<TkDeviceTaskCenterMapper, TkDeviceTaskCenterEntity>
  21 + implements TkDeviceTaskCenterService {
  22 + @Override
  23 + @Transactional
  24 + public TkDeviceTaskCenterDTO saveOrUpdate(TkDeviceTaskCenterDTO tkDeviceTaskCenterDTO) {
  25 + TkDeviceTaskCenterEntity entity =
  26 + tkDeviceTaskCenterDTO.getEntity(TkDeviceTaskCenterEntity.class);
  27 + if (StringUtils.isEmpty(tkDeviceTaskCenterDTO.getId())) {
  28 + baseMapper.insert(entity);
  29 + } else {
  30 + baseMapper.updateById(entity);
  31 + }
  32 + return tkDeviceTaskCenterDTO;
  33 + }
  34 +
  35 + @Override
  36 + @Transactional
  37 + public boolean delete(DeleteDTO delete) {
  38 + return baseMapper.deleteBatchIds(delete.getIds()) > FastIotConstants.MagicNumber.ZERO;
  39 + }
  40 +
  41 + @Override
  42 + public boolean updateStateByTbDeviceIdAndTaskCenterId(
  43 + String tenantId, String taskCenterId, String tbDeviceId, Integer state) {
  44 + TkDeviceTaskCenterEntity entity =
  45 + baseMapper.selectOne(
  46 + new LambdaQueryWrapper<TkDeviceTaskCenterEntity>()
  47 + .eq(TkDeviceTaskCenterEntity::getTbDeviceId, tbDeviceId)
  48 + .eq(TkDeviceTaskCenterEntity::getTaskCenterId, taskCenterId)
  49 + .eq(TkDeviceTaskCenterEntity::getTenantId, tenantId));
  50 + if (null != entity) {
  51 + entity.setState(state);
  52 + baseMapper.updateById(entity);
  53 + }
  54 + return false;
  55 + }
  56 +
  57 + @Override
  58 + public boolean deleteByTaskCenterId(String tenantId, String taskCenterId) {
  59 + return baseMapper.delete(
  60 + new LambdaQueryWrapper<TkDeviceTaskCenterEntity>()
  61 + .eq(TkDeviceTaskCenterEntity::getTenantId, tenantId)
  62 + .eq(TkDeviceTaskCenterEntity::getTaskCenterId, taskCenterId))
  63 + > FastIotConstants.MagicNumber.ZERO;
  64 + }
  65 +
  66 + @Override
  67 + @Transactional
  68 + public boolean insertBatch(List<TkDeviceTaskCenterEntity> list) {
  69 + return insertBatch(list, list.size());
  70 + }
  71 +}
@@ -17,68 +17,49 @@ import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage; @@ -17,68 +17,49 @@ import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage;
17 import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO; 17 import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
18 import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO; 18 import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO;
19 import org.thingsboard.server.common.data.yunteng.dto.SysJobDTO; 19 import org.thingsboard.server.common.data.yunteng.dto.SysJobDTO;
20 -import org.thingsboard.server.common.data.yunteng.dto.task.TargetContentDTO;  
21 -import org.thingsboard.server.common.data.yunteng.dto.task.TaskExecuteTimeDTO;  
22 -import org.thingsboard.server.common.data.yunteng.dto.task.TaskTypeDTO;  
23 -import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO; 20 +import org.thingsboard.server.common.data.yunteng.dto.task.*;
24 import org.thingsboard.server.common.data.yunteng.enums.JobGroupEnum; 21 import org.thingsboard.server.common.data.yunteng.enums.JobGroupEnum;
  22 +import org.thingsboard.server.common.data.yunteng.enums.StatusEnum;
25 import org.thingsboard.server.common.data.yunteng.enums.TargetTypeEnum; 23 import org.thingsboard.server.common.data.yunteng.enums.TargetTypeEnum;
26 import org.thingsboard.server.common.data.yunteng.utils.JacksonUtil; 24 import org.thingsboard.server.common.data.yunteng.utils.JacksonUtil;
27 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData; 25 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
  26 +import org.thingsboard.server.dao.yunteng.entities.TkDeviceTaskCenterEntity;
28 import org.thingsboard.server.dao.yunteng.entities.TkTaskCenterEntity; 27 import org.thingsboard.server.dao.yunteng.entities.TkTaskCenterEntity;
29 -import org.thingsboard.server.dao.yunteng.mapper.TaskCenterMapper;  
30 -import org.thingsboard.server.dao.yunteng.service.AbstractBaseService;  
31 -import org.thingsboard.server.dao.yunteng.service.TkDeviceService;  
32 -import org.thingsboard.server.dao.yunteng.service.TkSysJobService;  
33 -import org.thingsboard.server.dao.yunteng.service.TkTaskCenterService; 28 +import org.thingsboard.server.dao.yunteng.mapper.TkTaskCenterMapper;
  29 +import org.thingsboard.server.dao.yunteng.service.*;
34 30
35 import javax.transaction.Transactional; 31 import javax.transaction.Transactional;
36 import java.util.*; 32 import java.util.*;
37 -import java.util.stream.Collectors;  
38 33
39 @Service 34 @Service
40 @RequiredArgsConstructor 35 @RequiredArgsConstructor
41 @Slf4j 36 @Slf4j
42 public class TkTaskCenterServiceImpl 37 public class TkTaskCenterServiceImpl
43 - extends AbstractBaseService<TaskCenterMapper, TkTaskCenterEntity> 38 + extends AbstractBaseService<TkTaskCenterMapper, TkTaskCenterEntity>
44 implements TkTaskCenterService { 39 implements TkTaskCenterService {
45 private final TkSysJobService tkSysJobService; 40 private final TkSysJobService tkSysJobService;
46 private final TkDeviceService tkDeviceService; 41 private final TkDeviceService tkDeviceService;
47 private final CacheUtils cacheUtils; 42 private final CacheUtils cacheUtils;
  43 + private final TkDeviceTaskCenterService tkDeviceTaskCenterService;
48 44
49 @Override 45 @Override
50 public TkPageData<TkTaskCenterDTO> taskCenterPage(Map<String, Object> queryMap, String tenantId) { 46 public TkPageData<TkTaskCenterDTO> taskCenterPage(Map<String, Object> queryMap, String tenantId) {
51 - Integer state = (Integer) queryMap.get("state");  
52 - TargetTypeEnum targetTypeEnum = (TargetTypeEnum) queryMap.get("targetType"); 47 + queryMap.put("tenantId", tenantId);
53 IPage<TkTaskCenterEntity> page = 48 IPage<TkTaskCenterEntity> page =
54 getPage(queryMap, FastIotConstants.DefaultOrder.CREATE_TIME, false); 49 getPage(queryMap, FastIotConstants.DefaultOrder.CREATE_TIME, false);
55 - IPage<TkTaskCenterEntity> taskCenterIPage =  
56 - baseMapper.selectPage(  
57 - page,  
58 - new LambdaQueryWrapper<TkTaskCenterEntity>()  
59 - .eq(TkTaskCenterEntity::getTenantId, tenantId)  
60 - .eq(null != state, TkTaskCenterEntity::getState, state)  
61 - .eq(null != targetTypeEnum, TkTaskCenterEntity::getTargetType, targetTypeEnum));  
62 - if (!taskCenterIPage.getRecords().isEmpty()) {  
63 - List<TkTaskCenterDTO> listRecords =  
64 - taskCenterIPage.getRecords().stream()  
65 - .map(  
66 - entity -> {  
67 - TkTaskCenterDTO dto = entity.getDTO(TkTaskCenterDTO.class);  
68 - dto.setExecuteContent(  
69 - JacksonUtil.convertValue(entity.getExecuteContent(), TaskTypeDTO.class));  
70 - dto.setExecuteTime(  
71 - JacksonUtil.convertValue(  
72 - entity.getExecuteTime(), TaskExecuteTimeDTO.class));  
73 - dto.setExecuteTarget(  
74 - JacksonUtil.convertValue(  
75 - entity.getExecuteTarget(), TargetContentDTO.class));  
76 - return dto;  
77 - })  
78 - .collect(Collectors.toList());  
79 - return new TkPageData<>(listRecords, taskCenterIPage.getTotal()); 50 + IPage<TkTaskCenterDTO> iPage = baseMapper.getPageData(page, queryMap);
  51 + if (!iPage.getRecords().isEmpty()) {
  52 + iPage.getRecords().stream()
  53 + .peek(
  54 + obj -> {
  55 + String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
  56 + String key =
  57 + FastIotConstants.CacheConfigKey.TASK_CENTER_EXECUTE_TIME + "_" + obj.getId();
  58 + Optional<Long> lastExecuteTime = cacheUtils.get(cacheName, key);
  59 + lastExecuteTime.ifPresent(obj::setLastExecuteTime);
  60 + });
80 } 61 }
81 - return getPageData(taskCenterIPage, TkTaskCenterDTO.class); 62 + return new TkPageData<>(iPage.getRecords(), iPage.getTotal());
82 } 63 }
83 64
84 @Override 65 @Override
@@ -90,7 +71,9 @@ public class TkTaskCenterServiceImpl @@ -90,7 +71,9 @@ public class TkTaskCenterServiceImpl
90 throw new TkDataValidationException(ErrorMessage.EXECUTE_COMMAND_IS_NULL.getMessage()); 71 throw new TkDataValidationException(ErrorMessage.EXECUTE_COMMAND_IS_NULL.getMessage());
91 } 72 }
92 TaskExecuteTimeDTO executeTime = tkTaskCenterDTO.getExecuteTime(); 73 TaskExecuteTimeDTO executeTime = tkTaskCenterDTO.getExecuteTime();
93 - if (null == executeTime || StringUtils.isEmpty(executeTime.getCron())) { 74 + if (null == executeTime
  75 + || StringUtils.isEmpty(executeTime.getCron())
  76 + || !tkSysJobService.checkCronExpressionIsValid(executeTime.getCron())) {
94 throw new TkDataValidationException(ErrorMessage.CRON_INVALID.getMessage()); 77 throw new TkDataValidationException(ErrorMessage.CRON_INVALID.getMessage());
95 } 78 }
96 entity.setExecuteContent( 79 entity.setExecuteContent(
@@ -113,16 +96,18 @@ public class TkTaskCenterServiceImpl @@ -113,16 +96,18 @@ public class TkTaskCenterServiceImpl
113 baseMapper.updateById(entity); 96 baseMapper.updateById(entity);
114 updateTaskCenterCache(entity); 97 updateTaskCenterCache(entity);
115 } 98 }
  99 + saveOrUpdateDeviceTaskCenter(tkTaskCenterDTO);
116 return tkTaskCenterDTO; 100 return tkTaskCenterDTO;
117 } 101 }
118 102
119 @Override 103 @Override
120 @Transactional 104 @Transactional
121 public boolean deleteTaskCenter(DeleteDTO deleteDTO) throws SchedulerException { 105 public boolean deleteTaskCenter(DeleteDTO deleteDTO) throws SchedulerException {
  106 + String tenantId = deleteDTO.getTenantId();
122 List<TkTaskCenterEntity> list = 107 List<TkTaskCenterEntity> list =
123 baseMapper.selectList( 108 baseMapper.selectList(
124 new LambdaQueryWrapper<TkTaskCenterEntity>() 109 new LambdaQueryWrapper<TkTaskCenterEntity>()
125 - .eq(TkTaskCenterEntity::getTenantId, deleteDTO.getTenantId()) 110 + .eq(TkTaskCenterEntity::getTenantId, tenantId)
126 .in(TkTaskCenterEntity::getId, deleteDTO.getIds())); 111 .in(TkTaskCenterEntity::getId, deleteDTO.getIds()));
127 if (null == list || list.isEmpty()) { 112 if (null == list || list.isEmpty()) {
128 throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage()); 113 throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
@@ -139,13 +124,15 @@ public class TkTaskCenterServiceImpl @@ -139,13 +124,15 @@ public class TkTaskCenterServiceImpl
139 } 124 }
140 int result = baseMapper.deleteBatchIds(sourceIds); 125 int result = baseMapper.deleteBatchIds(sourceIds);
141 if (result > FastIotConstants.MagicNumber.ZERO) { 126 if (result > FastIotConstants.MagicNumber.ZERO) {
142 - return tkSysJobService.deleteJobs(  
143 - sourceIds, JobGroupEnum.TASK_CENTER.name(), deleteDTO.getTenantId()); 127 + for (String taskCenterId : sourceIds) {
  128 + tkDeviceTaskCenterService.deleteByTaskCenterId(tenantId, taskCenterId);
  129 + }
  130 + return tkSysJobService.deleteJobs(sourceIds, JobGroupEnum.TASK_CENTER.name(), tenantId);
144 } 131 }
145 // 移除任务中心缓存 132 // 移除任务中心缓存
146 for (String taskCenterId : sourceIds) { 133 for (String taskCenterId : sourceIds) {
147 - String key = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS + taskCenterId;  
148 - cacheUtils.invalidate(key); 134 + String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
  135 + cacheUtils.invalidate(cacheName, taskCenterId);
149 } 136 }
150 return false; 137 return false;
151 } 138 }
@@ -180,8 +167,8 @@ public class TkTaskCenterServiceImpl @@ -180,8 +167,8 @@ public class TkTaskCenterServiceImpl
180 if (StringUtils.isEmpty(id)) { 167 if (StringUtils.isEmpty(id)) {
181 throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage()); 168 throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
182 } 169 }
183 - String key = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS + id;  
184 - Optional<TkTaskCenterEntity> entityCache = cacheUtils.get(key); 170 + String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
  171 + Optional<TkTaskCenterEntity> entityCache = cacheUtils.get(cacheName, id);
185 TkTaskCenterEntity entity = 172 TkTaskCenterEntity entity =
186 entityCache.orElseGet( 173 entityCache.orElseGet(
187 () -> 174 () ->
@@ -200,7 +187,7 @@ public class TkTaskCenterServiceImpl @@ -200,7 +187,7 @@ public class TkTaskCenterServiceImpl
200 JacksonUtil.convertValue(object.getExecuteTime(), TaskExecuteTimeDTO.class)); 187 JacksonUtil.convertValue(object.getExecuteTime(), TaskExecuteTimeDTO.class));
201 return Futures.immediateFuture(tkTaskCenterDTO); 188 return Futures.immediateFuture(tkTaskCenterDTO);
202 }) 189 })
203 - .orElse(null); 190 + .orElse(Futures.immediateFuture(new TkTaskCenterDTO()));
204 } 191 }
205 192
206 @Override 193 @Override
@@ -243,7 +230,8 @@ public class TkTaskCenterServiceImpl @@ -243,7 +230,8 @@ public class TkTaskCenterServiceImpl
243 230
244 @Override 231 @Override
245 @Transactional 232 @Transactional
246 - public boolean cancelExecute(String id, String tbDeviceId, String tenantId) { 233 + public boolean cancelOrAllowExecute(
  234 + String id, String tbDeviceId, String tenantId, boolean isAllow) {
247 boolean result = false; 235 boolean result = false;
248 TkTaskCenterEntity entity = baseMapper.selectById(id); 236 TkTaskCenterEntity entity = baseMapper.selectById(id);
249 if (entity != null) { 237 if (entity != null) {
@@ -253,31 +241,21 @@ public class TkTaskCenterServiceImpl @@ -253,31 +241,21 @@ public class TkTaskCenterServiceImpl
253 TargetContentDTO targetContent = 241 TargetContentDTO targetContent =
254 JacksonUtil.convertValue(entity.getExecuteTarget(), TargetContentDTO.class); 242 JacksonUtil.convertValue(entity.getExecuteTarget(), TargetContentDTO.class);
255 if (null != targetContent) { 243 if (null != targetContent) {
256 - // 按设备执行  
257 - if (Objects.equals(entity.getTargetType(), TargetTypeEnum.DEVICES)) {  
258 - List<String> data = targetContent.getData();  
259 - List<String> removeResult = new ArrayList<>();  
260 - if (null != data && !data.isEmpty()) {  
261 - for (String deviceId : data) {  
262 - if (!Objects.equals(deviceId, tbDeviceId)) {  
263 - removeResult.add(deviceId);  
264 - }  
265 - }  
266 - targetContent.setData(removeResult);  
267 - } 244 + // 查询设备信息
  245 + DeviceDTO dto = tkDeviceService.findDeviceInfoByTbDeviceId(tenantId, tbDeviceId);
  246 + Map<String, List<String>> map = targetContent.getCancelExecuteDevices();
  247 + if (isAllow) {
  248 + map.get(dto.getDeviceProfileId()).remove(tbDeviceId);
268 } else { 249 } else {
269 - // 按产品执行  
270 - Map<String, List<String>> map = targetContent.getCancelExecuteDevices();  
271 - DeviceDTO deviceDTO = tkDeviceService.findDeviceInfoByTbDeviceId(tenantId, tbDeviceId);  
272 - for (String key : map.keySet()) {  
273 - if (Objects.equals(key, deviceDTO.getDeviceProfileId())) {  
274 - map.get(key).add(tbDeviceId);  
275 - break;  
276 - }  
277 - } 250 + map.get(dto.getDeviceProfileId()).add(tbDeviceId);
278 } 251 }
279 entity.setExecuteTarget(JacksonUtil.convertValue(targetContent, JsonNode.class)); 252 entity.setExecuteTarget(JacksonUtil.convertValue(targetContent, JsonNode.class));
280 baseMapper.updateById(entity); 253 baseMapper.updateById(entity);
  254 + tkDeviceTaskCenterService.updateStateByTbDeviceIdAndTaskCenterId(
  255 + tenantId,
  256 + id,
  257 + tbDeviceId,
  258 + isAllow ? StatusEnum.ENABLE.getIndex() : StatusEnum.DISABLE.getIndex());
281 updateTaskCenterCache(entity); 259 updateTaskCenterCache(entity);
282 } 260 }
283 } 261 }
@@ -285,10 +263,50 @@ public class TkTaskCenterServiceImpl @@ -285,10 +263,50 @@ public class TkTaskCenterServiceImpl
285 } 263 }
286 264
287 private void updateTaskCenterCache(TkTaskCenterEntity entity) { 265 private void updateTaskCenterCache(TkTaskCenterEntity entity) {
288 - String key = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS + entity.getId();  
289 - Optional<TkTaskCenterEntity> entityCache = cacheUtils.get(key); 266 + String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
  267 + Optional<TkTaskCenterEntity> entityCache = cacheUtils.get(cacheName, entity.getId());
290 if (entityCache.isPresent()) { 268 if (entityCache.isPresent()) {
291 - cacheUtils.put(key, entity); 269 + cacheUtils.put(cacheName, entity.getId(), entity);
  270 + }
  271 + }
  272 +
  273 + private void saveOrUpdateDeviceTaskCenter(TkTaskCenterDTO tkTaskCenter) {
  274 + String taskCenterId = tkTaskCenter.getId();
  275 + String tenantId = tkTaskCenter.getTenantId();
  276 + // 判断是按产品执行还是按设备执行
  277 + List<String> data = tkTaskCenter.getExecuteTarget().getData();
  278 + boolean isUpdate = !StringUtils.isEmpty(taskCenterId);
  279 + if (Objects.equals(tkTaskCenter.getTargetType(), TargetTypeEnum.DEVICES)) {
  280 + processDeviceTaskCenterMapping(isUpdate, tenantId, taskCenterId, data);
  281 + } else {
  282 + for (String key : data) {
  283 + List<String> tbDeviceIds = tkDeviceService.findTbDeviceIdsByDeviceProfileId(key, tenantId);
  284 + processDeviceTaskCenterMapping(isUpdate, tenantId, taskCenterId, tbDeviceIds);
  285 + }
  286 + }
  287 + }
  288 +
  289 + private void processDeviceTaskCenterMapping(
  290 + boolean isUpdate, String tenantId, String taskCenterId, List<String> deviceIds) {
  291 + if (null != deviceIds && !deviceIds.isEmpty()) {
  292 + if (isUpdate) {
  293 + tkDeviceTaskCenterService.deleteByTaskCenterId(tenantId, taskCenterId);
  294 + }
  295 + insertBatchByTbDeviceIdsAndTaskCenterId(tenantId, taskCenterId, deviceIds);
  296 + }
  297 + }
  298 +
  299 + private void insertBatchByTbDeviceIdsAndTaskCenterId(
  300 + String tenantId, String taskCenterId, List<String> deviceIds) {
  301 + List<TkDeviceTaskCenterEntity> saveList = new ArrayList<>();
  302 + for (String key : deviceIds) {
  303 + TkDeviceTaskCenterEntity entity = new TkDeviceTaskCenterEntity();
  304 + entity.setTbDeviceId(key);
  305 + entity.setTaskCenterId(taskCenterId);
  306 + entity.setTenantId(tenantId);
  307 + entity.setState(StatusEnum.ENABLE.getIndex());
  308 + saveList.add(entity);
292 } 309 }
  310 + tkDeviceTaskCenterService.insertBatch(saveList);
293 } 311 }
294 } 312 }
dao/src/main/java/org/thingsboard/server/dao/yunteng/mapper/TkDeviceTaskCenterMapper.java renamed from dao/src/main/java/org/thingsboard/server/dao/yunteng/mapper/TaskCenterMapper.java
@@ -2,8 +2,7 @@ package org.thingsboard.server.dao.yunteng.mapper; @@ -2,8 +2,7 @@ package org.thingsboard.server.dao.yunteng.mapper;
2 2
3 import com.baomidou.mybatisplus.core.mapper.BaseMapper; 3 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
4 import org.apache.ibatis.annotations.Mapper; 4 import org.apache.ibatis.annotations.Mapper;
5 -import org.thingsboard.server.dao.yunteng.entities.TkTaskCenterEntity; 5 +import org.thingsboard.server.dao.yunteng.entities.TkDeviceTaskCenterEntity;
6 6
7 @Mapper 7 @Mapper
8 -public interface TaskCenterMapper extends BaseMapper<TkTaskCenterEntity> {  
9 -} 8 +public interface TkDeviceTaskCenterMapper extends BaseMapper<TkDeviceTaskCenterEntity> {}
  1 +package org.thingsboard.server.dao.yunteng.mapper;
  2 +
  3 +import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  4 +import com.baomidou.mybatisplus.core.metadata.IPage;import org.apache.ibatis.annotations.Mapper;
  5 +import org.apache.ibatis.annotations.Param;
  6 +import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO;
  7 +import org.thingsboard.server.dao.yunteng.entities.TkTaskCenterEntity;
  8 +import java.util.Map;
  9 +
  10 +@Mapper
  11 +public interface TkTaskCenterMapper extends BaseMapper<TkTaskCenterEntity> {
  12 + IPage<TkTaskCenterDTO> getPageData(
  13 + IPage<?> page,
  14 + @Param("queryMap")Map<String,Object> queryMap);
  15 +}
@@ -204,4 +204,12 @@ public interface TkDeviceService extends BaseService<TkDeviceEntity> { @@ -204,4 +204,12 @@ public interface TkDeviceService extends BaseService<TkDeviceEntity> {
204 List<String> rpcDevices(String tenantId,String organizationId,String projectId); 204 List<String> rpcDevices(String tenantId,String organizationId,String projectId);
205 205
206 ListenableFuture<List<DeviceDTO>> findDeviceListByDeviceProfileId(String deviceProfileId,String tenantId); 206 ListenableFuture<List<DeviceDTO>> findDeviceListByDeviceProfileId(String deviceProfileId,String tenantId);
  207 +
  208 + /**
  209 + * 通过设备配置ID查询所有的tbDeviceId
  210 + * @param deviceProfileId 设备配置ID
  211 + * @param tenantId 租户ID
  212 + * @return id集合
  213 + */
  214 + List<String> findTbDeviceIdsByDeviceProfileId(String deviceProfileId,String tenantId);
207 } 215 }
  1 +package org.thingsboard.server.dao.yunteng.service;
  2 +
  3 +import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
  4 +import org.thingsboard.server.common.data.yunteng.dto.task.TkDeviceTaskCenterDTO;
  5 +import org.thingsboard.server.dao.yunteng.entities.TkDeviceTaskCenterEntity;
  6 +import java.util.List;
  7 +
  8 +public interface TkDeviceTaskCenterService {
  9 + /**
  10 + * 新增或修改设备与任务中心的映射关系
  11 + *
  12 + * @param tkDeviceTaskCenterDTO 映射信息
  13 + * @return 映射信息
  14 + */
  15 + TkDeviceTaskCenterDTO saveOrUpdate(TkDeviceTaskCenterDTO tkDeviceTaskCenterDTO);
  16 +
  17 + /**
  18 + * 删除设备与任务中心的映射关系
  19 + *
  20 + * @param delete 删除参数
  21 + * @return true成功 false失败
  22 + */
  23 + boolean delete(DeleteDTO delete);
  24 +
  25 + /**
  26 + * 根据任务中心ID和设备ID更新状态
  27 + * @param tenantId 租户ID
  28 + * @param taskCenterId 任务重ID
  29 + * @param tbDeviceId 设备ID
  30 + * @param state 状态
  31 + * @return true成功 false失败
  32 + */
  33 + boolean updateStateByTbDeviceIdAndTaskCenterId(String tenantId,String taskCenterId,String tbDeviceId,Integer state);
  34 +
  35 + /**
  36 + * 通过任务ID删除所有的映射关系
  37 + * @param tenantId 租户ID
  38 + * @param taskCenterId 任务ID
  39 + * @return true成功 false失败
  40 + */
  41 + boolean deleteByTaskCenterId(String tenantId,String taskCenterId);
  42 +
  43 + /**
  44 + * 批量插入
  45 + *
  46 + * @param list 插入实体集合
  47 + * @return true成功 false失败
  48 + */
  49 + boolean insertBatch(List<TkDeviceTaskCenterEntity> list);
  50 +}
1 package org.thingsboard.server.dao.yunteng.service; 1 package org.thingsboard.server.dao.yunteng.service;
2 2
3 -import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO; 3 +import com.google.common.util.concurrent.ListenableFuture;import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
4 import org.thingsboard.server.common.data.yunteng.dto.SysJobLogDTO; 4 import org.thingsboard.server.common.data.yunteng.dto.SysJobLogDTO;
5 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData; 5 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
6 6
@@ -46,4 +46,12 @@ public interface TkSysJobLogService @@ -46,4 +46,12 @@ public interface TkSysJobLogService
46 * 清空任务日志 46 * 清空任务日志
47 */ 47 */
48 boolean cleanJobLog(); 48 boolean cleanJobLog();
  49 +
  50 + /**
  51 + * 根据任务ID获取任务日志执行的最新时间戳
  52 + * @param jobId 任务ID
  53 + * @param jobGroup 任务分组
  54 + * @return 最新时间戳
  55 + */
  56 + ListenableFuture<Long> findLastLogEndTimeByJobId(String jobId, String jobGroup);
49 } 57 }
1 package org.thingsboard.server.dao.yunteng.service; 1 package org.thingsboard.server.dao.yunteng.service;
2 2
3 -import com.google.common.util.concurrent.ListenableFuture;import org.quartz.SchedulerException; 3 +import com.google.common.util.concurrent.ListenableFuture;
  4 +import org.quartz.SchedulerException;
4 import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO; 5 import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
5 import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO; 6 import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO;
6 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData; 7 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
@@ -15,7 +16,6 @@ public interface TkTaskCenterService { @@ -15,7 +16,6 @@ public interface TkTaskCenterService {
15 * @return 分页数据 16 * @return 分页数据
16 */ 17 */
17 TkPageData<TkTaskCenterDTO> taskCenterPage(Map<String, Object> queryMap, String tenantId); 18 TkPageData<TkTaskCenterDTO> taskCenterPage(Map<String, Object> queryMap, String tenantId);
18 -  
19 /** 19 /**
20 * 修改或保存任务中心 20 * 修改或保存任务中心
21 * 21 *
@@ -28,18 +28,20 @@ public interface TkTaskCenterService { @@ -28,18 +28,20 @@ public interface TkTaskCenterService {
28 * 28 *
29 * @param deleteDTO 删除IDS 29 * @param deleteDTO 删除IDS
30 */ 30 */
31 - boolean deleteTaskCenter(DeleteDTO deleteDTO)throws SchedulerException; 31 + boolean deleteTaskCenter(DeleteDTO deleteDTO) throws SchedulerException;
32 32
33 /** 33 /**
34 * 根据任务中心ID,查询任务信息 34 * 根据任务中心ID,查询任务信息
  35 + *
35 * @param id 任务中心ID 36 * @param id 任务中心ID
36 * @param tenantId 租户ID 37 * @param tenantId 租户ID
37 * @return 任务信息 38 * @return 任务信息
38 */ 39 */
39 - TkTaskCenterDTO findTaskCenterInfoById(String id,String tenantId); 40 + TkTaskCenterDTO findTaskCenterInfoById(String id, String tenantId);
40 41
41 /** 42 /**
42 * 根据任务中心ID,异步查询任务信息 43 * 根据任务中心ID,异步查询任务信息
  44 + *
43 * @param id 任务中心ID 45 * @param id 任务中心ID
44 * @return 任务信息 46 * @return 任务信息
45 */ 47 */
@@ -47,19 +49,22 @@ public interface TkTaskCenterService { @@ -47,19 +49,22 @@ public interface TkTaskCenterService {
47 49
48 /** 50 /**
49 * 启用禁用任务中心 51 * 启用禁用任务中心
  52 + *
50 * @param id 任务中心ID 53 * @param id 任务中心ID
51 * @param state 1启用 0禁用 54 * @param state 1启用 0禁用
52 * @param tenantId 租户ID 55 * @param tenantId 租户ID
53 * @return true成功 false失败 56 * @return true成功 false失败
54 */ 57 */
55 - boolean updateState(String id,Integer state,String tenantId)throws SchedulerException; 58 + boolean updateState(String id, Integer state, String tenantId) throws SchedulerException;
56 59
57 /** 60 /**
58 * 设备取消任务执行 61 * 设备取消任务执行
  62 + *
59 * @param id 任务ID 63 * @param id 任务ID
60 * @param tbDeviceId 设备ID 64 * @param tbDeviceId 设备ID
61 * @param tenantId 租户ID 65 * @param tenantId 租户ID
  66 + * @param isAllow true允许执行 false取消执行
62 * @return true成功 false失败 67 * @return true成功 false失败
63 */ 68 */
64 - boolean cancelExecute(String id,String tbDeviceId,String tenantId); 69 + boolean cancelOrAllowExecute(String id, String tbDeviceId, String tenantId, boolean isAllow);
65 } 70 }
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
  3 +
  4 +<mapper namespace="org.thingsboard.server.dao.yunteng.mapper.TkTaskCenterMapper">
  5 + <resultMap type="org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO" id="dataMap">
  6 + <result property="id" column="id"/>
  7 + <result property="name" column="name"/>
  8 + <result property="targetType" column="target_type" typeHandler="org.apache.ibatis.type.EnumTypeHandler" />
  9 + <result property="executeTarget" column="execute_target"
  10 + typeHandler="com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler"/>
  11 + <result property="executeContent" column="execute_content"
  12 + typeHandler="com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler"/>
  13 + <result property="executeTime" column="execute_time"
  14 + typeHandler="com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler"/>
  15 + <result property="createTime" column="create_time"/>
  16 + <result property="creator" column="creator"/>
  17 + <result property="updater" column="updater"/>
  18 + <result property="updateTime" column="update_time"/>
  19 + <result property="tenantId" column="tenant_id" />
  20 + <association property="tkDeviceTaskCenter" javaType="org.thingsboard.server.common.data.yunteng.dto.task.TkDeviceTaskCenterDTO">
  21 + <result property="state" column="state"/>
  22 + <result property="taskCenterId" column="task_center_id"/>
  23 + <result property="tbDeviceId" column="tb_device_id"/>
  24 + </association>
  25 + </resultMap>
  26 +
  27 +
  28 +
  29 + <sql id="basicColumns">
  30 + tc.id,tc.name,tc.target_type,tc.execute_target,tc.execute_content,tc.execute_time,tc.create_time,tc.creator,
  31 + tc.updater,tc.update_time,tc.tenant_id
  32 + </sql>
  33 +
  34 + <sql id="detailColumns">
  35 + <include refid="basicColumns"/>
  36 + ,tdc.state,tdc.task_center_id,tb_device_id
  37 + </sql>
  38 + <select id="getPageData" resultMap="dataMap">
  39 + SELECT
  40 + <include refid="detailColumns"/>
  41 + FROM tk_task_center tc
  42 + LEFT JOIN tk_device_task_center tdc ON tc.id = tdc.task_center_id
  43 + <where>
  44 + <if test="queryMap.tenantId !=null and queryMap.tenantId !=''">
  45 + AND tc.tenant_id = #{queryMap.tenantId}
  46 + </if>
  47 + <if test="queryMap.state !=null">
  48 + AND tc.state = #{queryMap.state}
  49 + </if>
  50 + <if test="queryMap.targetType !=null">
  51 + AND tc.target_type = #{queryMap.targetType}
  52 + </if>
  53 + <if test="queryMap.tbDeviceId !=null and queryMap.tbDeviceId !=''">
  54 + AND tdc.tb_device_id = #{queryMap.tbDeviceId}
  55 + </if>
  56 + </where>
  57 + </select>
  58 +</mapper>