2026-05-18 09:08:31 +08:00

123 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""缓存状态管理模块"""
import json
import os
import pandas as pd
CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".cache")
CACHE_FILE = os.path.join(CACHE_DIR, "lof_cache.json")
PURCHASE_CACHE_FILE = os.path.join(CACHE_DIR, "purchase_cache.json")
# LOF 实时数据缓存stale-while-revalidate
cache_data = {"data": [], "large": [], "small": [], "time": None}
# 净值/限额数据缓存(变化频率低,缓存 5 分钟)
purchase_cache = {"data": None, "time": None}
PURCHASE_CACHE_TTL_SECONDS = 300 # 5 分钟
def _ensure_cache_dir():
os.makedirs(CACHE_DIR, exist_ok=True)
def _save_to_disk(data: dict, has_valid_data: bool = False):
"""将缓存写入磁盘文件"""
try:
_ensure_cache_dir()
payload = {
"data": data.get("data", []),
"large": data.get("large", []),
"small": data.get("small", []),
"time": str(pd.Timestamp.now()),
"valid": has_valid_data
}
with open(CACHE_FILE, "w", encoding="utf-8") as f:
json.dump(payload, f, ensure_ascii=False)
except Exception:
pass
def _load_from_disk() -> dict | None:
"""从磁盘文件加载缓存"""
try:
if os.path.exists(CACHE_FILE):
with open(CACHE_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
# 只返回标记为有效的数据
if data.get("valid") and data.get("large"):
return data
except Exception:
pass
return None
def update_purchase_cache(data: pd.DataFrame) -> None:
"""更新净值/限额缓存"""
global purchase_cache
purchase_cache = {"data": data, "time": pd.Timestamp.now()}
def get_purchase_cache_age() -> float:
"""获取净值/限额缓存已存在的秒数"""
if purchase_cache["time"] is None:
return float("inf")
return (pd.Timestamp.now() - purchase_cache["time"]).total_seconds()
def is_purchase_cache_valid() -> bool:
"""判断净值/限额缓存是否仍然有效"""
if purchase_cache["data"] is None or purchase_cache["time"] is None:
return False
return get_purchase_cache_age() < PURCHASE_CACHE_TTL_SECONDS
def get_purchase_cache_data() -> pd.DataFrame:
"""获取缓存的净值/限额数据副本"""
return purchase_cache["data"].copy()
def update_cache_data(large_data: list, small_data: list, has_valid_data: bool = False) -> None:
"""更新 LOF 实时数据缓存(内存 + 磁盘)"""
global cache_data
cache_data = {
"data": large_data + small_data,
"large": large_data,
"small": small_data,
"time": pd.Timestamp.now()
}
# 写入磁盘,标记是否为有效数据
_save_to_disk(cache_data, has_valid_data=has_valid_data)
def get_cached_lof_data() -> dict | None:
"""获取缓存的 LOF 实时数据(仅大基金)"""
# 优先读内存
if cache_data["large"]:
return {
"data": cache_data["large"],
"hasMore": len(cache_data["small"]) > 0,
"time": cache_data["time"]
}
# 内存无缓存,尝试读磁盘
disk = _load_from_disk()
if disk and disk.get("large"):
cache_data["data"] = disk["data"]
cache_data["large"] = disk["large"]
cache_data["small"] = disk.get("small", [])
cache_data["time"] = disk.get("time")
return {
"data": disk["large"],
"hasMore": len(disk.get("small", [])) > 0,
"time": disk.get("time")
}
return None
def get_cached_small_data() -> list:
"""获取缓存的剩余小基金数据"""
if cache_data["small"]:
return cache_data["small"]
disk = _load_from_disk()
if disk:
return disk.get("small", [])
return []