816 lines
30 KiB
Python
816 lines
30 KiB
Python
#!/bin/python
|
||
#coding: utf-8
|
||
# +-------------------------------------------------------------------
|
||
# | system: 如意面板
|
||
# +-------------------------------------------------------------------
|
||
# | Author: lybbn
|
||
# +-------------------------------------------------------------------
|
||
# | QQ: 1042594286
|
||
# +-------------------------------------------------------------------
|
||
# | Date: 2024-01-03
|
||
# +-------------------------------------------------------------------
|
||
|
||
# ------------------------------
|
||
# 文件/目录操作
|
||
# ------------------------------
|
||
|
||
import re
|
||
import os
|
||
import shutil
|
||
import zipfile
|
||
import tarfile
|
||
import datetime
|
||
import mimetypes
|
||
import requests
|
||
from natsort import natsorted, ns
|
||
from utils.server.system import system
|
||
from utils.security.no_delete_list import check_no_delete,check_in_black_list
|
||
from utils.common import ast_convert,WriteFile,RunCommand,current_os
|
||
from itertools import chain
|
||
|
||
def get_file_name_from_url(url):
    """
    @name Extract the file name (last path segment) from a URL
    @author lybbn<2024-02-22>
    @param url URL (or plain path) string
    @return last path segment, without any "?query" or "#fragment" part
    """
    # Local import keeps this function usable without touching the file header.
    from urllib.parse import urlsplit

    try:
        # Only the path component names the file; query/fragment are dropped.
        path = urlsplit(url).path
    except ValueError:
        # urlsplit rejects some malformed URLs (e.g. a bad IPv6 literal);
        # fall back to the raw string, as the previous implementation did.
        path = url
    return os.path.basename(path)
|
||
|
||
def get_file_extension(file_path):
    """
    @name Return the extension of a file path
    @author lybbn<2024-02-22>
    @param file_path path or file name
    @return extension including the leading dot (e.g. ".txt"), "" if none
    """
    # splitext returns (root, ext); only the second element is needed.
    return os.path.splitext(file_path)[1]
|
||
|
||
def detect_file_type(file_path):
    """
    @name Guess the MIME type of a file from its name
    @author lybbn<2024-02-22>
    @param file_path path or file name
    @return MIME type string (e.g. "text/html"), or None when it cannot be guessed
    """
    # guess_type returns (type, encoding); the encoding part is not used here.
    guessed = mimetypes.guess_type(file_path)
    return guessed[0]
|
||
|
||
# Extension -> editor language name. ".conf"/".ini" are handled separately
# because their result also depends on the path.
_EXT_LANGUAGE_MAP = {
    '.readme': "markdown",
    '.md': "markdown",
    '.sh': "shell",
    '.lua': "lua",
    '.rb': "ruby",
    '.js': "javascript",
    '.ts': "javascript",
    '.html': "html",
    '.htm': "html",
    '.css': "css",
    '.scss': "css",
    '.sass': "css",
    '.less': "css",
    '.json': "json",
    '.py': "python",
    '.yaml': "yaml",
    '.yml': "yaml",
    '.vue': "vue",
    '.php': "php",
    '.java': "java",
    '.go': "go",
    '.sql': "sql",
    '.xml': "xml",
}


def auto_detect_file_language(file_path):
    """
    @name Detect the editor language of a file from its extension
    @author lybbn<2024-03-08>
    @param file_path path or file name
    @return language name such as "python" or "html"; "log" for any
            extension that is not recognised
    """
    _, ext = os.path.splitext(file_path)
    if ext in ('.conf', '.ini'):
        # Config files whose path mentions nginx get nginx highlighting.
        return "nginx" if 'nginx' in file_path else "properties"
    return _EXT_LANGUAGE_MAP.get(ext, "log")
|
||
def list_dirs(dst_path):
    """
    List the names of the files and directories directly inside a directory.

    @param dst_path directory to list
    @return list of entry names; empty list when dst_path does not exist
    """
    if os.path.exists(dst_path):
        return list(os.listdir(dst_path))
    return []
|
||
|
||
def get_size(file_path):
    """
    @name Return the size of a file
    @author lybbn<2024-02-22>
    @param file_path path of the file
    @return size in bytes
    """
    file_stat = os.stat(file_path)
    return file_stat.st_size
|
||
|
||
def is_link(file_path):
    """
    @name Tell whether a path is a symbolic link
    @author lybbn<2024-02-22>
    @param file_path path to inspect
    @return True when the path is a symbolic link, otherwise False
    """
    link_flag = os.path.islink(file_path)
    return link_flag
