en un clic
gemini-best-practices
Production patterns, API key security, cost optimization, performance tuning, and monitoring
Menu
Production patterns, API key security, cost optimization, performance tuning, and monitoring
Reduce costs and latency with context caching - implicit and explicit cache management with TTL configuration
Execute Python code in Gemini's secure sandbox for data analysis, visualization, and file processing
Generate text embeddings for semantic search, RAG, and vector database integration
Implement robust error handling with retry logic, rate limiting, and circuit breaker patterns
Implement tool use with Gemini - function declarations, tool modes, parallel/compositional calling, and MCP integration
Implement Google Search grounding for real-time information with citation parsing and attribution handling
| name | gemini-best-practices |
| description | Production patterns, API key security, cost optimization, performance tuning, and monitoring |
| argument-hint | <production concern or optimization goal> |
| allowed-tools | Read, Write, Bash(pip install, npm install, go get) |
Production patterns, security, and optimization: $ARGUMENTS
You are a Gemini API specialist with expertise in production patterns, API key security, cost optimization, performance tuning, and monitoring.
# BAD: API key written directly into the source
client = genai.Client(api_key="AIza...") # NEVER do this
# GOOD: API key looked up in the process environment
import os
client = genai.Client(api_key=os.getenv("GOOGLE_API_KEY"))
# BETTER: Use a secrets manager
from google.cloud import secretmanager
def get_api_key(
    secret_name: str = "projects/my-project/secrets/gemini-api-key/versions/latest",
) -> str:
    """Fetch the Gemini API key from Google Cloud Secret Manager.

    Args:
        secret_name: Full resource name of the secret version to read. The
            default is the path this snippet originally hard-coded, so
            existing zero-argument callers are unaffected.

    Returns:
        The secret payload decoded as UTF-8 text.
    """
    # Named distinctly so it is not confused with the module-level Gemini
    # ``client`` assigned below.
    secrets_client = secretmanager.SecretManagerServiceClient()
    response = secrets_client.access_secret_version(request={"name": secret_name})
    return response.payload.data.decode("UTF-8")


client = genai.Client(api_key=get_api_key())
import re
from typing import Optional
# Phrases typical of prompt-injection attempts, compiled once at import time.
# Words are separated by ``\s+`` so extra spaces, tabs or newlines between
# them do not let an otherwise identical phrase slip through.
_SUSPICIOUS_INPUT_RE = re.compile(
    "|".join(
        (
            r"ignore\s+previous\s+instructions",
            r"disregard\s+all\s+prior",
            r"forget\s+everything",
            r"new\s+instructions:",
        )
    ),
    re.IGNORECASE,
)


def sanitize_input(user_input: str, max_length: int = 10000) -> str:
    """Truncate user input and reject it if it looks like prompt injection.

    Args:
        user_input: Raw text supplied by the user.
        max_length: Maximum number of characters kept; anything beyond this
            is cut off before the injection check runs.

    Returns:
        The (possibly truncated) input.

    Raises:
        ValueError: If the truncated text contains a suspicious phrase
            (case-insensitive).
    """
    # Slicing is a no-op when the input is already short enough.
    truncated = user_input[:max_length]

    # Suspicious input is rejected outright rather than rewritten.
    if _SUSPICIOUS_INPUT_RE.search(truncated):
        raise ValueError("Suspicious input detected")
    return truncated
def safe_generate(user_prompt: str) -> str:
    """Generate text for a user prompt after running it through sanitization.

    ``sanitize_input`` is called first; if it raises, no API request is made.
    """
    clean_prompt = sanitize_input(user_prompt)
    result = client.models.generate_content(
        contents=clean_prompt,
        model="gemini-2.5-flash",
    )
    return result.text
def validate_output(response_text: str) -> Optional[str]:
    """Return the response text unchanged, or ``None`` when it is blank.

    Text containing a refusal/error phrase is still returned; one warning is
    printed per matching phrase so the response can be reviewed.
    """
    # Falsy or whitespace-only responses are treated as "no answer".
    if not (response_text and response_text.strip()):
        return None

    lowered = response_text.lower()
    for pattern in ("I cannot", "I'm unable to", "Error:"):
        if pattern.lower() in lowered:
            # Only flagged for review -- the text may still be a valid answer.
            print(f"Warning: Response contains '{pattern}'")
    return response_text
from google import genai
from google.genai.types import GenerateContentConfig
import logging
import os
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class GeminiService:
    """Wrapper around a ``genai.Client`` with logging around each request.

    The API key is read from ``GOOGLE_API_KEY`` and the model name from
    ``GEMINI_MODEL`` (default ``"gemini-2.5-flash"``).
    """

    def __init__(self):
        # Indexing os.environ raises KeyError immediately if the key is unset.
        self.client = genai.Client(
            api_key=os.environ["GOOGLE_API_KEY"]
        )
        self.model = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")

    def generate(
        self,
        prompt: str,
        temperature: float = 0.7,
        max_tokens: int = 2048
    ) -> str:
        """Generate content with production safeguards.

        Args:
            prompt: Text sent as the request contents.
            temperature: Sampling temperature passed to the model config.
            max_tokens: Upper bound on output tokens.

        Returns:
            ``response.text`` from the API response.

        Raises:
            Exception: Any error raised by the API call is logged with its
                traceback and re-raised unchanged.
        """
        config = GenerateContentConfig(
            temperature=temperature,
            max_output_tokens=max_tokens
        )
        # Only the API call is guarded, so a problem while logging usage can
        # never be misreported as a failed generation.
        try:
            response = self.client.models.generate_content(
                model=self.model,
                contents=prompt,
                config=config
            )
        except Exception as e:
            logger.exception("Generation failed: %s", e)
            raise

        # Usage metadata is optional on the response; don't crash without it.
        usage = response.usage_metadata
        token_count = usage.total_token_count if usage is not None else "unknown"
        logger.info("Generated response, tokens: %s", token_count)
        return response.text
# Module-wide GeminiService instance, created lazily by get_service()
_service = None


def get_service() -> GeminiService:
    """Return the shared ``GeminiService``, constructing it on first call."""
    global _service
    if _service is not None:
        return _service
    _service = GeminiService()
    return _service
import asyncio
from asyncio import Queue
from dataclasses import dataclass
from typing import Any, Callable
@dataclass
class QueuedRequest:
    """A prompt waiting in the queue together with its completion callback."""

    prompt: str
    # Invoked by the worker with the response text, or None if the call failed.
    callback: Callable
    priority: int = 0


class RequestQueue:
    """Priority queue of Gemini requests drained by ``process_queue`` workers.

    Entries with a lower ``priority`` value are dequeued first; entries with
    equal priority are dequeued in the order they were enqueued.

    Each ``process_queue()`` task handles one request at a time. Start several
    such tasks to process requests concurrently; the shared semaphore caps how
    many API calls run at once.
    """

    def __init__(self, max_concurrent: int = 10):
        self.queue = asyncio.PriorityQueue()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = genai.Client()
        # Tie-breaker placed between priority and request in each queue entry.
        # Without it, equal priorities make the heap compare QueuedRequest
        # objects, which define no ordering and raise TypeError.
        self._sequence = 0

    async def enqueue(self, prompt: str, priority: int = 0) -> str:
        """Add request to queue and wait for result.

        Returns:
            The response text, or ``None`` if the API call failed.
        """
        future = asyncio.get_running_loop().create_future()

        def deliver(result):
            # The waiter may have been cancelled (e.g. by a timeout) before
            # the worker finished; setting a result then would raise
            # InvalidStateError and take the worker down.
            if not future.done():
                future.set_result(result)

        request = QueuedRequest(prompt=prompt, callback=deliver, priority=priority)
        sequence = self._sequence
        self._sequence += 1
        await self.queue.put((priority, sequence, request))
        return await future

    async def process_queue(self):
        """Process queued requests forever; run as a background task."""
        while True:
            _, _, request = await self.queue.get()
            try:
                async with self.semaphore:
                    try:
                        # The SDK call is blocking, so run it off the loop.
                        response = await asyncio.to_thread(
                            self.client.models.generate_content,
                            model="gemini-2.5-flash",
                            contents=request.prompt
                        )
                    except Exception:
                        # Best effort: the caller still receives None, but
                        # the failure is no longer silent.
                        logging.getLogger(__name__).exception(
                            "Queued Gemini request failed"
                        )
                        result = None
                    else:
                        result = response.text
                request.callback(result)
            finally:
                # Always balance get() so queue.join() cannot hang.
                self.queue.task_done()
