Asynchronous Usage

DataBridge provides full asynchronous support through the AsyncDataBridge class. This guide demonstrates how to use DataBridge in asynchronous applications.

Getting Started

Import and initialize the async client:

from databridge.async_ import AsyncDataBridge

# Using async context manager
async with AsyncDataBridge() as db:
    # Your async code here
    pass

# Manual initialization (inside an async function, since close() is awaited)
db = AsyncDataBridge()
try:
    # Your async code here
    pass
finally:
    await db.close()  # Always close the client

Basic Operations

The async API mirrors the synchronous API but uses async/await syntax:

import asyncio
from databridge.async_ import AsyncDataBridge

async def main():
    async with AsyncDataBridge() as db:
        # Ingest a document
        doc = await db.ingest_text(
            "Machine learning is the study of computer algorithms that can improve automatically through experience.",
            metadata={"category": "tech", "topic": "machine learning"}
        )
        
        print(f"Document ID: {doc.external_id}")
        
        # Retrieve chunks
        chunks = await db.retrieve_chunks(
            "What is machine learning?",
            filters={"category": "tech"}
        )
        
        for chunk in chunks:
            print(f"Score: {chunk.score}")
            print(f"Content: {chunk.content}")
            print("-" * 50)

# Run the async function
asyncio.run(main())
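
The same pattern applies to generating completions. The sketch below is hedged: it assumes the query method (taking query, filters, and k, and returning an object with a completion attribute) shown in the FastAPI example later in this guide.

import asyncio
from databridge.async_ import AsyncDataBridge

async def main():
    async with AsyncDataBridge() as db:
        # Generate a completion grounded in matching documents (assumed query signature)
        response = await db.query(
            query="What is machine learning?",
            filters={"category": "tech"},
            k=4
        )
        print(response.completion)

asyncio.run(main())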

Parallel Operations

One of the key benefits of async is the ability to run multiple I/O-bound operations concurrently:

import asyncio
from databridge.async_ import AsyncDataBridge

async def main():
    async with AsyncDataBridge() as db:
        # Run multiple operations in parallel
        results = await asyncio.gather(
            db.retrieve_chunks("machine learning"),
            db.retrieve_chunks("natural language processing"),
            db.retrieve_chunks("computer vision")
        )
        
        # Process results
        for i, chunks in enumerate(results):
            print(f"Query {i+1} results:")
            for chunk in chunks:
                print(f"- {chunk.score}: {chunk.content[:100]}...")
            print()

# Run the async function
asyncio.run(main())
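
By default, asyncio.gather propagates the first exception it encounters and the other results are not returned. If one query failing should not discard the rest, pass return_exceptions=True and inspect each result. A minimal sketch using the same retrieve_chunks calls as above:

import asyncio
from databridge.async_ import AsyncDataBridge

async def main():
    async with AsyncDataBridge() as db:
        queries = ["machine learning", "natural language processing", "computer vision"]

        # return_exceptions=True returns exceptions in place of results instead of raising
        results = await asyncio.gather(
            *(db.retrieve_chunks(q) for q in queries),
            return_exceptions=True
        )

        for query, result in zip(queries, results):
            if isinstance(result, Exception):
                print(f"{query!r} failed: {result}")
            else:
                print(f"{query!r} returned {len(result)} chunks")

asyncio.run(main())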

Batched Document Processing

Process multiple documents efficiently:

import asyncio
from databridge.async_ import AsyncDataBridge

async def process_documents(db, document_ids):
    # Process documents in parallel with concurrency limit
    semaphore = asyncio.Semaphore(5)  # Limit concurrent tasks
    
    async def process_doc(doc_id):
        async with semaphore:
            doc = await db.get_document(doc_id)
            # Process document...
            return doc
    
    # Create tasks for all documents
    tasks = [process_doc(doc_id) for doc_id in document_ids]
    
    # Run all tasks and collect results
    return await asyncio.gather(*tasks)

async def main():
    async with AsyncDataBridge() as db:
        # Get list of documents
        docs = await db.list_documents(limit=20)
        doc_ids = [doc.external_id for doc in docs]
        
        # Process in parallel
        processed_docs = await process_documents(db, doc_ids)
        print(f"Processed {len(processed_docs)} documents")

# Run the async function
asyncio.run(main())
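
When you want progress feedback rather than a single combined result, asyncio.as_completed yields each task as it finishes. A sketch reusing the list_documents and get_document calls from above:

import asyncio
from databridge.async_ import AsyncDataBridge

async def main():
    async with AsyncDataBridge() as db:
        docs = await db.list_documents(limit=20)

        # Schedule one fetch per document and report progress as each one completes
        tasks = [asyncio.create_task(db.get_document(doc.external_id)) for doc in docs]
        done = 0
        for future in asyncio.as_completed(tasks):
            doc = await future
            done += 1
            print(f"[{done}/{len(tasks)}] fetched {doc.external_id}")

asyncio.run(main())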

Async Cache Operations

Working with caches asynchronously:

import asyncio
from databridge.async_ import AsyncDataBridge

async def main():
    async with AsyncDataBridge() as db:
        # Create a cache
        await db.create_cache(
            name="research_cache",
            model="llama2",
            gguf_file="llama-2-7b-chat.Q4_K_M.gguf",
            filters={"type": "research"}
        )
        
        # Get the cache
        cache = await db.get_cache("research_cache")
        
        # Update the cache
        success = await cache.update()
        print(f"Cache update {'successful' if success else 'failed'}")
        
        # Add specific documents
        await cache.add_docs(["doc_123", "doc_456"])
        
        # Query the cache
        response = await cache.query(
            "Summarize the key findings",
            max_tokens=500
        )
        
        print(response.completion)

# Run the async function
asyncio.run(main())
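
Caches can also be refreshed from a background task so that queries always see recent documents. This is a sketch, not a built-in library feature: it simply re-runs the cache.update() call shown above on a fixed interval until the task is cancelled.

import asyncio
from databridge.async_ import AsyncDataBridge

async def refresh_periodically(db, cache_name, interval_seconds=300):
    # Re-run cache.update() on a schedule until this task is cancelled
    cache = await db.get_cache(cache_name)
    while True:
        success = await cache.update()
        print(f"Refresh of {cache_name} {'succeeded' if success else 'failed'}")
        await asyncio.sleep(interval_seconds)

async def main():
    async with AsyncDataBridge() as db:
        refresher = asyncio.create_task(refresh_periodically(db, "research_cache", 60))
        try:
            # Serve queries here while the cache refreshes in the background
            await asyncio.sleep(180)
        finally:
            refresher.cancel()

asyncio.run(main())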

Integration with Web Frameworks

FastAPI Example

from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from databridge.async_ import AsyncDataBridge

app = FastAPI()

# DataBridge client as a dependency
async def get_databridge():
    db = AsyncDataBridge()
    try:
        yield db
    finally:
        await db.close()

class QueryRequest(BaseModel):
    query: str
    filters: Optional[Dict[str, Any]] = None
    k: int = 4

class DocumentResponse(BaseModel):
    id: str
    score: float
    metadata: Dict[str, Any]

@app.post("/search", response_model=List[DocumentResponse])
async def search(request: QueryRequest, db: AsyncDataBridge = Depends(get_databridge)):
    try:
        results = await db.retrieve_docs(
            query=request.query,
            filters=request.filters,
            k=request.k
        )
        
        return [
            DocumentResponse(
                id=doc.document_id,
                score=doc.score,
                metadata=doc.metadata
            ) for doc in results
        ]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/query")
async def query(request: QueryRequest, db: AsyncDataBridge = Depends(get_databridge)):
    try:
        response = await db.query(
            query=request.query,
            filters=request.filters,
            k=request.k
        )
        
        return {"completion": response.completion}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
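
The dependency above creates and closes a client per request, which is simple but repeats connection setup on every call. If the client can safely be shared across requests (an assumption, not something guaranteed here), a single instance can be managed for the application's lifetime with FastAPI's lifespan handler:

from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from databridge.async_ import AsyncDataBridge

@asynccontextmanager
async def lifespan(app: FastAPI):
    # One shared client for the whole application, closed on shutdown
    app.state.db = AsyncDataBridge()
    try:
        yield
    finally:
        await app.state.db.close()

app = FastAPI(lifespan=lifespan)

@app.get("/documents/count")
async def document_count(request: Request):
    db: AsyncDataBridge = request.app.state.db
    docs = await db.list_documents(limit=100)
    return {"count": len(docs)}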

Best Practices

  1. Always Close the Client: Use async context managers or explicitly call await db.close() to ensure proper cleanup.

  2. Control Concurrency: Use semaphores to limit concurrent operations to avoid overwhelming resources.

  3. Error Handling: Handle exceptions in async code properly so that failures inside tasks are not silently lost.

async def safe_operation():
    try:
        # Async operations
        pass
    except Exception as e:
        print(f"Error: {str(e)}")
        # Handle error

  4. Cancel Long Operations: Implement timeouts or cancellation for long-running operations.

async def with_timeout(coroutine, timeout_seconds=10):
    try:
        return await asyncio.wait_for(coroutine, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        print("Operation timed out")
        return None

  5. Use Task Groups: For Python 3.11+, use task groups for cleaner async code.

# Python 3.11+
async def main():
    async with AsyncDataBridge() as db:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(db.retrieve_chunks("query 1"))
            task2 = tg.create_task(db.retrieve_chunks("query 2"))
        
        results1 = task1.result()
        results2 = task2.result()
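
If any task in the group fails, the TaskGroup cancels its sibling tasks and re-raises the failures as an exception group when the async with block exits, which can be caught with the except* syntax:

# Python 3.11+
import asyncio
from databridge.async_ import AsyncDataBridge

async def main():
    async with AsyncDataBridge() as db:
        try:
            async with asyncio.TaskGroup() as tg:
                task1 = tg.create_task(db.retrieve_chunks("query 1"))
                task2 = tg.create_task(db.retrieve_chunks("query 2"))
        except* Exception as eg:
            # A failing task cancels the others; the grouped errors are inspected here
            for exc in eg.exceptions:
                print(f"Task failed: {exc}")
        else:
            print(task1.result())
            print(task2.result())

asyncio.run(main())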