#!/bin/python
#coding: utf-8
# +-------------------------------------------------------------------
# | system: Ruyi Panel
# +-------------------------------------------------------------------
# | Author: lybbn
# +-------------------------------------------------------------------
# | QQ: 1042594286
# +-------------------------------------------------------------------
# | Date: 2024-01-03
# +-------------------------------------------------------------------

# ------------------------------
# File / directory operations
# ------------------------------
import re
import os
import shlex
import shutil
import zipfile
import tarfile
import datetime
import mimetypes
import requests
from natsort import natsorted, ns
from utils.server.system import system
from utils.security.no_delete_list import check_no_delete,check_in_black_list
from utils.common import ast_convert,WriteFile,RunCommand,current_os
from itertools import chain
from urllib.parse import urlsplit

# Timestamp format used for every "modified" / "access_at" field.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Characters that may not appear in a newly created file / directory name.
_NAME_BLACK_LIST = ['\\', '/', '&', '*', '|', ';', '"', "'", '<', '>']

# Extension -> editor language id. '.conf' / '.ini' are resolved separately
# because their result also depends on the path.
_LANGUAGE_BY_EXT = {
    '.readme': "markdown",
    '.md': "markdown",
    '.sh': "shell",
    '.lua': "lua",
    '.rb': "ruby",
    '.js': "javascript",
    '.ts': "javascript",
    '.html': "html",
    '.htm': "html",
    '.css': "css",
    '.scss': "css",
    '.sass': "css",
    '.less': "css",
    '.json': "json",
    '.py': "python",
    '.yaml': "yaml",
    '.yml': "yaml",
    '.vue': "vue",
    '.php': "php",
    '.java': "java",
    '.go': "go",
    '.sql': "sql",
    '.xml': "xml",
}

# Archive suffixes handed to tarfile (mode 'r' auto-detects the compression).
_TAR_SUFFIXES = ('.tar', '.tar.gz', '.tgz', '.tar.bz2', '.tbz', '.tbz2', '.tar.xz', '.txz')


def _trim_trailing_sep(path):
    """Return ``path`` without trailing path separators (a bare root is kept)."""
    separators = os.sep + (os.altsep or '')
    return path.rstrip(separators) or path


def _format_timestamp(timestamp):
    """Format a POSIX timestamp as a local 'YYYY-mm-dd HH:MM:SS' string."""
    return datetime.datetime.fromtimestamp(timestamp).strftime(_TIME_FORMAT)


def _stat_path(path):
    """
    Stat ``path``; fall back to lstat for broken symlinks.

    Returns None when the entry cannot be stat'ed at all (for example it
    disappeared while a directory was being listed).
    """
    try:
        return os.stat(path)
    except OSError:
        try:
            return os.lstat(path)
        except OSError:
            return None


def _remove_existing(path):
    """Delete an existing destination, choosing the call by ITS type."""
    if os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path)
    else:
        os.remove(path)


def get_file_name_from_url(url):
    """
    @name Return the file name (last path component) of a URL
    @author lybbn<2024-02-22>
    @param url URL or plain path string
    @return the base name; for real URLs the query string and fragment
            are not part of the result
    """
    # Only strip "?query" / "#fragment" for real URLs so plain paths keep
    # behaving exactly like os.path.basename().
    target = urlsplit(url).path if "://" in url else url
    return os.path.basename(target)


def get_file_extension(file_path):
    """
    @name Return the extension of a path (including the leading dot)
    @author lybbn<2024-02-22>
    """
    return os.path.splitext(file_path)[1]


def detect_file_type(file_path):
    """
    @name Guess the MIME type of a file from its name
    @author lybbn<2024-02-22>
    @return MIME type string, or None when it cannot be guessed
    """
    return mimetypes.guess_type(file_path)[0]


