auto-save 2026-05-09 18:20 (~6)

This commit is contained in:
2026-05-09 18:20:08 +08:00
parent a64489f40e
commit ee51790383
6 changed files with 271 additions and 23 deletions

View File

@@ -12,6 +12,7 @@ import json
import logging
import os
import hashlib
import re
import threading
import time
import traceback
@@ -23,6 +24,7 @@ from typing import Any
FEISHU_API_BASE = "https://open.feishu.cn/open-apis"
APP_ID_RE = re.compile(r"^cli_[A-Za-z0-9]+$")
def _env(name: str, default: str = "") -> str:
@@ -41,9 +43,13 @@ def _csv_list(value: str) -> list[str]:
return [item.strip() for item in value.split(",") if item.strip()]
def app_suffixes() -> list[str]:
return ["", *[f"_{idx}" for idx in range(2, 21)]]
def _load_feishu_apps() -> dict[str, dict[str, Any]]:
apps: dict[str, dict[str, Any]] = {}
for suffix in ["", *[f"_{idx}" for idx in range(2, 10)]]:
for suffix in app_suffixes():
app_id = _env(f"FEISHU_APP_ID{suffix}")
app_secret = _env(f"FEISHU_APP_SECRET{suffix}")
if not app_id and not app_secret:
@@ -63,6 +69,8 @@ def _load_feishu_apps() -> dict[str, dict[str, Any]]:
class Config:
env_file = _env("FEISHU_BRIDGE_ENV_FILE", "/etc/hermes-feishu-bridge.env")
tokens_file = _env("FEISHU_BRIDGE_TOKENS_FILE", "/root/hermes-feishu-bridge.tokens")
feishu_app_id = _env("FEISHU_APP_ID")
feishu_app_secret = _env("FEISHU_APP_SECRET")
feishu_apps = _load_feishu_apps()
@@ -115,12 +123,86 @@ class TokenCache:
self._tokens[app_id] = (token, expires_at)
return token
def drop(self, app_id: str) -> None:
with self._lock:
self._tokens.pop(app_id, None)
token_cache = TokenCache()
seen_events: dict[str, float] = {}
seen_lock = threading.Lock()
def parse_env_file(path: str) -> dict[str, str]:
values: dict[str, str] = {}
if not os.path.exists(path):
return values
with open(path, "r", encoding="utf-8") as fh:
for raw in fh:
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
values[key.strip()] = value.strip()
return values
def write_env_updates(path: str, updates: dict[str, str]) -> None:
lines: list[str] = []
seen: set[str] = set()
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as fh:
for raw in fh.read().splitlines():
if "=" in raw and not raw.lstrip().startswith("#"):
key = raw.split("=", 1)[0]
if key in updates:
lines.append(f"{key}={updates[key]}")
seen.add(key)
continue
lines.append(raw)
for key, value in updates.items():
if key not in seen:
lines.append(f"{key}={value}")
tmp = f"{path}.tmp-{os.getpid()}"
with open(tmp, "w", encoding="utf-8") as fh:
fh.write("\n".join(lines) + "\n")
os.chmod(tmp, 0o600)
os.replace(tmp, path)
def reload_config_from_env_file() -> None:
values = parse_env_file(Config.env_file)
for key, value in values.items():
os.environ[key] = value
Config.feishu_app_id = _env("FEISHU_APP_ID")
Config.feishu_app_secret = _env("FEISHU_APP_SECRET")
Config.feishu_apps = _load_feishu_apps()
Config.default_feishu_app_id = _env("FEISHU_DEFAULT_APP_ID", Config.feishu_app_id)
def sync_tokens_file() -> None:
env = parse_env_file(Config.env_file)
lines: list[str] = []
notify_token = env.get("FEISHU_NOTIFY_TOKEN")
for suffix in app_suffixes():
app_id = env.get(f"FEISHU_APP_ID{suffix}", "")
if not app_id:
continue
label = suffix or ""
lines.append(f"FEISHU_CALLBACK_URL{label}=https://hermes.kang-kang.com/feishu/events/{app_id}")
lines.append(f"FEISHU_APP_ID{label}={app_id}")
token = env.get(f"FEISHU_VERIFICATION_TOKEN{suffix}", "")
if token:
lines.append(f"FEISHU_VERIFICATION_TOKEN{label}={token}")
if notify_token:
lines.append(f"FEISHU_NOTIFY_TOKEN={notify_token}")
tmp = f"{Config.tokens_file}.tmp-{os.getpid()}"
with open(tmp, "w", encoding="utf-8") as fh:
fh.write("\n".join(lines) + "\n")
os.chmod(tmp, 0o600)
os.replace(tmp, Config.tokens_file)
def http_json(
method: str,
url: str,
@@ -385,6 +467,86 @@ def handle_apps() -> tuple[int, dict[str, Any]]:
}
def is_admin_request(headers: dict[str, str]) -> tuple[bool, int, str]:
cookie = headers.get("cookie", "")
if "hermes_auth=ok" not in cookie:
return False, 401, "login required"
origin = headers.get("origin", "")
referer = headers.get("referer", "")
allowed = "https://hermes.kang-kang.com"
if origin and origin != allowed:
return False, 403, "invalid origin"
if not origin and referer and not referer.startswith(allowed + "/"):
return False, 403, "invalid referer"
return True, 200, "ok"
def validate_feishu_credentials(app_id: str, app_secret: str) -> None:
data = http_json(
"POST",
f"{FEISHU_API_BASE}/auth/v3/tenant_access_token/internal",
{"app_id": app_id, "app_secret": app_secret},
timeout=15,
)
if data.get("code") != 0:
raise ValueError(f"Feishu credential validation failed: {data.get('msg') or data}")
def find_app_suffix(env: dict[str, str], app_id: str) -> str:
for suffix in app_suffixes():
if env.get(f"FEISHU_APP_ID{suffix}") == app_id:
return suffix
for suffix in app_suffixes():
if not env.get(f"FEISHU_APP_ID{suffix}") and not env.get(f"FEISHU_APP_SECRET{suffix}"):
return suffix
raise RuntimeError("no free Feishu app slot; supported slots are FEISHU_APP_ID through FEISHU_APP_ID_20")
def upsert_feishu_app(body: dict[str, Any]) -> dict[str, Any]:
app_id = str(body.get("app_id", "")).strip()
app_secret = str(body.get("app_secret", "")).strip()
verification_token = str(body.get("verification_token", "")).strip()
if not APP_ID_RE.match(app_id):
raise ValueError("App ID must look like cli_xxx")
if len(app_secret) < 16:
raise ValueError("App Secret is too short")
if len(verification_token) < 16:
raise ValueError("Verification Token is too short")
validate_feishu_credentials(app_id, app_secret)
env = parse_env_file(Config.env_file)
suffix = find_app_suffix(env, app_id)
updates = {
f"FEISHU_APP_ID{suffix}": app_id,
f"FEISHU_APP_SECRET{suffix}": app_secret,
f"FEISHU_VERIFICATION_TOKEN{suffix}": verification_token,
}
if not env.get(f"FEISHU_DEFAULT_RECEIVE_ID_TYPE{suffix}"):
updates[f"FEISHU_DEFAULT_RECEIVE_ID_TYPE{suffix}"] = "chat_id"
write_env_updates(Config.env_file, updates)
sync_tokens_file()
reload_config_from_env_file()
token_cache.drop(app_id)
return {
"app_id": app_id,
"callback_url": f"https://hermes.kang-kang.com/feishu/events/{app_id}",
"slot": suffix or "_1",
}
def handle_apps_admin(headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
ok, status, message = is_admin_request(headers)
if not ok:
return status, {"code": status, "msg": message}
try:
app = upsert_feishu_app(body)
except ValueError as exc:
return 400, {"code": 400, "msg": str(exc)}
logging.info("upserted Feishu app from Hermes settings app_id=%s", app["app_id"])
return 200, {"code": 0, "msg": "ok", "app": app}
def handle_notify(path: str, headers: dict[str, str], body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
expected = Config.notify_token
if not expected:
@@ -435,6 +597,8 @@ class Handler(BaseHTTPRequestHandler):
headers = {key.lower(): value for key, value in self.headers.items()}
if self.path == "/feishu/events" or self.path.startswith("/feishu/events/"):
status, payload = handle_feishu_event(self.path, body)
elif self.path == "/feishu/apps":
status, payload = handle_apps_admin(headers, body)
elif self.path == "/feishu/notify" or self.path.startswith("/feishu/notify/"):
status, payload = handle_notify(self.path, headers, body)
else: