Skip to content

Checkpointer API

LangGraph checkpoint savers for CockroachDB.

CockroachDBSaver

Synchronous checkpointer for LangGraph workflows.

langchain_cockroachdb.checkpointer.saver.CockroachDBSaver

Bases: BaseCockroachDBSaver

Synchronous checkpointer that stores checkpoints in CockroachDB.

Uses raw psycopg3 connections (not SQLAlchemy) for compatibility with LangGraph's checkpointer interface.

Example
from langchain_cockroachdb import CockroachDBSaver

DB_URI = "cockroachdb://root@localhost:26257/defaultdb?sslmode=disable"
with CockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()
    # Use with LangGraph
    graph = workflow.compile(checkpointer=checkpointer)

delete_thread(thread_id)

Delete all checkpoints and writes for a thread.

Parameters:

Name Type Description Default
thread_id str

The thread ID to delete.

required

from_conn_string(conn_string, *, pipeline=False) classmethod

Create a new CockroachDBSaver from a connection string.

Parameters:

Name Type Description Default
conn_string str

CockroachDB connection string. Supports cockroachdb:// and postgresql:// formats.

required
pipeline bool

Whether to use Pipeline mode (may not be supported by all CockroachDB versions).

False

Yields:

Type Description
CockroachDBSaver

CockroachDBSaver instance.

get_tuple(config)

Get a checkpoint tuple from the database.

Parameters:

Name Type Description Default
config RunnableConfig

Config identifying the checkpoint.

required

Returns:

Type Description
CheckpointTuple | None

CheckpointTuple or None if not found.

list(config, *, filter=None, before=None, limit=None)

List checkpoints from the database.

Parameters:

Name Type Description Default
config RunnableConfig | None

Config for filtering checkpoints.

required
filter dict[str, Any] | None

Additional metadata filtering criteria.

None
before RunnableConfig | None

Only return checkpoints before this checkpoint ID.

None
limit int | None

Maximum number of checkpoints to return.

None

Yields:

Type Description
CheckpointTuple

CheckpointTuple for each matching checkpoint.

put(config, checkpoint, metadata, new_versions)

Save a checkpoint to the database.

Parameters:

Name Type Description Default
config RunnableConfig

Config to associate with the checkpoint.

required
checkpoint Checkpoint

The checkpoint to save.

required
metadata CheckpointMetadata

Additional metadata.

required
new_versions ChannelVersions

New channel versions as of this write.

required

Returns:

Type Description
RunnableConfig

Updated configuration with checkpoint ID.

put_writes(config, writes, task_id, task_path='')

Store intermediate writes linked to a checkpoint.

Parameters:

Name Type Description Default
config RunnableConfig

Configuration of the related checkpoint.

required
writes Sequence[tuple[str, Any]]

List of (channel, value) pairs to store.

required
task_id str

Identifier for the task creating the writes.

required
task_path str

Path of the task creating the writes.

''

setup()

Set up the checkpoint database.

Creates necessary tables and runs migrations. MUST be called the first time the checkpointer is used.

AsyncCockroachDBSaver

Asynchronous checkpointer for LangGraph workflows.

langchain_cockroachdb.checkpointer.async_saver.AsyncCockroachDBSaver

Bases: BaseCockroachDBSaver

Asynchronous checkpointer that stores checkpoints in CockroachDB.

Uses raw psycopg3 async connections (not SQLAlchemy) for compatibility with LangGraph's checkpointer interface.

Example
from langchain_cockroachdb import AsyncCockroachDBSaver

DB_URI = "cockroachdb://root@localhost:26257/defaultdb?sslmode=disable"
async with AsyncCockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
    await checkpointer.setup()
    # Use with LangGraph
    graph = workflow.compile(checkpointer=checkpointer)

adelete_thread(thread_id) async

Delete all checkpoints and writes for a thread.

Parameters:

Name Type Description Default
thread_id str

The thread ID to delete.

required

aget_tuple(config) async

Get a checkpoint tuple from the database asynchronously.

Parameters:

Name Type Description Default
config RunnableConfig

Config identifying the checkpoint.

required

Returns:

Type Description
CheckpointTuple | None

CheckpointTuple or None if not found.

alist(config, *, filter=None, before=None, limit=None) async

List checkpoints from the database asynchronously.

Parameters:

Name Type Description Default
config RunnableConfig | None

Base configuration for filtering checkpoints.

required
filter dict[str, Any] | None

Additional metadata filtering criteria.

None
before RunnableConfig | None

Only return checkpoints before this checkpoint ID.

None
limit int | None

Maximum number of checkpoints to return.

None

Yields:

Type Description
AsyncIterator[CheckpointTuple]

CheckpointTuple for each matching checkpoint.

aput(config, checkpoint, metadata, new_versions) async

Save a checkpoint to the database asynchronously.

Parameters:

Name Type Description Default
config RunnableConfig

Config to associate with the checkpoint.

required
checkpoint Checkpoint

The checkpoint to save.

required
metadata CheckpointMetadata

Additional metadata.

required
new_versions ChannelVersions

New channel versions as of this write.

required

Returns:

Type Description
RunnableConfig

Updated configuration with checkpoint ID.

aput_writes(config, writes, task_id, task_path='') async

Store intermediate writes linked to a checkpoint asynchronously.

Parameters:

Name Type Description Default
config RunnableConfig

Configuration of the related checkpoint.

required
writes Sequence[tuple[str, Any]]

List of (channel, value) pairs to store.

required
task_id str

Identifier for the task creating the writes.

required
task_path str

Path of the task creating the writes.

''

delete_thread(thread_id)

Delete thread (sync wrapper for adelete_thread).

from_conn_string(conn_string, *, pipeline=False, serde=None) async classmethod

Create a new AsyncCockroachDBSaver from a connection string.

Parameters:

Name Type Description Default
conn_string str

CockroachDB connection string. Supports cockroachdb:// and postgresql:// formats.

required
pipeline bool

Whether to use AsyncPipeline mode.

False
serde SerializerProtocol | None

Custom serializer protocol.

None

Yields:

Type Description
AsyncIterator[AsyncCockroachDBSaver]

AsyncCockroachDBSaver instance.

get_tuple(config)

Get a checkpoint tuple (sync wrapper for aget_tuple).

list(config, *, filter=None, before=None, limit=None)

List checkpoints (sync wrapper for alist).

put(config, checkpoint, metadata, new_versions)

Save a checkpoint (sync wrapper for aput).

put_writes(config, writes, task_id, task_path='')

Store intermediate writes (sync wrapper for aput_writes).

setup() async

Set up the checkpoint database asynchronously.

Creates necessary tables and runs migrations. MUST be called the first time the checkpointer is used.

Key Methods

Shared (Sync & Async)

Method Async Method Description
setup() setup() Create tables and run migrations
put() aput() Save a checkpoint
get_tuple() aget_tuple() Retrieve a checkpoint
list() alist() List checkpoints with optional filters
put_writes() aput_writes() Store intermediate writes
delete_thread() adelete_thread() Delete all checkpoints for a thread

Factory Methods

Method Description
from_conn_string(conn_string) Create saver from a connection string (context manager)

Connection Types

Both savers accept these connection types:

Type Description
psycopg.Connection / AsyncConnection Single connection (simple scripts)
psycopg_pool.ConnectionPool / AsyncConnectionPool Connection pool (production)

Examples

See Checkpointer Guide for comprehensive usage examples.