auto-save 2026-05-13 10:33 (~2)

This commit is contained in:
2026-05-13 10:33:17 +08:00
parent e154f8b1d8
commit 3fee4a4b7f
2 changed files with 295 additions and 0 deletions

View File

@@ -1230,6 +1230,13 @@
"type": "session-heartbeat", "type": "session-heartbeat",
"message": "Claude 会话活跃 · 最近命令claude · 1 项未提交变更 · 最近提交auto-save 2026-05-13 10:21 (~1)", "message": "Claude 会话活跃 · 最近命令claude · 1 项未提交变更 · 最近提交auto-save 2026-05-13 10:21 (~1)",
"files_changed": 1 "files_changed": 1
},
{
"ts": "2026-05-13T10:27:44+08:00",
"type": "commit",
"message": "auto-save 2026-05-13 10:27 (~1)",
"hash": "e154f8b",
"files_changed": 1
} }
] ]
} }

View File

@@ -63,11 +63,24 @@ class GeneratedImage(BaseModel):
created_at: float = 0.0 created_at: float = 0.0
class KeyElement(BaseModel):
    """An element recognized in, or manually extracted from, a key frame.

    Each element can be cut out individually so downstream steps can reuse it
    as a "derivative-work asset layer".
    """

    # Short identifier: the first 8 hex characters of a uuid.
    id: str
    # Chinese name of the element (required).
    name_zh: str
    # English name; defaults to an empty string.
    name_en: str = ""
    # Description of where the element sits in the picture (as given by the vision model).
    position: str = ""
    # "auto" = recognized by the vision model, "manual" = added by the user.
    source: Literal["auto", "manual"] = "manual"
    # Set once a cutout exists; the image is then served at
    # /jobs/{id}/frames/{idx}/elements/{element_id}/cutout.png
    cutout_id: str | None = None
    # Creation time as a float timestamp; 0.0 when not provided.
    created_at: float = 0.0
class KeyFrame(BaseModel): class KeyFrame(BaseModel):
index: int index: int
timestamp: float timestamp: float
url: str url: str
description: dict | None = None # vision 模型识别结果 {scene, objects, style, suggested_prompt} description: dict | None = None # vision 模型识别结果 {scene, objects, style, suggested_prompt}
cleaned_url: str | None = None # 清洗后干净版 → /jobs/{id}/frames/{idx}/cleaned.jpg
elements: list[KeyElement] = [] # 提取的元素清单(持久化)
generated_images: list[GeneratedImage] = [] generated_images: list[GeneratedImage] = []
@@ -456,6 +469,83 @@ async def pipeline_transcribe(job_id: str) -> None:
update(job, status="failed", error=str(e), message="转录失败") update(job, status="failed", error=str(e), message="转录失败")
def _image_edit_call(
    image_path: Path,
    prompt: str,
    model: str | None = None,
    fallback_text: bool = False,
    max_attempts: int = 3,
) -> tuple[bytes, str]:
    """Run an image-edit request with retries and an optional text-only fallback.

    The reference image is base64-encoded into a data URI and POSTed to
    ``{LLM_BASE_URL}/images/generations``. Up to ``max_attempts`` such "edit"
    calls are made; when ``fallback_text`` is true, one extra prompt-only call
    (no reference image) is made afterwards through ``llm().images.generate``.

    Args:
        image_path: Reference image on disk. The data-URI MIME type is derived
            from the suffix (png / webp / gif); anything else is sent as JPEG.
        prompt: Instruction text sent to the model.
        model: Model name; falls back to ``IMAGE_MODEL`` when empty.
        fallback_text: Append one final prompt-only attempt after the edit attempts.
        max_attempts: Number of edit attempts before giving up / falling back.

    Returns:
        ``(image_bytes, effective_mode)`` where ``effective_mode`` is ``"edit"``
        or ``"text"``, i.e. the mode of the attempt that produced the image.

    Raises:
        RuntimeError: ``LLM_API_KEY`` is not configured, the reference image
            cannot be read, the server answered with a non-transient HTTP
            error, every attempt failed, or the response has no ``b64_json``.
    """
    import base64 as b64lib
    import time as _time

    import httpx

    if not LLM_API_KEY:
        raise RuntimeError("LLM_API_KEY 未配置")
    model = model or IMAGE_MODEL

    # Surface unreadable input as RuntimeError, the only failure type this
    # helper documents and its callers handle.
    try:
        raw_image = image_path.read_bytes()
    except OSError as e:
        raise RuntimeError(f"cannot read reference image {image_path}: {e}") from e

    # Label the payload with the real image type instead of always claiming JPEG.
    suffix_to_mime = {".png": "image/png", ".webp": "image/webp", ".gif": "image/gif"}
    mime = suffix_to_mime.get(image_path.suffix.lower(), "image/jpeg")
    img_b64 = b64lib.b64encode(raw_image).decode("ascii")
    data_uri = f"data:{mime};base64,{img_b64}"

    # Ordered list of modes to try: N edit attempts, then optionally one text attempt.
    plan: list[str] = ["edit"] * max_attempts
    if fallback_text:
        plan.append("text")

    last_err = ""
    data_arr: list = []
    effective_mode = "edit"
    for attempt, current_mode in enumerate(plan):
        try:
            if current_mode == "edit":
                with httpx.Client(timeout=120) as client:
                    r = client.post(
                        f"{LLM_BASE_URL}/images/generations",
                        headers={
                            "Authorization": f"Bearer {LLM_API_KEY}",
                            "Content-Type": "application/json",
                        },
                        json={"model": model, "prompt": prompt, "image": data_uri, "n": 1},
                    )
                    r.raise_for_status()
                    payload = r.json()
            else:
                resp = llm().images.generate(model=model, prompt=prompt, n=1)
                if hasattr(resp, "model_dump"):
                    payload = resp.model_dump()
                else:
                    payload = {"data": [{"b64_json": resp.data[0].b64_json}]}

            # Only a non-empty "data" list counts as success; keeping it in its
            # own variable prevents a stale or malformed payload from leaking
            # past the loop.
            items = payload.get("data") if isinstance(payload, dict) else None
            if items:
                data_arr = items
                effective_mode = current_mode
                break

            err_obj = payload.get("error") if isinstance(payload, dict) else None
            if isinstance(err_obj, dict):
                last_err = f"empty data · {err_obj.get('code', '')} · {str(err_obj.get('message', ''))[:200]}"
            else:
                # "error" may be absent or a plain string; report it verbatim.
                last_err = f"empty data ·  · {str(err_obj or '')[:200]}"
        except httpx.HTTPStatusError as e:
            body = e.response.text
            transient = (
                e.response.status_code >= 500
                or "incomplete_generation" in body
                or "rate_limit" in body
                or "timeout" in body.lower()
            )
            last_err = f"HTTP {e.response.status_code}: {body[:200]}"
            if not transient:
                # Retrying cannot help here; abort immediately.
                raise RuntimeError(f"image edit HTTP {e.response.status_code}: {body[:300]}") from e
        except Exception as e:
            # Any other failure (network, JSON decoding, SDK error) is retried.
            last_err = f"{type(e).__name__}: {e}"

        if attempt < len(plan) - 1:
            next_mode = plan[attempt + 1]
            tag = f"fallback → {next_mode}" if next_mode != current_mode else f"retry {attempt + 1}/{len(plan)}"
            print(f"[image edit {tag}] {last_err}", flush=True)
            # Linear backoff: 1.5s, 3s, 4.5s, ...
            _time.sleep(1.5 * (attempt + 1))

    if not data_arr:
        raise RuntimeError(f"image edit failed after {len(plan)} attempts: {last_err}")
    b64 = data_arr[0].get("b64_json")
    if not b64:
        raise RuntimeError("image edit returned no b64_json")
    return b64lib.b64decode(b64), effective_mode
# ---------- API 路由 ---------- # ---------- API 路由 ----------
class CreateJobReq(BaseModel): class CreateJobReq(BaseModel):
@@ -884,3 +974,201 @@ def describe_frame(job_id: str, idx: int) -> Job:
new_frames.append(f) new_frames.append(f)
update(job, frames=new_frames, message=f"识别完成 · 分镜 {idx + 1}") update(job, frames=new_frames, message=f"识别完成 · 分镜 {idx + 1}")
return job return job
# ---------- 清洗水印 / 元素提取(关键帧二阶段加工) ----------
@app.post("/jobs/{job_id}/frames/{idx}/cleanup", response_model=Job)
def cleanup_frame(job_id: str, idx: int) -> Job:
    """Clean a key frame with the image-edit model.

    Asks the model (nano-banana, per the original note) to remove watermarks,
    @usernames, subtitles and platform logos from the frame. The result is
    written to ``<job_dir>/cleaned/<idx>.jpg`` and ``frame.cleaned_url`` is
    pointed at it.

    Args:
        job_id: Key into ``JOBS``.
        idx: ``index`` of the frame to clean.

    Returns:
        The updated job.

    Raises:
        HTTPException: 404 when the job, the frame or the frame file is
            missing; 500 when the image-edit call fails.
    """
    import time as _time

    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(404, "job not found")
    frame = next((f for f in job.frames if f.index == idx), None)
    if not frame:
        raise HTTPException(404, "frame not found")
    frame_path = job_dir(job_id) / "frames" / f"{idx:03d}.jpg"
    if not frame_path.exists():
        raise HTTPException(404, "frame file missing")

    prompt = (
        "Clean this image by removing all overlay graphics that obstruct the main content: "
        "watermarks, social media usernames or @handles, platform logos (TikTok, Instagram, etc.), "
        "subtitles, captions, overlay text, sticker text, hashtags. "
        "Keep all original scene elements (characters, props, background, lighting) intact. "
        "The result should look like the same photograph with overlay UI removed — "
        "natural, seamless, no visible patches or artifacts."
    )
    try:
        img_bytes, _mode = _image_edit_call(frame_path, prompt, fallback_text=False, max_attempts=3)
    except RuntimeError as e:
        # Chain the cause so the upstream failure stays visible in tracebacks.
        raise HTTPException(500, f"cleanup failed: {e}") from e

    out_dir = job_dir(job_id) / "cleaned"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"{idx:03d}.jpg"
    out_path.write_bytes(img_bytes)

    # The ?t= query changes on every run so the URL differs after each cleanup.
    cleaned_url = f"/jobs/{job_id}/frames/{idx}/cleaned.jpg?t={int(_time.time())}"
    for f in job.frames:
        if f.index == idx:
            f.cleaned_url = cleaned_url
    update(job, frames=list(job.frames), message=f"清洗完成 · 分镜 {idx + 1}")
    return job
@app.get("/jobs/{job_id}/frames/{idx}/cleaned.jpg")
def get_cleaned_frame(job_id: str, idx: int):
    """Serve the cleaned version of a key frame as JPEG; 404 if it does not exist."""
    cleaned_file = job_dir(job_id) / "cleaned" / f"{idx:03d}.jpg"
    if cleaned_file.exists():
        return FileResponse(cleaned_file, media_type="image/jpeg")
    raise HTTPException(404, "cleaned frame not found")
class AddElementReq(BaseModel):
    """Request body for adding an element to a key frame."""

    # Chinese name of the element (required by the model).
    name_zh: str
    # English name; defaults to an empty string.
    name_en: str = ""
    # Free-text description of where the element sits in the frame.
    position: str = ""
    # Origin of the element: "auto" or "manual" (the default).
    source: Literal["auto", "manual"] = "manual"
@app.post("/jobs/{job_id}/frames/{idx}/elements", response_model=Job)
def add_element(job_id: str, idx: int, req: AddElementReq) -> Job:
    """Append one element to a key frame, translating its name when needed.

    When ``req.name_en`` is blank and ``LLM_API_KEY`` is set, the Chinese name
    is translated to English with ``TRANSLATE_MODEL``. Translation is
    best-effort: on any failure the element is still added with an empty
    ``name_en``.

    Args:
        job_id: Key into ``JOBS``.
        idx: ``index`` of the frame that receives the element.
        req: Element fields supplied by the client.

    Returns:
        The updated job.

    Raises:
        HTTPException: 404 when the job or frame is missing; 400 when
            ``name_zh`` is blank after stripping.
    """
    import re as _re
    import time as _time

    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(404, "job not found")
    frame = next((f for f in job.frames if f.index == idx), None)
    if not frame:
        raise HTTPException(404, "frame not found")

    name_zh = req.name_zh.strip()
    if not name_zh:
        raise HTTPException(400, "name_zh required")

    name_en = req.name_en.strip()
    if not name_en and LLM_API_KEY:
        try:
            prompt = (
                "Translate the following text into concise English, suitable as an element label "
                "in an image-generation prompt. Output only the translation — no quotes, no punctuation, "
                f"no explanation.\n\nInput: {name_zh}"
            )
            resp = llm().chat.completions.create(
                model=TRANSLATE_MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
                max_tokens=200,
            )
            message = resp.choices[0].message
            out = (message.content or "").strip()
            if not out:
                # Fall back to the last non-blank line of `reasoning_content`
                # when `content` is empty. Selecting it explicitly avoids an
                # IndexError (and a misleading failure log) when the field
                # holds only whitespace.
                rc = getattr(message, "reasoning_content", "") or ""
                non_blank = [line.strip() for line in rc.splitlines() if line.strip()]
                out = non_blank[-1] if non_blank else ""
            # Strip leading / trailing quote characters the model may have added.
            name_en = _re.sub(r'^[\'"「『]+|[\'"」』]+$', "", out).strip()
        except Exception as e:
            # Deliberately broad: a translation failure must not block adding the element.
            print(f"[add_element translate failed] {e}", flush=True)
            name_en = ""

    el = KeyElement(
        id=uuid.uuid4().hex[:8],
        name_zh=name_zh,
        name_en=name_en,
        position=req.position.strip(),
        source=req.source,
        created_at=_time.time(),
    )
    for f in job.frames:
        if f.index == idx:
            # Rebind to a new list instead of appending in place.
            f.elements = f.elements + [el]
    update(job, frames=list(job.frames), message=f"加入元素 · 分镜 {idx + 1} · {name_zh}")
    return job
