# File: 20260512-skg-tk/api/db.py
# 764 lines, 31 KiB, Python

from __future__ import annotations
import os
import time
import uuid
from datetime import datetime, timezone
from typing import Any
try:
import psycopg
from psycopg.rows import dict_row
from psycopg.types.json import Jsonb
except ModuleNotFoundError: # Local dev can still run without Postgres deps installed.
psycopg = None
dict_row = None
Jsonb = None
# Connection string read from the DATABASE_URL environment variable (whitespace-trimmed).
DATABASE_URL = os.getenv("DATABASE_URL", "").strip()
# True only when a URL is configured AND the optional psycopg import succeeded.
DB_ENABLED = bool(DATABASE_URL and psycopg is not None)
def enabled() -> bool:
    """Report whether the database layer is active (see ``DB_ENABLED``)."""
    is_active = DB_ENABLED
    return is_active
def _connect():
    """Open a new psycopg connection to ``DATABASE_URL``.

    Rows are returned as dicts (``dict_row``) and the connection attempt uses
    a 5-second connect timeout.

    Raises:
        RuntimeError: if the database layer is disabled.
    """
    if DB_ENABLED:
        return psycopg.connect(DATABASE_URL, row_factory=dict_row, connect_timeout=5)
    raise RuntimeError("database disabled")
def _dt(ts: float | int | None = None) -> datetime:
try:
value = float(ts or 0)
except (TypeError, ValueError):
value = 0
if value <= 0:
value = time.time()
return datetime.fromtimestamp(value, tz=timezone.utc)
def _json(value: Any):
    """Wrap *value* in psycopg's ``Jsonb`` adapter; ``None`` is stored as ``{}``."""
    if value is None:
        value = {}
    return Jsonb(value)
def _execute_safely(label: str, fn):
    """Run ``fn()`` as a best-effort database operation.

    Args:
        label: Short name used in the failure log line.
        fn: Zero-argument callable performing the database work.

    Returns:
        Whatever ``fn`` returns, or ``None`` when the database layer is
        disabled (``fn`` is not called) or ``fn`` raised. Failures are printed
        to stdout and never propagate to the caller.
    """
    outcome = None
    if DB_ENABLED:
        try:
            outcome = fn()
        except Exception as exc:
            print(f"[db] {label} failed: {exc}", flush=True)
            outcome = None
    return outcome
