479 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/bin/python
# coding: utf-8
# +-------------------------------------------------------------------
# | django-vue-lyadmin
# +-------------------------------------------------------------------
# | Author: lybbn
# +-------------------------------------------------------------------
# | QQ: 1042594286
# +-------------------------------------------------------------------
# ------------------------------
# windows系统命令工具类封装
# ------------------------------
import sys
import platform
import os, time
import psutil
import winreg
from config import EXEC_LOG_PATH, TEMP_EXEC_PATH
from django.core.cache import cache
from pathlib import Path
import win32security
def ReadReg(path, key):
"""
读取注册表
@path 注册表路径
@key 注册表键值
"""
import winreg
try:
newKey = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path)
value, type = winreg.QueryValueEx(newKey, key)
return value
except:
return False
def get_mac_address():
"""
获取mac
"""
import uuid
mac = uuid.UUID(int=uuid.getnode()).hex[-12:]
return ":".join([mac[e:e + 2] for e in range(0, 11, 2)])
def md5(strings):
"""
@name 生成md5
@param strings 要被处理的字符串
@return string(32)
"""
if type(strings) != bytes:
strings = strings.encode()
import hashlib
m = hashlib.md5()
m.update(strings)
return m.hexdigest()
def to_size(size):
"""
字节单位转换
@size 字节大小
return 返回带单位的格式(如1 GB)
"""
if not size: return '0.00 b'
size = float(size)
d = ('b', 'KB', 'MB', 'GB', 'TB')
s = d[0]
for b in d:
if size < 1024: return ("%.2f" % size) + ' ' + b
size = size / 1024
s = b
return ("%.2f" % size) + ' ' + b
def is_64bitos():
"""
判断是否x64系统(windows\linux都适用)
利用platform.uname()
windows:uname_result(system='Windows', node='xxxxx', release='10', version='10.0.19042', machine='AMD64')
linux:uname_result(system='Linux', node='xxxx', release='5.10.134-12.2.al8.x86_64', version='#1 SMP Thu Oct 27 10:07:15 CST 2022', machine='x86_64', processor='x86_64')
"""
info = platform.uname()
return info.machine.endswith('64')
def get_registry_value(key, subkey, value):
"""
读取注册表信息
@key 注册表类型
@subkey 注册表路径
@value 注册表具体key值
"""
key = getattr(winreg, key)
handle = winreg.OpenKey(key, subkey)
(value, type) = winreg.QueryValueEx(handle, value)
return value
def WriteLog(logMsg):
"""
写入LOG日志
"""
try:
with open(EXEC_LOG_PATH, 'w+') as f:
f.write(logMsg)
f.close()
except:
pass
def GetSystemVersion():
"""
取操作系统版本
"""
try:
key = 'lybbn_sys_version'
version = cache.get(key)
if version: return version
bit = 'x86'
if is_64bitos(): bit = 'x64'
def get(key):
return get_registry_value("HKEY_LOCAL_MACHINE", "SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion", key)
os = get("ProductName")
build = get("CurrentBuildNumber")
version = "%s (build %s) %s (Py%s)" % (os, build, bit, platform.python_version())
cache.set(key, version, 10000)
return version
except Exception as ex:
WriteLog('获取系统版', '获取系统版本失败,注册表无法打开,报错信息:%s'%(str(ex)))
version = "未知系统版本"
cache.set(key, version, 10000)
return version
def GetLoadAverage():
"""
取负载信息
"""
data = {}
data['one'] = 0
data['five'] = 0
data['fifteen'] = 0
data['max'] = psutil.cpu_count() * 2
data['limit'] = data['max']
data['safe'] = data['max'] * 0.75
data['percent'] = 0
return data
def GetMemInfo():
"""
取内存信息
"""
mem = psutil.virtual_memory()
memInfo = {}
memInfo['percent'] = mem.percent
memInfo['total'] = round(float(mem.total) / 1024 / 1024 / 1024, 2)
memInfo['free'] = round(float(mem.free) / 1024 / 1024 / 1024, 2)
memInfo['used'] = round(float(mem.used) / 1024 / 1024 / 1024, 2)
return memInfo
def get_disk_iostat():
"""
获取磁盘IO
"""
iokey = 'iostat'
diskio = cache.get(iokey)
mtime = int(time.time())
if not diskio:
diskio = {}
diskio['info'] = None
diskio['time'] = mtime
diskio_1 = diskio['info']
stime = mtime - diskio['time']
if not stime: stime = 1
diskInfo = {}
diskInfo['ALL'] = {}
diskInfo['ALL']['read_count'] = 0
diskInfo['ALL']['write_count'] = 0
diskInfo['ALL']['read_bytes'] = 0
diskInfo['ALL']['write_bytes'] = 0
diskInfo['ALL']['read_time'] = 0
diskInfo['ALL']['write_time'] = 0
diskInfo['ALL']['read_merged_count'] = 0
diskInfo['ALL']['write_merged_count'] = 0
try:
diskio_2 = psutil.disk_io_counters(perdisk=True)
if not diskio_1:
diskio_1 = diskio_2
i = 0
for disk_name in diskio_2.keys():
disk_name_alias = "磁盘"+str(0)
diskInfo[disk_name_alias] = {}
diskInfo[disk_name_alias]['read_count'] = int((diskio_2[disk_name].read_count - diskio_1[disk_name].read_count) / stime)
diskInfo[disk_name_alias]['write_count'] = int((diskio_2[disk_name].write_count - diskio_1[disk_name].write_count) / stime)
diskInfo[disk_name_alias]['read_bytes'] = int((diskio_2[disk_name].read_bytes - diskio_1[disk_name].read_bytes) / stime)
diskInfo[disk_name_alias]['write_bytes'] = int((diskio_2[disk_name].write_bytes - diskio_1[disk_name].write_bytes) / stime)
diskInfo[disk_name_alias]['read_time'] = int((diskio_2[disk_name].read_time - diskio_1[disk_name].read_time) / stime)
diskInfo[disk_name_alias]['write_time'] = int((diskio_2[disk_name].write_time - diskio_1[disk_name].write_time) / stime)
diskInfo[disk_name_alias]['read_merged_count'] = 0
diskInfo[disk_name_alias]['write_merged_count'] = 0
diskInfo['ALL']['read_count'] += diskInfo[disk_name_alias]['read_count']
diskInfo['ALL']['write_count'] += diskInfo[disk_name_alias]['write_count']
diskInfo['ALL']['read_bytes'] += diskInfo[disk_name_alias]['read_bytes']
diskInfo['ALL']['write_bytes'] += diskInfo[disk_name_alias]['write_bytes']
if diskInfo['ALL']['read_time'] < diskInfo[disk_name_alias]['read_time']:
diskInfo['ALL']['read_time'] = diskInfo[disk_name_alias]['read_time']
if diskInfo['ALL']['write_time'] < diskInfo[disk_name_alias]['write_time']:
diskInfo['ALL']['write_time'] = diskInfo[disk_name_alias]['write_time']
diskInfo['ALL']['read_merged_count'] += diskInfo[disk_name_alias]['read_merged_count']
diskInfo['ALL']['write_merged_count'] += diskInfo[disk_name_alias]['write_merged_count']
i = i+1
cache.set(iokey, {'info': diskio_2, 'time': mtime})
except:
return diskInfo
return diskInfo
def GetNetWork():
"""
获取网卡信息
"""
cache_timeout = 86400
otime = cache.get("otime")
ntime = time.time()
networkInfo = {}
networkInfo['network'] = {}
networkInfo['upTotal'] = 0
networkInfo['downTotal'] = 0
networkInfo['up'] = 0
networkInfo['down'] = 0
networkInfo['downPackets'] = 0
networkInfo['upPackets'] = 0
networkIo_list = psutil.net_io_counters(pernic=True)
for net_key in networkIo_list.keys():
if net_key.find('Loopback') >= 0 or net_key.find('Teredo') >= 0 or net_key.find('isatap') >= 0: continue
networkIo = networkIo_list[net_key][:4]
up_key = "{}_up".format(net_key)
down_key = "{}_down".format(net_key)
otime_key = "otime"
if not otime:
otime = time.time()
cache.set(up_key, networkIo[0], cache_timeout)
cache.set(down_key, networkIo[1], cache_timeout)
cache.set(otime_key, otime, cache_timeout)
networkInfo['network'][net_key] = {}
up = cache.get(up_key)
down = cache.get(down_key)
if not up:
up = networkIo[0]
if not down:
down = networkIo[1]
networkInfo['network'][net_key]['upTotal'] = networkIo[0]
networkInfo['network'][net_key]['downTotal'] = networkIo[1]
try:
networkInfo['network'][net_key]['up'] = round(float(networkIo[0] - up) / 1024 / (ntime - otime), 2)
networkInfo['network'][net_key]['down'] = round(float(networkIo[1] - down) / 1024 / (ntime - otime), 2)
except:
networkInfo['up'] = 0
networkInfo['down'] = 0
networkInfo['network'][net_key]['up'] = 0
networkInfo['network'][net_key]['down'] = 0
networkInfo['network'][net_key]['downPackets'] = networkIo[3]
networkInfo['network'][net_key]['upPackets'] = networkIo[2]
networkInfo['upTotal'] += networkInfo['network'][net_key]['upTotal']
networkInfo['downTotal'] += networkInfo['network'][net_key]['downTotal']
networkInfo['up'] += networkInfo['network'][net_key]['up']
networkInfo['down'] += networkInfo['network'][net_key]['down']
networkInfo['downPackets'] += networkInfo['network'][net_key]['downPackets']
networkInfo['upPackets'] += networkInfo['network'][net_key]['upPackets']
cache.set(up_key, networkIo[0], cache_timeout)
cache.set(down_key, networkIo[1], cache_timeout)
cache.set(otime_key, time.time(), cache_timeout)
networkInfo['up'] = round(float(networkInfo['up']), 2)
networkInfo['down'] = round(float(networkInfo['down']), 2)
networkInfo['iostat'] = get_disk_iostat()
return networkInfo
def GetBootTime():
"""
取系统启动时间
"""
key = 'lybbn_sys_time'
sys_time = cache.get(key)
if sys_time: return sys_time
import math
tStr = time.time() - psutil.boot_time()
min = tStr / 60
hours = min / 60
days = math.floor(hours / 24)
hours = math.floor(hours - (days * 24))
min = math.floor(min - (days * 60 * 24) - (hours * 60))
sys_time = "{}".format(int(days))
cache.set(key, sys_time, 1800)
return sys_time
def GetCpuInfo(interval=1):
"""
取CPU详细信息
"""
cpuCount = cache.get('lybbn_cpu_cpuCount')
if not cpuCount:
cpuCount = psutil.cpu_count()
cache.set('lybbn_cpu_cpuCount', cpuCount, 86400)
cpuNum = cache.get('lybbn_cpu_cpuNum')
if not cpuNum:
cpuNum = psutil.cpu_count(logical=False)
cache.set('lybbn_cpu_cpuNum', cpuNum, 86400)
# used = cache.get('lybbn_cpu_used')
# if not used:
# used = psutil.cpu_percent(interval)
# cache.set('lybbn_cpu_used',used,20)
used_all = cache.get('lybbn_cpu_used_all')
if not used_all:
used_all = psutil.cpu_percent(percpu=True)
used_total = 0
for x in used_all: used_total += x
cpuW = cache.get('lybbn_cpu_cpuW')
if not cpuW:
ret = os.popen('wmic cpu get NumberOfCores').read()
cpuW = 0
arrs = ret.strip().split('\n\n')
for x in arrs:
val = x.strip()
if not val: continue
try:
val = int(val)
cpuW += 1
except:
pass
cache.set('lybbn_cpu_cpuW', cpuW, 86400)
cpu_name = cache.get('lybbn_cpu_cpu_name')
if not cpu_name:
try:
cpu_name = '{} * {}'.format(
ReadReg(r'HARDWARE\DESCRIPTION\System\CentralProcessor\0', 'ProcessorNameString').strip(), cpuW)
except:
cpu_name = ''
cache.set('lybbn_cpu_cpu_name', cpu_name, 86400)
tmp = 0
if cpuW:
tmp = cpuNum / cpuW
used = 0
if used_total:
used = round(used_total / cpuCount, 2)
return used, cpuCount, used_all, cpu_name, tmp, cpuW
def GetDiskInfo():
"""
取磁盘分区信息
"""
key = 'lybbn_sys_disk'
diskInfo = cache.get(key)
if diskInfo: return diskInfo
try:
diskIo = psutil.disk_partitions()
except:
import string
diskIo = []
for c in string.ascii_uppercase:
disk = c + ':'
if os.path.isdir(disk):
data = {}
data['mountpoint'] = disk + '/'
diskIo.append(data)
diskInfo = []
for disk in diskIo:
try:
tmp = {}
tmp['path'] = disk.mountpoint.replace("\\", "/")
usage = psutil.disk_usage(disk.mountpoint)
tmp['size'] = [to_size(usage.total), to_size(usage.used), to_size(usage.free), usage.percent]
tmp['inodes'] = False
diskInfo.append(tmp)
except:
pass
cache.set(key, diskInfo, 10)
return diskInfo
def GetFileLastNumsLines(path,num=1000):
"""
获取指定文件的指定尾行数内容
"""
try:
if not os.path.exists(path): return ""
filesize = os.path.getsize(path)
if filesize == 0: return ""
num = int(num)
total_lines_wanted = num
BLOCK_SIZE = 4096
with open(path,"rb") as f:
f.seek(0, 2)
block_end_byte = f.tell()
lines_to_go = total_lines_wanted
block_number = -1
blocks = []
while lines_to_go > 0 and block_end_byte > 0:
if (block_end_byte - BLOCK_SIZE > 0):
f.seek(block_number*BLOCK_SIZE, 2)
blocks.append(f.read(BLOCK_SIZE))
else:
f.seek(0,0)
blocks.append(f.read(block_end_byte))
lines_found = blocks[-1].count(b'\n')
lines_to_go -= lines_found
block_end_byte -= BLOCK_SIZE
block_number -= 1
all_read_text = b''.join(reversed(blocks))
return b'\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
except:
return ""
def RestartServer():
"""
重启系统
"""
try:
os.system("shutdown /r /f /t 0")
except:
pass
def GetUidName(file_path,uid=0):
"""
通过系统uid获取对应名称
"""
#如果运行GetUidName在file_path所在的分区可能导致文件占用导致获取失败
try:
security_descriptor = win32security.GetFileSecurity(file_path, win32security.OWNER_SECURITY_INFORMATION)
owner_sid = security_descriptor.GetSecurityDescriptorOwner()
owner_name, domain_name, type = win32security.LookupAccountSid(None, owner_sid)
return owner_name
except Exception as e:
return ""
def GetGroupidName(file_path,gid=0):
"""
通过系统goup id所属组id获取对应名称
"""
try:
group_sid = win32security.GetFileSecurity(file_path, win32security.GROUP_SECURITY_INFORMATION).GetSecurityDescriptorGroup()
group_name, domain, type = win32security.LookupAccountSid(None, group_sid)
return group_name
except Exception as e:
return ""