1
0
mirror of https://github.com/tensorchord/pgvecto.rs.git synced 2025-07-30 19:23:05 +03:00

merge 'main' into 'feat/binary-vector'

Signed-off-by: Mingzhuo Yin <yinmingzhuo@gmail.com>
This commit is contained in:
Mingzhuo Yin
2024-02-20 17:02:04 +08:00
93 changed files with 1733 additions and 2809 deletions

41
crates/base/Cargo.toml Normal file
View File

@ -0,0 +1,41 @@
[package]
name = "base"
version.workspace = true
edition.workspace = true
[dependencies]
bincode.workspace = true
bitvec.workspace = true
bytemuck.workspace = true
byteorder.workspace = true
half.workspace = true
libc.workspace = true
log.workspace = true
memmap2.workspace = true
num-traits.workspace = true
rand.workspace = true
rustix.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
uuid.workspace = true
validator.workspace = true
c = { path = "../c" }
detect = { path = "../detect" }
crc32fast = "1.4.0"
crossbeam = "0.8.4"
dashmap = "5.5.3"
parking_lot = "0.12.1"
rayon = "1.8.1"
arc-swap = "1.6.0"
multiversion = "0.7.3"
[lints]
clippy.derivable_impls = "allow"
clippy.len_without_is_empty = "allow"
clippy.needless_range_loop = "allow"
clippy.too_many_arguments = "allow"
rust.internal_features = "allow"
rust.unsafe_op_in_unsafe_fn = "forbid"
rust.unused_lifetimes = "warn"
rust.unused_qualifications = "warn"

95
crates/base/src/error.rs Normal file
View File

@ -0,0 +1,95 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;
// control plane
/// Errors reported when creating an index.
#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum CreateError {
    /// An index with the requested name is already present.
    #[error("Index of given name already exists.")]
    Exist,
    /// The supplied index options were rejected; `reason` carries the
    /// human-readable explanation.
    #[error("Invalid index options.")]
    InvalidIndexOptions { reason: String },
}
/// Errors reported when dropping an index.
#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum DropError {
    /// No index matches the request.
    #[error("Index not found.")]
    NotExist,
}
// data plane
/// Errors reported when flushing an index.
#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum FlushError {
    /// No index matches the request.
    #[error("Index not found.")]
    NotExist,
    /// The index requires maintenance before it can serve requests.
    // NOTE(review): presumably raised while the index is in an "upgrade" state — confirm.
    #[error("Maintenance should be done.")]
    Upgrade,
}
/// Errors reported when inserting a vector into an index.
#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum InsertError {
    /// No index matches the request.
    #[error("Index not found.")]
    NotExist,
    /// The index requires maintenance before it can serve requests.
    // NOTE(review): presumably raised while the index is in an "upgrade" state — confirm.
    #[error("Maintenance should be done.")]
    Upgrade,
    /// The vector to insert is not acceptable for this index.
    #[error("Invalid vector.")]
    InvalidVector,
}
/// Errors reported when deleting an entry from an index.
#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum DeleteError {
    /// No index matches the request.
    #[error("Index not found.")]
    NotExist,
    /// The index requires maintenance before it can serve requests.
    // NOTE(review): presumably raised while the index is in an "upgrade" state — confirm.
    #[error("Maintenance should be done.")]
    Upgrade,
}
/// Errors reported by the "basic" search operation.
#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum BasicError {
    /// No index matches the request.
    #[error("Index not found.")]
    NotExist,
    /// The index requires maintenance before it can serve requests.
    // NOTE(review): presumably raised while the index is in an "upgrade" state — confirm.
    #[error("Maintenance should be done.")]
    Upgrade,
    /// The query vector is not acceptable for this index.
    #[error("Invalid vector.")]
    InvalidVector,
    /// The search options were rejected; `reason` carries the human-readable
    /// explanation.
    #[error("Invalid search options.")]
    InvalidSearchOptions { reason: String },
}
/// Errors reported by the "vbase" search operation.
///
/// Carries the same variants as [`BasicError`].
#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum VbaseError {
    /// No index matches the request.
    #[error("Index not found.")]
    NotExist,
    /// The index requires maintenance before it can serve requests.
    // NOTE(review): presumably raised while the index is in an "upgrade" state — confirm.
    #[error("Maintenance should be done.")]
    Upgrade,
    /// The query vector is not acceptable for this index.
    #[error("Invalid vector.")]
    InvalidVector,
    /// The search options were rejected; `reason` carries the human-readable
    /// explanation.
    #[error("Invalid search options.")]
    InvalidSearchOptions { reason: String },
}
/// Errors reported when listing the entries of an index.
#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum ListError {
    /// No index matches the request.
    #[error("Index not found.")]
    NotExist,
    /// The index requires maintenance before it can serve requests.
    // NOTE(review): presumably raised while the index is in an "upgrade" state — confirm.
    #[error("Maintenance should be done.")]
    Upgrade,
}
/// Errors reported when querying index statistics.
#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum StatError {
    /// No index matches the request.
    #[error("Index not found.")]
    NotExist,
    /// The index requires maintenance before it can serve requests.
    // NOTE(review): presumably raised while the index is in an "upgrade" state — confirm.
    #[error("Maintenance should be done.")]
    Upgrade,
}

8
crates/base/src/lib.rs Normal file
View File

@ -0,0 +1,8 @@
#![feature(core_intrinsics)]
#![feature(pointer_is_aligned)]
pub mod error;
pub mod scalar;
pub mod search;
pub mod sys;
pub mod vector;

View File

@ -1,4 +1,4 @@
use crate::prelude::global::FloatCast;
use super::FloatCast;
use half::f16;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;

View File

@ -1,4 +1,4 @@
use crate::prelude::global::FloatCast;
use super::FloatCast;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt::{Debug, Display};

View File

@ -0,0 +1,16 @@
mod f16;
mod f32;
pub use f16::F16;
pub use f32::F32;
/// Conversion between a scalar type and `f32` (and its [`F32`] wrapper).
///
/// Implementors supply the two primitive conversions; the `F32`-based
/// helpers are derived from them.
pub trait FloatCast: Sized {
    /// Builds a value of this type from a primitive `f32`.
    fn from_f32(x: f32) -> Self;
    /// Converts this value to a primitive `f32`.
    fn to_f32(self) -> f32;
    /// Builds a value of this type from an [`F32`] by unwrapping its inner
    /// `f32` and delegating to [`FloatCast::from_f32`].
    fn from_f(x: F32) -> Self {
        Self::from_f32(x.0)
    }
    /// Converts this value to an [`F32`] by wrapping the result of
    /// [`FloatCast::to_f32`].
    fn to_f(self) -> F32 {
        F32(Self::to_f32(self))
    }
}

View File

@ -1,4 +1,4 @@
use crate::prelude::F32;
use crate::scalar::F32;
pub type Payload = u64;

View File

@ -1,4 +1,5 @@
use crate::prelude::*;
use super::Vector;
use crate::scalar::F32;
use bitvec::{slice::BitSlice, vec::BitVec};
use serde::{Deserialize, Serialize};

View File

@ -0,0 +1,21 @@
mod binary;
mod sparse_f32;
pub use binary::{BinaryVec, BinaryVecRef};
pub use sparse_f32::{SparseF32, SparseF32Ref};
/// A vector-like value that can report how many dimensions it has.
pub trait Vector {
    /// Returns the number of dimensions as a `u16`.
    fn dims(&self) -> u16;
}

/// Owned dense vectors: the dimension count is the element count.
///
/// # Panics
/// `dims` panics if the length does not fit in a `u16`.
impl<T> Vector for Vec<T> {
    fn dims(&self) -> u16 {
        u16::try_from(self.len()).unwrap()
    }
}

/// Borrowed dense vectors: the dimension count is the slice length.
///
/// # Panics
/// `dims` panics if the length does not fit in a `u16`.
impl<T> Vector for &[T] {
    fn dims(&self) -> u16 {
        let len: usize = self.len();
        u16::try_from(len).unwrap()
    }
}

View File

@ -1,4 +1,6 @@
use crate::prelude::*;
use super::Vector;
use crate::scalar::F32;
use num_traits::Zero;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -4,9 +4,9 @@ version.workspace = true
edition.workspace = true
[dev-dependencies]
half = { version = "~2.3", features = ["use-intrinsics", "rand_distr"] }
half.workspace = true
rand.workspace = true
detect = { path = "../detect" }
rand = "0.8.5"
[build-dependencies]
cc = "1.0"

View File

@ -4,5 +4,5 @@ version.workspace = true
edition.workspace = true
[dependencies]
std_detect = { git = "https://github.com/tensorchord/stdarch.git", branch = "avx512fp16" }
rustix.workspace = true
std_detect = { git = "https://github.com/tensorchord/stdarch.git", branch = "avx512fp16" }

View File

@ -0,0 +1,16 @@
[package]
name = "interprocess_atomic_wait"
version.workspace = true
edition.workspace = true
[dependencies]
libc.workspace = true
[target.'cfg(target_os = "macos")'.dependencies]
ulock-sys = "0.1.0"
[lints]
rust.internal_features = "allow"
rust.unsafe_op_in_unsafe_fn = "forbid"
rust.unused_lifetimes = "warn"
rust.unused_qualifications = "warn"

View File

