Commit 1355f8c55dd482931e3f587263fe6af7b3a4f566

Authored by xp.Huang
1 parent e61d9204

feat: 新增任务中心,支持定时命令下发

Showing 19 changed files with 986 additions and 132 deletions
  1 +package org.thingsboard.server.controller.yunteng;
  2 +
  3 +import io.swagger.annotations.Api;
  4 +import io.swagger.annotations.ApiOperation;
  5 +import lombok.RequiredArgsConstructor;
  6 +import org.apache.commons.lang3.StringUtils;
  7 +import org.quartz.SchedulerException;
  8 +import org.springframework.security.access.prepost.PreAuthorize;
  9 +import org.springframework.validation.annotation.Validated;
  10 +import org.springframework.web.bind.annotation.*;
  11 +import org.thingsboard.server.common.data.exception.ThingsboardException;
  12 +import org.thingsboard.server.common.data.yunteng.common.AddGroup;
  13 +import org.thingsboard.server.common.data.yunteng.common.UpdateGroup;
  14 +import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException;
  15 +import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage;
  16 +import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
  17 +import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO;
  18 +import org.thingsboard.server.common.data.yunteng.enums.OrderTypeEnum;
  19 +import org.thingsboard.server.common.data.yunteng.enums.TargetTypeEnum;
  20 +import org.thingsboard.server.common.data.yunteng.utils.tools.ResponseResult;
  21 +import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
  22 +import org.thingsboard.server.controller.BaseController;
  23 +import org.thingsboard.server.dao.yunteng.service.TkTaskCenterService;
  24 +
  25 +import java.util.HashMap;
  26 +
  27 +import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.*;
  28 +import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.ORDER_TYPE;
  29 +
  30 +@RestController
  31 +@RequestMapping("api/yt/task_center")
  32 +@Api(tags = {"任务中心"})
  33 +@RequiredArgsConstructor
  34 +public class TkTaskCenterController extends BaseController {
  35 + private final TkTaskCenterService tkTaskCenterService;
  36 +
  37 + @GetMapping(
  38 + name = "page",
  39 + params = {PAGE_SIZE, PAGE})
  40 + public TkPageData<TkTaskCenterDTO> pageDevice(
  41 + @RequestParam(PAGE_SIZE) int pageSize,
  42 + @RequestParam(PAGE) int page,
  43 + @RequestParam(value = "state", required = false) Integer state,
  44 + @RequestParam(value = "targetType", required = false) TargetTypeEnum targetType,
  45 + @RequestParam(value = ORDER_FILED, required = false) String orderBy,
  46 + @RequestParam(value = ORDER_TYPE, required = false) OrderTypeEnum orderType)
  47 + throws ThingsboardException {
  48 + HashMap<String, Object> queryMap = new HashMap<>();
  49 + queryMap.put(PAGE_SIZE, pageSize);
  50 + queryMap.put(PAGE, page);
  51 + queryMap.put(ORDER_FILED, orderBy);
  52 + queryMap.put(ORDER_TYPE, orderType);
  53 + queryMap.put("state", state);
  54 + queryMap.put("targetType", targetType);
  55 + return tkTaskCenterService.taskCenterPage(queryMap, getCurrentUser().getCurrentTenantId());
  56 + }
  57 +
  58 + @PostMapping("/add")
  59 + @ApiOperation(value = "新增任务中心")
  60 +// @PreAuthorize(
  61 +// "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:add:post'})")
  62 + public ResponseResult<TkTaskCenterDTO> save(
  63 + @RequestBody @Validated(AddGroup.class) TkTaskCenterDTO taskCenter)
  64 + throws ThingsboardException, SchedulerException {
  65 + if (StringUtils.isNotEmpty(taskCenter.getId())) {
  66 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  67 + }
  68 + return ResponseResult.success(saveOrUpdate(taskCenter));
  69 + }
  70 +
  71 + @PutMapping("/update")
  72 +// @PreAuthorize(
  73 +// "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:update:update'})")
  74 + @ApiOperation(value = "编辑任务中心")
  75 + public ResponseResult<TkTaskCenterDTO> update(
  76 + @RequestBody @Validated(UpdateGroup.class) TkTaskCenterDTO taskCenter)
  77 + throws ThingsboardException, SchedulerException {
  78 + if (StringUtils.isEmpty(taskCenter.getId())) {
  79 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  80 + }
  81 + return ResponseResult.success(saveOrUpdate(taskCenter));
  82 + }
  83 +
  84 + @PutMapping("/{id}/update/{state}")
  85 +// @PreAuthorize(
  86 +// "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:update:update'})")
  87 + @ApiOperation(value = "更新状态")
  88 + public ResponseResult<Boolean> updateState(
  89 + @PathVariable("id") String id, @PathVariable("state") Integer state)
  90 + throws ThingsboardException, SchedulerException {
  91 + if (StringUtils.isEmpty(id) || null == state) {
  92 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  93 + }
  94 + return ResponseResult.success(
  95 + tkTaskCenterService.updateState(id, state, getCurrentUser().getCurrentTenantId()));
  96 + }
  97 +
  98 + @DeleteMapping
  99 +// @PreAuthorize(
  100 +// "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:delete'})")
  101 + @ApiOperation(value = "删除任务中心")
  102 + public ResponseResult<Boolean> deleteTaskCenter(@RequestBody DeleteDTO deleteDTO)
  103 + throws ThingsboardException, SchedulerException {
  104 + deleteDTO.setTenantId(getCurrentUser().getCurrentTenantId());
  105 + return ResponseResult.success(tkTaskCenterService.deleteTaskCenter(deleteDTO));
  106 + }
  107 +
  108 + @PutMapping("/{id}/cancel/{tbDeviceId}")
  109 +// @PreAuthorize(
  110 +// "@check.checkPermissions({'TENANT_ADMIN','CUSTOMER_USER'},{'api:yt:task_center:cancel'})")
  111 + @ApiOperation(value = "设备取消任务执行")
  112 + public ResponseResult<Boolean> cancelExecute(
  113 + @PathVariable("id") String taskId, @PathVariable("tbDeviceId") String tbDeviceId)throws ThingsboardException {
  114 + if(StringUtils.isEmpty(taskId) || StringUtils.isEmpty(tbDeviceId)){
  115 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  116 + }
  117 + return ResponseResult.success(
  118 + tkTaskCenterService.cancelExecute(
  119 + taskId, tbDeviceId, getCurrentUser().getCurrentTenantId()));
  120 + }
  121 +
  122 + private TkTaskCenterDTO saveOrUpdate(TkTaskCenterDTO tkTaskCenterDTO)
  123 + throws ThingsboardException, SchedulerException {
  124 + tkTaskCenterDTO.setTenantId(getCurrentUser().getCurrentTenantId());
  125 + return tkTaskCenterService.saveOrUpdateTaskCenter(tkTaskCenterDTO);
  126 + }
  127 +}
  1 +package org.thingsboard.server.utils.yunteng;
  2 +
  3 +import com.fasterxml.jackson.databind.JsonNode;
  4 +import com.google.common.util.concurrent.FutureCallback;
  5 +import com.google.common.util.concurrent.Futures;
  6 +import com.google.common.util.concurrent.ListenableFuture;
  7 +import com.google.common.util.concurrent.MoreExecutors;
  8 +import lombok.RequiredArgsConstructor;
  9 +import lombok.extern.slf4j.Slf4j;
  10 +import org.checkerframework.checker.nullness.qual.Nullable;
  11 +import org.jetbrains.annotations.NotNull;
  12 +import org.springframework.beans.factory.annotation.Value;
  13 +import org.springframework.stereotype.Component;
  14 +import org.thingsboard.common.util.JacksonUtil;
  15 +import org.thingsboard.server.common.data.id.DeviceId;
  16 +import org.thingsboard.server.common.data.id.TenantId;
  17 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
  18 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
  19 +import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO;
  20 +import org.thingsboard.server.common.data.yunteng.dto.task.TargetContentDTO;
  21 +import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO;
  22 +import org.thingsboard.server.common.data.yunteng.enums.TargetTypeEnum;
  23 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
  24 +import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
  25 +import org.thingsboard.server.dao.yunteng.service.TkTaskCenterService;
  26 +import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
  27 +import org.thingsboard.server.service.security.model.SecurityUser;
  28 +import java.util.*;
  29 +
  30 +@Component("rpcCommandTask")
  31 +@RequiredArgsConstructor
  32 +@Slf4j
  33 +public class RpcCommandTask {
  34 + @Value("${server.rest.server_side_rpc.default_timeout:10000}")
  35 + protected long defaultTimeout;
  36 +
  37 + @Value("${server.rest.server_side_rpc.min_timeout:5000}")
  38 + protected long minTimeout;
  39 +
  40 + private final TkTaskCenterService tkTaskCenterService;
  41 + private final TbCoreDeviceRpcService tbCoreDeviceRpcService;
  42 + private final TkDeviceService tkDeviceService;
  43 + public void process(String taskCenterId) {
  44 + // 通过任务中心ID查询执行命令及执行对象
  45 + ListenableFuture<TkTaskCenterDTO> future =
  46 + tkTaskCenterService.getTaskCenterInfoById(taskCenterId);
  47 + Futures.addCallback(
  48 + future,
  49 + new FutureCallback<>() {
  50 + @Override
  51 + public void onSuccess(@Nullable TkTaskCenterDTO tkTaskCenterDTO) {
  52 + if (null != tkTaskCenterDTO) {
  53 + TargetTypeEnum targetType = tkTaskCenterDTO.getTargetType();
  54 + TargetContentDTO targetContent = tkTaskCenterDTO.getExecuteTarget();
  55 + JsonNode cmdJsonNode = tkTaskCenterDTO.getExecuteContent().getPushContent();
  56 + SecurityUser securityUser = new SecurityUser();
  57 + // send cmd
  58 + if (targetContent.getData() != null && null != cmdJsonNode) {
  59 + String tenantId = tkTaskCenterDTO.getTenantId();
  60 + if (targetType.equals(TargetTypeEnum.DEVICES)) {
  61 + for (String deviceId : targetContent.getData()) {
  62 + sendRpcCommand(cmdJsonNode, tenantId, deviceId, securityUser);
  63 + }
  64 + } else {
  65 + sendRpcCommandByProducts(targetContent, cmdJsonNode, tenantId, securityUser);
  66 + }
  67 + }
  68 + }
  69 + }
  70 +
  71 + @Override
  72 + public void onFailure(@NotNull Throwable throwable) {}
  73 + },
  74 + MoreExecutors.directExecutor());
  75 + }
  76 +
  77 + private void sendRpcCommandByProducts(
  78 + TargetContentDTO targetContent,
  79 + JsonNode cmdJsonNode,
  80 + String tenantId,
  81 + SecurityUser securityUser) {
  82 + for (String deviceProfileId : targetContent.getData()) {
  83 + Futures.addCallback(
  84 + tkDeviceService.findDeviceListByDeviceProfileId(deviceProfileId, tenantId),
  85 + new FutureCallback<List<DeviceDTO>>() {
  86 + @Override
  87 + public void onSuccess(@Nullable List<DeviceDTO> deviceDTOS) {
  88 + if (null != deviceDTOS && !deviceDTOS.isEmpty()) {
  89 + for (DeviceDTO dto : deviceDTOS) {
  90 + Map<String, List<String>> map = targetContent.getCancelExecuteDevices();
  91 + boolean needSendCommand = true;
  92 + if (null != map && !map.isEmpty() && !map.get(deviceProfileId).isEmpty()) {
  93 + for (String deviceId : map.get(deviceProfileId)) {
  94 + if (Objects.equals(deviceId, dto.getTbDeviceId())) {
  95 + needSendCommand = false;
  96 + break;
  97 + }
  98 + }
  99 + }
  100 + if (needSendCommand) {
  101 + sendRpcCommand(cmdJsonNode, tenantId, dto.getTbDeviceId(), securityUser);
  102 + }
  103 + }
  104 + }
  105 + }
  106 +
  107 + @Override
  108 + public void onFailure(@NotNull Throwable throwable) {}
  109 + },
  110 + MoreExecutors.directExecutor());
  111 + }
  112 + }
  113 +
  114 + private void sendRpcCommand(
  115 + JsonNode cmdJsonNode, String tenantId, String originateId, SecurityUser securityUser) {
  116 + JsonNode rpcCommand = cmdJsonNode.get(FastIotConstants.RPC_COMMAND);
  117 + ToDeviceRpcRequestBody body =
  118 + new ToDeviceRpcRequestBody("methodThingskit", JacksonUtil.toString(rpcCommand));
  119 + DeviceId deviceId = new DeviceId(UUID.fromString(originateId));
  120 + ToDeviceRpcRequest request =
  121 + new ToDeviceRpcRequest(
  122 + UUID.randomUUID(),
  123 + new TenantId(UUID.fromString(tenantId)),
  124 + deviceId,
  125 + true,
  126 + System.currentTimeMillis() + Math.max(minTimeout, defaultTimeout),
  127 + body,
  128 + false,
  129 + null,
  130 + null);
  131 + tbCoreDeviceRpcService.processRestApiRpcRequest(
  132 + request,
  133 + fromDeviceRpcResponse -> {
  134 + log.trace("Device renamed RPC with id: [{}] ", request.getId());
  135 + },
  136 + securityUser);
  137 + }
  138 +}
@@ -431,18 +431,20 @@ caffeine: @@ -431,18 +431,20 @@ caffeine:
431 timeToLiveInMinutes: "${CACHE_SPECS_EDGES_TTL:1440}" 431 timeToLiveInMinutes: "${CACHE_SPECS_EDGES_TTL:1440}"
432 maxSize: "${CACHE_SPECS_EDGES_MAX_SIZE:10000}" 432 maxSize: "${CACHE_SPECS_EDGES_MAX_SIZE:10000}"
433 yunTengIotCache: 433 yunTengIotCache:
434 - timeToLiveInMinutes: 1440  
435 - maxSize: 10000 434 + timeToLiveInMinutes: "${CACHE_SPECS_YUN_TENG_IOT_CACHE_TTL:1440}"
  435 + maxSize: "${CACHE_SPECS_YUN_TENG_IOT_CACHE_MAX_SIZE:10000}"
436 userPermissionFor: 436 userPermissionFor:
437 - timeToLiveInMinutes: 1440  
438 - maxSize: 10000 437 + timeToLiveInMinutes: "${CACHE_SPECS_USER_PERMISSION_FOR_TTL:1440}"
  438 + maxSize: "${CACHE_SPECS_USER_PERMISSION_FOR_MAX_SIZE:10000}"
439 mobileLoginSmsCode: 439 mobileLoginSmsCode:
440 - timeToLiveInMinutes: 2  
441 - maxSize: 10000 440 + timeToLiveInMinutes: "${CACHE_SPECS_MOBILE_LOGIN_SMS_CODE_TTL:2}"
  441 + maxSize: "${CACHE_SPECS_MOBILE_LOGIN_SMS_CODE_MAX_SIZE:10000}"
442 thingsArea: 442 thingsArea:
443 - timeToLiveInMinutes: 1440  
444 - maxSize: 500  
445 - 443 + timeToLiveInMinutes: "${CACHE_SPECS_THINGS_AREA_TTL:1440}"
  444 + maxSize: "${CACHE_SPECS_THINGS_AREA_MAX_SIZE:500}"
  445 + taskCenterInfos:
  446 + timeToLiveInMinutes: "${CACHE_SPECS_TASK_CENTER_TTL:1440}"
  447 + maxSize: "${CACHE_SPECS_TASK_CENTER_MAX_SIZE:10000}"
