Commit c7134b0be30d78144a9a1c4572e13882dbd753ba

Authored by xp.Huang
1 parent 3239eb91

feat: 增加rtsp转flv的视频流转换

  1 +package org.thingsboard.server.controller.yunteng;
  2 +
  3 +import io.swagger.annotations.Api;
  4 +import lombok.RequiredArgsConstructor;
  5 +import org.springframework.web.bind.annotation.GetMapping;
  6 +import org.springframework.web.bind.annotation.RequestMapping;
  7 +import org.springframework.web.bind.annotation.RequestParam;
  8 +import org.springframework.web.bind.annotation.RestController;
  9 +import org.thingsboard.server.common.data.StringUtils;
  10 +import org.thingsboard.server.dao.yunteng.service.rtsp.TkFLVService;
  11 +
  12 +import javax.servlet.http.HttpServletRequest;
  13 +import javax.servlet.http.HttpServletResponse;
  14 +
  15 +@RestController
  16 +@RequestMapping("api/yt/rtsp")
  17 +@Api(tags = {"rtsp流转换"})
  18 +@RequiredArgsConstructor
  19 +public class TkRtspConvertController {
  20 +
  21 + private final TkFLVService tkFLVService;
  22 +
  23 + @GetMapping("/openFlv")
  24 + public void open(
  25 + @RequestParam String url,@RequestParam String browserId, HttpServletResponse response, HttpServletRequest request) {
  26 + if (StringUtils.isNotEmpty(url)) {
  27 + tkFLVService.open(url,browserId, response, request);
  28 + }
  29 + }
  30 +
  31 + @GetMapping("/closeFlv")
  32 + public void close(@RequestParam String url,@RequestParam String browserId) {
  33 + if (StringUtils.isNotEmpty(url)) {
  34 + tkFLVService.close(url,browserId);
  35 + }
  36 + }
  37 +}
... ...
... ... @@ -254,6 +254,16 @@
254 254 </exclusion>
255 255 </exclusions>
256 256 </dependency>
  257 + <dependency>
  258 + <groupId>org.bytedeco</groupId>
  259 + <artifactId>javacv-platform</artifactId>
  260 + <version>${javacv.platform.version}</version>
  261 + </dependency>
  262 + <dependency>
  263 + <groupId>javax.xml.bind</groupId>
  264 + <artifactId>jaxb-api</artifactId>
  265 + <version>${jaxb.api.version}</version>
  266 + </dependency>