@ -0,0 +1,91 @@
use std::sync::atomic::AtomicU32;
use std::time::Duration;
/// Linux: issues a `FUTEX_WAIT` on `futex` with expected value `value` and a
/// relative `timeout`.
///
/// The syscall's return value is discarded, so callers cannot tell a wake-up
/// from a timeout or an immediate return and must re-check their condition.
///
/// # Panics
/// Panics with "Timeout is overflow." if the whole seconds of `timeout` do
/// not fit in an `i64`.
#[cfg(target_os = "linux")]
#[inline(always)]
pub fn wait(futex: &AtomicU32, value: u32, timeout: Duration) {
    let whole_secs = i64::try_from(timeout.as_secs()).expect("Timeout is overflow.");
    let timeout = libc::timespec {
        tv_sec: whole_secs,
        tv_nsec: timeout.subsec_nanos().into(),
    };
    // SAFETY: `futex` is a live reference, so its address is valid for the
    // call, and `timeout` outlives the syscall.
    unsafe {
        libc::syscall(
            libc::SYS_futex,
            futex.as_ptr(),
            libc::FUTEX_WAIT,
            value,
            &timeout,
        );
    }
}
/// Linux: issues a `FUTEX_WAKE` on `futex`, asking the kernel to wake up to
/// `i32::MAX` waiters. The syscall's return value is discarded.
#[cfg(target_os = "linux")]
#[inline(always)]
pub fn wake(futex: &AtomicU32) {
    let max_waiters = i32::MAX;
    // SAFETY: `futex` is a live reference, so its address is valid for the call.
    unsafe {
        libc::syscall(
            libc::SYS_futex,
            futex.as_ptr(),
            libc::FUTEX_WAKE,
            max_waiters,
        );
    }
}
/// macOS: waits on `futex` via `__ulock_wait` (shared compare-and-wait) with
/// expected value `value` and a relative `timeout`.
///
/// `__ulock_wait` takes its timeout in **microseconds** and interprets `0`
/// as "no timeout". The duration is therefore converted to microseconds,
/// saturated at `u32::MAX`, and raised to at least 1 so that a very short
/// (or zero) duration can never turn into an unbounded wait. A saturated
/// timeout merely returns earlier than requested; since the return value is
/// discarded, callers must re-check their condition in any case.
#[cfg(target_os = "macos")]
#[inline(always)]
pub fn wait(futex: &AtomicU32, value: u32, timeout: Duration) {
    let timeout_us = u32::try_from(timeout.as_micros())
        .unwrap_or(u32::MAX)
        .max(1);
    // SAFETY: `futex` is a live reference, so its address is valid for the call.
    unsafe {
        // https://github.com/apple-oss-distributions/xnu/blob/main/bsd/kern/sys_ulock.c#L531
        ulock_sys::__ulock_wait(
            ulock_sys::darwin19::UL_COMPARE_AND_WAIT_SHARED,
            futex.as_ptr().cast(),
            value as _,
            timeout_us,
        );
    }
}
/// macOS: wakes every thread waiting on `futex` via `__ulock_wake`.
///
/// Without `ULF_WAKE_ALL` the kernel wakes a single waiter only, whereas the
/// Linux and FreeBSD implementations of `wake` request that all waiters be
/// woken. The flag is OR-ed into the operation to keep the platforms
/// consistent. The return value is discarded.
#[cfg(target_os = "macos")]
#[inline(always)]
pub fn wake(futex: &AtomicU32) {
    // Value of `ULF_WAKE_ALL` from XNU's `bsd/sys/ulock.h`.
    const ULF_WAKE_ALL: u32 = 0x0000_0100;
    // SAFETY: `futex` is a live reference, so its address is valid for the call.
    unsafe {
        ulock_sys::__ulock_wake(
            ulock_sys::darwin19::UL_COMPARE_AND_WAIT_SHARED | ULF_WAKE_ALL,
            futex.as_ptr().cast(),
            0,
        );
    }
}
/// FreeBSD: waits on `futex` via `_umtx_op(UMTX_OP_WAIT_UINT)` with expected
/// value `value` and a relative `timeout`.
///
/// The return value of `_umtx_op` is discarded, so callers must re-check
/// their condition after this function returns.
///
/// # Panics
/// Panics with "Timeout is overflow." if the whole seconds of `timeout` do
/// not fit in an `i64`.
#[cfg(target_os = "freebsd")]
#[inline(always)]
pub fn wait(futex: &AtomicU32, value: u32, timeout: Duration) {
    let ptr: *const AtomicU32 = futex;
    let mut timeout = libc::timespec {
        tv_sec: i64::try_from(timeout.as_secs()).expect("Timeout is overflow."),
        tv_nsec: timeout.subsec_nanos().into(),
    };
    // SAFETY: `ptr` comes from a live reference and `timeout` outlives the call.
    unsafe {
        // https://github.com/freebsd/freebsd-src/blob/main/sys/kern/kern_umtx.c#L3943
        // https://github.com/freebsd/freebsd-src/blob/main/sys/kern/kern_umtx.c#L3836
        libc::_umtx_op(
            ptr as *mut libc::c_void,
            libc::UMTX_OP_WAIT_UINT,
            value as libc::c_ulong,
            // The fourth argument carries the *size* of the timeout structure
            // encoded as a pointer-sized integer; it is not a real pointer.
            std::mem::size_of_val(&timeout) as *mut std::ffi::c_void,
            // The fifth argument points at the timeout structure itself.
            std::ptr::addr_of_mut!(timeout).cast(),
        );
    };
}
/// FreeBSD: issues `_umtx_op(UMTX_OP_WAKE)` on `futex`, asking the kernel to
/// wake up to `i32::MAX` waiters. The return value is discarded.
#[cfg(target_os = "freebsd")]
#[inline(always)]
pub fn wake(futex: &AtomicU32) {
    let addr = futex as *const AtomicU32 as *mut libc::c_void;
    let max_waiters = i32::MAX as libc::c_ulong;
    // SAFETY: `addr` comes from a live reference; both trailing pointers are
    // passed as null.
    unsafe {
        libc::_umtx_op(
            addr,
            libc::UMTX_OP_WAKE,
            max_waiters,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
        );
    };
}

15
crates/memfd/Cargo.toml Normal file
View File

@ -0,0 +1,15 @@
[package]
name = "memfd"
version.workspace = true
edition.workspace = true
[dependencies]
rand.workspace = true
rustix.workspace = true
detect = { path = "../detect" }
[lints]
rust.internal_features = "allow"
rust.unsafe_op_in_unsafe_fn = "forbid"
rust.unused_lifetimes = "warn"
rust.unused_qualifications = "warn"

70
crates/memfd/src/lib.rs Normal file
View File

@ -0,0 +1,70 @@
use std::os::fd::OwnedFd;
/// Linux: creates an anonymous file descriptor.
///
/// When `detect::linux::detect_memfd()` reports support, the descriptor comes
/// from `memfd_create`. Otherwise a uniquely named regular file is created
/// (exclusive, user read/write) in the current working directory and
/// unlinked immediately, so only the returned descriptor keeps it alive.
///
/// # Errors
/// Propagates any error from `memfd_create`, `open` or `unlink`.
#[cfg(target_os = "linux")]
pub fn memfd_create() -> std::io::Result<OwnedFd> {
    use rustix::fs::{MemfdFlags, Mode, OFlags};
    if !detect::linux::detect_memfd() {
        // POSIX fcntl locking is not specified for shared memory objects, so a
        // regular file is used for the fallback.
        // reference: https://man7.org/linux/man-pages/man3/fcntl.3p.html
        // (Linux shmem itself does support fcntl locking.)
        let pid = std::process::id();
        let nonce = rand::random::<u32>();
        let name = format!(".shm.MEMFD.{:x}.{:x}", pid, nonce);
        let flags = OFlags::RDWR | OFlags::CREATE | OFlags::EXCL;
        let mode = Mode::RUSR | Mode::WUSR;
        let fd = rustix::fs::open(&name, flags, mode)?;
        rustix::fs::unlink(&name)?;
        return Ok(fd);
    }
    let name = format!(".memfd.MEMFD.{:x}", std::process::id());
    let fd = rustix::fs::memfd_create(name, MemfdFlags::empty())?;
    Ok(fd)
}
/// macOS: creates an anonymous file descriptor backed by a regular file.
///
/// A uniquely named file is created (exclusive, user read/write) in the
/// current working directory and unlinked immediately, so only the returned
/// descriptor keeps it alive.
///
/// # Errors
/// Propagates any error from `open` or `unlink`.
#[cfg(target_os = "macos")]
pub fn memfd_create() -> std::io::Result<OwnedFd> {
    use rustix::fs::{Mode, OFlags};
    // POSIX fcntl locking is not specified for shared memory objects, so a
    // regular file is used here.
    // reference: https://man7.org/linux/man-pages/man3/fcntl.3p.html
    let pid = std::process::id();
    let nonce = rand::random::<u32>();
    let name = format!(".shm.MEMFD.{:x}.{:x}", pid, nonce);
    let flags = OFlags::RDWR | OFlags::CREATE | OFlags::EXCL;
    let mode = Mode::RUSR | Mode::WUSR;
    let fd = rustix::fs::open(&name, flags, mode)?;
    rustix::fs::unlink(&name)?;
    Ok(fd)
}
/// FreeBSD: creates an anonymous file descriptor backed by a regular file.
///
/// A uniquely named file is created (exclusive, user read/write) in the
/// current working directory and unlinked immediately, so only the returned
/// descriptor keeps it alive.
///
/// # Errors
/// Propagates any error from `open` or `unlink`.
#[cfg(target_os = "freebsd")]
pub fn memfd_create() -> std::io::Result<OwnedFd> {
    use rustix::fs::{Mode, OFlags};
    // POSIX fcntl locking is not specified for shared memory objects, so a
    // regular file is used here.
    // reference: https://man7.org/linux/man-pages/man3/fcntl.3p.html
    let pid = std::process::id();
    let nonce = rand::random::<u32>();
    let name = format!(".shm.MEMFD.{:x}.{:x}", pid, nonce);
    let flags = OFlags::RDWR | OFlags::CREATE | OFlags::EXCL;
    let mode = Mode::RUSR | Mode::WUSR;
    let fd = rustix::fs::open(&name, flags, mode)?;
    rustix::fs::unlink(&name)?;
    Ok(fd)
}

15
crates/send_fd/Cargo.toml Normal file
View File

@ -0,0 +1,15 @@
[package]
name = "send_fd"
version.workspace = true
edition.workspace = true
[dependencies]
libc.workspace = true
log.workspace = true
rustix.workspace = true
[lints]
rust.internal_features = "allow"
rust.unsafe_op_in_unsafe_fn = "forbid"
rust.unused_lifetimes = "warn"
rust.unused_qualifications = "warn"

79
crates/send_fd/src/lib.rs Normal file
View File

@ -0,0 +1,79 @@
use rustix::fd::BorrowedFd;
use rustix::fd::{AsFd, OwnedFd};
use rustix::net::{RecvAncillaryBuffer, RecvAncillaryMessage, RecvFlags};
use rustix::net::{SendAncillaryBuffer, SendAncillaryMessage, SendFlags};
use std::io::{IoSlice, IoSliceMut};
use std::os::unix::net::UnixStream;
/// A connected pair of Unix stream sockets used to pass file descriptors.
///
/// Descriptors are sent through `tx` and received from `rx`; both ends are
/// created together by [`SendFd::new`].
#[repr(C)]
pub struct SendFd {
    /// Sending end of the socket pair.
    tx: OwnedFd,
    /// Receiving end of the socket pair.
    rx: OwnedFd,
}
impl SendFd {
pub fn new() -> std::io::Result<Self> {
let (tx, rx) = UnixStream::pair()?;
Ok(Self {
tx: tx.into(),
rx: rx.into(),
})
}
pub fn recv(&self) -> std::io::Result<OwnedFd> {
let rx = self.rx.as_fd();
recv_fd(rx)
}
pub fn send(&self, fd: BorrowedFd<'_>) -> std::io::Result<()> {
let tx = self.tx.as_fd();
send_fd(tx, fd)?;
Ok(())
}
}
/// Sends `fd` over the socket `tx` as an `SCM_RIGHTS` ancillary message.
///
/// A single marker byte `b'$'` is sent as the regular payload alongside the
/// ancillary data.
///
/// # Panics
/// Panics if the ancillary message cannot be pushed into the control buffer.
fn send_fd(tx: BorrowedFd<'_>, fd: BorrowedFd<'_>) -> std::io::Result<()> {
    let rights = [fd];
    let mut storage = AncillaryBuffer([0u8; rustix::cmsg_space!(ScmRights(1))]);
    let mut control = SendAncillaryBuffer::new(&mut storage.0);
    assert!(control.push(SendAncillaryMessage::ScmRights(&rights)));
    let payload = [b'$'];
    rustix::net::sendmsg(
        tx,
        &[IoSlice::new(&payload)],
        &mut control,
        SendFlags::empty(),
    )?;
    Ok(())
}
/// Receives one file descriptor from the socket `rx`.
///
/// Loops until a message carrying a descriptor arrives; messages without one
/// are logged with a warning and skipped.
///
/// # Errors
/// Returns the error from `recvmsg` if receiving fails.
///
/// # Panics
/// Panics if a message does not consist of exactly the marker byte `b'$'`,
/// if an ancillary message other than `ScmRights` is received, or if more
/// than one descriptor arrives in a single message.
fn recv_fd(rx: BorrowedFd<'_>) -> std::io::Result<OwnedFd> {
    loop {
        // Control buffer sized for exactly one descriptor.
        let mut buffer = AncillaryBuffer([0u8; rustix::cmsg_space!(ScmRights(1))]);
        let mut control = RecvAncillaryBuffer::new(&mut buffer.0);
        // One-byte payload buffer; pre-filled with a value different from the
        // expected marker so a missing write is detectable.
        let mut buffer_ios = [b'.'];
        let ios = IoSliceMut::new(&mut buffer_ios);
        let returned = rustix::net::recvmsg(rx, &mut [ios], &mut control, RecvFlags::empty())?;
        // Truncated ancillary data is only reported, not treated as fatal.
        if returned.flags.bits() & libc::MSG_CTRUNC as u32 != 0 {
            log::warn!("Ancillary is truncated.");
        }
        // it's impossible for a graceful shutdown since we opened the other end
        assert_eq!(returned.bytes, 1);
        assert_eq!(buffer_ios[0], b'$');
        let mut fds = vec![];
        for message in control.drain() {
            match message {
                RecvAncillaryMessage::ScmRights(iter) => {
                    fds.extend(iter);
                }
                _ => {
                    // impossible to receive other than one file descriptor since we do not send
                    unreachable!()
                }
            }
        }
        // it's impossible for more than one file descriptor since the buffer can only contain one
        assert!(fds.len() <= 1);
        if let Some(fd) = fds.pop() {
            return Ok(fd);
        }
        // No descriptor in this message: warn and wait for the next one.
        log::warn!("Ancillary is expected.");
    }
}
/// Backing storage for a control-message buffer holding one `ScmRights`
/// message with a single descriptor, over-aligned to 32 bytes.
// NOTE(review): the 32-byte alignment is presumably chosen to satisfy cmsg
// header alignment on every supported platform — confirm.
#[repr(C, align(32))]
struct AncillaryBuffer([u8; rustix::cmsg_space!(ScmRights(1))]);

View File

@ -4,37 +4,33 @@ version.workspace = true
edition.workspace = true
[dependencies]
bincode.workspace = true
bitvec.workspace = true
bytemuck.workspace = true
byteorder.workspace = true
half.workspace = true
libc.workspace = true
log.workspace = true
serde.workspace = true
serde_json.workspace = true
validator.workspace = true
rustix.workspace = true
thiserror.workspace = true
byteorder.workspace = true
bincode.workspace = true
half.workspace = true
memmap2.workspace = true
num-traits.workspace = true
rand.workspace = true
bytemuck.workspace = true
bitvec.workspace = true
rustix.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
uuid.workspace = true
validator.workspace = true
base = { path = "../base" }
c = { path = "../c" }
detect = { path = "../detect" }
crc32fast = "1.3.2"
crossbeam = "0.8.2"
dashmap = "5.4.0"
crc32fast = "1.4.0"
crossbeam = "0.8.4"
dashmap = "5.5.3"
parking_lot = "0.12.1"
memoffset = "0.9.0"
arrayvec = { version = "0.7.3", features = ["serde"] }
memmap2 = "0.9.0"
rayon = "1.6.1"
uuid = { version = "1.6.1", features = ["v4", "serde"] }
rayon = "1.8.1"
arc-swap = "1.6.0"
multiversion = "0.7.3"
[target.'cfg(target_os = "macos")'.dependencies]
ulock-sys = "0.1.0"
[lints]
clippy.derivable_impls = "allow"
clippy.len_without_is_empty = "allow"

View File

@ -1,5 +1,6 @@
use crate::prelude::*;
use crate::utils::vec2::Vec2;
use base::scalar::FloatCast;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::ops::{Index, IndexMut};

View File

@ -5,6 +5,7 @@ use crate::index::IndexOptions;
use crate::prelude::*;
use crate::utils::dir_ops::sync_dir;
use crate::utils::mmap_array::MmapArray;
use base::scalar::FloatCast;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::Arc;

View File

@ -1,456 +0,0 @@
#![allow(unused)]
use crate::algorithms::raw::Raw;
use crate::prelude::*;
use crossbeam::atomic::AtomicCell;
use parking_lot::RwLock;
use parking_lot::RwLockReadGuard;
use parking_lot::RwLockWriteGuard;
use rand::distributions::Uniform;
use rand::prelude::SliceRandom;
use rand::Rng;
use rayon::prelude::*;
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap, HashSet};
use std::sync::Arc;
/// A vertex id paired with a distance.
///
/// Equality and ordering consider the distance only; the id is ignored.
pub struct VertexWithDistance {
    pub id: u32,
    pub distance: Scalar,
}

impl VertexWithDistance {
    /// Pairs `id` with `distance`.
    pub fn new(id: u32, distance: Scalar) -> Self {
        VertexWithDistance { id, distance }
    }
}

impl PartialEq for VertexWithDistance {
    fn eq(&self, other: &Self) -> bool {
        self.distance == other.distance
    }
}

impl Eq for VertexWithDistance {}

impl PartialOrd for VertexWithDistance {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for VertexWithDistance {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.distance, &other.distance)
    }
}
/// DiskANN search state.
pub struct SearchState {
    /// Vertices that have been popped from the heap or explicitly marked as
    /// visited.
    pub visited: HashSet<u32>,
    /// Current candidate list keyed by distance; `push` trims it to at most
    /// `l` entries by dropping the largest key.
    candidates: BTreeMap<Scalar, u32>,
    /// Pending vertices ordered so that the smallest distance is popped first.
    heap: BinaryHeap<Reverse<VertexWithDistance>>,
    /// Ids of every vertex that has ever been pushed.
    heap_visited: HashSet<u32>,
    /// Maximum number of entries kept in `candidates`.
    l: usize,
    /// Number of results to return.
    //TODO: used during search.
    k: usize,
}
impl SearchState {
    /// Builds an empty state that returns `k` results and keeps at most `l`
    /// candidates.
    pub(crate) fn new(k: usize, l: usize) -> Self {
        Self {
            visited: HashSet::new(),
            candidates: BTreeMap::new(),
            heap: BinaryHeap::new(),
            heap_visited: HashSet::new(),
            k,
            l,
        }
    }

