Files
20260327-c863ce53/app/middleware/compliance.py
2026-04-25 19:25:22 +08:00

62 lines
2.0 KiB
Python

"""Compliance middleware — checks for sensitive data leakage in output."""
from __future__ import annotations
import json
import logging
import re
from app.graph.state import ReportState
from .base import Middleware
logger = logging.getLogger(__name__)
# Patterns that suggest sensitive data, as (regex, description) pairs.
#
# Numeric patterns are delimited with ASCII-only lookarounds rather than
# ``\b``. For ``str`` patterns ``\b`` is Unicode-aware and treats CJK
# characters as word characters, so a number written directly after Chinese
# text has no word boundary in front of it and would never be detected. The
# lookarounds still reject numbers embedded in ASCII identifiers or in longer
# digit runs, which is what ``\b`` was guarding against.
_NUM_START = r"(?<![0-9A-Za-z_])"
_NUM_END = r"(?![0-9A-Za-z_])"

SENSITIVE_PATTERNS = [
    # Resident ID: 15-18 digits, or 17 digits followed by an "X" check
    # character (the digit-only form alone cannot match IDs ending in X).
    (_NUM_START + r"(?:\d{17}[0-9Xx]|\d{15,18})" + _NUM_END, "可能的身份证号"),
    # Bank card numbers: 16-19 digits.
    (_NUM_START + r"\d{16,19}" + _NUM_END, "可能的银行卡号"),
    # Mainland mobile numbers: 11 digits starting with 13-19.
    (_NUM_START + r"1[3-9]\d{9}" + _NUM_END, "可能的手机号"),
    (r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "邮箱地址"),
    # Case-insensitive so "API_KEY=..." / "Password: ..." are caught too.
    (
        r"(?i)(?:密码|password|secret|token|api.?key)\s*[:=]\s*\S+",
        "可能的凭证信息",
    ),
]
class ComplianceMiddleware(Middleware):
    """Scan report output for sensitive data patterns.

    After graph execution, the draft is scanned for PII / credential
    patterns. Findings are logged as warnings and recorded under
    ``state.review["compliance_warnings"]``; they never block the pipeline
    (can be made blocking later).
    """

    name = "compliance"

    def _collect_text(self, value: object) -> list[str]:
        """Return every text fragment contained in *value*, in traversal order.

        Strings are returned as-is, mappings contribute both keys and values,
        sequences and sets are walked recursively, ``None`` is skipped and any
        other object is rendered with ``str()``.

        The raw strings are scanned instead of a ``json.dumps`` rendering
        because JSON escaping rewrites a newline or tab as a backslash
        followed by a letter; that letter then sits directly in front of the
        following text and hides the boundary the numeric patterns rely on.
        """
        if isinstance(value, str):
            return [value]
        if isinstance(value, dict):
            parts: list[str] = []
            for key, item in value.items():
                parts.extend(self._collect_text(key))
                parts.extend(self._collect_text(item))
            return parts
        if isinstance(value, (list, tuple, set, frozenset)):
            return [part for item in value for part in self._collect_text(item)]
        if value is None:
            return []
        return [str(value)]

    @staticmethod
    def _mask(match: str) -> str:
        """Return a redacted preview of *match* that is safe to log or surface.

        At most one third of the match (capped at 8 characters) is shown, so
        a short secret is not reproduced almost verbatim in the warning that
        is meant to flag it.
        """
        visible = min(8, len(match) // 3)
        return match[:visible] + "..."

    def _scan_text(self, text: str) -> list[dict]:
        """Scan *text* against every entry in ``SENSITIVE_PATTERNS``.

        Returns one dict per pattern that matched at least once, with keys
        ``type`` (the pattern description), ``count`` (number of matches) and
        ``samples`` (redacted previews of up to the first three matches).
        Returns an empty list when nothing matched.
        """
        findings = []
        for pattern, desc in SENSITIVE_PATTERNS:
            # group(0) always yields the full match as a string, even if a
            # pattern is later given capture groups.
            matches = [m.group(0) for m in re.finditer(pattern, text)]
            if not matches:
                continue
            findings.append({
                "type": desc,
                "count": len(matches),
                "samples": [self._mask(m) for m in matches[:3]],
            })
        return findings

    async def after(self, state: ReportState) -> ReportState:
        """Scan ``state.draft`` and record the findings on ``state.review``.

        Logs a warning per matched pattern type and stores the findings under
        ``state.review["compliance_warnings"]``. When the draft is clean, an
        info line is logged and any warnings left from an earlier scan are
        removed so the state always reflects the latest draft. The state is
        returned in both cases.
        """
        # Join with newlines so fragments from different fields cannot run
        # together into one longer digit sequence.
        text = "\n".join(self._collect_text(state.draft))
        findings = self._scan_text(text)
        if findings:
            logger.warning(
                "[compliance] found %d sensitive data patterns:", len(findings)
            )
            for finding in findings:
                logger.warning(
                    " - %s: %d occurrences", finding["type"], finding["count"]
                )
            # Store in state for API to surface. Plain assignment (not
            # setdefault) so a re-scan replaces earlier results instead of
            # being silently discarded.
            state.review["compliance_warnings"] = findings
        else:
            logger.info("[compliance] no sensitive data patterns detected")
            state.review.pop("compliance_warnings", None)
        return state