#!/usr/bin/env python3 """Feishu <-> Hermes bridge. This service intentionally uses only the Python standard library so it can run next to the existing no-build Hermes UI deployment without adding package management to the project. """ from __future__ import annotations import json import logging import os import hashlib import threading import time import traceback import urllib.error import urllib.parse import urllib.request from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from typing import Any FEISHU_API_BASE = "https://open.feishu.cn/open-apis" def _env(name: str, default: str = "") -> str: return os.environ.get(name, default).strip() def _bool_env(name: str, default: str = "false") -> bool: return _env(name, default).lower() in {"1", "true", "yes"} def _csv_set(value: str) -> set[str]: return {item.strip() for item in value.split(",") if item.strip()} def _csv_list(value: str) -> list[str]: return [item.strip() for item in value.split(",") if item.strip()] def _load_feishu_apps() -> dict[str, dict[str, Any]]: apps: dict[str, dict[str, Any]] = {} for suffix in ["", *[f"_{idx}" for idx in range(2, 10)]]: app_id = _env(f"FEISHU_APP_ID{suffix}") app_secret = _env(f"FEISHU_APP_SECRET{suffix}") if not app_id and not app_secret: continue apps[app_id] = { "app_id": app_id, "app_secret": app_secret, "verification_tokens": _csv_list(_env(f"FEISHU_VERIFICATION_TOKEN{suffix}")), "default_receive_id": _env(f"FEISHU_DEFAULT_RECEIVE_ID{suffix}"), "default_receive_id_type": _env( f"FEISHU_DEFAULT_RECEIVE_ID_TYPE{suffix}", "chat_id", ), "allowed_chat_ids": _csv_set(_env(f"FEISHU_ALLOWED_CHAT_IDS{suffix}")), } return apps class Config: feishu_app_id = _env("FEISHU_APP_ID") feishu_app_secret = _env("FEISHU_APP_SECRET") feishu_apps = _load_feishu_apps() default_feishu_app_id = _env("FEISHU_DEFAULT_APP_ID", feishu_app_id) notify_token = _env("FEISHU_NOTIFY_TOKEN") reply_in_thread = _bool_env("FEISHU_REPLY_IN_THREAD") hermes_api_base = _env("HERMES_API_BASE", "http://127.0.0.1:8642/v1").rstrip("/") hermes_api_key = _env("HERMES_API_KEY") hermes_model = _env("HERMES_MODEL", "gemini-3-pro-preview") hermes_system_prompt = _env( "HERMES_SYSTEM_PROMPT", "你是爱马仕 Hermes。你通过飞书与用户对话,回答要直接、简洁、可执行。", ) request_timeout = float(_env("FEISHU_BRIDGE_TIMEOUT", "60")) port = int(_env("PORT", _env("FEISHU_BRIDGE_PORT", "8787"))) class TokenCache: def __init__(self) -> None: self._lock = threading.Lock() self._tokens: dict[str, tuple[str, float]] = {} def get(self, app_id: str) -> str: with self._lock: token, expires_at = self._tokens.get(app_id, ("", 0.0)) if token and time.time() < expires_at - 300: return token app = Config.feishu_apps.get(app_id) if not app: raise RuntimeError(f"unknown Feishu app_id: {app_id}") if not app.get("app_id") or not app.get("app_secret"): raise RuntimeError(f"FEISHU_APP_ID and FEISHU_APP_SECRET are required for {app_id}") payload = { "app_id": app["app_id"], "app_secret": app["app_secret"], } data = http_json( "POST", f"{FEISHU_API_BASE}/auth/v3/tenant_access_token/internal", payload, ) if data.get("code") != 0: raise RuntimeError(f"Feishu token error: {data}") token = str(data["tenant_access_token"]) expires_at = time.time() + int(data.get("expire", 7200)) self._tokens[app_id] = (token, expires_at) return token token_cache = TokenCache() seen_events: dict[str, float] = {} seen_lock = threading.Lock() def http_json( method: str, url: str, payload: dict[str, Any], headers: dict[str, str] | None = None, timeout: float | None = None, ) -> dict[str, Any]: body = json.dumps(payload, ensure_ascii=False).encode("utf-8") req = urllib.request.Request( url, data=body, method=method, headers={ "Content-Type": "application/json; charset=utf-8", **(headers or {}), }, ) try: with urllib.request.urlopen(req, timeout=timeout or Config.request_timeout) as resp: raw = resp.read().decode("utf-8") except urllib.error.HTTPError as exc: raw = exc.read().decode("utf-8", errors="replace") raise RuntimeError(f"HTTP {exc.code} {url}: {raw}") from exc if not raw: return {} return json.loads(raw) def json_text(value: Any) -> str: if isinstance(value, str): try: value = json.loads(value) except json.JSONDecodeError: return value if isinstance(value, dict): for key in ("text", "title", "content"): text = value.get(key) if isinstance(text, str) and text.strip(): return text.strip() return "" def resolve_app_id(path: str, body: dict[str, Any] | None = None) -> str: route_app_id = "" for prefix in ("/feishu/events/", "/feishu/notify/"): if path.startswith(prefix): route_app_id = urllib.parse.unquote(path[len(prefix) :].strip("/")) break if route_app_id: return route_app_id body = body or {} body_app_id = str(body.get("app_id") or body.get("header", {}).get("app_id") or "").strip() if body_app_id in Config.feishu_apps: return body_app_id return Config.default_feishu_app_id def callback_token(body: dict[str, Any]) -> str: return str(body.get("token") or body.get("header", {}).get("token") or "") def token_digest(value: str) -> str: if not value: return "(empty)" return f"len={len(value)} sha256={hashlib.sha256(value.encode('utf-8')).hexdigest()[:12]}" def verify_callback_token(body: dict[str, Any], app_id: str) -> bool: app = Config.feishu_apps.get(app_id, {}) expected_tokens = app.get("verification_tokens", []) if not expected_tokens: return True token = callback_token(body) ok = token in expected_tokens if not ok: logging.warning( "invalid Feishu verification token app_id=%s got=%s expected_any=%s body_keys=%s header_keys=%s event_keys=%s", app_id, token_digest(token), [token_digest(item) for item in expected_tokens], sorted(body.keys()), sorted(body.get("header", {}).keys()) if isinstance(body.get("header"), dict) else [], sorted(body.get("event", {}).keys()) if isinstance(body.get("event"), dict) else [], ) return ok def remember_event(event_id: str) -> bool: if not event_id: return True now = time.time() with seen_lock: stale = [key for key, ts in seen_events.items() if now - ts > 3600] for key in stale: seen_events.pop(key, None) if event_id in seen_events: return False seen_events[event_id] = now return True def handle_feishu_event(path: str, body: dict[str, Any]) -> tuple[int, dict[str, Any]]: app_id = resolve_app_id(path, body) if app_id not in Config.feishu_apps: return 404, {"code": 404, "msg": f"unknown Feishu app_id: {app_id}"} if "encrypt" in body: return 400, { "code": 400, "msg": "encrypted callbacks are not enabled; disable Feishu event encryption or add decrypt support", } if body.get("type") == "url_verification": if not verify_callback_token(body, app_id): return 403, {"code": 403, "msg": "invalid verification token"} return 200, {"challenge": body.get("challenge", "")} if not verify_callback_token(body, app_id): return 403, {"code": 403, "msg": "invalid verification token"} event_type = body.get("header", {}).get("event_type") or body.get("event", {}).get("type") event_id = body.get("header", {}).get("event_id", "") if event_type != "im.message.receive_v1": return 200, {"code": 0, "msg": "ignored"} if not remember_event(f"{app_id}:{event_id}"): return 200, {"code": 0, "msg": "duplicate"} threading.Thread(target=process_message_event, args=(app_id, body), daemon=True).start() return 200, {"code": 0, "msg": "ok"} def process_message_event(app_id: str, body: dict[str, Any]) -> None: try: app = Config.feishu_apps[app_id] event = body.get("event", {}) sender = event.get("sender", {}) if sender.get("sender_type") == "app": return message = event.get("message", {}) message_type = message.get("message_type") chat_id = message.get("chat_id", "") message_id = message.get("message_id", "") allowed_chat_ids = app.get("allowed_chat_ids", set()) if allowed_chat_ids and chat_id not in allowed_chat_ids: logging.info("ignored message from disallowed app_id=%s chat_id=%s", app_id, chat_id) return if message_type != "text": send_feishu_text(app_id, chat_id, "我现在只支持处理文本消息。", message_id=message_id) return text = json_text(message.get("content")) if not text: send_feishu_text(app_id, chat_id, "我没有读到文本内容。", message_id=message_id) return answer = ask_hermes(text, event) send_feishu_text(app_id, chat_id, answer, message_id=message_id) except Exception: logging.error("failed to process Feishu event:\n%s", traceback.format_exc()) def ask_hermes(text: str, event: dict[str, Any]) -> str: payload = { "model": Config.hermes_model, "stream": False, "messages": [ {"role": "system", "content": Config.hermes_system_prompt}, { "role": "user", "content": ( "以下消息来自飞书。\n" f"chat_id: {event.get('message', {}).get('chat_id', '')}\n\n" f"{text}" ), }, ], } headers = {} if Config.hermes_api_key: headers["Authorization"] = f"Bearer {Config.hermes_api_key}" data = http_json( "POST", f"{Config.hermes_api_base}/chat/completions", payload, headers=headers, ) try: content = data["choices"][0]["message"]["content"] except (KeyError, IndexError, TypeError) as exc: raise RuntimeError(f"Unexpected Hermes response: {data}") from exc return str(content).strip() or "Hermes 没有返回内容。" def send_feishu_text( app_id: str, receive_id: str, text: str, receive_id_type: str = "chat_id", message_id: str = "", ) -> dict[str, Any]: if not receive_id: raise RuntimeError("receive_id is required") token = token_cache.get(app_id) chunks = split_text(text) result: dict[str, Any] = {} for chunk in chunks: content = json.dumps({"text": chunk}, ensure_ascii=False) if Config.reply_in_thread and message_id: url = f"{FEISHU_API_BASE}/im/v1/messages/{message_id}/reply" payload = {"msg_type": "text", "content": content} else: url = f"{FEISHU_API_BASE}/im/v1/messages?receive_id_type={receive_id_type}" payload = {"receive_id": receive_id, "msg_type": "text", "content": content} result = http_json( "POST", url, payload, headers={"Authorization": f"Bearer {token}"}, ) if result.get("code") not in (None, 0): raise RuntimeError(f"Feishu send error: {result}") return result def split_text(text: str, limit: int = 3800) -> list[str]: text = text.strip() if not text: return [""] chunks: list[str] = [] rest = text while len(rest) > limit: cut = rest.rfind("\n", 0, limit) if cut < limit // 2: cut = limit chunks.append(rest[:cut].strip()) rest = rest[cut:].strip() chunks.append(rest) return chunks def handle_apps() -> tuple[int, dict[str, Any]]: apps = [] for app_id, app in Config.feishu_apps.items(): apps.append({ "app_id": app_id, "callback_url": f"https://hermes.kang-kang.com/feishu/events/{app_id}", "default_receive_id_type": app.get("default_receive_id_type", "chat_id"), "has_default_receive_id": bool(app.get("default_receive_id")), "allowed_chat_ids_count": len(app.get("allowed_chat_ids", set())), "verification_tokens_count": len(app.get("verification_tokens", [])), }) return 200, { "code": 0, "service": "hermes-feishu-bridge", "default_app_id": Config.default_feishu_app_id, "apps": apps, } def handle_notify(path: str, headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]: expected = Config.notify_token if not expected: return 503, {"code": 503, "msg": "FEISHU_NOTIFY_TOKEN is not configured"} auth = headers.get("authorization", "") token = headers.get("x-hermes-feishu-token", "") if auth.lower().startswith("bearer "): token = auth[7:].strip() if token != expected: return 401, {"code": 401, "msg": "unauthorized"} app_id = str(body.get("app_id") or resolve_app_id(path, body)).strip() app = Config.feishu_apps.get(app_id) if not app: return 404, {"code": 404, "msg": f"unknown Feishu app_id: {app_id}"} text = str(body.get("text", "")).strip() if not text: return 400, {"code": 400, "msg": "text is required"} receive_id = str(body.get("receive_id") or app.get("default_receive_id", "")).strip() receive_id_type = str( body.get("receive_id_type") or app.get("default_receive_id_type", "chat_id") ).strip() if not receive_id: return 400, {"code": 400, "msg": "receive_id is required"} result = send_feishu_text(app_id, receive_id, text, receive_id_type=receive_id_type) return 200, {"code": 0, "msg": "ok", "app_id": app_id, "feishu": result} class Handler(BaseHTTPRequestHandler): server_version = "HermesFeishuBridge/1.0" def do_GET(self) -> None: if self.path == "/health": self.send_json(200, {"ok": True, "service": "feishu-bridge"}) return if self.path == "/feishu/apps": status, payload = handle_apps() self.send_json(status, payload) return self.send_json(404, {"code": 404, "msg": "not found"}) def do_POST(self) -> None: try: body = self.read_json() headers = {key.lower(): value for key, value in self.headers.items()} if self.path == "/feishu/events" or self.path.startswith("/feishu/events/"): status, payload = handle_feishu_event(self.path, body) elif self.path == "/feishu/notify" or self.path.startswith("/feishu/notify/"): status, payload = handle_notify(self.path, headers, body) else: status, payload = 404, {"code": 404, "msg": "not found"} except Exception as exc: logging.error("request failed:\n%s", traceback.format_exc()) status, payload = 500, {"code": 500, "msg": str(exc)} self.send_json(status, payload) def read_json(self) -> dict[str, Any]: length = int(self.headers.get("Content-Length", "0")) raw = self.rfile.read(length).decode("utf-8") if length else "{}" if not raw: return {} data = json.loads(raw) if not isinstance(data, dict): raise ValueError("JSON object is required") return data def send_json(self, status: int, payload: dict[str, Any]) -> None: data = json.dumps(payload, ensure_ascii=False).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def log_message(self, fmt: str, *args: Any) -> None: logging.info("%s - %s", self.address_string(), fmt % args) def main() -> None: logging.basicConfig( level=os.environ.get("LOG_LEVEL", "INFO"), format="%(asctime)s %(levelname)s %(message)s", ) missing = [] if not Config.feishu_apps: missing.append("FEISHU_APP_ID and FEISHU_APP_SECRET") for app_id, app in Config.feishu_apps.items(): if not app.get("app_secret"): missing.append(f"FEISHU_APP_SECRET for {app_id}") if missing: logging.warning("missing required env: %s", ", ".join(missing)) logging.info("configured Feishu apps: %s", ", ".join(Config.feishu_apps) or "(none)") httpd = ThreadingHTTPServer(("0.0.0.0", Config.port), Handler) logging.info("Feishu bridge listening on :%s", Config.port) httpd.serve_forever() if __name__ == "__main__": main()