auto-save 2026-05-12 17:28 (~6)

This commit is contained in:
2026-05-12 17:28:54 +08:00
parent e6b8615e3a
commit 6a9abeabc0
6 changed files with 210 additions and 22 deletions

View File

@@ -132,6 +132,77 @@ def run(cmd: list[str], cwd: Path | None = None) -> str:
return res.stdout
# ---- 启发式选帧工具 ----
import imagehash
import numpy as np
from PIL import Image
def _sharpness(img_path: Path) -> float:
"""Laplacian variance值越大越清晰模糊/转场帧值低。"""
g = np.asarray(Image.open(img_path).convert("L").resize((320, 180)), dtype=np.float32)
lap = (-4 * g[1:-1, 1:-1]
+ g[:-2, 1:-1] + g[2:, 1:-1] + g[1:-1, :-2] + g[1:-1, 2:])
return float(lap.var())
def _select_keyframes(candidates: list[Path], n: int, dup_threshold: int = 8) -> list[Path]:
"""
candidates: 按时间排序的候选帧路径
n: 目标帧数
dup_threshold: pHash 汉明距离 < 此值视为相似(默认 864bit hash 大致 ~12.5% 像素差)
"""
if len(candidates) <= n:
return candidates
# 算 pHash + sharpness
items = []
for i, p in enumerate(candidates):
try:
img = Image.open(p)
h = imagehash.phash(img)
s = _sharpness(p)
items.append({"path": p, "idx": i, "hash": h, "sharp": s})
except Exception:
continue
# 去重:相似帧保留 sharpness 高的
deduped: list[dict] = []
for it in items:
dup = None
for kept in deduped:
if (it["hash"] - kept["hash"]) < dup_threshold:
dup = kept
break
if dup is None:
deduped.append(it)
elif it["sharp"] > dup["sharp"]:
deduped[deduped.index(dup)] = it
# 时序分桶:把候选时间轴等分 n 段,每段取去重后 sharpness 最高的
total = len(candidates)
buckets: list[list[dict]] = [[] for _ in range(n)]
for it in deduped:
b = min(int(it["idx"] * n / total), n - 1)
buckets[b].append(it)
selected: list[dict] = []
for b in buckets:
if b:
selected.append(max(b, key=lambda x: x["sharp"]))
# 空桶补足:从未选的 deduped 里按 sharpness 排序补
chosen_paths = {it["path"] for it in selected}
remaining = sorted([it for it in deduped if it["path"] not in chosen_paths],
key=lambda x: -x["sharp"])
while len(selected) < n and remaining:
selected.append(remaining.pop(0))
# 按时间排序输出
selected.sort(key=lambda x: x["idx"])
return [it["path"] for it in selected]
def ffprobe_meta(mp4: Path) -> dict:
out = run([
"ffprobe", "-v", "error", "-print_format", "json", "-show_streams", "-show_format", str(mp4),
@@ -194,37 +265,55 @@ async def pipeline_analyze(job_id: str, frame_count: int = KEYFRAME_COUNT) -> No
])
n = max(1, min(int(frame_count), 20))
update(job, message=f"抽取 {n} 张关键帧(均匀采样)…", progress=50)
# 候选数n 的 6 倍或至少 24封顶 60
candidate_count = max(24, min(60, n * 6))
update(job, message=f"抽取候选 {candidate_count} 张…", progress=45)
frames_dir = d / "frames"
if frames_dir.exists():
shutil.rmtree(frames_dir)
frames_dir.mkdir(parents=True)
cand_dir = d / "candidates"
if cand_dir.exists():
shutil.rmtree(cand_dir)
cand_dir.mkdir(parents=True)
# 均匀采样:在 duration / (n+1) 的等距时间点各抽 1 帧
# 用 -ss 在 -i 前 = fast seek每张 < 0.5s
# 1) 均匀采样大批候选fast seek每张 < 0.5s
duration = max(float(job.duration or 1.0), 0.1)
step = duration / (n + 1)
for i in range(n):
step = duration / (candidate_count + 1)
candidate_meta: list[tuple[Path, float]] = [] # (path, timestamp)
for i in range(candidate_count):
t = step * (i + 1)
out = frames_dir / f"sample_{i:03d}.jpg"
out = cand_dir / f"c_{i:03d}.jpg"
run([
"ffmpeg", "-y",
"-ss", str(t),
"-i", str(mp4),
"ffmpeg", "-y", "-ss", str(t), "-i", str(mp4),
"-frames:v", "1",
"-pix_fmt", "yuvj420p",
"-q:v", "3",
"-pix_fmt", "yuvj420p", "-q:v", "3",
str(out),
])
if out.exists():
candidate_meta.append((out, t))
all_frames = sorted(frames_dir.glob("*.jpg"))[:n]
# 2) D 启发式选 n 张pHash 去重 + Laplacian 清晰度 + 时序分桶
update(job, message=f"启发式筛选 {n} / {len(candidate_meta)} 张…", progress=60)
cand_paths = [m[0] for m in candidate_meta]
ts_by_path = {m[0]: m[1] for m in candidate_meta}
chosen = _select_keyframes(cand_paths, n)
# 3) 落盘到 frames/<idx>.jpg
renamed: list[KeyFrame] = []
for i, src in enumerate(all_frames):
chosen_sorted = sorted(chosen, key=lambda p: ts_by_path[p])
for i, src in enumerate(chosen_sorted):
dst = frames_dir / f"{i:03d}.jpg"
if src != dst:
src.rename(dst)
ts = (job.duration or 0) * (i + 0.5) / max(len(all_frames), 1)
renamed.append(KeyFrame(index=i, timestamp=round(ts, 2), url=f"/jobs/{job_id}/frames/{i}.jpg"))
shutil.copyfile(src, dst)
renamed.append(KeyFrame(
index=i,
timestamp=round(ts_by_path[src], 2),
url=f"/jobs/{job_id}/frames/{i}.jpg",
))
# 4) 清理候选目录
shutil.rmtree(cand_dir, ignore_errors=True)
update(
job,
@@ -420,6 +509,43 @@ async def trigger_analyze(job_id: str, bg: BackgroundTasks, frames: int = KEYFRA
return job
@app.post("/jobs/{job_id}/frames", response_model=Job)
def add_manual_frame(job_id: str, t: float) -> Job:
"""从指定时间戳手动抽 1 帧追加到 job.frames"""
job = JOBS.get(job_id)
if not job:
raise HTTPException(404, "job not found")
if not job.video_url:
raise HTTPException(400, "video not ready")
d = job_dir(job_id)
mp4 = d / "source.mp4"
if not mp4.exists():
raise HTTPException(400, "source.mp4 missing")
frames_dir = d / "frames"
frames_dir.mkdir(parents=True, exist_ok=True)
# 新 indexmax(existing)+1即使列表已按 ts 排序,文件名用 index 保持稳定)
next_idx = max((f.index for f in job.frames), default=-1) + 1
out = frames_dir / f"{next_idx:03d}.jpg"
try:
run([
"ffmpeg", "-y", "-ss", str(t), "-i", str(mp4),
"-frames:v", "1", "-pix_fmt", "yuvj420p", "-q:v", "3",
str(out),
])
except RuntimeError as e:
raise HTTPException(500, f"ffmpeg failed: {e}")
new_frame = KeyFrame(
index=next_idx,
timestamp=round(float(t), 2),
url=f"/jobs/{job_id}/frames/{next_idx}.jpg",
)
merged = sorted(list(job.frames) + [new_frame], key=lambda f: f.timestamp)
update(job, frames=merged, message=f"已手动加帧({t:.1f}s{len(merged)}")
return job
@app.get("/jobs/{job_id}", response_model=Job)
def get_job(job_id: str) -> Job:
job = JOBS.get(job_id)