diff --git a/crates/service/src/index/mod.rs b/crates/service/src/index/mod.rs index 65bec2e..a1cd65a 100644 --- a/crates/service/src/index/mod.rs +++ b/crates/service/src/index/mod.rs @@ -448,6 +448,25 @@ impl IndexView { } })) } + pub fn list(&self) -> impl Iterator + '_ { + let sealed = self + .sealed + .values() + .flat_map(|x| (0..x.len()).map(|i| x.payload(i))); + let growing = self + .growing + .values() + .flat_map(|x| (0..x.len()).map(|i| x.payload(i))); + let write = self + .write + .iter() + .map(|(_, x)| x) + .flat_map(|x| (0..x.len()).map(|i| x.payload(i))); + sealed + .chain(growing) + .chain(write) + .filter_map(|p| self.delete.check(p)) + } pub fn insert( &self, vector: Vec, @@ -467,37 +486,8 @@ impl IndexView { Ok(Err(OutdatedError)) } } - pub fn delete bool>(&self, mut f: F) { - for (_, sealed) in self.sealed.iter() { - let n = sealed.len(); - for i in 0..n { - if let Some(p) = self.delete.check(sealed.payload(i)) { - if f(p) { - self.delete.delete(p); - } - } - } - } - for (_, growing) in self.growing.iter() { - let n = growing.len(); - for i in 0..n { - if let Some(p) = self.delete.check(growing.payload(i)) { - if f(p) { - self.delete.delete(p); - } - } - } - } - if let Some((_, write)) = &self.write { - let n = write.len(); - for i in 0..n { - if let Some(p) = self.delete.check(write.payload(i)) { - if f(p) { - self.delete.delete(p); - } - } - } - } + pub fn delete(&self, p: Pointer) { + self.delete.delete(p); } pub fn flush(&self) { self.delete.flush(); diff --git a/crates/service/src/instance/mod.rs b/crates/service/src/instance/mod.rs index 29b426d..20b061f 100644 --- a/crates/service/src/instance/mod.rs +++ b/crates/service/src/instance/mod.rs @@ -173,6 +173,16 @@ impl InstanceView { _ => Err(ServiceError::Unmatched), } } + pub fn list(&self) -> impl Iterator + '_ { + match self { + InstanceView::F32Cos(x) => Box::new(x.list()) as Box>, + InstanceView::F32Dot(x) => Box::new(x.list()), + InstanceView::F32L2(x) => Box::new(x.list()), + InstanceView::F16Cos(x) => Box::new(x.list()), + InstanceView::F16Dot(x) => Box::new(x.list()), + InstanceView::F16L2(x) => Box::new(x.list()), + } + } pub fn insert( &self, vector: DynamicVector, @@ -188,14 +198,14 @@ impl InstanceView { _ => Err(ServiceError::Unmatched), } } - pub fn delete bool>(&self, f: F) { + pub fn delete(&self, pointer: Pointer) { match self { - InstanceView::F32Cos(x) => x.delete(f), - InstanceView::F32Dot(x) => x.delete(f), - InstanceView::F32L2(x) => x.delete(f), - InstanceView::F16Cos(x) => x.delete(f), - InstanceView::F16Dot(x) => x.delete(f), - InstanceView::F16L2(x) => x.delete(f), + InstanceView::F32Cos(x) => x.delete(pointer), + InstanceView::F32Dot(x) => x.delete(pointer), + InstanceView::F32L2(x) => x.delete(pointer), + InstanceView::F16Cos(x) => x.delete(pointer), + InstanceView::F16Dot(x) => x.delete(pointer), + InstanceView::F16L2(x) => x.delete(pointer), } } pub fn flush(&self) { diff --git a/src/bgworker/normal.rs b/src/bgworker/normal.rs index aa33c0e..cf87f8b 100644 --- a/src/bgworker/normal.rs +++ b/src/bgworker/normal.rs @@ -65,28 +65,17 @@ fn session(worker: Arc, handler: RpcHandler) -> Result { + RpcHandle::Flush { handle, x } => { let view = worker.view(); - for handle in pending_deletes { - worker.instance_destroy(handle); - } - for handle in pending_dirty { - if let Some(instance) = view.get(handle) { - if let Some(view) = instance.view() { - view.flush(); - } + if let Some(instance) = view.get(handle) { + if let Some(view) = instance.view() { + view.flush(); } } handler = x.leave()?; } - RpcHandle::Abort { pending_deletes, x } => { - for handle in pending_deletes { - worker.instance_destroy(handle); - } + RpcHandle::Drop { handle, x } => { + worker.instance_destroy(handle); handler = x.leave()?; } RpcHandle::Create { handle, options, x } => { @@ -120,7 +109,7 @@ fn session(worker: Arc, handler: RpcHandler) -> Result { + RpcHandle::Delete { handle, pointer, x } => { let view = worker.view(); let Some(instance) = view.get(handle) else { x.reset(ServiceError::UnknownIndex)?; @@ -129,7 +118,7 @@ fn session(worker: Arc, handler: RpcHandler) -> Result x, None => x.reset(ServiceError::Upgrade2)?, }; - instance_view.delete(|p| x.next(p).expect("Panic in VACUUM.")); + instance_view.delete(pointer); handler = x.leave()?; } RpcHandle::Stat { handle, x } => { @@ -204,6 +193,30 @@ fn session(worker: Arc, handler: RpcHandler) -> Result { + use crate::ipc::server::ListHandle::*; + let view = worker.view(); + let Some(instance) = view.get(handle) else { + x.reset(ServiceError::UnknownIndex)?; + }; + let view = match instance.view() { + Some(x) => x, + None => x.reset(ServiceError::Upgrade2)?, + }; + let mut it = view.list(); + let mut x = x.error()?; + loop { + match x.handle()? { + Next { x: y } => { + x = y.leave(it.next())?; + } + Leave { x } => { + handler = x; + break; + } + } + } + } // admin RpcHandle::Upgrade { x } => { handler = x.leave()?; diff --git a/src/bgworker/upgrade.rs b/src/bgworker/upgrade.rs index 3cb0021..7accc5f 100644 --- a/src/bgworker/upgrade.rs +++ b/src/bgworker/upgrade.rs @@ -57,18 +57,18 @@ fn session(handler: RpcHandler) -> Result<(), ConnectionError> { let mut handler = handler; loop { match handler.handle()? { - RpcHandle::Commit { x, .. } => { - handler = x.leave()?; - } - RpcHandle::Abort { x, .. } => { + RpcHandle::Drop { x, .. } => { + // false drop handler = x.leave()?; } + RpcHandle::Flush { x, .. } => x.reset(ServiceError::Upgrade)?, RpcHandle::Create { x, .. } => x.reset(ServiceError::Upgrade)?, RpcHandle::Insert { x, .. } => x.reset(ServiceError::Upgrade)?, RpcHandle::Delete { x, .. } => x.reset(ServiceError::Upgrade)?, RpcHandle::Stat { x, .. } => x.reset(ServiceError::Upgrade)?, RpcHandle::Basic { x, .. } => x.reset(ServiceError::Upgrade)?, RpcHandle::Vbase { x, .. } => x.reset(ServiceError::Upgrade)?, + RpcHandle::List { x, .. } => x.reset(ServiceError::Upgrade)?, RpcHandle::Upgrade { x } => { let _ = std::fs::remove_dir_all("./pg_vectors"); handler = x.leave()?; diff --git a/src/index/am_update.rs b/src/index/am_update.rs index 80459d7..04647ac 100644 --- a/src/index/am_update.rs +++ b/src/index/am_update.rs @@ -10,24 +10,15 @@ pub fn update_insert(handle: Handle, vector: DynamicVector, tid: pgrx::pg_sys::I rpc.insert(handle, vector, pointer); } -pub fn update_delete(handle: Handle, hook: impl Fn(Pointer) -> bool) { +pub fn update_delete(handle: Handle, f: impl Fn(Pointer) -> bool) { callback_dirty(handle); - struct Delete { - hook: H, - } - - impl crate::ipc::client::Delete for Delete - where - H: Fn(Pointer) -> bool, - { - fn test(&mut self, p: Pointer) -> bool { - (self.hook)(p) + let mut rpc_list = crate::ipc::client::borrow_mut().list(handle); + let mut rpc = crate::ipc::client::borrow_mut(); + while let Some(p) = rpc_list.next() { + if f(p) { + rpc.delete(handle, p); } } - - let client_delete = Delete { hook }; - - let mut rpc = crate::ipc::client::borrow_mut(); - rpc.delete(handle, client_delete); + rpc_list.leave(); } diff --git a/src/index/hook_transaction.rs b/src/index/hook_transaction.rs index 8ccba57..efc63aa 100644 --- a/src/index/hook_transaction.rs +++ b/src/index/hook_transaction.rs @@ -1,31 +1,40 @@ use crate::prelude::*; use crate::utils::cells::PgRefCell; use service::prelude::*; +use std::collections::BTreeSet; use std::ops::DerefMut; -static DIRTY: PgRefCell> = unsafe { PgRefCell::new(Vec::new()) }; +static DIRTY: PgRefCell> = unsafe { PgRefCell::new(BTreeSet::new()) }; pub fn callback_dirty(handle: Handle) { - DIRTY.borrow_mut().push(handle); + DIRTY.borrow_mut().insert(handle); } pub fn commit() { - let pending_deletes = pending_deletes(true); let pending_dirty = std::mem::take(DIRTY.borrow_mut().deref_mut()); + let pending_deletes = pending_deletes(true); if pending_deletes.is_empty() && pending_dirty.is_empty() { return; } let mut rpc = crate::ipc::client::borrow_mut(); - rpc.commit(pending_deletes, pending_dirty); + for handle in pending_dirty { + rpc.flush(handle); + } + for handle in pending_deletes { + rpc.drop(handle); + } } pub fn abort() { + let _pending_dirty = std::mem::take(DIRTY.borrow_mut().deref_mut()); let pending_deletes = pending_deletes(false); if pending_deletes.is_empty() { return; } let mut rpc = crate::ipc::client::borrow_mut(); - rpc.abort(pending_deletes); + for handle in pending_deletes { + rpc.drop(handle); + } } #[cfg(any(feature = "pg12", feature = "pg13", feature = "pg14", feature = "pg15"))] diff --git a/src/ipc/client/mod.rs b/src/ipc/client/mod.rs index 83247e6..c1841fd 100644 --- a/src/ipc/client/mod.rs +++ b/src/ipc/client/mod.rs @@ -53,18 +53,15 @@ impl Rpc { } impl ClientGuard { - pub fn commit(&mut self, pending_deletes: Vec, pending_dirty: Vec) { - let packet = RpcPacket::Commit { - pending_deletes, - pending_dirty, - }; + pub fn flush(&mut self, handle: Handle) { + let packet = RpcPacket::Flush { handle }; self.socket.ok(packet).friendly(); - let commit::CommitPacket::Leave {} = self.socket.recv().friendly(); + let flush::FlushPacket::Leave {} = self.socket.recv().friendly(); } - pub fn abort(&mut self, pending_deletes: Vec) { - let packet = RpcPacket::Abort { pending_deletes }; + pub fn drop(&mut self, handle: Handle) { + let packet = RpcPacket::Drop { handle }; self.socket.ok(packet).friendly(); - let abort::AbortPacket::Leave {} = self.socket.recv().friendly(); + let drop::DropPacket::Leave {} = self.socket.recv().friendly(); } pub fn create(&mut self, handle: Handle, options: IndexOptions) { let packet = RpcPacket::Create { handle, options }; @@ -83,24 +80,13 @@ impl ClientGuard { opts, }; self.socket.ok(packet).friendly(); - let vbase::VbaseErrorPacket {} = self.socket.recv().friendly(); + let basic::BasicErrorPacket {} = self.socket.recv().friendly(); ClientGuard::map(self) } - pub fn delete(&mut self, handle: Handle, mut t: impl Delete) { - let packet = RpcPacket::Delete { handle }; + pub fn delete(&mut self, handle: Handle, pointer: Pointer) { + let packet = RpcPacket::Delete { handle, pointer }; self.socket.ok(packet).friendly(); - loop { - match self.socket.recv().friendly() { - delete::DeletePacket::Test { p } => { - self.socket - .ok(delete::DeleteTestPacket { delete: t.test(p) }) - .friendly(); - } - delete::DeletePacket::Leave {} => { - return; - } - } - } + let delete::DeletePacket::Leave {} = self.socket.recv().friendly(); } pub fn insert(&mut self, handle: Handle, vector: DynamicVector, pointer: Pointer) { let packet = RpcPacket::Insert { @@ -132,6 +118,12 @@ impl ClientGuard { let vbase::VbaseErrorPacket {} = self.socket.recv().friendly(); ClientGuard::map(self) } + pub fn list(mut self, handle: Handle) -> ClientGuard { + let packet = RpcPacket::List { handle }; + self.socket.ok(packet).friendly(); + let list::ListErrorPacket {} = self.socket.recv().friendly(); + ClientGuard::map(self) + } pub fn upgrade(&mut self) { let packet = RpcPacket::Upgrade {}; self.socket.ok(packet).friendly(); @@ -149,10 +141,6 @@ impl ClientLike for Rpc { } } -pub trait Delete { - fn test(&mut self, p: Pointer) -> bool; -} - pub struct Vbase { socket: ClientSocket, } @@ -217,6 +205,38 @@ impl ClientLike for Basic { } } +pub struct List { + socket: ClientSocket, +} + +impl List { + pub fn next(&mut self) -> Option { + let packet = list::ListPacket::Next {}; + self.socket.ok(packet).friendly(); + let list::ListNextPacket { p } = self.socket.recv().friendly(); + p + } +} + +impl ClientGuard { + pub fn leave(mut self) -> ClientGuard { + let packet = list::ListPacket::Leave {}; + self.socket.ok(packet).friendly(); + let list::ListLeavePacket {} = self.socket.recv().friendly(); + ClientGuard::map(self) + } +} + +impl ClientLike for List { + fn from_socket(socket: ClientSocket) -> Self { + Self { socket } + } + + fn to_socket(self) -> ClientSocket { + self.socket + } +} + static CLIENTS: PgRefCell> = unsafe { PgRefCell::new(Vec::new()) }; pub fn borrow_mut() -> ClientGuard { diff --git a/src/ipc/packet/delete.rs b/src/ipc/packet/delete.rs index 393354f..f950a4c 100644 --- a/src/ipc/packet/delete.rs +++ b/src/ipc/packet/delete.rs @@ -1,13 +1,6 @@ use serde::{Deserialize, Serialize}; -use service::prelude::*; #[derive(Debug, Serialize, Deserialize)] pub enum DeletePacket { - Test { p: Pointer }, Leave {}, } - -#[derive(Debug, Serialize, Deserialize)] -pub struct DeleteTestPacket { - pub delete: bool, -} diff --git a/src/ipc/packet/abort.rs b/src/ipc/packet/drop.rs similarity index 80% rename from src/ipc/packet/abort.rs rename to src/ipc/packet/drop.rs index 0b5eabc..eb6ef0d 100644 --- a/src/ipc/packet/abort.rs +++ b/src/ipc/packet/drop.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] -pub enum AbortPacket { +pub enum DropPacket { Leave {}, } diff --git a/src/ipc/packet/commit.rs b/src/ipc/packet/flush.rs similarity index 79% rename from src/ipc/packet/commit.rs rename to src/ipc/packet/flush.rs index 5c4ad85..f39543d 100644 --- a/src/ipc/packet/commit.rs +++ b/src/ipc/packet/flush.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] -pub enum CommitPacket { +pub enum FlushPacket { Leave {}, } diff --git a/src/ipc/packet/list.rs b/src/ipc/packet/list.rs new file mode 100644 index 0000000..093ed5c --- /dev/null +++ b/src/ipc/packet/list.rs @@ -0,0 +1,19 @@ +use serde::{Deserialize, Serialize}; +use service::prelude::*; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ListErrorPacket {} + +#[derive(Debug, Serialize, Deserialize)] +pub enum ListPacket { + Next {}, + Leave {}, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ListNextPacket { + pub p: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ListLeavePacket {} diff --git a/src/ipc/packet/mod.rs b/src/ipc/packet/mod.rs index 15601b8..d9cb6a0 100644 --- a/src/ipc/packet/mod.rs +++ b/src/ipc/packet/mod.rs @@ -1,9 +1,10 @@ -pub mod abort; pub mod basic; -pub mod commit; pub mod create; pub mod delete; +pub mod drop; +pub mod flush; pub mod insert; +pub mod list; pub mod stat; pub mod upgrade; pub mod vbase; @@ -16,12 +17,11 @@ use service::prelude::*; #[derive(Debug, Serialize, Deserialize)] pub enum RpcPacket { // transaction - Commit { - pending_deletes: Vec, - pending_dirty: Vec, + Flush { + handle: Handle, }, - Abort { - pending_deletes: Vec, + Drop { + handle: Handle, }, Create { handle: Handle, @@ -35,6 +35,7 @@ pub enum RpcPacket { }, Delete { handle: Handle, + pointer: Pointer, }, Stat { handle: Handle, @@ -49,6 +50,9 @@ pub enum RpcPacket { vector: DynamicVector, opts: SearchOptions, }, + List { + handle: Handle, + }, // admin Upgrade {}, } diff --git a/src/ipc/server/mod.rs b/src/ipc/server/mod.rs index 4e82e41..a77d3c1 100644 --- a/src/ipc/server/mod.rs +++ b/src/ipc/server/mod.rs @@ -16,19 +16,15 @@ impl RpcHandler { } pub fn handle(mut self) -> Result { Ok(match self.socket.recv::()? { - RpcPacket::Commit { - pending_deletes, - pending_dirty, - } => RpcHandle::Commit { - pending_deletes, - pending_dirty, - x: Commit { + RpcPacket::Flush { handle } => RpcHandle::Flush { + handle, + x: Flush { socket: self.socket, }, }, - RpcPacket::Abort { pending_deletes } => RpcHandle::Abort { - pending_deletes, - x: Abort { + RpcPacket::Drop { handle } => RpcHandle::Drop { + handle, + x: Drop { socket: self.socket, }, }, @@ -51,8 +47,9 @@ impl RpcHandler { socket: self.socket, }, }, - RpcPacket::Delete { handle } => RpcHandle::Delete { + RpcPacket::Delete { handle, pointer } => RpcHandle::Delete { handle, + pointer, x: Delete { socket: self.socket, }, @@ -87,6 +84,12 @@ impl RpcHandler { socket: self.socket, }, }, + RpcPacket::List { handle } => RpcHandle::List { + handle, + x: List { + socket: self.socket, + }, + }, RpcPacket::Upgrade {} => RpcHandle::Upgrade { x: Upgrade { socket: self.socket, @@ -97,14 +100,13 @@ impl RpcHandler { } pub enum RpcHandle { - Commit { - pending_deletes: Vec, - pending_dirty: Vec, - x: Commit, + Flush { + handle: Handle, + x: Flush, }, - Abort { - pending_deletes: Vec, - x: Abort, + Drop { + handle: Handle, + x: Drop, }, Create { handle: Handle, @@ -125,6 +127,7 @@ pub enum RpcHandle { }, Delete { handle: Handle, + pointer: Pointer, x: Delete, }, Stat { @@ -137,18 +140,22 @@ pub enum RpcHandle { opts: SearchOptions, x: Vbase, }, + List { + handle: Handle, + x: List, + }, Upgrade { x: Upgrade, }, } -pub struct Commit { +pub struct Flush { socket: ServerSocket, } -impl Commit { +impl Flush { pub fn leave(mut self) -> Result { - let packet = commit::CommitPacket::Leave {}; + let packet = flush::FlushPacket::Leave {}; self.socket.ok(packet)?; Ok(RpcHandler { socket: self.socket, @@ -160,13 +167,13 @@ impl Commit { } } -pub struct Abort { +pub struct Drop { socket: ServerSocket, } -impl Abort { +impl Drop { pub fn leave(mut self) -> Result { - let packet = abort::AbortPacket::Leave {}; + let packet = drop::DropPacket::Leave {}; self.socket.ok(packet)?; Ok(RpcHandler { socket: self.socket, @@ -217,12 +224,6 @@ pub struct Delete { } impl Delete { - pub fn next(&mut self, p: Pointer) -> Result { - let packet = delete::DeletePacket::Test { p }; - self.socket.ok(packet)?; - let delete::DeleteTestPacket { delete } = self.socket.recv::()?; - Ok(delete) - } pub fn leave(mut self) -> Result { let packet = delete::DeletePacket::Leave {}; self.socket.ok(packet)?; @@ -370,6 +371,65 @@ impl VbaseNext { } } +pub struct List { + socket: ServerSocket, +} + +impl List { + pub fn error(mut self) -> Result { + self.socket.ok(list::ListErrorPacket {})?; + Ok(ListHandler { + socket: self.socket, + }) + } + pub fn reset(mut self, err: ServiceError) -> Result { + self.socket.err(err) + } +} + +pub struct ListHandler { + socket: ServerSocket, +} + +impl ListHandler { + pub fn handle(mut self) -> Result { + Ok(match self.socket.recv::()? { + list::ListPacket::Next {} => ListHandle::Next { + x: ListNext { + socket: self.socket, + }, + }, + list::ListPacket::Leave {} => { + self.socket.ok(list::ListLeavePacket {})?; + ListHandle::Leave { + x: RpcHandler { + socket: self.socket, + }, + } + } + }) + } +} + +pub enum ListHandle { + Next { x: ListNext }, + Leave { x: RpcHandler }, +} + +pub struct ListNext { + socket: ServerSocket, +} + +impl ListNext { + pub fn leave(mut self, p: Option) -> Result { + let packet = list::ListNextPacket { p }; + self.socket.ok(packet)?; + Ok(ListHandler { + socket: self.socket, + }) + } +} + pub struct Upgrade { socket: ServerSocket, } diff --git a/tests/sqllogictest/index_vacuum.slt b/tests/sqllogictest/index_vacuum.slt new file mode 100644 index 0000000..7884ebd --- /dev/null +++ b/tests/sqllogictest/index_vacuum.slt @@ -0,0 +1,25 @@ +statement ok +SET search_path TO pg_temp, vectors; + +statement ok +CREATE TABLE t (id bigserial, val vector(3)); + +statement ok +INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000); + +statement ok +CREATE INDEX ON t USING vectors (val vector_l2_ops); + +statement ok +DELETE FROM t WHERE id % 2 = 0; + +statement ok +VACUUM FULL; + +query I +SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <-> '[0,0,0]' limit 10) t2; +---- +10 + +statement ok +DROP TABLE t;