원클릭으로
gemini-error-handling
Implement robust error handling with retry logic, rate limiting, and circuit breaker patterns
메뉴
Implement robust error handling with retry logic, rate limiting, and circuit breaker patterns
Production patterns, API key security, cost optimization, performance tuning, and monitoring
Reduce costs and latency with context caching - implicit and explicit cache management with TTL configuration
Execute Python code in Gemini's secure sandbox for data analysis, visualization, and file processing
Generate text embeddings for semantic search, RAG, and vector database integration
Implement tool use with Gemini - function declarations, tool modes, parallel/compositional calling, and MCP integration
Implement Google Search grounding for real-time information with citation parsing and attribution handling
| name | gemini-error-handling |
| description | Implement robust error handling with retry logic, rate limiting, and circuit breaker patterns |
| argument-hint | <error scenario or reliability requirement> |
| allowed-tools | Read, Write, Bash(pip install, npm install, go get) |
Implement robust error handling and retry logic: $ARGUMENTS
You are a Gemini API specialist with expertise in:
{
"error": {
"code": 429,
"message": "Resource has been exhausted...",
"status": "RESOURCE_EXHAUSTED",
"details": [
{
"@type": "type.googleapis.com/google.rpc.RetryInfo",
"retryDelay": "60s"
}
]
}
}
| Code | Status | Description | Action |
|---|---|---|---|
| 400 | INVALID_ARGUMENT | Bad request | Fix request parameters |
| 401 | UNAUTHENTICATED | Invalid API key | Check/refresh credentials |
| 403 | PERMISSION_DENIED | Access denied | Check permissions |
| 404 | NOT_FOUND | Resource not found | Check model name/resource ID |
| 429 | RESOURCE_EXHAUSTED | Rate limited | Retry with backoff |
| 500 | INTERNAL | Server error | Retry with backoff |
| 503 | UNAVAILABLE | Service unavailable | Retry with backoff |
| 504 | DEADLINE_EXCEEDED | Timeout | Retry or reduce request size |
from google import genai
# google.genai.errors exposes APIError (with ClientError / ServerError
# subclasses); per-status classes such as RateLimitError do not exist there,
# so failures are classified by the HTTP status carried on the exception.
from google.genai.errors import APIError

client = genai.Client()


def safe_generate(prompt: str) -> str:
    """Generate text for *prompt*, logging a categorized message on API failure.

    Args:
        prompt: Text sent to the model as the request contents.

    Returns:
        The ``text`` attribute of the model response.

    Raises:
        APIError: Re-raised unchanged after the failure category has been
            printed, so callers can still apply their own handling.
    """
    try:
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=prompt,
        )
    except APIError as e:
        # Same status-to-category mapping as the JavaScript example.
        if e.code == 401:
            print(f"Authentication failed: {e}")
        elif e.code == 429:
            print(f"Rate limited: {e}")
        elif e.code == 400:
            print(f"Invalid request: {e}")
        elif e.code is not None and e.code >= 500:
            print(f"Server error: {e}")
        else:
            print(f"API error: {e.code} - {e.message}")
        raise
    return response.text
import { GoogleGenAI } from "@google/genai";
const ai = new GoogleGenAI({ apiKey: process.env.GOOGLE_API_KEY });

/**
 * Generate text for a prompt, logging a categorized message when the call fails.
 *
 * @param {string} prompt - Request contents sent to the model.
 * @returns {Promise<string>} The `text` of the model response.
 * @throws Re-throws whatever error the API call raised, after logging it.
 */
async function safeGenerate(prompt) {
  try {
    const response = await ai.models.generateContent({
      model: "gemini-2.5-flash",
      contents: prompt
    });
    return response.text;
  } catch (error) {
    // Pick the log label from the HTTP status attached to the error.
    const status = error.status;
    let label;
    switch (true) {
      case status === 401:
        label = "Authentication failed:";
        break;
      case status === 429:
        label = "Rate limited:";
        break;
      case status === 400:
        label = "Invalid request:";
        break;
      case status >= 500:
        label = "Server error:";
        break;
      default:
        label = "API error:";
    }
    console.error(label, error.message);
    throw error;
  }
}
import time
import random
from google import genai
def _is_retryable(error: Exception) -> bool:
    """Return True when *error* carries a rate-limit (429) or 5xx status code.

    The status is read from a ``code`` attribute, so no SDK exception class
    has to be imported here.
    NOTE(review): google-genai's APIError is expected to expose the HTTP
    status as ``code`` -- confirm against the installed SDK version.
    """
    code = getattr(error, "code", None)
    return isinstance(code, int) and (code == 429 or code >= 500)


