Commit 2222e79c84da6efc87f9f80c55ab0ffad8b4f58c
1 parent
9f4f4aaa
feat: 增加rtsp转flv的视频流转换
(cherry picked from commit c7134b0b)
Showing
7 changed files
with
398 additions
and
0 deletions
application/src/main/java/org/thingsboard/server/controller/yunteng/TkRtspConvertController.java
0 → 100644
1 | +package org.thingsboard.server.controller.yunteng; | ||
2 | + | ||
3 | +import io.swagger.annotations.Api; | ||
4 | +import lombok.RequiredArgsConstructor; | ||
5 | +import org.springframework.web.bind.annotation.GetMapping; | ||
6 | +import org.springframework.web.bind.annotation.RequestMapping; | ||
7 | +import org.springframework.web.bind.annotation.RequestParam; | ||
8 | +import org.springframework.web.bind.annotation.RestController; | ||
9 | +import org.thingsboard.server.common.data.StringUtils; | ||
10 | +import org.thingsboard.server.dao.yunteng.service.rtsp.TkFLVService; | ||
11 | + | ||
12 | +import javax.servlet.http.HttpServletRequest; | ||
13 | +import javax.servlet.http.HttpServletResponse; | ||
14 | + | ||
15 | +@RestController | ||
16 | +@RequestMapping("api/yt/rtsp") | ||
17 | +@Api(tags = {"rtsp流转换"}) | ||
18 | +@RequiredArgsConstructor | ||
19 | +public class TkRtspConvertController { | ||
20 | + | ||
21 | + private final TkFLVService tkFLVService; | ||
22 | + | ||
23 | + @GetMapping("/openFlv") | ||
24 | + public void open( | ||
25 | + @RequestParam String url,@RequestParam String browserId, HttpServletResponse response, HttpServletRequest request) { | ||
26 | + if (StringUtils.isNotEmpty(url)) { | ||
27 | + tkFLVService.open(url,browserId, response, request); | ||
28 | + } | ||
29 | + } | ||
30 | + | ||
31 | + @GetMapping("/closeFlv") | ||
32 | + public void close(@RequestParam String url,@RequestParam String browserId) { | ||
33 | + if (StringUtils.isNotEmpty(url)) { | ||
34 | + tkFLVService.close(url,browserId); | ||
35 | + } | ||
36 | + } | ||
37 | +} |
@@ -254,6 +254,16 @@ | @@ -254,6 +254,16 @@ | ||
254 | </exclusion> | 254 | </exclusion> |
255 | </exclusions> | 255 | </exclusions> |
256 | </dependency> | 256 | </dependency> |
257 | + <dependency> | ||
258 | + <groupId>org.bytedeco</groupId> | ||
259 | + <artifactId>javacv-platform</artifactId> | ||
260 | + <version>${javacv.platform.version}</version> | ||
261 | + </dependency> | ||
262 | + <dependency> | ||
263 | + <groupId>javax.xml.bind</groupId> | ||
264 | + <artifactId>jaxb-api</artifactId> | ||
265 | + <version>${jaxb.api.version}</version> | ||
266 | + </dependency> | ||
257 | </dependencies> | 267 | </dependencies> |
258 | <build> | 268 | <build> |
259 | <plugins> | 269 | <plugins> |
dao/src/main/java/org/thingsboard/server/dao/yunteng/impl/rtsp/TkAbstractBaseConvert.java
0 → 100644
1 | +package org.thingsboard.server.dao.yunteng.impl.rtsp; | ||
2 | + | ||
3 | +import lombok.extern.slf4j.Slf4j; | ||
4 | +import org.apache.commons.io.IOUtils; | ||
5 | +import org.bytedeco.ffmpeg.avcodec.AVPacket; | ||
6 | +import org.bytedeco.ffmpeg.global.avcodec; | ||
7 | +import org.bytedeco.javacv.FFmpegFrameGrabber; | ||
8 | +import org.bytedeco.javacv.FFmpegFrameRecorder; | ||
9 | +import org.thingsboard.server.dao.yunteng.service.rtsp.TkConverter; | ||
10 | + | ||
11 | +import javax.servlet.AsyncContext; | ||
12 | +import java.io.ByteArrayOutputStream; | ||
13 | +import java.io.IOException; | ||
14 | +import java.util.Iterator; | ||
15 | +import java.util.List; | ||
16 | +import java.util.Map; | ||
17 | + | ||
18 | +@Slf4j | ||
19 | +public class TkAbstractBaseConvert implements TkConverter { | ||
20 | + | ||
21 | + public volatile boolean runing = true; | ||
22 | + /** 读流器 */ | ||
23 | + private FFmpegFrameGrabber grabber; | ||
24 | + /** 转码器 */ | ||
25 | + private FFmpegFrameRecorder recorder; | ||
26 | + /** | ||
27 | + * 转FLV格式的头信息<br> | ||
28 | + * 如果有第二个客户端播放首先要返回头信息 | ||
29 | + */ | ||
30 | + private byte[] headers; | ||
31 | + /** 保存转换好的流 */ | ||
32 | + private ByteArrayOutputStream stream; | ||
33 | + /** 流地址,h264,aac */ | ||
34 | + private String url; | ||
35 | + /** 流输出 */ | ||
36 | + private List<AsyncContext> outEntitys; | ||
37 | + | ||
38 | + /** key用于表示这个转换器 */ | ||
39 | + private String key; | ||
40 | + | ||
41 | + /** 转换队列 */ | ||
42 | + private Map<String, TkConverter> factories; | ||
43 | + | ||
44 | + public TkAbstractBaseConvert( | ||
45 | + String url, String key, Map<String, TkConverter> factories, List<AsyncContext> outEntitys) { | ||
46 | + this.url = url; | ||
47 | + this.outEntitys = outEntitys; | ||
48 | + this.key = key; | ||
49 | + this.factories = factories; | ||
50 | + } | ||
51 | + | ||
52 | + /** | ||
53 | + * 输出FLV视频流 | ||
54 | + * | ||
55 | + * @param b | ||
56 | + */ | ||
57 | + private void writeResponse(byte[] b, List<AsyncContext> outEntitys) { | ||
58 | + Iterator<AsyncContext> it = outEntitys.iterator(); | ||
59 | + while (it.hasNext()) { | ||
60 | + AsyncContext o = it.next(); | ||
61 | + try { | ||
62 | + o.getResponse().getOutputStream().write(b); | ||
63 | + } catch (Exception e) { | ||
64 | + if (null != it && it.hasNext()) { | ||
65 | + it.remove(); | ||
66 | + log.info("移除一个输出"); | ||
67 | + } | ||
68 | + } | ||
69 | + } | ||
70 | + } | ||
71 | + | ||
72 | + @Override | ||
73 | + public void addOutputStreamEntity(String key, AsyncContext entity) throws IOException { | ||
74 | + outEntitys.add(entity); | ||
75 | + if (headers != null) { | ||
76 | + entity.getResponse().getOutputStream().write(headers); | ||
77 | + entity.getResponse().getOutputStream().flush(); | ||
78 | + } | ||
79 | + } | ||
80 | + | ||
81 | + @Override | ||
82 | + public void convert() { | ||
83 | + try { | ||
84 | + grabber = new FFmpegFrameGrabber(url); | ||
85 | + if ("rtsp".equals(url.substring(0, 4))) { | ||
86 | + grabber.setOption("rtsp_transport", "tcp"); | ||
87 | + grabber.setOption("stimeout", "5000000"); | ||
88 | + } | ||
89 | + grabber.startUnsafe(); | ||
90 | + stream = new ByteArrayOutputStream(); | ||
91 | + recorder = | ||
92 | + new FFmpegFrameRecorder( | ||
93 | + stream, | ||
94 | + grabber.getImageWidth(), | ||
95 | + grabber.getImageHeight(), | ||
96 | + grabber.getAudioChannels()); | ||
97 | + recorder.setInterleaved(true); | ||
98 | + recorder.setVideoOption("preset", "ultrafast"); | ||
99 | + recorder.setVideoOption("tune", "zerolatency"); | ||
100 | + double framerate = 25.0; | ||
101 | + if (grabber.getFrameRate() > 0 && grabber.getFrameRate() < 100) { | ||
102 | + framerate = grabber.getFrameRate(); | ||
103 | + } | ||
104 | + recorder.setVideoOption("crf", String.valueOf(framerate)); | ||
105 | + recorder.setFrameRate(grabber.getFrameRate()); | ||
106 | + recorder.setSampleRate(grabber.getSampleRate()); | ||
107 | + recorder.setFormat("flv"); | ||
108 | + recorder.setTimestamp(grabber.getTimestamp()); | ||
109 | + log.info("this url:{} converter start", url); | ||
110 | + if (avcodec.AV_CODEC_ID_H264 == grabber.getVideoCodec()) { | ||
111 | + rtspConvert(); | ||
112 | + } | ||
113 | + factories.put(key, this); | ||
114 | + } catch (Exception e) { | ||
115 | + log.error(e.getMessage(), e); | ||
116 | + } finally { | ||
117 | + closeConverter(); | ||
118 | + completeResponse(); | ||
119 | + log.info("this url:{} converter exit", url); | ||
120 | + factories.remove(this.key); | ||
121 | + } | ||
122 | + } | ||
123 | + private void rtspConvert() | ||
124 | + throws FFmpegFrameGrabber.Exception, FFmpegFrameRecorder.Exception, InterruptedException { | ||
125 | + // 来源视频H264格式,音频AAC格式 | ||
126 | + // 无须转码,更低的资源消耗,更低的延迟 | ||
127 | + recorder.setVideoBitrate(grabber.getVideoBitrate()); | ||
128 | + recorder.setVideoCodec(grabber.getVideoCodec()); | ||
129 | + recorder.start(grabber.getFormatContext()); | ||
130 | + if (headers == null) { | ||
131 | + headers = stream.toByteArray(); | ||
132 | + stream.reset(); | ||
133 | + writeResponse(headers, outEntitys); | ||
134 | + } | ||
135 | + int nullNumber = 0; | ||
136 | + long dts = 0; | ||
137 | + long pts = 0; | ||
138 | + int timebase; | ||
139 | + while (runing) { | ||
140 | + AVPacket k = grabber.grabPacket(); | ||
141 | + if (k != null) { | ||
142 | + k.pts(pts); | ||
143 | + k.dts(dts); | ||
144 | + timebase = grabber.getFormatContext().streams(k.stream_index()).time_base().den(); | ||
145 | + pts += timebase / grabber.getFrameRate(); | ||
146 | + dts += timebase / grabber.getFrameRate(); | ||
147 | + try { | ||
148 | + recorder.recordPacket(k); | ||
149 | + } catch (Exception e) { | ||
150 | + } | ||
151 | + if (stream.size() > 0) { | ||
152 | + byte[] b = stream.toByteArray(); | ||
153 | + stream.reset(); | ||
154 | + writeResponse(b, outEntitys); | ||
155 | + if (outEntitys.isEmpty()) { | ||
156 | + log.info("没有输出退出"); | ||
157 | + break; | ||
158 | + } | ||
159 | + } | ||
160 | + avcodec.av_packet_unref(k); | ||
161 | + } else { | ||
162 | + nullNumber++; | ||
163 | + if (nullNumber > 200) { | ||
164 | + break; | ||
165 | + } | ||
166 | + } | ||
167 | + Thread.sleep(5); | ||
168 | + } | ||
169 | + } | ||
170 | + | ||
171 | + /** 退出转换 */ | ||
172 | + public void closeConverter() { | ||
173 | + try { | ||
174 | + IOUtils.close(grabber); | ||
175 | + factories.remove(this.key); | ||
176 | + IOUtils.close(recorder); | ||
177 | + IOUtils.close(stream); | ||
178 | + } catch (Exception e) { | ||
179 | + | ||
180 | + } | ||
181 | + } | ||
182 | + | ||
183 | + /** 关闭异步响应 */ | ||
184 | + public void completeResponse() { | ||
185 | + Iterator<AsyncContext> it = outEntitys.iterator(); | ||
186 | + while (it.hasNext()) { | ||
187 | + AsyncContext o = it.next(); | ||
188 | + try{ | ||
189 | + o.complete(); | ||
190 | + }catch (Exception e){ | ||
191 | + log.info("请求已完成处理"); | ||
192 | + } | ||
193 | + } | ||
194 | + } | ||
195 | + | ||
196 | + @Override | ||
197 | + public void exit() { | ||
198 | + this.runing = false; | ||
199 | + } | ||
200 | +} |
1 | +package org.thingsboard.server.dao.yunteng.impl.rtsp; | ||
2 | + | ||
3 | +import lombok.extern.slf4j.Slf4j; | ||
4 | +import org.springframework.stereotype.Service; | ||
5 | +import org.thingsboard.common.util.ThingsBoardThreadFactory; | ||
6 | +import org.thingsboard.server.dao.yunteng.service.rtsp.TkConverter; | ||
7 | +import org.thingsboard.server.dao.yunteng.service.rtsp.TkFLVService; | ||
8 | + | ||
9 | +import javax.annotation.PostConstruct; | ||
10 | +import javax.annotation.PreDestroy; | ||
11 | +import javax.servlet.AsyncContext; | ||
12 | +import javax.servlet.http.HttpServletRequest; | ||
13 | +import javax.servlet.http.HttpServletResponse; | ||
14 | +import java.io.IOException; | ||
15 | +import java.security.MessageDigest; | ||
16 | +import java.security.NoSuchAlgorithmException; | ||
17 | +import java.util.List; | ||
18 | +import java.util.concurrent.*; | ||
19 | + | ||
20 | +@Service | ||
21 | +@Slf4j | ||
22 | +public class TkFLVServiceImpl implements TkFLVService { | ||
23 | + private ConcurrentHashMap<String, TkConverter> converters = new ConcurrentHashMap<>(); | ||
24 | + private ExecutorService executorService; | ||
25 | + | ||
26 | + @PostConstruct | ||
27 | + public void init() { | ||
28 | + executorService = | ||
29 | + new ThreadPoolExecutor( | ||
30 | + 0, | ||
31 | + 50, | ||
32 | + 10L, | ||
33 | + TimeUnit.SECONDS, | ||
34 | + new SynchronousQueue<Runnable>(), | ||
35 | + ThingsBoardThreadFactory.forName("rtsp-convert-flv")); | ||
36 | + } | ||
37 | + | ||
38 | + @PreDestroy | ||
39 | + public void stop() { | ||
40 | + if (executorService != null) { | ||
41 | + executorService.shutdownNow(); | ||
42 | + } | ||
43 | + } | ||
44 | + | ||
45 | + @Override | ||
46 | + public void open( | ||
47 | + String url, String browserId, HttpServletResponse response, HttpServletRequest request) { | ||
48 | + String key = md5(url + browserId); | ||
49 | + AsyncContext async = request.startAsync(); | ||
50 | + async.setTimeout(900000000); | ||
51 | + if (converters.containsKey(key)) { | ||
52 | + TkConverter c = converters.get(key); | ||
53 | + try { | ||
54 | + c.addOutputStreamEntity(key, async); | ||
55 | + } catch (IOException e) { | ||
56 | + log.error(e.getMessage(), e); | ||
57 | + throw new IllegalArgumentException(e.getMessage()); | ||
58 | + } | ||
59 | + } else { | ||
60 | + executorService.execute( | ||
61 | + () -> { | ||
62 | + List<AsyncContext> outs = List.of(async); | ||
63 | + TkAbstractBaseConvert c = new TkAbstractBaseConvert(url, key, converters, outs); | ||
64 | + c.convert(); | ||
65 | + }); | ||
66 | + } | ||
67 | + response.setHeader("Connection", "keep-alive"); | ||
68 | + | ||
69 | + response.setStatus(HttpServletResponse.SC_OK); | ||
70 | + try { | ||
71 | + response.flushBuffer(); | ||
72 | + } catch (IOException e) { | ||
73 | + log.error(e.getMessage(), e); | ||
74 | + } | ||
75 | + } | ||
76 | + | ||
77 | + public String md5(String plainText) { | ||
78 | + StringBuilder buf = null; | ||
79 | + try { | ||
80 | + MessageDigest md = MessageDigest.getInstance("MD5"); | ||
81 | + md.update(plainText.getBytes()); | ||
82 | + byte b[] = md.digest(); | ||
83 | + int i; | ||
84 | + buf = new StringBuilder(""); | ||
85 | + for (int offset = 0; offset < b.length; offset++) { | ||
86 | + i = b[offset]; | ||
87 | + if (i < 0) i += 256; | ||
88 | + if (i < 16) buf.append("0"); | ||
89 | + buf.append(Integer.toHexString(i)); | ||
90 | + } | ||
91 | + } catch (NoSuchAlgorithmException e) { | ||
92 | + log.error(e.getMessage(), e); | ||
93 | + } | ||
94 | + return buf.toString(); | ||
95 | + } | ||
96 | + | ||
97 | + public void close(String url, String browserId) { | ||
98 | + // executorService.execute(()->{ | ||
99 | + // | ||
100 | + // }); | ||
101 | + String key = md5(url + browserId); | ||
102 | + if (converters.containsKey(key)) { | ||
103 | + TkConverter c = converters.get(key); | ||
104 | + c.exit(); | ||
105 | + } | ||
106 | + } | ||
107 | +} |
1 | +package org.thingsboard.server.dao.yunteng.service.rtsp; | ||
2 | + | ||
3 | +import javax.servlet.AsyncContext; | ||
4 | +import java.io.IOException; | ||
5 | + | ||
6 | +public interface TkConverter { | ||
7 | + /** | ||
8 | + * 添加一个流输出 | ||
9 | + * | ||
10 | + * @param entity | ||
11 | + */ | ||
12 | + void addOutputStreamEntity(String key, AsyncContext entity) throws IOException; | ||
13 | + | ||
14 | + /** | ||
15 | + * 退出转换 | ||
16 | + */ | ||
17 | + void exit(); | ||
18 | + | ||
19 | + void convert(); | ||
20 | +} |
1 | +package org.thingsboard.server.dao.yunteng.service.rtsp; | ||
2 | + | ||
3 | +import javax.servlet.http.HttpServletRequest; | ||
4 | +import javax.servlet.http.HttpServletResponse; | ||
5 | + | ||
6 | +public interface TkFLVService { | ||
7 | + /** | ||
8 | + * 打开一个流地址 | ||
9 | + * | ||
10 | + * @param url 流地址 | ||
11 | + * @param response 响应 | ||
12 | + * @param request 请求 | ||
13 | + */ | ||
14 | + void open(String url, String browserId,HttpServletResponse response, HttpServletRequest request); | ||
15 | + | ||
16 | + /** | ||
17 | + * 关闭视频流 | ||
18 | + * | ||
19 | + * @param url 视频流地址 | ||
20 | + */ | ||
21 | + void close(String url,String browserId); | ||
22 | +} |
@@ -141,6 +141,8 @@ | @@ -141,6 +141,8 @@ | ||
141 | <jakarta.mail.version>2.0.1</jakarta.mail.version> | 141 | <jakarta.mail.version>2.0.1</jakarta.mail.version> |
142 | <io.minio.version>8.3.1</io.minio.version> | 142 | <io.minio.version>8.3.1</io.minio.version> |
143 | <com.alibaba.easyexcel.version>3.1.1</com.alibaba.easyexcel.version> | 143 | <com.alibaba.easyexcel.version>3.1.1</com.alibaba.easyexcel.version> |
144 | + <javacv.platform.version>1.5.9</javacv.platform.version> | ||
145 | + <jaxb.api.version>2.3.1</jaxb.api.version> | ||
144 | </properties> | 146 | </properties> |
145 | 147 | ||
146 | <modules> | 148 | <modules> |