Files
hermes-glass-ui-personal/server/feishu_bridge.py

385 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Feishu <-> Hermes bridge.
This service intentionally uses only the Python standard library so it can run
next to the existing no-build Hermes UI deployment without adding package
management to the project.
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
import traceback
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
FEISHU_API_BASE = "https://open.feishu.cn/open-apis"
def _env(name: str, default: str = "") -> str:
return os.environ.get(name, default).strip()
class Config:
feishu_app_id = _env("FEISHU_APP_ID")
feishu_app_secret = _env("FEISHU_APP_SECRET")
feishu_verification_token = _env("FEISHU_VERIFICATION_TOKEN")
feishu_default_receive_id = _env("FEISHU_DEFAULT_RECEIVE_ID")
feishu_default_receive_id_type = _env("FEISHU_DEFAULT_RECEIVE_ID_TYPE", "chat_id")
notify_token = _env("FEISHU_NOTIFY_TOKEN")
allowed_chat_ids = {
item.strip()
for item in _env("FEISHU_ALLOWED_CHAT_IDS").split(",")
if item.strip()
}
reply_in_thread = _env("FEISHU_REPLY_IN_THREAD", "false").lower() in {"1", "true", "yes"}
hermes_api_base = _env("HERMES_API_BASE", "http://127.0.0.1:8642/v1").rstrip("/")
hermes_api_key = _env("HERMES_API_KEY")
hermes_model = _env("HERMES_MODEL", "gemini-3-pro-preview")
hermes_system_prompt = _env(
"HERMES_SYSTEM_PROMPT",
"你是爱马仕 Hermes。你通过飞书与用户对话回答要直接、简洁、可执行。",
)
request_timeout = float(_env("FEISHU_BRIDGE_TIMEOUT", "60"))
port = int(_env("PORT", _env("FEISHU_BRIDGE_PORT", "8787")))
class TokenCache:
def __init__(self) -> None:
self._lock = threading.Lock()
self._token = ""
self._expires_at = 0.0
def get(self) -> str:
with self._lock:
if self._token and time.time() < self._expires_at - 300:
return self._token
if not Config.feishu_app_id or not Config.feishu_app_secret:
raise RuntimeError("FEISHU_APP_ID and FEISHU_APP_SECRET are required")
payload = {
"app_id": Config.feishu_app_id,
"app_secret": Config.feishu_app_secret,
}
data = http_json(
"POST",
f"{FEISHU_API_BASE}/auth/v3/tenant_access_token/internal",
payload,
)
if data.get("code") != 0:
raise RuntimeError(f"Feishu token error: {data}")
self._token = str(data["tenant_access_token"])
self._expires_at = time.time() + int(data.get("expire", 7200))
return self._token
token_cache = TokenCache()
seen_events: dict[str, float] = {}
seen_lock = threading.Lock()
def http_json(
method: str,
url: str,
payload: dict[str, Any],
headers: dict[str, str] | None = None,
timeout: float | None = None,
) -> dict[str, Any]:
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
url,
data=body,
method=method,
headers={
"Content-Type": "application/json; charset=utf-8",
**(headers or {}),
},
)
try:
with urllib.request.urlopen(req, timeout=timeout or Config.request_timeout) as resp:
raw = resp.read().decode("utf-8")
except urllib.error.HTTPError as exc:
raw = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {exc.code} {url}: {raw}") from exc
if not raw:
return {}
return json.loads(raw)
def json_text(value: Any) -> str:
if isinstance(value, str):
try:
value = json.loads(value)
except json.JSONDecodeError:
return value
if isinstance(value, dict):
for key in ("text", "title", "content"):
text = value.get(key)
if isinstance(text, str) and text.strip():
return text.strip()
return ""
def verify_callback_token(body: dict[str, Any]) -> bool:
expected = Config.feishu_verification_token
if not expected:
return True
token = body.get("token") or body.get("header", {}).get("token")
return token == expected
def remember_event(event_id: str) -> bool:
if not event_id:
return True
now = time.time()
with seen_lock:
stale = [key for key, ts in seen_events.items() if now - ts > 3600]
for key in stale:
seen_events.pop(key, None)
if event_id in seen_events:
return False
seen_events[event_id] = now
return True
def handle_feishu_event(body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
if "encrypt" in body:
return 400, {
"code": 400,
"msg": "encrypted callbacks are not enabled; disable Feishu event encryption or add decrypt support",
}
if body.get("type") == "url_verification":
if not verify_callback_token(body):
return 403, {"code": 403, "msg": "invalid verification token"}
return 200, {"challenge": body.get("challenge", "")}
if not verify_callback_token(body):
return 403, {"code": 403, "msg": "invalid verification token"}
event_type = body.get("header", {}).get("event_type") or body.get("event", {}).get("type")
event_id = body.get("header", {}).get("event_id", "")
if event_type != "im.message.receive_v1":
return 200, {"code": 0, "msg": "ignored"}
if not remember_event(event_id):
return 200, {"code": 0, "msg": "duplicate"}
threading.Thread(target=process_message_event, args=(body,), daemon=True).start()
return 200, {"code": 0, "msg": "ok"}
def process_message_event(body: dict[str, Any]) -> None:
try:
event = body.get("event", {})
sender = event.get("sender", {})
if sender.get("sender_type") == "app":
return
message = event.get("message", {})
message_type = message.get("message_type")
chat_id = message.get("chat_id", "")
message_id = message.get("message_id", "")
if Config.allowed_chat_ids and chat_id not in Config.allowed_chat_ids:
logging.info("ignored message from disallowed chat_id=%s", chat_id)
return
if message_type != "text":
send_feishu_text(chat_id, "我现在只支持处理文本消息。", message_id=message_id)
return
text = json_text(message.get("content"))
if not text:
send_feishu_text(chat_id, "我没有读到文本内容。", message_id=message_id)
return
answer = ask_hermes(text, event)
send_feishu_text(chat_id, answer, message_id=message_id)
except Exception:
logging.error("failed to process Feishu event:\n%s", traceback.format_exc())
def ask_hermes(text: str, event: dict[str, Any]) -> str:
payload = {
"model": Config.hermes_model,
"stream": False,
"messages": [
{"role": "system", "content": Config.hermes_system_prompt},
{
"role": "user",
"content": (
"以下消息来自飞书。\n"
f"chat_id: {event.get('message', {}).get('chat_id', '')}\n\n"
f"{text}"
),
},
],
}
headers = {}
if Config.hermes_api_key:
headers["Authorization"] = f"Bearer {Config.hermes_api_key}"
data = http_json(
"POST",
f"{Config.hermes_api_base}/chat/completions",
payload,
headers=headers,
)
try:
content = data["choices"][0]["message"]["content"]
except (KeyError, IndexError, TypeError) as exc:
raise RuntimeError(f"Unexpected Hermes response: {data}") from exc
return str(content).strip() or "Hermes 没有返回内容。"
def send_feishu_text(
receive_id: str,
text: str,
receive_id_type: str = "chat_id",
message_id: str = "",
) -> dict[str, Any]:
if not receive_id:
raise RuntimeError("receive_id is required")
token = token_cache.get()
chunks = split_text(text)
result: dict[str, Any] = {}
for chunk in chunks:
content = json.dumps({"text": chunk}, ensure_ascii=False)
if Config.reply_in_thread and message_id:
url = f"{FEISHU_API_BASE}/im/v1/messages/{message_id}/reply"
payload = {"msg_type": "text", "content": content}
else:
url = f"{FEISHU_API_BASE}/im/v1/messages?receive_id_type={receive_id_type}"
payload = {"receive_id": receive_id, "msg_type": "text", "content": content}
result = http_json(
"POST",
url,
payload,
headers={"Authorization": f"Bearer {token}"},
)
if result.get("code") not in (None, 0):
raise RuntimeError(f"Feishu send error: {result}")
return result
def split_text(text: str, limit: int = 3800) -> list[str]:
text = text.strip()
if not text:
return [""]
chunks: list[str] = []
rest = text
while len(rest) > limit:
cut = rest.rfind("\n", 0, limit)
if cut < limit // 2:
cut = limit
chunks.append(rest[:cut].strip())
rest = rest[cut:].strip()
chunks.append(rest)
return chunks
def handle_notify(headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
expected = Config.notify_token
if not expected:
return 503, {"code": 503, "msg": "FEISHU_NOTIFY_TOKEN is not configured"}
auth = headers.get("authorization", "")
token = headers.get("x-hermes-feishu-token", "")
if auth.lower().startswith("bearer "):
token = auth[7:].strip()
if token != expected:
return 401, {"code": 401, "msg": "unauthorized"}
text = str(body.get("text", "")).strip()
if not text:
return 400, {"code": 400, "msg": "text is required"}
receive_id = str(body.get("receive_id") or Config.feishu_default_receive_id).strip()
receive_id_type = str(
body.get("receive_id_type") or Config.feishu_default_receive_id_type
).strip()
if not receive_id:
return 400, {"code": 400, "msg": "receive_id is required"}
result = send_feishu_text(receive_id, text, receive_id_type=receive_id_type)
return 200, {"code": 0, "msg": "ok", "feishu": result}
class Handler(BaseHTTPRequestHandler):
server_version = "HermesFeishuBridge/1.0"
def do_GET(self) -> None:
if self.path == "/health":
self.send_json(200, {"ok": True, "service": "feishu-bridge"})
return
self.send_json(404, {"code": 404, "msg": "not found"})
def do_POST(self) -> None:
try:
body = self.read_json()
headers = {key.lower(): value for key, value in self.headers.items()}
if self.path == "/feishu/events":
status, payload = handle_feishu_event(body)
elif self.path == "/feishu/notify":
status, payload = handle_notify(headers, body)
else:
status, payload = 404, {"code": 404, "msg": "not found"}
except Exception as exc:
logging.error("request failed:\n%s", traceback.format_exc())
status, payload = 500, {"code": 500, "msg": str(exc)}
self.send_json(status, payload)
def read_json(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length).decode("utf-8") if length else "{}"
if not raw:
return {}
data = json.loads(raw)
if not isinstance(data, dict):
raise ValueError("JSON object is required")
return data
def send_json(self, status: int, payload: dict[str, Any]) -> None:
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def log_message(self, fmt: str, *args: Any) -> None:
logging.info("%s - %s", self.address_string(), fmt % args)
def main() -> None:
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", "INFO"),
format="%(asctime)s %(levelname)s %(message)s",
)
missing = [
name
for name, value in {
"FEISHU_APP_ID": Config.feishu_app_id,
"FEISHU_APP_SECRET": Config.feishu_app_secret,
}.items()
if not value
]
if missing:
logging.warning("missing required env: %s", ", ".join(missing))
httpd = ThreadingHTTPServer(("0.0.0.0", Config.port), Handler)
logging.info("Feishu bridge listening on :%s", Config.port)
httpd.serve_forever()
if __name__ == "__main__":
main()