446 redis: 448 redis:
447 # standalone or cluster 449 # standalone or cluster
448 connection: 450 connection:
@@ -905,8 +907,8 @@ transport: @@ -905,8 +907,8 @@ transport:
905 psm_activity_timer: "${LWM2M_PSM_ACTIVITY_TIMER:10000}" 907 psm_activity_timer: "${LWM2M_PSM_ACTIVITY_TIMER:10000}"
906 paging_transmission_window: "${LWM2M_PAGING_TRANSMISSION_WINDOW:10000}" 908 paging_transmission_window: "${LWM2M_PAGING_TRANSMISSION_WINDOW:10000}"
907 network_config: # In this section you can specify custom parameters for LwM2M network configuration and expose the env variables to configure outside 909 network_config: # In this section you can specify custom parameters for LwM2M network configuration and expose the env variables to configure outside
908 -# - key: "PROTOCOL_STAGE_THREAD_COUNT"  
909 -# value: "${LWM2M_PROTOCOL_STAGE_THREAD_COUNT:4}" 910 + # - key: "PROTOCOL_STAGE_THREAD_COUNT"
  911 + # value: "${LWM2M_PROTOCOL_STAGE_THREAD_COUNT:4}"
910 snmp: 912 snmp:
911 enabled: "${SNMP_ENABLED:true}" 913 enabled: "${SNMP_ENABLED:true}"
912 response_processing: 914 response_processing:
@@ -989,9 +991,9 @@ queue: @@ -989,9 +991,9 @@ queue:
989 tb_ota_package: 991 tb_ota_package:
990 - key: max.poll.records 992 - key: max.poll.records
991 value: "${TB_QUEUE_KAFKA_OTA_MAX_POLL_RECORDS:10}" 993 value: "${TB_QUEUE_KAFKA_OTA_MAX_POLL_RECORDS:10}"
992 -# tb_rule_engine.sq:  
993 -# - key: max.poll.records  
994 -# value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}" 994 + # tb_rule_engine.sq:
  995 + # - key: max.poll.records
  996 + # value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
995 other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside 997 other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
996 - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms 998 - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
997 value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) 999 value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
@@ -1193,14 +1195,14 @@ file: @@ -1193,14 +1195,14 @@ file:
1193 staticUrl: /static/files/** #oss静态访问路径 只有type = local需要 1195 staticUrl: /static/files/** #oss静态访问路径 只有type = local需要
1194 randomFileName: ${file.storage.randomFileName} 1196 randomFileName: ${file.storage.randomFileName}
1195 minio: 1197 minio:
1196 - minioUrl: ${MINIO_URL:https://dev.thingskit.com:9000} #minio储存地址  
1197 - minioName: ${MINIO_NAME:test} #minio账户  
1198 - minioPass: ${MINIO_PWD:test} #minio访问密码  
1199 - bucketName: yunteng #minio储存桶名称 1198 + minioUrl: ${MINIO_URL:http://localhost:9000} #minio储存地址
  1199 + minioName: ${MINIO_NAME:thingskit} #minio账户
  1200 + minioPass: ${MINIO_PWD:thingskit} #minio访问密码
  1201 + bucketName: yunteng #minio储存桶名称
1200 randomFileName: ${file.storage.randomFileName} 1202 randomFileName: ${file.storage.randomFileName}
1201 account: 1203 account:
1202 info: 1204 info:
1203 - emailSuffix: ${ACCOUNT_EMAIL_SUFFIX:yunteng.com} 1205 + emailSuffix: ${ACCOUNT_EMAIL_SUFFIX:thingskit.com}
1204 defaultPassword: 123456 1206 defaultPassword: 123456
1205 reset: ${ACCOUNT_PASSWORD_FORCE_RESET:true} 1207 reset: ${ACCOUNT_PASSWORD_FORCE_RESET:true}
1206 third: 1208 third:
@@ -118,6 +118,8 @@ public final class ModelConstants { @@ -118,6 +118,8 @@ public final class ModelConstants {
118 public static final String TK_CUSTOMER_DEVICE_NAME = "tk_customer_device"; 118 public static final String TK_CUSTOMER_DEVICE_NAME = "tk_customer_device";
119 /** 设备上下线记录表 */ 119 /** 设备上下线记录表 */
120 public static final String TK_DEVICE_STATE_LOG = "tk_device_state_log"; 120 public static final String TK_DEVICE_STATE_LOG = "tk_device_state_log";
  121 + /** 任务中心表 */
  122 + public static final String TK_TASK_CENTER_NAME = "tk_task_center";