257 267 </dependencies>
258 268 <build>
259 269 <plugins>
... ...
  1 +package org.thingsboard.server.dao.yunteng.impl.rtsp;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +import org.apache.commons.io.IOUtils;
  5 +import org.bytedeco.ffmpeg.avcodec.AVPacket;
  6 +import org.bytedeco.ffmpeg.global.avcodec;
  7 +import org.bytedeco.javacv.FFmpegFrameGrabber;
  8 +import org.bytedeco.javacv.FFmpegFrameRecorder;
  9 +import org.thingsboard.server.dao.yunteng.service.rtsp.TkConverter;
  10 +
  11 +import javax.servlet.AsyncContext;
  12 +import java.io.ByteArrayOutputStream;
  13 +import java.io.IOException;
  14 +import java.util.Iterator;
  15 +import java.util.List;
  16 +import java.util.Map;
  17 +
  18 +@Slf4j
  19 +public class TkAbstractBaseConvert implements TkConverter {
  20 +
  21 + public volatile boolean runing = true;
  22 + /** 读流器 */
  23 + private FFmpegFrameGrabber grabber;
  24 + /** 转码器 */
  25 + private FFmpegFrameRecorder recorder;
  26 + /**
  27 + * 转FLV格式的头信息<br>
  28 + * 如果有第二个客户端播放首先要返回头信息
  29 + */
  30 + private byte[] headers;
  31 + /** 保存转换好的流 */
  32 + private ByteArrayOutputStream stream;
  33 + /** 流地址,h264,aac */
  34 + private String url;
  35 + /** 流输出 */
  36 + private List<AsyncContext> outEntitys;
  37 +
  38 + /** key用于表示这个转换器 */
  39 + private String key;
  40 +
  41 + /** 转换队列 */
  42 + private Map<String, TkConverter> factories;
  43 +
  44 + public TkAbstractBaseConvert(
  45 + String url, String key, Map<String, TkConverter> factories, List<AsyncContext> outEntitys) {
  46 + this.url = url;
  47 + this.outEntitys = outEntitys;
  48 + this.key = key;
  49 + this.factories = factories;
  50 + }
  51 +
  52 + /**
  53 + * 输出FLV视频流
  54 + *
  55 + * @param b
  56 + */
  57 + private void writeResponse(byte[] b, List<AsyncContext> outEntitys) {
  58 + Iterator<AsyncContext> it = outEntitys.iterator();
  59 + while (it.hasNext()) {
  60 + AsyncContext o = it.next();
  61 + try {
  62 + o.getResponse().getOutputStream().write(b);
  63 + } catch (Exception e) {
  64 + if (null != it && it.hasNext()) {
  65 + it.remove();
  66 + log.info("移除一个输出");
  67 + }
  68 + }
  69 + }
  70 + }
  71 +
  72 + @Override
  73 + public void addOutputStreamEntity(String key, AsyncContext entity) throws IOException {
  74 + outEntitys.add(entity);
  75 + if (headers != null) {
  76 + entity.getResponse().getOutputStream().write(headers);
  77 + entity.getResponse().getOutputStream().flush();
  78 + }
  79 + }
  80 +
  81 + @Override
  82 + public void convert() {
  83 + try {
  84 + grabber = new FFmpegFrameGrabber(url);
  85 + if ("rtsp".equals(url.substring(0, 4))) {
  86 + grabber.setOption("rtsp_transport", "tcp");
  87 + grabber.setOption("stimeout", "5000000");
  88 + }
  89 + grabber.startUnsafe();
  90 + stream = new ByteArrayOutputStream();
  91 + recorder =
  92 + new FFmpegFrameRecorder(
  93 + stream,
  94 + grabber.getImageWidth(),
  95 + grabber.getImageHeight(),
  96 + grabber.getAudioChannels());
  97 + recorder.setInterleaved(true);
  98 + recorder.setVideoOption("preset", "ultrafast");
  99 + recorder.setVideoOption("tune", "zerolatency");
  100 + double framerate = 25.0;
  101 + if (grabber.getFrameRate() > 0 && grabber.getFrameRate() < 100) {
  102 + framerate = grabber.getFrameRate();
  103 + }
  104 + recorder.setVideoOption("crf", String.valueOf(framerate));
  105 + recorder.setFrameRate(grabber.getFrameRate());
  106 + recorder.setSampleRate(grabber.getSampleRate());
  107 + recorder.setFormat("flv");
  108 + recorder.setTimestamp(grabber.getTimestamp());
  109 + log.info("this url:{} converter start", url);
  110 + if (avcodec.AV_CODEC_ID_H264 == grabber.getVideoCodec()) {
  111 + rtspConvert();
  112 + }
  113 + factories.put(key, this);
  114 + } catch (Exception e) {
  115 + log.error(e.getMessage(), e);
  116 + } finally {
  117 + closeConverter();
  118 + completeResponse();
  119 + log.info("this url:{} converter exit", url);
  120 + factories.remove(this.key);
  121 + }
  122 + }
  123 + private void rtspConvert()
  124 + throws FFmpegFrameGrabber.Exception, FFmpegFrameRecorder.Exception, InterruptedException {
  125 + // 来源视频H264格式,音频AAC格式
  126 + // 无须转码,更低的资源消耗,更低的延迟
  127 + recorder.setVideoBitrate(grabber.getVideoBitrate());
  128 + recorder.setVideoCodec(grabber.getVideoCodec());
  129 + recorder.start(grabber.getFormatContext());
  130 + if (headers == null) {
  131 + headers = stream.toByteArray();
  132 + stream.reset();
  133 + writeResponse(headers, outEntitys);
  134 + }
  135 + int nullNumber = 0;
  136 + long dts = 0;
  137 + long pts = 0;
  138 + int timebase;
  139 + while (runing) {
  140 + AVPacket k = grabber.grabPacket();
  141 + if (k != null) {
  142 + k.pts(pts);
  143 + k.dts(dts);
  144 + timebase = grabber.getFormatContext().streams(k.stream_index()).time_base().den();
  145 + pts += timebase / grabber.getFrameRate();
  146 + dts += timebase / grabber.getFrameRate();
  147 + try {
  148 + recorder.recordPacket(k);
  149 + } catch (Exception e) {
  150 + }
  151 + if (stream.size() > 0) {
  152 + byte[] b = stream.toByteArray();
  153 + stream.reset();
  154 + writeResponse(b, outEntitys);
  155 + if (outEntitys.isEmpty()) {
  156 + log.info("没有输出退出");
  157 + break;
  158 + }
  159 + }
  160 + avcodec.av_packet_unref(k);
  161 + } else {
  162 + nullNumber++;
  163 + if (nullNumber > 200) {
  164 + break;
  165 + }
  166 + }
  167 + Thread.sleep(5);
  168 + }
  169 + }
  170 +
  171 + /** 退出转换 */
  172 + public void closeConverter() {
  173 + try {
  174 + IOUtils.close(grabber);
  175 + factories.remove(this.key);
  176 + IOUtils.close(recorder);
  177 + IOUtils.close(stream);
  178 + } catch (Exception e) {
  179 +
  180 + }
  181 + }
  182 +
  183 + /** 关闭异步响应 */
  184 + public void completeResponse() {
  185 + Iterator<AsyncContext> it = outEntitys.iterator();
  186 + while (it.hasNext()) {
  187 + AsyncContext o = it.next();
  188 + try{
  189 + o.complete();
  190 + }catch (Exception e){
  191 + log.info("请求已完成处理");
  192 + }
  193 + }
  194 + }
  195 +
  196 + @Override
  197 + public void exit() {
  198 + this.runing = false;
  199 + }
  200 +}