def init_schema() -> bool:
    """Create every table and index this module uses, if they do not exist.

    All statements use ``IF NOT EXISTS`` and are executed in order on a single
    connection, followed by a commit.

    Returns:
        ``True`` when all statements ran, ``False`` when the database layer is
        disabled or any statement failed (the failure is logged by
        ``_execute_safely``).
    """
    if not DB_ENABLED:
        print("[db] disabled: DATABASE_URL is empty or psycopg is missing", flush=True)
        return False

    # Table definitions run first; canvas_projects references app_users.
    table_statements = [
        """
CREATE TABLE IF NOT EXISTS app_users (
uid TEXT PRIMARY KEY,
provider TEXT NOT NULL DEFAULT '',
username TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
email TEXT NOT NULL DEFAULT '',
open_id TEXT NOT NULL DEFAULT '',
union_id TEXT NOT NULL DEFAULT '',
tenant_key TEXT NOT NULL DEFAULT '',
avatar_url TEXT NOT NULL DEFAULT '',
first_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_ip TEXT NOT NULL DEFAULT '',
last_user_agent TEXT NOT NULL DEFAULT '',
metadata JSONB NOT NULL DEFAULT '{}'::jsonb
)
""",
        """
CREATE TABLE IF NOT EXISTS canvas_projects (
id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL REFERENCES app_users(uid) ON DELETE CASCADE,
name TEXT NOT NULL DEFAULT '',
thumbnail TEXT NOT NULL DEFAULT '',
visibility TEXT NOT NULL DEFAULT 'private',
canvas_data JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
deleted_at TIMESTAMPTZ,
version INTEGER NOT NULL DEFAULT 1,
source TEXT NOT NULL DEFAULT 'canvas',
metadata JSONB NOT NULL DEFAULT '{}'::jsonb
)
""",
        """
CREATE TABLE IF NOT EXISTS job_index (
job_id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL DEFAULT '',
owner_name TEXT NOT NULL DEFAULT '',
owner_email TEXT NOT NULL DEFAULT '',
owner_provider TEXT NOT NULL DEFAULT '',
tenant_key TEXT NOT NULL DEFAULT '',
url TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT '',
progress INTEGER NOT NULL DEFAULT 0,
message TEXT NOT NULL DEFAULT '',
job_kind TEXT NOT NULL DEFAULT '',
width INTEGER NOT NULL DEFAULT 0,
height INTEGER NOT NULL DEFAULT 0,
duration DOUBLE PRECISION NOT NULL DEFAULT 0,
frame_count INTEGER NOT NULL DEFAULT 0,
video_count INTEGER NOT NULL DEFAULT 0,
thumbnail TEXT NOT NULL DEFAULT '',
state_path TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_synced_at TIMESTAMPTZ NOT NULL DEFAULT now(),
payload JSONB NOT NULL DEFAULT '{}'::jsonb
)
""",
        """
CREATE TABLE IF NOT EXISTS generated_assets (
asset_key TEXT PRIMARY KEY,
asset_id TEXT NOT NULL DEFAULT '',
job_id TEXT NOT NULL DEFAULT '',
owner_id TEXT NOT NULL DEFAULT '',
kind TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT '',
url TEXT NOT NULL DEFAULT '',
model TEXT NOT NULL DEFAULT '',
prompt TEXT NOT NULL DEFAULT '',
width INTEGER NOT NULL DEFAULT 0,
height INTEGER NOT NULL DEFAULT 0,
duration DOUBLE PRECISION NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
metadata JSONB NOT NULL DEFAULT '{}'::jsonb
)
""",
        """
CREATE TABLE IF NOT EXISTS prompt_library_index (
item_id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL DEFAULT '',
category TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
tags JSONB NOT NULL DEFAULT '[]'::jsonb,
visibility TEXT NOT NULL DEFAULT 'company',
source_job_id TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
payload JSONB NOT NULL DEFAULT '{}'::jsonb
)
""",
        """
CREATE TABLE IF NOT EXISTS asset_library_index (
item_key TEXT PRIMARY KEY,
item_id TEXT NOT NULL DEFAULT '',
owner_id TEXT NOT NULL DEFAULT '',
kind TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
tags JSONB NOT NULL DEFAULT '[]'::jsonb,
visibility TEXT NOT NULL DEFAULT 'company',
source_job_id TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
payload JSONB NOT NULL DEFAULT '{}'::jsonb
)
""",
        """
CREATE TABLE IF NOT EXISTS agent_run_index (
run_id TEXT PRIMARY KEY,
job_id TEXT NOT NULL DEFAULT '',
owner_id TEXT NOT NULL DEFAULT '',
owner_name TEXT NOT NULL DEFAULT '',
owner_email TEXT NOT NULL DEFAULT '',
owner_provider TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT '',
stage TEXT NOT NULL DEFAULT '',
progress INTEGER NOT NULL DEFAULT 0,
final_video_url TEXT NOT NULL DEFAULT '',
contact_sheet_url TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
payload JSONB NOT NULL DEFAULT '{}'::jsonb
)
""",
        """
CREATE TABLE IF NOT EXISTS audit_events (
id UUID PRIMARY KEY,
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
user_id TEXT NOT NULL DEFAULT '',
action TEXT NOT NULL,
entity_type TEXT NOT NULL DEFAULT '',
entity_id TEXT NOT NULL DEFAULT '',
visibility TEXT NOT NULL DEFAULT '',
ip TEXT NOT NULL DEFAULT '',
user_agent TEXT NOT NULL DEFAULT '',
metadata JSONB NOT NULL DEFAULT '{}'::jsonb
)
""",
    ]

    # Secondary indexes, created after the tables they refer to.
    index_statements = [
        "CREATE INDEX IF NOT EXISTS idx_canvas_projects_owner_updated ON canvas_projects(owner_id, updated_at DESC) WHERE deleted_at IS NULL",
        "CREATE INDEX IF NOT EXISTS idx_canvas_projects_visibility_updated ON canvas_projects(visibility, updated_at DESC) WHERE deleted_at IS NULL",
        "CREATE INDEX IF NOT EXISTS idx_job_index_owner_updated ON job_index(owner_id, updated_at DESC)",
        "CREATE INDEX IF NOT EXISTS idx_generated_assets_owner_created ON generated_assets(owner_id, created_at DESC)",
        "CREATE INDEX IF NOT EXISTS idx_prompt_library_visibility ON prompt_library_index(visibility, updated_at DESC)",
        "CREATE INDEX IF NOT EXISTS idx_asset_library_visibility ON asset_library_index(visibility, updated_at DESC)",
        "CREATE INDEX IF NOT EXISTS idx_audit_events_user_ts ON audit_events(user_id, ts DESC)",
    ]

    def apply_all():
        with _connect() as conn:
            with conn.cursor() as cur:
                for statement in table_statements + index_statements:
                    cur.execute(statement)
            conn.commit()
        return True

    return bool(_execute_safely("init_schema", apply_all))