def auto_detect_file_language(file_path):
    """
    @name Guess the editor language of a file from its extension
    @author lybbn<2024-03-08>
    @param file_path file name or path
    @return language id such as "python"; "log" when the extension is unknown
    """
    ext = get_file_extension(file_path).lower()
    if ext in ('.conf', '.ini'):
        # nginx configs get their own highlighting, other configs are generic
        return "nginx" if 'nginx' in file_path else "properties"
    return _LANGUAGE_BY_EXT.get(ext, "log")


def list_dirs(dst_path):
    """
    @name List the names of the files and directories directly inside a directory
    @param dst_path directory to list
    @return list of entry names; empty list when the path does not exist
    """
    if not os.path.exists(dst_path):
        return []
    return os.listdir(dst_path)


def get_size(file_path):
    """
    @name Return the size of a file in bytes
    @author lybbn<2024-02-22>
    """
    return os.path.getsize(file_path)


def is_link(file_path):
    """
    @name Tell whether a path is a symbolic link
    @author lybbn<2024-02-22>
    """
    return os.path.islink(file_path)


def get_directory_size(dst_path):
    """
    @name Compute the size of a directory
    @author lybbn<2024-02-22>
    @param dst_path directory to measure
    @return on Windows: total size in bytes (int);
            elsewhere: human readable string built from ``du -sh`` output
            (e.g. "4.0 KB"), or "0 B" when ``du`` fails
    """
    if current_os == "windows":
        total_size = 0
        for path, dirs, files in os.walk(dst_path):
            for file in files:
                # The checks must use the joined path, not the bare name.
                file_path = os.path.join(path, file)
                if os.path.islink(file_path):
                    continue
                try:
                    total_size += os.path.getsize(file_path)
                except OSError:
                    # vanished or unreadable file: leave it out of the total
                    continue
        return total_size

    # Quote the path so names with spaces or shell metacharacters are safe.
    result, err, returncode = RunCommand(f"du -sh {shlex.quote(dst_path)}", returncode=True)
    if returncode == 0:
        fields = result.split()
        if fields:
            return re.sub(r'(\d+)([a-zA-Z])', r'\1 \2', fields[0]) + "B"
    return "0 B"


def get_path_files_nums(path):
    """
    @name Count the entries directly inside a directory
    @author lybbn<2024-02-22>
    @return 1 for a regular file, 0 for a missing path, otherwise the
            number of directory entries
    """
    if os.path.isfile(path):
        return 1
    if not os.path.exists(path):
        return 0
    return len(os.listdir(path))


def get_filename_ext(filename):
    """
    @name Return the text after the last '.' of a file name
    @author lybbn<2024-02-22>
    @return extension without the dot; the whole name when it has no dot
    """
    return filename.split('.')[-1]


def windows_path_replace(path,is_windows = True):
    """
    @name Replace backslash separators in a path with '/'
    @author lybbn<2024-02-22>
    @param is_windows when False the path is returned untouched
    """
    if is_windows:
        path = path.replace("\\", "/")
    return path


def _sort_file_items(items, sort, is_reverse):
    """Sort listing items by "name", "modified" or "size"; other keys keep the order."""
    if sort == "name":
        return natsorted(items, key=lambda x: x["name"], alg=ns.PATH, reverse=is_reverse)
    if sort == "modified":
        return sorted(items, key=lambda x: x["modified"], reverse=is_reverse)
    if sort == "size":
        return sorted(items, key=lambda x: x["size"] if x["size"] is not None else 0, reverse=is_reverse)
    return items


def list_files_in_directory(dst_path,sort="name",is_reverse=False,is_windows=False,search=None,containSub=False,isDir=False):
    """
    @name List the entries of a directory with their attributes
          (size, path, permissions, owner, group, modification time)
    @author lybbn<2024-02-22>
    @param dst_path directory to list; empty on Windows means "list the drives"
    @param sort sort key: "name", "modified" or "size"
    @param is_reverse True = descending, False = ascending
    @param is_windows True when running on Windows
    @param search case-insensitive substring the entry name must contain
    @param containSub search all sub-directories too (at most 3000 matches)
    @param isDir only list directories
    @return dict with keys 'data', 'file_nums', 'dir_nums', 'total_nums';
            directory entries have size None
    @raise ValueError when dst_path exists but is not a directory
    """
    keyword = search.lower() if search else None

    def matches(name, is_file):
        """Apply the name filter and the directories-only filter."""
        if keyword and keyword not in name.lower():
            return False
        if isDir and is_file:
            return False
        return True

    def build_info(name, path, is_file, file_info):
        """Build the attribute dict of one entry from its stat result."""
        gid = file_info.st_gid
        return {
            "name": name,
            "type": "file" if is_file else "dir",
            "path": windows_path_replace(path, is_windows=is_windows),
            "size": file_info.st_size if is_file else None,
            "permissions": oct(file_info.st_mode)[-3:],
            "owner_uid": file_info.st_uid,
            "owner": system.GetUidName(path, file_info.st_uid),
            "gid": gid,
            "group": "" if is_windows else system.GetGroupidName(path, gid),
            "modified": _format_timestamp(file_info.st_mtime),
        }

    # Windows with no path: list the drives instead of a directory
    if is_windows and not dst_path:
        disk_paths = system().GetDiskInfo()
        datainfo = {
            'data':[],
            'file_nums':0,
            'dir_nums':0,
            'total_nums':0
        }
        for d in disk_paths:
            drive = d['path'].lower()
            if keyword and keyword not in drive:
                continue
            datainfo['data'].append({
                "name": drive,
                "type": "pan",
                "path": windows_path_replace(drive, is_windows=is_windows),
                "size": d['size'][0],
                "permissions": "",
                "owner_uid": None,
                "owner": "",
                "modified": ""
            })
        # count what is actually returned, not the unfiltered drive list
        datainfo['total_nums'] = len(datainfo['data'])
        return datainfo

    # Validate the path
    if not os.path.exists(dst_path):
        return {
            'data': [],
            'file_nums': 0,
            'dir_nums': 0,
            'total_nums': 0
        }
    if not os.path.isdir(dst_path):
        raise ValueError("错误:非目录")

    if not containSub:
        # Current directory only: directories first, then files
        dirData = []
        fileData = []
        with os.scandir(dst_path) as entries:
            for entry in entries:
                is_file = entry.is_file()
                if not matches(entry.name, is_file):
                    continue
                try:
                    file_info = entry.stat()
                except OSError:
                    # e.g. broken symlink: describe the link itself
                    file_info = _stat_path(entry.path)
                if file_info is None:
                    continue
                item = build_info(entry.name, entry.path, is_file, file_info)
                if item["type"] == "dir":
                    dirData.append(item)
                else:
                    fileData.append(item)
        # directories and files are sorted independently
        data = _sort_file_items(dirData, sort, is_reverse) + _sort_file_items(fileData, sort, is_reverse)
    else:
        # Recursive search, capped so huge trees cannot stall the request
        data = []
        count_limit = 0
        max_limit = 3000
        for root, dirs, files in os.walk(dst_path):
            if count_limit >= max_limit:
                break
            # os.DirEntry cannot be instantiated, so work from plain paths
            for name in chain(files, dirs):
                if count_limit >= max_limit:
                    break
                full_path = os.path.join(root, name)
                is_file = os.path.isfile(full_path)
                if not matches(name, is_file):
                    continue
                file_info = _stat_path(full_path)
                if file_info is None:
                    continue
                data.append(build_info(name, full_path, is_file, file_info))
                count_limit += 1
        data = _sort_file_items(data, sort, is_reverse)

    # Count files and directories
    file_nums = sum(1 for item in data if item["type"] == "file")
    dir_nums = sum(1 for item in data if item["type"] == "dir")
    return {
        'data': data,
        'file_nums': file_nums,
        'dir_nums': dir_nums,
        'total_nums': file_nums + dir_nums
    }