def retry_with_backoff(
    func,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
    is_retryable=_is_retryable,
):
    """Execute function with exponential backoff retry.

    Args:
        func: Zero-argument callable to invoke.
        max_retries: Total number of attempts (must be at least 1).
        base_delay: Delay in seconds before the first retry; doubles each
            attempt.
        max_delay: Upper bound in seconds for any single delay, jitter
            included.
        jitter: When True, scale each delay by a random factor in [0.5, 1.5).
        is_retryable: Predicate receiving the raised exception and returning
            True if the call should be retried. Defaults to retrying errors
            whose ``code`` is 429 or >= 500.

    Returns:
        Whatever ``func()`` returns on the first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The exception from ``func`` when it is not retryable or
            when the final attempt fails.
    """
    if max_retries < 1:
        # Without this guard the loop body never runs and None is returned.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            # Non-retryable errors and the final failure propagate untouched.
            if not is_retryable(e) or attempt == max_retries - 1:
                raise
            # Calculate delay with exponential backoff
            delay = min(base_delay * (2 ** attempt), max_delay)
            # Add jitter to prevent thundering herd, then re-apply the cap so
            # max_delay really is the maximum.
            if jitter:
                delay = min(delay * (0.5 + random.random()), max_delay)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
            time.sleep(delay)
# Usage
client = genai.Client()


def make_request():
    """Send a fixed "Hello!" prompt and return the raw model response."""
    request_kwargs = {
        "model": "gemini-2.5-flash",
        "contents": "Hello!",
    }
    return client.models.generate_content(**request_kwargs)


# Run the request through the retry helper and show the generated text.
response = retry_with_backoff(make_request)
print(response.text)
/**
 * Call `func` until it succeeds, retrying rate-limit (429) and server (>= 500)
 * errors with exponential backoff plus jitter.
 *
 * @param {() => Promise<any>} func - Operation to attempt.
 * @param {number} maxRetries - Total number of attempts.
 * @param {number} baseDelay - Delay before the first retry, in milliseconds.
 * @param {number} maxDelay - Cap applied to the pre-jitter delay, in milliseconds.
 * @returns {Promise<any>} The resolved value of `func`.
 * @throws The last error when it is not retryable or attempts are exhausted.
 */
async function retryWithBackoff(
  func,
  maxRetries = 5,
  baseDelay = 1000,
  maxDelay = 60000
) {
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  let attempt = 0;
  while (attempt < maxRetries) {
    try {
      return await func();
    } catch (error) {
      const retryable = error.status === 429 || error.status >= 500;
      const outOfAttempts = attempt === maxRetries - 1;
      if (!retryable || outOfAttempts) {
        throw error;
      }

      // Exponential backoff, capped, then scaled by a factor in [0.5, 1.5).
      const capped = Math.min(baseDelay * 2 ** attempt, maxDelay);
      const waitMs = capped * (0.5 + Math.random());
      console.log(`Attempt ${attempt + 1} failed. Retrying in ${waitMs}ms...`);
      await sleep(waitMs);
    }
    attempt += 1;
  }
}
// Usage
// Wrap a single generateContent call so transient failures are retried.
const helloRequest = { model: "gemini-2.5-flash", contents: "Hello!" };
const response = await retryWithBackoff(
  async () => ai.models.generateContent(helloRequest)
);
import time

from google.genai.errors import APIError


def get_retry_delay(error):
    """Return the server-suggested retry delay in seconds, or None if absent.

    Looks for a ``google.rpc.RetryInfo`` entry (``{"retryDelay": "60s"}``) in
    the error payload, in the shape of the JSON error body shown earlier.
    NOTE(review): assumes ``error.details`` holds the parsed JSON error body
    (either the full ``{"error": {...}}`` document or the inner object) --
    confirm against the installed SDK version.
    """
    payload = getattr(error, "details", None)
    if isinstance(payload, dict):
        payload = payload.get("error", payload)
    entries = payload.get("details") if isinstance(payload, dict) else payload
    if not isinstance(entries, list):
        return None
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        if str(entry.get("@type", "")).endswith("RetryInfo"):
            raw = str(entry.get("retryDelay", ""))
            try:
                # Durations are serialized as seconds with an "s" suffix.
                return float(raw.rstrip("s"))
            except ValueError:
                return None
    return None


