auto-save 2026-05-14 11:15 (~5)

This commit is contained in:
2026-05-14 11:16:12 +08:00
parent 72702221a9
commit 4127adc5e7
5 changed files with 129 additions and 78 deletions

View File

@@ -20,7 +20,7 @@ uvicorn main:app --host 127.0.0.1 --port 4291
- `GET /health` — 健康检查 + 配置状态
- `POST /jobs` `{url}` — 创建 job,后台下载源视频;视频就绪后可手动解析或提取音频
- `GET /jobs/{id}` — 当前状态 + 产物
- `POST /jobs/{id}/transcribe` — 触发音频提取 + ASR + 翻译 + SKG 文案改写;配置 MiniMax 后生成配音。前端 Audio 节点提供“提取音频 / 重新提取音频”按钮,不依赖抽帧完成
- `POST /jobs/{id}/transcribe` — 触发音频提取 + ASR + 翻译 + SKG 文案改写;配置 MiniMax 后生成配音。前端 Audio 节点提供“提取音频 / 重新提取音频”按钮,可与抽帧并行,不自动触发
- `GET /jobs/{id}/video.mp4` — 原视频
- `GET /jobs/{id}/audio.wav` — 拆轨后的原始音频,供前端底部音频条生成波形
- `GET /jobs/{id}/audio-script.mp3` — 改写文案的 MiniMax 配音

View File