def health() -> dict:
    """Return a small status dict describing database availability.

    Returns:
        ``{"enabled": False, "connected": False}`` when the layer is disabled;
        otherwise ``enabled`` is ``True`` and ``connected`` reflects whether a
        ``SELECT 1`` round-trip succeeded.
    """
    if not DB_ENABLED:
        return {"enabled": False, "connected": False}

    def probe():
        with _connect() as conn, conn.cursor() as cur:
            cur.execute("SELECT 1 AS ok")
            cur.fetchone()
        return {"enabled": True, "connected": True}

    status = _execute_safely("health", probe)
    if status:
        return status
    return {"enabled": True, "connected": False}
def request_ip(request: Any) -> str:
    """Best-effort client IP for *request*.

    Uses the first comma-separated entry of the ``x-forwarded-for`` header
    when it is non-blank, otherwise ``request.client.host``; returns ``""``
    when neither is available or *request* is ``None``.
    """
    if request is None:
        return ""

    header_value = request.headers.get("x-forwarded-for") or ""
    first_hop, _, _ = str(header_value).partition(",")
    first_hop = first_hop.strip()
    if first_hop:
        return first_hop

    client = getattr(request, "client", None)
    return getattr(client, "host", "") or ""
def request_user_agent(request: Any) -> str:
    """Return the request's ``user-agent`` header, truncated to 600 characters.

    Returns ``""`` when *request* is ``None`` or the header is missing.
    """
    if request is None:
        return ""
    raw_agent = request.headers.get("user-agent") or ""
    return str(raw_agent)[:600]
def upsert_user(user: dict, request: Any = None) -> None:
    """Insert or refresh the ``app_users`` row for *user*.

    Does nothing when the user has no ``uid``. On conflict every profile
    column is overwritten with the incoming value, ``last_seen_at`` is set to
    ``now()``, and the last IP / user agent are taken from *request*. The
    profile fields are also stored as the row's ``metadata`` JSON.

    Args:
        user: Profile dict; ``username`` falls back to the ``u`` key.
        request: Optional request object used for IP and user-agent capture.
    """
    uid = str(user.get("uid") or "").strip()
    if not uid:
        return

    profile = {"username": str(user.get("username") or user.get("u") or "")}
    for field in ("name", "email", "open_id", "union_id", "tenant_key", "avatar_url", "provider"):
        profile[field] = str(user.get(field) or "")

    upsert_sql = """
INSERT INTO app_users (
uid, provider, username, name, email, open_id, union_id,
tenant_key, avatar_url, last_ip, last_user_agent, metadata
)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (uid) DO UPDATE SET
provider = EXCLUDED.provider,
username = EXCLUDED.username,
name = EXCLUDED.name,
email = EXCLUDED.email,
open_id = EXCLUDED.open_id,
union_id = EXCLUDED.union_id,
tenant_key = EXCLUDED.tenant_key,
avatar_url = EXCLUDED.avatar_url,
last_seen_at = now(),
last_ip = EXCLUDED.last_ip,
last_user_agent = EXCLUDED.last_user_agent,
metadata = EXCLUDED.metadata
"""

    def write_row():
        with _connect() as conn:
            with conn.cursor() as cur:
                # Built here (inside the guarded callable) so request/JSON
                # adaptation errors are swallowed like any other DB failure.
                values = (
                    uid,
                    profile["provider"],
                    profile["username"],
                    profile["name"],
                    profile["email"],
                    profile["open_id"],
                    profile["union_id"],
                    profile["tenant_key"],
                    profile["avatar_url"],
                    request_ip(request),
                    request_user_agent(request),
                    _json(profile),
                )
                cur.execute(upsert_sql, values)
            conn.commit()

    _execute_safely("upsert_user", write_row)