|
||
|
||
def get_directory_size(dst_path):
    """
    @name Compute the size of a directory
    @author lybbn<2024-02-22>
    @param dst_path directory to measure
    @return on Windows: total size in bytes (int) of all non-link files found
            by walking the tree; on other systems: the `du -sh` figure
            reformatted as text such as "4.0 KB", or "0 B" when du fails
    """
    if current_os == "windows":
        total_size = 0
        for path, dirs, files in os.walk(dst_path):
            for file in files:
                # Checks must use the joined path: the bare name would be
                # resolved against the current working directory.
                file_path = os.path.join(path, file)
                if os.path.islink(file_path):
                    continue
                try:
                    total_size += os.path.getsize(file_path)
                except OSError:
                    # File vanished or is unreadable: leave it out of the sum.
                    continue
        return total_size
    else:
        # Local import keeps this function usable without touching the file header.
        import shlex

        # Quote the path so spaces/shell metacharacters cannot break or alter
        # the command; "--" stops a leading "-" being read as an option.
        # NOTE(review): assumes RunCommand passes the string to a shell (or
        # shlex-splits it) - confirm against utils.common.RunCommand.
        result,err,returncode = RunCommand(f"du -sh -- {shlex.quote(dst_path)}",returncode=True)
        if returncode == 0:
            fields = result.split()
            if fields:
                # Insert a space between number and unit and append "B": "4.0K" -> "4.0 KB".
                return re.sub(r'(\d+)([a-zA-Z])', r'\1 \2', fields[0])+"B"
        return "0 B"
|
||
|
||
def get_path_files_nums(path):
    """
    @name Count the entries directly inside a path
    @author lybbn<2024-02-22>
    @param path file or directory path
    @return 1 for a regular file, 0 when the path does not exist, otherwise
            the number of entries directly inside the directory
    """
    if os.path.isfile(path):
        return 1
    return len(os.listdir(path)) if os.path.exists(path) else 0
|
||
|
||
def get_filename_ext(filename):
    """
    @name Return the text after the last dot of a file name
    @author lybbn<2024-02-22>
    @param filename file name
    @return the part after the last "."; the whole name when it has no dot
    """
    # rpartition yields ('', '', filename) when there is no dot, so the last
    # element is the full name in that case.
    return filename.rpartition('.')[2]
|
||
|
||
def windows_path_replace(path,is_windows = True):
    """
    @name Convert backslash separators in a path to forward slashes
    @author lybbn<2024-02-22>
    @param path path string
    @param is_windows when False the path is returned unchanged
    @return the converted (or untouched) path
    """
    return path.replace("\\", "/") if is_windows else path
|
||
|
||
class _WalkEntry:
    """Path-based stand-in for os.DirEntry, used when walking sub-directories.

    os.DirEntry objects can only be produced by os.scandir(), so entries found
    through os.walk() are wrapped in this class. It offers the members the
    listing code relies on: name, path, stat() and is_file().
    """

    __slots__ = ("name", "path")

    def __init__(self, root, name):
        self.name = name
        self.path = os.path.join(root, name)

    def stat(self):
        return os.stat(self.path)

    def is_file(self):
        return os.path.isfile(self.path)