def _legacy_item(name, item_type, path, info, is_windows, with_group):
    """Build one listing dict for list_files_in_directory_old (key order preserved)."""
    item = {
        "name": name,
        "type": item_type,
        "path": windows_path_replace(path, is_windows=is_windows),
        "size": info.st_size if item_type == "file" else None,
        "permissions": oct(info.st_mode)[-3:],
        "owner_uid": info.st_uid,
        "owner": system.GetUidName(path, info.st_uid),
    }
    if with_group:
        item["gid"] = info.st_gid
        item["group"] = "" if is_windows else system.GetGroupidName(path, info.st_gid)
    item["modified"] = _format_timestamp(info.st_mtime)
    return item


def list_files_in_directory_old(dst_path,sort="name",is_reverse=False,is_windows=False,search=None,containSub=False,isDir=False):
    """
    @name Legacy version of list_files_in_directory
    @author lybbn<2024-02-22>
    @param dst_path directory to list; empty on Windows means "list the drives"
    @param sort sort key: "modified" or "size"; any other value keeps the
           default order (directories first, each group name-sorted)
    @param is_reverse True = descending, False = ascending
    @param is_windows True when running on Windows
    @param search case-insensitive substring the entry name must contain
    @param containSub search all sub-directories too (at most 3000 matches);
           recursive results carry no gid/group keys
    @param isDir only list directories (honoured in non-recursive mode only)
    @return dict with keys 'data', 'file_nums', 'dir_nums', 'total_nums'
    @raise ValueError when dst_path exists but is not a directory
    """
    keyword = search.lower() if search else None
    if is_windows and not dst_path:
        disk_paths = system().GetDiskInfo()
        datainfo = {
            'data':[],
            'file_nums':0,
            'dir_nums':0,
            'total_nums':0
        }
        for d in disk_paths:
            if keyword and d['path'].lower().find(keyword) == -1:
                continue
            datainfo['data'].append({"name":d['path'].lower(),"type":"pan","path":windows_path_replace(d['path'].lower(),is_windows=is_windows),"size":d['size'][0],"permissions":"","owner_uid":None,"owner":"","modified":""})
        datainfo['total_nums'] = len(disk_paths)
        return datainfo
    if not os.path.exists(dst_path):
        return {
            'data':[],
            'file_nums':0,
            'dir_nums':0,
            'total_nums':0
        }
    if not os.path.isdir(dst_path):
        raise ValueError("错误:非目录")
    dirData = []
    fileData = []
    file_nums = 0
    dir_nums = 0
    if not containSub:
        for entry in os.scandir(dst_path):
            if entry.is_file():
                if isDir:
                    continue
                if keyword and entry.name.lower().find(keyword) == -1:
                    continue
                file_nums = file_nums + 1
                fileData.append(_legacy_item(entry.name, "file", entry.path, entry.stat(), is_windows, True))
            elif entry.is_dir():
                if keyword and entry.name.lower().find(keyword) == -1:
                    continue
                dir_nums = dir_nums + 1
                dirData.append(_legacy_item(entry.name, "dir", entry.path, entry.stat(), is_windows, True))
    else:
        count_limit = 0
        max_limit = 3000
        for root, dirs, files in os.walk(dst_path):
            if count_limit >= max_limit:
                break
            # files of the current directory
            for file_name in files:
                if count_limit >= max_limit:
                    break
                if keyword and file_name.lower().find(keyword) == -1:
                    continue
                file_nums = file_nums + 1
                file_path = os.path.join(root, file_name)
                fileData.append(_legacy_item(file_name, "file", file_path, os.stat(file_path), is_windows, False))
                count_limit += 1
            # sub-directories of the current directory
            for dir_name in dirs:
                if count_limit >= max_limit:
                    break
                if keyword and dir_name.lower().find(keyword) == -1:
                    continue
                dir_nums = dir_nums + 1
                dir_path = os.path.join(root, dir_name)
                dirData.append(_legacy_item(dir_name, "dir", dir_path, os.stat(dir_path), is_windows, False))
                count_limit += 1

    # Directories first. Sorting huge listings is slow, so only the first
    # 4000 entries of each group are name-sorted.
    sorted_dirs = natsorted(dirData[:4000], key=lambda x: x.get("name", ""), alg=ns.PATH, reverse=is_reverse)
    sorted_files = natsorted(fileData[:4000], key=lambda x: x.get("name", ""), alg=ns.PATH, reverse=is_reverse)
    data = sorted_dirs + dirData[4000:] + sorted_files + fileData[4000:]

    # Same 4000-entry cap for the other sort keys (applied to the merged list).
    if sort == "modified":
        head = sorted(data[:4000], key=lambda x: x["modified"], reverse=is_reverse)
        data = head + data[4000:]
    elif sort == "size":
        head = sorted(data[:4000], key=lambda x: x["size"] if x["size"] is not None else 0, reverse=is_reverse)
        data = head + data[4000:]
    data_info = {
        'data':data,
        'file_nums':file_nums,
        'dir_nums':dir_nums,
        'total_nums':file_nums+dir_nums
    }
    return data_info


