A LangGraph checkpointer is not memory. It does not know facts. It knows where in the loop your agent was when the process died, what state was in flight, and how to keep going. It is reliability plus a time machine. By the end of this lab you have a three-node graph that survives Ctrl-C mid-run, resumes from its last saved checkpoint, and can fork from any earlier step.
A memory layer (Mem0, Letta, Zep) stores facts your agent knows. A checkpointer stores the agent's own runtime state: which node ran last, what messages accumulated, what tool calls are pending. Kill the Python process mid-graph and the checkpointer lets the next invocation walk in, read state for thread_id="xyz", and resume from the next node. Same primitive lets you walk state history backwards and fork from a past snapshot. The official LangGraph backends are SqliteSaver (local file, lab-friendly) and PostgresSaver (multi-process production).
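To make the distinction concrete, here is a toy model of what a checkpointer stores, using only the standard library. It is a sketch, not LangGraph's schema or API: the table layout and the save and latest helpers are invented for illustration. The point is only that each row is keyed by thread_id and records the state plus the node that runs next.

```python
import json
import sqlite3

# Toy model only: NOT LangGraph's real schema or API.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE checkpoints ("
    " thread_id TEXT, step INTEGER, next_node TEXT, state TEXT,"
    " PRIMARY KEY (thread_id, step))"
)


def save(thread_id: str, step: int, next_node: str, state: dict) -> None:
    """Record the state after a node finishes, plus which node runs next."""
    conn.execute(
        "INSERT INTO checkpoints VALUES (?, ?, ?, ?)",
        (thread_id, step, next_node, json.dumps(state)),
    )
    conn.commit()


def latest(thread_id: str):
    """Return (next_node, state) from the newest checkpoint, or None."""
    row = conn.execute(
        "SELECT next_node, state FROM checkpoints"
        " WHERE thread_id = ? ORDER BY step DESC LIMIT 1",
        (thread_id,),
    ).fetchone()
    return None if row is None else (row[0], json.loads(row[1]))


# Two nodes complete, then the process "dies" before anything else is saved.
save("xyz", 0, "step", {"counter": 0, "notes": ["planned the work"]})
save("xyz", 1, "step", {"counter": 1, "notes": ["planned the work", "did step 1"]})

resume_point = latest("xyz")      # where the next invocation would pick up
unknown = latest("other-thread")  # a thread_id that never ran has no checkpoint
```

Nothing in the table is a fact about a user. It is only the graph's position and in-flight state, which is why a mismatched thread_id looks like a fresh start.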
Three packages. The saver lives in a separate distribution from the core SDK, and langchain-openai provides the OpenAI client.
pip install langgraph langgraph-checkpoint-sqlite langchain-openai
Set the OpenAI key. The graph in this lab never calls a model, so the key only matters once you add an LLM node. Any provider works; we wire OpenAI because every attendee already has a key.
# macOS / Linux
export OPENAI_API_KEY="sk-..."
# Windows PowerShell
$env:OPENAI_API_KEY = "sk-..."
Three nodes: plan, step, finish. State is a counter plus a list of notes. The graph is intentionally slow so you can Ctrl-C mid-run.
import sqlite3
import sys
import time
from typing import Annotated, TypedDict
from operator import add

sys.stdout.reconfigure(encoding="utf-8")

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver


class State(TypedDict):
    counter: int
    # The add reducer appends each node's notes to the list instead of replacing it.
    notes: Annotated[list[str], add]


def plan_node(state: State) -> dict:
    print(f"[plan] counter={state['counter']}")
    return {"notes": ["planned the work"]}


def step_node(state: State) -> dict:
    counter = state["counter"] + 1
    print(f"[step] counter={counter} (sleeping 2s, Ctrl-C now to test resume)")
    time.sleep(2)
    return {"counter": counter, "notes": [f"did step {counter}"]}


def finish_node(state: State) -> dict:
    print(f"[finish] counter={state['counter']} notes={state['notes']}")
    return {"notes": ["wrapped up"]}


def should_loop(state: State) -> str:
    # Route back to step until the counter reaches 3.
    return "step" if state["counter"] < 3 else "finish"


builder = StateGraph(State)
builder.add_node("plan", plan_node)
builder.add_node("step", step_node)
builder.add_node("finish", finish_node)
builder.add_edge(START, "plan")
builder.add_edge("plan", "step")
builder.add_conditional_edges("step", should_loop, {"step": "step", "finish": "finish"})
builder.add_edge("finish", END)

# SqliteSaver writes to a local file; thread_id keys each run.
# Build it from a connection: from_conn_string is a context manager on recent
# releases, so its return value cannot be passed to compile() directly.
conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
saver = SqliteSaver(conn)
graph = builder.compile(checkpointer=saver)

if __name__ == "__main__":
    config = {"configurable": {"thread_id": "lab-05-run-1"}}
    initial = {"counter": 0, "notes": []}
    for event in graph.stream(initial, config=config):
        pass
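The State definition above mixes two update styles: counter is plain, so a node's return value overwrites it, while notes carries the add reducer, so returned lists are appended. The sketch below imitates that merge in plain Python so you can see it without running the graph. The apply_update helper is hypothetical and is not how LangGraph implements it internally.

```python
from operator import add
from typing import Annotated, TypedDict, get_type_hints


class State(TypedDict):
    counter: int
    notes: Annotated[list[str], add]


def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into state, honoring Annotated reducers."""
    hints = get_type_hints(State, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated[...] types expose their extra arguments as __metadata__.
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata:
            reducer = metadata[0]
            merged[key] = reducer(state[key], value)  # e.g. list + list
        else:
            merged[key] = value  # no reducer: last write wins
    return merged


before = {"counter": 0, "notes": ["planned the work"]}
after = apply_update(before, {"counter": 1, "notes": ["did step 1"]})
```

This is why step_node returns only the new note rather than the whole list: the reducer does the accumulation.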
Run it. Hit Ctrl-C after the first or second step prints.
python graph.py
[plan] counter=0
[step] counter=1 (sleeping 2s, Ctrl-C now to test resume)
[step] counter=2 (sleeping 2s, Ctrl-C now to test resume)
^C
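Save this as resume.py next to graph.py. It imports the compiled graph, and the __main__ guard in graph.py keeps the first run from firing again on import. The same thread_id resumes from the last saved checkpoint. A node that was interrupted never wrote its checkpoint, so it runs again: the sample output below comes from a run interrupted during the second step, which is why the resume starts at counter=2. Then we walk history and fork from an earlier snapshot.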
import sys

sys.stdout.reconfigure(encoding="utf-8")

from graph import graph

config = {"configurable": {"thread_id": "lab-05-run-1"}}

# RESUME: pass None as the input. LangGraph reads the last checkpoint
# for this thread_id and continues from the next pending node.
print("\n--- RESUMING ---")
for event in graph.stream(None, config=config):
    pass

# TIME TRAVEL: walk the checkpoint history for this thread.
print("\n--- HISTORY ---")
snapshots = list(graph.get_state_history(config))
for i, snap in enumerate(snapshots):
    next_nodes = snap.next or ("END",)
    print(f" [{i}] next={next_nodes} counter={snap.values.get('counter')} notes={snap.values.get('notes')}")
# FORK: pick a past snapshot and re-run from there.
# checkpoint_id is the unique handle for that point in the graph's life.
# Checkpoints are stored per thread_id, so the fork config must name the
# thread that owns the snapshot; past.config already carries both values.
if len(snapshots) >= 3:
    past = snapshots[2]  # arbitrary earlier point
    fork_config = past.config
    print(f"\n--- FORKING from snapshot[2] (counter={past.values.get('counter')}) ---")
    for event in graph.stream(None, fork_config):
        pass
Run the resume:
python resume.py
--- RESUMING ---
[step] counter=2 (sleeping 2s, Ctrl-C now to test resume)
[step] counter=3 (sleeping 2s, Ctrl-C now to test resume)
[finish] counter=3 notes=['planned the work', 'did step 1', 'did step 2', 'did step 3']
--- HISTORY ---
[0] next=('END',) counter=3 notes=[...]
[1] next=('finish',) counter=3 notes=[...]
[2] next=('step',) counter=2 notes=[...]
[3] next=('step',) counter=1 notes=[...]
...
--- FORKING from snapshot[2] (counter=2) ---
[step] counter=3 ...
[finish] counter=3 ...
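The history comes back newest first, so picking a snapshot by position breaks as soon as the run has a different number of steps. Selecting by content is sturdier. The sketch below uses a hypothetical Snapshot stand-in that exposes only the attributes resume.py reads (values, next, config), and the checkpoint_id strings are made-up placeholders; with the real graph you would pass the list from graph.get_state_history(config) instead.

```python
from typing import NamedTuple, Optional


class Snapshot(NamedTuple):
    """Hypothetical stand-in exposing the attributes resume.py reads."""
    values: dict
    next: tuple
    config: dict


def find_snapshot(snapshots, node: str, counter: int) -> Optional[Snapshot]:
    """Return the first snapshot that is about to run `node` at this counter."""
    for snap in snapshots:
        if node in snap.next and snap.values.get("counter") == counter:
            return snap
    return None


# Newest first, mirroring the HISTORY output above. IDs are placeholders.
history = [
    Snapshot({"counter": 3}, (), {"configurable": {"checkpoint_id": "d"}}),
    Snapshot({"counter": 3}, ("finish",), {"configurable": {"checkpoint_id": "c"}}),
    Snapshot({"counter": 2}, ("step",), {"configurable": {"checkpoint_id": "b"}}),
    Snapshot({"counter": 1}, ("step",), {"configurable": {"checkpoint_id": "a"}}),
]

target = find_snapshot(history, node="step", counter=2)
missing = find_snapshot(history, node="plan", counter=0)
```

The returned snapshot's config is what you would hand to graph.stream(None, ...) to replay from that point.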
Nothing in this lab knows that you prefer pnpm or that you live in SF. The checkpointer remembers the graph's own runtime state: which node ran, what messages accumulated, what is queued next. Pair it with Mem0 / Letta / Zep when you need the agent to know things across users and threads. Lab 06 wires both together.
The killer use case nobody talks about: time-travel debugging. When a production agent does something wrong, you can replay from the exact checkpoint, change an input, and watch the alternate timeline. No re-running upstream tool calls.
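The replay-with-a-change idea can be sketched without LangGraph at all. The toy below keeps a snapshot after every step, copies an earlier one, edits it, and runs forward again; the original timeline is untouched. The run and step helpers are invented for illustration. In LangGraph itself the edit would typically go through graph.update_state on the chosen snapshot's config, so treat this as a model of the behavior rather than the API.

```python
from copy import deepcopy


def step(state: dict) -> dict:
    """One loop iteration: bump the counter and log a note."""
    counter = state["counter"] + 1
    return {"counter": counter, "notes": state["notes"] + [f"did step {counter}"]}


def run(state: dict, limit: int = 3) -> list:
    """Run the loop to completion, keeping a snapshot after every step."""
    history = [deepcopy(state)]
    while state["counter"] < limit:
        state = step(state)
        history.append(deepcopy(state))
    return history


original = run({"counter": 0, "notes": []})

# Fork from the snapshot taken after step 1, with one value edited.
forked_start = deepcopy(original[1])
forked_start["notes"].append("operator override")
alternate = run(forked_start)
```

Only the steps after the fork point execute again; everything upstream of the chosen snapshot is read back, not recomputed.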
Swap SqliteSaver for PostgresSaver (in langgraph-checkpoint-postgres) for multi-worker setups. Same checkpointer interface, though PostgresSaver needs a one-time setup() call to create its tables. Each worker can pick up any thread because state lives in Postgres, not a local file. You usually pair this with a queue (Celery, RQ, BullMQ) that hands a thread_id to whichever worker is free.
"ModuleNotFoundError: No module named 'langgraph.checkpoint.sqlite'". The saver moved to its own package. Run pip install langgraph-checkpoint-sqlite in addition to langgraph.
Resume restarts from the beginning. Either the thread_id in your second run does not match the first, or the script is reading a different SQLite file. Use the exact same thread_id string and run from the same directory, so the relative path checkpoints.sqlite resolves to the same file.
"AttributeError: SqliteSaver has no attribute from_conn_string". The constructor API has drifted across releases. On older builds SqliteSaver.from_conn_string("file.sqlite") returned a saver you could use directly; on newer builds it is a context manager meant for a with block, and you may need SqliteSaver(sqlite3.connect("file.sqlite", check_same_thread=False)) instead. The import path is the same; the constructor signature drifts. The docs page for your installed version is canon.