... ...
  1 +package org.thingsboard.server.dao.yunteng.impl.rtsp;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +import org.springframework.stereotype.Service;
  5 +import org.thingsboard.common.util.ThingsBoardThreadFactory;
  6 +import org.thingsboard.server.dao.yunteng.service.rtsp.TkConverter;
  7 +import org.thingsboard.server.dao.yunteng.service.rtsp.TkFLVService;
  8 +
  9 +import javax.annotation.PostConstruct;
  10 +import javax.annotation.PreDestroy;
  11 +import javax.servlet.AsyncContext;
  12 +import javax.servlet.http.HttpServletRequest;
  13 +import javax.servlet.http.HttpServletResponse;
  14 +import java.io.IOException;
  15 +import java.security.MessageDigest;
  16 +import java.security.NoSuchAlgorithmException;
  17 +import java.util.List;
  18 +import java.util.concurrent.*;
  19 +
  20 +@Service
  21 +@Slf4j
  22 +public class TkFLVServiceImpl implements TkFLVService {
  23 + private ConcurrentHashMap<String, TkConverter> converters = new ConcurrentHashMap<>();
  24 + private ExecutorService executorService;
  25 +
  26 + @PostConstruct
  27 + public void init() {
  28 + executorService =
  29 + new ThreadPoolExecutor(
  30 + 0,
  31 + 50,
  32 + 10L,
  33 + TimeUnit.SECONDS,
  34 + new SynchronousQueue<Runnable>(),
  35 + ThingsBoardThreadFactory.forName("rtsp-convert-flv"));
  36 + }
  37 +
  38 + @PreDestroy
  39 + public void stop() {
  40 + if (executorService != null) {
  41 + executorService.shutdownNow();
  42 + }
  43 + }
  44 +
  45 + @Override
  46 + public void open(
  47 + String url, String browserId, HttpServletResponse response, HttpServletRequest request) {
  48 + String key = md5(url + browserId);
  49 + AsyncContext async = request.startAsync();
  50 + async.setTimeout(900000000);
  51 + if (converters.containsKey(key)) {
  52 + TkConverter c = converters.get(key);
  53 + try {
  54 + c.addOutputStreamEntity(key, async);
  55 + } catch (IOException e) {
  56 + log.error(e.getMessage(), e);
  57 + throw new IllegalArgumentException(e.getMessage());
  58 + }
  59 + } else {
  60 + executorService.execute(
  61 + () -> {
  62 + List<AsyncContext> outs = List.of(async);
  63 + TkAbstractBaseConvert c = new TkAbstractBaseConvert(url, key, converters, outs);
  64 + c.convert();
  65 + });
  66 + }
  67 + response.setHeader("Connection", "keep-alive");
  68 +
  69 + response.setStatus(HttpServletResponse.SC_OK);
  70 + try {
  71 + response.flushBuffer();
  72 + } catch (IOException e) {
  73 + log.error(e.getMessage(), e);
  74 + }
  75 + }
  76 +
  77 + public String md5(String plainText) {
  78 + StringBuilder buf = null;
  79 + try {
  80 + MessageDigest md = MessageDigest.getInstance("MD5");
  81 + md.update(plainText.getBytes());
  82 + byte b[] = md.digest();
  83 + int i;
  84 + buf = new StringBuilder("");
  85 + for (int offset = 0; offset < b.length; offset++) {
  86 + i = b[offset];
  87 + if (i < 0) i += 256;
  88 + if (i < 16) buf.append("0");
  89 + buf.append(Integer.toHexString(i));
  90 + }
  91 + } catch (NoSuchAlgorithmException e) {
  92 + log.error(e.getMessage(), e);
  93 + }
  94 + return buf.toString();
  95 + }
  96 +
  97 + public void close(String url, String browserId) {
  98 + // executorService.execute(()->{
  99 + //
  100 + // });
  101 + String key = md5(url + browserId);
  102 + if (converters.containsKey(key)) {
  103 + TkConverter c = converters.get(key);
  104 + c.exit();
  105 + }
  106 + }
  107 +}