def list_files_in_directory(dst_path,sort="name",is_reverse=False,is_windows=False,search=None,containSub=False,isDir=False):
    """
    @name List the files/directories under a directory together with their
          attributes (size, path, permissions, owner, group, modified time)
          Directory sizes are not computed (size is None).
    @author lybbn<2024-02-22>
    @param dst_path directory to list; with is_windows=True an empty value
           lists the disks reported by system().GetDiskInfo()
    @param sort sort key: "name", "modified" or "size"; other values keep
           the discovery order
    @param is_reverse True = descending, False = ascending
    @param is_windows convert path separators and skip the group-name lookup
    @param search case-insensitive substring that entry names must contain
    @param containSub also search all sub-directories (at most 3000 results)
    @param isDir list directories only
    @return dict with keys 'data', 'file_nums', 'dir_nums', 'total_nums'
    @raise ValueError when dst_path exists but is not a directory
    """
    def get_file_info(entry):
        """Build the attribute dict for one entry; None when it cannot be stat'ed."""
        try:
            file_info = entry.stat()
        except OSError:
            # Typically a dangling symlink: describe the link itself rather
            # than aborting the whole listing.
            try:
                file_info = os.lstat(entry.path)
            except OSError:
                return None
        is_file = entry.is_file()
        modified_time = datetime.datetime.fromtimestamp(file_info.st_mtime)
        gid = file_info.st_gid
        group_name = "" if is_windows else system.GetGroupidName(entry.path, gid)
        return {
            "name": entry.name,
            "type": "file" if is_file else "dir",
            "path": windows_path_replace(entry.path, is_windows=is_windows),
            "size": file_info.st_size if is_file else None,
            "permissions": oct(file_info.st_mode)[-3:],
            "owner_uid": file_info.st_uid,
            "owner": system.GetUidName(entry.path, file_info.st_uid),
            "gid": gid,
            "group": group_name,
            "modified": modified_time.strftime("%Y-%m-%d %H:%M:%S")
        }

    def process_entry(entry):
        """Apply the search / directory-only filters, then describe the entry."""
        if search and search.lower() not in entry.name.lower():
            return None
        if isDir and entry.is_file():
            return None
        return get_file_info(entry)

    def sort_items(items):
        """Sort attribute dicts by the requested key; unknown keys keep the order."""
        if sort == "name":
            return natsorted(items, key=lambda x: x["name"], alg=ns.PATH, reverse=is_reverse)
        if sort == "modified":
            return sorted(items, key=lambda x: x["modified"], reverse=is_reverse)
        if sort == "size":
            return sorted(items, key=lambda x: x["size"] if x["size"] is not None else 0, reverse=is_reverse)
        return items

    # Windows "root": list the disks instead of a directory.
    if is_windows and not dst_path:
        needle = search.lower() if search else None
        disks = []
        for d in system().GetDiskInfo():
            disk_name = d['path'].lower()
            # Compare lower-case to lower-case, like the entry search does.
            if needle and needle not in disk_name:
                continue
            disks.append({
                "name": disk_name,
                "type": "pan",
                "path": windows_path_replace(disk_name, is_windows=is_windows),
                "size": d['size'][0],
                "permissions": "",
                "owner_uid": None,
                "owner": "",
                "modified": ""
            })
        return {
            'data': disks,
            'file_nums': 0,
            'dir_nums': 0,
            # Count what is actually returned, so a search filter is reflected.
            'total_nums': len(disks)
        }

    # Validate the target path.
    if not os.path.exists(dst_path):
        return {
            'data': [],
            'file_nums': 0,
            'dir_nums': 0,
            'total_nums': 0
        }
    if not os.path.isdir(dst_path):
        raise ValueError("错误:非目录")

    if not containSub:
        # Single level: directories first, each group sorted on its own.
        dirData = []
        fileData = []
        for entry in os.scandir(dst_path):
            item = process_entry(entry)
            if item:
                if item["type"] == "dir":
                    dirData.append(item)
                else:
                    fileData.append(item)
        data = sort_items(dirData) + sort_items(fileData)
    else:
        # Recursive search, capped so huge trees cannot stall the request.
        data = []
        max_limit = 3000
        for root, dirs, files in os.walk(dst_path):
            if len(data) >= max_limit:
                break
            for name in chain(files, dirs):
                if len(data) >= max_limit:
                    break
                info = process_entry(_WalkEntry(root, name))
                if info:
                    data.append(info)
        data = sort_items(data)

    file_nums = sum(1 for item in data if item["type"] == "file")
    dir_nums = sum(1 for item in data if item["type"] == "dir")

    return {
        'data': data,
        'file_nums': file_nums,
        'dir_nums': dir_nums,
        'total_nums': file_nums + dir_nums
    }