def get_filedir_attribute(path,is_windows=False):
    """
    @name Return the attributes of a single file or directory
    @author lybbn<2024-02-22>
    @param path file or directory path
    @param is_windows True when running on Windows
    @return attribute dict (for directories "size" is the result of
            get_directory_size), or None when the path is empty or is
            neither a regular file nor a directory
    """
    if not path:
        return None
    if os.path.isfile(path):
        item_type = "file"
    elif os.path.isdir(path):
        item_type = "dir"
    else:
        return None
    info = os.stat(path)
    size = info.st_size if item_type == "file" else get_directory_size(path)
    return {
        "name": os.path.basename(path),
        "type": item_type,
        "is_link": is_link(path),
        "path": windows_path_replace(path, is_windows=is_windows),
        "size": size,
        "permissions": oct(info.st_mode)[-3:],
        "owner_uid": info.st_uid,
        "owner": system.GetUidName(path, info.st_uid),
        "gid": info.st_gid,
        "group": system.GetGroupidName(path, info.st_gid),
        "modified": _format_timestamp(info.st_mtime),
        "access_at": _format_timestamp(info.st_atime),
    }


def create_file(path,is_windows=False):
    """
    @name Create an empty file (parent directories are created when missing)
    @author lybbn<2024-02-22>
    @param path full path of the file to create
    @param is_windows True when running on Windows
    @raise ValueError when the name is missing, invalid, or already exists
    """
    # strip stray whitespace around the two path parts
    filename = os.path.basename(path).strip()
    filepath = os.path.dirname(path).strip()
    if not filename:
        raise ValueError("请填写文件名")
    if is_windows:
        filepath = filepath.replace("//", "/").replace("/", "\\")
    path = os.path.join(filepath, filename)
    if path[-1] == '.':
        raise ValueError("文件名不能以'.'点结尾")
    if len(filename)>100 or len(filename) < 1:
        raise ValueError("长度在1到100个字符之间")
    if any(black in filename for black in _NAME_BLACK_LIST):
        raise ValueError("文件名不能包含指定特殊字符")
    if os.path.exists(path):
        # raise (not return) so callers are told, like every other check here
        raise ValueError("该文件已存在")
    # filepath is '' for a bare file name; makedirs('') would fail
    if filepath and not os.path.exists(filepath):
        os.makedirs(filepath)
    # create the empty file
    with open(path, 'w+'):
        pass


def create_dir(path,is_windows=False):
    """
    @name Create a directory (including missing parents)
    @author lybbn<2024-02-22>
    @param path full path of the directory to create
    @param is_windows unused, kept for a uniform signature
    @raise ValueError when the name is invalid or the path already exists
    """
    path = path.replace("//", "/")
    if path.endswith('.'):
        raise ValueError("目录名不能以'.'点结尾")
    dirname = os.path.basename(path)
    if len(dirname)>100 or len(dirname) < 1:
        raise ValueError("长度在1到100个字符之间")
    if any(black in dirname for black in _NAME_BLACK_LIST):
        raise ValueError("文件名不能包含指定特殊字符")
    if os.path.exists(path):
        raise ValueError("该目录已存在")
    os.makedirs(path)


