"""Formatter Agent — renders final report using Skills toolkit. Skills integration: - docx: python-docx (baseline) + docx-js via Node.js (rich mode) + OOXML template editing - pptx: html2pptx.js via Node.js (visual slides) + python-pptx fallback - xlsx: openpyxl + recalc.py (formula recalculation via LibreOffice) - pdf: reportlab with CJK support + fpdf2 fallback """ from __future__ import annotations import asyncio import json import logging import shutil import subprocess import tempfile from pathlib import Path from typing import Any from .base import BaseAgent logger = logging.getLogger(__name__) # Skills root SKILLS_ROOT = Path.home() / "Projects/code/20260119-skills合集/anthropics_skills/skills" DOCX_SKILLS = SKILLS_ROOT / "docx" PPTX_SKILLS = SKILLS_ROOT / "pptx" XLSX_SKILLS = SKILLS_ROOT / "xlsx" PDF_SKILLS = SKILLS_ROOT / "pdf" def _skills_available() -> dict[str, bool]: """Check which skill toolkits are available.""" return { "docx_js": (DOCX_SKILLS / "docx-js.md").exists(), "html2pptx": (PPTX_SKILLS / "scripts" / "html2pptx.js").exists(), "recalc": (XLSX_SKILLS / "recalc.py").exists(), "ooxml_docx": (DOCX_SKILLS / "ooxml" / "scripts" / "unpack.py").exists(), "ooxml_pptx": (PPTX_SKILLS / "ooxml" / "scripts" / "unpack.py").exists(), "pdf_scripts": (PDF_SKILLS / "scripts").is_dir(), } class FormatterAgent(BaseAgent): name = "formatter" description = "将报告渲染为 docx/pptx/xlsx/pdf,融合 Skills 能力" def __init__(self): super().__init__() self.skills = _skills_available() available = [k for k, v in self.skills.items() if v] logger.info(f"[formatter] available skills: {available}") async def run(self, context: dict[str, Any]) -> dict[str, Any]: draft = context["draft"] data_assets = context.get("data_assets", {}) output_dir = Path(context.get("output_dir", "output")) formats = context.get("output_formats", ["docx"]) template_path = context.get("template_path") # optional: user-provided template output_dir.mkdir(parents=True, exist_ok=True) title = draft.get("title", "报告") generated_files = [] for fmt in formats: try: match fmt: case "docx": path = await self._render_docx(draft, data_assets, output_dir, title, template_path) case "pptx": path = await self._render_pptx(draft, data_assets, output_dir, title) case "xlsx": path = await self._render_xlsx(data_assets, output_dir, title) case "pdf": path = await self._render_pdf(draft, data_assets, output_dir, title) case _: logger.warning(f"Unsupported format: {fmt}") continue generated_files.append(str(path)) logger.info(f"[formatter] generated {path}") except Exception as e: logger.exception(f"[formatter] failed to render {fmt}") return {"generated_files": generated_files} # ----------------------------------------------------------------------- # DOCX — python-docx baseline + OOXML template editing # ----------------------------------------------------------------------- async def _render_docx( self, draft: dict, data_assets: dict, output_dir: Path, title: str, template_path: str | None = None, ) -> Path: if template_path and self.skills["ooxml_docx"]: return await self._render_docx_from_template( draft, data_assets, output_dir, title, Path(template_path) ) return await self._render_docx_baseline(draft, data_assets, output_dir, title) async def _render_docx_baseline( self, draft: dict, data_assets: dict, output_dir: Path, title: str ) -> Path: from docx import Document from docx.shared import Pt, RGBColor from docx.enum.text import WD_ALIGN_PARAGRAPH doc = Document() # -- Styles -- style = doc.styles["Normal"] style.font.name = "微软雅黑" style.font.size = Pt(11) # Title t = doc.add_heading(title, level=0) t.alignment = WD_ALIGN_PARAGRAPH.CENTER # Executive summary if summary := draft.get("executive_summary"): doc.add_heading("执行摘要", level=1) # Add summary with highlight styling p = doc.add_paragraph() run = p.add_run(summary) run.font.size = Pt(11) run.font.color.rgb = RGBColor(0x33, 0x33, 0x33) # Chapters for chapter in draft.get("chapters", []): doc.add_heading(chapter["title"], level=1) content = chapter.get("content", "") self._docx_render_markdown(doc, content) # Tables from data assets for table_spec in data_assets.get("tables", []): doc.add_heading(table_spec.get("title", "数据表"), level=2) self._docx_add_table(doc, table_spec) # Page break + chart descriptions as placeholders for chart_spec in data_assets.get("charts", []): doc.add_heading(chart_spec.get("title", "图表"), level=2) desc = chart_spec.get("description", "") chart_type = chart_spec.get("type", "") doc.add_paragraph(f"[{chart_type.upper()} 图表] {desc}") # Render chart data as a table too chart_data = chart_spec.get("data", {}) if labels := chart_data.get("labels"): for ds in chart_data.get("datasets", []): self._docx_add_table(doc, { "headers": ["项目", ds.get("label", "数据")], "rows": [[str(l), str(v)] for l, v in zip(labels, ds.get("data", []))], }) path = output_dir / f"{title}.docx" doc.save(str(path)) return path async def _render_docx_from_template( self, draft: dict, data_assets: dict, output_dir: Path, title: str, template_path: Path, ) -> Path: """Edit an existing DOCX template using OOXML unpack/edit/pack workflow.""" unpack_script = DOCX_SKILLS / "ooxml" / "scripts" / "unpack.py" pack_script = DOCX_SKILLS / "ooxml" / "scripts" / "pack.py" with tempfile.TemporaryDirectory() as tmpdir: work_dir = Path(tmpdir) / "unpacked" # Unpack template proc = await asyncio.create_subprocess_exec( "python3", str(unpack_script), str(template_path), str(work_dir), stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) await proc.wait() if proc.returncode != 0: logger.warning("[formatter] OOXML unpack failed, falling back to baseline") return await self._render_docx_baseline(draft, data_assets, output_dir, title) # TODO: edit XML content in work_dir based on draft # For now, just pack back as-is (template passthrough) output_path = output_dir / f"{title}.docx" proc = await asyncio.create_subprocess_exec( "python3", str(pack_script), str(work_dir), str(output_path), stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) await proc.wait() return output_path def _docx_render_markdown(self, doc, content: str): """Convert markdown-ish content to docx paragraphs.""" from docx.shared import Pt for block in content.split("\n\n"): block = block.strip() if not block: continue if block.startswith("#### "): doc.add_heading(block[5:], level=4) elif block.startswith("### "): doc.add_heading(block[4:], level=3) elif block.startswith("## "): doc.add_heading(block[3:], level=2) elif block.startswith("- ") or block.startswith("* "): # Bullet list for line in block.split("\n"): line = line.lstrip("- *").strip() if line: doc.add_paragraph(line, style="List Bullet") elif block.startswith("1. ") or block.startswith("1)"): # Numbered list for line in block.split("\n"): text = line.lstrip("0123456789.)) ").strip() if text: doc.add_paragraph(text, style="List Number") else: p = doc.add_paragraph(block) for run in p.runs: run.font.size = Pt(11) def _docx_add_table(self, doc, table_spec: dict): """Add a formatted table to the document.""" from docx.shared import Pt, RGBColor from docx.oxml.ns import qn headers = table_spec.get("headers", []) rows = table_spec.get("rows", []) if not headers: return tbl = doc.add_table(rows=1 + len(rows), cols=len(headers)) tbl.style = "Light Grid Accent 1" # Header row for i, h in enumerate(headers): cell = tbl.rows[0].cells[i] cell.text = str(h) for p in cell.paragraphs: for run in p.runs: run.font.bold = True run.font.size = Pt(10) # Data rows for r_idx, row in enumerate(rows): for c_idx, cell_val in enumerate(row): tbl.rows[r_idx + 1].cells[c_idx].text = str(cell_val) # ----------------------------------------------------------------------- # PPTX — html2pptx.js (rich) or python-pptx (fallback) # ----------------------------------------------------------------------- async def _render_pptx( self, draft: dict, data_assets: dict, output_dir: Path, title: str ) -> Path: if self.skills["html2pptx"]: try: return await self._render_pptx_html2pptx(draft, data_assets, output_dir, title) except Exception as e: logger.warning(f"[formatter] html2pptx failed ({e}), falling back to python-pptx") return await self._render_pptx_baseline(draft, data_assets, output_dir, title) async def _render_pptx_html2pptx( self, draft: dict, data_assets: dict, output_dir: Path, title: str ) -> Path: """Generate PPTX using html2pptx.js skill for visual slides.""" with tempfile.TemporaryDirectory() as tmpdir: work = Path(tmpdir) # Generate HTML slides slides_html = [] # Title slide slides_html.append(f"""

