Python 学习 
很难绷,编程四五年,现在才学Python
 
Python是一门很简单的语言,但很多语法跟C++、C#不一样,一眼看上去发现好多不认识的东西,在此记录一下
语法 
循环 
for  i in  range (5 ):  print (i)	 
 
循环指定范围
for  i in  range (1 , 5 ):  print (i)	 
 
循环指定步进步长
for  i in  range (0 , 5 , 2 ):  print (i)	 
 
索引 
Python除了可以使用正向索引,还可以使用负向索引,表示为从最后一个元素开始倒着数,-1是最后一个元素
a = [0 , 1 , 2 , 3 , 4 , 5 ] print (a[0 ])	print (a[1 ])	print (a[-1 ])	print (a[-2 ])	
 
这个功能常用于处理路径
file_path = "datasets/xa/xaa/a.json"  file_name = file_path.split('/' )[-1 ]	 
 
异常 
try :  if  xxx:     raise  Exception("There is a error" ) except  Exception as  e:  print (e) 
 
assert 
 
若条件为真,程序继续进行
若条件为假,程序抛出AssertionError异常,可以加一个参数信息
assert  <一个bool 变量>, <一个字符串参数>
 
内置函数 
isinstance 
用于判断一个对象是否是某种类型
if  not  isinstance (data, List ):	... else :  ... 
 
面向对象 
创建一个类
class  Student :  def  __init__ (self, student_name ):     self.name = student_name   def  test (self ):     print (self.name) 
 
魔法方法 
python通关魔法方法(Magic Methods)为类提供一些机制,形如__xxx__的成员函数
对象创建和销毁 
 
__new__ :在对象创建之前调用,用于控制对象的创建过程。 
__init__ :在对象创建后调用,用于初始化对象的属性。 
__del__ :在对象被销毁时调用,用于清理资源。 
 
字符串表示 
 
__str__ :返回对象的字符串表示,用于 print() 函数。 
__repr__ :返回对象的字符串表示,用于调试和开发。 
 
序列操作 
 
__len__ :返回对象的长度,用于 len() 函数。 
__getitem__ :获取对象中指定键的值,用于索引操作。 
__setitem__ :设置对象中指定键的值,用于赋值操作。 
__delitem__ :删除对象中指定键的值,用于删除操作。 
 
迭代器 
 
__iter__ :返回一个可迭代对象,用于 for 循环。 
__next__ :返回下一个迭代器对象,用于 next() 函数。 
 
运算符重载 
 
__add__ :定义加法运算符。 
__sub__ :定义减法运算符。 
__mul__ :定义乘法运算符。 
__div__ :定义除法运算符。 
__eq__ :定义等于运算符。 
__ne__ :定义不等于运算符。 
__lt__ :定义小于运算符。 
__le__ :定义小于等于运算符。 
__gt__ :定义大于运算符。 
__ge__ :定义大于等于运算符。 
 
属性访问 
 
__getattr__ :当访问不存在的属性时调用。 
__getattribute__ :拦截所有的属性访问。 
__setattr__ :拦截所有属性的赋值操作。 
__delattr__ :拦截所有属性的删除操作。 
 
上下文管理 
 
__enter__ :在 with 语句开始时调用。 
__exit__ :在 with 语句结束时调用。 
 
可调用对象 
 
装饰器 
class  EmbeddingLayer :    @staticmethod     def  embed (input , basis ):     		pass  EmbeddingLayer.embed(input_tensor, basis_matrix) 
 
高阶函数 
def  square (x ):    return  x ** 2  numbers = [1 , 2 , 3 , 4 , 5 ] squared_numbers = map (square, numbers) 
 
容器 
set 
dataset = set () key = '1'  dataset.add(key) if  key in  dataset:  print (key)      dataset.discard(key) 
 
list 
切片 
Python可以使用切片操作,从一个序列(如列表、元组或字符串)中获取一部分元素
numbers = [0 , 1 , 2 , 3 , 4 , 5 ] print (numbers[:2 ])  s = "Hello"  print (s[:2 ])  
 
列表推导式 
List Comprehension
 
raw = [1 , 2 , 3 , 4 , 5 ] ans = [x-1  for  x in  raw]	 
 
