refactor, add heed, add migrate tool

This commit is contained in:
Jonathan de Jong 2021-07-30 20:58:22 +02:00
parent 03305cd144
commit df8d3c95de
9 changed files with 673 additions and 62 deletions

View file

@ -1,3 +1,4 @@
pub mod heed;
pub mod sled;
pub mod sqlite;
@ -8,7 +9,7 @@ pub type KVIter<'a> = Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
pub type TreeKVIter<'a> = Box<dyn Iterator<Item = (Vec<u8>, KVIter<'a>)> + 'a>;
/// A key-value store that is divided into named segments.
pub trait Database {
/// Returns an iterator of `(name, key/value iterator)` pairs, as described by `TreeKVIter`.
fn iter<'a>(&'a self) -> TreeKVIter<'a>;
/// Returns the segment names as raw bytes.
fn names<'a>(&'a self) -> Vec<Vec<u8>>;
/// Returns a handle to the segment called `name`, or `None` when the backend cannot provide one.
fn segment<'a>(&'a mut self, name: Vec<u8>) -> Option<Box<dyn Segment + 'a>>; // TODO: change return type to Result
}
@ -18,25 +19,42 @@ pub trait Segment {
&'a mut self,
batch: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>,
) -> anyhow::Result<()>;
fn get_iter<'a>(&'a mut self) -> Box<dyn SegmentIter + 'a>;
}
/// A holder object from which a key/value iterator over one segment can be borrowed.
pub trait SegmentIter {
/// Returns a boxed iterator of `(key, value)` byte pairs that borrows from `self`.
fn iter<'a>(&'a mut self) -> KVIter<'a>;
}
pub fn copy_database(
src: &impl Database,
dst: &mut impl Database,
src: &mut dyn Database,
dst: &mut dyn Database,
chunk_size: usize,
) -> anyhow::Result<()> {
for (tree, i) in src.iter() {
dbg!(&tree);
// todo remove unwraps
for seg_name in src.names() {
drop(dbg!(String::from_utf8(seg_name.clone())));
let mut t = dst.segment(tree).unwrap(); // todo remove unwrap
let mut src_seg = src.segment(seg_name.clone()).unwrap();
let mut dst_seg = dst.segment(seg_name).unwrap();
let mut src_seg_iter = src_seg.get_iter();
let i = src_seg_iter.iter();
let mut x: usize = 0;
for chunk in &i.chunks(chunk_size) {
dbg!(&x);
t.batch_insert(Box::new(chunk))?;
dst_seg.batch_insert(Box::new(chunk))?;
x += chunk_size;
}
drop(dst_seg);
drop(src_seg_iter);
}
Ok(())

106
tools/iface/src/db/heed.rs Normal file
View file

@ -0,0 +1,106 @@
use super::{Database, KVIter, Segment, SegmentIter};
use heed::UntypedDatabase;
use itertools::Itertools;
use std::path::Path;
use thiserror::Error;
/// Error reported for heed failures; it carries only the textual rendering of the
/// underlying `heed::Error`.
#[derive(Error, Debug)]
#[error("There was a problem with the connection to the heed database: {0}")]
pub struct HeedError(String);
// Converts a `heed::Error` by storing its `to_string()` output, so `?` can be used
// in functions that return `HeedError`.
impl From<heed::Error> for HeedError {
fn from(err: heed::Error) -> Self {
Self(err.to_string())
}
}
/// Opens the heed environment located at `path`.
///
/// The environment is configured with a 1 TiB memory map, at most 126 readers and
/// at most 128 named databases.
///
/// # Errors
///
/// Returns a [`HeedError`] carrying the message of the underlying `heed::Error`
/// when the environment cannot be opened.
pub fn new_db<P: AsRef<Path>>(path: P) -> Result<heed::Env, HeedError> {
    // 1 TiB. The previous value (`1024 * 1024 * 1024`) was only 1 GiB even though
    // the accompanying comment stated one terabyte.
    // NOTE(review): this constant does not fit in a 32-bit `usize`; a 64-bit target
    // is assumed.
    const MAP_SIZE: usize = 1024 * 1024 * 1024 * 1024;

    let mut env_builder = heed::EnvOpenOptions::new();
    env_builder.map_size(MAP_SIZE);
    env_builder.max_readers(126);
    env_builder.max_dbs(128);
    Ok(env_builder.open(path)?)
}
/// Newtype around a `heed::Env` on which the `Database` trait is implemented.
pub struct HeedDB(heed::Env);
impl HeedDB {
/// Wraps an already opened environment.
pub fn new(env: heed::Env) -> Self {
Self(env)
}
}
impl Database for HeedDB {
    /// Returns a segment for the named database, obtained through `create_database`.
    ///
    /// Yields `None` when `name` is not valid UTF-8 or when heed reports an error.
    fn segment<'a>(&'a mut self, name: Vec<u8>) -> Option<Box<dyn super::Segment + 'a>> {
        let name = String::from_utf8(name).ok()?;
        let db: UntypedDatabase = self.0.create_database(Some(name.as_str())).ok()?;
        let segment = HeedSegment {
            env: self.0.clone(),
            db,
        };
        Some(Box::new(segment))
    }

    /// Collects the keys of the unnamed database that can also be opened as named
    /// databases.
    ///
    /// Entries that fail to read, are not valid UTF-8, or cannot be opened are skipped.
    ///
    /// # Panics
    ///
    /// Panics if the unnamed database cannot be opened, or if the read transaction
    /// or the iterator cannot be created.
    fn names<'a>(&'a self) -> Vec<Vec<u8>> {
        let root: UntypedDatabase = self.0.open_database(None).unwrap().unwrap();
        let txn = self.0.read_txn().unwrap();
        let entries = root.iter(&txn).unwrap();

        let mut names = Vec::new();
        for entry in entries {
            let key = match entry {
                Ok((key, _)) => key,
                Err(_) => continue,
            };
            let name = match std::str::from_utf8(key) {
                Ok(name) => name,
                Err(_) => continue,
            };
            // Only keep keys that really resolve to a database.
            let opened: Option<UntypedDatabase> = self.0.open_database(Some(name)).ok().flatten();
            if opened.is_some() {
                names.push(key.to_vec());
            }
        }
        names
    }
}
/// One named heed database together with a handle to the environment it was opened from.
pub struct HeedSegment {
// Environment handle; used to start transactions.
env: heed::Env,
// The database that reads and writes are issued against.
db: heed::UntypedDatabase,
}
impl Segment for HeedSegment {
fn batch_insert<'a>(
&'a mut self,
batch: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>,
) -> anyhow::Result<()> {
let mut txn = self.env.write_txn().unwrap();
for (k, v) in batch {
self.db.put(&mut txn, &k.as_slice(), &v.as_slice()).unwrap();
}
txn.commit().unwrap();
Ok(())
}
fn get_iter<'a>(&'a mut self) -> Box<dyn super::SegmentIter + 'a> {
todo!()
}
}
/// Pairs a read transaction with the database it should be used to iterate.
struct HeedSegmentIter<'a>(heed::RoTxn<'a>, &'a heed::UntypedDatabase);

