Async vs Sync Guide
Understanding when to use async and sync APIs.
Overview
langchain-cockroachdb provides both async and sync interfaces:
- Async (recommended): high throughput through concurrent, non-blocking I/O
- Sync: a simple wrapper for scripts and batch jobs
Quick Comparison
| Feature | Async | Sync |
|---|---|---|
| Throughput | High (100-1000+ req/s) | Low (10-50 req/s) |
| I/O Handling | Non-blocking | Blocking |
| Concurrency | Native | Limited |
| Code Style | async/await | Regular Python |
| Best For | Web apps, APIs, high load | Scripts, batch jobs, simple tools |
Async API
When to Use
✅ Web Applications
# FastAPI, Django async views, etc.
from fastapi import FastAPI
from langchain_cockroachdb import AsyncCockroachDBVectorStore

app = FastAPI()
# vectorstore: an AsyncCockroachDBVectorStore created at startup

@app.get("/search")
async def search(query: str):
    results = await vectorstore.asimilarity_search(query, k=5)
    return results
✅ High Concurrent Load
# Handle hundreds of requests concurrently
import asyncio

async def handle_many_requests(queries: list[str]):
    tasks = [
        vectorstore.asimilarity_search(q, k=5)
        for q in queries
    ]
    results = await asyncio.gather(*tasks)
    return results
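Gathering hundreds of searches at once may queue up behind, or exhaust, the engine's connection pool. One common asyncio pattern is to cap the number of in-flight calls with `asyncio.Semaphore`. In this sketch `fake_search` is a hypothetical stand-in for `vectorstore.asimilarity_search` so it runs without a database, and the limit of 10 is an arbitrary assumption rather than a library default.

```python
import asyncio

# Tracks how many fake searches are running at once (for demonstration only)
stats = {"in_flight": 0, "peak": 0}


async def fake_search(query: str, k: int = 5) -> list[str]:
    """Hypothetical stand-in for vectorstore.asimilarity_search."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)  # simulated database round trip
    stats["in_flight"] -= 1
    return [f"{query}-result-{i}" for i in range(k)]


async def bounded_search(queries: list[str], limit: int = 10) -> list[list[str]]:
    # At most `limit` searches hold the semaphore at any moment; the rest wait
    semaphore = asyncio.Semaphore(limit)

    async def search_one(q: str) -> list[str]:
        async with semaphore:
            return await fake_search(q, k=5)

    # gather returns results in the same order as the input queries
    return await asyncio.gather(*(search_one(q) for q in queries))


results = asyncio.run(bounded_search([f"q{i}" for i in range(100)], limit=10))
print(len(results), stats["peak"])  # 100 10
```

Choosing the limit is a trade-off: higher values add concurrency until the pool or the database becomes the bottleneck.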
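✅ Modern LLM APIs
# The OpenAI and Anthropic SDKs, and LangChain chat models, offer async methods
async def rag_pipeline(query: str):
    # Vector search (async)
    docs = await vectorstore.asimilarity_search(query, k=5)
    # Build a prompt from the retrieved documents
    context = "\n\n".join(doc.page_content for doc in docs)
    prompt = f"Context:\n{context}\n\nQuestion: {query}"
    # LLM call (async)
    response = await llm.ainvoke(prompt)
    return response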
✅ Streaming Applications
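As a rough sketch of the streaming pattern, the code below forwards chunks to the caller as soon as they arrive by using an async generator. `fake_llm_stream` is a hypothetical stand-in for a streaming call such as a LangChain chat model's `astream()` (which yields message chunks rather than plain strings), so the example runs without an API key or database.

```python
import asyncio
from collections.abc import AsyncIterator


async def fake_llm_stream(prompt: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for a streaming LLM call such as llm.astream(prompt)."""
    for token in ["CockroachDB ", "is ", "a ", "distributed ", "SQL ", "database."]:
        await asyncio.sleep(0.01)  # simulated delay between chunks
        yield token


async def stream_answer(query: str) -> AsyncIterator[str]:
    # A real pipeline would first retrieve context, e.g.
    # docs = await vectorstore.asimilarity_search(query, k=5)
    prompt = f"Question: {query}"
    async for token in fake_llm_stream(prompt):
        yield token  # forward each chunk as soon as it arrives


async def main() -> str:
    chunks = []
    async for chunk in stream_answer("What is CockroachDB?"):
        chunks.append(chunk)  # a web app would write each chunk to the response instead
    return "".join(chunks)


answer = asyncio.run(main())
print(answer)  # CockroachDB is a distributed SQL database.
```

Because each `await` yields control, a server can interleave many such streams on one event loop.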
Basic Usage
import asyncio
from langchain_cockroachdb import AsyncCockroachDBVectorStore, CockroachDBEngine

async def main():
    # Initialize (embeddings: your LangChain Embeddings instance;
    # vector_dimension must match its output size)
    engine = CockroachDBEngine.from_connection_string(
        "cockroachdb://..."
    )
    await engine.ainit_vectorstore_table(
        table_name="docs",
        vector_dimension=1536,
    )
    vectorstore = AsyncCockroachDBVectorStore(
        engine=engine,
        embeddings=embeddings,
        collection_name="docs",
    )
    # Add documents
    ids = await vectorstore.aadd_texts([
        "Document 1",
        "Document 2",
    ])
    # Search
    results = await vectorstore.asimilarity_search(
        "query",
        k=5
    )
    # Cleanup
    await engine.aclose()

# Run
asyncio.run(main())
Async Methods
All async methods are prefixed with `a`:
- aadd_texts() - Add documents
- aadd_documents() - Add document objects
- asimilarity_search() - Search
- asimilarity_search_with_score() - Search with scores
- adelete() - Delete documents
- aapply_vector_index() - Create index
- adrop_vector_index() - Drop index
Sync API
When to Use
✅ Simple Scripts
# One-off data processing
from langchain_cockroachdb import CockroachDBVectorStore
vectorstore = CockroachDBVectorStore(
engine=engine,
embeddings=embeddings,
collection_name="docs",
)
# No async/await needed
ids = vectorstore.add_texts(["Doc 1", "Doc 2"])
results = vectorstore.similarity_search("query", k=5)
✅ Batch Jobs
# Sequential processing (large_dataset: an iterable of lists of strings)
for batch in large_dataset:
    ids = vectorstore.add_texts(batch)
    print(f"Processed {len(ids)} documents")
✅ Interactive Development
# Jupyter notebooks, REPL
vectorstore.add_texts(["Test doc"])
results = vectorstore.similarity_search("test", k=1)
print(results)
✅ Legacy Code
# Integrating with existing sync codebase
def process_documents(docs: list[str]):
    # No need to refactor to async
    return vectorstore.add_texts(docs)
Basic Usage
from langchain_cockroachdb import CockroachDBVectorStore, CockroachDBEngine
# Initialize (sync wrapper handles async internally)
engine = CockroachDBEngine.from_connection_string(
"cockroachdb://..."
)
# Sync API - no async/await
engine.init_vectorstore_table(
table_name="docs",
vector_dimension=1536,
)
vectorstore = CockroachDBVectorStore(
engine=engine,
embeddings=embeddings,
collection_name="docs",
)
# Add documents (sync)
ids = vectorstore.add_texts([
"Document 1",
"Document 2",
])
# Search (sync)
results = vectorstore.similarity_search("query", k=5)
# Cleanup
engine.close()
Sync Methods
The same names without the `a` prefix:
- add_texts() - Add documents
- add_documents() - Add document objects
- similarity_search() - Search
- similarity_search_with_score() - Search with scores
- delete() - Delete documents
- apply_vector_index() - Create index
- drop_vector_index() - Drop index
Performance Comparison
Throughput Test
Async:
import asyncio
import time

async def async_benchmark():
    # 100 concurrent searches
    queries = ["query"] * 100
    start = time.perf_counter()
    tasks = [
        vectorstore.asimilarity_search(q, k=5)
        for q in queries
    ]
    await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"Async: {100/elapsed:.0f} queries/sec")

asyncio.run(async_benchmark())
# Example result: ~800 queries/sec
Sync:
import time

def sync_benchmark():
    # 100 sequential searches
    queries = ["query"] * 100
    start = time.perf_counter()
    for q in queries:
        vectorstore.similarity_search(q, k=5)
    elapsed = time.perf_counter() - start
    print(f"Sync: {100/elapsed:.0f} queries/sec")

sync_benchmark()
# Example result: ~40 queries/sec
Result: in this run, async achieved about 20x the throughput because its 100 searches overlapped instead of running one after another. Actual gains depend on network latency, query cost, and connection pool size.
Why Async is Faster
Sync (blocking):
Query 1: [DB wait][process]
Query 2:                   [DB wait][process]
Query 3:                                     [DB wait][process]
Total: ~300ms
Async (non-blocking):
Query 1: [DB wait][process]
Query 2: [DB wait][process]
Query 3: [DB wait][process]
         ▲ All waiting simultaneously
Total: ~100ms
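The same effect can be reproduced without a database. This sketch replaces each query with a hypothetical `fake_query` that sleeps for an assumed 100 ms; awaiting the calls one by one models the sync timeline above, while `asyncio.gather` models the async one.

```python
import asyncio
import time

LATENCY = 0.1  # assumed 100 ms per query, matching the diagram above


async def fake_query(i: int) -> int:
    """Hypothetical stand-in for one database query."""
    await asyncio.sleep(LATENCY)  # waiting here lets other tasks run
    return i


async def one_after_another(n: int) -> float:
    start = time.perf_counter()
    for i in range(n):
        await fake_query(i)  # the next query starts only after this one finishes
    return time.perf_counter() - start


async def all_at_once(n: int) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_query(i) for i in range(n)))  # the waits overlap
    return time.perf_counter() - start


sequential_time = asyncio.run(one_after_another(3))
concurrent_time = asyncio.run(all_at_once(3))
print(f"one after another: {sequential_time:.2f}s, all at once: {concurrent_time:.2f}s")
```

Expect roughly 0.3 s versus 0.1 s; the exact figures vary slightly with scheduler overhead.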
Hybrid Approach
Async Function with Sync Wrapper
import asyncio

# Define async logic
async def complex_rag_pipeline(query: str) -> str:
    docs = await vectorstore.asimilarity_search(query, k=5)
    context = "\n\n".join(doc.page_content for doc in docs)
    response = await llm.ainvoke(f"Context:\n{context}\n\nQuestion: {query}")
    return response.content

# Provide sync wrapper for simple use
def rag(query: str) -> str:
    """Sync wrapper for RAG pipeline (call only where no event loop is running)."""
    return asyncio.run(complex_rag_pipeline(query))

# Use sync wrapper in scripts
result = rag("What is CockroachDB?")
print(result)
Run Async from Sync
import asyncio

def sync_function():
    # Run async code from a sync context (raises if an event loop is already running)
    result = asyncio.run(vectorstore.asimilarity_search("query", k=5))
    return result
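`asyncio.run()` cannot be used when the calling thread already has a running event loop. One general technique for calling async code from sync code in that situation is to keep a private event loop on a background thread and submit coroutines to it with `asyncio.run_coroutine_threadsafe`. The `BackgroundLoop` class below is a hypothetical helper, not part of langchain-cockroachdb, and this guide does not say whether the library's sync wrapper works this way. `fake_search` stands in for `vectorstore.asimilarity_search`.

```python
import asyncio
import threading
from collections.abc import Coroutine
from typing import Any, TypeVar

T = TypeVar("T")


class BackgroundLoop:
    """Hypothetical helper: runs coroutines on a private event loop in a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Coroutine[Any, Any, T]) -> T:
        # Schedule the coroutine on the background loop and block until it finishes
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result()

    def close(self) -> None:
        # Stop the loop from this thread, wait for the thread to exit, then release the loop
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fake_search(query: str, k: int = 5) -> list[str]:
    """Hypothetical stand-in for vectorstore.asimilarity_search."""
    await asyncio.sleep(0.01)
    return [f"{query}-{i}" for i in range(k)]


runner = BackgroundLoop()

# From plain sync code
first = runner.run(fake_search("query", k=2))


# From inside a running loop, where asyncio.run() would raise RuntimeError
async def called_from_async_code() -> list[str]:
    return runner.run(fake_search("inside", k=1))


second = asyncio.run(called_from_async_code())
runner.close()
print(first, second)  # ['query-0', 'query-1'] ['inside-0']
```

`run()` blocks the calling thread until the coroutine finishes, so when called from async code it also stalls that thread's loop; reserve it for sync boundaries.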
Use Both APIs
# Async for web endpoints
@app.get("/search")
async def search(query: str):
    return await async_vectorstore.asimilarity_search(query, k=5)

# Sync for admin scripts
def backfill_data(documents: list):
    sync_vectorstore.add_documents(documents)
Common Patterns
Web Application (FastAPI)
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI
from langchain_cockroachdb import AsyncCockroachDBVectorStore, CockroachDBEngine

vectorstore = None  # set in lifespan()

# Initialize at startup, release connections at shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
    global vectorstore
    engine = CockroachDBEngine.from_connection_string(
        os.environ["COCKROACHDB_URL"]
    )
    vectorstore = AsyncCockroachDBVectorStore(
        engine=engine,
        embeddings=embeddings,  # your LangChain Embeddings instance
        collection_name="docs",
    )
    yield
    await engine.aclose()

app = FastAPI(lifespan=lifespan)

# Async endpoints
@app.get("/search")
async def search(query: str, k: int = 5):
    results = await vectorstore.asimilarity_search(query, k=k)
    return {"results": [doc.page_content for doc in results]}

@app.post("/add")
async def add_document(content: str):
    ids = await vectorstore.aadd_texts([content])
    return {"id": ids[0]}
Data Migration Script
from langchain_cockroachdb import CockroachDBVectorStore

def migrate_documents(source_file: str):
    """Simple sync script for one-time migration."""
    vectorstore = CockroachDBVectorStore(
        engine=engine,
        embeddings=embeddings,
        collection_name="docs",
    )
    # Read documents (one per line), dropping newlines and blank lines
    with open(source_file) as f:
        docs = [line.strip() for line in f if line.strip()]
    # Process in batches
    batch_size = 100
    for i in range(0, len(docs), batch_size):
        batch = docs[i:i + batch_size]
        ids = vectorstore.add_texts(batch)
        print(f"Migrated batch {i // batch_size + 1}: {len(ids)} docs")

# Run
migrate_documents("documents.txt")
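The script above reads the whole file into memory before writing the first batch. For very large files, a generator that yields batches while the file is read keeps memory bounded. `batched_lines` is a hypothetical helper built on `itertools.islice`; on Python 3.12+, `itertools.batched` offers similar batching (yielding tuples).

```python
import io
from collections.abc import Iterable, Iterator
from itertools import islice


def batched_lines(lines: Iterable[str], batch_size: int) -> Iterator[list[str]]:
    """Hypothetical helper: yield lists of up to batch_size stripped, non-empty lines."""
    non_empty = (line.strip() for line in lines if line.strip())
    # islice pulls at most batch_size items; an empty list means the input is exhausted
    while batch := list(islice(non_empty, batch_size)):
        yield batch


# Usage with an in-memory file; a real script would pass the open file object
sample = io.StringIO("doc one\n\ndoc two\ndoc three\n")
batches = list(batched_lines(sample, batch_size=2))
print(batches)  # [['doc one', 'doc two'], ['doc three']]
```

Inside `migrate_documents`, the loop could then become `for batch in batched_lines(f, batch_size): vectorstore.add_texts(batch)` within the `with open(...)` block.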
Jupyter Notebook
# Sync API works well in notebooks
from langchain_cockroachdb import CockroachDBVectorStore
vectorstore = CockroachDBVectorStore(
engine=engine,
embeddings=embeddings,
collection_name="docs",
)
# Interactive exploration
vectorstore.add_texts(["Test document"])
results = vectorstore.similarity_search("test", k=1)
print(results[0].page_content)
Best Practices
1. Choose Based on Context
| Context | Use | Why |
|---|---|---|
| Web API | Async | High concurrency |
| CLI Tool | Sync | Simplicity |
| Data Pipeline | Async | Better throughput |
| Jupyter | Sync | Easier debugging |
2. Don't Mix in Same Function
# Bad: Mixing async and sync
async def bad_example():
    await vectorstore.aadd_texts(["doc1"])
    results = vectorstore.similarity_search("query", k=5)  # Wrong! Blocking sync call inside a coroutine

# Good: Use one or the other
async def good_async_example():
    await vectorstore.aadd_texts(["doc1"])
    results = await vectorstore.asimilarity_search("query", k=5)

def good_sync_example():
    vectorstore.add_texts(["doc1"])
    results = vectorstore.similarity_search("query", k=5)
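When no async variant exists for a blocking call you must make from a coroutine (for example, a third-party sync library), the standard library's `asyncio.to_thread` (Python 3.9+) runs it in a worker thread so the event loop stays free. For this library, prefer the `a`-prefixed methods. The sketch uses a hypothetical `blocking_search` that sleeps for 100 ms; two offloaded calls overlap and finish in about 100 ms rather than 200 ms.

```python
import asyncio
import time


def blocking_search(query: str) -> str:
    """Hypothetical stand-in for a blocking call with no async variant."""
    time.sleep(0.1)  # blocks the thread that runs it
    return f"results for {query}"


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Each call runs in the default thread pool, so the event loop stays responsive
    results = await asyncio.gather(
        asyncio.to_thread(blocking_search, "a"),
        asyncio.to_thread(blocking_search, "b"),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)  # ['results for a', 'results for b']
```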
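3. Async All the Way
# If using async, use it throughout
async def full_async_pipeline(query: str):
    # All async
    docs = await vectorstore.asimilarity_search(query, k=5)
    context = "\n\n".join(doc.page_content for doc in docs)
    response = await llm.ainvoke(f"Context:\n{context}\n\nQuestion: {query}")
    await log_to_db(query, response)  # your own async logging coroutine
    return response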
4. Close Connections Properly
# Async
async def add_and_close(texts: list[str]):
    try:
        await vectorstore.aadd_texts(texts)
    finally:
        await engine.aclose()

# Sync
def add_and_close_sync(texts: list[str]):
    try:
        vectorstore.add_texts(texts)
    finally:
        engine.close()
Troubleshooting
"RuntimeError: asyncio.run() cannot be called from a running event loop"
You are calling asyncio.run() while an event loop is already running, for example inside an async function or in a Jupyter notebook cell:
# Wrong
async def my_function():
    result = asyncio.run(vectorstore.asimilarity_search("query"))

# Right
async def my_function():
    result = await vectorstore.asimilarity_search("query")
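To check which situation you are in before choosing between `asyncio.run()` and `await`, note that `asyncio.get_running_loop()` raises `RuntimeError` when no loop is running in the current thread. `has_running_loop` is a small hypothetical helper built on it. In Jupyter and IPython a loop is already running, and top-level `await` works directly in cells.

```python
import asyncio


def has_running_loop() -> bool:
    """Hypothetical helper: True if this thread is already running an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def check_inside_coroutine() -> bool:
    return has_running_loop()


outside = has_running_loop()
inside = asyncio.run(check_inside_coroutine())
print(outside, inside)  # False True (when run as a plain script)
```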
Slow Performance with Sync API
Consider switching to async if you have:
- Multiple concurrent operations
- An I/O-bound workload
- A web application with many users
# Before (slow): each search waits for the previous one
for query in queries:
    vectorstore.similarity_search(query, k=5)

# After (fast): searches run concurrently
import asyncio

async def process_queries(queries):
    tasks = [
        vectorstore.asimilarity_search(q, k=5)
        for q in queries
    ]
    return await asyncio.gather(*tasks)

results = asyncio.run(process_queries(queries))
Next Steps
- Configuration - Optimize connection pools
- Examples - See both APIs in action
- API Reference - Full method documentation