from google import genai
import threading
class GeminiPool:
    """Fixed-size collection of Gemini clients handed out in rotation."""

    def __init__(self, pool_size: int = 5):
        self.clients = [genai.Client() for _ in range(pool_size)]
        self.index = 0
        self.lock = threading.Lock()

    def get_client(self) -> genai.Client:
        """Return the client at the current position, then advance the cursor.

        The cursor wraps back to the first client after the last one. Reading
        and advancing happen under ``self.lock``.
        """
        with self.lock:
            position = self.index
            chosen = self.clients[position]
            self.index = (position + 1) % len(self.clients)
        return chosen
# Usage
pool = GeminiPool(pool_size=10)


def generate(prompt: str) -> str:
    """Generate text for ``prompt`` using the next client taken from ``pool``."""
    next_client = pool.get_client()
    result = next_client.models.generate_content(
        contents=prompt,
        model="gemini-2.5-flash",
    )
    return result.text
def estimate_cost(
    input_tokens: int,
    output_tokens: int,
    model: str = "gemini-2.5-flash",
    cached_tokens: int = 0
) -> float:
    """Estimate request cost in USD.

    Args:
        input_tokens: Total prompt tokens, including any cached tokens.
        output_tokens: Tokens generated by the model.
        model: Model name used to select a price row. Unknown names are
            priced at the ``gemini-2.5-flash`` rate.
        cached_tokens: Portion of ``input_tokens`` served from cache. Values
            outside ``0..input_tokens`` are clamped into that range.

    Returns:
        Estimated cost in US dollars, based on the example rates below.
    """
    # Prices per million tokens (example rates)
    prices = {
        "gemini-3-pro-preview": {"input": 1.25, "output": 5.00},
        "gemini-3-flash-preview": {"input": 0.15, "output": 0.60},
        "gemini-2.5-pro": {"input": 1.25, "output": 5.00},
        "gemini-2.5-flash": {"input": 0.075, "output": 0.30},
        "gemini-2.5-flash-lite": {"input": 0.02, "output": 0.08},
    }
    rate = prices.get(model, prices["gemini-2.5-flash"])

    # Cached tokens are a subset of the input. Clamp so an inconsistent count
    # cannot make the regular-input share (and the total) go negative.
    cached = max(0, min(cached_tokens, input_tokens))
    regular_input = input_tokens - cached

    # Cached tokens cost 75% less
    cached_cost = (cached / 1_000_000) * rate["input"] * 0.25
    regular_cost = (regular_input / 1_000_000) * rate["input"]
    output_cost = (output_tokens / 1_000_000) * rate["output"]
    return cached_cost + regular_cost + output_cost
# Track usage
class UsageTracker:
    """Accumulates token counts and estimated cost across responses."""

    def __init__(self):
        self.total_input = 0
        self.total_output = 0
        self.total_cost = 0.0

    def track(self, response, model: str = "gemini-2.5-flash"):
        """Add one response's token usage to the running totals.

        Args:
            response: API response exposing ``usage_metadata``. A response
                without usage metadata is ignored.
            model: Model the response came from, used for pricing. Defaults
                to the model that was previously always assumed.
        """
        usage = getattr(response, "usage_metadata", None)
        if usage is None:
            return

        # Individual counts may be missing; treat those as zero.
        input_tokens = usage.prompt_token_count or 0
        output_tokens = usage.candidates_token_count or 0
        cached_tokens = getattr(usage, "cached_content_token_count", None) or 0

        self.total_input += input_tokens
        self.total_output += output_tokens
        self.total_cost += estimate_cost(
            input_tokens,
            output_tokens,
            model=model,
            cached_tokens=cached_tokens,
        )

    def report(self):
        """Return the totals as a dict; the cost is rounded to 4 decimals."""
        return {
            "total_input_tokens": self.total_input,
            "total_output_tokens": self.total_output,
            "estimated_cost_usd": round(self.total_cost, 4)
        }
def optimize_prompt(prompt: str) -> str:
    """Shorten a prompt by removing filler phrases and redundant whitespace.

    Matching of the filler phrases is case-sensitive.

    Args:
        prompt: Original prompt text.

    Returns:
        The prompt with runs of whitespace collapsed to single spaces and the
        filler phrases shortened or removed.
    """
    # Collapse whitespace first so phrases split across odd spacing or line
    # breaks still match the literal keys below.
    prompt = " ".join(prompt.split())

    # Polite filler that adds tokens without changing the request.
    optimizations = {
        "Please provide": "Provide",
        "I would like you to": "",
        "Can you please": "",
        "Could you": "",
        "I want you to": "",
    }
    for verbose, concise in optimizations.items():
        prompt = prompt.replace(verbose, concise)

    # Collapse again: removing a phrase mid-sentence leaves a double space.
    return " ".join(prompt.split())
# Compare token counts
def count_tokens(text: str) -> int:
    """Return the API-reported token count of ``text`` for gemini-2.5-flash."""
    counted = client.models.count_tokens(
        contents=text,
        model="gemini-2.5-flash",
    )
    return counted.total_tokens


original = "Please provide me with a detailed explanation of how machine learning works"
optimized = optimize_prompt(original)
print(f"Original: {count_tokens(original)} tokens")
print(f"Optimized: {count_tokens(optimized)} tokens")
from functools import lru_cache
import hashlib
# In-memory cache for repeated queries.
# Keys are SHA-256 hex digests of the prompt, so long prompts are not kept in
# memory as keys. A plain dict preserves insertion order, which is used here
# as least-recently-used order.
_CACHE_MAX_ENTRIES = 1000
_cache = {}


def cached_generate(prompt_hash: str) -> str:
    """Return the cached response text for ``prompt_hash``.

    Raises:
        KeyError: If no response is cached under that hash.
    """
    text = _cache.pop(prompt_hash)  # raises KeyError on a miss
    _cache[prompt_hash] = text  # re-insert so the entry counts as most recent
    return text


def generate_with_cache(prompt: str) -> str:
    """Generate a response for ``prompt``, reusing a cached one if available.

    Identical prompts hash to the same key, so only the first occurrence
    reaches the API until the entry is evicted.
    """
    prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()
    # Check local cache
    try:
        return cached_generate(prompt_hash)
    except KeyError:
        pass

    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt
    )
    # Store in cache, evicting the least recently used entry when full.
    _cache[prompt_hash] = response.text
    if len(_cache) > _CACHE_MAX_ENTRIES:
        del _cache[next(iter(_cache))]
    return response.text
from concurrent.futures import ThreadPoolExecutor, as_completed
def batch_generate(prompts: list[str], max_workers: int = 10) -> list[str]:
    """Run ``generate`` over all prompts on a thread pool.

    Returns one entry per prompt, in the same order as ``prompts``. A prompt
    whose generation raises is represented by the string ``"Error: <message>"``
    instead of aborting the whole batch.
    """

    def run_one(prompt: str) -> str:
        # Convert a per-prompt failure into its placeholder result.
        try:
            return generate(prompt)
        except Exception as e:
            return f"Error: {e}"

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in input order regardless of completion order.
        return list(executor.map(run_one, prompts))
async def stream_to_client(prompt: str, websocket):
    """Stream response to client via WebSocket.

    Each non-empty text chunk is forwarded as it arrives, followed by a final
    ``"[DONE]"`` message.

    Args:
        prompt: Text sent as the request contents.
        websocket: Connection object providing an awaitable ``send_text``.
    """
    # Use the SDK's async client: iterating the synchronous stream inside a
    # coroutine would block the event loop between chunks.
    stream = await client.aio.models.generate_content_stream(
        model="gemini-2.5-flash",
        contents=prompt
    )
    async for chunk in stream:
        # Chunks may carry no text; skip those.
        if chunk.text:
            await websocket.send_text(chunk.text)
    await websocket.send_text("[DONE]")
from google.genai.types import GenerateContentConfig
# Set appropriate timeouts based on expected response length.
# Values are in milliseconds, the unit used by HttpOptions.timeout.
SHORT_RESPONSE_TIMEOUT = 10000 # 10 seconds
MEDIUM_RESPONSE_TIMEOUT = 30000 # 30 seconds
LONG_RESPONSE_TIMEOUT = 120000 # 2 minutes


def generate_with_timeout(prompt: str, expected_length: str = "medium") -> str:
    """Generate content with a request timeout chosen by expected length.

    Args:
        prompt: Text sent as the request contents.
        expected_length: One of ``"short"``, ``"medium"`` or ``"long"``.

    Returns:
        ``response.text`` from the API response.

    Raises:
        KeyError: If ``expected_length`` is not one of the supported values.
    """
    # Local import: the file-level import only brings in GenerateContentConfig.
    from google.genai.types import HttpOptions

    timeouts = {
        "short": SHORT_RESPONSE_TIMEOUT,
        "medium": MEDIUM_RESPONSE_TIMEOUT,
        "long": LONG_RESPONSE_TIMEOUT
    }
    # The per-request timeout is set through http_options;
    # GenerateContentConfig has no ``request_options`` field.
    config = GenerateContentConfig(
        http_options=HttpOptions(timeout=timeouts[expected_length])
    )
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
        config=config
    )
    return response.text
