auto-save 2026-05-14 04:32 (~5)

This commit is contained in:
2026-05-14 04:32:27 +08:00
parent 8f2b8d373c
commit 4935e34eb0
5 changed files with 106 additions and 21 deletions

View File

@@ -90,7 +90,8 @@ JobStatus = Literal[
KEYFRAME_COUNT = int(os.getenv("KEYFRAME_COUNT", "5"))
FrameExtractTarget = Literal["balanced", "subject", "transition", "expression", "motion"]
FrameExtractMode = Literal["replace", "append"]
FrameExtractQuality = Literal["fast", "accurate", "ultra"]
FrameExtractQuality = Literal["auto", "fast", "accurate", "ultra"]
AnalyzeTask = tuple[str, int, FrameExtractTarget, FrameExtractMode, FrameExtractQuality]
FRAME_TARGET_LABELS: dict[FrameExtractTarget, str] = {
"balanced": "综合关键帧",
"subject": "清晰主体",
@@ -99,6 +100,7 @@ FRAME_TARGET_LABELS: dict[FrameExtractTarget, str] = {
"motion": "动作峰值",
}
FRAME_QUALITY_LABELS: dict[FrameExtractQuality, str] = {
"auto": "自动",
"fast": "快速",
"accurate": "精细",
"ultra": "极准",
@@ -221,6 +223,8 @@ class Job(BaseModel):
JOBS: dict[str, Job] = {}
ANALYZE_QUEUE: list[AnalyzeTask] = []
ANALYZE_WORKER_RUNNING = False
def job_dir(job_id: str) -> Path:
@@ -441,6 +445,30 @@ def _frame_metrics(img_path: Path, idx: int, timestamp: float, metric_width: int
}
def _physical_memory_gb() -> float:
try:
page_size = os.sysconf("SC_PAGE_SIZE")
pages = os.sysconf("SC_PHYS_PAGES")
return float(page_size * pages) / (1024 ** 3)
except Exception:
return 0.0
def _resolve_frame_quality(duration: float, quality: FrameExtractQuality) -> FrameExtractQuality:
if quality != "auto":
return quality
cores = os.cpu_count() or 4
memory_gb = _physical_memory_gb()
strong_machine = cores >= 10 and (memory_gb == 0.0 or memory_gb >= 32)
if strong_machine and duration <= 180:
return "ultra"
if strong_machine and duration <= 600:
return "accurate"
if cores >= 8 and duration <= 240:
return "accurate"
return "fast"
def _scan_profile(duration: float, quality: FrameExtractQuality) -> tuple[float, int, int, int]:
"""返回 scan_fps / scan_width / metric_width / estimated_count。"""
if quality == "ultra":
@@ -607,7 +635,7 @@ async def pipeline_analyze(
frame_count: int = KEYFRAME_COUNT,
target: FrameExtractTarget = "balanced",
mode: FrameExtractMode = "replace",
quality: FrameExtractQuality = "accurate",
quality: FrameExtractQuality = "auto",
) -> None:
"""阶段 2拆音轨 + 抽关键帧。ASR/翻译是独立文案轨,不阻塞视觉素材流。"""
job = JOBS[job_id]
@@ -630,9 +658,11 @@ async def pipeline_analyze(
n = max(1, min(int(frame_count), 20))
target_label = FRAME_TARGET_LABELS.get(target, FRAME_TARGET_LABELS["balanced"])
quality_label = FRAME_QUALITY_LABELS.get(quality, FRAME_QUALITY_LABELS["accurate"])
duration = max(float(job.duration or 1.0), 0.1)
scan_fps, scan_width, metric_width, estimated_scan_count = _scan_profile(duration, quality)
effective_quality = _resolve_frame_quality(duration, quality)
effective_quality_label = FRAME_QUALITY_LABELS.get(effective_quality, FRAME_QUALITY_LABELS["accurate"])
quality_label = f"自动·{effective_quality_label}" if quality == "auto" else effective_quality_label
scan_fps, scan_width, metric_width, estimated_scan_count = _scan_profile(duration, effective_quality)
update(job, message=f"本地{quality_label}扫描 · {target_label} · 约 {estimated_scan_count} 帧…", progress=45)
frames_dir = d / "frames"
@@ -716,6 +746,24 @@ async def pipeline_analyze(
update(job, status="failed", error=str(e), message="解析失败")
async def analyze_queue_worker() -> None:
global ANALYZE_WORKER_RUNNING
ANALYZE_WORKER_RUNNING = True
try:
while ANALYZE_QUEUE:
job_id, frames, target, mode, quality = ANALYZE_QUEUE.pop(0)
if job_id not in JOBS:
continue
await pipeline_analyze(job_id, frames, target, mode, quality)
if ANALYZE_QUEUE:
for pos, (queued_job_id, *_rest) in enumerate(ANALYZE_QUEUE, start=1):
queued_job = JOBS.get(queued_job_id)
if queued_job:
update(queued_job, status="splitting", progress=30, message=f"排队等待抽帧 · 前方 {pos - 1} 个任务")
finally:
ANALYZE_WORKER_RUNNING = False
# ---------- Gemini ASR + 翻译 ----------
def _transcribe_sync(wav: Path) -> list[dict]:
@@ -1084,14 +1132,23 @@ async def trigger_analyze(
frames: int = KEYFRAME_COUNT,
target: FrameExtractTarget = "balanced",
mode: FrameExtractMode = "replace",
quality: FrameExtractQuality = "accurate",
quality: FrameExtractQuality = "auto",
) -> Job:
job = JOBS.get(job_id)
if not job:
raise HTTPException(404, "job not found")
if job.status not in {"downloaded", "frames_extracted", "transcribed", "failed"}:
raise HTTPException(409, f"status must be downloaded/failed, got {job.status}")
bg.add_task(pipeline_analyze, job_id, frames, target, mode, quality)
ANALYZE_QUEUE.append((job_id, frames, target, mode, quality))
position = len(ANALYZE_QUEUE)
update(
job,
status="splitting",
progress=30,
message="排队等待抽帧" if ANALYZE_WORKER_RUNNING or position > 1 else "准备抽帧…",
)
if not ANALYZE_WORKER_RUNNING:
bg.add_task(analyze_queue_worker)
return job