670 lines
27 KiB
Python
670 lines
27 KiB
Python
"""Formatter Agent — renders final report using Skills toolkit.
|
||
|
||
Skills integration:
|
||
- docx: python-docx (baseline) + docx-js via Node.js (rich mode) + OOXML template editing
|
||
- pptx: html2pptx.js via Node.js (visual slides) + python-pptx fallback
|
||
- xlsx: openpyxl + recalc.py (formula recalculation via LibreOffice)
|
||
- pdf: reportlab with CJK support + fpdf2 fallback
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import shutil
|
||
import subprocess
|
||
import tempfile
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
from .base import BaseAgent
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root directory of the skills checkout (under the current user's home).
# Each output format has its own sub-directory below it.
SKILLS_ROOT = Path.home().joinpath(
    "Projects/code/20260119-skills合集/anthropics_skills/skills"
)
DOCX_SKILLS = SKILLS_ROOT.joinpath("docx")
PPTX_SKILLS = SKILLS_ROOT.joinpath("pptx")
XLSX_SKILLS = SKILLS_ROOT.joinpath("xlsx")
PDF_SKILLS = SKILLS_ROOT.joinpath("pdf")
|
||
|
||
|
||
def _skills_available() -> dict[str, bool]:
    """Report which optional skill toolkits exist on disk.

    Returns:
        A mapping from capability name to ``True`` when the file backing
        that capability exists (for ``pdf_scripts``: when the directory
        exists), ``False`` otherwise.
    """
    docx_ooxml_scripts = DOCX_SKILLS / "ooxml" / "scripts"
    pptx_ooxml_scripts = PPTX_SKILLS / "ooxml" / "scripts"
    pptx_scripts = PPTX_SKILLS / "scripts"

    availability: dict[str, bool] = {}
    availability["docx_js"] = (DOCX_SKILLS / "docx-js.md").exists()
    availability["html2pptx"] = (pptx_scripts / "html2pptx.js").exists()
    availability["recalc"] = (XLSX_SKILLS / "recalc.py").exists()
    availability["ooxml_docx"] = (docx_ooxml_scripts / "unpack.py").exists()
    availability["ooxml_pptx"] = (pptx_ooxml_scripts / "unpack.py").exists()
    availability["pdf_scripts"] = (PDF_SKILLS / "scripts").is_dir()
    return availability
|
||
|
||
|
||
class FormatterAgent(BaseAgent):
    """Agent that renders a report draft to docx/pptx/xlsx/pdf files."""

    name = "formatter"
    description = "将报告渲染为 docx/pptx/xlsx/pdf,融合 Skills 能力"

    def __init__(self):
        """Initialise the base agent and record which skill toolkits exist."""
        super().__init__()
        self.skills = _skills_available()
        # Only the names of toolkits that were actually found are logged.
        available = [skill for skill, present in self.skills.items() if present]
        logger.info(f"[formatter] available skills: {available}")
|
||
|
||
async def run(self, context: dict[str, Any]) -> dict[str, Any]:
    """Render the draft in every requested format.

    Keys read from ``context``:
        draft: required; the report dict. Its ``title`` (default "报告")
            is passed to the renderers.
        data_assets: optional dict handed to the renderers (default ``{}``).
        output_dir: optional output directory, created if missing
            (default ``"output"``).
        output_formats: optional iterable of format names
            (default ``["docx"]``); "docx", "pptx", "xlsx" and "pdf" are
            supported, anything else is logged and skipped.
        template_path: optional template forwarded to the DOCX renderer.

    Returns:
        ``{"generated_files": [...]}`` holding one string path per format
        whose renderer returned without raising. A renderer failure is
        logged with its traceback and does not stop the other formats.

    Raises:
        KeyError: if ``context`` has no ``"draft"`` entry.
    """
    report = context["draft"]
    assets = context.get("data_assets", {})
    target_dir = Path(context.get("output_dir", "output"))
    requested = context.get("output_formats", ["docx"])
    template = context.get("template_path")  # optional: user-provided template

    target_dir.mkdir(parents=True, exist_ok=True)
    report_title = report.get("title", "报告")
    produced = []

    for fmt in requested:
        try:
            if fmt == "docx":
                rendered = await self._render_docx(
                    report, assets, target_dir, report_title, template
                )
            elif fmt == "pptx":
                rendered = await self._render_pptx(report, assets, target_dir, report_title)
            elif fmt == "xlsx":
                rendered = await self._render_xlsx(assets, target_dir, report_title)
            elif fmt == "pdf":
                rendered = await self._render_pdf(report, assets, target_dir, report_title)
            else:
                logger.warning(f"Unsupported format: {fmt}")
                continue
            produced.append(str(rendered))
            logger.info(f"[formatter] generated {rendered}")
        except Exception:
            # One failing format must not prevent the remaining ones.
            logger.exception(f"[formatter] failed to render {fmt}")

    return {"generated_files": produced}
|
||
|
||
# -----------------------------------------------------------------------
|
||
# DOCX — python-docx baseline + OOXML template editing
|
||
# -----------------------------------------------------------------------
|
||
|
||
async def _render_docx(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str,
    template_path: str | None = None,
) -> Path:
    """Pick the DOCX rendering strategy and delegate to it.

    The template-based renderer is used only when a ``template_path`` was
    given and the ``ooxml_docx`` skill is available; every other case goes
    through the python-docx baseline renderer.
    """
    # Checked in this order so self.skills is only consulted when a
    # template was actually supplied.
    if not template_path or not self.skills["ooxml_docx"]:
        return await self._render_docx_baseline(draft, data_assets, output_dir, title)

    template = Path(template_path)
    return await self._render_docx_from_template(
        draft, data_assets, output_dir, title, template
    )
|
||
|
||
async def _render_docx_baseline(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Build the report with python-docx and save it as ``<title>.docx``.

    Document order: centred title, optional "执行摘要" section, one level-1
    heading per chapter followed by its markdown-ish content, every table
    in ``data_assets["tables"]``, then every chart in
    ``data_assets["charts"]`` as a text placeholder plus one two-column
    table per dataset.

    Args:
        draft: report dict; reads ``executive_summary`` and ``chapters``
            (each chapter needs ``title`` and may have ``content``).
        data_assets: dict; reads ``tables`` and ``charts``.
        output_dir: directory the file is written to.
        title: document title, also used as the file stem.

    Returns:
        Path of the saved document.

    Raises:
        KeyError: if a chapter has no ``"title"``.
    """
    from docx import Document
    from docx.enum.text import WD_ALIGN_PARAGRAPH
    from docx.oxml.ns import qn
    from docx.shared import Pt, RGBColor

    doc = Document()

    # -- Styles --
    body_font = "微软雅黑"
    normal = doc.styles["Normal"]
    normal.font.name = body_font
    # font.name only fills the Latin (ascii/hAnsi) faces. Word chooses the
    # face for CJK text from w:eastAsia, so without this the Chinese body
    # text would not use the requested font.
    normal.element.rPr.rFonts.set(qn("w:eastAsia"), body_font)
    normal.font.size = Pt(11)

    # Title
    heading = doc.add_heading(title, level=0)
    heading.alignment = WD_ALIGN_PARAGRAPH.CENTER

    # Executive summary
    if summary := draft.get("executive_summary"):
        doc.add_heading("执行摘要", level=1)
        run = doc.add_paragraph().add_run(summary)
        run.font.size = Pt(11)
        run.font.color.rgb = RGBColor(0x33, 0x33, 0x33)

    # Chapters
    for chapter in draft.get("chapters", []):
        doc.add_heading(chapter["title"], level=1)
        self._docx_render_markdown(doc, chapter.get("content", ""))

    # Tables from data assets
    for table_spec in data_assets.get("tables", []):
        doc.add_heading(table_spec.get("title", "数据表"), level=2)
        self._docx_add_table(doc, table_spec)

    # Charts are not drawn: each one becomes a text placeholder followed by
    # its raw data as tables.
    for chart_spec in data_assets.get("charts", []):
        doc.add_heading(chart_spec.get("title", "图表"), level=2)
        desc = chart_spec.get("description", "")
        chart_type = chart_spec.get("type", "")
        doc.add_paragraph(f"[{chart_type.upper()} 图表] {desc}")

        chart_data = chart_spec.get("data", {})
        if labels := chart_data.get("labels"):
            for ds in chart_data.get("datasets", []):
                # zip() pairs labels with values and stops at the shorter one.
                self._docx_add_table(doc, {
                    "headers": ["项目", ds.get("label", "数据")],
                    "rows": [
                        [str(label), str(value)]
                        for label, value in zip(labels, ds.get("data", []))
                    ],
                })

    path = output_dir / f"{title}.docx"
    doc.save(str(path))
    return path
|
||
|
||
async def _render_docx_from_template(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str,
    template_path: Path,
) -> Path:
    """Edit an existing DOCX template using OOXML unpack/edit/pack workflow.

    The template is unpacked into a temporary directory with the skill's
    ``unpack.py`` and packed back into ``<output_dir>/<title>.docx`` with
    ``pack.py``. The XML is not edited yet, so the result is currently the
    template itself. If either script exits non-zero, the baseline renderer
    produces the document instead.

    Returns:
        Path of the written document (from the pack step or the fallback).
    """
    scripts_dir = DOCX_SKILLS / "ooxml" / "scripts"
    unpack_script = scripts_dir / "unpack.py"
    pack_script = scripts_dir / "pack.py"

    with tempfile.TemporaryDirectory() as tmpdir:
        work_dir = Path(tmpdir) / "unpacked"

        # Unpack template.
        # communicate() instead of wait(): both streams are piped, and
        # wait() can block forever once the child fills a pipe buffer.
        proc = await asyncio.create_subprocess_exec(
            "python3", str(unpack_script), str(template_path), str(work_dir),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        await proc.communicate()

        if proc.returncode != 0:
            logger.warning("[formatter] OOXML unpack failed, falling back to baseline")
            return await self._render_docx_baseline(draft, data_assets, output_dir, title)

        # TODO: edit XML content in work_dir based on draft
        # For now, just pack back as-is (template passthrough)
        output_path = output_dir / f"{title}.docx"
        proc = await asyncio.create_subprocess_exec(
            "python3", str(pack_script), str(work_dir), str(output_path),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        _, stderr = await proc.communicate()

        if proc.returncode != 0:
            # Without this check a failed pack would hand the caller a path
            # to a missing or partial file. The baseline writes to the same
            # path, so it also replaces any partial output.
            detail = stderr.decode(errors="replace")[:200]
            logger.warning(
                f"[formatter] OOXML pack failed ({detail}), falling back to baseline"
            )
            return await self._render_docx_baseline(draft, data_assets, output_dir, title)

        return output_path
|
||
|
||
def _docx_render_markdown(self, doc, content: str):
    """Convert markdown-ish content to docx paragraphs.

    ``content`` is split on blank lines and each non-empty block becomes:

    - a level 4/3/2 heading when it starts with ``#### ``, ``### `` or
      ``## `` (the whole block is the heading text);
    - a bullet list when it starts with ``- `` or ``* `` (one "List Bullet"
      paragraph per non-empty line);
    - a numbered list when it starts with ``1. `` or ``1)`` (one
      "List Number" paragraph per non-empty line);
    - otherwise a single 11pt paragraph.

    Only the list marker itself is removed from a list line, so item text
    that begins with digits, ``-`` or ``*`` (``1. 2024 revenue``,
    ``- **bold**``, ``* -5%``) is kept intact.

    Args:
        doc: python-docx document (anything offering ``add_heading`` and
            ``add_paragraph``).
        content: the chapter text.
    """
    import re

    # A marker is "-" or "*" followed by whitespace (or end of line).
    bullet_marker = re.compile(r"^\s*[-*](?:\s+|$)")
    # "12. " needs whitespace after the dot so "1.5 million" is untouched;
    # "12)" may be followed directly by text.
    number_marker = re.compile(r"^\s*\d+(?:\.(?:\s+|$)|\)\s*)")
    # Longest prefix first: "## " would otherwise never be reached for "### ".
    heading_prefixes = (("#### ", 4), ("### ", 3), ("## ", 2))

    for block in content.split("\n\n"):
        block = block.strip()
        if not block:
            continue

        heading = next(
            ((prefix, level) for prefix, level in heading_prefixes if block.startswith(prefix)),
            None,
        )
        if heading is not None:
            prefix, level = heading
            doc.add_heading(block[len(prefix):], level=level)
        elif block.startswith(("- ", "* ")):
            # Bullet list
            for line in block.split("\n"):
                text = bullet_marker.sub("", line, count=1).strip()
                if text:
                    doc.add_paragraph(text, style="List Bullet")
        elif block.startswith(("1. ", "1)")):
            # Numbered list
            for line in block.split("\n"):
                text = number_marker.sub("", line, count=1).strip()
                if text:
                    doc.add_paragraph(text, style="List Number")
        else:
            # Imported here because only plain paragraphs need a font size.
            from docx.shared import Pt

            paragraph = doc.add_paragraph(block)
            for run in paragraph.runs:
                run.font.size = Pt(11)
|
||
|
||
def _docx_add_table(self, doc, table_spec: dict):
    """Add a formatted table to the document.

    Args:
        doc: python-docx document to append the table to.
        table_spec: dict with ``headers`` (list of column titles) and
            ``rows`` (list of row sequences). Nothing is added when
            ``headers`` is missing or empty.

    The table gets one column per header. Values beyond the header width
    are dropped and short rows leave their trailing cells empty, so ragged
    input cannot raise IndexError.
    """
    from docx.shared import Pt

    headers = table_spec.get("headers", [])
    rows = table_spec.get("rows", [])
    if not headers:
        return

    tbl = doc.add_table(rows=1 + len(rows), cols=len(headers))
    tbl.style = "Light Grid Accent 1"

    # Header row: bold, 10pt. Assigning cell.text creates the run that is
    # then styled.
    header_cells = tbl.rows[0].cells
    for cell, header in zip(header_cells, headers):
        cell.text = str(header)
        for paragraph in cell.paragraphs:
            for run in paragraph.runs:
                run.font.bold = True
                run.font.size = Pt(10)

    # Data rows. The row's cells are fetched once per row instead of once
    # per value; zip() stops at the table width.
    for r_idx, row in enumerate(rows, start=1):
        row_cells = tbl.rows[r_idx].cells
        for cell, value in zip(row_cells, row):
            cell.text = str(value)
|
||
|
||
# -----------------------------------------------------------------------
|
||
# PPTX — html2pptx.js (rich) or python-pptx (fallback)
|
||
# -----------------------------------------------------------------------
|
||
|
||
async def _render_pptx(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Render the deck, preferring html2pptx and falling back to python-pptx.

    The html2pptx renderer is tried only when that skill is available; if
    it raises, the failure is logged as a warning and the python-pptx
    baseline renderer is used instead.
    """
    render_args = (draft, data_assets, output_dir, title)

    if not self.skills["html2pptx"]:
        return await self._render_pptx_baseline(*render_args)

    try:
        return await self._render_pptx_html2pptx(*render_args)
    except Exception as exc:
        logger.warning(f"[formatter] html2pptx failed ({exc}), falling back to python-pptx")

    return await self._render_pptx_baseline(*render_args)
|
||
|
||
async def _render_pptx_html2pptx(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Generate PPTX using html2pptx.js skill for visual slides.

    One HTML file is written per slide (a title slide, then one slide per
    chapter built from the first 400 characters of its content) into a
    temporary directory, together with a small Node.js script that feeds
    them to html2pptx.js and writes ``<output_dir>/<title>.pptx``.
    ``data_assets`` is accepted for signature parity but not used here.

    Returns:
        ``output_dir / f"{title}.pptx"``.

    Raises:
        RuntimeError: if the Node.js script exits non-zero or finishes
            without producing the file.
        KeyError: if a chapter has no ``"title"``.
    """
    from html import escape

    pptx_path = output_dir / f"{title}.pptx"
    # Node runs with the temp dir as cwd, so a relative output path would be
    # written there and deleted with it. Hand the script an absolute path.
    absolute_pptx_path = pptx_path.resolve()
    html2pptx_path = PPTX_SKILLS / "scripts" / "html2pptx.js"

    with tempfile.TemporaryDirectory() as tmpdir:
        work = Path(tmpdir)

        # Generate HTML slides. All report text is escaped so "<" or "&" in
        # the draft cannot break the markup.
        summary = (draft.get("executive_summary") or "")[:100]
        slides_html = [
            # Title slide
            '<html><body style="width:720pt;height:405pt;display:flex;'
            "align-items:center;justify-content:center;flex-direction:column;"
            "background:linear-gradient(135deg,#1a1a2e,#16213e);color:white;"
            'font-family:sans-serif;">\n'
            f'<h1 style="font-size:36pt;margin:0;">{escape(str(title))}</h1>\n'
            f'<p style="font-size:18pt;color:#aaa;margin-top:20pt;">{escape(summary)}</p>\n'
            "</body></html>"
        ]

        # Chapter slides
        for ch in draft.get("chapters", []):
            content_lines = (ch.get("content") or "")[:400].split("\n")
            bullets = "".join(
                f"<li>{escape(line.strip())}</li>" for line in content_lines if line.strip()
            )
            slides_html.append(
                '<html><body style="width:720pt;height:405pt;padding:40pt;'
                'font-family:sans-serif;background:#ffffff;">\n'
                '<h2 style="font-size:28pt;color:#1a1a2e;border-bottom:2pt solid #e94560;'
                f'padding-bottom:10pt;">{escape(str(ch["title"]))}</h2>\n'
                f'<ul style="font-size:14pt;color:#333;line-height:1.8;">{bullets}</ul>\n'
                "</body></html>"
            )

        # Write HTML files
        slide_files = []
        for index, slide_markup in enumerate(slides_html):
            file_name = f"slide_{index}.html"
            (work / file_name).write_text(slide_markup, encoding="utf-8")
            slide_files.append(file_name)

        # Write conversion script. Every path is emitted with json.dumps so
        # quotes or backslashes in a title or directory name yield a valid
        # JavaScript string literal.
        script = work / "convert.js"
        script_lines = [
            "const pptxgen = require('pptxgenjs');",
            f"const html2pptxModule = require({json.dumps(str(html2pptx_path))});",
            # NOTE(review): works whether html2pptx.js exports the function
            # itself or an object with an `html2pptx` member -- confirm
            # which one the installed skill uses.
            "const html2pptx = html2pptxModule.html2pptx || html2pptxModule;",
            "const path = require('path');",
            "",
            "async function main() {",
            "  const pptx = new pptxgen();",
            "  pptx.layout = 'LAYOUT_16x9';",
            f"  const files = {json.dumps(slide_files)};",
            "  for (const f of files) {",
            f"    await html2pptx(path.join({json.dumps(str(work))}, f), pptx);",
            "  }",
            f"  await pptx.writeFile({{ fileName: {json.dumps(str(absolute_pptx_path))} }});",
            "}",
            "main().catch(e => { console.error(e); process.exit(1); });",
            "",
        ]
        script.write_text("\n".join(script_lines), encoding="utf-8")

        proc = await asyncio.create_subprocess_exec(
            "node", str(script),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            cwd=str(work),
        )
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(f"html2pptx failed: {stderr.decode(errors='replace')}")

    if not absolute_pptx_path.exists():
        # Raising lets the caller fall back instead of reporting a file
        # that was never written.
        raise RuntimeError(f"html2pptx failed: {absolute_pptx_path} was not created")

    return pptx_path
|
||
|
||
async def _render_pptx_baseline(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Build the deck with python-pptx and save it as ``<title>.pptx``.

    Slides, in order: a title slide (title plus the first 200 characters of
    the executive summary), one bullet slide per chapter (at most 12
    non-empty lines, with a leading markdown heading/bullet/number marker
    removed), and one table slide per entry in ``data_assets["tables"]``
    (header row plus at most 9 data rows).

    Returns:
        Path of the saved presentation.

    Raises:
        KeyError: if a chapter has no ``"title"``.
    """
    import re

    from pptx import Presentation
    from pptx.util import Inches, Pt

    # Matches exactly one leading marker: "## ", "- ", "* ", "3. " or "3)".
    # Digits or symbols that belong to the text ("2024 revenue", "**bold**")
    # are left alone.
    leading_marker = re.compile(r"^(?:#{1,6}\s+|[-*]\s+|\d+(?:\.\s+|\)\s*))")

    prs = Presentation()

    # Title slide
    slide = prs.slides.add_slide(prs.slide_layouts[0])
    slide.shapes.title.text = title
    if len(slide.placeholders) > 1:
        slide.placeholders[1].text = (draft.get("executive_summary") or "")[:200]

    # Chapter slides
    for chapter in draft.get("chapters", []):
        slide = prs.slides.add_slide(prs.slide_layouts[1])
        slide.shapes.title.text = chapter["title"]
        tf = slide.placeholders[1].text_frame
        tf.clear()

        content = chapter.get("content") or ""
        stripped_lines = (line.strip() for line in content.split("\n"))
        cleaned = (leading_marker.sub("", line, count=1).strip() for line in stripped_lines if line)
        bullets = [text for text in cleaned if text][:12]  # max 12 bullets per slide

        for index, text in enumerate(bullets):
            # clear() keeps one empty paragraph. Reuse it for the first
            # bullet so the slide does not open with a blank line.
            paragraph = tf.paragraphs[0] if index == 0 else tf.add_paragraph()
            paragraph.text = text
            paragraph.font.size = Pt(14)
            paragraph.space_after = Pt(4)

    # Data table slides
    for table_spec in data_assets.get("tables", []):
        # Layout 5 must provide a title placeholder, since the title is set
        # below ("Title Only" in python-pptx's default template).
        slide = prs.slides.add_slide(prs.slide_layouts[5])
        slide.shapes.title.text = table_spec.get("title", "数据表")

        headers = table_spec.get("headers", [])
        rows = table_spec.get("rows", [])
        if not (headers and rows):
            continue

        n_rows = min(len(rows) + 1, 10)  # limit rows per slide
        n_cols = len(headers)
        tbl = slide.shapes.add_table(
            n_rows, n_cols,
            Inches(0.5), Inches(1.5), Inches(9), Inches(4.5),
        ).table

        for c_idx, header in enumerate(headers):
            tbl.cell(0, c_idx).text = str(header)
        for r_idx, row in enumerate(rows[:n_rows - 1], start=1):
            for c_idx, value in enumerate(row[:n_cols]):
                tbl.cell(r_idx, c_idx).text = str(value)

    path = output_dir / f"{title}.pptx"
    prs.save(str(path))
    return path
|
||
|
||
# -----------------------------------------------------------------------
|
||
# XLSX — openpyxl + recalc.py (formula recalculation)
|
||
# -----------------------------------------------------------------------
|
||
|
||
async def _render_xlsx(
    self, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Write tables and chart data to ``<title>.xlsx`` with openpyxl.

    All tables go onto the first sheet ("数据总览"), stacked vertically:
    title row, styled header row, data rows (numeric-looking strings are
    stored as floats), and -- when there are at least two data rows -- a
    total row with a ``SUM`` formula for each column whose first data cell
    is numeric. Each chart gets its own sheet holding labels and datasets.
    If any formula was written and the ``recalc`` skill is available, the
    saved workbook is passed through ``recalc.py``.

    Sheet names are sanitised (Excel forbids the characters ``\\ / ? * [ ] :``)
    and column widths only ever grow, so a later, narrower table cannot
    shrink a column that an earlier table needed.

    Returns:
        Path of the saved workbook.
    """
    import re

    from openpyxl import Workbook
    from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
    from openpyxl.utils import get_column_letter

    wb = Workbook()
    ws = wb.active
    ws.title = "数据总览"

    # Professional styling
    header_font = Font(bold=True, size=11, color="FFFFFF")
    header_fill = PatternFill(start_color="1A1A2E", end_color="1A1A2E", fill_type="solid")
    title_font = Font(bold=True, size=14, color="1A1A2E")
    bold_font = Font(bold=True)
    edge = Side(style="thin", color="CCCCCC")
    thin_border = Border(left=edge, right=edge, top=edge, bottom=edge)

    current_row = 1
    has_formulas = False
    col_widths: dict[int, int] = {}  # widest requirement seen so far, per column

    for table_spec in data_assets.get("tables", []):
        # Table title
        ws.cell(row=current_row, column=1, value=table_spec.get("title", "")).font = title_font
        current_row += 1

        headers = table_spec.get("headers", [])
        rows = table_spec.get("rows", [])

        header_row = current_row
        if headers:
            # Header row with styling
            for col_idx, header in enumerate(headers, 1):
                cell = ws.cell(row=current_row, column=col_idx, value=header)
                cell.font = header_font
                cell.fill = header_fill
                cell.alignment = Alignment(horizontal="center")
                cell.border = thin_border
            current_row += 1

        # Data rows
        data_start = current_row
        for row_data in rows:
            for col_idx, val in enumerate(row_data, 1):
                cell = ws.cell(row=current_row, column=col_idx, value=val)
                cell.border = thin_border
                # Store numeric-looking strings ("1,234.5") as numbers so
                # the SUM row below can add them up.
                if isinstance(val, str):
                    try:
                        cell.value = float(val.replace(",", ""))
                    except ValueError:
                        pass
            current_row += 1

        # Auto-sum row for numeric columns
        data_end = current_row - 1
        if data_end > data_start:
            for col_idx in range(1, len(headers) + 1):
                col_letter = get_column_letter(col_idx)
                first_value = ws.cell(row=data_start, column=col_idx).value
                if isinstance(first_value, (int, float)):
                    cell = ws.cell(
                        row=current_row, column=col_idx,
                        value=f"=SUM({col_letter}{data_start}:{col_letter}{data_end})",
                    )
                    cell.font = bold_font
                    cell.border = thin_border
                    has_formulas = True
                elif col_idx == 1:
                    cell = ws.cell(row=current_row, column=1, value="合计")
                    cell.font = bold_font
                    cell.border = thin_border
            current_row += 1

        # Auto-fit column widths over this table's own rows (header through
        # the last written row), keeping the maximum across all tables.
        for col_idx in range(1, len(headers) + 1):
            longest = max(
                len(str(ws.cell(row=r, column=col_idx).value or ""))
                for r in range(header_row, current_row)
            )
            wanted = min(longest + 4, 30)
            col_widths[col_idx] = max(col_widths.get(col_idx, 0), wanted)
            ws.column_dimensions[get_column_letter(col_idx)].width = col_widths[col_idx]

        current_row += 2  # gap between tables

    # Chart data sheets
    for chart_spec in data_assets.get("charts", []):
        # openpyxl rejects sheet titles containing \ / ? * [ ] : and Excel
        # limits them to 31 characters.
        raw_title = str(chart_spec.get("title") or "图表")
        sheet_title = re.sub(r"[\\/*?:\[\]]", "_", raw_title)[:31]
        chart_ws = wb.create_sheet(title=sheet_title)
        chart_ws.cell(row=1, column=1, value=chart_spec.get("title", "")).font = title_font

        chart_data = chart_spec.get("data", {})
        labels = chart_data.get("labels", [])
        datasets = chart_data.get("datasets", [])

        # Headers: [项目, dataset 1, dataset 2, ...]
        chart_ws.cell(row=2, column=1, value="项目").font = bold_font
        for col_idx, ds in enumerate(datasets, 2):
            chart_ws.cell(row=2, column=col_idx, value=ds.get("label", "")).font = bold_font

        for offset, label in enumerate(labels):
            row_idx = offset + 3
            chart_ws.cell(row=row_idx, column=1, value=label)
            for col_idx, ds in enumerate(datasets, 2):
                values = ds.get("data", [])
                # A dataset shorter than the label list leaves the cell empty.
                if offset < len(values):
                    chart_ws.cell(row=row_idx, column=col_idx, value=values[offset])

    path = output_dir / f"{title}.xlsx"
    wb.save(str(path))

    # Run recalc.py if we have formulas and the skill is available
    if has_formulas and self.skills["recalc"]:
        await self._xlsx_recalc(path)

    return path
|
||
|
||
async def _xlsx_recalc(self, path: Path):
    """Recalculate formulas using Skills recalc.py (requires LibreOffice).

    Runs ``python3 recalc.py <path> 30`` and logs the ``status`` field of
    the JSON it prints on success. This is best effort: a non-zero exit
    code or any exception is logged as a warning and never propagated.
    """
    script_path = XLSX_SKILLS / "recalc.py"
    logger.info(f"[formatter] running recalc.py on {path}")

    command = ("python3", str(script_path), str(path), "30")
    try:
        proc = await asyncio.create_subprocess_exec(
            *command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        out, err = await proc.communicate()
        if proc.returncode != 0:
            logger.warning(f"[formatter] recalc.py failed: {err.decode()[:200]}")
        else:
            result = json.loads(out.decode())
            logger.info(f"[formatter] recalc result: {result.get('status')}")
    except Exception as e:
        logger.warning(f"[formatter] recalc.py error: {e}")
|
||
|
||
# -----------------------------------------------------------------------
|
||
# PDF — reportlab with CJK support + fpdf2 fallback
|
||
# -----------------------------------------------------------------------
|
||
|
||
async def _render_pdf(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Render the PDF with reportlab, falling back to fpdf2 on any error.

    Any exception from the reportlab renderer is logged as a warning and
    the fpdf2 renderer is then called with the same arguments; exceptions
    from the fpdf2 renderer propagate to the caller.
    """
    render_args = (draft, data_assets, output_dir, title)
    try:
        pdf_path = await self._render_pdf_reportlab(*render_args)
    except Exception as exc:
        logger.warning(f"[formatter] reportlab failed ({exc}), falling back to fpdf2")
        pdf_path = await self._render_pdf_fpdf(*render_args)
    return pdf_path
|
||
|
||
async def _render_pdf_reportlab(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Generate PDF with reportlab — better CJK support and table rendering.

    Content order: title, optional "执行摘要" section, each chapter on a new
    page (one paragraph per blank-line-separated block), then every table
    in ``data_assets["tables"]``.

    All report text is XML-escaped before it reaches ``Paragraph``, which
    parses its input as markup; single newlines become ``<br/>`` so line
    structure inside a block survives.

    Returns:
        Path of the written PDF.

    Raises:
        KeyError: if a chapter has no ``"title"``.
    """
    from xml.sax.saxutils import escape

    from reportlab.lib import colors
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
    from reportlab.lib.units import mm
    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.ttfonts import TTFont
    from reportlab.platypus import (
        PageBreak, Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle,
    )

    # Try to register a CJK font; the first candidate that exists and loads
    # wins.
    cjk_font = "Helvetica"
    for font_path in (
        "/System/Library/Fonts/STHeiti Medium.ttc",
        "/System/Library/Fonts/PingFang.ttc",
        "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
    ):
        if not Path(font_path).exists():
            continue
        try:
            pdfmetrics.registerFont(TTFont("CJK", font_path, subfontIndex=0))
        except Exception:
            continue  # file present but not loadable by reportlab: try the next
        cjk_font = "CJK"
        break
    else:
        # Make the degraded output visible instead of silently using a
        # font without CJK glyphs.
        logger.warning(
            "[formatter] no usable CJK font found; CJK text in the PDF may not render"
        )

    path = output_dir / f"{title}.pdf"
    doc = SimpleDocTemplate(
        str(path), pagesize=A4, topMargin=25 * mm, bottomMargin=25 * mm,
    )

    styles = getSampleStyleSheet()
    styles.add(ParagraphStyle(
        name="CJKTitle", fontName=cjk_font, fontSize=22,
        spaceAfter=12, alignment=1,  # 1 == centred
    ))
    styles.add(ParagraphStyle(
        name="CJKHeading", fontName=cjk_font, fontSize=16,
        spaceAfter=8, spaceBefore=16, textColor=colors.HexColor("#1a1a2e"),
    ))
    styles.add(ParagraphStyle(
        name="CJKBody", fontName=cjk_font, fontSize=11,
        spaceAfter=6, leading=16,
    ))

    def paragraph(text, style_name: str):
        """Build a Paragraph from plain text (escaped, newlines kept)."""
        markup = escape(str(text)).replace("\n", "<br/>")
        return Paragraph(markup, styles[style_name])

    elements = []

    # Title
    elements.append(paragraph(title, "CJKTitle"))
    elements.append(Spacer(1, 12))

    # Executive summary
    if summary := draft.get("executive_summary"):
        elements.append(paragraph("执行摘要", "CJKHeading"))
        elements.append(paragraph(summary, "CJKBody"))
        elements.append(Spacer(1, 12))

    # Chapters
    for chapter in draft.get("chapters", []):
        elements.append(PageBreak())
        elements.append(paragraph(chapter["title"], "CJKHeading"))
        for block in chapter.get("content", "").split("\n\n"):
            block = block.strip()
            if block:
                elements.append(paragraph(block, "CJKBody"))

    # Tables
    for table_spec in data_assets.get("tables", []):
        elements.append(Spacer(1, 12))
        elements.append(paragraph(table_spec.get("title", ""), "CJKHeading"))
        headers = table_spec.get("headers", [])
        rows = table_spec.get("rows", [])
        if not headers:
            continue

        table = Table([headers] + rows)
        table.setStyle(TableStyle([
            ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1a1a2e")),
            ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
            ("FONTNAME", (0, 0), (-1, -1), cjk_font),
            ("FONTSIZE", (0, 0), (-1, 0), 10),
            ("FONTSIZE", (0, 1), (-1, -1), 9),
            ("GRID", (0, 0), (-1, -1), 0.5, colors.grey),
            ("ALIGN", (0, 0), (-1, -1), "CENTER"),
            ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#f5f5f5")]),
        ]))
        elements.append(table)

    doc.build(elements)
    return path
|
||
|
||
async def _render_pdf_fpdf(
    self, draft: dict, data_assets: dict, output_dir: Path, title: str
) -> Path:
    """Fallback PDF generation with fpdf2.

    Writes the title, the optional "执行摘要" section and one page per
    chapter to ``<output_dir>/<title>.pdf``. ``data_assets`` is not used by
    this renderer.

    Returns:
        Path of the written PDF.
    """
    from fpdf import FPDF

    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)

    # Try CJK font: use the first candidate that exists and loads, and end
    # up on Helvetica when none does.
    font_candidates = (
        "/System/Library/Fonts/STHeiti Medium.ttc",
        "/System/Library/Fonts/PingFang.ttc",
    )
    for candidate in font_candidates:
        if not Path(candidate).exists():
            continue
        try:
            pdf.add_font("CJK", "", candidate, uni=True)
            pdf.set_font("CJK", "", 11)
            break
        except Exception:
            pdf.set_font("Helvetica", "", 11)
    else:
        pdf.set_font("Helvetica", "", 11)

    def section_heading(text: str) -> None:
        """Write a 16pt heading line, then switch back to 11pt body size."""
        pdf.set_font_size(16)
        pdf.cell(0, 12, text, new_x="LMARGIN", new_y="NEXT")
        pdf.set_font_size(11)

    # Title page header
    pdf.add_page()
    pdf.set_font_size(24)
    pdf.cell(0, 20, title, new_x="LMARGIN", new_y="NEXT", align="C")
    pdf.set_font_size(11)

    summary = draft.get("executive_summary")
    if summary:
        section_heading("执行摘要")
        pdf.multi_cell(0, 6, summary)

    # Each chapter starts on its own page.
    for chapter in draft.get("chapters", []):
        pdf.add_page()
        section_heading(chapter["title"])
        pdf.multi_cell(0, 6, chapter.get("content", ""))

    target = output_dir / f"{title}.pdf"
    pdf.output(str(target))
    return target
|