auto-save 2026-05-11 14:50 (~4)

This commit is contained in:
2026-05-11 14:50:39 +08:00
parent 854152bb69
commit d95aed8917
4 changed files with 338 additions and 14 deletions

View File

@@ -13,6 +13,8 @@ import logging
import os
import hashlib
import re
import shlex
import subprocess
import threading
import time
import traceback
@@ -81,6 +83,9 @@ class Config:
hermes_api_base = _env("HERMES_API_BASE", "http://127.0.0.1:8642/v1").rstrip("/")
hermes_api_key = _env("HERMES_API_KEY")
hermes_model = _env("HERMES_MODEL", "gemini-3-pro-preview")
hermes_agent_lxc = _env("HERMES_AGENT_LXC", "hermes-personal")
hermes_agent_dir = _env("HERMES_AGENT_DIR", "/opt/hermes-agent")
incus_bin = _env("INCUS_BIN", "/usr/bin/incus")
hermes_system_prompt = _env(
"HERMES_SYSTEM_PROMPT",
"你是爱马仕 Hermes。你通过飞书与用户对话回答要直接、简洁、可执行。",
@@ -507,6 +512,251 @@ def handle_apps() -> tuple[int, dict[str, Any]]:
}
def run_lxc_command(args: list[str], input_text: str = "", timeout: float = 20) -> str:
cmd = [Config.incus_bin, "exec", Config.hermes_agent_lxc, "--", *args]
proc = subprocess.run(
cmd,
input=input_text,
text=True,
capture_output=True,
timeout=timeout,
check=False,
)
if proc.returncode != 0:
detail = (proc.stderr or proc.stdout or "").strip()
raise RuntimeError(detail or f"command failed with exit code {proc.returncode}")
return proc.stdout
def read_hermes_runtime_config() -> dict[str, Any]:
script = r'''
import json
import pathlib
import re
import sys
path = pathlib.Path(sys.argv[1])
text = path.read_text(encoding="utf-8")
lines = text.splitlines()
def clean_value(value):
value = value.strip()
if " #" in value:
value = value.split(" #", 1)[0].rstrip()
if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
return value[1:-1]
return value
def read_mapping_block(key):
result = {}
in_block = False
for line in lines:
if re.match(rf"^{re.escape(key)}\s*:\s*$", line):
in_block = True
continue
if in_block:
if line and not line[0].isspace():
break
match = re.match(r"\s+([A-Za-z0-9_-]+)\s*:\s*(.*?)\s*$", line)
if match:
result[match.group(1)] = clean_value(match.group(2))
return result
def read_raw_block(key):
start = None
for idx, line in enumerate(lines):
if re.match(rf"^{re.escape(key)}\s*:\s*(?:#.*)?$", line):
start = idx
break
if start is None:
return ""
end = len(lines)
for idx in range(start + 1, len(lines)):
line = lines[idx]
if line and not line[0].isspace() and not line.lstrip().startswith("#"):
end = idx
break
return "\n".join(lines[start:end]).strip()
payload = {
"model": read_mapping_block("model"),
"mcp_servers_yaml": read_raw_block("mcp_servers"),
"config_path": str(path),
}
print(json.dumps(payload, ensure_ascii=False))
'''
path = f"{Config.hermes_agent_dir}/config.yaml"
raw = run_lxc_command(["python3", "-c", script, path], timeout=20)
data = json.loads(raw)
model = data.get("model") if isinstance(data.get("model"), dict) else {}
return {
"model": {
"default": str(model.get("default") or model.get("model") or ""),
"provider": str(model.get("provider") or ""),
"base_url": str(model.get("base_url") or ""),
},
"mcp_servers_yaml": str(data.get("mcp_servers_yaml") or ""),
"config_path": data.get("config_path") or path,
"lxc": Config.hermes_agent_lxc,
}
def normalize_mcp_yaml(value: str) -> str:
value = value.strip()
if not value:
return ""
first = next((line.strip() for line in value.splitlines() if line.strip()), "")
if re.match(r"^mcp_servers\s*:", first):
return value
indented = "\n".join((" " + line if line.strip() else line) for line in value.splitlines())
return "mcp_servers:\n" + indented
def validate_hermes_config_payload(body: dict[str, Any]) -> dict[str, str]:
model = body.get("model") if isinstance(body.get("model"), dict) else {}
default_model = str(model.get("default") or body.get("model_default") or "").strip()
provider = str(model.get("provider") or body.get("provider") or "openrouter").strip()
base_url = str(model.get("base_url") or body.get("base_url") or "").strip()
mcp_servers_yaml = normalize_mcp_yaml(str(body.get("mcp_servers_yaml") or ""))
for label, value, limit in (
("model.default", default_model, 180),
("model.provider", provider, 80),
("model.base_url", base_url, 220),
):
if "\n" in value or "\r" in value:
raise ValueError(f"{label} must be a single line")
if len(value) > limit:
raise ValueError(f"{label} is too long")
if not default_model:
raise ValueError("model.default is required")
if base_url and not re.match(r"^https?://", base_url):
raise ValueError("model.base_url must start with http:// or https://")
if len(mcp_servers_yaml) > 20000:
raise ValueError("mcp_servers_yaml is too large")
if mcp_servers_yaml and not re.match(r"^mcp_servers\s*:", mcp_servers_yaml.splitlines()[0].strip()):
raise ValueError("mcp_servers_yaml must start with mcp_servers:")
return {
"default_model": default_model,
"provider": provider,
"base_url": base_url,
"mcp_servers_yaml": mcp_servers_yaml,
}
def write_hermes_runtime_config(body: dict[str, Any]) -> dict[str, Any]:
payload = validate_hermes_config_payload(body)
script = r'''
import json
import pathlib
import re
import shutil
import sys
import time
path = pathlib.Path(sys.argv[1])
payload = json.loads(sys.stdin.read())
text = path.read_text(encoding="utf-8")
def quote(value):
return json.dumps(value, ensure_ascii=False)
def find_block(lines, key):
start = None
for idx, line in enumerate(lines):
if re.match(rf"^{re.escape(key)}\s*:\s*(?:#.*)?$", line):
start = idx
break
if start is None:
return None, None
end = len(lines)
for idx in range(start + 1, len(lines)):
line = lines[idx]
if line and not line[0].isspace() and not line.lstrip().startswith("#"):
end = idx
break
return start, end
def replace_block(text, key, block):
lines = text.splitlines()
start, end = find_block(lines, key)
block_lines = block.rstrip().splitlines() if block.strip() else []
if start is None:
if not block_lines:
return text.rstrip() + "\n"
return text.rstrip() + "\n\n" + "\n".join(block_lines) + "\n"
if block_lines:
new_lines = lines[:start] + block_lines + lines[end:]
else:
new_lines = lines[:start] + lines[end:]
return "\n".join(new_lines).rstrip() + "\n"
model_block = "\n".join([
"model:",
f" default: {quote(payload['default_model'])}",
f" provider: {quote(payload['provider'])}",
f" base_url: {quote(payload['base_url'])}",
])
text = replace_block(text, "model", model_block)
text = replace_block(text, "mcp_servers", payload.get("mcp_servers_yaml", ""))
backup = path.with_name(path.name + ".bak-" + time.strftime("%Y%m%d%H%M%S"))
shutil.copy2(path, backup)
path.write_text(text, encoding="utf-8")
print(json.dumps({"backup": str(backup)}, ensure_ascii=False))
'''
path = f"{Config.hermes_agent_dir}/config.yaml"
raw = run_lxc_command(
["python3", "-c", script, path],
input_text=json.dumps(payload, ensure_ascii=False),
timeout=20,
)
result = json.loads(raw)
if body.get("restart", True):
run_lxc_command(
["bash", "-lc", f"cd {shlex.quote(Config.hermes_agent_dir)} && docker compose restart hermes-agent"],
timeout=90,
)
return {
"model": {
"default": payload["default_model"],
"provider": payload["provider"],
"base_url": payload["base_url"],
},
"mcp_servers_yaml": payload["mcp_servers_yaml"],
"backup": result.get("backup"),
"restarted": bool(body.get("restart", True)),
}
def handle_hermes_config_get(headers: dict[str, str]) -> tuple[int, dict[str, Any]]:
ok, status, message = is_admin_request(headers)
if not ok:
return status, {"code": status, "msg": message}
try:
config = read_hermes_runtime_config()
except Exception as exc:
logging.error("failed to read Hermes config:\n%s", traceback.format_exc())
return 500, {"code": 500, "msg": str(exc)}
return 200, {"code": 0, "msg": "ok", "config": config}
def handle_hermes_config_post(headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
ok, status, message = is_admin_request(headers)
if not ok:
return status, {"code": status, "msg": message}
try:
config = write_hermes_runtime_config(body)
except ValueError as exc:
return 400, {"code": 400, "msg": str(exc)}
except Exception as exc:
logging.error("failed to write Hermes config:\n%s", traceback.format_exc())
return 500, {"code": 500, "msg": str(exc)}
logging.info("updated Hermes runtime config model=%s", config["model"]["default"])
return 200, {"code": 0, "msg": "ok", "config": config}
def is_admin_request(headers: dict[str, str]) -> tuple[bool, int, str]:
cookie = headers.get("cookie", "")
if "hermes_auth=ok" not in cookie:
@@ -681,6 +931,7 @@ class Handler(BaseHTTPRequestHandler):
server_version = "HermesFeishuBridge/1.0"
def do_GET(self) -> None:
headers = {key.lower(): value for key, value in self.headers.items()}
if self.path == "/health":
self.send_json(200, {"ok": True, "service": "feishu-bridge"})
return
@@ -688,6 +939,10 @@ class Handler(BaseHTTPRequestHandler):
status, payload = handle_apps()
self.send_json(status, payload)
return
if self.path == "/feishu/hermes-config":
status, payload = handle_hermes_config_get(headers)
self.send_json(status, payload)
return
self.send_json(404, {"code": 404, "msg": "not found"})
def do_POST(self) -> None:
@@ -698,6 +953,8 @@ class Handler(BaseHTTPRequestHandler):
status, payload = handle_feishu_event(self.path, body)
elif self.path == "/feishu/apps":
status, payload = handle_apps_admin(headers, body)
elif self.path == "/feishu/hermes-config":
status, payload = handle_hermes_config_post(headers, body)
elif self.path == "/feishu/notify" or self.path.startswith("/feishu/notify/"):
status, payload = handle_notify(self.path, headers, body)
else: