From 345391d0050bab2af98c0fb2d759dd59bd008146 Mon Sep 17 00:00:00 2001 From: kang Date: Tue, 12 May 2026 16:55:37 +0800 Subject: [PATCH] auto-save 2026-05-12 16:55 (~4) --- .memory/worklog.json | 7 ++ api/main.py | 81 +++++++++++++------- web/components/nodes/index.tsx | 135 +++++++++++++++++++++------------ web/lib/api.ts | 10 +++ 4 files changed, 157 insertions(+), 76 deletions(-) diff --git a/.memory/worklog.json b/.memory/worklog.json index 75fb4ad..4af06c6 100644 --- a/.memory/worklog.json +++ b/.memory/worklog.json @@ -90,6 +90,13 @@ "message": "auto-save 2026-05-12 16:44 (~4)", "hash": "63552af", "files_changed": 4 + }, + { + "ts": "2026-05-12T16:50:05+08:00", + "type": "commit", + "message": "auto-save 2026-05-12 16:49 (~3)", + "hash": "4779c26", + "files_changed": 3 } ] } diff --git a/api/main.py b/api/main.py index 3409987..4799feb 100644 --- a/api/main.py +++ b/api/main.py @@ -39,12 +39,17 @@ def llm() -> OpenAI: _llm_client = OpenAI(base_url=LLM_BASE_URL or None, api_key=LLM_API_KEY) return _llm_client -# Pipeline 状态:created → downloading → splitting → frames_extracted → transcribing → transcribed | failed +# Pipeline 状态: +# created → downloading → downloaded(停,等用户点解析)→ splitting → frames_extracted +# → transcribing → transcribed | failed JobStatus = Literal[ - "created", "downloading", "splitting", "frames_extracted", + "created", "downloading", "downloaded", + "splitting", "frames_extracted", "transcribing", "transcribed", "failed", ] +KEYFRAME_COUNT = int(os.getenv("KEYFRAME_COUNT", "5")) + class KeyFrame(BaseModel): index: int @@ -134,15 +139,14 @@ def ffprobe_meta(mp4: Path) -> dict: return json.loads(out) -async def pipeline_download_split_frames(job_id: str) -> None: - """步骤 1+2+3:下载 + 拆音轨 + 抽取关键帧""" +async def pipeline_download(job_id: str) -> None: + """阶段 1:仅下载(或上传跳过),落 source.mp4,停在 downloaded 等用户点解析。""" job = JOBS[job_id] d = job_dir(job_id) try: mp4 = d / "source.mp4" - # ---- 1. yt-dlp 下载(上传模式 mp4 已存在 → 跳过) if mp4.exists(): - update(job, status="downloading", message="本地上传,跳过下载", progress=15) + update(job, status="downloading", message="本地上传 · 跳过下载", progress=15) else: update(job, status="downloading", message="yt-dlp 下载中…", progress=5) run([ @@ -155,22 +159,33 @@ async def pipeline_download_split_frames(job_id: str) -> None: if not mp4.exists(): raise RuntimeError("下载完成但找不到 source.mp4") - # 元数据 meta = ffprobe_meta(mp4) v_stream = next((s for s in meta["streams"] if s["codec_type"] == "video"), None) duration = float(meta["format"]["duration"]) update( job, + status="downloaded", video_url=f"/jobs/{job_id}/video.mp4", duration=duration, width=int(v_stream["width"]) if v_stream else 0, height=int(v_stream["height"]) if v_stream else 0, - progress=20, - message=f"下载完成 · {duration:.1f}s", + progress=25, + message=f"视频就绪 · {duration:.1f}s · 等待解析", ) + except Exception as e: + update(job, status="failed", error=str(e), message="下载失败") - # ---- 2. 拆音轨 - update(job, status="splitting", message="ffmpeg 拆分音轨…", progress=30) + +async def pipeline_analyze(job_id: str, frame_count: int = KEYFRAME_COUNT) -> None: + """阶段 2:拆音轨 + 抽关键帧 + ASR + 翻译。需要 source.mp4 已存在。""" + job = JOBS[job_id] + d = job_dir(job_id) + try: + mp4 = d / "source.mp4" + if not mp4.exists(): + raise RuntimeError("source.mp4 不存在,先完成下载") + + update(job, status="splitting", message="ffmpeg 拆分音轨…", progress=35) wav = d / "audio.wav" run([ "ffmpeg", "-y", "-i", str(mp4), @@ -178,32 +193,30 @@ async def pipeline_download_split_frames(job_id: str) -> None: str(wav), ]) - # ---- 3. 关键帧抽取(场景切换 + 均匀采样兜底,最多 10 张) - update(job, message="抽取关键帧…", progress=50) + n = max(1, min(int(frame_count), 20)) + update(job, message=f"抽取 {n} 张关键帧…", progress=50) frames_dir = d / "frames" if frames_dir.exists(): shutil.rmtree(frames_dir) frames_dir.mkdir(parents=True) - # 先用场景切换检测(失败时不阻塞,走均匀采样兜底) try: run([ "ffmpeg", "-y", "-i", str(mp4), "-vf", "select='gt(scene,0.4)'", "-fps_mode", "vfr", - "-frames:v", "30", - "-pix_fmt", "yuvj420p", # mjpeg encoder 要 JPEG full-range + "-frames:v", str(n * 3), + "-pix_fmt", "yuvj420p", "-q:v", "3", str(frames_dir / "scene_%03d.jpg"), ]) except Exception: - # 场景切换检测在某些纯合成 / 静态视频上会失败,让它静默走兜底 pass scene_frames = sorted(frames_dir.glob("scene_*.jpg")) - # 均匀采样兜底 / 补足 - if len(scene_frames) < 10: - sample_count = 10 - len(scene_frames) + if len(scene_frames) < n: + sample_count = n - len(scene_frames) + duration = job.duration or 1.0 step = duration / (sample_count + 1) for i in range(sample_count): t = step * (i + 1) @@ -215,15 +228,13 @@ async def pipeline_download_split_frames(job_id: str) -> None: "-q:v", "3", str(out), ]) - # 统一排序、按时间戳读取、限制 10 张 - all_frames = sorted(frames_dir.glob("*.jpg"))[:10] + all_frames = sorted(frames_dir.glob("*.jpg"))[:n] renamed: list[KeyFrame] = [] for i, src in enumerate(all_frames): dst = frames_dir / f"{i:03d}.jpg" if src != dst: src.rename(dst) - # 简化:用均匀分布估算时间戳(场景切换的精确时间需要解析 showinfo 输出,先省) - ts = duration * (i + 0.5) / max(len(all_frames), 1) + ts = (job.duration or 0) * (i + 0.5) / max(len(all_frames), 1) renamed.append(KeyFrame(index=i, timestamp=round(ts, 2), url=f"/jobs/{job_id}/frames/{i}.jpg")) update( @@ -234,8 +245,11 @@ async def pipeline_download_split_frames(job_id: str) -> None: message=f"已抽取 {len(renamed)} 张关键帧", ) + # 自动接 ASR + 翻译 + await pipeline_transcribe(job_id) + except Exception as e: - update(job, status="failed", error=str(e), message="管线失败") + update(job, status="failed", error=str(e), message="解析失败") # ---------- Gemini ASR + 翻译 ---------- @@ -378,7 +392,7 @@ async def create_job(req: CreateJobReq, bg: BackgroundTasks) -> Job: job = Job(id=job_id, url=req.url.strip()) JOBS[job_id] = job save_state(job) - bg.add_task(pipeline_download_split_frames, job_id) + bg.add_task(pipeline_download, job_id) return job @@ -386,7 +400,6 @@ async def create_job(req: CreateJobReq, bg: BackgroundTasks) -> Job: async def create_job_from_upload(bg: BackgroundTasks, file: UploadFile = File(...)) -> Job: if not file.filename: raise HTTPException(400, "file required") - # 简化:只验后缀,不嗅探 magic bytes ext = Path(file.filename).suffix.lower() if ext not in {".mp4", ".mov", ".webm", ".mkv", ".m4v"}: raise HTTPException(400, f"unsupported video format: {ext}") @@ -394,7 +407,6 @@ async def create_job_from_upload(bg: BackgroundTasks, file: UploadFile = File(.. job_id = uuid.uuid4().hex[:12] d = job_dir(job_id) mp4 = d / "source.mp4" - # 直接落盘(流式写入,避免全量进内存) with mp4.open("wb") as f: while chunk := await file.read(1024 * 1024): f.write(chunk) @@ -404,7 +416,18 @@ async def create_job_from_upload(bg: BackgroundTasks, file: UploadFile = File(.. job = Job(id=job_id, url=f"upload://{file.filename}") JOBS[job_id] = job save_state(job) - bg.add_task(pipeline_download_split_frames, job_id) + bg.add_task(pipeline_download, job_id) + return job + + +@app.post("/jobs/{job_id}/analyze", response_model=Job) +async def trigger_analyze(job_id: str, bg: BackgroundTasks, frames: int = KEYFRAME_COUNT) -> Job: + job = JOBS.get(job_id) + if not job: + raise HTTPException(404, "job not found") + if job.status not in {"downloaded", "frames_extracted", "transcribed", "failed"}: + raise HTTPException(409, f"status must be downloaded/failed, got {job.status}") + bg.add_task(pipeline_analyze, job_id, frames) return job diff --git a/web/components/nodes/index.tsx b/web/components/nodes/index.tsx index 0e3a638..9d54fd7 100644 --- a/web/components/nodes/index.tsx +++ b/web/components/nodes/index.tsx @@ -6,14 +6,16 @@ import { Mic, Languages, FileEdit, Sparkles, Film, FileVideo, Loader2, } from "lucide-react" import { NodeShell, type NodeStatus, type NodeKind } from "./node-shell" -import { type Job } from "@/lib/api" +import { type Job, videoUrl } from "@/lib/api" export interface NodeData { job: Job | null submitting: boolean + analyzing: boolean selectedFrames: Set onSubmitUrl: (url: string) => void onUploadFile: (file: File) => void + onAnalyze: () => void onToggleFrame: (idx: number) => void } @@ -24,7 +26,7 @@ function inputStatus(job: Job | null): NodeStatus { } function downloadStatus(job: Job | null): NodeStatus { if (!job) return "pending" - if (job.status === "failed" && job.progress < 20) return "failed" + if (job.status === "failed" && job.progress < 30) return "failed" if (job.status === "downloading") return "running" if (job.video_url) return "done" return "pending" @@ -58,58 +60,97 @@ export function InputNode({ data, selected }: NodeProps<{ data: NodeData }> | an const d: NodeData = data const [url, setUrl] = useState("") const fileRef = useRef(null) - const isLocked = !!d.job && d.job.status !== "failed" && d.job.status !== "transcribed" + const job = d.job + + // 是否已下载 → 显示视频 + 解析按钮 + const hasVideo = !!job?.video_url + const isDownloading = job?.status === "downloading" || job?.status === "created" + const isAnalyzing = !!job && ["splitting", "frames_extracted", "transcribing"].includes(job.status) + const isDone = job?.status === "transcribed" + const inputLocked = isDownloading || d.submitting + return ( } title="输入 · Input" - subtitle="STEP 1" - width={300} + subtitle={isDownloading ? "STEP 1 · 下载中" : hasVideo ? "STEP 1 · 视频就绪" : "STEP 1"} + width={320} selected={selected} hasTarget={false} > - setUrl(e.target.value)} - placeholder="粘贴 TikTok 链接" - disabled={isLocked} - className="w-full text-[12px] px-2.5 py-2 rounded-md bg-white/60 dark:bg-black/40 border border-black/10 dark:border-white/10 outline-none text-[var(--text-strong)] placeholder:text-[var(--text-faint)] focus:ring-2 focus:ring-[var(--ring)] disabled:opacity-40" - /> -
- - - { - const f = e.target.files?.[0] - if (f) d.onUploadFile(f) - e.target.value = "" - }} - /> -
- {d.job && ( -
- {d.job.url.startsWith("upload://") ? `📎 ${d.job.url.slice(9)}` : d.job.url} -
+ {/* 未下载:URL + 上传入口 */} + {!hasVideo && ( + <> + setUrl(e.target.value)} + placeholder="粘贴 TikTok 链接" + disabled={inputLocked} + className="w-full text-[12px] px-2.5 py-2 rounded-md bg-white/60 dark:bg-black/40 border border-black/10 dark:border-white/10 outline-none text-[var(--text-strong)] placeholder:text-[var(--text-faint)] focus:ring-2 focus:ring-[var(--ring)] disabled:opacity-40" + /> +
+ + + { + const f = e.target.files?.[0] + if (f) d.onUploadFile(f) + e.target.value = "" + }} + /> +
+ + )} + + {/* 已下载:内嵌视频 + 解析按钮 */} + {hasVideo && job && ( + <> +
) diff --git a/web/lib/api.ts b/web/lib/api.ts index e816490..3436758 100644 --- a/web/lib/api.ts +++ b/web/lib/api.ts @@ -3,6 +3,7 @@ const API_BASE = process.env.NEXT_PUBLIC_API_BASE ?? "http://localhost:4291" export type JobStatus = | "created" | "downloading" + | "downloaded" | "splitting" | "frames_extracted" | "transcribing" @@ -74,6 +75,15 @@ export async function triggerTranscribe(id: string): Promise { return res.json() } +export async function analyzeJob(id: string, frames = 5): Promise { + const res = await fetch(`${API_BASE}/jobs/${id}/analyze?frames=${frames}`, { method: "POST" }) + if (!res.ok) { + const t = await res.text().catch(() => "") + throw new Error(`analyze ${res.status} ${t.slice(0, 200)}`) + } + return res.json() +} + export function frameUrl(jobId: string, frameIndex: number): string { return `${API_BASE}/jobs/${jobId}/frames/${frameIndex}.jpg` }