def audit(user: dict | None, action: str, entity_type: str = "", entity_id: str = "", metadata: dict | None = None, request: Any = None, visibility: str = "") -> None:
    """Append one row to ``audit_events`` (best effort; failures are only logged).

    Args:
        user: Acting user; its ``uid`` is recorded, or ``""`` when absent.
        action: Name of the action being recorded.
        entity_type: Kind of entity the action touched.
        entity_id: Identifier of that entity.
        metadata: Extra JSON-serialisable details; defaults to ``{}``.
        request: Optional request object used for IP and user-agent capture.
        visibility: Visibility value to store with the event.
    """
    insert_sql = """
INSERT INTO audit_events (id, user_id, action, entity_type, entity_id, visibility, ip, user_agent, metadata)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
"""

    def write_event():
        with _connect() as conn:
            with conn.cursor() as cur:
                actor = user or {}
                record = (
                    str(uuid.uuid4()),
                    str(actor.get("uid") or ""),
                    action,
                    entity_type,
                    entity_id,
                    visibility,
                    request_ip(request),
                    request_user_agent(request),
                    _json(metadata or {}),
                )
                cur.execute(insert_sql, record)
            conn.commit()

    _execute_safely("audit", write_event)
def list_canvas_projects(user: dict, include_shared: bool = True) -> list[dict]:
    """List non-deleted canvas projects visible to *user*, newest first.

    A project is visible when the user owns it or, if *include_shared* is
    true, when it is shared company-wide or shared with the team and the
    owner's ``tenant_key`` equals the user's (non-empty) ``tenant_key``.

    Args:
        user: Dict providing ``uid`` and optionally ``tenant_key``.
        include_shared: When false, only the user's own projects are returned.

    Returns:
        Up to 500 rows as dicts (project columns plus ``owner_name``,
        ``owner_email`` and ``owner_provider``); ``[]`` when the database is
        disabled or the query fails.
    """
    uid = str(user.get("uid") or "")
    tenant_key = str(user.get("tenant_key") or "")
    # Team sharing is a form of sharing too: it must be switched off together
    # with company sharing when include_shared is false.
    include_team = bool(include_shared and tenant_key)

    def run():
        with _connect() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
SELECT
p.id, p.name, p.thumbnail, p.visibility, p.canvas_data,
p.created_at, p.updated_at, p.version, p.owner_id,
u.name AS owner_name, u.email AS owner_email, u.provider AS owner_provider
FROM canvas_projects p
LEFT JOIN app_users u ON u.uid = p.owner_id
WHERE p.deleted_at IS NULL
AND (
p.owner_id = %s
OR (%s AND p.visibility = 'company')
OR (%s AND p.visibility = 'team' AND COALESCE(u.tenant_key, '') = %s)
)
ORDER BY p.updated_at DESC
LIMIT 500
""",
                    (uid, include_shared, include_team, tenant_key),
                )
                rows = cur.fetchall()
        return [dict(row) for row in rows]

    return _execute_safely("list_canvas_projects", run) or []
def get_canvas_project(project_id: str, user: dict) -> dict | None:
    """Fetch one non-deleted canvas project if *user* is allowed to see it.

    Access is granted to the owner, to anyone for ``company`` visibility, and
    for ``team`` visibility when the user's non-empty ``tenant_key`` equals
    the owner's.

    Returns:
        The row as a dict (including ``owner_tenant_key``), or ``None`` when
        the project is missing, not accessible, or the query failed.
    """
    uid = str(user.get("uid") or "")
    tenant_key = str(user.get("tenant_key") or "")

    lookup_sql = """
