# Source: pylance (package version 0.23.3), file vector.py
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
"""Embedding vector utilities"""
from __future__ import annotations
import re
import tempfile
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Union
import pyarrow as pa
from tqdm.auto import tqdm
from . import write_dataset
from .dependencies import (
_check_for_numpy,
torch,
)
from .dependencies import numpy as np
from .log import LOGGER
from .util import MetricType, _normalize_metric_type
if TYPE_CHECKING:
from pathlib import Path
from . import LanceDataset
def _normalize_vectors(vectors, ndim):
    """Flatten ``vectors`` into a float32 ``FixedSizeListArray`` of width ``ndim``.

    When ``ndim`` is ``None`` it is inferred from the first vector.
    """
    if ndim is None:
        ndim = len(next(iter(vectors)))
    flat = np.array(vectors, dtype="float32").ravel()
    return pa.FixedSizeListArray.from_arrays(flat, list_size=ndim)
def _validate_ndim(values, ndim):
for v in values:
if ndim is None:
ndim = len(v)
else:
if ndim != len(v):
raise ValueError(f"Expected {ndim} dimensions but got {len(v)} for {v}")
return ndim
def vec_to_table(
    data: Union[dict, list, np.ndarray],
    names: Optional[Union[str, list]] = None,
    ndim: Optional[int] = None,
    check_ndim: bool = True,
) -> pa.Table:
    """
    Create a pyarrow Table containing vectors.
    Vectors are created as FixedSizeListArray's in pyarrow with Float32 values.

    Examples
    --------
    >>> import numpy as np
    >>> np.random.seed(0)
    >>> from lance.vector import vec_to_table
    >>> dd = {"vector0": np.random.randn(10), "vector1": np.random.randn(10)}
    >>> vec_to_table(dd)
    pyarrow.Table
    id: string
    vector: fixed_size_list<item: float>[10]
      child 0, item: float
    ----
    id: [["vector0","vector1"]]
    vector: [[[1.7640524,0.4001572,0.978738,2.2408931,1.867558,-0.9772779,0.95008844,\
-0.1513572,-0.10321885,0.41059852],[0.14404356,1.4542735,0.7610377,\
0.121675014,0.44386324,0.33367434,1.4940791,-0.20515826,0.3130677,-0.85409576]]]
    >>> vec_to_table(dd).to_pandas()
            id                                             vector
    0  vector0  [1.7640524, 0.4001572, 0.978738, 2.2408931, 1....
    1  vector1  [0.14404356, 1.4542735, 0.7610377, 0.121675014...

    Parameters
    ----------
    data: dict, list, or np.ndarray
        If dict, the keys are added as "id" column
        If list, then each element is assumed to be a vector
        If ndarray, then each row is assumed to be a vector
    names: str or list, optional
        If data is dict, then names should be a list of 2 str; default ["id", "vector"]
        If data is list or ndarray, then names should be str; default "vector"
    ndim: int, optional
        Number of dimensions of the vectors. Inferred if omitted.
    check_ndim: bool, default True
        Whether to verify that all vectors have the same length

    Returns
    -------
    tbl: pa.Table
        A pyarrow Table with vectors converted to appropriate types

    Raises
    ------
    ValueError
        If ``names`` does not match the expected shape for the given ``data``,
        or (when ``check_ndim``) if the vectors have inconsistent lengths.
    """
    if isinstance(data, dict):
        if names is None:
            names = ["id", "vector"]
        # BUG FIX: the original condition was
        # `not isinstance(names, (list, tuple)) and len(names) == 2`,
        # which let malformed values (e.g. a 3-element list) through silently
        # and only rejected non-sequences that happened to have length 2.
        elif not (isinstance(names, (list, tuple)) and len(names) == 2):
            raise ValueError(
                "If data is a dict, names must be a list or tuple of 2 strings"
            )
        values = list(data.values())
        if check_ndim:
            ndim = _validate_ndim(values, ndim)
        vectors = _normalize_vectors(values, ndim)
        ids = pa.array(data.keys())
        arrays = [ids, vectors]
    elif isinstance(data, list) or (
        _check_for_numpy(data) and isinstance(data, np.ndarray)
    ):
        if names is None:
            names = ["vector"]
        elif isinstance(names, str):
            names = [names]
        # BUG FIX: same mis-parenthesized validation as the dict branch;
        # a multi-name list previously passed straight through.
        elif not (isinstance(names, (list, tuple)) and len(names) == 1):
            raise ValueError(f"names cannot be more than 1 got {len(names)}")
        if check_ndim:
            ndim = _validate_ndim(data, ndim)
        vectors = _normalize_vectors(data, ndim)
        arrays = [vectors]
    else:
        raise NotImplementedError(
            f"data must be dict, list, or ndarray (require numpy installed), \
got {type(data)} instead"
        )
    return pa.Table.from_arrays(arrays, names=names)
# Matches a torch CUDA device string: bare "cuda" or "cuda:<index>" (e.g. "cuda:0").
CUDA_REGEX = re.compile(r"^cuda(:\d+)?$")
def train_pq_codebook_on_accelerator(
    dataset: LanceDataset | Path | str,
    metric_type: MetricType,
    accelerator: Union[str, "torch.Device"],
    num_sub_vectors: int,
    batch_size: int = 1024 * 10 * 4,
) -> Tuple[np.ndarray, List[Any]]:
    """Use accelerator (GPU or MPS) to train pq codebook.

    Trains one 256-entry k-means codebook per residual subvector column
    (``__residual_subvec_i``) and returns the stacked per-subspace centroids
    together with the fitted KMeans model for each subspace.
    ``batch_size`` is accepted but not referenced by this function.
    """
    from .torch.data import LanceDataset as TorchDataset
    from .torch.kmeans import KMeans

    metric_type = _normalize_metric_type(metric_type)

    subvec_columns = [f"__residual_subvec_{i + 1}" for i in range(num_sub_vectors)]
    sample_size = 256 * 256

    # Small first pass: draw the initial centroids for every subspace at once.
    init_ds = TorchDataset(
        dataset,
        batch_size=256,
        columns=subvec_columns,
        samples=256,
    )
    init_batch = next(iter(init_ds))

    # Larger cached pass used for the actual k-means fitting.
    fit_ds = TorchDataset(
        dataset,
        batch_size=20480,
        columns=subvec_columns,
        samples=sample_size,
        cache=True,
    )

    single_column = num_sub_vectors == 1
    codebooks: List[np.ndarray] = []
    models: List[Any] = []
    for column in subvec_columns:
        LOGGER.info("Training IVF partitions using GPU(%s)", accelerator)
        # With a single column the sampler yields the batch directly instead
        # of a mapping keyed by column name.
        seed = init_batch if single_column else init_batch[column]
        model = KMeans(
            256,
            max_iters=50,
            metric=metric_type,
            device=accelerator,
            centroids=seed,
        )
        if single_column:
            model.fit(fit_ds)
        else:
            model.fit(fit_ds, column=column)
        codebooks.append(model.centroids.cpu().numpy())
        models.append(model)

    pq_codebook = np.stack(codebooks)
    return pq_codebook, models
def train_ivf_centroids_on_accelerator(
    dataset: LanceDataset,
    column: str,
    k: int,
    metric_type: MetricType,
    accelerator: Union[str, "torch.Device"],
    batch_size: int = 1024 * 10 * 4,
    *,
    sample_rate: int = 256,
    max_iters: int = 50,
    filter_nan: bool = True,
) -> Tuple[np.ndarray, Any]:
    """Use accelerator (GPU or MPS) to train kmeans.

    Parameters
    ----------
    dataset: LanceDataset
        Dataset to sample training vectors from.
    column: str
        Column name of the vector column.
    k: int
        Number of IVF centroids (clusters) to train.
    metric_type: MetricType
        Distance metric for kmeans.
    accelerator: Union[str, "torch.Device"]
        "cuda", "cuda:<index>", "mps", or an equivalent torch device.
    batch_size: int, default 40960
        Accepted for API compatibility; the fitting pass currently reads
        with a fixed batch size of 20480.
    sample_rate: int, default 256
        Training samples drawn per centroid (``k * sample_rate`` total).
    max_iters: int, default 50
        Maximum number of kmeans iterations.
    filter_nan: bool, default True
        Skip null vectors when the column is nullable.

    Returns
    -------
    Tuple[np.ndarray, Any]
        The trained centroids as a numpy array, and the fitted KMeans model.

    Raises
    ------
    ValueError
        If ``accelerator`` is a string that is neither a CUDA device nor "mps".
    """
    from .torch.data import LanceDataset as TorchDataset
    from .torch.kmeans import KMeans

    metric_type = _normalize_metric_type(metric_type)

    if isinstance(accelerator, str) and (
        not (CUDA_REGEX.match(accelerator) or accelerator == "mps")
    ):
        raise ValueError(
            "Train ivf centroids on accelerator: "
            + f"only support 'cuda' or 'mps' as accelerator, got '{accelerator}'."
        )

    # BUG FIX: coerce k to int *before* deriving the sample size; previously
    # a float k propagated a float `samples` value to the sampler.
    k = int(k)
    sample_size = k * sample_rate

    if dataset.schema.field(column).nullable and filter_nan:
        filt = f"{column} is not null"
    else:
        filt = None

    LOGGER.info("Randomly select %s centroids from %s (filt=%s)", k, dataset, filt)
    # First pass: one batch of k sampled vectors seeds the initial centroids.
    ds = TorchDataset(
        dataset,
        batch_size=k,
        columns=[column],
        samples=sample_size,
        filter=filt,
    )
    init_centroids = next(iter(ds))
    LOGGER.info("Done sampling: centroids shape: %s", init_centroids.shape)

    # Second pass: a larger cached sample used for the actual fitting.
    ds = TorchDataset(
        dataset,
        batch_size=20480,
        columns=[column],
        samples=sample_size,
        filter=filt,
        cache=True,
    )

    LOGGER.info("Training IVF partitions using GPU(%s)", accelerator)
    kmeans = KMeans(
        k,
        max_iters=max_iters,
        metric=metric_type,
        device=accelerator,
        centroids=init_centroids,
    )
    kmeans.fit(ds)

    centroids = kmeans.centroids.cpu().numpy()
    # NOTE(review): delete=False leaves the temp file behind; this appears
    # intentional (the path is logged as an artifact) — confirm before changing.
    with tempfile.NamedTemporaryFile(delete=False) as f:
        np.save(f, centroids)
        LOGGER.info("Saved centroids to %s", f.name)
    return centroids, kmeans
def compute_pq_codes(
    dataset: LanceDataset,
    kmeans_list: List[Any],  # KMeans — one fitted model per PQ subspace
    batch_size: int = 1024 * 10 * 4,
    dst_dataset_uri: Optional[Union[str, Path]] = None,
    allow_cuda_tf32: bool = True,
) -> Tuple[Union[str, Path], List[str]]:
    """Compute pq codes for each row using GPU kmeans and spill to disk.

    Parameters
    ----------
    dataset: LanceDataset
        Dataset to compute pq codes for.  Expected to have the
        ``row_id``/``partition``/``__residual_subvec_i`` columns written by
        ``compute_partitions`` with ``num_sub_vectors`` set.
    kmeans_list: List[lance.torch.kmeans.KMeans]
        KMeans models to use to compute pq (one per subspace)
    batch_size: int, default 40960
        The batch size used to read the dataset.
    dst_dataset_uri: Union[str, Path], optional
        The path to store the partitions. If not specified a random
        directory is used instead
    allow_cuda_tf32: bool, default True
        Whether to allow tf32 for matmul on CUDA.

    Returns
    -------
    Tuple[Union[str, Path], List[str]]
        The absolute path of the pq codes dataset and shuffle buffers
    """
    from .torch.data import LanceDataset as TorchDataset

    torch.backends.cuda.matmul.allow_tf32 = allow_cuda_tf32

    num_rows = dataset.count_rows()

    num_sub_vectors = len(kmeans_list)
    field_names = [f"__residual_subvec_{i + 1}" for i in range(num_sub_vectors)]

    # Row ids are read from the regular "row_id" column written upstream,
    # so the dataset-level row id feature is not requested here.
    torch_ds = TorchDataset(
        dataset,
        batch_size=batch_size,
        with_row_id=False,
        columns=["row_id", "partition"] + field_names,
    )
    # batch_size=1 + _collate_fn: TorchDataset already yields whole batches;
    # the DataLoader only unwraps the singleton list around each one.
    loader = torch.utils.data.DataLoader(
        torch_ds,
        batch_size=1,
        pin_memory=True,
        collate_fn=_collate_fn,
    )

    output_schema = pa.schema(
        [
            pa.field("_rowid", pa.uint64()),
            pa.field("__ivf_part_id", pa.uint32()),
            pa.field("__pq_code", pa.list_(pa.uint8(), list_size=num_sub_vectors)),
        ]
    )

    progress = tqdm(total=num_rows)
    progress.set_description("Assigning PQ codes")

    # Assumes every subspace model lives on the same device; use the first.
    device = kmeans_list[0].device

    def _pq_codes_assignment() -> Iterable[pa.RecordBatch]:
        # Lazily streams (_rowid, __ivf_part_id, __pq_code) record batches;
        # consumed by the RecordBatchReader below.
        with torch.no_grad():
            for batch in loader:
                # One (n, subvec_dim) tensor per subspace, moved to the device.
                vecs_lists = [
                    batch[field_names[i]]
                    .to(device)
                    .reshape(-1, kmeans_list[i].centroids.shape[1])
                    for i in range(num_sub_vectors)
                ]
                # transform() maps each row to a centroid index; stacking on
                # dim=1 yields an (n, num_sub_vectors) code matrix.
                pq_codes = torch.stack(
                    [
                        kmeans_list[i].transform(vecs_lists[i])
                        for i in range(num_sub_vectors)
                    ],
                    dim=1,
                )
                # Codebooks are trained with 256 centroids (see
                # train_pq_codebook_on_accelerator), so codes fit in uint8.
                pq_codes = pq_codes.to(torch.uint8)

                ids = batch["row_id"].reshape(-1)
                partitions = batch["partition"].reshape(-1)

                ids = ids.cpu()
                partitions = partitions.cpu()
                pq_codes = pq_codes.cpu()

                pq_values = pa.array(pq_codes.numpy().reshape(-1))
                pq_codes = pa.FixedSizeListArray.from_arrays(pq_values, num_sub_vectors)
                part_batch = pa.RecordBatch.from_arrays(
                    [ids, partitions, pq_codes],
                    schema=output_schema,
                )
                progress.update(part_batch.num_rows)
                yield part_batch

    rbr = pa.RecordBatchReader.from_batches(output_schema, _pq_codes_assignment())
    if dst_dataset_uri is None:
        dst_dataset_uri = tempfile.mkdtemp()
    ds = write_dataset(
        rbr,
        dst_dataset_uri,
        schema=output_schema,
        # NOTE(review): "legacy" appears required by the downstream shuffle
        # reader — confirm before changing.
        data_storage_version="legacy",
    )
    progress.close()
    LOGGER.info("Saved precomputed pq_codes to %s", dst_dataset_uri)
    shuffle_buffers = [
        data_file.path for frag in ds.get_fragments() for data_file in frag.data_files()
    ]
    return dst_dataset_uri, shuffle_buffers
def _collate_fn(batch):
return batch[0]
def compute_partitions(
    dataset: LanceDataset,
    column: str,
    kmeans: Any,  # KMeans
    batch_size: int = 1024 * 10 * 4,
    dst_dataset_uri: Optional[Union[str, Path]] = None,
    allow_cuda_tf32: bool = True,
    num_sub_vectors: Optional[int] = None,
    filter_nan: bool = True,
    sample_size: Optional[int] = None,
) -> str:
    """Compute partitions for each row using GPU kmeans and spill to disk.

    Parameters
    ----------
    dataset: LanceDataset
        Dataset to compute partitions for.
    column: str
        Column name of the vector column.
    kmeans: lance.torch.kmeans.KMeans
        KMeans model to use to compute partitions.
    batch_size: int, default 40960
        The batch size used to read the dataset.
    dst_dataset_uri: Union[str, Path], optional
        The path to store the partitions. If not specified a random
        directory is used instead
    allow_cuda_tf32: bool, default True
        Whether to allow tf32 for matmul on CUDA.
    num_sub_vectors: int, optional
        If set, also materialize the PQ residual subvectors as
        ``__residual_subvec_i`` columns alongside each partition id.
    filter_nan: bool, default True
        Skip null vectors when the column is nullable.
    sample_size: int, optional
        If set, only a sample of the dataset is read and sequential ids are
        synthesized instead of real row ids.

    Returns
    -------
    str
        The absolute path of the partition dataset.
    """
    from .torch.data import LanceDataset as TorchDataset

    torch.backends.cuda.matmul.allow_tf32 = allow_cuda_tf32

    num_rows = dataset.count_rows()

    if dataset.schema.field(column).nullable and filter_nan:
        filt = f"{column} is not null"
    else:
        filt = None

    torch_ds = TorchDataset(
        dataset,
        batch_size=batch_size,
        with_row_id=True,
        columns=[column],
        samples=sample_size,
        filter=filt,
    )
    # batch_size=1 + _collate_fn: the dataset already yields whole batches;
    # the DataLoader only unwraps the singleton list around each one.
    loader = torch.utils.data.DataLoader(
        torch_ds,
        batch_size=1,
        pin_memory=True,
        collate_fn=_collate_fn,
    )

    dim = kmeans.centroids.shape[1]
    fields = []
    if num_sub_vectors is not None:
        field_names = [f"__residual_subvec_{i + 1}" for i in range(num_sub_vectors)]
        subvector_size = dim // num_sub_vectors
        fields = [
            pa.field(name, pa.list_(pa.float32(), list_size=subvector_size))
            for name in field_names
        ]
    output_schema = pa.schema(
        [
            pa.field("row_id", pa.uint64()),
            pa.field("partition", pa.uint32()),
        ]
        + fields
    )

    progress = tqdm(total=num_rows)
    if num_sub_vectors is not None:
        progress.set_description("Assigning partitions and computing residuals")
    else:
        progress.set_description("Assigning partitions")

    def _partition_assignment() -> Iterable[pa.RecordBatch]:
        # Lazily streams (row_id, partition[, residual subvecs]) batches.
        id_offset = 0
        with torch.no_grad():
            for batch in loader:
                if sample_size is None:
                    vecs = batch[column]
                    ids = batch["_rowid"].reshape(-1)
                else:
                    # No row ids with sampling; synthesize sequential ids.
                    vecs = batch
                    ids = torch.arange(id_offset, id_offset + vecs.size(0))
                    id_offset += vecs.size(0)
                vecs = vecs.to(kmeans.device).reshape(-1, kmeans.centroids.shape[1])
                partitions = kmeans.transform(vecs)

                # this is expected to be true, so just assert
                assert vecs.shape[0] == ids.shape[0]
                # Row count before dropping invalid vectors, for the warning.
                num_input = ids.shape[0]

                # Ignore any invalid vectors.
                mask_gpu = partitions.isfinite()
                mask = mask_gpu.cpu()
                ids = ids[mask]
                partitions = partitions[mask_gpu]
                # BUG FIX: vecs must be filtered with the same mask, otherwise
                # the residual computation below shape-mismatches whenever a
                # vector is dropped (one_pass_assign_ivf_pq_on_accelerator
                # already applies this mask).
                vecs = vecs[mask_gpu]
                partitions = partitions.cpu()

                split_columns = []
                if num_sub_vectors is not None:
                    residual_vecs = vecs - kmeans.centroids[partitions]
                    for i in range(num_sub_vectors):
                        subvector_tensor = residual_vecs[
                            :, i * subvector_size : (i + 1) * subvector_size
                        ]
                        subvector_arr = pa.array(
                            subvector_tensor.cpu().detach().numpy().reshape(-1)
                        )
                        subvector_fsl = pa.FixedSizeListArray.from_arrays(
                            subvector_arr, subvector_size
                        )
                        split_columns.append(subvector_fsl)

                part_batch = pa.RecordBatch.from_arrays(
                    [
                        ids.numpy(),
                        partitions.numpy(),
                    ]
                    + split_columns,
                    schema=output_schema,
                )
                # BUG FIX: compare against the pre-mask row count (ids was
                # already masked, so the old check could never fire) and
                # report a non-negative dropped count.
                if len(part_batch) < num_input:
                    LOGGER.warning(
                        "%s vectors are ignored during partition assignment",
                        num_input - len(part_batch),
                    )
                progress.update(part_batch.num_rows)
                yield part_batch

    rbr = pa.RecordBatchReader.from_batches(output_schema, _partition_assignment())
    if dst_dataset_uri is None:
        dst_dataset_uri = tempfile.mkdtemp()
    write_dataset(
        rbr,
        dst_dataset_uri,
        schema=output_schema,
        max_rows_per_file=dataset.count_rows(),
        data_storage_version="stable",
    )
    progress.close()
    LOGGER.info("Saved precomputed partitions to %s", dst_dataset_uri)
    return str(dst_dataset_uri)