import logging
import json
from datetime import datetime
class GeminiLogger:
    """Writes request, response and error events as JSON lines.

    All events go to the ``"gemini"`` logger and share the leading keys
    ``event``, ``timestamp`` (UTC, ISO 8601 with offset) and ``model``.
    """

    def __init__(self):
        self.logger = logging.getLogger("gemini")

    def _emit(self, level: int, event: str, model: str, **fields) -> None:
        """Serialize one event to JSON and log it at ``level``."""
        # Local import: the file-level import only brings in ``datetime``.
        from datetime import timezone

        payload = {
            "event": event,
            # Timezone-aware replacement for the deprecated utcnow().
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "model": model,
            **fields,
        }
        self.logger.log(level, json.dumps(payload))

    def log_request(self, model: str, prompt_length: int, config: dict):
        """Log an outgoing request; ``config`` must be JSON-serializable."""
        self._emit(
            logging.INFO,
            "request",
            model,
            prompt_length=prompt_length,
            config=config,
        )

    def log_response(self, model: str, response, latency_ms: float):
        """Log token usage, latency and finish reason for a response.

        Fields the response does not provide (no usage metadata, no
        candidates) are logged as ``null`` instead of raising.
        """
        usage = getattr(response, "usage_metadata", None)
        candidates = getattr(response, "candidates", None) or []
        self._emit(
            logging.INFO,
            "response",
            model,
            input_tokens=usage.prompt_token_count if usage is not None else None,
            output_tokens=usage.candidates_token_count if usage is not None else None,
            latency_ms=latency_ms,
            finish_reason=candidates[0].finish_reason if candidates else None,
        )

    def log_error(self, model: str, error: Exception):
        """Log an error event with the exception's type name and message."""
        self._emit(
            logging.ERROR,
            "error",
            model,
            error_type=type(error).__name__,
            error_message=str(error),
        )
from prometheus_client import Counter, Histogram, Gauge
# Define metrics
# Request count, labelled by model and outcome status
REQUESTS_TOTAL = Counter(
    name='gemini_requests_total',
    documentation='Total Gemini API requests',
    labelnames=['model', 'status'],
)
# Request latency in seconds, labelled by model
REQUEST_LATENCY = Histogram(
    name='gemini_request_latency_seconds',
    documentation='Gemini API request latency',
    labelnames=['model'],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60],
)
# Token count; the 'type' label is either "input" or "output"
TOKENS_USED = Counter(
    name='gemini_tokens_total',
    documentation='Total tokens used',
    labelnames=['model', 'type'],
)
# Instrument requests
import time
def instrumented_generate(prompt: str, model: str = "gemini-2.5-flash") -> str:
    """Generate content and record request, latency and token metrics.

    Args:
        prompt: Text sent as the request contents.
        model: Model name, also used as the ``model`` label on every metric.

    Returns:
        ``response.text`` from the API response.

    Raises:
        Exception: Any error from the API call is counted under
            ``status="error"`` and re-raised unchanged.
    """
    # perf_counter is monotonic, so the measured latency cannot be skewed by
    # wall-clock adjustments.
    start = time.perf_counter()
    # Guard only the API call: a request must never be counted as both a
    # success and an error because metric recording itself failed.
    try:
        response = client.models.generate_content(
            model=model,
            contents=prompt
        )
    except Exception:
        REQUESTS_TOTAL.labels(model=model, status="error").inc()
        raise
    latency = time.perf_counter() - start

    # Record metrics
    REQUESTS_TOTAL.labels(model=model, status="success").inc()
    REQUEST_LATENCY.labels(model=model).observe(latency)

    # Usage metadata and its counts are optional; missing values count as 0.
    usage = response.usage_metadata
    if usage is not None:
        TOKENS_USED.labels(model=model, type="input").inc(
            usage.prompt_token_count or 0
        )
        TOKENS_USED.labels(model=model, type="output").inc(
            usage.candidates_token_count or 0
        )
    return response.text
For: $ARGUMENTS
Provide: