---
name: langgraph-architecture
description: Guides architectural decisions for LangGraph applications. Use when deciding between LangGraph vs alternatives, choosing state management strategies, designing multi-agent systems, or selecting persistence and streaming approaches.
---
| Scenario | Alternative | Why |
|---|---|---|
| Single LLM call | Direct API call | Overhead not justified |
| Linear pipeline | LangChain LCEL | Simpler abstraction |
| Stateless tool use | Function calling | No persistence needed |
| Simple RAG | LangChain retrievers | Built-in patterns |
| Batch processing | Async tasks | Different execution model |
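For the linear-pipeline row, a plain LCEL chain is usually all you need. A minimal sketch (the prompt text and the ChatOpenAI model are illustrative placeholders; any chat model works):

```python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# Linear prompt -> model -> parser: no state, no cycles, no persistence, so no graph
chain = (
    ChatPromptTemplate.from_template("Summarize in one sentence: {text}")
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)

summary = chain.invoke({"text": "LangGraph adds cycles, state, and persistence."})
```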
| TypedDict | Pydantic |
|---|---|
| Lightweight, faster | Runtime validation |
| Dict-like access | Attribute access |
| No validation overhead | Type coercion |
| Simpler serialization | Complex nested models |
Recommendation: Use TypedDict for most cases. Use Pydantic when you need validation or complex nested structures.
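To make the trade-off concrete, roughly the same state in each style (a sketch; the `query` field is just an example):

```python
from typing import Annotated, TypedDict
from pydantic import BaseModel
from langgraph.graph.message import add_messages

# TypedDict: lightweight, dict-style access in nodes (state["query"])
class DictState(TypedDict):
    messages: Annotated[list, add_messages]
    query: str

# Pydantic: runtime validation and coercion, attribute access (state.query)
class ModelState(BaseModel):
    messages: Annotated[list, add_messages] = []
    query: str = ""
```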
| Use Case | Reducer | Example |
|---|---|---|
| Chat messages | add_messages | Handles IDs, RemoveMessage |
| Simple append | operator.add | Annotated[list, operator.add] |
| Keep latest | None (LastValue) | field: str |
| Custom merge | Lambda | Annotated[list, lambda a, b: ...] |
| Overwrite list | Overwrite | Bypass reducer |
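A sketch of how those reducers look side by side in one schema (the `events` and `sources` fields and the `merge_unique` reducer are made-up examples):

```python
import operator
from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages

def merge_unique(existing: list, new: list) -> list:
    # Custom reducer: append only values not already present
    return existing + [v for v in new if v not in existing]

class State(TypedDict):
    messages: Annotated[list, add_messages]   # chat history; handles IDs and RemoveMessage
    events: Annotated[list, operator.add]     # simple append
    status: str                               # no reducer: keep latest value
    sources: Annotated[list, merge_unique]    # custom merge
```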
```python
from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages
from langgraph.store.base import BaseStore

# SMALL STATE (< 1MB) - put it in graph state
class State(TypedDict):
    messages: Annotated[list, add_messages]
    context: str

# LARGE DATA - keep a reference in state, put the payload in a Store
class State(TypedDict):
    messages: Annotated[list, add_messages]
    document_ref: str  # Reference to an item in the store

def node(state: State, *, store: BaseStore):
    doc = store.get(namespace, state["document_ref"])  # namespace is a tuple, e.g. ("documents",)
    # Process the document without bloating checkpoints
```
Single Graph when:
Subgraphs when:
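For the subgraph case, a compiled graph can be added to a parent graph as an ordinary node when the two share state keys. A minimal sketch (the `research` node and its output are placeholders):

```python
from typing import Annotated, TypedDict
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

# Subgraph: built and compiled on its own
sub_builder = StateGraph(State)
sub_builder.add_node("research", lambda state: {"messages": [("ai", "research notes")]})
sub_builder.add_edge(START, "research")
sub_builder.add_edge("research", END)
subgraph = sub_builder.compile()

# Parent graph: the compiled subgraph becomes a regular node
builder = StateGraph(State)
builder.add_node("researcher", subgraph)
builder.add_edge(START, "researcher")
builder.add_edge("researcher", END)
graph = builder.compile()
```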
| Conditional Edges | Command |
|---|---|
| Routing based on state | Routing + state update |
| Separate router function | Decision in node |
| Clearer visualization | More flexible |
| Standard patterns | Dynamic destinations |
```python
from typing import Literal
from langgraph.types import Command

# Conditional Edge - when routing is the focus
def router(state) -> Literal["a", "b"]:
    return "a" if condition else "b"

builder.add_conditional_edges("node", router)

# Command - when combining routing with updates
def node(state) -> Command:
    return Command(goto="next", update={"step": state["step"] + 1})
```
Static Edges (add_edge):
Dynamic Routing (add_conditional_edges, Command, Send):
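Send is the dynamic-routing option for fan-out: the routing function returns one Send per work item, and each spawns a parallel run of the target node with its own input. A sketch (the `plan`, `worker`, and `items` names are illustrative):

```python
import operator
from typing import Annotated, TypedDict
from langgraph.graph import END, START, StateGraph
from langgraph.types import Send

class State(TypedDict):
    items: list
    results: Annotated[list, operator.add]  # gathers one result per worker

def plan(state: State):
    return {}  # placeholder: pretend the items were produced here

def fan_out(state: State):
    # One Send per item; each spawns a parallel "worker" run with its own input
    return [Send("worker", {"item": item}) for item in state["items"]]

def worker(state: dict):
    return {"results": [f"processed {state['item']}"]}

builder = StateGraph(State)
builder.add_node("plan", plan)
builder.add_node("worker", worker)
builder.add_edge(START, "plan")
builder.add_conditional_edges("plan", fan_out, ["worker"])
builder.add_edge("worker", END)
graph = builder.compile()
```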
| Checkpointer | Use Case | Characteristics |
|---|---|---|
| InMemorySaver | Testing only | Lost on restart |
| SqliteSaver | Development | Single file, local |
| PostgresSaver | Production | Scalable, concurrent |
| Custom | Special needs | Implement BaseCheckpointSaver |
```python
# Full persistence (default)
graph = builder.compile(checkpointer=checkpointer)

# Subgraph options (pick one)
subgraph = sub_builder.compile(checkpointer=None)   # Inherit from parent
subgraph = sub_builder.compile(checkpointer=True)   # Independent checkpointing
subgraph = sub_builder.compile(checkpointer=False)  # No checkpointing (runs atomically)
```
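A sketch of wiring a checkpointer in, assuming the standard checkpoint packages (InMemorySaver is called MemorySaver in older releases; the Postgres DSN and the input message are placeholders):

```python
from langgraph.checkpoint.memory import InMemorySaver
# from langgraph.checkpoint.postgres import PostgresSaver  # pip install langgraph-checkpoint-postgres

# Testing: in-memory, lost on restart
graph = builder.compile(checkpointer=InMemorySaver())

# Production sketch: from_conn_string is a context manager; setup() creates the tables
# with PostgresSaver.from_conn_string("postgresql://user:pass@host:5432/db") as checkpointer:
#     checkpointer.setup()
#     graph = builder.compile(checkpointer=checkpointer)

# Every call needs a thread_id so the checkpointer knows which history to extend
config = {"configurable": {"thread_id": "session-42"}}
graph.invoke({"messages": [("user", "hi")]}, config)
```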
Best for:
```
          ┌─────────────┐
          │ Supervisor  │
          └──────┬──────┘
   ┌────────┬────┴───┬────────┐
   ▼        ▼        ▼        ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Agent1│ │Agent2│ │Agent3│ │Agent4│
└──────┘ └──────┘ └──────┘ └──────┘
```
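A sketch of the supervisor pattern with Command-based routing (two workers instead of four; `decide_next_worker` stands in for an LLM call that picks the next agent):

```python
from typing import Annotated, Literal, TypedDict
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.types import Command

class State(TypedDict):
    messages: Annotated[list, add_messages]

def supervisor(state: State) -> Command[Literal["researcher", "writer", "__end__"]]:
    target = decide_next_worker(state)  # hypothetical: ask an LLM which worker acts next
    if target == "done":
        return Command(goto=END)
    return Command(goto=target)

def researcher(state: State) -> Command[Literal["supervisor"]]:
    return Command(goto="supervisor", update={"messages": [("ai", "research findings")]})

def writer(state: State) -> Command[Literal["supervisor"]]:
    return Command(goto="supervisor", update={"messages": [("ai", "draft text")]})

builder = StateGraph(State)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_edge(START, "supervisor")
graph = builder.compile()
```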
Best for:
```
┌──────┐     ┌──────┐
│Agent1│◄───►│Agent2│
└──┬───┘     └───┬──┘
   │             │
   ▼             ▼
┌──────┐     ┌──────┐
│Agent3│◄───►│Agent4│
└──────┘     └──────┘
```
Best for:
```
┌────────┐    ┌────────┐    ┌────────┐
│Research│───►│Planning│───►│Execute │
└────────┘    └────────┘    └────────┘
```
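The sequential pipeline is just static edges in order (sketch; the three node functions and the state schema are placeholders):

```python
from langgraph.graph import END, START, StateGraph

builder = StateGraph(State)  # any shared state schema
builder.add_node("research", research_node)
builder.add_node("planning", planning_node)
builder.add_node("execute", execute_node)
builder.add_edge(START, "research")
builder.add_edge("research", "planning")
builder.add_edge("planning", "execute")
builder.add_edge("execute", END)
graph = builder.compile()
```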
| Mode | Use Case | Data |
|---|---|---|
| updates | UI updates | Node outputs only |
| values | State inspection | Full state each step |
| messages | Chat UX | LLM tokens |
| custom | Progress/logs | Your data via StreamWriter |
| debug | Debugging | Tasks + checkpoints |
```python
# Stream from subgraphs
async for chunk in graph.astream(
    input,
    stream_mode="updates",
    subgraphs=True,  # Include subgraph events
):
    namespace, data = chunk  # namespace indicates depth
```
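For the custom mode, a node can take an injected StreamWriter and emit its own progress events; combining modes makes each chunk a (mode, data) tuple. A sketch (the `process` node and its payloads are made up):

```python
from langgraph.types import StreamWriter

def process(state: State, writer: StreamWriter):
    writer({"progress": "loading documents"})   # surfaced under stream_mode="custom"
    # ... do the work ...
    writer({"progress": "done"})
    return {"context": "..."}

for mode, data in graph.stream(input, config, stream_mode=["updates", "custom"]):
    print(mode, data)  # mode is "updates" or "custom"
```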
| Strategy | Use Case |
|---|---|
| interrupt_before | Approval before action |
| interrupt_after | Review after completion |
| interrupt() in node | Dynamic, contextual pauses |
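The dynamic interrupt() strategy looks roughly like this inside a node (sketch; the `plan` and `approved` keys are placeholders, and the graph must be compiled with a checkpointer). The resume patterns below pick up from this pause:

```python
from langgraph.types import interrupt

def approval_node(state: State):
    # Pauses the run and surfaces the payload to the caller; the value passed to
    # Command(resume=...) later becomes interrupt()'s return value
    decision = interrupt({"question": "Approve this plan?", "plan": state["plan"]})
    return {"approved": decision == "approved"}
```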
```python
# Simple resume (same thread)
graph.invoke(None, config)

# Resume with value
graph.invoke(Command(resume="approved"), config)

# Resume specific interrupt
graph.invoke(Command(resume={interrupt_id: value}), config)

# Modify state and resume
graph.update_state(config, {"field": "new_value"})
graph.invoke(None, config)
```
```python
from langgraph.types import RetryPolicy

# Per-node retry
RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_interval=60.0,
    max_attempts=3,
    retry_on=lambda e: isinstance(e, (APIError, TimeoutError)),
)

# Multiple policies (first match wins)
builder.add_node("node", fn, retry_policy=[
    RetryPolicy(retry_on=RateLimitError, max_attempts=5),
    RetryPolicy(retry_on=Exception, max_attempts=2),
])
```
```python
def node_with_fallback(state):
    try:
        return primary_operation(state)
    except PrimaryError:
        return fallback_operation(state)

# Or use conditional edges for complex fallback routing
def route_on_error(state) -> Literal["retry", "fallback", "__end__"]:
    if state.get("error") and state["attempts"] < 3:
        return "retry"
    elif state.get("error"):
        return "fallback"
    return END
```
```python
from langgraph.managed import RemainingSteps

# Set recursion limit
config = {"recursion_limit": 50}
graph.invoke(input, config)

# Track remaining steps in state
class State(TypedDict):
    remaining_steps: RemainingSteps

def check_budget(state):
    if state["remaining_steps"] < 5:
        return "wrap_up"
    return "continue"
```
Before implementing: