auto-save 2026-05-11 16:59 (~3)

This commit is contained in:
2026-05-11 16:59:20 +08:00
parent 48474dc4d6
commit 7a150ae74c
3 changed files with 621 additions and 54 deletions

View File

@@ -27,6 +27,7 @@ from typing import Any
FEISHU_API_BASE = "https://open.feishu.cn/open-apis"
APP_ID_RE = re.compile(r"^cli_[A-Za-z0-9]+$")
UI_ID_RE = re.compile(r"^[A-Za-z0-9_-]{1,80}$")
def _env(name: str, default: str = "") -> str:
@@ -616,12 +617,32 @@ def normalize_mcp_yaml(value: str) -> str:
return "mcp_servers:\n" + indented
def validate_hermes_config_payload(body: dict[str, Any]) -> dict[str, str]:
def validate_hermes_config_payload(body: dict[str, Any], current: dict[str, Any] | None = None) -> dict[str, str]:
current_model = (current or {}).get("model") if isinstance((current or {}).get("model"), dict) else {}
model = body.get("model") if isinstance(body.get("model"), dict) else {}
default_model = str(model.get("default") or body.get("model_default") or "").strip()
provider = str(model.get("provider") or body.get("provider") or "openrouter").strip()
base_url = str(model.get("base_url") or body.get("base_url") or "").strip()
mcp_servers_yaml = normalize_mcp_yaml(str(body.get("mcp_servers_yaml") or ""))
default_model = str(
model.get("default")
or body.get("model_default")
or current_model.get("default")
or current_model.get("model")
or ""
).strip()
provider = str(
model.get("provider")
or body.get("provider")
or current_model.get("provider")
or "openrouter"
).strip()
base_url = str(
model.get("base_url")
or body.get("base_url")
or current_model.get("base_url")
or ""
).strip()
if "mcp_servers_yaml" in body:
mcp_servers_yaml = normalize_mcp_yaml(str(body.get("mcp_servers_yaml") or ""))
else:
mcp_servers_yaml = str((current or {}).get("mcp_servers_yaml") or "")
for label, value, limit in (
("model.default", default_model, 180),
@@ -650,7 +671,8 @@ def validate_hermes_config_payload(body: dict[str, Any]) -> dict[str, str]:
def write_hermes_runtime_config(body: dict[str, Any]) -> dict[str, Any]:
payload = validate_hermes_config_payload(body)
current = read_hermes_runtime_config()
payload = validate_hermes_config_payload(body, current)
script = r'''
import json
import pathlib
@@ -761,6 +783,232 @@ def handle_hermes_config_post(headers: dict[str, str], body: dict[str, Any]) ->
return 200, {"code": 0, "msg": "ok", "config": config}
def one_line(value: Any, limit: int, default: str = "") -> str:
text = str(value if value is not None else default).strip()
text = text.replace("\r", " ").replace("\n", " ").strip()
if len(text) > limit:
text = text[:limit].rstrip()
return text
def string_list(value: Any, limit: int = 200) -> list[str]:
if not isinstance(value, list):
return []
out: list[str] = []
for item in value[:limit]:
text = one_line(item, 180)
if text:
out.append(text)
return out
def now_ms() -> int:
return int(time.time() * 1000)
def runtime_model_profile(runtime: dict[str, Any]) -> dict[str, Any]:
model = runtime.get("model") if isinstance(runtime.get("model"), dict) else {}
model_id = one_line(model.get("default") or Config.hermes_model or "gemini-3-pro-preview", 180)
provider = one_line(model.get("provider") or "openrouter", 80)
base_url = one_line(model.get("base_url") or "", 220)
created_at = now_ms()
return {
"id": "runtime-default",
"name": "线上默认模型",
"provider": provider,
"model": model_id,
"baseUrl": base_url,
"apiKeyRef": "服务器环境变量",
"enabled": True,
"isDefault": True,
"createdAt": created_at,
"updatedAt": created_at,
}
def normalize_model_profile(raw: Any, fallback: dict[str, Any], index: int) -> dict[str, Any] | None:
if not isinstance(raw, dict):
return None
profile_id = one_line(raw.get("id"), 80)
if not profile_id or not UI_ID_RE.match(profile_id):
profile_id = f"model_{index + 1}"
name = one_line(raw.get("name"), 80, fallback.get("name") or "模型接入")
model = one_line(raw.get("model"), 180, fallback.get("model") or "")
provider = one_line(raw.get("provider"), 80, fallback.get("provider") or "openrouter")
base_url = one_line(raw.get("baseUrl") or raw.get("base_url"), 220, fallback.get("baseUrl") or "")
api_key_ref = one_line(raw.get("apiKeyRef") or raw.get("api_key_ref"), 120, fallback.get("apiKeyRef") or "")
if not model:
return None
created_at = raw.get("createdAt") if isinstance(raw.get("createdAt"), (int, float)) else now_ms()
updated_at = raw.get("updatedAt") if isinstance(raw.get("updatedAt"), (int, float)) else now_ms()
return {
"id": profile_id,
"name": name or model,
"provider": provider,
"model": model,
"baseUrl": base_url,
"apiKeyRef": api_key_ref,
"enabled": bool(raw.get("enabled", True)),
"isDefault": bool(raw.get("isDefault", False)),
"createdAt": int(created_at),
"updatedAt": int(updated_at),
}
def normalize_stages(value: Any) -> dict[str, list[str]]:
value = value if isinstance(value, dict) else {}
return {
"pre": string_list(value.get("pre")),
"exec": string_list(value.get("exec")),
"post": string_list(value.get("post")),
}
def normalize_agent(raw: Any, index: int) -> dict[str, Any] | None:
if not isinstance(raw, dict):
return None
agent_id = one_line(raw.get("id"), 80)
if not agent_id or not UI_ID_RE.match(agent_id):
agent_id = f"agent_{index + 1}"
name = one_line(raw.get("name"), 80)
system_prompt = str(raw.get("systemPrompt") or raw.get("system_prompt") or "").strip()
if not name or not system_prompt:
return None
created_at = raw.get("createdAt") if isinstance(raw.get("createdAt"), (int, float)) else now_ms()
updated_at = raw.get("updatedAt") if isinstance(raw.get("updatedAt"), (int, float)) else now_ms()
return {
"id": agent_id,
"emoji": one_line(raw.get("emoji"), 16, "🤖"),
"name": name,
"desc": one_line(raw.get("desc"), 180),
"model": one_line(raw.get("model"), 180),
"modelProfileId": one_line(raw.get("modelProfileId") or raw.get("model_profile_id"), 80),
"systemPrompt": system_prompt[:20000],
"skills": string_list(raw.get("skills")),
"stages": normalize_stages(raw.get("stages")),
"createdAt": int(created_at),
"updatedAt": int(updated_at),
}
def normalize_ui_config(raw: dict[str, Any], runtime: dict[str, Any] | None = None) -> dict[str, Any]:
runtime_profile = runtime_model_profile(runtime or {})
model_profiles_raw = raw.get("modelProfiles") if isinstance(raw.get("modelProfiles"), list) else []
profiles: list[dict[str, Any]] = []
seen_profile_ids: set[str] = set()
for idx, item in enumerate(model_profiles_raw[:80]):
profile = normalize_model_profile(item, runtime_profile, idx)
if not profile or profile["id"] in seen_profile_ids:
continue
seen_profile_ids.add(profile["id"])
profiles.append(profile)
if not profiles:
profiles.append(runtime_profile)
if not any(profile.get("isDefault") for profile in profiles):
profiles[0]["isDefault"] = True
default_seen = False
for profile in profiles:
if profile.get("isDefault") and not default_seen:
default_seen = True
else:
profile["isDefault"] = False
agents_raw = raw.get("agents") if isinstance(raw.get("agents"), list) else []
agents: list[dict[str, Any]] = []
seen_agent_ids: set[str] = set()
for idx, item in enumerate(agents_raw[:200]):
agent = normalize_agent(item, idx)
if not agent or agent["id"] in seen_agent_ids:
continue
seen_agent_ids.add(agent["id"])
agents.append(agent)
return {
"version": 1,
"modelProfiles": profiles,
"agents": agents,
"updatedAt": now_ms(),
"configPath": Config.hermes_ui_config_path,
"lxc": Config.hermes_agent_lxc,
}
def read_ui_config_file() -> dict[str, Any]:
script = r'''
import json
import pathlib
import sys
path = pathlib.Path(sys.argv[1])
if not path.exists():
print("{}")
else:
print(path.read_text(encoding="utf-8"))
'''
raw = run_lxc_command(["python3", "-c", script, Config.hermes_ui_config_path], timeout=20)
if not raw.strip():
return {}
data = json.loads(raw)
return data if isinstance(data, dict) else {}
def write_ui_config_file(config: dict[str, Any]) -> None:
script = r'''
import json
import os
import pathlib
import sys
path = pathlib.Path(sys.argv[1])
payload = json.loads(sys.stdin.read())
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_name(path.name + f".tmp-{os.getpid()}")
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
tmp.chmod(0o600)
tmp.replace(path)
'''
run_lxc_command(
["python3", "-c", script, Config.hermes_ui_config_path],
input_text=json.dumps(config, ensure_ascii=False),
timeout=20,
)
def handle_ui_config_get(headers: dict[str, str]) -> tuple[int, dict[str, Any]]:
ok, status, message = is_admin_request(headers)
if not ok:
return status, {"code": status, "msg": message}
try:
runtime = read_hermes_runtime_config()
config = normalize_ui_config(read_ui_config_file(), runtime)
except Exception as exc:
logging.error("failed to read Hermes UI config:\n%s", traceback.format_exc())
return 500, {"code": 500, "msg": str(exc)}
return 200, {"code": 0, "msg": "ok", "config": config}
def handle_ui_config_post(headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
ok, status, message = is_admin_request(headers)
if not ok:
return status, {"code": status, "msg": message}
try:
runtime = read_hermes_runtime_config()
raw_config = body.get("config") if isinstance(body.get("config"), dict) else body
config = normalize_ui_config(raw_config, runtime)
write_ui_config_file(config)
except ValueError as exc:
return 400, {"code": 400, "msg": str(exc)}
except Exception as exc:
logging.error("failed to write Hermes UI config:\n%s", traceback.format_exc())
return 500, {"code": 500, "msg": str(exc)}
logging.info(
"updated Hermes UI config model_profiles=%s agents=%s",
len(config.get("modelProfiles", [])),
len(config.get("agents", [])),
)
return 200, {"code": 0, "msg": "ok", "config": config}
def is_admin_request(headers: dict[str, str]) -> tuple[bool, int, str]:
cookie = headers.get("cookie", "")
if "hermes_auth=ok" not in cookie:
@@ -947,6 +1195,10 @@ class Handler(BaseHTTPRequestHandler):
status, payload = handle_hermes_config_get(headers)
self.send_json(status, payload)
return
if self.path == "/feishu/ui-config":
status, payload = handle_ui_config_get(headers)
self.send_json(status, payload)
return
self.send_json(404, {"code": 404, "msg": "not found"})
def do_POST(self) -> None:
@@ -959,6 +1211,8 @@ class Handler(BaseHTTPRequestHandler):
status, payload = handle_apps_admin(headers, body)
elif self.path == "/feishu/hermes-config":
status, payload = handle_hermes_config_post(headers, body)
elif self.path == "/feishu/ui-config":
status, payload = handle_ui_config_post(headers, body)
elif self.path == "/feishu/notify" or self.path.startswith("/feishu/notify/"):
status, payload = handle_notify(self.path, headers, body)
else: