mirror of
https://github.com/ShadowJonathan/conduit_toolbox.git
synced 2025-07-25 21:04:08 +03:00
update for rocksdb fuckery
This commit is contained in:
parent
3c9fbaa8c9
commit
5d76bfd26e
3 changed files with 119 additions and 122 deletions
|
@ -12,4 +12,4 @@ rusqlite = { version = "0.25.3", features = ["bundled"] }
|
|||
anyhow = "1.0.42"
|
||||
heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d" }
|
||||
thiserror = "1.0.26"
|
||||
rocksdb = { version = "0.16.0", features = ["multi-threaded-cf"] }
|
||||
rocksdb = { version = "0.17.0", features = ["multi-threaded-cf", "zstd"] }
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
use std::path::Path;
|
||||
use std::{path::Path, sync::Arc};
|
||||
|
||||
use super::{Database, Segment};
|
||||
use rocksdb::{DBWithThreadMode, MultiThreaded};
|
||||
|
||||
pub fn new_conn<P: AsRef<Path>>(path: P) -> Result<RocksDB, rocksdb::Error> {
|
||||
pub fn options() -> rocksdb::Options {
|
||||
let mut db_opts = rocksdb::Options::default();
|
||||
|
||||
db_opts.create_if_missing(true);
|
||||
|
@ -21,10 +21,16 @@ pub fn new_conn<P: AsRef<Path>>(path: P) -> Result<RocksDB, rocksdb::Error> {
|
|||
block_based_options.set_cache_index_and_filter_blocks(true);
|
||||
db_opts.set_block_based_table_factory(&block_based_options);
|
||||
|
||||
let cfs = DBWithThreadMode::<MultiThreaded>::list_cf(&db_opts, &path).unwrap_or_default();
|
||||
db_opts
|
||||
}
|
||||
|
||||
pub fn new_conn<P: AsRef<Path>>(path: P) -> Result<RocksDB, rocksdb::Error> {
|
||||
let opts = options();
|
||||
|
||||
let cfs = DBWithThreadMode::<MultiThreaded>::list_cf(&opts, &path).unwrap_or_default();
|
||||
|
||||
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
|
||||
&db_opts,
|
||||
&opts,
|
||||
&path,
|
||||
cfs.iter().map(|name| {
|
||||
let mut options = rocksdb::Options::default();
|
||||
|
@ -50,9 +56,10 @@ impl Database for RocksDB {
|
|||
fn segment<'a>(&'a mut self, name: Vec<u8>) -> Option<Box<dyn Segment + 'a>> {
|
||||
let string = String::from_utf8(name).unwrap();
|
||||
|
||||
// Create if it didn't exist
|
||||
if !self.old_cfs.contains(&string) {
|
||||
// Create if it didn't exist
|
||||
let mut options = rocksdb::Options::default();
|
||||
let mut options = options();
|
||||
|
||||
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
|
||||
options.set_prefix_extractor(prefix_extractor);
|
||||
|
||||
|
@ -67,7 +74,11 @@ impl Database for RocksDB {
|
|||
}
|
||||
|
||||
fn names<'a>(&'a self) -> Vec<Vec<u8>> {
|
||||
self.old_cfs.iter().filter(|&v| &*v != "default").map(|v| v.as_bytes().to_vec()).collect()
|
||||
self.old_cfs
|
||||
.iter()
|
||||
.filter(|&v| &*v != "default")
|
||||
.map(|v| v.as_bytes().to_vec())
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -84,7 +95,7 @@ pub struct RocksDBCF<'a> {
|
|||
}
|
||||
|
||||
impl RocksDBCF<'_> {
|
||||
fn cf(&self) -> rocksdb::BoundColumnFamily<'_> {
|
||||
fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
|
||||
self.db.rocks.cf_handle(&self.name).unwrap()
|
||||
}
|
||||
}
|
||||
|
@ -96,7 +107,7 @@ impl<'r> Segment for RocksDBCF<'r> {
|
|||
) -> anyhow::Result<()> {
|
||||
let cf = self.cf();
|
||||
for (key, value) in batch {
|
||||
self.db.rocks.put_cf(cf, key, value)?;
|
||||
self.db.rocks.put_cf(&cf, key, value)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -115,7 +126,7 @@ impl super::SegmentIter for RocksDBCFIter<'_> {
|
|||
self.0
|
||||
.db
|
||||
.rocks
|
||||
.iterator_cf(self.0.cf(), rocksdb::IteratorMode::Start)
|
||||
.iterator_cf(&self.0.cf(), rocksdb::IteratorMode::Start)
|
||||
.map(|(k, v)| (Vec::from(k), Vec::from(v))),
|
||||
)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue