83 lines
2.6 KiB
Python
83 lines
2.6 KiB
Python
"""DataRouter — routes data requests to the optimal source.
|
|
|
|
Architecture:
|
|
Agent needs data → DataRouter → best available source → standardized result
|
|
|
|
Sources are tried in priority order. If a source fails or is unavailable,
|
|
the router falls back to the next source.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from .sources.base import DataSource, DataResult
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DataRouter:
    """Route data queries to the first registered source that yields data.

    Sources are registered with an integer priority (lower value = tried
    earlier). For each query the router walks the sources in priority order,
    skips those whose ``supports()`` check rejects the request, and returns
    the first result whose ``data`` is truthy.
    """

    def __init__(self):
        # (priority, source) pairs, kept sorted ascending by priority.
        self.sources: list[tuple[int, DataSource]] = []

    def register(self, source: DataSource, priority: int = 100) -> "DataRouter":
        """Add *source* to the router and return the router for chaining.

        Args:
            source: The data source to register.
            priority: Ordering key; sources with lower values are tried first.

        Returns:
            This router, so calls can be chained.
        """
        self.sources.append((priority, source))
        # list.sort is stable, so sources sharing a priority keep their
        # registration order.
        self.sources.sort(key=lambda entry: entry[0])
        logger.info(
            "[data_router] registered %s (priority=%s)", source.name, priority
        )
        return self

    async def query(
        self,
        query: str,
        data_type: str = "general",
        country: str | None = None,
        **kwargs,
    ) -> DataResult:
        """Query data sources in priority order.

        Args:
            query: Natural language or structured data query
            data_type: One of: macro, industry, company, trade, patent, general
            country: ISO country code (CN, US, etc.) or None for global
            **kwargs: Source-specific parameters, forwarded to ``fetch()``

        Returns:
            The first result whose ``data`` is truthy. If no source produces
            data, a ``DataResult`` with ``source="none"``, ``data=None`` and
            an ``error`` string that lists why each attempted source was
            rejected (or states that no source supports the request).
        """
        errors: list[str] = []

        for _priority, source in self.sources:
            if not source.supports(data_type, country):
                continue

            # The whole attempt stays inside the try so that a misbehaving
            # source (raising, returning None, ...) only costs us this source
            # and never aborts the fallback chain.
            try:
                logger.info(
                    "[data_router] trying %s for '%s...'", source.name, query[:50]
                )
                result = await source.fetch(
                    query, data_type=data_type, country=country, **kwargs
                )
                if result.data:
                    # Stringifying the payload can be expensive; only do it
                    # when the message will actually be emitted.
                    if logger.isEnabledFor(logging.INFO):
                        logger.info(
                            "[data_router] %s returned %d chars",
                            source.name,
                            len(str(result.data)),
                        )
                    return result

                # An empty result is not an exception, but it is still the
                # reason we moved on; record it so the final error is complete.
                errors.append(f"{source.name}: returned no data")
                logger.info(
                    "[data_router] %s returned no data, trying next source",
                    source.name,
                )
            except Exception as e:
                errors.append(f"{source.name}: {e}")
                logger.warning("[data_router] %s failed: %s", source.name, e)

        # Nothing produced data: either every attempt was rejected, or no
        # registered source supports this request at all.
        if errors:
            detail = "; ".join(errors)
        else:
            detail = (
                "no registered source supports "
                f"data_type={data_type!r}, country={country!r}"
            )
        return DataResult(
            source="none",
            data=None,
            error=f"All sources failed: {detail}",
        )

    async def query_multiple(
        self,
        queries: list[dict[str, Any]],
    ) -> list[DataResult]:
        """Run several queries concurrently and return their results in order.

        Args:
            queries: One dict of keyword arguments for ``query()`` per request.

        Returns:
            One result per input dict, in the same order as *queries*.
        """
        import asyncio

        return await asyncio.gather(*(self.query(**q) for q in queries))
|