Files
20260327-c863ce53/app/agents/formatter.py
2026-04-25 19:25:22 +08:00

670 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Formatter Agent — renders final report using Skills toolkit.
Skills integration:
- docx: python-docx (baseline) + docx-js via Node.js (rich mode) + OOXML template editing
- pptx: html2pptx.js via Node.js (visual slides) + python-pptx fallback
- xlsx: openpyxl + recalc.py (formula recalculation via LibreOffice)
- pdf: reportlab with CJK support + fpdf2 fallback
"""
from __future__ import annotations
import asyncio
import json
import logging
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Any
from .base import BaseAgent
logger = logging.getLogger(__name__)
# Root directory of the locally checked-out Skills toolkits. Every document
# format keeps its helper scripts in a sub-directory named after the format.
SKILLS_ROOT = Path.home().joinpath("Projects/code/20260119-skills合集/anthropics_skills/skills")
DOCX_SKILLS, PPTX_SKILLS, XLSX_SKILLS, PDF_SKILLS = (
    SKILLS_ROOT / fmt_dir for fmt_dir in ("docx", "pptx", "xlsx", "pdf")
)
def _skills_available() -> dict[str, bool]:
    """Report which optional Skills toolkits are present on disk.

    Returns:
        A mapping from capability name to ``True`` when the file (or, for
        ``pdf_scripts``, the directory) backing that capability exists.
    """
    ooxml_unpack = Path("ooxml") / "scripts" / "unpack.py"
    marker_files = {
        "docx_js": DOCX_SKILLS / "docx-js.md",
        "html2pptx": PPTX_SKILLS / "scripts" / "html2pptx.js",
        "recalc": XLSX_SKILLS / "recalc.py",
        "ooxml_docx": DOCX_SKILLS / ooxml_unpack,
        "ooxml_pptx": PPTX_SKILLS / ooxml_unpack,
    }
    availability = {skill: marker.exists() for skill, marker in marker_files.items()}
    # The PDF toolkit is detected by its scripts directory, not a single file.
    availability["pdf_scripts"] = (PDF_SKILLS / "scripts").is_dir()
    return availability
class FormatterAgent(BaseAgent):
name = "formatter"
description = "将报告渲染为 docx/pptx/xlsx/pdf融合 Skills 能力"
def __init__(self):
    """Set up the agent and cache which Skills toolkits can be used."""
    super().__init__()
    detected = _skills_available()
    self.skills = detected
    # Only the names of the toolkits that were actually found are logged.
    available = [skill for skill in detected if detected[skill]]
    logger.info(f"[formatter] available skills: {available}")
async def run(self, context: dict[str, Any]) -> dict[str, Any]:
    """Render the report draft into every requested output format.

    A failure in one format is logged and does not prevent the remaining
    formats from being rendered.

    Args:
        context: Pipeline context. Reads ``draft`` (required) and the
            optional keys ``data_assets``, ``output_dir`` (default
            ``"output"``), ``output_formats`` (default ``["docx"]``; a
            single format may also be given as a plain string) and
            ``template_path``.

    Returns:
        ``{"generated_files": [...]}`` listing the path of every file that
        was rendered successfully, in the order the formats were requested.

    Raises:
        KeyError: If ``context`` has no ``draft`` entry.
    """
    draft = context["draft"]
    data_assets = context.get("data_assets") or {}
    output_dir = Path(context.get("output_dir", "output"))
    formats = context.get("output_formats", ["docx"])
    if formats is None:
        formats = ["docx"]
    elif isinstance(formats, str):
        # A bare "pdf" would otherwise be iterated character by character.
        formats = [formats]
    template_path = context.get("template_path")  # optional: user-provided template
    output_dir.mkdir(parents=True, exist_ok=True)
    title = draft.get("title", "报告")

    generated_files = []
    for fmt in formats:
        try:
            if fmt == "docx":
                path = await self._render_docx(draft, data_assets, output_dir, title, template_path)
            elif fmt == "pptx":
                path = await self._render_pptx(draft, data_assets, output_dir, title)
            elif fmt == "xlsx":
                path = await self._render_xlsx(data_assets, output_dir, title)
            elif fmt == "pdf":
                path = await self._render_pdf(draft, data_assets, output_dir, title)
            else:
                logger.warning(f"Unsupported format: {fmt}")
                continue
            generated_files.append(str(path))
            logger.info(f"[formatter] generated {path}")
        except Exception:
            # logger.exception records the traceback itself, so the
            # exception does not need to be bound to a name.
            logger.exception(f"[formatter] failed to render {fmt}")
    return {"generated_files": generated_files}
# -----------------------------------------------------------------------
# DOCX — python-docx baseline + OOXML template editing
# -----------------------------------------------------------------------
async def _render_docx(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str,
    template_path: str | None = None,
) -> Path:
    """Render the DOCX output, choosing between template and baseline mode.

    The template workflow is used only when a ``template_path`` was given
    and the OOXML DOCX skill is available; in every other case the document
    is built from scratch.

    Returns:
        Path of the generated ``.docx`` file.
    """
    use_template = bool(template_path) and self.skills["ooxml_docx"]
    if not use_template:
        return await self._render_docx_baseline(draft, data_assets, output_dir, title)
    template_file = Path(template_path)
    return await self._render_docx_from_template(
        draft, data_assets, output_dir, title, template_file
    )
async def _render_docx_baseline(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Build the report as a DOCX file from scratch with python-docx.

    The document contains, in order: the title, the executive summary (if
    any), one level-1 section per chapter, every table from
    ``data_assets["tables"]`` and, for every chart in
    ``data_assets["charts"]``, a textual placeholder followed by the chart
    data rendered as tables.

    Args:
        draft: Report draft; reads ``executive_summary`` and ``chapters``
            (each chapter needs ``title`` and may have ``content``).
        data_assets: Reads the optional ``tables`` and ``charts`` lists.
        output_dir: Directory the file is written to.
        title: Document title, also used as the file name stem.

    Returns:
        Path of the saved ``<title>.docx`` file.
    """
    from docx import Document
    from docx.enum.text import WD_ALIGN_PARAGRAPH
    from docx.oxml.ns import qn
    from docx.shared import Pt, RGBColor

    doc = Document()

    # -- Styles --
    normal = doc.styles["Normal"]
    normal.font.name = "微软雅黑"
    # font.name only fills the Latin font slots. Word picks the typeface for
    # CJK glyphs from the separate w:eastAsia attribute, so set it as well;
    # otherwise Chinese text keeps the theme's default East Asian font.
    normal.element.rPr.rFonts.set(qn("w:eastAsia"), "微软雅黑")
    normal.font.size = Pt(11)

    # Title
    heading = doc.add_heading(title, level=0)
    heading.alignment = WD_ALIGN_PARAGRAPH.CENTER

    # Executive summary, rendered in dark grey to set it apart
    if summary := draft.get("executive_summary"):
        doc.add_heading("执行摘要", level=1)
        summary_run = doc.add_paragraph().add_run(summary)
        summary_run.font.size = Pt(11)
        summary_run.font.color.rgb = RGBColor(0x33, 0x33, 0x33)

    # Chapters
    for chapter in draft.get("chapters", []):
        doc.add_heading(chapter["title"], level=1)
        self._docx_render_markdown(doc, chapter.get("content", ""))

    # Tables from data assets
    for table_spec in data_assets.get("tables", []):
        doc.add_heading(table_spec.get("title", "数据表"), level=2)
        self._docx_add_table(doc, table_spec)

    # Charts are not drawn: each one gets a text placeholder plus its data
    # as one two-column table per dataset.
    for chart_spec in data_assets.get("charts", []):
        doc.add_heading(chart_spec.get("title", "图表"), level=2)
        desc = chart_spec.get("description", "")
        chart_type = chart_spec.get("type", "")
        doc.add_paragraph(f"[{chart_type.upper()} 图表] {desc}")
        chart_data = chart_spec.get("data", {})
        if labels := chart_data.get("labels"):
            for ds in chart_data.get("datasets", []):
                # zip stops at the shorter sequence, so surplus labels or
                # values are dropped silently.
                pairs = zip(labels, ds.get("data", []))
                self._docx_add_table(doc, {
                    "headers": ["项目", ds.get("label", "数据")],
                    "rows": [[str(label), str(value)] for label, value in pairs],
                })

    path = output_dir / f"{title}.docx"
    doc.save(str(path))
    return path