    /// Pops the closest pending vertex that is still a candidate, records it
    /// as visited and returns its id. Returns `None` once the heap is empty.
    fn pop(&mut self) -> Option<u32> {
        loop {
            let Reverse(vertex) = self.heap.pop()?;
            // Entries whose distance is no longer a candidate key were
            // evicted by `push()` and are skipped.
            if self.candidates.contains_key(&vertex.distance) {
                self.visited.insert(vertex.id);
                return Some(vertex.id);
            }
        }
    }

    /// Registers a new, not yet visited vertex and evicts the farthest
    /// candidate when the list grows beyond `l`.
    ///
    /// # Panics
    /// Panics if `vertex_id` has already been visited.
    fn push(&mut self, vertex_id: u32, distance: Scalar) {
        assert!(!self.visited.contains(&vertex_id));
        self.heap_visited.insert(vertex_id);
        let entry = VertexWithDistance::new(vertex_id, distance);
        self.heap.push(Reverse(entry));
        self.candidates.insert(distance, vertex_id);
        let overflow = self.candidates.len() > self.l;
        if overflow {
            self.candidates.pop_last();
        }
    }

    /// Records `vertex_id` as visited.
    fn visit(&mut self, vertex_id: u32) {
        self.visited.insert(vertex_id);
    }

    /// Reports whether `vertex_id` was visited or has ever been pushed.
    fn is_visited(&self, vertex_id: u32) -> bool {
        [&self.visited, &self.heap_visited]
            .iter()
            .any(|seen| seen.contains(&vertex_id))
    }
}
/// In-memory Vamana graph built over the vectors stored in `raw`.
pub struct VamanaImpl {
    /// Source of vectors and payloads, addressed by vertex id.
    raw: Arc<Raw>,
    /// neighbors[vertex_id*r..(vertex_id+1)*r] records r neighbors for each vertex
    neighbors: Vec<AtomicCell<u32>>,
    /// neighbor_size[vertex_id] records the actual number of neighbors for each vertex;
    /// the RwLock protects both the size and the corresponding neighbor slots
    neighbor_size: Vec<RwLock<u32>>,
    /// the entry point for the entire graph: the vector closest to the centroid
    medoid: u32,
    /// Dimensionality of the vectors.
    dims: u16,
    /// Maximum number of neighbors per vertex.
    r: u32,
    /// Pruning factor passed to the second build pass.
    alpha: f32,
    /// Candidate-list size used while building.
    l: usize,
    /// Distance function.
    d: Distance,
}
// NOTE(review): these manual impls assert that sharing `VamanaImpl` across
// threads is sound. Neighbor slots are atomics and sizes sit behind RwLocks;
// presumably the impls are needed because `Raw` is not automatically
// Send/Sync — confirm that `Raw` is actually safe to share.
unsafe impl Send for VamanaImpl {}
unsafe impl Sync for VamanaImpl {}
impl VamanaImpl {
/// Builds a Vamana graph over the `n` vectors held by `raw`.
///
/// Construction steps:
/// 1. every vertex receives random neighbors,
/// 2. the medoid is located,
/// 3. two search-and-prune passes run over all vertices, the first with a
///    pruning factor of `1.0`, the second with `alpha`.
///
/// * `r` — maximum number of neighbors per vertex.
/// * `l` — candidate-list size used during the build passes.
/// * `d` — distance function.
pub fn new(
    raw: Arc<Raw>,
    n: u32,
    dims: u16,
    r: u32,
    alpha: f32,
    l: usize,
    d: Distance,
) -> Self {
    // `r` slots per vertex, all initialised to 0. No `unsafe` is needed:
    // both tables are built from plain safe constructors.
    let neighbors = std::iter::repeat_with(|| AtomicCell::new(0))
        .take(r as usize * n as usize)
        .collect();
    let neighbor_size = std::iter::repeat_with(|| RwLock::new(0))
        .take(n as usize)
        .collect();
    let mut new_vamana = Self {
        raw,
        neighbors,
        neighbor_size,
        medoid: 0,
        dims,
        r,
        alpha,
        l,
        d,
    };
    // 1. init graph with r random neighbors for each node
    let rng = rand::thread_rng();
    new_vamana._init_graph(n, rng.clone());
    // 2. find medoid
    new_vamana.medoid = new_vamana._find_medoid(n);
    // 3. iterate pass
    new_vamana._one_pass(n, 1.0, r, l, rng.clone());
    new_vamana._one_pass(n, alpha, r, l, rng.clone());
    new_vamana
}
pub fn search<F>(&self, target: Box<[Scalar]>, k: usize, f: F) -> Vec<(Scalar, Payload)>
where
F: FnMut(Payload) -> bool,
{
// TODO: filter
let state = self._greedy_search_with_filter(0, &target, k, k * 2, f);
let mut results = BinaryHeap::<(Scalar, u32)>::new();
for (distance, row) in state.candidates {
if results.len() == k {
break;
}
results.push((distance, row));
}
let mut res_vec: Vec<(Scalar, Payload)> = results
.iter()
.map(|x| (x.0, self.raw.payload(x.1)))
.collect();
res_vec.sort();
res_vec
}
/// Greedy graph search from `start` towards `query`, admitting only
/// neighbors whose payload passes `f`.
///
/// The start vertex itself is pushed without consulting `f`. Returns the
/// final search state, whose candidate list holds at most `search_size`
/// entries.
fn _greedy_search_with_filter<F>(
    &self,
    start: u32,
    query: &[Scalar],
    k: usize,
    search_size: usize,
    mut f: F,
) -> SearchState
where
    F: FnMut(Payload) -> bool,
{
    let mut state = SearchState::new(k, search_size);
    state.push(start, self.d.distance(query, self.raw.vector(start)));
    while let Some(current) = state.pop() {
        // `pop` only yields ids that are still in the candidate list.
        state.visit(current);
        // Hold the read lock while walking this vertex's neighbor slots.
        let guard = self.neighbor_size[current as usize].read();
        for cell in self._get_neighbors(current, &guard) {
            let candidate = cell.load();
            if state.is_visited(candidate) || !f(self.raw.payload(candidate)) {
                continue;
            }
            let dist = self.d.distance(query, self.raw.vector(candidate));
            // push and retain the closest l nodes
            state.push(candidate, dist);
        }
    }
    state
}
/// Gives every vertex an initial neighbor set.
///
/// When `r < n`, each vertex gets `r` distinct random neighbors other than
/// itself. Otherwise it is linked to every other vertex; the vertex itself
/// is excluded in both cases so the initial graph contains no self-loops.
fn _init_graph(&self, n: u32, mut rng: impl Rng) {
    let distribution = Uniform::new(0, n);
    for i in 0..n {
        let neighbor_ids: HashSet<u32> = if self.r < n {
            let mut sampled = HashSet::new();
            while sampled.len() < self.r as usize {
                let neighbor_id = rng.sample(distribution);
                if neighbor_id != i {
                    sampled.insert(neighbor_id);
                }
            }
            sampled
        } else {
            (0..n).filter(|&neighbor_id| neighbor_id != i).collect()
        };
        let mut guard = self.neighbor_size[i as usize].write();
        self._set_neighbors(i, &neighbor_ids, &mut guard);
    }
}
/// Overwrites the neighbor slots of `vertex_index` with `neighbor_ids` and
/// stores the new neighbor count through `guard`.
///
/// # Panics
/// Panics if more than `r` neighbors are supplied.
fn _set_neighbors(
    &self,
    vertex_index: u32,
    neighbor_ids: &HashSet<u32>,
    guard: &mut RwLockWriteGuard<u32>,
) {
    assert!(neighbor_ids.len() <= self.r as usize);
    let base = vertex_index as usize * self.r as usize;
    for (offset, &neighbor) in neighbor_ids.iter().enumerate() {
        self.neighbors[base + offset].store(neighbor);
    }
    **guard = neighbor_ids.len() as u32;
}
/// Returns the occupied neighbor slots of `vertex_index`; the count is read
/// through the supplied read guard.
fn _get_neighbors(
    &self,
    vertex_index: u32,
    guard: &RwLockReadGuard<u32>,
) -> &[AtomicCell<u32>] {
    //TODO: store neighbor length
    let count = **guard as usize;
    let first = vertex_index as usize * self.r as usize;
    &self.neighbors[first..first + count]
}
/// Returns the occupied neighbor slots of `vertex_index`; the count is read
/// through the supplied write guard.
fn _get_neighbors_with_write_guard(
    &self,
    vertex_index: u32,
    guard: &RwLockWriteGuard<u32>,
) -> &[AtomicCell<u32>] {
    let count = **guard as usize;
    let first = vertex_index as usize * self.r as usize;
    &self.neighbors[first..first + count]
}
/// Returns the id of the vector closest to the centroid of the first `n`
/// vectors.
///
/// Ties keep the lowest id, and `0` is returned when no distance is
/// strictly below infinity.
fn _find_medoid(&self, n: u32) -> u32 {
    let centroid = self._compute_centroid(n);
    let centroid_arr: &[Scalar] = &centroid;
    let (medoid_index, _) = (0..n).fold((0, Scalar::INFINITY), |(best, min_dis), i| {
        let dis = self.d.distance(centroid_arr, self.raw.vector(i));
        if dis < min_dis {
            (i, dis)
        } else {
            (best, min_dis)
        }
    });
    medoid_index
}
/// Computes the component-wise mean of the first `n` vectors.
///
/// Components are accumulated in `f64` and divided by `n` before being
/// narrowed back to `f32` and converted to `Scalar`.
// NOTE(review): for `n == 0` the division yields NaN components — confirm
// callers never build an empty graph.
fn _compute_centroid(&self, n: u32) -> Vec<Scalar> {
    let dim = self.dims as usize;
    // accumulate in f64 rather than f32, presumably to avoid overflow and precision loss
    let mut sum = vec![0_f64; dim];
    for i in 0..n {
        let vec = self.raw.vector(i);
        for j in 0..dim {
            sum[j] += f32::from(vec[j]) as f64;
        }
    }
    let collection: Vec<Scalar> = sum
        .iter()
        .map(|v| Scalar::from((*v / n as f64) as f32))
        .collect();
    collection
}
/// Runs one build pass: visits all `n` vertices in random order, in
/// parallel, and performs search-and-prune for each.
///
/// `r` and `l` are parameters (rather than read from `self`) so that
/// multi-pass extensions can vary them.
fn _one_pass(&self, n: u32, alpha: f32, r: u32, l: usize, mut rng: impl Rng) {
    let mut order: Vec<u32> = (0..n).collect();
    order.shuffle(&mut rng);
    order
        .into_par_iter()
        .for_each(|vertex| self.search_and_prune_for_one_vertex(vertex, alpha, r, l));
}
/// Recomputes the neighbors of vertex `id` and adds the reverse edges.
///
/// 1. A greedy search from the medoid towards the vertex's own vector
///    collects the visited set.
/// 2. Under the vertex's write lock, the visited set is merged with the
///    current neighbors, pruned, and stored as the new neighbor list.
/// 3. For every new neighbor, `id` is added to that neighbor's list (under
///    the neighbor's write lock); the list is pruned again if it would
///    exceed `r` entries.
///
/// Only one vertex lock is held at any time.
fn search_and_prune_for_one_vertex(&self, id: u32, alpha: f32, r: u32, l: usize) {
    let query = self.raw.vector(id);
    let mut state = self._greedy_search(self.medoid, query, 1, l);
    state.visited.remove(&id); // in case visited has id itself
    let mut new_neighbor_ids: HashSet<u32> = HashSet::new();
    {
        // Scope of the write lock on `id`.
        let mut guard = self.neighbor_size[id as usize].write();
        let neighbor_ids = self._get_neighbors_with_write_guard(id, &guard);
        state.visited.extend(neighbor_ids.iter().map(|x| x.load()));
        let neighbor_ids = self._robust_prune(id, state.visited, alpha, r);
        let neighbor_ids: HashSet<u32> = neighbor_ids.into_iter().collect();
        self._set_neighbors(id, &neighbor_ids, &mut guard);
        new_neighbor_ids = neighbor_ids;
    }
    for &neighbor_id in new_neighbor_ids.iter() {
        {
            // Scope of the write lock on the neighbor receiving the back edge.
            let mut guard = self.neighbor_size[neighbor_id as usize].write();
            let old_neighbors = self._get_neighbors_with_write_guard(neighbor_id, &guard);
            let mut old_neighbors: HashSet<u32> =
                old_neighbors.iter().map(|x| x.load()).collect();
            old_neighbors.insert(id);
            if old_neighbors.len() > r as usize {
                // need robust prune
                let new_neighbors = self._robust_prune(neighbor_id, old_neighbors, alpha, r);
                let new_neighbors: HashSet<u32> = new_neighbors.into_iter().collect();
                self._set_neighbors(neighbor_id, &new_neighbors, &mut guard);
            } else {
                self._set_neighbors(neighbor_id, &old_neighbors, &mut guard);
            }
        }
    }
}
/// Greedy graph search from `start` towards `query` without any filtering.
///
/// Returns the final search state, whose candidate list holds at most
/// `search_size` entries.
fn _greedy_search(
    &self,
    start: u32,
    query: &[Scalar],
    k: usize,
    search_size: usize,
) -> SearchState {
    let mut state = SearchState::new(k, search_size);
    state.push(start, self.d.distance(query, self.raw.vector(start)));
    while let Some(current) = state.pop() {
        // `pop` only yields ids that are still in the candidate list.
        state.visit(current);
        // Hold the read lock while walking this vertex's neighbor slots.
        let guard = self.neighbor_size[current as usize].read();
        for cell in self._get_neighbors(current, &guard) {
            let candidate = cell.load();
            if state.is_visited(candidate) {
                continue;
            }
            let dist = self.d.distance(query, self.raw.vector(candidate));
            // push and retain the closest l nodes
            state.push(candidate, dist);
        }
    }
    state
}
fn _robust_prune(&self, id: u32, mut visited: HashSet<u32>, alpha: f32, r: u32) -> Vec<u32> {
let mut heap: BinaryHeap<VertexWithDistance> = visited
.iter()
.map(|v| {
let dist = self.d.distance(self.raw.vector(id), self.raw.vector(*v));
VertexWithDistance {
id: *v,
distance: dist,
}
})
.collect();
let mut new_neighbor_ids: Vec<u32> = vec![];
while !visited.is_empty() {
if let Some(mut p) = heap.pop() {
while !visited.contains(&p.id) {
match heap.pop() {
Some(value) => {
p = value;
}
None => {
return new_neighbor_ids;
}
}
}
new_neighbor_ids.push(p.id);
if new_neighbor_ids.len() >= r as usize {
break;
}
let mut to_remove: HashSet<u32> = HashSet::new();
for pv in visited.iter() {
let dist_prime = self.d.distance(self.raw.vector(p.id), self.raw.vector(*pv));
let dist_query = self.d.distance(self.raw.vector(id), self.raw.vector(*pv));
if Scalar::from(alpha) * dist_prime <= dist_query {
to_remove.insert(*pv);
}
}
for pv in to_remove.iter() {
visited.remove(pv);
}
} else {
return new_neighbor_ids;
}
}
new_neighbor_ids
}
}

View File

@ -92,13 +92,10 @@ pub struct SegmentStat {
}
#[derive(Debug, Serialize, Deserialize)]
pub enum IndexStat {
Normal {
indexing: bool,
segments: Vec<SegmentStat>,
options: IndexOptions,
},
Upgrade,
pub struct IndexStat {
pub indexing: bool,
pub segments: Vec<SegmentStat>,
pub options: IndexOptions,
}
pub struct Index<S: G> {
@ -113,10 +110,10 @@ pub struct Index<S: G> {
}
impl<S: G> Index<S> {
pub fn create(path: PathBuf, options: IndexOptions) -> Result<Arc<Self>, ServiceError> {
pub fn create(path: PathBuf, options: IndexOptions) -> Result<Arc<Self>, CreateError> {
if let Err(err) = options.validate() {
return Err(ServiceError::BadOption {
validation: err.to_string(),
return Err(CreateError::InvalidIndexOptions {
reason: err.to_string(),
});
}
std::fs::create_dir(&path).unwrap();
@ -277,7 +274,7 @@ impl<S: G> Index<S> {
}
pub fn stat(&self) -> IndexStat {
let view = self.view();
IndexStat::Normal {
IndexStat {
indexing: self.instant_index.load() < self.instant_write.load(),
options: self.options().clone(),
segments: {
@ -326,9 +323,14 @@ impl<S: G> IndexView<S> {
vector: S::VectorRef<'_>,
opts: &'a SearchOptions,
filter: F,
) -> Result<impl Iterator<Item = Pointer> + 'a, ServiceError> {
) -> Result<impl Iterator<Item = Pointer> + 'a, BasicError> {
if self.options.vector.dims != vector.dims() {
return Err(ServiceError::Unmatched);
return Err(BasicError::InvalidVector);
}
if let Err(err) = opts.validate() {
return Err(BasicError::InvalidSearchOptions {
reason: err.to_string(),
});
}
struct Comparer(std::collections::BinaryHeap<Reverse<Element>>);
@ -399,9 +401,14 @@ impl<S: G> IndexView<S> {
vector: S::VectorRef<'a>,
opts: &'a SearchOptions,
filter: F,
) -> Result<impl Iterator<Item = Pointer> + 'a, ServiceError> {
) -> Result<impl Iterator<Item = Pointer> + 'a, VbaseError> {
if self.options.vector.dims != vector.dims() {
return Err(ServiceError::Unmatched);
return Err(VbaseError::InvalidVector);
}
if let Err(err) = opts.validate() {
return Err(VbaseError::InvalidSearchOptions {
reason: err.to_string(),
});
}
struct Filtering<'a, F: 'a> {
@ -463,7 +470,7 @@ impl<S: G> IndexView<S> {
}
}))
}
pub fn list(&self) -> impl Iterator<Item = Pointer> + '_ {
pub fn list(&self) -> Result<impl Iterator<Item = Pointer> + '_, ListError> {
let sealed = self
.sealed
.values()
@ -477,18 +484,19 @@ impl<S: G> IndexView<S> {
.iter()
.map(|(_, x)| x)
.flat_map(|x| (0..x.len()).map(|i| x.payload(i)));
sealed
let iter = sealed
.chain(growing)
.chain(write)
.filter_map(|p| self.delete.check(p))
.filter_map(|p| self.delete.check(p));
Ok(iter)
}
pub fn insert(
&self,
vector: S::VectorOwned,
pointer: Pointer,
) -> Result<Result<(), OutdatedError>, ServiceError> {
) -> Result<Result<(), OutdatedError>, InsertError> {
if self.options.vector.dims != vector.dims() {
return Err(ServiceError::Unmatched);
return Err(InsertError::InvalidVector);
}
let payload = (pointer.as_u48() << 16) | self.delete.version(pointer) as Payload;
@ -502,14 +510,16 @@ impl<S: G> IndexView<S> {
Ok(Err(OutdatedError))
}
}
pub fn delete(&self, p: Pointer) {
pub fn delete(&self, p: Pointer) -> Result<(), DeleteError> {
self.delete.delete(p);
Ok(())
}
pub fn flush(&self) {
pub fn flush(&self) -> Result<(), FlushError> {
self.delete.flush();
if let Some((_, write)) = &self.write {
write.flush();
}
Ok(())
}
}

View File

@ -14,9 +14,9 @@ pub struct OptimizingOptions {
#[serde(default = "OptimizingOptions::default_sealing_size")]
#[validate(range(min = 1, max = 4_000_000_000))]
pub sealing_size: u32,
#[serde(default = "OptimizingOptions::default_deleted_threshold", skip)]
#[serde(default = "OptimizingOptions::default_delete_threshold")]
#[validate(range(min = 0.01, max = 1.00))]
pub deleted_threshold: f64,
pub delete_threshold: f64,
#[serde(default = "OptimizingOptions::default_optimizing_threads")]
#[validate(range(min = 1, max = 65535))]
pub optimizing_threads: usize,
@ -29,7 +29,7 @@ impl OptimizingOptions {
fn default_sealing_size() -> u32 {
1
}
fn default_deleted_threshold() -> f64 {
fn default_delete_threshold() -> f64 {
0.2
}
fn default_optimizing_threads() -> usize {
@ -45,7 +45,7 @@ impl Default for OptimizingOptions {
Self {
sealing_secs: Self::default_sealing_secs(),
sealing_size: Self::default_sealing_size(),
deleted_threshold: Self::default_deleted_threshold(),
delete_threshold: Self::default_delete_threshold(),
optimizing_threads: Self::default_optimizing_threads(),
}
}

View File

@ -10,6 +10,22 @@ use crate::prelude::*;
use std::path::PathBuf;
use std::sync::Arc;
/// Read operations offered by a view over an index instance.
///
/// Every operation yields a boxed iterator of [`Pointer`]s borrowed from the
/// view, or an operation-specific error.
pub trait InstanceViewOperations {
    /// Runs a "basic" search for `vector` using `opts`; `filter` is a
    /// predicate over candidate pointers.
    fn basic<'a, F: Fn(Pointer) -> bool + Clone + 'a>(
        &'a self,
        vector: &'a DynamicVector,
        opts: &'a SearchOptions,
        filter: F,
    ) -> Result<Box<dyn Iterator<Item = Pointer> + 'a>, BasicError>;
    /// Runs a "vbase" search for `vector` using `opts`; unlike `basic`, the
    /// `filter` predicate may be stateful (`FnMut`).
    fn vbase<'a, F: FnMut(Pointer) -> bool + Clone + 'a>(
        &'a self,
        vector: &'a DynamicVector,
        opts: &'a SearchOptions,
        filter: F,
    ) -> Result<Box<dyn Iterator<Item = Pointer> + 'a>, VbaseError>;
    /// Iterates over the pointers stored in the view.
    fn list(&self) -> Result<Box<dyn Iterator<Item = Pointer> + '_>, ListError>;
}
#[derive(Clone)]
pub enum Instance {
F32Cos(Arc<Index<F32Cos>>),
@ -28,7 +44,7 @@ pub enum Instance {
}
impl Instance {
pub fn create(path: PathBuf, options: IndexOptions) -> Result<Self, ServiceError> {
pub fn create(path: PathBuf, options: IndexOptions) -> Result<Self, CreateError> {
match (options.vector.d, options.vector.k) {
(Distance::Cos, Kind::F32) => {
let index = Index::create(path.clone(), options)?;
@ -148,21 +164,21 @@ impl Instance {
Instance::Upgrade => None,
}
}
pub fn stat(&self) -> IndexStat {
pub fn stat(&self) -> Option<IndexStat> {
match self {
Instance::F32Cos(x) => x.stat(),
Instance::F32Dot(x) => x.stat(),
Instance::F32L2(x) => x.stat(),
Instance::F16Cos(x) => x.stat(),
Instance::F16Dot(x) => x.stat(),
Instance::F16L2(x) => x.stat(),
Instance::SparseF32L2(x) => x.stat(),
Instance::SparseF32Cos(x) => x.stat(),
Instance::SparseF32Dot(x) => x.stat(),
Instance::BinaryCos(x) => x.stat(),
Instance::BinaryDot(x) => x.stat(),
Instance::BinaryL2(x) => x.stat(),
Instance::Upgrade => IndexStat::Upgrade,
Instance::F32Cos(x) => Some(x.stat()),
Instance::F32Dot(x) => Some(x.stat()),
Instance::F32L2(x) => Some(x.stat()),
Instance::F16Cos(x) => Some(x.stat()),
Instance::F16Dot(x) => Some(x.stat()),
Instance::F16L2(x) => Some(x.stat()),
Instance::SparseF32L2(x) => Some(x.stat()),
Instance::SparseF32Cos(x) => Some(x.stat()),
Instance::SparseF32Dot(x) => Some(x.stat()),
Instance::BinaryCos(x) => Some(x.stat()),
Instance::BinaryDot(x) => Some(x.stat()),
Instance::BinaryL2(x) => Some(x.stat()),
Instance::Upgrade => None,
}
}
}
@ -182,13 +198,13 @@ pub enum InstanceView {
BinaryL2(Arc<IndexView<BinaryL2>>),
}
impl InstanceView {
pub fn basic<'a, F: Fn(Pointer) -> bool + Clone + 'a>(
impl InstanceViewOperations for InstanceView {
fn basic<'a, F: Fn(Pointer) -> bool + Clone + 'a>(
&'a self,
vector: &'a DynamicVector,
opts: &'a SearchOptions,
filter: F,
) -> Result<impl Iterator<Item = Pointer> + 'a, ServiceError> {
) -> Result<Box<dyn Iterator<Item = Pointer> + 'a>, BasicError> {
match (self, vector) {
(InstanceView::F32Cos(x), DynamicVector::F32(vector)) => {
Ok(Box::new(x.basic(vector, opts, filter)?) as Box<dyn Iterator<Item = Pointer>>)
@ -226,15 +242,15 @@ impl InstanceView {
(InstanceView::BinaryL2(x), DynamicVector::Binary(vector)) => {
Ok(Box::new(x.basic(vector.into(), opts, filter)?))
}
_ => Err(ServiceError::Unmatched),
_ => Err(BasicError::InvalidVector),
}
}
pub fn vbase<'a, F: FnMut(Pointer) -> bool + Clone + 'a>(
fn vbase<'a, F: FnMut(Pointer) -> bool + Clone + 'a>(
&'a self,
vector: &'a DynamicVector,
opts: &'a SearchOptions,
filter: F,
) -> Result<impl Iterator<Item = Pointer> + '_, ServiceError> {
) -> Result<Box<dyn Iterator<Item = Pointer> + 'a>, VbaseError> {
match (self, vector) {
(InstanceView::F32Cos(x), DynamicVector::F32(vector)) => {
Ok(Box::new(x.vbase(vector, opts, filter)?) as Box<dyn Iterator<Item = Pointer>>)
@ -272,30 +288,33 @@ impl InstanceView {
(InstanceView::BinaryL2(x), DynamicVector::Binary(vector)) => {
Ok(Box::new(x.vbase(vector.into(), opts, filter)?))
}
_ => Err(ServiceError::Unmatched),
_ => Err(VbaseError::InvalidVector),
}
}
pub fn list(&self) -> impl Iterator<Item = Pointer> + '_ {
fn list(&self) -> Result<Box<dyn Iterator<Item = Pointer> + '_>, ListError> {
match self {
InstanceView::F32Cos(x) => Box::new(x.list()) as Box<dyn Iterator<Item = Pointer>>,
InstanceView::F32Dot(x) => Box::new(x.list()),
InstanceView::F32L2(x) => Box::new(x.list()),
InstanceView::F16Cos(x) => Box::new(x.list()),
InstanceView::F16Dot(x) => Box::new(x.list()),
InstanceView::F16L2(x) => Box::new(x.list()),
InstanceView::SparseF32Cos(x) => Box::new(x.list()),
InstanceView::SparseF32Dot(x) => Box::new(x.list()),
InstanceView::SparseF32L2(x) => Box::new(x.list()),
InstanceView::BinaryCos(x) => Box::new(x.list()),
InstanceView::BinaryDot(x) => Box::new(x.list()),
InstanceView::BinaryL2(x) => Box::new(x.list()),
InstanceView::F32Cos(x) => Ok(Box::new(x.list()?) as Box<dyn Iterator<Item = Pointer>>),
InstanceView::F32Dot(x) => Ok(Box::new(x.list()?)),
InstanceView::F32L2(x) => Ok(Box::new(x.list()?)),
InstanceView::F16Cos(x) => Ok(Box::new(x.list()?)),
InstanceView::F16Dot(x) => Ok(Box::new(x.list()?)),
InstanceView::F16L2(x) => Ok(Box::new(x.list()?)),
InstanceView::SparseF32Cos(x) => Ok(Box::new(x.list()?)),
InstanceView::SparseF32Dot(x) => Ok(Box::new(x.list()?)),
InstanceView::SparseF32L2(x) => Ok(Box::new(x.list()?)),
InstanceView::BinaryCos(x) => Ok(Box::new(x.list()?)),
InstanceView::BinaryDot(x) => Ok(Box::new(x.list()?)),
InstanceView::BinaryL2(x) => Ok(Box::new(x.list()?)),
}
}
}
impl InstanceView {
pub fn insert(
&self,
vector: DynamicVector,
pointer: Pointer,
) -> Result<Result<(), OutdatedError>, ServiceError> {
) -> Result<Result<(), OutdatedError>, InsertError> {
match (self, vector) {
(InstanceView::F32Cos(x), DynamicVector::F32(vector)) => x.insert(vector, pointer),
(InstanceView::F32Dot(x), DynamicVector::F32(vector)) => x.insert(vector, pointer),
@ -319,10 +338,10 @@ impl InstanceView {
x.insert(vector, pointer)
}
(InstanceView::BinaryL2(x), DynamicVector::Binary(vector)) => x.insert(vector, pointer),
_ => Err(ServiceError::Unmatched),
_ => Err(InsertError::InvalidVector),
}
}
pub fn delete(&self, pointer: Pointer) {
pub fn delete(&self, pointer: Pointer) -> Result<(), DeleteError> {
match self {
InstanceView::F32Cos(x) => x.delete(pointer),
InstanceView::F32Dot(x) => x.delete(pointer),
@ -338,7 +357,7 @@ impl InstanceView {
InstanceView::BinaryL2(x) => x.delete(pointer),
}
}
pub fn flush(&self) {
pub fn flush(&self) -> Result<(), FlushError> {
match self {
InstanceView::F32Cos(x) => x.flush(),
InstanceView::F32Dot(x) => x.flush(),

View File

@ -1,6 +1,5 @@
#![feature(core_intrinsics)]
#![feature(avx512_target_feature)]
#![feature(pointer_is_aligned)]
pub mod algorithms;
pub mod index;

View File

@ -1,37 +0,0 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Single, service-wide error type.
///
/// Each variant carries a multi-line, user-facing message through
/// `thiserror`'s `#[error]` attribute; the `\` line continuations inside the
/// literals are part of the message layout, which is why the enum is marked
/// `#[rustfmt::skip]`. The type is serializable, so it can be sent across a
/// process or RPC boundary.
#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
#[rustfmt::skip]
pub enum ServiceError {
/// The supplied index options failed validation; `validation` holds the
/// reason and is printed with `{:?}`.
#[error("\
The given index option is invalid.
INFORMATION: reason = {validation:?}\
")]
BadOption { validation: String },
/// No index is registered for the requested handle.
#[error("\
The index is not existing in the background worker.
ADVICE: Drop or rebuild the index.\
")]
UnknownIndex,
/// An index is already registered for the requested handle.
#[error("\
The index is already existing in the background worker.\
")]
KnownIndex,
/// The vector passed in does not fit the index it was used with.
#[error("\
The given vector is invalid for input.
ADVICE: Check if dimensions and scalar type of the vector is matched with the index.\
")]
Unmatched,
/// All index files are outdated after an extension upgrade.
#[error("\
The extension is upgraded so all index files are outdated.
ADVICE: Delete all index files. Please read `https://docs.pgvecto.rs/admin/upgrading.html`.\
")]
Upgrade,
/// This particular index is outdated after an extension upgrade.
#[error("\
The extension is upgraded so this index is outdated.
ADVICE: Rebuild the index. Please read `https://docs.pgvecto.rs/admin/upgrading.html`.\
")]
Upgrade2,
}

View File

@ -1,4 +1,5 @@
use crate::prelude::*;
use base::scalar::FloatCast;
pub fn cosine(lhs: &[F16], rhs: &[F16]) -> F32 {
#[inline(always)]

View File

@ -1,6 +1,6 @@
use std::borrow::Cow;
use crate::prelude::*;
use base::scalar::FloatCast;
use std::borrow::Cow;
/// Type-level tag with no variants: no value of this type can ever be
/// constructed, so it is only usable as a generic parameter.
// NOTE(review): judging by the name, it presumably selects `F16` scalars with
// cosine distance; confirm against its `G` implementation.
#[derive(Debug, Clone, Copy)]
pub enum F16Cos {}

View File

@ -1,6 +1,6 @@
use std::borrow::Cow;
use crate::prelude::*;
use base::scalar::FloatCast;
use std::borrow::Cow;
/// Type-level tag with no variants: no value of this type can ever be
/// constructed, so it is only usable as a generic parameter.
// NOTE(review): judging by the name, it presumably selects `F16` scalars with
// dot-product distance; confirm against its `G` implementation.
#[derive(Debug, Clone, Copy)]
pub enum F16Dot {}

View File

@ -1,6 +1,6 @@
use std::borrow::Cow;
use crate::prelude::*;
use base::scalar::FloatCast;
use std::borrow::Cow;
/// Type-level tag with no variants: no value of this type can ever be
/// constructed, so it is only usable as a generic parameter.
// NOTE(review): judging by the name, it presumably selects `F16` scalars with
// L2 distance; confirm against its `G` implementation.
#[derive(Debug, Clone, Copy)]
pub enum F16L2 {}

View File

@ -1,6 +1,5 @@
use std::borrow::Cow;
use crate::prelude::*;
use std::borrow::Cow;
/// Type-level tag with no variants: no value of this type can ever be
/// constructed, so it is only usable as a generic parameter.
// NOTE(review): judging by the name, it presumably selects `F32` scalars with
// L2 distance; confirm against its `G` implementation.
#[derive(Debug, Clone, Copy)]
pub enum F32L2 {}

View File

@ -15,6 +15,9 @@ mod sparse_f32_cos;
mod sparse_f32_dot;
mod sparse_f32_l2;
pub use binary_cos::BinaryCos;
pub use binary_dot::BinaryDot;
pub use binary_l2::BinaryL2;
pub use f16_cos::F16Cos;
pub use f16_dot::F16Dot;
pub use f16_l2::F16L2;
@ -24,9 +27,6 @@ pub use f32_l2::F32L2;
pub use sparse_f32_cos::SparseF32Cos;
pub use sparse_f32_dot::SparseF32Dot;
pub use sparse_f32_l2::SparseF32L2;
pub use binary_cos::BinaryCos;
pub use binary_dot::BinaryDot;
pub use binary_l2::BinaryL2;
use crate::prelude::*;
use serde::{Deserialize, Serialize};
@ -50,7 +50,7 @@ pub trait G: Copy + Debug + 'static {
+ Zero
+ num_traits::NumOps
+ num_traits::NumAssignOps
+ FloatCast;
+ base::scalar::FloatCast;
type Storage: for<'a> Storage<VectorRef<'a> = Self::VectorRef<'a>>;
type L2: for<'a> G<Scalar = Self::Scalar, VectorRef<'a> = &'a [Self::Scalar]>;
type VectorOwned: Vector + Clone + Serialize + for<'a> Deserialize<'a>;
@ -126,33 +126,6 @@ pub trait G: Copy + Debug + 'static {
}
}
pub trait FloatCast: Sized {
fn from_f32(x: f32) -> Self;
fn to_f32(self) -> f32;
fn from_f(x: F32) -> Self {
Self::from_f32(x.0)
}
fn to_f(self) -> F32 {
F32(Self::to_f32(self))
}
}
/// Something that can report its number of dimensions.
pub trait Vector {
    /// Number of dimensions, as a `u16`.
    fn dims(&self) -> u16;
}

/// An owned `Vec<T>` has one dimension per element.
impl<T> Vector for Vec<T> {
    /// # Panics
    ///
    /// Panics if the length does not fit in a `u16`.
    fn dims(&self) -> u16 {
        let len = self.len();
        u16::try_from(len).unwrap()
    }
}

/// A borrowed slice has one dimension per element.
impl<'a, T> Vector for &'a [T] {
    /// # Panics
    ///
    /// Panics if the length does not fit in a `u16`.
    fn dims(&self) -> u16 {
        let len = self.len();
        u16::try_from(len).unwrap()
    }
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DynamicVector {
F32(Vec<F32>),

View File

@ -47,6 +47,6 @@ impl G for SparseF32Cos {
}
fn elkan_k_means_distance2(lhs: &SparseF32, rhs: &[Self::Scalar]) -> F32 {
super::sparse_f32::dot_2( SparseF32Ref::from(lhs), rhs).acos()
super::sparse_f32::dot_2(SparseF32Ref::from(lhs), rhs).acos()
}
}

View File

@ -1,15 +1,13 @@
mod error;
mod global;
mod scalar;
mod search;
mod storage;
mod sys;
pub use self::error::ServiceError;
pub use self::global::*;
pub use self::scalar::{BinaryVec, BinaryVecRef, SparseF32, SparseF32Ref, F16, F32};
pub use self::search::{Element, Filter, Payload};
pub use self::storage::{DenseMmap, SparseMmap, Storage, BinaryMmap};
pub use self::sys::{Handle, Pointer};
pub use self::storage::{BinaryMmap, DenseMmap, SparseMmap, Storage};
pub use base::error::*;
pub use base::scalar::{F16, F32};
pub use base::search::{Element, Filter, Payload};
pub use base::sys::{Handle, Pointer};
pub use base::vector::{BinaryVec, BinaryVecRef, SparseF32, SparseF32Ref, Vector};
pub use num_traits::{Float, Zero};

View File

@ -1,9 +0,0 @@
mod binary;
mod f16;
mod f32;
mod sparse_f32;
pub use binary::{BinaryVec, BinaryVecRef};
pub use f16::F16;
pub use f32::F32;
pub use sparse_f32::{SparseF32, SparseF32Ref};

View File

@ -53,7 +53,9 @@ impl Storage for BinaryMmap {
ram: RawRam<S>,
) -> Self {
let n = ram.len();
let vectors_iter = (0..n).flat_map(|i| ram.vector(i).as_bytes().iter()).copied();
let vectors_iter = (0..n)
.flat_map(|i| ram.vector(i).as_bytes().iter())
.copied();
let payload_iter = (0..n).map(|i| ram.payload(i));
let vectors = MmapArray::create(&path.join("vectors"), vectors_iter);
let payload = MmapArray::create(&path.join("payload"), payload_iter);

View File

@ -1,10 +1,10 @@
mod binary;
mod dense;
mod sparse;
mod binary;
pub use binary::BinaryMmap;
pub use dense::DenseMmap;
pub use sparse::SparseMmap;
pub use binary::BinaryMmap;
use crate::algorithms::raw::RawRam;
use crate::index::IndexOptions;

View File

@ -1,7 +1,7 @@
pub mod metadata;
use crate::index::IndexOptions;
use crate::instance::Instance;
use crate::index::{IndexOptions, IndexStat};
use crate::instance::{Instance, InstanceView, InstanceViewOperations};
use crate::prelude::*;
use crate::utils::clean::clean;
use crate::utils::dir_ops::sync_dir;
@ -13,6 +13,25 @@ use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
/// Operations a worker exposes for managing and querying indexes, each
/// addressed by a [`Handle`].
///
/// Every operation has its own error type, so callers can tell exactly which
/// failures a given call may produce.
pub trait WorkerOperations {
/// View type handed out by the `*_view` methods; searches and listings are
/// run through its [`InstanceViewOperations`] methods.
type InstanceView: InstanceViewOperations;
/// Creates an index for `handle` from `options`.
///
/// The `Worker` implementation in this file returns `CreateError::Exist`
/// when `handle` is already registered.
fn create(&self, handle: Handle, options: IndexOptions) -> Result<(), CreateError>;
/// Removes the index registered under `handle`.
///
/// The `Worker` implementation in this file returns `DropError::NotExist`
/// when no such index is registered.
fn drop(&self, handle: Handle) -> Result<(), DropError>;
/// Flushes the index registered under `handle`.
fn flush(&self, handle: Handle) -> Result<(), FlushError>;
/// Inserts `vector` into the index registered under `handle`, associating
/// it with `pointer`.
fn insert(
&self,
handle: Handle,
vector: DynamicVector,
pointer: Pointer,
) -> Result<(), InsertError>;
/// Deletes `pointer` from the index registered under `handle`.
fn delete(&self, handle: Handle, pointer: Pointer) -> Result<(), DeleteError>;
/// Returns a view of the index under `handle`, with failures reported as
/// [`BasicError`].
fn basic_view(&self, handle: Handle) -> Result<Self::InstanceView, BasicError>;
/// Returns a view of the index under `handle`, with failures reported as
/// [`VbaseError`].
fn vbase_view(&self, handle: Handle) -> Result<Self::InstanceView, VbaseError>;
/// Returns a view of the index under `handle`, with failures reported as
/// [`ListError`].
fn list_view(&self, handle: Handle) -> Result<Self::InstanceView, ListError>;
/// Returns the statistics of the index registered under `handle`.
fn stat(&self, handle: Handle) -> Result<IndexStat, StatError>;
}
pub struct Worker {
path: PathBuf,
protect: Mutex<WorkerProtect>,
@ -65,11 +84,12 @@ impl Worker {
/// Returns a shared handle to the worker view currently stored in
/// `self.view`.
// NOTE(review): `load_full` is presumably `arc_swap::ArcSwap::load_full`, i.e.
// a snapshot that is unaffected by later swaps; confirm the field's type.
pub fn view(&self) -> Arc<WorkerView> {
self.view.load_full()
}
pub fn instance_create(
&self,
handle: Handle,
options: IndexOptions,
) -> Result<(), ServiceError> {
}
impl WorkerOperations for Worker {
type InstanceView = InstanceView;
fn create(&self, handle: Handle, options: IndexOptions) -> Result<(), CreateError> {
use std::collections::hash_map::Entry;
let mut protect = self.protect.lock();
match protect.indexes.entry(handle) {
@ -80,15 +100,70 @@ impl Worker {
protect.maintain(&self.view);
Ok(())
}
Entry::Occupied(_) => Err(ServiceError::KnownIndex),
Entry::Occupied(_) => Err(CreateError::Exist),
}
}
pub fn instance_destroy(&self, handle: Handle) {
/// Unregisters the index stored under `handle`.
///
/// Holds the `protect` lock for the whole call. When an entry was removed,
/// `maintain` is run against `self.view` before returning.
///
/// # Errors
///
/// Returns `DropError::NotExist` when `handle` is not registered.
fn drop(&self, handle: Handle) -> Result<(), DropError> {
    let mut guard = self.protect.lock();
    // The removed entry (if any) is released here, before `maintain` runs.
    let existed = guard.indexes.remove(&handle).is_some();
    if !existed {
        return Err(DropError::NotExist);
    }
    guard.maintain(&self.view);
    Ok(())
}
/// Flushes the index registered under `handle`.
///
/// # Errors
///
/// * `FlushError::NotExist` when `handle` is not registered.
/// * `FlushError::Upgrade` when the instance has no view to offer.
/// * Whatever the view's own `flush` reports.
fn flush(&self, handle: Handle) -> Result<(), FlushError> {
    let snapshot = self.view();
    let instance = match snapshot.get(handle) {
        Some(found) => found,
        None => return Err(FlushError::NotExist),
    };
    let index_view = match instance.view() {
        Some(current) => current,
        None => return Err(FlushError::Upgrade),
    };
    index_view.flush()
}
/// Inserts `vector` under `pointer` into the index registered under `handle`.
///
/// Each attempt takes a fresh view of the instance. When the view reports the
/// inner (outdated) error, the instance is refreshed and the insert is
/// retried with a new clone of `vector`.
///
/// # Errors
///
/// * `InsertError::NotExist` when `handle` is not registered.
/// * `InsertError::Upgrade` when the instance has no view to offer.
/// * Whatever the view's own `insert` reports as its outer error.
fn insert(
    &self,
    handle: Handle,
    vector: DynamicVector,
    pointer: Pointer,
) -> Result<(), InsertError> {
    let snapshot = self.view();
    let instance = match snapshot.get(handle) {
        Some(found) => found,
        None => return Err(InsertError::NotExist),
    };
    loop {
        let index_view = match instance.view() {
            Some(current) => current,
            None => return Err(InsertError::Upgrade),
        };
        let attempt = index_view.insert(vector.clone(), pointer)?;
        if attempt.is_ok() {
            return Ok(());
        }
        // The view was outdated: refresh the instance and try again.
        instance.refresh();
    }
}
/// Deletes `pointer` from the index registered under `handle`.
///
/// # Errors
///
/// * `DeleteError::NotExist` when `handle` is not registered.
/// * `DeleteError::Upgrade` when the instance has no view to offer.
/// * Whatever the view's own `delete` reports.
fn delete(&self, handle: Handle, pointer: Pointer) -> Result<(), DeleteError> {
    let snapshot = self.view();
    let instance = match snapshot.get(handle) {
        Some(found) => found,
        None => return Err(DeleteError::NotExist),
    };
    let index_view = match instance.view() {
        Some(current) => current,
        None => return Err(DeleteError::Upgrade),
    };
    index_view.delete(pointer)
}
/// Looks up the instance under `handle` and returns its current view.
///
/// # Errors
///
/// * `BasicError::NotExist` when `handle` is not registered.
/// * `BasicError::Upgrade` when the instance has no view to offer.
fn basic_view(&self, handle: Handle) -> Result<InstanceView, BasicError> {
    let snapshot = self.view();
    let instance = match snapshot.get(handle) {
        Some(found) => found,
        None => return Err(BasicError::NotExist),
    };
    instance.view().ok_or(BasicError::Upgrade)
}
/// Looks up the instance under `handle` and returns its current view.
///
/// # Errors
///
/// * `VbaseError::NotExist` when `handle` is not registered.
/// * `VbaseError::Upgrade` when the instance has no view to offer.
fn vbase_view(&self, handle: Handle) -> Result<InstanceView, VbaseError> {
    let snapshot = self.view();
    let instance = match snapshot.get(handle) {
        Some(found) => found,
        None => return Err(VbaseError::NotExist),
    };
    instance.view().ok_or(VbaseError::Upgrade)
}
/// Looks up the instance under `handle` and returns its current view.
///
/// # Errors
///
/// * `ListError::NotExist` when `handle` is not registered.
/// * `ListError::Upgrade` when the instance has no view to offer.
fn list_view(&self, handle: Handle) -> Result<InstanceView, ListError> {
    let snapshot = self.view();
    let instance = match snapshot.get(handle) {
        Some(found) => found,
        None => return Err(ListError::NotExist),
    };
    instance.view().ok_or(ListError::Upgrade)
}
/// Returns the statistics of the index registered under `handle`.
///
/// # Errors
///
/// * `StatError::NotExist` when `handle` is not registered.
/// * `StatError::Upgrade` when the instance reports no statistics.
fn stat(&self, handle: Handle) -> Result<IndexStat, StatError> {
    let snapshot = self.view();
    let instance = match snapshot.get(handle) {
        Some(found) => found,
        None => return Err(StatError::NotExist),
    };
    instance.stat().ok_or(StatError::Upgrade)
}
}
pub struct WorkerView {