import  reinput  = "<h1><h3><h5><h7>" pattern = r'<h(\d+)>'  token_lst = [int (match ) for  match  in  re.findall(pattern, input )] 
 
dict 
判断Dict中有无Key 
if  'key_name'  not  in  dic:  xxx 
 
关键词 
with 
很类似C#的using,用于打开文件,期间代码出现异常会正常关闭,加载的文件也会被关闭释放
with  open ('file.txt' , 'r' ) as  f:  content = f.read() 
 
with  open ('a.json' , 'r' ) as  f:  content = json.loads(f) 
 
with  open ('a.txt' , 'r' , encoding='utf-8' ) as  file:  for  line in  file:   	print (line.strip()) 
 
字符串 
前缀 
Python的字符串可以使用前缀表示某个格式和行为
原始字符串 
转义字符将会被视为普通字符,常用于正则表达式和Windwos下文件路径
pattern = r"\d+"  path = r"C:\Users\Admin\Documents"  
 
格式化字符串 
可以用{}来嵌入Python表达式
str1 = 'Hello'  str2 = f'str1: {str1} ' 	 
 
字节字符串 
 
匹配 
endswith 
可以用元组实现多种匹配
if  str1.endswith(('.jpg' , '.png' )):  ... 
 
格式化数字 
在按顺序输出文件名时,经常有格式化数字的需求
num = 12  formatted_str = f"{num:04d} " 	 f = 1.2222  formatted_str_f = f"{f:.02 f} " 	 
 
常用库 
自定义文件 
从自定义的python文件中import函数
from  utils import  load_models
 
也可以将当前文件添加到sys路径中
current_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.append(current_dir) from  utils import  load_models
 
sys 
传递参数,跟C++ main函数的argv意义相同
import  sysa1 = sys.argv[1 ]	 a2 = sys.argv[2 ]	 
 
os 
遍历目录 
访问文件夹下所有文件(递归遍历子文件夹)
for  dirpath, dirnames, filenames in  os.walk(root_dir):    for  filename in  filenames:         file_path = os.path.join(dirpath, filename)      
 
得到文件所在文件夹 
不存在就创建文件夹
temp_folder = os.path.dirname(out_path) os.makedirs(temp_folder, exist_ok=True ) 
 
unset http proxy 
os.environ.pop("http_proxy" , None ) os.environ.pop("https_proxy" , None ) 
 
添加文件夹 
可以import某个文件夹下的文件
libpath = os.path.abspath(     os.path.join(         os.path.dirname(os.path.abspath(__file__)),         "step_align" ,     ) ) if  os.path.exists(libpath):    if  libpath not  in  sys.path:         sys.path.insert(0 , libpath) from  local_file import  custom_class
 
zipfile 
python可以在不解压文件的情况下读取文件内容
import  zipfilewith  zipfile.ZipFile(f'{file_name} .zip' , 'r' ) as  zip_ref:  for  name in  zip_ref.namelist():     print (name)     content = zip_ref.read(name).decode('utf-8' ) 
 
tarfile 
import  tarfilewith  tarfile.open (f'{file_name} .tar' , 'r' ) as  tar_ref:     for  member in  tar_ref.getmembers():          if  member.isfile():	            	print (member.name)	     	     	content = tar_ref.extractfile(member).read().decode('utf-8' )              tar_ref.extract(member, path=extract_dir) 
 
import  tarfileimport  shutilwith  tarfile.open (new_tar_path, 'w' ) as  tar:        for  root, dirs, files in  os.walk(target_folder):             for  file in  files:                 file_path = os.path.join(root, file)                 tar.add(file_path, arcname=os.path.join(os.path.basename(root), file)) shutil.rmtree(target_folder) 
 
发起进程 
import  subprocessscript_path = 'inference.py'  arguments = ['-s' , 'assets/examples/source/hal.jpeg' , '-d' , 'assets/temp/sun.mp4' , '--flag_normalize_lip' ] subprocess.call(['python' , script_path] + arguments) 
 
pkl 
将参数存储为二进制
import  pickledef  read_pkl_file (file_path ):    with  open (file_path, 'rb' ) as  file:         data = pickle.load(file)     return  data    def  write_pkl_file (file_path, data ):  with  open (file_path, 'wb' ) as  file:     pickle.dump(data, file) 
 
