ubuntu 22.04中音视频合成?-灵析社区

生成头像

需求: 声音动态生成, 视频固定来源. 代码中使用的是testsrc. 代码一直卡在rawvedio写入命名管道哪也没撒错误 示例代码: import subprocess import os from threading import Thread import numpy as np from transformers import VitsModel, VitsTokenizer, PreTrainedTokenizerBase import torch import ffmpeg def read_frame_from_stdout(vedioProcess, width, height): frame_size = width * height * 3 input_bytes = vedioProcess.stdout.read(frame_size) if not input_bytes: return assert len(input_bytes) == frame_size return np.frombuffer(input_bytes, np.uint8).reshape([height, width, 3]) def writer(vedioProcess, pipe_name, chunk_size): width = 640 height = 480 while True: input_frame = read_frame_from_stdout(vedioProcess, width, height) print('read frame is:' % input_frame) if input_frame is None: print('read frame is: None') break frame = input_frame * 0.3 os.write(fd_pipe, (frame.astype(np.uint8).tobytes())) # Closing the pipes as closing files. os.close(fd_pipe) # 加载TTS模型 def loadModel(device: str): model = VitsModel.from_pretrained("./mms-tts-eng", local_files_only=True).to(device) # acebook/mms-tts-deu tokenizer = VitsTokenizer.from_pretrained("./mms-tts-eng", local_files_only=True) return model, tokenizer # 将32位浮点转成16位整数, 适用于:16000(音频采样率) def covertFl32ToInt16(nyArr): return np.int16(nyArr / np.max(np.abs(nyArr)) * 32767) def audioWriteInPipe(nyArr, audioPipeName): # Write to named pipe as writing to a file (but write the data in small chunks). os.write(audioPipeName, covertFl32ToInt16(nyArr.squeeze()).tobytes()) # Write 1024 bytes of data to fd_pipe # 生成numpy def generte(prompt:str, device: str, model: VitsModel, tokenizer: PreTrainedTokenizerBase): inputs = tokenizer(prompt, return_tensors="pt").to(device) # with torch.no_grad(): # output = model(**inputs).waveform return output.cpu().numpy() def soundPipeWriter(model, device, tokenizer, pipeName): fd_pipe = os.open(pipeName, os.O_WRONLY) filepath = 'night.txt' for content in read_file(filepath): print(content) audioWriteInPipe(generte(prompt=content, device=device, model=model, tokenizer=tokenizer), audioPipeName=fd_pipe) os.close(fd_pipe) # 读取文件源 def read_file(filepath:str): with open(filepath) as fp: for content in fp: yield content def record(vedioProcess, model, tokenizer, device): # Names of the "Named pipes" pipeA = "audio_pipe1" pipeV = "vedio_pipe2" # Create "named pipes". os.mkfifo(pipeA) os.mkfifo(pipeV) # Open FFmpeg as sub-process # Use two audio input streams: # 1. Named pipe: "audio_pipe1" # 2. Named pipe: "audio_pipe2" process = ( ffmpeg .concat(ffmpeg.input("pipe:vedio_pipe2"), ffmpeg.input("pipe:audio_pipe1"), v=1, a=1) .output("merge_audio_vedio.mp4", pix_fmt='yuv480p', vcodec='copy', acodec='aac') .run_async(pipe_stderr=True) ) # Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes). thread1 = Thread(target=soundPipeWriter, args=(model, device, tokenizer, pipeA)) # thread1 writes samp1 to pipe1 thread2 = Thread(target=writer, args=(vedioProcess, pipeV, 1024)) # thread2 writes samp2 to pipe2 # Start the two threads thread1.start() thread2.start() # Wait for the two writer threads to finish thread1.join() thread2.join() process.wait() # Wait for FFmpeg sub-process to finish # Remove the "named pipes". os.unlink(pipeV) os.unlink(pipeA) if __name__ == "__main__": device: str = "cuda:0" if torch.cuda.is_available() else "cpu" model, tokenizer = loadModel(device=device) # make lavfi-testSrc 60s mp4 vedioProcess = ( ffmpeg .input('testsrc=duration=10:size=640x480:rate=30', f="lavfi", t=60) .output('pipe:', format='rawvideo', pix_fmt='rgb24') .run_async(pipe_stdout=True) ) # record(vedioProcess, model, tokenizer, device) vedioProcess.wait() vscode中截图: ![](https://wmprod.oss-cn-shanghai.aliyuncs.com/c/user/20240925/f010760dfc6c74520f9b3197230485b2.png)

阅读量:192

点赞量:0

问AI
视频卡住是因为编码不对.用h264都可以过了.声音输入不可用是因为没有编码。 完整的示例如下: import subprocess import os from threading import Thread import numpy as np from transformers import VitsModel, VitsTokenizer, PreTrainedTokenizerBase import torch import ffmpeg # 写视频 def writer(data, pipe_name, chunk_size): dataLength = len(data) print('arg data is print') print('data length:%d, chunk size:%d' % (dataLength, chunk_size)) # Open the pipes as opening files (open for "open for writing only"). fd_pipe = os.open(pipe_name, os.O_WRONLY) # fd_pipe1 is a file descriptor (an integer) for i in range(0, dataLength, chunk_size): print('start write start:%d, finish:%d' % (i, chunk_size+i)) # Write to named pipe as writing to a file (but write the data in small chunks). os.write(fd_pipe, data[i:chunk_size+i]) # Write 1024 bytes of data to fd_pipe print('writing...') # Closing the pipes as closing files. os.close(fd_pipe) # 加载TTS模型 def loadModel(device: str): #model = VitsModel.from_pretrained("facebook/mms-tts-eng").to(device) # acebook/mms-tts-deu #tokenizer = VitsTokenizer.from_pretrained("facebook/mms-tts-eng") model = VitsModel.from_pretrained("./mms-tts-eng", local_files_only=True).to(device) # acebook/mms-tts-deu tokenizer = VitsTokenizer.from_pretrained("./mms-tts-eng", local_files_only=True) return model, tokenizer # 将32位浮点转成16位整数, 适用于:16000(音频采样率) def covertFl32ToInt16(nyArr): return np.int16(nyArr / np.max(np.abs(nyArr)) * 32767) def audioWriteInPipe(nyArr, audioPipeName): # Write to named pipe as writing to a file (but write the data in small chunks). os.write(audioPipeName, covertFl32ToInt16(nyArr.squeeze()).tobytes()) # Write 1024 bytes of data to fd_pipe # 生成numpy def generte(prompt:str, device: str, model: VitsModel, tokenizer: PreTrainedTokenizerBase): inputs = tokenizer(prompt, return_tensors="pt").to(device) # with torch.no_grad(): # output = model(**inputs).waveform return output.cpu().numpy() def soundPipeWriter(model, device, tokenizer, pipeName): fd_pipe = os.open(pipeName, os.O_WRONLY) filepath = 'night.txt' for content in read_file(filepath): print(content) audioWriteInPipe(generte(prompt=content, device=device, model=model, tokenizer=tokenizer), audioPipeName=fd_pipe) os.close(fd_pipe) # 读取文件源 def read_file(filepath:str): with open(filepath) as fp: for content in fp: yield content def record(vedioData, model, tokenizer, device): # Names of the "Named pipes" pipeA = "audio_pipe1" pipeV = "vedio_pipe2" # Create "named pipes". os.mkfifo(pipeA) os.mkfifo(pipeV) # Open FFmpeg as sub-process # Use two audio input streams: # 1. Named pipe: "audio_pipe1" # 2. Named pipe: "audio_pipe2" process = subprocess.Popen(["ffmpeg", "-i", pipeV, "-ar", "16000", "-f", "s16le", "-ac", "1", "-i", pipeA, "-c:v", "copy", "-c:a", "aac", "merge_audio_vedio.mp4"], stdin=subprocess.PIPE) # Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes). thread1 = Thread(target=soundPipeWriter, args=(model, device, tokenizer, pipeA)) # thread1 writes samp1 to pipe1 thread2 = Thread(target=writer, args=(vedioData, pipeV, 1024)) # thread2 writes samp2 to pipe2 # Start the two threads thread1.start() thread2.start() # Wait for the two writer threads to finish thread1.join() thread2.join() process.wait() # Wait for FFmpeg sub-process to finish # Remove the "named pipes". os.unlink(pipeV) os.unlink(pipeA) if __name__ == "__main__": device: str = "cuda:0" if torch.cuda.is_available() else "cpu" model, tokenizer = loadModel(device=device) # make lavfi-testSrc 60s mp4 vedioProcess = ( ffmpeg .input('testsrc=duration=10:size=640x480:rate=30', f="lavfi", t=60) .output('pipe:', format='h264', pix_fmt='rgb24') .run_async(pipe_stdout=True) ) buffer, _ = vedioProcess.communicate() # record(buffer, model, tokenizer, device) vedioProcess.wait()