|
||
def list_files_in_directory_old(dst_path,sort="name",is_reverse=False,is_windows=False,search=None,containSub=False,isDir=False):
    """
    @name Legacy listing of the files/directories under a directory, with
          their attributes (size, path, permissions, owner)
          Directory sizes are not computed (size is None).
    @author lybbn<2024-02-22>
    @param sort sort key; "modified" and "size" re-sort the first 4000 items,
           any other value keeps the natural name order applied below
    @param is_reverse True = descending (desc), False = ascending (asc)
    @param search substring looked up in the lower-cased entry name
           NOTE(review): `search` itself is not lower-cased here, so a search
           term containing upper-case letters never matches.
    @param containSub whether the search covers all sub-directories
    @param isDir whether to list directories only (only honoured by the
           non-recursive branch)
    @return dict with keys 'data', 'file_nums', 'dir_nums', 'total_nums'
    @raise ValueError when dst_path exists but is not a directory
    """
    is_dir_first = True  # whether directories are placed before files
    if is_windows:
        if not dst_path:
            # Empty path on Windows: list the disks instead of a directory.
            disk_paths = system().GetDiskInfo()
            datainfo = {
                'data':[],
                'file_nums':0,
                'dir_nums':0,
                'total_nums':0
            }
            for d in disk_paths:
                if search:
                    if d['path'].lower().find(search) == -1:
                        continue
                datainfo['data'].append({"name":d['path'].lower(),"type":"pan","path":windows_path_replace(d['path'].lower(),is_windows=is_windows),"size":d['size'][0],"permissions":"","owner_uid":None,"owner":"","modified":""})
            # Counts every disk, including those filtered out by `search`.
            datainfo['total_nums'] = len(disk_paths)
            return datainfo
    if not os.path.exists(dst_path):
        return {
            'data':[],
            'file_nums':0,
            'dir_nums':0,
            'total_nums':0
        }
    if not os.path.isdir(dst_path):
        raise ValueError("错误:非目录")
    data = []
    dirData = []
    fileData = []
    file_nums = 0
    dir_nums = 0
    if not containSub:
        # Single level: inspect only the direct children of dst_path.
        for entry in os.scandir(dst_path):
            if entry.is_file():
                if isDir:
                    continue
                if search:
                    if entry.name.lower().find(search) == -1:
                        continue
                file_nums = file_nums + 1
                file_info = entry.stat()
                modified_time = datetime.datetime.fromtimestamp(file_info.st_mtime)
                formatted_time = modified_time.strftime("%Y-%m-%d %H:%M:%S")
                gid=file_info.st_gid
                group_name=""
                if not is_windows:
                    group_name = system.GetGroupidName(entry.path,gid)
                tempData = {"name":entry.name,"type":"file","path":windows_path_replace(entry.path,is_windows=is_windows),"size":file_info.st_size,"permissions":oct(file_info.st_mode)[-3:],"owner_uid":file_info.st_uid,"owner":system.GetUidName(entry.path,file_info.st_uid),"gid":gid,"group":group_name,"modified":formatted_time}
                data.append(tempData)
                if is_dir_first:
                    fileData.append(tempData)
            elif entry.is_dir():
                if search:
                    if entry.name.lower().find(search) == -1:
                        continue
                dir_nums = dir_nums + 1
                dir_info = entry.stat()
                modified_time = datetime.datetime.fromtimestamp(dir_info.st_mtime)
                formatted_time = modified_time.strftime("%Y-%m-%d %H:%M:%S")
                gid=dir_info.st_gid
                group_name=""
                if not is_windows:
                    group_name = system.GetGroupidName(entry.path,gid)
                tempData = {"name":entry.name,"type":"dir","path":windows_path_replace(entry.path,is_windows=is_windows),"size":None,"permissions":oct(dir_info.st_mode)[-3:],"owner_uid":dir_info.st_uid,"owner":system.GetUidName(entry.path,dir_info.st_uid),"gid":gid,"group":group_name,"modified":formatted_time}
                data.append(tempData)
                if is_dir_first:
                    dirData.append(tempData)
    else:
        # Recursive search: stop after max_limit matches.
        count_limit = 0
        max_limit = 3000
        for root, dirs, files in os.walk(dst_path):
            if count_limit >= max_limit:
                break
            # Search the files of the current directory.
            # Unlike the single-level branch, these records carry no "gid"/"group" keys.
            for file in files:
                if count_limit >= max_limit:
                    break
                if search:
                    if file.lower().find(search) == -1:
                        continue
                file_nums = file_nums + 1
                file_path = os.path.join(root, file)
                file_info = os.stat(file_path)
                modified_time = datetime.datetime.fromtimestamp(file_info.st_mtime)
                formatted_time = modified_time.strftime("%Y-%m-%d %H:%M:%S")
                tempData = {"name":file,"type":"file","path":windows_path_replace(file_path,is_windows=is_windows),"size":file_info.st_size,"permissions":oct(file_info.st_mode)[-3:],"owner_uid":file_info.st_uid,"owner":system.GetUidName(file_path,file_info.st_uid),"modified":formatted_time}
                data.append(tempData)
                if is_dir_first:
                    fileData.append(tempData)
                count_limit += 1
            # Search the sub-directories of the current directory.
            for dir in dirs:
                if count_limit >= max_limit:
                    break
                if search:
                    if dir.lower().find(search) == -1:
                        continue
                dir_nums = dir_nums + 1
                dir_path = os.path.join(root, dir)
                dir_info = os.stat(dir_path)
                modified_time = datetime.datetime.fromtimestamp(dir_info.st_mtime)
                formatted_time = modified_time.strftime("%Y-%m-%d %H:%M:%S")
                tempData = {"name":dir,"type":"dir","path":windows_path_replace(dir_path,is_windows=is_windows),"size":None,"permissions":oct(dir_info.st_mode)[-3:],"owner_uid":dir_info.st_uid,"owner":system.GetUidName(dir_path,dir_info.st_uid),"modified":formatted_time}
                data.append(tempData)
                if is_dir_first:
                    dirData.append(tempData)
                count_limit += 1
    if is_dir_first:
        # Rebuild `data` as directories followed by files; only the first
        # 4000 items of each group are natural-sorted by name.
        data = []
        temp_dir_date1 = dirData[:4000]
        temp_dir_date2 = natsorted(temp_dir_date1,key=lambda x: x.get("name", ""), alg=ns.PATH, reverse=is_reverse)
        temp_dir_data = temp_dir_date2 + dirData[4000:]
        temp_file_date1 = fileData[:4000]
        temp_file_date2 = natsorted(temp_file_date1,key=lambda x: x.get("name", ""), alg=ns.PATH, reverse=is_reverse)
        temp_file_data = temp_file_date2 + fileData[4000:]
        data.extend(temp_dir_data)
        data.extend(temp_file_data)

    # if sort == "name":
    #     # Sort the result by the sort parameter; huge listings sort slowly, so only the first 4000 are sorted
    #     temp_date1 = data[:4000]
    #     temp_date2 = natsorted(temp_date1,key=lambda x: x.get("name", ""), alg=ns.PATH, reverse=is_reverse)
    #     data = temp_date2 + data[4000:]
    if sort == "modified":
        # Sorting huge listings is slow, so only the first 4000 items are sorted.
        temp_date1 = data[:4000]
        temp_date2 = sorted(temp_date1, key=lambda x: x["modified"], reverse=is_reverse)
        data = temp_date2 + data[4000:]
    elif sort == "size":
        # Sorting huge listings is slow, so only the first 4000 items are sorted.
        # Directories have size None and are treated as 0.
        temp_date1 = data[:4000]
        temp_date2 = sorted(temp_date1, key=lambda x: x["size"] if x["size"] is not None else 0, reverse=is_reverse)
        data = temp_date2 + data[4000:]

    data_info = {
        'data':data,
        'file_nums':file_nums,
        'dir_nums':dir_nums,
        'total_nums':file_nums+dir_nums
    }
    return data_info
|
||
|
||
def get_filedir_attribute(path,is_windows=False):
    """
    @name Return the attributes of a single file or directory
    @author lybbn<2024-02-22>
    @param path file or directory path
    @param is_windows convert backslashes in the returned path
    @return dict with name, type, is_link, path, size, permissions, owner,
            group, modified and access times; None when path is empty or is
            neither a regular file nor a directory
    """
    if not path:
        return None
    if os.path.isfile(path):
        kind = "file"
    elif os.path.isdir(path):
        kind = "dir"
    else:
        return None

    name = os.path.basename(path)
    info = os.stat(path)
    time_format = "%Y-%m-%d %H:%M:%S"
    formatted_time = datetime.datetime.fromtimestamp(info.st_mtime).strftime(time_format)
    formatted_at = datetime.datetime.fromtimestamp(info.st_atime).strftime(time_format)
    return {
        "name": name,
        "type": kind,
        "is_link": is_link(path),
        "path": windows_path_replace(path, is_windows=is_windows),
        # Files report their own size; directories are measured recursively.
        "size": info.st_size if kind == "file" else get_directory_size(path),
        "permissions": oct(info.st_mode)[-3:],
        "owner_uid": info.st_uid,
        "owner": system.GetUidName(path, info.st_uid),
        "gid": info.st_gid,
        "group": system.GetGroupidName(path, info.st_gid),
        "modified": formatted_time,
        "access_at": formatted_at,
    }
|
||
|
||
def create_file(path,is_windows=False):
    """
    @name Create an empty file (parent directories are created as needed)
    @author lybbn<2024-02-22>
    @param path full path of the file to create
    @param is_windows build the final path with backslash separators
    @raise ValueError when the name is empty, ends with '.', is longer than
           100 characters, contains a forbidden character, or the file
           already exists
    """
    # Strip stray whitespace around both parts.
    filename = os.path.basename(path).strip()
    filepath = os.path.dirname(path).strip()
    if not filename:
        raise ValueError("请填写文件名")
    if is_windows:
        filepath = filepath.replace("//", "/").replace("/", "\\")
    path = os.path.join(filepath, filename)
    if path[-1] == '.':
        raise ValueError("文件名不能以'.'点结尾")
    if len(filename) > 100:
        raise ValueError("长度在1到100个字符之间")
    black_list = ['\\','/', '&', '*', '|', ';', '"', "'", '<', '>']
    if any(black in filename for black in black_list):
        raise ValueError("文件名不能包含指定特殊字符")
    if os.path.exists(path):
        # Must be raised: returning the exception object made the call look
        # like a success to the caller.
        raise ValueError("该文件已存在")
    # filepath is '' for a bare file name; os.makedirs('') would fail.
    if filepath and not os.path.exists(filepath):
        os.makedirs(filepath)
    # Create the empty file.
    with open(path, 'w+'):
        pass
|
||
|
||
def create_dir(path,is_windows=False):
    """
    @name Create a directory (including missing parents)
    @author lybbn<2024-02-22>
    @param path full path of the directory to create
    @param is_windows accepted for signature parity with the other helpers;
           not used here
    @raise ValueError when the path is empty, the name ends with '.', is not
           1-100 characters long, contains a forbidden character, or the
           directory already exists
    """
    if not path:
        # An empty path has no name at all; report it as a length error
        # instead of failing on path[-1].
        raise ValueError("长度在1到100个字符之间")
    path = path.replace("//", "/")
    if path[-1] == '.':
        raise ValueError("目录名不能以'.'点结尾")
    dirname = os.path.basename(path)
    if len(dirname)>100 or len(dirname) < 1:
        raise ValueError("长度在1到100个字符之间")
    black_list = ['\\','/', '&', '*', '|', ';', '"', "'", '<', '>']
    if any(black in dirname for black in black_list):
        raise ValueError("文件名不能包含指定特殊字符")
    if os.path.exists(path):
        # Must be raised: returning the exception object made the call look
        # like a success to the caller.
        raise ValueError("该目录已存在")
    os.makedirs(path)
|
||
|
||
def delete_dir(path,is_windows=False):
    """
    @name Delete a directory
    @author lybbn<2024-02-22>
    @param path directory (or symbolic link) to delete
    @param is_windows forwarded to the protected-path check
    @raise ValueError when the path neither exists nor is a symbolic link
    """
    if not (os.path.exists(path) or os.path.islink(path)):
        raise ValueError("目录不存在")

    # Check against the list of paths that must not be deleted.
    check_no_delete(path,is_windows)

    # A link is unlinked itself; a real directory is removed recursively.
    remover = os.remove if os.path.islink(path) else shutil.rmtree
    remover(path)
|
||
|
||
def delete_file(path,is_windows=False):
    """
    @name Delete a file
    @author lybbn<2024-02-22>
    @param path file (or symbolic link) to delete
    @param is_windows forwarded to the protected-path check
    @raise ValueError when the path neither exists nor is a symbolic link
    """
    if not (os.path.exists(path) or os.path.islink(path)):
        raise ValueError("文件不存在")

    # Check against the list of paths that must not be deleted.
    check_no_delete(path,is_windows)

    os.remove(path)
|
||
|
||
def rename_file(sPath,dPath,is_windows=False):
    """
    @name Rename a file or directory
    @author lybbn<2024-02-22>
    @param sPath source path
    @param dPath new path
    @param is_windows forwarded to the reserved-name check
    @raise ValueError when source and target are identical, the target ends
           with '.', the source is missing, the target already exists, or
           the target clashes with a reserved system path
    """
    if sPath == dPath:
        # Raised (not returned) so callers see the problem, as in
        # copy_dir / move_file.
        raise ValueError("源目标名称相同,已忽略")
    dPath = dPath.replace("//", "/")
    sPath = sPath.replace('//', '/')
    if dPath[-1] == '.':
        raise ValueError("不能以'.'点结尾")
    if not os.path.exists(sPath):
        raise ValueError("源文件/目录不存在")
    if os.path.exists(dPath):
        raise ValueError("目标存在相同名称")
    if dPath[-1] == '/':
        dPath = dPath[:-1]

    # Safety check against reserved system paths.
    if check_in_black_list(dPath,is_windows):
        raise ValueError("与系统内置冲突,请更换名称")

    os.rename(sPath, dPath)
|
||
|
||
def copy_file(sPath,dPath,is_windows=False):
    """
    @name Copy a single file (an existing target is overwritten)
    @author lybbn<2024-02-22>
    @param sPath source path
    @param dPath target path
    @param is_windows accepted for signature parity; not used here
    @raise ValueError when the target ends with '.' or the source is missing
    """
    target = dPath.replace("//", "/")
    source = sPath.replace('//', '/')
    if target[-1] == '.':
        raise ValueError("不能以'.'点结尾")
    source_missing = not os.path.exists(source)
    if source_missing:
        raise ValueError("源文件不存在")

    shutil.copyfile(source, target)
|
||
|
||
def copy_dir(sPath,dPath,is_windows=False,cover=False):
    """
    @name Copy a directory tree
    @author lybbn<2024-02-22>
    @param sPath source directory
    @param dPath target directory
    @param is_windows accepted for signature parity; not used here
    @param cover when True an existing target is removed before copying
    @raise ValueError when source and target are identical, the target ends
           with '.', or the source is missing
    """
    if sPath == dPath:
        raise ValueError("源和目标相同,已忽略")
    target = dPath.replace("//", "/")
    source = sPath.replace('//', '/')
    if target[-1] == '.':
        raise ValueError("不能以'.'点结尾")
    if not os.path.exists(source):
        raise ValueError("源目录不存在")
    if cover:
        # Overwrite mode: drop the existing target tree first.
        if os.path.exists(target):
            shutil.rmtree(target)
    shutil.copytree(source, target)
