世界唯一的

How does Python's round() function work?

In [17]: round(2.455, 2)
Out[17]: 2.46

In [18]: round(2.355, 2)
Out[18]: 2.35

Where do these results come from?
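Most of what looks surprising here is binary floating point rather than round() itself: neither 2.455 nor 2.355 can be stored exactly, so round() operates on a value slightly above or below the literal as written (exact ties, when they really occur, go to the even digit in Python 3). A small sketch, using only the standard library, to see the values round() actually receives:

from decimal import Decimal

# Decimal(x) shows the exact value the float literal was stored as; whichever
# side of the true halfway point it lands on decides whether round(x, 2)
# goes up or down.
for x in (2.455, 2.355):
    print(x, '->', Decimal(x), '->', round(x, 2))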

14
1
0
Views: 423
木子弓长

How can a Python list comprehension insert multiple values per element?

For example:

# This syntax is wrong; it's only meant to show the data I want to build
a = [0, i for i in range(2)]

Simply put, I want to prepend a value to each element:

a = [[0, i] for i in range(2)]
b = [i for j in a for i in j]

I want the result in b. Can it be produced directly with a single list comprehension? Thanks.
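A minimal sketch of one way to get b in a single comprehension: loop over range(2) and, for each i, loop over a small tuple that supplies the leading 0, so the flattening happens in the same expression (the variable names are only illustrative):

# One-step equivalent of building a = [[0, i] for i in range(2)] and flattening it
b = [x for i in range(2) for x in (0, i)]
print(b)  # [0, 0, 0, 1]

The same pattern generalizes to any number of prepended values by widening the inner tuple.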

13
1
0
Views: 233
Fronttend

How to fix a multiprocessing error in a Python crawler: TypeError: cannot pickle '_thread.lock' object?

Running the crawler with multiprocessing raises: TypeError: cannot pickle '_thread.lock' object

# coding=utf-8
"""
@project: 15python_spider
@Author:frank
@file: 01_xiaomi_app.py
@date:2024/3/7 19:52
"""
import json
import time
from multiprocessing import Process
from queue import Queue

import requests


class XiaomiSpider(object):
    def __init__(self):
        self.url = 'http://app.mi.com/categotyAllListApi?page={}&categoryId=2&pageSize=30'
        self.headers = {'User-Agent': 'Mozilla/5.0'}
        # URL queue
        self.url_queue = Queue()
        self.n = 0
        self.app_list = []

    # enqueue the URLs
    def url_in(self):
        for i in range(6):
            url = self.url.format(i)
            # put it on the queue
            self.url_queue.put(url)

    # thread worker function
    def get_data(self):
        while True:
            # exit once self.url_queue.empty() is True
            if self.url_queue.empty():
                break
            # get a URL, then request + parse + save
            url = self.url_queue.get()
            html = requests.get(
                url=url,
                headers=self.headers
            ).content.decode('utf-8')
            html = json.loads(html)
            # parse the data
            for app in html['data']:
                # application name
                app_name = app['displayName']
                app_link = 'https://app.mi.com/details?id={}'.format(app['packageName'])
                app_info = {
                    'app_name': app_name,
                    'app_link': app_link
                }
                self.app_list.append(app_info)
                self.n += 1
            print(url)

    # main function
    def main(self):
        # enqueue the URLs
        self.url_in()
        t_list = []
        for i in range(5):
            t = Process(target=self.get_data)
            t_list.append(t)
            t.start()
        for i in t_list:
            i.join()
        with open('app_list.json', 'w') as f:
            json.dump(self.app_list, f, ensure_ascii=False)
        print('Number of apps:', self.n)


if __name__ == "__main__":
    start = time.time()
    spider = XiaomiSpider()
    spider.main()
    end = time.time()
    print('Elapsed time: %.2f' % (end - start))

How do I fix this error?
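A hedged reading of the error, assuming the script runs where multiprocessing uses the spawn start method (the default on Windows and macOS): Process(target=self.get_data) has to pickle the bound method, and with it the whole XiaomiSpider instance, whose queue.Queue internally holds thread locks that cannot be pickled. The usual ways out are to share work through multiprocessing.Queue, or to keep queue.Queue and use threads for this I/O-bound job. A stripped-down illustration of the first option (not the full spider):

from multiprocessing import Process, Queue
from queue import Empty


def get_data(url_queue, result_queue):
    # Worker: drain the URL queue; a real crawler would request and parse here.
    while True:
        try:
            url = url_queue.get_nowait()
        except Empty:
            break
        result_queue.put({'url': url})


if __name__ == '__main__':
    url_queue = Queue()        # multiprocessing.Queue can be shared with child processes
    result_queue = Queue()
    for i in range(6):
        url_queue.put('http://app.mi.com/categotyAllListApi?page={}'.format(i))

    workers = [Process(target=get_data, args=(url_queue, result_queue)) for _ in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    print('collected:', len(results))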

0
1
0
Views: 212
应该是最帅的鹅

How to fix the missing libssl.so.10 error when importing psycopg2 in Python on Linux?

Linux server: NingOS V3 (intranet access only)
Python: 3.9

Error message:

>>> import psycopg2
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/u01/bigdata/anaconda3/lib/python3.9/site-packages/psycopg2-2.9-py3.9-linux-x86_64.egg/psycopg2/__init__.py", line 51, in <module>
    from psycopg2._psycopg import (  # noqa
ImportError: libcrypto.so.10: cannot open shared object file: No such file or directory

I just start Python and run import psycopg2, and it fails immediately. The message says libcrypto.so.10 is missing. I have gone through many tutorials online, and this file comes up in very different contexts: sometimes yum reports the error, sometimes openssl does; in my case it is import psycopg2 that reports it.

So where does libcrypto.so.10 actually come from: is it shipped with the system, installed alongside openssl, or bundled with Python? I have found several libcrypto.so.10 files on the system, but creating symlinks as the online tutorials suggest did not help.

Where does libcrypto.so.10 come from, and is there a fix for the error in the title?
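Some hedged context, since distro details vary: libcrypto.so.10 and libssl.so.10 are the shared-library sonames of OpenSSL 1.0.x; on RHEL/CentOS 7-era systems they come from the openssl-libs package, while newer distributions ship the 1.1 or 3.x sonames instead. The psycopg2 egg in the traceback was compiled against OpenSSL 1.0, so the dynamic loader insists on that exact soname. A small diagnostic sketch to check whether the loader can resolve them at all:

import ctypes

# CDLL uses the same dlopen() lookup that the failing psycopg2 import relies on.
for name in ('libcrypto.so.10', 'libssl.so.10'):
    try:
        ctypes.CDLL(name)
        print(name, '-> loadable')
    except OSError as exc:
        print(name, '-> NOT loadable:', exc)

If they are not loadable, the usual options are installing the distribution's OpenSSL 1.0 compatibility package, adding the directory that holds a working copy to the loader path (LD_LIBRARY_PATH, or an /etc/ld.so.conf.d/ entry followed by ldconfig run as root), or reinstalling psycopg2 (for example psycopg2-binary) so that it links against the OpenSSL version the system actually has.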

0
1
0
Views: 205
生成头像

Combining audio and video on Ubuntu 22.04?

Requirements: the audio is generated dynamically; the video comes from a fixed source (the code uses testsrc). The code keeps hanging where the rawvideo frames are written into the named pipe, and no error is reported.

Sample code:

import subprocess
import os
from threading import Thread

import numpy as np
from transformers import VitsModel, VitsTokenizer, PreTrainedTokenizerBase
import torch
import ffmpeg


def read_frame_from_stdout(vedioProcess, width, height):
    frame_size = width * height * 3
    input_bytes = vedioProcess.stdout.read(frame_size)
    if not input_bytes:
        return
    assert len(input_bytes) == frame_size
    return np.frombuffer(input_bytes, np.uint8).reshape([height, width, 3])


def writer(vedioProcess, pipe_name, chunk_size):
    width = 640
    height = 480
    while True:
        input_frame = read_frame_from_stdout(vedioProcess, width, height)
        print('read frame is:' % input_frame)
        if input_frame is None:
            print('read frame is: None')
            break
        frame = input_frame * 0.3
        os.write(fd_pipe, (frame.astype(np.uint8).tobytes()))
    # Closing the pipes as closing files.
    os.close(fd_pipe)


# Load the TTS model
def loadModel(device: str):
    model = VitsModel.from_pretrained("./mms-tts-eng", local_files_only=True).to(device)  # acebook/mms-tts-deu
    tokenizer = VitsTokenizer.from_pretrained("./mms-tts-eng", local_files_only=True)
    return model, tokenizer


# Convert 32-bit floats to 16-bit integers, for a 16000 Hz audio sample rate
def covertFl32ToInt16(nyArr):
    return np.int16(nyArr / np.max(np.abs(nyArr)) * 32767)


def audioWriteInPipe(nyArr, audioPipeName):
    # Write to named pipe as writing to a file (but write the data in small chunks).
    os.write(audioPipeName, covertFl32ToInt16(nyArr.squeeze()).tobytes())  # Write 1024 bytes of data to fd_pipe


# Generate a numpy waveform
def generte(prompt: str, device: str, model: VitsModel, tokenizer: PreTrainedTokenizerBase):
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    # with torch.no_grad():
    #     output = model(**inputs).waveform
    return output.cpu().numpy()


def soundPipeWriter(model, device, tokenizer, pipeName):
    fd_pipe = os.open(pipeName, os.O_WRONLY)
    filepath = 'night.txt'
    for content in read_file(filepath):
        print(content)
        audioWriteInPipe(generte(prompt=content, device=device, model=model, tokenizer=tokenizer), audioPipeName=fd_pipe)
    os.close(fd_pipe)


# Read the text source file
def read_file(filepath: str):
    with open(filepath) as fp:
        for content in fp:
            yield content


def record(vedioProcess, model, tokenizer, device):
    # Names of the "Named pipes"
    pipeA = "audio_pipe1"
    pipeV = "vedio_pipe2"
    # Create "named pipes".
    os.mkfifo(pipeA)
    os.mkfifo(pipeV)
    # Open FFmpeg as sub-process
    # Use two audio input streams:
    # 1. Named pipe: "audio_pipe1"
    # 2. Named pipe: "audio_pipe2"
    process = (
        ffmpeg
        .concat(ffmpeg.input("pipe:vedio_pipe2"), ffmpeg.input("pipe:audio_pipe1"), v=1, a=1)
        .output("merge_audio_vedio.mp4", pix_fmt='yuv480p', vcodec='copy', acodec='aac')
        .run_async(pipe_stderr=True)
    )
    # Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes).
    thread1 = Thread(target=soundPipeWriter, args=(model, device, tokenizer, pipeA))  # thread1 writes samp1 to pipe1
    thread2 = Thread(target=writer, args=(vedioProcess, pipeV, 1024))  # thread2 writes samp2 to pipe2
    # Start the two threads
    thread1.start()
    thread2.start()
    # Wait for the two writer threads to finish
    thread1.join()
    thread2.join()
    process.wait()  # Wait for FFmpeg sub-process to finish
    # Remove the "named pipes".
    os.unlink(pipeV)
    os.unlink(pipeA)


if __name__ == "__main__":
    device: str = "cuda:0" if torch.cuda.is_available() else "cpu"
    model, tokenizer = loadModel(device=device)
    # make lavfi-testSrc 60s mp4
    vedioProcess = (
        ffmpeg
        .input('testsrc=duration=10:size=640x480:rate=30', f="lavfi", t=60)
        .output('pipe:', format='rawvideo', pix_fmt='rgb24')
        .run_async(pipe_stdout=True)
    )
    # record(vedioProcess, model, tokenizer, device)
    vedioProcess.wait()

Screenshot from VS Code: https://wmprod.oss-cn-shanghai.aliyuncs.com/c/user/20240925/f010760dfc6c74520f9b3197230485b2.png
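Not a verified diagnosis, but two things stand out. First, a FIFO created with os.mkfifo only makes progress once something opens its read end, so os.write blocks forever if no reader ever attaches. Second, ffmpeg's pipe: protocol addresses file descriptors, not filesystem paths, so ffmpeg.input("pipe:vedio_pipe2") probably never opens the FIFO at all; reading the FIFOs by path and declaring the raw formats is the more conventional setup. A sketch under those assumptions (frame size, pixel format and the 16 kHz mono audio are carried over from the question):

import ffmpeg

# Read both named pipes by path, telling ffmpeg what the raw data looks like.
video_in = ffmpeg.input('vedio_pipe2', format='rawvideo', pix_fmt='rgb24',
                        s='640x480', framerate=30)
audio_in = ffmpeg.input('audio_pipe1', format='s16le', ar=16000, ac=1)

process = (
    ffmpeg
    .output(video_in, audio_in, 'merge_audio_vedio.mp4',
            pix_fmt='yuv420p', vcodec='libx264', acodec='aac')
    .overwrite_output()
    .run_async(pipe_stderr=True)
)
# The writer threads then open the FIFOs for writing; os.open(..., os.O_WRONLY)
# unblocks as soon as ffmpeg has opened the read end, and writes start flowing.

As a side note, raw RGB frames cannot be stream-copied into an MP4, so vcodec='copy' in the original output would need to become a real encoder such as libx264, and 'yuv480p' is not a valid pixel format name.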

0
1
0
Views: 191
一只臭美的Doggg

How to pivot a column into per-type counts?

I have a fairly large df:

date        type
2024-01-01  1
2024-01-01  2
2024-01-01  1
2024-01-02  3
2024-01-02  2
2024-01-02  3
2024-01-02  1
2024-01-02  1
2024-01-03  1
2024-01-03  4
2024-01-03  2
2024-01-03  5
...

What is a clean way to turn it into the following summary?

date        type1  type2  type3  type4  type5
2024-01-01  2      1      0      0      0
2024-01-02  2      1      2      0      0
2024-01-03  1      0      1      1      1
...

Thanks in advance for any pointers.
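One possible sketch with pandas.crosstab, assuming the frame really has just the two columns date and type (the tiny inline DataFrame is only for illustration):

import pandas as pd

df = pd.DataFrame({
    'date': ['2024-01-01', '2024-01-01', '2024-01-01', '2024-01-02', '2024-01-02'],
    'type': [1, 2, 1, 3, 3],
})

# Count how many rows of each type fall on each date.
counts = pd.crosstab(df['date'], df['type'])
# Make sure all five type columns exist even when a type never occurs.
counts = counts.reindex(columns=range(1, 6), fill_value=0)
counts.columns = [f'type{c}' for c in counts.columns]
print(counts.reset_index())

df.groupby(['date', 'type']).size().unstack(fill_value=0) is an equivalent route.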

0
1
0
Views: 167
一只臭美的Doggg

What does this piece of Python code mean?

START = 0x7E
VER = 0xFF
LEN = 0x06
FEEDBACK = 0x00
END = 0xEF

Checksum = -(VER + LEN + CMD + FEEDBACK + Para1 + Para2)
HighByte = Checksum >> 8
LowByte = Checksum & 0xFF

CommandLine = bytes([b & 0xFF for b in [
    START, VER, LEN, CMD, FEEDBACK, Para1, Para2, HighByte, LowByte, END
]])

How should `b & 0xFF for b in` be understood?
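`b & 0xFF for b in [...]` is the body of a list comprehension: it walks through the list and keeps only the low 8 bits of each value, which matters because the checksum is negative and bytes() only accepts integers in 0..255. A small sketch with made-up CMD/Para1/Para2 values (the other constants are from the question):

# Hypothetical command and parameters, just to make the arithmetic visible.
CMD, Para1, Para2 = 0x03, 0x00, 0x01

VER, LEN, FEEDBACK = 0xFF, 0x06, 0x00
Checksum = -(VER + LEN + CMD + FEEDBACK + Para1 + Para2)   # a negative number
HighByte = Checksum >> 8                                   # still negative before masking
LowByte = Checksum & 0xFF

masked = [b & 0xFF for b in [HighByte, LowByte]]
print([hex(b) for b in masked])   # two values in 0..255: the checksum's two's-complement bytes
print(bytes(masked))

So the comprehension is what turns the (possibly negative) intermediate integers into valid byte values before bytes() packs the whole frame.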

0
1
0
Views: 190
中年复健狗

Launching several ffmpeg shells with Python subprocess?

I need to call subprocess three times in a row, but the last call apparently never runs. On the streaming server's console I only see the rtmp connections for aac and mp4, and never the live rtmp connection.

Reference code:

if __name__ == "__main__":
    device: str = "cuda:0" if torch.cuda.is_available() else "cpu"
    model, tokenizer = loadModel(device=device)
    # # Audio (aac)
    # audioProcess = sp.Popen(["ffmpeg", "-f", "s16le", '-y', '-vn', "-ac", "1", "-ar", "16000", "-channel_layout", "mono", '-acodec', 'pcm_s16le', "-i", "pipe:", "-ar", "44100", "-f", "flv", rtmpAURL], stdin=sp.PIPE)
    # # Video (mp4)
    # vedioProcess = sp.Popen(["ffmpeg", "-re", '-y', '-an', '-rtbufsize', '1024M', "-f", "dshow", "-i", "video=Q8 HD Webcam", "-pix_fmt", "yuvj420p", "-framerate", "25", '-vcodec', 'libx264', '-preset', 'fast', '-crf', '25', "-vf", "scale=640:480", "-f", "flv", rtmpVURL], shell=True)
    # #https://stackoverflow.com/questions/18618191/ffmpeg-merge-multiple-rtmp-stream-inputs-to-a-single-rtmp-output
    #ffmpeg -i rtmp://ip:1935/live/micMyStream7 -i rtmp://ip:1935/live/MyStream7 -filter_complex "[0:a][1:a]amix[a]" -map 0:v -map "[a]" -c:v copy -f flv rtmp://ip:1935/live/bcove7
    # # Merge (live)
    # mergeVAP = sp.Popen(["ffmpeg", "-i", rtmpAURL, "-i", rtmpVURL, "-c:v", "copy", "-c:a", "aac", "-preset", "veryfast", "-f", "flv", "-flvflags", "no_duration_filesize", rtmpURL], shell=True, stderr=True)

    writeAudioThead = Thread(target=soundRecorder, args=(model, device, tokenizer, audioProcess))
    writeAudioThead.start()
    writeAudioThead.join()

    audioProcess.wait()
    vedioProcess.wait()
    mergeVAP.wait()

Question 1: these three currently run one after another. How can I make them run asynchronously?
Question 2: in the serial version above, why does mergeVAP never run? The same command works when I run it on the command line.
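Two hedged observations rather than a confirmed fix. On question 1: Popen itself does not block, so all three ffmpeg processes already run concurrently as long as .wait()/.join() is only called after everything has been started. On question 2: the merge step combines shell=True with an argument list; per the subprocess documentation, on Linux that passes only "ffmpeg" to the shell and drops the remaining arguments, which would explain why the live stream never appears even though the same command works when typed into a terminal. A sketch of the merge call without shell=True (rtmpAURL/rtmpVURL/rtmpURL are placeholders standing in for the question's URLs):

import subprocess as sp

rtmpAURL = 'rtmp://example/live/audio'    # placeholder
rtmpVURL = 'rtmp://example/live/video'    # placeholder
rtmpURL = 'rtmp://example/live/merged'    # placeholder

# No shell=True: the list is handed to ffmpeg directly, argument by argument.
mergeVAP = sp.Popen(
    ['ffmpeg', '-i', rtmpAURL, '-i', rtmpVURL,
     '-c:v', 'copy', '-c:a', 'aac',
     '-f', 'flv', '-flvflags', 'no_duration_filesize', rtmpURL],
    stderr=sp.PIPE,   # keep stderr if you want to inspect ffmpeg's log afterwards
)

# ... start the audio/video processes and the writer thread here ...

mergeVAP.wait()       # wait only after every process has been launched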

0
1
0
Views: 170
MastFancy

Unable to download a web page: urllib.error.HTTPError: HTTP Error 403: Forbidden?

I want to extract the data from this page:

from urllib.request import urlretrieve
import urllib
import random

url = "https://cn.investing.com/indices/hnx-30-components"
opener = urllib.request.build_opener()
ua_list = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.114 Safari/537.36 Edg/103.0.1264.62',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0',
    'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.81 Safari/537.36 SE 2.X MetaSr 1.0'
]
opener.addheaders = [('User-Agent', random.choice(ua_list))]
urllib.request.install_opener(opener)
urlretrieve(url, '/tmp/test.html')

The page cannot be downloaded, although it opens fine in a browser:

  File "/usr/local/lib/python3.11/urllib/request.py", line 643, in http_error_default
    raise HTTPError(req.full_url, code, msg, hdrs, fp)
urllib.error.HTTPError: HTTP Error 403: Forbidden

How can I solve this?
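A sketch rather than a guaranteed fix: investing.com sits behind anti-bot protection, so a User-Agent header alone is often not enough, and urlretrieve offers no way to add further headers. Building a Request with a fuller, browser-like header set is the first thing to try; if the site still answers 403, a real browser (Selenium/Playwright) or an official data API is usually the way out. The header values below are ordinary browser defaults, not anything specific to this site:

import urllib.error
import urllib.request

url = 'https://cn.investing.com/indices/hnx-30-components'
req = urllib.request.Request(url, headers={
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Referer': 'https://cn.investing.com/',
})
try:
    with urllib.request.urlopen(req, timeout=30) as resp:
        with open('/tmp/test.html', 'wb') as f:
            f.write(resp.read())
    print('saved')
except urllib.error.HTTPError as exc:
    print('still blocked:', exc.code)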

0
1
0
Views: 133
biubiuuuuu

About timeouts with Python's asyncio?

Requirement: an asyncio.Queue() is used as the exchange buffer in the middle. One thread puts into it and another thread gets from it. With a future I can never retrieve the value.

Demo:

import asyncio
import threading, time

audioQueue = asyncio.Queue()


def job(task, loop):
    def _asyncJob(task, loop):
        result = 'hello'
        time.sleep(1)
        asyncio.run_coroutine_threadsafe(task(result), loop)
        loop.call_soon_threadsafe(lambda: print('size:%s' % audioQueue.qsize()))

    threading.Thread(name=f"transcribe_thread_1", target=_asyncJob, args=(task, loop)).start()


def sync_function(loop):
    # This is a synchronous function
    task = lambda result: audioQueue.put(result)
    job(task, loop)


def getEle(loop):
    try:
        future = asyncio.run_coroutine_threadsafe(audioQueue.get(), loop)
        chnStatement = future.result(0.1)
    except asyncio.TimeoutError as exout:
        chnStatement = 'timeout'
    except asyncio.CancelledError as excel:
        chnStatement = 'cancel'
    except Exception as exc:
        chnStatement = 'exception'
    print(f'get element {chnStatement}')


async def main():
    # get the current event loop
    loop = asyncio.get_running_loop()
    sync_function(loop)
    threading.Thread(name=f"transcribe_thread_2", target=getEle, args=(loop,)).start()


if __name__ == '__main__':
    asyncio.run(main())

Why does the get need a timeout? Because the put side lags behind at the start, while the get side has to be real-time; if I use await, the thread just blocks forever.

Environment: Python 3.10.x
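A hedged reading of the demo: asyncio.run(main()) returns almost as soon as main() has started the two threads, and at that point the event loop is torn down, so the coroutines handed to run_coroutine_threadsafe (the queue.get() as well as the delayed put) can never complete and the future only ever times out. Since both the producer and the consumer here are plain threads, a thread-safe queue.Queue with a blocking get(timeout=...) already gives the "wait briefly, then give up" behaviour without involving the loop at all; a minimal sketch of that substitution:

import queue
import threading
import time

audio_queue = queue.Queue()


def producer():
    time.sleep(1)                 # the put side is allowed to lag behind
    audio_queue.put('hello')


def consumer():
    try:
        item = audio_queue.get(timeout=0.1)   # raises queue.Empty when nothing arrives in time
    except queue.Empty:
        item = 'timeout'
    print(f'get element {item}')


threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

With the 0.1 s timeout the consumer prints 'timeout' (the put arrives a second later); raising the timeout, or retrying the get in a loop, lets the 'hello' come through.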

0
1
0
Views: 154