抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

Python 学习

很难绷,编程四五年,现在才学Python

Python是一门很简单的语言,但很多语法跟C++、C#不一样,一眼看上去发现好多不认识的东西,在此记录一下

语法

循环

for i in range(5):
print(i) # 0, 1, 2, 3, 4

循环指定范围

for i in range(1, 5):
print(i) # 1, 2, 3, 4

循环指定步长

for i in range(0, 5, 2):
print(i) # 0, 2, 4

索引

Python除了可以使用正向索引,还可以使用负向索引,表示为从最后一个元素开始倒着数,-1是最后一个元素

a = [0, 1, 2, 3, 4, 5]
print(a[0]) # 0
print(a[1]) # 1
print(a[-1]) # 5
print(a[-2]) # 4

这个功能常用于处理路径

file_path = "datasets/xa/xaa/a.json"
file_name = file_path.split('/')[-1] # a.json

异常

try:
if xxx:
raise Exception("There is a error")
except Exception as e:
print(e)

assert

assert <一个bool变量>

若条件为真,程序继续进行

若条件为假,程序抛出AssertionError异常;可以在条件后面加一个字符串参数,作为异常的提示信息

assert <一个bool变量>, <一个字符串参数>

面向对象

创建一个类

class Student:
    """Minimal example class holding a student's name."""

    def __init__(self, student_name):
        """Store ``student_name`` on the instance as ``self.name``."""
        self.name = student_name

    def test(self):
        """Print the stored name to stdout."""
        print(self.name)

容器

set

# 创建一个set
dataset = set()
key = '1'
# 添加一个元素
dataset.add(key)
# 判断元素是否在set中
if key in dataset:
print(key)
# 将这个元素从set中移除
dataset.discard(key)

list

切片

Python可以使用切片操作,从一个序列(如列表、元组或字符串)中获取一部分元素

numbers = [0, 1, 2, 3, 4, 5]
print(numbers[:2]) # 输出:[0, 1]
s = "Hello"
print(s[:2]) # 输出:'He'

列表推导式

List Comprehension

import re

# Map every element through an expression.
raw = [1, 2, 3, 4, 5]
ans = [x - 1 for x in raw]  # ans = [0, 1, 2, 3, 4]

# Pull the captured digits out of each "<hN>" tag and convert them to int.
# The text is named `text` rather than `input` so the builtin input() is not shadowed.
text = "<h1><h3><h5><h7>"
pattern = r'<h(\d+)>'
token_lst = [int(level) for level in re.findall(pattern, text)]
# token_lst = [1, 3, 5, 7]

dict

判断Dict中有无Key

if 'key_name' not in dic:
xxx

关键词

with

很类似C#的using,常用于打开文件:离开with代码块时文件会被自动关闭释放,即使期间代码抛出了异常也一样

import json

# Read a whole text file into one string.
with open('file.txt', 'r') as f:
    content = f.read()

# Parse a JSON file. json.load() takes a file object;
# json.loads() only accepts str/bytes and raises TypeError when given a file.
with open('a.json', 'r') as f:
    content = json.load(f)

# Iterate over a text file line by line with an explicit encoding.
with open('a.txt', 'r', encoding='utf-8') as file:
    for line in file:
        print(line.strip())

字符串

格式化(f-string)

str1 = 'Hello'
str2 = f'str1: {str1}' # str1: Hello

转为字符串

# Convert any object to its string form.
s = str(o)
if s.endswith('.jsonl'):
    # Strings are immutable: the new value must be bound back to `s`.
    # Only the trailing suffix is swapped, so an earlier '.jsonl'
    # elsewhere in the string is left untouched.
    s = s[:-len('.jsonl')] + '.txt'

格式化数字

在按顺序输出文件名时,经常有格式化数字的需求

num = 12
formatted_str = f"{num:04d}" # 0012

f = 1.2222
formatted_str_f = f"{f:.02f}" # 1.22

常用库

自定义文件

从自定义的python文件中import函数

# 当前python的同级目录下有一个utils.py文件,文件中定义了一个load_models函数
from utils import load_models

也可以将当前文件所在的目录添加到sys.path中

current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)
from utils import load_models

sys

传递参数,跟C++ main函数的argv意义相同

import sys
# Invoked as: python script.py a1 a2
# sys.argv[0] is the script path, so the user arguments start at index 1.
a1 = sys.argv[1] # first argument ("a1" in the command above)
a2 = sys.argv[2] # second argument ("a2" in the command above)

os

遍历目录

访问文件夹下所有文件(递归遍历子文件夹)

