Commit fbdd35b906e81fa90a0b46e79b965cd401d32d8e

Authored by xp.Huang
2 parents 59d0a199 45e152b4

Merge branch 'master_dev_gbt_0319' into 'master_dev'

fix: 视频点播问题修复

See merge request yunteng/thingskit!371
  1 +--修改流媒体表名称--
  2 +alter table tk_media_server rename to tk_video_zlmediakit;
  3 +--先删除status字段再加新类型--
  4 +alter table tk_video_zlmediakit drop status;
  5 +alter table tk_video_zlmediakit add status BOOLEAN;
  6 +COMMENT ON COLUMN "public"."tk_video_zlmediakit"."status" IS '流媒体服务状态:false离线 true在线';
\ No newline at end of file
... ...
... ... @@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.page.PageLink;
42 42 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
43 43 import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO;
44 44 import org.thingsboard.server.common.data.yunteng.dto.TkDeviceStateLogDTO;
  45 +import org.thingsboard.server.common.data.yunteng.enums.StatusEnum;
45 46 import org.thingsboard.server.common.msg.TbMsg;
46 47 import org.thingsboard.server.common.msg.TbMsgDataType;
47 48 import org.thingsboard.server.common.msg.TbMsgMetaData;
... ... @@ -54,6 +55,7 @@ import org.thingsboard.server.dao.tenant.TenantService;
54 55 import org.thingsboard.server.dao.timeseries.TimeseriesService;
55 56 import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
56 57 import org.thingsboard.server.dao.yunteng.service.TkDeviceStateLogService;
  58 +import org.thingsboard.server.dao.yunteng.service.media.TkVideoChannelService;
57 59 import org.thingsboard.server.gen.transport.TransportProtos;
58 60 import org.thingsboard.server.queue.discovery.PartitionService;
59 61 import org.thingsboard.server.service.partition.AbstractPartitionBasedService;
... ... @@ -94,6 +96,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
94 96 private final TbClusterService clusterService;
95 97 private final PartitionService partitionService;
96 98 private final TkDeviceService tkDeviceService;
  99 + private final TkVideoChannelService channelService;
97 100 private final TkDeviceStateLogService tkDeviceStateLogService;
98 101
99 102 private TelemetrySubscriptionService tsSubService;
... ... @@ -121,7 +124,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
121 124 public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService,
122 125 AttributesService attributesService, TimeseriesService tsService,
123 126 TbClusterService clusterService, PartitionService partitionService,
124   - TkDeviceService tkDeviceService,TkDeviceStateLogService tkDeviceStateLogService) {
  127 + TkDeviceService tkDeviceService, TkVideoChannelService channelService, TkDeviceStateLogService tkDeviceStateLogService) {
125 128 this.tenantService = tenantService;
126 129 this.deviceService = deviceService;
127 130 this.attributesService = attributesService;
... ... @@ -129,6 +132,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
129 132 this.clusterService = clusterService;
130 133 this.partitionService = partitionService;
131 134 this.tkDeviceService = tkDeviceService;
  135 + this.channelService = channelService;
132 136 this.tkDeviceStateLogService = tkDeviceStateLogService;
133 137 }
134 138
... ... @@ -420,6 +424,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
420 424 save(deviceId, INACTIVITY_ALARM_TIME, ts);
421 425 pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
422 426 //thingskit update tkDevice state
  427 + channelService.updateVideoChannelState(deviceId.getId().toString(), StatusEnum.OFFLINE);
