from __future__ import annotations import logging import os import threading import time import uuid from datetime import datetime, timezone from typing import Any try: import psycopg from psycopg.rows import dict_row from psycopg.types.json import Jsonb except ModuleNotFoundError: # Local dev can still run without Postgres deps installed. psycopg = None dict_row = None Jsonb = None try: from psycopg_pool import ConnectionPool except ModuleNotFoundError: # Pool is optional; fall back to per-call connections. ConnectionPool = None logger = logging.getLogger("skg.db") DATABASE_URL = os.getenv("DATABASE_URL", "").strip() DB_ENABLED = bool(DATABASE_URL and psycopg is not None) _POOL = None _POOL_LOCK = threading.Lock() def enabled() -> bool: return DB_ENABLED def _pool(): """Lazily build a process-wide connection pool so concurrent workers/requests don't exhaust Postgres by opening a fresh connection per query.""" global _POOL if _POOL is not None: return _POOL with _POOL_LOCK: if _POOL is None: pool = ConnectionPool( DATABASE_URL, min_size=1, max_size=int(os.getenv("DB_POOL_MAX_SIZE", "10")), timeout=10, kwargs={"row_factory": dict_row, "connect_timeout": 5}, open=False, ) pool.open() _POOL = pool return _POOL def _connect(): if not DB_ENABLED: raise RuntimeError("database disabled") if ConnectionPool is not None: # pool.connection() is a context manager that returns the conn to the # pool on exit, matching the existing `with _connect() as conn:` callers. return _pool().connection() return psycopg.connect(DATABASE_URL, row_factory=dict_row, connect_timeout=5) def _dt(ts: float | int | None = None) -> datetime: try: value = float(ts or 0) except (TypeError, ValueError): value = 0 if value <= 0: value = time.time() return datetime.fromtimestamp(value, tz=timezone.utc) def _json(value: Any): return Jsonb(value if value is not None else {}) def _execute_safely(label: str, fn): # DB disabled is an expected, silent no-op; an actual failure while the DB is # enabled is a real problem (stale job index / dropped audit) and must be loud. if not DB_ENABLED: return None try: return fn() except Exception as exc: logger.error("[db] %s failed: %s", label, exc) return None def init_schema() -> bool: if not DB_ENABLED: print("[db] disabled: DATABASE_URL is empty or psycopg is missing", flush=True) return False ddl = [ """ CREATE TABLE IF NOT EXISTS app_users ( uid TEXT PRIMARY KEY, provider TEXT NOT NULL DEFAULT '', username TEXT NOT NULL DEFAULT '', name TEXT NOT NULL DEFAULT '', email TEXT NOT NULL DEFAULT '', open_id TEXT NOT NULL DEFAULT '', union_id TEXT NOT NULL DEFAULT '', tenant_key TEXT NOT NULL DEFAULT '', avatar_url TEXT NOT NULL DEFAULT '', first_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(), last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(), last_ip TEXT NOT NULL DEFAULT '', last_user_agent TEXT NOT NULL DEFAULT '', metadata JSONB NOT NULL DEFAULT '{}'::jsonb ) """, """ CREATE TABLE IF NOT EXISTS canvas_projects ( id TEXT PRIMARY KEY, owner_id TEXT NOT NULL REFERENCES app_users(uid) ON DELETE CASCADE, name TEXT NOT NULL DEFAULT '', thumbnail TEXT NOT NULL DEFAULT '', visibility TEXT NOT NULL DEFAULT 'private', canvas_data JSONB NOT NULL DEFAULT '{}'::jsonb, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), deleted_at TIMESTAMPTZ, version INTEGER NOT NULL DEFAULT 1, source TEXT NOT NULL DEFAULT 'canvas', metadata JSONB NOT NULL DEFAULT '{}'::jsonb ) """, """ CREATE TABLE IF NOT EXISTS canvas_workflows ( id TEXT PRIMARY KEY, owner_id TEXT NOT NULL REFERENCES app_users(uid) ON DELETE CASCADE, name TEXT NOT NULL DEFAULT '', description TEXT NOT NULL DEFAULT '', thumbnail TEXT NOT NULL DEFAULT '', workflow_data JSONB NOT NULL DEFAULT '{}'::jsonb, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), deleted_at TIMESTAMPTZ, version INTEGER NOT NULL DEFAULT 1, source TEXT NOT NULL DEFAULT 'canvas', metadata JSONB NOT NULL DEFAULT '{}'::jsonb ) """, """ CREATE TABLE IF NOT EXISTS job_index ( job_id TEXT PRIMARY KEY, owner_id TEXT NOT NULL DEFAULT '', owner_name TEXT NOT NULL DEFAULT '', owner_email TEXT NOT NULL DEFAULT '', owner_provider TEXT NOT NULL DEFAULT '', tenant_key TEXT NOT NULL DEFAULT '', url TEXT NOT NULL DEFAULT '', status TEXT NOT NULL DEFAULT '', progress INTEGER NOT NULL DEFAULT 0, message TEXT NOT NULL DEFAULT '', job_kind TEXT NOT NULL DEFAULT '', width INTEGER NOT NULL DEFAULT 0, height INTEGER NOT NULL DEFAULT 0, duration DOUBLE PRECISION NOT NULL DEFAULT 0, frame_count INTEGER NOT NULL DEFAULT 0, video_count INTEGER NOT NULL DEFAULT 0, thumbnail TEXT NOT NULL DEFAULT '', state_path TEXT NOT NULL DEFAULT '', created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), last_synced_at TIMESTAMPTZ NOT NULL DEFAULT now(), payload JSONB NOT NULL DEFAULT '{}'::jsonb ) """, """ CREATE TABLE IF NOT EXISTS generated_assets ( asset_key TEXT PRIMARY KEY, asset_id TEXT NOT NULL DEFAULT '', job_id TEXT NOT NULL DEFAULT '', owner_id TEXT NOT NULL DEFAULT '', kind TEXT NOT NULL DEFAULT '', status TEXT NOT NULL DEFAULT '', url TEXT NOT NULL DEFAULT '', model TEXT NOT NULL DEFAULT '', prompt TEXT NOT NULL DEFAULT '', width INTEGER NOT NULL DEFAULT 0, height INTEGER NOT NULL DEFAULT 0, duration DOUBLE PRECISION NOT NULL DEFAULT 0, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), metadata JSONB NOT NULL DEFAULT '{}'::jsonb ) """, """ CREATE TABLE IF NOT EXISTS prompt_library_index ( item_id TEXT PRIMARY KEY, owner_id TEXT NOT NULL DEFAULT '', category TEXT NOT NULL DEFAULT '', name TEXT NOT NULL DEFAULT '', tags JSONB NOT NULL DEFAULT '[]'::jsonb, visibility TEXT NOT NULL DEFAULT 'company', source_job_id TEXT NOT NULL DEFAULT '', created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), payload JSONB NOT NULL DEFAULT '{}'::jsonb ) """, """ CREATE TABLE IF NOT EXISTS asset_library_index ( item_key TEXT PRIMARY KEY, item_id TEXT NOT NULL DEFAULT '', owner_id TEXT NOT NULL DEFAULT '', kind TEXT NOT NULL DEFAULT '', name TEXT NOT NULL DEFAULT '', tags JSONB NOT NULL DEFAULT '[]'::jsonb, visibility TEXT NOT NULL DEFAULT 'company', source_job_id TEXT NOT NULL DEFAULT '', created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), payload JSONB NOT NULL DEFAULT '{}'::jsonb ) """, """ CREATE TABLE IF NOT EXISTS agent_run_index ( run_id TEXT PRIMARY KEY, job_id TEXT NOT NULL DEFAULT '', owner_id TEXT NOT NULL DEFAULT '', owner_name TEXT NOT NULL DEFAULT '', owner_email TEXT NOT NULL DEFAULT '', owner_provider TEXT NOT NULL DEFAULT '', status TEXT NOT NULL DEFAULT '', stage TEXT NOT NULL DEFAULT '', progress INTEGER NOT NULL DEFAULT 0, final_video_url TEXT NOT NULL DEFAULT '', contact_sheet_url TEXT NOT NULL DEFAULT '', created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), payload JSONB NOT NULL DEFAULT '{}'::jsonb ) """, """ CREATE TABLE IF NOT EXISTS audit_events ( id UUID PRIMARY KEY, ts TIMESTAMPTZ NOT NULL DEFAULT now(), user_id TEXT NOT NULL DEFAULT '', action TEXT NOT NULL, entity_type TEXT NOT NULL DEFAULT '', entity_id TEXT NOT NULL DEFAULT '', visibility TEXT NOT NULL DEFAULT '', ip TEXT NOT NULL DEFAULT '', user_agent TEXT NOT NULL DEFAULT '', metadata JSONB NOT NULL DEFAULT '{}'::jsonb ) """, "CREATE INDEX IF NOT EXISTS idx_canvas_projects_owner_updated ON canvas_projects(owner_id, updated_at DESC) WHERE deleted_at IS NULL", "CREATE INDEX IF NOT EXISTS idx_canvas_projects_visibility_updated ON canvas_projects(visibility, updated_at DESC) WHERE deleted_at IS NULL", "CREATE INDEX IF NOT EXISTS idx_canvas_workflows_owner_updated ON canvas_workflows(owner_id, updated_at DESC) WHERE deleted_at IS NULL", "CREATE INDEX IF NOT EXISTS idx_job_index_owner_updated ON job_index(owner_id, updated_at DESC)", "CREATE INDEX IF NOT EXISTS idx_generated_assets_owner_created ON generated_assets(owner_id, created_at DESC)", "CREATE INDEX IF NOT EXISTS idx_prompt_library_visibility ON prompt_library_index(visibility, updated_at DESC)", "CREATE INDEX IF NOT EXISTS idx_asset_library_visibility ON asset_library_index(visibility, updated_at DESC)", "CREATE INDEX IF NOT EXISTS idx_audit_events_user_ts ON audit_events(user_id, ts DESC)", ] def run(): with _connect() as conn: with conn.cursor() as cur: for stmt in ddl: cur.execute(stmt) conn.commit() return True return bool(_execute_safely("init_schema", run)) def health() -> dict: if not DB_ENABLED: return {"enabled": False, "connected": False} def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute("SELECT 1 AS ok") cur.fetchone() return {"enabled": True, "connected": True} return _execute_safely("health", run) or {"enabled": True, "connected": False} def request_ip(request: Any) -> str: if request is None: return "" forwarded = str(request.headers.get("x-forwarded-for") or "").split(",", 1)[0].strip() return forwarded or getattr(getattr(request, "client", None), "host", "") or "" def request_user_agent(request: Any) -> str: if request is None: return "" return str(request.headers.get("user-agent") or "")[:600] def upsert_user(user: dict, request: Any = None) -> None: uid = str(user.get("uid") or "").strip() if not uid: return payload = { "username": str(user.get("username") or user.get("u") or ""), "name": str(user.get("name") or ""), "email": str(user.get("email") or ""), "open_id": str(user.get("open_id") or ""), "union_id": str(user.get("union_id") or ""), "tenant_key": str(user.get("tenant_key") or ""), "avatar_url": str(user.get("avatar_url") or ""), "provider": str(user.get("provider") or ""), } def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO app_users ( uid, provider, username, name, email, open_id, union_id, tenant_key, avatar_url, last_ip, last_user_agent, metadata ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (uid) DO UPDATE SET provider = EXCLUDED.provider, username = EXCLUDED.username, name = EXCLUDED.name, email = EXCLUDED.email, open_id = EXCLUDED.open_id, union_id = EXCLUDED.union_id, tenant_key = EXCLUDED.tenant_key, avatar_url = EXCLUDED.avatar_url, last_seen_at = now(), last_ip = EXCLUDED.last_ip, last_user_agent = EXCLUDED.last_user_agent, metadata = EXCLUDED.metadata """, ( uid, payload["provider"], payload["username"], payload["name"], payload["email"], payload["open_id"], payload["union_id"], payload["tenant_key"], payload["avatar_url"], request_ip(request), request_user_agent(request), _json(payload), ), ) conn.commit() _execute_safely("upsert_user", run) def audit(user: dict | None, action: str, entity_type: str = "", entity_id: str = "", metadata: dict | None = None, request: Any = None, visibility: str = "") -> None: def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO audit_events (id, user_id, action, entity_type, entity_id, visibility, ip, user_agent, metadata) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s) """, ( str(uuid.uuid4()), str((user or {}).get("uid") or ""), action, entity_type, entity_id, visibility, request_ip(request), request_user_agent(request), _json(metadata or {}), ), ) conn.commit() _execute_safely("audit", run) def list_canvas_projects(user: dict, include_shared: bool = True) -> list[dict]: uid = str(user.get("uid") or "") tenant_key = str(user.get("tenant_key") or "") def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ SELECT p.id, p.name, p.thumbnail, p.visibility, p.canvas_data, p.created_at, p.updated_at, p.version, p.owner_id, u.name AS owner_name, u.email AS owner_email, u.provider AS owner_provider FROM canvas_projects p LEFT JOIN app_users u ON u.uid = p.owner_id WHERE p.deleted_at IS NULL AND ( p.owner_id = %s OR (%s AND p.visibility = 'company') OR (%s AND p.visibility = 'team' AND COALESCE(u.tenant_key, '') = %s) ) ORDER BY p.updated_at DESC LIMIT 500 """, (uid, include_shared, bool(tenant_key), tenant_key), ) rows = cur.fetchall() return [dict(row) for row in rows] return _execute_safely("list_canvas_projects", run) or [] def get_canvas_project(project_id: str, user: dict) -> dict | None: uid = str(user.get("uid") or "") tenant_key = str(user.get("tenant_key") or "") def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ SELECT p.*, u.tenant_key AS owner_tenant_key FROM canvas_projects p LEFT JOIN app_users u ON u.uid = p.owner_id WHERE p.id = %s AND p.deleted_at IS NULL """, (project_id,), ) row = cur.fetchone() if not row: return None if row["owner_id"] == uid or row["visibility"] == "company" or (row["visibility"] == "team" and tenant_key and row["owner_tenant_key"] == tenant_key): return dict(row) return None return _execute_safely("get_canvas_project", run) def upsert_canvas_project(user: dict, project: dict) -> dict | None: uid = str(user.get("uid") or "") if not uid: return None project_id = str(project.get("id") or "").strip() if not project_id: project_id = f"project_{int(time.time() * 1000)}_{uuid.uuid4().hex[:9]}" name = str(project.get("name") or "未命名项目").strip() or "未命名项目" thumbnail = str(project.get("thumbnail") or "") visibility = str(project.get("visibility") or "private").strip() if visibility not in {"private", "team", "company"}: visibility = "private" canvas_data = project.get("canvas_data") or project.get("canvasData") or {"nodes": [], "edges": [], "viewport": {"x": 100, "y": 50, "zoom": 0.8}} created_at = _dt(project.get("created_at") or project.get("createdAt")) updated_at = _dt(project.get("updated_at") or project.get("updatedAt")) def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO canvas_projects ( id, owner_id, name, thumbnail, visibility, canvas_data, created_at, updated_at, version, source, metadata ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,1,%s,%s) ON CONFLICT (id) DO UPDATE SET name = CASE WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN EXCLUDED.name ELSE canvas_projects.name END, thumbnail = CASE WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN EXCLUDED.thumbnail ELSE canvas_projects.thumbnail END, visibility = CASE WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN EXCLUDED.visibility ELSE canvas_projects.visibility END, canvas_data = CASE WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN EXCLUDED.canvas_data ELSE canvas_projects.canvas_data END, updated_at = CASE WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN GREATEST(canvas_projects.updated_at, EXCLUDED.updated_at) ELSE canvas_projects.updated_at END, version = CASE WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN canvas_projects.version + 1 ELSE canvas_projects.version END, deleted_at = CASE WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN NULL ELSE canvas_projects.deleted_at END WHERE canvas_projects.owner_id = EXCLUDED.owner_id RETURNING id, name, thumbnail, visibility, canvas_data, created_at, updated_at, version, owner_id """, ( project_id, uid, name, thumbnail, visibility, _json(canvas_data), created_at, updated_at, str(project.get("source") or "canvas"), _json({"migrated_from": project.get("source") or "canvas"}), ), ) row = cur.fetchone() conn.commit() return dict(row) if row else None return _execute_safely("upsert_canvas_project", run) def soft_delete_canvas_project(user: dict, project_id: str) -> bool: uid = str(user.get("uid") or "") def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ UPDATE canvas_projects SET deleted_at = now(), updated_at = now(), version = version + 1 WHERE id = %s AND owner_id = %s AND deleted_at IS NULL """, (project_id, uid), ) changed = cur.rowcount > 0 conn.commit() return changed return bool(_execute_safely("soft_delete_canvas_project", run)) def list_canvas_workflows(user: dict) -> list[dict]: uid = str(user.get("uid") or "") def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ SELECT w.id, w.name, w.description, w.thumbnail, w.workflow_data, w.created_at, w.updated_at, w.version, w.owner_id, u.name AS owner_name, u.email AS owner_email, u.provider AS owner_provider FROM canvas_workflows w LEFT JOIN app_users u ON u.uid = w.owner_id WHERE w.deleted_at IS NULL AND w.owner_id = %s ORDER BY w.updated_at DESC LIMIT 500 """, (uid,), ) rows = cur.fetchall() return [dict(row) for row in rows] return _execute_safely("list_canvas_workflows", run) or [] def upsert_canvas_workflow(user: dict, workflow: dict) -> dict | None: uid = str(user.get("uid") or "") if not uid: return None workflow_id = str(workflow.get("id") or "").strip() if not workflow_id: workflow_id = f"workflow_{int(time.time() * 1000)}_{uuid.uuid4().hex[:9]}" name = str(workflow.get("name") or "未命名工作流").strip() or "未命名工作流" description = str(workflow.get("description") or "").strip() thumbnail = str(workflow.get("thumbnail") or "") workflow_data = workflow.get("workflow_data") or workflow.get("workflowData") or {"nodes": [], "edges": [], "viewport": {"x": 100, "y": 50, "zoom": 0.8}} created_at = _dt(workflow.get("created_at") or workflow.get("createdAt")) updated_at = _dt(workflow.get("updated_at") or workflow.get("updatedAt")) def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO canvas_workflows ( id, owner_id, name, description, thumbnail, workflow_data, created_at, updated_at, version, source, metadata ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,1,%s,%s) ON CONFLICT (id) DO UPDATE SET name = CASE WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.name ELSE canvas_workflows.name END, description = CASE WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.description ELSE canvas_workflows.description END, thumbnail = CASE WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.thumbnail ELSE canvas_workflows.thumbnail END, workflow_data = CASE WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.workflow_data ELSE canvas_workflows.workflow_data END, updated_at = CASE WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.updated_at ELSE canvas_workflows.updated_at END, version = CASE WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN canvas_workflows.version + 1 ELSE canvas_workflows.version END, deleted_at = CASE WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN NULL ELSE canvas_workflows.deleted_at END WHERE canvas_workflows.owner_id = EXCLUDED.owner_id RETURNING id, name, description, thumbnail, workflow_data, created_at, updated_at, version, owner_id """, ( workflow_id, uid, name, description, thumbnail, _json(workflow_data), created_at, updated_at, str(workflow.get("source") or "canvas"), _json({"source_project_id": workflow.get("source_project_id") or workflow.get("sourceProjectId") or ""}), ), ) row = cur.fetchone() conn.commit() return dict(row) if row else None return _execute_safely("upsert_canvas_workflow", run) def soft_delete_canvas_workflow(user: dict, workflow_id: str) -> bool: uid = str(user.get("uid") or "") def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ UPDATE canvas_workflows SET deleted_at = now(), updated_at = now(), version = version + 1 WHERE id = %s AND owner_id = %s AND deleted_at IS NULL """, (workflow_id, uid), ) changed = cur.rowcount > 0 conn.commit() return changed return bool(_execute_safely("soft_delete_canvas_workflow", run)) def index_job(job: dict, state_path: str = "") -> None: job_id = str(job.get("id") or "") if not job_id: return frames = job.get("frames") or [] generated_videos = job.get("generated_videos") or [] thumbnail = "" if frames: first = frames[0] if isinstance(frames[0], dict) else {} thumbnail = str(first.get("url") or "") updated_at = _dt() def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO job_index ( job_id, owner_id, owner_name, owner_email, owner_provider, tenant_key, url, status, progress, message, job_kind, width, height, duration, frame_count, video_count, thumbnail, state_path, updated_at, payload ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (job_id) DO UPDATE SET owner_id = EXCLUDED.owner_id, owner_name = EXCLUDED.owner_name, owner_email = EXCLUDED.owner_email, owner_provider = EXCLUDED.owner_provider, tenant_key = EXCLUDED.tenant_key, url = EXCLUDED.url, status = EXCLUDED.status, progress = EXCLUDED.progress, message = EXCLUDED.message, job_kind = EXCLUDED.job_kind, width = EXCLUDED.width, height = EXCLUDED.height, duration = EXCLUDED.duration, frame_count = EXCLUDED.frame_count, video_count = EXCLUDED.video_count, thumbnail = EXCLUDED.thumbnail, state_path = EXCLUDED.state_path, updated_at = EXCLUDED.updated_at, last_synced_at = now(), payload = EXCLUDED.payload """, ( job_id, str(job.get("owner_id") or ""), str(job.get("owner_name") or ""), str(job.get("owner_email") or ""), str(job.get("owner_provider") or ""), str(job.get("tenant_key") or ""), str(job.get("url") or ""), str(job.get("status") or ""), int(job.get("progress") or 0), str(job.get("message") or "")[:1000], str(job.get("url") or "").split("://", 1)[0] or "job", int(job.get("width") or 0), int(job.get("height") or 0), float(job.get("duration") or 0), len(frames), len(generated_videos), thumbnail, state_path, updated_at, _json(job), ), ) for frame in frames: if not isinstance(frame, dict): continue frame_idx = frame.get("index", 0) for image in frame.get("generated_images") or []: if not isinstance(image, dict): continue asset_key = f"{job_id}:image:{image.get('id')}" cur.execute( """ INSERT INTO generated_assets ( asset_key, asset_id, job_id, owner_id, kind, status, url, model, prompt, created_at, updated_at, metadata ) VALUES (%s,%s,%s,%s,'image','completed',%s,%s,%s,%s,%s,%s) ON CONFLICT (asset_key) DO UPDATE SET status = EXCLUDED.status, url = EXCLUDED.url, model = EXCLUDED.model, prompt = EXCLUDED.prompt, updated_at = EXCLUDED.updated_at, metadata = EXCLUDED.metadata """, ( asset_key, str(image.get("id") or ""), job_id, str(job.get("owner_id") or ""), str(image.get("url") or ""), str(image.get("model") or ""), str(image.get("prompt") or ""), _dt(image.get("created_at")), updated_at, _json({"frame_idx": frame_idx, **image}), ), ) for video in generated_videos: if not isinstance(video, dict): continue asset_key = f"{job_id}:video:{video.get('id')}" cur.execute( """ INSERT INTO generated_assets ( asset_key, asset_id, job_id, owner_id, kind, status, url, model, prompt, duration, created_at, updated_at, metadata ) VALUES (%s,%s,%s,%s,'video',%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (asset_key) DO UPDATE SET status = EXCLUDED.status, url = EXCLUDED.url, model = EXCLUDED.model, prompt = EXCLUDED.prompt, duration = EXCLUDED.duration, updated_at = EXCLUDED.updated_at, metadata = EXCLUDED.metadata """, ( asset_key, str(video.get("id") or ""), job_id, str(job.get("owner_id") or ""), str(video.get("status") or ""), str(video.get("url") or ""), str(video.get("model") or ""), str(video.get("prompt") or ""), float(video.get("duration") or 0), _dt(video.get("created_at")), updated_at, _json(video), ), ) conn.commit() _execute_safely("index_job", run) def index_agent_run(run_payload: dict) -> None: run_id = str(run_payload.get("id") or "") if not run_id: return def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO agent_run_index ( run_id, job_id, owner_id, owner_name, owner_email, owner_provider, status, stage, progress, final_video_url, contact_sheet_url, created_at, updated_at, payload ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (run_id) DO UPDATE SET job_id = EXCLUDED.job_id, owner_id = EXCLUDED.owner_id, owner_name = EXCLUDED.owner_name, owner_email = EXCLUDED.owner_email, owner_provider = EXCLUDED.owner_provider, status = EXCLUDED.status, stage = EXCLUDED.stage, progress = EXCLUDED.progress, final_video_url = EXCLUDED.final_video_url, contact_sheet_url = EXCLUDED.contact_sheet_url, updated_at = EXCLUDED.updated_at, payload = EXCLUDED.payload """, ( run_id, str(run_payload.get("job_id") or ""), str(run_payload.get("owner_id") or ""), str(run_payload.get("owner_name") or ""), str(run_payload.get("owner_email") or ""), str(run_payload.get("owner_provider") or ""), str(run_payload.get("status") or ""), str(run_payload.get("stage") or ""), int(run_payload.get("progress") or 0), str(run_payload.get("final_video_url") or ""), str(run_payload.get("contact_sheet_url") or ""), _dt(run_payload.get("created_at")), _dt(run_payload.get("updated_at")), _json(run_payload), ), ) conn.commit() _execute_safely("index_agent_run", run) def index_prompt_item(item: dict, owner_id: str = "") -> None: item_id = str(item.get("id") or "") if not item_id: return def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO prompt_library_index ( item_id, owner_id, category, name, tags, source_job_id, created_at, updated_at, payload ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (item_id) DO UPDATE SET owner_id = EXCLUDED.owner_id, category = EXCLUDED.category, name = EXCLUDED.name, tags = EXCLUDED.tags, source_job_id = EXCLUDED.source_job_id, updated_at = EXCLUDED.updated_at, payload = EXCLUDED.payload """, ( item_id, owner_id, str(item.get("category") or ""), str(item.get("name") or ""), _json(item.get("tags") or []), str(item.get("source_job_id") or ""), _dt(item.get("created_at")), _dt(item.get("updated_at") or item.get("created_at")), _json(item), ), ) conn.commit() _execute_safely("index_prompt_item", run) def index_asset_item(item: dict, owner_id: str = "") -> None: item_id = str(item.get("id") or "") kind = str(item.get("kind") or "") if not item_id or not kind: return item_key = f"{kind}:{item_id}" def run(): with _connect() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO asset_library_index ( item_key, item_id, owner_id, kind, name, tags, source_job_id, created_at, updated_at, payload ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (item_key) DO UPDATE SET owner_id = EXCLUDED.owner_id, kind = EXCLUDED.kind, name = EXCLUDED.name, tags = EXCLUDED.tags, source_job_id = EXCLUDED.source_job_id, updated_at = EXCLUDED.updated_at, payload = EXCLUDED.payload """, ( item_key, item_id, owner_id, kind, str(item.get("name") or ""), _json(item.get("tags") or []), str(item.get("source_job_id") or ""), _dt(item.get("created_at")), _dt(item.get("updated_at") or item.get("created_at")), _json(item), ), ) conn.commit() _execute_safely("index_asset_item", run)