try:
    response = client.models.generate_content(...)
except APIError as e:
    # Only rate-limit responses are handled here; everything else propagates.
    if e.code != 429:
        raise
    # Check for retry info in error details
    retry_delay = get_retry_delay(e)
    if retry_delay:
        print(f"Retry after: {retry_delay} seconds")
        time.sleep(retry_delay)
    else:
        # Default backoff
        time.sleep(60)
import time
from threading import Lock
from collections import deque
class RateLimiter:
    """Sliding-window limiter that blocks callers to stay under a request rate.

    Timestamps of granted requests are kept in a deque; a request is granted
    once fewer than ``rpm`` grants fall inside the trailing ``window`` seconds.
    """

    def __init__(self, requests_per_minute: int = 60):
        """Create a limiter allowing *requests_per_minute* grants per window.

        Raises:
            ValueError: If ``requests_per_minute`` is less than 1 (no request
                could ever be granted).
        """
        if requests_per_minute < 1:
            raise ValueError("requests_per_minute must be at least 1")
        self.rpm = requests_per_minute
        self.window = 60.0  # seconds
        # Monotonic timestamps of granted requests, oldest first.
        self.requests = deque()
        self.lock = Lock()

    def acquire(self):
        """Wait if necessary to stay within rate limit."""
        while True:
            with self.lock:
                # Monotonic clock: interval math must not be affected by
                # wall-clock adjustments.
                now = time.monotonic()
                cutoff = now - self.window
                # Remove old requests outside window
                while self.requests and self.requests[0] <= cutoff:
                    self.requests.popleft()
                if len(self.requests) < self.rpm:
                    self.requests.append(now)
                    return
                # Wait until oldest request exits window
                wait_time = self.requests[0] + self.window - now
            # Sleep with the lock released so other threads are not blocked
            # behind this waiter, then re-check. The small floor prevents a
            # busy loop if rounding yields a non-positive wait.
            time.sleep(max(wait_time, 0.001))
# Usage
limiter = RateLimiter(requests_per_minute=60)
client = genai.Client()


def rate_limited_request(prompt):
    """Wait for a rate-limiter slot, then send *prompt* to the model.

    Returns the raw response object from ``generate_content``.
    """
    limiter.acquire()
    request_kwargs = {"model": "gemini-2.5-flash", "contents": prompt}
    return client.models.generate_content(**request_kwargs)
import asyncio
from google import genai
async def generate_with_timeout(prompt: str, timeout_seconds: int = 30):
    """Generate content with timeout.

    Uses the SDK's async client so that, when the timeout fires, the pending
    request coroutine is cancelled. Wrapping the blocking call in
    ``asyncio.to_thread`` would only abandon the wait: the worker thread
    cannot be cancelled and the request would keep running in the background.

    Args:
        prompt: Text sent to the model as the request contents.
        timeout_seconds: Maximum time to wait for the response.

    Returns:
        The ``text`` attribute of the model response.

    Raises:
        asyncio.TimeoutError: If no response arrives within the timeout.
    """
    client = genai.Client()
    try:
        response = await asyncio.wait_for(
            client.aio.models.generate_content(
                model="gemini-2.5-flash",
                contents=prompt,
            ),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        print(f"Request timed out after {timeout_seconds}s")
        raise
    return response.text
# Synchronous version
def generate_with_timeout_sync(prompt: str, timeout_ms: int = 30000):
    """Generate content with timeout (sync).

    The timeout is passed per request through ``http_options``;
    ``GenerateContentConfig`` has no ``request_options`` field.
    NOTE(review): ``HttpOptions.timeout`` is expected to be in milliseconds --
    confirm against the installed SDK version.

    Args:
        prompt: Text sent to the model as the request contents.
        timeout_ms: Request timeout forwarded to the HTTP layer.

    Returns:
        The ``text`` attribute of the model response.

    Raises:
        Exception: Any error from the request is re-raised; timeouts are
            additionally reported on stdout.
    """
    from google.genai.types import GenerateContentConfig, HttpOptions

    client = genai.Client()
    try:
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=prompt,
            config=GenerateContentConfig(
                http_options=HttpOptions(timeout=timeout_ms),
            ),
        )
        return response.text
    except Exception as e:
        # Timeout errors are recognized by class name or message, since the
        # concrete exception type depends on the HTTP transport in use.
        description = f"{type(e).__name__} {e}".lower()
        if "timeout" in description or "timed out" in description:
            print("Request timed out")
        raise
from google.genai.types import GenerateContentConfig, SafetySetting
def handle_safety_block(response):
    """Check for safety-blocked responses.

    Args:
        response: Object exposing ``candidates`` and ``text``; each candidate
            exposes ``finish_reason`` and ``safety_ratings``.

    Returns:
        ``response.text`` when the first candidate was not stopped for safety
        reasons; ``None`` when it was, or when the response carries no
        candidates at all.
    """
    candidates = getattr(response, "candidates", None)
    if not candidates:
        # No candidate to inspect (e.g. the prompt itself was rejected);
        # indexing [0] here would raise instead of reporting the block.
        feedback = getattr(response, "prompt_feedback", None)
        reason = getattr(feedback, "block_reason", None)
        print(f"No candidates returned (block reason: {reason})")
        return None

    candidate = candidates[0]
    if candidate.finish_reason == "SAFETY":
        print("Response blocked by safety filters")
        # safety_ratings may be missing; treat that as "no ratings".
        for rating in candidate.safety_ratings or []:
            if rating.blocked:
                print(f" Blocked: {rating.category} - {rating.probability}")
        return None
    return response.text
# Adjust safety settings if needed
# Both categories share the same threshold, so build the list from the names.
config = GenerateContentConfig(
    safety_settings=[
        SafetySetting(category=category, threshold="BLOCK_ONLY_HIGH")
        for category in (
            "HARM_CATEGORY_HARASSMENT",
            "HARM_CATEGORY_HATE_SPEECH",
        )
    ]
)
from google import genai
import random
import time
from functools import wraps


def _retry_info_delay(error):
    """Return the server-suggested retry delay in seconds, or None if absent.

    Looks for a ``google.rpc.RetryInfo`` entry (``{"retryDelay": "60s"}``).
    NOTE(review): assumes ``error.details`` holds the parsed JSON error body
    (the full ``{"error": {...}}`` document or the inner object) -- confirm
    against the installed SDK version.
    """
    payload = getattr(error, "details", None)
    if isinstance(payload, dict):
        payload = payload.get("error", payload)
    entries = payload.get("details") if isinstance(payload, dict) else payload
    if not isinstance(entries, list):
        return None
    for entry in entries:
        if isinstance(entry, dict) and str(entry.get("@type", "")).endswith("RetryInfo"):
            try:
                return float(str(entry.get("retryDelay", "")).rstrip("s"))
            except ValueError:
                return None
    return None


def robust_api_call(
    max_retries: int = 3,
    timeout_ms: int = 30000,
    on_retry=None,
    base_delay: float = 1.0,
):
    """Decorator for robust API calls.

    Retries the wrapped function when it raises an error whose ``code``
    attribute is 429 or >= 500; every other exception propagates at once.
    Errors are classified by ``code`` rather than by exception class, so no
    SDK error type needs to be imported.

    Args:
        max_retries: Total number of attempts (must be at least 1).
        timeout_ms: Accepted for backward compatibility; not enforced by this
            decorator.
        on_retry: Optional callback ``on_retry(attempt, error, delay)`` invoked
            before each sleep, for every retried error.
        base_delay: Multiplier in seconds for the backoff
            ``base_delay * (2 ** attempt + random())``; the default of 1.0
            keeps the original timing.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
    """
    if max_retries < 1:
        # With zero attempts there would be no result and no error to raise.
        raise ValueError("max_retries must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    code = getattr(e, "code", None)
                    retryable = isinstance(code, int) and (
                        code == 429 or code >= 500
                    )
                    # Don't retry client errors (4xx except 429), and don't
                    # sleep after the final attempt -- just surface the error.
                    if not retryable or attempt == max_retries - 1:
                        raise
                    delay = _retry_info_delay(e) if code == 429 else None
                    if delay is None:
                        delay = base_delay * (2 ** attempt + random.random())
                    if on_retry:
                        on_retry(attempt, e, delay)
                    time.sleep(delay)

        return wrapper

    return decorator
# Usage
client = genai.Client()


@robust_api_call(max_retries=3)
def generate(prompt: str) -> str:
    """Return the model's text for *prompt*, retried up to three attempts."""
    return client.models.generate_content(
        contents=prompt,
        model="gemini-2.5-flash",
    ).text
# With callback
def log_retry(attempt, error, delay):
    """Print a one-line notice for a retry.

    Args:
        attempt: Zero-based attempt index; shown one-based in the message.
        error: The error that triggered the retry.
        delay: Seconds to wait before the next attempt.
    """
    retry_number = attempt + 1
    print(f"Retry {retry_number}: {error}. Waiting {delay:.2f}s")
@robust_api_call(max_retries=5, on_retry=log_retry)
def generate_with_logging(prompt: str) -> str:
    """Return the model's text for *prompt*, reporting each retry via log_retry."""
    request_kwargs = {"model": "gemini-2.5-flash", "contents": prompt}
    result = client.models.generate_content(**request_kwargs)
    return result.text
import time
from enum import Enum
from threading import Lock
class CircuitState(Enum):
    """States of the circuit breaker."""

    CLOSED = "closed"  # calls are allowed
    OPEN = "open"  # calls are rejected until the recovery timeout elapses
    HALF_OPEN = "half_open"  # trial calls are allowed to probe recovery


class CircuitBreaker:
    """Circuit breaker that opens after repeated consecutive failures.

    Callers ask ``can_execute()`` before a call and report the outcome with
    ``record_success()`` or ``record_failure()``. State changes are guarded
    by a lock.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        half_open_requests: int = 1
    ):
        """Create a closed breaker.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds to stay open before allowing a trial.
            half_open_requests: Successes required in HALF_OPEN to close.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        self.state = CircuitState.CLOSED
        self.failures = 0
        # time.monotonic() reading of the most recent failure (None until one
        # is recorded); monotonic so clock adjustments cannot shorten or
        # stretch the recovery timeout.
        self.last_failure_time = None
        self.half_open_successes = 0
        self.lock = Lock()

    def can_execute(self) -> bool:
        """Return True if a call may proceed; may move OPEN to HALF_OPEN."""
        with self.lock:
            if self.state == CircuitState.CLOSED:
                return True
            if self.state == CircuitState.OPEN:
                if time.monotonic() - self.last_failure_time >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_successes = 0
                    return True
                return False
            # HALF_OPEN - allow limited requests
            return self.half_open_successes < self.half_open_requests

    def record_success(self):
        """Record a successful call."""
        with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_successes += 1
                if self.half_open_successes >= self.half_open_requests:
                    self.state = CircuitState.CLOSED
                    self.failures = 0
            elif self.state == CircuitState.CLOSED:
                # A success ends the current failure streak; without this,
                # unrelated failures spread over time would add up and
                # eventually open a healthy circuit.
                self.failures = 0

    def record_failure(self):
        """Record a failed call, opening the circuit when warranted."""
        with self.lock:
            self.failures += 1
            self.last_failure_time = time.monotonic()
            # Any failure during a HALF_OPEN trial re-opens immediately;
            # otherwise open once the streak reaches the threshold.
            if (
                self.state == CircuitState.HALF_OPEN
                or self.failures >= self.failure_threshold
            ):
                self.state = CircuitState.OPEN
# Usage
circuit = CircuitBreaker()


def call_with_circuit_breaker(func):
    """Run *func* under the shared circuit breaker and report its outcome.

    Returns the result of ``func()``. Raises ``Exception`` with the message
    "Circuit breaker is open" when the breaker rejects the call; any error
    from ``func`` is recorded as a failure and re-raised.
    """
    if circuit.can_execute():
        try:
            outcome = func()
            circuit.record_success()
        except Exception:
            circuit.record_failure()
            raise
        return outcome
    raise Exception("Circuit breaker is open")
For: $ARGUMENTS
Provide: