Files
20250715-66bfff96/代码实现/document_exporter.py
2026-04-25 19:21:03 +08:00

370 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
文档导出器 - 将搜索结果导出为DOCX格式
"""
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

try:
    from docx import Document
    from docx.shared import Inches, RGBColor
    from docx.enum.style import WD_STYLE_TYPE
    from docx.enum.text import WD_ALIGN_PARAGRAPH
    from docx.oxml.shared import OxmlElement, qn
except ImportError:
    print("需要安装 python-docx: pip install python-docx")
    raise

from database import DatabaseManager
from config import EXPORT_CONFIG, EXPORT_DIR
class DocumentExporter:
    """Export stored search results as DOCX reports.

    The exporter reads a search log and its ranked articles through
    ``DatabaseManager``, renders them with python-docx, writes the file into
    ``EXPORT_DIR`` and records the export in the database.
    """

    # Characters replaced with "_" when building a file name.
    _UNSAFE_FILENAME_CHARS = re.compile(r'[<>:"/\\|?*]')
    # File names longer than this are truncated ...
    MAX_FILENAME_LENGTH = 100
    # ... to this many characters of stem, plus the extension.
    TRUNCATED_STEM_LENGTH = 90
    # Summaries longer than this are cut and suffixed with "...".
    SUMMARY_MAX_CHARS = 300
    # A page break is inserted after every this-many articles.
    ARTICLES_PER_PAGE = 5
    # Display text for the numeric ``authority_level`` of a source.
    AUTHORITY_LABELS = {1: '官方机构', 2: '主流媒体', 3: '专业平台', 4: '其他'}

    def __init__(self):
        """Create the database handle, logger and export directory."""
        self.db = DatabaseManager()
        self.logger = logging.getLogger(__name__)
        self.export_dir = EXPORT_DIR
        # parents=True so a nested export directory can be created in one go.
        self.export_dir.mkdir(parents=True, exist_ok=True)

    def export_search_results(self, search_log_id: int,
                              custom_filename: Optional[str] = None) -> Dict:
        """Export the results of one search as a DOCX document.

        Args:
            search_log_id: Primary key of the search log to export.
            custom_filename: Optional file name; only its final path
                component is used and ``.docx`` is appended when missing.

        Returns:
            On success a dict with ``success``, ``filename``, ``file_path``,
            ``articles_count`` and ``doc_id``; otherwise a dict with
            ``success`` set to False and an ``error`` message. Exceptions are
            logged and reported through the returned dict, never raised.
        """
        try:
            search_log = self._get_search_log(search_log_id)
            if not search_log:
                return {'success': False, 'error': '搜索记录不存在'}

            results = self._get_search_results(search_log_id)
            if not results:
                return {'success': False, 'error': '没有搜索结果可导出'}

            filename = self._generate_filename(search_log, custom_filename)
            file_path = self.export_dir / filename

            doc = self._create_document(search_log, results)
            doc.save(str(file_path))

            doc_id = self.db.save_exported_doc(
                search_log_id, filename, str(file_path), len(results)
            )
            self.logger.info(f"文档导出成功: {filename}")
            return {
                'success': True,
                'filename': filename,
                'file_path': str(file_path),
                'articles_count': len(results),
                'doc_id': doc_id,
            }
        except Exception as e:
            self.logger.error(f"文档导出失败: {e}")
            return {'success': False, 'error': str(e)}

    def _get_search_log(self, search_log_id: int) -> Optional[Dict]:
        """Return the search log row joined with its industry names.

        Returns None when the row does not exist or the query fails.
        """
        # NOTE(review): the connection is not closed here; whether
        # `_get_connection` hands out a shared or a fresh connection is not
        # visible from this file - confirm before adding a close().
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            cursor.execute("""
                SELECT sl.*, i.name_cn as industry_name, i.name_en as industry_en
                FROM search_logs sl
                LEFT JOIN industries i ON sl.industry_id = i.id
                WHERE sl.id = ?
            """, (search_log_id,))
            row = cursor.fetchone()
            return dict(row) if row else None
        except Exception as e:
            self.logger.error(f"获取搜索记录失败: {e}")
            return None

    def _get_search_results(self, search_log_id: int) -> List[Dict]:
        """Return the articles of a search ordered by rank position.

        Returns an empty list when nothing matches or the query fails.
        """
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            cursor.execute("""
                SELECT a.*, rs.source_name, rs.authority_level, sr.relevance_score, sr.rank_position
                FROM search_results sr
                JOIN articles a ON sr.article_id = a.id
                JOIN rss_sources rs ON a.source_id = rs.id
                WHERE sr.search_log_id = ?
                ORDER BY sr.rank_position ASC
            """, (search_log_id,))
            return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            self.logger.error(f"获取搜索结果失败: {e}")
            return []

    def _generate_filename(self, search_log: Dict,
                           custom_filename: Optional[str] = None) -> str:
        """Build the output file name.

        A caller-supplied name is reduced to its final path component and
        sanitized so it cannot point outside the export directory. Otherwise
        the name is ``<date>_<industry>_<keywords>[_CN].docx``.
        """
        if custom_filename:
            # Treat both separators as path separators, then keep the last
            # component only ("../../x" -> "x").
            base_name = Path(custom_filename.replace('\\', '/')).name
            if base_name:
                if not base_name.endswith('.docx'):
                    base_name += '.docx'
                return self._sanitize_filename(base_name)

        date_str = datetime.now().strftime('%Y%m%d')
        # `or` instead of a .get() default: the key can be present with a
        # None value (NULL column / LEFT JOIN without a match).
        keywords = (search_log.get('keywords') or '').replace(' ', '_')[:20]
        industry = search_log.get('industry_en') or 'general'
        language = search_log.get('language') or 'en'

        suffix = '_CN' if language == 'cn' else ''
        filename = f"{date_str}_{industry}_{keywords}{suffix}.docx"
        return self._sanitize_filename(filename)

    def _sanitize_filename(self, filename: str) -> str:
        """Replace unsafe characters with "_" and cap the name length."""
        filename = self._UNSAFE_FILENAME_CHARS.sub('_', filename)
        if len(filename) > self.MAX_FILENAME_LENGTH:
            stem, dot, ext = filename.rpartition('.')
            if dot:
                filename = stem[:self.TRUNCATED_STEM_LENGTH] + '.' + ext
            else:
                # No extension to preserve: plain truncation.
                filename = filename[:self.MAX_FILENAME_LENGTH]
        return filename

    @staticmethod
    def _to_datetime(value) -> Optional[datetime]:
        """Return *value* as a datetime, or None when it cannot be parsed.

        Accepts datetime objects and ISO-8601 strings (a "Z" marker is
        dropped before parsing).
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value.replace('Z', ''))
            except ValueError:
                return None
        return None

    def _add_styled_paragraph(self, doc: "Document", text: str, style_name: str):
        """Add a paragraph and try to apply a named style to it.

        The style is assigned after the paragraph exists, so a failed style
        lookup never requires adding the paragraph a second time.

        Returns:
            ``(paragraph, styled)`` where ``styled`` is False when the style
            could not be applied and the caller should format manually.
        """
        paragraph = doc.add_paragraph(text)
        try:
            paragraph.style = style_name
        except (KeyError, ValueError):
            return paragraph, False
        return paragraph, True

    def _create_document(self, search_log: Dict, results: List[Dict]) -> "Document":
        """Assemble the full report: styles, title, search info, results, footer."""
        doc = Document()
        self._setup_document_styles(doc)
        self._add_title(doc, search_log)
        self._add_search_info(doc, search_log)
        self._add_search_results(doc, results)
        self._add_footer(doc)
        return doc

    def _setup_document_styles(self, doc: "Document"):
        """Register the custom paragraph styles used by the report.

        Style creation is best-effort: a failure for one style is logged and
        the remaining styles are still created.
        """
        # (name, font size, bold, italic, alignment); None leaves it unset.
        # Inches(0.2) / (0.15) / (0.1) equal 14.4 / 10.8 / 7.2 pt.
        style_specs = (
            ('CustomTitle', Inches(0.2), True, None, WD_ALIGN_PARAGRAPH.CENTER),
            ('ArticleTitle', Inches(0.15), True, None, None),
            ('SourceInfo', Inches(0.1), None, True, None),
        )
        for name, size, bold, italic, alignment in style_specs:
            try:
                style = doc.styles.add_style(name, WD_STYLE_TYPE.PARAGRAPH)
                style.font.size = size
                if bold is not None:
                    style.font.bold = bold
                if italic is not None:
                    style.font.italic = italic
                if alignment is not None:
                    style.paragraph_format.alignment = alignment
            except Exception as e:
                # For example the style already exists; the paragraph helpers
                # fall back to direct formatting when a style is unusable.
                self.logger.debug("skipping style %s: %s", name, e)

    def _add_title(self, doc: "Document", search_log: Dict):
        """Add the centered report title (industry, keywords, date)."""
        keywords = search_log.get('keywords') or ''
        now = datetime.now()

        if search_log.get('language') == 'cn':
            industry_name = search_log.get('industry_name') or '通用'
            date_str = f"{now.year}年{now.month:02d}月{now.day:02d}日"
            title = f"{industry_name}行业搜索报告\n关键词: {keywords}\n{date_str}"
        else:
            industry_en = search_log.get('industry_en') or 'General'
            date_str = now.strftime('%Y-%m-%d')
            title = f"{industry_en} Industry Search Report\nKeywords: {keywords}\n{date_str}"

        title_para, styled = self._add_styled_paragraph(doc, title, 'CustomTitle')
        if not styled:
            title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
        doc.add_paragraph()  # blank line

    def _add_search_info(self, doc: "Document", search_log: Dict):
        """Add the block describing the search (time, keywords, scope)."""
        raw_time = search_log.get('search_time') or ''
        parsed_time = self._to_datetime(raw_time)
        if parsed_time is not None:
            search_time = parsed_time.strftime('%Y-%m-%d %H:%M:%S')
        else:
            # Unparseable value: show it verbatim rather than abort the export.
            search_time = str(raw_time)

        info_lines = [
            f"搜索时间: {search_time}",
            f"关键词: {search_log.get('keywords') or ''}",
            f"搜索行业: {search_log.get('industry_name') or '全部'}",
            f"搜索语言: {'中文' if search_log.get('language') == 'cn' else '英文'}",
            f"结果数量: {search_log.get('results_count') or 0}",
        ]
        info_para = doc.add_paragraph()
        for line in info_lines:
            info_para.add_run(line + '\n')

        doc.add_paragraph()  # blank line
        doc.add_paragraph("=" * 50)  # separator
        doc.add_paragraph()

    def _add_search_results(self, doc: "Document", results: List[Dict]):
        """Add one section per article: title, source line, summary, link."""
        include_links = EXPORT_CONFIG.get('include_source_links', True)
        total = len(results)

        for index, result in enumerate(results, 1):
            title = result.get('title') or '无标题'
            title_para, styled = self._add_styled_paragraph(
                doc, f"{index}. {title}", 'ArticleTitle'
            )
            if not styled:
                for run in title_para.runs:
                    run.bold = True

            source_para, styled = self._add_styled_paragraph(
                doc, self._format_source_info(result), 'SourceInfo'
            )
            if not styled:
                for run in source_para.runs:
                    run.italic = True

            # Rows come from `SELECT a.*`, so "summary" is always a key; fall
            # back to the content when its value is empty or None.
            summary = result.get('summary') or result.get('content') or ''
            if summary:
                if len(summary) > self.SUMMARY_MAX_CHARS:
                    summary = summary[:self.SUMMARY_MAX_CHARS] + '...'
                doc.add_paragraph(summary)

            url = result.get('original_url') or ''
            if url and include_links:
                link_para = doc.add_paragraph(f"原文链接: {url}")
                # Render the link line in blue.
                link_para.runs[0].font.color.rgb = RGBColor(0x00, 0x00, 0xFF)

            doc.add_paragraph()  # blank line between articles

            # Start a new page after each full group, except after the last article.
            if index % self.ARTICLES_PER_PAGE == 0 and index < total:
                doc.add_page_break()

    def _format_source_info(self, result: Dict) -> str:
        """Return the " | "-joined source line for one article.

        Contains source name with authority label, optional author,
        publication date and the relevance score with two decimals.
        """
        source_name = result.get('source_name') or '未知来源'
        author = result.get('author') or ''
        published_date = result.get('published_date') or ''
        authority_text = self.AUTHORITY_LABELS.get(
            result.get('authority_level', 3), '其他'
        )

        # A NULL or non-numeric score is shown as 0.00 instead of raising.
        try:
            relevance_score = float(result.get('relevance_score') or 0)
        except (TypeError, ValueError):
            relevance_score = 0.0

        if published_date:
            parsed = self._to_datetime(published_date)
            if parsed is not None:
                date_str = parsed.strftime('%Y-%m-%d')
            else:
                date_str = str(published_date)
        else:
            date_str = '未知日期'

        info_parts = [
            f"来源: {source_name} ({authority_text})",
            f"发布时间: {date_str}",
            f"相关性: {relevance_score:.2f}",
        ]
        if author:
            info_parts.insert(1, f"作者: {author}")
        return " | ".join(info_parts)

    def _add_footer(self, doc: "Document"):
        """Append a separator and a centered generation-time line."""
        doc.add_paragraph()
        doc.add_paragraph("=" * 50)
        footer_text = f"本报告由智能搜索系统生成 | 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        footer_para = doc.add_paragraph(footer_text)
        footer_para.alignment = WD_ALIGN_PARAGRAPH.CENTER

    def get_export_history(self, limit: int = 20) -> List[Dict]:
        """Return the most recent exports, newest first.

        Args:
            limit: Maximum number of rows to return.

        Returns:
            Export rows with the keywords and search time of their search
            log; an empty list when the query fails.
        """
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            cursor.execute("""
                SELECT ed.*, sl.keywords, sl.search_time
                FROM exported_docs ed
                JOIN search_logs sl ON ed.search_log_id = sl.id
                ORDER BY ed.created_at DESC
                LIMIT ?
            """, (limit,))
            return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            self.logger.error(f"获取导出历史失败: {e}")
            return []

    def delete_exported_file(self, doc_id: int) -> Dict:
        """Delete an exported file and its database record.

        A file that is already missing is not an error; the record is
        removed regardless.

        Returns:
            A dict with ``success`` and either ``message`` or ``error``.
        """
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            cursor.execute("SELECT file_path FROM exported_docs WHERE id = ?", (doc_id,))
            row = cursor.fetchone()
            if not row:
                return {'success': False, 'error': '文档记录不存在'}

            try:
                Path(row['file_path']).unlink()
            except FileNotFoundError:
                pass  # already gone; still remove the record

            cursor.execute("DELETE FROM exported_docs WHERE id = ?", (doc_id,))
            conn.commit()
            return {'success': True, 'message': '文件删除成功'}
        except Exception as e:
            self.logger.error(f"删除文件失败: {e}")
            return {'success': False, 'error': str(e)}