def delete_dir(path,is_windows=False):
    """
    @name Delete a directory (a symlink is unlinked, its target is kept)
    @author lybbn<2024-02-22>
    @raise ValueError when the path does not exist; check_no_delete is
           called first and may reject the path
    """
    if not os.path.exists(path) and not os.path.islink(path):
        raise ValueError("目录不存在")
    # refuse to delete protected directories
    check_no_delete(path,is_windows)
    if os.path.islink(path):
        os.remove(path)
    else:
        shutil.rmtree(path)


def delete_file(path,is_windows=False):
    """
    @name Delete a file
    @author lybbn<2024-02-22>
    @raise ValueError when the path does not exist; check_no_delete is
           called first and may reject the path
    """
    if not os.path.exists(path) and not os.path.islink(path):
        raise ValueError("文件不存在")
    # refuse to delete protected paths
    check_no_delete(path,is_windows)
    os.remove(path)


def rename_file(sPath,dPath,is_windows=False):
    """
    @name Rename a file or directory
    @author lybbn<2024-02-22>
    @params sPath source path
    @params dPath new path
    @raise ValueError when source and target are identical, the target
           name is invalid, the source is missing, the target already
           exists, or the target is on the black list
    """
    if sPath == dPath:
        raise ValueError("源目标名称相同,已忽略")
    dPath = dPath.replace("//", "/")
    sPath = sPath.replace('//', '/')
    if dPath.endswith('.'):
        raise ValueError("不能以'.'点结尾")
    if not os.path.exists(sPath):
        raise ValueError("源文件/目录不存在")
    if os.path.exists(dPath):
        raise ValueError("目标存在相同名称")
    if dPath.endswith('/'):
        dPath = dPath[:-1]
    # safety check
    if check_in_black_list(dPath,is_windows):
        raise ValueError("与系统内置冲突,请更换名称")
    os.rename(sPath, dPath)


def copy_file(sPath,dPath,is_windows=False):
    """
    @name Copy a file (an existing target is overwritten)
    @author lybbn<2024-02-22>
    @params sPath source path
    @params dPath new path
    @raise ValueError when the target ends with '.' or the source is missing
    """
    dPath = dPath.replace("//", "/")
    sPath = sPath.replace('//', '/')
    if dPath.endswith('.'):
        raise ValueError("不能以'.'点结尾")
    if not os.path.exists(sPath):
        raise ValueError("源文件不存在")
    shutil.copyfile(sPath, dPath)


def copy_dir(sPath,dPath,is_windows=False,cover=False):
    """
    @name Copy a directory tree
    @author lybbn<2024-02-22>
    @params sPath source path
    @params dPath new path
    @params cover True: delete an existing target before copying
    @raise ValueError when source and target are identical, the target
           ends with '.', or the source is missing
    """
    dPath = dPath.replace("//", "/")
    sPath = sPath.replace('//', '/')
    # compare after normalisation so "a//b" and "a/b" count as the same path
    if sPath == dPath:
        raise ValueError("源和目标相同,已忽略")
    if dPath.endswith('.'):
        raise ValueError("不能以'.'点结尾")
    if not os.path.exists(sPath):
        raise ValueError("源目录不存在")
    if cover and os.path.exists(dPath):
        _remove_existing(dPath)
    shutil.copytree(sPath, dPath)


def move_file(sPath,dPath,is_windows=False,cover=False):
    """
    @name Move a file or directory
    @author lybbn<2024-02-22>
    @params sPath source path
    @params dPath new path
    @params cover True: delete an existing target before moving
    @raise ValueError when source and target are identical, the target
           ends with '.', the source is missing, or the target is on the
           black list
    """
    dPath = dPath.replace("//", "/")
    sPath = sPath.replace('//', '/')
    # compare after normalisation so "a//b" and "a/b" count as the same path
    if sPath == dPath:
        raise ValueError("源和目标相同,已忽略")
    if dPath.endswith('.'):
        raise ValueError("不能以'.'点结尾")
    if dPath.endswith('/'):
        dPath = dPath[:-1]
    if not os.path.exists(sPath):
        raise ValueError("源目录不存在")
    # safety check
    if check_in_black_list(dPath,is_windows):
        raise ValueError("与系统内置冲突,请更换名称")
    if cover and os.path.exists(dPath):
        # pick rmtree/remove from the destination's type, not the source's
        _remove_existing(dPath)
    shutil.move(sPath, dPath)


