pytest/errortest.py
2025-05-08 18:20:55 +08:00

280 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import os
import tempfile
import pymysql
from pywebio.input import file_upload
from pywebio.output import put_text, put_table, put_processbar, set_processbar, put_buttons
from pywebio.session import download
# MySQL数据库配置
DB_CONFIG = {
'host': 'localhost',
'user': 'testDB',
'password': 'WxMKtsNG8BRmWxDX',
'database': 'testdb',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
class CSVProcessor:
def __init__(self):
try:
print("正在连接数据库...")
print(f"数据库配置信息: 主机={DB_CONFIG['host']}, 数据库={DB_CONFIG['database']}")
# 首先尝试连接MySQL服务器不指定数据库
try:
print("尝试连接MySQL服务器...")
config_without_db = DB_CONFIG.copy()
del config_without_db['database']
print("connecting to MySQL server...")
# 添加连接超时设置
config_without_db['connect_timeout'] = 10
temp_conn = pymysql.connect(**config_without_db)
print("成功连接到MySQL服务器")
temp_cursor = temp_conn.cursor()
temp_cursor.close()
temp_conn.close()
print("成功连接MySQL服务器")
except pymysql.Error as e:
print(str(e))
raise e
# 连接指定的数据库
self.conn = pymysql.connect(**DB_CONFIG)
self.cursor = self.conn.cursor()
print("数据库连接成功")
except pymysql.Error as e:
error_msg = f"数据库连接失败: {str(e)}"
if "Can't connect to MySQL server" in str(e):
error_msg = "\n错误MySQL服务未运行\n解决方案:\n1. 以管理员身份打开命令提示符\n2. 输入命令 'net start MySQL80' 启动服务\n3. 如果启动失败请检查MySQL服务是否正确安装"
elif e.args[0] == 2003:
error_msg += "\n解决方案:\n1. 检查MySQL服务是否启动\n2. 确认主机名是否正确当前为localhost\n3. 检查防火墙设置是否允许MySQL连接"
elif e.args[0] == 1045:
error_msg += "\n解决方案:\n1. 验证用户名和密码是否正确\n2. 确认该用户是否有权限访问数据库\n3. 如忘记密码,请联系数据库管理员重置"
elif e.args[0] == 1049:
error_msg += "\n解决方案:\n1. 确认数据库名称是否正确当前为testdb\n2. 使用以下SQL命令创建数据库\n CREATE DATABASE testdb;"
print(error_msg)
put_text(error_msg) # 在Web界面显示错误信息
self.conn = None
self.cursor = None
def upload_csv(self):
try:
print("等待用户上传CSV文件...")
upfile = file_upload("请选择CSV文件:", accept=".csv", multiple=False)
if not upfile:
put_text("请上传CSV文件")
print("用户未选择文件")
return
file_content = upfile['content']
file_name = upfile['filename']
print(f"文件已上传: {file_name}")
# 将上传的文件内容保存到临时文件
import tempfile
import os
temp_dir = tempfile.gettempdir()
temp_file_path = os.path.join(temp_dir, file_name)
with open(temp_file_path, 'wb') as f:
f.write(file_content)
self.process_csv(temp_file_path)
except Exception as e:
error_msg = f"文件上传错误: {str(e)}"
put_text(error_msg)
print(error_msg)
def process_csv(self, file_name):
try:
print("开始处理CSV文件...")
# 读取CSV文件
data = self.read_csv(file_name)
# 显示结果
if data:
success_msg = f"文件读取成功,共{len(data)}行数据"
put_text("文件读取成功")
put_table([['文件名', '行数'],
[file_name, len(data)]])
print(success_msg)
# 处理充电枪编码数据
if len(data) > 1: # 确保有数据行
self.process_gun_codes(data)
except Exception as e:
error_msg = f"数据处理错误: {str(e)}"
put_text(error_msg)
print(error_msg)
def read_csv(self, filename):
try:
print(f"正在读取文件: {filename}")
if not os.path.exists(filename):
raise FileNotFoundError(f"文件不存在: {filename}")
data = []
encodings = ['utf-8', 'gbk', 'gb2312', 'utf-16']
for encoding in encodings:
try:
with open(filename, 'r', encoding=encoding) as file:
print(f"尝试使用 {encoding} 编码读取文件...")
csv_reader = csv.reader(file)
row_count = 0
data = []
# 读取第一行以验证CSV格式
header = next(csv_reader, None)
if not header:
raise ValueError("CSV文件为空")
data.append(header)
print(f"CSV文件头: {header}")
for row in csv_reader:
if not any(row): # 跳过空行
continue
data.append(row)
row_count += 1
if row_count % 100 == 0:
print(f"已读取 {row_count} 行...")
print(f"文件读取完成,使用 {encoding} 编码成功")
return data
except UnicodeDecodeError:
print(f"{encoding} 编码读取失败,尝试其他编码...")
continue
except Exception as e:
raise Exception(f"使用 {encoding} 编码读取时出错: {str(e)}")
raise Exception("尝试所有编码都失败,无法读取文件")
except Exception as e:
error_msg = f"读取文件 {filename} 时出错: {str(e)}"
put_text(error_msg)
print(error_msg)
return None
def process_gun_codes(self, data):
if not self.cursor:
put_text("数据库未连接,无法处理数据")
return
try:
# 假设CSV第一列是充电枪编码
gun_codes = [row[1] for row in data[1:]] # 跳过标题行
results = [] # 存储查询结果
total_codes = len(gun_codes)
# 创建进度条
put_processbar('progress', auto_close=True)
# 处理每个充电枪编码
for index, each_code in enumerate(gun_codes):
sql_query = """
SELECT max(temp_count) AS count1
FROM (
SELECT
a.实际支付,
@count := IF(@met_one = 0 AND a.实际支付 = 0, @count + 1, @count) AS temp_count,
@met_one := IF(a.实际支付 != 0 OR @met_one = 1, 1, 0) AS temp_met
FROM testdb.dingdan2025 a
CROSS JOIN (SELECT @count := 0, @met_one := 0) AS vars
where a.充电终端 = %s
ORDER BY a.下单时间 DESC
) AS subquery
"""
try:
print(f"执行SQL查询充电枪编码: {each_code}")
print(f"SQL语句: {sql_query}")
print(f"参数值: {each_code}")
# 执行查询
self.cursor.execute(sql_query, (each_code,))
result = self.cursor.fetchone()
print(f"查询结果: {result}")
if result is None:
print(f"警告:充电枪编码 {each_code} 未找到匹配记录")
# 更新进度条
set_processbar('progress', (index + 1) / total_codes)
except pymysql.Error as e:
error_msg = f"SQL执行错误 (充电枪编码: {each_code}): {str(e)}"
print(error_msg)
put_text(error_msg)
if e.args[0] == 1146: # 表不存在错误
print("错误:表'dingdan2025'不存在,请确认数据库表是否正确创建")
raise e
results.append(result['count1'] if result and result['count1'] is not None else 0)
# 准备表格数据
headers = data[0] + ['连续异常积累单数']
table_data = [headers] # 添加表头
# 合并CSV数据和查询结果
for i, row in enumerate(data[1:]):
table_data.append(row + [str(results[i])])
# 在Web界面显示结果
put_text("数据分析结果:")
put_table(table_data)
# 准备下载的CSV数据
output_csv = tempfile.NamedTemporaryFile(mode='w', newline='', suffix='.csv', delete=False, encoding='utf-8-sig')
csv_writer = csv.writer(output_csv)
csv_writer.writerows(table_data)
output_csv.close()
# 添加下载按钮
def download_callback():
with open(output_csv.name, 'rb') as f:
content = f.read()
download("分析结果.csv", content)
os.unlink(output_csv.name) # 删除临时文件
put_buttons(['下载分析结果'], onclick=[download_callback])
print("数据处理完成")
except Exception as e:
error_msg = f"处理充电枪编码时出错: {str(e)}"
put_text(error_msg)
print(error_msg)
def __del__(self):
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
print("数据库连接已关闭")
if __name__ == '__main__':
try:
print("正在初始化应用...")
app = CSVProcessor()
from pywebio.platform.tornado_http import start_server
print("正在启动Web服务器...")
start_server(
applications=app.upload_csv,
port=8077,
host='localhost',
debug=False,
cdn=False,
auto_open_webbrowser=True
)
print("Web服务器启动成功请访问 http://localhost:8077")
except ImportError as e:
print(f"错误缺少必要的库请确保已安装pywebio。错误信息{e}")
except OSError as e:
if "address already in use" in str(e).lower():
print("错误端口8077已被占用请关闭占用该端口的程序后重试。")
else:
print(f"错误:服务器启动失败。错误信息:{e}")
except Exception as e:
print(f"错误:服务器启动失败。错误信息:{e}")
print("提示请检查端口8077是否被占用或尝试更换其他端口。")