Mirror of https://github.com/ShadowJonathan/conduit_toolbox.git (synced 2025-07-13 05:18:16 +03:00)

Merge pull request #11 from CobaltCause/discard-malformed-sqlite-rows

Commit 82c4c82b43
3 changed files with 60 additions and 13 deletions
@@ -15,6 +15,11 @@ pub type KVIter<'a> = Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
 
 pub type TreeKVIter<'a> = Box<dyn Iterator<Item = (Vec<u8>, KVIter<'a>)> + 'a>;
 
+#[derive(Clone, Copy)]
+pub struct Config {
+    ignore_broken_rows: bool,
+}
+
 pub trait Database {
     fn names<'a>(&'a self) -> Vec<Vec<u8>>;
 
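The hunk above adds the `Config` type that carries the new `ignore_broken_rows` flag and derives `Clone` and `Copy` for it, so each segment can keep its own copy and later move one into a row-processing closure without any borrowing. Below is a minimal, self-contained sketch of why `Copy` matters for that pattern; the row type, `keep_good_rows`, and the sample data are illustrative stand-ins, not code from the toolbox.

#[derive(Clone, Copy)]
struct Config {
    ignore_broken_rows: bool,
}

// Drop rows that failed to decode, or abort, depending on the flag.
fn keep_good_rows(
    rows: Vec<Result<(Vec<u8>, Vec<u8>), ()>>,
    config: Config,
) -> Vec<(Vec<u8>, Vec<u8>)> {
    rows.into_iter()
        // `move` copies `config` into the closure because Config is Copy,
        // so the caller's copy of the flag stays usable.
        .filter_map(move |row| {
            let Ok(row) = row else {
                if config.ignore_broken_rows {
                    println!("ignored a malformed row");
                    return None;
                }
                panic!("malformed row");
            };
            Some(row)
        })
        .collect()
}

fn main() {
    let config = Config { ignore_broken_rows: true };
    let rows = vec![Ok((b"k".to_vec(), b"v".to_vec())), Err(())];
    assert_eq!(keep_good_rows(rows, config).len(), 1);
    assert!(config.ignore_broken_rows); // still valid: it was copied, not moved
}

The same copy-then-move shape is what the SQLite iterator below relies on when it captures the flag inside `filter_map`.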
@@ -2,7 +2,7 @@ use itertools::Itertools;
 use rusqlite::{self, Connection, DatabaseName::Main, Statement};
 use std::{collections::HashSet, iter::FromIterator, path::Path};
 
-use super::{Database, KVIter, Segment, SegmentIter};
+use super::{Config, Database, KVIter, Segment, SegmentIter};
 
 pub fn new_conn<P: AsRef<Path>>(path: P) -> rusqlite::Result<Connection> {
     let path = path.as_ref().join("conduit.db");
@@ -15,13 +15,14 @@ pub fn new_conn<P: AsRef<Path>>(path: P) -> rusqlite::Result<Connection> {
 
 pub struct SqliteDB {
     conn: Connection,
+    config: Config,
 }
 
 const CORRECT_TABLE_SET: &[&str] = &["key", "value"];
 
 impl<'a> SqliteDB {
-    pub fn new(conn: Connection) -> Self {
-        Self { conn }
+    pub fn new(conn: Connection, config: Config) -> Self {
+        Self { conn, config }
     }
 
     fn valid_tables(&self) -> Vec<String> {
@@ -62,6 +63,7 @@ impl Database for SqliteDB {
         Some(Box::new(SqliteSegment {
             conn: &mut self.conn,
             name: string,
+            config: self.config,
         }))
     }
 
@@ -73,6 +75,7 @@ impl Database for SqliteDB {
 pub struct SqliteSegment<'a> {
     conn: &'a mut Connection,
     name: String,
+    config: Config,
 }
 
 impl Segment for SqliteSegment<'_> {
@@ -92,23 +95,52 @@ impl Segment for SqliteSegment<'_> {
     }
 
     fn get_iter(&mut self) -> Box<dyn super::SegmentIter + '_> {
-        Box::new(SqliteSegmentIter(
-            self.conn
+        Box::new(SqliteSegmentIter {
+            statement: self
+                .conn
                 .prepare(format!("SELECT key, value FROM {}", self.name).as_str())
                 .unwrap(),
-        ))
+            config: self.config,
+        })
     }
 }
 
-struct SqliteSegmentIter<'a>(Statement<'a>);
+struct SqliteSegmentIter<'a> {
+    statement: Statement<'a>,
+    config: Config,
+}
 
 impl SegmentIter for SqliteSegmentIter<'_> {
     fn iter<'f>(&'f mut self) -> KVIter<'f> {
+        let config = self.config;
+
         Box::new(
-            self.0
-                .query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
+            self.statement
+                .query_map([], |row| Ok((row.get(0), row.get(1))))
                 .unwrap()
-                .map(|r| r.unwrap()),
+                .map(|x| x.unwrap())
+                .filter_map(move |(k, v)| {
+                    let advice = "You could try using `--ignore-broken-rows` to complete the migration, but take note of its caveats.";
+                    let Ok(k) = k else {
+                        if config.ignore_broken_rows {
+                            println!("ignored a row because its key is malformed");
+                        } else {
+                            panic!("This row has a malformed key. {}", advice);
+                        }
+                        return None;
+                    };
+
+                    let Ok(v) = v else {
+                        if config.ignore_broken_rows {
+                            println!("ignored a row because its value is malformed");
+                        } else {
+                            panic!("This row has a malformed value. {}", advice);
+                        }
+                        return None;
+                    };
+
+                    Some((k, v))
+                }),
         )
     }
 }
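The core of the SQLite change is in `iter`: `row.get_unwrap(n)` panics as soon as a column fails to decode, whereas `row.get(n)` returns a `rusqlite::Result`, so each half of the row can be inspected in `filter_map` and either skipped or turned into a deliberate panic with advice attached. The following is a rough, self-contained illustration of that difference against a throwaway in-memory table; the table name, sample data, and the decode-failure trigger (an INTEGER where a BLOB is expected) are assumptions for the demo, not Conduit's actual schema.

use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute("CREATE TABLE t (key, value)", [])?;
    // A well-formed row: both columns are BLOBs, as the migrator expects.
    conn.execute("INSERT INTO t VALUES (x'6b31', x'7631')", [])?;
    // A "malformed" row for this demo: the key is an INTEGER, so decoding it
    // as Vec<u8> yields an error instead of a panic when using `get`.
    conn.execute("INSERT INTO t VALUES (42, x'7632')", [])?;

    let mut stmt = conn.prepare("SELECT key, value FROM t")?;

    // With plain `get`, decode errors become inspectable values instead of
    // tearing down the whole scan the way `get_unwrap` would.
    let rows: Vec<(Vec<u8>, Vec<u8>)> = stmt
        .query_map([], |row| {
            Ok((row.get::<_, Vec<u8>>(0), row.get::<_, Vec<u8>>(1)))
        })?
        .map(|r| r.unwrap())
        .filter_map(|(k, v)| match (k, v) {
            (Ok(k), Ok(v)) => Some((k, v)),
            _ => {
                println!("ignored a row because it is malformed");
                None
            }
        })
        .collect();

    assert_eq!(rows.len(), 1); // the INTEGER-keyed row was discarded
    Ok(())
}

In the patched iterator the same choice is driven by `config.ignore_broken_rows`; this sketch simply hard-codes the skipping branch.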
@@ -1,5 +1,5 @@
 use clap::{App, Arg};
-use conduit_iface::db::{self, copy_database};
+use conduit_iface::db::{self, copy_database, Config};
 use std::{
     ops::{Deref, DerefMut},
     path::{Path, PathBuf},
@@ -19,14 +19,17 @@ enum Database {
 }
 
 impl Database {
-    fn new(name: &str, path: PathBuf) -> anyhow::Result<Self> {
+    fn new(name: &str, path: PathBuf, config: Config) -> anyhow::Result<Self> {
         Ok(match name {
             #[cfg(feature = "sled")]
             "sled" => Self::Sled(db::sled::SledDB::new(db::sled::new_db(path)?)),
             #[cfg(feature = "heed")]
             "heed" => Self::Heed(db::heed::HeedDB::new(db::heed::new_db(path)?)),
             #[cfg(feature = "sqlite")]
-            "sqlite" => Self::Sqlite(db::sqlite::SqliteDB::new(db::sqlite::new_conn(path)?)),
+            "sqlite" => Self::Sqlite(db::sqlite::SqliteDB::new(
+                db::sqlite::new_conn(path)?,
+                config,
+            )),
             #[cfg(feature = "rocksdb")]
             "rocks" => Self::Rocks(db::rocksdb::new_conn(path)?),
             #[cfg(feature = "persy")]
@@ -129,6 +132,11 @@ fn main() -> anyhow::Result<()> {
                 .takes_value(true)
                 .required(true),
         )
+        .arg(
+            Arg::with_name("ignore_broken_rows")
+                .long("ignore-broken-rows")
+                .long_help("Lossy migration methodology if parts of the database are malformed due to e.g. improper manual database surgery. Currently only applies to SQLite.")
+        )
         .get_matches();
 
     let src_dir = matches.value_of("from_dir").unwrap_or(".");
@@ -155,6 +163,8 @@ fn main() -> anyhow::Result<()> {
 
     dbg!(&src_dir, &dst_dir);
 
+    let ignore_broken_rows = matches.is_present("ignore_broken_rows");
+
     let mut src_db = Database::new(matches.value_of("from").unwrap(), src_dir)?;
 
     let mut dst_db = Database::new(matches.value_of("to").unwrap(), dst_dir)?;
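Finally, the binary gains the `--ignore-broken-rows` switch. It is declared without `takes_value`, so it is a bare presence flag that `is_present` turns into a boolean. Below is a stripped-down sketch of declaring and reading such a flag with the same clap 2-style builder API; the app name and help string are placeholders, and feeding the boolean into `Database::new` as a `Config` happens outside the hunks shown on this page.

use clap::{App, Arg};

fn main() {
    let matches = App::new("migrate-sketch") // placeholder name, not the real binary
        .arg(
            Arg::with_name("ignore_broken_rows")
                .long("ignore-broken-rows")
                .long_help("Skip rows whose key or value fails to decode instead of aborting."),
        )
        .get_matches();

    // A flag without a value: its mere presence switches the behaviour on.
    let ignore_broken_rows = matches.is_present("ignore_broken_rows");

    if ignore_broken_rows {
        println!("malformed rows will be skipped");
    } else {
        println!("a malformed row will abort the migration");
    }
}

The trade-off the long help hints at is that skipped rows are simply lost, which is why the iterator's panic message points at the flag as an opt-in rather than the default behaviour.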