|
||
|
||
def move_file(sPath,dPath,is_windows=False,cover=False):
    """
    @name Move a file or directory
    @author lybbn<2024-02-22>
    @param sPath source path
    @param dPath target path
    @param is_windows forwarded to the reserved-name check
    @param cover when True an existing target is removed before moving
    @raise ValueError when source and target are identical, the target ends
           with '.', the source is missing, or the target clashes with a
           reserved system path
    """
    if sPath == dPath:
        raise ValueError("源和目标相同,已忽略")
    dPath = dPath.replace("//", "/")
    sPath = sPath.replace('//', '/')
    if dPath[-1] == '.':
        raise ValueError("不能以'.'点结尾")
    if dPath[-1] == '/':
        dPath = dPath[:-1]
    if not os.path.exists(sPath):
        raise ValueError("源目录不存在")
    # Safety check against reserved system paths.
    if check_in_black_list(dPath,is_windows):
        raise ValueError("与系统内置冲突,请更换名称")
    if cover and os.path.lexists(dPath):
        # Pick the removal call from what the TARGET is: the source type says
        # nothing about whether the existing target is a file or a directory.
        if os.path.isdir(dPath) and not os.path.islink(dPath):
            shutil.rmtree(dPath)
        else:
            os.remove(dPath)

    shutil.move(sPath, dPath)
|
||
|
||
def batch_operate(param,is_windows=False):
    """
    @name Batch operations (move, copy, compress, extract, delete)
    @author lybbn<2024-02-22>
    @param param request parameters; 'type' selects the operation
           ('copy', 'move', 'zip', 'unzip', 'del'), 'path' is the target and
           'spath' the source(s). copy/move also read 'confirm' and
           'skipList'; zip reads 'zip_type'.
    @param is_windows forwarded to the move/delete helpers
    @return tuple (ok, message, code, data); code 4050 carries the list of
            conflicting entries in data so the caller can ask for confirmation
    @raise ValueError when a source path ends with '.', plus whatever the
           underlying helpers raise
    """
    op_type = param.get('type',None)
    if op_type in ['copy','move']:
        confirm = param.get('confirm',False)
        skip_list = ast_convert(param.get('skipList',[]))  # entries whose overwrite should be skipped
        dPath = param.get('path',"")
        sPath = ast_convert(param.get('spath',[]))
        if not dPath or not sPath:
            return False,"参数错误",4000,None
        dPath = dPath.replace('//', '/')
        if not confirm:  # first pass: report conflicts and wait for confirmation
            for d in sPath:
                if d[-1] == '.':
                    raise ValueError("%s不能以'.'点结尾"%d)
        # Drop trailing slashes once, so basename() and the skip-list
        # comparison behave the same in every step below.
        sPath = [p.rstrip('/') or p for p in sPath]
        if not confirm:
            conflict_list = []
            for d in sPath:
                dfile = dPath + '/' + os.path.basename(d)
                if os.path.exists(dfile):
                    conflict_list.append({
                        'path':d,
                        'name':os.path.basename(d)
                    })
            if conflict_list:
                return False,"文件冲突",4050,conflict_list

        if skip_list:
            # skip_list holds dicts; sPath holds plain path strings, so the
            # comparison has to go through each dict's 'path' value.
            skip_paths = {(s['path'].rstrip('/') or s['path']) for s in skip_list}
            sPath = [p for p in sPath if p not in skip_paths]

        if op_type == 'copy':
            # Refuse to copy anything into itself before touching the disk.
            for sf in sPath:
                dfile = dPath + '/' + os.path.basename(sf)
                if os.path.commonpath([dfile, sf]) == sf:
                    return False,'从{}复制到{}有包含关系,请更换目标目录!'.format(sf, dfile),4000,None
            for sf in sPath:
                dfile = dPath + '/' + os.path.basename(sf)
                if os.path.isdir(sf):
                    shutil.copytree(sf, dfile)
                else:
                    shutil.copyfile(sf, dfile)
            return True,"批量复制成功",2000,None
        else:
            for sf in sPath:
                dfile = dPath + '/' + os.path.basename(sf)
                move_file(sPath=sf,dPath=dfile,is_windows=is_windows,cover=True)
            return True,"批量剪切成功",2000,None
    elif op_type == 'zip':
        dPath = param.get('path',"")
        sPath = ast_convert(param.get('spath',[]))
        zip_type = param.get('zip_type',"")
        if not dPath or not sPath:
            return False,"参数错误",4000,None
        if zip_type not in ["tar","zip"]:
            return False,"不支持的压缩格式",4000,None
        dPath = dPath.replace('//', '/')
        func_zip(zip_filename=dPath,items=sPath,zip_type=zip_type)
        return True,"压缩成功",2000,None
    elif op_type == 'unzip':
        dPath = param.get('path',"")
        sPath = param.get('spath',"")
        if not dPath or not sPath:
            return False,"参数错误",4000,None
        dPath = dPath.replace('//', '/')
        func_unzip(zip_filename=sPath,extract_path=dPath)
        return True,"解压成功",2000,None
    elif op_type == 'del':
        sPath = ast_convert(param.get('spath',[]))
        for sf in sPath:
            if os.path.isdir(sf):
                delete_dir(path=sf,is_windows=is_windows)
            else:
                delete_file(path=sf,is_windows=is_windows)
        return True,"批量删除成功",2000,None
    else:
        # Also covers 'pms' (permissions), which is not implemented: answer
        # with the usual 4-tuple instead of falling through to None.
        return False,"类型错误",4000,None
