280 lines
12 KiB
Python
280 lines
12 KiB
Python
import csv
|
||
import os
|
||
import tempfile
|
||
import pymysql
|
||
from pywebio.input import file_upload
|
||
from pywebio.output import put_text, put_table, put_processbar, set_processbar, put_buttons
|
||
from pywebio.session import download
|
||
|
||
# MySQL数据库配置
|
||
DB_CONFIG = {
|
||
'host': 'localhost',
|
||
'user': 'testDB',
|
||
'password': 'WxMKtsNG8BRmWxDX',
|
||
'database': 'testdb',
|
||
'charset': 'utf8mb4',
|
||
'cursorclass': pymysql.cursors.DictCursor
|
||
}
|
||
|
||
class CSVProcessor:
|
||
def __init__(self):
|
||
try:
|
||
print("正在连接数据库...")
|
||
print(f"数据库配置信息: 主机={DB_CONFIG['host']}, 数据库={DB_CONFIG['database']}")
|
||
|
||
# 首先尝试连接MySQL服务器(不指定数据库)
|
||
try:
|
||
print("尝试连接MySQL服务器...")
|
||
config_without_db = DB_CONFIG.copy()
|
||
del config_without_db['database']
|
||
print("connecting to MySQL server...")
|
||
# 添加连接超时设置
|
||
config_without_db['connect_timeout'] = 10
|
||
temp_conn = pymysql.connect(**config_without_db)
|
||
print("成功连接到MySQL服务器")
|
||
temp_cursor = temp_conn.cursor()
|
||
|
||
temp_cursor.close()
|
||
temp_conn.close()
|
||
print("成功连接MySQL服务器")
|
||
except pymysql.Error as e:
|
||
print(str(e))
|
||
raise e
|
||
|
||
# 连接指定的数据库
|
||
self.conn = pymysql.connect(**DB_CONFIG)
|
||
self.cursor = self.conn.cursor()
|
||
print("数据库连接成功")
|
||
except pymysql.Error as e:
|
||
error_msg = f"数据库连接失败: {str(e)}"
|
||
if "Can't connect to MySQL server" in str(e):
|
||
error_msg = "\n错误:MySQL服务未运行\n解决方案:\n1. 以管理员身份打开命令提示符\n2. 输入命令 'net start MySQL80' 启动服务\n3. 如果启动失败,请检查MySQL服务是否正确安装"
|
||
elif e.args[0] == 2003:
|
||
error_msg += "\n解决方案:\n1. 检查MySQL服务是否启动\n2. 确认主机名是否正确(当前为localhost)\n3. 检查防火墙设置是否允许MySQL连接"
|
||
elif e.args[0] == 1045:
|
||
error_msg += "\n解决方案:\n1. 验证用户名和密码是否正确\n2. 确认该用户是否有权限访问数据库\n3. 如忘记密码,请联系数据库管理员重置"
|
||
elif e.args[0] == 1049:
|
||
error_msg += "\n解决方案:\n1. 确认数据库名称是否正确(当前为testdb)\n2. 使用以下SQL命令创建数据库:\n CREATE DATABASE testdb;"
|
||
print(error_msg)
|
||
put_text(error_msg) # 在Web界面显示错误信息
|
||
self.conn = None
|
||
self.cursor = None
|
||
|
||
def upload_csv(self):
|
||
try:
|
||
print("等待用户上传CSV文件...")
|
||
upfile = file_upload("请选择CSV文件:", accept=".csv", multiple=False)
|
||
if not upfile:
|
||
put_text("请上传CSV文件")
|
||
print("用户未选择文件")
|
||
return
|
||
|
||
file_content = upfile['content']
|
||
file_name = upfile['filename']
|
||
print(f"文件已上传: {file_name}")
|
||
|
||
# 将上传的文件内容保存到临时文件
|
||
import tempfile
|
||
import os
|
||
|
||
temp_dir = tempfile.gettempdir()
|
||
temp_file_path = os.path.join(temp_dir, file_name)
|
||
|
||
with open(temp_file_path, 'wb') as f:
|
||
f.write(file_content)
|
||
|
||
self.process_csv(temp_file_path)
|
||
except Exception as e:
|
||
error_msg = f"文件上传错误: {str(e)}"
|
||
put_text(error_msg)
|
||
print(error_msg)
|
||
|
||
def process_csv(self, file_name):
|
||
try:
|
||
print("开始处理CSV文件...")
|
||
# 读取CSV文件
|
||
data = self.read_csv(file_name)
|
||
|
||
# 显示结果
|
||
if data:
|
||
success_msg = f"文件读取成功,共{len(data)}行数据"
|
||
put_text("文件读取成功")
|
||
put_table([['文件名', '行数'],
|
||
[file_name, len(data)]])
|
||
print(success_msg)
|
||
|
||
# 处理充电枪编码数据
|
||
if len(data) > 1: # 确保有数据行
|
||
self.process_gun_codes(data)
|
||
except Exception as e:
|
||
error_msg = f"数据处理错误: {str(e)}"
|
||
put_text(error_msg)
|
||
print(error_msg)
|
||
|
||
def read_csv(self, filename):
|
||
try:
|
||
print(f"正在读取文件: {filename}")
|
||
if not os.path.exists(filename):
|
||
raise FileNotFoundError(f"文件不存在: {filename}")
|
||
|
||
data = []
|
||
encodings = ['utf-8', 'gbk', 'gb2312', 'utf-16']
|
||
|
||
for encoding in encodings:
|
||
try:
|
||
with open(filename, 'r', encoding=encoding) as file:
|
||
print(f"尝试使用 {encoding} 编码读取文件...")
|
||
csv_reader = csv.reader(file)
|
||
row_count = 0
|
||
data = []
|
||
|
||
# 读取第一行以验证CSV格式
|
||
header = next(csv_reader, None)
|
||
if not header:
|
||
raise ValueError("CSV文件为空")
|
||
data.append(header)
|
||
print(f"CSV文件头: {header}")
|
||
|
||
for row in csv_reader:
|
||
if not any(row): # 跳过空行
|
||
continue
|
||
data.append(row)
|
||
row_count += 1
|
||
if row_count % 100 == 0:
|
||
print(f"已读取 {row_count} 行...")
|
||
|
||
print(f"文件读取完成,使用 {encoding} 编码成功")
|
||
return data
|
||
except UnicodeDecodeError:
|
||
print(f"{encoding} 编码读取失败,尝试其他编码...")
|
||
continue
|
||
except Exception as e:
|
||
raise Exception(f"使用 {encoding} 编码读取时出错: {str(e)}")
|
||
|
||
raise Exception("尝试所有编码都失败,无法读取文件")
|
||
except Exception as e:
|
||
error_msg = f"读取文件 {filename} 时出错: {str(e)}"
|
||
put_text(error_msg)
|
||
print(error_msg)
|
||
return None
|
||
|
||
def process_gun_codes(self, data):
|
||
if not self.cursor:
|
||
put_text("数据库未连接,无法处理数据")
|
||
return
|
||
|
||
try:
|
||
# 假设CSV第一列是充电枪编码
|
||
gun_codes = [row[1] for row in data[1:]] # 跳过标题行
|
||
results = [] # 存储查询结果
|
||
total_codes = len(gun_codes)
|
||
|
||
# 创建进度条
|
||
put_processbar('progress', auto_close=True)
|
||
|
||
# 处理每个充电枪编码
|
||
for index, each_code in enumerate(gun_codes):
|
||
sql_query = """
|
||
SELECT max(temp_count) AS count1
|
||
FROM (
|
||
SELECT
|
||
a.实际支付,
|
||
@count := IF(@met_one = 0 AND a.实际支付 = 0, @count + 1, @count) AS temp_count,
|
||
@met_one := IF(a.实际支付 != 0 OR @met_one = 1, 1, 0) AS temp_met
|
||
FROM testdb.dingdan2025 a
|
||
CROSS JOIN (SELECT @count := 0, @met_one := 0) AS vars
|
||
where a.充电终端 = %s
|
||
ORDER BY a.下单时间 DESC
|
||
) AS subquery
|
||
"""
|
||
|
||
try:
|
||
print(f"执行SQL查询,充电枪编码: {each_code}")
|
||
print(f"SQL语句: {sql_query}")
|
||
print(f"参数值: {each_code}")
|
||
|
||
# 执行查询
|
||
self.cursor.execute(sql_query, (each_code,))
|
||
result = self.cursor.fetchone()
|
||
print(f"查询结果: {result}")
|
||
|
||
if result is None:
|
||
print(f"警告:充电枪编码 {each_code} 未找到匹配记录")
|
||
|
||
# 更新进度条
|
||
set_processbar('progress', (index + 1) / total_codes)
|
||
except pymysql.Error as e:
|
||
error_msg = f"SQL执行错误 (充电枪编码: {each_code}): {str(e)}"
|
||
print(error_msg)
|
||
put_text(error_msg)
|
||
if e.args[0] == 1146: # 表不存在错误
|
||
print("错误:表'dingdan2025'不存在,请确认数据库表是否正确创建")
|
||
raise e
|
||
results.append(result['count1'] if result and result['count1'] is not None else 0)
|
||
|
||
# 准备表格数据
|
||
headers = data[0] + ['连续异常积累单数']
|
||
table_data = [headers] # 添加表头
|
||
|
||
# 合并CSV数据和查询结果
|
||
for i, row in enumerate(data[1:]):
|
||
table_data.append(row + [str(results[i])])
|
||
|
||
# 在Web界面显示结果
|
||
put_text("数据分析结果:")
|
||
put_table(table_data)
|
||
|
||
# 准备下载的CSV数据
|
||
output_csv = tempfile.NamedTemporaryFile(mode='w', newline='', suffix='.csv', delete=False, encoding='utf-8-sig')
|
||
csv_writer = csv.writer(output_csv)
|
||
csv_writer.writerows(table_data)
|
||
output_csv.close()
|
||
|
||
# 添加下载按钮
|
||
def download_callback():
|
||
with open(output_csv.name, 'rb') as f:
|
||
content = f.read()
|
||
download("分析结果.csv", content)
|
||
os.unlink(output_csv.name) # 删除临时文件
|
||
|
||
put_buttons(['下载分析结果'], onclick=[download_callback])
|
||
|
||
print("数据处理完成")
|
||
|
||
except Exception as e:
|
||
error_msg = f"处理充电枪编码时出错: {str(e)}"
|
||
put_text(error_msg)
|
||
print(error_msg)
|
||
|
||
def __del__(self):
|
||
if self.cursor:
|
||
self.cursor.close()
|
||
if self.conn:
|
||
self.conn.close()
|
||
print("数据库连接已关闭")
|
||
|
||
if __name__ == '__main__':
|
||
try:
|
||
print("正在初始化应用...")
|
||
app = CSVProcessor()
|
||
from pywebio.platform.tornado_http import start_server
|
||
print("正在启动Web服务器...")
|
||
start_server(
|
||
applications=app.upload_csv,
|
||
port=8077,
|
||
host='localhost',
|
||
debug=False,
|
||
cdn=False,
|
||
auto_open_webbrowser=True
|
||
)
|
||
print("Web服务器启动成功,请访问 http://localhost:8077")
|
||
except ImportError as e:
|
||
print(f"错误:缺少必要的库,请确保已安装pywebio。错误信息:{e}")
|
||
except OSError as e:
|
||
if "address already in use" in str(e).lower():
|
||
print("错误:端口8077已被占用,请关闭占用该端口的程序后重试。")
|
||
else:
|
||
print(f"错误:服务器启动失败。错误信息:{e}")
|
||
except Exception as e:
|
||
print(f"错误:服务器启动失败。错误信息:{e}")
|
||
print("提示:请检查端口8077是否被占用,或尝试更换其他端口。") |