This commit is contained in:
etoai 2025-05-08 18:20:55 +08:00
parent fd1a7e84ca
commit 322f94d622
5 changed files with 341 additions and 0 deletions

23
bmi.py Normal file
View File

@ -0,0 +1,23 @@
import pywebio
from pywebio.input import input, FLOAT
from pywebio.output import put_text
def bmi():
height = input("请输入你的身高(cm)", type=FLOAT)
weight = input("请输入你的体重(kg)", type=FLOAT)
BMI = weight / (height / 100) ** 2
top_status = [(14.9, '极瘦'), (18.4, '偏瘦'),
(22.9, '正常'), (27.5, '过重'),
(40.0, '肥胖'), (float('inf'), '非常肥胖')]
for top, status in top_status:
if BMI <= top:
put_text('你的 BMI 值: %.1f,身体状态:%s' % (BMI, status))
break
# print('sss')
if __name__ == '__main__':
pywebio.start_server(bmi,8081,cdn=False)

280
errortest.py Normal file
View File

@ -0,0 +1,280 @@
import csv
import os
import tempfile
import pymysql
from pywebio.input import file_upload
from pywebio.output import put_text, put_table, put_processbar, set_processbar, put_buttons
from pywebio.session import download
# MySQL数据库配置
DB_CONFIG = {
'host': 'localhost',
'user': 'testDB',
'password': 'WxMKtsNG8BRmWxDX',
'database': 'testdb',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
class CSVProcessor:
def __init__(self):
try:
print("正在连接数据库...")
print(f"数据库配置信息: 主机={DB_CONFIG['host']}, 数据库={DB_CONFIG['database']}")
# 首先尝试连接MySQL服务器不指定数据库
try:
print("尝试连接MySQL服务器...")
config_without_db = DB_CONFIG.copy()
del config_without_db['database']
print("connecting to MySQL server...")
# 添加连接超时设置
config_without_db['connect_timeout'] = 10
temp_conn = pymysql.connect(**config_without_db)
print("成功连接到MySQL服务器")
temp_cursor = temp_conn.cursor()
temp_cursor.close()
temp_conn.close()
print("成功连接MySQL服务器")
except pymysql.Error as e:
print(str(e))
raise e
# 连接指定的数据库
self.conn = pymysql.connect(**DB_CONFIG)
self.cursor = self.conn.cursor()
print("数据库连接成功")
except pymysql.Error as e:
error_msg = f"数据库连接失败: {str(e)}"
if "Can't connect to MySQL server" in str(e):
error_msg = "\n错误MySQL服务未运行\n解决方案:\n1. 以管理员身份打开命令提示符\n2. 输入命令 'net start MySQL80' 启动服务\n3. 如果启动失败请检查MySQL服务是否正确安装"
elif e.args[0] == 2003:
error_msg += "\n解决方案:\n1. 检查MySQL服务是否启动\n2. 确认主机名是否正确当前为localhost\n3. 检查防火墙设置是否允许MySQL连接"
elif e.args[0] == 1045:
error_msg += "\n解决方案:\n1. 验证用户名和密码是否正确\n2. 确认该用户是否有权限访问数据库\n3. 如忘记密码,请联系数据库管理员重置"
elif e.args[0] == 1049:
error_msg += "\n解决方案:\n1. 确认数据库名称是否正确当前为testdb\n2. 使用以下SQL命令创建数据库\n CREATE DATABASE testdb;"
print(error_msg)
put_text(error_msg) # 在Web界面显示错误信息
self.conn = None
self.cursor = None
def upload_csv(self):
try:
print("等待用户上传CSV文件...")
upfile = file_upload("请选择CSV文件:", accept=".csv", multiple=False)
if not upfile:
put_text("请上传CSV文件")
print("用户未选择文件")
return
file_content = upfile['content']
file_name = upfile['filename']
print(f"文件已上传: {file_name}")
# 将上传的文件内容保存到临时文件
import tempfile
import os
temp_dir = tempfile.gettempdir()
temp_file_path = os.path.join(temp_dir, file_name)
with open(temp_file_path, 'wb') as f:
f.write(file_content)
self.process_csv(temp_file_path)
except Exception as e:
error_msg = f"文件上传错误: {str(e)}"
put_text(error_msg)
print(error_msg)
def process_csv(self, file_name):
try:
print("开始处理CSV文件...")
# 读取CSV文件
data = self.read_csv(file_name)
# 显示结果
if data:
success_msg = f"文件读取成功,共{len(data)}行数据"
put_text("文件读取成功")
put_table([['文件名', '行数'],
[file_name, len(data)]])
print(success_msg)
# 处理充电枪编码数据
if len(data) > 1: # 确保有数据行
self.process_gun_codes(data)
except Exception as e:
error_msg = f"数据处理错误: {str(e)}"
put_text(error_msg)
print(error_msg)
def read_csv(self, filename):
try:
print(f"正在读取文件: {filename}")
if not os.path.exists(filename):
raise FileNotFoundError(f"文件不存在: {filename}")
data = []
encodings = ['utf-8', 'gbk', 'gb2312', 'utf-16']
for encoding in encodings:
try:
with open(filename, 'r', encoding=encoding) as file:
print(f"尝试使用 {encoding} 编码读取文件...")
csv_reader = csv.reader(file)
row_count = 0
data = []
# 读取第一行以验证CSV格式
header = next(csv_reader, None)
if not header:
raise ValueError("CSV文件为空")
data.append(header)
print(f"CSV文件头: {header}")
for row in csv_reader:
if not any(row): # 跳过空行
continue
data.append(row)
row_count += 1
if row_count % 100 == 0:
print(f"已读取 {row_count} 行...")
print(f"文件读取完成,使用 {encoding} 编码成功")
return data
except UnicodeDecodeError:
print(f"{encoding} 编码读取失败,尝试其他编码...")
continue
except Exception as e:
raise Exception(f"使用 {encoding} 编码读取时出错: {str(e)}")
raise Exception("尝试所有编码都失败,无法读取文件")
except Exception as e:
error_msg = f"读取文件 {filename} 时出错: {str(e)}"
put_text(error_msg)
print(error_msg)
return None
def process_gun_codes(self, data):
if not self.cursor:
put_text("数据库未连接,无法处理数据")
return
try:
# 假设CSV第一列是充电枪编码
gun_codes = [row[1] for row in data[1:]] # 跳过标题行
results = [] # 存储查询结果
total_codes = len(gun_codes)
# 创建进度条
put_processbar('progress', auto_close=True)
# 处理每个充电枪编码
for index, each_code in enumerate(gun_codes):
sql_query = """
SELECT max(temp_count) AS count1
FROM (
SELECT
a.实际支付,
@count := IF(@met_one = 0 AND a.实际支付 = 0, @count + 1, @count) AS temp_count,
@met_one := IF(a.实际支付 != 0 OR @met_one = 1, 1, 0) AS temp_met
FROM testdb.dingdan2025 a
CROSS JOIN (SELECT @count := 0, @met_one := 0) AS vars
where a.充电终端 = %s
ORDER BY a.下单时间 DESC
) AS subquery
"""
try:
print(f"执行SQL查询充电枪编码: {each_code}")
print(f"SQL语句: {sql_query}")
print(f"参数值: {each_code}")
# 执行查询
self.cursor.execute(sql_query, (each_code,))
result = self.cursor.fetchone()
print(f"查询结果: {result}")
if result is None:
print(f"警告:充电枪编码 {each_code} 未找到匹配记录")
# 更新进度条
set_processbar('progress', (index + 1) / total_codes)
except pymysql.Error as e:
error_msg = f"SQL执行错误 (充电枪编码: {each_code}): {str(e)}"
print(error_msg)
put_text(error_msg)
if e.args[0] == 1146: # 表不存在错误
print("错误:表'dingdan2025'不存在,请确认数据库表是否正确创建")
raise e
results.append(result['count1'] if result and result['count1'] is not None else 0)
# 准备表格数据
headers = data[0] + ['连续异常积累单数']
table_data = [headers] # 添加表头
# 合并CSV数据和查询结果
for i, row in enumerate(data[1:]):
table_data.append(row + [str(results[i])])
# 在Web界面显示结果
put_text("数据分析结果:")
put_table(table_data)
# 准备下载的CSV数据
output_csv = tempfile.NamedTemporaryFile(mode='w', newline='', suffix='.csv', delete=False, encoding='utf-8-sig')
csv_writer = csv.writer(output_csv)
csv_writer.writerows(table_data)
output_csv.close()
# 添加下载按钮
def download_callback():
with open(output_csv.name, 'rb') as f:
content = f.read()
download("分析结果.csv", content)
os.unlink(output_csv.name) # 删除临时文件
put_buttons(['下载分析结果'], onclick=[download_callback])
print("数据处理完成")
except Exception as e:
error_msg = f"处理充电枪编码时出错: {str(e)}"
put_text(error_msg)
print(error_msg)
def __del__(self):
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
print("数据库连接已关闭")
if __name__ == '__main__':
try:
print("正在初始化应用...")
app = CSVProcessor()
from pywebio.platform.tornado_http import start_server
print("正在启动Web服务器...")
start_server(
applications=app.upload_csv,
port=8077,
host='localhost',
debug=False,
cdn=False,
auto_open_webbrowser=True
)
print("Web服务器启动成功请访问 http://localhost:8077")
except ImportError as e:
print(f"错误缺少必要的库请确保已安装pywebio。错误信息{e}")
except OSError as e:
if "address already in use" in str(e).lower():
print("错误端口8077已被占用请关闭占用该端口的程序后重试。")
else:
print(f"错误:服务器启动失败。错误信息:{e}")
except Exception as e:
print(f"错误:服务器启动失败。错误信息:{e}")
print("提示请检查端口8077是否被占用或尝试更换其他端口。")

1
query Normal file
View File

@ -0,0 +1 @@
state=all

16
test.py Normal file
View File

@ -0,0 +1,16 @@
import os
from http import HTTPStatus
from dashscope import Application
response = Application.call(
# 若没有配置环境变量可用百炼API Key将下行替换为api_key="sk-xxx"。但不建议在生产环境中直接将API Key硬编码到代码中以减少API Key泄露风险。
api_key="sk-bf61ff61d4fc496a8e97b165e8f1b4d3",
app_id='1718ca5a4fb740a289bc5487ab96a11b',
prompt='你们公司的常用箱变的容量是多少')
if response.status_code != HTTPStatus.OK:
print(f'request_id={response.request_id}')
print(f'code={response.status_code}')
print(f'message={response.message}')
print(f'请参考文档https://help.aliyun.com/zh/model-studio/developer-reference/error-code')
else:
print(response.output.text)

21
tongji.py Normal file
View File

@ -0,0 +1,21 @@
import pywebio
from pywebio.input import input, FLOAT,file_upload
from pywebio.output import put_text
class tj:
def upXls(self):
upfile = file_upload("Select some pictures:", accept="xlsx/*", multiple=True)
# upfile2 = file_upload("Select some pictures:", accept="xlsx/*", multiple=False)
# put_text(upfile)
oldfile = upfile[0]['filename']
newfile = upfile[1]['filename']
self.readXls(oldfile,newfile)
def readXls(self,oldfile,newfile):
pass
if __name__ == '__main__':
app = tj()
pywebio.start_server(app.upXls,8081,cdn=False,debug=True,auto_open_webbrowser=True)