def batch_operate(param,is_windows=False):
    """
    @name Batch operations (move, copy, compress, extract, delete)
    @author lybbn<2024-02-22>
    @params param request parameters; 'type' selects the operation
            ('copy', 'move', 'zip', 'unzip', 'pms', 'del'), 'path' is the
            target, 'spath' the source(s); copy/move also read 'confirm'
            and 'skipList', zip/unzip read 'zip_type'
    @params is_windows True when running on Windows
    @return tuple (success, message, code, data); code 4050 carries the
            list of conflicting targets that need confirmation
    @raise ValueError propagated from validation and the file helpers
    """
    op_type = param.get('type',None)
    if op_type in ['copy','move']:
        confirm = param.get('confirm',False)
        # files the user chose NOT to overwrite
        skip_list = ast_convert(param.get('skipList',[]))
        dPath = param.get('path',"")
        sPath = ast_convert(param.get('spath',[]))
        if not dPath or not sPath:
            return False,"参数错误",4000,None
        dPath = dPath.replace('//', '/')
        if not confirm:
            # first pass: report conflicts and let the caller confirm
            conflict_list = []
            for d in sPath:
                if d[-1] == '.':
                    raise ValueError("%s不能以'.'点结尾"%d)
                if d[-1] == '/':
                    d = d[:-1]
                dfile = dPath + '/' + os.path.basename(d)
                if os.path.exists(dfile):
                    conflict_list.append({
                        'path':d,
                        'name':os.path.basename(d)
                    })
            if conflict_list:
                return False,"文件冲突",4050,conflict_list
        if skip_list:
            # skip entries are dicts; the sources are plain path strings
            skipped = {s['path'] for s in skip_list}
            sPath = [p for p in sPath if p not in skipped and _trim_trailing_sep(p) not in skipped]
        # a trailing slash would make basename() empty
        sources = [_trim_trailing_sep(p) for p in sPath]
        if op_type == 'copy':
            for sf in sources:
                dfile = dPath + '/' + os.path.basename(sf)
                src_norm = os.path.normcase(os.path.normpath(sf))
                dst_norm = os.path.normcase(os.path.normpath(dfile))
                try:
                    common = os.path.commonpath([dst_norm, src_norm])
                except ValueError:
                    # different drives / mixed absolute+relative: no overlap
                    common = None
                if common == src_norm:
                    return False,'从{}复制到{}有包含关系,请更换目标目录!'.format(sf, dfile),4000,None
            for sf in sources:
                dfile = dPath + '/' + os.path.basename(sf)
                if os.path.isdir(sf):
                    # NOTE(review): copytree raises if dfile already exists,
                    # even after the user confirmed the overwrite - decide
                    # whether to merge or replace the existing directory.
                    shutil.copytree(sf, dfile)
                else:
                    shutil.copyfile(sf, dfile)
            return True,"批量复制成功",2000,None
        for sf in sources:
            dfile = dPath + '/' + os.path.basename(sf)
            move_file(sPath=sf,dPath=dfile,is_windows=is_windows,cover=True)
        return True,"批量剪切成功",2000,None
    elif op_type == 'zip':
        dPath = param.get('path',"")
        sPath = ast_convert(param.get('spath',[]))
        zip_type = param.get('zip_type',"")
        if not dPath or not sPath:
            return False,"参数错误",4000,None
        if zip_type not in ["tar","zip"]:
            return False,"不支持的压缩格式",4000,None
        dPath = dPath.replace('//', '/')
        func_zip(zip_filename=dPath,items=sPath,zip_type=zip_type)
        return True,"压缩成功",2000,None
    elif op_type == 'unzip':
        dPath = param.get('path',"")
        sPath = param.get('spath',"")
        if not dPath or not sPath:
            return False,"参数错误",4000,None
        dPath = dPath.replace('//', '/')
        func_unzip(zip_filename=sPath,extract_path=dPath)
        return True,"解压成功",2000,None
    elif op_type == 'del':
        sPath = ast_convert(param.get('spath',[]))
        for sf in sPath:
            if os.path.isdir(sf):
                delete_dir(path=sf,is_windows=is_windows)
            else:
                delete_file(path=sf,is_windows=is_windows)
        return True,"批量删除成功",2000,None
    # 'pms' (permissions) is not implemented here; it and every unknown
    # type get a regular 4-tuple instead of an implicit None.
    return False,"类型错误",4000,None


