#!/usr/bin/env python3
"""Feishu <-> Hermes bridge.

This service intentionally uses only the Python standard library so it can run
next to the existing no-build Hermes UI deployment without adding package
management to the project.
"""

from __future__ import annotations

import hashlib
import json
import logging
import os
import re
import shlex
import subprocess
import threading
import time
import traceback
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any


FEISHU_API_BASE = "https://open.feishu.cn/open-apis"
APP_ID_RE = re.compile(r"^cli_[A-Za-z0-9]+$")
UI_ID_RE = re.compile(r"^[A-Za-z0-9_-]{1,80}$")

# Per-app environment variable prefixes; each is combined with every suffix
# returned by app_suffixes().
_FEISHU_APP_ENV_PREFIXES = (
    "FEISHU_APP_ID",
    "FEISHU_APP_SECRET",
    "FEISHU_VERIFICATION_TOKEN",
    "FEISHU_DEFAULT_RECEIVE_ID",
    "FEISHU_DEFAULT_RECEIVE_ID_TYPE",
    "FEISHU_ALLOWED_CHAT_IDS",
)

# apiKeyRef values that mean "use the bridge's own HERMES_API_KEY".
_SERVER_ENV_KEY_REFS = {"服务器环境变量", "server env", "server-env"}


def _env(name: str, default: str = "") -> str:
    """Return environment variable *name* (or *default*), stripped."""
    return os.environ.get(name, default).strip()


def _bool_env(name: str, default: str = "false") -> bool:
    """Return True when the variable is one of ``1``/``true``/``yes`` (case-insensitive)."""
    return _env(name, default).lower() in {"1", "true", "yes"}


def _csv_set(value: str) -> set[str]:
    """Split a comma-separated string into a set of non-empty, stripped items."""
    return {item.strip() for item in value.split(",") if item.strip()}


def _csv_list(value: str) -> list[str]:
    """Split a comma-separated string into an ordered list of non-empty, stripped items."""
    return [item.strip() for item in value.split(",") if item.strip()]


def app_suffixes() -> list[str]:
    """Return the env-var suffixes for app slots: ``""`` then ``_2`` .. ``_20``."""
    return ["", *[f"_{idx}" for idx in range(2, 21)]]


def _load_feishu_apps() -> dict[str, dict[str, Any]]:
    """Collect Feishu app settings from the environment, keyed by app id.

    A slot is skipped only when both its app id and app secret are empty.
    """
    apps: dict[str, dict[str, Any]] = {}
    for suffix in app_suffixes():
        app_id = _env(f"FEISHU_APP_ID{suffix}")
        app_secret = _env(f"FEISHU_APP_SECRET{suffix}")
        if not app_id and not app_secret:
            continue
        apps[app_id] = {
            "app_id": app_id,
            "app_secret": app_secret,
            "verification_tokens": _csv_list(_env(f"FEISHU_VERIFICATION_TOKEN{suffix}")),
            "default_receive_id": _env(f"FEISHU_DEFAULT_RECEIVE_ID{suffix}"),
            "default_receive_id_type": _env(
                f"FEISHU_DEFAULT_RECEIVE_ID_TYPE{suffix}",
                "chat_id",
            ),
            "allowed_chat_ids": _csv_set(_env(f"FEISHU_ALLOWED_CHAT_IDS{suffix}")),
        }
    return apps


class Config:
    """Process-wide settings, read from the environment when the module is imported.

    Some attributes are reassigned later by reload_config_from_env_file().
    """

    # Files managed by the bridge itself.
    env_file = _env("FEISHU_BRIDGE_ENV_FILE", "/etc/hermes-feishu-bridge.env")
    tokens_file = _env("FEISHU_BRIDGE_TOKENS_FILE", "/root/hermes-feishu-bridge.tokens")

    # Feishu application settings.
    feishu_app_id = _env("FEISHU_APP_ID")
    feishu_app_secret = _env("FEISHU_APP_SECRET")
    feishu_apps = _load_feishu_apps()
    default_feishu_app_id = _env("FEISHU_DEFAULT_APP_ID", feishu_app_id)
    notify_token = _env("FEISHU_NOTIFY_TOKEN")
    reply_in_thread = _bool_env("FEISHU_REPLY_IN_THREAD")

    # Hermes chat API settings.
    hermes_api_base = _env("HERMES_API_BASE", "http://127.0.0.1:8642/v1").rstrip("/")
    hermes_api_key = _env("HERMES_API_KEY")
    hermes_model = _env("HERMES_MODEL", "gemini-3-pro-preview")

    # Container hosting the Hermes agent and the paths used inside it.
    hermes_agent_lxc = _env("HERMES_AGENT_LXC", "hermes-personal")
    hermes_agent_dir = _env("HERMES_AGENT_DIR", "/opt/hermes-agent")
    hermes_ui_config_path = _env(
        "HERMES_UI_CONFIG_PATH",
        os.path.join(hermes_agent_dir, "hermes-ui-config.json"),
    )
    incus_bin = _env("INCUS_BIN", "/usr/bin/incus")

    hermes_system_prompt = _env(
        "HERMES_SYSTEM_PROMPT",
        "你是爱马仕 Hermes。你通过飞书与用户对话,回答要直接、简洁、可执行。",
    )
    request_timeout = float(_env("FEISHU_BRIDGE_TIMEOUT", "60"))
    port = int(_env("PORT", _env("FEISHU_BRIDGE_PORT", "8787")))