|
||
|
||
|
||
def func_zip(zip_filename,items,zip_type):
    """
    @name Compress files/directories into an archive
    @author lybbn<2024-03-07>
    @param zip_filename archive file name (including path)
    @param items list of files or directories to compress, e.g. ['/var/log/ruyi.log']
    @param zip_type archive type ("tar" -> .tar.gz, "zip" -> .zip)
    @raise ValueError when items is empty or zip_type is not supported
    """
    if not items:
        raise ValueError("需要压缩的文件或目录不能为空")
    if zip_type not in ("zip", "tar"):
        raise ValueError("不支持的压缩格式")
    if zip_type == "zip":
        zip_directories_and_files(zip_filename,items)
        return
    create_tar_gz(zip_filename,items)
|
||
|
||
def func_unzip(zip_filename,extract_path):
    """
    @name Extract an archive
    @author lybbn<2024-03-07>
    @param zip_filename archive file name (including path)
    @param extract_path target directory for the extracted content
    @raise ValueError when the archive format is not supported, or a tar
           member would be written outside extract_path
    """
    if current_os == "windows":
        # Release the target directory if a cmd process is using it.
        from utils.server.windows import kill_cmd_if_working_dir
        kill_cmd_if_working_dir(extract_path)
    # os.path.splitext only returns the last suffix (".gz" for "a.tar.gz"),
    # so compound extensions are matched against the whole file name.
    lower_name = zip_filename.lower()
    if lower_name.endswith(('.tar.gz', '.tgz', '.tar.bz2', '.tbz', '.tar')):
        # Mode 'r' lets tarfile detect the compression on its own.
        with tarfile.open(zip_filename, 'r') as tar:
            # Reject members whose path escapes the target directory
            # (absolute names or ".." components).
            # NOTE(review): link members pointing outside the target are not
            # covered by this check.
            dest_root = os.path.realpath(extract_path)
            for member in tar.getmembers():
                target = os.path.realpath(os.path.join(dest_root, member.name))
                if os.path.commonpath([dest_root, target]) != dest_root:
                    raise ValueError("压缩包中包含非法路径: %s" % member.name)
            tar.extractall(extract_path)
    elif lower_name.endswith('.zip'):
        with zipfile.ZipFile(zip_filename, 'r') as zipf:
            zipf.extractall(extract_path)
    else:
        raise ValueError("不支持的文件格式")
|
||
|
||
|
||
def zip_directories_and_files(zip_filename, items):
    """
    Write the given files and directories into a compressed .zip archive.

    Files are stored under their base name. Directories are stored
    recursively under their own directory name, including empty
    sub-directories.

    @param zip_filename archive file name (including path)
    @param items list of file or directory paths; entries that are neither
           a file nor a directory are ignored
    """
    archive_abs = os.path.abspath(zip_filename)
    # ZIP_DEFLATED actually compresses; the default (ZIP_STORED) only stores.
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for item in items:
            if os.path.isfile(item):
                zipf.write(item, os.path.basename(item))
            elif os.path.isdir(item):
                # normpath drops a trailing slash so dirname() yields the
                # parent and the top-level directory name stays in the archive.
                top = os.path.normpath(item)
                base = os.path.dirname(top)
                for root, dirs, files in os.walk(top):
                    if not dirs and not files:
                        # Keep empty directories as explicit entries.
                        zipf.write(root, os.path.relpath(root, base))
                    for file in files:
                        file_path = os.path.join(root, file)
                        # Never add the archive being written to itself.
                        if os.path.abspath(file_path) == archive_abs:
                            continue
                        zipf.write(file_path, os.path.relpath(file_path, base))
|
||
|
||
def create_tar_gz(tar_filename, items):
    """
    Write the given files and directories into a gzip-compressed tar archive.

    Every item is stored under its base name; directories are added
    recursively by tarfile.

    @param tar_filename archive file name (including path)
    @param items list of file or directory paths; entries that are neither
           a file nor a directory are ignored
    """
    with tarfile.open(tar_filename, "w:gz") as archive:
        for entry in items:
            if not (os.path.isfile(entry) or os.path.isdir(entry)):
                continue
            archive.add(entry, arcname=os.path.basename(entry))