def func_zip(zip_filename,items,zip_type):
    """
    @name Compress files / directories into an archive
    @author lybbn<2024-03-07>
    @param zip_filename archive file name (with path)
    @param items list of files or directories, e.g. ['/var/log/ruyi.log']
    @param zip_type "tar" (gzip-compressed tar) or "zip"
    @raise ValueError when items is empty or zip_type is unsupported
    """
    if not items:
        raise ValueError("需要压缩的文件或目录不能为空")
    if zip_type == "zip":
        zip_directories_and_files(zip_filename,items)
    elif zip_type == "tar":
        create_tar_gz(zip_filename,items)
    else:
        raise ValueError("不支持的压缩格式")


def func_unzip(zip_filename,extract_path):
    """
    @name Extract an archive
    @author lybbn<2024-03-07>
    @param zip_filename archive file name (with path); the format is
           chosen from the (case-insensitive) file name suffix
    @param extract_path directory to extract into
    @raise ValueError when the suffix is neither a tar variant nor .zip
    """
    if current_os == "windows":
        # release the target directory if a cmd process is using it
        from utils.server.windows import kill_cmd_if_working_dir
        kill_cmd_if_working_dir(extract_path)
    # os.path.splitext() only yields the last suffix ('.gz'), so compound
    # suffixes such as '.tar.gz' must be matched with endswith().
    lowered = zip_filename.lower()
    if lowered.endswith(_TAR_SUFFIXES):
        # NOTE(review): extractall() trusts member paths; consider the
        # tarfile extraction filters if archives can come from untrusted users.
        with tarfile.open(zip_filename, 'r') as tar:
            tar.extractall(extract_path)
    elif lowered.endswith('.zip'):
        with zipfile.ZipFile(zip_filename, 'r') as zipf:
            zipf.extractall(extract_path)
    else:
        raise ValueError("不支持的文件格式")


def zip_directories_and_files(zip_filename, items):
    """
    @name Write files and directory trees into a deflate-compressed zip
    @param zip_filename archive file name (with path)
    @param items files and/or directories; a file is stored under its base
           name, a directory under its own name with its tree below it.
           Items that are neither are ignored.
    """
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for item in items:
            if os.path.isfile(item):
                zipf.write(item, os.path.basename(item))
            elif os.path.isdir(item):
                # drop a trailing separator so dirname() is the real parent
                base = os.path.dirname(_trim_trailing_sep(item))
                for root, dirs, files in os.walk(item):
                    if not dirs and not files:
                        # keep empty directories in the archive
                        zipf.write(root, os.path.relpath(root, base))
                    for file in files:
                        file_path = os.path.join(root, file)
                        zipf.write(file_path, os.path.relpath(file_path, base))


def create_tar_gz(tar_filename, items):
    """
    @name Write files and directory trees into a gzip-compressed tar
    @param tar_filename archive file name (with path)
    @param items files and/or directories, each stored under its base
           name. Items that are neither are ignored.
    """
    with tarfile.open(tar_filename, "w:gz") as tar:
        for item in items:
            if os.path.isfile(item) or os.path.isdir(item):
                # a trailing separator would give an empty archive name
                tar.add(item, arcname=os.path.basename(_trim_trailing_sep(item)))