"""
Data collection module - fetches stock and financial data from various APIs.
"""
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

import pandas as pd
import requests
import yfinance as yf

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class StockDataCollector:
    """Collect company, price, financial and news data for a ticker symbol.

    Every public ``get_*`` method is best-effort: failures are logged and an
    empty result (``{}``, an empty ``DataFrame`` or a zeroed sentiment dict)
    is returned instead of raising.
    """

    # (output key, yfinance ``info`` field, default when the field is absent)
    _COMPANY_FIELDS: Tuple[Tuple[str, str, object], ...] = (
        ('name', 'longName', ''),
        ('sector', 'sector', ''),
        ('industry', 'industry', ''),
        ('market_cap', 'marketCap', 0),
        ('employees', 'fullTimeEmployees', 0),
        ('website', 'website', ''),
        ('description', 'longBusinessSummary', ''),
        ('country', 'country', ''),
        ('currency', 'currency', 'USD'),
    )

    # output key -> yfinance ``info`` field; missing fields default to 0
    _KEY_METRIC_FIELDS: Dict[str, str] = {
        'pe_ratio': 'trailingPE',
        'pb_ratio': 'priceToBook',
        'ps_ratio': 'priceToSalesTrailing12Months',
        'peg_ratio': 'pegRatio',
        'debt_to_equity': 'debtToEquity',
        'current_ratio': 'currentRatio',
        'quick_ratio': 'quickRatio',
        'return_on_equity': 'returnOnEquity',
        'return_on_assets': 'returnOnAssets',
        'profit_margin': 'profitMargins',
        'operating_margin': 'operatingMargins',
        'revenue_growth': 'revenueGrowth',
        'earnings_growth': 'earningsGrowth',
        'beta': 'beta',
        'dividend_yield': 'dividendYield',
        'payout_ratio': 'payoutRatio',
    }

    # Source column label -> database column name for price history.
    _PRICE_COLUMN_MAP: Dict[str, str] = {
        'Open': 'open',
        'High': 'high',
        'Low': 'low',
        'Close': 'close',
        'Volume': 'volume',
    }
    _PRICE_COLUMNS: Tuple[str, ...] = (
        'date', 'open', 'high', 'low', 'close', 'volume',
    )

    # output key -> candidate statement row labels, tried in order.
    _STATEMENT_FIELDS: Dict[str, Tuple[str, ...]] = {
        'revenue': ('Total Revenue',),
        'net_income': ('Net Income',),
        'total_assets': ('Total Assets',),
        'total_liabilities': (
            'Total Liabilities',
            'Total Liabilities Net Minority Interest',
        ),
        'shareholders_equity': ('Stockholders Equity',),
        'cash': ('Cash And Cash Equivalents',),
        'debt': ('Total Debt',),
    }

    _POSITIVE_KEYWORDS: Tuple[str, ...] = (
        'growth', 'profit', 'increase', 'strong', 'positive', 'beat', 'exceed',
    )
    _NEGATIVE_KEYWORDS: Tuple[str, ...] = (
        'loss', 'decline', 'weak', 'negative', 'miss', 'fall', 'drop',
    )

    def __init__(self):
        # HTTP session with a browser-like User-Agent. The methods below go
        # through yfinance and do not use this session themselves.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def get_company_info(self, symbol: str) -> Dict:
        """Return basic company information for *symbol*.

        Args:
            symbol: Ticker symbol passed to ``yf.Ticker``.

        Returns:
            A dict with ``symbol`` plus name, sector, industry, market_cap,
            employees, website, description, country and currency; ``{}`` if
            the lookup fails.
        """
        try:
            info = yf.Ticker(symbol).info
            company = {'symbol': symbol}
            for key, field, default in self._COMPANY_FIELDS:
                company[key] = info.get(field, default)
            return company
        except Exception as e:
            logger.error(f"获取公司信息失败 {symbol}: {e}")
            return {}

    def get_stock_prices(self, symbol: str, period: str = "1y") -> pd.DataFrame:
        """Return price history for *symbol* shaped for the database schema.

        Args:
            symbol: Ticker symbol passed to ``yf.Ticker``.
            period: History period forwarded to ``Ticker.history``.

        Returns:
            A DataFrame with columns ``date, open, high, low, close, volume``;
            an empty DataFrame when no data is found or the lookup fails.
        """
        try:
            data = yf.Ticker(symbol).history(period=period)

            if data.empty:
                logger.warning(f"未找到股价数据: {symbol}")
                return pd.DataFrame()

            data = data.reset_index()
            # Rename by label, not by position: the number of extra columns
            # (dividends, splits, ...) is not fixed, and a positional
            # assignment raises as soon as the column count differs.
            rename_map = {data.columns[0]: 'date', **self._PRICE_COLUMN_MAP}
            data = data.rename(columns=rename_map)
            return data[list(self._PRICE_COLUMNS)]
        except Exception as e:
            logger.error(f"获取股价数据失败 {symbol}: {e}")
            return pd.DataFrame()

    @staticmethod
    def _load_statements(ticker, attributes: Tuple[str, ...]) -> List[pd.DataFrame]:
        """Return the non-empty statement frames found under *attributes*."""
        frames = []
        for attribute in attributes:
            frame = getattr(ticker, attribute, None)
            if frame is not None and not frame.empty:
                frames.append(frame)
        return frames

    @staticmethod
    def _first_value(values: pd.Series, labels: Tuple[str, ...]):
        """Return the first non-missing value among *labels*, else 0."""
        for label in labels:
            value = values.get(label)
            if value is not None and not pd.isna(value):
                return value
        return 0

    @classmethod
    def _periods_from_statements(cls, frames: List[pd.DataFrame],
                                 quarterly: bool) -> Dict:
        """Build one record per reporting period from statement frames.

        The frames are expected to have line items as rows and period-end
        dates as columns, so periods are read from the columns.
        """
        if not frames:
            return {}

        combined = pd.concat(frames)
        # The same line item can appear in more than one statement; keep the
        # first occurrence so each label maps to a single value.
        combined = combined[~combined.index.duplicated(keep='first')]

        periods = {}
        for period_end in combined.columns:
            try:
                stamp = pd.Timestamp(period_end)
            except (TypeError, ValueError):
                continue  # column label is not a date; skip it
            if pd.isna(stamp):
                continue

            if quarterly:
                # Calendar quarter of the period-end date.
                quarter = (stamp.month - 1) // 3 + 1
                label = f"Q{quarter}_{stamp.year}"
            else:
                quarter = 0
                label = f"Annual_{stamp.year}"

            values = combined[period_end]
            record = {'year': stamp.year, 'quarter': quarter}
            for key, row_labels in cls._STATEMENT_FIELDS.items():
                record[key] = cls._first_value(values, row_labels)
            periods[label] = record
        return periods

    def get_financial_statements(self, symbol: str) -> Dict:
        """Return quarterly and annual financial statement figures.

        Income-statement and balance-sheet frames are merged so that both
        revenue/net income and assets/liabilities/equity/cash/debt can be
        filled in. Values that are unavailable are reported as 0.

        Args:
            symbol: Ticker symbol passed to ``yf.Ticker``.

        Returns:
            A dict keyed ``Q<quarter>_<year>`` for quarterly periods and
            ``Annual_<year>`` for annual periods (``quarter`` is 0 there).
            Each value holds year, quarter, revenue, net_income, total_assets,
            total_liabilities, shareholders_equity, cash and debt. Returns
            ``{}`` when nothing could be fetched.
        """
        try:
            ticker = yf.Ticker(symbol)

            quarterly_data = {}
            annual_data = {}

            # Quarterly data; a failure here must not prevent the annual pass.
            try:
                quarterly_data = self._periods_from_statements(
                    self._load_statements(
                        ticker,
                        ('quarterly_financials', 'quarterly_balance_sheet'),
                    ),
                    quarterly=True,
                )
            except Exception as e:
                logger.warning(f"获取季度财务数据失败 {symbol}: {e}")

            # Annual data
            try:
                annual_data = self._periods_from_statements(
                    self._load_statements(
                        ticker, ('financials', 'balance_sheet'),
                    ),
                    quarterly=False,
                )
            except Exception as e:
                logger.warning(f"获取年度财务数据失败 {symbol}: {e}")

            return {**quarterly_data, **annual_data}
        except Exception as e:
            logger.error(f"获取财务数据失败 {symbol}: {e}")
            return {}

    def get_key_metrics(self, symbol: str) -> Dict:
        """Return key valuation and profitability ratios for *symbol*.

        Args:
            symbol: Ticker symbol passed to ``yf.Ticker``.

        Returns:
            A dict of ratios (pe_ratio, pb_ratio, ..., payout_ratio) where a
            missing field defaults to 0; ``{}`` if the lookup fails.
        """
        try:
            info = yf.Ticker(symbol).info
            return {
                key: info.get(field, 0)
                for key, field in self._KEY_METRIC_FIELDS.items()
            }
        except Exception as e:
            logger.error(f"获取关键指标失败 {symbol}: {e}")
            return {}

    def get_analyst_recommendations(self, symbol: str) -> Dict:
        """Return a summary of the most recent analyst recommendation.

        Args:
            symbol: Ticker symbol passed to ``yf.Ticker``.

        Returns:
            A dict with latest_recommendation, latest_firm, latest_date and
            total_recommendations; ``{}`` when there are no recommendations
            or the lookup fails.
        """
        try:
            recommendations = yf.Ticker(symbol).recommendations

            if recommendations is None or recommendations.empty:
                return {}

            # The last row is treated as the latest recommendation.
            latest = recommendations.iloc[-1]

            if 'Date' in latest.index:
                latest_date = latest['Date']
            elif isinstance(latest.name, datetime):
                # The date is carried by the frame index rather than a column.
                latest_date = latest.name
            else:
                latest_date = ''

            return {
                'latest_recommendation': latest.get('To Grade', ''),
                'latest_firm': latest.get('Firm', ''),
                'latest_date': latest_date,
                'total_recommendations': len(recommendations)
            }
        except Exception as e:
            logger.warning(f"获取分析师推荐失败 {symbol}: {e}")
            return {}

    def get_news_sentiment(self, symbol: str) -> Dict:
        """Return a simplified keyword-based news sentiment score.

        The score is the sum, over the first 10 news items, of the number of
        positive keywords minus the number of negative keywords found in the
        lower-cased title and summary (substring match).

        Args:
            symbol: Ticker symbol passed to ``yf.Ticker``.

        Returns:
            A dict with sentiment_score, news_count and recent_news (the
            first 5 items). When there is no news or the lookup fails the
            score and count are 0 and recent_news is empty.
        """
        try:
            news = yf.Ticker(symbol).news

            if not news:
                return {'sentiment_score': 0, 'news_count': 0, 'recent_news': []}

            # Simple keyword matching; a real application could use a more
            # sophisticated NLP model.
            sentiment_score = 0
            for article in news[:10]:  # only the first 10 items are scored
                if not isinstance(article, dict):
                    continue
                # Some feeds nest the text fields under a 'content' mapping;
                # fall back to the article itself otherwise.
                content = article.get('content')
                fields = content if isinstance(content, dict) else article

                # ``or ''`` guards against keys that are present but None.
                title = (fields.get('title') or '').lower()
                summary = (fields.get('summary') or '').lower()
                text = title + ' ' + summary

                positive_count = sum(
                    1 for word in self._POSITIVE_KEYWORDS if word in text
                )
                negative_count = sum(
                    1 for word in self._NEGATIVE_KEYWORDS if word in text
                )
                sentiment_score += positive_count - negative_count

            return {
                'sentiment_score': sentiment_score,
                'news_count': len(news),
                'recent_news': news[:5]  # first 5 news items
            }
        except Exception as e:
            logger.warning(f"获取新闻情绪失败 {symbol}: {e}")
            return {'sentiment_score': 0, 'news_count': 0, 'recent_news': []}

    def collect_all_data(self, symbol: str) -> Dict:
        """Collect every data category for *symbol* into a single dict.

        Args:
            symbol: Ticker symbol forwarded to each ``get_*`` method.

        Returns:
            A dict with symbol, collection_time (ISO-8601 local timestamp),
            company_info, stock_prices, financial_statements, key_metrics,
            analyst_recommendations and news_sentiment.
        """
        logger.info(f"开始收集数据: {symbol}")

        all_data = {
            'symbol': symbol,
            'collection_time': datetime.now().isoformat(),
            'company_info': self.get_company_info(symbol),
            'stock_prices': self.get_stock_prices(symbol),
            'financial_statements': self.get_financial_statements(symbol),
            'key_metrics': self.get_key_metrics(symbol),
            'analyst_recommendations': self.get_analyst_recommendations(symbol),
            'news_sentiment': self.get_news_sentiment(symbol)
        }

        logger.info(f"数据收集完成: {symbol}")
        return all_data