121 } 123 }
122 124
123 public static class TableFields { 125 public static class TableFields {
  1 +package org.thingsboard.server.common.data.yunteng.dto.task;
  2 +
  3 +import io.swagger.annotations.ApiModelProperty;
  4 +import lombok.Data;
  5 +import java.util.List;
  6 +import java.util.Map;
  7 +
  8 +@Data
  9 +public class TargetContentDTO {
  10 + @ApiModelProperty(value = "执行的数据:目标类型如果是产品则为deviceProfileId,如果是设备则为tbDeviceId")
  11 + List<String> data;
  12 +
  13 + @ApiModelProperty(value = "设备类型")
  14 + String deviceType;
  15 +
  16 + @ApiModelProperty(value = "组织ID")
  17 + String organizationId;
  18 +
  19 + @ApiModelProperty(value = "设备配置ID")
  20 + String deviceProfileId;
  21 +
  22 + @ApiModelProperty(value = "取消执行的数据:只有目标类型是产品时才会有值,且List<String>里面为tbDeviceId")
  23 + Map<String, List<String>> cancelExecuteDevices;
  24 +}
  1 +package org.thingsboard.server.common.data.yunteng.dto.task;
  2 +
  3 +import io.swagger.annotations.ApiModelProperty;
  4 +import lombok.Data;
  5 +import org.thingsboard.server.common.data.yunteng.enums.ExecuteTypeEnum;
  6 +
  7 +@Data
  8 +public class TaskExecuteTimeDTO {
  9 +
  10 + @ApiModelProperty(value = "执行类型")
  11 + private ExecuteTypeEnum type = ExecuteTypeEnum.CUSTOM;
  12 +
  13 + @ApiModelProperty(value = "周期类型")
  14 + private String periodType;
  15 +
  16 + @ApiModelProperty(value = "周期")
  17 + private String period;
  18 +
  19 + @ApiModelProperty(value = "时间")
  20 + private String time;
  21 +
  22 + @ApiModelProperty(value = "cron表达式")
  23 + private String cron;
  24 +}
  1 +package org.thingsboard.server.common.data.yunteng.dto.task;
  2 +
  3 +import com.fasterxml.jackson.databind.JsonNode;
  4 +import io.swagger.annotations.ApiModelProperty;
  5 +import lombok.Data;
  6 +import org.thingsboard.server.common.data.yunteng.enums.TaskTypeEnum;
  7 +
  8 +@Data
  9 +public class TaskTypeDTO {
  10 + @ApiModelProperty(value = "类型")
  11 + private TaskTypeEnum type = TaskTypeEnum.CUSTOM;
  12 +
  13 + @ApiModelProperty(value = "推送方式:MQTT TCP")
  14 + private String pushWay;
  15 +
  16 + @ApiModelProperty(value = "推送内容:即下发的命令内容")
  17 + private JsonNode pushContent;
  18 +}
  1 +package org.thingsboard.server.common.data.yunteng.dto.task;
  2 +
  3 +import io.swagger.annotations.ApiModelProperty;
  4 +import lombok.Data;
  5 +import lombok.EqualsAndHashCode;
  6 +import org.thingsboard.server.common.data.yunteng.common.AddGroup;
  7 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
  8 +import org.thingsboard.server.common.data.yunteng.dto.TenantDTO;
  9 +import org.thingsboard.server.common.data.yunteng.enums.TargetTypeEnum;
  10 +
  11 +import javax.validation.constraints.NotEmpty;
  12 +import javax.validation.constraints.NotNull;
  13 +
  14 +@Data
  15 +@EqualsAndHashCode(callSuper = true)
  16 +public class TkTaskCenterDTO extends TenantDTO {
  17 + @NotEmpty(message = "任务名称不能为空或空字符串", groups = AddGroup.class)
  18 + @ApiModelProperty(value = "任务名称")
  19 + private String name;
  20 +
  21 + @ApiModelProperty(value = "目标类型")
  22 + private TargetTypeEnum targetType = TargetTypeEnum.DEVICES;
  23 +
  24 + @NotNull(message = "执行目标不能为空", groups = AddGroup.class)
  25 + @ApiModelProperty(value = "执行目标")
  26 + private TargetContentDTO executeTarget;
  27 +
  28 + @NotNull(message = "执行内容不能为空", groups = AddGroup.class)
  29 + @ApiModelProperty(value = "执行内容")
  30 + private TaskTypeDTO executeContent;
  31 +
  32 + @ApiModelProperty(value = "执行时间")
  33 + @NotNull(message = "执行时间不能为空", groups = AddGroup.class)
  34 + private TaskExecuteTimeDTO executeTime;
  35 +
  36 + @ApiModelProperty(value = "0禁用 1启用")
  37 + private Integer state = FastIotConstants.StateValue.DISABLE;
  38 +
  39 + @ApiModelProperty(value = "备注")
  40 + private String remark;
  41 +}
  1 +package org.thingsboard.server.common.data.yunteng.enums;
  2 +
  3 +public enum ExecuteTypeEnum {
  4 + CUSTOM,
  5 + POLL
  6 +}
@@ -3,5 +3,6 @@ package org.thingsboard.server.common.data.yunteng.enums; @@ -3,5 +3,6 @@ package org.thingsboard.server.common.data.yunteng.enums;
3 public enum JobGroupEnum { 3 public enum JobGroupEnum {
4 DEFAULT, 4 DEFAULT,
5 SYSTEM, 5 SYSTEM,
6 - REPORT 6 + REPORT,
  7 + TASK_CENTER
7 } 8 }
  1 +package org.thingsboard.server.common.data.yunteng.enums;
  2 +
  3 +public enum TargetTypeEnum {
  4 + PRODUCTS,
  5 + DEVICES
  6 +}
  1 +package org.thingsboard.server.common.data.yunteng.enums;
  2 +
  3 +public enum TaskTypeEnum {
  4 + CUSTOM,
  5 + MODBUS_RTU
  6 +}
@@ -36,7 +36,6 @@ import org.thingsboard.server.common.data.id.DeviceId; @@ -36,7 +36,6 @@ import org.thingsboard.server.common.data.id.DeviceId;
36 import org.thingsboard.server.common.data.id.OtaPackageId; 36 import org.thingsboard.server.common.data.id.OtaPackageId;
37 import org.thingsboard.server.common.data.ota.OtaPackageType; 37 import org.thingsboard.server.common.data.ota.OtaPackageType;
38 import org.thingsboard.server.common.data.rpc.RpcStatus; 38 import org.thingsboard.server.common.data.rpc.RpcStatus;
39 -import org.thingsboard.server.common.data.yunteng.enums.TcpDataTypeEnum;  
40 import org.thingsboard.server.common.msg.EncryptionUtil; 39 import org.thingsboard.server.common.msg.EncryptionUtil;
41 import org.thingsboard.server.common.msg.tools.TbRateLimitsException; 40 import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
42 import org.thingsboard.server.common.transport.SessionMsgListener; 41 import org.thingsboard.server.common.transport.SessionMsgListener;
@@ -59,7 +58,6 @@ import org.thingsboard.server.transport.tcp.util.ByteUtils; @@ -59,7 +58,6 @@ import org.thingsboard.server.transport.tcp.util.ByteUtils;
59 58
60 import javax.net.ssl.SSLPeerUnverifiedException; 59 import javax.net.ssl.SSLPeerUnverifiedException;
61 import java.io.IOException; 60 import java.io.IOException;
62 -import java.io.UnsupportedEncodingException;  
63 import java.net.InetSocketAddress; 61 import java.net.InetSocketAddress;
64 import java.security.cert.Certificate; 62 import java.security.cert.Certificate;
65 import java.security.cert.X509Certificate; 63 import java.security.cert.X509Certificate;
@@ -717,25 +715,15 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements @@ -717,25 +715,15 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
717 * @return 715 * @return
718 */ 716 */
719 private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx,TCPMessage tcp) { 717 private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx,TCPMessage tcp) {
720 - try {  
721 - String message = tcp.getMessage();  
722 - byte[] payloadInBytes ;  
723 - if(deviceSessionCtx.getPayloadType().equals(TcpDataTypeEnum.HEX)){  
724 - payloadInBytes = ByteUtils.hexStr2Bytes(message);  
725 - }else{  
726 - payloadInBytes = message.getBytes(ByteUtils.UTF_8);  
727 - }  
728 -// ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);  
729 -// ByteBuf payload = ALLOCATOR.buffer();  
730 -// payload.writeBytes(payloadInBytes);  
731 - ByteBuf payload = Unpooled.copiedBuffer(payloadInBytes);  
732 -  
733 - return ctx.writeAndFlush(payload);  
734 - } catch (UnsupportedEncodingException e) {  
735 - log.error(e.getMessage(),e);  
736 - throw new RuntimeException(e); 718 + String message = tcp.getMessage().replace(" ","");
  719 + ByteBuf buff = Unpooled.buffer();
  720 + if(!message.matches("-?[0-9a-fA-F]+")){
  721 + //不满足16进制将字符串转为16进制
  722 + message = ByteUtils.stringEncodeToHex(message);
  723 + }
  724 + buff.writeBytes(ByteUtils.hexToByteArray(message));
  725 + return ctx.writeAndFlush(buff);
737 } 726 }
738 - }  
739 727
740 private boolean isAckExpected(MqttMessage message) { 728 private boolean isAckExpected(MqttMessage message) {
741 return message.fixedHeader().qosLevel().value() > 0; 729 return message.fixedHeader().qosLevel().value() > 0;
@@ -758,4 +746,9 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements @@ -758,4 +746,9 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
758 ctx.close(); 746 ctx.close();
759 } 747 }
760 748
  749 + public static void main(String[] args){
  750 + System.out.printf("结果1:" + ByteUtils.stringEncodeToHex("hex转字符串"));
  751 + System.out.printf("结果2:" + ByteUtils.hexDecodeToString("686578e8bdace5ad97e7aca6e4b8b2"));
  752 + System.out.printf("匹配结果:" + "414244535344".matches("-?[0-9a-fA-F]+"));
  753 + }
761 } 754 }
  1 +package org.thingsboard.server.dao.yunteng.entities;
  2 +
  3 +import com.baomidou.mybatisplus.annotation.TableField;
  4 +import com.baomidou.mybatisplus.annotation.TableName;
  5 +import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;
  6 +import com.fasterxml.jackson.databind.JsonNode;
  7 +import lombok.Data;
  8 +import lombok.EqualsAndHashCode;
  9 +import org.apache.ibatis.type.EnumTypeHandler;
  10 +import org.thingsboard.server.common.data.yunteng.constant.ModelConstants;
  11 +import org.thingsboard.server.common.data.yunteng.enums.TargetTypeEnum;
  12 +
  13 +@Data
  14 +@EqualsAndHashCode(callSuper = true)
  15 +@TableName(value = ModelConstants.Table.TK_TASK_CENTER_NAME, autoResultMap = true)
  16 +public class TkTaskCenterEntity extends TenantBaseEntity {
  17 + private String name;
  18 +
  19 + @TableField(typeHandler = EnumTypeHandler.class)
  20 + private TargetTypeEnum targetType;
  21 +
  22 + @TableField(typeHandler = JacksonTypeHandler.class)
  23 + private JsonNode executeTarget;
  24 +
  25 + @TableField(typeHandler = JacksonTypeHandler.class)
  26 + private JsonNode executeContent;
  27 +
  28 + @TableField(typeHandler = JacksonTypeHandler.class)
  29 + private JsonNode executeTime;
  30 +
  31 + private Integer state;
  32 +
  33 + private String remark;
  34 +}
@@ -2,6 +2,8 @@ package org.thingsboard.server.dao.yunteng.impl; @@ -2,6 +2,8 @@ package org.thingsboard.server.dao.yunteng.impl;
2 2
3 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; 3 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
4 import com.baomidou.mybatisplus.core.metadata.IPage; 4 import com.baomidou.mybatisplus.core.metadata.IPage;
  5 +import com.google.common.util.concurrent.Futures;
  6 +import com.google.common.util.concurrent.ListenableFuture;