for dirpath, dirnames, filenames in os.walk(root_dir):
for filename in filenames:
file_path = os.path.join(dirpath, filename)
# 打开 file_path

得到文件所在文件夹

不存在就创建文件夹

temp_folder = os.path.dirname(out_path)
os.makedirs(temp_folder, exist_ok=True)

unset http proxy

os.environ.pop("http_proxy", None)
os.environ.pop("https_proxy", None)

添加文件夹

可以import某个文件夹下的文件

libpath = os.path.abspath(
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"step_align",
)
)
if os.path.exists(libpath):
if libpath not in sys.path:
sys.path.insert(0, libpath)
from local_file import custom_class

zipfile

python可以在不解压文件的情况下读取文件内容

import zipfile
# zip
with zipfile.ZipFile(f'{file_name}.zip', 'r') as zip_ref:
for name in zip_ref.namelist():
print(name)
content = zip_ref.read(name).decode('utf-8')

tarfile

import tarfile
# tar
with tarfile.open(f'{file_name}.tar', 'r') as tar_ref:
# 遍历包内所有文件
for member in tar_ref.getmembers():
# 压缩包中可能不是文件,而是文件夹
if member.isfile():
# 打印文件名
print(member.name)
# 使用文件名直接读文件内容
content = tar_ref.extractfile(member).read().decode('utf-8')
# 将文件解压到extract_dir文件夹下
tar_ref.extract(member, path=extract_dir)
import tarfile
import shutil
# 将target_folder整个压缩
with tarfile.open(new_tar_path, 'w') as tar:
for root, dirs, files in os.walk(target_folder):
for file in files:
file_path = os.path.join(root, file)
tar.add(file_path, arcname=os.path.join(os.path.basename(root), file))
# 删除target_folder
shutil.rmtree(target_folder)

发起进程

import subprocess

script_path = 'inference.py'
arguments = ['-s', 'assets/examples/source/hal.jpeg', '-d', 'assets/temp/sun.mp4', '--flag_normalize_lip']
subprocess.call(['python', script_path] + arguments)

pkl

将参数存储为二进制

import pickle

def read_pkl_file(file_path):
    """Load and return the object pickled in ``file_path``.

    The file is opened in binary mode and closed when the ``with`` block exits.

    WARNING: unpickling can execute arbitrary code. Only call this on files
    from a trusted source.
    """
    with open(file_path, 'rb') as file:
        data = pickle.load(file)
    return data

def write_pkl_file(file_path, data):
    """Pickle ``data`` into ``file_path``, overwriting any existing file.

    The file is opened in binary write mode and closed when the ``with``
    block exits. Returns ``None``.
    """
    with open(file_path, 'wb') as file:
        pickle.dump(data, file)

random

# 生成随机数
ans = random.uniform(c_min, c_max)
# 随机整数
n = random.randint(1, 10)
# 从数组中随机挑选500条不重复的
select_list = random.sample(raw_list, 500)

json

# 读字符串为json
data = json.loads(input_text)
# 对象转为字符串
json_str = json.dumps(data)
# 从文件中加载json
with open('xx.json', 'r') as f:
data = json.load(f)
# 写json到文件
with open('xx.json', 'w') as f:
json.dump(data, f, ensure_ascii=False)

moviepy

moviepy是一个处理视频和音频的库

为视频配音

video = mp.VideoFileClip("1.mp4")
audio = mp.AudioFileClip("2.mp3")
video = video.set_audio(audio)
video.write_videofile("3.mp4", codec="libx264", audio_codec="aac")

opencv

查看视频帧率

pip install opencv-python
import cv2

# 打开视频文件
video_path = 's18.mp4'
cap = cv2.VideoCapture(video_path)
# 获取视频的帧率
fps = cap.get(cv2.CAP_PROP_FPS)
print(f'视频的帧率为:{fps} FPS')
# 释放资源
cap.release()

plt

折线图

import matplotlib.pyplot as plt

x = [1, 2, 3, 4, 5]
y = [2, 3, 5, 7, 11]

# 绘制折线图
plt.plot(x, y, marker='o', linestyle='-', label='line')

# 设置图表标题和刻度标签
plt.title('Graph')
plt.xlabel('Frame')
plt.ylabel('Value')
# 显示折线的名字
# plt.legend()
if enable_save:
# 将图表保存为文件
plt.savefig(f'{file_name}.png', dpi=300)

# 显示图表(如果不是headless)
plt.show()

散点图

value_x = []
value_y = []

