Checkpointer API
LangGraph checkpoint savers for CockroachDB.
CockroachDBSaver
Synchronous checkpointer for LangGraph workflows.
langchain_cockroachdb.checkpointer.saver.CockroachDBSaver
Bases: BaseCockroachDBSaver
Synchronous checkpointer that stores checkpoints in CockroachDB.
Uses raw psycopg3 connections (not SQLAlchemy) for compatibility with LangGraph's checkpointer interface.
Example
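A minimal synchronous sketch, modeled on the async example for AsyncCockroachDBSaver. It assumes `from_conn_string` is used as a context manager, that `workflow` is a LangGraph `StateGraph` built elsewhere, and that a local CockroachDB node is reachable at the placeholder URI. The helper name `run_with_checkpointer` is hypothetical.

```python
# Placeholder URI for a local, insecure single-node cluster (an assumption).
DB_URI = "cockroachdb://root@localhost:26257/defaultdb?sslmode=disable"


def run_with_checkpointer(workflow, thread_id, inputs, db_uri=DB_URI):
    """Run `workflow` once, persisting its checkpoints to CockroachDB."""
    # Imported lazily so this module can be loaded without the package installed.
    from langchain_cockroachdb.checkpointer.saver import CockroachDBSaver

    with CockroachDBSaver.from_conn_string(db_uri) as checkpointer:
        checkpointer.setup()  # creates tables and runs migrations
        graph = workflow.compile(checkpointer=checkpointer)
        # LangGraph groups checkpoints by the thread_id in the config.
        config = {"configurable": {"thread_id": thread_id}}
        return graph.invoke(inputs, config)
```

Invoking the graph again with the same `thread_id` should resume from the stored state rather than starting fresh.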
delete_thread(thread_id)
Delete all checkpoints and writes for a thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | The thread ID to delete. | required |
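As an illustration, a small helper could purge several threads in one pass, for example when a user asks for their conversation history to be removed. `purge_threads` is a hypothetical name; it only relies on the `delete_thread(thread_id)` method documented above.

```python
def purge_threads(checkpointer, thread_ids):
    """Delete checkpoints and writes for each thread; return how many were processed."""
    count = 0
    for thread_id in thread_ids:
        # Removes every checkpoint and pending write stored under this thread.
        checkpointer.delete_thread(thread_id)
        count += 1
    return count
```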
from_conn_string(conn_string, *, pipeline=False)
classmethod
Create a new CockroachDBSaver from a connection string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| conn_string | str | CockroachDB connection string. Supports cockroachdb:// and postgresql:// formats. | required |
| pipeline | bool | Whether to use Pipeline mode (may not be supported by all CockroachDB versions). | False |
Yields:
| Type | Description |
|---|---|
| CockroachDBSaver | CockroachDBSaver instance. |
get_tuple(config)
Get a checkpoint tuple from the database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig | Config identifying the checkpoint. | required |
Returns:
| Type | Description |
|---|---|
| CheckpointTuple \| None | CheckpointTuple or None if not found. |
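The sketch below shows one way to build the `config` argument and read the stored state. It assumes this saver follows LangGraph's usual convention of reading `thread_id`, `checkpoint_ns` and an optional `checkpoint_id` from `config["configurable"]`, and that omitting `checkpoint_id` selects the most recent checkpoint. Both helper names are hypothetical.

```python
def thread_config(thread_id, checkpoint_id=None, checkpoint_ns=""):
    """Build a RunnableConfig-style dict identifying a thread or one checkpoint."""
    configurable = {"thread_id": thread_id, "checkpoint_ns": checkpoint_ns}
    if checkpoint_id is not None:
        # Pin the lookup to one specific checkpoint instead of the latest one.
        configurable["checkpoint_id"] = checkpoint_id
    return {"configurable": configurable}


def latest_channel_values(checkpointer, thread_id):
    """Return the channel values of the thread's checkpoint, or None if absent."""
    checkpoint_tuple = checkpointer.get_tuple(thread_config(thread_id))
    if checkpoint_tuple is None:
        return None
    # `checkpoint` is a dict; LangGraph stores graph state under "channel_values".
    return checkpoint_tuple.checkpoint["channel_values"]
```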
list(config, *, filter=None, before=None, limit=None)
List checkpoints from the database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig \| None | Config for filtering checkpoints. | required |
| filter | dict[str, Any] \| None | Additional metadata filtering criteria. | None |
| before | RunnableConfig \| None | Only return checkpoints before this checkpoint ID. | None |
| limit | int \| None | Maximum number of checkpoints to return. | None |
Yields:
| Type | Description |
|---|---|
| CheckpointTuple | CheckpointTuple for each matching checkpoint. |
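A short sketch of iterating over a thread's history with `limit` and a metadata `filter`. It assumes each yielded tuple carries its `checkpoint_id` under `config["configurable"]` and that `metadata` is a dict with LangGraph's usual `source` and `step` keys. `checkpoint_history` is a hypothetical helper.

```python
def checkpoint_history(checkpointer, thread_id, limit=5, source=None):
    """Return (checkpoint_id, step) pairs for up to `limit` checkpoints."""
    config = {"configurable": {"thread_id": thread_id}}
    # Only pass a metadata filter when the caller asked for one.
    metadata_filter = {"source": source} if source is not None else None
    rows = []
    for tup in checkpointer.list(config, filter=metadata_filter, limit=limit):
        checkpoint_id = tup.config["configurable"]["checkpoint_id"]
        rows.append((checkpoint_id, tup.metadata.get("step")))
    return rows
```

Because `list` yields lazily, breaking out of the loop early avoids materializing tuples that are not needed.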
put(config, checkpoint, metadata, new_versions)
Save a checkpoint to the database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig | Config to associate with the checkpoint. | required |
| checkpoint | Checkpoint | The checkpoint to save. | required |
| metadata | CheckpointMetadata | Additional metadata. | required |
| new_versions | ChannelVersions | New channel versions as of this write. | required |
Returns:
| Type | Description |
|---|---|
| RunnableConfig | Updated configuration with checkpoint ID. |
put_writes(config, writes, task_id, task_path='')
Store intermediate writes linked to a checkpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig | Configuration of the related checkpoint. | required |
| writes | Sequence[tuple[str, Any]] | List of (channel, value) pairs to store. | required |
| task_id | str | Identifier for the task creating the writes. | required |
| task_path | str | Path of the task creating the writes. | '' |
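LangGraph normally calls `put` and `put_writes` itself while a graph runs, so direct calls are mostly useful in tests or tooling. The sketch below shows the shape of the `writes` argument. It assumes `checkpoint_config` is the config returned by an earlier `put` call (so it already carries the checkpoint ID), and `record_task_writes` is a hypothetical helper.

```python
def record_task_writes(checkpointer, checkpoint_config, task_id, values):
    """Store one (channel, value) write per entry in `values`."""
    # put_writes expects a sequence of (channel, value) pairs.
    writes = [(channel, value) for channel, value in values.items()]
    checkpointer.put_writes(checkpoint_config, writes, task_id)
    return writes
```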
setup()
Set up the checkpoint database.
Creates necessary tables and runs migrations. MUST be called the first time the checkpointer is used.
AsyncCockroachDBSaver
Asynchronous checkpointer for LangGraph workflows.
langchain_cockroachdb.checkpointer.async_saver.AsyncCockroachDBSaver
Bases: BaseCockroachDBSaver
Asynchronous checkpointer that stores checkpoints in CockroachDB.
Uses raw psycopg3 async connections (not SQLAlchemy) for compatibility with LangGraph's checkpointer interface.
Example
    import asyncio

    from langchain_cockroachdb import AsyncCockroachDBSaver

    DB_URI = "cockroachdb://root@localhost:26257/defaultdb?sslmode=disable"

    async def main():
        async with AsyncCockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
            await checkpointer.setup()  # create tables on first use
            # Use with LangGraph; `workflow` is a StateGraph defined elsewhere
            graph = workflow.compile(checkpointer=checkpointer)

    asyncio.run(main())
adelete_thread(thread_id)
async
Delete all checkpoints and writes for a thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | The thread ID to delete. | required |
aget_tuple(config)
async
Get a checkpoint tuple from the database asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig | Config identifying the checkpoint. | required |
Returns:
| Type | Description |
|---|---|
| CheckpointTuple \| None | CheckpointTuple or None if not found. |
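A small async sketch that checks whether a thread has any stored state before resuming it. It assumes the thread is identified by `thread_id` under `config["configurable"]`, as is conventional in LangGraph. `has_checkpoint` is a hypothetical helper.

```python
import asyncio


async def has_checkpoint(checkpointer, thread_id):
    """Return True if at least one checkpoint exists for the thread."""
    config = {"configurable": {"thread_id": thread_id}}
    checkpoint_tuple = await checkpointer.aget_tuple(config)
    # aget_tuple is documented to return None when nothing is found.
    return checkpoint_tuple is not None
```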
alist(config, *, filter=None, before=None, limit=None)
async
List checkpoints from the database asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig \| None | Base configuration for filtering checkpoints. | required |
| filter | dict[str, Any] \| None | Additional metadata filtering criteria. | None |
| before | RunnableConfig \| None | Only return checkpoints before this checkpoint ID. | None |
| limit | int \| None | Maximum number of checkpoints to return. | None |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[CheckpointTuple] | CheckpointTuple for each matching checkpoint. |
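Because `alist` yields an async iterator, it is consumed with `async for` rather than awaited. The sketch below collects checkpoint IDs, assuming each checkpoint dict exposes its identifier under the `"id"` key as in LangGraph's `Checkpoint` type. `collect_checkpoint_ids` is a hypothetical helper.

```python
import asyncio


async def collect_checkpoint_ids(checkpointer, thread_id, limit=None):
    """Gather the IDs of a thread's checkpoints, up to `limit` if given."""
    config = {"configurable": {"thread_id": thread_id}}
    ids = []
    # alist returns an async iterator, so it is iterated, not awaited.
    async for tup in checkpointer.alist(config, limit=limit):
        ids.append(tup.checkpoint["id"])
    return ids
```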
aput(config, checkpoint, metadata, new_versions)
async
Save a checkpoint to the database asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig | Config to associate with the checkpoint. | required |
| checkpoint | Checkpoint | The checkpoint to save. | required |
| metadata | CheckpointMetadata | Additional metadata. | required |
| new_versions | ChannelVersions | New channel versions as of this write. | required |
Returns:
| Type | Description |
|---|---|
| RunnableConfig | Updated configuration with checkpoint ID. |
aput_writes(config, writes, task_id, task_path='')
async
Store intermediate writes linked to a checkpoint asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RunnableConfig | Configuration of the related checkpoint. | required |
| writes | Sequence[tuple[str, Any]] | List of (channel, value) pairs to store. | required |
| task_id | str | Identifier for the task creating the writes. | required |
| task_path | str | Path of the task creating the writes. | '' |
delete_thread(thread_id)
Delete thread (sync wrapper for adelete_thread).
from_conn_string(conn_string, *, pipeline=False, serde=None)
async
classmethod
Create a new AsyncCockroachDBSaver from a connection string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| conn_string | str | CockroachDB connection string. Supports cockroachdb:// and postgresql:// formats. | required |
| pipeline | bool | Whether to use AsyncPipeline mode. | False |
| serde | SerializerProtocol \| None | Custom serializer protocol. | None |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[AsyncCockroachDBSaver] | AsyncCockroachDBSaver instance. |
get_tuple(config)
Get a checkpoint tuple (sync wrapper for aget_tuple).
list(config, *, filter=None, before=None, limit=None)
List checkpoints (sync wrapper for alist).
put(config, checkpoint, metadata, new_versions)
Save a checkpoint (sync wrapper for aput).
put_writes(config, writes, task_id, task_path='')
Store intermediate writes (sync wrapper for aput_writes).
setup()
async
Set up the checkpoint database asynchronously.
Creates necessary tables and runs migrations. MUST be called the first time the checkpointer is used.
Key Methods
Shared (Sync & Async)
| Sync Method | Async Method | Description |
|---|---|---|
| setup() | setup() | Create tables and run migrations |
| put() | aput() | Save a checkpoint |
| get_tuple() | aget_tuple() | Retrieve a checkpoint |
| list() | alist() | List checkpoints with optional filters |
| put_writes() | aput_writes() | Store intermediate writes |
| delete_thread() | adelete_thread() | Delete all checkpoints and writes for a thread |
Factory Methods
| Method | Description |
|---|---|
| from_conn_string(conn_string) | Create saver from a connection string (context manager) |
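Connection Types
Each saver accepts either a single connection or a connection pool. The synchronous saver takes the plain psycopg types, and the asynchronous saver takes their Async counterparts:
| Type | Description |
|---|---|
| psycopg.Connection / AsyncConnection | Single connection (simple scripts) |
| psycopg_pool.ConnectionPool / AsyncConnectionPool | Connection pool (production) |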
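A hedged sketch of the pooled setup for the synchronous saver. This page does not document the constructor, so passing the pool as the first positional argument to `CockroachDBSaver` is an assumption, as are the `autocommit` and `dict_row` connection settings, which are carried over from what LangGraph's Postgres saver expects. psycopg itself takes a `postgresql://` URI, so that scheme is used here. `make_pooled_saver` is a hypothetical helper.

```python
def make_pooled_saver(conninfo, max_size=10):
    """Open a psycopg connection pool and wrap it in a CockroachDBSaver."""
    # Imported lazily so this module can be loaded without the packages installed.
    from psycopg.rows import dict_row
    from psycopg_pool import ConnectionPool
    from langchain_cockroachdb.checkpointer.saver import CockroachDBSaver

    pool = ConnectionPool(
        conninfo=conninfo,
        max_size=max_size,
        # Assumption: same connection settings LangGraph's Postgres saver needs.
        kwargs={"autocommit": True, "row_factory": dict_row},
        open=True,
    )
    # Assumption: the constructor accepts the pool as its first argument.
    saver = CockroachDBSaver(pool)
    return pool, saver
```

The caller owns the pool and should call `pool.close()` at shutdown. Running `saver.setup()` once after creating the saver covers the first-use requirement.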
Examples
See Checkpointer Guide for comprehensive usage examples.