langgraph checkpointer
lab 05 | ~6 min

Kill the process. Resume from the last step.

A LangGraph checkpointer is not memory. It does not know facts. It knows where in the loop your agent was when the process died, what state was in flight, and how to keep going. It is reliability plus a time machine. By the end of this lab you have a 3-node graph that survives Ctrl-C mid-run, resumes from the last completed step, and can fork from any earlier step.

checkpointer vs memory in one paragraph

A memory layer (Mem0, Letta, Zep) stores facts your agent knows. A checkpointer stores the agent's own runtime state: which node ran last, what messages accumulated, what tool calls are pending. Kill the Python process mid-graph, and on the next invocation the checkpointer lets the graph load the latest state for thread_id="xyz" and resume from the next pending node. The same primitive lets you walk the state history backwards and fork from a past snapshot. Besides the in-memory MemorySaver, the official LangGraph backends include SqliteSaver (local file, lab-friendly) and PostgresSaver (multi-process production).
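To make "runtime state keyed by thread_id" concrete, here is a toy checkpointer built on the standard library's sqlite3. It is a conceptual sketch, not LangGraph's storage: the table layout and the save_checkpoint / latest_checkpoint helpers are hypothetical. It shows the two moves the paragraph describes: every checkpoint is filed under a thread_id, and resuming means reading the newest row for that thread and running whatever node it says is next.

```python
from __future__ import annotations

import json
import sqlite3
from typing import Optional

# Toy checkpoint table: one row per completed step, keyed by thread_id.
# Hypothetical schema for illustration only; LangGraph's real tables differ.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE checkpoints ("
    " thread_id TEXT, step INTEGER, next_node TEXT, state TEXT,"
    " PRIMARY KEY (thread_id, step))"
)


def save_checkpoint(thread_id: str, step: int, next_node: Optional[str], state: dict) -> None:
    """Persist runtime state: where the run is and what is in flight (not facts)."""
    conn.execute(
        "INSERT INTO checkpoints VALUES (?, ?, ?, ?)",
        (thread_id, step, next_node, json.dumps(state)),
    )
    conn.commit()


def latest_checkpoint(thread_id: str) -> Optional[tuple[Optional[str], dict]]:
    """Return (next_node, state) from the newest checkpoint, or None for an unknown thread."""
    row = conn.execute(
        "SELECT next_node, state FROM checkpoints WHERE thread_id = ?"
        " ORDER BY step DESC LIMIT 1",
        (thread_id,),
    ).fetchone()
    return None if row is None else (row[0], json.loads(row[1]))


# Two threads share one table but never see each other's state.
save_checkpoint("xyz", 0, "step", {"counter": 0, "notes": ["planned the work"]})
save_checkpoint("xyz", 1, "step", {"counter": 1, "notes": ["planned the work", "did step 1"]})
save_checkpoint("abc", 0, "step", {"counter": 0, "notes": ["planned the work"]})

print(latest_checkpoint("xyz"))   # thread "xyz" resumes at node "step" with counter=1
print(latest_checkpoint("nope"))  # no checkpoint for this thread: nothing to resume
```

Note what is absent: nothing here stores a user preference or a fact about the world, only "where am I and what is in flight".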

"Checkpointers persist a snapshot of the graph state at every super-step. Threads are conversations; checkpoints are turns. You can replay, fork, or resume any thread."
langchain-ai/langgraph, How-tos > Persistence
step 1

Install LangGraph + the SQLite saver.

Three packages. The SQLite saver lives in a separate distribution from the core SDK, and langchain-openai is the OpenAI model integration.

install
pip install langgraph langgraph-checkpoint-sqlite langchain-openai

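Set the OpenAI key (any LLM works; we wire OpenAI because every attendee already has one). The graph in this lab makes no model calls, so the key only matters once you put an LLM call inside a node.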

env (any shell)
# macOS / Linux
export OPENAI_API_KEY="sk-..."

# Windows PowerShell
$env:OPENAI_API_KEY = "sk-..."
step 2

Write a 3-node graph with a checkpointer.

Three nodes: plan, step, finish. State is a counter plus a list of notes. step loops back into itself until the counter reaches 3, then hands off to finish. Each step sleeps 2 seconds, so the graph is slow enough to Ctrl-C mid-run.

graph.py
import sqlite3
import sys
import time
from operator import add
from typing import Annotated, TypedDict

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import END, START, StateGraph

# Windows consoles may not default to UTF-8; force it so output prints cleanly.
sys.stdout.reconfigure(encoding="utf-8")


class State(TypedDict):
    counter: int
    # The add reducer appends each node's notes instead of overwriting the list.
    notes: Annotated[list[str], add]


def plan_node(state: State) -> dict:
    print(f"[plan]   counter={state['counter']}")
    return {"notes": ["planned the work"]}


def step_node(state: State) -> dict:
    counter = state["counter"] + 1
    print(f"[step]   counter={counter} (sleeping 2s, Ctrl-C now to test resume)")
    time.sleep(2)  # a Ctrl-C here kills the node before it returns any writes
    return {"counter": counter, "notes": [f"did step {counter}"]}


def finish_node(state: State) -> dict:
    print(f"[finish] counter={state['counter']} notes={state['notes']}")
    return {"notes": ["wrapped up"]}


def should_loop(state: State) -> str:
    # Loop back into step until the counter reaches 3, then finish.
    return "step" if state["counter"] < 3 else "finish"


builder = StateGraph(State)
builder.add_node("plan", plan_node)
builder.add_node("step", step_node)
builder.add_node("finish", finish_node)
builder.add_edge(START, "plan")
builder.add_edge("plan", "step")
builder.add_conditional_edges("step", should_loop, {"step": "step", "finish": "finish"})
builder.add_edge("finish", END)

# SqliteSaver persists checkpoints to a local file; thread_id keys each run.
# Build it from a connection: in recent releases from_conn_string() is a context
# manager, not a saver. check_same_thread=False lets LangGraph use the connection
# from threads other than the one that opened it.
conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
saver = SqliteSaver(conn)
graph = builder.compile(checkpointer=saver)


if __name__ == "__main__":
    config = {"configurable": {"thread_id": "lab-05-run-1"}}
    initial = {"counter": 0, "notes": []}
    # Iterating the stream drives execution; a checkpoint is saved after each step.
    for event in graph.stream(initial, config=config):
        pass
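The Annotated[list[str], add] line is why notes grows across steps while counter is simply replaced. Here is a stdlib model of that merge rule, not LangGraph's code (REDUCERS and apply_update are hypothetical names): a key with a reducer combines the old value with what the node returned; a key without one takes the node's value as-is.

```python
from operator import add
from typing import Any, Callable, Dict

# Hypothetical model of how a node's returned dict is folded into graph state.
# Keys with a reducer (like notes: Annotated[list[str], add]) combine old and new;
# keys without one (like counter) are simply overwritten.
REDUCERS: Dict[str, Callable[[Any, Any], Any]] = {"notes": add}


def apply_update(state: dict, update: dict) -> dict:
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"counter": 0, "notes": []}
state = apply_update(state, {"notes": ["planned the work"]})          # what plan_node returns
state = apply_update(state, {"counter": 1, "notes": ["did step 1"]})  # what step_node returns
print(state)
```

This is also why nodes return only the keys they change: the merge rule fills in the rest.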

Run it. Hit Ctrl-C while a step is sleeping, right after the first or second [step] line prints.

terminal
python graph.py
expected before you Ctrl-C
[plan]   counter=0
[step]   counter=1 (sleeping 2s, Ctrl-C now to test resume)
^C
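What survives that ^C: a checkpoint is written after a step completes, so a node killed mid-sleep leaves nothing behind and runs again on resume. The sketch below models that rule with the standard library only; run(), plan(), step(), and the in-memory checkpoints list are hypothetical stand-ins for the graph and checkpoints.sqlite, and the code raises KeyboardInterrupt itself to simulate the Ctrl-C.

```python
from typing import List, Optional, Tuple

# Stand-in for checkpoints.sqlite: (next_node, state) recorded after each node returns.
# None as next_node stands in for END (the finish node is omitted for brevity).
checkpoints: List[Tuple[Optional[str], dict]] = []
log: List[str] = []


def plan(state: dict) -> dict:
    return {**state, "notes": state["notes"] + ["planned the work"]}


def step(state: dict, crash_at: Optional[int]) -> dict:
    counter = state["counter"] + 1
    log.append(f"step counter={counter}")
    if counter == crash_at:
        raise KeyboardInterrupt  # simulated Ctrl-C while the node is still sleeping
    return {"counter": counter, "notes": state["notes"] + [f"did step {counter}"]}


def run(state: Optional[dict], crash_at: Optional[int] = None) -> dict:
    """Hypothetical driver: a None input means resume from the newest checkpoint."""
    if state is None:
        node, state = checkpoints[-1]
    else:
        node = "plan"
    while node is not None:
        if node == "plan":
            state, node = plan(state), "step"
        else:
            state = step(state, crash_at)
            node = "step" if state["counter"] < 3 else None
        checkpoints.append((node, state))  # committed only after the node returned
    return state


try:
    run({"counter": 0, "notes": []}, crash_at=1)  # dies inside the first step, like the ^C above
except KeyboardInterrupt:
    pass
final = run(None)  # resume: step 1 starts over because it never checkpointed
print(log)
print(final)
```

The log shows "step counter=1" twice: once for the killed attempt, once for the re-run.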
step 3

Resume + time-travel.

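Write a sibling resume.py that imports graph from graph.py (the __main__ guard keeps the import from starting a new run). The same thread_id picks up from the last checkpoint, so the step you interrupted runs again: its writes were never committed. Then we walk the history and fork from an earlier snapshot.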

resume.py
import sys
sys.stdout.reconfigure(encoding="utf-8")

from graph import graph

config = {"configurable": {"thread_id": "lab-05-run-1"}}

# RESUME: pass None as the input. LangGraph reads the last checkpoint
# for this thread_id and continues from the next pending node.
print("\n--- RESUMING ---")
for event in graph.stream(None, config=config):
    pass

# TIME TRAVEL: walk the checkpoint history for this thread.
print("\n--- HISTORY ---")
snapshots = list(graph.get_state_history(config))
for i, snap in enumerate(snapshots):
    next_nodes = snap.next or ("END",)
    print(f"  [{i}] next={next_nodes} counter={snap.values.get('counter')} notes={snap.values.get('notes')}")

# FORK: replay from a past snapshot. Passing the snapshot's own config (its
# thread_id plus checkpoint_id) tells LangGraph to skip the steps before that
# checkpoint and run the ones after it. The new checkpoints branch off the old
# one on the same thread; the original timeline stays in the history.
if len(snapshots) >= 3:
    past = snapshots[2]  # arbitrary earlier point
    print(f"\n--- FORKING from snapshot[2] (counter={past.values.get('counter')}) ---")
    for event in graph.stream(None, past.config):
        pass

Run the resume:

terminal
python resume.py
expected output (approximate; assumes you hit Ctrl-C during step 1, as above)
--- RESUMING ---
[step]   counter=1 (sleeping 2s, Ctrl-C now to test resume)
[step]   counter=2 (sleeping 2s, Ctrl-C now to test resume)
[step]   counter=3 (sleeping 2s, Ctrl-C now to test resume)
[finish] counter=3 notes=['planned the work', 'did step 1', 'did step 2', 'did step 3']

--- HISTORY ---
  [0] next=('END',) counter=3 notes=[...]
  [1] next=('finish',) counter=3 notes=[...]
  [2] next=('step',) counter=2 notes=[...]
  [3] next=('step',) counter=1 notes=[...]
  ...

--- FORKING from snapshot[2] (counter=2) ---
[step]   counter=3 ...
[finish] counter=3 ...

why this is not memory

Nothing in this lab knows that you prefer pnpm or that you live in SF. The checkpointer remembers the graph's own runtime state: which node ran, what messages accumulated, what is queued next. Pair it with Mem0 / Letta / Zep when you need the agent to know things across users and threads. Lab 06 wires both together.

The killer use case nobody talks about: time-travel debugging. When a production agent does something wrong, you can replay from the checkpoint just before the bad step, change the state there (graph.update_state), and watch the alternate timeline. Steps before that checkpoint are not re-executed, so upstream tool calls do not run again.
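Forking and time-travel work because checkpoints form a tree, not a list: a run started from an old checkpoint records it as the parent, so the original timeline is never overwritten. The stdlib sketch below models that with hypothetical Checkpoint records and save / run_from / fork helpers, where fork can patch state first (the "change an input" move). In LangGraph itself the equivalents are streaming from a snapshot's config and graph.update_state(); this sketch shows the shape of the idea, not their implementation.

```python
import itertools
from dataclasses import dataclass
from typing import Dict, List, Optional

_next_id = itertools.count()


@dataclass(frozen=True)
class Checkpoint:
    checkpoint_id: int
    parent_id: Optional[int]  # the checkpoint this one was derived from
    next_node: Optional[str]  # None stands in for END
    counter: int


store: Dict[int, Checkpoint] = {}


def save(parent: Optional[Checkpoint], next_node: Optional[str], counter: int) -> Checkpoint:
    parent_id = parent.checkpoint_id if parent is not None else None
    cp = Checkpoint(next(_next_id), parent_id, next_node, counter)
    store[cp.checkpoint_id] = cp
    return cp


def run_from(cp: Checkpoint) -> Checkpoint:
    """Continue the step loop from cp; each new checkpoint points back at the previous one."""
    while cp.next_node == "step":
        counter = cp.counter + 1
        cp = save(cp, "step" if counter < 3 else None, counter)
    return cp


def fork(cp: Checkpoint, counter: Optional[int] = None) -> Checkpoint:
    """Branch off cp, optionally editing state first (the edit is itself a child checkpoint)."""
    if counter is not None:
        cp = save(cp, cp.next_node, counter)
    return run_from(cp)


def lineage(cp: Checkpoint) -> List[int]:
    """Counters along the path from the root checkpoint to cp."""
    path: List[int] = []
    node: Optional[Checkpoint] = cp
    while node is not None:
        path.append(node.counter)
        node = store[node.parent_id] if node.parent_id is not None else None
    return path[::-1]


root = save(None, "step", 0)            # after plan: counter=0, next=step
original_end = run_from(root)           # original timeline
after_step_1 = next(cp for cp in store.values() if cp.counter == 1)
replayed = fork(after_step_1)           # replay: same state, new branch
edited = fork(after_step_1, counter=0)  # change the state, then replay
print(lineage(original_end), lineage(replayed), lineage(edited))
```

All three timelines coexist in store; the checkpoint after step 1 simply ends up with three children.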

going further | PostgresSaver in production

Swap SqliteSaver for PostgresSaver (in langgraph-checkpoint-postgres) for multi-worker setups. Same interface, with one extra step: call the saver's setup() once to create its tables. Each worker can pick up any thread because state lives in Postgres, not a local file. You usually pair this with a queue (Celery, RQ, BullMQ) that hands a thread_id to whichever worker is free.
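Why any worker can take any thread: the worker keeps no state of its own. It receives a thread_id, loads that thread's latest checkpoint from shared storage, advances, and writes back. A stdlib sketch of that shape, assuming queue.Queue as a stand-in for Celery/RQ/BullMQ and a lock-guarded dict as a stand-in for Postgres (worker and advance_thread are hypothetical); in production the database, not a Python lock, provides consistency.

```python
import queue
import threading
from typing import Dict, Optional

# Stand-in for Postgres: thread_id -> latest checkpointed state, visible to every worker.
shared_store: Dict[str, dict] = {
    "thread-a": {"counter": 0},
    "thread-b": {"counter": 1},
    "thread-c": {"counter": 2},
}
store_lock = threading.Lock()  # a real database provides this consistency itself
handled_by: Dict[str, str] = {}
jobs: "queue.Queue[Optional[str]]" = queue.Queue()  # stand-in for Celery / RQ / BullMQ


def advance_thread(thread_id: str, worker_name: str) -> None:
    """Load this thread's checkpoint, run the loop to completion, write the result back."""
    with store_lock:
        state = dict(shared_store[thread_id])
    while state["counter"] < 3:
        state["counter"] += 1
    with store_lock:
        shared_store[thread_id] = state
        handled_by[thread_id] = worker_name


def worker(name: str) -> None:
    """Stateless worker: it only ever receives a thread_id."""
    while True:
        thread_id = jobs.get()
        if thread_id is None:  # sentinel: shut down
            return
        advance_thread(thread_id, name)


workers = [threading.Thread(target=worker, args=(f"w{i}",)) for i in range(2)]
for w in workers:
    w.start()
for thread_id in list(shared_store):
    jobs.put(thread_id)
for _ in workers:
    jobs.put(None)  # one shutdown sentinel per worker
for w in workers:
    w.join()

print({tid: s["counter"] for tid, s in shared_store.items()}, handled_by)
```

Which worker handled which thread varies run to run; the final states do not.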

troubleshooting

"ModuleNotFoundError: langgraph.checkpoint.sqlite". The saver moved to its own package. pip install langgraph-checkpoint-sqlite separately from langgraph.

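Resume restarts from the beginning (or does nothing). LangGraph found no checkpoint for that thread_id. Use the exact same string as the first run, and run from the same directory: "checkpoints.sqlite" is a relative path, so a different working directory silently gets a fresh, empty file.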

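"AttributeError" or a context-manager object where a saver was expected, around SqliteSaver.from_conn_string. On older builds, SqliteSaver.from_conn_string("file.sqlite") returned a saver you could assign directly. In current langgraph-checkpoint-sqlite releases it is a context manager, so either use it in a with block (with SqliteSaver.from_conn_string("file.sqlite") as saver: ...) or build the saver from a connection: SqliteSaver(sqlite3.connect("file.sqlite", check_same_thread=False)). The import path is the same; the constructor signature drifts. The docs page for your installed version is canon.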
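Why check_same_thread=False: Python's sqlite3 module, by default, refuses to let a connection be used from any thread other than the one that created it, and LangGraph may touch the saver's connection from a different thread than the one that opened it (treat that as an assumption about your installed version's internals). This stdlib-only sketch shows the default failing and the relaxed connection working; use_from_other_thread is a hypothetical helper.

```python
import sqlite3
import threading
from typing import List


def use_from_other_thread(conn: sqlite3.Connection) -> str:
    """Run a query on conn from a new thread; report 'ok' or the error type name."""
    result: List[str] = []

    def target() -> None:
        try:
            conn.execute("SELECT 1").fetchone()
            result.append("ok")
        except sqlite3.ProgrammingError as exc:
            result.append(type(exc).__name__)

    t = threading.Thread(target=target)
    t.start()
    t.join()
    return result[0]


strict = sqlite3.connect(":memory:")                            # default: check_same_thread=True
relaxed = sqlite3.connect(":memory:", check_same_thread=False)  # what the saver's connection uses
print(use_from_other_thread(strict), use_from_other_thread(relaxed))
```

Turning the check off only removes the thread-ownership guard; it does not make concurrent writes from many threads safe on its own.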