```python
import os
import sys

# Add a local library directory (step_align) to the import search path
libpath = os.path.abspath(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), "step_align")
)
if os.path.exists(libpath):
    if libpath not in sys.path:
        sys.path.insert(0, libpath)

from local_file import custom_class
```
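Using `sys.path.insert(0, ...)` rather than `append` makes the local `step_align` copy take precedence over any identically named package that is already installed, which is usually the intent when iterating on local code.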
zipfile
Python can read the contents of files inside a zip archive without extracting it:
```python
import zipfile

# Read every file inside a .zip archive without extracting it
with zipfile.ZipFile(f'{file_name}.zip', 'r') as zip_ref:
    for name in zip_ref.namelist():
        print(name)
        content = zip_ref.read(name).decode('utf-8')
```
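The reverse direction also only needs the standard library; a minimal sketch of creating an archive (`out.zip` and `data.txt` are placeholder names):

```python
import zipfile

# Create a compressed archive and add a file under a chosen archive name
with zipfile.ZipFile('out.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zf:
    zf.write('data.txt', arcname='data.txt')
    # Text can also be written directly, without a file on disk
    zf.writestr('notes/readme.txt', 'hello')
```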
tarfile
```python
import tarfile

# Read a .tar archive
with tarfile.open(f'{file_name}.tar', 'r') as tar_ref:
    # Iterate over every member in the archive
    for member in tar_ref.getmembers():
        # A member may be a directory rather than a regular file
        if member.isfile():
            # Print the member name
            print(member.name)
            # Read the file content directly from the member
            content = tar_ref.extractfile(member).read().decode('utf-8')
            # Extract this member into the extract_dir directory
            tar_ref.extract(member, path=extract_dir)
```
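To unpack the whole archive in one call instead of member by member, `extractall` works as well; a minimal sketch, with `extract_dir` assumed as above:

```python
import tarfile

with tarfile.open(f'{file_name}.tar', 'r') as tar_ref:
    # Unpack every member into extract_dir (only for archives from a trusted source)
    tar_ref.extractall(path=extract_dir)
```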
```python
import os
import shutil
import tarfile

# Pack the whole target_folder into a tar archive
with tarfile.open(new_tar_path, 'w') as tar:
    for root, dirs, files in os.walk(target_folder):
        for file in files:
            file_path = os.path.join(root, file)
            tar.add(file_path, arcname=os.path.join(os.path.basename(root), file))

# Remove target_folder after packing
shutil.rmtree(target_folder)
```
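For a compressed tarball the same code only needs a different mode, and `tarfile.add` can also recurse over a directory by itself; a minimal sketch reusing `new_tar_path` and `target_folder` from above:

```python
import os
import tarfile

# 'w:gz' writes a gzip-compressed .tar.gz; 'w:bz2' and 'w:xz' also exist
with tarfile.open(new_tar_path, 'w:gz') as tar:
    # add() recurses into directories, so one call can replace the os.walk loop
    tar.add(target_folder, arcname=os.path.basename(target_folder))
```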
pickle

```python
import pickle

def read_pkl_file(file_path):
    with open(file_path, 'rb') as file:
        data = pickle.load(file)
    return data

def write_pkl_file(file_path, data):
    with open(file_path, 'wb') as file:
        pickle.dump(data, file)
```
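A quick usage example for the two helpers above (`cache.pkl` is a placeholder file name):

```python
data = {'ids': [1, 2, 3], 'name': 'example'}
write_pkl_file('cache.pkl', data)
loaded = read_pkl_file('cache.pkl')
assert loaded == data
```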
random
```python
import random

# Random float in [c_min, c_max]
ans = random.uniform(c_min, c_max)
# Random integer between 1 and 10 (inclusive)
n = random.randint(1, 10)
# Pick 500 distinct items from a list
select_list = random.sample(raw_list, 500)
```
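A few related calls that often come up alongside these; a minimal sketch, with `raw_list` assumed as above:

```python
import random

random.seed(42)                 # make the random sequence reproducible
item = random.choice(raw_list)  # pick one random element
random.shuffle(raw_list)        # shuffle the list in place
```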
json
```python
import json

# Parse a JSON string
data = json.loads(input_text)
# Serialize an object to a JSON string
json_str = json.dumps(data)
# Load JSON from a file
with open('xx.json', 'r') as f:
    data = json.load(f)
# Write JSON to a file
with open('xx.json', 'w') as f:
    json.dump(data, f, ensure_ascii=False)
```
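The multiprocessing examples below work with jsonl files (one JSON object per line); a minimal sketch of writing and reading that format, with `data.jsonl` as a placeholder name:

```python
import json

# Write one JSON object per line
records = [{'id': 1}, {'id': 2}]
with open('data.jsonl', 'w') as f:
    for rec in records:
        f.write(json.dumps(rec, ensure_ascii=False) + '\n')

# Read it back line by line
with open('data.jsonl', 'r') as f:
    loaded = [json.loads(line) for line in f if line.strip()]
```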
moviepy
moviepy is a library for processing video and audio.
Add an audio track (dubbing) to a video:
```python
import moviepy.editor as mp  # moviepy 1.x style import

video = mp.VideoFileClip("1.mp4")
audio = mp.AudioFileClip("2.mp3")
# Replace the video's audio track with the new audio
video = video.set_audio(audio)
video.write_videofile("3.mp4", codec="libx264", audio_codec="aac")
```
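Two related operations in the same moviepy 1.x interface that often come up around dubbing, trimming a clip and exporting its audio; a minimal sketch with placeholder file names:

```python
import moviepy.editor as mp

clip = mp.VideoFileClip("1.mp4")
# Keep only the first 10 seconds
short = clip.subclip(0, 10)
short.write_videofile("short.mp4", codec="libx264", audio_codec="aac")
# Export the original audio track to an mp3 file
clip.audio.write_audiofile("audio.mp3")
```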
multiprocessing

```python
import os
import io
import json
import multiprocessing

from tqdm import tqdm
```
```python
# Process one item of the file list
def process_item(item):
    index, name = item
    with open(f'hash_split/{name}', 'r') as f:
        for line in f:
            info = json.loads(line.strip())
            ...
    return item
```
```python
if __name__ == "__main__":
    data = []
    root_dir = 'hash_split'  # this directory holds the jsonl files produced by hash splitting
    for dirpath, dirnames, filenames in os.walk(root_dir):
        for filename in filenames:
            if filename.endswith('.jsonl'):
                data.append(filename)

    # Process the files in parallel
    pool = multiprocessing.Pool(16)
    processed_data_iterator = tqdm(pool.imap(process_item, enumerate(data)), total=len(data))
    processed_data = list(processed_data_iterator)
    pool.close()
    pool.join()
```
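An equivalent sketch with the standard-library concurrent.futures API, which manages the pool lifecycle automatically; `process_item` here is only a stub standing in for the function above, and `data` is a placeholder list:

```python
from concurrent.futures import ProcessPoolExecutor

from tqdm import tqdm

def process_item(item):
    index, name = item
    # ... per-file work goes here, as in the Pool example above ...
    return item

if __name__ == "__main__":
    data = ['a.jsonl', 'b.jsonl']  # placeholder list of file names
    with ProcessPoolExecutor(max_workers=16) as executor:
        # executor.map preserves input order, like pool.imap
        results = list(tqdm(executor.map(process_item, enumerate(data)), total=len(data)))
```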
Processing a dict
```python
# Process one (key, value) pair of a dict
def process_item(item):
    key, value = item
    ...
    return item
```
```python
if __name__ == "__main__":
    data = {}
    # Process the (key, value) pairs in parallel
    pool = multiprocessing.Pool(16)
    processed_data_iterator = tqdm(pool.imap(process_item, data.items()), total=len(data))
    processed_data = list(processed_data_iterator)
    pool.close()
    pool.join()
```
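Because `process_item` returns the (key, value) pair it was given, the results can be turned straight back into a dict:

```python
# Rebuild a dict from the processed (key, value) pairs
processed_dict = dict(processed_data)
```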