430 lines
17 KiB
Python
430 lines
17 KiB
Python
"""
|
||
分析引擎 - 核心估值和财务分析模块
|
||
"""
|
||
import pandas as pd
|
||
import numpy as np
|
||
from typing import Dict, List, Tuple, Optional
|
||
import logging
|
||
from datetime import datetime, timedelta
|
||
from config import LOW_VALUE_CRITERIA
|
||
|
||
# Module-level logger named after this module; the analyzer methods below use
# it to report failures instead of raising.
logger = logging.getLogger(__name__)
|
||
|
||
class StockAnalyzer:
    """Score a stock on valuation, financial health, growth, risk and DCF value.

    The public ``calculate_*`` / ``perform_*`` methods all take the same
    ``data`` dict and read some of these keys from it:

    * ``key_metrics`` and ``company_info``: dicts of numeric metrics;
    * ``financial_statements``: dict keyed by period, each value a dict of
      line items (``revenue``, ``net_income``, ``total_assets``,
      ``total_liabilities``);
    * ``stock_prices``: a DataFrame with a ``close`` column.

    Thresholds come from ``LOW_VALUE_CRITERIA``.  Each public method catches
    ``Exception``, logs it and returns a minimal fallback dict, so a bad
    record degrades to a zero/neutral score instead of aborting the caller.
    """

    def __init__(self):
        self.criteria = LOW_VALUE_CRITERIA

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_number(value, default=0):
        """Return ``value`` if it is a usable number, otherwise ``default``.

        ``None``, NaN and values that cannot be converted with ``float()``
        all map to ``default``.  Without this, a single ``None`` metric
        raises ``TypeError`` in a comparison and discards the whole analysis.
        """
        if value is None:
            return default
        if isinstance(value, (int, float, np.integer, np.floating)):
            # NaN is the only number that is not equal to itself.
            return default if value != value else value
        try:
            number = float(value)
        except (TypeError, ValueError):
            return default
        return default if number != number else number

    @staticmethod
    def _ratio_score(ratio: float, ceiling: float) -> float:
        """Score a "lower is better" multiple against its ceiling.

        Returns ``100 - (ratio / ceiling) * 50`` (so 50 at the ceiling,
        approaching 100 near zero) when ``0 < ratio <= ceiling``; 0 otherwise.
        """
        if 0 < ratio <= ceiling:
            return max(0, 100 - (ratio / ceiling) * 50)
        return 0

    @staticmethod
    def _growth_points(rate: float) -> float:
        """Map a positive growth rate (fraction) to 0-100; 100% growth caps at 100."""
        if rate > 0:
            return min(100, rate * 100)
        return 0

    @staticmethod
    def _health_score_of(analysis_results: Dict):
        """Read the health score from an aggregated results dict.

        ``financial_health_score`` is the key this class has always read;
        ``health_score`` (the key ``calculate_financial_health`` emits) is
        accepted as a fallback so results can be passed through unrenamed.
        """
        if 'financial_health_score' in analysis_results:
            return analysis_results['financial_health_score']
        return analysis_results.get('health_score', 0)

    # ------------------------------------------------------------------
    # Public analyses
    # ------------------------------------------------------------------
    def calculate_valuation_metrics(self, data: Dict) -> Dict:
        """Compute valuation multiples and a 0-100 valuation score.

        Higher scores mean more undervalued.  Weights: PE 30%, PB 30%,
        PS 20%, PEG 20% (PEG ceiling is fixed at 1.0).

        Returns:
            Dict with the raw multiples, ``market_cap``, ``valuation_score``
            and ``is_undervalued``.  On failure only the last two keys are
            returned (0 / False).
        """
        try:
            key_metrics = data.get('key_metrics') or {}
            company_info = data.get('company_info') or {}

            pe_ratio = self._to_number(key_metrics.get('pe_ratio'))
            pb_ratio = self._to_number(key_metrics.get('pb_ratio'))
            ps_ratio = self._to_number(key_metrics.get('ps_ratio'))
            peg_ratio = self._to_number(key_metrics.get('peg_ratio'))
            market_cap = self._to_number(company_info.get('market_cap'))

            max_pe = self.criteria['max_pe_ratio']
            max_pb = self.criteria['max_pb_ratio']
            max_ps = self.criteria['max_ps_ratio']

            valuation_score = (
                self._ratio_score(pe_ratio, max_pe) * 0.3
                + self._ratio_score(pb_ratio, max_pb) * 0.3
                + self._ratio_score(ps_ratio, max_ps) * 0.2
                + self._ratio_score(peg_ratio, 1.0) * 0.2
            )

            # A multiple of 0 means "missing" and a negative one means losses
            # or negative equity; neither is evidence of undervaluation, so
            # the flag requires strictly positive multiples, exactly like the
            # scoring above does.
            is_undervalued = (
                0 < pe_ratio <= max_pe
                and 0 < pb_ratio <= max_pb
                and 0 < ps_ratio <= max_ps
            )

            return {
                'pe_ratio': pe_ratio,
                'pb_ratio': pb_ratio,
                'ps_ratio': ps_ratio,
                'peg_ratio': peg_ratio,
                'market_cap': market_cap,
                'valuation_score': min(100, max(0, valuation_score)),
                'is_undervalued': bool(is_undervalued),
            }
        except Exception as e:
            logger.error(f"计算估值指标失败: {e}")
            return {'valuation_score': 0, 'is_undervalued': False}

    def calculate_financial_health(self, data: Dict) -> Dict:
        """Compute liquidity, leverage and profitability and a 0-100 health score.

        Weights: liquidity 30% (current ratio of 2.0 is full marks),
        leverage 30% (zero debt ratio is full marks, the configured maximum
        is zero), profitability 40% (20% ROE is full marks).  Each component
        only scores when it passes its configured threshold.

        Returns:
            Dict with the raw metrics, ``health_score`` and ``is_healthy``.
            On failure only the last two keys are returned (0 / False).
        """
        try:
            key_metrics = data.get('key_metrics') or {}
            financial_statements = data.get('financial_statements') or {}
            latest_financial = self._get_latest_financial_data(financial_statements)

            # Liquidity
            current_ratio = self._to_number(key_metrics.get('current_ratio'))
            quick_ratio = self._to_number(key_metrics.get('quick_ratio'))

            # Leverage
            debt_to_equity = self._to_number(key_metrics.get('debt_to_equity'))
            # NOTE(review): with no balance-sheet data this is 0, which earns
            # the full leverage score below - confirm that is intended.
            debt_ratio = self._calculate_debt_ratio(latest_financial)

            # Profitability
            roe = self._to_number(key_metrics.get('return_on_equity'))
            roa = self._to_number(key_metrics.get('return_on_assets'))
            profit_margin = self._to_number(key_metrics.get('profit_margin'))
            operating_margin = self._to_number(key_metrics.get('operating_margin'))

            liquid_enough = current_ratio >= self.criteria['min_current_ratio']
            debt_ok = debt_ratio <= self.criteria['max_debt_ratio']
            profitable = roe >= self.criteria['min_roe']

            health_score = 0
            if liquid_enough:
                liquidity_score = min(100, (current_ratio / 2.0) * 100)
                health_score += liquidity_score * 0.3
            if debt_ok:
                debt_score = max(0, 100 - (debt_ratio / self.criteria['max_debt_ratio']) * 100)
                health_score += debt_score * 0.3
            if profitable:
                profitability_score = min(100, (roe / 0.2) * 100)
                health_score += profitability_score * 0.4

            return {
                'current_ratio': current_ratio,
                'quick_ratio': quick_ratio,
                'debt_to_equity': debt_to_equity,
                'debt_ratio': debt_ratio,
                'roe': roe,
                'roa': roa,
                'profit_margin': profit_margin,
                'operating_margin': operating_margin,
                'health_score': min(100, max(0, health_score)),
                'is_healthy': bool(liquid_enough and debt_ok and profitable),
            }
        except Exception as e:
            logger.error(f"计算财务健康度失败: {e}")
            return {'health_score': 0, 'is_healthy': False}

    def calculate_growth_metrics(self, data: Dict) -> Dict:
        """Compute growth rates and a 0-100 growth score.

        Weights: reported revenue growth 40%, reported earnings growth 40%,
        revenue growth between the two latest statements 20%.  Negative or
        zero growth scores nothing.

        Returns:
            Dict with the three growth rates, ``growth_score`` and
            ``is_growing`` (both reported rates above 5%).  On failure only
            the last two keys are returned (0 / False).
        """
        try:
            key_metrics = data.get('key_metrics') or {}
            financial_statements = data.get('financial_statements') or {}

            revenue_growth = self._to_number(key_metrics.get('revenue_growth'))
            earnings_growth = self._to_number(key_metrics.get('earnings_growth'))
            historical_growth = self._calculate_historical_growth(financial_statements)

            growth_score = (
                self._growth_points(revenue_growth) * 0.4
                + self._growth_points(earnings_growth) * 0.4
                + self._growth_points(historical_growth) * 0.2
            )

            return {
                'revenue_growth': revenue_growth,
                'earnings_growth': earnings_growth,
                'historical_growth': historical_growth,
                'growth_score': min(100, max(0, growth_score)),
                'is_growing': bool(revenue_growth > 0.05 and earnings_growth > 0.05),
            }
        except Exception as e:
            logger.error(f"计算成长性指标失败: {e}")
            return {'growth_score': 0, 'is_growing': False}

    def calculate_risk_metrics(self, data: Dict) -> Dict:
        """Compute beta, volatility, max drawdown and a 0-100 risk score.

        Higher scores mean lower risk.  Starting from 100, points are
        deducted for beta above 1.5 or below 0.8, annualised volatility
        above 30% and maximum drawdown above 20%.

        Returns:
            Dict with ``beta``, ``volatility``, ``max_drawdown``,
            ``risk_score`` and ``risk_level``.  On failure a neutral
            score of 50 is returned.
        """
        try:
            key_metrics = data.get('key_metrics') or {}
            stock_prices = data.get('stock_prices', pd.DataFrame())

            beta = self._to_number(key_metrics.get('beta'), 1.0)
            volatility = self._calculate_volatility(stock_prices)
            max_drawdown = self._calculate_max_drawdown(stock_prices)

            risk_score = 100
            if beta > 1.5:
                risk_score -= (beta - 1.5) * 20
            elif beta < 0.8:
                risk_score -= (0.8 - beta) * 10
            if volatility > 0.3:
                risk_score -= (volatility - 0.3) * 100
            if max_drawdown > 0.2:
                risk_score -= (max_drawdown - 0.2) * 100

            # Clamp once so the reported score and the level derived from it
            # always agree.
            risk_score = max(0, min(100, risk_score))

            return {
                'beta': beta,
                'volatility': volatility,
                'max_drawdown': max_drawdown,
                'risk_score': risk_score,
                'risk_level': self._get_risk_level(risk_score),
            }
        except Exception as e:
            logger.error(f"计算风险指标失败: {e}")
            return {'risk_score': 50, 'risk_level': '中等'}

    def perform_dcf_valuation(self, data: Dict) -> Dict:
        """Run a simplified 5-year DCF and compare it with the current price.

        Assumptions: free cash flow is 80% of the latest net income; growth
        is the reported revenue growth clamped to 2%-10% (5% if missing);
        discount rate 10%; terminal growth 3%.

        The DCF total is a company-wide figure while the price is per share,
        so when ``company_info['market_cap']`` is available the total is
        converted to a per-share value (shares = market cap / price) before
        comparing.  This assumes the statements and the market cap are in
        the same currency units - TODO confirm against the data source.
        Without a market cap the total is compared with the price directly,
        as earlier versions did.

        Returns:
            Dict with ``dcf_value`` (company-wide total), ``current_price``,
            ``dcf_score`` (0-100; 100 when value is at least twice the price)
            and ``is_undervalued_dcf``; plus ``dcf_value_per_share`` (``None``
            when it could not be derived).  When inputs are missing, not
            positive, or an error occurs, only ``dcf_value`` and ``dcf_score``
            are returned (both 0).
        """
        try:
            financial_statements = data.get('financial_statements') or {}
            key_metrics = data.get('key_metrics') or {}
            company_info = data.get('company_info') or {}

            latest_financial = self._get_latest_financial_data(financial_statements)
            if not latest_financial:
                return {'dcf_value': 0, 'dcf_score': 0}

            revenue = self._to_number(latest_financial.get('revenue'))
            net_income = self._to_number(latest_financial.get('net_income'))
            if revenue <= 0 or net_income <= 0:
                return {'dcf_value': 0, 'dcf_score': 0}

            reported_growth = self._to_number(key_metrics.get('revenue_growth'), 0.05)
            growth_rate = min(0.1, max(0.02, reported_growth))
            discount_rate = 0.1
            terminal_growth_rate = 0.03
            years = 5

            fcf = net_income * 0.8

            # Present value of the explicit forecast period.
            dcf_value = 0
            for year in range(1, years + 1):
                future_fcf = fcf * ((1 + growth_rate) ** year)
                dcf_value += future_fcf / ((1 + discount_rate) ** year)

            # Gordon-growth terminal value, discounted back from year 5.
            terminal_fcf = fcf * ((1 + growth_rate) ** years) * (1 + terminal_growth_rate)
            terminal_value = terminal_fcf / (discount_rate - terminal_growth_rate)
            total_dcf_value = dcf_value + terminal_value / ((1 + discount_rate) ** years)

            current_price = self._get_current_price(data)
            market_cap = self._to_number(company_info.get('market_cap'))

            value_per_share = None
            comparable_value = total_dcf_value
            if current_price > 0 and market_cap > 0:
                implied_shares = market_cap / current_price
                value_per_share = total_dcf_value / implied_shares
                comparable_value = value_per_share

            if current_price > 0:
                dcf_score = min(100, (comparable_value / current_price) * 50)
                is_undervalued_dcf = bool(comparable_value > current_price)
            else:
                dcf_score = 0
                is_undervalued_dcf = False

            return {
                'dcf_value': total_dcf_value,
                'dcf_value_per_share': value_per_share,
                'current_price': current_price,
                'dcf_score': dcf_score,
                'is_undervalued_dcf': is_undervalued_dcf,
            }
        except Exception as e:
            logger.error(f"DCF估值分析失败: {e}")
            return {'dcf_value': 0, 'dcf_score': 0}

    def generate_investment_recommendation(self, analysis_results: Dict) -> Dict:
        """Combine the four scores into an overall score and a recommendation.

        Weights: valuation 40%, financial health 30%, growth 20%, risk 10%.
        Missing scores count as 0.  The health score is read from
        ``financial_health_score``, falling back to ``health_score``.

        Returns:
            Dict with ``recommendation``, ``confidence``, ``overall_score``,
            ``risk_warnings``, ``key_strengths`` and ``key_concerns``.  On
            failure only ``recommendation`` and ``overall_score`` are returned.
        """
        try:
            valuation_score = self._to_number(analysis_results.get('valuation_score'))
            health_score = self._to_number(self._health_score_of(analysis_results))
            growth_score = self._to_number(analysis_results.get('growth_score'))
            risk_score = self._to_number(analysis_results.get('risk_score'))

            overall_score = (
                valuation_score * 0.4 +
                health_score * 0.3 +
                growth_score * 0.2 +
                risk_score * 0.1
            )

            # (minimum overall score, recommendation, confidence), best first.
            tiers = (
                (80, "强烈买入", "高"),
                (65, "买入", "中高"),
                (50, "持有", "中等"),
                (35, "观望", "中低"),
            )
            recommendation, confidence = "卖出", "高"
            for floor, label, certainty in tiers:
                if overall_score >= floor:
                    recommendation, confidence = label, certainty
                    break

            risk_warnings = []
            if risk_score < 40:
                risk_warnings.append("高风险投资")
            if health_score < 50:
                risk_warnings.append("财务健康度较低")
            if growth_score < 30:
                risk_warnings.append("成长性不足")

            return {
                'recommendation': recommendation,
                'confidence': confidence,
                'overall_score': overall_score,
                'risk_warnings': risk_warnings,
                'key_strengths': self._get_key_strengths(analysis_results),
                'key_concerns': self._get_key_concerns(analysis_results),
            }
        except Exception as e:
            logger.error(f"生成投资建议失败: {e}")
            return {'recommendation': '无法分析', 'overall_score': 0}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _get_latest_financial_data(self, financial_statements: Dict) -> Dict:
        """Return the statement stored under the greatest period key, or ``{}``."""
        if not financial_statements:
            return {}
        latest_period = max(financial_statements)
        return financial_statements.get(latest_period) or {}

    def _calculate_debt_ratio(self, financial_data: Dict) -> float:
        """Return total liabilities / total assets, or 0 when assets are not positive."""
        financial_data = financial_data or {}
        total_liabilities = self._to_number(financial_data.get('total_liabilities'))
        total_assets = self._to_number(financial_data.get('total_assets'))
        if total_assets > 0:
            return total_liabilities / total_assets
        return 0

    def _calculate_historical_growth(self, financial_statements: Dict) -> float:
        """Return revenue growth between the two latest periods.

        Returns 0 when fewer than two periods exist or the earlier revenue
        is not positive.
        """
        if not financial_statements or len(financial_statements) < 2:
            return 0

        latest_period, previous_period = sorted(financial_statements, reverse=True)[:2]
        current_revenue = self._to_number(
            (financial_statements[latest_period] or {}).get('revenue'))
        previous_revenue = self._to_number(
            (financial_statements[previous_period] or {}).get('revenue'))

        if previous_revenue > 0:
            return (current_revenue - previous_revenue) / previous_revenue
        return 0

    def _calculate_volatility(self, stock_prices: pd.DataFrame) -> float:
        """Return annualised volatility of ``close`` returns (252 periods per year).

        Returns 0 when there is no ``close`` data or fewer than two returns,
        because the sample standard deviation is undefined (NaN) in that case.
        """
        if stock_prices is None or stock_prices.empty or 'close' not in stock_prices.columns:
            return 0

        returns = stock_prices['close'].pct_change().dropna()
        if len(returns) < 2:
            return 0
        return self._to_number(returns.std() * np.sqrt(252))

    def _calculate_max_drawdown(self, stock_prices: pd.DataFrame) -> float:
        """Return the largest peak-to-trough decline of ``close`` as a positive fraction."""
        if stock_prices is None or stock_prices.empty or 'close' not in stock_prices.columns:
            return 0

        prices = stock_prices['close']
        peak = prices.expanding().max()
        drawdown = (prices - peak) / peak
        return self._to_number(abs(drawdown.min()))

    def _get_risk_level(self, risk_score: float) -> str:
        """Map a risk score (higher is safer) to a risk-level label."""
        for floor, label in ((80, "低"), (60, "中低"), (40, "中等"), (20, "中高")):
            if risk_score >= floor:
                return label
        return "高"

    def _get_current_price(self, data: Dict) -> float:
        """Return the last ``close`` value in ``data['stock_prices']``, or 0 if unavailable."""
        stock_prices = data.get('stock_prices', pd.DataFrame())
        if stock_prices is None or stock_prices.empty or 'close' not in stock_prices.columns:
            return 0
        return self._to_number(stock_prices['close'].iloc[-1])

    def _score_snapshot(self, analysis_results: Dict):
        """Return the four (score, strength label, concern label) triples in report order."""
        return (
            (self._to_number(analysis_results.get('valuation_score')),
             "估值吸引力高", "估值偏高"),
            (self._to_number(self._health_score_of(analysis_results)),
             "财务健康度良好", "财务健康度不佳"),
            (self._to_number(analysis_results.get('growth_score')),
             "成长性优秀", "成长性不足"),
            (self._to_number(analysis_results.get('risk_score')),
             "风险控制良好", "风险较高"),
        )

    def _get_key_strengths(self, analysis_results: Dict) -> List[str]:
        """Return a label for every score above 70."""
        return [strength
                for score, strength, _ in self._score_snapshot(analysis_results)
                if score > 70]

    def _get_key_concerns(self, analysis_results: Dict) -> List[str]:
        """Return a label for every score below 40 (missing scores count as 0)."""
        return [concern
                for score, _, concern in self._score_snapshot(analysis_results)
                if score < 40]