Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 0 additions & 139 deletions plugins/pipes/gemini_manifold.py
Original file line number Diff line number Diff line change
Expand Up @@ -3104,145 +3104,6 @@ def _check_free_tier_eligibility(

return True

async def _get_resilient_response_stream(
    self,
    valves: "Pipe.Valves",
    __user__: "UserData",
    gen_content_args: dict,
    is_streaming: bool,
    __metadata__: "Metadata",
    event_emitter: EventEmitter,
    model_config: dict,
) -> AsyncIterator[types.GenerateContentResponse] | None:
    """
    Return a response stream, falling back from the Free to the Paid tier.

    1. Builds an ordered list of tiers to try ("vertex", "free", "paid" or
       "standard") from the configured valves and the free-tier
       eligibility check.
    2. Tries each tier in order. The first chunk is fetched eagerly so
       that early errors surface here instead of in the consumer.
    3. If the Free tier fails before the first chunk with a
       Quota (429) / Permission (403) / Overload (503) error and another
       tier remains, switches to the next tier.
    4. Updates __metadata__['is_paid_api'] for the "free" and "paid" tiers
       so cost calculation is accurate.

    Args:
        valves: Effective valves; a shallow copy is modified per attempt,
            the passed object itself is not mutated.
        __user__: User data; only ``__user__["email"]`` is read here.
        gen_content_args: Keyword arguments forwarded unchanged to the
            client's ``generate_content`` / ``generate_content_stream``.
        is_streaming: Selects the streaming or the one-shot API call.
        __metadata__: Request metadata; ``canonical_model_id`` and
            ``features`` are read, ``is_paid_api`` is written.
        event_emitter: Used to emit a status message when falling back.
        model_config: Passed to the free-tier eligibility check.

    Returns:
        An async iterator over the response chunks (a single chunk when
        ``is_streaming`` is false), or ``None`` if the underlying stream
        ended without producing any chunk.

    Raises:
        Exception: Whatever client creation or the API call raised, when
            no fallback applies.
        ValueError: If the loop finishes without returning or raising.
    """

    model_id = __metadata__.get("canonical_model_id")

    # --- 1. Determine Routing Strategy ---
    has_free_key = bool(valves.GEMINI_FREE_API_KEY)
    has_paid_key = bool(valves.GEMINI_PAID_API_KEY)
    use_vertex = valves.USE_VERTEX_AI and valves.VERTEX_PROJECT

    execution_order = []
    if use_vertex:
        execution_order = ["vertex"]
    elif has_free_key:
        is_eligible = self._check_free_tier_eligibility(
            model_id, model_config, __metadata__.get("features", {})  # type: ignore
        )

        if is_eligible:
            execution_order = ["free"]
            if has_paid_key:
                execution_order.append("paid")
        else:
            # Not eligible for Free: prefer Paid, but still try Free when
            # it is the only key available.
            execution_order = ["paid"] if has_paid_key else ["free"]
    elif has_paid_key:
        execution_order = ["paid"]
    else:
        execution_order = ["standard"]

    log.debug(f"Routing strategy for {model_id}: {execution_order}")

    # --- 2. Execution Loop ---
    for attempt_idx, tier in enumerate(execution_order):
        is_last_attempt = attempt_idx == len(execution_order) - 1
        # Work on a copy so blanking a key below never leaks to the caller.
        current_valves = copy.copy(valves)

        if tier == "free":
            current_valves.GEMINI_PAID_API_KEY = None
            log.info(f"Attempting execution on FREE Tier ({model_id})...")
            __metadata__["is_paid_api"] = False
        elif tier == "paid":
            current_valves.GEMINI_FREE_API_KEY = None
            log.info(f"Attempting execution on PAID Tier ({model_id})...")
            __metadata__["is_paid_api"] = True

        try:
            client = self._get_user_client(current_valves, __user__["email"])
        except Exception as e:
            if not is_last_attempt:
                log.warning(f"Client creation failed for {tier}: {e}. Retrying...")
                continue
            raise

        try:
            if is_streaming:
                stream = await client.aio.models.generate_content_stream(**gen_content_args)  # type: ignore
            else:
                response = await client.aio.models.generate_content(
                    **gen_content_args
                )

                # Wrap the single response so both modes share one code path.
                async def one_shot_iter():
                    yield response

                stream = one_shot_iter()

            # --- 3. The "Peek" Logic ---
            # We fetch the first chunk to catch early errors (429, 403, 503)
            iterator = stream.__aiter__()
            try:
                first_chunk = await iterator.__anext__()
            except StopAsyncIteration:
                # Empty stream: nothing to yield, signalled to callers as None.
                return

            # Success! Reconstruct the stream and return
            async def reconstructed_stream():
                yield first_chunk
                async for chunk in iterator:
                    yield chunk

            return reconstructed_stream()

        except Exception as e:
            error_str = str(e).upper()

            # Check for Quota (429), Permissions (403), or Overloaded/Unavailable (503)
            is_fallback_eligible_error = (
                "429" in error_str
                or "403" in error_str
                or "503" in error_str
                or "UNAVAILABLE" in error_str
                or (
                    isinstance(e, genai_errors.ClientError) and e.code in [429, 403]
                )
                or (isinstance(e, genai_errors.ServerError) and e.code in [503])
            )

            should_retry = (
                not is_last_attempt
                and tier == "free"
                and is_fallback_eligible_error
            )

            if not should_retry:
                log.error(f"Error during request execution (Tier: {tier}): {e}")
                raise

            # Name the actual cause; a 403 is a permissions problem, not an
            # overloaded model.
            if "429" in error_str:
                reason = "quota exceeded"
            elif "403" in error_str:
                reason = "permission denied"
            else:
                reason = "model overloaded"
            log.warning(
                f"Free Tier {reason} (Error: {e}). Switching to Paid API..."
            )

            # Await the notification instead of spawning an unreferenced
            # task: the event loop only keeps weak references to tasks, so
            # a discarded task may be garbage-collected before it runs.
            # The status is best-effort and must not block the fallback.
            try:
                await event_emitter.emit_status(
                    f"Free Tier {reason}, switching to Paid API...", done=False
                )
            except Exception as emit_error:
                log.warning(f"Failed to emit fallback status: {emit_error}")
            continue

    raise ValueError("Exhausted execution options without result.")

async def _unified_response_processor(
self,
response_stream: AsyncIterator[types.GenerateContentResponse],
Expand Down
Loading