@app.delete("/jobs/{job_id}/frames/{idx}/elements/{element_id}", response_model=Job)
def delete_element(job_id: str, idx: int, element_id: str) -> Job:
    """Remove an element from a key frame, together with its cutout file if any.

    Args:
        job_id: Key into ``JOBS``.
        idx: ``index`` of the frame that owns the element.
        element_id: ``id`` of the element to remove.

    Returns:
        The updated job.

    Raises:
        HTTPException: 404 when the job is missing, or when no frame with
            ``idx`` contains an element with ``element_id``.
    """
    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(404, "job not found")

    removed = False
    for f in job.frames:
        if f.index != idx:
            continue
        remaining = [e for e in f.elements if e.id != element_id]
        if len(remaining) < len(f.elements):
            f.elements = remaining
            # Sticky flag: a later frame with the same index must not reset it.
            removed = True
    if not removed:
        raise HTTPException(404, "element not found")

    # Also delete the cutout image, if one was generated. This is best-effort:
    # a failure is logged but does not fail the request.
    cutout = job_dir(job_id) / "elements" / f"{idx:03d}_{element_id}.png"
    try:
        cutout.unlink(missing_ok=True)
    except OSError as e:
        print(f"[delete_element cutout cleanup failed] {e}", flush=True)

    update(job, frames=list(job.frames), message=f"删除元素 · 分镜 {idx + 1}")
    return job
@app.post("/jobs/{job_id}/frames/{idx}/elements/{element_id}/cutout", response_model=Job)
def cutout_element(job_id: str, idx: int, element_id: str) -> Job:
    """Cut a single element out of a key frame via the image-edit model.

    The model (nano-banana, per the original note) is asked for the element on
    a transparent background. The result is written to
    ``<job_dir>/elements/<idx>_<element_id>.png`` and the element's
    ``cutout_id`` is set.

    Args:
        job_id: Key into ``JOBS``.
        idx: ``index`` of the frame that owns the element.
        element_id: ``id`` of the element to cut out.

    Returns:
        The updated job.

    Raises:
        HTTPException: 404 when the job, frame, element or source image is
            missing; 500 when the image-edit call fails.
    """
    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(404, "job not found")
    frame = next((f for f in job.frames if f.index == idx), None)
    if not frame:
        raise HTTPException(404, "frame not found")
    el = next((e for e in frame.elements if e.id == element_id), None)
    if not el:
        raise HTTPException(404, "element not found")

    # Prefer the cleaned frame as reference (logo / watermark noise already
    # removed); fall back to the original frame.
    cleaned_path = job_dir(job_id) / "cleaned" / f"{idx:03d}.jpg"
    src = cleaned_path if cleaned_path.exists() else job_dir(job_id) / "frames" / f"{idx:03d}.jpg"
    if not src.exists():
        raise HTTPException(404, "source frame file missing")

    target = (el.name_en or el.name_zh).strip()
    position_hint = f" Located {el.position}." if el.position else ""
    prompt = (
        f"Extract the element '{target}' from this image as a standalone asset.{position_hint} "
        "Output: the element on a fully transparent background (alpha channel), "
        "isolated cleanly with no surrounding scene, no other objects, no shadows from the original scene. "
        "Preserve the element's original colors, lighting, shape and proportions."
    )
    try:
        img_bytes, _mode = _image_edit_call(src, prompt, fallback_text=False, max_attempts=3)
    except RuntimeError as e:
        # Chain the cause so the upstream failure stays visible in tracebacks.
        raise HTTPException(500, f"cutout failed: {e}") from e

    out_dir = job_dir(job_id) / "elements"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"{idx:03d}_{element_id}.png"
    out_path.write_bytes(img_bytes)

    for f in job.frames:
        if f.index != idx:
            continue
        for e in f.elements:
            if e.id == element_id:
                # Marker that a cutout exists; the URL is derived from the id.
                e.cutout_id = element_id
    update(job, frames=list(job.frames), message=f"抠图完成 · {el.name_zh}")
    return job
@app.get("/jobs/{job_id}/frames/{idx}/elements/{element_id}/cutout.png")
def get_cutout(job_id: str, idx: int, element_id: str):
    """Serve an element's cutout image as PNG; 404 if it has not been generated."""
    cutout_file = job_dir(job_id) / "elements" / f"{idx:03d}_{element_id}.png"
    if cutout_file.exists():
        return FileResponse(cutout_file, media_type="image/png")
    raise HTTPException(404, "cutout not found")