vd

vd — one Pythonic interface to every vector database.

vd is a facade over vector databases. Its purpose is to let you operate on any vectorDB, and switch between them with a one-argument change, while keeping each backend’s particular power one escape hatch away. It does three things:

  1. Chooserecommend_backend(), print_backends_table() and the provider registry help you (or an AI agent) pick the right backend.

  2. Set upcheck_requirements() and setup_guide() diagnose and walk you through installing and starting a backend.

  3. Operateconnect() returns a uniform client; collections behave as MutableMapping of Document plus a search() method.

Quick start

>>> import vd
>>> client = vd.connect('memory')          # switch DB = change this one word
>>> col = client.create_collection('docs')
>>> col['a'] = vd.Document(id='a', text='cats', vector=[1.0, 0.0])
>>> col['b'] = vd.Document(id='b', text='dogs', vector=[0.0, 1.0])
>>> [hit['id'] for hit in col.search([0.9, 0.1], limit=1)]
['a']

Embedding is external

vd stores and searches vectors. Turning text into vectors is another package’s job (e.g. ef). Pass an embedder to connect() only for the convenience of writing/searching raw text; otherwise pass Document objects carrying vectors, and pre-computed query vectors.

class vd.AbstractClient(*, embedder: Callable[[str], list[float]] | None = None, **config)[source]

Base class implementing the Client contract for adapters.

A Client is a Mapping[str, Collection]. A backend subclasses this and implements create_collection(), get_collection(), delete_collection(), and list_collections(); the mapping behavior, the get_or_create_collection() convenience, the client escape hatch, and context-manager support come for free.

Parameters:
  • embedder (callable, optional) – A text -> vector function. Passed to every collection so text inputs are accepted as a convenience. None (the default) makes the client vector-only.

  • **config – Backend-specific connection configuration.

backend_name: str = ''

The registry name of this backend (e.g. "chroma"). Adapters set it.

property client: Any

The raw backend client — a supported, documented escape hatch.

Drop to it for backend-specific operations the facade does not expose. Returns None for backends with no external client object (e.g. the in-memory backend).

close() None[source]

Release backend resources. Default no-op; adapters override as needed.

abstractmethod create_collection(name: str, *, dimension: int | None = None, metric: str = 'cosine', **index_config) Collection[source]

Create a new collection.

Parameters:
  • name (str) – Collection name.

  • dimension (int, optional) – Vector dimension. May be None for backends that can infer it from the first written vector; required up front by backends that cannot.

  • metric (str) – Distance metric: "cosine", "dot", or "l2".

  • **index_config – Backend-specific index tuning (HNSW M/ef, IVF nlist, …). Documented per adapter; never abstracted into a common enum.

Raises:

ValueError – If a collection of that name already exists.

abstractmethod delete_collection(name: str) None[source]

Drop a collection; raise KeyError if absent.

abstractmethod get_collection(name: str) Collection[source]

Return an existing collection; raise KeyError if absent.

get_or_create_collection(name: str, *, dimension: int | None = None, metric: str = 'cosine', **index_config) Collection[source]

Return the collection name, creating it if it does not exist.

The common idiom that every consumer otherwise re-implements as a try get_collection / except KeyError: create_collection.

abstractmethod list_collections() Iterator[str][source]

Iterate collection names.

class vd.AbstractCollection[source]

Base class implementing the Collection contract for adapters.

A backend subclasses this and implements the raw primitives below; everything users see is provided here, once, uniformly:

Subclass responsibilities (raw primitives)

_write(doc)

Upsert one document. Its vector is guaranteed non-None and dimension-checked.

_read(key) -> Document

Fetch one document; raise KeyError if absent.

_drop(key)

Delete one document; raise KeyError if absent.

_keys() -> Iterator[str]

Iterate document ids.

_count() -> int

Number of documents.

_query(vector, *, limit, filter, **kwargs) -> Iterable[SearchResult]

Raw nearest-neighbor search. filter is the canonical AST — the adapter translates it. Each result is a dict with at least id, text, score, metadata.

Optional overrides

_write_many(docs)

Efficient bulk upsert. Defaults to a loop over _write.

native (property)

The raw backend collection handle (escape hatch).

add_documents(documents: Iterable[str | tuple | Document], *, batch_size: int = 100) None[source]

Add many documents, embedding and writing them in batches.

Each item may be a string, a (text, ...) tuple, or a Document (see DocumentInput). Items without an id get a deterministic auto-generated one.

embed(text: str) list[float][source]

Embed text to a vector, or raise EmbeddingRequiredError.

This is the single place text becomes a vector inside vd.

property has_embedder: bool

Whether a text->vector embedder is configured on this collection.

property native: Any

The raw backend collection handle — a supported, documented escape hatch.

Use it to reach backend-specific features the facade does not expose, rather than circumventing vd. Returns None if the adapter has no distinct native object.

search(query: str | list[float], *, limit: int = 10, filter: dict[str, Any] | None = None, egress: Callable[[dict[str, Any]], Any] | None = None, **kwargs) Iterator[dict[str, Any]][source]

Return the limit documents most similar to query.

Parameters:
  • query (str or list[float]) – Query text (embedded via the client’s embedder) or a pre-computed query vector.

  • limit (int) – Maximum number of results.

  • filter (dict, optional) – Metadata filter in the canonical vd dialect (see vd.filters). Validated against this backend’s supported_filter_operators before the query runs, so an unsupported operator fails with a clear UnsupportedFilterError.

  • egress (callable, optional) – Transform applied to each result dict before it is yielded.

  • **kwargs – Backend-specific search options, passed through to _query.

Yields:

dict{"id", "text", "score", "metadata"} — or whatever egress returns. score is a higher-is-better, per-metric canonical similarity (see the “Score semantics” table at the top of vd.base): cosine in [-1, 1], dot in (-inf, +inf), l2 squashed to (0, 1]. Adapters whose backend returns a native combined-ranking score on a different scale (e.g. Elasticsearch, Atlas, Pinecone) document the deviation in their own docstring.

supported_filter_operators: frozenset = frozenset({})

the full language. Adapters narrow this; search() validates against it.

Type:

Filter operators this backend can honor. Default

supports_incremental_writes: bool = True

Whether the backend accepts writes after creation. Static-index backends set this False and raise StaticIndexError on write.

upsert(document: Document) None[source]

Insert or replace document (equivalent to self[doc.id] = doc).

class vd.AsyncClient(*args, **kwargs)[source]

The async sibling of Client.

Same operations — collection create / fetch / drop / list — exposed as awaitables and async iterators. Construct via vd.connect_async().

class vd.AsyncClientWrapper(sync_client: Any)[source]

Adapt a sync Client to the AsyncClient contract by dispatching every method to asyncio.to_thread().

Use connect_async() rather than instantiating this directly.

Parameters:

sync_client – A live Client (typically obtained from vd.connect()).

native_async

Always False for this wrapper.

Type:

bool

property client: Any

Pass through to the wrapped client’s client.

async close() None[source]

Release backend resources. Calls close() on the sync client if present.

async create_collection(name: str, *, dimension: int | None = None, metric: str = 'cosine', **index_config) AsyncCollection[source]

Create a new collection; raise ValueError if it exists.

async delete_collection(name: str) None[source]

Drop a collection; raise KeyError if absent.

async get_collection(name: str) AsyncCollection[source]

Return an existing collection; raise KeyError if absent.

async get_or_create_collection(name: str, *, dimension: int | None = None, metric: str = 'cosine', **index_config) AsyncCollection[source]

Return collection name, creating it if missing.

async list_collections() AsyncIterator[str][source]

Yield collection names.

property sync: Any

The underlying sync Client — a documented escape hatch.

class vd.AsyncCollection(*args, **kwargs)[source]

The async sibling of Collection.