class TokenCache:
    """In-memory cache of Feishu tenant access tokens, one entry per app id."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # app_id -> (token, absolute expiry as a time.time() value)
        self._tokens: dict[str, tuple[str, float]] = {}

    def get(self, app_id: str) -> str:
        """Return a tenant access token for *app_id*, fetching one when needed.

        A cached token is reused until 300 seconds before its expiry. The lock
        is held across the fetch so concurrent callers do not refresh twice.

        Raises:
            RuntimeError: if the app is unknown, lacks credentials, or Feishu
                answers with a non-zero ``code``.
        """
        with self._lock:
            token, expires_at = self._tokens.get(app_id, ("", 0.0))
            if token and time.time() < expires_at - 300:
                return token
            app = Config.feishu_apps.get(app_id)
            if not app:
                raise RuntimeError(f"unknown Feishu app_id: {app_id}")
            if not app.get("app_id") or not app.get("app_secret"):
                raise RuntimeError(f"FEISHU_APP_ID and FEISHU_APP_SECRET are required for {app_id}")
            payload = {
                "app_id": app["app_id"],
                "app_secret": app["app_secret"],
            }
            data = http_json(
                "POST",
                f"{FEISHU_API_BASE}/auth/v3/tenant_access_token/internal",
                payload,
            )
            if data.get("code") != 0:
                raise RuntimeError(f"Feishu token error: {data}")
            token = str(data["tenant_access_token"])
            expires_at = time.time() + int(data.get("expire", 7200))
            self._tokens[app_id] = (token, expires_at)
            return token

    def drop(self, app_id: str) -> None:
        """Forget the cached token for *app_id*, if any."""
        with self._lock:
            self._tokens.pop(app_id, None)


token_cache = TokenCache()
# "<app_id>:<event_id>" -> time first seen; guarded by seen_lock.
seen_events: dict[str, float] = {}
seen_lock = threading.Lock()


def parse_env_file(path: str) -> dict[str, str]:
    """Parse a ``KEY=value`` file into a dict; a missing file yields ``{}``.

    Blank lines, ``#`` comments and lines without ``=`` are ignored. Keys and
    values are stripped; quotes are not interpreted.
    """
    values: dict[str, str] = {}
    if not os.path.exists(path):
        return values
    with open(path, "r", encoding="utf-8") as fh:
        for raw in fh:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            values[key.strip()] = value.strip()
    return values


def _write_private_file(path: str, lines: list[str]) -> None:
    """Atomically replace *path* with *lines*, readable by the owner only.

    The temp file is created with mode 0o600 up front so secrets are never
    briefly exposed under a permissive umask; chmod also covers a stale temp
    file left behind with wider permissions.
    """
    tmp = f"{path}.tmp-{os.getpid()}"
    fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write("\n".join(lines) + "\n")
    os.chmod(tmp, 0o600)
    os.replace(tmp, path)


def _rewrite_env_file(path: str, remove_keys: set[str], updates: dict[str, str]) -> None:
    """Rewrite an env file, dropping *remove_keys* and applying *updates*.

    Existing assignments are matched on the stripped key, so ``KEY = v`` and
    ``KEY=v`` are treated alike. Comments and unrelated lines keep their
    position; keys from *updates* not already present are appended.
    """
    lines: list[str] = []
    seen: set[str] = set()
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as fh:
            for raw in fh.read().splitlines():
                if "=" in raw and not raw.lstrip().startswith("#"):
                    key = raw.split("=", 1)[0].strip()
                    if key in remove_keys:
                        continue
                    if key in updates:
                        lines.append(f"{key}={updates[key]}")
                        seen.add(key)
                        continue
                lines.append(raw)
    lines.extend(f"{key}={value}" for key, value in updates.items() if key not in seen)
    _write_private_file(path, lines)


def write_env_updates(path: str, updates: dict[str, str]) -> None:
    """Set *updates* in the env file at *path*, keeping all other lines."""
    _rewrite_env_file(path, set(), updates)


def write_env_removals(path: str, remove_keys: set[str], updates: dict[str, str] | None = None) -> None:
    """Delete *remove_keys* from the env file and optionally apply *updates*."""
    _rewrite_env_file(path, remove_keys, updates or {})


def reload_config_from_env_file() -> None:
    """Re-read ``Config.env_file`` into ``os.environ`` and refresh ``Config``.

    Per-app variables (and ``FEISHU_DEFAULT_APP_ID``) that are absent from the
    file are removed from the environment first, so deleted apps disappear.
    """
    values = parse_env_file(Config.env_file)
    for suffix in app_suffixes():
        for prefix in _FEISHU_APP_ENV_PREFIXES:
            key = f"{prefix}{suffix}"
            if key not in values:
                os.environ.pop(key, None)
    if "FEISHU_DEFAULT_APP_ID" not in values:
        os.environ.pop("FEISHU_DEFAULT_APP_ID", None)
    for key, value in values.items():
        os.environ[key] = value
    Config.feishu_app_id = _env("FEISHU_APP_ID")
    Config.feishu_app_secret = _env("FEISHU_APP_SECRET")
    Config.feishu_apps = _load_feishu_apps()
    Config.default_feishu_app_id = _env("FEISHU_DEFAULT_APP_ID", Config.feishu_app_id)
    Config.hermes_api_base = _env("HERMES_API_BASE", Config.hermes_api_base).rstrip("/")
    Config.hermes_api_key = _env("HERMES_API_KEY")
    Config.hermes_model = _env("HERMES_MODEL", Config.hermes_model)


def sync_tokens_file() -> None:
    """Write callback URLs, app ids and verification tokens to ``Config.tokens_file``.

    Values come from the env file on disk, not from the live environment.
    """
    env = parse_env_file(Config.env_file)
    lines: list[str] = []
    notify_token = env.get("FEISHU_NOTIFY_TOKEN")
    for suffix in app_suffixes():
        app_id = env.get(f"FEISHU_APP_ID{suffix}", "")
        if not app_id:
            continue
        lines.append(f"FEISHU_CALLBACK_URL{suffix}=https://hermes.kang-kang.com/feishu/events/{app_id}")
        lines.append(f"FEISHU_APP_ID{suffix}={app_id}")
        token = env.get(f"FEISHU_VERIFICATION_TOKEN{suffix}", "")
        if token:
            lines.append(f"FEISHU_VERIFICATION_TOKEN{suffix}={token}")
    if notify_token:
        lines.append(f"FEISHU_NOTIFY_TOKEN={notify_token}")
    _write_private_file(Config.tokens_file, lines)


def http_json(
    method: str,
    url: str,
    payload: dict[str, Any],
    headers: dict[str, str] | None = None,
    timeout: float | None = None,
) -> dict[str, Any]:
    """Send *payload* as JSON and return the decoded JSON response.

    An empty response body yields ``{}``.

    Raises:
        RuntimeError: on an HTTP error status; the message carries the status,
            URL and response body.
    """
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=body,
        method=method,
        headers={
            "Content-Type": "application/json; charset=utf-8",
            **(headers or {}),
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout or Config.request_timeout) as resp:
            raw = resp.read().decode("utf-8")
    except urllib.error.HTTPError as exc:
        raw = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {exc.code} {url}: {raw}") from exc
    if not raw:
        return {}
    return json.loads(raw)


def openai_completion_url(base_url: str) -> str:
    """Return the chat-completions URL for *base_url*, without duplicating the path."""
    base = base_url.rstrip("/")
    if base.endswith("/chat/completions"):
        return base
    return f"{base}/chat/completions"


def read_env_value(key: str) -> str:
    """Look up *key* in the process environment, then in the env file; ``""`` if absent."""
    key = key.strip()
    if not key:
        return ""
    return os.environ.get(key, "") or parse_env_file(Config.env_file).get(key, "")


def resolve_profile_api_key(profile: dict[str, Any]) -> str:
    """Resolve a profile's ``apiKeyRef`` to an API key.

    An empty or "server env" reference maps to ``Config.hermes_api_key``; an
    upper-case env-var name (optionally prefixed with ``env:``) is looked up;
    anything else yields ``""``.
    """
    ref = str(profile.get("apiKeyRef") or "").strip()
    if not ref or ref in _SERVER_ENV_KEY_REFS:
        return Config.hermes_api_key
    if ref.startswith("env:"):
        ref = ref[4:].strip()
    if re.match(r"^[A-Z][A-Z0-9_]{1,120}$", ref):
        return read_env_value(ref)
    return ""


def profile_for_chat(profile_id: str) -> dict[str, Any]:
    """Pick the model profile to use for a chat request.

    With *profile_id* the matching profile is returned; otherwise the default
    profile, then the first profile, then one derived from the runtime config.

    Raises:
        ValueError: if *profile_id* is given but matches no profile.
    """
    runtime = read_hermes_runtime_config()
    config = normalize_ui_config(read_ui_config_file(), runtime)
    profiles = config.get("modelProfiles") if isinstance(config.get("modelProfiles"), list) else []
    if profile_id:
        for profile in profiles:
            if profile.get("id") == profile_id:
                return profile
        raise ValueError(f"unknown modelProfileId: {profile_id}")
    for profile in profiles:
        if profile.get("isDefault"):
            return profile
    if profiles:
        return profiles[0]
    return runtime_model_profile(runtime)


def build_profile_chat_request(body: dict[str, Any]) -> tuple[urllib.request.Request | None, bool, int, dict[str, Any] | None]:
    """Turn a UI chat body into an upstream request.

    Returns ``(request, stream, status, error)``. On success *request* is set,
    *status* is 200 and *error* is None; on a validation failure *request* is
    None and *error* is a ``{"code", "msg"}`` dict with *status* 400.
    """
    profile_id = str(body.get("modelProfileId") or body.get("model_profile_id") or "").strip()
    profile = profile_for_chat(profile_id)
    if profile.get("enabled") is False:
        return None, False, 400, {"code": 400, "msg": "model profile is disabled"}
    base_url = str(profile.get("baseUrl") or "").strip() or Config.hermes_api_base
    if not base_url:
        return None, False, 400, {"code": 400, "msg": "model profile baseUrl is required"}
    model = str(profile.get("model") or body.get("model") or Config.hermes_model).strip()
    if not model:
        return None, False, 400, {"code": 400, "msg": "model is required"}
    if not re.match(r"^https?://", base_url):
        return None, False, 400, {"code": 400, "msg": "model profile baseUrl must start with http:// or https://"}

    # Forward the caller's body minus the bridge-only routing fields.
    payload = dict(body)
    payload.pop("modelProfileId", None)
    payload.pop("model_profile_id", None)
    payload["model"] = model
    stream = bool(payload.get("stream"))

    headers = {"Accept": "text/event-stream" if stream else "application/json"}
    api_key = resolve_profile_api_key(profile)
    api_key_ref = str(profile.get("apiKeyRef") or "").strip()
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    elif api_key_ref and api_key_ref not in _SERVER_ENV_KEY_REFS:
        # An explicit key reference that resolves to nothing is a config error.
        return None, stream, 400, {"code": 400, "msg": f"API key env not found: {api_key_ref}"}

    url = openai_completion_url(base_url)
    raw_body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=raw_body,
        method="POST",
        headers={
            "Content-Type": "application/json; charset=utf-8",
            **headers,
        },
    )
    return req, stream, 200, None


def proxy_chat_completion(body: dict[str, Any]) -> tuple[int, dict[str, Any] | bytes, dict[str, str]]:
    """Forward a chat request upstream and return ``(status, body, headers)``.

    *body* in the result is a dict for locally generated validation errors and
    raw bytes for anything received from upstream. Upstream HTTP errors are
    passed through with their status and content type. The response is read
    fully before returning, including for streaming requests.
    """
    req, stream, status, error = build_profile_chat_request(body)
    if error is not None or req is None:
        return status, error or {"code": status, "msg": "invalid chat request"}, {}
    try:
        with urllib.request.urlopen(req, timeout=Config.request_timeout) as resp:
            content_type = resp.headers.get("Content-Type") or (
                "text/event-stream" if stream else "application/json"
            )
            # Relay the upstream status and content type for both streaming
            # and non-streaming responses instead of assuming 200/JSON.
            return resp.status, resp.read(), {"Content-Type": content_type}
    except urllib.error.HTTPError as exc:
        raw = exc.read()
        return exc.code, raw or json.dumps({"code": exc.code, "msg": exc.reason}).encode("utf-8"), {
            "Content-Type": exc.headers.get("Content-Type") or "application/json"
        }
def json_text(value: Any) -> str:
    """Extract display text from a message ``content`` value.

    A string is first decoded as JSON (and returned unchanged if it is not
    JSON). For a dict, the first non-blank string among ``text``, ``title``
    and ``content`` is returned stripped; anything else yields ``""``.
    """
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            return value
    if isinstance(value, dict):
        for key in ("text", "title", "content"):
            text = value.get(key)
            if isinstance(text, str) and text.strip():
                return text.strip()
    return ""


def _dict_field(body: Any, key: str) -> dict[str, Any]:
    """Return ``body[key]`` when it is a dict, else ``{}``.

    Callback bodies are untrusted, so a nested field may be missing, null or
    of the wrong type.
    """
    value = body.get(key) if isinstance(body, dict) else None
    return value if isinstance(value, dict) else {}


def resolve_app_id(path: str, body: dict[str, Any] | None = None) -> str:
    """Determine which Feishu app a request targets.

    An app id embedded in the route (``/feishu/events/<id>`` or
    ``/feishu/notify/<id>``) wins; then a known app id from the body; then the
    configured default.
    """
    route_app_id = ""
    for prefix in ("/feishu/events/", "/feishu/notify/"):
        if path.startswith(prefix):
            route_app_id = urllib.parse.unquote(path[len(prefix):].strip("/"))
            break
    if route_app_id:
        return route_app_id
    body = body or {}
    body_app_id = str(body.get("app_id") or _dict_field(body, "header").get("app_id") or "").strip()
    if body_app_id in Config.feishu_apps:
        return body_app_id
    return Config.default_feishu_app_id


def callback_token(body: dict[str, Any]) -> str:
    """Return the verification token from the body's top level or its ``header``."""
    return str(body.get("token") or _dict_field(body, "header").get("token") or "")


def token_digest(value: str) -> str:
    """Return a loggable fingerprint of a secret: length plus a short SHA-256 prefix."""
    if not value:
        return "(empty)"
    return f"len={len(value)} sha256={hashlib.sha256(value.encode('utf-8')).hexdigest()[:12]}"


def verify_callback_token(body: dict[str, Any], app_id: str) -> bool:
    """Check the callback's verification token against the app's configured tokens.

    Returns True when the app has no tokens configured (verification is then
    disabled). A mismatch is logged with digests only, never raw tokens.
    """
    import hmac

    app = Config.feishu_apps.get(app_id, {})
    expected_tokens = app.get("verification_tokens", [])
    if not expected_tokens:
        return True
    token = callback_token(body)
    # Compare as bytes in constant time; compare_digest rejects non-ASCII str.
    supplied = token.encode("utf-8")
    ok = any(hmac.compare_digest(supplied, item.encode("utf-8")) for item in expected_tokens)
    if not ok:
        logging.warning(
            "invalid Feishu verification token app_id=%s got=%s expected_any=%s body_keys=%s header_keys=%s event_keys=%s",
            app_id,
            token_digest(token),
            [token_digest(item) for item in expected_tokens],
            sorted(body.keys()),
            sorted(body.get("header", {}).keys()) if isinstance(body.get("header"), dict) else [],
            sorted(body.get("event", {}).keys()) if isinstance(body.get("event"), dict) else [],
        )
    return ok


def remember_event(event_id: str) -> bool:
    """Record *event_id*; return False when it was already seen within the last hour.

    Entries older than 3600 seconds are purged on every call. An empty id is
    never recorded and always returns True.
    """
    if not event_id:
        return True
    now = time.time()
    with seen_lock:
        stale = [key for key, ts in seen_events.items() if now - ts > 3600]
        for key in stale:
            seen_events.pop(key, None)
        if event_id in seen_events:
            return False
        seen_events[event_id] = now
        return True


def handle_feishu_event(path: str, body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Handle one Feishu callback and return ``(http_status, json_body)``.

    Answers URL-verification challenges, rejects encrypted bodies and bad
    tokens, ignores event types other than ``im.message.receive_v1``, drops
    duplicates, and processes accepted messages on a background thread so the
    callback returns immediately.
    """
    app_id = resolve_app_id(path, body)
    if app_id not in Config.feishu_apps:
        return 404, {"code": 404, "msg": f"unknown Feishu app_id: {app_id}"}
    if "encrypt" in body:
        return 400, {
            "code": 400,
            "msg": "encrypted callbacks are not enabled; disable Feishu event encryption or add decrypt support",
        }
    if body.get("type") == "url_verification":
        if not verify_callback_token(body, app_id):
            return 403, {"code": 403, "msg": "invalid verification token"}
        return 200, {"challenge": body.get("challenge", "")}
    if not verify_callback_token(body, app_id):
        return 403, {"code": 403, "msg": "invalid verification token"}

    header = _dict_field(body, "header")
    event_type = header.get("event_type") or _dict_field(body, "event").get("type")
    event_id = str(header.get("event_id") or "")
    if event_type != "im.message.receive_v1":
        return 200, {"code": 0, "msg": "ignored"}
    # Only dedupe when an id is present: the "<app_id>:" prefix would otherwise
    # make every id-less event share one key and be dropped as a duplicate.
    if event_id and not remember_event(f"{app_id}:{event_id}"):
        return 200, {"code": 0, "msg": "duplicate"}
    threading.Thread(target=process_message_event, args=(app_id, body), daemon=True).start()
    return 200, {"code": 0, "msg": "ok"}


def process_message_event(app_id: str, body: dict[str, Any]) -> None:
    """Answer one incoming message event; runs on a worker thread.

    Messages sent by apps and chats outside the app's allow-list are skipped.
    Non-text messages get a fixed notice. Any failure is logged, not raised.
    """
    try:
        app = Config.feishu_apps[app_id]
        event = body.get("event", {})
        sender = event.get("sender", {})
        if sender.get("sender_type") == "app":
            return
        message = event.get("message", {})
        message_type = message.get("message_type")
        chat_id = message.get("chat_id", "")
        message_id = message.get("message_id", "")
        allowed_chat_ids = app.get("allowed_chat_ids", set())
        if allowed_chat_ids and chat_id not in allowed_chat_ids:
            logging.info("ignored message from disallowed app_id=%s chat_id=%s", app_id, chat_id)
            return
        if message_type != "text":
            send_feishu_text(app_id, chat_id, "我现在只支持处理文本消息。", message_id=message_id)
            return
        text = json_text(message.get("content"))
        if not text:
            send_feishu_text(app_id, chat_id, "我没有读到文本内容。", message_id=message_id)
            return
        answer = ask_hermes(text, event)
        send_feishu_text(app_id, chat_id, answer, message_id=message_id)
    except Exception:
        logging.error("failed to process Feishu event:\n%s", traceback.format_exc())


def ask_hermes(text: str, event: dict[str, Any]) -> str:
    """Send *text* to the Hermes chat-completions API and return the reply text.

    Raises:
        RuntimeError: if the response lacks ``choices[0].message.content``
            (http_json may also raise on HTTP errors).
    """
    payload = {
        "model": Config.hermes_model,
        "stream": False,
        "messages": [
            {"role": "system", "content": Config.hermes_system_prompt},
            {
                "role": "user",
                "content": (
                    "以下消息来自飞书。\n"
                    f"chat_id: {event.get('message', {}).get('chat_id', '')}\n\n"
                    f"{text}"
                ),
            },
        ],
    }
    headers = {}
    if Config.hermes_api_key:
        headers["Authorization"] = f"Bearer {Config.hermes_api_key}"
    data = http_json(
        "POST",
        # Same URL builder as the chat proxy, so a base that already ends in
        # /chat/completions is not doubled.
        openai_completion_url(Config.hermes_api_base),
        payload,
        headers=headers,
    )
    try:
        content = data["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        raise RuntimeError(f"Unexpected Hermes response: {data}") from exc
    return str(content).strip() or "Hermes 没有返回内容。"


def send_feishu_text(
    app_id: str,
    receive_id: str,
    text: str,
    receive_id_type: str = "chat_id",
    message_id: str = "",
) -> dict[str, Any]:
    """Send *text* through Feishu, split into chunks by split_text().

    With *message_id* each chunk is posted as a reply to that message;
    otherwise it is sent to *receive_id*. Returns the last API response.

    Raises:
        RuntimeError: if *receive_id* is empty or Feishu returns a non-zero code.
    """
    if not receive_id:
        raise RuntimeError("receive_id is required")
    token = token_cache.get(app_id)
    chunks = split_text(text)
    result: dict[str, Any] = {}
    for chunk in chunks:
        content = json.dumps({"text": chunk}, ensure_ascii=False)
        if message_id:
            url = f"{FEISHU_API_BASE}/im/v1/messages/{message_id}/reply"
            payload = {"msg_type": "text", "content": content}
        else:
            url = f"{FEISHU_API_BASE}/im/v1/messages?receive_id_type={receive_id_type}"
            payload = {"receive_id": receive_id, "msg_type": "text", "content": content}
        result = http_json(
            "POST",
            url,
            payload,
            headers={"Authorization": f"Bearer {token}"},
        )
        if result.get("code") not in (None, 0):
            raise RuntimeError(f"Feishu send error: {result}")
    return result


def split_text(text: str, limit: int = 3800) -> list[str]:
    """Split *text* into chunks of at most *limit* characters.

    Prefers to cut at the last newline within the limit, unless that newline
    falls in the first half of the window, in which case it cuts hard at
    *limit*. Blank input yields ``[""]``.
    """
    text = text.strip()
    if not text:
        return [""]
    chunks: list[str] = []
    rest = text
    while len(rest) > limit:
        cut = rest.rfind("\n", 0, limit)
        if cut < limit // 2:
            cut = limit
        chunks.append(rest[:cut].strip())
        rest = rest[cut:].strip()
    chunks.append(rest)
    return chunks


def handle_apps() -> tuple[int, dict[str, Any]]:
    """Return a summary of configured apps; secrets and tokens are reported as counts only."""
    apps = []
    for app_id, app in Config.feishu_apps.items():
        apps.append({
            "app_id": app_id,
            "callback_url": f"https://hermes.kang-kang.com/feishu/events/{app_id}",
            "default_receive_id_type": app.get("default_receive_id_type", "chat_id"),
            "has_default_receive_id": bool(app.get("default_receive_id")),
            "allowed_chat_ids_count": len(app.get("allowed_chat_ids", set())),
            "verification_tokens_count": len(app.get("verification_tokens", [])),
        })
    return 200, {
        "code": 0,
        "service": "hermes-feishu-bridge",
        "default_app_id": Config.default_feishu_app_id,
        "apps": apps,
    }


def run_lxc_command(args: list[str], input_text: str = "", timeout: float = 20) -> str:
    """Run *args* inside the Hermes agent container via ``incus exec``; return stdout.

    Raises:
        RuntimeError: on a non-zero exit status, carrying stderr or stdout.
        subprocess.TimeoutExpired: if the command exceeds *timeout* seconds.
    """
    cmd = [Config.incus_bin, "exec", Config.hermes_agent_lxc, "--", *args]
    proc = subprocess.run(
        cmd,
        input=input_text,
        text=True,
        capture_output=True,
        timeout=timeout,
        check=False,
    )
    if proc.returncode != 0:
        detail = (proc.stderr or proc.stdout or "").strip()
        raise RuntimeError(detail or f"command failed with exit code {proc.returncode}")
    return proc.stdout


def read_hermes_runtime_config() -> dict[str, Any]:
    """Read the ``model`` mapping and raw ``mcp_servers`` block from config.yaml.

    The file is parsed inside the container by a small line-based script, so
    no YAML library is required.
    """
    script = r'''
import json
import pathlib
import re
import sys

path = pathlib.Path(sys.argv[1])
text = path.read_text(encoding="utf-8")
lines = text.splitlines()

def clean_value(value):
    value = value.strip()
    if " #" in value:
        value = value.split(" #", 1)[0].rstrip()
    if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
        return value[1:-1]
    return value

def read_mapping_block(key):
    result = {}
    in_block = False
    for line in lines:
        if re.match(rf"^{re.escape(key)}\s*:\s*$", line):
            in_block = True
            continue
        if in_block:
            if line and not line[0].isspace():
                break
            match = re.match(r"\s+([A-Za-z0-9_-]+)\s*:\s*(.*?)\s*$", line)
            if match:
                result[match.group(1)] = clean_value(match.group(2))
    return result

def read_raw_block(key):
    start = None
    for idx, line in enumerate(lines):
        if re.match(rf"^{re.escape(key)}\s*:\s*(?:#.*)?$", line):
            start = idx
            break
    if start is None:
        return ""
    end = len(lines)
    for idx in range(start + 1, len(lines)):
        line = lines[idx]
        if line and not line[0].isspace() and not line.lstrip().startswith("#"):
            end = idx
            break
    return "\n".join(lines[start:end]).strip()

payload = {
    "model": read_mapping_block("model"),
    "mcp_servers_yaml": read_raw_block("mcp_servers"),
    "config_path": str(path),
}
print(json.dumps(payload, ensure_ascii=False))
'''
    path = f"{Config.hermes_agent_dir}/config.yaml"
    raw = run_lxc_command(["python3", "-c", script, path], timeout=20)
    data = json.loads(raw)
    model = data.get("model") if isinstance(data.get("model"), dict) else {}
    return {
        "model": {
            "default": str(model.get("default") or model.get("model") or ""),
            "provider": str(model.get("provider") or ""),
            "base_url": str(model.get("base_url") or ""),
        },
        "mcp_servers_yaml": str(data.get("mcp_servers_yaml") or ""),
        "config_path": data.get("config_path") or path,
        "lxc": Config.hermes_agent_lxc,
    }


def normalize_mcp_yaml(value: str) -> str:
    """Ensure an MCP YAML snippet starts with a top-level ``mcp_servers:`` key.

    A snippet lacking the key is indented and nested under it; blank input
    yields ``""``.
    """
    value = value.strip()
    if not value:
        return ""
    first = next((line.strip() for line in value.splitlines() if line.strip()), "")
    if re.match(r"^mcp_servers\s*:", first):
        return value
    indented = "\n".join((" " + line if line.strip() else line) for line in value.splitlines())
    return "mcp_servers:\n" + indented


def validate_hermes_config_payload(body: dict[str, Any], current: dict[str, Any] | None = None) -> dict[str, str]:
    """Merge a config update with *current* values and validate the result.

    Returns a dict with ``default_model``, ``provider``, ``base_url`` and
    ``mcp_servers_yaml``.

    Raises:
        ValueError: for multi-line or over-long fields, a missing default
            model, a non-http(s) base URL, or an invalid/oversized MCP block.
    """
    current_model = (current or {}).get("model") if isinstance((current or {}).get("model"), dict) else {}
    model = body.get("model") if isinstance(body.get("model"), dict) else {}
    default_model = str(
        model.get("default")
        or body.get("model_default")
        or current_model.get("default")
        or current_model.get("model")
        or Config.hermes_model
        or ""
    ).strip()
    provider = str(
        model.get("provider")
        or body.get("provider")
        or current_model.get("provider")
        or "openrouter"
    ).strip()
    base_url = str(
        model.get("base_url")
        or body.get("base_url")
        or current_model.get("base_url")
        or ""
    ).strip()
    if "mcp_servers_yaml" in body:
        mcp_servers_yaml = normalize_mcp_yaml(str(body.get("mcp_servers_yaml") or ""))
    else:
        mcp_servers_yaml = str((current or {}).get("mcp_servers_yaml") or "")

    for label, value, limit in (
        ("model.default", default_model, 180),
        ("model.provider", provider, 80),
        ("model.base_url", base_url, 220),
    ):
        # Values are written onto single YAML lines, so newlines are rejected.
        if "\n" in value or "\r" in value:
            raise ValueError(f"{label} must be a single line")
        if len(value) > limit:
            raise ValueError(f"{label} is too long")
    if not default_model:
        raise ValueError("model.default is required")
    if base_url and not re.match(r"^https?://", base_url):
        raise ValueError("model.base_url must start with http:// or https://")
    if len(mcp_servers_yaml) > 20000:
        raise ValueError("mcp_servers_yaml is too large")
    if mcp_servers_yaml and not re.match(r"^mcp_servers\s*:", mcp_servers_yaml.splitlines()[0].strip()):
        raise ValueError("mcp_servers_yaml must start with mcp_servers:")
    return {
        "default_model": default_model,
        "provider": provider,
        "base_url": base_url,
        "mcp_servers_yaml": mcp_servers_yaml,
    }


def write_hermes_runtime_config(body: dict[str, Any]) -> dict[str, Any]:
    """Rewrite the ``model`` and ``mcp_servers`` blocks of config.yaml in the container.

    A timestamped backup copy is made before writing. Unless ``body["restart"]``
    is falsy, the ``hermes-agent`` compose service is restarted afterwards.

    Raises:
        ValueError: if the payload fails validation.
        RuntimeError: if a container command fails.
    """
    current = read_hermes_runtime_config()
    payload = validate_hermes_config_payload(body, current)
    script = r'''
import json
import pathlib
import re
import shutil
import sys
import time

path = pathlib.Path(sys.argv[1])
payload = json.loads(sys.stdin.read())
text = path.read_text(encoding="utf-8")

def quote(value):
    return json.dumps(value, ensure_ascii=False)

def find_block(lines, key):
    start = None
    for idx, line in enumerate(lines):
        if re.match(rf"^{re.escape(key)}\s*:\s*(?:#.*)?$", line):
            start = idx
            break
    if start is None:
        return None, None
    end = len(lines)
    for idx in range(start + 1, len(lines)):
        line = lines[idx]
        if line and not line[0].isspace() and not line.lstrip().startswith("#"):
            end = idx
            break
    return start, end

def replace_block(text, key, block):
    lines = text.splitlines()
    start, end = find_block(lines, key)
    block_lines = block.rstrip().splitlines() if block.strip() else []
    if start is None:
        if not block_lines:
            return text.rstrip() + "\n"
        return text.rstrip() + "\n\n" + "\n".join(block_lines) + "\n"
    if block_lines:
        new_lines = lines[:start] + block_lines + lines[end:]
    else:
        new_lines = lines[:start] + lines[end:]
    return "\n".join(new_lines).rstrip() + "\n"

model_block = "\n".join([
    "model:",
    f" default: {quote(payload['default_model'])}",
    f" provider: {quote(payload['provider'])}",
    f" base_url: {quote(payload['base_url'])}",
])
text = replace_block(text, "model", model_block)
text = replace_block(text, "mcp_servers", payload.get("mcp_servers_yaml", ""))
backup = path.with_name(path.name + ".bak-" + time.strftime("%Y%m%d%H%M%S"))
shutil.copy2(path, backup)
path.write_text(text, encoding="utf-8")
print(json.dumps({"backup": str(backup)}, ensure_ascii=False))
'''
    path = f"{Config.hermes_agent_dir}/config.yaml"
    raw = run_lxc_command(
        ["python3", "-c", script, path],
        input_text=json.dumps(payload, ensure_ascii=False),
        timeout=20,
    )
    result = json.loads(raw)
    if body.get("restart", True):
        run_lxc_command(
            ["bash", "-lc", f"cd {shlex.quote(Config.hermes_agent_dir)} && docker compose restart hermes-agent"],
            timeout=90,
        )
    return {
        "model": {
            "default": payload["default_model"],
            "provider": payload["provider"],
            "base_url": payload["base_url"],
        },
        "mcp_servers_yaml": payload["mcp_servers_yaml"],
        "backup": result.get("backup"),
        "restarted": bool(body.get("restart", True)),
    }


def handle_hermes_config_get(headers: dict[str, str]) -> tuple[int, dict[str, Any]]:
    """Admin endpoint: return the current Hermes runtime config."""
    ok, status, message = is_admin_request(headers)
    if not ok:
        return status, {"code": status, "msg": message}
    try:
        config = read_hermes_runtime_config()
    except Exception as exc:
        logging.error("failed to read Hermes config:\n%s", traceback.format_exc())
        return 500, {"code": 500, "msg": str(exc)}
    return 200, {"code": 0, "msg": "ok", "config": config}


def handle_hermes_config_post(headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Admin endpoint: validate and write the Hermes runtime config.

    Validation errors map to 400, any other failure to 500.
    """
    ok, status, message = is_admin_request(headers)
    if not ok:
        return status, {"code": status, "msg": message}
    try:
        config = write_hermes_runtime_config(body)
    except ValueError as exc:
        return 400, {"code": 400, "msg": str(exc)}
    except Exception as exc:
        logging.error("failed to write Hermes config:\n%s", traceback.format_exc())
        return 500, {"code": 500, "msg": str(exc)}
    logging.info("updated Hermes runtime config model=%s", config["model"]["default"])
    return 200, {"code": 0, "msg": "ok", "config": config}


def one_line(value: Any, limit: int, default: str = "") -> str:
    """Coerce *value* to a single-line string of at most *limit* characters.

    *default* is used only when *value* is None; CR/LF become spaces.
    """
    text = str(value if value is not None else default).strip()
    text = text.replace("\r", " ").replace("\n", " ").strip()
    if len(text) > limit:
        text = text[:limit].rstrip()
    return text


def string_list(value: Any, limit: int = 200) -> list[str]:
    """Return up to *limit* items of a list as non-empty one-line strings (max 180 chars each)."""
    if not isinstance(value, list):
        return []
    out: list[str] = []
    for item in value[:limit]:
        text = one_line(item, 180)
        if text:
            out.append(text)
    return out


def now_ms() -> int:
    """Return the current Unix time in milliseconds."""
    return int(time.time() * 1000)


def _timestamp_ms(value: Any) -> int:
    """Return *value* as an integer timestamp, or the current time if unusable.

    ``json.loads`` accepts NaN and Infinity, which ``int()`` rejects; those
    fall back to now rather than failing the whole config.
    """
    if not isinstance(value, (int, float)):
        return now_ms()
    try:
        return int(value)
    except (ValueError, OverflowError):
        return now_ms()


def runtime_model_profile(runtime: dict[str, Any]) -> dict[str, Any]:
    """Build the built-in ``runtime-default`` model profile from the runtime config."""
    model = runtime.get("model") if isinstance(runtime.get("model"), dict) else {}
    model_id = one_line(model.get("default") or Config.hermes_model or "gemini-3-pro-preview", 180)
    provider = one_line(model.get("provider") or "openrouter", 80)
    base_url = one_line(model.get("base_url") or "", 220)
    created_at = now_ms()
    return {
        "id": "runtime-default",
        "name": "线上默认模型",
        "provider": provider,
        "model": model_id,
        "baseUrl": base_url,
        "apiKeyRef": "服务器环境变量",
        "enabled": True,
        "isDefault": True,
        "createdAt": created_at,
        "updatedAt": created_at,
    }


def normalize_model_profile(raw: Any, fallback: dict[str, Any], index: int) -> dict[str, Any] | None:
    """Sanitize one stored model profile; return None if it is unusable.

    Missing fields are taken from *fallback*; an invalid id is replaced with
    ``model_<index + 1>``. A profile without a model name is rejected.
    """
    if not isinstance(raw, dict):
        return None
    profile_id = one_line(raw.get("id"), 80)
    if not profile_id or not UI_ID_RE.match(profile_id):
        profile_id = f"model_{index + 1}"
    name = one_line(raw.get("name"), 80, fallback.get("name") or "模型接入")
    model = one_line(raw.get("model"), 180, fallback.get("model") or "")
    provider = one_line(raw.get("provider"), 80, fallback.get("provider") or "openrouter")
    base_url = one_line(raw.get("baseUrl") or raw.get("base_url"), 220, fallback.get("baseUrl") or "")
    api_key_ref = one_line(raw.get("apiKeyRef") or raw.get("api_key_ref"), 120, fallback.get("apiKeyRef") or "")
    if not model:
        return None
    return {
        "id": profile_id,
        "name": name or model,
        "provider": provider,
        "model": model,
        "baseUrl": base_url,
        "apiKeyRef": api_key_ref,
        "enabled": bool(raw.get("enabled", True)),
        "isDefault": bool(raw.get("isDefault", False)),
        "createdAt": _timestamp_ms(raw.get("createdAt")),
        "updatedAt": _timestamp_ms(raw.get("updatedAt")),
    }


def normalize_stages(value: Any) -> dict[str, list[str]]:
    """Return the ``pre``/``exec``/``post`` stage lists, each sanitized by string_list()."""
    value = value if isinstance(value, dict) else {}
    return {
        "pre": string_list(value.get("pre")),
        "exec": string_list(value.get("exec")),
        "post": string_list(value.get("post")),
    }


def normalize_agent(raw: Any, index: int) -> dict[str, Any] | None:
    """Sanitize one stored agent definition; return None if it is unusable.

    An agent needs a name and a system prompt. An invalid id is replaced with
    ``agent_<index + 1>``; the system prompt is capped at 20000 characters.
    """
    if not isinstance(raw, dict):
        return None
    agent_id = one_line(raw.get("id"), 80)
    if not agent_id or not UI_ID_RE.match(agent_id):
        agent_id = f"agent_{index + 1}"
    name = one_line(raw.get("name"), 80)
    system_prompt = str(raw.get("systemPrompt") or raw.get("system_prompt") or "").strip()
    if not name or not system_prompt:
        return None
    return {
        "id": agent_id,
        "emoji": one_line(raw.get("emoji"), 16, "🤖"),
        "name": name,
        "desc": one_line(raw.get("desc"), 180),
        "model": one_line(raw.get("model"), 180),
        "modelProfileId": one_line(raw.get("modelProfileId") or raw.get("model_profile_id"), 80),
        "systemPrompt": system_prompt[:20000],
        "skills": string_list(raw.get("skills")),
        "stages": normalize_stages(raw.get("stages")),
        "createdAt": _timestamp_ms(raw.get("createdAt")),
        "updatedAt": _timestamp_ms(raw.get("updatedAt")),
    }


def normalize_ui_config(raw: dict[str, Any], runtime: dict[str, Any] | None = None) -> dict[str, Any]:
    """Sanitize a UI config document.

    Keeps at most 80 model profiles and 200 agents, drops invalid or
    duplicate-id entries, falls back to the runtime profile when none remain,
    and guarantees exactly one default profile.
    """
    runtime_profile = runtime_model_profile(runtime or {})
    model_profiles_raw = raw.get("modelProfiles") if isinstance(raw.get("modelProfiles"), list) else []
    profiles: list[dict[str, Any]] = []
    seen_profile_ids: set[str] = set()
    for idx, item in enumerate(model_profiles_raw[:80]):
        profile = normalize_model_profile(item, runtime_profile, idx)
        if not profile or profile["id"] in seen_profile_ids:
            continue
        seen_profile_ids.add(profile["id"])
        profiles.append(profile)
    if not profiles:
        profiles.append(runtime_profile)
    if not any(profile.get("isDefault") for profile in profiles):
        profiles[0]["isDefault"] = True
    # Keep only the first profile flagged as default.
    default_seen = False
    for profile in profiles:
        if profile.get("isDefault") and not default_seen:
            default_seen = True
        else:
            profile["isDefault"] = False

    agents_raw = raw.get("agents") if isinstance(raw.get("agents"), list) else []
    agents: list[dict[str, Any]] = []
    seen_agent_ids: set[str] = set()
    for idx, item in enumerate(agents_raw[:200]):
        agent = normalize_agent(item, idx)
        if not agent or agent["id"] in seen_agent_ids:
            continue
        seen_agent_ids.add(agent["id"])
        agents.append(agent)
    return {
        "version": 1,
        "modelProfiles": profiles,
        "agents": agents,
        "updatedAt": now_ms(),
        "configPath": Config.hermes_ui_config_path,
        "lxc": Config.hermes_agent_lxc,
    }


def read_ui_config_file() -> dict[str, Any]:
    """Load the UI config JSON from the container; ``{}`` if missing, empty or not an object."""
    script = r'''
import json
import pathlib
import sys

path = pathlib.Path(sys.argv[1])
if not path.exists():
    print("{}")
else:
    print(path.read_text(encoding="utf-8"))
'''
    raw = run_lxc_command(["python3", "-c", script, Config.hermes_ui_config_path], timeout=20)
    if not raw.strip():
        return {}
    data = json.loads(raw)
    return data if isinstance(data, dict) else {}
def write_ui_config_file(config: dict[str, Any]) -> None:
    """Persist *config* as JSON at ``Config.hermes_ui_config_path``.

    The write is performed by a small Python script run through
    ``run_lxc_command``; *config* is passed to it on stdin. The script writes a
    sibling temp file, sets its mode to 0600 and renames it over the target so
    readers never observe a partially written file.
    """
    script = r'''
import json
import os
import pathlib
import sys

path = pathlib.Path(sys.argv[1])
payload = json.loads(sys.stdin.read())
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_name(path.name + f".tmp-{os.getpid()}")
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
tmp.chmod(0o600)
tmp.replace(path)
'''
    run_lxc_command(
        ["python3", "-c", script, Config.hermes_ui_config_path],
        input_text=json.dumps(config, ensure_ascii=False),
        timeout=20,
    )


def handle_ui_config_get(headers: dict[str, str]) -> tuple[int, dict[str, Any]]:
    """Return the normalized UI config for an admin request.

    Returns ``(status, payload)``: the admin check's own status when it fails,
    500 when reading or normalizing raises, otherwise 200 with ``config``.
    """
    ok, status, message = is_admin_request(headers)
    if not ok:
        return status, {"code": status, "msg": message}
    try:
        runtime = read_hermes_runtime_config()
        config = normalize_ui_config(read_ui_config_file(), runtime)
    except Exception as exc:
        logging.error("failed to read Hermes UI config:\n%s", traceback.format_exc())
        return 500, {"code": 500, "msg": str(exc)}
    return 200, {"code": 0, "msg": "ok", "config": config}


def handle_ui_config_post(headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Normalize and store the UI config sent by an admin request.

    The config is taken from ``body["config"]`` when that is a dict, otherwise
    from *body* itself. Returns ``(status, payload)``: 400 on ``ValueError``,
    500 on any other failure, otherwise 200 with the stored ``config``.
    """
    ok, status, message = is_admin_request(headers)
    if not ok:
        return status, {"code": status, "msg": message}
    try:
        runtime = read_hermes_runtime_config()
        raw_config = body.get("config") if isinstance(body.get("config"), dict) else body
        config = normalize_ui_config(raw_config, runtime)
        write_ui_config_file(config)
    except ValueError as exc:
        return 400, {"code": 400, "msg": str(exc)}
    except Exception as exc:
        logging.error("failed to write Hermes UI config:\n%s", traceback.format_exc())
        return 500, {"code": 500, "msg": str(exc)}
    logging.info(
        "updated Hermes UI config model_profiles=%s agents=%s",
        len(config.get("modelProfiles", [])),
        len(config.get("agents", [])),
    )
    return 200, {"code": 0, "msg": "ok", "config": config}


def admin_allowed_origins(headers: dict[str, str]) -> set[str]:
    """Return the origins accepted for admin requests.

    The set starts from the comma-separated ``HERMES_ADMIN_ORIGINS`` setting
    (default ``https://hermes.kang-kang.com``). When the request names a host
    (first value of ``X-Forwarded-Host``, else ``Host``), that host is added
    both with the forwarded scheme (``https`` when absent) and with ``http``.
    *headers* is expected to use lower-case keys.
    """
    origins = _csv_set(_env("HERMES_ADMIN_ORIGINS", "https://hermes.kang-kang.com"))
    host = (headers.get("x-forwarded-host") or headers.get("host") or "").split(",", 1)[0].strip()
    proto = (headers.get("x-forwarded-proto") or "").split(",", 1)[0].strip()
    if host:
        origins.add(f"{proto or 'https'}://{host}")
        origins.add(f"http://{host}")
    return origins


def is_admin_request(headers: dict[str, str]) -> tuple[bool, int, str]:
    """Decide whether a request may use the admin endpoints.

    Returns ``(ok, status, message)``: 401 when the ``hermes_auth=ok`` cookie
    is absent, 403 when ``Origin`` (or, without an origin, ``Referer``) is not
    covered by ``admin_allowed_origins``, otherwise ``(True, 200, "ok")``.
    *headers* is expected to use lower-case keys.
    """
    cookie = headers.get("cookie", "")
    # NOTE(review): this is a substring test, so e.g. "xhermes_auth=ok" also
    # passes. Left as is because the exact cookie format set by the login layer
    # is not visible here -- confirm before tightening.
    if "hermes_auth=ok" not in cookie:
        return False, 401, "login required"
    origin = headers.get("origin", "")
    referer = headers.get("referer", "")
    allowed = admin_allowed_origins(headers)
    if origin and origin not in allowed:
        return False, 403, "invalid origin"
    if not origin and referer and not any(
        referer.startswith(item.rstrip("/") + "/") for item in allowed
    ):
        return False, 403, "invalid referer"
    return True, 200, "ok"


# Provider key names that are always listed by the provider-keys endpoint,
# whether or not a value is currently configured for them.
COMMON_PROVIDER_KEYS = [
    "HERMES_API_KEY",
    "OPENROUTER_API_KEY",
    "OPENAI_API_KEY",
    "ANTHROPIC_API_KEY",
    "GOOGLE_API_KEY",
    "GEMINI_API_KEY",
    "DEEPSEEK_API_KEY",
    "XAI_API_KEY",
    "GROQ_API_KEY",
    "DASHSCOPE_API_KEY",
]


def provider_env_key(raw: Any) -> str:
    """Extract an environment variable name from *raw*, or return ``""``.

    An optional ``env:`` prefix is removed. The name must start with an
    upper-case letter and consist of 2 to 121 upper-case letters, digits and
    underscores in total.
    """
    key = str(raw or "").strip()
    if key.startswith("env:"):
        key = key[4:].strip()
    if not re.fullmatch(r"[A-Z][A-Z0-9_]{1,120}", key):
        return ""
    return key


def validate_provider_env_key(raw: Any) -> str:
    """Return the provider env key named by *raw*.

    Raises:
        ValueError: if the name is malformed, is one of the deployment-managed
            names or prefixes, or does not end with ``_API_KEY``.
    """
    key = provider_env_key(raw)
    if not key:
        raise ValueError("env key must look like OPENROUTER_API_KEY")
    blocked_prefixes = ("FEISHU_", "HERMES_ADMIN_", "FEISHU_BRIDGE_")
    blocked_keys = {"PORT", "PATH", "HOME", "SHELL", "PWD"}
    if key in blocked_keys or key.startswith(blocked_prefixes):
        raise ValueError("this env key is managed by deployment, not by provider settings")
    if not key.endswith("_API_KEY"):
        raise ValueError("provider env key must end with _API_KEY")
    return key


def provider_key_profiles() -> dict[str, list[str]]:
    """Map each referenced provider env key to the model profiles using it.

    Profile names are truncated to 80 characters. This is best effort: when
    the UI config cannot be read, a warning is logged and ``{}`` is returned.
    """
    result: dict[str, list[str]] = {}
    try:
        config = read_ui_config_file()
    except Exception:
        logging.warning("provider key profile map unavailable:\n%s", traceback.format_exc())
        return result
    profiles = config.get("modelProfiles") if isinstance(config.get("modelProfiles"), list) else []
    for profile in profiles:
        if not isinstance(profile, dict):
            continue
        key = provider_env_key(profile.get("apiKeyRef"))
        if not key:
            continue
        name = str(profile.get("name") or profile.get("model") or "Profile")[:80]
        result.setdefault(key, []).append(name)
    return result


def handle_provider_keys_get(headers: dict[str, str]) -> tuple[int, dict[str, Any]]:
    """List provider env keys and whether each one has a value.

    Covers ``COMMON_PROVIDER_KEYS`` plus every key referenced by a model
    profile. A key counts as present when it is non-empty in the process
    environment or in the env file. Key values are never returned.
    """
    ok, status, message = is_admin_request(headers)
    if not ok:
        return status, {"code": status, "msg": message}
    try:
        env = parse_env_file(Config.env_file)
        profile_map = provider_key_profiles()
        keys = set(COMMON_PROVIDER_KEYS) | set(profile_map)
        payload = [
            {
                "key": key,
                "present": bool(os.environ.get(key) or env.get(key)),
                "profiles": profile_map.get(key, []),
            }
            for key in sorted(keys)
        ]
        return 200, {"code": 0, "msg": "ok", "keys": payload}
    except Exception as exc:
        logging.error("failed to read provider keys:\n%s", traceback.format_exc())
        return 500, {"code": 500, "msg": str(exc)}


def handle_provider_keys_post(headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Store one provider API key in the env file and reload the config.

    Reads the key name from ``body["key"]`` and the secret from
    ``body["value"]`` or ``body["api_key"]``. The secret must be a non-empty
    single line of at most 10000 characters. Returns 400 for validation
    errors and 500 for any other failure; the secret is never echoed back.
    """
    ok, status, message = is_admin_request(headers)
    if not ok:
        return status, {"code": status, "msg": message}
    try:
        key = validate_provider_env_key(body.get("key"))
        value = str(body.get("value") or body.get("api_key") or "").strip()
        if not value:
            raise ValueError("api key value is required")
        if "\n" in value or "\r" in value:
            raise ValueError("api key value must be a single line")
        if len(value) > 10000:
            raise ValueError("api key value is too large")
        write_env_updates(Config.env_file, {key: value})
        reload_config_from_env_file()
        logging.info("updated provider env key %s", key)
        return 200, {"code": 0, "msg": "ok", "key": {"key": key, "present": True}}
    except ValueError as exc:
        return 400, {"code": 400, "msg": str(exc)}
    except Exception as exc:
        logging.error("failed to write provider key:\n%s", traceback.format_exc())
        return 500, {"code": 500, "msg": str(exc)}


def handle_provider_keys_delete(path: str, headers: dict[str, str]) -> tuple[int, dict[str, Any]]:
    """Remove the provider env key named by the last segment of *path*.

    *path* is the request target below ``/feishu/provider-keys/``. Returns 400
    for an invalid key name and 500 for any other failure.
    """
    ok, status, message = is_admin_request(headers)
    if not ok:
        return status, {"code": status, "msg": message}
    try:
        # Drop any query string first, as handle_apps_delete does; otherwise
        # "?..." would become part of the key name and fail validation.
        route = urllib.parse.urlparse(path).path
        raw_key = urllib.parse.unquote(route[len("/feishu/provider-keys/") :].strip("/"))
        key = validate_provider_env_key(raw_key)
        write_env_removals(Config.env_file, {key})
        reload_config_from_env_file()
        logging.info("removed provider env key %s", key)
        return 200, {"code": 0, "msg": "ok", "key": {"key": key, "present": False}}
    except ValueError as exc:
        return 400, {"code": 400, "msg": str(exc)}
    except Exception as exc:
        logging.error("failed to remove provider key:\n%s", traceback.format_exc())
        return 500, {"code": 500, "msg": str(exc)}


def validate_feishu_credentials(app_id: str, app_secret: str) -> None:
    """Check an app id/secret pair by requesting a tenant access token.

    Raises:
        ValueError: if Feishu answers with a non-zero ``code``.
    """
    data = http_json(
        "POST",
        f"{FEISHU_API_BASE}/auth/v3/tenant_access_token/internal",
        {"app_id": app_id, "app_secret": app_secret},
        timeout=15,
    )
    if data.get("code") != 0:
        raise ValueError(f"Feishu credential validation failed: {data.get('msg') or data}")


def find_app_suffix(env: dict[str, str], app_id: str) -> str:
    """Return the env-variable suffix of the slot to use for *app_id*.

    The slot already holding *app_id* wins; otherwise the first slot with
    neither an app id nor an app secret is returned.

    Raises:
        RuntimeError: if every slot is occupied by another app.
    """
    suffixes = app_suffixes()
    for suffix in suffixes:
        if env.get(f"FEISHU_APP_ID{suffix}") == app_id:
            return suffix
    for suffix in suffixes:
        if not env.get(f"FEISHU_APP_ID{suffix}") and not env.get(f"FEISHU_APP_SECRET{suffix}"):
            return suffix
    raise RuntimeError("no free Feishu app slot; supported slots are FEISHU_APP_ID through FEISHU_APP_ID_20")


def upsert_feishu_app(body: dict[str, Any]) -> dict[str, Any]:
    """Create or update a Feishu app slot from ``app_id``/``app_secret``/``verification_token``.

    The credentials are verified against Feishu before anything is written.
    Afterwards the env file is updated, the tokens file is synced, the config
    is reloaded and the cached access token for the app is dropped.

    Returns:
        A dict with ``app_id``, ``callback_url`` and ``slot`` (``"_1"`` for
        the unsuffixed slot).

    Raises:
        ValueError: if a field is malformed or Feishu rejects the credentials.
        RuntimeError: if no slot is free (see ``find_app_suffix``).
    """
    app_id = str(body.get("app_id", "")).strip()
    app_secret = str(body.get("app_secret", "")).strip()
    verification_token = str(body.get("verification_token", "")).strip()
    if not APP_ID_RE.match(app_id):
        raise ValueError("App ID must look like cli_xxx")
    if len(app_secret) < 16:
        raise ValueError("App Secret is too short")
    if len(verification_token) < 16:
        raise ValueError("Verification Token is too short")

    validate_feishu_credentials(app_id, app_secret)
    env = parse_env_file(Config.env_file)
    suffix = find_app_suffix(env, app_id)
    updates = {
        f"FEISHU_APP_ID{suffix}": app_id,
        f"FEISHU_APP_SECRET{suffix}": app_secret,
        f"FEISHU_VERIFICATION_TOKEN{suffix}": verification_token,
    }
    # Only seed the receive-id type; an existing choice is left untouched.
    if not env.get(f"FEISHU_DEFAULT_RECEIVE_ID_TYPE{suffix}"):
        updates[f"FEISHU_DEFAULT_RECEIVE_ID_TYPE{suffix}"] = "chat_id"
    write_env_updates(Config.env_file, updates)
    sync_tokens_file()
    reload_config_from_env_file()
    token_cache.drop(app_id)
    return {
        "app_id": app_id,
        "callback_url": f"https://hermes.kang-kang.com/feishu/events/{app_id}",
        "slot": suffix or "_1",
    }


def delete_feishu_app(app_id: str) -> dict[str, Any]:
    """Remove the slot holding *app_id* from the env file.

    When the deleted app was the default one (``FEISHU_DEFAULT_APP_ID``, or
    ``FEISHU_APP_ID`` when that is unset), the default is re-pointed to the
    first remaining app, or removed when none remains.

    Returns:
        A dict with ``app_id`` and ``slot`` (``"_1"`` for the unsuffixed slot).

    Raises:
        ValueError: if *app_id* is malformed.
        KeyError: if no slot holds *app_id*.
    """
    app_id = app_id.strip()
    if not APP_ID_RE.match(app_id):
        raise ValueError("App ID must look like cli_xxx")
    env = parse_env_file(Config.env_file)
    suffix = ""
    for candidate in app_suffixes():
        if env.get(f"FEISHU_APP_ID{candidate}") == app_id:
            suffix = candidate
            break
    else:
        # The loop finished without a match: the app is not configured.
        raise KeyError(app_id)

    remove_keys = {
        f"FEISHU_APP_ID{suffix}",
        f"FEISHU_APP_SECRET{suffix}",
        f"FEISHU_VERIFICATION_TOKEN{suffix}",
        f"FEISHU_DEFAULT_RECEIVE_ID{suffix}",
        f"FEISHU_DEFAULT_RECEIVE_ID_TYPE{suffix}",
        f"FEISHU_ALLOWED_CHAT_IDS{suffix}",
    }
    updates: dict[str, str] = {}
    current_default_app_id = env.get("FEISHU_DEFAULT_APP_ID") or env.get("FEISHU_APP_ID", "")
    if current_default_app_id == app_id:
        remaining_app_ids = [
            env.get(f"FEISHU_APP_ID{candidate}", "")
            for candidate in app_suffixes()
            if candidate != suffix and env.get(f"FEISHU_APP_ID{candidate}", "")
        ]
        remove_keys.add("FEISHU_DEFAULT_APP_ID")
        if remaining_app_ids:
            updates["FEISHU_DEFAULT_APP_ID"] = remaining_app_ids[0]
    write_env_removals(Config.env_file, remove_keys, updates)
    sync_tokens_file()
    reload_config_from_env_file()
    token_cache.drop(app_id)
    return {"app_id": app_id, "slot": suffix or "_1"}


def handle_apps_admin(headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Create or update a Feishu app on behalf of an admin request.

    Returns 400 when ``upsert_feishu_app`` raises ``ValueError``; other
    exceptions propagate to the caller.
    """
    ok, status, message = is_admin_request(headers)
    if not ok:
        return status, {"code": status, "msg": message}
    try:
        app = upsert_feishu_app(body)
    except ValueError as exc:
        return 400, {"code": 400, "msg": str(exc)}
    logging.info("upserted Feishu app from Hermes settings app_id=%s", app["app_id"])
    return 200, {"code": 0, "msg": "ok", "app": app}


def handle_apps_delete(path: str, headers: dict[str, str]) -> tuple[int, dict[str, Any]]:
    """Delete the Feishu app named by the last segment of *path*.

    Returns 400 for a malformed app id and 404 for an unknown one; other
    exceptions propagate to the caller.
    """
    ok, status, message = is_admin_request(headers)
    if not ok:
        return status, {"code": status, "msg": message}
    route = urllib.parse.urlparse(path).path
    app_id = urllib.parse.unquote(route[len("/feishu/apps/") :].strip("/"))
    try:
        app = delete_feishu_app(app_id)
    except ValueError as exc:
        return 400, {"code": 400, "msg": str(exc)}
    except KeyError:
        return 404, {"code": 404, "msg": f"unknown Feishu app_id: {app_id}"}
    logging.info("deleted Feishu app from Hermes settings app_id=%s", app["app_id"])
    return 200, {"code": 0, "msg": "ok", "app": app}


def handle_notify(path: str, headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Send a text message to Feishu for a token-authenticated caller.

    The caller's token comes from ``Authorization: Bearer ...`` or, failing
    that, ``X-Hermes-Feishu-Token``, and must equal ``Config.notify_token``.
    The app is taken from ``body["app_id"]`` or resolved from *path*/*body*;
    the receiver falls back to the app's configured default.

    Returns ``(status, payload)``: 503 when no notify token is configured, 401
    for a wrong token, 404 for an unknown app, 400 when ``text`` or the
    receiver is missing, otherwise 200 with the Feishu result.
    """
    # Imported here so this function does not depend on the module header.
    import hmac

    expected = Config.notify_token
    if not expected:
        return 503, {"code": 503, "msg": "FEISHU_NOTIFY_TOKEN is not configured"}
    auth = headers.get("authorization", "")
    token = headers.get("x-hermes-feishu-token", "")
    if auth.lower().startswith("bearer "):
        token = auth[7:].strip()
    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched; bytes are used because str input must be ASCII-only.
    if not hmac.compare_digest(token.encode("utf-8"), expected.encode("utf-8")):
        return 401, {"code": 401, "msg": "unauthorized"}

    app_id = str(body.get("app_id") or resolve_app_id(path, body)).strip()
    app = Config.feishu_apps.get(app_id)
    if not app:
        return 404, {"code": 404, "msg": f"unknown Feishu app_id: {app_id}"}

    text = str(body.get("text", "")).strip()
    if not text:
        return 400, {"code": 400, "msg": "text is required"}

    receive_id = str(body.get("receive_id") or app.get("default_receive_id", "")).strip()
    receive_id_type = str(
        body.get("receive_id_type") or app.get("default_receive_id_type", "chat_id")
    ).strip()
    if not receive_id:
        return 400, {"code": 400, "msg": "receive_id is required"}

    result = send_feishu_text(app_id, receive_id, text, receive_id_type=receive_id_type)
    return 200, {"code": 0, "msg": "ok", "app_id": app_id, "feishu": result}


class Handler(BaseHTTPRequestHandler):
    """HTTP entry point of the bridge: routes requests to the ``handle_*`` functions."""

    server_version = "HermesFeishuBridge/1.0"

    def proxy_chat_completions(self, body: dict[str, Any]) -> None:
        """Forward a chat completion request upstream and relay the response.

        The upstream request is built by ``build_profile_chat_request``; when
        that reports an error, the error is returned as JSON. Upstream HTTP
        errors are relayed with their status and body. The response body is
        copied chunk by chunk and flushed after every chunk.
        """
        req, stream, status, error = build_profile_chat_request(body)
        if error is not None or req is None:
            self.send_json(status, error or {"code": status, "msg": "invalid chat request"})
            return
        # Set once we begin answering: a second response written after that
        # point would only corrupt the stream the client is already reading.
        response_started = False
        try:
            with urllib.request.urlopen(req, timeout=Config.request_timeout) as resp:
                content_type = resp.headers.get("Content-Type") or (
                    "text/event-stream" if stream else "application/json"
                )
                # read1() returns as soon as some data is available, whereas
                # read(n) waits for n bytes and would hold back streamed events.
                read_chunk = getattr(resp, "read1", resp.read)
                response_started = True
                self.send_response(resp.status)
                self.send_header("Content-Type", content_type)
                self.send_header("Cache-Control", "no-cache")
                self.end_headers()
                while True:
                    chunk = read_chunk(8192)
                    if not chunk:
                        break
                    self.wfile.write(chunk)
                    self.wfile.flush()
        except urllib.error.HTTPError as exc:
            raw = exc.read()
            response_started = True
            self.send_response(exc.code)
            self.send_header("Content-Type", exc.headers.get("Content-Type") or "application/json")
            self.end_headers()
            self.wfile.write(raw or json.dumps({"code": exc.code, "msg": exc.reason}).encode("utf-8"))
        except BrokenPipeError:
            logging.info("chat proxy client disconnected")
        except Exception as exc:
            logging.error("chat proxy failed:\n%s", traceback.format_exc())
            if not response_started:
                self.send_json(500, {"code": 500, "msg": str(exc)})

    def do_GET(self) -> None:
        """Serve the health check and the read-only endpoints.

        Any exception raised by a handler is logged and answered with a JSON
        500, matching ``do_POST`` and ``do_DELETE``.
        """
        try:
            headers = {key.lower(): value for key, value in self.headers.items()}
            if self.path == "/health":
                status, payload = 200, {"ok": True, "service": "feishu-bridge"}
            elif self.path == "/feishu/apps":
                status, payload = handle_apps()
            elif self.path == "/feishu/hermes-config":
                status, payload = handle_hermes_config_get(headers)
            elif self.path == "/feishu/ui-config":
                status, payload = handle_ui_config_get(headers)
            elif self.path == "/feishu/provider-keys":
                status, payload = handle_provider_keys_get(headers)
            else:
                status, payload = 404, {"code": 404, "msg": "not found"}
        except Exception as exc:
            logging.error("request failed:\n%s", traceback.format_exc())
            status, payload = 500, {"code": 500, "msg": str(exc)}
        self.send_json(status, payload)

    def do_POST(self) -> None:
        """Route a JSON POST request to its handler.

        A body that cannot be read as a JSON object is answered with 400; any
        other exception is logged and answered with 500. The chat completions
        route writes its own response.
        """
        try:
            body = self.read_json()
        except ValueError as exc:
            # Covers a bad Content-Length, undecodable bytes, invalid JSON and
            # JSON that is not an object: all are client errors.
            self.send_json(400, {"code": 400, "msg": f"invalid JSON body: {exc}"})
            return
        try:
            headers = {key.lower(): value for key, value in self.headers.items()}
            if self.path == "/feishu/events" or self.path.startswith("/feishu/events/"):
                status, payload = handle_feishu_event(self.path, body)
            elif self.path == "/feishu/apps":
                status, payload = handle_apps_admin(headers, body)
            elif self.path == "/feishu/hermes-config":
                status, payload = handle_hermes_config_post(headers, body)
            elif self.path == "/feishu/ui-config":
                status, payload = handle_ui_config_post(headers, body)
            elif self.path == "/feishu/provider-keys":
                status, payload = handle_provider_keys_post(headers, body)
            elif self.path == "/feishu/chat/completions":
                ok, status, message = is_admin_request(headers)
                if not ok:
                    self.send_json(status, {"code": status, "msg": message})
                else:
                    self.proxy_chat_completions(body)
                return
            elif self.path == "/feishu/notify" or self.path.startswith("/feishu/notify/"):
                status, payload = handle_notify(self.path, headers, body)
            else:
                status, payload = 404, {"code": 404, "msg": "not found"}
        except Exception as exc:
            logging.error("request failed:\n%s", traceback.format_exc())
            status, payload = 500, {"code": 500, "msg": str(exc)}
        self.send_json(status, payload)

    def do_DELETE(self) -> None:
        """Route a DELETE request for a Feishu app or a provider key."""
        try:
            headers = {key.lower(): value for key, value in self.headers.items()}
            if self.path.startswith("/feishu/apps/"):
                status, payload = handle_apps_delete(self.path, headers)
            elif self.path.startswith("/feishu/provider-keys/"):
                status, payload = handle_provider_keys_delete(self.path, headers)
            else:
                status, payload = 404, {"code": 404, "msg": "not found"}
        except Exception as exc:
            logging.error("request failed:\n%s", traceback.format_exc())
            status, payload = 500, {"code": 500, "msg": str(exc)}
        self.send_json(status, payload)

    def read_json(self) -> dict[str, Any]:
        """Read the request body and return it as a JSON object.

        A missing or zero ``Content-Length`` and an empty body both yield ``{}``.

        Raises:
            ValueError: if ``Content-Length`` is not a non-negative integer,
                the body is not valid UTF-8 JSON, or the JSON is not an object.
        """
        length = int(self.headers.get("Content-Length", "0"))
        if length < 0:
            # rfile.read(-1) would block until the client closes the socket.
            raise ValueError("Content-Length must not be negative")
        raw = self.rfile.read(length).decode("utf-8") if length else "{}"
        if not raw:
            return {}
        data = json.loads(raw)
        if not isinstance(data, dict):
            raise ValueError("JSON object is required")
        return data

    def send_json(self, status: int, payload: dict[str, Any]) -> None:
        """Send *payload* as a UTF-8 JSON response with the given *status*."""
        data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def log_message(self, fmt: str, *args: Any) -> None:
        """Send the server's access log lines through ``logging`` at INFO level."""
        logging.info("%s - %s", self.address_string(), fmt % args)


def main() -> None:
    """Configure logging, report missing credentials and serve forever.

    The log level comes from ``LOG_LEVEL`` (case-insensitive, default
    ``INFO``). Missing app credentials only produce a warning; the server
    still starts and listens on all interfaces at ``Config.port``.
    """
    logging.basicConfig(
        # logging only knows upper-case level names, and an empty value is
        # not a level at all, so normalize before handing it over.
        level=os.environ.get("LOG_LEVEL", "").strip().upper() or "INFO",
        format="%(asctime)s %(levelname)s %(message)s",
    )
    missing = []
    if not Config.feishu_apps:
        missing.append("FEISHU_APP_ID and FEISHU_APP_SECRET")
    for app_id, app in Config.feishu_apps.items():
        if not app.get("app_secret"):
            missing.append(f"FEISHU_APP_SECRET for {app_id}")
    if missing:
        logging.warning("missing required env: %s", ", ".join(missing))
    logging.info("configured Feishu apps: %s", ", ".join(Config.feishu_apps) or "(none)")
    httpd = ThreadingHTTPServer(("0.0.0.0", Config.port), Handler)
    logging.info("Feishu bridge listening on :%s", Config.port)
    httpd.serve_forever()


if __name__ == "__main__":
    main()