auto-save 2026-05-14 13:04 (+1, ~3)
This commit is contained in:
136
api/main.py
136
api/main.py
@@ -30,6 +30,10 @@ PRODUCT_LIBRARY_DIR = Path(
|
||||
os.getenv("PRODUCT_LIBRARY_DIR", Path(__file__).resolve().parent / "product_library" / "skg-products")
|
||||
).resolve()
|
||||
PRODUCT_LIBRARY_MANIFEST = PRODUCT_LIBRARY_DIR / "manifest.json"
|
||||
CHARACTER_LIBRARY_DIR = Path(
|
||||
os.getenv("CHARACTER_LIBRARY_DIR", Path(__file__).resolve().parent / "character_library" / "skg-characters")
|
||||
).resolve()
|
||||
CHARACTER_LIBRARY_MANIFEST = CHARACTER_LIBRARY_DIR / "manifest.json"
|
||||
|
||||
LLM_BASE_URL = os.getenv("LLM_BASE_URL", "").strip()
|
||||
LLM_API_KEY = os.getenv("LLM_API_KEY", "").strip()
|
||||
@@ -306,6 +310,26 @@ class ProductLibraryItem(BaseModel):
|
||||
tags: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class CharacterLibraryImage(BaseModel):
|
||||
id: str
|
||||
view: str
|
||||
label: str
|
||||
filename: str
|
||||
width: int = 0
|
||||
height: int = 0
|
||||
source_path: str = ""
|
||||
url: str = ""
|
||||
|
||||
|
||||
class CharacterLibraryItem(BaseModel):
|
||||
id: str
|
||||
name: str
|
||||
folder: str = ""
|
||||
description: str = ""
|
||||
primary_image: str = ""
|
||||
images: list[CharacterLibraryImage] = Field(default_factory=list)
|
||||
|
||||
|
||||
class ProductFusionRegion(BaseModel):
|
||||
x: float = 0
|
||||
y: float = 0
|
||||
@@ -319,6 +343,10 @@ class ProductFusionShot(BaseModel):
|
||||
last_image: dict | None = None
|
||||
product_images: list[dict] = Field(default_factory=list)
|
||||
product_image: dict | None = None
|
||||
character_id: str = ""
|
||||
character_name: str = ""
|
||||
subject_image: dict | None = None
|
||||
subject_images: list[dict] = Field(default_factory=list)
|
||||
person_image: dict | None = None
|
||||
product_region: ProductFusionRegion | None = None
|
||||
scene_image: dict | None = None
|
||||
@@ -541,6 +569,41 @@ def product_library_file(item: ProductLibraryItem) -> Path:
|
||||
return p
|
||||
|
||||
|
||||
def load_character_library_items() -> list[CharacterLibraryItem]:
|
||||
if not CHARACTER_LIBRARY_MANIFEST.exists():
|
||||
return []
|
||||
try:
|
||||
data = json.loads(CHARACTER_LIBRARY_MANIFEST.read_text(encoding="utf-8"))
|
||||
items: list[CharacterLibraryItem] = []
|
||||
for raw in data.get("characters", []):
|
||||
item = CharacterLibraryItem(**raw)
|
||||
for image in item.images:
|
||||
image.url = f"/character-library/skg/images/{image.filename}"
|
||||
items.append(item)
|
||||
return items
|
||||
except Exception as e:
|
||||
raise HTTPException(500, f"character library manifest invalid: {e}")
|
||||
|
||||
|
||||
def find_character_library_item(character_id: str) -> CharacterLibraryItem:
|
||||
character_id = character_id.strip()
|
||||
for item in load_character_library_items():
|
||||
if item.id == character_id:
|
||||
return item
|
||||
raise HTTPException(404, "character library item not found")
|
||||
|
||||
|
||||
def character_library_file(filename: str) -> Path:
|
||||
p = (CHARACTER_LIBRARY_DIR / filename).resolve()
|
||||
try:
|
||||
p.relative_to(CHARACTER_LIBRARY_DIR)
|
||||
except ValueError:
|
||||
raise HTTPException(400, "invalid character library path")
|
||||
if not p.exists():
|
||||
raise HTTPException(404, "character library image missing")
|
||||
return p
|
||||
|
||||
|
||||
def storyboard_ref_url(job_id: str, ref: dict | None) -> str:
|
||||
if not ref:
|
||||
return ""
|
||||
@@ -3310,6 +3373,7 @@ class GenerateStoryboardVideoReq(BaseModel):
|
||||
last_image: dict | None = None
|
||||
product_images: list[dict] = Field(default_factory=list)
|
||||
subject_image: dict | None = None
|
||||
subject_images: list[dict] = Field(default_factory=list)
|
||||
scene_image: dict | None = None
|
||||
product_image: dict | None = None
|
||||
action_image: dict | None = None
|
||||
@@ -3433,6 +3497,7 @@ def submit_video_create(
|
||||
source_ref: VideoSourceRef | None = None,
|
||||
last_img: Path | None = None,
|
||||
product_imgs: list[Path] | None = None,
|
||||
primary_role: str = "first_frame",
|
||||
):
|
||||
if video_uses_ark():
|
||||
content = [{"type": "text", "text": payload["prompt"]}]
|
||||
@@ -3448,7 +3513,7 @@ def submit_video_create(
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": ark_reference_data_url(ref_img)},
|
||||
"role": "first_frame",
|
||||
"role": primary_role,
|
||||
}
|
||||
)
|
||||
if last_img and last_img.exists():
|
||||
@@ -3505,6 +3570,7 @@ def render_storyboard_video(
|
||||
source_ref: VideoSourceRef | None = None,
|
||||
last_ref_path: Path | None = None,
|
||||
product_ref_paths: list[Path] | None = None,
|
||||
primary_role: str = "first_frame",
|
||||
) -> None:
|
||||
import httpx
|
||||
|
||||
@@ -3534,16 +3600,16 @@ def render_storyboard_video(
|
||||
create = None
|
||||
create_errors: list[str] = []
|
||||
for create_path in VIDEO_CREATE_PATHS:
|
||||
resp = submit_video_create(client, f"{base}{video_path(create_path)}", headers, ref_img, payload, source_ref, prepared_last_img, prepared_product_imgs)
|
||||
resp = submit_video_create(client, f"{base}{video_path(create_path)}", headers, ref_img, payload, source_ref, prepared_last_img, prepared_product_imgs, primary_role)
|
||||
if video_uses_ark() and source_ref and resp.status_code in {400, 422}:
|
||||
create_errors.append(f"{video_path(create_path)} + reference_video -> HTTP {resp.status_code}: {resp.text[:160]}")
|
||||
resp = submit_video_create(client, f"{base}{video_path(create_path)}", headers, ref_img, payload, None, prepared_last_img, prepared_product_imgs)
|
||||
resp = submit_video_create(client, f"{base}{video_path(create_path)}", headers, ref_img, payload, None, prepared_last_img, prepared_product_imgs, primary_role)
|
||||
if video_uses_ark() and prepared_last_img and resp.status_code in {400, 422}:
|
||||
create_errors.append(f"{video_path(create_path)} + last_frame -> HTTP {resp.status_code}: {resp.text[:160]}")
|
||||
resp = submit_video_create(client, f"{base}{video_path(create_path)}", headers, ref_img, payload, None, None, prepared_product_imgs)
|
||||
resp = submit_video_create(client, f"{base}{video_path(create_path)}", headers, ref_img, payload, None, None, prepared_product_imgs, primary_role)
|
||||
if video_uses_ark() and prepared_product_imgs and resp.status_code in {400, 422}:
|
||||
create_errors.append(f"{video_path(create_path)} + product_reference -> HTTP {resp.status_code}: {resp.text[:160]}")
|
||||
resp = submit_video_create(client, f"{base}{video_path(create_path)}", headers, ref_img, payload, None, prepared_last_img, None)
|
||||
resp = submit_video_create(client, f"{base}{video_path(create_path)}", headers, ref_img, payload, None, prepared_last_img, None, primary_role)
|
||||
if resp.status_code < 400:
|
||||
create = resp
|
||||
break
|
||||
@@ -3601,6 +3667,7 @@ def generate_storyboard_video(job_id: str, idx: int, req: GenerateStoryboardVide
|
||||
raise HTTPException(400, "prompt required")
|
||||
|
||||
ref = req.first_image or req.subject_image or req.product_image or req.scene_image or req.action_image
|
||||
primary_role = "first_frame" if req.first_image else "reference_image"
|
||||
ref_path = storyboard_ref_path(job_id, ref) or (job_dir(job_id) / "frames" / f"{idx:03d}.jpg")
|
||||
if not ref_path.exists():
|
||||
raise HTTPException(404, "reference image missing")
|
||||
@@ -3608,6 +3675,14 @@ def generate_storyboard_video(job_id: str, idx: int, req: GenerateStoryboardVide
|
||||
last_ref_path = storyboard_ref_path(job_id, req.last_image)
|
||||
raw_product_refs = req.product_images[:6] if req.product_images else ([req.product_image] if req.product_image else [])
|
||||
product_ref_paths = [p for p in (storyboard_ref_path(job_id, r) for r in raw_product_refs) if p]
|
||||
subject_ref_paths = [p for p in (storyboard_ref_path(job_id, r) for r in req.subject_images[:8]) if p]
|
||||
reference_ref_paths = []
|
||||
seen_ref_paths: set[str] = set()
|
||||
for p in [*subject_ref_paths, *product_ref_paths]:
|
||||
key = str(p)
|
||||
if key not in seen_ref_paths:
|
||||
reference_ref_paths.append(p)
|
||||
seen_ref_paths.add(key)
|
||||
|
||||
local_id = uuid.uuid4().hex[:12]
|
||||
model = resolve_video_model(req.model)
|
||||
@@ -3629,7 +3704,7 @@ def generate_storyboard_video(job_id: str, idx: int, req: GenerateStoryboardVide
|
||||
source_ref = req.source_ref
|
||||
if source_ref and source_ref.kind == "source_video" and not source_ref.url:
|
||||
source_ref = None
|
||||
bg.add_task(render_storyboard_video, job_id, local_id, "", ref_path, prompt, model, seconds, req.size, source_ref, last_ref_path, product_ref_paths)
|
||||
bg.add_task(render_storyboard_video, job_id, local_id, "", ref_path, prompt, model, seconds, req.size, source_ref, last_ref_path, reference_ref_paths, primary_role)
|
||||
return job
|
||||
|
||||
|
||||
@@ -3645,6 +3720,10 @@ class CopyProductLibraryAssetReq(BaseModel):
|
||||
product_id: str
|
||||
|
||||
|
||||
class CopyCharacterLibraryAssetReq(BaseModel):
|
||||
character_id: str
|
||||
|
||||
|
||||
@app.get("/product-library/skg", response_model=list[ProductLibraryItem])
|
||||
def list_skg_product_library() -> list[ProductLibraryItem]:
|
||||
"""内置 SKG 白底产品图库。来源是本地筛选后的产品图 manifest。"""
|
||||
@@ -3660,6 +3739,19 @@ def get_skg_product_library_image(filename: str):
|
||||
return FileResponse(product_library_file(item), media_type="image/jpeg")
|
||||
|
||||
|
||||
@app.get("/character-library/skg", response_model=list[CharacterLibraryItem])
|
||||
def list_skg_character_library() -> list[CharacterLibraryItem]:
|
||||
"""内置透明骨架人角色库。来源是桌面生成的 5 个角色参考组。"""
|
||||
return load_character_library_items()
|
||||
|
||||
|
||||
@app.get("/character-library/skg/images/{filename:path}")
|
||||
def get_skg_character_library_image(filename: str):
|
||||
p = character_library_file(filename)
|
||||
media_type = "image/png" if p.suffix.lower() == ".png" else "image/jpeg"
|
||||
return FileResponse(p, media_type=media_type)
|
||||
|
||||
|
||||
@app.post("/jobs/{job_id}/assets")
|
||||
async def upload_storyboard_asset(job_id: str, file: UploadFile = File(...)) -> dict:
|
||||
if job_id not in JOBS:
|
||||
@@ -3716,6 +3808,38 @@ def copy_product_library_asset(job_id: str, req: CopyProductLibraryAssetReq) ->
|
||||
}
|
||||
|
||||
|
||||
@app.post("/jobs/{job_id}/assets/character-library")
|
||||
def copy_character_library_assets(job_id: str, req: CopyCharacterLibraryAssetReq) -> dict:
|
||||
if job_id not in JOBS:
|
||||
raise HTTPException(404, "job not found")
|
||||
character = find_character_library_item(req.character_id)
|
||||
out_dir = job_dir(job_id) / "assets"
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
refs = []
|
||||
for image in character.images:
|
||||
src = character_library_file(image.filename)
|
||||
asset_id = uuid.uuid4().hex[:12]
|
||||
out = out_dir / f"{asset_id}.jpg"
|
||||
try:
|
||||
img = Image.open(src).convert("RGB")
|
||||
img.thumbnail((1600, 1600), Image.Resampling.LANCZOS)
|
||||
img.save(out, "JPEG", quality=94)
|
||||
except Exception as e:
|
||||
raise HTTPException(400, f"character library copy failed: {e}")
|
||||
refs.append({
|
||||
"kind": "asset",
|
||||
"frame_idx": -1,
|
||||
"element_id": asset_id,
|
||||
"cutout_id": asset_id,
|
||||
"label": f"角色 · {character.name} · {image.label}",
|
||||
})
|
||||
return {
|
||||
"character_id": character.id,
|
||||
"character_name": character.name,
|
||||
"images": refs,
|
||||
}
|
||||
|
||||
|
||||
def product_image_alpha(img: Image.Image) -> Image.Image:
|
||||
rgba = img.convert("RGBA")
|
||||
rgb = rgba.convert("RGB")
|
||||
|
||||
Reference in New Issue
Block a user