plt.figure(figsize=(10, 10))
# 绘制散点图
plt.scatter(value_x, value_y)
# 绘制文本
plt.text(0, p5, f'P5: {p5:.2f}', fontsize=20, verticalalignment='bottom', color='r', ha='right')
# 绘制线
plt.axhline(y=p5, color='r', linestyle='--')

plt.xlabel('x')
plt.ylabel('y')
plt.title('title')
# 布局风格
plt.tight_layout()
# 保存
plt.savefig('output.png')

联合图表

import matplotlib.pyplot as plt

# Create a 2x2 grid of subplots on a single figure.
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# axes is a 2x2 array; .flat visits it row by row, so the panels are
# numbered [0,0] -> 1, [0,1] -> 2, [1,0] -> 3, [1,1] -> 4.
for idx, ax in enumerate(axes.flat, start=1):
    ax.scatter(xx, yy)
    ax.set_xlabel('x')
    ax.set_ylabel('y')
    ax.set_title(f'title {idx}')

plt.tight_layout()
plt.savefig('output.png')

numpy

求百分位数

p5 = np.percentile(speed, 5)
p50 = np.percentile(speed, 50)
p95 = np.percentile(speed, 95)

mask

arr = np.array(rates)
mask = arr > 0.05
count = mask.sum() # 统计数组rates中大于0.05的值的数量

ffmpeg

import subprocess
def m4s_to_mp3(input_file, output_file):
    """Convert ``input_file`` to MP3 at ``output_file`` by running the ffmpeg CLI.

    The command is passed as an argument list (no shell), so paths with
    spaces or shell metacharacters are handled safely.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status
            (because of ``check=True``).
    """
    ffmpeg_command = [
        "ffmpeg",
        "-i", input_file,
        "-acodec", "libmp3lame",  # encode the audio stream with the LAME MP3 encoder
        "-q:a", "2",  # audio quality setting passed through to the encoder
        output_file,
    ]
    subprocess.run(ffmpeg_command, check=True)

m4s_to_mp3('a.m4s', 'a.mp3')

tqdm

用于显示进度条

from tqdm import tqdm

for i in tqdm(range(100)):
print(i)

datasets

Hugging Face的数据集(datasets)格式

from datasets import Dataset, load_from_disk
import pandas as pd
# 将jsonl保存为hf datasets
name = 'combine'
df = pd.read_json(f"{name}.jsonl", lines=True)
dataset = Dataset.from_pandas(df)
dataset.save_to_disk(name)
# 加载hf datasets
loaded_dataset = load_from_disk(name)
print(len(loaded_dataset))
print(loaded_dataset[0])

argparse

处理输入参数,自动转化为变量

import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--input_folder', type=str, required=True, help='The folder path to the dataset')
parser.add_argument('--detail', action='store_true', help='Print detailed information')
parser.add_argument('--max_steps', type=int, default=2000000, help='Max steps')
args = parser.parse_args()

if args.detail:
print('The input folder is:', args.input_folder)
print('The max steps is:', args.max_steps)
else:
print('The input folder is:', args.input_folder)
python parser_test.py --input_folder ./ --detail --max_steps 10 
python parser_test.py --input_folder ./ --max_steps 10

imap

基于multiprocessing的并行处理代码

小技巧:如果有一个巨大的dict,直接遍历处理会超内存,进程被Killed。可以先对key计算哈希,根据dict的大小取哈希值的前两三位作为分桶编号,把dict拆分成多个小文件,再逐个(或并行)处理

处理数组

import os
import io
import json
import multiprocessing
from tqdm import tqdm