random 
ans = random.uniform(c_min, c_max) n = random.randint(1 , 10 ) select_list = random.sample(raw_list, 500 ) 
 
json 
data = json.loads(input_text) json_str = json.dumps(data) with  open ('xx.json' , 'r' ) as  f:  data = json.load(f) with  open ('xx.json' , 'w' ) as  f:  json.dump(data, f, ensure_ascii=False ) 
 
moviepy 
moviepy是一个处理视频和音频的库
为视频配音 
video = mp.VideoFileClip("1.mp4" ) audio = mp.AudioFileClip("2.mp3" ) video = video.set_audio(audio) video.write_videofile("3.mp4" , codec="libx264" , audio_codec="aac" ) 
 
opencv 
查看视频帧率 
pip install opencv-python 
 
import  cv2video_path = 's18.mp4'  cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) print (f'视频的帧率为:{fps}  FPS' )cap.release() 
 
plt 
折线图 
import  matplotlib.pyplot as  pltx = [1 , 2 , 3 , 4 , 5 ] y = [2 , 3 , 5 , 7 , 11 ] plt.plot(x, y, marker='o' , linestyle='-' , label='line' ) plt.title('Graph' ) plt.xlabel('Frame' ) plt.ylabel('Value' ) if  enable_save:  	     plt.savefig(f'{file_name} .png' , dpi=300 ) plt.show() 
 
散点图 
value_x = [] value_y = [] plt.figure(figsize=(10 , 10 )) plt.scatter(value_x, value_y) plt.text(0 , p5, f'P5: {p5:.2 f} ' , fontsize=20 , verticalalignment='bottom' , color='r' , ha='right' ) plt.axhline(y=p5, color='r' , linestyle='--' ) plt.xlabel('x' ) plt.ylabel('y' ) plt.title('title' ) plt.tight_layout() plt.savefig('output.png' ) 
 
联合图表 
import  matplotlib.pyplot as  pltfig, axes = plt.subplots(2 , 2 , figsize=(14 , 10 )) axes[0 , 0 ].scatter(xx, yy) axes[0 , 0 ].set_xlabel('x' ) axes[0 , 0 ].set_ylabel('y' ) axes[0 , 0 ].set_title('title 1' ) axes[0 , 1 ].scatter(xx, yy) axes[0 , 1 ].set_xlabel('x' ) axes[0 , 1 ].set_ylabel('y' ) axes[0 , 1 ].set_title('title 2' ) axes[1 , 0 ].scatter(xx, yy) axes[1 , 0 ].set_xlabel('x' ) axes[1 , 0 ].set_ylabel('ye' ) axes[1 , 0 ].set_title('title 3' ) axes[1 , 1 ].scatter(xx, yy) axes[1 , 1 ].set_xlabel('x' ) axes[1 , 1 ].set_ylabel('y' ) axes[1 , 1 ].set_title('title 4' ) plt.tight_layout() plt.savefig('output.png' ) 
 
numpy 
求百分位数 
p5 = np.percentile(speed, 5 ) p50 = np.percentile(speed, 50 ) p95 = np.percentile(speed, 95 ) 
 
mask 
arr = np.array(rates) mask = arr > 0.05 count = mask.sum()	 
 
unique 
从arr中提取唯一值(去除数组中重复元素,可以用于判断数组中出现了哪些数,可以用来判断数组是否全是01)
 
ffmpeg 
文件类型转换 
import  subprocessdef  m4s_to_mp3 (input_file, output_file ):    ffmpeg_command = [         "ffmpeg" ,         "-i" , input_file,         "-acodec" , "libmp3lame" ,         "-q:a" , "2" ,         output_file     ]     subprocess.run(ffmpeg_command, check=True )      m4s_to_mp3('a.m4s' , 'a.mp3' ) 
 
视频左右拼接 
ffmpeg -i 101.mp4 -i o101.mp4 -filter_complex "[0:v]pad=iw*2:ih[a];[a][1:v]overlay=w"  c101.mp4 
 
