""" 分析结果存储模块 - 标准化保存分析报告 """ import os import json from datetime import datetime from typing import Dict, Any import logging logger = logging.getLogger(__name__) class AnalysisStorage: def __init__(self, base_dir: str = "analysis_reports"): self.base_dir = base_dir self.ensure_directories() def ensure_directories(self): """确保必要的目录存在""" directories = [ self.base_dir, os.path.join(self.base_dir, "detailed"), os.path.join(self.base_dir, "summaries"), os.path.join(self.base_dir, "charts"), os.path.join(self.base_dir, "raw_data") ] for directory in directories: os.makedirs(directory, exist_ok=True) def save_analysis_report(self, symbol: str, analysis_data: Dict[str, Any]) -> Dict[str, str]: """ 保存分析报告到文件 Args: symbol: 股票代码 analysis_data: 分析数据 Returns: 保存的文件路径字典 """ try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") date_str = datetime.now().strftime("%Y-%m-%d") # 1. 保存详细分析报告 (JSON格式) detailed_file = self._save_detailed_report(symbol, analysis_data, timestamp) # 2. 保存简要报告 (Markdown格式) summary_file = self._save_summary_report(symbol, analysis_data, timestamp) # 3. 保存原始数据 (JSON格式) raw_data_file = self._save_raw_data(symbol, analysis_data, timestamp) # 4. 更新索引文件 self._update_index(symbol, analysis_data, timestamp) return { 'detailed_report': detailed_file, 'summary_report': summary_file, 'raw_data': raw_data_file, 'timestamp': timestamp, 'date': date_str } except Exception as e: logger.error(f"保存分析报告失败 {symbol}: {e}") return {} def _save_detailed_report(self, symbol: str, analysis_data: Dict, timestamp: str) -> str: """保存详细分析报告""" filename = f"{symbol}_detailed_{timestamp}.json" filepath = os.path.join(self.base_dir, "detailed", filename) detailed_report = { "analysis_metadata": { "symbol": symbol, "analysis_date": datetime.now().isoformat(), "timestamp": timestamp, "version": "1.0" }, "company_info": analysis_data.get('company_info', {}), "financial_data": analysis_data.get('financial_data', {}), "valuation_analysis": analysis_data.get('valuation_analysis', {}), "financial_health": analysis_data.get('financial_health', {}), "growth_analysis": analysis_data.get('growth_analysis', {}), "risk_analysis": analysis_data.get('risk_analysis', {}), "investment_recommendation": analysis_data.get('investment_recommendation', {}), "analyst_opinions": analysis_data.get('analyst_opinions', {}), "market_data": analysis_data.get('market_data', {}), "raw_data": analysis_data.get('raw_data', {}) } with open(filepath, 'w', encoding='utf-8') as f: json.dump(detailed_report, f, ensure_ascii=False, indent=2) logger.info(f"详细报告已保存: {filepath}") return filepath def _save_summary_report(self, symbol: str, analysis_data: Dict, timestamp: str) -> str: """保存简要分析报告 (Markdown格式)""" filename = f"{symbol}_summary_{timestamp}.md" filepath = os.path.join(self.base_dir, "summaries", filename) # 生成Markdown格式的简要报告 markdown_content = self._generate_markdown_summary(symbol, analysis_data, timestamp) with open(filepath, 'w', encoding='utf-8') as f: f.write(markdown_content) logger.info(f"简要报告已保存: {filepath}") return filepath def _save_raw_data(self, symbol: str, analysis_data: Dict, timestamp: str) -> str: """保存原始数据""" filename = f"{symbol}_raw_data_{timestamp}.json" filepath = os.path.join(self.base_dir, "raw_data", filename) with open(filepath, 'w', encoding='utf-8') as f: json.dump(analysis_data, f, ensure_ascii=False, indent=2) logger.info(f"原始数据已保存: {filepath}") return filepath def _update_index(self, symbol: str, analysis_data: Dict, timestamp: str): """更新索引文件""" index_file = os.path.join(self.base_dir, "analysis_index.json") # 读取现有索引 if os.path.exists(index_file): with open(index_file, 'r', encoding='utf-8') as f: index_data = json.load(f) else: index_data = {"analyses": []} # 添加新分析记录 new_entry = { "symbol": symbol, "timestamp": timestamp, "analysis_date": datetime.now().isoformat(), "company_name": analysis_data.get('company_info', {}).get('name', ''), "recommendation": analysis_data.get('investment_recommendation', {}).get('recommendation', 'N/A'), "overall_score": analysis_data.get('investment_recommendation', {}).get('overall_score', 0), "files": { "detailed": f"{symbol}_detailed_{timestamp}.json", "summary": f"{symbol}_summary_{timestamp}.md", "raw_data": f"{symbol}_raw_data_{timestamp}.json" } } index_data["analyses"].append(new_entry) # 按时间倒序排列 index_data["analyses"].sort(key=lambda x: x["timestamp"], reverse=True) # 保存索引 with open(index_file, 'w', encoding='utf-8') as f: json.dump(index_data, f, ensure_ascii=False, indent=2) def _generate_markdown_summary(self, symbol: str, analysis_data: Dict, timestamp: str) -> str: """生成Markdown格式的简要报告""" company_info = analysis_data.get('company_info', {}) investment_rec = analysis_data.get('investment_recommendation', {}) valuation = analysis_data.get('valuation_analysis', {}) financial_health = analysis_data.get('financial_health', {}) growth = analysis_data.get('growth_analysis', {}) risk = analysis_data.get('risk_analysis', {}) markdown = f"""# {symbol} 股票分析报告 ## 📊 基本信息 - **公司名称**: {company_info.get('name', 'N/A')} - **行业**: {company_info.get('industry', 'N/A')} - **市值**: ${company_info.get('market_cap', 0):,.0f} - **分析时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ## 🎯 投资建议 - **建议**: {investment_rec.get('recommendation', 'N/A')} - **信心度**: {investment_rec.get('confidence', 'N/A')} - **综合评分**: {investment_rec.get('overall_score', 0):.1f}/100 ## 📈 各维度评分 | 维度 | 评分 | 评级 | |------|------|------| | 估值吸引力 | {valuation.get('valuation_score', 0):.1f}/100 | {self._get_rating(valuation.get('valuation_score', 0))} | | 财务健康度 | {financial_health.get('health_score', 0):.1f}/100 | {self._get_rating(financial_health.get('health_score', 0))} | | 成长性 | {growth.get('growth_score', 0):.1f}/100 | {self._get_rating(growth.get('growth_score', 0))} | | 风险控制 | {risk.get('risk_score', 0):.1f}/100 | {self._get_rating(risk.get('risk_score', 0))} | ## ✅ 关键优势 {self._format_list(investment_rec.get('key_strengths', []))} ## ⚠️ 关键担忧 {self._format_list(investment_rec.get('key_concerns', []))} ## 🚨 风险提示 {self._format_list(investment_rec.get('risk_warnings', []))} ## 📋 详细指标 ### 估值指标 - PE比率: {valuation.get('pe_ratio', 'N/A')} - PB比率: {valuation.get('pb_ratio', 'N/A')} - PS比率: {valuation.get('ps_ratio', 'N/A')} - PEG比率: {valuation.get('peg_ratio', 'N/A')} ### 财务健康度 - 流动比率: {financial_health.get('current_ratio', 'N/A')} - 速动比率: {financial_health.get('quick_ratio', 'N/A')} - 债务比率: {financial_health.get('debt_ratio', 'N/A')} - ROE: {financial_health.get('roe', 'N/A')} ### 成长性指标 - 收入增长率: {growth.get('revenue_growth', 'N/A')} - 利润增长率: {growth.get('earnings_growth', 'N/A')} - 历史增长率: {growth.get('historical_growth', 'N/A')} ### 风险指标 - Beta系数: {risk.get('beta', 'N/A')} - 波动率: {risk.get('volatility', 'N/A')} - 最大回撤: {risk.get('max_drawdown', 'N/A')} - 风险等级: {risk.get('risk_level', 'N/A')} --- *分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}* *报告ID: {timestamp}* """ return markdown def _get_rating(self, score: float) -> str: """根据分数获取评级""" if score >= 90: return "优秀" elif score >= 80: return "良好" elif score >= 70: return "中等" elif score >= 60: return "一般" else: return "较差" def _format_list(self, items: list) -> str: """格式化列表""" if not items: return "无" return "\n".join([f"- {item}" for item in items]) def get_analysis_history(self, symbol: str = None) -> list: """获取分析历史""" index_file = os.path.join(self.base_dir, "analysis_index.json") if not os.path.exists(index_file): return [] with open(index_file, 'r', encoding='utf-8') as f: index_data = json.load(f) analyses = index_data.get('analyses', []) if symbol: analyses = [a for a in analyses if a['symbol'].upper() == symbol.upper()] return analyses def get_latest_analysis(self, symbol: str) -> Dict: """获取最新分析结果""" history = self.get_analysis_history(symbol) if not history: return {} latest = history[0] # 已按时间倒序排列 # 读取详细报告 detailed_file = os.path.join(self.base_dir, "detailed", latest['files']['detailed']) if os.path.exists(detailed_file): with open(detailed_file, 'r', encoding='utf-8') as f: return json.load(f) return {} def list_all_analyses(self) -> list: """列出所有分析记录""" return self.get_analysis_history()