import csv import os import tempfile import pymysql from pywebio.input import file_upload from pywebio.output import put_text, put_table, put_processbar, set_processbar, put_buttons from pywebio.session import download # MySQL数据库配置 DB_CONFIG = { 'host': 'localhost', 'user': 'testDB', 'password': 'WxMKtsNG8BRmWxDX', 'database': 'testdb', 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor } class CSVProcessor: def __init__(self): try: print("正在连接数据库...") print(f"数据库配置信息: 主机={DB_CONFIG['host']}, 数据库={DB_CONFIG['database']}") # 首先尝试连接MySQL服务器(不指定数据库) try: print("尝试连接MySQL服务器...") config_without_db = DB_CONFIG.copy() del config_without_db['database'] print("connecting to MySQL server...") # 添加连接超时设置 config_without_db['connect_timeout'] = 10 temp_conn = pymysql.connect(**config_without_db) print("成功连接到MySQL服务器") temp_cursor = temp_conn.cursor() temp_cursor.close() temp_conn.close() print("成功连接MySQL服务器") except pymysql.Error as e: print(str(e)) raise e # 连接指定的数据库 self.conn = pymysql.connect(**DB_CONFIG) self.cursor = self.conn.cursor() print("数据库连接成功") except pymysql.Error as e: error_msg = f"数据库连接失败: {str(e)}" if "Can't connect to MySQL server" in str(e): error_msg = "\n错误:MySQL服务未运行\n解决方案:\n1. 以管理员身份打开命令提示符\n2. 输入命令 'net start MySQL80' 启动服务\n3. 如果启动失败,请检查MySQL服务是否正确安装" elif e.args[0] == 2003: error_msg += "\n解决方案:\n1. 检查MySQL服务是否启动\n2. 确认主机名是否正确(当前为localhost)\n3. 检查防火墙设置是否允许MySQL连接" elif e.args[0] == 1045: error_msg += "\n解决方案:\n1. 验证用户名和密码是否正确\n2. 确认该用户是否有权限访问数据库\n3. 如忘记密码,请联系数据库管理员重置" elif e.args[0] == 1049: error_msg += "\n解决方案:\n1. 确认数据库名称是否正确(当前为testdb)\n2. 使用以下SQL命令创建数据库:\n CREATE DATABASE testdb;" print(error_msg) put_text(error_msg) # 在Web界面显示错误信息 self.conn = None self.cursor = None def upload_csv(self): try: print("等待用户上传CSV文件...") upfile = file_upload("请选择CSV文件:", accept=".csv", multiple=False) if not upfile: put_text("请上传CSV文件") print("用户未选择文件") return file_content = upfile['content'] file_name = upfile['filename'] print(f"文件已上传: {file_name}") # 将上传的文件内容保存到临时文件 import tempfile import os temp_dir = tempfile.gettempdir() temp_file_path = os.path.join(temp_dir, file_name) with open(temp_file_path, 'wb') as f: f.write(file_content) self.process_csv(temp_file_path) except Exception as e: error_msg = f"文件上传错误: {str(e)}" put_text(error_msg) print(error_msg) def process_csv(self, file_name): try: print("开始处理CSV文件...") # 读取CSV文件 data = self.read_csv(file_name) # 显示结果 if data: success_msg = f"文件读取成功,共{len(data)}行数据" put_text("文件读取成功") put_table([['文件名', '行数'], [file_name, len(data)]]) print(success_msg) # 处理充电枪编码数据 if len(data) > 1: # 确保有数据行 self.process_gun_codes(data) except Exception as e: error_msg = f"数据处理错误: {str(e)}" put_text(error_msg) print(error_msg) def read_csv(self, filename): try: print(f"正在读取文件: {filename}") if not os.path.exists(filename): raise FileNotFoundError(f"文件不存在: {filename}") data = [] encodings = ['utf-8', 'gbk', 'gb2312', 'utf-16'] for encoding in encodings: try: with open(filename, 'r', encoding=encoding) as file: print(f"尝试使用 {encoding} 编码读取文件...") csv_reader = csv.reader(file) row_count = 0 data = [] # 读取第一行以验证CSV格式 header = next(csv_reader, None) if not header: raise ValueError("CSV文件为空") data.append(header) print(f"CSV文件头: {header}") for row in csv_reader: if not any(row): # 跳过空行 continue data.append(row) row_count += 1 if row_count % 100 == 0: print(f"已读取 {row_count} 行...") print(f"文件读取完成,使用 {encoding} 编码成功") return data except UnicodeDecodeError: print(f"{encoding} 编码读取失败,尝试其他编码...") continue except Exception as e: raise Exception(f"使用 {encoding} 编码读取时出错: {str(e)}") raise Exception("尝试所有编码都失败,无法读取文件") except Exception as e: error_msg = f"读取文件 {filename} 时出错: {str(e)}" put_text(error_msg) print(error_msg) return None def process_gun_codes(self, data): if not self.cursor: put_text("数据库未连接,无法处理数据") return try: # 假设CSV第一列是充电枪编码 gun_codes = [row[1] for row in data[1:]] # 跳过标题行 results = [] # 存储查询结果 total_codes = len(gun_codes) # 创建进度条 put_processbar('progress', auto_close=True) # 处理每个充电枪编码 for index, each_code in enumerate(gun_codes): sql_query = """ SELECT max(temp_count) AS count1 FROM ( SELECT a.实际支付, @count := IF(@met_one = 0 AND a.实际支付 = 0, @count + 1, @count) AS temp_count, @met_one := IF(a.实际支付 != 0 OR @met_one = 1, 1, 0) AS temp_met FROM testdb.dingdan2025 a CROSS JOIN (SELECT @count := 0, @met_one := 0) AS vars where a.充电终端 = %s ORDER BY a.下单时间 DESC ) AS subquery """ try: print(f"执行SQL查询,充电枪编码: {each_code}") print(f"SQL语句: {sql_query}") print(f"参数值: {each_code}") # 执行查询 self.cursor.execute(sql_query, (each_code,)) result = self.cursor.fetchone() print(f"查询结果: {result}") if result is None: print(f"警告:充电枪编码 {each_code} 未找到匹配记录") # 更新进度条 set_processbar('progress', (index + 1) / total_codes) except pymysql.Error as e: error_msg = f"SQL执行错误 (充电枪编码: {each_code}): {str(e)}" print(error_msg) put_text(error_msg) if e.args[0] == 1146: # 表不存在错误 print("错误:表'dingdan2025'不存在,请确认数据库表是否正确创建") raise e results.append(result['count1'] if result and result['count1'] is not None else 0) # 准备表格数据 headers = data[0] + ['连续异常积累单数'] table_data = [headers] # 添加表头 # 合并CSV数据和查询结果 for i, row in enumerate(data[1:]): table_data.append(row + [str(results[i])]) # 在Web界面显示结果 put_text("数据分析结果:") put_table(table_data) # 准备下载的CSV数据 output_csv = tempfile.NamedTemporaryFile(mode='w', newline='', suffix='.csv', delete=False, encoding='utf-8-sig') csv_writer = csv.writer(output_csv) csv_writer.writerows(table_data) output_csv.close() # 添加下载按钮 def download_callback(): with open(output_csv.name, 'rb') as f: content = f.read() download("分析结果.csv", content) os.unlink(output_csv.name) # 删除临时文件 put_buttons(['下载分析结果'], onclick=[download_callback]) print("数据处理完成") except Exception as e: error_msg = f"处理充电枪编码时出错: {str(e)}" put_text(error_msg) print(error_msg) def __del__(self): if self.cursor: self.cursor.close() if self.conn: self.conn.close() print("数据库连接已关闭") if __name__ == '__main__': try: print("正在初始化应用...") app = CSVProcessor() from pywebio.platform.tornado_http import start_server print("正在启动Web服务器...") start_server( applications=app.upload_csv, port=8077, host='localhost', debug=False, cdn=False, auto_open_webbrowser=True ) print("Web服务器启动成功,请访问 http://localhost:8077") except ImportError as e: print(f"错误:缺少必要的库,请确保已安装pywebio。错误信息:{e}") except OSError as e: if "address already in use" in str(e).lower(): print("错误:端口8077已被占用,请关闭占用该端口的程序后重试。") else: print(f"错误:服务器启动失败。错误信息:{e}") except Exception as e: print(f"错误:服务器启动失败。错误信息:{e}") print("提示:请检查端口8077是否被占用,或尝试更换其他端口。")