Same conceptual surface — storage + search — but every method is awaitable and iterators are AsyncIterator. The mapping interface is exposed as explicit get / set / delete / keys / count methods (the stdlib’s MutableMapping ABC has no async counterpart; explicit methods are the Motor / aiopg convention).

Construct via vd.connect_async(); the universal AsyncCollectionWrapper in vd.asynchronous adapts every backend to this protocol by dispatching to the sync API through asyncio.to_thread(). Backends with native async SDKs override the wrapper and additionally satisfy SupportsNativeAsync.

class vd.AsyncCollectionWrapper(sync_collection: Any)[source]

Adapt a sync Collection to the AsyncCollection contract by dispatching every method to asyncio.to_thread().

Use connect_async() rather than instantiating this directly — it will pick this wrapper or a native async adapter as appropriate.

Parameters:

sync_collection – A live Collection (typically obtained from a Client).

native_async

Always False for this wrapper. The wrapper still satisfies SupportsNativeAsync structurally (the attribute is present), but the boolean tells callers that I/O is happening in a thread pool rather than on the event loop. Prefer a native implementation for high-concurrency workloads.

Type:

bool

async add_documents(documents: Iterable[Any], *, batch_size: int = 100) None[source]

Batch upsert — mirrors add_documents().

async count() int[source]

Return the number of documents.

async delete(key: str) None[source]

Delete a document; raises KeyError if absent.

async get(key: str) Document[source]

Fetch one document; raises KeyError if absent.

async keys() AsyncIterator[str][source]

Yield document ids.

property native: Any

Pass through to the wrapped collection’s native.

native_async: bool = False

This wrapper offloads to a thread pool; it doesn’t do non-blocking I/O.

async search(query: str | list[float], *, limit: int = 10, filter: dict[str, Any] | None = None, egress: Callable[[dict[str, Any]], Any] | None = None, **kwargs) AsyncIterator[dict[str, Any]][source]

Yield the limit documents most similar to query.

The underlying search runs once on a worker thread; results stream from memory. (Most backends’ sync search already returns a list or a fully-realized iterator under the hood.)

async set(key: str, value: str | tuple | Document) None[source]

Insert or replace a document (idempotent upsert).

property sync: Any

The underlying sync Collection — a documented escape hatch.

async upsert(document: Document) None[source]

Insert or replace document.

class vd.BM25Index(collection: ~vd.base.Collection, *, filter: dict | None = None, tokenize: ~typing.Callable[[str], list[str]] = <function _tokenize>)[source]

A reusable Okapi BM25 index over a vd collection’s stored text.

Builds the query-independent term statistics — per-document token lists, document frequencies, document lengths, and the mean length — once in __init__(), then answers many queries against them via search(). This is the build-once / query-many companion to bm25_lexical_search() (which builds a throwaway index for a single query): for batch evaluation or any repeated querying of the same collection it turns an O(N · Q) workload (re-tokenizing every document on every query) into O(N + scoring · Q).

Construction is O(N) in the collection size; each search() is O(matching documents). Fine for prototypes and collections up to ~100k documents; for larger workloads switch to a backend with a native text index (weaviate, elasticsearch, redis, …).

Parameters:
  • collection (Collection) – Any vd Collection (or mapping-like id -> obj exposing .text and .metadata). Documents whose text is empty contribute zero score and are dropped at build time.

  • filter (dict, optional) – Canonical vd metadata filter, applied once at build time (via vd.filters.matches_filter()) so the index covers only the matching documents and its statistics reflect that subset.

  • tokenize (Callable[[str], list[str]], optional) – Tokenizer (default: lowercased \w+ tokens). Pass a custom one for stemming, CJK, etc.

Examples

>>> import vd
>>> c = vd.connect('memory').create_collection('t', dimension=2)
>>> c['a'] = vd.Document(id='a', text='the quick brown fox', vector=[1.0, 0.0])
>>> c['b'] = vd.Document(id='b', text='lazy dog sleeps', vector=[0.0, 1.0])
>>> index = vd.BM25Index(c)
>>> index.search('quick fox', limit=1)[0]['id']
'a'
search(query_text: str, *, limit: int = 10, k1: float = 1.5, b: float = 0.75) list[dict[str, Any]][source]

Okapi BM25 scores for query_text over the indexed documents.

Returns result dicts in the same shape as Collection.search(){"id", "text", "score", "metadata"} — sorted by descending score. k1 / b are the standard Okapi hyperparameters (scoring-time, so one index can be queried with different settings).

exception vd.BackendNotInstalledError[source]

Raised when a known backend’s Python package is not installed.

Distinct from an unknown backend name (a plain ValueError): the backend exists in vd’s provider registry, but its client library is missing. The message carries the pip install command to fix it.

class vd.Client(*args, **kwargs)[source]

A live connection to one backend: Mapping[str, Collection].

Collections are created explicitly (so create-time parameters such as dimension and metric can be supplied) and fetched either by get_collection() or by mapping access client[name].

create_collection(name: str, *, dimension: int | None = None, metric: str = 'cosine', **index_config) Collection[source]

Create a new collection; raise ValueError if it exists.

delete_collection(name: str) None[source]

Drop a collection; raise KeyError if absent.

get_collection(name: str) Collection[source]

Return an existing collection; raise KeyError if absent.

list_collections() Iterator[str][source]

Iterate collection names.

class vd.Collection(*args, **kwargs)[source]

A collection of documents: MutableMapping[str, Document] + search.

The mapping half is storage; search() is the single retrieval extension. This minimal surface is everything vd’s tooling depends on. Batch insertion is an optional capability — see SupportsBatch.

search(query: str | list[float], *, limit: int = 10, filter: dict[str, Any] | None = None, egress: Callable[[dict[str, Any]], Any] | None = None, **kwargs) Iterator[dict[str, Any]][source]

Return the limit documents most similar to query.

class vd.Document(id: str, text: str = '', vector: list[float] | None = None, metadata: dict[str, ~typing.Any]=<factory>)[source]

The unit stored in a Collection.

Parameters:
  • id (str) – Unique identifier; the key under which the document lives in a collection.

  • text (str) – The text content. May be empty for vector-first use cases where no text is associated with a vector.

  • vector (list[float], optional) – The embedding. If None when written, the collection embeds text with its client’s embedder — or raises EmbeddingRequiredError if none is configured.

  • metadata (dict) – Arbitrary metadata, used for filtering and carried through search results.

Examples

>>> doc = Document(id="doc1", text="Hello world")
>>> doc.id, doc.text, doc.metadata
('doc1', 'Hello world', {})
>>> Document(id="v1", vector=[0.1, 0.2]).text
''
exception vd.EmbeddingRequiredError[source]

Raised when text is given but no embedder is configured.

vd operates on vectors. Passing raw text to collection[key] = text or collection.search(text) only works when the Client was created with an embedder. Otherwise, pass a Document with a vector (or a pre-computed query vector) directly.

exception vd.StaticIndexError[source]

Raised on a write to a static (immutable) index.

Some backends — notably a plain FAISS flat index — build an index that cannot accept incremental __setitem__ / __delitem__ after creation. Such collections set AbstractCollection.supports_incremental_writes to False and raise this on write. Callers branch on that flag before triggering the error, and use the adapter’s documented rebuild() path.

class vd.SupportsBatch(*args, **kwargs)[source]

A collection that supports efficient batch insertion.

add_documents and upsert are not part of the minimal Collection contract. Every adapter built on AbstractCollection happens to provide them, but generic code should still feature-discover:

if isinstance(collection, SupportsBatch):
    collection.add_documents(many_docs, batch_size=256)
class vd.SupportsHybrid(*args, **kwargs)[source]

A collection that supports native hybrid (dense + lexical) search.

Hybrid search has no syntactic convergence across vector databases, so it is an opt-in capability, never baseline. Prefer the top-level vd.hybrid_search() — it dispatches to this protocol when the collection implements it and falls back to a pure-Python BM25 + RRF fusion otherwise. Feature-discover directly only when you specifically need to refuse the fallback path:

if isinstance(collection, SupportsHybrid):
    hits = collection.hybrid_search("query text", limit=20)

The portable contract is Reciprocal Rank Fusion (every native backend supports it). Weighted-blend (alpha) and other backend-specific fusion variants are accepted via **kwargs and documented per adapter — they are not portable across backends.

Parameters:
  • query (str or list[float]) – Query text (embedded via the collection’s embedder if configured) or a pre-computed query vector for the dense side.

  • query_text (str, optional) – Explicit text for the lexical side. Defaults to query when query is a string. Required when query is a vector.

  • limit (int) – Number of fused results to return.

  • filter (dict, optional) – Canonical vd metadata filter applied to both sub-searches.

  • k_dense (int, optional) – How many results to fetch from each sub-search before fusion. Both default to max(4 * limit, 50). Widen for higher recall.

  • k_lexical (int, optional) – How many results to fetch from each sub-search before fusion. Both default to max(4 * limit, 50). Widen for higher recall.

  • rrf_k (int) – Reciprocal Rank Fusion constant (typically 60).

  • egress (callable, optional) – Transform applied to each fused result before it is yielded.

  • **kwargs – Backend-specific knobs (e.g. alpha=0.7 on weaviate, ranker="weighted" on milvus). Documented per adapter.

class vd.SupportsNativeAsync(*args, **kwargs)[source]

Marker protocol set on async clients/collections that use a backend’s native async SDK rather than the universal asyncio.to_thread() wrapper.

Why care: in high-concurrency event-loop apps (FastAPI, Starlette, etc.), a to_thread-wrapped backend still blocks a worker thread per request. For real non-blocking I/O, prefer collections that satisfy this protocol. The wrapper sets this attribute to False; native adapters set it to True. isinstance(c, SupportsNativeAsync) matches both — check c.native_async for the boolean.

class vd.TimeIndexedCollection(collection: ~collections.abc.MutableMapping, *, ts_field: str = 'ts', ts_parser: ~collections.abc.Callable[[~typing.Any], ~datetime.datetime] = <function to_datetime>)[source]

Time-indexed wrapper over any vd Collection.

Maintains a sorted (ts_epoch, id) index alongside the underlying collection. Each stored document MUST carry a timestamp in its metadata under ts_field (default "ts"). The stored value is normalized to an ISO-8601 string so backend-side filtering remains usable.

Parameters:
  • collection – Any vd Collection (MutableMapping + search).

  • ts_field – Metadata key holding the timestamp.

  • ts_parser – Optional custom parser Any -> datetime. Defaults to to_datetime().

Notes

The index is rebuilt on construction from whatever the underlying collection already contains (so the wrapper is safe to re-wrap a persisted collection across process restarts).

property base: MutableMapping

The wrapped underlying collection.

query_window(start: str | datetime | int | float | None = None, end: str | datetime | int | float | None = None, *, filt: dict[str, Any] | None = None) Iterator[Document][source]

Yield documents with start <= ts < end, in chronological order.

start / end may be None for half-open infinity. filt is an optional MongoDB-style predicate applied to document metadata, evaluated client-side (so it works on any backend).

reindex() None[source]

Force-rebuild the in-memory time index from the underlying collection.

search_window(query: str | Sequence[float], *, start: str | datetime | int | float | None = None, end: str | datetime | int | float | None = None, limit: int = 10, filt: dict[str, Any] | None = None, **kwargs) Iterator[dict][source]

Semantic search restricted to a time window.

Builds a metadata filter on ts_field and delegates to the underlying collection’s search. Falls back to a client-side post-filter for backends that don’t honor the filter.

time_range() tuple[datetime, datetime] | None[source]

Return (min_ts, max_ts) as aware datetimes, or None if empty.

>>> from vd import connect, Document
>>> import hashlib
>>> emb = lambda t: [b/128.0-1.0 for b in hashlib.md5(t.encode()).digest()[:4]]
>>> col = connect('memory', embedder=emb).create_collection('t')
>>> t = TimeIndexedCollection(col)
>>> t['a'] = Document(id='a', text='x', metadata={'ts': '2025-01-01'})
>>> t['b'] = Document(id='b', text='y', metadata={'ts': '2025-03-01'})
>>> [d.isoformat() for d in t.time_range()]
['2025-01-01T00:00:00+00:00', '2025-03-01T00:00:00+00:00']
window_iter(window: str | ~datetime.timedelta | int | float = '1d', *, start: str | ~datetime.datetime | int | float | None = None, end: str | ~datetime.datetime | int | float | None = None, reducer: ~collections.abc.Callable[[~collections.abc.Iterable[~vd.base.Document]], ~typing.Any] = <function count_docs>, skip_empty: bool = False, align: bool = True) Iterator[tuple[datetime, datetime, Any]][source]

Yield (window_start, window_end, reducer_value) over fixed windows.

Parameters:
  • window – Window size. See parse_window() for accepted forms.

  • start – Override the data range. Default: actual min/max ts in the index.

  • end – Override the data range. Default: actual min/max ts in the index.

  • reducer – Callable taking the iterable of in-window ``Document``s. Default is count_docs(). See also mean_vector().

  • skip_empty – If True, omit windows that contained zero documents.

  • align – If True (default), align start to the previous midnight (for daily windows) or to window-rounded boundary so downstream joins are clean. If False, use the literal start.

exception vd.UnsupportedCapabilityError[source]

Raised when an operation needs a capability the backend lacks.

Prefer feature-discovery — isinstance(collection, SupportsHybrid) — over catching this, but it is the clear, typed fallback when an optional operation is called on a backend that does not implement it.

exception vd.UnsupportedFilterError[source]

Raised when a metadata filter uses an operator a backend cannot honor.

The canonical, backend-agnostic filter language lives in vd.filters (a MongoDB-style JSON dialect). When a filter uses an operator outside a backend’s documented subset — or one that does not exist at all — this is raised, so the caller can simplify the filter or drop to the backend’s native filter via the escape hatch (collection.native).

exception vd.VdError[source]

Base class for every error vd raises on its own behalf.

class vd.WindowSlice(start: datetime, end: datetime)[source]

A time window: [start, end).

vd.benchmark_insert(collection: Collection, n_documents: int = 100, *, text_length: int = 100, batch_size: int = 10) dict[str, Any][source]

Benchmark document insertion performance.

Parameters:
  • collection (Collection) – Collection to benchmark

  • n_documents (int, default 100) – Number of documents to insert

  • text_length (int, default 100) – Length of test documents

  • batch_size (int, default 10) – Batch size for insertion

Returns:

Benchmark results

Return type:

dict

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> results = vd.benchmark_insert(docs, n_documents=50)

Benchmark search performance on a collection.

Parameters:
  • collection (Collection) – Collection to benchmark

  • query (str) – Query text to use

  • n_queries (int, default 100) – Number of queries to run

  • limit (int, default 10) – Number of results per query

Returns:

Benchmark results with: - total_time: Total time for all queries - avg_latency: Average query latency - min_latency: Minimum latency - max_latency: Maximum latency - p50, p95, p99: Latency percentiles - queries_per_second: Throughput

Return type:

dict

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> # Add some documents...
>>> results = vd.benchmark_search(docs, "test query", n_queries=50)

Brute-force BM25 lexical search over a vd collection’s stored text.

Builds a throwaway BM25Index over collection and runs a single query against it. Used as the default lexical side of hybrid_search() when a collection does not implement SupportsHybrid.

Cost is O(N) in the collection size — fine for prototypes and collections up to ~100k documents. For repeated queries over the same collection, build a BM25Index once and call BM25Index.search() per query instead of calling this function in a loop — the term statistics are then computed once rather than on every call. For larger workloads, switch to a backend with native hybrid search (weaviate, elasticsearch, redis, …) or pass a custom lexical_search callable to hybrid_search() that consults a real text index.

Parameters:
  • collection (Collection) – Any vd Collection. Documents whose text is empty contribute zero score and are filtered out of the result.

  • query_text (str) – The lexical query.

  • limit (int) – Maximum number of results.

  • filter (dict, optional) – Canonical vd metadata filter. Applied client-side via vd.filters.matches_filter().

  • k1 (float) – BM25 hyperparameters. Defaults match the standard Okapi BM25.

  • b (float) – BM25 hyperparameters. Defaults match the standard Okapi BM25.

Returns:

Result dicts in the same shape as Collection.search(){"id", "text", "score", "metadata"} — sorted by descending score.

Return type:

list[dict]

Examples

>>> import vd
>>> c = vd.connect('memory').create_collection('t', dimension=2)
>>> c['a'] = vd.Document(id='a', text='the quick brown fox', vector=[1.0, 0.0])
>>> c['b'] = vd.Document(id='b', text='lazy dog sleeps', vector=[0.0, 1.0])
>>> hits = bm25_lexical_search(c, 'quick fox', limit=1)
>>> hits[0]['id']
'a'
vd.check_requirements(backend: str, *, verbose: bool = True) dict[str, Any][source]

Diagnose whether backend is ready to use, and say what to do if not.

Runs an installed-check plus archetype-specific checks (embedded / server / managed), then computes the single most useful next step.

Parameters:
  • backend (str) – A provider name (see vd.list_all_backends()).

  • verbose (bool) – Print a human-readable report (in addition to returning the dict).

Returns:

{"backend", "archetype", "ok", "checks", "next_step"} where checks is a list of {"name", "ok", "detail"} records.

Return type:

dict

Examples

>>> report = check_requirements('memory', verbose=False)
>>> report['ok']
True
vd.chunk_documents(documents: Iterator[tuple[str, str | dict]], chunk_size: int = 500, *, overlap: int = 50, strategy: str = 'chars', id_template: str = '{doc_id}_chunk_{chunk_num}', preserve_metadata: bool = True) Iterator[tuple[str, str, dict]][source]

Chunk multiple documents while preserving metadata.

Parameters:
  • documents (iterator of tuples) – Iterator of (doc_id, text) or (doc_id, text, metadata) tuples

  • chunk_size (int) – Size of each chunk

  • overlap (int) – Overlap between chunks

  • strategy (str) – Chunking strategy (see chunk_text)

  • id_template (str) – Template for chunk IDs. Can use {doc_id} and {chunk_num}

  • preserve_metadata (bool) – Whether to copy metadata to all chunks

Yields:

tuple – (chunk_id, chunk_text, metadata) tuples

Examples

>>> docs = [('doc1', 'Long text...', {'author': 'Alice'})]
>>> chunks = list(chunk_documents(docs, chunk_size=20))
>>> len(chunks) >= 1
True
vd.chunk_text(text: str, chunk_size: int = 500, *, overlap: int = 50, strategy: str = 'chars', preserve_sentences: bool = True) list[str][source]

Chunk text into smaller pieces.

Parameters:
  • text (str) – Text to chunk

  • chunk_size (int, default 500) – Target size of each chunk (in characters or tokens depending on strategy)

  • overlap (int, default 50) – Number of characters/tokens to overlap between chunks

  • strategy (str, default 'chars') – Chunking strategy: - ‘chars’: Character-based chunking - ‘words’: Word-based chunking - ‘sentences’: Sentence-based chunking - ‘paragraphs’: Paragraph-based chunking

  • preserve_sentences (bool, default True) – Try to avoid breaking sentences when using chars/words strategy

Returns:

List of text chunks

Return type:

list of str

Examples

>>> text = "This is sentence one. This is sentence two. This is sentence three."
>>> chunks = chunk_text(text, chunk_size=30, strategy='chars')
>>> len(chunks) >= 2
True
>>> chunks = chunk_text(text, strategy='sentences')
>>> len(chunks)
3
vd.clean_text(text: str, *, lowercase: bool = False, remove_extra_whitespace: bool = True, remove_urls: bool = False, remove_emails: bool = False, remove_numbers: bool = False, remove_punctuation: bool = False) str[source]

Clean and normalize text.

Parameters:
  • text (str) – Text to clean

  • lowercase (bool, default False) – Convert to lowercase

  • remove_extra_whitespace (bool, default True) – Collapse multiple spaces/newlines

  • remove_urls (bool, default False) – Remove URLs

  • remove_emails (bool, default False) – Remove email addresses

  • remove_numbers (bool, default False) – Remove numbers

  • remove_punctuation (bool, default False) – Remove punctuation

Returns:

Cleaned text

Return type:

str

Examples

>>> text = "Hello   World!  Visit https://example.com"
>>> clean_text(text, remove_urls=True)
'Hello World! Visit'
>>> clean_text(text, lowercase=True, remove_punctuation=True)
'hello world visit https examplecom'
vd.collection_stats(collection: Collection) dict[str, Any][source]

Compute comprehensive statistics for a collection.

Parameters:

collection (Collection) – Collection to analyze

Returns:

Statistics including: - total_documents: Number of documents - avg_text_length: Average text length in characters - min_text_length: Minimum text length - max_text_length: Maximum text length - total_chars: Total characters across all documents - metadata_fields: Set of all metadata fields used - metadata_field_counts: Count of documents with each metadata field - embedding_dimension: Dimension of embeddings (if available) - has_vectors: Number of documents with vectors

Return type:

dict

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> docs['doc1'] = ("Hello", {'category': 'greeting'})
>>> stats = vd.collection_stats(docs)
>>> print(stats['total_documents'])
1
vd.compare_backends(names: list[str], *, characteristics: list[str] | None = None) dict[str, dict[str, Any]][source]

Return a {name: {characteristic: value}} table for the given providers.

vd.connect(backend: str, *, embedder: Callable[[str], list[float]] | None = None, **backend_kwargs) Client[source]

Connect to a vector database backend and return its Client.

This is the single entry point of vd. Switching vector databases is a one-argument change here.

Parameters:
  • backend (str) – Backend name: "memory", "chroma", "qdrant", "faiss", "lancedb", "sqlite_vec", "duckdb", "pgvector", "pinecone", … Run vd.list_backends() for what is installed.

  • embedder (callable, optional) – A text -> vector function. Supply it only if you want the convenience of passing raw text to collection[key] = "text" and collection.search("query text"). vd never embeds on its own — with no embedder, pass Document objects with vectors and pre-computed query vectors.

  • **backend_kwargs – Backend-specific connection options (persist_directory, url, api_key, path, …). See each adapter’s docstring.

Returns:

A connected client — a Mapping of collection name to collection.

Return type:

Client

Examples

>>> client = connect('memory')
>>> client = connect('chroma', persist_directory='./db')
>>> client = connect('qdrant', url='http://localhost:6333')
async vd.connect_async(backend: str, **kwargs) AsyncClient[source]

Async sibling of vd.connect().

Returns an AsyncClient. Today every backend goes through the universal AsyncClientWrapper (built on asyncio.to_thread()); Phase 2 follow-ups will plug in native async clients per backend, which connect_async() will return instead.

Parameters:
Returns:

A live async client. await once at session start:

client = await vd.connect_async("memory")

Return type:

AsyncClient

Examples

>>> import asyncio, vd
>>> async def go():
...     client = await vd.connect_async("memory")
...     col = await client.create_collection("docs", dimension=2)
...     await col.set("a", vd.Document(id="a", text="x", vector=[1.0, 0.0]))
...     return await col.count()
>>> asyncio.run(go())
1
vd.connect_from_config(path: str | Path | None = None, *, profile: str | None = None, apply_env: bool = True, embedder: Callable[[str], list[float]] | None = None, **overrides) Client[source]

Connect to a backend using configuration from a file.

Parameters:
  • path (str or Path, optional) – Path to configuration file. If not provided, searches for default config files.

  • profile (str, optional) – Profile name to use from configuration. Defaults to ‘default’ or the VD_PROFILE environment variable.

  • apply_env (bool, default True) – Whether to apply environment variable overrides

  • embedder (callable, optional) – Optional text -> vector convenience embedder, passed to vd.connect(). A vd config file describes the backend connection, not embedding — embedding stays the caller’s concern.

  • **overrides – Additional keyword arguments to override configuration values

Returns:

Connected client instance

Return type:

Client

Examples

>>> # With a config file
>>> client = connect_from_config('vd.yaml')
>>> # With a specific profile
>>> client = connect_from_config('vd.yaml', profile='production')
>>> # With environment variable VD_PROFILE=dev
>>> client = connect_from_config()
>>> # With overrides
>>> client = connect_from_config('vd.yaml', persist_directory='./data')
vd.copy_collection(source: tuple[str, str] | Collection, target: tuple[str, str, dict] | Collection, *, batch_size: int = 100, preserve_vectors: bool = True) dict[str, Any][source]

Copy a collection with flexible source/target specification.

Parameters:
  • source (tuple or Collection) – Either a Collection object or (backend_name, collection_name) tuple

  • target (tuple or Collection) – Either a Collection object or (backend_name, collection_name, config) tuple

  • batch_size (int) – Batch size for copying

  • preserve_vectors (bool) – Whether to preserve vectors

Returns:

Migration statistics

Return type:

dict

Examples

>>> import vd
>>> # Copy between backends
>>> stats = vd.copy_collection(
...     source=('memory', 'docs'),
...     target=('chroma', 'docs', {'persist_directory': './data'}),
... )
vd.cosine_similarity(vec1: list[float], vec2: list[float]) float[source]

Cosine similarity of two vectors (1.0 identical, 0.0 orthogonal).

Examples

>>> cosine_similarity([1.0, 0.0], [1.0, 0.0])
1.0
>>> cosine_similarity([1.0, 0.0], [0.0, 1.0])
0.0
vd.count_docs(docs: Iterable[Document]) int[source]

len reducer that also handles generator inputs.

>>> count_docs(iter([1, 2, 3]))
3
vd.create_example_config(format: str = 'yaml') str[source]

Generate an example configuration file content.

Parameters:

format (str, default 'yaml') – Format of configuration: ‘yaml’ or ‘toml’

Returns:

Example configuration as a string

Return type:

str

Examples

>>> yaml_config = create_example_config('yaml')
>>> print(yaml_config)
>>> toml_config = create_example_config('toml')
vd.deduplicate_results(results: Iterator[dict[str, Any]], *, key: str = 'id', keep: str = 'first') Iterator[dict[str, Any]][source]

Remove duplicate results.

Parameters:
  • results (iterator) – Search results

  • key (str, default 'id') – Field to check for duplicates

  • keep (str, default 'first') – Which duplicate to keep: ‘first’ or ‘highest_score’

Yields:

dict – Deduplicated results

Examples

>>> results = [
...     {'id': 'doc1', 'score': 0.9},
...     {'id': 'doc1', 'score': 0.8},
...     {'id': 'doc2', 'score': 0.7}
... ]
>>> unique = list(deduplicate_results(iter(results)))
>>> len(unique)
2
vd.euclidean_distance(vec1: list[float], vec2: list[float]) float[source]

Euclidean (L2) distance between two vectors.

Examples

>>> euclidean_distance([1.0, 0.0], [1.0, 0.0])
0.0
vd.export_collection(collection: Collection, output_path: str | Path, *, format: str = 'jsonl', **kwargs) int[source]

Export a collection to a file in the specified format.

Parameters:
  • collection (Collection) – Collection to export

  • output_path (str or Path) – Output file/directory path

  • format (str) – Export format: ‘jsonl’, ‘json’, ‘directory’

  • **kwargs – Additional format-specific options

Returns:

Number of documents exported

Return type:

int

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> vd.export_collection(docs, 'backup.jsonl')
vd.export_to_directory(collection: Collection, output_dir: str | Path, *, include_vectors: bool = True) int[source]

Export collection as a directory with one JSON file per document.

Useful for version control and easy browsing.

Parameters:
  • collection (Collection) – Collection to export

  • output_dir (str or Path) – Output directory path

  • include_vectors (bool, default True) – Whether to include vectors

Returns:

Number of documents exported

Return type:

int

vd.export_to_json(collection: Collection, output_path: str | Path, *, include_vectors: bool = True, indent: int | None = 2) int[source]

Export a collection to JSON format.

Creates a JSON array of all documents.

Parameters:
  • collection (Collection) – Collection to export

  • output_path (str or Path) – Output file path

  • include_vectors (bool, default True) – Whether to include embedding vectors

  • indent (int, optional) – JSON indentation (None for compact)

Returns:

Number of documents exported

Return type:

int

vd.export_to_jsonl(collection: Collection, output_path: str | Path, *, include_vectors: bool = True) int[source]

Export a collection to JSONL (JSON Lines) format.

Each line is a JSON object representing a document.

Parameters:
  • collection (Collection) – Collection to export

  • output_path (str or Path) – Output file path

  • include_vectors (bool, default True) – Whether to include embedding vectors in the export

Returns:

Number of documents exported

Return type:

int

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> docs['doc1'] = "Hello"
>>> vd.export_to_jsonl(docs, 'backup.jsonl')
1
vd.extract_metadata(text: str, *, extract_title: bool = True, extract_length: bool = True, extract_word_count: bool = True, extract_language: bool = False) dict[str, Any][source]

Extract metadata from text.

Parameters:
  • text (str) – Text to analyze

  • extract_title (bool) – Extract first line as title

  • extract_length (bool) – Add text length

  • extract_word_count (bool) – Add word count

  • extract_language (bool) – Detect language (requires langdetect)

Returns:

Extracted metadata

Return type:

dict

Examples

>>> text = "My Title\n\nThis is the content."
>>> meta = extract_metadata(text)
>>> meta['title']
'My Title'
>>> meta['char_count']
28
vd.find_duplicates(collection: Collection, *, threshold: float = 0.95, method: str = 'cosine') list[tuple[str, str, float]][source]

Find near-duplicate documents in a collection.

Parameters:
  • collection (Collection) – Collection to analyze

  • threshold (float, default 0.95) – Similarity threshold above which documents are considered duplicates

  • method (str, default 'cosine') – Similarity method: ‘cosine’ or ‘exact’

Returns:

List of (doc_id1, doc_id2, similarity) tuples for duplicates

Return type:

list of tuples

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> docs['doc1'] = "Hello world"
>>> docs['doc2'] = "Hello world"
>>> duplicates = vd.find_duplicates(docs)
>>> len(duplicates) > 0
True
vd.find_outliers(collection: Collection, *, n_neighbors: int = 5, threshold: float = 0.3) list[tuple[str, float]][source]

Find outlier documents (those dissimilar to their neighbors).

Parameters:
  • collection (Collection) – Collection to analyze

  • n_neighbors (int, default 5) – Number of neighbors to consider

  • threshold (float, default 0.3) – Average similarity threshold below which a document is an outlier

Returns:

List of (doc_id, avg_similarity) for outliers

Return type:

list of tuples

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> # Add some documents...
>>> outliers = vd.find_outliers(docs)
vd.get_backend_characteristics() dict[str, dict[str, Any]][source]

Return a compact {name: characteristics} map for comparison tooling.

vd.get_backend_info(name: str) dict[str, Any][source]

Return one provider’s metadata with installed/has_adapter flags.

vd.get_install_instructions(name: str) str[source]

Return a human-readable setup blurb for one provider.

vd.health_check_backend(backend_name: str, **config) dict[str, Any][source]

Check if a backend is healthy and accessible.

Parameters:
  • backend_name (str) – Backend name to check

  • **config – Backend-specific configuration

Returns:

Health report with keys: - status: ‘healthy’, ‘unhealthy’, or ‘unavailable’ - available: Whether backend is installed - registered: Whether backend is registered - message: Status message - details: Additional details (if connected successfully)

Return type:

dict

Examples

>>> import vd
>>> status = vd.health_check_backend('memory')
>>> print(status['status'])
'healthy'
vd.health_check_collection(collection: Collection) dict[str, Any][source]

Check collection health and compute basic stats.

Parameters:

collection (Collection) – Collection to check

Returns:

Health report

Return type:

dict

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> status = vd.health_check_collection(docs)

Hybrid (dense + lexical) search that works on any vd Collection.

Dispatches to the collection’s native hybrid_search when it implements SupportsHybrid (efficient, server-side). Otherwise fuses the collection’s own dense search() with a client-side lexical scan (default: bm25_lexical_search()) via Reciprocal Rank Fusion.

The portable contract is RRF. Backend-specific knobs (weighted blend alpha, fusion-type variants, native ranker choices) are accepted via **kwargs and forwarded to the adapter when it has a native implementation; they are ignored by the client-side fallback.

Parameters:
  • collection (Collection) – Any vd Collection — native-hybrid or not.

  • query (str or list[float]) – Query text (embedded by the collection if it has an embedder) or a pre-computed query vector. When query is a vector, query_text is required.

  • query_text (str, optional) – Explicit text for the lexical side. Defaults to query when query is a string.

  • limit (int) – Number of fused results to return.

  • filter (dict, optional) – Canonical vd metadata filter, applied to both sub-searches.

  • k_dense (int, optional) – How many results to fetch from each sub-search before fusion. Default is max(4 * limit, 50) for each side. Widen for higher recall.

  • k_lexical (int, optional) – How many results to fetch from each sub-search before fusion. Default is max(4 * limit, 50) for each side. Widen for higher recall.

  • rrf_k (int) – Reciprocal Rank Fusion constant (typically 60).

  • lexical_search (callable, optional) – Custom lexical_search(collection, query_text, *, limit, filter, **kwargs) -> list[SearchResult]. Defaults to bm25_lexical_search(). Used only on the fallback path.

  • egress (callable, optional) – Per-result transform applied before yielding.

  • **kwargs – Extra options. On the native path they are forwarded to the adapter (e.g. alpha=0.7 on weaviate). On the fallback path they are ignored.

Yields:

dict – Fused result dicts. score is the RRF score on the fallback path, or the adapter’s fused score on the native path.

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> col = client.create_collection('docs', dimension=2)
>>> col['a'] = vd.Document(id='a', text='cats purr',
...                        vector=[1.0, 0.0])
>>> col['b'] = vd.Document(id='b', text='dogs bark',
...                        vector=[0.0, 1.0])
>>> hits = list(vd.hybrid_search(col, [0.9, 0.1], query_text='cats',
...                              limit=1))
>>> hits[0]['id']
'a'
async vd.hybrid_search_async(collection: AsyncCollection, query: str | list[float], *, query_text: str | None = None, limit: int = 10, filter: dict[str, Any] | None = None, k_dense: int | None = None, k_lexical: int | None = None, rrf_k: int = 60, lexical_search: Callable[[...], list[dict[str, Any]]] | None = None, egress: Callable[[dict[str, Any]], Any] | None = None, **kwargs) AsyncIterator[dict[str, Any]][source]

Async sibling of vd.hybrid_search().

If the wrapped sync collection’s class supports native hybrid (i.e. satisfies SupportsHybrid), dispatches the whole fused call to a worker thread. Otherwise runs the universal client-side BM25 + RRF fallback in a worker thread too. In both cases the awaitable + async iterator interface stays uniform.

Parameters mirror vd.hybrid_search() exactly; see that function for the full docs.

Yields:

dict – Fused result dicts.

Examples

>>> import asyncio, vd
>>> async def go():
...     client = await vd.connect_async("memory")
...     col = await client.create_collection("docs", dimension=2)
...     await col.set("a", vd.Document(id="a", text="cats",
...                                    vector=[1.0, 0.0]))
...     await col.set("b", vd.Document(id="b", text="dogs",
...                                    vector=[0.0, 1.0]))
...     hits = []
...     async for h in vd.hybrid_search_async(col, [0.9, 0.1],
...                                           query_text="cats", limit=1):
...         hits.append(h["id"])
...     return hits
>>> asyncio.run(go())
['a']
vd.id_and_score(result: dict[str, Any]) tuple[str, float][source]

Egress: keep (id, score).

vd.id_only(result: dict[str, Any]) str[source]

Egress: keep only the document id.

vd.id_text_score(result: dict[str, Any]) tuple[str, str, float][source]

Egress: keep (id, text, score).

vd.import_collection(collection: Collection, input_path: str | Path, *, format: str | None = None, **kwargs) int[source]

Import documents into a collection from a file.

Parameters:
  • collection (Collection) – Collection to import into

  • input_path (str or Path) – Input file/directory path

  • format (str, optional) – Import format: ‘jsonl’, ‘json’, ‘directory’ If None, inferred from file extension

  • **kwargs – Additional format-specific options

Returns:

Number of documents imported

Return type:

int

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> vd.import_collection(docs, 'backup.jsonl')
vd.import_from_directory(collection: Collection, input_dir: str | Path, *, batch_size: int = 100, skip_existing: bool = False, pattern: str = '*.json') int[source]

Import documents from a directory of JSON files.

Parameters:
  • collection (Collection) – Collection to import into

  • input_dir (str or Path) – Input directory path

  • batch_size (int, default 100) – Batch size for adding documents

  • skip_existing (bool, default False) – If True, skip documents with IDs that already exist

  • pattern (str, default ‘*.json’) – File pattern to match

Returns:

Number of documents imported

Return type:

int

vd.import_from_json(collection: Collection, input_path: str | Path, *, batch_size: int = 100, skip_existing: bool = False) int[source]

Import documents from JSON format into a collection.

Parameters:
  • collection (Collection) – Collection to import into

  • input_path (str or Path) – Input file path

  • batch_size (int, default 100) – Batch size for adding documents

  • skip_existing (bool, default False) – If True, skip documents with IDs that already exist

Returns:

Number of documents imported

Return type:

int

vd.import_from_jsonl(collection: Collection, input_path: str | Path, *, batch_size: int = 100, skip_existing: bool = False) int[source]

Import documents from JSONL format into a collection.

Parameters:
  • collection (Collection) – Collection to import into

  • input_path (str or Path) – Input file path

  • batch_size (int, default 100) – Batch size for adding documents

  • skip_existing (bool, default False) – If True, skip documents with IDs that already exist

Returns:

Number of documents imported

Return type:

int

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> vd.import_from_jsonl(docs, 'backup.jsonl')
1
vd.install_backend(backend: str, *, run: bool = False) str[source]

Return (and optionally run) the pip install command for backend.

Parameters:
  • backend (str) – Provider name.

  • run (bool) – If True, actually invoke pip in the current interpreter. If False (the default), only return the command — the caller decides.

Returns:

The pip command (or a note that nothing is needed).

Return type:

str

vd.install_command(name: str) str[source]

Return the pip install command that makes name usable.

Examples

