mirror of
https://github.com/ShadowJonathan/conduit_toolbox.git
synced 2025-06-08 02:17:04 +03:00
add rocksdb support
This commit is contained in:
parent
a04057ed4c
commit
e20e99493e
5 changed files with 249 additions and 3 deletions
|
@ -11,4 +11,5 @@ sled = { version = "0.34.6", features = ["compression", "no_metrics"] }
|
|||
rusqlite = { version = "0.25.3", features = ["bundled"] }
|
||||
anyhow = "1.0.42"
|
||||
heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d" }
|
||||
thiserror = "1.0.26"
|
||||
thiserror = "1.0.26"
|
||||
rocksdb = { version = "0.16.0", features = ["multi-threaded-cf"] }
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
pub mod heed;
|
||||
pub mod rocksdb;
|
||||
pub mod sled;
|
||||
pub mod sqlite;
|
||||
|
||||
|
|
110
tools/iface/src/db/rocksdb.rs
Normal file
110
tools/iface/src/db/rocksdb.rs
Normal file
|
@ -0,0 +1,110 @@
|
|||
use std::path::Path;
|
||||
|
||||
use super::{Database, Segment};
|
||||
use rocksdb::{DBWithThreadMode, MultiThreaded};
|
||||
|
||||
pub fn new_conn<P: AsRef<Path>>(path: P) -> Result<RocksDB, rocksdb::Error> {
|
||||
let mut db_opts = rocksdb::Options::default();
|
||||
db_opts.create_if_missing(true);
|
||||
db_opts.set_max_open_files(16);
|
||||
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
|
||||
db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
|
||||
db_opts.set_target_file_size_base(256 << 20);
|
||||
db_opts.set_write_buffer_size(256 << 20);
|
||||
|
||||
let mut block_based_options = rocksdb::BlockBasedOptions::default();
|
||||
block_based_options.set_block_size(512 << 10);
|
||||
db_opts.set_block_based_table_factory(&block_based_options);
|
||||
|
||||
let cfs = DBWithThreadMode::<MultiThreaded>::list_cf(&db_opts, &path).unwrap_or_default();
|
||||
|
||||
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
|
||||
&db_opts,
|
||||
&path,
|
||||
cfs.iter().map(|name| {
|
||||
let mut options = rocksdb::Options::default();
|
||||
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
|
||||
options.set_prefix_extractor(prefix_extractor);
|
||||
|
||||
rocksdb::ColumnFamilyDescriptor::new(name, options)
|
||||
}),
|
||||
)?;
|
||||
|
||||
Ok(RocksDB {
|
||||
rocks: db,
|
||||
old_cfs: cfs,
|
||||
})
|
||||
}
|
||||
|
||||
pub struct RocksDB {
|
||||
rocks: DBWithThreadMode<MultiThreaded>,
|
||||
old_cfs: Vec<String>,
|
||||
}
|
||||
|
||||
impl Database for RocksDB {
|
||||
fn segment<'a>(&'a mut self, name: Vec<u8>) -> Option<Box<dyn Segment + 'a>> {
|
||||
let string = String::from_utf8(name).unwrap();
|
||||
|
||||
if !self.old_cfs.contains(&string) {
|
||||
// Create if it didn't exist
|
||||
let mut options = rocksdb::Options::default();
|
||||
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
|
||||
options.set_prefix_extractor(prefix_extractor);
|
||||
|
||||
let _ = self.rocks.create_cf(&string, &options);
|
||||
println!("created cf");
|
||||
}
|
||||
|
||||
Some(Box::new(RocksDBCF {
|
||||
db: self,
|
||||
name: string,
|
||||
}))
|
||||
}
|
||||
|
||||
fn names<'a>(&'a self) -> Vec<Vec<u8>> {
|
||||
self.old_cfs.iter().map(|v| v.as_bytes().to_vec()).collect()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RocksDBCF<'a> {
|
||||
db: &'a mut RocksDB,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl RocksDBCF<'_> {
|
||||
fn cf(&self) -> rocksdb::BoundColumnFamily<'_> {
|
||||
self.db.rocks.cf_handle(&self.name).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'r> Segment for RocksDBCF<'r> {
|
||||
fn batch_insert<'a>(
|
||||
&'a mut self,
|
||||
batch: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>,
|
||||
) -> anyhow::Result<()> {
|
||||
let cf = self.cf();
|
||||
for (key, value) in batch {
|
||||
self.db.rocks.put_cf(cf, key, value)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_iter(&mut self) -> Box<dyn super::SegmentIter + '_> {
|
||||
Box::new(RocksDBCFIter(self))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RocksDBCFIter<'a>(&'a RocksDBCF<'a>);
|
||||
|
||||
impl super::SegmentIter for RocksDBCFIter<'_> {
|
||||
fn iter<'a>(&'a mut self) -> super::KVIter<'a> {
|
||||
Box::new(
|
||||
self.0
|
||||
.db
|
||||
.rocks
|
||||
.iterator_cf(self.0.cf(), rocksdb::IteratorMode::Start)
|
||||
.map(|(k, v)| (Vec::from(k), Vec::from(v))),
|
||||
)
|
||||
}
|
||||
}
|
|
@ -1,5 +1,7 @@
|
|||
use clap::{App, Arg};
|
||||
use conduit_iface::db::{self, copy_database, heed::HeedDB, sled::SledDB, sqlite::SqliteDB};
|
||||
use conduit_iface::db::{
|
||||
self, copy_database, heed::HeedDB, rocksdb::RocksDB, sled::SledDB, sqlite::SqliteDB,
|
||||
};
|
||||
use std::{
|
||||
ops::{Deref, DerefMut},
|
||||
path::{Path, PathBuf},
|
||||
|
@ -9,6 +11,7 @@ enum Database {
|
|||
Sled(SledDB),
|
||||
Sqlite(SqliteDB),
|
||||
Heed(HeedDB),
|
||||
Rocks(RocksDB),
|
||||
}
|
||||
|
||||
impl Database {
|
||||
|
@ -17,6 +20,7 @@ impl Database {
|
|||
"sled" => Self::Sled(SledDB::new(db::sled::new_db(path)?)),
|
||||
"heed" => Self::Heed(HeedDB::new(db::heed::new_db(path)?)),
|
||||
"sqlite" => Self::Sqlite(SqliteDB::new(db::sqlite::new_conn(path)?)),
|
||||
"rocks" => Self::Rocks(db::rocksdb::new_conn(path)?),
|
||||
_ => panic!("unknown database type: {}", name),
|
||||
})
|
||||
}
|
||||
|
@ -30,6 +34,7 @@ impl Deref for Database {
|
|||
Database::Sled(db) => db,
|
||||
Database::Sqlite(db) => db,
|
||||
Database::Heed(db) => db,
|
||||
Database::Rocks(db) => db,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -40,11 +45,12 @@ impl DerefMut for Database {
|
|||
Database::Sled(db) => db,
|
||||
Database::Sqlite(db) => db,
|
||||
Database::Heed(db) => db,
|
||||
Database::Rocks(db) => db,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const DATABASES: &[&str] = &["heed", "sqlite", "sled"];
|
||||
const DATABASES: &[&str] = &["heed", "sqlite", "sled", "rocks"];
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let matches = App::new("Conduit Sled to Sqlite Migrator")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue