vd.asynchronous
Async support for vd: universal wrapper + opt-in native implementations.
This module gives every vd backend an async/await surface from day one,
without forking the adapter hierarchy. It has two pieces:
- AsyncCollectionWrapper / AsyncClientWrapper: thin adapters that take any sync vd.Collection / vd.Client and dispatch every method to asyncio.to_thread(). This is the universal fallback: every backend works through it.
- connect_async(): the entry point, mirroring vd.connect(). Once a backend ships a native async client (planned as Phase 2 follow-ups for chroma, qdrant, weaviate, elasticsearch, redis, mongodb, lancedb, milvus, pinecone, turbopuffer), connect_async() returns that; otherwise it returns the wrapper.
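The to_thread dispatch idea fits in a few lines. This is a minimal sketch, not vd's code: ToySyncCollection and ThreadedAsyncAdapter are hypothetical names, and forwarding through `__getattr__` is a shortcut the documented wrappers do not necessarily use (they list each method explicitly).

```python
import asyncio
from typing import Any


class ToySyncCollection:
    """Hypothetical stand-in for a blocking sync vd.Collection."""

    def __init__(self) -> None:
        self._docs: dict = {}

    def set(self, key: str, value: str) -> None:
        self._docs[key] = value  # imagine a blocking network round trip here

    def count(self) -> int:
        return len(self._docs)


class ThreadedAsyncAdapter:
    """Every callable attribute of the sync object becomes an awaitable."""

    native_async = False  # the work still blocks, just on a worker thread

    def __init__(self, sync_obj: Any) -> None:
        self._sync = sync_obj

    def __getattr__(self, name: str) -> Any:
        # Only reached for names not defined on the adapter itself.
        attr = getattr(self._sync, name)
        if not callable(attr):
            return attr  # plain attributes pass straight through

        async def call(*args: Any, **kwargs: Any) -> Any:
            # Run the blocking method on the default thread pool.
            return await asyncio.to_thread(attr, *args, **kwargs)

        return call


async def main() -> int:
    col = ThreadedAsyncAdapter(ToySyncCollection())
    await col.set("a", "hello")
    return await col.count()


print(asyncio.run(main()))  # 1
```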
The asyncio.to_thread wrapper does not make backend I/O non-blocking: it
moves each blocking call off the event-loop thread onto a worker thread,
which keeps the loop free to run other tasks. For truly non-blocking I/O
against a network backend, use a client that satisfies
vd.SupportsNativeAsync.
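The difference between "blocking the loop" and "blocking a worker thread" is easy to observe with a heartbeat task. In this stdlib-only sketch, time.sleep stands in for a blocking backend call; heartbeat and run are illustrative names, not part of vd.

```python
import asyncio
import time


async def heartbeat(stop: asyncio.Event, ticks: list) -> None:
    # Appends a tick about every 10 ms, but only while the loop is free to run it.
    while not stop.is_set():
        ticks.append(1)
        await asyncio.sleep(0.01)


async def run(offload: bool) -> int:
    stop = asyncio.Event()
    ticks: list = []
    beat = asyncio.create_task(heartbeat(stop, ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    if offload:
        # Blocking call on a worker thread: the loop keeps scheduling the heartbeat.
        await asyncio.to_thread(time.sleep, 0.2)
    else:
        # Blocking call on the loop's own thread: nothing else runs until it returns.
        time.sleep(0.2)
    stop.set()
    await beat
    return len(ticks)


# First value is 1; the second is typically close to 20 (exact count depends on timing).
print(asyncio.run(run(False)), asyncio.run(run(True)))
```

The offloaded call takes just as long; what changes is that other coroutines keep running meanwhile. A native async client goes one step further and avoids tying up a thread per in-flight request.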
The module name is vd.asynchronous (not vd.async) because async
is a Python keyword.
- class vd.asynchronous.AsyncClientWrapper(sync_client: Any)
Adapt a sync Client to the AsyncClient contract by dispatching every method to asyncio.to_thread(). Use connect_async() rather than instantiating this directly.
- Parameters:
sync_client – A live Client (typically obtained from vd.connect()).
- native_async
Always False for this wrapper.
- Type:
bool
- property client: Any
Pass through to the wrapped client's client.
- async create_collection(name: str, *, dimension: int | None = None, metric: str = 'cosine', **index_config) → AsyncCollection
Create a new collection; raise ValueError if it exists.
- async get_collection(name: str) → AsyncCollection
Return an existing collection; raise KeyError if absent.
- async get_or_create_collection(name: str, *, dimension: int | None = None, metric: str = 'cosine', **index_config) → AsyncCollection
Return collection name, creating it if missing.
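The create/get contract above can be exercised with a toy client. ToySyncClient and ToyAsyncClient are hypothetical, and returning plain dicts instead of AsyncCollection objects is a simplification. The point of the sketch is that exceptions raised on the worker thread (ValueError, KeyError) propagate through `await asyncio.to_thread()` unchanged, so callers handle them exactly as with the sync client.

```python
import asyncio
from typing import Any, Dict, List


class ToySyncClient:
    """Hypothetical sync client following the create/get contract described above."""

    def __init__(self) -> None:
        self._collections: Dict[str, Dict[str, Any]] = {}

    def create_collection(self, name: str, **config: Any) -> Dict[str, Any]:
        if name in self._collections:
            raise ValueError(f"collection {name!r} already exists")
        self._collections[name] = {"config": config, "docs": {}}
        return self._collections[name]

    def get_collection(self, name: str) -> Dict[str, Any]:
        if name not in self._collections:
            raise KeyError(name)
        return self._collections[name]

    def get_or_create_collection(self, name: str, **config: Any) -> Dict[str, Any]:
        try:
            return self.get_collection(name)
        except KeyError:
            return self.create_collection(name, **config)


class ToyAsyncClient:
    """Each method hops to a worker thread; exceptions surface at the await."""

    def __init__(self, sync_client: ToySyncClient) -> None:
        self._sync = sync_client

    async def create_collection(self, name: str, **config: Any) -> Dict[str, Any]:
        return await asyncio.to_thread(self._sync.create_collection, name, **config)

    async def get_collection(self, name: str) -> Dict[str, Any]:
        return await asyncio.to_thread(self._sync.get_collection, name)

    async def get_or_create_collection(self, name: str, **config: Any) -> Dict[str, Any]:
        return await asyncio.to_thread(self._sync.get_or_create_collection, name, **config)


async def demo() -> List[str]:
    client = ToyAsyncClient(ToySyncClient())
    events: List[str] = []
    await client.create_collection("docs", dimension=2)
    try:
        await client.create_collection("docs")  # already exists
    except ValueError:
        events.append("ValueError")
    try:
        await client.get_collection("missing")  # absent
    except KeyError:
        events.append("KeyError")
    same = await client.get_or_create_collection("docs")  # returns the existing one
    events.append(str(same["config"]["dimension"]))
    return events


print(asyncio.run(demo()))  # ['ValueError', 'KeyError', '2']
```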
- class vd.asynchronous.AsyncCollectionWrapper(sync_collection: Any)
Adapt a sync Collection to the AsyncCollection contract by dispatching every method to asyncio.to_thread(). Use connect_async() rather than instantiating this directly; it will pick this wrapper or a native async adapter as appropriate.
- Parameters:
sync_collection – A live Collection (typically obtained from a Client).
- native_async
Always False for this wrapper. The wrapper still satisfies SupportsNativeAsync structurally (the attribute is present), but the boolean tells callers that I/O is happening in a thread pool rather than on the event loop. Prefer a native implementation for high-concurrency workloads.
- Type:
bool
- async add_documents(documents: Iterable[Any], *, batch_size: int = 100) → None
Batch upsert; mirrors the sync add_documents().
- property native: Any
Pass through to the wrapped collection's native.
- native_async: bool = False
This wrapper offloads to a thread pool; it doesn’t do non-blocking I/O.
- async search(query: str | list[float], *, limit: int = 10, filter: dict[str, Any] | None = None, egress: Callable[[dict[str, Any]], Any] | None = None, **kwargs) → AsyncIterator[dict[str, Any]]
Yield the limit documents most similar to query.
The underlying search runs once on a worker thread; results then stream from memory. (Most backends' sync search already returns a list or a fully realized iterator under the hood.)
- async set(key: str, value: str | tuple | Document) → None
Insert or replace a document (idempotent upsert).
- property sync: Any
The underlying sync Collection: a documented escape hatch.
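A sketch of the two collection methods with non-trivial behavior: add_documents() chunked by batch_size, and search() as an async generator that hops to a worker thread once and then yields from memory. ToySyncCollection, upsert_many and ToyAsyncCollection are hypothetical; vd's wrapper may simply forward add_documents() to the sync method in a single thread hop rather than batching itself.

```python
import asyncio
import math
from itertools import islice
from typing import Any, AsyncIterator, Callable, Dict, Iterable, List, Optional, Tuple


class ToySyncCollection:
    """Hypothetical blocking collection: id -> vector, cosine-similarity search."""

    def __init__(self) -> None:
        self.vectors: Dict[str, List[float]] = {}
        self.upsert_calls = 0

    def upsert_many(self, batch: List[Tuple[str, List[float]]]) -> None:
        self.upsert_calls += 1
        self.vectors.update(batch)

    def search(self, query: List[float], limit: int) -> List[Dict[str, Any]]:
        def cosine(a: List[float], b: List[float]) -> float:
            dot = sum(x * y for x, y in zip(a, b))
            return dot / (math.hypot(*a) * math.hypot(*b))

        hits = [{"id": k, "score": cosine(v, query)} for k, v in self.vectors.items()]
        hits.sort(key=lambda h: h["score"], reverse=True)
        return hits[:limit]  # fully materialized, like most sync backends


class ToyAsyncCollection:
    """Sketch of the wrapper pattern; not vd's AsyncCollectionWrapper."""

    native_async = False

    def __init__(self, sync: ToySyncCollection) -> None:
        self.sync = sync  # escape hatch to the blocking object

    async def add_documents(self, documents: Iterable[Tuple[str, List[float]]],
                            *, batch_size: int = 100) -> None:
        it = iter(documents)
        # One worker-thread hop per batch rather than per document.
        while batch := list(islice(it, batch_size)):
            await asyncio.to_thread(self.sync.upsert_many, batch)

    async def search(self, query: List[float], *, limit: int = 10,
                     egress: Optional[Callable[[Dict[str, Any]], Any]] = None) -> AsyncIterator[Any]:
        # The blocking search runs exactly once, on a worker thread...
        hits = await asyncio.to_thread(self.sync.search, query, limit)
        # ...then results stream from memory with no further thread hops.
        for hit in hits:
            yield egress(hit) if egress else hit


async def demo() -> Tuple[int, List[str]]:
    col = ToyAsyncCollection(ToySyncCollection())
    docs = [(f"d{i}", [float(i), 1.0]) for i in range(250)]
    await col.add_documents(docs, batch_size=100)
    ids = [h async for h in col.search([1.0, 0.0], limit=2, egress=lambda h: h["id"])]
    return col.sync.upsert_calls, ids


print(asyncio.run(demo()))  # (3, ['d249', 'd248'])
```

Running the search once and then iterating in memory keeps the async-iterator interface without paying a thread hop per result, which matches the note that most sync backends return a fully realized result set anyway.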
- class vd.asynchronous.SupportsHybrid(*args, **kwargs)
A collection that supports native hybrid (dense + lexical) search.
Hybrid search has no syntactic convergence across vector databases, so it is an opt-in capability, never baseline. Prefer the top-level vd.hybrid_search(): it dispatches to this protocol when the collection implements it and falls back to a pure-Python BM25 + RRF fusion otherwise. Feature-discover directly only when you specifically need to refuse the fallback path:
    if isinstance(collection, SupportsHybrid):
        hits = collection.hybrid_search("query text", limit=20)
The portable contract is Reciprocal Rank Fusion (every native backend supports it). Weighted-blend (alpha) and other backend-specific fusion variants are accepted via **kwargs and documented per adapter; they are not portable across backends.
- Parameters:
query (str or list[float]) – Query text (embedded via the collection's embedder if configured) or a pre-computed query vector for the dense side.
query_text (str, optional) – Explicit text for the lexical side. Defaults to query when query is a string. Required when query is a vector.
limit (int) – Number of fused results to return.
filter (dict, optional) – Canonical vd metadata filter applied to both sub-searches.
k_dense (int, optional) – How many results to fetch from the dense sub-search before fusion. Defaults to max(4 * limit, 50). Widen for higher recall.
k_lexical (int, optional) – How many results to fetch from the lexical sub-search before fusion. Defaults to max(4 * limit, 50). Widen for higher recall.
rrf_k (int) – Reciprocal Rank Fusion constant (typically 60).
egress (callable, optional) – Transform applied to each fused result before it is yielded.
**kwargs – Backend-specific knobs (e.g. alpha=0.7 on weaviate, ranker="weighted" on milvus). Documented per adapter.
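Reciprocal Rank Fusion scores each document by summing 1 / (rrf_k + rank) over every ranking it appears in, so documents ranked well by both sub-searches rise to the top. A minimal sketch with 1-based ranks, assuming each sub-search returns a plain list of ids; rrf_fuse and default_k are hypothetical helpers, not vd functions.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def default_k(limit: int) -> int:
    # Candidates fetched per sub-search, per the documented default.
    return max(4 * limit, 50)


def rrf_fuse(rankings: List[List[str]], *, rrf_k: int = 60,
             limit: int = 10) -> List[Tuple[str, float]]:
    """Fuse ranked id lists: score(d) = sum of 1 / (rrf_k + rank), ranks starting at 1."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (rrf_k + rank)
    # Highest fused score first; a document missing from a list gets no term from it.
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:limit]


dense = ["a", "b", "c"]    # ids ranked by vector similarity
lexical = ["c", "a", "d"]  # ids ranked by BM25
print([doc_id for doc_id, _ in rrf_fuse([dense, lexical], limit=3)])  # ['a', 'c', 'b']
```

"a" wins because it sits near the top of both lists, while "c" is first lexically but last densely. Because only ranks enter the formula, RRF needs no score normalization across the two sub-searches, which is what makes it portable.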
- async vd.asynchronous.connect_async(backend: str, **kwargs) → AsyncClient
Async sibling of vd.connect().
Returns an AsyncClient. Today every backend goes through the universal AsyncClientWrapper (built on asyncio.to_thread()); Phase 2 follow-ups will plug in native async clients per backend, which connect_async() will return instead.
- Parameters:
backend (str) – Backend name; same vocabulary as vd.connect().
**kwargs – Forwarded to vd.connect().
- Returns:
A live async client. Await it once at session start:
    client = await vd.connect_async("memory")
- Return type:
AsyncClient
Examples
>>> import asyncio, vd
>>> async def go():
...     client = await vd.connect_async("memory")
...     col = await client.create_collection("docs", dimension=2)
...     await col.set("a", vd.Document(id="a", text="x", vector=[1.0, 0.0]))
...     return await col.count()
>>> asyncio.run(go())
1
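The choice connect_async() makes can be sketched as a registry lookup followed by a fallback. NATIVE_ASYNC, connect_sync, ThreadedClient and FakeNativeClient are hypothetical names, and building the sync client on a worker thread is an assumption of this sketch rather than documented vd behavior.

```python
import asyncio
from typing import Any, Callable, Dict


class ThreadedClient:
    """Hypothetical universal fallback; stands in for AsyncClientWrapper."""

    native_async = False

    def __init__(self, sync_client: Any) -> None:
        self.sync = sync_client


# Hypothetical registry of backends with native async clients (empty until Phase 2 lands).
NATIVE_ASYNC: Dict[str, Callable[..., Any]] = {}


def connect_sync(backend: str, **kwargs: Any) -> Any:
    """Stand-in for vd.connect(); a dict plays the role of the sync client."""
    return {"backend": backend, **kwargs}


async def connect_async(backend: str, **kwargs: Any) -> Any:
    factory = NATIVE_ASYNC.get(backend)
    if factory is not None:
        return factory(**kwargs)  # native client: non-blocking I/O on the loop
    # Fallback: build the sync client off the loop, then wrap it.
    sync_client = await asyncio.to_thread(connect_sync, backend, **kwargs)
    return ThreadedClient(sync_client)


class FakeNativeClient:
    """Hypothetical native async client, registered only to exercise the lookup."""

    native_async = True

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


NATIVE_ASYNC["fake"] = FakeNativeClient
print(asyncio.run(connect_async("memory")).native_async)  # False
print(asyncio.run(connect_async("fake")).native_async)    # True
```

Callers never branch on the backend themselves; they can inspect native_async when they care whether I/O really stays on the event loop.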
- async vd.asynchronous.hybrid_search_async(collection: AsyncCollection, query: str | list[float], *, query_text: str | None = None, limit: int = 10, filter: dict[str, Any] | None = None, k_dense: int | None = None, k_lexical: int | None = None, rrf_k: int = 60, lexical_search: Callable[..., list[dict[str, Any]]] | None = None, egress: Callable[[dict[str, Any]], Any] | None = None, **kwargs) → AsyncIterator[dict[str, Any]]
Async sibling of vd.hybrid_search().
If the wrapped sync collection's class supports native hybrid search (i.e. satisfies SupportsHybrid), the whole fused call is dispatched to a worker thread. Otherwise the universal client-side BM25 + RRF fallback runs, also in a worker thread. Either way, the awaitable + async-iterator interface stays uniform.
Parameters mirror vd.hybrid_search() exactly; see that function for the full docs.
- Yields:
dict – Fused result dicts.
Examples
>>> import asyncio, vd
>>> async def go():
...     client = await vd.connect_async("memory")
...     col = await client.create_collection("docs", dimension=2)
...     await col.set("a", vd.Document(id="a", text="cats",
...                                    vector=[1.0, 0.0]))
...     await col.set("b", vd.Document(id="b", text="dogs",
...                                    vector=[0.0, 1.0]))
...     hits = []
...     async for h in vd.hybrid_search_async(col, [0.9, 0.1],
...                                           query_text="cats", limit=1):
...         hits.append(h["id"])
...     return hits
>>> asyncio.run(go())
['a']
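The dispatch in hybrid_search_async can be sketched with a runtime-checkable Protocol. HasHybrid, NativeHybrid, DenseOnly and client_side_fallback are hypothetical; the fallback here does not implement BM25 + RRF, it only marks which path ran. The sketch also takes the sync collection directly, whereas vd inspects the sync collection behind the async wrapper.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class HasHybrid(Protocol):
    """Hypothetical stand-in for vd.SupportsHybrid (method name assumed)."""

    def hybrid_search(self, query: Any, **kwargs: Any) -> List[Dict[str, Any]]: ...


class NativeHybrid:
    def hybrid_search(self, query: Any, **kwargs: Any) -> List[Dict[str, Any]]:
        return [{"id": "native-hit"}]


class DenseOnly:
    def search(self, query: Any, **kwargs: Any) -> List[Dict[str, Any]]:
        return [{"id": "dense-hit"}]


def client_side_fallback(collection: Any, query: Any, **kwargs: Any) -> List[Dict[str, Any]]:
    # Placeholder for the BM25 + RRF fallback; it only tags the dense results.
    return [{**hit, "fallback": True} for hit in collection.search(query, **kwargs)]


async def hybrid_search_async(sync_collection: Any, query: Any,
                              **kwargs: Any) -> AsyncIterator[Dict[str, Any]]:
    if isinstance(sync_collection, HasHybrid):
        # Native path: the whole fused call runs on a worker thread.
        hits = await asyncio.to_thread(sync_collection.hybrid_search, query, **kwargs)
    else:
        # Fallback path: also a single worker-thread hop.
        hits = await asyncio.to_thread(client_side_fallback, sync_collection, query, **kwargs)
    for hit in hits:
        yield hit


async def collect(col: Any) -> List[Dict[str, Any]]:
    return [hit async for hit in hybrid_search_async(col, "cats", limit=1)]


print(asyncio.run(collect(NativeHybrid())))  # [{'id': 'native-hit'}]
print(asyncio.run(collect(DenseOnly())))     # [{'id': 'dense-hit', 'fallback': True}]
```

Note that isinstance against a runtime-checkable Protocol only checks that the method exists, not its signature, so the protocol acts as a capability flag rather than a full type check.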