From 089a30d97003d7984e4937f9a0ed70f24b5674a6 Mon Sep 17 00:00:00 2001 From: kang Date: Tue, 26 May 2026 00:07:48 +0800 Subject: [PATCH] auto-save 2026-05-26 00:07 (+1, ~3) --- .memory/worklog.json | 50 +-- api/db.py | 763 +++++++++++++++++++++++++++++++++++++++++++ api/main.py | 6 +- api/requirements.txt | 1 + 4 files changed, 795 insertions(+), 25 deletions(-) create mode 100644 api/db.py diff --git a/.memory/worklog.json b/.memory/worklog.json index ffe221f..19802e3 100644 --- a/.memory/worklog.json +++ b/.memory/worklog.json @@ -1,29 +1,5 @@ { "entries": [ - { - "files_changed": 1, - "message": "Codex 会话活跃 · 最近命令:codex · 分支 main · 1 项未提交变更 · 最近提交:docs: record subject pack deployment", - "ts": "2026-05-20T02:13:55Z", - "type": "session-heartbeat" - }, - { - "files_changed": 1, - "message": "Codex 会话活跃 · 最近命令:codex · 分支 main · 1 项未提交变更 · 最近提交:docs: record subject pack deployment", - "ts": "2026-05-20T02:23:55Z", - "type": "session-heartbeat" - }, - { - "files_changed": 1, - "message": "Codex 会话活跃 · 最近命令:codex · 分支 main · 1 项未提交变更 · 最近提交:docs: record subject pack deployment", - "ts": "2026-05-20T02:33:56Z", - "type": "session-heartbeat" - }, - { - "files_changed": 1, - "message": "Codex 会话活跃 · 最近命令:codex · 分支 main · 1 项未提交变更 · 最近提交:docs: record subject pack deployment", - "ts": "2026-05-20T02:43:56Z", - "type": "session-heartbeat" - }, { "files_changed": 1, "message": "Codex 会话活跃 · 最近命令:codex · 分支 main · 1 项未提交变更 · 最近提交:docs: record subject pack deployment", @@ -3223,6 +3199,32 @@ "type": "session-heartbeat", "message": "Codex 会话活跃 · 最近命令:codex · 分支 main · 1 项未提交变更 · 最近提交:auto-save 2026-05-25 23:18 (~2)", "files_changed": 1 + }, + { + "ts": "2026-05-25T23:51:29+08:00", + "type": "commit", + "message": "auto-save 2026-05-25 23:51 (~3)", + "hash": "327cd2b", + "files_changed": 3 + }, + { + "ts": "2026-05-25T23:51:50+08:00", + "type": "commit", + "message": "docs: record Feishu OAuth enablement", + "hash": "830afac", + "files_changed": 0 + }, + { + "ts": "2026-05-25T15:54:42Z", + "type": "session-heartbeat", + "message": "Codex 会话活跃 · 最近命令:codex · 分支 main · 1 项未提交变更 · 最近提交:docs: record Feishu OAuth enablement", + "files_changed": 1 + }, + { + "ts": "2026-05-25T16:04:43Z", + "type": "session-heartbeat", + "message": "Codex 会话活跃 · 最近命令:codex · 分支 main · 1 项未提交变更 · 最近提交:docs: record Feishu OAuth enablement", + "files_changed": 1 } ] } diff --git a/api/db.py b/api/db.py new file mode 100644 index 0000000..8233580 --- /dev/null +++ b/api/db.py @@ -0,0 +1,763 @@ +from __future__ import annotations + +import os +import time +import uuid +from datetime import datetime, timezone +from typing import Any + +try: + import psycopg + from psycopg.rows import dict_row + from psycopg.types.json import Jsonb +except ModuleNotFoundError: # Local dev can still run without Postgres deps installed. + psycopg = None + dict_row = None + Jsonb = None + + +DATABASE_URL = os.getenv("DATABASE_URL", "").strip() +DB_ENABLED = bool(DATABASE_URL and psycopg is not None) + + +def enabled() -> bool: + return DB_ENABLED + + +def _connect(): + if not DB_ENABLED: + raise RuntimeError("database disabled") + return psycopg.connect(DATABASE_URL, row_factory=dict_row, connect_timeout=5) + + +def _dt(ts: float | int | None = None) -> datetime: + try: + value = float(ts or 0) + except (TypeError, ValueError): + value = 0 + if value <= 0: + value = time.time() + return datetime.fromtimestamp(value, tz=timezone.utc) + + +def _json(value: Any): + return Jsonb(value if value is not None else {}) + + +def _execute_safely(label: str, fn): + if not DB_ENABLED: + return None + try: + return fn() + except Exception as exc: + print(f"[db] {label} failed: {exc}", flush=True) + return None + + +def init_schema() -> bool: + if not DB_ENABLED: + print("[db] disabled: DATABASE_URL is empty or psycopg is missing", flush=True) + return False + + ddl = [ + """ + CREATE TABLE IF NOT EXISTS app_users ( + uid TEXT PRIMARY KEY, + provider TEXT NOT NULL DEFAULT '', + username TEXT NOT NULL DEFAULT '', + name TEXT NOT NULL DEFAULT '', + email TEXT NOT NULL DEFAULT '', + open_id TEXT NOT NULL DEFAULT '', + union_id TEXT NOT NULL DEFAULT '', + tenant_key TEXT NOT NULL DEFAULT '', + avatar_url TEXT NOT NULL DEFAULT '', + first_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(), + last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(), + last_ip TEXT NOT NULL DEFAULT '', + last_user_agent TEXT NOT NULL DEFAULT '', + metadata JSONB NOT NULL DEFAULT '{}'::jsonb + ) + """, + """ + CREATE TABLE IF NOT EXISTS canvas_projects ( + id TEXT PRIMARY KEY, + owner_id TEXT NOT NULL REFERENCES app_users(uid) ON DELETE CASCADE, + name TEXT NOT NULL DEFAULT '', + thumbnail TEXT NOT NULL DEFAULT '', + visibility TEXT NOT NULL DEFAULT 'private', + canvas_data JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + deleted_at TIMESTAMPTZ, + version INTEGER NOT NULL DEFAULT 1, + source TEXT NOT NULL DEFAULT 'canvas', + metadata JSONB NOT NULL DEFAULT '{}'::jsonb + ) + """, + """ + CREATE TABLE IF NOT EXISTS job_index ( + job_id TEXT PRIMARY KEY, + owner_id TEXT NOT NULL DEFAULT '', + owner_name TEXT NOT NULL DEFAULT '', + owner_email TEXT NOT NULL DEFAULT '', + owner_provider TEXT NOT NULL DEFAULT '', + tenant_key TEXT NOT NULL DEFAULT '', + url TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT '', + progress INTEGER NOT NULL DEFAULT 0, + message TEXT NOT NULL DEFAULT '', + job_kind TEXT NOT NULL DEFAULT '', + width INTEGER NOT NULL DEFAULT 0, + height INTEGER NOT NULL DEFAULT 0, + duration DOUBLE PRECISION NOT NULL DEFAULT 0, + frame_count INTEGER NOT NULL DEFAULT 0, + video_count INTEGER NOT NULL DEFAULT 0, + thumbnail TEXT NOT NULL DEFAULT '', + state_path TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + last_synced_at TIMESTAMPTZ NOT NULL DEFAULT now(), + payload JSONB NOT NULL DEFAULT '{}'::jsonb + ) + """, + """ + CREATE TABLE IF NOT EXISTS generated_assets ( + asset_key TEXT PRIMARY KEY, + asset_id TEXT NOT NULL DEFAULT '', + job_id TEXT NOT NULL DEFAULT '', + owner_id TEXT NOT NULL DEFAULT '', + kind TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT '', + url TEXT NOT NULL DEFAULT '', + model TEXT NOT NULL DEFAULT '', + prompt TEXT NOT NULL DEFAULT '', + width INTEGER NOT NULL DEFAULT 0, + height INTEGER NOT NULL DEFAULT 0, + duration DOUBLE PRECISION NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + metadata JSONB NOT NULL DEFAULT '{}'::jsonb + ) + """, + """ + CREATE TABLE IF NOT EXISTS prompt_library_index ( + item_id TEXT PRIMARY KEY, + owner_id TEXT NOT NULL DEFAULT '', + category TEXT NOT NULL DEFAULT '', + name TEXT NOT NULL DEFAULT '', + tags JSONB NOT NULL DEFAULT '[]'::jsonb, + visibility TEXT NOT NULL DEFAULT 'company', + source_job_id TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + payload JSONB NOT NULL DEFAULT '{}'::jsonb + ) + """, + """ + CREATE TABLE IF NOT EXISTS asset_library_index ( + item_key TEXT PRIMARY KEY, + item_id TEXT NOT NULL DEFAULT '', + owner_id TEXT NOT NULL DEFAULT '', + kind TEXT NOT NULL DEFAULT '', + name TEXT NOT NULL DEFAULT '', + tags JSONB NOT NULL DEFAULT '[]'::jsonb, + visibility TEXT NOT NULL DEFAULT 'company', + source_job_id TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + payload JSONB NOT NULL DEFAULT '{}'::jsonb + ) + """, + """ + CREATE TABLE IF NOT EXISTS agent_run_index ( + run_id TEXT PRIMARY KEY, + job_id TEXT NOT NULL DEFAULT '', + owner_id TEXT NOT NULL DEFAULT '', + owner_name TEXT NOT NULL DEFAULT '', + owner_email TEXT NOT NULL DEFAULT '', + owner_provider TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT '', + stage TEXT NOT NULL DEFAULT '', + progress INTEGER NOT NULL DEFAULT 0, + final_video_url TEXT NOT NULL DEFAULT '', + contact_sheet_url TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + payload JSONB NOT NULL DEFAULT '{}'::jsonb + ) + """, + """ + CREATE TABLE IF NOT EXISTS audit_events ( + id UUID PRIMARY KEY, + ts TIMESTAMPTZ NOT NULL DEFAULT now(), + user_id TEXT NOT NULL DEFAULT '', + action TEXT NOT NULL, + entity_type TEXT NOT NULL DEFAULT '', + entity_id TEXT NOT NULL DEFAULT '', + visibility TEXT NOT NULL DEFAULT '', + ip TEXT NOT NULL DEFAULT '', + user_agent TEXT NOT NULL DEFAULT '', + metadata JSONB NOT NULL DEFAULT '{}'::jsonb + ) + """, + "CREATE INDEX IF NOT EXISTS idx_canvas_projects_owner_updated ON canvas_projects(owner_id, updated_at DESC) WHERE deleted_at IS NULL", + "CREATE INDEX IF NOT EXISTS idx_canvas_projects_visibility_updated ON canvas_projects(visibility, updated_at DESC) WHERE deleted_at IS NULL", + "CREATE INDEX IF NOT EXISTS idx_job_index_owner_updated ON job_index(owner_id, updated_at DESC)", + "CREATE INDEX IF NOT EXISTS idx_generated_assets_owner_created ON generated_assets(owner_id, created_at DESC)", + "CREATE INDEX IF NOT EXISTS idx_prompt_library_visibility ON prompt_library_index(visibility, updated_at DESC)", + "CREATE INDEX IF NOT EXISTS idx_asset_library_visibility ON asset_library_index(visibility, updated_at DESC)", + "CREATE INDEX IF NOT EXISTS idx_audit_events_user_ts ON audit_events(user_id, ts DESC)", + ] + + def run(): + with _connect() as conn: + with conn.cursor() as cur: + for stmt in ddl: + cur.execute(stmt) + conn.commit() + return True + + return bool(_execute_safely("init_schema", run)) + + +def health() -> dict: + if not DB_ENABLED: + return {"enabled": False, "connected": False} + + def run(): + with _connect() as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1 AS ok") + cur.fetchone() + return {"enabled": True, "connected": True} + + return _execute_safely("health", run) or {"enabled": True, "connected": False} + + +def request_ip(request: Any) -> str: + if request is None: + return "" + forwarded = str(request.headers.get("x-forwarded-for") or "").split(",", 1)[0].strip() + return forwarded or getattr(getattr(request, "client", None), "host", "") or "" + + +def request_user_agent(request: Any) -> str: + if request is None: + return "" + return str(request.headers.get("user-agent") or "")[:600] + + +def upsert_user(user: dict, request: Any = None) -> None: + uid = str(user.get("uid") or "").strip() + if not uid: + return + payload = { + "username": str(user.get("username") or user.get("u") or ""), + "name": str(user.get("name") or ""), + "email": str(user.get("email") or ""), + "open_id": str(user.get("open_id") or ""), + "union_id": str(user.get("union_id") or ""), + "tenant_key": str(user.get("tenant_key") or ""), + "avatar_url": str(user.get("avatar_url") or ""), + "provider": str(user.get("provider") or ""), + } + + def run(): + with _connect() as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO app_users ( + uid, provider, username, name, email, open_id, union_id, + tenant_key, avatar_url, last_ip, last_user_agent, metadata + ) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON CONFLICT (uid) DO UPDATE SET + provider = EXCLUDED.provider, + username = EXCLUDED.username, + name = EXCLUDED.name, + email = EXCLUDED.email, + open_id = EXCLUDED.open_id, + union_id = EXCLUDED.union_id, + tenant_key = EXCLUDED.tenant_key, + avatar_url = EXCLUDED.avatar_url, + last_seen_at = now(), + last_ip = EXCLUDED.last_ip, + last_user_agent = EXCLUDED.last_user_agent, + metadata = EXCLUDED.metadata + """, + ( + uid, + payload["provider"], + payload["username"], + payload["name"], + payload["email"], + payload["open_id"], + payload["union_id"], + payload["tenant_key"], + payload["avatar_url"], + request_ip(request), + request_user_agent(request), + _json(payload), + ), + ) + conn.commit() + + _execute_safely("upsert_user", run) + + +def audit(user: dict | None, action: str, entity_type: str = "", entity_id: str = "", metadata: dict | None = None, request: Any = None, visibility: str = "") -> None: + def run(): + with _connect() as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO audit_events (id, user_id, action, entity_type, entity_id, visibility, ip, user_agent, metadata) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s) + """, + ( + str(uuid.uuid4()), + str((user or {}).get("uid") or ""), + action, + entity_type, + entity_id, + visibility, + request_ip(request), + request_user_agent(request), + _json(metadata or {}), + ), + ) + conn.commit() + + _execute_safely("audit", run) + + +def list_canvas_projects(user: dict, include_shared: bool = True) -> list[dict]: + uid = str(user.get("uid") or "") + tenant_key = str(user.get("tenant_key") or "") + + def run(): + with _connect() as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT + p.id, p.name, p.thumbnail, p.visibility, p.canvas_data, + p.created_at, p.updated_at, p.version, p.owner_id, + u.name AS owner_name, u.email AS owner_email, u.provider AS owner_provider + FROM canvas_projects p + LEFT JOIN app_users u ON u.uid = p.owner_id + WHERE p.deleted_at IS NULL + AND ( + p.owner_id = %s + OR (%s AND p.visibility = 'company') + OR (%s AND p.visibility = 'team' AND COALESCE(u.tenant_key, '') = %s) + ) + ORDER BY p.updated_at DESC + LIMIT 500 + """, + (uid, include_shared, bool(tenant_key), tenant_key), + ) + rows = cur.fetchall() + return [dict(row) for row in rows] + + return _execute_safely("list_canvas_projects", run) or [] + + +def get_canvas_project(project_id: str, user: dict) -> dict | None: + uid = str(user.get("uid") or "") + tenant_key = str(user.get("tenant_key") or "") + + def run(): + with _connect() as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT p.*, u.tenant_key AS owner_tenant_key + FROM canvas_projects p + LEFT JOIN app_users u ON u.uid = p.owner_id + WHERE p.id = %s AND p.deleted_at IS NULL + """, + (project_id,), + ) + row = cur.fetchone() + if not row: + return None + if row["owner_id"] == uid or row["visibility"] == "company" or (row["visibility"] == "team" and tenant_key and row["owner_tenant_key"] == tenant_key): + return dict(row) + return None + + return _execute_safely("get_canvas_project", run) + + +def upsert_canvas_project(user: dict, project: dict) -> dict | None: + uid = str(user.get("uid") or "") + if not uid: + return None + project_id = str(project.get("id") or "").strip() + if not project_id: + project_id = f"project_{int(time.time() * 1000)}_{uuid.uuid4().hex[:9]}" + name = str(project.get("name") or "未命名项目").strip() or "未命名项目" + thumbnail = str(project.get("thumbnail") or "") + visibility = str(project.get("visibility") or "private").strip() + if visibility not in {"private", "team", "company"}: + visibility = "private" + canvas_data = project.get("canvas_data") or project.get("canvasData") or {"nodes": [], "edges": [], "viewport": {"x": 100, "y": 50, "zoom": 0.8}} + created_at = _dt(project.get("created_at") or project.get("createdAt")) + updated_at = _dt(project.get("updated_at") or project.get("updatedAt")) + + def run(): + with _connect() as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO canvas_projects ( + id, owner_id, name, thumbnail, visibility, canvas_data, + created_at, updated_at, version, source, metadata + ) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,1,%s,%s) + ON CONFLICT (id) DO UPDATE SET + name = EXCLUDED.name, + thumbnail = EXCLUDED.thumbnail, + visibility = CASE + WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN EXCLUDED.visibility + ELSE canvas_projects.visibility + END, + canvas_data = CASE + WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN EXCLUDED.canvas_data + ELSE canvas_projects.canvas_data + END, + updated_at = CASE + WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN GREATEST(canvas_projects.updated_at, EXCLUDED.updated_at) + ELSE canvas_projects.updated_at + END, + version = CASE + WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN canvas_projects.version + 1 + ELSE canvas_projects.version + END, + deleted_at = CASE + WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN NULL + ELSE canvas_projects.deleted_at + END + RETURNING id, name, thumbnail, visibility, canvas_data, created_at, updated_at, version, owner_id + """, + ( + project_id, + uid, + name, + thumbnail, + visibility, + _json(canvas_data), + created_at, + updated_at, + str(project.get("source") or "canvas"), + _json({"migrated_from": project.get("source") or "canvas"}), + ), + ) + row = cur.fetchone() + conn.commit() + return dict(row) if row else None + + return _execute_safely("upsert_canvas_project", run) + + +def soft_delete_canvas_project(user: dict, project_id: str) -> bool: + uid = str(user.get("uid") or "") + + def run(): + with _connect() as conn: + with conn.cursor() as cur: + cur.execute( + """ + UPDATE canvas_projects + SET deleted_at = now(), updated_at = now(), version = version + 1 + WHERE id = %s AND owner_id = %s AND deleted_at IS NULL + """, + (project_id, uid), + ) + changed = cur.rowcount > 0 + conn.commit() + return changed + + return bool(_execute_safely("soft_delete_canvas_project", run)) + + +def index_job(job: dict, state_path: str = "") -> None: + job_id = str(job.get("id") or "") + if not job_id: + return + frames = job.get("frames") or [] + generated_videos = job.get("generated_videos") or [] + thumbnail = "" + if frames: + first = frames[0] if isinstance(frames[0], dict) else {} + thumbnail = str(first.get("url") or "") + updated_at = _dt() + + def run(): + with _connect() as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO job_index ( + job_id, owner_id, owner_name, owner_email, owner_provider, tenant_key, + url, status, progress, message, job_kind, width, height, duration, + frame_count, video_count, thumbnail, state_path, updated_at, payload + ) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON CONFLICT (job_id) DO UPDATE SET + owner_id = EXCLUDED.owner_id, + owner_name = EXCLUDED.owner_name, + owner_email = EXCLUDED.owner_email, + owner_provider = EXCLUDED.owner_provider, + tenant_key = EXCLUDED.tenant_key, + url = EXCLUDED.url, + status = EXCLUDED.status, + progress = EXCLUDED.progress, + message = EXCLUDED.message, + job_kind = EXCLUDED.job_kind, + width = EXCLUDED.width, + height = EXCLUDED.height, + duration = EXCLUDED.duration, + frame_count = EXCLUDED.frame_count, + video_count = EXCLUDED.video_count, + thumbnail = EXCLUDED.thumbnail, + state_path = EXCLUDED.state_path, + updated_at = EXCLUDED.updated_at, + last_synced_at = now(), + payload = EXCLUDED.payload + """, + ( + job_id, + str(job.get("owner_id") or ""), + str(job.get("owner_name") or ""), + str(job.get("owner_email") or ""), + str(job.get("owner_provider") or ""), + str(job.get("tenant_key") or ""), + str(job.get("url") or ""), + str(job.get("status") or ""), + int(job.get("progress") or 0), + str(job.get("message") or "")[:1000], + str(job.get("url") or "").split("://", 1)[0] or "job", + int(job.get("width") or 0), + int(job.get("height") or 0), + float(job.get("duration") or 0), + len(frames), + len(generated_videos), + thumbnail, + state_path, + updated_at, + _json(job), + ), + ) + for frame in frames: + if not isinstance(frame, dict): + continue + frame_idx = frame.get("index", 0) + for image in frame.get("generated_images") or []: + if not isinstance(image, dict): + continue + asset_key = f"{job_id}:image:{image.get('id')}" + cur.execute( + """ + INSERT INTO generated_assets ( + asset_key, asset_id, job_id, owner_id, kind, status, url, + model, prompt, created_at, updated_at, metadata + ) + VALUES (%s,%s,%s,%s,'image','completed',%s,%s,%s,%s,%s,%s) + ON CONFLICT (asset_key) DO UPDATE SET + status = EXCLUDED.status, + url = EXCLUDED.url, + model = EXCLUDED.model, + prompt = EXCLUDED.prompt, + updated_at = EXCLUDED.updated_at, + metadata = EXCLUDED.metadata + """, + ( + asset_key, + str(image.get("id") or ""), + job_id, + str(job.get("owner_id") or ""), + str(image.get("url") or ""), + str(image.get("model") or ""), + str(image.get("prompt") or ""), + _dt(image.get("created_at")), + updated_at, + _json({"frame_idx": frame_idx, **image}), + ), + ) + for video in generated_videos: + if not isinstance(video, dict): + continue + asset_key = f"{job_id}:video:{video.get('id')}" + cur.execute( + """ + INSERT INTO generated_assets ( + asset_key, asset_id, job_id, owner_id, kind, status, url, + model, prompt, duration, created_at, updated_at, metadata + ) + VALUES (%s,%s,%s,%s,'video',%s,%s,%s,%s,%s,%s,%s,%s) + ON CONFLICT (asset_key) DO UPDATE SET + status = EXCLUDED.status, + url = EXCLUDED.url, + model = EXCLUDED.model, + prompt = EXCLUDED.prompt, + duration = EXCLUDED.duration, + updated_at = EXCLUDED.updated_at, + metadata = EXCLUDED.metadata + """, + ( + asset_key, + str(video.get("id") or ""), + job_id, + str(job.get("owner_id") or ""), + str(video.get("status") or ""), + str(video.get("url") or ""), + str(video.get("model") or ""), + str(video.get("prompt") or ""), + float(video.get("duration") or 0), + _dt(video.get("created_at")), + updated_at, + _json(video), + ), + ) + conn.commit() + + _execute_safely("index_job", run) + + +def index_agent_run(run_payload: dict) -> None: + run_id = str(run_payload.get("id") or "") + if not run_id: + return + + def run(): + with _connect() as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO agent_run_index ( + run_id, job_id, owner_id, owner_name, owner_email, owner_provider, + status, stage, progress, final_video_url, contact_sheet_url, + created_at, updated_at, payload + ) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON CONFLICT (run_id) DO UPDATE SET + job_id = EXCLUDED.job_id, + owner_id = EXCLUDED.owner_id, + owner_name = EXCLUDED.owner_name, + owner_email = EXCLUDED.owner_email, + owner_provider = EXCLUDED.owner_provider, + status = EXCLUDED.status, + stage = EXCLUDED.stage, + progress = EXCLUDED.progress, + final_video_url = EXCLUDED.final_video_url, + contact_sheet_url = EXCLUDED.contact_sheet_url, + updated_at = EXCLUDED.updated_at, + payload = EXCLUDED.payload + """, + ( + run_id, + str(run_payload.get("job_id") or ""), + str(run_payload.get("owner_id") or ""), + str(run_payload.get("owner_name") or ""), + str(run_payload.get("owner_email") or ""), + str(run_payload.get("owner_provider") or ""), + str(run_payload.get("status") or ""), + str(run_payload.get("stage") or ""), + int(run_payload.get("progress") or 0), + str(run_payload.get("final_video_url") or ""), + str(run_payload.get("contact_sheet_url") or ""), + _dt(run_payload.get("created_at")), + _dt(run_payload.get("updated_at")), + _json(run_payload), + ), + ) + conn.commit() + + _execute_safely("index_agent_run", run) + + +def index_prompt_item(item: dict, owner_id: str = "") -> None: + item_id = str(item.get("id") or "") + if not item_id: + return + + def run(): + with _connect() as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO prompt_library_index ( + item_id, owner_id, category, name, tags, source_job_id, + created_at, updated_at, payload + ) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON CONFLICT (item_id) DO UPDATE SET + owner_id = EXCLUDED.owner_id, + category = EXCLUDED.category, + name = EXCLUDED.name, + tags = EXCLUDED.tags, + source_job_id = EXCLUDED.source_job_id, + updated_at = EXCLUDED.updated_at, + payload = EXCLUDED.payload + """, + ( + item_id, + owner_id, + str(item.get("category") or ""), + str(item.get("name") or ""), + _json(item.get("tags") or []), + str(item.get("source_job_id") or ""), + _dt(item.get("created_at")), + _dt(item.get("updated_at") or item.get("created_at")), + _json(item), + ), + ) + conn.commit() + + _execute_safely("index_prompt_item", run) + + +def index_asset_item(item: dict, owner_id: str = "") -> None: + item_id = str(item.get("id") or "") + kind = str(item.get("kind") or "") + if not item_id or not kind: + return + item_key = f"{kind}:{item_id}" + + def run(): + with _connect() as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO asset_library_index ( + item_key, item_id, owner_id, kind, name, tags, source_job_id, + created_at, updated_at, payload + ) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON CONFLICT (item_key) DO UPDATE SET + owner_id = EXCLUDED.owner_id, + kind = EXCLUDED.kind, + name = EXCLUDED.name, + tags = EXCLUDED.tags, + source_job_id = EXCLUDED.source_job_id, + updated_at = EXCLUDED.updated_at, + payload = EXCLUDED.payload + """, + ( + item_key, + item_id, + owner_id, + kind, + str(item.get("name") or ""), + _json(item.get("tags") or []), + str(item.get("source_job_id") or ""), + _dt(item.get("created_at")), + _dt(item.get("updated_at") or item.get("created_at")), + _json(item), + ), + ) + conn.commit() + + _execute_safely("index_asset_item", run) diff --git a/api/main.py b/api/main.py index 6ae5501..f4209a1 100644 --- a/api/main.py +++ b/api/main.py @@ -28,6 +28,8 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse, RedirectResponse from pydantic import BaseModel, ConfigDict, Field +import db + load_dotenv() JOBS_DIR = Path(os.getenv("JOBS_DIR", "./jobs")).resolve() @@ -1285,7 +1287,9 @@ def job_with_artifacts(job: Job) -> Job: def save_state(job: Job) -> None: - (job_dir(job.id) / "state.json").write_text(job.model_dump_json(indent=2)) + state_path = job_dir(job.id) / "state.json" + state_path.write_text(job.model_dump_json(indent=2)) + db.index_job(job.model_dump(), str(state_path)) def update(job: Job, **kw) -> None: diff --git a/api/requirements.txt b/api/requirements.txt index 89ff262..b615890 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -7,6 +7,7 @@ yt-dlp==2026.3.17 openai==1.55.3 httpx==0.27.2 requests==2.32.5 +psycopg[binary]==3.2.3 imagehash==4.3.1 Pillow>=11.0 numpy>=2.0