You've already forked pgvecto.rs
mirror of
https://github.com/tensorchord/pgvecto.rs.git
synced 2025-07-30 19:23:05 +03:00
chore: fine-grained upgrade hint (#220)
* chore: upgrade instruction for every index
  Signed-off-by: usamoi <usamoi@outlook.com>
* fix: soft_version check
  Signed-off-by: usamoi <usamoi@outlook.com>
* fix: index_stat view if need upgrade
  Signed-off-by: usamoi <usamoi@outlook.com>
* fix: size info of write segment
  Signed-off-by: usamoi <usamoi@outlook.com>
---------
Signed-off-by: usamoi <usamoi@outlook.com>
This commit is contained in:
@ -15,22 +15,20 @@ byteorder.workspace = true
|
||||
bincode.workspace = true
|
||||
half.workspace = true
|
||||
num-traits.workspace = true
|
||||
rand.workspace = true
|
||||
bytemuck.workspace = true
|
||||
c = { path = "../c" }
|
||||
detect = { path = "../detect" }
|
||||
rand = "0.8.5"
|
||||
crc32fast = "1.3.2"
|
||||
crossbeam = "0.8.2"
|
||||
dashmap = "5.4.0"
|
||||
parking_lot = "0.12.1"
|
||||
memoffset = "0.9.0"
|
||||
tempfile = "3.6.0"
|
||||
arrayvec = { version = "0.7.3", features = ["serde"] }
|
||||
memmap2 = "0.9.0"
|
||||
rayon = "1.6.1"
|
||||
uuid = { version = "1.6.1", features = ["serde"] }
|
||||
arc-swap = "1.6.0"
|
||||
bytemuck = { version = "1.14.0", features = ["extern_crate_alloc"] }
|
||||
serde_with = "3.4.0"
|
||||
multiversion = "0.7.3"
|
||||
|
||||
[target.'cfg(target_os = "macos")'.dependencies]
|
||||
|
@ -64,7 +64,7 @@ pub struct IndexOptions {
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct SegmentSizeInfo {
|
||||
pub struct SegmentStat {
|
||||
pub id: Uuid,
|
||||
#[serde(rename = "type")]
|
||||
pub typ: String,
|
||||
@ -73,13 +73,13 @@ pub struct SegmentSizeInfo {
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct IndexStat {
|
||||
pub indexing: bool,
|
||||
pub sealed: Vec<u32>,
|
||||
pub growing: Vec<u32>,
|
||||
pub write: u32,
|
||||
pub options: IndexOptions,
|
||||
pub sizes: Vec<SegmentSizeInfo>,
|
||||
pub enum IndexStat {
|
||||
Normal {
|
||||
indexing: bool,
|
||||
segments: Vec<SegmentStat>,
|
||||
options: IndexOptions,
|
||||
},
|
||||
Upgrade,
|
||||
}
|
||||
|
||||
pub struct Index<S: G> {
|
||||
@ -97,6 +97,11 @@ impl<S: G> Index<S> {
|
||||
pub fn create(path: PathBuf, options: IndexOptions) -> Arc<Self> {
|
||||
assert!(options.validate().is_ok());
|
||||
std::fs::create_dir(&path).unwrap();
|
||||
std::fs::write(
|
||||
path.join("options"),
|
||||
serde_json::to_string::<IndexOptions>(&options).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
std::fs::create_dir(path.join("segments")).unwrap();
|
||||
let startup = FileAtomic::create(
|
||||
path.join("startup"),
|
||||
@ -132,7 +137,10 @@ impl<S: G> Index<S> {
|
||||
OptimizerSealing::new(index.clone()).spawn();
|
||||
index
|
||||
}
|
||||
pub fn open(path: PathBuf, options: IndexOptions) -> Arc<Self> {
|
||||
pub fn open(path: PathBuf) -> Arc<Self> {
|
||||
let options =
|
||||
serde_json::from_slice::<IndexOptions>(&std::fs::read(path.join("options")).unwrap())
|
||||
.unwrap();
|
||||
let tracker = Arc::new(IndexTracker { path: path.clone() });
|
||||
let startup = FileAtomic::<IndexStartup>::open(path.join("startup"));
|
||||
clean(
|
||||
@ -244,18 +252,22 @@ impl<S: G> Index<S> {
|
||||
}
|
||||
pub fn stat(&self) -> IndexStat {
|
||||
let view = self.view();
|
||||
IndexStat {
|
||||
IndexStat::Normal {
|
||||
indexing: self.instant_index.load() < self.instant_write.load(),
|
||||
sealed: view.sealed.values().map(|x| x.len()).collect(),
|
||||
growing: view.growing.values().map(|x| x.len()).collect(),
|
||||
write: view.write.as_ref().map(|(_, x)| x.len()).unwrap_or(0),
|
||||
options: self.options().clone(),
|
||||
sizes: view
|
||||
.sealed
|
||||
.values()
|
||||
.map(|x| x.size())
|
||||
.chain(view.growing.values().map(|x| x.size()))
|
||||
.collect(),
|
||||
segments: {
|
||||
let mut segments = Vec::new();
|
||||
for sealed in view.sealed.values() {
|
||||
segments.push(sealed.stat_sealed());
|
||||
}
|
||||
for growing in view.growing.values() {
|
||||
segments.push(growing.stat_growing());
|
||||
}
|
||||
if let Some(write) = view.write.as_ref().map(|(_, x)| x) {
|
||||
segments.push(write.stat_write());
|
||||
}
|
||||
segments
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,7 @@
|
||||
#![allow(clippy::all)] // Clippy bug.
|
||||
|
||||
use super::SegmentTracker;
|
||||
use crate::index::IndexOptions;
|
||||
use crate::index::IndexTracker;
|
||||
use crate::index::SegmentSizeInfo;
|
||||
use crate::index::SegmentStat;
|
||||
use crate::prelude::*;
|
||||
use crate::utils::dir_ops::sync_dir;
|
||||
use crate::utils::file_wal::FileWal;
|
||||
@ -12,8 +10,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::cell::UnsafeCell;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
use uuid::Uuid;
|
||||
@ -44,6 +41,7 @@ impl<S: G> GrowingSegment<S> {
|
||||
sync_dir(&path);
|
||||
Arc::new(Self {
|
||||
uuid,
|
||||
#[allow(clippy::uninit_vec)]
|
||||
vec: unsafe {
|
||||
let mut vec = Vec::with_capacity(capacity as usize);
|
||||
vec.set_len(capacity as usize);
|
||||
@ -141,14 +139,22 @@ impl<S: G> GrowingSegment<S> {
|
||||
pub fn len(&self) -> u32 {
|
||||
self.len.load(Ordering::Acquire) as u32
|
||||
}
|
||||
pub fn size(&self) -> SegmentSizeInfo {
|
||||
SegmentSizeInfo {
|
||||
pub fn stat_growing(&self) -> SegmentStat {
|
||||
SegmentStat {
|
||||
id: self.uuid,
|
||||
typ: "growing".to_string(),
|
||||
length: self.len() as usize,
|
||||
size: (self.len() as u64) * (std::mem::size_of::<Log<S>>() as u64),
|
||||
}
|
||||
}
|
||||
pub fn stat_write(&self) -> SegmentStat {
|
||||
SegmentStat {
|
||||
id: self.uuid,
|
||||
typ: "write".to_string(),
|
||||
length: self.len() as usize,
|
||||
size: (self.len() as u64) * (std::mem::size_of::<Log<S>>() as u64),
|
||||
}
|
||||
}
|
||||
pub fn vector(&self, i: u32) -> &[S::Scalar] {
|
||||
let i = i as usize;
|
||||
if i >= self.len.load(Ordering::Acquire) {
|
||||
|
@ -1,7 +1,7 @@
|
||||
use super::growing::GrowingSegment;
|
||||
use super::SegmentTracker;
|
||||
use crate::index::indexing::{DynamicIndexIter, DynamicIndexing};
|
||||
use crate::index::{IndexOptions, IndexTracker, SegmentSizeInfo};
|
||||
use crate::index::{IndexOptions, IndexTracker, SegmentStat};
|
||||
use crate::prelude::*;
|
||||
use crate::utils::dir_ops::{dir_size, sync_dir};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -58,21 +58,14 @@ impl<S: G> SealedSegment<S> {
|
||||
pub fn len(&self) -> u32 {
|
||||
self.indexing.len()
|
||||
}
|
||||
pub fn size(&self) -> SegmentSizeInfo {
|
||||
let mut info = SegmentSizeInfo {
|
||||
pub fn stat_sealed(&self) -> SegmentStat {
|
||||
let path = self._tracker.path.join("indexing");
|
||||
SegmentStat {
|
||||
id: self.uuid,
|
||||
typ: "sealed".to_string(),
|
||||
length: self.len() as usize,
|
||||
size: 0,
|
||||
};
|
||||
let path = self._tracker.path.join("indexing");
|
||||
match dir_size(&path) {
|
||||
Ok(size) => info.size = size as u64,
|
||||
Err(e) => {
|
||||
panic!("Failed to get size of {:?}: {}", path, e);
|
||||
}
|
||||
size: dir_size(&path).unwrap(),
|
||||
}
|
||||
info
|
||||
}
|
||||
pub fn vector(&self, i: u32) -> &[S::Scalar] {
|
||||
self.indexing.vector(i)
|
||||
|
46
crates/service/src/instance/metadata.rs
Normal file
46
crates/service/src/instance/metadata.rs
Normal file
@ -0,0 +1,46 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::error::Error;
|
||||
use std::path::Path;
|
||||
use thiserror::Error;
|
||||
|
||||
/// Error returned by `Metadata::read` when the stored metadata does not
/// describe a format this build accepts.
#[derive(Debug, Error)]
|
||||
pub enum MetadataError {
|
||||
/// The stored `version` or `soft_version` is missing, or fails the
/// comparison against the constants compiled into `Metadata`.
#[error("Invalid version.")]
|
||||
InvalidVersion,
|
||||
}
|
||||
|
||||
/// Version information stored as JSON in a metadata file.
///
/// Both fields are optional on the wire: a key that is absent from the JSON
/// deserializes to `None`, which `Metadata::read` then rejects.
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Metadata {
|
||||
/// Format version; `Metadata::read` requires it to equal `Metadata::VERSION`.
#[serde(default)]
|
||||
pub version: Option<u64>,
|
||||
/// Soft version; `Metadata::read` rejects values greater than
/// `Metadata::SOFT_VERSION`.
#[serde(default)]
|
||||
pub soft_version: Option<u64>,
|
||||
}
|
||||
|
||||
impl Metadata {
|
||||
const VERSION: u64 = 2;
|
||||
const SOFT_VERSION: u64 = 1;
|
||||
}
|
||||
|
||||
impl Metadata {
|
||||
pub fn write(path: impl AsRef<Path>) {
|
||||
let metadata = Metadata {
|
||||
version: Some(Self::VERSION),
|
||||
soft_version: Some(Self::SOFT_VERSION),
|
||||
};
|
||||
let contents = serde_json::to_string(&metadata).unwrap();
|
||||
std::fs::write(path, contents).unwrap();
|
||||
}
|
||||
pub fn read(path: impl AsRef<Path>) -> Result<(), Box<dyn Error>> {
|
||||
use MetadataError::*;
|
||||
let contents = std::fs::read_to_string(path)?;
|
||||
let metadata = serde_json::from_str::<Metadata>(&contents)?;
|
||||
if Self::VERSION != metadata.version.ok_or(InvalidVersion)? {
|
||||
return Err(Box::new(InvalidVersion));
|
||||
}
|
||||
if Self::SOFT_VERSION < metadata.soft_version.ok_or(InvalidVersion)? {
|
||||
return Err(Box::new(InvalidVersion));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,3 +1,5 @@
|
||||
pub mod metadata;
|
||||
|
||||
use crate::index::segments::SearchGucs;
|
||||
use crate::index::Index;
|
||||
use crate::index::IndexOptions;
|
||||
@ -16,67 +18,120 @@ pub enum Instance {
|
||||
F16Cos(Arc<Index<F16Cos>>),
|
||||
F16Dot(Arc<Index<F16Dot>>),
|
||||
F16L2(Arc<Index<F16L2>>),
|
||||
Upgrade,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
pub fn create(path: PathBuf, options: IndexOptions) -> Self {
|
||||
match (options.vector.d, options.vector.k) {
|
||||
(Distance::Cos, Kind::F32) => Self::F32Cos(Index::create(path, options)),
|
||||
(Distance::Dot, Kind::F32) => Self::F32Dot(Index::create(path, options)),
|
||||
(Distance::L2, Kind::F32) => Self::F32L2(Index::create(path, options)),
|
||||
(Distance::Cos, Kind::F16) => Self::F16Cos(Index::create(path, options)),
|
||||
(Distance::Dot, Kind::F16) => Self::F16Dot(Index::create(path, options)),
|
||||
(Distance::L2, Kind::F16) => Self::F16L2(Index::create(path, options)),
|
||||
(Distance::Cos, Kind::F32) => {
|
||||
let index = Index::create(path.clone(), options);
|
||||
self::metadata::Metadata::write(path.join("metadata"));
|
||||
Self::F32Cos(index)
|
||||
}
|
||||
(Distance::Dot, Kind::F32) => {
|
||||
let index = Index::create(path.clone(), options);
|
||||
self::metadata::Metadata::write(path.join("metadata"));
|
||||
Self::F32Dot(index)
|
||||
}
|
||||
(Distance::L2, Kind::F32) => {
|
||||
let index = Index::create(path.clone(), options);
|
||||
self::metadata::Metadata::write(path.join("metadata"));
|
||||
Self::F32L2(index)
|
||||
}
|
||||
(Distance::Cos, Kind::F16) => {
|
||||
let index = Index::create(path.clone(), options);
|
||||
self::metadata::Metadata::write(path.join("metadata"));
|
||||
Self::F16Cos(index)
|
||||
}
|
||||
(Distance::Dot, Kind::F16) => {
|
||||
let index = Index::create(path.clone(), options);
|
||||
self::metadata::Metadata::write(path.join("metadata"));
|
||||
Self::F16Dot(index)
|
||||
}
|
||||
(Distance::L2, Kind::F16) => {
|
||||
let index = Index::create(path.clone(), options);
|
||||
self::metadata::Metadata::write(path.join("metadata"));
|
||||
Self::F16L2(index)
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn open(path: PathBuf, options: IndexOptions) -> Self {
|
||||
pub fn open(path: PathBuf) -> Self {
|
||||
if self::metadata::Metadata::read(path.join("metadata")).is_err() {
|
||||
return Self::Upgrade;
|
||||
}
|
||||
let options =
|
||||
serde_json::from_slice::<IndexOptions>(&std::fs::read(path.join("options")).unwrap())
|
||||
.unwrap();
|
||||
match (options.vector.d, options.vector.k) {
|
||||
(Distance::Cos, Kind::F32) => Self::F32Cos(Index::open(path, options)),
|
||||
(Distance::Dot, Kind::F32) => Self::F32Dot(Index::open(path, options)),
|
||||
(Distance::L2, Kind::F32) => Self::F32L2(Index::open(path, options)),
|
||||
(Distance::Cos, Kind::F16) => Self::F16Cos(Index::open(path, options)),
|
||||
(Distance::Dot, Kind::F16) => Self::F16Dot(Index::open(path, options)),
|
||||
(Distance::L2, Kind::F16) => Self::F16L2(Index::open(path, options)),
|
||||
(Distance::Cos, Kind::F32) => Self::F32Cos(Index::open(path)),
|
||||
(Distance::Dot, Kind::F32) => Self::F32Dot(Index::open(path)),
|
||||
(Distance::L2, Kind::F32) => Self::F32L2(Index::open(path)),
|
||||
(Distance::Cos, Kind::F16) => Self::F16Cos(Index::open(path)),
|
||||
(Distance::Dot, Kind::F16) => Self::F16Dot(Index::open(path)),
|
||||
(Distance::L2, Kind::F16) => Self::F16L2(Index::open(path)),
|
||||
}
|
||||
}
|
||||
pub fn options(&self) -> &IndexOptions {
|
||||
pub fn options(&self) -> Result<&IndexOptions, FriendlyError> {
|
||||
match self {
|
||||
Instance::F32Cos(x) => x.options(),
|
||||
Instance::F32Dot(x) => x.options(),
|
||||
Instance::F32L2(x) => x.options(),
|
||||
Instance::F16Cos(x) => x.options(),
|
||||
Instance::F16Dot(x) => x.options(),
|
||||
Instance::F16L2(x) => x.options(),
|
||||
Instance::F32Cos(x) => Ok(x.options()),
|
||||
Instance::F32Dot(x) => Ok(x.options()),
|
||||
Instance::F32L2(x) => Ok(x.options()),
|
||||
Instance::F16Cos(x) => Ok(x.options()),
|
||||
Instance::F16Dot(x) => Ok(x.options()),
|
||||
Instance::F16L2(x) => Ok(x.options()),
|
||||
Instance::Upgrade => Err(FriendlyError::Upgrade2),
|
||||
}
|
||||
}
|
||||
pub fn refresh(&self) {
|
||||
pub fn refresh(&self) -> Result<(), FriendlyError> {
|
||||
match self {
|
||||
Instance::F32Cos(x) => x.refresh(),
|
||||
Instance::F32Dot(x) => x.refresh(),
|
||||
Instance::F32L2(x) => x.refresh(),
|
||||
Instance::F16Cos(x) => x.refresh(),
|
||||
Instance::F16Dot(x) => x.refresh(),
|
||||
Instance::F16L2(x) => x.refresh(),
|
||||
Instance::F32Cos(x) => {
|
||||
x.refresh();
|
||||
Ok(())
|
||||
}
|
||||
Instance::F32Dot(x) => {
|
||||
x.refresh();
|
||||
Ok(())
|
||||
}
|
||||
Instance::F32L2(x) => {
|
||||
x.refresh();
|
||||
Ok(())
|
||||
}
|
||||
Instance::F16Cos(x) => {
|
||||
x.refresh();
|
||||
Ok(())
|
||||
}
|
||||
Instance::F16Dot(x) => {
|
||||
x.refresh();
|
||||
Ok(())
|
||||
}
|
||||
Instance::F16L2(x) => {
|
||||
x.refresh();
|
||||
Ok(())
|
||||
}
|
||||
Instance::Upgrade => Err(FriendlyError::Upgrade2),
|
||||
}
|
||||
}
|
||||
pub fn view(&self) -> InstanceView {
|
||||
pub fn view(&self) -> Result<InstanceView, FriendlyError> {
|
||||
match self {
|
||||
Instance::F32Cos(x) => InstanceView::F32Cos(x.view()),
|
||||
Instance::F32Dot(x) => InstanceView::F32Dot(x.view()),
|
||||
Instance::F32L2(x) => InstanceView::F32L2(x.view()),
|
||||
Instance::F16Cos(x) => InstanceView::F16Cos(x.view()),
|
||||
Instance::F16Dot(x) => InstanceView::F16Dot(x.view()),
|
||||
Instance::F16L2(x) => InstanceView::F16L2(x.view()),
|
||||
Instance::F32Cos(x) => Ok(InstanceView::F32Cos(x.view())),
|
||||
Instance::F32Dot(x) => Ok(InstanceView::F32Dot(x.view())),
|
||||
Instance::F32L2(x) => Ok(InstanceView::F32L2(x.view())),
|
||||
Instance::F16Cos(x) => Ok(InstanceView::F16Cos(x.view())),
|
||||
Instance::F16Dot(x) => Ok(InstanceView::F16Dot(x.view())),
|
||||
Instance::F16L2(x) => Ok(InstanceView::F16L2(x.view())),
|
||||
Instance::Upgrade => Err(FriendlyError::Upgrade2),
|
||||
}
|
||||
}
|
||||
pub fn stat(&self) -> IndexStat {
|
||||
pub fn stat(&self) -> Result<IndexStat, FriendlyError> {
|
||||
match self {
|
||||
Instance::F32Cos(x) => x.stat(),
|
||||
Instance::F32Dot(x) => x.stat(),
|
||||
Instance::F32L2(x) => x.stat(),
|
||||
Instance::F16Cos(x) => x.stat(),
|
||||
Instance::F16Dot(x) => x.stat(),
|
||||
Instance::F16L2(x) => x.stat(),
|
||||
Instance::F32Cos(x) => Ok(x.stat()),
|
||||
Instance::F32Dot(x) => Ok(x.stat()),
|
||||
Instance::F32L2(x) => Ok(x.stat()),
|
||||
Instance::F16Cos(x) => Ok(x.stat()),
|
||||
Instance::F16Dot(x) => Ok(x.stat()),
|
||||
Instance::F16L2(x) => Ok(x.stat()),
|
||||
Instance::Upgrade => Ok(IndexStat::Upgrade),
|
||||
}
|
||||
}
|
||||
}
|
@ -3,6 +3,7 @@
|
||||
|
||||
pub mod algorithms;
|
||||
pub mod index;
|
||||
pub mod instance;
|
||||
pub mod prelude;
|
||||
pub mod worker;
|
||||
|
||||
|
@ -69,10 +69,15 @@ Please check the full PostgreSQL log to get more information.\
|
||||
")]
|
||||
Ipc,
|
||||
#[error("\
|
||||
The extension is upgraded. However, the index files is outdated.
|
||||
ADVICE: Please read `https://github.com/tensorchord/pgvecto.rs/blob/main/docs/upgrade.md`.\
|
||||
The extension is upgraded so all index files are outdated.
|
||||
ADVICE: Delete all index files. Please read `https://github.com/tensorchord/pgvecto.rs/blob/main/docs/upgrade.md`.\
|
||||
")]
|
||||
Upgrade,
|
||||
#[error("\
|
||||
The extension is upgraded so this index is outdated.
|
||||
ADVICE: Rebuild the index. Please read `https://github.com/tensorchord/pgvecto.rs/blob/main/docs/upgrade.md`.\
|
||||
")]
|
||||
Upgrade2,
|
||||
}
|
||||
|
||||
pub trait FriendlyErrorLike: Sized {
|
||||
|
@ -11,6 +11,6 @@ pub use self::scalar::{F16, F32};
|
||||
|
||||
pub use self::filter::{Filter, Payload};
|
||||
pub use self::heap::{Heap, HeapElement};
|
||||
pub use self::sys::{Id, Pointer};
|
||||
pub use self::sys::{Handle, Pointer};
|
||||
|
||||
pub use num_traits::{Float, Zero};
|
||||
|
@ -2,27 +2,27 @@ use serde::{Deserialize, Serialize};
|
||||
use std::{fmt::Display, num::ParseIntError, str::FromStr};
|
||||
|
||||
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
pub struct Id {
|
||||
pub struct Handle {
|
||||
pub newtype: u32,
|
||||
}
|
||||
|
||||
impl Id {
|
||||
impl Handle {
|
||||
pub fn as_u32(self) -> u32 {
|
||||
self.newtype
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Id {
|
||||
impl Display for Handle {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.as_u32())
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for Id {
|
||||
impl FromStr for Handle {
|
||||
type Err = ParseIntError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
Ok(Id {
|
||||
Ok(Handle {
|
||||
newtype: u32::from_str(s)?,
|
||||
})
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ pub fn sync_dir(path: impl AsRef<Path>) {
|
||||
file.sync_all().expect("Failed to sync dir.");
|
||||
}
|
||||
|
||||
pub fn dir_size(dir: &Path) -> io::Result<usize> {
|
||||
pub fn dir_size(dir: &Path) -> io::Result<u64> {
|
||||
let mut size = 0;
|
||||
if dir.is_dir() {
|
||||
for entry in read_dir(dir)? {
|
||||
@ -21,7 +21,7 @@ pub fn dir_size(dir: &Path) -> io::Result<usize> {
|
||||
if path.is_dir() {
|
||||
size += dir_size(&path)?;
|
||||
} else {
|
||||
size += entry.metadata()?.len() as usize;
|
||||
size += entry.metadata()?.len();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ pub struct Metadata {
|
||||
}
|
||||
|
||||
impl Metadata {
|
||||
const VERSION: u64 = 1;
|
||||
const VERSION: u64 = 2;
|
||||
const SOFT_VERSION: u64 = 1;
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ impl Metadata {
|
||||
if Self::VERSION != metadata.version.ok_or(InvalidVersion)? {
|
||||
return Err(Box::new(InvalidVersion));
|
||||
}
|
||||
if Self::SOFT_VERSION <= metadata.soft_version.ok_or(InvalidVersion)? {
|
||||
if Self::SOFT_VERSION < metadata.soft_version.ok_or(InvalidVersion)? {
|
||||
return Err(Box::new(InvalidVersion));
|
||||
}
|
||||
Ok(())
|
||||
|
@ -1,11 +1,10 @@
|
||||
pub mod instance;
|
||||
pub mod metadata;
|
||||
|
||||
use self::instance::Instance;
|
||||
use crate::index::segments::SearchGucs;
|
||||
use crate::index::IndexOptions;
|
||||
use crate::index::IndexStat;
|
||||
use crate::index::OutdatedError;
|
||||
use crate::instance::Instance;
|
||||
use crate::prelude::*;
|
||||
use crate::utils::clean::clean;
|
||||
use crate::utils::dir_ops::sync_dir;
|
||||
@ -13,8 +12,7 @@ use crate::utils::file_atomic::FileAtomic;
|
||||
use arc_swap::ArcSwap;
|
||||
use parking_lot::Mutex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::DisplayFromStr;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
@ -49,12 +47,12 @@ impl Worker {
|
||||
let startup = FileAtomic::<WorkerStartup>::open(path.join("startup"));
|
||||
clean(
|
||||
path.join("indexes"),
|
||||
startup.get().indexes.keys().map(|s| s.to_string()),
|
||||
startup.get().indexes.iter().map(|s| s.to_string()),
|
||||
);
|
||||
let mut indexes = HashMap::new();
|
||||
for (&id, options) in startup.get().indexes.iter() {
|
||||
for &id in startup.get().indexes.iter() {
|
||||
let path = path.join("indexes").join(id.to_string());
|
||||
let index = Instance::open(path, options.clone());
|
||||
let index = Instance::open(path);
|
||||
indexes.insert(id, index);
|
||||
}
|
||||
let view = Arc::new(WorkerView {
|
||||
@ -67,17 +65,17 @@ impl Worker {
|
||||
view: ArcSwap::new(view),
|
||||
})
|
||||
}
|
||||
pub fn call_create(&self, id: Id, options: IndexOptions) {
|
||||
pub fn call_create(&self, handle: Handle, options: IndexOptions) {
|
||||
let mut protect = self.protect.lock();
|
||||
let index = Instance::create(self.path.join("indexes").join(id.to_string()), options);
|
||||
if protect.indexes.insert(id, index).is_some() {
|
||||
panic!("index {} already exists", id)
|
||||
let index = Instance::create(self.path.join("indexes").join(handle.to_string()), options);
|
||||
if protect.indexes.insert(handle, index).is_some() {
|
||||
panic!("index {} already exists", handle)
|
||||
}
|
||||
protect.maintain(&self.view);
|
||||
}
|
||||
pub fn call_search<F>(
|
||||
&self,
|
||||
id: Id,
|
||||
handle: Handle,
|
||||
search: (DynamicVector, usize),
|
||||
gucs: SearchGucs,
|
||||
filter: F,
|
||||
@ -86,80 +84,90 @@ impl Worker {
|
||||
F: FnMut(Pointer) -> bool,
|
||||
{
|
||||
let view = self.view.load_full();
|
||||
let index = view.indexes.get(&id).ok_or(FriendlyError::UnknownIndex)?;
|
||||
let view = index.view();
|
||||
let index = view
|
||||
.indexes
|
||||
.get(&handle)
|
||||
.ok_or(FriendlyError::UnknownIndex)?;
|
||||
let view = index.view()?;
|
||||
view.search(search.1, search.0, gucs, filter)
|
||||
}
|
||||
pub fn call_insert(
|
||||
&self,
|
||||
id: Id,
|
||||
handle: Handle,
|
||||
insert: (DynamicVector, Pointer),
|
||||
) -> Result<(), FriendlyError> {
|
||||
let view = self.view.load_full();
|
||||
let index = view.indexes.get(&id).ok_or(FriendlyError::UnknownIndex)?;
|
||||
let index = view
|
||||
.indexes
|
||||
.get(&handle)
|
||||
.ok_or(FriendlyError::UnknownIndex)?;
|
||||
loop {
|
||||
let view = index.view();
|
||||
let view = index.view()?;
|
||||
match view.insert(insert.0.clone(), insert.1)? {
|
||||
Ok(()) => break Ok(()),
|
||||
Err(OutdatedError(_)) => index.refresh(),
|
||||
Err(OutdatedError(_)) => index.refresh()?,
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn call_delete<F>(&self, id: Id, f: F) -> Result<(), FriendlyError>
|
||||
pub fn call_delete<F>(&self, handle: Handle, f: F) -> Result<(), FriendlyError>
|
||||
where
|
||||
F: FnMut(Pointer) -> bool,
|
||||
{
|
||||
let view = self.view.load_full();
|
||||
let index = view.indexes.get(&id).ok_or(FriendlyError::UnknownIndex)?;
|
||||
let view = index.view();
|
||||
let index = view
|
||||
.indexes
|
||||
.get(&handle)
|
||||
.ok_or(FriendlyError::UnknownIndex)?;
|
||||
let view = index.view()?;
|
||||
view.delete(f);
|
||||
Ok(())
|
||||
}
|
||||
pub fn call_flush(&self, id: Id) -> Result<(), FriendlyError> {
|
||||
pub fn call_flush(&self, handle: Handle) -> Result<(), FriendlyError> {
|
||||
let view = self.view.load_full();
|
||||
let index = view.indexes.get(&id).ok_or(FriendlyError::UnknownIndex)?;
|
||||
let view = index.view();
|
||||
let index = view
|
||||
.indexes
|
||||
.get(&handle)
|
||||
.ok_or(FriendlyError::UnknownIndex)?;
|
||||
let view = index.view()?;
|
||||
view.flush();
|
||||
Ok(())
|
||||
}
|
||||
pub fn call_destory(&self, ids: Vec<Id>) {
|
||||
let mut updated = false;
|
||||
pub fn call_destory(&self, handle: Handle) {
|
||||
let mut protect = self.protect.lock();
|
||||
for id in ids {
|
||||
updated |= protect.indexes.remove(&id).is_some();
|
||||
}
|
||||
if updated {
|
||||
if protect.indexes.remove(&handle).is_some() {
|
||||
protect.maintain(&self.view);
|
||||
}
|
||||
}
|
||||
pub fn call_stat(&self, id: Id) -> Result<IndexStat, FriendlyError> {
|
||||
pub fn call_stat(&self, handle: Handle) -> Result<IndexStat, FriendlyError> {
|
||||
let view = self.view.load_full();
|
||||
let index = view.indexes.get(&id).ok_or(FriendlyError::UnknownIndex)?;
|
||||
Ok(index.stat())
|
||||
let index = view
|
||||
.indexes
|
||||
.get(&handle)
|
||||
.ok_or(FriendlyError::UnknownIndex)?;
|
||||
index.stat()
|
||||
}
|
||||
pub fn get_instance(&self, id: Id) -> Result<Instance, FriendlyError> {
|
||||
pub fn get_instance(&self, handle: Handle) -> Result<Instance, FriendlyError> {
|
||||
let view = self.view.load_full();
|
||||
let index = view.indexes.get(&id).ok_or(FriendlyError::UnknownIndex)?;
|
||||
let index = view
|
||||
.indexes
|
||||
.get(&handle)
|
||||
.ok_or(FriendlyError::UnknownIndex)?;
|
||||
Ok(index.clone())
|
||||
}
|
||||
}
|
||||
|
||||
struct WorkerView {
|
||||
indexes: HashMap<Id, Instance>,
|
||||
indexes: HashMap<Handle, Instance>,
|
||||
}
|
||||
|
||||
struct WorkerProtect {
|
||||
startup: FileAtomic<WorkerStartup>,
|
||||
indexes: HashMap<Id, Instance>,
|
||||
indexes: HashMap<Handle, Instance>,
|
||||
}
|
||||
|
||||
impl WorkerProtect {
|
||||
fn maintain(&mut self, swap: &ArcSwap<WorkerView>) {
|
||||
let indexes = self
|
||||
.indexes
|
||||
.iter()
|
||||
.map(|(&k, v)| (k, v.options().clone()))
|
||||
.collect();
|
||||
let indexes = self.indexes.keys().copied().collect();
|
||||
self.startup.set(WorkerStartup { indexes });
|
||||
swap.swap(Arc::new(WorkerView {
|
||||
indexes: self.indexes.clone(),
|
||||
@ -167,17 +175,15 @@ impl WorkerProtect {
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct WorkerStartup {
|
||||
#[serde_as(as = "HashMap<DisplayFromStr, _>")]
|
||||
indexes: HashMap<Id, IndexOptions>,
|
||||
indexes: HashSet<Handle>,
|
||||
}
|
||||
|
||||
impl WorkerStartup {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
indexes: HashMap::new(),
|
||||
indexes: HashSet::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user