{title}

{draft.get('executive_summary', '')[:100]}

""") # Chapter slides for ch in draft.get("chapters", []): content_lines = ch.get("content", "")[:400].split("\n") bullets = "".join(f"
  • {l.strip()}
  • " for l in content_lines if l.strip()) slides_html.append(f"""

    {ch['title']}

    """) # Write HTML files for i, html in enumerate(slides_html): (work / f"slide_{i}.html").write_text(html, encoding="utf-8") # Write conversion script script = work / "convert.js" html2pptx_path = PPTX_SKILLS / "scripts" / "html2pptx.js" slide_files = [f"slide_{i}.html" for i in range(len(slides_html))] script.write_text(f"""\ const pptxgen = require('pptxgenjs'); const {{ html2pptx }} = require('{html2pptx_path}'); const path = require('path'); async function main() {{ const pptx = new pptxgen(); pptx.layout = 'LAYOUT_16x9'; const files = {json.dumps(slide_files)}; for (const f of files) {{ await html2pptx(path.join('{work}', f), pptx); }} await pptx.writeFile({{ fileName: '{output_dir / f"{title}.pptx"}' }}); }} main().catch(e => {{ console.error(e); process.exit(1); }}); """, encoding="utf-8") proc = await asyncio.create_subprocess_exec( "node", str(script), stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=str(work), ) stdout, stderr = await proc.communicate() if proc.returncode != 0: raise RuntimeError(f"html2pptx failed: {stderr.decode()}") return output_dir / f"{title}.pptx" async def _render_pptx_baseline( self, draft: dict, data_assets: dict, output_dir: Path, title: str ) -> Path: from pptx import Presentation from pptx.util import Inches, Pt from pptx.dml.color import RGBColor prs = Presentation() # Title slide slide = prs.slides.add_slide(prs.slide_layouts[0]) slide.shapes.title.text = title if len(slide.placeholders) > 1: slide.placeholders[1].text = draft.get("executive_summary", "")[:200] # Chapter slides for chapter in draft.get("chapters", []): slide = prs.slides.add_slide(prs.slide_layouts[1]) slide.shapes.title.text = chapter["title"] body = slide.placeholders[1] tf = body.text_frame tf.clear() content = chapter.get("content", "") lines = [l.strip() for l in content.split("\n") if l.strip()] for line in lines[:12]: # max 12 bullets per slide p = tf.add_paragraph() # Strip markdown markers clean = line.lstrip("#-*0123456789.) ").strip() p.text = clean p.font.size = Pt(14) p.space_after = Pt(4) # Data table slides for table_spec in data_assets.get("tables", []): slide = prs.slides.add_slide(prs.slide_layouts[5]) # blank layout slide.shapes.title.text = table_spec.get("title", "数据表") headers = table_spec.get("headers", []) rows = table_spec.get("rows", []) if headers and rows: n_rows = min(len(rows) + 1, 10) # limit rows per slide n_cols = len(headers) tbl = slide.shapes.add_table( n_rows, n_cols, Inches(0.5), Inches(1.5), Inches(9), Inches(4.5) ).table for i, h in enumerate(headers): tbl.cell(0, i).text = str(h) for r_idx, row in enumerate(rows[:n_rows - 1]): for c_idx, val in enumerate(row[:n_cols]): tbl.cell(r_idx + 1, c_idx).text = str(val) path = output_dir / f"{title}.pptx" prs.save(str(path)) return path # ----------------------------------------------------------------------- # XLSX — openpyxl + recalc.py (formula recalculation) # ----------------------------------------------------------------------- async def _render_xlsx( self, data_assets: dict, output_dir: Path, title: str ) -> Path: from openpyxl import Workbook from openpyxl.styles import Font, PatternFill, Alignment, Border, Side from openpyxl.utils import get_column_letter wb = Workbook() ws = wb.active ws.title = "数据总览" # Professional styling header_font = Font(bold=True, size=11, color="FFFFFF") header_fill = PatternFill(start_color="1A1A2E", end_color="1A1A2E", fill_type="solid") title_font = Font(bold=True, size=14, color="1A1A2E") thin_border = Border( left=Side(style="thin", color="CCCCCC"), right=Side(style="thin", color="CCCCCC"), top=Side(style="thin", color="CCCCCC"), bottom=Side(style="thin", color="CCCCCC"), ) current_row = 1 has_formulas = False for table_spec in data_assets.get("tables", []): # Table title ws.cell(row=current_row, column=1, value=table_spec.get("title", "")).font = title_font current_row += 1 headers = table_spec.get("headers", []) rows = table_spec.get("rows", []) if headers: # Header row with styling for col_idx, h in enumerate(headers, 1): cell = ws.cell(row=current_row, column=col_idx, value=h) cell.font = header_font cell.fill = header_fill cell.alignment = Alignment(horizontal="center") cell.border = thin_border current_row += 1 # Data rows data_start = current_row for row_data in rows: for col_idx, val in enumerate(row_data, 1): cell = ws.cell(row=current_row, column=col_idx, value=val) cell.border = thin_border # Try to convert numeric strings if isinstance(val, str): try: cell.value = float(val.replace(",", "")) except (ValueError, AttributeError): pass current_row += 1 # Auto-sum row for numeric columns data_end = current_row - 1 if data_end > data_start: for col_idx in range(1, len(headers) + 1): col_letter = get_column_letter(col_idx) test_cell = ws.cell(row=data_start, column=col_idx) if isinstance(test_cell.value, (int, float)): cell = ws.cell( row=current_row, column=col_idx, value=f"=SUM({col_letter}{data_start}:{col_letter}{data_end})" ) cell.font = Font(bold=True) cell.border = thin_border has_formulas = True elif col_idx == 1: cell = ws.cell(row=current_row, column=1, value="合计") cell.font = Font(bold=True) cell.border = thin_border current_row += 1 # Auto-fit column widths for col_idx in range(1, len(headers) + 1): max_len = max( len(str(ws.cell(row=r, column=col_idx).value or "")) for r in range(current_row - len(rows) - 2, current_row) ) ws.column_dimensions[get_column_letter(col_idx)].width = min(max_len + 4, 30) current_row += 2 # gap between tables # Chart data sheets for chart_spec in data_assets.get("charts", []): chart_ws = wb.create_sheet(title=chart_spec.get("title", "图表")[:31]) chart_ws.cell(row=1, column=1, value=chart_spec.get("title", "")).font = title_font chart_data = chart_spec.get("data", {}) labels = chart_data.get("labels", []) datasets = chart_data.get("datasets", []) # Headers: [项目, 数据集1, 数据集2, ...] chart_ws.cell(row=2, column=1, value="项目").font = Font(bold=True) for ds_idx, ds in enumerate(datasets, 2): chart_ws.cell(row=2, column=ds_idx, value=ds.get("label", "")).font = Font(bold=True) for r_idx, label in enumerate(labels, 3): chart_ws.cell(row=r_idx, column=1, value=label) for ds_idx, ds in enumerate(datasets, 2): data = ds.get("data", []) if r_idx - 3 < len(data): chart_ws.cell(row=r_idx, column=ds_idx, value=data[r_idx - 3]) path = output_dir / f"{title}.xlsx" wb.save(str(path)) # Run recalc.py if we have formulas and the skill is available if has_formulas and self.skills["recalc"]: await self._xlsx_recalc(path) return path async def _xlsx_recalc(self, path: Path): """Recalculate formulas using Skills recalc.py (requires LibreOffice).""" recalc_script = XLSX_SKILLS / "recalc.py" logger.info(f"[formatter] running recalc.py on {path}") try: proc = await asyncio.create_subprocess_exec( "python3", str(recalc_script), str(path), "30", stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = await proc.communicate() if proc.returncode == 0: result = json.loads(stdout.decode()) logger.info(f"[formatter] recalc result: {result.get('status')}") else: logger.warning(f"[formatter] recalc.py failed: {stderr.decode()[:200]}") except Exception as e: logger.warning(f"[formatter] recalc.py error: {e}") # ----------------------------------------------------------------------- # PDF — reportlab with CJK support + fpdf2 fallback # ----------------------------------------------------------------------- async def _render_pdf( self, draft: dict, data_assets: dict, output_dir: Path, title: str ) -> Path: try: return await self._render_pdf_reportlab(draft, data_assets, output_dir, title) except Exception as e: logger.warning(f"[formatter] reportlab failed ({e}), falling back to fpdf2") return await self._render_pdf_fpdf(draft, data_assets, output_dir, title) async def _render_pdf_reportlab( self, draft: dict, data_assets: dict, output_dir: Path, title: str ) -> Path: """Generate PDF with reportlab — better CJK support and table rendering.""" from reportlab.lib.pagesizes import A4 from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import mm from reportlab.lib import colors from reportlab.platypus import ( SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak, ) from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase.ttfonts import TTFont # Try to register a CJK font cjk_font = "Helvetica" for font_path in [ "/System/Library/Fonts/STHeiti Medium.ttc", "/System/Library/Fonts/PingFang.ttc", "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc", ]: if Path(font_path).exists(): try: pdfmetrics.registerFont(TTFont("CJK", font_path, subfontIndex=0)) cjk_font = "CJK" break except Exception: continue path = output_dir / f"{title}.pdf" doc = SimpleDocTemplate(str(path), pagesize=A4, topMargin=25*mm, bottomMargin=25*mm) styles = getSampleStyleSheet() styles.add(ParagraphStyle( name="CJKTitle", fontName=cjk_font, fontSize=22, spaceAfter=12, alignment=1, )) styles.add(ParagraphStyle( name="CJKHeading", fontName=cjk_font, fontSize=16, spaceAfter=8, spaceBefore=16, textColor=colors.HexColor("#1a1a2e"), )) styles.add(ParagraphStyle( name="CJKBody", fontName=cjk_font, fontSize=11, spaceAfter=6, leading=16, )) elements = [] # Title elements.append(Paragraph(title, styles["CJKTitle"])) elements.append(Spacer(1, 12)) # Executive summary if summary := draft.get("executive_summary"): elements.append(Paragraph("执行摘要", styles["CJKHeading"])) elements.append(Paragraph(summary, styles["CJKBody"])) elements.append(Spacer(1, 12)) # Chapters for chapter in draft.get("chapters", []): elements.append(PageBreak()) elements.append(Paragraph(chapter["title"], styles["CJKHeading"])) content = chapter.get("content", "") for para in content.split("\n\n"): para = para.strip() if para: elements.append(Paragraph(para, styles["CJKBody"])) # Tables for table_spec in data_assets.get("tables", []): elements.append(Spacer(1, 12)) elements.append(Paragraph(table_spec.get("title", ""), styles["CJKHeading"])) headers = table_spec.get("headers", []) rows = table_spec.get("rows", []) if headers: table_data = [headers] + rows t = Table(table_data) t.setStyle(TableStyle([ ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1a1a2e")), ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), ("FONTNAME", (0, 0), (-1, -1), cjk_font), ("FONTSIZE", (0, 0), (-1, 0), 10), ("FONTSIZE", (0, 1), (-1, -1), 9), ("GRID", (0, 0), (-1, -1), 0.5, colors.grey), ("ALIGN", (0, 0), (-1, -1), "CENTER"), ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#f5f5f5")]), ])) elements.append(t) doc.build(elements) return path async def _render_pdf_fpdf( self, draft: dict, data_assets: dict, output_dir: Path, title: str ) -> Path: """Fallback PDF generation with fpdf2.""" from fpdf import FPDF pdf = FPDF() pdf.set_auto_page_break(auto=True, margin=15) # Try CJK font for font_path in [ "/System/Library/Fonts/STHeiti Medium.ttc", "/System/Library/Fonts/PingFang.ttc", ]: if Path(font_path).exists(): try: pdf.add_font("CJK", "", font_path, uni=True) pdf.set_font("CJK", "", 11) break except Exception: pdf.set_font("Helvetica", "", 11) else: pdf.set_font("Helvetica", "", 11) pdf.add_page() pdf.set_font_size(24) pdf.cell(0, 20, title, new_x="LMARGIN", new_y="NEXT", align="C") pdf.set_font_size(11) if summary := draft.get("executive_summary"): pdf.set_font_size(16) pdf.cell(0, 12, "执行摘要", new_x="LMARGIN", new_y="NEXT") pdf.set_font_size(11) pdf.multi_cell(0, 6, summary) for chapter in draft.get("chapters", []): pdf.add_page() pdf.set_font_size(16) pdf.cell(0, 12, chapter["title"], new_x="LMARGIN", new_y="NEXT") pdf.set_font_size(11) pdf.multi_cell(0, 6, chapter.get("content", "")) path = output_dir / f"{title}.pdf" pdf.output(str(path)) return path