423 428 if(stateData.getState().getLastActivityTime()>0){
424 429 String tenantId = stateData.getTenantId().getId().toString();
425 430 String tbDeviceId = deviceId.getId().toString();
... ...
... ... @@ -24,10 +24,8 @@ import com.google.common.util.concurrent.Futures;
24 24 import com.google.common.util.concurrent.ListenableFuture;
25 25 import com.google.common.util.concurrent.MoreExecutors;
26 26 import com.google.protobuf.ByteString;
27   -import java.util.List;
28   -import java.util.Optional;
29   -import java.util.Set;
30   -import java.util.UUID;
  27 +
  28 +import java.util.*;
31 29 import java.util.concurrent.ConcurrentHashMap;
32 30 import java.util.concurrent.ConcurrentMap;
33 31 import java.util.concurrent.locks.Lock;
... ... @@ -886,10 +884,18 @@ public class DefaultTransportApiService implements TransportApiService {
886 884 dataDecodingEncodingService.decode(requestMsg.getContext().toByteArray());
887 885 allChannel.ifPresent(
888 886 d -> {
889   - channelService.clearDeviceChannel(d.get(0).getCameraCode());
  887 + String cameraCode = d.get(0).getCameraCode();
  888 + Map<String, String> playingChannels = channelService.findPlayingChannel(cameraCode);
  889 + channelService.clearDeviceChannel(cameraCode);
890 890 List<TkVideoChannelEntity> chanel =
891 891 d.stream()
892   - .map(item -> item.getEntity(TkVideoChannelEntity.class))
  892 + .map(item -> {
  893 + String channelId = item.getChannelId();
  894 + if(playingChannels.containsKey(channelId)){
  895 + item.setStreamId(playingChannels.get(channelId));
  896 + }
  897 + return item.getEntity(TkVideoChannelEntity.class);
  898 + })
893 899 .collect(Collectors.toList());
894 900 channelService.insertBatch(chanel, chanel.size());
895 901 });
... ...
... ... @@ -82,7 +82,7 @@ public class TkVideoControlServiceImpl implements TkVideoControlService {
82 82 // 找到可以使用的流媒体
83 83 MediaServerDTO mediaServer =
84 84 tkMediaServerService.getMediaServerForPlay(tenantId, sipDeviceDTO.getMediaServerId());
85   - if (null == mediaServer) {
  85 + if (null == mediaServer || !mediaServer.isStatus()) {
86 86 throw new TkDataValidationException(
87 87 ErrorMessage.NOT_FOUND_MEDIA_SERVER_FOR_PLAY.getMessage());
88 88 }
... ...
... ... @@ -28,6 +28,7 @@ import org.thingsboard.server.dao.tenant.TenantService;
28 28 import org.thingsboard.server.dao.timeseries.TimeseriesService;
29 29 import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
30 30 import org.thingsboard.server.dao.yunteng.service.TkDeviceStateLogService;
  31 +import org.thingsboard.server.dao.yunteng.service.media.TkVideoChannelService;
31 32 import org.thingsboard.server.queue.discovery.PartitionService;
32 33 import org.thingsboard.server.cluster.TbClusterService;
33 34
... ... @@ -59,14 +60,15 @@ public class DefaultDeviceStateServiceTest {
59 60 TkDeviceService tkDeviceService;
60 61 @Mock
61 62 TkDeviceStateLogService tkDeviceStateLogService;
62   -
  63 + @Mock
  64 + TkVideoChannelService channelService;
63 65 DeviceId deviceId = DeviceId.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112");
64 66
65 67 DefaultDeviceStateService service;
66 68
67 69 @Before
68 70 public void setUp() {
69   - service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService,tkDeviceService,tkDeviceStateLogService));
  71 + service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService,tkDeviceService, channelService, tkDeviceStateLogService));
70 72 }
71 73
72 74 @Test
... ... @@ -86,4 +88,4 @@ public class DefaultDeviceStateServiceTest {
86 88 Mockito.verify(service, times(1)).fetchDeviceStateData(deviceId);
87 89 }
88 90
89   -}
\ No newline at end of file
  91 +}
