1280 lines
45 KiB
Python
1280 lines
45 KiB
Python
#!/usr/bin/env python3
|
||
"""Feishu <-> Hermes bridge.
|
||
|
||
This service intentionally uses only the Python standard library so it can run
|
||
next to the existing no-build Hermes UI deployment without adding package
|
||
management to the project.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import hashlib
|
||
import re
|
||
import shlex
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
import traceback
|
||
import urllib.error
|
||
import urllib.parse
|
||
import urllib.request
|
||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||
from typing import Any
|
||
|
||
|
||
# Root URL of the Feishu Open Platform REST API; endpoint paths are appended.
FEISHU_API_BASE = "https://open.feishu.cn/open-apis"
# Pattern for a Feishu app id: the literal prefix "cli_" followed by alphanumerics.
APP_ID_RE = re.compile(r"^cli_[A-Za-z0-9]+$")
# Pattern for UI object ids: 1-80 characters drawn from letters, digits, "_" and "-".
UI_ID_RE = re.compile(r"^[A-Za-z0-9_-]{1,80}$")
|
||
|
||
|
||
def _env(name: str, default: str = "") -> str:
    """Return environment variable *name* with surrounding whitespace removed.

    When the variable is unset, ``default`` is returned instead (also stripped).
    """
    raw_value = os.environ.get(name, default)
    return raw_value.strip()
|
||
|
||
|
||
def _bool_env(name: str, default: str = "false") -> bool:
    """Interpret environment variable *name* as a boolean flag.

    The value (or ``default`` when unset) is true only if, ignoring case, it
    is one of ``1``, ``true`` or ``yes``.
    """
    truthy_words = ("1", "true", "yes")
    return _env(name, default).lower() in truthy_words
|
||
|
||
|
||
def _csv_set(value: str) -> set[str]:
    """Split a comma-separated string into a set of trimmed, non-empty items."""
    trimmed = (piece.strip() for piece in value.split(","))
    return {piece for piece in trimmed if piece}
|
||
|
||
|
||
def _csv_list(value: str) -> list[str]:
    """Split a comma-separated string into trimmed, non-empty items.

    Order and duplicates are preserved.
    """
    trimmed = (piece.strip() for piece in value.split(","))
    return [piece for piece in trimmed if piece]
|
||
|
||
|
||
def app_suffixes() -> list[str]:
    """Return the env-var suffixes for each configurable Feishu app slot.

    The first slot has no suffix; the remaining ones are ``_2`` .. ``_20``.
    """
    suffixes = [""]
    suffixes.extend(f"_{slot}" for slot in range(2, 21))
    return suffixes
|
||
|
||
|
||
def _load_feishu_apps() -> dict[str, dict[str, Any]]:
    """Build the Feishu app registry from ``FEISHU_*`` environment variables.

    Every suffix from ``app_suffixes()`` is probed. A slot is skipped only
    when both its app id and app secret are empty. The returned dict is keyed
    by app id and each entry carries the credentials, the accepted
    verification tokens, the default receiver and the chat allow-list.
    """
    registry: dict[str, dict[str, Any]] = {}
    for suffix in app_suffixes():
        app_id = _env(f"FEISHU_APP_ID{suffix}")
        app_secret = _env(f"FEISHU_APP_SECRET{suffix}")
        if not (app_id or app_secret):
            continue

        verification_tokens = _csv_list(_env(f"FEISHU_VERIFICATION_TOKEN{suffix}"))
        default_receive_id = _env(f"FEISHU_DEFAULT_RECEIVE_ID{suffix}")
        default_receive_id_type = _env(
            f"FEISHU_DEFAULT_RECEIVE_ID_TYPE{suffix}",
            "chat_id",
        )
        allowed_chat_ids = _csv_set(_env(f"FEISHU_ALLOWED_CHAT_IDS{suffix}"))

        registry[app_id] = {
            "app_id": app_id,
            "app_secret": app_secret,
            "verification_tokens": verification_tokens,
            "default_receive_id": default_receive_id,
            "default_receive_id_type": default_receive_id_type,
            "allowed_chat_ids": allowed_chat_ids,
        }
    return registry
|
||
|
||
|
||
class Config:
    """Process-wide settings, read from environment variables.

    All values are computed once, when this class body executes. The order of
    the assignments matters: some defaults refer to attributes defined earlier
    in the body.
    """

    # --- Files managed by the bridge ---
    env_file = _env("FEISHU_BRIDGE_ENV_FILE", "/etc/hermes-feishu-bridge.env")
    tokens_file = _env("FEISHU_BRIDGE_TOKENS_FILE", "/root/hermes-feishu-bridge.tokens")

    # --- Feishu apps ---
    feishu_app_id = _env("FEISHU_APP_ID")
    feishu_app_secret = _env("FEISHU_APP_SECRET")
    feishu_apps = _load_feishu_apps()
    # Falls back to the unsuffixed app id when no explicit default is given.
    default_feishu_app_id = _env("FEISHU_DEFAULT_APP_ID", feishu_app_id)
    notify_token = _env("FEISHU_NOTIFY_TOKEN")
    reply_in_thread = _bool_env("FEISHU_REPLY_IN_THREAD")

    # --- Hermes API and agent deployment ---
    hermes_api_base = _env("HERMES_API_BASE", "http://127.0.0.1:8642/v1").rstrip("/")
    hermes_api_key = _env("HERMES_API_KEY")
    hermes_model = _env("HERMES_MODEL", "gemini-3-pro-preview")
    hermes_agent_lxc = _env("HERMES_AGENT_LXC", "hermes-personal")
    hermes_agent_dir = _env("HERMES_AGENT_DIR", "/opt/hermes-agent")
    # Default location is derived from hermes_agent_dir, defined just above.
    hermes_ui_config_path = _env(
        "HERMES_UI_CONFIG_PATH",
        os.path.join(hermes_agent_dir, "hermes-ui-config.json"),
    )
    incus_bin = _env("INCUS_BIN", "/usr/bin/incus")
    hermes_system_prompt = _env(
        "HERMES_SYSTEM_PROMPT",
        "你是爱马仕 Hermes。你通过飞书与用户对话,回答要直接、简洁、可执行。",
    )

    # --- HTTP behaviour ---
    # Timeout in seconds for outgoing HTTP requests.
    request_timeout = float(_env("FEISHU_BRIDGE_TIMEOUT", "60"))
    # PORT takes precedence over FEISHU_BRIDGE_PORT; 8787 is the last resort.
    port = int(_env("PORT", _env("FEISHU_BRIDGE_PORT", "8787")))
|
||
|
||
|
||
class TokenCache:
    """In-memory cache of Feishu tenant access tokens, keyed by app id."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # app_id -> (token, absolute expiry expressed as a time.time() value)
        self._tokens: dict[str, tuple[str, float]] = {}

    def get(self, app_id: str) -> str:
        """Return a usable tenant access token for *app_id*.

        A cached token is reused until five minutes before its expiry;
        otherwise a new one is requested from Feishu and cached.

        Raises:
            RuntimeError: if the app is unknown, lacks credentials, or Feishu
                answers with a non-zero ``code``.
        """
        with self._lock:
            cached = self._tokens.get(app_id)
            if cached is not None:
                cached_token, cached_expiry = cached
                if cached_token and time.time() < cached_expiry - 300:
                    return cached_token

            app = Config.feishu_apps.get(app_id)
            if not app:
                raise RuntimeError(f"unknown Feishu app_id: {app_id}")
            has_credentials = app.get("app_id") and app.get("app_secret")
            if not has_credentials:
                raise RuntimeError(f"FEISHU_APP_ID and FEISHU_APP_SECRET are required for {app_id}")

            # The request is made while the lock is held.
            response = http_json(
                "POST",
                f"{FEISHU_API_BASE}/auth/v3/tenant_access_token/internal",
                {"app_id": app["app_id"], "app_secret": app["app_secret"]},
            )
            if response.get("code") != 0:
                raise RuntimeError(f"Feishu token error: {response}")

            fresh_token = str(response["tenant_access_token"])
            lifetime_seconds = int(response.get("expire", 7200))
            self._tokens[app_id] = (fresh_token, time.time() + lifetime_seconds)
            return fresh_token

    def drop(self, app_id: str) -> None:
        """Forget the cached token for *app_id*, if there is one."""
        with self._lock:
            self._tokens.pop(app_id, None)
|
||
|
||
|
||
# Shared tenant-token cache used when calling the Feishu API.
token_cache = TokenCache()
# Event key -> time.time() at which it was first seen; guarded by seen_lock.
seen_events: dict[str, float] = {}
seen_lock = threading.Lock()
|
||
|
||
|
||
def parse_env_file(path: str) -> dict[str, str]:
    """Read ``KEY=VALUE`` pairs from the env file at *path*.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Keys and values are stripped of surrounding whitespace; a later
    assignment of the same key wins. A missing file yields an empty dict.
    """
    parsed: dict[str, str] = {}
    if not os.path.exists(path):
        return parsed
    with open(path, "r", encoding="utf-8") as handle:
        for entry in (raw.strip() for raw in handle):
            if not entry or entry.startswith("#"):
                continue
            name, separator, rest = entry.partition("=")
            if not separator:
                continue
            parsed[name.strip()] = rest.strip()
    return parsed
|
||
|
||
|
||
def write_env_updates(path: str, updates: dict[str, str]) -> None:
    """Set ``KEY=VALUE`` pairs in the env file at *path*, rewriting it atomically.

    Existing assignments whose key is in *updates* are replaced in place;
    keys not yet present are appended at the end. Comments and unrelated
    lines are kept verbatim. The result is written to a temporary sibling
    file, restricted to mode 0600, and then moved over *path*.

    Args:
        path: Env file to update; it is created if it does not exist.
        updates: Mapping of variable name to its new value.
    """
    lines: list[str] = []
    seen: set[str] = set()
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as fh:
            for raw in fh.read().splitlines():
                if "=" in raw and not raw.lstrip().startswith("#"):
                    # Strip the key so "KEY = value" and " KEY=value" are
                    # recognised, matching parse_env_file and
                    # write_env_removals. Without this the old line stayed
                    # and a duplicate assignment was appended.
                    key = raw.split("=", 1)[0].strip()
                    if key in updates:
                        lines.append(f"{key}={updates[key]}")
                        seen.add(key)
                        continue
                lines.append(raw)
    for key, value in updates.items():
        if key not in seen:
            lines.append(f"{key}={value}")
    # Write to a per-process temp file first so readers never see a partial file.
    tmp = f"{path}.tmp-{os.getpid()}"
    with open(tmp, "w", encoding="utf-8") as fh:
        fh.write("\n".join(lines) + "\n")
    os.chmod(tmp, 0o600)
    os.replace(tmp, path)
|
||
|
||
|
||
def write_env_removals(path: str, remove_keys: set[str], updates: dict[str, str] | None = None) -> None:
    """Delete and optionally set variables in the env file at *path*.

    Assignments whose (stripped) key is in *remove_keys* are dropped.
    Assignments whose key is in *updates* are replaced in place, and update
    keys not found in the file are appended. Everything else is kept as is.
    The file is rewritten through a temporary sibling with mode 0600.
    """
    replacements = updates or {}
    output: list[str] = []
    replaced: set[str] = set()

    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as source:
            existing_rows = source.read().splitlines()
        for row in existing_rows:
            is_assignment = "=" in row and not row.lstrip().startswith("#")
            if not is_assignment:
                output.append(row)
                continue
            name = row.split("=", 1)[0].strip()
            if name in remove_keys:
                continue
            if name in replacements:
                output.append(f"{name}={replacements[name]}")
                replaced.add(name)
            else:
                output.append(row)

    output.extend(
        f"{name}={value}"
        for name, value in replacements.items()
        if name not in replaced
    )

    staging = f"{path}.tmp-{os.getpid()}"
    with open(staging, "w", encoding="utf-8") as sink:
        sink.write("\n".join(output) + "\n")
    os.chmod(staging, 0o600)
    os.replace(staging, path)
|
||
|
||
|
||
def reload_config_from_env_file() -> None:
    """Re-read ``Config.env_file`` and refresh the Feishu app settings.

    Per-app ``FEISHU_*`` variables (and ``FEISHU_DEFAULT_APP_ID``) that are no
    longer in the file are removed from ``os.environ``; every pair found in
    the file is then exported. Finally the Feishu-related ``Config``
    attributes are recomputed from the environment.
    """
    file_values = parse_env_file(Config.env_file)
    per_app_prefixes = (
        "FEISHU_APP_ID",
        "FEISHU_APP_SECRET",
        "FEISHU_VERIFICATION_TOKEN",
        "FEISHU_DEFAULT_RECEIVE_ID",
        "FEISHU_DEFAULT_RECEIVE_ID_TYPE",
        "FEISHU_ALLOWED_CHAT_IDS",
    )
    managed_keys = [
        f"{prefix}{suffix}"
        for suffix in app_suffixes()
        for prefix in per_app_prefixes
    ]
    managed_keys.append("FEISHU_DEFAULT_APP_ID")

    # Clear stale values first so an app deleted from the file disappears.
    for stale_candidate in managed_keys:
        if stale_candidate not in file_values:
            os.environ.pop(stale_candidate, None)
    os.environ.update(file_values)

    Config.feishu_app_id = _env("FEISHU_APP_ID")
    Config.feishu_app_secret = _env("FEISHU_APP_SECRET")
    Config.feishu_apps = _load_feishu_apps()
    Config.default_feishu_app_id = _env("FEISHU_DEFAULT_APP_ID", Config.feishu_app_id)
|
||
|
||
|
||
def sync_tokens_file() -> None:
    """Regenerate ``Config.tokens_file`` from the current env file.

    For every configured app slot the callback URL, app id and (if set)
    verification token are written, followed by the notify token when one is
    configured. The file is replaced atomically and restricted to mode 0600.
    """
    settings = parse_env_file(Config.env_file)
    output: list[str] = []
    for suffix in app_suffixes():
        app_id = settings.get(f"FEISHU_APP_ID{suffix}", "")
        if not app_id:
            continue
        output.append(f"FEISHU_CALLBACK_URL{suffix}=https://hermes.kang-kang.com/feishu/events/{app_id}")
        output.append(f"FEISHU_APP_ID{suffix}={app_id}")
        verification = settings.get(f"FEISHU_VERIFICATION_TOKEN{suffix}", "")
        if verification:
            output.append(f"FEISHU_VERIFICATION_TOKEN{suffix}={verification}")

    notify_token = settings.get("FEISHU_NOTIFY_TOKEN")
    if notify_token:
        output.append(f"FEISHU_NOTIFY_TOKEN={notify_token}")

    staging = f"{Config.tokens_file}.tmp-{os.getpid()}"
    with open(staging, "w", encoding="utf-8") as sink:
        sink.write("\n".join(output) + "\n")
    os.chmod(staging, 0o600)
    os.replace(staging, Config.tokens_file)
|
||
|
||
|
||
def http_json(
    method: str,
    url: str,
    payload: dict[str, Any],
    headers: dict[str, str] | None = None,
    timeout: float | None = None,
) -> dict[str, Any]:
    """Send *payload* as a JSON body and return the decoded JSON response.

    Args:
        method: HTTP method name.
        url: Absolute request URL.
        payload: Object serialised as the UTF-8 JSON request body.
        headers: Extra headers; they override the default ``Content-Type``.
        timeout: Seconds to wait; ``Config.request_timeout`` when falsy.

    Returns:
        The parsed response body, or ``{}`` when the body is empty.

    Raises:
        RuntimeError: on an HTTP error status; the message includes the
            status code, URL and response text.
    """
    encoded_body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    request_headers = {"Content-Type": "application/json; charset=utf-8"}
    request_headers.update(headers or {})
    request = urllib.request.Request(
        url,
        data=encoded_body,
        method=method,
        headers=request_headers,
    )
    effective_timeout = timeout or Config.request_timeout
    try:
        with urllib.request.urlopen(request, timeout=effective_timeout) as response:
            response_text = response.read().decode("utf-8")
    except urllib.error.HTTPError as exc:
        error_text = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {exc.code} {url}: {error_text}") from exc

    return json.loads(response_text) if response_text else {}
|
||
|
||
|
||
def json_text(value: Any) -> str:
    """Extract display text from a message ``content`` value.

    A string is first parsed as JSON; if it is not valid JSON it is returned
    unchanged. For a dict, the first non-blank string found under ``text``,
    ``title`` or ``content`` (in that order) is returned stripped. Anything
    else yields an empty string.
    """
    decoded = value
    if isinstance(decoded, str):
        try:
            decoded = json.loads(decoded)
        except json.JSONDecodeError:
            return value
    if not isinstance(decoded, dict):
        return ""
    for field in ("text", "title", "content"):
        candidate = decoded.get(field)
        if isinstance(candidate, str):
            trimmed = candidate.strip()
            if trimmed:
                return trimmed
    return ""
|
||
|
||
|
||
def resolve_app_id(path: str, body: dict[str, Any] | None = None) -> str:
    """Work out which Feishu app a request is addressed to.

    The id embedded in a ``/feishu/events/<id>`` or ``/feishu/notify/<id>``
    path (URL-decoded, without surrounding slashes) wins. Otherwise the
    ``app_id`` from the body or its ``header`` is used if it is a configured
    app, falling back to ``Config.default_feishu_app_id``.
    """
    route_prefixes = ("/feishu/events/", "/feishu/notify/")
    matched_prefix = next((p for p in route_prefixes if path.startswith(p)), None)
    if matched_prefix is not None:
        tail = path[len(matched_prefix):].strip("/")
        from_route = urllib.parse.unquote(tail)
        if from_route:
            return from_route

    payload = body or {}
    declared = payload.get("app_id") or payload.get("header", {}).get("app_id") or ""
    candidate = str(declared).strip()
    if candidate in Config.feishu_apps:
        return candidate
    return Config.default_feishu_app_id
|
||
|
||
|
||
def callback_token(body: dict[str, Any]) -> str:
    """Return the verification token carried by a callback body.

    The top-level ``token`` is preferred; otherwise ``header.token`` is used.
    An empty string is returned when neither is present.
    """
    top_level = body.get("token")
    if top_level:
        return str(top_level)
    nested = body.get("header", {}).get("token")
    return str(nested or "")
|
||
|
||
|
||
def token_digest(value: str) -> str:
    """Describe a secret for logging without revealing it.

    Returns ``(empty)`` for an empty value, otherwise the length plus the
    first 12 hex characters of the SHA-256 of its UTF-8 encoding.
    """
    if not value:
        return "(empty)"
    fingerprint = hashlib.sha256(value.encode("utf-8")).hexdigest()[:12]
    return f"len={len(value)} sha256={fingerprint}"
|
||
|
||
|
||
def verify_callback_token(body: dict[str, Any], app_id: str) -> bool:
    """Check the verification token of a Feishu callback for *app_id*.

    Args:
        body: Decoded callback JSON body.
        app_id: App whose configured ``verification_tokens`` are accepted.

    Returns:
        ``True`` when the app has no verification tokens configured, or when
        the token in *body* equals one of them; ``False`` otherwise. A
        mismatch is logged with digests only, never the raw tokens.
    """
    # Local import keeps this block self-contained; hmac is standard library.
    import hmac

    app = Config.feishu_apps.get(app_id, {})
    expected_tokens = app.get("verification_tokens", [])
    if not expected_tokens:
        return True
    token = callback_token(body)
    presented = token.encode("utf-8")
    ok = False
    for expected in expected_tokens:
        # Constant-time comparison so response timing does not leak how much
        # of a guessed token is correct. Every candidate is checked; there is
        # deliberately no early exit.
        if hmac.compare_digest(presented, str(expected).encode("utf-8")):
            ok = True
    if not ok:
        logging.warning(
            "invalid Feishu verification token app_id=%s got=%s expected_any=%s body_keys=%s header_keys=%s event_keys=%s",
            app_id,
            token_digest(token),
            [token_digest(item) for item in expected_tokens],
            sorted(body.keys()),
            sorted(body.get("header", {}).keys()) if isinstance(body.get("header"), dict) else [],
            sorted(body.get("event", {}).keys()) if isinstance(body.get("event"), dict) else [],
        )
    return ok
|
||
|
||
|
||
def remember_event(event_id: str) -> bool:
    """Record *event_id* and report whether it is new.

    Returns ``True`` for an empty id (nothing to deduplicate) and for ids not
    seen before; ``False`` for a repeat. Entries older than one hour are
    purged on every call.
    """
    if not event_id:
        return True
    now = time.time()
    with seen_lock:
        expired_keys = [key for key, seen_at in seen_events.items() if now - seen_at > 3600]
        for expired in expired_keys:
            seen_events.pop(expired, None)
        is_new = event_id not in seen_events
        if is_new:
            seen_events[event_id] = now
        return is_new
|
||
|
||
|
||
def handle_feishu_event(path: str, body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Handle one Feishu event callback and return ``(status, json_body)``.

    Unknown apps get 404, encrypted payloads 400 and bad verification tokens
    403. ``url_verification`` requests are answered with the challenge. Only
    ``im.message.receive_v1`` events are processed, each at most once, on a
    background thread; the callback itself is acknowledged immediately.
    """
    app_id = resolve_app_id(path, body)
    if app_id not in Config.feishu_apps:
        return 404, {"code": 404, "msg": f"unknown Feishu app_id: {app_id}"}

    if "encrypt" in body:
        return 400, {
            "code": 400,
            "msg": "encrypted callbacks are not enabled; disable Feishu event encryption or add decrypt support",
        }

    # The token check applies to both the handshake and regular events.
    if not verify_callback_token(body, app_id):
        return 403, {"code": 403, "msg": "invalid verification token"}
    if body.get("type") == "url_verification":
        return 200, {"challenge": body.get("challenge", "")}

    header = body.get("header", {})
    event_type = header.get("event_type") or body.get("event", {}).get("type")
    event_id = header.get("event_id", "")
    if event_type != "im.message.receive_v1":
        return 200, {"code": 0, "msg": "ignored"}
    if not remember_event(f"{app_id}:{event_id}"):
        return 200, {"code": 0, "msg": "duplicate"}

    worker = threading.Thread(target=process_message_event, args=(app_id, body), daemon=True)
    worker.start()
    return 200, {"code": 0, "msg": "ok"}
|
||
|
||
|
||
def process_message_event(app_id: str, body: dict[str, Any]) -> None:
    """Answer a received Feishu message with a Hermes reply.

    Messages sent by apps and messages from chats outside a non-empty
    allow-list are ignored. Non-text or empty messages get a fixed notice.
    Any exception is logged with its traceback and not re-raised.
    """
    try:
        app = Config.feishu_apps[app_id]
        event = body.get("event", {})
        if event.get("sender", {}).get("sender_type") == "app":
            return

        message = event.get("message", {})
        kind = message.get("message_type")
        chat_id = message.get("chat_id", "")
        message_id = message.get("message_id", "")

        allow_list = app.get("allowed_chat_ids", set())
        if allow_list and chat_id not in allow_list:
            logging.info("ignored message from disallowed app_id=%s chat_id=%s", app_id, chat_id)
            return

        if kind != "text":
            send_feishu_text(app_id, chat_id, "我现在只支持处理文本消息。", message_id=message_id)
            return

        question = json_text(message.get("content"))
        if not question:
            send_feishu_text(app_id, chat_id, "我没有读到文本内容。", message_id=message_id)
            return

        reply = ask_hermes(question, event)
        send_feishu_text(app_id, chat_id, reply, message_id=message_id)
    except Exception:
        logging.error("failed to process Feishu event:\n%s", traceback.format_exc())
|
||
|
||
|
||
def ask_hermes(text: str, event: dict[str, Any]) -> str:
    """Send *text* to the Hermes chat-completions endpoint and return its reply.

    The user message is prefixed with a note that it came from Feishu and
    with the chat id taken from *event*.

    Raises:
        RuntimeError: if the response lacks ``choices[0].message.content``.
    """
    chat_id = event.get("message", {}).get("chat_id", "")
    user_content = f"以下消息来自飞书。\nchat_id: {chat_id}\n\n{text}"
    request_body = {
        "model": Config.hermes_model,
        "stream": False,
        "messages": [
            {"role": "system", "content": Config.hermes_system_prompt},
            {"role": "user", "content": user_content},
        ],
    }
    auth_headers = (
        {"Authorization": f"Bearer {Config.hermes_api_key}"}
        if Config.hermes_api_key
        else {}
    )
    data = http_json(
        "POST",
        f"{Config.hermes_api_base}/chat/completions",
        request_body,
        headers=auth_headers,
    )
    try:
        content = data["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        raise RuntimeError(f"Unexpected Hermes response: {data}") from exc
    answer = str(content).strip()
    return answer or "Hermes 没有返回内容。"
|
||
|
||
|
||
def send_feishu_text(
    app_id: str,
    receive_id: str,
    text: str,
    receive_id_type: str = "chat_id",
    message_id: str = "",
) -> dict[str, Any]:
    """Send *text* through Feishu, split into chunks by ``split_text``.

    With a *message_id* every chunk is posted to that message's reply
    endpoint; otherwise it is posted as a new message to *receive_id*.

    Returns:
        The Feishu response for the last chunk sent.

    Raises:
        RuntimeError: if *receive_id* is empty or Feishu reports a non-zero
            ``code`` for a chunk.
    """
    if not receive_id:
        raise RuntimeError("receive_id is required")

    auth_headers = {"Authorization": f"Bearer {token_cache.get(app_id)}"}
    pieces = split_text(text)

    # Endpoint and fixed fields are the same for every chunk.
    if message_id:
        endpoint = f"{FEISHU_API_BASE}/im/v1/messages/{message_id}/reply"
        envelope: dict[str, Any] = {"msg_type": "text"}
    else:
        endpoint = f"{FEISHU_API_BASE}/im/v1/messages?receive_id_type={receive_id_type}"
        envelope = {"receive_id": receive_id, "msg_type": "text"}

    last_response: dict[str, Any] = {}
    for piece in pieces:
        # Feishu expects "content" to be a JSON document encoded as a string.
        message = {**envelope, "content": json.dumps({"text": piece}, ensure_ascii=False)}
        last_response = http_json("POST", endpoint, message, headers=auth_headers)
        if last_response.get("code") not in (None, 0):
            raise RuntimeError(f"Feishu send error: {last_response}")
    return last_response
|
||
|
||
|
||
def split_text(text: str, limit: int = 3800) -> list[str]:
    """Break *text* into stripped chunks of at most *limit* characters.

    A chunk ends at the last newline before the limit when that newline lies
    in the second half of the window; otherwise the text is cut exactly at
    the limit. Blank input yields a single empty chunk.
    """
    remaining = text.strip()
    if not remaining:
        return [""]
    pieces: list[str] = []
    while len(remaining) > limit:
        newline_at = remaining.rfind("\n", 0, limit)
        # Ignore newlines too early in the window to avoid tiny chunks.
        boundary = newline_at if newline_at >= limit // 2 else limit
        pieces.append(remaining[:boundary].strip())
        remaining = remaining[boundary:].strip()
    pieces.append(remaining)
    return pieces
|
||
|
||
|
||
def handle_apps() -> tuple[int, dict[str, Any]]:
    """Return a summary of the configured Feishu apps as ``(200, body)``.

    Secrets are not included: only counts and presence flags are reported.
    """
    summaries = [
        {
            "app_id": app_id,
            "callback_url": f"https://hermes.kang-kang.com/feishu/events/{app_id}",
            "default_receive_id_type": app.get("default_receive_id_type", "chat_id"),
            "has_default_receive_id": bool(app.get("default_receive_id")),
            "allowed_chat_ids_count": len(app.get("allowed_chat_ids", set())),
            "verification_tokens_count": len(app.get("verification_tokens", [])),
        }
        for app_id, app in Config.feishu_apps.items()
    ]
    response = {
        "code": 0,
        "service": "hermes-feishu-bridge",
        "default_app_id": Config.default_feishu_app_id,
        "apps": summaries,
    }
    return 200, response
|
||
|
||
|
||
def run_lxc_command(args: list[str], input_text: str = "", timeout: float = 20) -> str:
    """Run *args* inside the Hermes container via ``incus exec``.

    Args:
        args: Command and arguments to execute in the container.
        input_text: Text fed to the command's standard input.
        timeout: Seconds allowed before ``subprocess.run`` gives up.

    Returns:
        The command's standard output.

    Raises:
        RuntimeError: on a non-zero exit code; the message is stderr, else
            stdout, else a generic note with the exit code.
    """
    argv = [Config.incus_bin, "exec", Config.hermes_agent_lxc, "--"]
    argv.extend(args)
    completed = subprocess.run(
        argv,
        input=input_text,
        text=True,
        capture_output=True,
        timeout=timeout,
        check=False,
    )
    if completed.returncode == 0:
        return completed.stdout
    detail = (completed.stderr or completed.stdout or "").strip()
    raise RuntimeError(detail or f"command failed with exit code {completed.returncode}")
|
||
|
||
|
||
def read_hermes_runtime_config() -> dict[str, Any]:
    """Read the model and MCP settings from the agent's ``config.yaml``.

    A small script is executed inside the container. It scans the YAML file
    line by line (no YAML parser is used), collecting the scalar entries of
    the top-level ``model:`` block and the raw text of the ``mcp_servers:``
    block, and prints them as JSON.

    Returns:
        A dict with ``model`` (``default``, ``provider``, ``base_url``),
        ``mcp_servers_yaml``, ``config_path`` and ``lxc``.
    """
    # Runs inside the container, so it must stay self-contained.
    script = r'''
import json
import pathlib
import re
import sys

path = pathlib.Path(sys.argv[1])
text = path.read_text(encoding="utf-8")
lines = text.splitlines()

def clean_value(value):
    value = value.strip()
    if " #" in value:
        value = value.split(" #", 1)[0].rstrip()
    if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
        return value[1:-1]
    return value

def read_mapping_block(key):
    result = {}
    in_block = False
    for line in lines:
        if re.match(rf"^{re.escape(key)}\s*:\s*$", line):
            in_block = True
            continue
        if in_block:
            if line and not line[0].isspace():
                break
            match = re.match(r"\s+([A-Za-z0-9_-]+)\s*:\s*(.*?)\s*$", line)
            if match:
                result[match.group(1)] = clean_value(match.group(2))
    return result

def read_raw_block(key):
    start = None
    for idx, line in enumerate(lines):
        if re.match(rf"^{re.escape(key)}\s*:\s*(?:#.*)?$", line):
            start = idx
            break
    if start is None:
        return ""
    end = len(lines)
    for idx in range(start + 1, len(lines)):
        line = lines[idx]
        if line and not line[0].isspace() and not line.lstrip().startswith("#"):
            end = idx
            break
    return "\n".join(lines[start:end]).strip()

payload = {
    "model": read_mapping_block("model"),
    "mcp_servers_yaml": read_raw_block("mcp_servers"),
    "config_path": str(path),
}
print(json.dumps(payload, ensure_ascii=False))
'''
    config_path = f"{Config.hermes_agent_dir}/config.yaml"
    stdout = run_lxc_command(["python3", "-c", script, config_path], timeout=20)
    parsed = json.loads(stdout)

    model_section = parsed.get("model")
    if not isinstance(model_section, dict):
        model_section = {}
    model_summary = {
        "default": str(model_section.get("default") or model_section.get("model") or ""),
        "provider": str(model_section.get("provider") or ""),
        "base_url": str(model_section.get("base_url") or ""),
    }
    return {
        "model": model_summary,
        "mcp_servers_yaml": str(parsed.get("mcp_servers_yaml") or ""),
        "config_path": parsed.get("config_path") or config_path,
        "lxc": Config.hermes_agent_lxc,
    }
|
||
|
||
|
||
def normalize_mcp_yaml(value: str) -> str:
    """Ensure an MCP server snippet is wrapped in a ``mcp_servers:`` block.

    Blank input yields an empty string. Text whose first non-blank line
    already starts with ``mcp_servers:`` is returned stripped. Anything else
    is indented and placed under a new ``mcp_servers:`` header; blank lines
    are left unindented.
    """
    body = value.strip()
    if not body:
        return ""
    rows = body.splitlines()
    first_content = ""
    for row in rows:
        if row.strip():
            first_content = row.strip()
            break
    if re.match(r"^mcp_servers\s*:", first_content):
        return body
    shifted = [(" " + row) if row.strip() else row for row in rows]
    return "mcp_servers:\n" + "\n".join(shifted)
|
||
|
||
|
||
def validate_hermes_config_payload(body: dict[str, Any], current: dict[str, Any] | None = None) -> dict[str, str]:
    """Merge a config update request with the current config and validate it.

    Each model field is taken from ``body["model"]``, then from the flat keys
    of *body*, then from *current*. ``provider`` finally defaults to
    ``openrouter``. ``mcp_servers_yaml`` is normalised when present in
    *body*, otherwise inherited from *current*.

    Returns:
        Dict with ``default_model``, ``provider``, ``base_url`` and
        ``mcp_servers_yaml``.

    Raises:
        ValueError: if a model field is multi-line or too long, the default
            model is missing, the base URL is not http(s), or the MCP YAML is
            too large or does not start with ``mcp_servers:``.
    """
    existing = current or {}
    existing_model = existing.get("model") if isinstance(existing.get("model"), dict) else {}
    requested_model = body.get("model") if isinstance(body.get("model"), dict) else {}

    default_model = str(
        requested_model.get("default")
        or body.get("model_default")
        or existing_model.get("default")
        or existing_model.get("model")
        or ""
    ).strip()
    provider = str(
        requested_model.get("provider")
        or body.get("provider")
        or existing_model.get("provider")
        or "openrouter"
    ).strip()
    base_url = str(
        requested_model.get("base_url")
        or body.get("base_url")
        or existing_model.get("base_url")
        or ""
    ).strip()

    if "mcp_servers_yaml" in body:
        mcp_servers_yaml = normalize_mcp_yaml(str(body.get("mcp_servers_yaml") or ""))
    else:
        mcp_servers_yaml = str(existing.get("mcp_servers_yaml") or "")

    # (label used in error messages, value, maximum length)
    single_line_fields = (
        ("model.default", default_model, 180),
        ("model.provider", provider, 80),
        ("model.base_url", base_url, 220),
    )
    for label, field_value, max_length in single_line_fields:
        if "\n" in field_value or "\r" in field_value:
            raise ValueError(f"{label} must be a single line")
        if len(field_value) > max_length:
            raise ValueError(f"{label} is too long")

    if not default_model:
        raise ValueError("model.default is required")
    if base_url and not re.match(r"^https?://", base_url):
        raise ValueError("model.base_url must start with http:// or https://")
    if len(mcp_servers_yaml) > 20000:
        raise ValueError("mcp_servers_yaml is too large")
    if mcp_servers_yaml:
        header_line = mcp_servers_yaml.splitlines()[0].strip()
        if not re.match(r"^mcp_servers\s*:", header_line):
            raise ValueError("mcp_servers_yaml must start with mcp_servers:")

    return {
        "default_model": default_model,
        "provider": provider,
        "base_url": base_url,
        "mcp_servers_yaml": mcp_servers_yaml,
    }
|
||
|
||
|
||
def write_hermes_runtime_config(body: dict[str, Any]) -> dict[str, Any]:
    """Apply a validated model / MCP update to the agent's ``config.yaml``.

    The request is merged with the current config and validated. A script
    run inside the container then replaces the top-level ``model`` and
    ``mcp_servers`` blocks, after copying the file to a timestamped backup.
    Unless ``body["restart"]`` is falsy, the ``hermes-agent`` compose service
    is restarted afterwards.

    Returns:
        The written ``model`` and ``mcp_servers_yaml``, the ``backup`` path
        and whether a restart was performed.

    Raises:
        ValueError: if validation of the merged settings fails.
        RuntimeError: if a command inside the container fails.
    """
    current = read_hermes_runtime_config()
    settings = validate_hermes_config_payload(body, current)
    # Runs inside the container; the settings arrive on stdin as JSON.
    script = r'''
import json
import pathlib
import re
import shutil
import sys
import time

path = pathlib.Path(sys.argv[1])
payload = json.loads(sys.stdin.read())
text = path.read_text(encoding="utf-8")

def quote(value):
    return json.dumps(value, ensure_ascii=False)

def find_block(lines, key):
    start = None
    for idx, line in enumerate(lines):
        if re.match(rf"^{re.escape(key)}\s*:\s*(?:#.*)?$", line):
            start = idx
            break
    if start is None:
        return None, None
    end = len(lines)
    for idx in range(start + 1, len(lines)):
        line = lines[idx]
        if line and not line[0].isspace() and not line.lstrip().startswith("#"):
            end = idx
            break
    return start, end

def replace_block(text, key, block):
    lines = text.splitlines()
    start, end = find_block(lines, key)
    block_lines = block.rstrip().splitlines() if block.strip() else []
    if start is None:
        if not block_lines:
            return text.rstrip() + "\n"
        return text.rstrip() + "\n\n" + "\n".join(block_lines) + "\n"
    if block_lines:
        new_lines = lines[:start] + block_lines + lines[end:]
    else:
        new_lines = lines[:start] + lines[end:]
    return "\n".join(new_lines).rstrip() + "\n"

model_block = "\n".join([
    "model:",
    f" default: {quote(payload['default_model'])}",
    f" provider: {quote(payload['provider'])}",
    f" base_url: {quote(payload['base_url'])}",
])

text = replace_block(text, "model", model_block)
text = replace_block(text, "mcp_servers", payload.get("mcp_servers_yaml", ""))
backup = path.with_name(path.name + ".bak-" + time.strftime("%Y%m%d%H%M%S"))
shutil.copy2(path, backup)
path.write_text(text, encoding="utf-8")
print(json.dumps({"backup": str(backup)}, ensure_ascii=False))
'''
    config_path = f"{Config.hermes_agent_dir}/config.yaml"
    stdout = run_lxc_command(
        ["python3", "-c", script, config_path],
        input_text=json.dumps(settings, ensure_ascii=False),
        timeout=20,
    )
    outcome = json.loads(stdout)

    restart_requested = body.get("restart", True)
    if restart_requested:
        restart_command = f"cd {shlex.quote(Config.hermes_agent_dir)} && docker compose restart hermes-agent"
        run_lxc_command(["bash", "-lc", restart_command], timeout=90)

    return {
        "model": {
            "default": settings["default_model"],
            "provider": settings["provider"],
            "base_url": settings["base_url"],
        },
        "mcp_servers_yaml": settings["mcp_servers_yaml"],
        "backup": outcome.get("backup"),
        "restarted": bool(restart_requested),
    }
|
||
|
||
|
||
def handle_hermes_config_get(headers: dict[str, str]) -> tuple[int, dict[str, Any]]:
    """Serve the Hermes runtime config to an admin caller.

    Returns the status and message from ``is_admin_request`` when the check
    fails, 500 with the error text when reading fails, otherwise 200 with the
    config.
    """
    authorized, status, message = is_admin_request(headers)
    if not authorized:
        return status, {"code": status, "msg": message}
    try:
        runtime_config = read_hermes_runtime_config()
    except Exception as exc:
        logging.error("failed to read Hermes config:\n%s", traceback.format_exc())
        return 500, {"code": 500, "msg": str(exc)}
    else:
        return 200, {"code": 0, "msg": "ok", "config": runtime_config}
|
||
|
||
|
||
def handle_hermes_config_post(headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Update the Hermes runtime config on behalf of an admin caller.

    Returns the status and message from ``is_admin_request`` when the check
    fails, 400 for validation errors, 500 for any other failure, otherwise
    200 with the written config.
    """
    authorized, status, message = is_admin_request(headers)
    if not authorized:
        return status, {"code": status, "msg": message}
    try:
        written = write_hermes_runtime_config(body)
    except ValueError as exc:
        # Validation problems are the caller's fault, not a server error.
        return 400, {"code": 400, "msg": str(exc)}
    except Exception as exc:
        logging.error("failed to write Hermes config:\n%s", traceback.format_exc())
        return 500, {"code": 500, "msg": str(exc)}
    else:
        logging.info("updated Hermes runtime config model=%s", written["model"]["default"])
        return 200, {"code": 0, "msg": "ok", "config": written}
|
||
|
||
|
||
def one_line(value: Any, limit: int, default: str = "") -> str:
    """Render *value* as a trimmed single-line string of at most *limit* chars.

    ``default`` replaces *value* only when it is ``None``. Carriage returns
    and newlines each become a space. Text longer than *limit* is cut and any
    trailing whitespace left by the cut is removed.
    """
    source = default if value is None else value
    flattened = str(source).strip().replace("\r", " ").replace("\n", " ").strip()
    if len(flattened) <= limit:
        return flattened
    return flattened[:limit].rstrip()
|
||
|
||
|
||
def string_list(value: Any, limit: int = 200) -> list[str]:
    """Normalise *value* into a list of non-empty single-line strings.

    Non-list input yields ``[]``. Only the first *limit* items are examined;
    each is passed through ``one_line`` with a 180-character cap and dropped
    if it ends up empty.
    """
    if not isinstance(value, list):
        return []
    cleaned = (one_line(item, 180) for item in value[:limit])
    return [entry for entry in cleaned if entry]
|
||
|
||
|
||
def now_ms() -> int:
    """Return the current Unix time in whole milliseconds."""
    milliseconds = time.time() * 1000
    return int(milliseconds)
|
||
|
||
|
||
def runtime_model_profile(runtime: dict[str, Any]) -> dict[str, Any]:
    """Build the ``runtime-default`` model profile from the runtime config.

    Missing values fall back to ``Config.hermes_model`` for the model and
    ``openrouter`` for the provider. The profile is enabled, marked as
    default, and stamped with the current time.
    """
    section = runtime.get("model")
    if not isinstance(section, dict):
        section = {}
    stamp = now_ms()
    return {
        "id": "runtime-default",
        "name": "线上默认模型",
        "provider": one_line(section.get("provider") or "openrouter", 80),
        "model": one_line(section.get("default") or Config.hermes_model or "gemini-3-pro-preview", 180),
        "baseUrl": one_line(section.get("base_url") or "", 220),
        "apiKeyRef": "服务器环境变量",
        "enabled": True,
        "isDefault": True,
        "createdAt": stamp,
        "updatedAt": stamp,
    }
|
||
|
||
|
||
def normalize_model_profile(raw: Any, fallback: dict[str, Any], index: int) -> dict[str, Any] | None:
    """Clean one model profile entry, or return ``None`` if it is unusable.

    Args:
        raw: Candidate profile; anything but a dict is rejected.
        fallback: Profile supplying defaults for fields absent from *raw*.
        index: Position in the source list, used to generate a replacement
            id (``model_<index + 1>``) when the given id is missing or invalid.

    Returns:
        The normalised profile, or ``None`` when *raw* is not a dict or ends
        up without a model name.
    """
    if not isinstance(raw, dict):
        return None

    profile_id = one_line(raw.get("id"), 80)
    if not (profile_id and UI_ID_RE.match(profile_id)):
        profile_id = f"model_{index + 1}"

    name = one_line(raw.get("name"), 80, fallback.get("name") or "模型接入")
    model = one_line(raw.get("model"), 180, fallback.get("model") or "")
    provider = one_line(raw.get("provider"), 80, fallback.get("provider") or "openrouter")
    # Both camelCase and snake_case spellings are accepted on input.
    base_url = one_line(raw.get("baseUrl") or raw.get("base_url"), 220, fallback.get("baseUrl") or "")
    api_key_ref = one_line(raw.get("apiKeyRef") or raw.get("api_key_ref"), 120, fallback.get("apiKeyRef") or "")
    if not model:
        return None

    created_raw = raw.get("createdAt")
    created_at = created_raw if isinstance(created_raw, (int, float)) else now_ms()
    updated_raw = raw.get("updatedAt")
    updated_at = updated_raw if isinstance(updated_raw, (int, float)) else now_ms()

    return {
        "id": profile_id,
        "name": name or model,
        "provider": provider,
        "model": model,
        "baseUrl": base_url,
        "apiKeyRef": api_key_ref,
        "enabled": bool(raw.get("enabled", True)),
        "isDefault": bool(raw.get("isDefault", False)),
        "createdAt": int(created_at),
        "updatedAt": int(updated_at),
    }
|
||
|
||
|
||
def normalize_stages(value: Any) -> dict[str, list[str]]:
    """Return the ``pre`` / ``exec`` / ``post`` stage lists of an agent.

    Non-dict input is treated as empty; each stage is cleaned with
    ``string_list``.
    """
    stages = value if isinstance(value, dict) else {}
    return {stage: string_list(stages.get(stage)) for stage in ("pre", "exec", "post")}
|
||
|
||
|
||
def normalize_agent(raw: Any, index: int) -> dict[str, Any] | None:
    """Clean one agent definition, or return ``None`` if it is unusable.

    An agent needs a name and a system prompt. A missing or invalid id is
    replaced by ``agent_<index + 1>``. Text fields are flattened and
    length-capped; the system prompt is cut at 20000 characters.
    """
    if not isinstance(raw, dict):
        return None

    agent_id = one_line(raw.get("id"), 80)
    if not (agent_id and UI_ID_RE.match(agent_id)):
        agent_id = f"agent_{index + 1}"

    name = one_line(raw.get("name"), 80)
    # Both camelCase and snake_case spellings are accepted on input.
    system_prompt = str(raw.get("systemPrompt") or raw.get("system_prompt") or "").strip()
    if not name or not system_prompt:
        return None

    created_raw = raw.get("createdAt")
    created_at = created_raw if isinstance(created_raw, (int, float)) else now_ms()
    updated_raw = raw.get("updatedAt")
    updated_at = updated_raw if isinstance(updated_raw, (int, float)) else now_ms()

    return {
        "id": agent_id,
        "emoji": one_line(raw.get("emoji"), 16, "🤖"),
        "name": name,
        "desc": one_line(raw.get("desc"), 180),
        "model": one_line(raw.get("model"), 180),
        "modelProfileId": one_line(raw.get("modelProfileId") or raw.get("model_profile_id"), 80),
        "systemPrompt": system_prompt[:20000],
        "skills": string_list(raw.get("skills")),
        "stages": normalize_stages(raw.get("stages")),
        "createdAt": int(created_at),
        "updatedAt": int(updated_at),
    }
|
||
|
||
|
||
def normalize_ui_config(raw: dict[str, Any], runtime: dict[str, Any] | None = None) -> dict[str, Any]:
    """Turn a stored or submitted UI config into its canonical form.

    Up to 80 model profiles and 200 agents are normalised; invalid entries
    and entries with a repeated id are dropped. When no profile survives,
    the runtime default profile is used. Exactly one profile ends up marked
    as default: the first one flagged, or the first profile if none is.
    """
    runtime_profile = runtime_model_profile(runtime or {})

    raw_profiles = raw.get("modelProfiles")
    if not isinstance(raw_profiles, list):
        raw_profiles = []
    profiles: list[dict[str, Any]] = []
    profile_ids: set[str] = set()
    for position, candidate in enumerate(raw_profiles[:80]):
        profile = normalize_model_profile(candidate, runtime_profile, position)
        if profile and profile["id"] not in profile_ids:
            profile_ids.add(profile["id"])
            profiles.append(profile)
    if not profiles:
        profiles.append(runtime_profile)

    if not any(profile.get("isDefault") for profile in profiles):
        profiles[0]["isDefault"] = True
    # Keep only the first default; the step above guarantees one exists.
    first_default = next(
        position for position, profile in enumerate(profiles) if profile.get("isDefault")
    )
    for position, profile in enumerate(profiles):
        if position != first_default:
            profile["isDefault"] = False

    raw_agents = raw.get("agents")
    if not isinstance(raw_agents, list):
        raw_agents = []
    agents: list[dict[str, Any]] = []
    agent_ids: set[str] = set()
    for position, candidate in enumerate(raw_agents[:200]):
        agent = normalize_agent(candidate, position)
        if agent and agent["id"] not in agent_ids:
            agent_ids.add(agent["id"])
            agents.append(agent)

    return {
        "version": 1,
        "modelProfiles": profiles,
        "agents": agents,
        "updatedAt": now_ms(),
        "configPath": Config.hermes_ui_config_path,
        "lxc": Config.hermes_agent_lxc,
    }
|
||
|
||
|
||
def read_ui_config_file() -> dict[str, Any]:
    """Load the stored Hermes UI config through ``run_lxc_command``.

    The helper script prints the file at ``Config.hermes_ui_config_path``
    verbatim, or ``{}`` when that path does not exist.

    Returns:
        The parsed JSON object, or an empty dict when the command output is
        blank or the document's top level is not an object.
    """
    script = r'''
import json
import pathlib
import sys

path = pathlib.Path(sys.argv[1])
if not path.exists():
    print("{}")
else:
    print(path.read_text(encoding="utf-8"))
'''
    command = ["python3", "-c", script, Config.hermes_ui_config_path]
    output = run_lxc_command(command, timeout=20)
    if not output.strip():
        return {}
    parsed = json.loads(output)
    if isinstance(parsed, dict):
        return parsed
    return {}
|
||
|
||
|
||
def write_ui_config_file(config: dict[str, Any]) -> None:
    """Persist ``config`` as JSON at ``Config.hermes_ui_config_path``.

    The config is serialized here and piped on stdin to a helper script run
    through ``run_lxc_command``. The script creates the parent directory if
    needed, writes a PID-suffixed temporary sibling file, restricts it to mode
    0o600 and then renames it over the target path.

    Args:
        config: JSON-serializable config mapping to store.
    """
    script = r'''
import json
import os
import pathlib
import sys

path = pathlib.Path(sys.argv[1])
payload = json.loads(sys.stdin.read())
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_name(path.name + f".tmp-{os.getpid()}")
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
tmp.chmod(0o600)
tmp.replace(path)
'''
    command = ["python3", "-c", script, Config.hermes_ui_config_path]
    serialized = json.dumps(config, ensure_ascii=False)
    run_lxc_command(command, input_text=serialized, timeout=20)
|
||
|
||
|
||
def handle_ui_config_get(headers: dict[str, str]) -> tuple[int, dict[str, Any]]:
    """Return the normalized Hermes UI config to an admin caller.

    Args:
        headers: Request headers keyed by lower-cased name.

    Returns:
        ``(status, payload)``: the rejection from ``is_admin_request`` when the
        caller is not accepted, 500 with the error text when loading fails,
        otherwise 200 with the config under ``"config"``.
    """
    allowed, denial_status, denial_reason = is_admin_request(headers)
    if not allowed:
        return denial_status, {"code": denial_status, "msg": denial_reason}

    try:
        runtime = read_hermes_runtime_config()
        stored = read_ui_config_file()
        config = normalize_ui_config(stored, runtime)
    except Exception as failure:
        logging.error("failed to read Hermes UI config:\n%s", traceback.format_exc())
        return 500, {"code": 500, "msg": str(failure)}
    else:
        return 200, {"code": 0, "msg": "ok", "config": config}
|
||
|
||
|
||
def handle_ui_config_post(headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Normalize and store a Hermes UI config posted by an admin caller.

    The config is taken from ``body["config"]`` when that value is a dict;
    otherwise the whole body is treated as the config.

    Args:
        headers: Request headers keyed by lower-cased name.
        body: Decoded JSON request body.

    Returns:
        ``(status, payload)``: the rejection from ``is_admin_request``, 400 for
        a ``ValueError``, 500 for any other failure, otherwise 200 with the
        stored config under ``"config"``.
    """
    allowed, denial_status, denial_reason = is_admin_request(headers)
    if not allowed:
        return denial_status, {"code": denial_status, "msg": denial_reason}

    try:
        runtime = read_hermes_runtime_config()
        nested = body.get("config")
        submitted = nested if isinstance(nested, dict) else body
        config = normalize_ui_config(submitted, runtime)
        write_ui_config_file(config)
    except ValueError as problem:
        return 400, {"code": 400, "msg": str(problem)}
    except Exception as failure:
        logging.error("failed to write Hermes UI config:\n%s", traceback.format_exc())
        return 500, {"code": 500, "msg": str(failure)}

    profile_count = len(config.get("modelProfiles", []))
    agent_count = len(config.get("agents", []))
    logging.info(
        "updated Hermes UI config model_profiles=%s agents=%s",
        profile_count,
        agent_count,
    )
    return 200, {"code": 0, "msg": "ok", "config": config}
|
||
|
||
|
||
def is_admin_request(headers: dict[str, str]) -> tuple[bool, int, str]:
    """Decide whether a request carries the Hermes login cookie and a trusted origin.

    Args:
        headers: Request headers keyed by lower-cased name.

    Returns:
        ``(ok, status, message)``: ``(True, 200, "ok")`` when accepted,
        ``(False, 401, "login required")`` when the ``hermes_auth=ok`` cookie
        is absent, and ``(False, 403, ...)`` when Origin (or, if Origin is
        absent, Referer) points at a different site. Requests that send
        neither Origin nor Referer are accepted on the cookie alone.
    """
    # Compare cookie pairs exactly. A plain substring test would also accept
    # look-alikes such as "xhermes_auth=ok" or "hermes_auth=okay".
    # NOTE(review): the cookie value is a fixed marker; presumably a front
    # proxy enforces the real login before setting it - confirm.
    cookie_pairs = (
        pair.strip().partition("=")
        for pair in headers.get("cookie", "").split(";")
    )
    logged_in = any(
        name.strip() == "hermes_auth" and value.strip() == "ok"
        for name, _, value in cookie_pairs
    )
    if not logged_in:
        return False, 401, "login required"

    origin = headers.get("origin", "")
    referer = headers.get("referer", "")
    allowed = "https://hermes.kang-kang.com"
    if origin and origin != allowed:
        return False, 403, "invalid origin"
    # Referer is only consulted when Origin is absent. The trailing slash
    # stops hosts like "hermes.kang-kang.com.evil.example" from matching.
    if not origin and referer and not referer.startswith(allowed + "/"):
        return False, 403, "invalid referer"
    return True, 200, "ok"
|
||
|
||
|
||
def validate_feishu_credentials(app_id: str, app_secret: str) -> None:
    """Check an app id/secret pair by requesting a tenant access token.

    Args:
        app_id: Feishu app id.
        app_secret: Secret belonging to ``app_id``.

    Raises:
        ValueError: If the response ``code`` is not 0; the message carries the
            response ``msg``, or the whole response when ``msg`` is empty.
    """
    endpoint = f"{FEISHU_API_BASE}/auth/v3/tenant_access_token/internal"
    credentials = {"app_id": app_id, "app_secret": app_secret}
    response = http_json("POST", endpoint, credentials, timeout=15)
    if response.get("code") == 0:
        return
    raise ValueError(f"Feishu credential validation failed: {response.get('msg') or response}")
|
||
|
||
|
||
def find_app_suffix(env: dict[str, str], app_id: str) -> str:
    """Pick the env-variable suffix (slot) under which ``app_id`` is stored.

    A slot that already holds ``app_id`` wins; otherwise the first slot with
    neither an app id nor an app secret is used.

    Args:
        env: Parsed env-file mapping.
        app_id: Feishu app id to place.

    Returns:
        The slot suffix, e.g. ``""`` or ``"_2"``.

    Raises:
        RuntimeError: If every supported slot is occupied by another app.
    """
    suffixes = app_suffixes()

    existing = next(
        (slot for slot in suffixes if env.get(f"FEISHU_APP_ID{slot}") == app_id),
        None,
    )
    if existing is not None:
        return existing

    for slot in suffixes:
        slot_in_use = env.get(f"FEISHU_APP_ID{slot}") or env.get(f"FEISHU_APP_SECRET{slot}")
        if not slot_in_use:
            return slot

    raise RuntimeError("no free Feishu app slot; supported slots are FEISHU_APP_ID through FEISHU_APP_ID_20")
|
||
|
||
|
||
def upsert_feishu_app(body: dict[str, Any]) -> dict[str, Any]:
    """Create or update a Feishu app entry in the env file.

    The submitted fields are format-checked, the credentials are verified
    with ``validate_feishu_credentials``, and the values are written to the
    slot chosen by ``find_app_suffix``. Afterwards the tokens file is synced,
    the config is reloaded from the env file and the cached token for the app
    is dropped.

    Args:
        body: Request body with ``app_id``, ``app_secret`` and
            ``verification_token``.

    Returns:
        A dict with ``app_id``, the event ``callback_url`` and the ``slot``
        (``"_1"`` for the unsuffixed slot).

    Raises:
        ValueError: If a field fails its format check, or credential
            validation fails.
    """

    def _field(key: str) -> str:
        return str(body.get(key, "")).strip()

    app_id = _field("app_id")
    app_secret = _field("app_secret")
    verification_token = _field("verification_token")

    # Checked in order; the first failing rule decides the error message.
    checks = (
        (APP_ID_RE.match(app_id) is None, "App ID must look like cli_xxx"),
        (len(app_secret) < 16, "App Secret is too short"),
        (len(verification_token) < 16, "Verification Token is too short"),
    )
    for failed, message in checks:
        if failed:
            raise ValueError(message)

    validate_feishu_credentials(app_id, app_secret)

    env = parse_env_file(Config.env_file)
    suffix = find_app_suffix(env, app_id)

    updates = {
        f"FEISHU_APP_ID{suffix}": app_id,
        f"FEISHU_APP_SECRET{suffix}": app_secret,
        f"FEISHU_VERIFICATION_TOKEN{suffix}": verification_token,
    }
    # Only seed the receive-id type when the slot has none configured yet.
    receive_type_key = f"FEISHU_DEFAULT_RECEIVE_ID_TYPE{suffix}"
    if not env.get(receive_type_key):
        updates[receive_type_key] = "chat_id"

    write_env_updates(Config.env_file, updates)
    sync_tokens_file()
    reload_config_from_env_file()
    token_cache.drop(app_id)

    summary: dict[str, Any] = {
        "app_id": app_id,
        "callback_url": f"https://hermes.kang-kang.com/feishu/events/{app_id}",
        "slot": suffix or "_1",
    }
    return summary
|
||
|
||
|
||
def delete_feishu_app(app_id: str) -> dict[str, Any]:
    """Remove a Feishu app's settings from the env file.

    All per-slot keys of the app are removed. When the app was the default
    (``FEISHU_DEFAULT_APP_ID``, falling back to ``FEISHU_APP_ID``), the
    default is cleared and re-pointed at the first remaining app, if any.
    Afterwards the tokens file is synced, the config is reloaded from the env
    file and the cached token for the app is dropped.

    Args:
        app_id: Feishu app id to delete; surrounding whitespace is ignored.

    Returns:
        A dict with ``app_id`` and the freed ``slot`` (``"_1"`` for the
        unsuffixed slot).

    Raises:
        ValueError: If ``app_id`` does not match the expected format.
        KeyError: If no slot holds ``app_id``.
    """
    app_id = app_id.strip()
    if APP_ID_RE.match(app_id) is None:
        raise ValueError("App ID must look like cli_xxx")

    env = parse_env_file(Config.env_file)
    suffix = next(
        (slot for slot in app_suffixes() if env.get(f"FEISHU_APP_ID{slot}") == app_id),
        None,
    )
    if suffix is None:
        raise KeyError(app_id)

    per_slot_names = (
        "FEISHU_APP_ID",
        "FEISHU_APP_SECRET",
        "FEISHU_VERIFICATION_TOKEN",
        "FEISHU_DEFAULT_RECEIVE_ID",
        "FEISHU_DEFAULT_RECEIVE_ID_TYPE",
        "FEISHU_ALLOWED_CHAT_IDS",
    )
    remove_keys = {f"{name}{suffix}" for name in per_slot_names}

    updates: dict[str, str] = {}
    default_app_id = env.get("FEISHU_DEFAULT_APP_ID") or env.get("FEISHU_APP_ID", "")
    if default_app_id == app_id:
        remove_keys.add("FEISHU_DEFAULT_APP_ID")
        # Promote the first other configured app to be the new default.
        for slot in app_suffixes():
            survivor = env.get(f"FEISHU_APP_ID{slot}", "")
            if slot != suffix and survivor:
                updates["FEISHU_DEFAULT_APP_ID"] = survivor
                break

    write_env_removals(Config.env_file, remove_keys, updates)
    sync_tokens_file()
    reload_config_from_env_file()
    token_cache.drop(app_id)
    return {"app_id": app_id, "slot": suffix or "_1"}
|
||
|
||
|
||
def handle_apps_admin(headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Create or update a Feishu app on behalf of an admin caller.

    Args:
        headers: Request headers keyed by lower-cased name.
        body: Decoded JSON request body, passed to ``upsert_feishu_app``.

    Returns:
        ``(status, payload)``: the rejection from ``is_admin_request``, 400
        when ``upsert_feishu_app`` raises ``ValueError``, otherwise 200 with
        the app summary under ``"app"``. Other exceptions propagate.
    """
    allowed, denial_status, denial_reason = is_admin_request(headers)
    if not allowed:
        return denial_status, {"code": denial_status, "msg": denial_reason}

    try:
        saved = upsert_feishu_app(body)
    except ValueError as problem:
        return 400, {"code": 400, "msg": str(problem)}

    logging.info("upserted Feishu app from Hermes settings app_id=%s", saved["app_id"])
    response: dict[str, Any] = {"code": 0, "msg": "ok", "app": saved}
    return 200, response
|
||
|
||
|
||
def handle_apps_delete(path: str, headers: dict[str, str]) -> tuple[int, dict[str, Any]]:
    """Delete the Feishu app named in a ``/feishu/apps/<app_id>`` request path.

    Args:
        path: Raw request path; any query string is ignored and the app id is
            percent-decoded.
        headers: Request headers keyed by lower-cased name.

    Returns:
        ``(status, payload)``: the rejection from ``is_admin_request``, 400 for
        a malformed app id, 404 for an unknown one, otherwise 200 with the
        deletion summary under ``"app"``.
    """
    allowed, denial_status, denial_reason = is_admin_request(headers)
    if not allowed:
        return denial_status, {"code": denial_status, "msg": denial_reason}

    prefix_length = len("/feishu/apps/")
    route = urllib.parse.urlparse(path).path
    encoded_id = route[prefix_length:].strip("/")
    app_id = urllib.parse.unquote(encoded_id)

    try:
        removed = delete_feishu_app(app_id)
    except ValueError as problem:
        return 400, {"code": 400, "msg": str(problem)}
    except KeyError:
        return 404, {"code": 404, "msg": f"unknown Feishu app_id: {app_id}"}

    logging.info("deleted Feishu app from Hermes settings app_id=%s", removed["app_id"])
    response: dict[str, Any] = {"code": 0, "msg": "ok", "app": removed}
    return 200, response
|
||
|
||
|
||
def handle_notify(path: str, headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Send a text message to Feishu on behalf of a token-authenticated caller.

    The caller presents the shared notify token either as
    ``Authorization: Bearer <token>`` or in ``x-hermes-feishu-token``; the
    bearer form wins when both are present.

    Args:
        path: Raw request path, used to resolve the app id when the body does
            not name one.
        headers: Request headers keyed by lower-cased name.
        body: Decoded JSON body with ``text`` and optional ``app_id``,
            ``receive_id`` and ``receive_id_type``.

    Returns:
        ``(status, payload)``: 503 when no notify token is configured, 401 on
        a token mismatch, 404 for an unknown app, 400 when ``text`` or the
        receive id is missing, otherwise 200 with the send result under
        ``"feishu"``.
    """
    # Function-scope import so the block does not depend on the module's
    # top-level import list.
    import hmac

    expected = Config.notify_token
    if not expected:
        return 503, {"code": 503, "msg": "FEISHU_NOTIFY_TOKEN is not configured"}

    auth = headers.get("authorization", "")
    token = headers.get("x-hermes-feishu-token", "")
    if auth.lower().startswith("bearer "):
        token = auth[7:].strip()
    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched. Compared as bytes because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(token.encode("utf-8"), expected.encode("utf-8")):
        return 401, {"code": 401, "msg": "unauthorized"}

    app_id = str(body.get("app_id") or resolve_app_id(path, body)).strip()
    app = Config.feishu_apps.get(app_id)
    if not app:
        return 404, {"code": 404, "msg": f"unknown Feishu app_id: {app_id}"}

    text = str(body.get("text", "")).strip()
    if not text:
        return 400, {"code": 400, "msg": "text is required"}

    # Per-request values override the app's configured defaults.
    receive_id = str(body.get("receive_id") or app.get("default_receive_id", "")).strip()
    receive_id_type = str(
        body.get("receive_id_type") or app.get("default_receive_id_type", "chat_id")
    ).strip()
    if not receive_id:
        return 400, {"code": 400, "msg": "receive_id is required"}

    result = send_feishu_text(app_id, receive_id, text, receive_id_type=receive_id_type)
    return 200, {"code": 0, "msg": "ok", "app_id": app_id, "feishu": result}
|
||
|
||
|
||
class Handler(BaseHTTPRequestHandler):
    """HTTP request handler that routes bridge endpoints to the module-level handlers.

    Every response is a JSON document produced by ``send_json``.
    """

    server_version = "HermesFeishuBridge/1.0"

    def do_GET(self) -> None:
        """Serve the read-only endpoints; unknown paths get a JSON 404."""
        # Same error boundary as do_POST/do_DELETE: a failing handler must
        # still produce a JSON 500 rather than an aborted connection.
        try:
            headers = {key.lower(): value for key, value in self.headers.items()}
            if self.path == "/health":
                status, payload = 200, {"ok": True, "service": "feishu-bridge"}
            elif self.path == "/feishu/apps":
                status, payload = handle_apps()
            elif self.path == "/feishu/hermes-config":
                status, payload = handle_hermes_config_get(headers)
            elif self.path == "/feishu/ui-config":
                status, payload = handle_ui_config_get(headers)
            else:
                status, payload = 404, {"code": 404, "msg": "not found"}
        except Exception as exc:
            logging.error("request failed:\n%s", traceback.format_exc())
            status, payload = 500, {"code": 500, "msg": str(exc)}
        self.send_json(status, payload)

    def do_POST(self) -> None:
        """Decode the JSON body and dispatch to the matching POST handler."""
        try:
            status, payload = self._route_post()
        except Exception as exc:
            logging.error("request failed:\n%s", traceback.format_exc())
            status, payload = 500, {"code": 500, "msg": str(exc)}
        self.send_json(status, payload)

    def _route_post(self) -> tuple[int, dict[str, Any]]:
        """Return ``(status, payload)`` for the current POST request."""
        try:
            body = self.read_json()
        except ValueError as exc:
            # Malformed JSON, undecodable bytes, a non-object document or a
            # bad Content-Length are client errors, not server failures.
            return 400, {"code": 400, "msg": str(exc)}

        headers = {key.lower(): value for key, value in self.headers.items()}
        if self.path == "/feishu/events" or self.path.startswith("/feishu/events/"):
            return handle_feishu_event(self.path, body)
        if self.path == "/feishu/apps":
            return handle_apps_admin(headers, body)
        if self.path == "/feishu/hermes-config":
            return handle_hermes_config_post(headers, body)
        if self.path == "/feishu/ui-config":
            return handle_ui_config_post(headers, body)
        if self.path == "/feishu/notify" or self.path.startswith("/feishu/notify/"):
            return handle_notify(self.path, headers, body)
        return 404, {"code": 404, "msg": "not found"}

    def do_DELETE(self) -> None:
        """Dispatch ``DELETE /feishu/apps/<app_id>``; other paths get a JSON 404."""
        try:
            headers = {key.lower(): value for key, value in self.headers.items()}
            if self.path.startswith("/feishu/apps/"):
                status, payload = handle_apps_delete(self.path, headers)
            else:
                status, payload = 404, {"code": 404, "msg": "not found"}
        except Exception as exc:
            logging.error("request failed:\n%s", traceback.format_exc())
            status, payload = 500, {"code": 500, "msg": str(exc)}
        self.send_json(status, payload)

    def read_json(self) -> dict[str, Any]:
        """Read the request body and decode it as a JSON object.

        Returns:
            The decoded object; an empty dict when there is no body.

        Raises:
            ValueError: If Content-Length is not a non-negative integer, the
                body is not valid UTF-8 JSON, or its top level is not an
                object.
        """
        try:
            length = int(self.headers.get("Content-Length", "0"))
        except (TypeError, ValueError):
            raise ValueError("invalid Content-Length header") from None
        if length < 0:
            # A negative size makes read() consume the stream until EOF, which
            # would tie up the worker thread for as long as the client keeps
            # the connection open.
            raise ValueError("invalid Content-Length header")

        raw = self.rfile.read(length).decode("utf-8") if length else "{}"
        if not raw:
            return {}
        data = json.loads(raw)
        if not isinstance(data, dict):
            raise ValueError("JSON object is required")
        return data

    def send_json(self, status: int, payload: dict[str, Any]) -> None:
        """Write ``payload`` as a UTF-8 JSON response with the given status."""
        data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def log_message(self, fmt: str, *args: Any) -> None:
        """Send the base class's access-log lines through ``logging``."""
        logging.info("%s - %s", self.address_string(), fmt % args)
|
||
|
||
|
||
def main() -> None:
    """Configure logging, report missing credentials and serve requests forever.

    The log level comes from the ``LOG_LEVEL`` environment variable (default
    ``INFO``). Missing app configuration is only logged as a warning; the
    server starts regardless and listens on all interfaces at ``Config.port``.
    """
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=os.environ.get("LOG_LEVEL", "INFO"),
    )

    apps = Config.feishu_apps
    missing = [] if apps else ["FEISHU_APP_ID and FEISHU_APP_SECRET"]
    missing.extend(
        f"FEISHU_APP_SECRET for {app_id}"
        for app_id, app in apps.items()
        if not app.get("app_secret")
    )
    if missing:
        logging.warning("missing required env: %s", ", ".join(missing))

    configured = ", ".join(apps) or "(none)"
    logging.info("configured Feishu apps: %s", configured)

    bind_address = ("0.0.0.0", Config.port)
    httpd = ThreadingHTTPServer(bind_address, Handler)
    logging.info("Feishu bridge listening on :%s", Config.port)
    httpd.serve_forever()
|
||
|
||
|
||
# Start the bridge only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|