"""Mux a fixed video source with dynamically generated TTS audio via FFmpeg.

Requirement: the audio is generated on the fly, the video comes from a fixed
source (``testsrc`` is used in this example).

Reported problem: the script hung while writing raw video into the named
pipe and no error was shown.  (The original report ended with a VS Code
screenshot, which is not reproduced here.)

Why it hung, and what this version does about it:

* FFmpeg's ``pipe:`` protocol expects a file-descriptor number, so
  ``pipe:vedio_pipe2`` never opened the FIFO.  Opening a FIFO for writing
  blocks until a reader opens it, so the writer threads waited forever.
  The FIFO paths are now passed to FFmpeg as ordinary input files.
* Raw video/audio carry no header, so the input format, frame size, pixel
  format, frame rate, sample rate and channel count are passed explicitly.
* FFmpeg's stderr was captured in a pipe nobody read, hiding every error
  message.  It is now left attached to the console.
* The video writer never opened its FIFO, the TTS inference call was
  commented out, and ``record`` was never called.
"""

import os
import subprocess
from threading import Thread

import ffmpeg
import numpy as np
import torch
from transformers import PreTrainedTokenizerBase, VitsModel, VitsTokenizer

# Geometry and rate shared by the test source, the FIFO writer and the muxer.
FRAME_WIDTH = 640
FRAME_HEIGHT = 480
FRAME_RATE = 30

# Fallback used when the model config does not expose ``sampling_rate``.
DEFAULT_SAMPLE_RATE = 16000

# How long to wait for a writer thread once the muxing FFmpeg has exited.
WRITER_JOIN_TIMEOUT = 5.0


def _write_all(fd, data):
    """Write every byte of *data* to file descriptor *fd*.

    ``os.write`` may write fewer bytes than requested, so keep writing the
    remainder until nothing is left.
    """
    view = memoryview(data)
    while len(view) > 0:
        written = os.write(fd, view)
        view = view[written:]


def _remove_if_exists(path):
    """Delete *path*, ignoring the case where it does not exist."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass


def read_frame_from_stdout(vedioProcess, width, height):
    """Read one RGB24 frame from the stdout pipe of *vedioProcess*.

    Returns:
        A ``(height, width, 3)`` uint8 array, or ``None`` once the stream has
        ended (including a truncated final frame).
    """
    frame_size = width * height * 3
    input_bytes = vedioProcess.stdout.read(frame_size)
    # A short read means the producer closed its end; treat it as end of
    # stream instead of asserting, so a truncated last frame cannot crash
    # the writer thread.
    if not input_bytes or len(input_bytes) < frame_size:
        return None
    return np.frombuffer(input_bytes, np.uint8).reshape([height, width, 3])


def writer(vedioProcess, pipe_name, chunk_size):
    """Copy darkened frames from *vedioProcess* into the FIFO *pipe_name*.

    Args:
        vedioProcess: Process whose stdout yields raw RGB24 frames.
        pipe_name: Path of the named pipe the muxing FFmpeg reads video from.
        chunk_size: Unused; kept so existing callers keep working.
    """
    # Blocks until FFmpeg opens the FIFO for reading.
    fd_pipe = os.open(pipe_name, os.O_WRONLY)
    try:
        while True:
            input_frame = read_frame_from_stdout(
                vedioProcess, FRAME_WIDTH, FRAME_HEIGHT
            )
            if input_frame is None:
                print('read frame is: None')
                break
            frame = input_frame * 0.3
            _write_all(fd_pipe, frame.astype(np.uint8).tobytes())
    except BrokenPipeError:
        # The reader (FFmpeg) went away; nothing more can be written.
        print('video pipe closed by reader')
    finally:
        # Closing the FIFO signals end-of-stream to FFmpeg.
        os.close(fd_pipe)


def loadModel(device: str):
    """Load the local TTS model and tokenizer and move the model to *device*."""
    model = VitsModel.from_pretrained(
        "./mms-tts-eng", local_files_only=True
    ).to(device)  # e.g. facebook/mms-tts-deu
    tokenizer = VitsTokenizer.from_pretrained(
        "./mms-tts-eng", local_files_only=True
    )
    return model, tokenizer


def covertFl32ToInt16(nyArr):
    """Peak-normalize a float waveform and convert it to 16-bit integers.

    Empty or all-zero input yields zeros of the same shape instead of
    dividing by zero.
    """
    samples = np.asarray(nyArr)
    peak = np.max(np.abs(samples)) if samples.size else 0
    if peak == 0:
        return np.zeros(samples.shape, dtype=np.int16)
    return np.int16(samples / peak * 32767)


def audioWriteInPipe(nyArr, audioPipeName):
    """Write waveform *nyArr* as int16 PCM to the open descriptor *audioPipeName*.

    Despite its name, *audioPipeName* is a file descriptor (as returned by
    ``os.open``), not a path.
    """
    _write_all(audioPipeName, covertFl32ToInt16(nyArr.squeeze()).tobytes())


def generte(prompt: str, device: str, model: VitsModel,
            tokenizer: PreTrainedTokenizerBase):
    """Synthesize *prompt* with the TTS model and return the waveform array."""
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    # Inference only: no gradients are needed.
    with torch.no_grad():
        output = model(**inputs).waveform
    return output.cpu().numpy()


def soundPipeWriter(model, device, tokenizer, pipeName):
    """Synthesize each line of ``night.txt`` and stream it into *pipeName*."""
    # Blocks until FFmpeg opens the FIFO for reading.
    fd_pipe = os.open(pipeName, os.O_WRONLY)
    filepath = 'night.txt'
    try:
        for content in read_file(filepath):
            content = content.strip()
            if not content:
                # Blank lines produce no tokens to synthesize.
                continue
            print(content)
            audioWriteInPipe(
                generte(prompt=content, device=device, model=model,
                        tokenizer=tokenizer),
                audioPipeName=fd_pipe,
            )
    except BrokenPipeError:
        # The reader (FFmpeg) went away; nothing more can be written.
        print('audio pipe closed by reader')
    finally:
        # Closing the FIFO signals end-of-stream to FFmpeg.
        os.close(fd_pipe)


def read_file(filepath: str):
    """Yield the lines of the text file at *filepath* one by one."""
    with open(filepath) as fp:
        for content in fp:
            yield content


def record(vedioProcess, model, tokenizer, device):
    """Mux video from *vedioProcess* and generated speech into an MP4 file.

    Two named pipes feed a second FFmpeg process: one carries raw RGB24
    frames, the other signed 16-bit mono PCM.  The result is written to
    ``merge_audio_vedio.mp4``.
    """
    # Names of the named pipes.
    pipeA = "audio_pipe1"
    pipeV = "vedio_pipe2"

    # A previous crashed run may have left the FIFOs behind; mkfifo would
    # then fail with FileExistsError.
    for name in (pipeA, pipeV):
        _remove_if_exists(name)
        os.mkfifo(name)

    sample_rate = getattr(
        getattr(model, "config", None), "sampling_rate", DEFAULT_SAMPLE_RATE
    )

    try:
        # Raw streams have no header: every property must be spelled out.
        video_in = ffmpeg.input(
            pipeV,
            format='rawvideo',
            pix_fmt='rgb24',
            s=f'{FRAME_WIDTH}x{FRAME_HEIGHT}',
            framerate=FRAME_RATE,
        )
        audio_in = ffmpeg.input(pipeA, format='s16le', ar=sample_rate, ac=1)

        # Raw RGB cannot be stream-copied into MP4, so encode it.  stderr is
        # deliberately not piped: an unread stderr pipe hides FFmpeg's error
        # messages and can stall the process once the pipe buffer fills.
        process = (
            ffmpeg
            .output(video_in, audio_in, "merge_audio_vedio.mp4",
                    pix_fmt='yuv420p', vcodec='libx264', acodec='aac')
            .overwrite_output()  # never wait on an "overwrite?" prompt
            .run_async()
        )

        # Daemon threads: if FFmpeg dies before opening a FIFO, a writer may
        # stay blocked in os.open; it must not keep the interpreter alive.
        thread1 = Thread(target=soundPipeWriter,
                         args=(model, device, tokenizer, pipeA), daemon=True)
        thread2 = Thread(target=writer,
                         args=(vedioProcess, pipeV, 1024), daemon=True)
        thread1.start()
        thread2.start()

        # FFmpeg finishes once both FIFOs reach end-of-stream, or earlier on
        # failure; in both cases the writers have nothing left to do.
        return_code = process.wait()
        if return_code != 0:
            print(f'FFmpeg exited with code {return_code}')

        thread1.join(timeout=WRITER_JOIN_TIMEOUT)
        thread2.join(timeout=WRITER_JOIN_TIMEOUT)
        if thread1.is_alive() or thread2.is_alive():
            print('a writer thread is still blocked; giving up on it')
    finally:
        # Remove the named pipes.
        _remove_if_exists(pipeV)
        _remove_if_exists(pipeA)


if __name__ == "__main__":
    device: str = "cuda:0" if torch.cuda.is_available() else "cpu"
    model, tokenizer = loadModel(device=device)

    # lavfi test source emitting raw RGB24 frames on stdout.  The source
    # itself stops after ``duration=10`` seconds; ``t=60`` is only an upper
    # bound.
    vedioProcess = (
        ffmpeg
        .input(
            f'testsrc=duration=10:size={FRAME_WIDTH}x{FRAME_HEIGHT}'
            f':rate={FRAME_RATE}',
            f="lavfi",
            t=60,
        )
        .output('pipe:', format='rawvideo', pix_fmt='rgb24')
        .run_async(pipe_stdout=True)
    )

    try:
        record(vedioProcess, model, tokenizer, device)
    finally:
        # If muxing stopped early nobody drains stdout any more, so the
        # source would block forever on a full pipe; stop it explicitly.
        if vedioProcess.poll() is None:
            vedioProcess.terminate()
        vedioProcess.wait()