tqdm 
用于显示进度条
from  tqdm import  tqdmfor  i in  tqdm(range (100 )):	print (i) 
 
datasets 
Hugging face的数据类型
from  datasets import  Dataset, load_from_diskimport  pandas as  pdname = 'combine'  df = pd.read_json(f"{name} .jsonl" , lines=True ) dataset = Dataset.from_pandas(df) dataset.save_to_disk(name) loaded_dataset = load_from_disk(name) print (len (loaded_dataset))print (loaded_dataset[0 ])
 
argparse 
处理输入参数,自动转化为变量
import  argparseparser = argparse.ArgumentParser() parser.add_argument('--input_folder' , type =str , required=True , help ='The folder path to the dataset' ) parser.add_argument('--detail' , action='store_true' , help ='Print detailed information' ) parser.add_argument('--max_steps' , type =int , default=2000000 , help ='Max steps' ) group = parser.add_mutually_exclusive_group() group.add_argument("--mode1" , action="store_true" , help ="启用模式 1" ) group.add_argument("--mode2" , action="store_true" , help ="启用模式 2" ) group.add_argument("--mode3" , action="store_true" , help ="启用模式 3" ) args = parser.parse_args() if  args.detail:    print ('The input folder is:' , args.input_folder)     print ('The max steps is:' , args.max_steps) else :    print ('The input folder is:' , args.input_folder) 
 
python parser_test.py --input_folder ./ --detail --max_steps 10   python parser_test.py --input_folder ./ --max_steps 10  
 
imap 
基于multiprocessing的并行处理代码
小技巧,如果我有一个巨大的dict,直接遍历处理会超内存,被Killed。可以将key转为哈希,根据dict的大小取哈希的前两三位,将dict摊开,再做处理
 
处理数组 
import  osimport  ioimport  jsonimport  multiprocessingfrom  tqdm import  tqdmdef  process_item (item ):    index, name = item     with  open (f'hash_split/{name} ' , 'r' ) as  f:         for  line in  f:             info = json.loads(line.strip())             ...     return  item     if  __name__ == "__main__" :    data = []     root_dir = 'hash_split' 	     for  dirpath, dirnames, filenames in  os.walk(root_dir):         for  filename in  filenames:             if  filename.endswith('.jsonl' ):                 data.append(filename)          pool = multiprocessing.Pool(16 )     processed_data_iterator = tqdm(pool.imap(process_item, enumerate (data)), total=len (data))     processed_data = list (processed_data_iterator)     pool.close()     pool.join() 
 
处理dict 
def  process_item (item ):    key, value = item     ...     return  item     if  __name__ == "__main__" :    data = {}          pool = multiprocessing.Pool(16 )     processed_data_iterator = tqdm(pool.imap(process_item, data.items()), total=len (data))     processed_data = list (processed_data_iterator)     pool.close()     pool.join() 
 
带参数 
import  functoolsdef  process_item (item, args ):    ...     return  item    if  __name__ == "__main__" :  	datas = []   	pool = multiprocessing.Pool(16 )     partial_process_item = functools.partial(process_item, args=args)     processed_data_iterator = tqdm(pool.imap(partial_process_item, datas), total=len (datas), desc="Process" )     result_list = list (processed_data_iterator)     pool.close()     pool.join() 
 
实时更新 
对于一些执行速度很慢的任务,经常发现tqdm的更新不够及时。这是因为imap要保证按顺序完成执行,如果对顺序要求不那么强烈,可以使用imap_unordered
processed_data_iterator = tqdm(pool.imap_unordered(partial_process_item, datas), total=len (datas), desc="Process" ) 
 
curl 
curl http://xxx/v1/api -X POST  -H  "Content-Type: application/json"   -d '{"text": ["1", "2"]}'  
 
等价于
import  requestsurl = "http://xxx/v1/api"  headers = {     "Content-Type" : "application/json"  } data = {     "text" : ["1" , "2" ] } response = requests.post(url, headers=headers, json=data) 
 
torchaudio 
import  torchaudiowaveform, sample_rate = torchaudio.load(file_path) duration = waveform.shape[1 ] / sample_rate 
 
warnings 
import  warningswarnings.filterwarnings('ignore' ) 
 
dataclass 
更方便定义一个数据类
from  dataclasses import  dataclass@dataclass class  ProcessConfig :    batch_size = 32      save_folder = "/tmp"  
 
hashlib 
hash_object = hashlib.new("md5" ) hash_object.update(lyrics.encode('utf-8' )) hex_hash = hash_object.hexdigest() print (hex_hash)
 
datetime 
import  timefrom  datetime import  datetime, timedeltacurrent_time = time.time() local_datetime = datetime.fromtimestamp(current_time) print (local_datetime)
 
trimesh 
import  trimeshmesh = trimesh.load(glb_path) if  isinstance (mesh, trimesh.Scene):    mesh = mesh.dump(concatenate=True ) mesh.export(obj_path) 
 
如果没能导出图片,那么更新一下包 pip install --upgrade trimesh pillow imageio
 
PIL 
from  PIL import  Imageimage = Image.open (path) image.save('output.png' ) 
 
base64 
将图片转为base64
import  base64from  PIL import  Imagefrom  io import  BytesIOdef  image_to_base64 (image_path ):         with  Image.open (image_path) as  img:                  buffered = BytesIO()         img.save(buffered, format ="PNG" )                  img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8" )     return  img_base64 
 
dataset 
huggingface提供的数据集类型,非常轻量,整个数据集的核心是metadata.jsonl,形如:
my_dataset/ ├── 001.jpg ├── 002.jpg └── metadata.jsonl 
 
metadata.jsonl格式为:
{ "file_name" :  "001.jpg" ,  "prompt" :  "xxx" ,  "width" :  2560 ,  "height" :  3581 } { "file_name" :  "002.jpg" ,  "prompt" :  "xxx" ,  "width" :  2560 ,  "height" :  3581 } 
 
你可以为这个jsonl添加各种keys
下载 
from  huggingface_hub import  snapshot_downloadlocal_dir = "./my_dataset"  snapshot_download(     "Reuben-Sun/dataset-name" ,     local_dir=local_dir, repo_type="dataset" ,     ignore_patterns=".gitattributes" , ) 
 
加载 
from  datasets import  load_datasetdataset = load_dataset(path="imagefolder" , data_dir="my_dataset" ) train_dataset = dataset['train' ] 
 
imagefolder:数据集类型为imagefolder 
 
huggingface_hub 
下载某个库的某些文件 
from  huggingface_hub import  hf_hub_download, loginimport  osrepo_id = "black-forest-labs/FLUX.1-dev"  files_to_download = ["flux1-dev.safetensors" , "ae.safetensors" ] download_dir = "./flux_models"  os.makedirs(download_dir, exist_ok=True ) for  file in  files_to_download:    print (f"Downloading {file} ..." )     hf_hub_download(         repo_id=repo_id,         filename=file,         local_dir=download_dir,         local_dir_use_symlinks=False        )     print (f"{file}  downloaded to {download_dir} /{file} " ) 
 
其他操作 
找到site-packages 
from  distutils.sysconfig import  get_python_libprint (get_python_lib())
 
pip 
requirements.txt 
下载requirements.txt
pip install -r requirements.txt 
 
生成
pipreqs ./ --encoding=utf8  --force 
 
可编辑模式 
pip install -e /path/to/my_package 
 
选择一个本地python包路径,使用-e的pip,这个包内容会被链接到python环境中,对包内容的更改会实时影响python环境里的包内容,便于bao
whl 
安装
 
生成whl(要求项目根目录有setup.py文件),执行后dist目录下就会有whl文件
python setup.py bdist_wheel 
 
以包的形式运行 
python3 -m <package name>.<file name> 
 
等价于
cd <package folder> python3 <file name>.py 
 
但这种调用方式可以用.和..,import包
bash 
export  music_type="song" export  model_name="llama3.1" export  retry_count=5python3 call.py --music_type $music_type  --model_name $model_name  --retry_count $retry_count  
 
调试 
IPython 
from  IPython import  embedembed() 
 
pdb 
import  pdbpdb.set_trace() 
 
traceback 
import  tracebacktry :         result = 10  / 0  except  Exception as  e:         traceback.print_exc()               error_msg = traceback.format_exc()     print ("错误信息:" )     print (error_msg)