... ...
  1 +package org.thingsboard.server.dao.yunteng.service.rtsp;
  2 +
  3 +import javax.servlet.AsyncContext;
  4 +import java.io.IOException;
  5 +
  6 +public interface TkConverter {
  7 + /**
  8 + * 添加一个流输出
  9 + *
  10 + * @param entity
  11 + */
  12 + void addOutputStreamEntity(String key, AsyncContext entity) throws IOException;
  13 +
  14 + /**
  15 + * 退出转换
  16 + */
  17 + void exit();
  18 +
  19 + void convert();
  20 +}
... ...
  1 +package org.thingsboard.server.dao.yunteng.service.rtsp;
  2 +
  3 +import javax.servlet.http.HttpServletRequest;
  4 +import javax.servlet.http.HttpServletResponse;
  5 +
  6 +public interface TkFLVService {
  7 + /**
  8 + * 打开一个流地址
  9 + *
  10 + * @param url 流地址
  11 + * @param response 响应
  12 + * @param request 请求
  13 + */
  14 + void open(String url, String browserId,HttpServletResponse response, HttpServletRequest request);
  15 +
  16 + /**
  17 + * 关闭视频流
  18 + *
  19 + * @param url 视频流地址
  20 + */
  21 + void close(String url,String browserId);
  22 +}
... ...
... ... @@ -141,6 +141,8 @@
141 141 <jakarta.mail.version>2.0.1</jakarta.mail.version>
142 142 <io.minio.version>8.3.1</io.minio.version>
143 143 <com.alibaba.easyexcel.version>3.1.1</com.alibaba.easyexcel.version>
  144 + <javacv.platform.version>1.5.9</javacv.platform.version>
  145 + <jaxb.api.version>2.3.1</jaxb.api.version>
144 146 </properties>
145 147
146 148 <modules>
... ...