impl SegmentIter for HeedSegmentIter<'_> {
    /// Iterates over the database within the held transaction, copying each key and
    /// value into owned vectors. Entries that fail to read are skipped.
    ///
    /// # Panics
    ///
    /// Panics if the iterator cannot be created.
    fn iter<'a>(&'a mut self) -> KVIter<'a> {
        let entries = self.1.iter(&self.0).unwrap();
        Box::new(entries.filter_map(|entry| {
            entry
                .ok()
                .map(|(key, value)| (key.to_vec(), value.to_vec()))
        }))
    }
}

View file

@ -1,8 +1,7 @@
use std::path::Path;
use super::{Database, KVIter, Segment, SegmentIter};
use itertools::Itertools;
use sled::{Batch, Config, Db, Result, Tree};
use super::{Database, KVIter, Segment, TreeKVIter};
use std::path::Path;
pub fn new_db<P: AsRef<Path>>(path: P) -> Result<Db> {
Config::default().path(path).use_compression(true).open()
@ -17,31 +16,12 @@ impl SledDB {
}
impl Database for SledDB {
fn iter<'a>(&'a self) -> TreeKVIter<'a> {
Box::new(
self.0
.tree_names()
.into_iter()
.map(|v| v.to_vec())
.filter_map(move |v| {
if let Ok(t) = self.0.open_tree(&v) {
Some((v, t))
} else {
None
}
})
.map(|(v, t): (Vec<u8>, Tree)| -> (Vec<u8>, KVIter<'a>) {
let i = t.into_iter().filter_map(|r| {
if let Ok(t) = r {
Some((t.0.to_vec(), t.1.to_vec()))
} else {
None
}
});
(v, Box::new(i))
}),
)
fn names<'a>(&'a self) -> Vec<Vec<u8>> {
self.0
.tree_names()
.into_iter()
.map(|v| v.to_vec())
.collect_vec()
}
fn segment(&mut self, name: Vec<u8>) -> Option<Box<dyn Segment>> {
@ -65,4 +45,22 @@ impl Segment for Tree {
self.apply_batch(sled_batch).map_err(Into::into)
}
/// Wraps a mutable borrow of this tree in a `SledTreeIter`, from which the actual
/// key/value iterator can then be obtained.
fn get_iter<'a>(&'a mut self) -> Box<dyn super::SegmentIter + 'a> {
Box::new(SledTreeIter(self))
}
}
/// Borrowed handle to a sled tree, used as the source of a key/value iterator.
struct SledTreeIter<'a>(&'a mut Tree);

