| name | langgraph-persistence |
| description | INVOKE THIS SKILL when your LangGraph needs to persist state, remember conversations, travel through history, or configure subgraph checkpointer scoping. Covers checkpointers, thread_id, time travel, Store, and subgraph persistence modes. |
LangGraph's persistence layer enables durable execution by checkpointing graph state:
- Checkpointer: Saves/loads graph state at every super-step
- Thread ID: Identifies separate checkpoint sequences (conversations)
- Store: Cross-thread memory for user preferences, facts
Two memory types:
- Short-term (checkpointer): Thread-scoped conversation history
- Long-term (store): Cross-thread user preferences, facts
| Checkpointer | Use Case | Production Ready |
|---|
InMemorySaver | Testing, development | No |
SqliteSaver | Local development | Partial |
PostgresSaver | Production | Yes |
Checkpointer Setup
Set up a basic graph with in-memory checkpointing and thread-based state persistence.
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict, Annotated
import operator
class State(TypedDict):
messages: Annotated[list, operator.add]
def add_message(state: State) -> dict:
return {"messages": ["Bot response"]}
checkpointer = InMemorySaver()
graph = (
StateGraph(State)
.add_node("respond", add_message)
.add_edge(START, "respond")
.add_edge("respond", END)
.compile(checkpointer=checkpointer)
)
config = {"configurable": {"thread_id": "conversation-1"}}
result1 = graph.invoke({"messages": ["Hello"]}, config)
print(len(result1["messages"]))
result2 = graph.invoke({"messages": ["How are you?"]}, config)
print(len(result2["messages"]))
Set up a basic graph with in-memory checkpointing and thread-based state persistence.
import { MemorySaver, StateGraph, StateSchema, MessagesValue, START, END } from "@langchain/langgraph";
import { HumanMessage } from "@langchain/core/messages";
const State = new StateSchema({ messages: MessagesValue });
const addMessage = async (state: typeof State.State) => {
return { messages: [{ role: "assistant", content: "Bot response" }] };
};
const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
.addNode("respond", addMessage)
.addEdge(START, "respond")
.addEdge("respond", END)
.compile({ checkpointer });
const config = { configurable: { thread_id: "conversation-1" } };
const result1 = await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
console.log(result1.messages.length);
const result2 = await graph.invoke({ messages: [new HumanMessage("How are you?")] }, config);
console.log(result2.messages.length);
Configure PostgreSQL-backed checkpointing for production deployments.
import os
from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string(os.environ["DATABASE_URL"]) as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
Configure PostgreSQL-backed checkpointing for production deployments.
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
const checkpointer = PostgresSaver.fromConnString(process.env.DATABASE_URL!);
const graph = builder.compile({ checkpointer });
Thread Management
Demonstrate isolated state between different thread IDs.
alice_config = {"configurable": {"thread_id": "user-alice"}}
bob_config = {"configurable": {"thread_id": "user-bob"}}
graph.invoke({"messages": ["Hi from Alice"]}, alice_config)
graph.invoke({"messages": ["Hi from Bob"]}, bob_config)
Demonstrate isolated state between different thread IDs.
const aliceConfig = { configurable: { thread_id: "user-alice" } };
const bobConfig = { configurable: { thread_id: "user-bob" } };
await graph.invoke({ messages: [new HumanMessage("Hi from Alice")] }, aliceConfig);
await graph.invoke({ messages: [new HumanMessage("Hi from Bob")] }, bobConfig);
State History & Time Travel
Time travel: browse checkpoint history and replay or fork from a past state.
config = {"configurable": {"thread_id": "session-1"}}
result = graph.invoke({"messages": ["start"]}, config)
states = list(graph.get_state_history(config))
past = states[-2]
result = graph.invoke(None, past.config)
fork_config = graph.update_state(past.config, {"messages": ["edited"]})
result = graph.invoke(None, fork_config)
Time travel: browse checkpoint history and replay or fork from a past state.
const config = { configurable: { thread_id: "session-1" } };
const result = await graph.invoke({ messages: ["start"] }, config);
const states: Awaited<ReturnType<typeof graph.getState>>[] = [];
for await (const state of graph.getStateHistory(config)) {
states.push(state);
}
const past = states[states.length - 2];
const replayed = await graph.invoke(null, past.config);
const forkConfig = await graph.updateState(past.config, { messages: ["edited"] });
const forked = await graph.invoke(null, forkConfig);
Manually update graph state before resuming execution.
config = {"configurable": {"thread_id": "session-1"}}
graph.update_state(config, {"data": "manually_updated"})
result = graph.invoke(None, config)
Manually update graph state before resuming execution.
const config = { configurable: { thread_id: "session-1" } };
await graph.updateState(config, { data: "manually_updated" });
const result = await graph.invoke(null, config);
Subgraph Checkpointer Scoping
When compiling a subgraph, the checkpointer parameter controls persistence behavior. This is critical for subgraphs that use interrupts, need multi-turn memory, or run in parallel.
| Feature | checkpointer=False | None (default) | True |
|---|
| Interrupts (HITL) | No | Yes | Yes |
| Multi-turn memory | No | No | Yes |
| Multiple calls (different subgraphs) | Yes | Yes | Warning (namespace conflicts possible) |
| Multiple calls (same subgraph) | Yes | Yes | No |
| State inspection | No | Warning (current invocation only) | Yes |
When to use each mode
checkpointer=False — Subgraph doesn't need interrupts or persistence. Simplest option, no checkpoint overhead.
None (default / omit checkpointer) — Subgraph needs interrupt() but not multi-turn memory. Each invocation starts fresh but can pause/resume. Parallel execution works because each invocation gets a unique namespace.
checkpointer=True — Subgraph needs to remember state across invocations (multi-turn conversations). Each call picks up where the last left off.
Warning: Stateful subgraphs (checkpointer=True) do NOT support calling the same subgraph instance multiple times within a single node — the calls write to the same checkpoint namespace and conflict.
Choose the right checkpointer mode for your subgraph.
subgraph = subgraph_builder.compile(checkpointer=False)
subgraph = subgraph_builder.compile()
subgraph = subgraph_builder.compile(checkpointer=True)
Choose the right checkpointer mode for your subgraph.
const subgraph = subgraphBuilder.compile({ checkpointer: false });
const subgraph = subgraphBuilder.compile();
const subgraph = subgraphBuilder.compile({ checkpointer: true });
Parallel subgraph namespacing
When multiple different stateful subgraphs run in parallel, wrap each in its own StateGraph with a unique node name for stable namespace isolation:
from langgraph.graph import MessagesState, StateGraph
def create_sub_agent(model, *, name, **kwargs):
"""Wrap an agent with a unique node name for namespace isolation."""
agent = create_agent(model=model, name=name, **kwargs)
return (
StateGraph(MessagesState)
.add_node(name, agent)
.add_edge("__start__", name)
.compile()
)
fruit_agent = create_sub_agent(
"gpt-4.1-mini", name="fruit_agent",
tools=[fruit_info], prompt="...", checkpointer=True,
)
veggie_agent = create_sub_agent(
"gpt-4.1-mini", name="veggie_agent",
tools=[veggie_info], prompt="...", checkpointer=True,
)
import { StateGraph, StateSchema, MessagesValue, START } from "@langchain/langgraph";
function createSubAgent(model: string, { name, ...kwargs }: { name: string; [key: string]: any }) {
const agent = createAgent({ model, name, ...kwargs });
return new StateGraph(new StateSchema({ messages: MessagesValue }))
.addNode(name, agent)
.addEdge(START, name)
.compile();
}
const fruitAgent = createSubAgent("gpt-4.1-mini", {
name: "fruit_agent", tools: [fruitInfo], prompt: "...", checkpointer: true,
});
const veggieAgent = createSubAgent("gpt-4.1-mini", {
name: "veggie_agent", tools: [veggieInfo], prompt: "...", checkpointer: true,
});
Note: Subgraphs added as nodes (via add_node) already get name-based namespaces automatically and don't need this wrapper.
Long-Term Memory (Store)
Use a Store for cross-thread memory to share user preferences across conversations.
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
store.put(("alice", "preferences"), "language", {"preference": "short responses"})
from langgraph.runtime import Runtime
def respond(state, runtime: Runtime):
prefs = runtime.store.get((state["user_id"], "preferences"), "language")
return {"response": f"Using preference: {prefs.value}"}
graph = builder.compile(checkpointer=checkpointer, store=store)
graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-1"}})
graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-2"}})
Use a Store for cross-thread memory to share user preferences across conversations.
import { MemoryStore } from "@langchain/langgraph";
const store = new MemoryStore();
await store.put(["alice", "preferences"], "language", { preference: "short responses" });
const respond = async (state: typeof State.State, runtime: any) => {
const item = await runtime.store?.get(["alice", "preferences"], "language");
return { response: `Using preference: ${item?.value?.preference}` };
};
const graph = builder.compile({ checkpointer, store });
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-1" } });
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-2" } });
Basic store operations: put, get, search, and delete.
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
store.put(("user-123", "facts"), "location", {"city": "San Francisco"})
item = store.get(("user-123", "facts"), "location")
results = store.search(("user-123", "facts"), filter={"city": "San Francisco"})
store.delete(("user-123", "facts"), "location")
Fixes
Always provide thread_id in config to enable state persistence.
graph.invoke({"messages": ["Hello"]})
graph.invoke({"messages": ["What did I say?"]})
config = {"configurable": {"thread_id": "session-1"}}
graph.invoke({"messages": ["Hello"]}, config)
graph.invoke({"messages": ["What did I say?"]}, config)
Always provide thread_id in config to enable state persistence.
await graph.invoke({ messages: [new HumanMessage("Hello")] });
await graph.invoke({ messages: [new HumanMessage("What did I say?")] });
const config = { configurable: { thread_id: "session-1" } };
await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
await graph.invoke({ messages: [new HumanMessage("What did I say?")] }, config);
Use PostgresSaver instead of InMemorySaver for production persistence.
checkpointer = InMemorySaver()
from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
checkpointer.setup()
graph = builder.compile(checkpointer=checkpointer)
Use PostgresSaver instead of MemorySaver for production persistence.
const checkpointer = new MemorySaver();
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
const checkpointer = PostgresSaver.fromConnString("postgresql://...");
await checkpointer.setup();
Use Overwrite to replace state values instead of passing through reducers.
from langgraph.types import Overwrite
graph.update_state(config, {"items": ["C"]})
graph.update_state(config, {"items": Overwrite(["C"])})
Use Overwrite to replace state values instead of passing through reducers.
import { Overwrite } from "@langchain/langgraph";
await graph.updateState(config, { items: ["C"] });
await graph.updateState(config, { items: new Overwrite(["C"]) });
Access store via the Runtime object in graph nodes.
def my_node(state):
store.put(...)
from langgraph.runtime import Runtime
def my_node(state, runtime: Runtime):
runtime.store.put(...)
Access store via runtime parameter in graph nodes.
const myNode = async (state) => {
store.put(...);
};
const myNode = async (state, runtime) => {
await runtime.store?.put(...);
};
### What You Should NOT Do
- Use
InMemorySaver in production — data lost on restart; use PostgresSaver
- Forget
thread_id — state won't persist without it
- Expect
update_state to bypass reducers — it passes through them; use Overwrite to replace
- Run the same stateful subgraph (
checkpointer=True) in parallel within one node — namespace conflict
- Access store directly in a node — use
runtime.store via the Runtime param