@@ -6,6 +6,7 @@ import json
import os
import shutil
import subprocess
import threading
import time
import uuid
from contextlib import asynccontextmanager
@@ -388,6 +389,8 @@ class Job(BaseModel):
JOBS: dict[str, Job] = {}
ANALYZE_QUEUE: list[AnalyzeTask] = []
ANALYZE_WORKER_RUNNING = False
AUDIO_WORKERS_RUNNING: set[str] = set()
AUDIO_WORKERS_LOCK = threading.Lock()
def job_dir(job_id: str) -> Path:
@@ -974,8 +977,8 @@ def _target_score(item: dict, target: FrameExtractTarget) -> float:
motion = float(item.get("motion_n", 0.0))
if target == "transparent_human":
# 透明骨架人仍先依赖本地清晰度 / 中心主体 / 对比度筛候选,
# 后续再交给 Vision 逐张语义验收
# 当前抽帧阶段走本地算力:优先清晰中心主体、高对比、适度色彩和时间覆盖。
# 透明骨架人的语义判断留给后续审核/识别,不在抽帧阶段逐帧调用 Vision
score = center * 0.45 + sharp * 0.30 + contrast * 0.15 + color * 0.10
elif target == "subject":
score = center * 0.48 + sharp * 0.25 + contrast * 0.17 + color * 0.10
@@ -1217,7 +1220,6 @@ def pipeline_analyze(
"-vn", "-ac", "1", "-ar", "16000", "-c:a", "pcm_s16le",
str(wav),
])
n = max(1, min(int(frame_count), 20))
target_label = FRAME_TARGET_LABELS.get(target, FRAME_TARGET_LABELS["balanced"])
duration = max(float(job.duration or 1.0), 0.1)
@@ -1260,16 +1262,11 @@ def pipeline_analyze(
raise RuntimeError("候选帧评分失败")
# 2) 目标化筛选pHash 去重 + 清晰度 / 中心细节 / 转场变化 / 动作强度。
# 透明骨架人目标会先扩大候选池,再用 Vision 逐张验收;不合格自动换下一帧
semantic_transparent = target == "transparent_human"
if semantic_transparent:
selection_count = min(len(candidates), min(max(n * 10, 24), 48))
update(job, message=f"{quality_label}筛选透明骨架人候选 · 本地 {selection_count} / {len(candidates)} 张…", progress=58)
chosen = _rank_keyframe_candidates(candidates, target, selection_count)
else:
selection_count = n if replacing else min(len(candidates), max(n * 4, n + len(existing_frames) + 2))
update(job, message=f"{quality_label}筛选 · {target_label} · {n} / {len(candidates)} 张…", progress=60)
chosen = _select_keyframes(candidates, selection_count, target)
# 抽帧阶段只走本机算力,不逐帧调用 Vision语义审核留到后续素材准备
semantic_transparent = False
selection_count = n if replacing else min(len(candidates), max(n * 4, n + len(existing_frames) + 2))
update(job, message=f"{quality_label}本地筛选 · {target_label} · {n} / {len(candidates)} 张…", progress=60)
chosen = _select_keyframes(candidates, selection_count, target)
# 3) 只对最终选中的时间点,从原视频抽高质量关键帧。
renamed: list[KeyFrame] = []
@@ -1558,16 +1555,20 @@ def _build_audio_script_sync(job_id: str, segments: list[TranscriptSegment]) ->
)
async def pipeline_transcribe(job_id: str) -> None:
def pipeline_transcribe(job_id: str, manage_job_status: bool = True) -> None:
job = JOBS[job_id]
d = job_dir(job_id)
wav = d / "audio.wav"
def progress(message: str, value: int) -> None:
if manage_job_status:
update(job, status="transcribing", message=message, progress=value, error="")
try:
if not wav.exists():
mp4 = d / "source.mp4"
if not mp4.exists():
raise RuntimeError("source.mp4 不存在,视频导入完成后再提取音频")
update(job, status="transcribing", message="ffmpeg 提取音频轨…", progress=max(45, min(job.progress, 70)), error="")
progress("ffmpeg 提取音频轨…", max(45, min(job.progress, 70)))
run([
"ffmpeg", "-y", "-i", str(mp4),
"-vn", "-ac", "1", "-ar", "16000", "-c:a", "pcm_s16le",
@@ -1578,8 +1579,8 @@ async def pipeline_transcribe(job_id: str) -> None:
if not LLM_API_KEY:
# 无 key 模式mock 数据
update(job, status="transcribing", message="ASR (mock) …", progress=75)
await asyncio.sleep(1.0)
progress("ASR (mock) …", 75)
time.sleep(1.0)
mock = [
TranscriptSegment(index=0, start=0.0, end=3.5,
en="Welcome back, today we're testing something new.",
@@ -1588,10 +1589,9 @@ async def pipeline_transcribe(job_id: str) -> None:
en="This device looks really sleek and minimal.",
zh="这个设备看起来非常时尚和简约。"),
]
update(
job,
transcript=mock,
audio_script=AudioScript(
update_kwargs = {
"transcript": mock,
"audio_script": AudioScript(
status="rewriting",
source_text=_transcript_join(mock, "en"),
source_zh=_transcript_join(mock, "zh"),
@@ -1601,18 +1601,22 @@ async def pipeline_transcribe(job_id: str) -> None:
voice_model=MINIMAX_TTS_MODEL,
voice_id=MINIMAX_TTS_VOICE_ID,
),
message="ASR mock 完成,生成 SKG 改写文案…",
progress=92,
)
audio_script = await asyncio.to_thread(_build_audio_script_sync, job_id, mock)
update(job, transcript=mock, status="transcribed", progress=100,
audio_script=audio_script,
message="转录完成MOCK · 未设 LLM_API_KEY")
}
if manage_job_status:
update_kwargs.update(message="ASR mock 完成,生成 SKG 改写文案…", progress=92)
update(job, **update_kwargs)
audio_script = _build_audio_script_sync(job_id, mock)
if manage_job_status:
update(job, transcript=mock, status="transcribed", progress=100,
audio_script=audio_script,
message="转录完成MOCK · 未设 LLM_API_KEY")
else:
update(job, transcript=mock, audio_script=audio_script)
return
# 1) whisper ASR
update(job, status="transcribing", message=f"{ASR_MODEL} 转录中…", progress=78)
segments = await asyncio.to_thread(_transcribe_sync, wav)
progress(f"{ASR_MODEL} 转录中…", 78)
segments = _transcribe_sync(wav)
if not segments:
raise RuntimeError("ASR 返回 0 段(可能无人声 / 格式问题)")
@@ -1627,10 +1631,13 @@ async def pipeline_transcribe(job_id: str) -> None:
)
for i, s in enumerate(segments)
]
update(job, transcript=en_only, message=f"ASR 完成 · {len(en_only)} 段,开始翻译…", progress=88)
if manage_job_status:
update(job, transcript=en_only, message=f"ASR 完成 · {len(en_only)} 段,开始翻译…", progress=88)
else:
update(job, transcript=en_only)
# 2) Gemini 翻译
zh_list = await asyncio.to_thread(_translate_sync, segments)
zh_list = _translate_sync(segments)
full = [
TranscriptSegment(
index=seg.index, start=seg.start, end=seg.end, en=seg.en,
@@ -1638,10 +1645,9 @@ async def pipeline_transcribe(job_id: str) -> None:
)
for i, seg in enumerate(en_only)
]
update(
job,
transcript=full,
audio_script=AudioScript(
update_kwargs = {
"transcript": full,
"audio_script": AudioScript(
status="rewriting",
source_text=_transcript_join(full, "en"),
source_zh=_transcript_join(full, "zh"),
@@ -1651,22 +1657,58 @@ async def pipeline_transcribe(job_id: str) -> None:
voice_model=MINIMAX_TTS_MODEL,
voice_id=MINIMAX_TTS_VOICE_ID,
),
message="翻译完成,生成 SKG 改写文案与 MiniMax 配音…",
progress=94,
)
audio_script = await asyncio.to_thread(_build_audio_script_sync, job_id, full)
update(job, transcript=full, status="transcribed", progress=100,
audio_script=audio_script,
message=f"转录完成 · {len(full)} 段({ASR_MODEL} + {TRANSLATE_MODEL}")
}
if manage_job_status:
update_kwargs.update(message="翻译完成,生成 SKG 改写文案与 MiniMax 配音…", progress=94)
update(job, **update_kwargs)
audio_script = _build_audio_script_sync(job_id, full)
if manage_job_status:
update(job, transcript=full, status="transcribed", progress=100,
audio_script=audio_script,
message=f"转录完成 · {len(full)} 段({ASR_MODEL} + {TRANSLATE_MODEL}")
else:
update(job, transcript=full, audio_script=audio_script)
except Exception as e:
update(
job,
status="failed",
audio_script=AudioScript(status="failed", error=str(e), created_at=time.time()),
error=str(e),
message="转录失败",
)
if manage_job_status:
update(
job,
status="failed",
audio_script=AudioScript(status="failed", error=str(e), created_at=time.time()),
error=str(e),
message="转录失败",
)
else:
update(job, audio_script=AudioScript(status="failed", error=str(e), created_at=time.time()))
def _audio_processing_worker(job_id: str, manage_job_status: bool) -> None:
    """Thread entry point: run the audio pipeline for one job, then deregister.

    Calls ``pipeline_transcribe`` with the given ``manage_job_status`` flag.
    Whether that call returns or raises, the job id is removed from
    ``AUDIO_WORKERS_RUNNING`` afterwards so the job can be processed again.
    """
    try:
        pipeline_transcribe(job_id, manage_job_status=manage_job_status)
    finally:
        # Deregister under the lock; ``discard`` is a no-op when the id is
        # already absent.
        AUDIO_WORKERS_LOCK.acquire()
        try:
            AUDIO_WORKERS_RUNNING.discard(job_id)
        finally:
            AUDIO_WORKERS_LOCK.release()
def start_audio_processing(job_id: str, manage_job_status: bool = True) -> bool:
    """Start the audio pipeline for a job on a background daemon thread.

    At most one audio worker is started per job at a time: membership in
    ``AUDIO_WORKERS_RUNNING`` (guarded by ``AUDIO_WORKERS_LOCK``) is what
    decides whether a worker is already in flight.

    Args:
        job_id: Key of the job in ``JOBS``.
        manage_job_status: Forwarded to the worker. When False, the start is
            also skipped if the job already has a transcript or a rewritten
            script.

    Returns:
        True if a new worker thread was started. False if the job is unknown,
        if output already exists (only checked when ``manage_job_status`` is
        False), or if a worker for this job is already registered.

    Raises:
        RuntimeError: Propagated from ``Thread.start()`` when the thread
            cannot be started; the job's reservation is released first.
    """
    job = JOBS.get(job_id)
    if not job:
        return False

    if not manage_job_status:
        # NOTE: ``audio_script.status == "rewriting"`` is deliberately NOT
        # checked here. ``trigger_transcribe`` sets that status right before
        # calling this function, so testing it would reject the very request
        # being started and leave the job in "rewriting" with no worker.
        # In-flight detection is done via AUDIO_WORKERS_RUNNING below.
        has_audio_output = bool(job.transcript) or bool(job.audio_script.rewritten_text)
        if has_audio_output:
            return False

    # Check-and-reserve under the lock so two concurrent callers cannot both
    # start a worker for the same job.
    with AUDIO_WORKERS_LOCK:
        if job_id in AUDIO_WORKERS_RUNNING:
            return False
        AUDIO_WORKERS_RUNNING.add(job_id)

    worker = threading.Thread(
        target=_audio_processing_worker,
        args=(job_id, manage_job_status),
        daemon=True,
        name=f"audio-{job_id}",
    )
    try:
        worker.start()
    except RuntimeError:
        # The worker's own ``finally`` never runs if the thread did not start,
        # so release the reservation here; otherwise the job would look busy
        # forever.
        with AUDIO_WORKERS_LOCK:
            AUDIO_WORKERS_RUNNING.discard(job_id)
        raise
    return True
def _image_edit_call(
@@ -2018,10 +2060,23 @@ async def trigger_transcribe(job_id: str, bg: BackgroundTasks) -> Job:
mp4 = job_dir(job_id) / "source.mp4"
if job.status in {"created", "downloading"} or not mp4.exists():
raise HTTPException(409, f"video not ready, got {job.status}")
if job.status in {"splitting", "transcribing"} or job.audio_script.status == "rewriting":
if job.status == "transcribing" or job.audio_script.status == "rewriting" or job_id in AUDIO_WORKERS_RUNNING:
raise HTTPException(409, f"job is busy, got {job.status}")
update(job, status="transcribing", progress=max(45, min(job.progress, 70)), error="", message="准备提取音频…")
bg.add_task(pipeline_transcribe, job_id)
manage_job_status = job.status != "splitting"
audio_payload = AudioScript(
status="rewriting",
product_brief=AUDIO_PRODUCT_BRIEF,
rewrite_model=AUDIO_REWRITE_MODEL,
voice_provider="minimax",
voice_model=MINIMAX_TTS_MODEL,
voice_id=MINIMAX_TTS_VOICE_ID,
)
if manage_job_status:
update(job, status="transcribing", progress=max(45, min(job.progress, 70)), error="", message="准备提取音频…", audio_script=audio_payload)
else:
update(job, error="", audio_script=audio_payload)
if not start_audio_processing(job_id, manage_job_status=manage_job_status):
update(job, message="音频已在处理中")
return job