impl SegmentIter for SledTreeIter<'_> {
    /// Iterates over the tree, copying each key and value into owned vectors.
    /// Entries that fail to read are skipped.
    fn iter<'a>(&'a mut self) -> KVIter<'a> {
        let entries = self.0.iter();
        Box::new(entries.filter_map(|entry| {
            entry
                .ok()
                .map(|(key, value)| (key.to_vec(), value.to_vec()))
        }))
    }
}

View file

@ -1,8 +1,8 @@
use std::path::Path;
use itertools::Itertools;
use rusqlite::{self, Connection, DatabaseName::Main, Statement};
use std::{collections::HashSet, iter::FromIterator, path::Path};
use rusqlite::{self, Connection, DatabaseName::Main};
use super::{Database, Segment};
use super::{Database, KVIter, Segment, SegmentIter};
pub fn new_conn<P: AsRef<Path>>(path: P) -> rusqlite::Result<Connection> {
let path = path.as_ref().join("conduit.db");
@ -13,37 +13,71 @@ pub fn new_conn<P: AsRef<Path>>(path: P) -> rusqlite::Result<Connection> {
Ok(conn)
}
pub struct SqliteDB(Connection);
pub struct SqliteDB {
conn: Connection,
}
impl SqliteDB {
/// Column names a table must have, compared as a set, to pass `SqliteDB::test_table`.
const CORRECT_TABLE_SET: &[&str] = &["key", "value"];
impl<'a> SqliteDB {
pub fn new(conn: Connection) -> Self {
Self(conn)
Self { conn }
}
/// Lists the tables recorded in `sqlite_master` that pass `test_table`.
///
/// # Panics
///
/// Panics if the statement cannot be prepared or run, or if a row cannot be read
/// as a string.
fn valid_tables(&self) -> Vec<String> {
    let mut stmt = self
        .conn
        .prepare("SELECT name FROM sqlite_master WHERE type='table'")
        .unwrap();
    let rows = stmt.query_map([], |row| row.get(0)).unwrap();

    let mut tables: Vec<String> = Vec::new();
    for row in rows {
        let table: String = row.unwrap();
        if self.test_table(&table) {
            tables.push(table);
        }
    }
    tables
}
fn test_table(&self, table: &String) -> bool {
let set: HashSet<String> = self
.conn
.prepare("SELECT name FROM pragma_table_info(?)")
.unwrap()
.query_map([table], |row| row.get(0))
.unwrap()
.map(|r| r.unwrap())
.collect();
set == HashSet::from_iter(CORRECT_TABLE_SET.iter().map(|s| s.to_string()))
}
}
impl Database for SqliteDB {
fn iter<'a>(&'a self) -> super::TreeKVIter<'a> {
todo!("iterate over tables, pick only tables that have columns 'key' and 'value', then iterate over that with values")
fn names<'a>(&'a self) -> Vec<Vec<u8>> {
self.valid_tables().into_iter().map_into().collect_vec()
}
fn segment<'a>(&'a mut self, name: Vec<u8>) -> Option<Box<dyn Segment + 'a>> {
let string = String::from_utf8(name).unwrap();
// taken from src/database/abstraction/sqlite.rs
self.0.execute(format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", &string).as_str(), []).unwrap();
self.conn.execute(format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", &string).as_str(), []).unwrap();
Some(Box::new(SqliteTable(&mut self.0, string)))
Some(Box::new(SqliteSegment {
conn: &mut self.conn,
name: string,
}))
}
}
pub struct SqliteTable<'a>(&'a mut Connection, String);
pub struct SqliteSegment<'a> {
conn: &'a mut Connection,
name: String,
}
impl Segment for SqliteTable<'_> {
impl Segment for SqliteSegment<'_> {
fn batch_insert(
&mut self,
batch: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + '_>,
) -> anyhow::Result<()> {
let tx = self.0.transaction()?;
let sql_s = format!("INSERT INTO {} (key, value) VALUES (?, ?)", &self.1);
let tx = self.conn.transaction()?;
let sql_s = format!("INSERT INTO {} (key, value) VALUES (?, ?)", &self.name);
let sql = sql_s.as_str();
for (k, v) in batch {
@ -52,4 +86,25 @@ impl Segment for SqliteTable<'_> {
tx.commit().map_err(Into::into)
}
/// Prepares a `SELECT key, value` statement over this segment's table and wraps it
/// in a `SqliteSegmentIter`.
///
/// # Panics
///
/// Panics if the statement cannot be prepared.
fn get_iter(&mut self) -> Box<dyn super::SegmentIter + '_> {
    let query = format!("SELECT key, value FROM {}", self.name);
    let statement = self.conn.prepare(&query).unwrap();
    Box::new(SqliteSegmentIter(statement))
}
}
struct SqliteSegmentIter<'a>(Statement<'a>);
impl SegmentIter for SqliteSegmentIter<'_> {
fn iter<'f>(&'f mut self) -> KVIter<'f> {
Box::new(
self.0
.query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
.map(|r| r.unwrap()),
)
}
}