async def _render_docx_from_template(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str,
    template_path: Path,
) -> Path:
    """Produce the DOCX via the OOXML unpack / edit / pack workflow.

    The template is unpacked into a temporary directory and packed again
    into ``<output_dir>/<title>.docx``. Editing the unpacked XML from
    ``draft`` is not implemented yet, so the result is currently a copy of
    the template. If either helper script exits with a non-zero status the
    baseline renderer is used instead.

    Args:
        draft: Report draft (only used by the baseline fallback for now).
        data_assets: Data assets (only used by the baseline fallback).
        output_dir: Directory the file is written to.
        title: File name stem of the output document.
        template_path: DOCX template to unpack.

    Returns:
        Path of the generated ``.docx`` file.
    """
    scripts_dir = DOCX_SKILLS / "ooxml" / "scripts"
    unpack_script = scripts_dir / "unpack.py"
    pack_script = scripts_dir / "pack.py"
    output_path = output_dir / f"{title}.docx"

    async def run_script(script: Path, source: Path, target: Path) -> int:
        """Run one helper script and return its exit status."""
        proc = await asyncio.create_subprocess_exec(
            "python3", str(script), str(source), str(target),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        # communicate() drains both pipes; wait() alone can block forever
        # once the child has filled a pipe buffer.
        await proc.communicate()
        return proc.returncode

    failed_step = None
    with tempfile.TemporaryDirectory() as tmpdir:
        work_dir = Path(tmpdir) / "unpacked"
        if await run_script(unpack_script, template_path, work_dir) != 0:
            failed_step = "unpack"
        # TODO: edit XML content in work_dir based on draft
        # For now, just pack back as-is (template passthrough)
        elif await run_script(pack_script, work_dir, output_path) != 0:
            failed_step = "pack"

    if failed_step is None:
        return output_path
    logger.warning(f"[formatter] OOXML {failed_step} failed, falling back to baseline")
    return await self._render_docx_baseline(draft, data_assets, output_dir, title)
def _docx_render_markdown(self, doc, content: str):
    """Convert markdown-ish content to docx paragraphs.

    ``content`` is split into blocks on blank lines. A block is rendered as
    a heading (``##`` to ``####``), a bullet list (lines starting with
    ``- `` or ``* ``), a numbered list (lines starting with ``<digits>.`` or
    ``<digits>)``) or, failing all of those, a plain 11pt paragraph.

    Only the list marker itself is removed from a list line, so item text
    that begins with digits, ``*`` or ``-`` (``2024 revenue``, ``**bold**``,
    ``-5``) is preserved.

    Args:
        doc: python-docx document (anything exposing ``add_heading`` and
            ``add_paragraph``).
        content: Markdown-like chapter text.
    """
    import re

    bullet_marker = re.compile(r"^\s*[-*]\s+")
    number_marker = re.compile(r"^\s*\d+[.)]\s+")

    def add_list(block: str, marker, style: str) -> None:
        """Add every non-empty line of ``block`` as a list item."""
        for line in block.split("\n"):
            text = marker.sub("", line, count=1).strip()
            if text:
                doc.add_paragraph(text, style=style)

    for block in content.split("\n\n"):
        block = block.strip()
        if not block:
            continue
        if block.startswith("#### "):
            doc.add_heading(block[5:], level=4)
        elif block.startswith("### "):
            doc.add_heading(block[4:], level=3)
        elif block.startswith("## "):
            doc.add_heading(block[3:], level=2)
        elif bullet_marker.match(block):
            add_list(block, bullet_marker, "List Bullet")
        elif number_marker.match(block):
            # Requires "<digits>." or "<digits>)" followed by whitespace, so
            # a paragraph such as "10% growth" is no longer taken for a list.
            add_list(block, number_marker, "List Number")
        else:
            # Imported here because only plain paragraphs need it.
            from docx.shared import Pt

            paragraph = doc.add_paragraph(block)
            for run in paragraph.runs:
                run.font.size = Pt(11)
def _docx_add_table(self, doc, table_spec: dict):
    """Add a formatted table to the document.

    Nothing is added when ``table_spec`` has no headers. The table gets one
    bold 10pt header row followed by one row per entry of ``rows``. Values
    beyond the number of headers are ignored instead of raising, matching
    what the PPTX table renderer does; short rows leave trailing cells empty.

    Args:
        doc: python-docx document to append the table to.
        table_spec: Reads ``headers`` (list of column titles) and ``rows``
            (list of row value sequences).
    """
    from docx.shared import Pt

    headers = table_spec.get("headers", [])
    rows = table_spec.get("rows", [])
    if not headers:
        return

    tbl = doc.add_table(rows=1 + len(rows), cols=len(headers))
    tbl.style = "Light Grid Accent 1"

    # Header row
    for cell, header in zip(tbl.rows[0].cells, headers):
        cell.text = str(header)
        for paragraph in cell.paragraphs:
            for run in paragraph.runs:
                run.font.bold = True
                run.font.size = Pt(10)

    # Data rows: fetch each row's cells once, and let zip() drop any value
    # that has no matching column.
    for r_idx, row in enumerate(rows, start=1):
        row_cells = tbl.rows[r_idx].cells
        for cell, cell_val in zip(row_cells, row):
            cell.text = str(cell_val)
# -----------------------------------------------------------------------
# PPTX — html2pptx.js (rich) or python-pptx (fallback)
# -----------------------------------------------------------------------
async def _render_pptx(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Render the PPTX output, preferring the html2pptx skill.

    When the skill is missing, or when it raises, the python-pptx baseline
    renderer produces the file instead.

    Returns:
        Path of the generated ``.pptx`` file.
    """
    if not self.skills["html2pptx"]:
        return await self._render_pptx_baseline(draft, data_assets, output_dir, title)
    try:
        return await self._render_pptx_html2pptx(draft, data_assets, output_dir, title)
    except Exception as e:
        logger.warning(f"[formatter] html2pptx failed ({e}), falling back to python-pptx")
    return await self._render_pptx_baseline(draft, data_assets, output_dir, title)
async def _render_pptx_html2pptx(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Generate PPTX using html2pptx.js skill for visual slides.

    One HTML file is written per slide (a title slide, then one slide per
    chapter built from the first 400 characters of its content) together
    with a small Node.js script that feeds them to ``html2pptx`` and saves
    the deck. ``data_assets`` is not used by this renderer.

    Args:
        draft: Reads ``executive_summary`` and ``chapters``.
        data_assets: Unused.
        output_dir: Directory the file is written to.
        title: Deck title, also used as the file name stem.

    Returns:
        Path of the generated ``<title>.pptx`` file.

    Raises:
        RuntimeError: If the Node.js script fails or produces no file.
    """
    from html import escape

    def js_string(value) -> str:
        """Encode ``value`` as a JavaScript string literal."""
        # JSON string syntax is valid JS and escapes quotes and backslashes,
        # which plain '...' interpolation of a path or title did not.
        return json.dumps(str(value))

    output_path = output_dir / f"{title}.pptx"
    summary = draft.get("executive_summary") or ""

    with tempfile.TemporaryDirectory() as tmpdir:
        work = Path(tmpdir)

        # Title slide. All text is HTML-escaped so that '<' or '&' in the
        # draft cannot break the slide markup.
        slides_html = [
            '<html><body style="width:720pt;height:405pt;display:flex;align-items:center;'
            'justify-content:center;flex-direction:column;'
            'background:linear-gradient(135deg,#1a1a2e,#16213e);color:white;'
            'font-family:sans-serif;">\n'
            f'<h1 style="font-size:36pt;margin:0;">{escape(title)}</h1>\n'
            f'<p style="font-size:18pt;color:#aaa;margin-top:20pt;">{escape(summary[:100])}</p>\n'
            '</body></html>'
        ]

        # Chapter slides
        for ch in draft.get("chapters", []):
            content_lines = (ch.get("content") or "")[:400].split("\n")
            bullets = "".join(
                f"<li>{escape(line.strip())}</li>" for line in content_lines if line.strip()
            )
            slides_html.append(
                '<html><body style="width:720pt;height:405pt;padding:40pt;'
                'font-family:sans-serif;background:#ffffff;">\n'
                '<h2 style="font-size:28pt;color:#1a1a2e;border-bottom:2pt solid #e94560;'
                f'padding-bottom:10pt;">{escape(ch["title"])}</h2>\n'
                f'<ul style="font-size:14pt;color:#333;line-height:1.8;">{bullets}</ul>\n'
                '</body></html>'
            )

        # Write HTML files
        slide_files = []
        for i, html in enumerate(slides_html):
            slide_name = f"slide_{i}.html"
            (work / slide_name).write_text(html, encoding="utf-8")
            slide_files.append(slide_name)

        # Write conversion script. node runs with the temp dir as cwd, so
        # the output path must be absolute or the deck would be written
        # relative to the temp dir and deleted with it.
        html2pptx_path = PPTX_SKILLS / "scripts" / "html2pptx.js"
        script = work / "convert.js"
        script_lines = [
            "const pptxgen = require('pptxgenjs');",
            f"const {{ html2pptx }} = require({js_string(html2pptx_path)});",
            "const path = require('path');",
            "async function main() {",
            "  const pptx = new pptxgen();",
            "  pptx.layout = 'LAYOUT_16x9';",
            f"  const files = {json.dumps(slide_files)};",
            "  for (const f of files) {",
            f"    await html2pptx(path.join({js_string(work)}, f), pptx);",
            "  }",
            f"  await pptx.writeFile({{ fileName: {js_string(output_path.resolve())} }});",
            "}",
            "main().catch(e => { console.error(e); process.exit(1); });",
            "",
        ]
        script.write_text("\n".join(script_lines), encoding="utf-8")

        proc = await asyncio.create_subprocess_exec(
            "node", str(script),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            cwd=str(work),
        )
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(f"html2pptx failed: {stderr.decode(errors='replace')}")

    if not output_path.exists():
        # Lets the caller fall back instead of reporting a missing file.
        raise RuntimeError(f"html2pptx failed: no output file at {output_path}")
    return output_path
async def _render_pptx_baseline(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Build the deck with python-pptx using the default template layouts.

    Slides, in order: a title slide (title plus the first 200 characters of
    the executive summary), one bullet slide per chapter (at most 12
    bullets) and one table slide per entry of ``data_assets["tables"]``
    (header row plus at most 9 data rows).

    Args:
        draft: Reads ``executive_summary`` and ``chapters``.
        data_assets: Reads the optional ``tables`` list.
        output_dir: Directory the file is written to.
        title: Deck title, also used as the file name stem.

    Returns:
        Path of the saved ``<title>.pptx`` file.
    """
    import re

    from pptx import Presentation
    from pptx.util import Inches, Pt

    # One leading markdown marker: "## ", "- ", "* ", "1. " or "1) ".
    # Stripping a character set instead would also eat leading digits of
    # real content such as "2024 revenue".
    markdown_marker = re.compile(r"^(?:#{1,6}\s+|[-*]\s+|\d+[.)]\s+)")

    prs = Presentation()

    # Title slide
    slide = prs.slides.add_slide(prs.slide_layouts[0])
    slide.shapes.title.text = title
    if len(slide.placeholders) > 1:
        slide.placeholders[1].text = (draft.get("executive_summary") or "")[:200]

    # Chapter slides
    for chapter in draft.get("chapters", []):
        slide = prs.slides.add_slide(prs.slide_layouts[1])
        slide.shapes.title.text = chapter["title"]
        tf = slide.placeholders[1].text_frame
        tf.clear()
        content = chapter.get("content", "")
        bullets = [
            text
            for line in content.split("\n")
            if (text := markdown_marker.sub("", line.strip(), count=1).strip())
        ]
        for idx, text in enumerate(bullets[:12]):  # max 12 bullets per slide
            # clear() leaves one empty paragraph behind; reuse it for the
            # first bullet so the slide does not start with a blank bullet.
            p = tf.paragraphs[0] if idx == 0 else tf.add_paragraph()
            p.text = text
            p.font.size = Pt(14)
            p.space_after = Pt(4)

    # Data table slides
    for table_spec in data_assets.get("tables", []):
        # Layout 5 of the default template is "Title Only": it provides the
        # title placeholder used below and leaves the rest free.
        slide = prs.slides.add_slide(prs.slide_layouts[5])
        slide.shapes.title.text = table_spec.get("title", "数据表")
        headers = table_spec.get("headers", [])
        rows = table_spec.get("rows", [])
        if headers and rows:
            n_rows = min(len(rows) + 1, 10)  # limit rows per slide
            n_cols = len(headers)
            tbl = slide.shapes.add_table(
                n_rows, n_cols,
                Inches(0.5), Inches(1.5), Inches(9), Inches(4.5)
            ).table
            for c_idx, header in enumerate(headers):
                tbl.cell(0, c_idx).text = str(header)
            for r_idx, row in enumerate(rows[:n_rows - 1], start=1):
                for c_idx, val in enumerate(row[:n_cols]):
                    tbl.cell(r_idx, c_idx).text = str(val)

    path = output_dir / f"{title}.pptx"
    prs.save(str(path))
    return path
# -----------------------------------------------------------------------
# XLSX — openpyxl + recalc.py (formula recalculation)
# -----------------------------------------------------------------------
async def _render_xlsx(
    self, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Write the data assets to an XLSX workbook with openpyxl.

    All tables are stacked on the first sheet ("数据总览"), each with a
    title row, a styled header row, its data rows and, when it has at least
    two data rows, a row of ``=SUM(...)`` formulas under every column whose
    first data cell is numeric. Each chart gets its own sheet holding the
    labels and datasets as columns. If formulas were written and the
    recalc skill is available the workbook is recalculated afterwards.

    Args:
        data_assets: Reads the optional ``tables`` and ``charts`` lists.
        output_dir: Directory the file is written to.
        title: File name stem of the workbook.

    Returns:
        Path of the saved ``<title>.xlsx`` file.
    """
    import re

    from openpyxl import Workbook
    from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
    from openpyxl.utils import get_column_letter

    wb = Workbook()
    ws = wb.active
    ws.title = "数据总览"

    # Professional styling
    header_font = Font(bold=True, size=11, color="FFFFFF")
    header_fill = PatternFill(start_color="1A1A2E", end_color="1A1A2E", fill_type="solid")
    title_font = Font(bold=True, size=14, color="1A1A2E")
    bold_font = Font(bold=True)
    edge = Side(style="thin", color="CCCCCC")
    thin_border = Border(left=edge, right=edge, top=edge, bottom=edge)

    current_row = 1
    has_formulas = False
    for table_spec in data_assets.get("tables", []):
        # Table title
        ws.cell(row=current_row, column=1, value=table_spec.get("title", "")).font = title_font
        current_row += 1

        headers = table_spec.get("headers", [])
        rows = table_spec.get("rows", [])
        if headers:
            # Header row with styling
            header_row = current_row
            for col_idx, header in enumerate(headers, 1):
                cell = ws.cell(row=header_row, column=col_idx, value=header)
                cell.font = header_font
                cell.fill = header_fill
                cell.alignment = Alignment(horizontal="center")
                cell.border = thin_border
            current_row += 1

            # Data rows
            data_start = current_row
            for row_data in rows:
                for col_idx, val in enumerate(row_data, 1):
                    cell = ws.cell(row=current_row, column=col_idx, value=val)
                    cell.border = thin_border
                    # Store numeric-looking strings ("1,234.5") as numbers
                    # so that the SUM formulas below can use them.
                    if isinstance(val, str):
                        try:
                            cell.value = float(val.replace(",", ""))
                        except ValueError:
                            pass
                current_row += 1

            # Auto-sum row for numeric columns (needs at least two data rows)
            data_end = current_row - 1
            if data_end > data_start:
                for col_idx in range(1, len(headers) + 1):
                    col_letter = get_column_letter(col_idx)
                    first_value = ws.cell(row=data_start, column=col_idx).value
                    if isinstance(first_value, (int, float)):
                        cell = ws.cell(
                            row=current_row, column=col_idx,
                            value=f"=SUM({col_letter}{data_start}:{col_letter}{data_end})"
                        )
                        has_formulas = True
                    elif col_idx == 1:
                        cell = ws.cell(row=current_row, column=1, value="合计")
                    else:
                        continue
                    cell.font = bold_font
                    cell.border = thin_border
                current_row += 1

            # Auto-fit column widths from this table's own rows. The scan
            # always starts at the header row, whether or not a sum row
            # was added.
            for col_idx in range(1, len(headers) + 1):
                max_len = 0
                for r in range(header_row, current_row):
                    value = ws.cell(row=r, column=col_idx).value
                    if value is not None:
                        max_len = max(max_len, len(str(value)))
                ws.column_dimensions[get_column_letter(col_idx)].width = min(max_len + 4, 30)

        current_row += 2  # gap between tables

    # Chart data sheets
    for chart_spec in data_assets.get("charts", []):
        # Excel forbids \ / ? * [ ] : in sheet names and limits them to 31
        # characters; openpyxl raises on the forbidden characters.
        raw_title = str(chart_spec.get("title") or "图表")
        sheet_title = re.sub(r"[\\/?*\[\]:]", "_", raw_title)[:31]
        chart_ws = wb.create_sheet(title=sheet_title)
        chart_ws.cell(row=1, column=1, value=chart_spec.get("title", "")).font = title_font

        chart_data = chart_spec.get("data", {})
        labels = chart_data.get("labels", [])
        datasets = chart_data.get("datasets", [])

        # Headers: [项目, dataset 1, dataset 2, ...]
        chart_ws.cell(row=2, column=1, value="项目").font = Font(bold=True)
        for ds_col, ds in enumerate(datasets, 2):
            chart_ws.cell(row=2, column=ds_col, value=ds.get("label", "")).font = Font(bold=True)

        for offset, label in enumerate(labels):
            row_no = offset + 3
            chart_ws.cell(row=row_no, column=1, value=label)
            for ds_col, ds in enumerate(datasets, 2):
                series = ds.get("data", [])
                # Datasets shorter than the label list leave cells empty.
                if offset < len(series):
                    chart_ws.cell(row=row_no, column=ds_col, value=series[offset])

    path = output_dir / f"{title}.xlsx"
    wb.save(str(path))

    # Run recalc.py if we have formulas and the skill is available
    if has_formulas and self.skills["recalc"]:
        await self._xlsx_recalc(path)
    return path
async def _xlsx_recalc(self, path: Path):
    """Recalculate the workbook's formulas with the Skills ``recalc.py``.

    This is best effort: a non-zero exit status, unparsable output or any
    other error is logged as a warning and never raised to the caller.

    Args:
        path: Workbook to recalculate.
    """
    recalc_script = XLSX_SKILLS / "recalc.py"
    logger.info(f"[formatter] running recalc.py on {path}")
    # The trailing "30" is passed through to recalc.py as its second argument.
    command = ("python3", str(recalc_script), str(path), "30")
    try:
        proc = await asyncio.create_subprocess_exec(
            *command,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            logger.warning(f"[formatter] recalc.py failed: {stderr.decode()[:200]}")
            return
        result = json.loads(stdout.decode())
        logger.info(f"[formatter] recalc result: {result.get('status')}")
    except Exception as e:
        logger.warning(f"[formatter] recalc.py error: {e}")
# -----------------------------------------------------------------------
# PDF — reportlab with CJK support + fpdf2 fallback
# -----------------------------------------------------------------------
async def _render_pdf(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Render the PDF output with reportlab, falling back to fpdf2.

    Any exception raised by the reportlab renderer is logged and the fpdf2
    renderer is tried instead; errors from fpdf2 propagate to the caller.

    Returns:
        Path of the generated ``.pdf`` file.
    """
    try:
        pdf_path = await self._render_pdf_reportlab(draft, data_assets, output_dir, title)
    except Exception as e:
        logger.warning(f"[formatter] reportlab failed ({e}), falling back to fpdf2")
        pdf_path = await self._render_pdf_fpdf(draft, data_assets, output_dir, title)
    return pdf_path
async def _render_pdf_reportlab(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Generate PDF with reportlab — better CJK support and table rendering.

    The document contains the title, the executive summary (if any), one
    page per chapter and every table from ``data_assets["tables"]``.

    Font selection: the first system TrueType font that reportlab can load
    is registered as ``CJK``. If none loads, reportlab's built-in
    ``STSong-Light`` CID font is used, and only if that fails too does the
    text fall back to Helvetica (which has no CJK glyphs).

    Args:
        draft: Reads ``executive_summary`` and ``chapters``.
        data_assets: Reads the optional ``tables`` list.
        output_dir: Directory the file is written to.
        title: Document title, also used as the file name stem.

    Returns:
        Path of the saved ``<title>.pdf`` file.
    """
    from xml.sax.saxutils import escape

    from reportlab.lib import colors
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
    from reportlab.lib.units import mm
    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.cidfonts import UnicodeCIDFont
    from reportlab.pdfbase.ttfonts import TTFont
    from reportlab.platypus import (
        PageBreak, Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle,
    )

    # Try to register a CJK font
    cjk_font = None
    for font_path in [
        "/System/Library/Fonts/STHeiti Medium.ttc",
        "/System/Library/Fonts/PingFang.ttc",
        "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
    ]:
        if not Path(font_path).exists():
            continue
        try:
            pdfmetrics.registerFont(TTFont("CJK", font_path, subfontIndex=0))
        except Exception:
            # Fonts reportlab cannot parse are skipped.
            continue
        cjk_font = "CJK"
        break
    if cjk_font is None:
        try:
            pdfmetrics.registerFont(UnicodeCIDFont("STSong-Light"))
            cjk_font = "STSong-Light"
        except Exception:
            cjk_font = "Helvetica"

    path = output_dir / f"{title}.pdf"
    doc = SimpleDocTemplate(str(path), pagesize=A4,
                            topMargin=25*mm, bottomMargin=25*mm)
    styles = getSampleStyleSheet()
    styles.add(ParagraphStyle(
        name="CJKTitle", fontName=cjk_font, fontSize=22,
        spaceAfter=12, alignment=1,
    ))
    styles.add(ParagraphStyle(
        name="CJKHeading", fontName=cjk_font, fontSize=16,
        spaceAfter=8, spaceBefore=16, textColor=colors.HexColor("#1a1a2e"),
    ))
    styles.add(ParagraphStyle(
        name="CJKBody", fontName=cjk_font, fontSize=11,
        spaceAfter=6, leading=16,
        # Chinese text has no spaces to break on; CJK wrapping lets long
        # lines break between characters instead of overflowing the page.
        wordWrap="CJK",
    ))

    def text(value, style_name: str) -> Paragraph:
        """Build a Paragraph from plain text."""
        # Paragraph parses its input as XML-like markup, so a literal '<'
        # or '&' in the draft must be escaped first.
        return Paragraph(escape(str(value)), styles[style_name])

    elements = []

    # Title
    elements.append(text(title, "CJKTitle"))
    elements.append(Spacer(1, 12))

    # Executive summary
    if summary := draft.get("executive_summary"):
        elements.append(text("执行摘要", "CJKHeading"))
        elements.append(text(summary, "CJKBody"))
        elements.append(Spacer(1, 12))

    # Chapters
    for chapter in draft.get("chapters", []):
        elements.append(PageBreak())
        elements.append(text(chapter["title"], "CJKHeading"))
        for para in chapter.get("content", "").split("\n\n"):
            para = para.strip()
            if para:
                elements.append(text(para, "CJKBody"))

    # Tables
    for table_spec in data_assets.get("tables", []):
        elements.append(Spacer(1, 12))
        elements.append(text(table_spec.get("title", ""), "CJKHeading"))
        headers = table_spec.get("headers", [])
        rows = table_spec.get("rows", [])
        if headers:
            # Copy into lists so tuple rows work too.
            table_data = [list(headers), *(list(row) for row in rows)]
            table = Table(table_data)
            table.setStyle(TableStyle([
                ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1a1a2e")),
                ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
                ("FONTNAME", (0, 0), (-1, -1), cjk_font),
                ("FONTSIZE", (0, 0), (-1, 0), 10),
                ("FONTSIZE", (0, 1), (-1, -1), 9),
                ("GRID", (0, 0), (-1, -1), 0.5, colors.grey),
                ("ALIGN", (0, 0), (-1, -1), "CENTER"),
                ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#f5f5f5")]),
            ]))
            elements.append(table)

    doc.build(elements)
    return path
async def _render_pdf_fpdf(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Fallback PDF generation with fpdf2.

    Writes the title, the executive summary (if any) and one page per
    chapter. ``data_assets`` is not rendered by this fallback.

    The font candidates are the same as in the reportlab renderer, so the
    Linux Noto path is tried here as well. If no candidate can be loaded
    the built-in Helvetica font is selected.

    Args:
        draft: Reads ``executive_summary`` and ``chapters``.
        data_assets: Unused.
        output_dir: Directory the file is written to.
        title: Document title, also used as the file name stem.

    Returns:
        Path of the saved ``<title>.pdf`` file.
    """
    from fpdf import FPDF

    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)

    # Try CJK font
    cjk_loaded = False
    for font_path in (
        "/System/Library/Fonts/STHeiti Medium.ttc",
        "/System/Library/Fonts/PingFang.ttc",
        "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
    ):
        if not Path(font_path).exists():
            continue
        try:
            pdf.add_font("CJK", "", font_path, uni=True)
            pdf.set_font("CJK", "", 11)
        except Exception:
            # Font could not be loaded; try the next candidate.
            continue
        cjk_loaded = True
        break
    if not cjk_loaded:
        pdf.set_font("Helvetica", "", 11)

    pdf.add_page()
    pdf.set_font_size(24)
    pdf.cell(0, 20, title, new_x="LMARGIN", new_y="NEXT", align="C")
    pdf.set_font_size(11)

    if summary := draft.get("executive_summary"):
        pdf.set_font_size(16)
        pdf.cell(0, 12, "执行摘要", new_x="LMARGIN", new_y="NEXT")
        pdf.set_font_size(11)
        pdf.multi_cell(0, 6, summary)

    for chapter in draft.get("chapters", []):
        pdf.add_page()
        pdf.set_font_size(16)
        pdf.cell(0, 12, chapter["title"], new_x="LMARGIN", new_y="NEXT")
        pdf.set_font_size(11)
        pdf.multi_cell(0, 6, chapter.get("content", ""))

    path = output_dir / f"{title}.pdf"
    pdf.output(str(path))
    return path