"""Compliance middleware — checks for sensitive data leakage in output.""" from __future__ import annotations import json import logging import re from app.graph.state import ReportState from .base import Middleware logger = logging.getLogger(__name__) # Patterns that suggest sensitive data SENSITIVE_PATTERNS = [ (r"\b\d{15,18}\b", "可能的身份证号"), (r"\b\d{16,19}\b", "可能的银行卡号"), (r"\b1[3-9]\d{9}\b", "可能的手机号"), (r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "邮箱地址"), (r"(?:密码|password|secret|token|api.?key)\s*[:=]\s*\S+", "可能的凭证信息"), ] class ComplianceMiddleware(Middleware): """Scans report output for sensitive data patterns. After graph execution, scans the draft for PII / credential patterns. Logs warnings but does not block (can be made blocking later). """ name = "compliance" def _scan_text(self, text: str) -> list[dict]: findings = [] for pattern, desc in SENSITIVE_PATTERNS: matches = re.findall(pattern, text) if matches: findings.append({ "type": desc, "count": len(matches), "samples": [m[:8] + "..." for m in matches[:3]], }) return findings async def after(self, state: ReportState) -> ReportState: # Scan draft content all_text = json.dumps(state.draft, ensure_ascii=False) findings = self._scan_text(all_text) if findings: logger.warning( f"[compliance] found {len(findings)} sensitive data patterns:" ) for f in findings: logger.warning(f" - {f['type']}: {f['count']} occurrences") # Store in state for API to surface state.review.setdefault("compliance_warnings", findings) else: logger.info("[compliance] no sensitive data patterns detected") return state