""" 分析引擎 - 核心估值和财务分析模块 """ import pandas as pd import numpy as np from typing import Dict, List, Tuple, Optional import logging from datetime import datetime, timedelta from config import LOW_VALUE_CRITERIA logger = logging.getLogger(__name__) class StockAnalyzer: def __init__(self): self.criteria = LOW_VALUE_CRITERIA def calculate_valuation_metrics(self, data: Dict) -> Dict: """计算估值指标""" try: key_metrics = data.get('key_metrics', {}) company_info = data.get('company_info', {}) # 基础估值指标 pe_ratio = key_metrics.get('pe_ratio', 0) pb_ratio = key_metrics.get('pb_ratio', 0) ps_ratio = key_metrics.get('ps_ratio', 0) peg_ratio = key_metrics.get('peg_ratio', 0) # 市值 market_cap = company_info.get('market_cap', 0) # 计算估值分数 (0-100分,分数越高表示越被低估) valuation_score = 0 # PE比率评分 if pe_ratio > 0 and pe_ratio <= self.criteria['max_pe_ratio']: pe_score = max(0, 100 - (pe_ratio / self.criteria['max_pe_ratio']) * 50) valuation_score += pe_score * 0.3 # PB比率评分 if pb_ratio > 0 and pb_ratio <= self.criteria['max_pb_ratio']: pb_score = max(0, 100 - (pb_ratio / self.criteria['max_pb_ratio']) * 50) valuation_score += pb_score * 0.3 # PS比率评分 if ps_ratio > 0 and ps_ratio <= self.criteria['max_ps_ratio']: ps_score = max(0, 100 - (ps_ratio / self.criteria['max_ps_ratio']) * 50) valuation_score += ps_score * 0.2 # PEG比率评分 if peg_ratio > 0 and peg_ratio <= 1.0: peg_score = max(0, 100 - peg_ratio * 50) valuation_score += peg_score * 0.2 return { 'pe_ratio': pe_ratio, 'pb_ratio': pb_ratio, 'ps_ratio': ps_ratio, 'peg_ratio': peg_ratio, 'market_cap': market_cap, 'valuation_score': min(100, max(0, valuation_score)), 'is_undervalued': pe_ratio <= self.criteria['max_pe_ratio'] and pb_ratio <= self.criteria['max_pb_ratio'] and ps_ratio <= self.criteria['max_ps_ratio'] } except Exception as e: logger.error(f"计算估值指标失败: {e}") return {'valuation_score': 0, 'is_undervalued': False} def calculate_financial_health(self, data: Dict) -> Dict: """计算财务健康度""" try: key_metrics = data.get('key_metrics', {}) financial_statements = data.get('financial_statements', {}) # 获取最新财务数据 latest_financial = self._get_latest_financial_data(financial_statements) # 流动性指标 current_ratio = key_metrics.get('current_ratio', 0) quick_ratio = key_metrics.get('quick_ratio', 0) # 偿债能力指标 debt_to_equity = key_metrics.get('debt_to_equity', 0) debt_ratio = self._calculate_debt_ratio(latest_financial) # 盈利能力指标 roe = key_metrics.get('return_on_equity', 0) roa = key_metrics.get('return_on_assets', 0) profit_margin = key_metrics.get('profit_margin', 0) operating_margin = key_metrics.get('operating_margin', 0) # 计算财务健康分数 health_score = 0 # 流动性评分 (30%) if current_ratio >= self.criteria['min_current_ratio']: liquidity_score = min(100, (current_ratio / 2.0) * 100) health_score += liquidity_score * 0.3 # 偿债能力评分 (30%) if debt_ratio <= self.criteria['max_debt_ratio']: debt_score = max(0, 100 - (debt_ratio / self.criteria['max_debt_ratio']) * 100) health_score += debt_score * 0.3 # 盈利能力评分 (40%) if roe >= self.criteria['min_roe']: profitability_score = min(100, (roe / 0.2) * 100) # 20% ROE为满分 health_score += profitability_score * 0.4 return { 'current_ratio': current_ratio, 'quick_ratio': quick_ratio, 'debt_to_equity': debt_to_equity, 'debt_ratio': debt_ratio, 'roe': roe, 'roa': roa, 'profit_margin': profit_margin, 'operating_margin': operating_margin, 'health_score': min(100, max(0, health_score)), 'is_healthy': (current_ratio >= self.criteria['min_current_ratio'] and debt_ratio <= self.criteria['max_debt_ratio'] and roe >= self.criteria['min_roe']) } except Exception as e: logger.error(f"计算财务健康度失败: {e}") return {'health_score': 0, 'is_healthy': False} def calculate_growth_metrics(self, data: Dict) -> Dict: """计算成长性指标""" try: key_metrics = data.get('key_metrics', {}) financial_statements = data.get('financial_statements', {}) # 收入增长率 revenue_growth = key_metrics.get('revenue_growth', 0) earnings_growth = key_metrics.get('earnings_growth', 0) # 计算历史增长率 historical_growth = self._calculate_historical_growth(financial_statements) # 成长性评分 growth_score = 0 # 收入增长率评分 if revenue_growth > 0: revenue_score = min(100, revenue_growth * 100) growth_score += revenue_score * 0.4 # 利润增长率评分 if earnings_growth > 0: earnings_score = min(100, earnings_growth * 100) growth_score += earnings_score * 0.4 # 历史增长率评分 if historical_growth > 0: historical_score = min(100, historical_growth * 100) growth_score += historical_score * 0.2 return { 'revenue_growth': revenue_growth, 'earnings_growth': earnings_growth, 'historical_growth': historical_growth, 'growth_score': min(100, max(0, growth_score)), 'is_growing': revenue_growth > 0.05 and earnings_growth > 0.05 # 5%以上增长 } except Exception as e: logger.error(f"计算成长性指标失败: {e}") return {'growth_score': 0, 'is_growing': False} def calculate_risk_metrics(self, data: Dict) -> Dict: """计算风险指标""" try: key_metrics = data.get('key_metrics', {}) stock_prices = data.get('stock_prices', pd.DataFrame()) # Beta系数 beta = key_metrics.get('beta', 1.0) # 计算波动率 volatility = self._calculate_volatility(stock_prices) # 计算最大回撤 max_drawdown = self._calculate_max_drawdown(stock_prices) # 风险评分 (分数越高表示风险越低) risk_score = 100 # Beta风险评分 if beta > 1.5: risk_score -= (beta - 1.5) * 20 elif beta < 0.8: risk_score -= (0.8 - beta) * 10 # 波动率风险评分 if volatility > 0.3: # 30%以上波动率 risk_score -= (volatility - 0.3) * 100 # 回撤风险评分 if max_drawdown > 0.2: # 20%以上回撤 risk_score -= (max_drawdown - 0.2) * 100 return { 'beta': beta, 'volatility': volatility, 'max_drawdown': max_drawdown, 'risk_score': max(0, min(100, risk_score)), 'risk_level': self._get_risk_level(risk_score) } except Exception as e: logger.error(f"计算风险指标失败: {e}") return {'risk_score': 50, 'risk_level': '中等'} def perform_dcf_valuation(self, data: Dict) -> Dict: """执行DCF估值分析""" try: financial_statements = data.get('financial_statements', {}) key_metrics = data.get('key_metrics', {}) # 获取最新财务数据 latest_financial = self._get_latest_financial_data(financial_statements) if not latest_financial: return {'dcf_value': 0, 'dcf_score': 0} # 基础参数 revenue = latest_financial.get('revenue', 0) net_income = latest_financial.get('net_income', 0) total_assets = latest_financial.get('total_assets', 0) if revenue <= 0 or net_income <= 0: return {'dcf_value': 0, 'dcf_score': 0} # 假设参数 growth_rate = min(0.1, max(0.02, key_metrics.get('revenue_growth', 0.05))) # 2%-10% discount_rate = 0.1 # 10%折现率 terminal_growth_rate = 0.03 # 3%永续增长率 # 计算自由现金流 (简化版) fcf = net_income * 0.8 # 假设80%的净利润为自由现金流 # 5年预测 years = 5 dcf_value = 0 for year in range(1, years + 1): future_fcf = fcf * ((1 + growth_rate) ** year) discounted_fcf = future_fcf / ((1 + discount_rate) ** year) dcf_value += discounted_fcf # 终值计算 terminal_fcf = fcf * ((1 + growth_rate) ** years) * (1 + terminal_growth_rate) terminal_value = terminal_fcf / (discount_rate - terminal_growth_rate) discounted_terminal_value = terminal_value / ((1 + discount_rate) ** years) total_dcf_value = dcf_value + discounted_terminal_value # 获取当前股价进行对比 current_price = self._get_current_price(data) if current_price > 0: dcf_score = min(100, (total_dcf_value / current_price) * 50) # 50%以上溢价为满分 else: dcf_score = 0 return { 'dcf_value': total_dcf_value, 'current_price': current_price, 'dcf_score': dcf_score, 'is_undervalued_dcf': total_dcf_value > current_price if current_price > 0 else False } except Exception as e: logger.error(f"DCF估值分析失败: {e}") return {'dcf_value': 0, 'dcf_score': 0} def generate_investment_recommendation(self, analysis_results: Dict) -> Dict: """生成投资建议""" try: valuation_score = analysis_results.get('valuation_score', 0) health_score = analysis_results.get('financial_health_score', 0) growth_score = analysis_results.get('growth_score', 0) risk_score = analysis_results.get('risk_score', 0) # 计算综合评分 overall_score = ( valuation_score * 0.4 + health_score * 0.3 + growth_score * 0.2 + risk_score * 0.1 ) # 生成建议 if overall_score >= 80: recommendation = "强烈买入" confidence = "高" elif overall_score >= 65: recommendation = "买入" confidence = "中高" elif overall_score >= 50: recommendation = "持有" confidence = "中等" elif overall_score >= 35: recommendation = "观望" confidence = "中低" else: recommendation = "卖出" confidence = "高" # 风险提示 risk_warnings = [] if risk_score < 40: risk_warnings.append("高风险投资") if health_score < 50: risk_warnings.append("财务健康度较低") if growth_score < 30: risk_warnings.append("成长性不足") return { 'recommendation': recommendation, 'confidence': confidence, 'overall_score': overall_score, 'risk_warnings': risk_warnings, 'key_strengths': self._get_key_strengths(analysis_results), 'key_concerns': self._get_key_concerns(analysis_results) } except Exception as e: logger.error(f"生成投资建议失败: {e}") return {'recommendation': '无法分析', 'overall_score': 0} def _get_latest_financial_data(self, financial_statements: Dict) -> Dict: """获取最新财务数据""" if not financial_statements: return {} # 按年份排序,获取最新的数据 sorted_periods = sorted(financial_statements.keys(), reverse=True) return financial_statements.get(sorted_periods[0], {}) def _calculate_debt_ratio(self, financial_data: Dict) -> float: """计算债务比率""" total_liabilities = financial_data.get('total_liabilities', 0) total_assets = financial_data.get('total_assets', 0) if total_assets > 0: return total_liabilities / total_assets return 0 def _calculate_historical_growth(self, financial_statements: Dict) -> float: """计算历史增长率""" if len(financial_statements) < 2: return 0 # 获取最近两年的收入数据 sorted_periods = sorted(financial_statements.keys(), reverse=True) if len(sorted_periods) < 2: return 0 current_revenue = financial_statements[sorted_periods[0]].get('revenue', 0) previous_revenue = financial_statements[sorted_periods[1]].get('revenue', 0) if previous_revenue > 0: return (current_revenue - previous_revenue) / previous_revenue return 0 def _calculate_volatility(self, stock_prices: pd.DataFrame) -> float: """计算波动率""" if stock_prices.empty or 'close' not in stock_prices.columns: return 0 returns = stock_prices['close'].pct_change().dropna() return returns.std() * np.sqrt(252) # 年化波动率 def _calculate_max_drawdown(self, stock_prices: pd.DataFrame) -> float: """计算最大回撤""" if stock_prices.empty or 'close' not in stock_prices.columns: return 0 prices = stock_prices['close'] peak = prices.expanding().max() drawdown = (prices - peak) / peak return abs(drawdown.min()) def _get_risk_level(self, risk_score: float) -> str: """获取风险等级""" if risk_score >= 80: return "低" elif risk_score >= 60: return "中低" elif risk_score >= 40: return "中等" elif risk_score >= 20: return "中高" else: return "高" def _get_current_price(self, data: Dict) -> float: """获取当前股价""" stock_prices = data.get('stock_prices', pd.DataFrame()) if not stock_prices.empty and 'close' in stock_prices.columns: return stock_prices['close'].iloc[-1] return 0 def _get_key_strengths(self, analysis_results: Dict) -> List[str]: """获取关键优势""" strengths = [] if analysis_results.get('valuation_score', 0) > 70: strengths.append("估值吸引力高") if analysis_results.get('financial_health_score', 0) > 70: strengths.append("财务健康度良好") if analysis_results.get('growth_score', 0) > 70: strengths.append("成长性优秀") if analysis_results.get('risk_score', 0) > 70: strengths.append("风险控制良好") return strengths def _get_key_concerns(self, analysis_results: Dict) -> List[str]: """获取关键担忧""" concerns = [] if analysis_results.get('valuation_score', 0) < 40: concerns.append("估值偏高") if analysis_results.get('financial_health_score', 0) < 40: concerns.append("财务健康度不佳") if analysis_results.get('growth_score', 0) < 40: concerns.append("成长性不足") if analysis_results.get('risk_score', 0) < 40: concerns.append("风险较高") return concerns