... ...
... ... @@ -138,7 +138,7 @@ public final class ModelConstants {
138 138 public static final String TK_DEVICE_ACCESS_INFORMATION = "tk_device_access_information";
139 139
140 140 /** ZLMediaKit 流媒体表 */
141   - public static final String TK_MEDIA_SERVER_NAME = "tk_media_server";
  141 + public static final String TK_MEDIA_SERVER_NAME = "tk_video_zlmediakit";
142 142
143 143 /** 视频通道表 */
144 144 public static final String TK_VIDEO_CHANNEL_NAME = "tk_video_channel";
... ...
... ... @@ -136,6 +136,7 @@ public enum ErrorMessage {
136 136
137 137 DEVICE_NOT_ONLINE(400111,"设备不在线,无法执行相关操作!"),
138 138 GBT_VIDEO_REPETITION(400112,"该设备通道号视频已存在!"),
  139 + SSRC_NO_ENABLED(400113,"未获取到可用的SSRC资源"),
139 140 HAVE_NO_PERMISSION(500002,"没有修改权限"),
140 141 NOT_ALLOED_ISOLATED_IN_MONOLITH(500003,"【monolith】模式下,不能选择【isolated】类型的租户配置");
141 142
... ...
... ... @@ -23,7 +23,7 @@ public class TkMediaServerEntity extends TenantBaseEntity {
23 23 private Integer rtspSslPort;
24 24 private String secret;
25 25 private boolean rtpEnable;
26   - private Integer status;
  26 + private boolean status;
27 27 private String rtpPortRange;
28 28 private String sendRtpPortRange;
29 29 private int recordAssistPort;
... ...
... ... @@ -8,6 +8,7 @@ import org.thingsboard.server.common.data.yunteng.config.media.UserSetting;
8 8 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
9 9 import org.thingsboard.server.common.data.yunteng.core.cache.CacheUtils;
10 10 import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException;
  11 +import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage;
11 12
12 13 /** ssrc使用 */
13 14 @Component
... ... @@ -88,7 +89,7 @@ public class SSRCFactory {
88 89 String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
89 90 Optional<Set<String>> all = cacheUtils.get(cacheName, redisKey);
90 91 if (all.isEmpty()) {
91   - throw new TkDataValidationException("ssrc已经用完");
  92 + throw new TkDataValidationException(ErrorMessage.SSRC_NO_ENABLED.getMessage());
92 93 } else {
93 94 // 在集合中移除并返回第一个成员。
94 95 Set<String> list = all.get();
... ...
... ... @@ -3,9 +3,12 @@ package org.thingsboard.server.dao.yunteng.impl.media;
3 3 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
4 4 import com.baomidou.mybatisplus.core.metadata.IPage;
5 5 import java.time.LocalDateTime;
  6 +import java.util.HashMap;
6 7 import java.util.List;
7 8 import java.util.Map;
8 9 import java.util.Optional;
  10 +import java.util.stream.Collectors;
  11 +
9 12 import lombok.RequiredArgsConstructor;
10 13 import org.springframework.stereotype.Service;
11 14 import org.springframework.transaction.annotation.Transactional;
... ... @@ -74,6 +77,23 @@ public class TkVideoChannelServiceImpl
74 77 }
75 78
76 79 @Override
  80 + public Map<String, String> findPlayingChannel(String cameraCode) {
  81 + List<TkVideoChannelEntity> channelList =
  82 + baseMapper.selectList(
  83 + new LambdaQueryWrapper<TkVideoChannelEntity>()
  84 + .isNotNull(TkVideoChannelEntity::getStreamId)
  85 + .eq(TkVideoChannelEntity::getCameraCode, cameraCode));
  86 + Map<String, String> result = new HashMap<>();
  87 + if(channelList == null){
  88 + return result;
  89 + }
  90 + channelList.forEach(item ->{
  91 + result.put(item.getChannelId(),item.getStreamId());
  92 + });
  93 + return result;
  94 + }
  95 +
  96 + @Override
77 97 public VideoChanelDTO updateChannelInfo(VideoChanelDTO videoChanelDTO) {
78 98 if (StringUtils.isEmpty(videoChanelDTO.getId())) {
79 99 throw new TkDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
... ... @@ -103,6 +123,14 @@ public class TkVideoChannelServiceImpl
103 123 .orElse(false);
104 124 }
105 125
  126 + @Override
  127 + public void updateVideoChannelState(String tbDeviceId, StatusEnum online) {
  128 + TkVideoChannelEntity entity = new TkVideoChannelEntity();
  129 + entity.setStatus(online);
  130 + entity.setStreamId(null);
  131 + baseMapper.update(entity,new LambdaQueryWrapper<TkVideoChannelEntity>()
  132 + .eq(TkVideoChannelEntity::getDeviceId, tbDeviceId));
  133 + }
106 134
107 135 @Override
108 136 public List<VideoChanelDTO> getListByDeviceId(String deviceId,String tenantId) {
... ...
... ... @@ -3,6 +3,7 @@ package org.thingsboard.server.dao.yunteng.service.media;
3 3 import java.util.List;
4 4 import java.util.Map;
5 5 import org.thingsboard.server.common.data.yunteng.dto.sip.VideoChanelDTO;
  6 +import org.thingsboard.server.common.data.yunteng.enums.StatusEnum;
6 7 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
7 8 import org.thingsboard.server.dao.yunteng.entities.TkVideoChannelEntity;
8 9 import org.thingsboard.server.dao.yunteng.service.BaseService;
... ... @@ -27,7 +28,12 @@ public interface TkVideoChannelService extends BaseService<TkVideoChannelEntity>
27 28 * @param cameraCode
28 29 * @return
29 30 */
30   - int clearDeviceChannel(String cameraCode);
  31 + int clearDeviceChannel(String cameraCode); /**
  32 + * 查询点播中的摄像头通道
  33 + * @param cameraCode
  34 + * @return
  35 + */
  36 + Map<String,String> findPlayingChannel(String cameraCode);
31 37
32 38 /**
33 39 * 更新视频通道信息。
... ... @@ -46,7 +52,14 @@ public interface TkVideoChannelService extends BaseService<TkVideoChannelEntity>
46 52 * @return true 更新成功 false更新失败
47 53 */
48 54 boolean updateVideoChannelStreamId(
49   - String streamId, String deviceCode, String channelId);
  55 + String streamId, String deviceCode, String channelId);
  56 + /**
  57 + * 更新视频通道状态
  58 + *
  59 + * @param tbDeviceId 设备ID
  60 + * @return true 更新成功 false更新失败
  61 + */
  62 + void updateVideoChannelState(String tbDeviceId, StatusEnum online);
50 63
51 64 /**
52 65 * 根据设备id获取通道号集合
... ...