>>> install_command('qdrant')
'pip install qdrant-client'
>>> install_command('memory')
'memory needs no installation (built into vd)'
vd.list_all_backends() dict[str, dict[str, Any]][source]

Return every provider with live installed / has_adapter flags added.

vd.list_available_backends() list[str][source]

Return providers vd can connect() right now.

A backend is available iff its adapter module imported successfully — which happens only when its client library is installed. This is exactly the set of registered backends.

vd.list_backends() list[str][source]

Return the names of all backends with a registered (importable) adapter.

vd.load_config(path: str | Path | None = None, *, format: str | None = None) dict[source]

Load configuration from a file.

Automatically detects format from file extension if not specified.

Parameters:
  • path (str or Path, optional) – Path to configuration file. If not provided, looks for default config files in: ./vd.yaml, ./vd.yml, ./vd.toml, ~/.vd/config.yaml, etc.

  • format (str, optional) – Configuration format: ‘yaml’ or ‘toml’. Auto-detected from extension if not provided.

Returns:

Configuration dictionary

Return type:

dict

Examples

>>> config = load_config('vd.yaml')
>>> config = load_config('vd.toml')
>>> config = load_config()  # Looks for default config files
vd.matches_filter(metadata: Mapping[str, Any], filter: dict[str, Any] | None) bool[source]

Return True if metadata satisfies the MongoDB-style filter.

An empty or None filter matches everything. Unknown operators raise UnsupportedFilterError — they never silently match.

Parameters:
  • metadata (Mapping) – A document’s metadata dict.

  • filter (dict or None) – A filter in the canonical vd dialect (see the module docstring).

Examples

>>> matches_filter({'year': 2024}, None)
True
>>> matches_filter({'year': 2024, 'cat': 'tech'},
...                {'year': {'$gte': 2020}, 'cat': 'tech'})
True
>>> matches_filter({'views': 50}, {'views': {'$gte': 10, '$lte': 100}})
True
vd.mean_vector(docs: Iterable[Document]) list[float] | None[source]

Element-wise mean of document embeddings. None if empty / no vectors.

>>> from vd.base import Document
>>> mean_vector([
...     Document(id='a', text='', vector=[1.0, 2.0]),
...     Document(id='b', text='', vector=[3.0, 4.0]),
... ])
[2.0, 3.0]
>>> mean_vector([]) is None
True
vd.metadata_distribution(collection: Collection, field: str, *, top_n: int | None = None) dict[Any, int][source]

Get the distribution of values for a metadata field.

Parameters:
  • collection (Collection) – Collection to analyze

  • field (str) – Metadata field name

  • top_n (int, optional) – If specified, return only the top N most common values

Returns:

Mapping of field values to their counts

Return type:

dict

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> docs['doc1'] = ("Hello", {'category': 'A'})
>>> docs['doc2'] = ("World", {'category': 'A'})
>>> docs['doc3'] = ("Test", {'category': 'B'})
>>> dist = vd.metadata_distribution(docs, 'category')
>>> print(dist)
{'A': 2, 'B': 1}
vd.migrate_client(source_client: Client, target_client: Client, *, collection_names: list[str] | None = None, batch_size: int = 100, preserve_vectors: bool = True, progress_callback: Callable[[str, int, int], None] | None = None) dict[str, Any][source]

Migrate all (or selected) collections from one client to another.

Parameters:
  • source_client (Client) – Source database client

  • target_client (Client) – Target database client

  • collection_names (list of str, optional) – Specific collections to migrate. If None, migrates all.

  • batch_size (int, default 100) – Batch size for migration

  • preserve_vectors (bool, default True) – Whether to preserve vectors

  • progress_callback (callable, optional) – Function called with (collection_name, current, total)

Returns:

Overall migration statistics

Return type:

dict

Examples

>>> import vd
>>> source = vd.connect('memory')
>>> target = vd.connect('chroma', persist_directory='./backup')
>>> stats = vd.migrate_client(source, target)
vd.migrate_collection(source_collection: Collection, target_collection: Collection, *, batch_size: int = 100, preserve_vectors: bool = True, progress_callback: Callable[[int, int], None] | None = None, skip_existing: bool = False) dict[str, Any][source]

Migrate a collection from one backend to another.

Parameters:
  • source_collection (Collection) – Source collection to migrate from

  • target_collection (Collection) – Target collection to migrate to

  • batch_size (int, default 100) – Number of documents to migrate per batch

  • preserve_vectors (bool, default True) – Whether to preserve pre-computed vectors

  • progress_callback (callable, optional) – Function called with (current, total) to report progress

  • skip_existing (bool, default False) – If True, skip documents that already exist in target

Returns:

Migration statistics with keys: - total: Total documents in source - migrated: Number of documents migrated - skipped: Number of documents skipped - failed: Number of failures - errors: List of error messages

Return type:

dict

Examples

>>> import vd
>>> # Create source and target
>>> source_client = vd.connect('memory')
>>> target_client = vd.connect('chroma', persist_directory='./data')
>>> source = source_client.get_collection('my_docs')
>>> target = target_client.create_collection('my_docs')
>>>
>>> # Migrate
>>> stats = vd.migrate_collection(source, target)
>>> print(f"Migrated {stats['migrated']} documents")

Search with multiple queries and combine results.

Parameters:
  • collection (Collection) – Collection to search

  • queries (list of str) – Multiple query strings

  • limit (int, default 10) – Total number of results to return

  • combine (str, default 'interleave') – How to combine results: - ‘interleave’: Interleave results from each query - ‘concatenate’: Concatenate all results - ‘union’: Remove duplicates across queries - ‘best’: Take best results across all queries

  • filter (dict, optional) – Metadata filter

  • **kwargs – Additional search options

Yields:

dict – Search results

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> results = vd.multi_query_search(
...     docs,
...     ["What is AI?", "How does ML work?"],
...     limit=10
... )
vd.normalize_document_input(doc_input: str | tuple | Document, *, auto_id: bool = True) Document[source]

Normalize a flexible document input to a Document.

Accepted shapes: a Document; a str (just text); a tuple (text, id), (text, metadata), or (text, id, metadata).

Parameters:
  • doc_input (DocumentInput) – The input to normalize.

  • auto_id (bool) – When the input carries no id, generate one (vs. leaving it empty).

Examples

>>> normalize_document_input(("Hello", "doc1")).id
'doc1'
>>> normalize_document_input(("Hello", {"k": "v"})).metadata
{'k': 'v'}
>>> normalize_document_input("Hello world").id.startswith('doc_')
True
vd.normalize_whitespace(text: str) str[source]

Normalize whitespace in text.

Replaces tabs, multiple spaces, and multiple newlines with single versions.

Parameters:

text (str) – Text to normalize

Returns:

Normalized text

Return type:

str

Examples

>>> normalize_whitespace("Hello\t\tWorld  \n\n\nTest")
'Hello World \nTest'
vd.parse_window(window: str | timedelta | int | float) timedelta[source]

Parse a window spec into a timedelta.

Strings use a trailing unit char: "1d", "4h", "30m", "15s", "1w". Numbers are treated as seconds. A timedelta is returned as-is.

>>> parse_window('1d') == timedelta(days=1)
True
>>> parse_window('4h') == timedelta(hours=4)
True
>>> parse_window(3600) == timedelta(hours=1)
True
vd.print_backends_table() None[source]

Print every known vector database, grouped by deployment archetype.

vd.print_comparison(names: list[str]) None[source]

Print a side-by-side comparison table of the given providers.

vd.print_recommendation(**kwargs) None[source]

Run recommend_backend() and print the recommendation readably.

vd.provider(name: str) dict[str, Any] | None[source]

Return one provider’s metadata, or None if name is unknown.

vd.providers() dict[str, dict[str, Any]][source]

Return the full provider registry as {name: metadata}.

vd.reciprocal_rank_fusion(result_lists: list[list[dict[str, Any]]], *, k: int = 60) list[dict[str, Any]][source]

Combine multiple result lists using Reciprocal Rank Fusion.

RRF is a simple yet effective way to combine rankings from multiple sources.

Parameters:
  • result_lists (list of lists) – Multiple lists of search results

  • k (int, default 60) – Constant for RRF formula (typically 60)

Returns:

Combined and re-ranked results

Return type:

list

Examples

>>> results1 = [{'id': 'doc1', 'score': 0.9}, {'id': 'doc2', 'score': 0.8}]
>>> results2 = [{'id': 'doc2', 'score': 0.95}, {'id': 'doc3', 'score': 0.7}]
>>> combined = reciprocal_rank_fusion([results1, results2])
vd.recommend_backend(*, corpus_size: str = 'medium', persistence: bool = True, can_run_docker: bool = True, cloud_ok: bool = True, budget: str = 'free', existing_db: str | None = None, needs_hybrid: bool = False, air_gapped: bool = False) dict[str, Any][source]

Recommend a vector database from a few yes/no facts about the situation.

A direct encoding of the decision framework in the report’s §4. Returns a primary pick, a runner-up, and the reasoning trail.

Parameters:
  • corpus_size ({'tiny', 'small', 'medium', 'large', 'huge'}) – Rough vector count: tiny <100k, small <10M, medium ~10M, large <100M, huge >100M.

  • persistence (bool) – Must data survive a process restart?

  • can_run_docker (bool) – Can the user run Docker / operate a server process?

  • cloud_ok (bool) – Is a managed cloud service acceptable (vs. on-prem only)?

  • budget ({'free', 'paid'}) – Free-tier-only, or is paid acceptable?

  • existing_db ({'postgres', 'redis', 'elastic', 'mongo', 'sqlite', 'duckdb', None}) – A database the user already operates — strongly biases the pick.

  • needs_hybrid (bool) – Need keyword + vector ranking fused in one query?

  • air_gapped (bool) – Must run with zero network / zero telemetry?

Returns:

{"primary", "runner_up", "reasoning", "alternatives"}.

Return type:

dict

Examples

>>> rec = recommend_backend(corpus_size='tiny', persistence=False)
>>> rec['primary']
'memory'
>>> rec = recommend_backend(existing_db='postgres')
>>> rec['primary']
'pgvector'
vd.register_backend(name: str) Callable[[type], type][source]

Class decorator: register an adapter Client under name.

Examples

>>> from vd.base import AbstractClient
>>> @register_backend('example')
... class ExampleClient(AbstractClient):
...     ...
vd.sample_collection(collection: Collection, n: int, *, method: str = 'random', seed: int | None = None) list[str][source]

Sample document IDs from a collection.

Parameters:
  • collection (Collection) – Collection to sample from

  • n (int) – Number of documents to sample

  • method (str, default 'random') – Sampling method: ‘random’, ‘first’, ‘diverse’

  • seed (int, optional) – Random seed for reproducibility

Returns:

Sampled document IDs

Return type:

list of str

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> # Add 100 documents...
>>> sample = vd.sample_collection(docs, 10, method='random')
>>> len(sample)
10
vd.save_config(config: dict, path: str | Path, *, format: str | None = None) None[source]

Save configuration to a file.

Parameters:
  • config (dict) – Configuration dictionary to save

  • path (str or Path) – Path to save configuration file

  • format (str, optional) – Format to save: ‘yaml’ or ‘toml’. Auto-detected from extension if not provided.

Examples

>>> config = {
...     'profiles': {
...         'dev': {'backend': 'memory'},
...         'prod': {'backend': 'chroma', 'persist_directory': './data'}
...     }
... }
>>> save_config(config, 'vd.yaml')
vd.search_similar_to_document(collection: Collection, doc_id: str, *, limit: int = 10, exclude_self: bool = True, filter: dict | None = None, **kwargs) Iterator[dict[str, Any]][source]

Find documents similar to a specific document.

Parameters:
  • collection (Collection) – Collection to search

  • doc_id (str) – ID of the reference document

  • limit (int, default 10) – Number of similar documents to return

  • exclude_self (bool, default True) – Whether to exclude the reference document from results

  • filter (dict, optional) – Metadata filter

  • **kwargs – Additional search options

Yields:

dict – Search results

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> similar = vd.search_similar_to_document(docs, 'doc1', limit=5)
vd.setup_guide(backend: str) str[source]

Return a full, copy-pasteable setup playbook for backend.

Covers: the pip install, a Docker one-liner for server backends, the environment variables for managed backends, a verify command, and the relevant documentation links.

vd.skills_dir() Path[source]

Return the path to the bundled AI-agent skills directory.

vd.text_only(result: dict[str, Any]) str[source]

Egress: keep only the text. >>> text_only({'text': 'hi'}) -> 'hi'.

vd.to_datetime(ts: str | datetime | int | float) datetime[source]

Coerce a timestamp-like value into a tz-aware UTC datetime.

Accepts ISO-8601 strings (with or without timezone), date-only strings ("2025-03-13"), epoch seconds (int or float), and datetime objects. Naive datetimes / strings are assumed UTC.

>>> to_datetime('2025-03-13T09:00:00').isoformat()
'2025-03-13T09:00:00+00:00'
>>> to_datetime('2025-03-13').isoformat()
'2025-03-13T00:00:00+00:00'
>>> to_datetime(1741856400).isoformat()
'2025-03-13T09:00:00+00:00'
vd.to_iso(ts: str | datetime | int | float) str[source]

ISO-8601 (UTC) string suitable for cross-backend metadata storage.

>>> to_iso('2025-03-13T09:00:00')
'2025-03-13T09:00:00+00:00'
vd.truncate_text(text: str, max_length: int, *, suffix: str = '...') str[source]

Truncate text to maximum length.

Parameters:
  • text (str) – Text to truncate

  • max_length (int) – Maximum length

  • suffix (str) – Suffix to add to truncated text

Returns:

Truncated text

Return type:

str

Examples

>>> truncate_text("This is a long text", 10)
'This is...'
vd.validate_collection(collection: Collection) dict[str, Any][source]

Validate collection integrity and identify issues.

Parameters:

collection (Collection) – Collection to validate

Returns:

Validation report with: - valid: Whether collection is valid - issues: List of issue descriptions - warnings: List of warning messages - stats: Basic stats

Return type:

dict

Examples

>>> import vd
>>> client = vd.connect('memory')
>>> docs = client.create_collection('test')
>>> report = vd.validate_collection(docs)
>>> print(report['valid'])
True
vd.validate_filter(filter: dict[str, Any] | None, *, supported: Iterable[str] = frozenset({'$and', '$eq', '$exists', '$gt', '$gte', '$in', '$lt', '$lte', '$ne', '$nin', '$not', '$or'})) None[source]

Walk filter and raise UnsupportedFilterError on any operator that is unknown or not in supported.

Backends that translate the canonical filter to a native query call this with their own (possibly narrower) supported subset, so callers get a clear vd error up front instead of an opaque backend error later.

Parameters:
  • filter (dict or None) – A filter in the canonical vd dialect. None / empty is valid.

  • supported (iterable of str, optional) – The operator subset to allow. Defaults to every operator the language defines.

Examples

>>> validate_filter({'year': {'$gte': 2020}})            # ok, returns None
>>> validate_filter({'a': {'$regex': '.*'}})             # not in the language
Traceback (most recent call last):
    ...
vd.base.UnsupportedFilterError: Unknown filter operator '$regex'. ...
>>> validate_filter({'a': {'$exists': True}}, supported={'$eq'})
Traceback (most recent call last):
    ...
vd.base.UnsupportedFilterError: Filter operator '$exists' is not supported ...