"""
美股低价值公司分析系统 - 主程序
"""
|
|
import sys
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, Optional
|
|
|
|
# 导入自定义模块
|
|
from data_collector import StockDataCollector
|
|
from analysis_engine import StockAnalyzer
|
|
from report_generator import ReportGenerator
|
|
from database import StockDatabase
|
|
from analysis_storage import AnalysisStorage
|
|
from config import LOW_VALUE_CRITERIA
|
|
|
|
# 设置日志
|
|
# Configure the root logger once, at import time.  Records at INFO level and
# above are written both to ``stock_analysis.log`` (UTF-8, relative to the
# current working directory) and to standard output.
logging.basicConfig(
    handlers=[
        logging.FileHandler('stock_analysis.log', encoding='utf-8'),
        logging.StreamHandler(sys.stdout),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
|
|
|
|
class StockAnalysisSystem:
    """Orchestrate data collection, analysis, persistence and reporting.

    The class wires together the project's collector, analyzer, report
    generator, database and report-storage components and exposes three
    entry points: :meth:`analyze_stock`, :meth:`quick_screening` and
    :meth:`batch_analysis`.  ``analyze_stock`` and ``quick_screening`` never
    raise for ordinary failures; they return a dict containing an
    ``'error'`` key instead.
    """

    # Maximum age (seconds) of a stored analysis that may be returned instead
    # of recomputing: 24 hours.
    CACHE_TTL_SECONDS = 86400

    # One entry per valuation ratio checked by ``quick_screening``:
    # (label used in messages, key in ``key_metrics``, key in LOW_VALUE_CRITERIA).
    _RATIO_CHECKS = (
        ('PE', 'pe_ratio', 'max_pe_ratio'),
        ('PB', 'pb_ratio', 'max_pb_ratio'),
        ('PS', 'ps_ratio', 'max_ps_ratio'),
    )

    def __init__(self):
        """Create one instance of each collaborating component."""
        self.data_collector = StockDataCollector()
        self.analyzer = StockAnalyzer()
        self.report_generator = ReportGenerator()
        self.database = StockDatabase()
        self.storage = AnalysisStorage()

    def analyze_stock(self, symbol: str, force_refresh: bool = False) -> Dict:
        """Run the full analysis pipeline for a single stock.

        Unless ``force_refresh`` is true, the most recent stored analysis is
        returned as-is when it is younger than ``CACHE_TTL_SECONDS``.
        Otherwise the method collects raw data, stores it, runs every
        analyzer, stores the combined result, generates the comprehensive and
        summary reports and saves the standardized analysis report.

        Args:
            symbol: Stock ticker symbol.
            force_refresh: Skip the stored-analysis lookup and always
                recompute.

        Returns:
            The analysis result dict (augmented with ``report_file``,
            ``summary_file`` and ``storage_files``), the stored analysis when
            it is still fresh, or ``{'error': <message>}`` when company
            information is unavailable or any step raises.
        """
        try:
            logger.info(f"开始分析股票: {symbol}")

            if not force_refresh:
                cached = self._load_fresh_analysis(symbol)
                if cached is not None:
                    logger.info(f"使用缓存的分析结果: {symbol}")
                    return cached

            # 1. Data collection
            logger.info(f"正在收集数据: {symbol}")
            raw_data = self.data_collector.collect_all_data(symbol)

            if not raw_data.get('company_info'):
                logger.error(f"无法获取公司信息: {symbol}")
                return {'error': '无法获取公司信息'}

            self._persist_raw_data(symbol, raw_data)

            # 2. Analysis
            logger.info(f"正在进行深度分析: {symbol}")
            analysis_results = self._run_analysis(symbol, raw_data)
            self.database.save_analysis_result(symbol, analysis_results)

            # 3. Report generation
            logger.info(f"正在生成报告: {symbol}")
            report_file = self.report_generator.generate_comprehensive_report(
                symbol, raw_data, analysis_results
            )
            summary_file = self.report_generator.generate_summary_report(
                symbol, analysis_results
            )

            # 4. Standardized analysis report
            logger.info(f"正在保存分析报告: {symbol}")
            storage_result = self.storage.save_analysis_report(symbol, {
                'company_info': raw_data.get('company_info', {}),
                'financial_data': raw_data.get('financial_statements', {}),
                'valuation_analysis': analysis_results.get('valuation_metrics', {}),
                'financial_health': analysis_results.get('financial_health', {}),
                'growth_analysis': analysis_results.get('growth_metrics', {}),
                'risk_analysis': analysis_results.get('risk_metrics', {}),
                'investment_recommendation': analysis_results.get('investment_recommendation', {}),
                'analyst_opinions': raw_data.get('analyst_recommendations', {}),
                'market_data': raw_data.get('stock_prices', {}),
                'raw_data': raw_data,
            })

            analysis_results['report_file'] = report_file
            analysis_results['summary_file'] = summary_file
            analysis_results['storage_files'] = storage_result

            logger.info(f"分析完成: {symbol}")
            return analysis_results

        except Exception as e:
            # Top-level boundary: log with traceback, report via the result dict.
            logger.exception(f"分析股票失败 {symbol}: {e}")
            return {'error': str(e)}

    def _load_fresh_analysis(self, symbol: str) -> Optional[Dict]:
        """Return the latest stored analysis if it is younger than the TTL.

        A stored record whose ``analysis_date`` is missing, is not an ISO
        string, or cannot be compared with the current naive local time is
        treated as stale, so that a bad record leads to a fresh analysis
        rather than an error.
        """
        latest = self.database.get_latest_analysis(symbol)
        if not latest:
            return None

        try:
            analyzed_at = datetime.fromisoformat(latest['analysis_date'])
            age_seconds = (datetime.now() - analyzed_at).total_seconds()
        except (KeyError, TypeError, ValueError) as exc:
            logger.warning(f"缓存的分析时间无效, 将重新分析 {symbol}: {exc}")
            return None

        return latest if age_seconds < self.CACHE_TTL_SECONDS else None

    def _persist_raw_data(self, symbol: str, raw_data: Dict) -> None:
        """Store company info and, when present, prices and financial statements."""
        self.database.save_company_info(symbol, raw_data['company_info'])

        # Prices and statements are optional in the collected data (the
        # storage payload in ``analyze_stock`` already reads them with
        # ``.get``), so a missing key must not abort the analysis here.
        prices = raw_data.get('stock_prices')
        if prices is not None and not prices.empty:
            self.database.save_stock_prices(symbol, prices)

        statements = raw_data.get('financial_statements')
        if statements:
            self.database.save_financial_data(symbol, statements)

    def _run_analysis(self, symbol: str, raw_data: Dict) -> Dict:
        """Run every analyzer on ``raw_data`` and merge the results into one dict."""
        valuation_results = self.analyzer.calculate_valuation_metrics(raw_data)
        health_results = self.analyzer.calculate_financial_health(raw_data)
        growth_results = self.analyzer.calculate_growth_metrics(raw_data)
        risk_results = self.analyzer.calculate_risk_metrics(raw_data)
        dcf_results = self.analyzer.perform_dcf_valuation(raw_data)

        # The recommendation is derived from the four sub-scores; a missing
        # score counts as 0.
        investment_recommendation = self.analyzer.generate_investment_recommendation({
            'valuation_score': valuation_results.get('valuation_score', 0),
            'financial_health_score': health_results.get('health_score', 0),
            'growth_score': growth_results.get('growth_score', 0),
            'risk_score': risk_results.get('risk_score', 0),
        })

        return {
            'symbol': symbol,
            'analysis_date': datetime.now().isoformat(),
            'valuation_metrics': valuation_results,
            'financial_health': health_results,
            'growth_metrics': growth_results,
            'risk_metrics': risk_results,
            'dcf_analysis': dcf_results,
            'investment_recommendation': investment_recommendation,
            'overall_score': investment_recommendation.get('overall_score', 0),
            'recommendation': investment_recommendation.get('recommendation', 'N/A'),
        }

    def quick_screening(self, symbol: str) -> Dict:
        """Check a stock against the thresholds in ``LOW_VALUE_CRITERIA``.

        The market capitalization must reach ``min_market_cap``.  Each of the
        PE, PB and PS ratios passes when it is positive and at most its
        configured maximum, fails when it exceeds the maximum, and is ignored
        (neither passed nor failed) when it is missing, ``None``, zero or
        negative.

        Args:
            symbol: Stock ticker symbol.

        Returns:
            A dict with ``symbol``, ``company_name``, ``market_cap``,
            ``passes_screening``, ``criteria_met`` and ``criteria_failed``;
            or ``{'error': <message>, 'passes_screening': False}`` when
            company information is unavailable or any step raises.
        """
        try:
            logger.info(f"快速筛选: {symbol}")

            raw_data = self.data_collector.collect_all_data(symbol)

            company_info = raw_data.get('company_info')
            if not company_info:
                return {'error': '无法获取公司信息', 'passes_screening': False}

            # ``or`` (rather than a ``.get`` default) also covers keys that are
            # present but hold ``None``, which would otherwise break the
            # numeric comparisons below.
            key_metrics = raw_data.get('key_metrics') or {}
            market_cap = company_info.get('market_cap') or 0

            criteria_met = []
            criteria_failed = []

            if market_cap >= LOW_VALUE_CRITERIA['min_market_cap']:
                criteria_met.append('市值符合要求')
            else:
                criteria_failed.append('市值过小')

            for label, metric_key, limit_key in self._RATIO_CHECKS:
                value = key_metrics.get(metric_key) or 0
                limit = LOW_VALUE_CRITERIA[limit_key]
                if 0 < value <= limit:
                    criteria_met.append(f'{label}比率合理 ({value:.2f})')
                elif value > limit:
                    criteria_failed.append(f'{label}比率过高 ({value:.2f})')

            return {
                'symbol': symbol,
                'company_name': company_info.get('name', ''),
                'market_cap': market_cap,
                # Any failed criterion fails the screening as a whole.
                'passes_screening': not criteria_failed,
                'criteria_met': criteria_met,
                'criteria_failed': criteria_failed,
            }

        except Exception as e:
            # Top-level boundary: log with traceback, report via the result dict.
            logger.exception(f"快速筛选失败 {symbol}: {e}")
            return {'error': str(e), 'passes_screening': False}

    def batch_analysis(self, symbols: list) -> Dict:
        """Analyze several stocks one after another.

        Args:
            symbols: Ticker symbols to analyze.

        Returns:
            A dict mapping each symbol to the result of ``analyze_stock`` for
            it (a repeated symbol keeps the result of its last run).
        """
        results = {}

        for symbol in symbols:
            logger.info(f"批量分析: {symbol}")
            results[symbol] = self.analyze_stock(symbol)

        return results
|
|
|
|
def _print_usage():
    """Print the supported command-line invocations."""
    print("使用方法:")
    print("python main.py <股票代码> [--refresh]")
    print("python main.py --batch <股票代码1,股票代码2,...>")
    print("python main.py --screen <股票代码>")


def main():
    """Command-line entry point.

    Supported invocations (read from ``sys.argv``):

    * ``<symbol> [--refresh]`` - full analysis of one stock; ``--refresh``
      bypasses the stored analysis and may appear before or after the symbol.
    * ``--batch <s1,s2,...>`` - full analysis of a comma-separated list.
    * ``--screen <symbol>`` - quick screening of one stock.

    Symbols are upper-cased in every mode, and empty entries in a batch list
    are dropped.  The analysis system is only constructed once the arguments
    have been validated.  Results are printed to standard output.
    """
    args = sys.argv
    if len(args) < 2:
        _print_usage()
        return

    mode = args[1]

    if mode == '--batch':
        if len(args) < 3:
            print("请提供股票代码列表")
            return

        # Normalize like the other modes and ignore empty entries such as the
        # ones produced by "AAPL,,MSFT" or a trailing comma.
        symbols = [part.strip().upper() for part in args[2].split(',') if part.strip()]
        if not symbols:
            print("请提供股票代码列表")
            return

        results = StockAnalysisSystem().batch_analysis(symbols)

        print("\n=== 批量分析结果 ===")
        for symbol, result in results.items():
            if 'error' in result:
                print(f"{symbol}: 分析失败 - {result['error']}")
            else:
                # ``or 0`` keeps the numeric format spec valid for a None score.
                score = result.get('overall_score') or 0
                print(f"{symbol}: {result.get('recommendation', 'N/A')} (评分: {score:.1f})")

    elif mode == '--screen':
        if len(args) < 3:
            print("请提供股票代码")
            return

        symbol = args[2].upper()
        result = StockAnalysisSystem().quick_screening(symbol)

        print(f"\n=== {symbol} 快速筛选结果 ===")
        if 'error' in result:
            print(f"筛选失败: {result['error']}")
        else:
            print(f"公司名称: {result.get('company_name', 'N/A')}")
            print(f"市值: ${result.get('market_cap') or 0:,.0f}")
            print(f"通过筛选: {'是' if result.get('passes_screening') else '否'}")

            if result.get('criteria_met'):
                print("符合条件:")
                for criteria in result['criteria_met']:
                    print(f" ✓ {criteria}")

            if result.get('criteria_failed'):
                print("不符合条件:")
                for criteria in result['criteria_failed']:
                    print(f" ✗ {criteria}")

    else:
        # Single-stock analysis.  The symbol is the first argument that is not
        # the --refresh flag, so "--refresh AAPL" works as well as
        # "AAPL --refresh".
        positional = [arg for arg in args[1:] if arg != '--refresh']
        if not positional:
            _print_usage()
            return

        symbol = positional[0].upper()
        force_refresh = '--refresh' in args

        print(f"正在分析股票: {symbol}")
        result = StockAnalysisSystem().analyze_stock(symbol, force_refresh)

        if 'error' in result:
            print(f"分析失败: {result['error']}")
        else:
            print(f"\n=== {symbol} 分析结果 ===")
            print(f"投资建议: {result.get('recommendation', 'N/A')}")
            print(f"综合评分: {result.get('overall_score') or 0:.1f}/100")

            recommendation = result.get('investment_recommendation') or {}
            if recommendation.get('key_strengths'):
                print("关键优势:")
                for strength in recommendation['key_strengths']:
                    print(f" • {strength}")

            if recommendation.get('key_concerns'):
                print("关键担忧:")
                for concern in recommendation['key_concerns']:
                    print(f" • {concern}")

            if result.get('report_file'):
                print(f"\n详细报告已生成: {result['report_file']}")
            if result.get('summary_file'):
                print(f"简要报告已生成: {result['summary_file']}")


if __name__ == "__main__":
    main()