def one_pass_train_ivf_pq_on_accelerator(
    dataset: LanceDataset,
    column: str,
    k: int,
    metric_type: MetricType,
    accelerator: Union[str, "torch.Device"],
    num_sub_vectors: int,
    batch_size: int = 1024 * 10 * 4,
    *,
    sample_rate: int = 256,
    max_iters: int = 50,
    filter_nan: bool = True,
):
    """Train IVF centroids and PQ codebooks on an accelerator in one pass.

    Pipeline:
    1. Train ``k`` IVF centroids on ``column``
       (``train_ivf_centroids_on_accelerator``).
    2. Assign sampled rows to partitions and materialize residual subvectors
       to a temporary dataset (``compute_partitions``).
    3. Train one PQ codebook per subvector on those residuals
       (``train_pq_codebook_on_accelerator``).

    Returns
    -------
    Tuple of (ivf centroids ndarray, fitted IVF KMeans model, pq codebook
    ndarray cast to the centroids' dtype, list of fitted per-subspace KMeans
    models).
    """
    metric_type = _normalize_metric_type(metric_type)

    centroids, kmeans = train_ivf_centroids_on_accelerator(
        dataset,
        column,
        k,
        metric_type,
        accelerator,
        batch_size,
        sample_rate=sample_rate,
        max_iters=max_iters,
        filter_nan=filter_nan,
    )
    # Residuals are computed on a fixed 256*256-row sample, matching the
    # sample size hard-coded in train_pq_codebook_on_accelerator.
    dataset_residuals = compute_partitions(
        dataset,
        column,
        kmeans,
        batch_size,
        num_sub_vectors=num_sub_vectors,
        filter_nan=filter_nan,
        sample_size=256 * 256,
    )
    pq_codebook, kmeans_list = train_pq_codebook_on_accelerator(
        dataset_residuals, metric_type, accelerator, num_sub_vectors, batch_size
    )
    # Keep the codebook dtype consistent with the IVF centroids.
    pq_codebook = pq_codebook.astype(dtype=centroids.dtype)

    return centroids, kmeans, pq_codebook, kmeans_list
def one_pass_assign_ivf_pq_on_accelerator(
    dataset: LanceDataset,
    column: str,
    metric_type: MetricType,
    accelerator: Union[str, "torch.Device"],
    ivf_kmeans: Any,  # KMeans
    pq_kmeans_list: List[Any],  # List[KMeans]
    dst_dataset_uri: Optional[Union[str, Path]] = None,
    batch_size: int = 1024 * 10 * 4,
    *,
    filter_nan: bool = True,
    allow_cuda_tf32: bool = True,
):
    """Compute partitions for each row using GPU kmeans and spill to disk.

    Parameters
    ----------
    dataset: LanceDataset
        Dataset to assign partitions and pq codes for.
    column: str
        Column name of the vector column.
    metric_type: MetricType
        Accepted for API symmetry; not referenced here (the metric is already
        baked into the fitted kmeans models).
    accelerator: Union[str, "torch.Device"]
        Accepted for API symmetry; computation runs on ``ivf_kmeans.device``.
    ivf_kmeans: lance.torch.kmeans.KMeans
        Fitted IVF kmeans model used to assign partitions.
    pq_kmeans_list: List[lance.torch.kmeans.KMeans]
        One fitted kmeans model per PQ subspace.
    dst_dataset_uri: Union[str, Path], optional
        The path to store the codes. If not specified a random directory
        is used instead.
    batch_size: int, default 40960
        The batch size used to read the dataset.
    filter_nan: bool, default True
        Skip null vectors when the column is nullable.
    allow_cuda_tf32: bool, default True
        Whether to allow tf32 for matmul on CUDA.

    Returns
    -------
    str
        The absolute path of the ivfpq codes dataset, as precomputed partition
        buffers, plus the list of data file paths (shuffle buffers).
    """
    from .torch.data import LanceDataset as TorchDataset

    torch.backends.cuda.matmul.allow_tf32 = allow_cuda_tf32

    num_rows = dataset.count_rows()

    if dataset.schema.field(column).nullable and filter_nan:
        filt = f"{column} is not null"
    else:
        filt = None

    torch_ds = TorchDataset(
        dataset,
        batch_size=batch_size,
        with_row_id=True,
        columns=[column],
        filter=filt,
    )
    # batch_size=1 + _collate_fn: the dataset already yields whole batches;
    # the DataLoader only unwraps the singleton list around each one.
    loader = torch.utils.data.DataLoader(
        torch_ds,
        batch_size=1,
        pin_memory=True,
        collate_fn=_collate_fn,
    )

    num_sub_vectors = len(pq_kmeans_list)
    dim = ivf_kmeans.centroids.shape[1]
    subvector_size = dim // num_sub_vectors

    output_schema = pa.schema(
        [
            pa.field("_rowid", pa.uint64()),
            pa.field("__ivf_part_id", pa.uint32()),
            pa.field("__pq_code", pa.list_(pa.uint8(), list_size=num_sub_vectors)),
        ]
    )

    progress = tqdm(total=num_rows)
    progress.set_description("Assigning partitions and computing pq codes")

    def _partition_and_pq_codes_assignment() -> Iterable[pa.RecordBatch]:
        with torch.no_grad():
            first_iter = True
            for batch in loader:
                vecs = (
                    batch[column]
                    .to(ivf_kmeans.device)
                    .reshape(-1, ivf_kmeans.centroids.shape[1])
                )
                partitions = ivf_kmeans.transform(vecs)
                ids = batch["_rowid"].reshape(-1)

                # this is expected to be true, so just assert
                assert vecs.shape[0] == ids.shape[0]
                # Row count before dropping invalid vectors, for the warning.
                num_input = ids.shape[0]

                # Ignore any invalid vectors.
                mask_gpu = partitions.isfinite()
                ids = ids.to(ivf_kmeans.device)[mask_gpu].cpu().reshape(-1)
                partitions = partitions[mask_gpu].cpu()
                vecs = vecs[mask_gpu]
                residual_vecs = vecs - ivf_kmeans.centroids[partitions]

                # cast centroids to the same dtype as vecs (once, on the
                # first batch only)
                if first_iter:
                    first_iter = False
                    LOGGER.info("Residual shape: %s", residual_vecs.shape)
                    for kmeans in pq_kmeans_list:
                        cents: torch.Tensor = kmeans.centroids
                        kmeans.centroids = cents.to(
                            dtype=vecs.dtype, device=ivf_kmeans.device
                        )

                pq_codes = torch.stack(
                    [
                        pq_kmeans_list[i].transform(
                            residual_vecs[
                                :, i * subvector_size : (i + 1) * subvector_size
                            ]
                        )
                        for i in range(num_sub_vectors)
                    ],
                    dim=1,
                )
                pq_codes = pq_codes.to(torch.uint8)

                pq_values = pa.array(pq_codes.cpu().numpy().reshape(-1))
                pq_codes = pa.FixedSizeListArray.from_arrays(pq_values, num_sub_vectors)

                part_batch = pa.RecordBatch.from_arrays(
                    [ids, partitions, pq_codes],
                    schema=output_schema,
                )
                # BUG FIX: compare against the pre-mask row count (ids was
                # already masked, so the old check could never fire) and
                # report a non-negative dropped count.
                if len(part_batch) < num_input:
                    LOGGER.warning(
                        "%s vectors are ignored during partition assignment",
                        num_input - len(part_batch),
                    )
                progress.update(part_batch.num_rows)
                yield part_batch

    rbr = pa.RecordBatchReader.from_batches(
        output_schema, _partition_and_pq_codes_assignment()
    )
    if dst_dataset_uri is None:
        dst_dataset_uri = tempfile.mkdtemp()
    ds = write_dataset(
        rbr,
        dst_dataset_uri,
        schema=output_schema,
        data_storage_version="legacy",
    )
    progress.close()
    LOGGER.info("Saved precomputed pq_codes to %s", dst_dataset_uri)

    # CONSISTENCY FIX: compute_pq_codes in this file reads `data_file.path`
    # as an attribute; the original called it (`data_file.path()`) here, and
    # both cannot be right for the same DataFile type.
    shuffle_buffers = [
        data_file.path
        for frag in ds.get_fragments()
        for data_file in frag.data_files()
    ]
    return dst_dataset_uri, shuffle_buffers