SELECT p.*, u.tenant_key AS owner_tenant_key
FROM canvas_projects p
LEFT JOIN app_users u ON u.uid = p.owner_id
WHERE p.id = %s AND p.deleted_at IS NULL
"""

    def fetch():
        with _connect() as conn:
            with conn.cursor() as cur:
                cur.execute(lookup_sql, (project_id,))
                found = cur.fetchone()
                if not found:
                    return None
                if found["owner_id"] == uid:
                    return dict(found)
                sharing = found["visibility"]
                if sharing == "company":
                    return dict(found)
                if sharing == "team" and tenant_key and found["owner_tenant_key"] == tenant_key:
                    return dict(found)
        return None

    return _execute_safely("get_canvas_project", fetch)
def upsert_canvas_project(user: dict, project: dict) -> dict | None:
    """Create a canvas project, or update it when the caller is its owner.

    On an id conflict, every mutable column (name, thumbnail, visibility,
    canvas data, ``updated_at``, version, ``deleted_at``) changes only when
    the existing row's ``owner_id`` equals the caller's uid; for other callers
    the stored row is left untouched. An owner upsert also clears
    ``deleted_at`` and bumps ``version``.

    Args:
        user: Dict providing the caller's ``uid``.
        project: Project fields. Accepts snake_case or camelCase for
            ``canvas_data`` / ``created_at`` / ``updated_at``. A missing id is
            generated; an unknown visibility falls back to ``"private"``.

    Returns:
        The stored row as a dict, or ``None`` when the user has no uid, the
        database is disabled, or the statement failed.
    """
    uid = str(user.get("uid") or "")
    if not uid:
        return None
    project_id = str(project.get("id") or "").strip()
    if not project_id:
        project_id = f"project_{int(time.time() * 1000)}_{uuid.uuid4().hex[:9]}"
    name = str(project.get("name") or "未命名项目").strip() or "未命名项目"
    thumbnail = str(project.get("thumbnail") or "")
    visibility = str(project.get("visibility") or "private").strip()
    if visibility not in {"private", "team", "company"}:
        visibility = "private"
    canvas_data = project.get("canvas_data") or project.get("canvasData") or {"nodes": [], "edges": [], "viewport": {"x": 100, "y": 50, "zoom": 0.8}}
    created_at = _dt(project.get("created_at") or project.get("createdAt"))
    updated_at = _dt(project.get("updated_at") or project.get("updatedAt"))

    def run():
        with _connect() as conn:
            with conn.cursor() as cur:
                # name and thumbnail are owner-guarded like every other column;
                # previously any caller who knew the id could overwrite them.
                # NOTE(review): RETURNING still hands the existing row
                # (including canvas_data) to a non-owner caller — confirm the
                # API layer checks access before exposing the result.
                cur.execute(
                    """
INSERT INTO canvas_projects (
id, owner_id, name, thumbnail, visibility, canvas_data,
created_at, updated_at, version, source, metadata
)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,1,%s,%s)
ON CONFLICT (id) DO UPDATE SET
name = CASE
WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN EXCLUDED.name
ELSE canvas_projects.name
END,
thumbnail = CASE
WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN EXCLUDED.thumbnail
ELSE canvas_projects.thumbnail
END,
visibility = CASE
WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN EXCLUDED.visibility
ELSE canvas_projects.visibility
END,
canvas_data = CASE
WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN EXCLUDED.canvas_data
ELSE canvas_projects.canvas_data
END,
updated_at = CASE
WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN GREATEST(canvas_projects.updated_at, EXCLUDED.updated_at)
ELSE canvas_projects.updated_at
END,
version = CASE
WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN canvas_projects.version + 1
ELSE canvas_projects.version
END,
deleted_at = CASE
WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN NULL
ELSE canvas_projects.deleted_at
END
RETURNING id, name, thumbnail, visibility, canvas_data, created_at, updated_at, version, owner_id
""",
                    (
                        project_id,
                        uid,
                        name,
                        thumbnail,
                        visibility,
                        _json(canvas_data),
                        created_at,
                        updated_at,
                        str(project.get("source") or "canvas"),
                        _json({"migrated_from": project.get("source") or "canvas"}),
                    ),
                )
                row = cur.fetchone()
            conn.commit()
        return dict(row) if row else None

    return _execute_safely("upsert_canvas_project", run)
def soft_delete_canvas_project(user: dict, project_id: str) -> bool:
    """Mark a project as deleted, provided *user* owns it and it is not already deleted.

    Sets ``deleted_at`` and ``updated_at`` to ``now()`` and bumps ``version``.

    Returns:
        ``True`` when a row was updated; ``False`` otherwise (no matching row,
        database disabled, or the statement failed).
    """
    owner_uid = str(user.get("uid") or "")

    delete_sql = """
