通过ffmpeg子进程进行视频数据IO
在求职过程中,我需要完成一个处理视频数据的原型项目,这其中涉及到使用ffmpeg进行视频预处理,并将多个视频文件连接在一起播放。由于缺乏经验,我借助生成式ai聊天机器人(google gemini)的帮助完成了这个挑战。
项目目标是串联播放多个视频。我采用了最直接的方法——将视频文件连接起来。为此,首先需要将视频文件重新编码成合适的格式。在与Gemini的讨论中,它建议使用MPEG-TS格式。
MPEG传输流(MPEG-TS)通过封装分组基本流工作,这些流包括音频、视频和PSIP数据,被打包成小段。每个流被分割成188字节的部分并交织在一起。这种方式确保了更短的延迟和更高的容错能力,使其成为理想的视频会议格式,尤其是在大帧可能导致音频延迟的情况下。引自https://castr.com/blog/mpeg-transport-stream-mpeg-ts/经过重新编码后,视频数据将被送入队列,供后续模块处理。确定输入(视频文件链接列表)和输出(重新编码的视频数据)后,我需要找到合适的FFmpeg命令。由于FFmpeg功能复杂,我利用Gemini的帮助,快速得到了正确的命令:
ffmpeg -hwaccel cuda -i pipe:0 -c:v h264_nvenc -b:v 1.5m -c:a aac -b:a 128k -f mpegts -y pipe:1
Gemini对FFmpeg命令的解释
该命令通过标准输入(stdin)接收视频数据,并将重新编码后的视频数据输出到标准输出(stdout)。
为了实现异步读取和写入,我使用了httpx库和asyncio。代码如下:
import httpximport asyncioimport subprocessfrom functools import partialfrom queue import Queue# ... (日志记录设置) ...client = httpx.AsyncClient()async def write_input(client, video_link, process): assert isinstance(process.stdin, asyncio.StreamWriter) async with client.stream("GET", video_link) as response: logger.info("data: streaming video to queue", link=video_link) async for chunk in response.aiter_raw(1024): process.stdin.write(chunk) await process.stdin.drain() if process.stdin.can_write_eof(): process.stdin.write_eof() process.stdin.close() await process.stdin.wait_closed() logger.info("data: done downloading video to ffmpeg")async def video_send(queue, client, video_link): logger.info("data: fetching video from link", link=video_link) process = await asyncio.create_subprocess_exec( "ffmpeg", "-hwaccel", "cuda", "-i", "pipe:0", "-c:v", "h264_nvenc", "-b:v", "1.5m", "-c:a", "aac", "-b:a", "128k", "-f", "mpegts", "-y", "pipe:1", stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) asyncio.create_task(write_input(client, video_link, process)) assert isinstance(process.stdout, asyncio.StreamReader) while True: chunk = await process.stdout.read(1024) if not chunk: break else: await asyncio.to_thread(partial(queue.put, chunk)) await process.wait() logger.info("DATA: Done sending video to queue")# ... (主函数调用video_send) ...
代码实现了异步地从网络下载视频并将其发送到FFmpeg进行处理,然后将处理后的数据放入队列。整个过程充分利用了asyncio的异步特性,避免了因等待下载而造成的延迟。
这个项目花了我一个晚上才完成,期间不断查阅文档并寻求Gemini的帮助。 希望本文对您有所帮助。