1
0
mirror of https://github.com/tensorchord/pgvecto.rs.git synced 2025-08-07 03:22:55 +03:00

refactor: refine delete RPC (#351)

Signed-off-by: usamoi <usamoi@outlook.com>
This commit is contained in:
Usamoi
2024-02-07 10:31:40 +08:00
committed by GitHub
parent 6d4745b3ed
commit a4f5d28c10
14 changed files with 290 additions and 156 deletions

View File

@@ -448,6 +448,25 @@ impl<S: G> IndexView<S> {
}
}))
}
/// Lazily enumerates the pointers held by this view.
///
/// Payloads are read from every segment in `self.sealed`, then every segment
/// in `self.growing`, then the optional `self.write` segment, in that order.
/// Each payload is passed to `self.delete.check`, and only the `Some` results
/// are yielded (presumably the entries that have not been deleted — confirm
/// against the `delete` implementation).
///
/// The returned iterator borrows `self` and performs no work until polled.
pub fn list(&self) -> impl Iterator<Item = Pointer> + '_ {
    let from_sealed = self.sealed.values().flat_map(|segment| {
        let len = segment.len();
        (0..len).map(move |i| segment.payload(i))
    });
    let from_growing = self.growing.values().flat_map(|segment| {
        let len = segment.len();
        (0..len).map(move |i| segment.payload(i))
    });
    // `self.write` is optional; iterating it yields zero or one segment.
    let from_write = self.write.iter().flat_map(|(_, segment)| {
        let len = segment.len();
        (0..len).map(move |i| segment.payload(i))
    });
    from_sealed
        .chain(from_growing)
        .chain(from_write)
        .filter_map(|payload| self.delete.check(payload))
}
pub fn insert(
&self,
vector: Vec<S::Scalar>,
@@ -467,37 +486,8 @@ impl<S: G> IndexView<S> {
Ok(Err(OutdatedError))
}
}
pub fn delete<F: FnMut(Pointer) -> bool>(&self, mut f: F) {
for (_, sealed) in self.sealed.iter() {
let n = sealed.len();
for i in 0..n {
if let Some(p) = self.delete.check(sealed.payload(i)) {
if f(p) {
self.delete.delete(p);
}
}
}
}
for (_, growing) in self.growing.iter() {
let n = growing.len();
for i in 0..n {
if let Some(p) = self.delete.check(growing.payload(i)) {
if f(p) {
self.delete.delete(p);
}
}
}
}
if let Some((_, write)) = &self.write {
let n = write.len();
for i in 0..n {
if let Some(p) = self.delete.check(write.payload(i)) {
if f(p) {
self.delete.delete(p);
}
}
}
}
/// Deletes a single pointer by forwarding `p` to `self.delete.delete`.
///
/// This method does not scan any segment; the caller supplies the pointer.
pub fn delete(&self, p: Pointer) {
self.delete.delete(p);
}
pub fn flush(&self) {
self.delete.flush();

View File

@@ -173,6 +173,16 @@ impl InstanceView {
_ => Err(ServiceError::Unmatched),
}
}
/// Returns the pointer iterator of whichever typed view this instance wraps.
///
/// Every arm calls that variant's own `list()`. The results are boxed into a
/// single trait object so that all match arms share one type.
pub fn list(&self) -> impl Iterator<Item = Pointer> + '_ {
    let pointers: Box<dyn Iterator<Item = Pointer> + '_> = match self {
        InstanceView::F32Cos(view) => Box::new(view.list()),
        InstanceView::F32Dot(view) => Box::new(view.list()),
        InstanceView::F32L2(view) => Box::new(view.list()),
        InstanceView::F16Cos(view) => Box::new(view.list()),
        InstanceView::F16Dot(view) => Box::new(view.list()),
        InstanceView::F16L2(view) => Box::new(view.list()),
    };
    pointers
}
pub fn insert(
&self,
vector: DynamicVector,
@@ -188,14 +198,14 @@ impl InstanceView {
_ => Err(ServiceError::Unmatched),
}
}
pub fn delete<F: FnMut(Pointer) -> bool>(&self, f: F) {
pub fn delete(&self, pointer: Pointer) {
match self {
InstanceView::F32Cos(x) => x.delete(f),
InstanceView::F32Dot(x) => x.delete(f),
InstanceView::F32L2(x) => x.delete(f),
InstanceView::F16Cos(x) => x.delete(f),
InstanceView::F16Dot(x) => x.delete(f),
InstanceView::F16L2(x) => x.delete(f),
InstanceView::F32Cos(x) => x.delete(pointer),
InstanceView::F32Dot(x) => x.delete(pointer),
InstanceView::F32L2(x) => x.delete(pointer),
InstanceView::F16Cos(x) => x.delete(pointer),
InstanceView::F16Dot(x) => x.delete(pointer),
InstanceView::F16L2(x) => x.delete(pointer),
}
}
pub fn flush(&self) {

View File

@@ -65,28 +65,17 @@ fn session(worker: Arc<Worker>, handler: RpcHandler) -> Result<!, ConnectionErro
loop {
match handler.handle()? {
// transaction
RpcHandle::Commit {
pending_deletes,
pending_dirty,
x,
} => {
RpcHandle::Flush { handle, x } => {
let view = worker.view();
for handle in pending_deletes {
worker.instance_destroy(handle);
}
for handle in pending_dirty {
if let Some(instance) = view.get(handle) {
if let Some(view) = instance.view() {
view.flush();
}
if let Some(instance) = view.get(handle) {
if let Some(view) = instance.view() {
view.flush();
}
}
handler = x.leave()?;
}
RpcHandle::Abort { pending_deletes, x } => {
for handle in pending_deletes {
worker.instance_destroy(handle);
}
RpcHandle::Drop { handle, x } => {
worker.instance_destroy(handle);
handler = x.leave()?;
}
RpcHandle::Create { handle, options, x } => {
@@ -120,7 +109,7 @@ fn session(worker: Arc<Worker>, handler: RpcHandler) -> Result<!, ConnectionErro
}
handler = x.leave()?;
}
RpcHandle::Delete { handle, mut x } => {
RpcHandle::Delete { handle, pointer, x } => {
let view = worker.view();
let Some(instance) = view.get(handle) else {
x.reset(ServiceError::UnknownIndex)?;
@@ -129,7 +118,7 @@ fn session(worker: Arc<Worker>, handler: RpcHandler) -> Result<!, ConnectionErro
Some(x) => x,
None => x.reset(ServiceError::Upgrade2)?,
};
instance_view.delete(|p| x.next(p).expect("Panic in VACUUM."));
instance_view.delete(pointer);
handler = x.leave()?;
}
RpcHandle::Stat { handle, x } => {
@@ -204,6 +193,30 @@ fn session(worker: Arc<Worker>, handler: RpcHandler) -> Result<!, ConnectionErro
}
}
}
RpcHandle::List { handle, x } => {
use crate::ipc::server::ListHandle::*;
let view = worker.view();
let Some(instance) = view.get(handle) else {
x.reset(ServiceError::UnknownIndex)?;
};
let view = match instance.view() {
Some(x) => x,
None => x.reset(ServiceError::Upgrade2)?,
};
let mut it = view.list();
let mut x = x.error()?;
loop {
match x.handle()? {
Next { x: y } => {
x = y.leave(it.next())?;
}
Leave { x } => {
handler = x;
break;
}
}
}
}
// admin
RpcHandle::Upgrade { x } => {
handler = x.leave()?;

View File

@@ -57,18 +57,18 @@ fn session(handler: RpcHandler) -> Result<(), ConnectionError> {
let mut handler = handler;
loop {
match handler.handle()? {
RpcHandle::Commit { x, .. } => {
handler = x.leave()?;
}
RpcHandle::Abort { x, .. } => {
RpcHandle::Drop { x, .. } => {
// false drop
handler = x.leave()?;
}
RpcHandle::Flush { x, .. } => x.reset(ServiceError::Upgrade)?,
RpcHandle::Create { x, .. } => x.reset(ServiceError::Upgrade)?,
RpcHandle::Insert { x, .. } => x.reset(ServiceError::Upgrade)?,
RpcHandle::Delete { x, .. } => x.reset(ServiceError::Upgrade)?,
RpcHandle::Stat { x, .. } => x.reset(ServiceError::Upgrade)?,
RpcHandle::Basic { x, .. } => x.reset(ServiceError::Upgrade)?,
RpcHandle::Vbase { x, .. } => x.reset(ServiceError::Upgrade)?,
RpcHandle::List { x, .. } => x.reset(ServiceError::Upgrade)?,
RpcHandle::Upgrade { x } => {
let _ = std::fs::remove_dir_all("./pg_vectors");
handler = x.leave()?;

View File

@@ -10,24 +10,15 @@ pub fn update_insert(handle: Handle, vector: DynamicVector, tid: pgrx::pg_sys::I
rpc.insert(handle, vector, pointer);
}
pub fn update_delete(handle: Handle, hook: impl Fn(Pointer) -> bool) {
pub fn update_delete(handle: Handle, f: impl Fn(Pointer) -> bool) {
callback_dirty(handle);
struct Delete<H> {
hook: H,
}
impl<H> crate::ipc::client::Delete for Delete<H>
where
H: Fn(Pointer) -> bool,
{
fn test(&mut self, p: Pointer) -> bool {
(self.hook)(p)
let mut rpc_list = crate::ipc::client::borrow_mut().list(handle);
let mut rpc = crate::ipc::client::borrow_mut();
while let Some(p) = rpc_list.next() {
if f(p) {
rpc.delete(handle, p);
}
}
let client_delete = Delete { hook };
let mut rpc = crate::ipc::client::borrow_mut();
rpc.delete(handle, client_delete);
rpc_list.leave();
}

View File

@@ -1,31 +1,40 @@
use crate::prelude::*;
use crate::utils::cells::PgRefCell;
use service::prelude::*;
use std::collections::BTreeSet;
use std::ops::DerefMut;
static DIRTY: PgRefCell<Vec<Handle>> = unsafe { PgRefCell::new(Vec::new()) };
static DIRTY: PgRefCell<BTreeSet<Handle>> = unsafe { PgRefCell::new(BTreeSet::new()) };
pub fn callback_dirty(handle: Handle) {
DIRTY.borrow_mut().push(handle);
DIRTY.borrow_mut().insert(handle);
}
pub fn commit() {
let pending_deletes = pending_deletes(true);
let pending_dirty = std::mem::take(DIRTY.borrow_mut().deref_mut());
let pending_deletes = pending_deletes(true);
if pending_deletes.is_empty() && pending_dirty.is_empty() {
return;
}
let mut rpc = crate::ipc::client::borrow_mut();
rpc.commit(pending_deletes, pending_dirty);
for handle in pending_dirty {
rpc.flush(handle);
}
for handle in pending_deletes {
rpc.drop(handle);
}
}
pub fn abort() {
let _pending_dirty = std::mem::take(DIRTY.borrow_mut().deref_mut());
let pending_deletes = pending_deletes(false);
if pending_deletes.is_empty() {
return;
}
let mut rpc = crate::ipc::client::borrow_mut();
rpc.abort(pending_deletes);
for handle in pending_deletes {
rpc.drop(handle);
}
}
#[cfg(any(feature = "pg12", feature = "pg13", feature = "pg14", feature = "pg15"))]

View File

@@ -53,18 +53,15 @@ impl Rpc {
}
impl ClientGuard<Rpc> {
pub fn commit(&mut self, pending_deletes: Vec<Handle>, pending_dirty: Vec<Handle>) {
let packet = RpcPacket::Commit {
pending_deletes,
pending_dirty,
};
pub fn flush(&mut self, handle: Handle) {
let packet = RpcPacket::Flush { handle };
self.socket.ok(packet).friendly();
let commit::CommitPacket::Leave {} = self.socket.recv().friendly();
let flush::FlushPacket::Leave {} = self.socket.recv().friendly();
}
pub fn abort(&mut self, pending_deletes: Vec<Handle>) {
let packet = RpcPacket::Abort { pending_deletes };
pub fn drop(&mut self, handle: Handle) {
let packet = RpcPacket::Drop { handle };
self.socket.ok(packet).friendly();
let abort::AbortPacket::Leave {} = self.socket.recv().friendly();
let drop::DropPacket::Leave {} = self.socket.recv().friendly();
}
pub fn create(&mut self, handle: Handle, options: IndexOptions) {
let packet = RpcPacket::Create { handle, options };
@@ -83,24 +80,13 @@ impl ClientGuard<Rpc> {
opts,
};
self.socket.ok(packet).friendly();
let vbase::VbaseErrorPacket {} = self.socket.recv().friendly();
let basic::BasicErrorPacket {} = self.socket.recv().friendly();
ClientGuard::map(self)
}
pub fn delete(&mut self, handle: Handle, mut t: impl Delete) {
let packet = RpcPacket::Delete { handle };
pub fn delete(&mut self, handle: Handle, pointer: Pointer) {
let packet = RpcPacket::Delete { handle, pointer };
self.socket.ok(packet).friendly();
loop {
match self.socket.recv().friendly() {
delete::DeletePacket::Test { p } => {
self.socket
.ok(delete::DeleteTestPacket { delete: t.test(p) })
.friendly();
}
delete::DeletePacket::Leave {} => {
return;
}
}
}
let delete::DeletePacket::Leave {} = self.socket.recv().friendly();
}
pub fn insert(&mut self, handle: Handle, vector: DynamicVector, pointer: Pointer) {
let packet = RpcPacket::Insert {
@@ -132,6 +118,12 @@ impl ClientGuard<Rpc> {
let vbase::VbaseErrorPacket {} = self.socket.recv().friendly();
ClientGuard::map(self)
}
/// Opens a listing session for the index identified by `handle`.
///
/// Sends `RpcPacket::List`, waits for the `ListErrorPacket` reply, and then
/// converts this guard into a `ClientGuard<List>` over the same connection.
/// Call `leave` on the returned guard to get a `ClientGuard<Rpc>` back.
pub fn list(mut self, handle: Handle) -> ClientGuard<List> {
    self.socket.ok(RpcPacket::List { handle }).friendly();
    // The reply carries no data; receiving it only confirms the session started.
    let _: list::ListErrorPacket = self.socket.recv().friendly();
    ClientGuard::map(self)
}
pub fn upgrade(&mut self) {
let packet = RpcPacket::Upgrade {};
self.socket.ok(packet).friendly();
@@ -149,10 +141,6 @@ impl ClientLike for Rpc {
}
}
pub trait Delete {
fn test(&mut self, p: Pointer) -> bool;
}
pub struct Vbase {
socket: ClientSocket,
}
@@ -217,6 +205,38 @@ impl ClientLike for Basic {
}
}
/// Client-side state of an open listing session.
///
/// `ClientGuard<Rpc>::list` produces a `ClientGuard<List>`; pointers are then
/// pulled one at a time with `next`, and `leave` ends the session.
pub struct List {
// Connection over which `ListPacket` requests and their replies travel.
socket: ClientSocket,
}
impl List {
    /// Asks the server for the next pointer of the listing.
    ///
    /// Sends `ListPacket::Next` and returns the `p` field of the
    /// `ListNextPacket` reply. The server fills `p` from its iterator's
    /// `next()`, so `None` means the listing is exhausted.
    pub fn next(&mut self) -> Option<Pointer> {
        self.socket.ok(list::ListPacket::Next {}).friendly();
        let reply: list::ListNextPacket = self.socket.recv().friendly();
        reply.p
    }
}
impl ClientGuard<List> {
    /// Ends the listing session and returns a guard for ordinary RPCs.
    ///
    /// Sends `ListPacket::Leave`, waits for the `ListLeavePacket` reply, and
    /// maps this guard back to `ClientGuard<Rpc>`.
    pub fn leave(mut self) -> ClientGuard<Rpc> {
        self.socket.ok(list::ListPacket::Leave {}).friendly();
        let _: list::ListLeavePacket = self.socket.recv().friendly();
        ClientGuard::map(self)
    }
}
impl ClientLike for List {
fn from_socket(socket: ClientSocket) -> Self {
Self { socket }
}
fn to_socket(self) -> ClientSocket {
self.socket
}
}
static CLIENTS: PgRefCell<Vec<ClientSocket>> = unsafe { PgRefCell::new(Vec::new()) };
pub fn borrow_mut() -> ClientGuard<Rpc> {

View File

@@ -1,13 +1,6 @@
use serde::{Deserialize, Serialize};
use service::prelude::*;
#[derive(Debug, Serialize, Deserialize)]
pub enum DeletePacket {
Test { p: Pointer },
Leave {},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DeleteTestPacket {
pub delete: bool,
}

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub enum AbortPacket {
pub enum DropPacket {
Leave {},
}

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub enum CommitPacket {
pub enum FlushPacket {
Leave {},
}

19
src/ipc/packet/list.rs Normal file
View File

@@ -0,0 +1,19 @@
use serde::{Deserialize, Serialize};
use service::prelude::*;
/// Empty reply the server sends once a `List` request has been accepted,
/// before any `ListPacket` is exchanged.
#[derive(Debug, Serialize, Deserialize)]
pub struct ListErrorPacket {}
/// Requests a client may send while a listing session is open.
#[derive(Debug, Serialize, Deserialize)]
pub enum ListPacket {
// Ask for the next pointer; answered with a `ListNextPacket`.
Next {},
// End the session; answered with a `ListLeavePacket`.
Leave {},
}
/// Reply to `ListPacket::Next`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ListNextPacket {
// Next pointer of the listing, or `None` once the server-side iterator is exhausted.
pub p: Option<Pointer>,
}
/// Empty reply to `ListPacket::Leave`, confirming the listing session is closed.
#[derive(Debug, Serialize, Deserialize)]
pub struct ListLeavePacket {}

View File

@@ -1,9 +1,10 @@
pub mod abort;
pub mod basic;
pub mod commit;
pub mod create;
pub mod delete;
pub mod drop;
pub mod flush;
pub mod insert;
pub mod list;
pub mod stat;
pub mod upgrade;
pub mod vbase;
@@ -16,12 +17,11 @@ use service::prelude::*;
#[derive(Debug, Serialize, Deserialize)]
pub enum RpcPacket {
// transaction
Commit {
pending_deletes: Vec<Handle>,
pending_dirty: Vec<Handle>,
Flush {
handle: Handle,
},
Abort {
pending_deletes: Vec<Handle>,
Drop {
handle: Handle,
},
Create {
handle: Handle,
@@ -35,6 +35,7 @@ pub enum RpcPacket {
},
Delete {
handle: Handle,
pointer: Pointer,
},
Stat {
handle: Handle,
@@ -49,6 +50,9 @@ pub enum RpcPacket {
vector: DynamicVector,
opts: SearchOptions,
},
List {
handle: Handle,
},
// admin
Upgrade {},
}

View File

@@ -16,19 +16,15 @@ impl RpcHandler {
}
pub fn handle(mut self) -> Result<RpcHandle, ConnectionError> {
Ok(match self.socket.recv::<RpcPacket>()? {
RpcPacket::Commit {
pending_deletes,
pending_dirty,
} => RpcHandle::Commit {
pending_deletes,
pending_dirty,
x: Commit {
RpcPacket::Flush { handle } => RpcHandle::Flush {
handle,
x: Flush {
socket: self.socket,
},
},
RpcPacket::Abort { pending_deletes } => RpcHandle::Abort {
pending_deletes,
x: Abort {
RpcPacket::Drop { handle } => RpcHandle::Drop {
handle,
x: Drop {
socket: self.socket,
},
},
@@ -51,8 +47,9 @@ impl RpcHandler {
socket: self.socket,
},
},
RpcPacket::Delete { handle } => RpcHandle::Delete {
RpcPacket::Delete { handle, pointer } => RpcHandle::Delete {
handle,
pointer,
x: Delete {
socket: self.socket,
},
@@ -87,6 +84,12 @@ impl RpcHandler {
socket: self.socket,
},
},
RpcPacket::List { handle } => RpcHandle::List {
handle,
x: List {
socket: self.socket,
},
},
RpcPacket::Upgrade {} => RpcHandle::Upgrade {
x: Upgrade {
socket: self.socket,
@@ -97,14 +100,13 @@ impl RpcHandler {
}
pub enum RpcHandle {
Commit {
pending_deletes: Vec<Handle>,
pending_dirty: Vec<Handle>,
x: Commit,
Flush {
handle: Handle,
x: Flush,
},
Abort {
pending_deletes: Vec<Handle>,
x: Abort,
Drop {
handle: Handle,
x: Drop,
},
Create {
handle: Handle,
@@ -125,6 +127,7 @@ pub enum RpcHandle {
},
Delete {
handle: Handle,
pointer: Pointer,
x: Delete,
},
Stat {
@@ -137,18 +140,22 @@ pub enum RpcHandle {
opts: SearchOptions,
x: Vbase,
},
List {
handle: Handle,
x: List,
},
Upgrade {
x: Upgrade,
},
}
pub struct Commit {
pub struct Flush {
socket: ServerSocket,
}
impl Commit {
impl Flush {
pub fn leave(mut self) -> Result<RpcHandler, ConnectionError> {
let packet = commit::CommitPacket::Leave {};
let packet = flush::FlushPacket::Leave {};
self.socket.ok(packet)?;
Ok(RpcHandler {
socket: self.socket,
@@ -160,13 +167,13 @@ impl Commit {
}
}
pub struct Abort {
pub struct Drop {
socket: ServerSocket,
}
impl Abort {
impl Drop {
pub fn leave(mut self) -> Result<RpcHandler, ConnectionError> {
let packet = abort::AbortPacket::Leave {};
let packet = drop::DropPacket::Leave {};
self.socket.ok(packet)?;
Ok(RpcHandler {
socket: self.socket,
@@ -217,12 +224,6 @@ pub struct Delete {
}
impl Delete {
pub fn next(&mut self, p: Pointer) -> Result<bool, ConnectionError> {
let packet = delete::DeletePacket::Test { p };
self.socket.ok(packet)?;
let delete::DeleteTestPacket { delete } = self.socket.recv::<delete::DeleteTestPacket>()?;
Ok(delete)
}
pub fn leave(mut self) -> Result<RpcHandler, ConnectionError> {
let packet = delete::DeletePacket::Leave {};
self.socket.ok(packet)?;
@@ -370,6 +371,65 @@ impl VbaseNext {
}
}
/// Server-side handle for a freshly received `RpcPacket::List` request.
///
/// It must be resolved with either `error` (accept and start the session) or
/// `reset` (reject with a `ServiceError`).
pub struct List {
// Connection to the client that issued the request.
socket: ServerSocket,
}
impl List {
    /// Accepts the request and starts the listing session.
    ///
    /// Sends an empty `ListErrorPacket` to the client and returns the
    /// `ListHandler` that serves the following `ListPacket` requests.
    pub fn error(mut self) -> Result<ListHandler, ConnectionError> {
        let ack = list::ListErrorPacket {};
        self.socket.ok(ack)?;
        let Self { socket } = self;
        Ok(ListHandler { socket })
    }

    /// Rejects the request by reporting `err` to the client via `socket.err`.
    pub fn reset(mut self, err: ServiceError) -> Result<!, ConnectionError> {
        self.socket.err(err)
    }
}
/// Server-side state of an open listing session, waiting for the client's next `ListPacket`.
pub struct ListHandler {
// Connection to the client that owns the session.
socket: ServerSocket,
}
impl ListHandler {
    /// Receives the client's next `ListPacket` and reports what it asked for.
    ///
    /// * `Next` yields `ListHandle::Next`; the caller must answer through
    ///   `ListNext::leave`.
    /// * `Leave` is acknowledged here with a `ListLeavePacket`, and the
    ///   connection is returned as an `RpcHandler` inside `ListHandle::Leave`.
    pub fn handle(mut self) -> Result<ListHandle, ConnectionError> {
        match self.socket.recv::<list::ListPacket>()? {
            list::ListPacket::Next {} => {
                let x = ListNext {
                    socket: self.socket,
                };
                Ok(ListHandle::Next { x })
            }
            list::ListPacket::Leave {} => {
                // Acknowledge before handing the socket back to the RPC loop.
                self.socket.ok(list::ListLeavePacket {})?;
                let x = RpcHandler {
                    socket: self.socket,
                };
                Ok(ListHandle::Leave { x })
            }
        }
    }
}
/// Outcome of `ListHandler::handle`.
pub enum ListHandle {
// The client requested the next pointer; reply through `ListNext::leave`.
Next { x: ListNext },
// The client ended the session (already acknowledged); `x` serves the next top-level RPC.
Leave { x: RpcHandler },
}
/// Pending `ListPacket::Next` request that still has to be answered.
pub struct ListNext {
// Connection on which the `ListNextPacket` reply will be sent.
socket: ServerSocket,
}
impl ListNext {
    /// Answers the pending `Next` request with `p` and resumes the session.
    ///
    /// `p` is sent to the client inside a `ListNextPacket`. The returned
    /// `ListHandler` waits for the client's following request.
    pub fn leave(mut self, p: Option<Pointer>) -> Result<ListHandler, ConnectionError> {
        self.socket.ok(list::ListNextPacket { p })?;
        let Self { socket } = self;
        Ok(ListHandler { socket })
    }
}
pub struct Upgrade {
socket: ServerSocket,
}

View File

@@ -0,0 +1,25 @@
# Inserts 1000 random 3-dimensional vectors, builds an L2 index, deletes every
# even id, runs VACUUM FULL, and checks that a top-10 search still returns 10 rows.
statement ok
SET search_path TO pg_temp, vectors;
statement ok
CREATE TABLE t (id bigserial, val vector(3));
statement ok
INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000);
statement ok
CREATE INDEX ON t USING vectors (val vector_l2_ops);
statement ok
DELETE FROM t WHERE id % 2 = 0;
statement ok
VACUUM FULL;
query I
SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <-> '[0,0,0]' limit 10) t2;
----
10
statement ok
DROP TABLE t;