5 import lombok.RequiredArgsConstructor; 7 import lombok.RequiredArgsConstructor;
6 import org.apache.commons.lang3.StringUtils; 8 import org.apache.commons.lang3.StringUtils;
7 import org.quartz.*; 9 import org.quartz.*;
@@ -15,12 +17,14 @@ import org.thingsboard.server.common.data.yunteng.enums.StatusEnum; @@ -15,12 +17,14 @@ import org.thingsboard.server.common.data.yunteng.enums.StatusEnum;
15 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData; 17 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
16 import org.thingsboard.server.dao.util.yunteng.CronUtils; 18 import org.thingsboard.server.dao.util.yunteng.CronUtils;
17 import org.thingsboard.server.dao.util.yunteng.ScheduleUtils; 19 import org.thingsboard.server.dao.util.yunteng.ScheduleUtils;
  20 +import org.thingsboard.server.dao.yunteng.entities.BaseEntity;
18 import org.thingsboard.server.dao.yunteng.entities.SysJobEntity; 21 import org.thingsboard.server.dao.yunteng.entities.SysJobEntity;
19 import org.thingsboard.server.dao.yunteng.mapper.SysJobMapper; 22 import org.thingsboard.server.dao.yunteng.mapper.SysJobMapper;
20 import org.thingsboard.server.dao.yunteng.service.*; 23 import org.thingsboard.server.dao.yunteng.service.*;
21 24
22 import javax.annotation.PostConstruct; 25 import javax.annotation.PostConstruct;
23 import java.util.*; 26 import java.util.*;
  27 +import java.util.stream.Collectors;