# 处理数组
def process_item(item):
    """Process one JSONL file from the ``hash_split`` directory.

    Args:
        item: an ``(index, name)`` pair as produced by ``enumerate``;
            ``name`` is a file name inside ``hash_split/``.

    Returns:
        ``item`` unchanged, so the caller can tell which entry finished.
    """
    index, name = item
    with open(f'hash_split/{name}', 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Skip blank lines (e.g. a trailing newline at end of file);
                # json.loads('') would raise.
                continue
            info = json.loads(line)
            ...  # placeholder: per-record processing goes here
    return item

if __name__ == "__main__":
    data = []
    root_dir = 'hash_split'  # directory holding the hash-partitioned .jsonl files
    # NOTE(review): only the bare file name is collected, so files found in
    # sub-directories of root_dir would not resolve in process_item — confirm
    # that the directory is flat.
    for dirpath, dirnames, filenames in os.walk(root_dir):
        for filename in filenames:
            if filename.endswith('.jsonl'):
                data.append(filename)
    # Process in parallel. The context manager guarantees the worker
    # processes are torn down even if a worker raises while results are
    # being collected.
    with multiprocessing.Pool(16) as pool:
        # imap yields results in input order, which lets tqdm show live progress.
        processed_data_iterator = tqdm(pool.imap(process_item, enumerate(data)), total=len(data))
        processed_data = list(processed_data_iterator)
        pool.close()
        pool.join()

处理dict

# 处理dict
def process_item(item):
    """Process one ``(key, value)`` entry of a dict.

    Args:
        item: a 2-tuple as yielded by ``dict.items()``.

    Returns:
        ``item`` unchanged.
    """
    key, value = item
    ...  # placeholder: per-entry processing goes here
    return item

if __name__ == "__main__":
    data = {}
    # Process in parallel. The context manager guarantees the worker
    # processes are torn down even if a worker raises while results are
    # being collected.
    with multiprocessing.Pool(16) as pool:
        # Each (key, value) pair is sent to a worker; results come back in input order.
        processed_data_iterator = tqdm(pool.imap(process_item, data.items()), total=len(data))
        processed_data = list(processed_data_iterator)
        pool.close()
        pool.join()

带参数

import functools

def process_item(item, args):
    """Process one item with extra, shared arguments.

    Args:
        item: the element taken from the input iterable.
        args: extra configuration, bound in advance with ``functools.partial``
            so the function can be used with ``Pool.imap``, which passes a
            single argument.

    Returns:
        ``item`` unchanged.
    """
    ...  # placeholder: per-item processing goes here
    return item

if __name__ == "__main__":
    datas = []
    # The context manager guarantees the worker processes are torn down even
    # if a worker raises while results are being collected.
    with multiprocessing.Pool(16) as pool:
        # Pool.imap passes exactly one argument per call, so the shared
        # `args` value is pre-bound with functools.partial.
        partial_process_item = functools.partial(process_item, args=args)
        processed_data_iterator = tqdm(pool.imap(partial_process_item, datas), total=len(datas), desc="Process")
        result_list = list(processed_data_iterator)
        pool.close()
        pool.join()

curl

curl http://xxx/v1/api -X POST  -H  "Content-Type: application/json"  -d '{"text": ["1", "2"]}'

等价于

import requests
url = "http://xxx/v1/api"
headers = {
"Content-Type": "application/json"
}
data = {
"text": ["1", "2"]
}
response = requests.post(url, headers=headers, json=data)

torchaudio

import torchaudio
# 从路径加载音频
waveform, sample_rate = torchaudio.load(file_path)
# 获得音频时长,单位为秒
duration = waveform.shape[1] / sample_rate

warnings

import warnings
warnings.filterwarnings('ignore')

dataclass

更方便定义一个数据类

from dataclasses import dataclass
@dataclass
class ProcessConfig:
    """Configuration values for a processing job.

    Each attribute needs a type annotation: ``@dataclass`` only turns
    annotated class attributes into fields. Without annotations no
    ``__init__`` parameters are generated.
    """

    batch_size: int = 32
    save_folder: str = "/tmp"

hashlib

# 用md5生成字符串的16进制哈希
hash_object = hashlib.new("md5")
hash_object.update(lyrics.encode('utf-8'))
hex_hash = hash_object.hexdigest()
print(hex_hash)

datetime

import time
from datetime import datetime, timedelta

current_time = time.time()
local_datetime = datetime.fromtimestamp(current_time)
print(local_datetime)

其他操作

找到site-packages

import sysconfig

# distutils was removed from the standard library in Python 3.12;
# sysconfig exposes the same install locations. "purelib" is the
# site-packages directory for pure-Python packages.
site_packages = sysconfig.get_paths()["purelib"]
print(site_packages)

pip

requirements.txt

安装requirements.txt中列出的依赖

pip install -r requirements.txt

生成

pipreqs ./ --encoding=utf8  --force

可编辑模式

pip install -e /path/to/my_package

选择一个本地python包路径,使用带-e参数的pip安装,这个包会被链接到python环境中,对包内容的更改会实时反映到python环境里,便于包的开发和调试

以包的形式运行

python3 -m <package name>.<file name>

等价于

cd <package folder>
python3 <file name>.py

但这种调用方式下,文件内可以使用相对导入(例如 from .. import xxx)来import包内的其他模块

bash

export music_type="song"
export model_name="llama3.1"
export retry_count=5

python3 call.py --music_type $music_type --model_name $model_name --retry_count $retry_count

调试

话说我发现一些同事居然在用IPython,感觉惊为天人(来自一个使用IDE人的震惊)

from IPython import embed
embed()

评论