912 lines
38 KiB
Python
912 lines
38 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
try:
|
|
import psycopg
|
|
from psycopg.rows import dict_row
|
|
from psycopg.types.json import Jsonb
|
|
except ModuleNotFoundError: # Local dev can still run without Postgres deps installed.
|
|
psycopg = None
|
|
dict_row = None
|
|
Jsonb = None
|
|
|
|
|
|
DATABASE_URL = os.getenv("DATABASE_URL", "").strip()
|
|
DB_ENABLED = bool(DATABASE_URL and psycopg is not None)
|
|
|
|
|
|
def enabled() -> bool:
|
|
return DB_ENABLED
|
|
|
|
|
|
def _connect():
|
|
if not DB_ENABLED:
|
|
raise RuntimeError("database disabled")
|
|
return psycopg.connect(DATABASE_URL, row_factory=dict_row, connect_timeout=5)
|
|
|
|
|
|
def _dt(ts: float | int | None = None) -> datetime:
|
|
try:
|
|
value = float(ts or 0)
|
|
except (TypeError, ValueError):
|
|
value = 0
|
|
if value <= 0:
|
|
value = time.time()
|
|
return datetime.fromtimestamp(value, tz=timezone.utc)
|
|
|
|
|
|
def _json(value: Any):
|
|
return Jsonb(value if value is not None else {})
|
|
|
|
|
|
def _execute_safely(label: str, fn):
|
|
if not DB_ENABLED:
|
|
return None
|
|
try:
|
|
return fn()
|
|
except Exception as exc:
|
|
print(f"[db] {label} failed: {exc}", flush=True)
|
|
return None
|
|
|
|
|
|
def init_schema() -> bool:
|
|
if not DB_ENABLED:
|
|
print("[db] disabled: DATABASE_URL is empty or psycopg is missing", flush=True)
|
|
return False
|
|
|
|
ddl = [
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS app_users (
|
|
uid TEXT PRIMARY KEY,
|
|
provider TEXT NOT NULL DEFAULT '',
|
|
username TEXT NOT NULL DEFAULT '',
|
|
name TEXT NOT NULL DEFAULT '',
|
|
email TEXT NOT NULL DEFAULT '',
|
|
open_id TEXT NOT NULL DEFAULT '',
|
|
union_id TEXT NOT NULL DEFAULT '',
|
|
tenant_key TEXT NOT NULL DEFAULT '',
|
|
avatar_url TEXT NOT NULL DEFAULT '',
|
|
first_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
last_ip TEXT NOT NULL DEFAULT '',
|
|
last_user_agent TEXT NOT NULL DEFAULT '',
|
|
metadata JSONB NOT NULL DEFAULT '{}'::jsonb
|
|
)
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS canvas_projects (
|
|
id TEXT PRIMARY KEY,
|
|
owner_id TEXT NOT NULL REFERENCES app_users(uid) ON DELETE CASCADE,
|
|
name TEXT NOT NULL DEFAULT '',
|
|
thumbnail TEXT NOT NULL DEFAULT '',
|
|
visibility TEXT NOT NULL DEFAULT 'private',
|
|
canvas_data JSONB NOT NULL DEFAULT '{}'::jsonb,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
deleted_at TIMESTAMPTZ,
|
|
version INTEGER NOT NULL DEFAULT 1,
|
|
source TEXT NOT NULL DEFAULT 'canvas',
|
|
metadata JSONB NOT NULL DEFAULT '{}'::jsonb
|
|
)
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS canvas_workflows (
|
|
id TEXT PRIMARY KEY,
|
|
owner_id TEXT NOT NULL REFERENCES app_users(uid) ON DELETE CASCADE,
|
|
name TEXT NOT NULL DEFAULT '',
|
|
description TEXT NOT NULL DEFAULT '',
|
|
thumbnail TEXT NOT NULL DEFAULT '',
|
|
workflow_data JSONB NOT NULL DEFAULT '{}'::jsonb,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
deleted_at TIMESTAMPTZ,
|
|
version INTEGER NOT NULL DEFAULT 1,
|
|
source TEXT NOT NULL DEFAULT 'canvas',
|
|
metadata JSONB NOT NULL DEFAULT '{}'::jsonb
|
|
)
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS job_index (
|
|
job_id TEXT PRIMARY KEY,
|
|
owner_id TEXT NOT NULL DEFAULT '',
|
|
owner_name TEXT NOT NULL DEFAULT '',
|
|
owner_email TEXT NOT NULL DEFAULT '',
|
|
owner_provider TEXT NOT NULL DEFAULT '',
|
|
tenant_key TEXT NOT NULL DEFAULT '',
|
|
url TEXT NOT NULL DEFAULT '',
|
|
status TEXT NOT NULL DEFAULT '',
|
|
progress INTEGER NOT NULL DEFAULT 0,
|
|
message TEXT NOT NULL DEFAULT '',
|
|
job_kind TEXT NOT NULL DEFAULT '',
|
|
width INTEGER NOT NULL DEFAULT 0,
|
|
height INTEGER NOT NULL DEFAULT 0,
|
|
duration DOUBLE PRECISION NOT NULL DEFAULT 0,
|
|
frame_count INTEGER NOT NULL DEFAULT 0,
|
|
video_count INTEGER NOT NULL DEFAULT 0,
|
|
thumbnail TEXT NOT NULL DEFAULT '',
|
|
state_path TEXT NOT NULL DEFAULT '',
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
last_synced_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
payload JSONB NOT NULL DEFAULT '{}'::jsonb
|
|
)
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS generated_assets (
|
|
asset_key TEXT PRIMARY KEY,
|
|
asset_id TEXT NOT NULL DEFAULT '',
|
|
job_id TEXT NOT NULL DEFAULT '',
|
|
owner_id TEXT NOT NULL DEFAULT '',
|
|
kind TEXT NOT NULL DEFAULT '',
|
|
status TEXT NOT NULL DEFAULT '',
|
|
url TEXT NOT NULL DEFAULT '',
|
|
model TEXT NOT NULL DEFAULT '',
|
|
prompt TEXT NOT NULL DEFAULT '',
|
|
width INTEGER NOT NULL DEFAULT 0,
|
|
height INTEGER NOT NULL DEFAULT 0,
|
|
duration DOUBLE PRECISION NOT NULL DEFAULT 0,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
metadata JSONB NOT NULL DEFAULT '{}'::jsonb
|
|
)
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS prompt_library_index (
|
|
item_id TEXT PRIMARY KEY,
|
|
owner_id TEXT NOT NULL DEFAULT '',
|
|
category TEXT NOT NULL DEFAULT '',
|
|
name TEXT NOT NULL DEFAULT '',
|
|
tags JSONB NOT NULL DEFAULT '[]'::jsonb,
|
|
visibility TEXT NOT NULL DEFAULT 'company',
|
|
source_job_id TEXT NOT NULL DEFAULT '',
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
payload JSONB NOT NULL DEFAULT '{}'::jsonb
|
|
)
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS asset_library_index (
|
|
item_key TEXT PRIMARY KEY,
|
|
item_id TEXT NOT NULL DEFAULT '',
|
|
owner_id TEXT NOT NULL DEFAULT '',
|
|
kind TEXT NOT NULL DEFAULT '',
|
|
name TEXT NOT NULL DEFAULT '',
|
|
tags JSONB NOT NULL DEFAULT '[]'::jsonb,
|
|
visibility TEXT NOT NULL DEFAULT 'company',
|
|
source_job_id TEXT NOT NULL DEFAULT '',
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
payload JSONB NOT NULL DEFAULT '{}'::jsonb
|
|
)
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS agent_run_index (
|
|
run_id TEXT PRIMARY KEY,
|
|
job_id TEXT NOT NULL DEFAULT '',
|
|
owner_id TEXT NOT NULL DEFAULT '',
|
|
owner_name TEXT NOT NULL DEFAULT '',
|
|
owner_email TEXT NOT NULL DEFAULT '',
|
|
owner_provider TEXT NOT NULL DEFAULT '',
|
|
status TEXT NOT NULL DEFAULT '',
|
|
stage TEXT NOT NULL DEFAULT '',
|
|
progress INTEGER NOT NULL DEFAULT 0,
|
|
final_video_url TEXT NOT NULL DEFAULT '',
|
|
contact_sheet_url TEXT NOT NULL DEFAULT '',
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
payload JSONB NOT NULL DEFAULT '{}'::jsonb
|
|
)
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS audit_events (
|
|
id UUID PRIMARY KEY,
|
|
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
user_id TEXT NOT NULL DEFAULT '',
|
|
action TEXT NOT NULL,
|
|
entity_type TEXT NOT NULL DEFAULT '',
|
|
entity_id TEXT NOT NULL DEFAULT '',
|
|
visibility TEXT NOT NULL DEFAULT '',
|
|
ip TEXT NOT NULL DEFAULT '',
|
|
user_agent TEXT NOT NULL DEFAULT '',
|
|
metadata JSONB NOT NULL DEFAULT '{}'::jsonb
|
|
)
|
|
""",
|
|
"CREATE INDEX IF NOT EXISTS idx_canvas_projects_owner_updated ON canvas_projects(owner_id, updated_at DESC) WHERE deleted_at IS NULL",
|
|
"CREATE INDEX IF NOT EXISTS idx_canvas_projects_visibility_updated ON canvas_projects(visibility, updated_at DESC) WHERE deleted_at IS NULL",
|
|
"CREATE INDEX IF NOT EXISTS idx_canvas_workflows_owner_updated ON canvas_workflows(owner_id, updated_at DESC) WHERE deleted_at IS NULL",
|
|
"CREATE INDEX IF NOT EXISTS idx_job_index_owner_updated ON job_index(owner_id, updated_at DESC)",
|
|
"CREATE INDEX IF NOT EXISTS idx_generated_assets_owner_created ON generated_assets(owner_id, created_at DESC)",
|
|
"CREATE INDEX IF NOT EXISTS idx_prompt_library_visibility ON prompt_library_index(visibility, updated_at DESC)",
|
|
"CREATE INDEX IF NOT EXISTS idx_asset_library_visibility ON asset_library_index(visibility, updated_at DESC)",
|
|
"CREATE INDEX IF NOT EXISTS idx_audit_events_user_ts ON audit_events(user_id, ts DESC)",
|
|
]
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
for stmt in ddl:
|
|
cur.execute(stmt)
|
|
conn.commit()
|
|
return True
|
|
|
|
return bool(_execute_safely("init_schema", run))
|
|
|
|
|
|
def health() -> dict:
|
|
if not DB_ENABLED:
|
|
return {"enabled": False, "connected": False}
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT 1 AS ok")
|
|
cur.fetchone()
|
|
return {"enabled": True, "connected": True}
|
|
|
|
return _execute_safely("health", run) or {"enabled": True, "connected": False}
|
|
|
|
|
|
def request_ip(request: Any) -> str:
|
|
if request is None:
|
|
return ""
|
|
forwarded = str(request.headers.get("x-forwarded-for") or "").split(",", 1)[0].strip()
|
|
return forwarded or getattr(getattr(request, "client", None), "host", "") or ""
|
|
|
|
|
|
def request_user_agent(request: Any) -> str:
|
|
if request is None:
|
|
return ""
|
|
return str(request.headers.get("user-agent") or "")[:600]
|
|
|
|
|
|
def upsert_user(user: dict, request: Any = None) -> None:
|
|
uid = str(user.get("uid") or "").strip()
|
|
if not uid:
|
|
return
|
|
payload = {
|
|
"username": str(user.get("username") or user.get("u") or ""),
|
|
"name": str(user.get("name") or ""),
|
|
"email": str(user.get("email") or ""),
|
|
"open_id": str(user.get("open_id") or ""),
|
|
"union_id": str(user.get("union_id") or ""),
|
|
"tenant_key": str(user.get("tenant_key") or ""),
|
|
"avatar_url": str(user.get("avatar_url") or ""),
|
|
"provider": str(user.get("provider") or ""),
|
|
}
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO app_users (
|
|
uid, provider, username, name, email, open_id, union_id,
|
|
tenant_key, avatar_url, last_ip, last_user_agent, metadata
|
|
)
|
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
|
|
ON CONFLICT (uid) DO UPDATE SET
|
|
provider = EXCLUDED.provider,
|
|
username = EXCLUDED.username,
|
|
name = EXCLUDED.name,
|
|
email = EXCLUDED.email,
|
|
open_id = EXCLUDED.open_id,
|
|
union_id = EXCLUDED.union_id,
|
|
tenant_key = EXCLUDED.tenant_key,
|
|
avatar_url = EXCLUDED.avatar_url,
|
|
last_seen_at = now(),
|
|
last_ip = EXCLUDED.last_ip,
|
|
last_user_agent = EXCLUDED.last_user_agent,
|
|
metadata = EXCLUDED.metadata
|
|
""",
|
|
(
|
|
uid,
|
|
payload["provider"],
|
|
payload["username"],
|
|
payload["name"],
|
|
payload["email"],
|
|
payload["open_id"],
|
|
payload["union_id"],
|
|
payload["tenant_key"],
|
|
payload["avatar_url"],
|
|
request_ip(request),
|
|
request_user_agent(request),
|
|
_json(payload),
|
|
),
|
|
)
|
|
conn.commit()
|
|
|
|
_execute_safely("upsert_user", run)
|
|
|
|
|
|
def audit(user: dict | None, action: str, entity_type: str = "", entity_id: str = "", metadata: dict | None = None, request: Any = None, visibility: str = "") -> None:
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO audit_events (id, user_id, action, entity_type, entity_id, visibility, ip, user_agent, metadata)
|
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
|
|
""",
|
|
(
|
|
str(uuid.uuid4()),
|
|
str((user or {}).get("uid") or ""),
|
|
action,
|
|
entity_type,
|
|
entity_id,
|
|
visibility,
|
|
request_ip(request),
|
|
request_user_agent(request),
|
|
_json(metadata or {}),
|
|
),
|
|
)
|
|
conn.commit()
|
|
|
|
_execute_safely("audit", run)
|
|
|
|
|
|
def list_canvas_projects(user: dict, include_shared: bool = True) -> list[dict]:
|
|
uid = str(user.get("uid") or "")
|
|
tenant_key = str(user.get("tenant_key") or "")
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT
|
|
p.id, p.name, p.thumbnail, p.visibility, p.canvas_data,
|
|
p.created_at, p.updated_at, p.version, p.owner_id,
|
|
u.name AS owner_name, u.email AS owner_email, u.provider AS owner_provider
|
|
FROM canvas_projects p
|
|
LEFT JOIN app_users u ON u.uid = p.owner_id
|
|
WHERE p.deleted_at IS NULL
|
|
AND (
|
|
p.owner_id = %s
|
|
OR (%s AND p.visibility = 'company')
|
|
OR (%s AND p.visibility = 'team' AND COALESCE(u.tenant_key, '') = %s)
|
|
)
|
|
ORDER BY p.updated_at DESC
|
|
LIMIT 500
|
|
""",
|
|
(uid, include_shared, bool(tenant_key), tenant_key),
|
|
)
|
|
rows = cur.fetchall()
|
|
return [dict(row) for row in rows]
|
|
|
|
return _execute_safely("list_canvas_projects", run) or []
|
|
|
|
|
|
def get_canvas_project(project_id: str, user: dict) -> dict | None:
|
|
uid = str(user.get("uid") or "")
|
|
tenant_key = str(user.get("tenant_key") or "")
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT p.*, u.tenant_key AS owner_tenant_key
|
|
FROM canvas_projects p
|
|
LEFT JOIN app_users u ON u.uid = p.owner_id
|
|
WHERE p.id = %s AND p.deleted_at IS NULL
|
|
""",
|
|
(project_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return None
|
|
if row["owner_id"] == uid or row["visibility"] == "company" or (row["visibility"] == "team" and tenant_key and row["owner_tenant_key"] == tenant_key):
|
|
return dict(row)
|
|
return None
|
|
|
|
return _execute_safely("get_canvas_project", run)
|
|
|
|
|
|
def upsert_canvas_project(user: dict, project: dict) -> dict | None:
|
|
uid = str(user.get("uid") or "")
|
|
if not uid:
|
|
return None
|
|
project_id = str(project.get("id") or "").strip()
|
|
if not project_id:
|
|
project_id = f"project_{int(time.time() * 1000)}_{uuid.uuid4().hex[:9]}"
|
|
name = str(project.get("name") or "未命名项目").strip() or "未命名项目"
|
|
thumbnail = str(project.get("thumbnail") or "")
|
|
visibility = str(project.get("visibility") or "private").strip()
|
|
if visibility not in {"private", "team", "company"}:
|
|
visibility = "private"
|
|
canvas_data = project.get("canvas_data") or project.get("canvasData") or {"nodes": [], "edges": [], "viewport": {"x": 100, "y": 50, "zoom": 0.8}}
|
|
created_at = _dt(project.get("created_at") or project.get("createdAt"))
|
|
updated_at = _dt(project.get("updated_at") or project.get("updatedAt"))
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO canvas_projects (
|
|
id, owner_id, name, thumbnail, visibility, canvas_data,
|
|
created_at, updated_at, version, source, metadata
|
|
)
|
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,1,%s,%s)
|
|
ON CONFLICT (id) DO UPDATE SET
|
|
name = CASE
|
|
WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN EXCLUDED.name
|
|
ELSE canvas_projects.name
|
|
END,
|
|
thumbnail = CASE
|
|
WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN EXCLUDED.thumbnail
|
|
ELSE canvas_projects.thumbnail
|
|
END,
|
|
visibility = CASE
|
|
WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN EXCLUDED.visibility
|
|
ELSE canvas_projects.visibility
|
|
END,
|
|
canvas_data = CASE
|
|
WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN EXCLUDED.canvas_data
|
|
ELSE canvas_projects.canvas_data
|
|
END,
|
|
updated_at = CASE
|
|
WHEN canvas_projects.owner_id = EXCLUDED.owner_id THEN GREATEST(canvas_projects.updated_at, EXCLUDED.updated_at)
|
|
ELSE canvas_projects.updated_at
|
|
END,
|
|
version = CASE
|
|
WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN canvas_projects.version + 1
|
|
ELSE canvas_projects.version
|
|
END,
|
|
deleted_at = CASE
|
|
WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN NULL
|
|
ELSE canvas_projects.deleted_at
|
|
END
|
|
WHERE canvas_projects.owner_id = EXCLUDED.owner_id
|
|
RETURNING id, name, thumbnail, visibility, canvas_data, created_at, updated_at, version, owner_id
|
|
""",
|
|
(
|
|
project_id,
|
|
uid,
|
|
name,
|
|
thumbnail,
|
|
visibility,
|
|
_json(canvas_data),
|
|
created_at,
|
|
updated_at,
|
|
str(project.get("source") or "canvas"),
|
|
_json({"migrated_from": project.get("source") or "canvas"}),
|
|
),
|
|
)
|
|
row = cur.fetchone()
|
|
conn.commit()
|
|
return dict(row) if row else None
|
|
|
|
return _execute_safely("upsert_canvas_project", run)
|
|
|
|
|
|
def soft_delete_canvas_project(user: dict, project_id: str) -> bool:
|
|
uid = str(user.get("uid") or "")
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
UPDATE canvas_projects
|
|
SET deleted_at = now(), updated_at = now(), version = version + 1
|
|
WHERE id = %s AND owner_id = %s AND deleted_at IS NULL
|
|
""",
|
|
(project_id, uid),
|
|
)
|
|
changed = cur.rowcount > 0
|
|
conn.commit()
|
|
return changed
|
|
|
|
return bool(_execute_safely("soft_delete_canvas_project", run))
|
|
|
|
|
|
def list_canvas_workflows(user: dict) -> list[dict]:
|
|
uid = str(user.get("uid") or "")
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT
|
|
w.id, w.name, w.description, w.thumbnail, w.workflow_data,
|
|
w.created_at, w.updated_at, w.version, w.owner_id,
|
|
u.name AS owner_name, u.email AS owner_email, u.provider AS owner_provider
|
|
FROM canvas_workflows w
|
|
LEFT JOIN app_users u ON u.uid = w.owner_id
|
|
WHERE w.deleted_at IS NULL
|
|
AND w.owner_id = %s
|
|
ORDER BY w.updated_at DESC
|
|
LIMIT 500
|
|
""",
|
|
(uid,),
|
|
)
|
|
rows = cur.fetchall()
|
|
return [dict(row) for row in rows]
|
|
|
|
return _execute_safely("list_canvas_workflows", run) or []
|
|
|
|
|
|
def upsert_canvas_workflow(user: dict, workflow: dict) -> dict | None:
|
|
uid = str(user.get("uid") or "")
|
|
if not uid:
|
|
return None
|
|
workflow_id = str(workflow.get("id") or "").strip()
|
|
if not workflow_id:
|
|
workflow_id = f"workflow_{int(time.time() * 1000)}_{uuid.uuid4().hex[:9]}"
|
|
name = str(workflow.get("name") or "未命名工作流").strip() or "未命名工作流"
|
|
description = str(workflow.get("description") or "").strip()
|
|
thumbnail = str(workflow.get("thumbnail") or "")
|
|
workflow_data = workflow.get("workflow_data") or workflow.get("workflowData") or {"nodes": [], "edges": [], "viewport": {"x": 100, "y": 50, "zoom": 0.8}}
|
|
created_at = _dt(workflow.get("created_at") or workflow.get("createdAt"))
|
|
updated_at = _dt(workflow.get("updated_at") or workflow.get("updatedAt"))
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO canvas_workflows (
|
|
id, owner_id, name, description, thumbnail, workflow_data,
|
|
created_at, updated_at, version, source, metadata
|
|
)
|
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,1,%s,%s)
|
|
ON CONFLICT (id) DO UPDATE SET
|
|
name = CASE
|
|
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.name
|
|
ELSE canvas_workflows.name
|
|
END,
|
|
description = CASE
|
|
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.description
|
|
ELSE canvas_workflows.description
|
|
END,
|
|
thumbnail = CASE
|
|
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.thumbnail
|
|
ELSE canvas_workflows.thumbnail
|
|
END,
|
|
workflow_data = CASE
|
|
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.workflow_data
|
|
ELSE canvas_workflows.workflow_data
|
|
END,
|
|
updated_at = CASE
|
|
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.updated_at
|
|
ELSE canvas_workflows.updated_at
|
|
END,
|
|
version = CASE
|
|
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN canvas_workflows.version + 1
|
|
ELSE canvas_workflows.version
|
|
END,
|
|
deleted_at = CASE
|
|
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN NULL
|
|
ELSE canvas_workflows.deleted_at
|
|
END
|
|
WHERE canvas_workflows.owner_id = EXCLUDED.owner_id
|
|
RETURNING id, name, description, thumbnail, workflow_data, created_at, updated_at, version, owner_id
|
|
""",
|
|
(
|
|
workflow_id,
|
|
uid,
|
|
name,
|
|
description,
|
|
thumbnail,
|
|
_json(workflow_data),
|
|
created_at,
|
|
updated_at,
|
|
str(workflow.get("source") or "canvas"),
|
|
_json({"source_project_id": workflow.get("source_project_id") or workflow.get("sourceProjectId") or ""}),
|
|
),
|
|
)
|
|
row = cur.fetchone()
|
|
conn.commit()
|
|
return dict(row) if row else None
|
|
|
|
return _execute_safely("upsert_canvas_workflow", run)
|
|
|
|
|
|
def soft_delete_canvas_workflow(user: dict, workflow_id: str) -> bool:
|
|
uid = str(user.get("uid") or "")
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
UPDATE canvas_workflows
|
|
SET deleted_at = now(), updated_at = now(), version = version + 1
|
|
WHERE id = %s AND owner_id = %s AND deleted_at IS NULL
|
|
""",
|
|
(workflow_id, uid),
|
|
)
|
|
changed = cur.rowcount > 0
|
|
conn.commit()
|
|
return changed
|
|
|
|
return bool(_execute_safely("soft_delete_canvas_workflow", run))
|
|
|
|
|
|
def index_job(job: dict, state_path: str = "") -> None:
|
|
job_id = str(job.get("id") or "")
|
|
if not job_id:
|
|
return
|
|
frames = job.get("frames") or []
|
|
generated_videos = job.get("generated_videos") or []
|
|
thumbnail = ""
|
|
if frames:
|
|
first = frames[0] if isinstance(frames[0], dict) else {}
|
|
thumbnail = str(first.get("url") or "")
|
|
updated_at = _dt()
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO job_index (
|
|
job_id, owner_id, owner_name, owner_email, owner_provider, tenant_key,
|
|
url, status, progress, message, job_kind, width, height, duration,
|
|
frame_count, video_count, thumbnail, state_path, updated_at, payload
|
|
)
|
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
|
|
ON CONFLICT (job_id) DO UPDATE SET
|
|
owner_id = EXCLUDED.owner_id,
|
|
owner_name = EXCLUDED.owner_name,
|
|
owner_email = EXCLUDED.owner_email,
|
|
owner_provider = EXCLUDED.owner_provider,
|
|
tenant_key = EXCLUDED.tenant_key,
|
|
url = EXCLUDED.url,
|
|
status = EXCLUDED.status,
|
|
progress = EXCLUDED.progress,
|
|
message = EXCLUDED.message,
|
|
job_kind = EXCLUDED.job_kind,
|
|
width = EXCLUDED.width,
|
|
height = EXCLUDED.height,
|
|
duration = EXCLUDED.duration,
|
|
frame_count = EXCLUDED.frame_count,
|
|
video_count = EXCLUDED.video_count,
|
|
thumbnail = EXCLUDED.thumbnail,
|
|
state_path = EXCLUDED.state_path,
|
|
updated_at = EXCLUDED.updated_at,
|
|
last_synced_at = now(),
|
|
payload = EXCLUDED.payload
|
|
""",
|
|
(
|
|
job_id,
|
|
str(job.get("owner_id") or ""),
|
|
str(job.get("owner_name") or ""),
|
|
str(job.get("owner_email") or ""),
|
|
str(job.get("owner_provider") or ""),
|
|
str(job.get("tenant_key") or ""),
|
|
str(job.get("url") or ""),
|
|
str(job.get("status") or ""),
|
|
int(job.get("progress") or 0),
|
|
str(job.get("message") or "")[:1000],
|
|
str(job.get("url") or "").split("://", 1)[0] or "job",
|
|
int(job.get("width") or 0),
|
|
int(job.get("height") or 0),
|
|
float(job.get("duration") or 0),
|
|
len(frames),
|
|
len(generated_videos),
|
|
thumbnail,
|
|
state_path,
|
|
updated_at,
|
|
_json(job),
|
|
),
|
|
)
|
|
for frame in frames:
|
|
if not isinstance(frame, dict):
|
|
continue
|
|
frame_idx = frame.get("index", 0)
|
|
for image in frame.get("generated_images") or []:
|
|
if not isinstance(image, dict):
|
|
continue
|
|
asset_key = f"{job_id}:image:{image.get('id')}"
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO generated_assets (
|
|
asset_key, asset_id, job_id, owner_id, kind, status, url,
|
|
model, prompt, created_at, updated_at, metadata
|
|
)
|
|
VALUES (%s,%s,%s,%s,'image','completed',%s,%s,%s,%s,%s,%s)
|
|
ON CONFLICT (asset_key) DO UPDATE SET
|
|
status = EXCLUDED.status,
|
|
url = EXCLUDED.url,
|
|
model = EXCLUDED.model,
|
|
prompt = EXCLUDED.prompt,
|
|
updated_at = EXCLUDED.updated_at,
|
|
metadata = EXCLUDED.metadata
|
|
""",
|
|
(
|
|
asset_key,
|
|
str(image.get("id") or ""),
|
|
job_id,
|
|
str(job.get("owner_id") or ""),
|
|
str(image.get("url") or ""),
|
|
str(image.get("model") or ""),
|
|
str(image.get("prompt") or ""),
|
|
_dt(image.get("created_at")),
|
|
updated_at,
|
|
_json({"frame_idx": frame_idx, **image}),
|
|
),
|
|
)
|
|
for video in generated_videos:
|
|
if not isinstance(video, dict):
|
|
continue
|
|
asset_key = f"{job_id}:video:{video.get('id')}"
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO generated_assets (
|
|
asset_key, asset_id, job_id, owner_id, kind, status, url,
|
|
model, prompt, duration, created_at, updated_at, metadata
|
|
)
|
|
VALUES (%s,%s,%s,%s,'video',%s,%s,%s,%s,%s,%s,%s,%s)
|
|
ON CONFLICT (asset_key) DO UPDATE SET
|
|
status = EXCLUDED.status,
|
|
url = EXCLUDED.url,
|
|
model = EXCLUDED.model,
|
|
prompt = EXCLUDED.prompt,
|
|
duration = EXCLUDED.duration,
|
|
updated_at = EXCLUDED.updated_at,
|
|
metadata = EXCLUDED.metadata
|
|
""",
|
|
(
|
|
asset_key,
|
|
str(video.get("id") or ""),
|
|
job_id,
|
|
str(job.get("owner_id") or ""),
|
|
str(video.get("status") or ""),
|
|
str(video.get("url") or ""),
|
|
str(video.get("model") or ""),
|
|
str(video.get("prompt") or ""),
|
|
float(video.get("duration") or 0),
|
|
_dt(video.get("created_at")),
|
|
updated_at,
|
|
_json(video),
|
|
),
|
|
)
|
|
conn.commit()
|
|
|
|
_execute_safely("index_job", run)
|
|
|
|
|
|
def index_agent_run(run_payload: dict) -> None:
|
|
run_id = str(run_payload.get("id") or "")
|
|
if not run_id:
|
|
return
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO agent_run_index (
|
|
run_id, job_id, owner_id, owner_name, owner_email, owner_provider,
|
|
status, stage, progress, final_video_url, contact_sheet_url,
|
|
created_at, updated_at, payload
|
|
)
|
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
|
|
ON CONFLICT (run_id) DO UPDATE SET
|
|
job_id = EXCLUDED.job_id,
|
|
owner_id = EXCLUDED.owner_id,
|
|
owner_name = EXCLUDED.owner_name,
|
|
owner_email = EXCLUDED.owner_email,
|
|
owner_provider = EXCLUDED.owner_provider,
|
|
status = EXCLUDED.status,
|
|
stage = EXCLUDED.stage,
|
|
progress = EXCLUDED.progress,
|
|
final_video_url = EXCLUDED.final_video_url,
|
|
contact_sheet_url = EXCLUDED.contact_sheet_url,
|
|
updated_at = EXCLUDED.updated_at,
|
|
payload = EXCLUDED.payload
|
|
""",
|
|
(
|
|
run_id,
|
|
str(run_payload.get("job_id") or ""),
|
|
str(run_payload.get("owner_id") or ""),
|
|
str(run_payload.get("owner_name") or ""),
|
|
str(run_payload.get("owner_email") or ""),
|
|
str(run_payload.get("owner_provider") or ""),
|
|
str(run_payload.get("status") or ""),
|
|
str(run_payload.get("stage") or ""),
|
|
int(run_payload.get("progress") or 0),
|
|
str(run_payload.get("final_video_url") or ""),
|
|
str(run_payload.get("contact_sheet_url") or ""),
|
|
_dt(run_payload.get("created_at")),
|
|
_dt(run_payload.get("updated_at")),
|
|
_json(run_payload),
|
|
),
|
|
)
|
|
conn.commit()
|
|
|
|
_execute_safely("index_agent_run", run)
|
|
|
|
|
|
def index_prompt_item(item: dict, owner_id: str = "") -> None:
|
|
item_id = str(item.get("id") or "")
|
|
if not item_id:
|
|
return
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO prompt_library_index (
|
|
item_id, owner_id, category, name, tags, source_job_id,
|
|
created_at, updated_at, payload
|
|
)
|
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
|
|
ON CONFLICT (item_id) DO UPDATE SET
|
|
owner_id = EXCLUDED.owner_id,
|
|
category = EXCLUDED.category,
|
|
name = EXCLUDED.name,
|
|
tags = EXCLUDED.tags,
|
|
source_job_id = EXCLUDED.source_job_id,
|
|
updated_at = EXCLUDED.updated_at,
|
|
payload = EXCLUDED.payload
|
|
""",
|
|
(
|
|
item_id,
|
|
owner_id,
|
|
str(item.get("category") or ""),
|
|
str(item.get("name") or ""),
|
|
_json(item.get("tags") or []),
|
|
str(item.get("source_job_id") or ""),
|
|
_dt(item.get("created_at")),
|
|
_dt(item.get("updated_at") or item.get("created_at")),
|
|
_json(item),
|
|
),
|
|
)
|
|
conn.commit()
|
|
|
|
_execute_safely("index_prompt_item", run)
|
|
|
|
|
|
def index_asset_item(item: dict, owner_id: str = "") -> None:
|
|
item_id = str(item.get("id") or "")
|
|
kind = str(item.get("kind") or "")
|
|
if not item_id or not kind:
|
|
return
|
|
item_key = f"{kind}:{item_id}"
|
|
|
|
def run():
|
|
with _connect() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO asset_library_index (
|
|
item_key, item_id, owner_id, kind, name, tags, source_job_id,
|
|
created_at, updated_at, payload
|
|
)
|
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
|
|
ON CONFLICT (item_key) DO UPDATE SET
|
|
owner_id = EXCLUDED.owner_id,
|
|
kind = EXCLUDED.kind,
|
|
name = EXCLUDED.name,
|
|
tags = EXCLUDED.tags,
|
|
source_job_id = EXCLUDED.source_job_id,
|
|
updated_at = EXCLUDED.updated_at,
|
|
payload = EXCLUDED.payload
|
|
""",
|
|
(
|
|
item_key,
|
|
item_id,
|
|
owner_id,
|
|
kind,
|
|
str(item.get("name") or ""),
|
|
_json(item.get("tags") or []),
|
|
str(item.get("source_job_id") or ""),
|
|
_dt(item.get("created_at")),
|
|
_dt(item.get("updated_at") or item.get("created_at")),
|
|
_json(item),
|
|
),
|
|
)
|
|
conn.commit()
|
|
|
|
_execute_safely("index_asset_item", run)
|