24 28
25 /** 定时任务调度信息 服务层 */ 29 /** 定时任务调度信息 服务层 */
26 @Service 30 @Service
@@ -35,10 +39,12 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE @@ -35,10 +39,12 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE
35 scheduler.clear(); 39 scheduler.clear();
36 List<SysJobEntity> jobList = new ArrayList<>(); 40 List<SysJobEntity> jobList = new ArrayList<>();
37 try { 41 try {
38 - jobList.addAll(baseMapper.selectList(  
39 - new LambdaQueryWrapper<SysJobEntity>().eq(SysJobEntity::getStatus, StatusEnum.ENABLE.getIndex())));  
40 - }catch (Exception e){  
41 - //TODO: 兼容ThingsboardInstallApplication。执行数据库脚本前,会跑表不存在异常。 42 + jobList.addAll(
  43 + baseMapper.selectList(
  44 + new LambdaQueryWrapper<SysJobEntity>()
  45 + .eq(SysJobEntity::getStatus, StatusEnum.ENABLE.getIndex())));
  46 + } catch (Exception e) {
  47 + // TODO: 兼容ThingsboardInstallApplication。执行数据库脚本前,会跑表不存在异常。
42 } 48 }
43 49
44 for (SysJobEntity job : jobList) { 50 for (SysJobEntity job : jobList) {
@@ -72,15 +78,22 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE @@ -72,15 +78,22 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE
72 public SysJobDTO selectJobById(String id) { 78 public SysJobDTO selectJobById(String id) {
73 79
74 return Optional.ofNullable( 80 return Optional.ofNullable(
75 - baseMapper.selectOne(new LambdaQueryWrapper<SysJobEntity>().eq(SysJobEntity::getId, id))) 81 + baseMapper.selectOne(
  82 + new LambdaQueryWrapper<SysJobEntity>().eq(SysJobEntity::getId, id)))
76 .map(obj -> obj.getDTO(SysJobDTO.class)) 83 .map(obj -> obj.getDTO(SysJobDTO.class))
77 .orElseThrow( 84 .orElseThrow(
78 - () -> {  
79 - throw new TkDataValidationException(  
80 - ErrorMessage.NOT_BELONG_CURRENT_TENANT.getMessage());  
81 - }); 85 + () ->
  86 + new TkDataValidationException(ErrorMessage.NOT_BELONG_CURRENT_TENANT.getMessage()));
82 } 87 }
83 88
  89 + @Override
  90 + public ListenableFuture<SysJobDTO> selectJobInfoById(String id) {
  91 + return Optional.ofNullable(
  92 + baseMapper.selectOne(
  93 + new LambdaQueryWrapper<SysJobEntity>().eq(SysJobEntity::getId, id)))
  94 + .map(obj -> Futures.immediateFuture(obj.getDTO(SysJobDTO.class)))
  95 + .orElse(Futures.immediateFuture(null));
  96 + }
84 /** 97 /**
85 * 暂停任务 98 * 暂停任务
86 * 99 *
@@ -130,6 +143,40 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE @@ -130,6 +143,40 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE
130 } 143 }
131 return rows > 0; 144 return rows > 0;
132 } 145 }
  146 + /**
  147 + * 批量删除任务后,所对应的trigger也将被删除
  148 + *
  149 + * @param sourceIds 源ID
  150 + * @param jobGroup 分组
  151 + * @param tenantId 租户ID
  152 + */
  153 + @Override
  154 + @Transactional(rollbackFor = Exception.class)
  155 + public boolean deleteJobs(List<String> sourceIds, String jobGroup, String tenantId)
  156 + throws SchedulerException {
  157 + List<String> jobIds =
  158 + Optional.ofNullable(
  159 + baseMapper.selectList(
  160 + new LambdaQueryWrapper<SysJobEntity>()
  161 + .eq(SysJobEntity::getTenantId, tenantId)
  162 + .eq(SysJobEntity::getJobGroup, jobGroup)
  163 + .in(SysJobEntity::getSourceId, sourceIds)))
  164 + .map(list -> list.stream().map(BaseEntity::getId).collect(Collectors.toList()))
  165 + .orElse(null);
  166 + if (null != jobIds && !jobIds.isEmpty()) {
  167 + int rows = baseMapper.deleteBatchIds(jobIds);
  168 + if (rows > 0) {
  169 + List<JobKey> jobKeys = new ArrayList<>();
  170 + for (String jobId : jobIds) {
  171 + jobKeys.add(ScheduleUtils.getJobKey(jobId, jobGroup));
  172 + }
  173 + if (!jobKeys.isEmpty()) {
  174 + return scheduler.deleteJobs(jobKeys);
  175 + }
  176 + }
  177 + }
  178 + return false;
  179 + }
133 180
134 /** 181 /**
135 * 批量删除调度信息 182 * 批量删除调度信息
  1 +package org.thingsboard.server.dao.yunteng.impl;
  2 +
  3 +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  4 +import com.baomidou.mybatisplus.core.metadata.IPage;
  5 +import com.fasterxml.jackson.databind.JsonNode;
  6 +import com.google.common.util.concurrent.Futures;
  7 +import com.google.common.util.concurrent.ListenableFuture;
  8 +import lombok.RequiredArgsConstructor;
  9 +import lombok.extern.slf4j.Slf4j;
  10 +import org.apache.commons.lang3.StringUtils;
  11 +import org.quartz.SchedulerException;
  12 +import org.springframework.stereotype.Service;
  13 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
  14 +import org.thingsboard.server.common.data.yunteng.core.cache.CacheUtils;
  15 +import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException;
  16 +import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage;
  17 +import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
  18 +import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO;
  19 +import org.thingsboard.server.common.data.yunteng.dto.SysJobDTO;
  20 +import org.thingsboard.server.common.data.yunteng.dto.task.TargetContentDTO;
  21 +import org.thingsboard.server.common.data.yunteng.dto.task.TaskExecuteTimeDTO;
  22 +import org.thingsboard.server.common.data.yunteng.dto.task.TaskTypeDTO;
  23 +import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO;
  24 +import org.thingsboard.server.common.data.yunteng.enums.JobGroupEnum;
  25 +import org.thingsboard.server.common.data.yunteng.enums.TargetTypeEnum;
  26 +import org.thingsboard.server.common.data.yunteng.utils.JacksonUtil;
  27 +import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
  28 +import org.thingsboard.server.dao.yunteng.entities.TkTaskCenterEntity;
  29 +import org.thingsboard.server.dao.yunteng.mapper.TaskCenterMapper;
  30 +import org.thingsboard.server.dao.yunteng.service.AbstractBaseService;
  31 +import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
  32 +import org.thingsboard.server.dao.yunteng.service.TkSysJobService;
  33 +import org.thingsboard.server.dao.yunteng.service.TkTaskCenterService;
  34 +
  35 +import javax.transaction.Transactional;
  36 +import java.util.*;
  37 +import java.util.stream.Collectors;
  38 +
  39 +@Service
  40 +@RequiredArgsConstructor
  41 +@Slf4j
  42 +public class TkTaskCenterServiceImpl
  43 + extends AbstractBaseService<TaskCenterMapper, TkTaskCenterEntity>
  44 + implements TkTaskCenterService {
  45 + private final TkSysJobService tkSysJobService;
  46 + private final TkDeviceService tkDeviceService;
  47 + private final CacheUtils cacheUtils;
  48 +
  49 + @Override
  50 + public TkPageData<TkTaskCenterDTO> taskCenterPage(Map<String, Object> queryMap, String tenantId) {
  51 + Integer state = (Integer) queryMap.get("state");
  52 + TargetTypeEnum targetTypeEnum = (TargetTypeEnum) queryMap.get("targetType");
  53 + IPage<TkTaskCenterEntity> page =
  54 + getPage(queryMap, FastIotConstants.DefaultOrder.CREATE_TIME, false);
  55 + IPage<TkTaskCenterEntity> taskCenterIPage =
  56 + baseMapper.selectPage(
  57 + page,
  58 + new LambdaQueryWrapper<TkTaskCenterEntity>()
  59 + .eq(TkTaskCenterEntity::getTenantId, tenantId)
  60 + .eq(null != state, TkTaskCenterEntity::getState, state)
  61 + .eq(null != targetTypeEnum, TkTaskCenterEntity::getTargetType, targetTypeEnum));
  62 + if (!taskCenterIPage.getRecords().isEmpty()) {
  63 + List<TkTaskCenterDTO> listRecords =
  64 + taskCenterIPage.getRecords().stream()
  65 + .map(
  66 + entity -> {
  67 + TkTaskCenterDTO dto = entity.getDTO(TkTaskCenterDTO.class);
  68 + dto.setExecuteContent(
  69 + JacksonUtil.convertValue(entity.getExecuteContent(), TaskTypeDTO.class));
  70 + dto.setExecuteTime(
  71 + JacksonUtil.convertValue(
  72 + entity.getExecuteTime(), TaskExecuteTimeDTO.class));
  73 + dto.setExecuteTarget(
  74 + JacksonUtil.convertValue(
  75 + entity.getExecuteTarget(), TargetContentDTO.class));
  76 + return dto;
  77 + })
  78 + .collect(Collectors.toList());
  79 + return new TkPageData<>(listRecords, taskCenterIPage.getTotal());
  80 + }
  81 + return getPageData(taskCenterIPage, TkTaskCenterDTO.class);
  82 + }
  83 +
  84 + @Override
  85 + @Transactional
  86 + public TkTaskCenterDTO saveOrUpdateTaskCenter(TkTaskCenterDTO tkTaskCenterDTO) {
  87 + TkTaskCenterEntity entity = tkTaskCenterDTO.getEntity(TkTaskCenterEntity.class);
  88 + JsonNode jsonNode = tkTaskCenterDTO.getExecuteContent().getPushContent();
  89 + if (null == jsonNode || null == jsonNode.get(FastIotConstants.RPC_COMMAND)) {
  90 + throw new TkDataValidationException(ErrorMessage.EXECUTE_COMMAND_IS_NULL.getMessage());
  91 + }
  92 + TaskExecuteTimeDTO executeTime = tkTaskCenterDTO.getExecuteTime();
  93 + if (null == executeTime || StringUtils.isEmpty(executeTime.getCron())) {
  94 + throw new TkDataValidationException(ErrorMessage.CRON_INVALID.getMessage());
  95 + }
  96 + entity.setExecuteContent(
  97 + JacksonUtil.convertValue(tkTaskCenterDTO.getExecuteContent(), JsonNode.class));
  98 + entity.setExecuteTarget(
  99 + JacksonUtil.convertValue(tkTaskCenterDTO.getExecuteTarget(), JsonNode.class));
  100 + entity.setExecuteTime(
  101 + JacksonUtil.convertValue(tkTaskCenterDTO.getExecuteTime(), JsonNode.class));
  102 + if (StringUtils.isEmpty(tkTaskCenterDTO.getId())) {
  103 + entity.setState(FastIotConstants.StateValue.DISABLE);
  104 + baseMapper.insert(entity);
  105 + } else {
  106 + TkTaskCenterDTO queryDTO =
  107 + findTaskCenterInfoById(tkTaskCenterDTO.getId(), tkTaskCenterDTO.getTenantId());
  108 + if (null == queryDTO) {
  109 + throw new TkDataValidationException(ErrorMessage.NOT_BELONG_CURRENT_TENANT.getMessage());
  110 + }
  111 + // 只有通过修改状态接口,才可以修改状态
  112 + entity.setState(null);
  113 + baseMapper.updateById(entity);
  114 + updateTaskCenterCache(entity);
  115 + }
  116 + return tkTaskCenterDTO;
  117 + }
  118 +
  119 + @Override
  120 + @Transactional
  121 + public boolean deleteTaskCenter(DeleteDTO deleteDTO) throws SchedulerException {
  122 + List<TkTaskCenterEntity> list =
  123 + baseMapper.selectList(
  124 + new LambdaQueryWrapper<TkTaskCenterEntity>()
  125 + .eq(TkTaskCenterEntity::getTenantId, deleteDTO.getTenantId())
  126 + .in(TkTaskCenterEntity::getId, deleteDTO.getIds()));
  127 + if (null == list || list.isEmpty()) {
  128 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  129 + }
  130 + List<String> sourceIds = new ArrayList<>();
  131 + for (TkTaskCenterEntity entity : list) {
  132 + // 任务中心,是启用状态不能删除
  133 + if (entity.getState() == FastIotConstants.StateValue.ENABLE) {
  134 + throw new TkDataValidationException(
  135 + String.format(
  136 + ErrorMessage.DATA_STATE_ENABLE_NOT_DELETE.getMessage(), entity.getName()));
  137 + }
  138 + sourceIds.add(entity.getId());
  139 + }
  140 + int result = baseMapper.deleteBatchIds(sourceIds);
  141 + if (result > FastIotConstants.MagicNumber.ZERO) {
  142 + return tkSysJobService.deleteJobs(
  143 + sourceIds, JobGroupEnum.TASK_CENTER.name(), deleteDTO.getTenantId());
  144 + }
  145 + // 移除任务中心缓存
  146 + for (String taskCenterId : sourceIds) {
  147 + String key = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS + taskCenterId;
  148 + cacheUtils.invalidate(key);
  149 + }
  150 + return false;
  151 + }
  152 +
  153 + @Override
  154 + public TkTaskCenterDTO findTaskCenterInfoById(String id, String tenantId) {
  155 + if (StringUtils.isEmpty(id) || StringUtils.isEmpty(tenantId)) {
  156 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  157 + }
  158 + TkTaskCenterEntity entity =
  159 + baseMapper.selectOne(
  160 + new LambdaQueryWrapper<TkTaskCenterEntity>()
  161 + .eq(TkTaskCenterEntity::getTenantId, tenantId)
  162 + .eq(TkTaskCenterEntity::getId, id));
  163 + return Optional.ofNullable(entity)
  164 + .map(
  165 + object -> {
  166 + TkTaskCenterDTO tkTaskCenterDTO = object.getDTO(TkTaskCenterDTO.class);
  167 + tkTaskCenterDTO.setExecuteTarget(
  168 + JacksonUtil.convertValue(object.getExecuteTarget(), TargetContentDTO.class));
  169 + tkTaskCenterDTO.setExecuteContent(
  170 + JacksonUtil.convertValue(object.getExecuteContent(), TaskTypeDTO.class));
  171 + tkTaskCenterDTO.setExecuteTime(
  172 + JacksonUtil.convertValue(object.getExecuteTime(), TaskExecuteTimeDTO.class));
  173 + return tkTaskCenterDTO;
  174 + })
  175 + .orElse(null);
  176 + }
  177 +
  178 + @Override
  179 + public ListenableFuture<TkTaskCenterDTO> getTaskCenterInfoById(String id) {
  180 + if (StringUtils.isEmpty(id)) {
  181 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  182 + }
  183 + String key = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS + id;
  184 + Optional<TkTaskCenterEntity> entityCache = cacheUtils.get(key);
  185 + TkTaskCenterEntity entity =
  186 + entityCache.orElseGet(
  187 + () ->
  188 + baseMapper.selectOne(
  189 + new LambdaQueryWrapper<TkTaskCenterEntity>()
  190 + .eq(TkTaskCenterEntity::getId, id)));
  191 + return Optional.ofNullable(entity)
  192 + .map(
  193 + object -> {
  194 + TkTaskCenterDTO tkTaskCenterDTO = object.getDTO(TkTaskCenterDTO.class);
  195 + tkTaskCenterDTO.setExecuteTarget(
  196 + JacksonUtil.convertValue(object.getExecuteTarget(), TargetContentDTO.class));
  197 + tkTaskCenterDTO.setExecuteContent(
  198 + JacksonUtil.convertValue(object.getExecuteContent(), TaskTypeDTO.class));
  199 + tkTaskCenterDTO.setExecuteTime(
  200 + JacksonUtil.convertValue(object.getExecuteTime(), TaskExecuteTimeDTO.class));
  201 + return Futures.immediateFuture(tkTaskCenterDTO);
  202 + })
  203 + .orElse(null);
  204 + }
  205 +
  206 + @Override
  207 + @Transactional
  208 + public boolean updateState(String id, Integer state, String tenantId) throws SchedulerException {
  209 + TkTaskCenterEntity entity =
  210 + baseMapper.selectOne(
  211 + new LambdaQueryWrapper<TkTaskCenterEntity>().eq(TkTaskCenterEntity::getId, id));
  212 + boolean result = false;
  213 + if (null == entity) {
  214 + throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
  215 + }
  216 + if (!Objects.equals(entity.getTenantId(), tenantId)) {
  217 + throw new TkDataValidationException(ErrorMessage.NOT_BELONG_CURRENT_TENANT.getMessage());
  218 + }
  219 + entity.setState(state);
  220 + if (baseMapper.updateById(entity) > FastIotConstants.MagicNumber.ZERO) {
  221 + String sourceId = entity.getId();
  222 + SysJobDTO queryJob = tkSysJobService.findSysJobBySourceId(sourceId);
  223 + if (null == queryJob) {
  224 + TaskExecuteTimeDTO executeTime =
  225 + JacksonUtil.convertValue(entity.getExecuteTime(), TaskExecuteTimeDTO.class);
  226 + if (null == executeTime || StringUtils.isEmpty(executeTime.getCron())) {
  227 + throw new TkDataValidationException(ErrorMessage.CRON_INVALID.getMessage());
  228 + }
  229 + queryJob = new SysJobDTO();
  230 + queryJob.setSourceId(sourceId);
  231 + queryJob.setJobName(entity.getName());
  232 + queryJob.setInvokeTarget("rpcCommandTask.process('" + sourceId + "')");
  233 + queryJob.setJobGroup(JobGroupEnum.TASK_CENTER.name());
  234 + queryJob.setCronExpression(executeTime.getCron());
  235 + queryJob.setTenantId(entity.getTenantId());
  236 + }
  237 + queryJob.setStatus(state);
  238 + tkSysJobService.saveOrUpdateJob(queryJob);
  239 + result = true;
  240 + }
  241 + return result;
  242 + }
  243 +
  244 + @Override
  245 + @Transactional
  246 + public boolean cancelExecute(String id, String tbDeviceId, String tenantId) {
  247 + boolean result = false;
  248 + TkTaskCenterEntity entity = baseMapper.selectById(id);
  249 + if (entity != null) {
  250 + if (!Objects.equals(entity.getTenantId(), tenantId)) {
  251 + throw new TkDataValidationException(ErrorMessage.NOT_BELONG_CURRENT_TENANT.getMessage());
  252 + }
  253 + TargetContentDTO targetContent =
  254 + JacksonUtil.convertValue(entity.getExecuteTarget(), TargetContentDTO.class);
  255 + if (null != targetContent) {
  256 + // 按设备执行
  257 + if (Objects.equals(entity.getTargetType(), TargetTypeEnum.DEVICES)) {
  258 + List<String> data = targetContent.getData();
  259 + List<String> removeResult = new ArrayList<>();
  260 + if (null != data && !data.isEmpty()) {
  261 + for (String deviceId : data) {
  262 + if (!Objects.equals(deviceId, tbDeviceId)) {
  263 + removeResult.add(deviceId);
  264 + }
  265 + }
  266 + targetContent.setData(removeResult);
  267 + }
  268 + } else {
  269 + // 按产品执行
  270 + Map<String, List<String>> map = targetContent.getCancelExecuteDevices();
  271 + DeviceDTO deviceDTO = tkDeviceService.findDeviceInfoByTbDeviceId(tenantId, tbDeviceId);
  272 + for (String key : map.keySet()) {
  273 + if (Objects.equals(key, deviceDTO.getDeviceProfileId())) {
  274 + map.get(key).add(tbDeviceId);
  275 + break;
  276 + }
  277 + }
  278 + }
  279 + entity.setExecuteTarget(JacksonUtil.convertValue(targetContent, JsonNode.class));
  280 + baseMapper.updateById(entity);
  281 + updateTaskCenterCache(entity);
  282 + }
  283 + }
  284 + return result;
  285 + }
  286 +
  287 + private void updateTaskCenterCache(TkTaskCenterEntity entity) {
  288 + String key = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS + entity.getId();
  289 + Optional<TkTaskCenterEntity> entityCache = cacheUtils.get(key);
  290 + if (entityCache.isPresent()) {
  291 + cacheUtils.put(key, entity);
  292 + }
  293 + }
  294 +}
  1 +package org.thingsboard.server.dao.yunteng.mapper;
  2 +
  3 +import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  4 +import org.apache.ibatis.annotations.Mapper;
  5 +import org.thingsboard.server.dao.yunteng.entities.TkTaskCenterEntity;
  6 +
  7 +@Mapper
  8 +public interface TaskCenterMapper extends BaseMapper<TkTaskCenterEntity> {
  9 +}
1 package org.thingsboard.server.dao.yunteng.service; 1 package org.thingsboard.server.dao.yunteng.service;
  2 +
  3 +import com.google.common.util.concurrent.ListenableFuture;
2 import org.quartz.SchedulerException; 4 import org.quartz.SchedulerException;
3 import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO; 5 import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
4 import org.thingsboard.server.common.data.yunteng.dto.SysJobDTO; 6 import org.thingsboard.server.common.data.yunteng.dto.SysJobDTO;
5 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData; 7 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
  8 +import java.util.List;
6 import java.util.Map; 9 import java.util.Map;
7 10
8 -/**  
9 - * 定时任务调度信息信息 服务层  
10 - */  
11 -public interface TkSysJobService  
12 -{  
13 - /**  
14 - * 获取quartz调度器的计划任务  
15 - *  
16 - * @param queryMap 查询参数  
17 - * @return 分页数据  
18 - */  
19 - TkPageData<SysJobDTO> sysJobPage(Map<String, Object> queryMap);  
20 -  
21 - /**  
22 - * 通过调度任务ID查询调度信息  
23 - *  
24 - * @param id 调度任务ID  
25 - * @return 调度任务对象信息  
26 - */  
27 - SysJobDTO selectJobById(String id); 11 +/** 定时任务调度信息信息 服务层 */
  12 +public interface TkSysJobService {
  13 + /**
  14 + * 获取quartz调度器的计划任务
  15 + *
  16 + * @param queryMap 查询参数
  17 + * @return 分页数据
  18 + */
  19 + TkPageData<SysJobDTO> sysJobPage(Map<String, Object> queryMap);
28 20
29 - /**  
30 - * 暂停任务  
31 - *  
32 - * @param job 调度信息  
33 - * @return 结果  
34 - */  
35 - SysJobDTO pauseJob(SysJobDTO job) throws SchedulerException; 21 + /**
  22 + * 通过调度任务ID查询调度信息
  23 + *
  24 + * @param id 调度任务ID
  25 + * @return 调度任务对象信息
  26 + */
  27 + SysJobDTO selectJobById(String id);
  28 + /**
  29 + * 通过调度任务ID查询调度信息
  30 + *
  31 + * @param id 调度任务ID
  32 + * @return 调度任务对象信息
  33 + */
  34 + ListenableFuture<SysJobDTO> selectJobInfoById(String id);
  35 + /**
  36 + * 暂停任务
  37 + *
  38 + * @param job 调度信息
  39 + * @return 结果
  40 + */
  41 + SysJobDTO pauseJob(SysJobDTO job) throws SchedulerException;
36 42
37 - /**  
38 - * 恢复任务  
39 - *  
40 - * @param job 调度信息  
41 - * @return 结果  
42 - */  
43 - SysJobDTO resumeJob(SysJobDTO job) throws SchedulerException; 43 + /**
  44 + * 恢复任务
  45 + *
  46 + * @param job 调度信息
  47 + * @return 结果
  48 + */
  49 + SysJobDTO resumeJob(SysJobDTO job) throws SchedulerException;
44 50
45 - /**  
46 - * 删除任务后,所对应的trigger也将被删除  
47 - *  
48 - * @param job 调度信息  
49 - * @return 结果  
50 - */  
51 - boolean deleteJob(SysJobDTO job) throws SchedulerException; 51 + /**
  52 + * 删除任务后,所对应的trigger也将被删除
  53 + *
  54 + * @param job 调度信息
  55 + * @return 结果
  56 + */
  57 + boolean deleteJob(SysJobDTO job) throws SchedulerException;
52 58
53 - /**  
54 - * 批量删除调度信息  
55 - *  
56 - * @param deleteDTO 需要删除的数据ID  
57 - * @return 结果 true删除成功 false删除失败  
58 - */  
59 - boolean deleteJobByIds(DeleteDTO deleteDTO) throws SchedulerException; 59 + /**
  60 + * 批量删除定时任务
  61 + *
  62 + * @param sourceIds 来源ID即业务ID
  63 + * @param jobGroup 定时任务分组
  64 + * @param tenantId 租户ID
  65 + * @return true成功 false失败
  66 + */
  67 + boolean deleteJobs(List<String> sourceIds, String jobGroup, String tenantId)
  68 + throws SchedulerException;
60 69
61 - /**  
62 - * 任务调度状态修改  
63 - *  
64 - * @param job 调度信息  
65 - * @return 结果  
66 - */  
67 - SysJobDTO updateSysJobStatus(SysJobDTO job) throws SchedulerException; 70 + /**
  71 + * 批量删除调度信息
  72 + *
  73 + * @param deleteDTO 需要删除的数据ID
  74 + * @return 结果 true删除成功 false删除失败
  75 + */
  76 + boolean deleteJobByIds(DeleteDTO deleteDTO) throws SchedulerException;
68 77
69 - /**  
70 - * 立即运行任务  
71 - *  
72 - * @param job 调度信息  
73 - * @return 结果  
74 - */  
75 - boolean run(SysJobDTO job) throws SchedulerException; 78 + /**
  79 + * 任务调度状态修改
  80 + *
  81 + * @param job 调度信息
  82 + * @return 结果
  83 + */
  84 + SysJobDTO updateSysJobStatus(SysJobDTO job) throws SchedulerException;
76 85
77 - /**  
78 - * 新增或编辑任务  
79 - *  
80 - * @param job 调度信息  
81 - * @return 结果  
82 - */  
83 - SysJobDTO saveOrUpdateJob(SysJobDTO job) throws SchedulerException; 86 + /**
  87 + * 立即运行任务
  88 + *
  89 + * @param job 调度信息
  90 + * @return 结果
  91 + */
  92 + boolean run(SysJobDTO job) throws SchedulerException;
84 93
  94 + /**
  95 + * 新增或编辑任务
  96 + *
  97 + * @param job 调度信息
  98 + * @return 结果
  99 + */
  100 + SysJobDTO saveOrUpdateJob(SysJobDTO job) throws SchedulerException;
85 101
86 - /**  
87 - * 校验cron表达式是否有效  
88 - *  
89 - * @param cronExpression 表达式  
90 - * @return 结果  
91 - */  
92 - boolean checkCronExpressionIsValid(String cronExpression); 102 + /**
  103 + * 校验cron表达式是否有效
  104 + *
  105 + * @param cronExpression 表达式
  106 + * @return 结果
  107 + */
  108 + boolean checkCronExpressionIsValid(String cronExpression);
93 109
94 - /**  
95 - * 通过源ID找到定时任务信息  
96 - * @param sourceId 源ID  
97 - * @return 定时任务信息  
98 - */  
99 - SysJobDTO findSysJobBySourceId(String sourceId);  
100 -}  
  110 + /**
  111 + * 通过源ID找到定时任务信息
  112 + *
  113 + * @param sourceId 源ID
  114 + * @return 定时任务信息
  115 + */
  116 + SysJobDTO findSysJobBySourceId(String sourceId);
  117 +}
  1 +package org.thingsboard.server.dao.yunteng.service;
  2 +
  3 +import com.google.common.util.concurrent.ListenableFuture;import org.quartz.SchedulerException;
  4 +import org.thingsboard.server.common.data.yunteng.dto.DeleteDTO;
  5 +import org.thingsboard.server.common.data.yunteng.dto.task.TkTaskCenterDTO;
  6 +import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
  7 +import java.util.Map;
  8 +
  9 +public interface TkTaskCenterService {
  10 + /**
  11 + * 任务中心分页
  12 + *
  13 + * @param queryMap 分页查询条件
  14 + * @param tenantId 租户ID
  15 + * @return 分页数据
  16 + */
  17 + TkPageData<TkTaskCenterDTO> taskCenterPage(Map<String, Object> queryMap, String tenantId);
  18 +
  19 + /**
  20 + * 修改或保存任务中心
  21 + *
  22 + * @param tkTaskCenterDTO 任务中心信息
  23 + */
  24 + TkTaskCenterDTO saveOrUpdateTaskCenter(TkTaskCenterDTO tkTaskCenterDTO) throws SchedulerException;
  25 +
  26 + /**
  27 + * 删除任务中心
  28 + *
  29 + * @param deleteDTO 删除IDS
  30 + */
  31 + boolean deleteTaskCenter(DeleteDTO deleteDTO)throws SchedulerException;
  32 +
  33 + /**
  34 + * 根据任务中心ID,查询任务信息
  35 + * @param id 任务中心ID
  36 + * @param tenantId 租户ID
  37 + * @return 任务信息
  38 + */
  39 + TkTaskCenterDTO findTaskCenterInfoById(String id,String tenantId);
  40 +
  41 + /**
  42 + * 根据任务中心ID,异步查询任务信息
  43 + * @param id 任务中心ID
  44 + * @return 任务信息
  45 + */
  46 + ListenableFuture<TkTaskCenterDTO> getTaskCenterInfoById(String id);
  47 +
  48 + /**
  49 + * 启用禁用任务中心
  50 + * @param id 任务中心ID
  51 + * @param state 1启用 0禁用
  52 + * @param tenantId 租户ID
  53 + * @return true成功 false失败
  54 + */
  55 + boolean updateState(String id,Integer state,String tenantId)throws SchedulerException;
  56 +
  57 + /**
  58 + * 设备取消任务执行
  59 + * @param id 任务ID
  60 + * @param tbDeviceId 设备ID
  61 + * @param tenantId 租户ID
  62 + * @return true成功 false失败
  63 + */
  64 + boolean cancelExecute(String id,String tbDeviceId,String tenantId);
  65 +}