feat: add Gemini image fallback circuit breaker

This commit is contained in:
2026-05-19 23:56:20 +08:00
parent 516d99ba8c
commit 3756259850
7 changed files with 212 additions and 54 deletions

View File

@@ -97,14 +97,24 @@ AI_HTTP_PROXY = (
or os.getenv("http_proxy")
or ""
).strip()
# Product decision: every image-generation/editing path is locked to gpt-image-2.
# Environment variables may still choose the gateway URL/key, but not the model.
# Product decision: gpt-image-2 remains the primary image model. Gemini is only
# allowed as an outage fallback when the primary gateway times out or returns
# transient upstream failures.
GPT_IMAGE_MODEL = "gpt-image-2"
IMAGE_FALLBACK_MODEL = os.getenv("IMAGE_FALLBACK_MODEL", "gemini-3-pro-image-preview").strip() or ""
IMAGE_FALLBACK_ENABLED = os.getenv("IMAGE_FALLBACK_ENABLED", "true").strip().lower() not in {"0", "false", "no", "off"}
IMAGE_MODEL = GPT_IMAGE_MODEL
PRODUCT_VIEW_MODEL = GPT_IMAGE_MODEL
SUBJECT_ASSET_IMAGE_MODEL = GPT_IMAGE_MODEL
SUBJECT_ASSET_IMAGE_MODELS = [GPT_IMAGE_MODEL]
SUBJECT_ASSET_IMAGE_MODELS = [GPT_IMAGE_MODEL] + (
[IMAGE_FALLBACK_MODEL] if IMAGE_FALLBACK_ENABLED and IMAGE_FALLBACK_MODEL and IMAGE_FALLBACK_MODEL != GPT_IMAGE_MODEL else []
)
IMAGE_REQUEST_TIMEOUT_SECONDS = max(15, min(180, int(os.getenv("IMAGE_REQUEST_TIMEOUT_SECONDS", "60"))))
IMAGE_CIRCUIT_FAILURE_THRESHOLD = max(1, int(os.getenv("IMAGE_CIRCUIT_FAILURE_THRESHOLD", "2")))
IMAGE_CIRCUIT_COOLDOWN_SECONDS = max(60, int(os.getenv("IMAGE_CIRCUIT_COOLDOWN_SECONDS", "600")))
_IMAGE_CIRCUIT_LOCK = threading.Lock()
_IMAGE_PRIMARY_FAILURES = 0
_IMAGE_PRIMARY_OPEN_UNTIL = 0.0
PRODUCT_ASSET_MAX_SIDE = max(1024, int(os.getenv("PRODUCT_ASSET_MAX_SIDE", "1600")))
PRODUCT_ASSET_MIN_LONG_SIDE = max(512, int(os.getenv("PRODUCT_ASSET_MIN_LONG_SIDE", "900")))
PRODUCT_ASSET_MIN_SHORT_SIDE = max(320, int(os.getenv("PRODUCT_ASSET_MIN_SHORT_SIDE", "600")))
@@ -3511,6 +3521,83 @@ def _image_is_transport_error(message: str) -> bool:
)
def _image_fallback_models() -> list[str]:
if not IMAGE_FALLBACK_ENABLED or not IMAGE_FALLBACK_MODEL or IMAGE_FALLBACK_MODEL == GPT_IMAGE_MODEL:
return []
return [IMAGE_FALLBACK_MODEL]
def _image_circuit_snapshot() -> dict:
now = time.time()
with _IMAGE_CIRCUIT_LOCK:
open_until = _IMAGE_PRIMARY_OPEN_UNTIL
return {
"primary": GPT_IMAGE_MODEL,
"fallbacks": _image_fallback_models(),
"failure_threshold": IMAGE_CIRCUIT_FAILURE_THRESHOLD,
"cooldown_seconds": IMAGE_CIRCUIT_COOLDOWN_SECONDS,
"primary_failures": _IMAGE_PRIMARY_FAILURES,
"primary_open": open_until > now,
"primary_open_until": open_until if open_until > now else 0,
"primary_open_remaining_seconds": max(0, int(open_until - now)),
}
def _image_primary_circuit_open() -> bool:
return _image_circuit_snapshot()["primary_open"]
def _image_model_candidates(force_fallback: bool = False) -> list[str]:
fallbacks = _image_fallback_models()
if not fallbacks:
return [GPT_IMAGE_MODEL]
if force_fallback or _image_primary_circuit_open():
return fallbacks
return [GPT_IMAGE_MODEL, *fallbacks]
def _image_failure_can_fallback(status_code: int, body: str, last_err: str) -> bool:
if status_code in (400, 401, 403, 404):
return False
return (
status_code == 429
or status_code >= 500
or _image_is_capacity_error(status_code, body)
or _image_is_transport_error(last_err)
or "timeout" in (body or "").lower()
)
def _image_record_primary_success() -> None:
global _IMAGE_PRIMARY_FAILURES, _IMAGE_PRIMARY_OPEN_UNTIL
with _IMAGE_CIRCUIT_LOCK:
if _IMAGE_PRIMARY_FAILURES or _IMAGE_PRIMARY_OPEN_UNTIL:
print(f"[image circuit] primary {GPT_IMAGE_MODEL} recovered", flush=True)
_IMAGE_PRIMARY_FAILURES = 0
_IMAGE_PRIMARY_OPEN_UNTIL = 0.0
def _image_record_primary_failure(reason: str) -> None:
global _IMAGE_PRIMARY_FAILURES, _IMAGE_PRIMARY_OPEN_UNTIL
if not _image_fallback_models():
return
with _IMAGE_CIRCUIT_LOCK:
_IMAGE_PRIMARY_FAILURES += 1
if _IMAGE_PRIMARY_FAILURES >= IMAGE_CIRCUIT_FAILURE_THRESHOLD:
_IMAGE_PRIMARY_OPEN_UNTIL = time.time() + IMAGE_CIRCUIT_COOLDOWN_SECONDS
print(
f"[image circuit] primary {GPT_IMAGE_MODEL} opened for {IMAGE_CIRCUIT_COOLDOWN_SECONDS}s "
f"after {_IMAGE_PRIMARY_FAILURES} failures; fallback={IMAGE_FALLBACK_MODEL}; reason={reason[:220]}",
flush=True,
)
else:
print(
f"[image circuit] primary {GPT_IMAGE_MODEL} failure {_IMAGE_PRIMARY_FAILURES}/{IMAGE_CIRCUIT_FAILURE_THRESHOLD}; "
f"fallback={IMAGE_FALLBACK_MODEL}; reason={reason[:220]}",
flush=True,
)
def _image_failure_message(kind: str, attempts: int, last_err: str, capacity_seen: bool) -> str:
if capacity_seen:
return (
@@ -3604,36 +3691,37 @@ def _image_edit_call(
fallback_text: bool = False,
max_attempts: int = 3,
max_side: int = 1024,
force_fallback_model: bool = False,
) -> tuple[bytes, str]:
"""通用 image edit 调用 · 失败重试 + 可选 text fallback。
返回 (image_bytes, effective_mode) where effective_mode in {"edit","text"}。
失败 raise RuntimeError。
输入图自动 resize 到 max_side默认 1024边长后再用 multipart 上传;多参考图使用 image[]。
生图模型按产品规则强制使用 gpt-image-2model/models 参数只保留兼容旧调用。"""
生图模型主路径使用 gpt-image-2Gemini 只在主模型上游异常时兜底。model/models 参数只保留兼容旧调用。"""
import base64 as b64lib
import time as _time
import httpx
if not IMAGE_API_KEY:
raise RuntimeError("IMAGE_API_KEY 或 LLM_API_KEY 未配置")
models_cycle = [GPT_IMAGE_MODEL]
model = GPT_IMAGE_MODEL
image_paths = image_path if isinstance(image_path, list) else [image_path]
image_paths = [path for path in image_paths if path and path.exists()][:10]
if not image_paths:
raise RuntimeError("image edit reference image missing")
img_bytes_list = [_prepare_image_edit_bytes(path, max_side) for path in image_paths]
plan: list[str] = ["edit"] * max_attempts
model_candidates = _image_model_candidates(force_fallback=force_fallback_model)
mode_plan: list[str] = ["edit"] if model_candidates != [GPT_IMAGE_MODEL] else ["edit"] * max_attempts
if fallback_text:
plan.append("text")
mode_plan.append("text")
attempt_steps = [(current_mode, current_model) for current_mode in mode_plan for current_model in model_candidates]
last_err = ""
resp_data: dict = {}
effective_mode = "edit"
capacity_seen = False
attempts_done = 0
for attempt, current_mode in enumerate(plan):
for attempt, (current_mode, current_model) in enumerate(attempt_steps):
attempts_done = attempt + 1
current_model = models_cycle[min(attempt, len(models_cycle) - 1)]
status_code = 0
body = ""
retry_after: str | None = None
@@ -3660,8 +3748,10 @@ def _image_edit_call(
else:
resp_data = _image_generation_response(prompt, current_model)
if resp_data.get("data"):
effective_mode = current_mode
effective_mode = f"{current_mode}:{current_model}"
model = current_model # 记录实际成功的 model
if current_model == GPT_IMAGE_MODEL:
_image_record_primary_success()
break
err_obj = resp_data.get("error") or {}
last_err = f"empty data · {err_obj.get('code', '')} · {str(err_obj.get('message', ''))[:200]} · model={current_model}"
@@ -3677,9 +3767,15 @@ def _image_edit_call(
except Exception as e:
last_err = f"{type(e).__name__}: {e} · model={current_model}"
next_mode_changed = attempt < len(plan) - 1 and plan[attempt + 1] != current_mode
if _image_should_retry(attempt, len(plan), status_code, body, last_err, next_mode_changed):
tag = f"retry {attempt + 1}/{len(plan)}{GPT_IMAGE_MODEL}"
fallbackable = current_model == GPT_IMAGE_MODEL and _image_failure_can_fallback(status_code, body, last_err)
if fallbackable:
_image_record_primary_failure(last_err)
if any(next_model != GPT_IMAGE_MODEL for _next_mode, next_model in attempt_steps[attempt + 1:]):
print(f"[image edit fallback → {IMAGE_FALLBACK_MODEL}] {last_err}", flush=True)
continue
next_mode_changed = attempt < len(attempt_steps) - 1 and attempt_steps[attempt + 1][0] != current_mode
if _image_should_retry(attempt, len(attempt_steps), status_code, body, last_err, next_mode_changed):
tag = f"retry {attempt + 1}/{len(attempt_steps)}{current_model}"
delay = _image_retry_delay(attempt, status_code, body, retry_after)
print(f"[image edit {tag}, sleep {delay:.0f}s] {last_err}", flush=True)
_time.sleep(delay)
@@ -3706,20 +3802,21 @@ def _image_text_call(
model: str | None = None,
models: list[str] | None = None,
max_attempts: int = 3,
force_fallback_model: bool = False,
) -> tuple[bytes, str]:
"""Text-only image generation. 生图模型强制使用 gpt-image-2"""
"""Text-only image generation. gpt-image-2 primary, Gemini only as outage fallback."""
import base64 as b64lib
import time as _time
import httpx
if not IMAGE_API_KEY:
raise RuntimeError("IMAGE_API_KEY 或 LLM_API_KEY 未配置")
models_cycle = [GPT_IMAGE_MODEL]
candidates = _image_model_candidates(force_fallback=force_fallback_model)
attempt_models = candidates if candidates != [GPT_IMAGE_MODEL] else [GPT_IMAGE_MODEL] * max_attempts
last_err = ""
capacity_seen = False
attempts_done = 0
for attempt in range(max_attempts):
for attempt, current_model in enumerate(attempt_models):
attempts_done = attempt + 1
current_model = models_cycle[min(attempt, len(models_cycle) - 1)]
status_code = 0
body = ""
retry_after: str | None = None
@@ -3729,12 +3826,16 @@ def _image_text_call(
item = resp_data["data"][0]
b64 = item.get("b64_json")
if b64:
return b64lib.b64decode(b64), "text"
if current_model == GPT_IMAGE_MODEL:
_image_record_primary_success()
return b64lib.b64decode(b64), f"text:{current_model}"
if item.get("url"):
with ai_http_client(timeout=IMAGE_REQUEST_TIMEOUT_SECONDS) as client:
image_resp = client.get(item["url"])
image_resp.raise_for_status()
return image_resp.content, "text"
if current_model == GPT_IMAGE_MODEL:
_image_record_primary_success()
return image_resp.content, f"text:{current_model}"
err_obj = resp_data.get("error") or {}
last_err = f"empty data · {err_obj.get('code', '')} · {str(err_obj.get('message', ''))[:200]} · model={current_model}"
except httpx.HTTPStatusError as e:
@@ -3748,9 +3849,15 @@ def _image_text_call(
body = str(e)
status_code = 429 if "429" in body or "saturated" in body.lower() or "饱和" in body else 0
capacity_seen = capacity_seen or _image_is_capacity_error(status_code, body)
if _image_should_retry(attempt, max_attempts, status_code, body, last_err):
fallbackable = current_model == GPT_IMAGE_MODEL and _image_failure_can_fallback(status_code, body, last_err)
if fallbackable:
_image_record_primary_failure(last_err)
if any(next_model != GPT_IMAGE_MODEL for next_model in attempt_models[attempt + 1:]):
print(f"[image text fallback → {IMAGE_FALLBACK_MODEL}] {last_err}", flush=True)
continue
if _image_should_retry(attempt, len(attempt_models), status_code, body, last_err):
delay = _image_retry_delay(attempt, status_code, body, retry_after)
print(f"[image text retry {attempt + 1}/{max_attempts}{GPT_IMAGE_MODEL}, sleep {delay:.0f}s] {last_err}", flush=True)
print(f"[image text retry {attempt + 1}/{len(attempt_models)}{current_model}, sleep {delay:.0f}s] {last_err}", flush=True)
_time.sleep(delay)
else:
break
@@ -4116,7 +4223,8 @@ def health() -> dict:
"image_base_url": IMAGE_BASE_URL or LLM_BASE_URL or "openai-default",
"image_request_timeout_seconds": IMAGE_REQUEST_TIMEOUT_SECONDS,
"ai_proxy_configured": bool(AI_HTTP_PROXY),
"image_fallbacks": [GPT_IMAGE_MODEL],
"image_fallbacks": _image_fallback_models(),
"image_circuit": _image_circuit_snapshot(),
"subject_image": SUBJECT_ASSET_IMAGE_MODEL,
"subject_image_fallbacks": SUBJECT_ASSET_IMAGE_MODELS,
"voice_provider": VOICE_PROVIDER,
@@ -4447,16 +4555,18 @@ def generate_image(job_id: str, idx: int, req: GenerateReq) -> Job:
if req.mode == "edit":
img_bytes_in = reference_path.read_bytes()
# 尝试 i2i 最多 3 次,全失败时降级 text-only 再试 1 次
plan: list[str] = ([req.mode] * 3) if req.mode == "edit" else [req.mode]
# 尝试 i2i;主模型上游异常时允许 Gemini 兜底。无兜底时保留旧的多次重试。
model_candidates = _image_model_candidates()
plan: list[str] = ([req.mode] if model_candidates != [GPT_IMAGE_MODEL] else [req.mode] * 3) if req.mode == "edit" else [req.mode]
if req.mode == "edit":
plan.append("text") # i2i 都失败时自动降级
attempt_steps = [(current_mode, current_model) for current_mode in plan for current_model in model_candidates]
resp_data: dict = {}
last_err = ""
effective_mode = req.mode
capacity_seen = False
attempts_done = 0
for attempt, current_mode in enumerate(plan):
for attempt, (current_mode, current_model) in enumerate(attempt_steps):
attempts_done = attempt + 1
status_code = 0
body = ""
@@ -4471,20 +4581,23 @@ def generate_image(job_id: str, idx: int, req: GenerateReq) -> Job:
headers={
"Authorization": f"Bearer {IMAGE_API_KEY}",
},
data={"model": model, "prompt": full_prompt, "n": "1"},
data={"model": current_model, "prompt": full_prompt, "n": "1"},
files={"image": ("reference.jpg", img_bytes_in, "image/jpeg")},
)
r.raise_for_status()
resp_data = r.json()
else:
# text-only
resp_data = _image_generation_response(full_prompt, model)
resp_data = _image_generation_response(full_prompt, current_model)
if resp_data.get("data"):
effective_mode = current_mode
effective_mode = f"{current_mode}:{current_model}"
model = current_model
if current_model == GPT_IMAGE_MODEL:
_image_record_primary_success()
break
err_obj = resp_data.get("error") or {}
last_err = f"empty data · {err_obj.get('code', '')} · {str(err_obj.get('message', ''))[:200]}"
last_err = f"empty data · {err_obj.get('code', '')} · {str(err_obj.get('message', ''))[:200]} · model={current_model}"
except httpx.HTTPStatusError as e:
body = e.response.text
status_code = e.response.status_code
@@ -4498,16 +4611,22 @@ def generate_image(job_id: str, idx: int, req: GenerateReq) -> Job:
or "timeout" in body.lower()
or _image_is_capacity_error(status_code, body)
)
last_err = f"HTTP {status_code}: {body[:200]}"
last_err = f"HTTP {status_code}: {body[:200]} · model={current_model}"
if not transient:
raise HTTPException(500, f"image gen HTTP {status_code}: {body[:300]}")
except Exception as e:
last_err = f"{type(e).__name__}: {e}"
last_err = f"{type(e).__name__}: {e} · model={current_model}"
next_mode_changed = attempt < len(plan) - 1 and plan[attempt + 1] != current_mode
if _image_should_retry(attempt, len(plan), status_code, body, last_err, next_mode_changed):
next_mode = plan[attempt + 1]
tag = f"fallback → {next_mode}" if next_mode != current_mode else f"retry {attempt + 1}/{len(plan)}"
fallbackable = current_model == GPT_IMAGE_MODEL and _image_failure_can_fallback(status_code, body, last_err)
if fallbackable:
_image_record_primary_failure(last_err)
if any(next_model != GPT_IMAGE_MODEL for _next_mode, next_model in attempt_steps[attempt + 1:]):
print(f"[image gen fallback → {IMAGE_FALLBACK_MODEL}] {last_err}", flush=True)
continue
next_mode_changed = attempt < len(attempt_steps) - 1 and attempt_steps[attempt + 1][0] != current_mode
if _image_should_retry(attempt, len(attempt_steps), status_code, body, last_err, next_mode_changed):
next_mode = attempt_steps[attempt + 1][0]
tag = f"fallback → {next_mode}" if next_mode != current_mode else f"retry {attempt + 1}/{len(attempt_steps)}"
print(f"[image gen {tag}] {last_err}", flush=True)
_time.sleep(_image_retry_delay(attempt, status_code, body, retry_after))
else:
@@ -5677,10 +5796,11 @@ def _generate_subject_assets_sync(job_id: str, idx: int, element_id: str, req: G
"Avoid bulky collars, scarves, hair, hoods, props, or poses that hide the neck/shoulder placement area. "
"For back and close-up views, prioritize the cervical spine, shoulder blades, upper trapezius, and clean wearable-device contact area. "
)
models = [GPT_IMAGE_MODEL]
models = SUBJECT_ASSET_IMAGE_MODELS
generated: list[SubjectAsset] = []
generation_errors: list[str] = []
first_generation_error: RuntimeError | None = None
pack_force_fallback_model = _image_primary_circuit_open()
try:
for view, view_label in _subject_view_labels(req.subject_kind, req.views):
closeup_view = view in {"bust", "back_detail", "bust_front", "bust_left_45", "bust_right_45", "back_neck_detail"} or "detail" in view
@@ -5741,14 +5861,18 @@ def _generate_subject_assets_sync(job_id: str, idx: int, element_id: str, req: G
try:
if similar_mode:
print(
f"[subject assets] reconstruction_mode=similar endpoint=/images/generations view={view} image_refs=0 model={GPT_IMAGE_MODEL}",
f"[subject assets] reconstruction_mode=similar endpoint=/images/generations view={view} image_refs=0 model={'fallback' if pack_force_fallback_model else GPT_IMAGE_MODEL}",
flush=True,
)
img_bytes, _mode = _image_text_call(prompt, models=models, max_attempts=3)
img_bytes, _mode = _image_text_call(prompt, models=models, max_attempts=3, force_fallback_model=pack_force_fallback_model)
if _mode.endswith(f":{IMAGE_FALLBACK_MODEL}"):
pack_force_fallback_model = True
else:
if model_src is None:
raise RuntimeError("subject asset edit reference image missing")
img_bytes, _mode = _image_edit_call(model_src, prompt, models=models, fallback_text=False, max_attempts=3, max_side=1280)
img_bytes, _mode = _image_edit_call(model_src, prompt, models=models, fallback_text=False, max_attempts=3, max_side=1280, force_fallback_model=pack_force_fallback_model)
if _mode.endswith(f":{IMAGE_FALLBACK_MODEL}"):
pack_force_fallback_model = True
except RuntimeError as e:
if first_generation_error is None:
first_generation_error = e