feat: add personal canvas workflows

This commit is contained in:
2026-05-26 11:18:28 +08:00
parent bbd1f08f7c
commit 5290812353
7 changed files with 698 additions and 33 deletions

142
api/db.py
View File

@@ -95,6 +95,22 @@ def init_schema() -> bool:
)
""",
"""
CREATE TABLE IF NOT EXISTS canvas_workflows (
id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL REFERENCES app_users(uid) ON DELETE CASCADE,
name TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
thumbnail TEXT NOT NULL DEFAULT '',
workflow_data JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
deleted_at TIMESTAMPTZ,
version INTEGER NOT NULL DEFAULT 1,
source TEXT NOT NULL DEFAULT 'canvas',
metadata JSONB NOT NULL DEFAULT '{}'::jsonb
)
""",
"""
CREATE TABLE IF NOT EXISTS job_index (
job_id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL DEFAULT '',
@@ -202,6 +218,7 @@ def init_schema() -> bool:
""",
"CREATE INDEX IF NOT EXISTS idx_canvas_projects_owner_updated ON canvas_projects(owner_id, updated_at DESC) WHERE deleted_at IS NULL",
"CREATE INDEX IF NOT EXISTS idx_canvas_projects_visibility_updated ON canvas_projects(visibility, updated_at DESC) WHERE deleted_at IS NULL",
"CREATE INDEX IF NOT EXISTS idx_canvas_workflows_owner_updated ON canvas_workflows(owner_id, updated_at DESC) WHERE deleted_at IS NULL",
"CREATE INDEX IF NOT EXISTS idx_job_index_owner_updated ON job_index(owner_id, updated_at DESC)",
"CREATE INDEX IF NOT EXISTS idx_generated_assets_owner_created ON generated_assets(owner_id, created_at DESC)",
"CREATE INDEX IF NOT EXISTS idx_prompt_library_visibility ON prompt_library_index(visibility, updated_at DESC)",
@@ -445,6 +462,7 @@ def upsert_canvas_project(user: dict, project: dict) -> dict | None:
WHEN canvas_projects.owner_id = EXCLUDED.owner_id AND EXCLUDED.updated_at >= canvas_projects.updated_at THEN NULL
ELSE canvas_projects.deleted_at
END
WHERE canvas_projects.owner_id = EXCLUDED.owner_id
RETURNING id, name, thumbnail, visibility, canvas_data, created_at, updated_at, version, owner_id
""",
(
@@ -488,6 +506,130 @@ def soft_delete_canvas_project(user: dict, project_id: str) -> bool:
return bool(_execute_safely("soft_delete_canvas_project", run))
def list_canvas_workflows(user: dict) -> list[dict]:
uid = str(user.get("uid") or "")
def run():
with _connect() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT
w.id, w.name, w.description, w.thumbnail, w.workflow_data,
w.created_at, w.updated_at, w.version, w.owner_id,
u.name AS owner_name, u.email AS owner_email, u.provider AS owner_provider
FROM canvas_workflows w
LEFT JOIN app_users u ON u.uid = w.owner_id
WHERE w.deleted_at IS NULL
AND w.owner_id = %s
ORDER BY w.updated_at DESC
LIMIT 500
""",
(uid,),
)
rows = cur.fetchall()
return [dict(row) for row in rows]
return _execute_safely("list_canvas_workflows", run) or []
def upsert_canvas_workflow(user: dict, workflow: dict) -> dict | None:
uid = str(user.get("uid") or "")
if not uid:
return None
workflow_id = str(workflow.get("id") or "").strip()
if not workflow_id:
workflow_id = f"workflow_{int(time.time() * 1000)}_{uuid.uuid4().hex[:9]}"
name = str(workflow.get("name") or "未命名工作流").strip() or "未命名工作流"
description = str(workflow.get("description") or "").strip()
thumbnail = str(workflow.get("thumbnail") or "")
workflow_data = workflow.get("workflow_data") or workflow.get("workflowData") or {"nodes": [], "edges": [], "viewport": {"x": 100, "y": 50, "zoom": 0.8}}
created_at = _dt(workflow.get("created_at") or workflow.get("createdAt"))
updated_at = _dt(workflow.get("updated_at") or workflow.get("updatedAt"))
def run():
with _connect() as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO canvas_workflows (
id, owner_id, name, description, thumbnail, workflow_data,
created_at, updated_at, version, source, metadata
)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,1,%s,%s)
ON CONFLICT (id) DO UPDATE SET
name = CASE
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.name
ELSE canvas_workflows.name
END,
description = CASE
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.description
ELSE canvas_workflows.description
END,
thumbnail = CASE
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.thumbnail
ELSE canvas_workflows.thumbnail
END,
workflow_data = CASE
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.workflow_data
ELSE canvas_workflows.workflow_data
END,
updated_at = CASE
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN EXCLUDED.updated_at
ELSE canvas_workflows.updated_at
END,
version = CASE
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN canvas_workflows.version + 1
ELSE canvas_workflows.version
END,
deleted_at = CASE
WHEN canvas_workflows.owner_id = EXCLUDED.owner_id THEN NULL
ELSE canvas_workflows.deleted_at
END
WHERE canvas_workflows.owner_id = EXCLUDED.owner_id
RETURNING id, name, description, thumbnail, workflow_data, created_at, updated_at, version, owner_id
""",
(
workflow_id,
uid,
name,
description,
thumbnail,
_json(workflow_data),
created_at,
updated_at,
str(workflow.get("source") or "canvas"),
_json({"source_project_id": workflow.get("source_project_id") or workflow.get("sourceProjectId") or ""}),
),
)
row = cur.fetchone()
conn.commit()
return dict(row) if row else None
return _execute_safely("upsert_canvas_workflow", run)
def soft_delete_canvas_workflow(user: dict, workflow_id: str) -> bool:
uid = str(user.get("uid") or "")
def run():
with _connect() as conn:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE canvas_workflows
SET deleted_at = now(), updated_at = now(), version = version + 1
WHERE id = %s AND owner_id = %s AND deleted_at IS NULL
""",
(workflow_id, uid),
)
changed = cur.rowcount > 0
conn.commit()
return changed
return bool(_execute_safely("soft_delete_canvas_workflow", run))
def index_job(job: dict, state_path: str = "") -> None:
job_id = str(job.get("id") or "")
if not job_id:

View File

@@ -2189,6 +2189,18 @@ class CanvasProjectImportReq(BaseModel):
projects: list[CanvasProjectWriteReq] = Field(default_factory=list)
class CanvasWorkflowWriteReq(BaseModel):
id: str = ""
name: str = "未命名工作流"
description: str = ""
thumbnail: str = ""
workflow_data: dict = Field(default_factory=dict)
created_at: float = 0.0
updated_at: float = 0.0
source: str = "canvas"
source_project_id: str = ""
def _ts(value) -> float:
if hasattr(value, "timestamp"):
return float(value.timestamp())
@@ -2220,6 +2232,23 @@ def _canvas_project_public(row: dict) -> dict:
}
def _canvas_workflow_public(row: dict) -> dict:
return {
"id": str(row.get("id") or ""),
"name": str(row.get("name") or ""),
"description": str(row.get("description") or ""),
"thumbnail": str(row.get("thumbnail") or ""),
"workflow_data": row.get("workflow_data") or {},
"created_at": _ts(row.get("created_at")),
"updated_at": _ts(row.get("updated_at")),
"version": int(row.get("version") or 1),
"owner_id": str(row.get("owner_id") or ""),
"owner_name": str(row.get("owner_name") or ""),
"owner_email": str(row.get("owner_email") or ""),
"owner_provider": str(row.get("owner_provider") or ""),
}
@app.get("/canvas-projects")
def list_canvas_projects(request: Request) -> dict:
_require_db()
@@ -2239,6 +2268,8 @@ def create_canvas_project(req: CanvasProjectWriteReq, request: Request) -> dict:
row = db.upsert_canvas_project(user, req.model_dump())
if not row:
raise HTTPException(500, "canvas project save failed")
if str(row.get("owner_id") or "") != _session_user_id(user):
raise HTTPException(403, "canvas project belongs to another user")
db.audit(user, "canvas_project.create", "canvas_project", str(row.get("id") or ""), req.model_dump(exclude={"canvas_data"}), request, str(row.get("visibility") or "private"))
return {"ok": True, "item": _canvas_project_public(row)}
@@ -2296,6 +2327,58 @@ def import_canvas_projects(req: CanvasProjectImportReq, request: Request) -> dic
return {"ok": True, "items": imported}
@app.get("/canvas-workflows")
def list_canvas_workflows(request: Request) -> dict:
_require_db()
user = data_user_from_request(request)
db.upsert_user(user, request)
return {
"ok": True,
"items": [_canvas_workflow_public(row) for row in db.list_canvas_workflows(user)],
}
@app.post("/canvas-workflows")
def create_canvas_workflow(req: CanvasWorkflowWriteReq, request: Request) -> dict:
_require_db()
user = data_user_from_request(request)
db.upsert_user(user, request)
row = db.upsert_canvas_workflow(user, req.model_dump())
if not row:
raise HTTPException(500, "canvas workflow save failed")
if str(row.get("owner_id") or "") != _session_user_id(user):
raise HTTPException(403, "canvas workflow belongs to another user")
db.audit(user, "canvas_workflow.create", "canvas_workflow", str(row.get("id") or ""), {"name": req.name, "source_project_id": req.source_project_id}, request, "private")
return {"ok": True, "item": _canvas_workflow_public(row)}
@app.put("/canvas-workflows/{workflow_id}")
def put_canvas_workflow(workflow_id: str, req: CanvasWorkflowWriteReq, request: Request) -> dict:
_require_db()
user = data_user_from_request(request)
db.upsert_user(user, request)
payload = req.model_dump()
payload["id"] = workflow_id
row = db.upsert_canvas_workflow(user, payload)
if not row:
raise HTTPException(500, "canvas workflow save failed")
if str(row.get("owner_id") or "") != _session_user_id(user):
raise HTTPException(403, "canvas workflow belongs to another user")
db.audit(user, "canvas_workflow.save", "canvas_workflow", workflow_id, {"name": req.name}, request, "private")
return {"ok": True, "item": _canvas_workflow_public(row)}
@app.delete("/canvas-workflows/{workflow_id}")
def delete_canvas_workflow(workflow_id: str, request: Request) -> dict:
_require_db()
user = data_user_from_request(request)
ok = db.soft_delete_canvas_workflow(user, workflow_id)
if not ok:
raise HTTPException(404, "canvas workflow not found")
db.audit(user, "canvas_workflow.delete", "canvas_workflow", workflow_id, request=request, visibility="private")
return {"ok": True, "id": workflow_id}
def _parse_library_metadata(raw: str) -> dict:
if not raw.strip():
return {}