UPDATE canvas_projects
SET deleted_at = now(), updated_at = now(), version = version + 1
WHERE id = %s AND owner_id = %s AND deleted_at IS NULL
"""

    def mark_deleted():
        with _connect() as conn:
            with conn.cursor() as cur:
                cur.execute(delete_sql, (project_id, owner_uid))
                affected = cur.rowcount
            conn.commit()
        return affected > 0

    outcome = _execute_safely("soft_delete_canvas_project", mark_deleted)
    return bool(outcome)
def index_job(job: dict, state_path: str = "") -> None:
    """Mirror a job dict into ``job_index`` and its outputs into ``generated_assets``.

    Does nothing when the job has no ``id``. The job row, one row per
    generated image (from each frame's ``generated_images``) and one row per
    entry in ``generated_videos`` are upserted on a single connection and
    committed together. Failures are logged and swallowed.

    Args:
        job: Job state dict; stored whole as the ``payload`` JSON.
        state_path: Value stored in the ``state_path`` column.
    """
    job_id = str(job.get("id") or "")
    if not job_id:
        return

    frames = job.get("frames") or []
    generated_videos = job.get("generated_videos") or []

    # The job thumbnail is the first frame's url, when that frame is a dict.
    thumbnail = ""
    if frames:
        head = frames[0]
        thumbnail = str((head if isinstance(head, dict) else {}).get("url") or "")

    updated_at = _dt()

    job_sql = """
INSERT INTO job_index (
job_id, owner_id, owner_name, owner_email, owner_provider, tenant_key,
url, status, progress, message, job_kind, width, height, duration,
frame_count, video_count, thumbnail, state_path, updated_at, payload
)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (job_id) DO UPDATE SET
owner_id = EXCLUDED.owner_id,
owner_name = EXCLUDED.owner_name,
owner_email = EXCLUDED.owner_email,
owner_provider = EXCLUDED.owner_provider,
tenant_key = EXCLUDED.tenant_key,
url = EXCLUDED.url,
status = EXCLUDED.status,
progress = EXCLUDED.progress,
message = EXCLUDED.message,
job_kind = EXCLUDED.job_kind,
width = EXCLUDED.width,
height = EXCLUDED.height,
duration = EXCLUDED.duration,
frame_count = EXCLUDED.frame_count,
video_count = EXCLUDED.video_count,
thumbnail = EXCLUDED.thumbnail,
state_path = EXCLUDED.state_path,
updated_at = EXCLUDED.updated_at,
last_synced_at = now(),
payload = EXCLUDED.payload
"""

    image_sql = """
INSERT INTO generated_assets (
asset_key, asset_id, job_id, owner_id, kind, status, url,
model, prompt, created_at, updated_at, metadata
)
VALUES (%s,%s,%s,%s,'image','completed',%s,%s,%s,%s,%s,%s)
ON CONFLICT (asset_key) DO UPDATE SET
status = EXCLUDED.status,
url = EXCLUDED.url,
model = EXCLUDED.model,
prompt = EXCLUDED.prompt,
updated_at = EXCLUDED.updated_at,
metadata = EXCLUDED.metadata
"""

    video_sql = """
INSERT INTO generated_assets (
asset_key, asset_id, job_id, owner_id, kind, status, url,
model, prompt, duration, created_at, updated_at, metadata
)
VALUES (%s,%s,%s,%s,'video',%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (asset_key) DO UPDATE SET
status = EXCLUDED.status,
url = EXCLUDED.url,
model = EXCLUDED.model,
prompt = EXCLUDED.prompt,
duration = EXCLUDED.duration,
updated_at = EXCLUDED.updated_at,
metadata = EXCLUDED.metadata
"""

    def sync():
        with _connect() as conn:
            with conn.cursor() as cur:
                owner_id = str(job.get("owner_id") or "")
                source_url = str(job.get("url") or "")
                job_row = (
                    job_id,
                    owner_id,
                    str(job.get("owner_name") or ""),
                    str(job.get("owner_email") or ""),
                    str(job.get("owner_provider") or ""),
                    str(job.get("tenant_key") or ""),
                    source_url,
                    str(job.get("status") or ""),
                    int(job.get("progress") or 0),
                    str(job.get("message") or "")[:1000],
                    # job_kind: text before "://" in the url, or "job" when empty.
                    source_url.split("://", 1)[0] or "job",
                    int(job.get("width") or 0),
                    int(job.get("height") or 0),
                    float(job.get("duration") or 0),
                    len(frames),
                    len(generated_videos),
                    thumbnail,
                    state_path,
                    updated_at,
                    _json(job),
                )
                cur.execute(job_sql, job_row)

                # NOTE(review): an asset without an "id" gets a key ending in
                # "None" while asset_id is stored as "" — confirm ids are
                # always present, otherwise such assets overwrite each other.
                for frame in (entry for entry in frames if isinstance(entry, dict)):
                    frame_idx = frame.get("index", 0)
                    images = frame.get("generated_images") or []
                    for image in (entry for entry in images if isinstance(entry, dict)):
                        image_row = (
                            f"{job_id}:image:{image.get('id')}",
                            str(image.get("id") or ""),
                            job_id,
                            owner_id,
                            str(image.get("url") or ""),
                            str(image.get("model") or ""),
                            str(image.get("prompt") or ""),
                            _dt(image.get("created_at")),
                            updated_at,
                            _json({"frame_idx": frame_idx, **image}),
                        )
                        cur.execute(image_sql, image_row)

                for video in (entry for entry in generated_videos if isinstance(entry, dict)):
                    video_row = (
                        f"{job_id}:video:{video.get('id')}",
                        str(video.get("id") or ""),
                        job_id,
                        owner_id,
                        str(video.get("status") or ""),
                        str(video.get("url") or ""),
                        str(video.get("model") or ""),
                        str(video.get("prompt") or ""),
                        float(video.get("duration") or 0),
                        _dt(video.get("created_at")),
                        updated_at,
                        _json(video),
                    )
                    cur.execute(video_sql, video_row)
            conn.commit()

    _execute_safely("index_job", sync)
def index_agent_run(run_payload: dict) -> None:
    """Upsert one agent run into ``agent_run_index``, keyed by its ``id``.

    Does nothing when the payload has no ``id``. On conflict every column
    except ``run_id`` and ``created_at`` is overwritten. The whole payload is
    stored as the ``payload`` JSON. Failures are logged and swallowed.
    """
    run_id = str(run_payload.get("id") or "")
    if not run_id:
        return

    upsert_sql = """
INSERT INTO agent_run_index (
run_id, job_id, owner_id, owner_name, owner_email, owner_provider,
status, stage, progress, final_video_url, contact_sheet_url,
created_at, updated_at, payload
)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (run_id) DO UPDATE SET
job_id = EXCLUDED.job_id,
owner_id = EXCLUDED.owner_id,
owner_name = EXCLUDED.owner_name,
owner_email = EXCLUDED.owner_email,
owner_provider = EXCLUDED.owner_provider,
status = EXCLUDED.status,
stage = EXCLUDED.stage,
progress = EXCLUDED.progress,
final_video_url = EXCLUDED.final_video_url,
contact_sheet_url = EXCLUDED.contact_sheet_url,
updated_at = EXCLUDED.updated_at,
payload = EXCLUDED.payload
"""

    def text(key: str) -> str:
        # Missing or falsy payload values are stored as empty strings.
        return str(run_payload.get(key) or "")

    def write_row():
        with _connect() as conn:
            with conn.cursor() as cur:
                values = (
                    run_id,
                    text("job_id"),
                    text("owner_id"),
                    text("owner_name"),
                    text("owner_email"),
                    text("owner_provider"),
                    text("status"),
                    text("stage"),
                    int(run_payload.get("progress") or 0),
                    text("final_video_url"),
                    text("contact_sheet_url"),
                    _dt(run_payload.get("created_at")),
                    _dt(run_payload.get("updated_at")),
                    _json(run_payload),
                )
                cur.execute(upsert_sql, values)
            conn.commit()

    _execute_safely("index_agent_run", write_row)
def index_prompt_item(item: dict, owner_id: str = "") -> None:
    """Upsert a prompt-library item into ``prompt_library_index``.

    Does nothing when the item has no ``id``. ``updated_at`` falls back to the
    item's ``created_at`` when absent. The ``visibility`` column is not
    written here, so new rows get the column default. Failures are logged and
    swallowed.

    Args:
        item: Library item dict; stored whole as the ``payload`` JSON.
        owner_id: Owner recorded on the row.
    """
    item_id = str(item.get("id") or "")
    if not item_id:
        return

    upsert_sql = """
INSERT INTO prompt_library_index (
item_id, owner_id, category, name, tags, source_job_id,
created_at, updated_at, payload
)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (item_id) DO UPDATE SET
owner_id = EXCLUDED.owner_id,
category = EXCLUDED.category,
name = EXCLUDED.name,
tags = EXCLUDED.tags,
source_job_id = EXCLUDED.source_job_id,
updated_at = EXCLUDED.updated_at,
payload = EXCLUDED.payload
"""

    def write_row():
        with _connect() as conn:
            with conn.cursor() as cur:
                values = (
                    item_id,
                    owner_id,
                    str(item.get("category") or ""),
                    str(item.get("name") or ""),
                    _json(item.get("tags") or []),
                    str(item.get("source_job_id") or ""),
                    _dt(item.get("created_at")),
                    _dt(item.get("updated_at") or item.get("created_at")),
                    _json(item),
                )
                cur.execute(upsert_sql, values)
            conn.commit()

    _execute_safely("index_prompt_item", write_row)
def index_asset_item(item: dict, owner_id: str = "") -> None:
    """Upsert an asset-library item into ``asset_library_index``.

    Does nothing unless the item has both ``id`` and ``kind``; the row key is
    ``"<kind>:<id>"``. ``updated_at`` falls back to the item's ``created_at``
    when absent. The ``visibility`` column is not written here, so new rows
    get the column default. Failures are logged and swallowed.

    Args:
        item: Library item dict; stored whole as the ``payload`` JSON.
        owner_id: Owner recorded on the row.
    """
    item_id = str(item.get("id") or "")
    kind = str(item.get("kind") or "")
    if not (item_id and kind):
        return
    item_key = f"{kind}:{item_id}"

    upsert_sql = """
INSERT INTO asset_library_index (
item_key, item_id, owner_id, kind, name, tags, source_job_id,
created_at, updated_at, payload
)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (item_key) DO UPDATE SET
owner_id = EXCLUDED.owner_id,
kind = EXCLUDED.kind,
name = EXCLUDED.name,
tags = EXCLUDED.tags,
source_job_id = EXCLUDED.source_job_id,
updated_at = EXCLUDED.updated_at,
payload = EXCLUDED.payload
"""

    def write_row():
        with _connect() as conn:
            with conn.cursor() as cur:
                values = (
                    item_key,
                    item_id,
                    owner_id,
                    kind,
                    str(item.get("name") or ""),
                    _json(item.get("tags") or []),
                    str(item.get("source_job_id") or ""),
                    _dt(item.get("created_at")),
                    _dt(item.get("updated_at") or item.get("created_at")),
                    _json(item),
                )
                cur.execute(upsert_sql, values)
            conn.commit()

    _execute_safely("index_asset_item", write_row)