441 lines
15 KiB
Python
441 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""Feishu <-> Hermes bridge.
|
||
|
||
This service intentionally uses only the Python standard library so it can run
|
||
next to the existing no-build Hermes UI deployment without adding package
|
||
management to the project.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import threading
|
||
import time
|
||
import traceback
|
||
import urllib.error
|
||
import urllib.parse
|
||
import urllib.request
|
||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||
from typing import Any
|
||
|
||
|
||
FEISHU_API_BASE = "https://open.feishu.cn/open-apis"
|
||
|
||
|
||
def _env(name: str, default: str = "") -> str:
|
||
return os.environ.get(name, default).strip()
|
||
|
||
|
||
def _bool_env(name: str, default: str = "false") -> bool:
|
||
return _env(name, default).lower() in {"1", "true", "yes"}
|
||
|
||
|
||
def _csv_set(value: str) -> set[str]:
|
||
return {item.strip() for item in value.split(",") if item.strip()}
|
||
|
||
|
||
def _load_feishu_apps() -> dict[str, dict[str, Any]]:
|
||
apps: dict[str, dict[str, Any]] = {}
|
||
for suffix in ["", *[f"_{idx}" for idx in range(2, 10)]]:
|
||
app_id = _env(f"FEISHU_APP_ID{suffix}")
|
||
app_secret = _env(f"FEISHU_APP_SECRET{suffix}")
|
||
if not app_id and not app_secret:
|
||
continue
|
||
apps[app_id] = {
|
||
"app_id": app_id,
|
||
"app_secret": app_secret,
|
||
"verification_token": _env(f"FEISHU_VERIFICATION_TOKEN{suffix}"),
|
||
"default_receive_id": _env(f"FEISHU_DEFAULT_RECEIVE_ID{suffix}"),
|
||
"default_receive_id_type": _env(
|
||
f"FEISHU_DEFAULT_RECEIVE_ID_TYPE{suffix}",
|
||
"chat_id",
|
||
),
|
||
"allowed_chat_ids": _csv_set(_env(f"FEISHU_ALLOWED_CHAT_IDS{suffix}")),
|
||
}
|
||
return apps
|
||
|
||
|
||
class Config:
|
||
feishu_app_id = _env("FEISHU_APP_ID")
|
||
feishu_app_secret = _env("FEISHU_APP_SECRET")
|
||
feishu_apps = _load_feishu_apps()
|
||
default_feishu_app_id = _env("FEISHU_DEFAULT_APP_ID", feishu_app_id)
|
||
notify_token = _env("FEISHU_NOTIFY_TOKEN")
|
||
reply_in_thread = _bool_env("FEISHU_REPLY_IN_THREAD")
|
||
|
||
hermes_api_base = _env("HERMES_API_BASE", "http://127.0.0.1:8642/v1").rstrip("/")
|
||
hermes_api_key = _env("HERMES_API_KEY")
|
||
hermes_model = _env("HERMES_MODEL", "gemini-3-pro-preview")
|
||
hermes_system_prompt = _env(
|
||
"HERMES_SYSTEM_PROMPT",
|
||
"你是爱马仕 Hermes。你通过飞书与用户对话,回答要直接、简洁、可执行。",
|
||
)
|
||
request_timeout = float(_env("FEISHU_BRIDGE_TIMEOUT", "60"))
|
||
port = int(_env("PORT", _env("FEISHU_BRIDGE_PORT", "8787")))
|
||
|
||
|
||
class TokenCache:
|
||
def __init__(self) -> None:
|
||
self._lock = threading.Lock()
|
||
self._tokens: dict[str, tuple[str, float]] = {}
|
||
|
||
def get(self, app_id: str) -> str:
|
||
with self._lock:
|
||
token, expires_at = self._tokens.get(app_id, ("", 0.0))
|
||
if token and time.time() < expires_at - 300:
|
||
return token
|
||
|
||
app = Config.feishu_apps.get(app_id)
|
||
if not app:
|
||
raise RuntimeError(f"unknown Feishu app_id: {app_id}")
|
||
if not app.get("app_id") or not app.get("app_secret"):
|
||
raise RuntimeError(f"FEISHU_APP_ID and FEISHU_APP_SECRET are required for {app_id}")
|
||
|
||
payload = {
|
||
"app_id": app["app_id"],
|
||
"app_secret": app["app_secret"],
|
||
}
|
||
data = http_json(
|
||
"POST",
|
||
f"{FEISHU_API_BASE}/auth/v3/tenant_access_token/internal",
|
||
payload,
|
||
)
|
||
if data.get("code") != 0:
|
||
raise RuntimeError(f"Feishu token error: {data}")
|
||
|
||
token = str(data["tenant_access_token"])
|
||
expires_at = time.time() + int(data.get("expire", 7200))
|
||
self._tokens[app_id] = (token, expires_at)
|
||
return token
|
||
|
||
|
||
token_cache = TokenCache()
|
||
seen_events: dict[str, float] = {}
|
||
seen_lock = threading.Lock()
|
||
|
||
|
||
def http_json(
|
||
method: str,
|
||
url: str,
|
||
payload: dict[str, Any],
|
||
headers: dict[str, str] | None = None,
|
||
timeout: float | None = None,
|
||
) -> dict[str, Any]:
|
||
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||
req = urllib.request.Request(
|
||
url,
|
||
data=body,
|
||
method=method,
|
||
headers={
|
||
"Content-Type": "application/json; charset=utf-8",
|
||
**(headers or {}),
|
||
},
|
||
)
|
||
try:
|
||
with urllib.request.urlopen(req, timeout=timeout or Config.request_timeout) as resp:
|
||
raw = resp.read().decode("utf-8")
|
||
except urllib.error.HTTPError as exc:
|
||
raw = exc.read().decode("utf-8", errors="replace")
|
||
raise RuntimeError(f"HTTP {exc.code} {url}: {raw}") from exc
|
||
|
||
if not raw:
|
||
return {}
|
||
return json.loads(raw)
|
||
|
||
|
||
def json_text(value: Any) -> str:
|
||
if isinstance(value, str):
|
||
try:
|
||
value = json.loads(value)
|
||
except json.JSONDecodeError:
|
||
return value
|
||
if isinstance(value, dict):
|
||
for key in ("text", "title", "content"):
|
||
text = value.get(key)
|
||
if isinstance(text, str) and text.strip():
|
||
return text.strip()
|
||
return ""
|
||
|
||
|
||
def resolve_app_id(path: str, body: dict[str, Any] | None = None) -> str:
|
||
route_app_id = ""
|
||
for prefix in ("/feishu/events/", "/feishu/notify/"):
|
||
if path.startswith(prefix):
|
||
route_app_id = urllib.parse.unquote(path[len(prefix) :].strip("/"))
|
||
break
|
||
if route_app_id:
|
||
return route_app_id
|
||
|
||
body = body or {}
|
||
body_app_id = str(body.get("app_id") or body.get("header", {}).get("app_id") or "").strip()
|
||
if body_app_id in Config.feishu_apps:
|
||
return body_app_id
|
||
return Config.default_feishu_app_id
|
||
|
||
|
||
def verify_callback_token(body: dict[str, Any], app_id: str) -> bool:
|
||
app = Config.feishu_apps.get(app_id, {})
|
||
expected = app.get("verification_token", "")
|
||
if not expected:
|
||
return True
|
||
token = body.get("token") or body.get("header", {}).get("token")
|
||
return token == expected
|
||
|
||
|
||
def remember_event(event_id: str) -> bool:
|
||
if not event_id:
|
||
return True
|
||
now = time.time()
|
||
with seen_lock:
|
||
stale = [key for key, ts in seen_events.items() if now - ts > 3600]
|
||
for key in stale:
|
||
seen_events.pop(key, None)
|
||
if event_id in seen_events:
|
||
return False
|
||
seen_events[event_id] = now
|
||
return True
|
||
|
||
|
||
def handle_feishu_event(path: str, body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
|
||
app_id = resolve_app_id(path, body)
|
||
if app_id not in Config.feishu_apps:
|
||
return 404, {"code": 404, "msg": f"unknown Feishu app_id: {app_id}"}
|
||
|
||
if "encrypt" in body:
|
||
return 400, {
|
||
"code": 400,
|
||
"msg": "encrypted callbacks are not enabled; disable Feishu event encryption or add decrypt support",
|
||
}
|
||
|
||
if body.get("type") == "url_verification":
|
||
if not verify_callback_token(body, app_id):
|
||
return 403, {"code": 403, "msg": "invalid verification token"}
|
||
return 200, {"challenge": body.get("challenge", "")}
|
||
|
||
if not verify_callback_token(body, app_id):
|
||
return 403, {"code": 403, "msg": "invalid verification token"}
|
||
|
||
event_type = body.get("header", {}).get("event_type") or body.get("event", {}).get("type")
|
||
event_id = body.get("header", {}).get("event_id", "")
|
||
if event_type != "im.message.receive_v1":
|
||
return 200, {"code": 0, "msg": "ignored"}
|
||
if not remember_event(f"{app_id}:{event_id}"):
|
||
return 200, {"code": 0, "msg": "duplicate"}
|
||
|
||
threading.Thread(target=process_message_event, args=(app_id, body), daemon=True).start()
|
||
return 200, {"code": 0, "msg": "ok"}
|
||
|
||
|
||
def process_message_event(app_id: str, body: dict[str, Any]) -> None:
|
||
try:
|
||
app = Config.feishu_apps[app_id]
|
||
event = body.get("event", {})
|
||
sender = event.get("sender", {})
|
||
if sender.get("sender_type") == "app":
|
||
return
|
||
|
||
message = event.get("message", {})
|
||
message_type = message.get("message_type")
|
||
chat_id = message.get("chat_id", "")
|
||
message_id = message.get("message_id", "")
|
||
allowed_chat_ids = app.get("allowed_chat_ids", set())
|
||
if allowed_chat_ids and chat_id not in allowed_chat_ids:
|
||
logging.info("ignored message from disallowed app_id=%s chat_id=%s", app_id, chat_id)
|
||
return
|
||
|
||
if message_type != "text":
|
||
send_feishu_text(app_id, chat_id, "我现在只支持处理文本消息。", message_id=message_id)
|
||
return
|
||
|
||
text = json_text(message.get("content"))
|
||
if not text:
|
||
send_feishu_text(app_id, chat_id, "我没有读到文本内容。", message_id=message_id)
|
||
return
|
||
|
||
answer = ask_hermes(text, event)
|
||
send_feishu_text(app_id, chat_id, answer, message_id=message_id)
|
||
except Exception:
|
||
logging.error("failed to process Feishu event:\n%s", traceback.format_exc())
|
||
|
||
|
||
def ask_hermes(text: str, event: dict[str, Any]) -> str:
|
||
payload = {
|
||
"model": Config.hermes_model,
|
||
"stream": False,
|
||
"messages": [
|
||
{"role": "system", "content": Config.hermes_system_prompt},
|
||
{
|
||
"role": "user",
|
||
"content": (
|
||
"以下消息来自飞书。\n"
|
||
f"chat_id: {event.get('message', {}).get('chat_id', '')}\n\n"
|
||
f"{text}"
|
||
),
|
||
},
|
||
],
|
||
}
|
||
headers = {}
|
||
if Config.hermes_api_key:
|
||
headers["Authorization"] = f"Bearer {Config.hermes_api_key}"
|
||
data = http_json(
|
||
"POST",
|
||
f"{Config.hermes_api_base}/chat/completions",
|
||
payload,
|
||
headers=headers,
|
||
)
|
||
try:
|
||
content = data["choices"][0]["message"]["content"]
|
||
except (KeyError, IndexError, TypeError) as exc:
|
||
raise RuntimeError(f"Unexpected Hermes response: {data}") from exc
|
||
return str(content).strip() or "Hermes 没有返回内容。"
|
||
|
||
|
||
def send_feishu_text(
|
||
app_id: str,
|
||
receive_id: str,
|
||
text: str,
|
||
receive_id_type: str = "chat_id",
|
||
message_id: str = "",
|
||
) -> dict[str, Any]:
|
||
if not receive_id:
|
||
raise RuntimeError("receive_id is required")
|
||
|
||
token = token_cache.get(app_id)
|
||
chunks = split_text(text)
|
||
result: dict[str, Any] = {}
|
||
for chunk in chunks:
|
||
content = json.dumps({"text": chunk}, ensure_ascii=False)
|
||
if Config.reply_in_thread and message_id:
|
||
url = f"{FEISHU_API_BASE}/im/v1/messages/{message_id}/reply"
|
||
payload = {"msg_type": "text", "content": content}
|
||
else:
|
||
url = f"{FEISHU_API_BASE}/im/v1/messages?receive_id_type={receive_id_type}"
|
||
payload = {"receive_id": receive_id, "msg_type": "text", "content": content}
|
||
result = http_json(
|
||
"POST",
|
||
url,
|
||
payload,
|
||
headers={"Authorization": f"Bearer {token}"},
|
||
)
|
||
if result.get("code") not in (None, 0):
|
||
raise RuntimeError(f"Feishu send error: {result}")
|
||
return result
|
||
|
||
|
||
def split_text(text: str, limit: int = 3800) -> list[str]:
|
||
text = text.strip()
|
||
if not text:
|
||
return [""]
|
||
chunks: list[str] = []
|
||
rest = text
|
||
while len(rest) > limit:
|
||
cut = rest.rfind("\n", 0, limit)
|
||
if cut < limit // 2:
|
||
cut = limit
|
||
chunks.append(rest[:cut].strip())
|
||
rest = rest[cut:].strip()
|
||
chunks.append(rest)
|
||
return chunks
|
||
|
||
|
||
def handle_notify(path: str, headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
|
||
expected = Config.notify_token
|
||
if not expected:
|
||
return 503, {"code": 503, "msg": "FEISHU_NOTIFY_TOKEN is not configured"}
|
||
|
||
auth = headers.get("authorization", "")
|
||
token = headers.get("x-hermes-feishu-token", "")
|
||
if auth.lower().startswith("bearer "):
|
||
token = auth[7:].strip()
|
||
if token != expected:
|
||
return 401, {"code": 401, "msg": "unauthorized"}
|
||
|
||
app_id = str(body.get("app_id") or resolve_app_id(path, body)).strip()
|
||
app = Config.feishu_apps.get(app_id)
|
||
if not app:
|
||
return 404, {"code": 404, "msg": f"unknown Feishu app_id: {app_id}"}
|
||
|
||
text = str(body.get("text", "")).strip()
|
||
if not text:
|
||
return 400, {"code": 400, "msg": "text is required"}
|
||
receive_id = str(body.get("receive_id") or app.get("default_receive_id", "")).strip()
|
||
receive_id_type = str(
|
||
body.get("receive_id_type") or app.get("default_receive_id_type", "chat_id")
|
||
).strip()
|
||
if not receive_id:
|
||
return 400, {"code": 400, "msg": "receive_id is required"}
|
||
|
||
result = send_feishu_text(app_id, receive_id, text, receive_id_type=receive_id_type)
|
||
return 200, {"code": 0, "msg": "ok", "app_id": app_id, "feishu": result}
|
||
|
||
|
||
class Handler(BaseHTTPRequestHandler):
|
||
server_version = "HermesFeishuBridge/1.0"
|
||
|
||
def do_GET(self) -> None:
|
||
if self.path == "/health":
|
||
self.send_json(200, {"ok": True, "service": "feishu-bridge"})
|
||
return
|
||
self.send_json(404, {"code": 404, "msg": "not found"})
|
||
|
||
def do_POST(self) -> None:
|
||
try:
|
||
body = self.read_json()
|
||
headers = {key.lower(): value for key, value in self.headers.items()}
|
||
if self.path == "/feishu/events" or self.path.startswith("/feishu/events/"):
|
||
status, payload = handle_feishu_event(self.path, body)
|
||
elif self.path == "/feishu/notify" or self.path.startswith("/feishu/notify/"):
|
||
status, payload = handle_notify(self.path, headers, body)
|
||
else:
|
||
status, payload = 404, {"code": 404, "msg": "not found"}
|
||
except Exception as exc:
|
||
logging.error("request failed:\n%s", traceback.format_exc())
|
||
status, payload = 500, {"code": 500, "msg": str(exc)}
|
||
self.send_json(status, payload)
|
||
|
||
def read_json(self) -> dict[str, Any]:
|
||
length = int(self.headers.get("Content-Length", "0"))
|
||
raw = self.rfile.read(length).decode("utf-8") if length else "{}"
|
||
if not raw:
|
||
return {}
|
||
data = json.loads(raw)
|
||
if not isinstance(data, dict):
|
||
raise ValueError("JSON object is required")
|
||
return data
|
||
|
||
def send_json(self, status: int, payload: dict[str, Any]) -> None:
|
||
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||
self.send_response(status)
|
||
self.send_header("Content-Type", "application/json; charset=utf-8")
|
||
self.send_header("Content-Length", str(len(data)))
|
||
self.end_headers()
|
||
self.wfile.write(data)
|
||
|
||
def log_message(self, fmt: str, *args: Any) -> None:
|
||
logging.info("%s - %s", self.address_string(), fmt % args)
|
||
|
||
|
||
def main() -> None:
|
||
logging.basicConfig(
|
||
level=os.environ.get("LOG_LEVEL", "INFO"),
|
||
format="%(asctime)s %(levelname)s %(message)s",
|
||
)
|
||
missing = []
|
||
if not Config.feishu_apps:
|
||
missing.append("FEISHU_APP_ID and FEISHU_APP_SECRET")
|
||
for app_id, app in Config.feishu_apps.items():
|
||
if not app.get("app_secret"):
|
||
missing.append(f"FEISHU_APP_SECRET for {app_id}")
|
||
if missing:
|
||
logging.warning("missing required env: %s", ", ".join(missing))
|
||
logging.info("configured Feishu apps: %s", ", ".join(Config.feishu_apps) or "(none)")
|
||
httpd = ThreadingHTTPServer(("0.0.0.0", Config.port), Handler)
|
||
logging.info("Feishu bridge listening on :%s", Config.port)
|
||
httpd.serve_forever()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|