mirror of
https://github.com/fluencelabs/llamadb
synced 2025-05-16 17:21:21 +00:00
Query plan execution now works, for the most part
This commit is contained in:
parent
5ce2f7e56f
commit
372b0fca42
@ -2,7 +2,7 @@ use databaseinfo::DatabaseInfo;
|
||||
|
||||
pub trait DatabaseStorage {
|
||||
type Info: DatabaseInfo;
|
||||
type ScanTableRowIterator: Iterator<Item=Box<[<Self::Info as DatabaseInfo>::ColumnValue]>>;
|
||||
|
||||
fn scan_table(&self, table: &<Self::Info as DatabaseInfo>::Table) -> Self::ScanTableRowIterator;
|
||||
fn scan_table<'a>(&'a self, table: &'a <Self::Info as DatabaseInfo>::Table)
|
||||
-> Box<Iterator<Item=Box<[<Self::Info as DatabaseInfo>::ColumnValue]>> + 'a>;
|
||||
}
|
||||
|
18
src/lib.rs
18
src/lib.rs
@ -1,25 +1,27 @@
|
||||
#![feature(core, old_io, old_path, os, collections, unicode)]
|
||||
#![feature(core, old_io, collections)]
|
||||
#![allow(unused_variables, dead_code)]
|
||||
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
pub mod btree;
|
||||
pub mod pager;
|
||||
pub mod pagermemory;
|
||||
pub mod pagerstream;
|
||||
// pub mod btree;
|
||||
// pub mod pager;
|
||||
// pub mod pagermemory;
|
||||
// pub mod pagerstream;
|
||||
pub mod sqlsyntax;
|
||||
pub mod tempdb;
|
||||
|
||||
mod byteutils;
|
||||
mod columnvalueops;
|
||||
mod databaseinfo;
|
||||
mod databasestorage;
|
||||
mod identifier;
|
||||
mod queryplan;
|
||||
mod types;
|
||||
|
||||
pub use self::pager::Pager;
|
||||
pub use self::pagermemory::PagerMemory;
|
||||
pub use self::pagerstream::PagerStream;
|
||||
// pub use self::pager::Pager;
|
||||
// pub use self::pagermemory::PagerMemory;
|
||||
// pub use self::pagerstream::PagerStream;
|
||||
|
||||
pub enum SQLError {
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
use columnvalueops::ColumnValueOps;
|
||||
use databaseinfo::DatabaseInfo;
|
||||
use databasestorage::DatabaseStorage;
|
||||
use super::sexpression::SExpression;
|
||||
use super::sexpression::{BinaryOp, SExpression};
|
||||
|
||||
struct Source<'a, ColumnValue: Sized + 'static> {
|
||||
parent: Option<&'a Source<'a, ColumnValue>>,
|
||||
@ -41,17 +41,17 @@ where <Storage::Info as DatabaseInfo>::Table: 'a
|
||||
}
|
||||
}
|
||||
|
||||
pub fn execute_query_plan<'b, R>(&self, expr: &SExpression<'a, Storage::Info>, result_cb: R)
|
||||
pub fn execute_query_plan<'b, 'c>(&self, expr: &SExpression<'a, Storage::Info>,
|
||||
result_cb: &'c mut FnMut(&[<Storage::Info as DatabaseInfo>::ColumnValue]) -> Result<(), ()>)
|
||||
-> Result<(), ()>
|
||||
where R: Fn(&[<Storage::Info as DatabaseInfo>::ColumnValue]) -> Result<(), ()>
|
||||
{
|
||||
self.execute(expr, &result_cb, None)
|
||||
self.execute(expr, result_cb, None)
|
||||
}
|
||||
|
||||
fn execute<'b, 'c, R>(&self, expr: &SExpression<'a, Storage::Info>, result_cb: &'c R,
|
||||
fn execute<'b, 'c>(&self, expr: &SExpression<'a, Storage::Info>,
|
||||
result_cb: &'c mut FnMut(&[<Storage::Info as DatabaseInfo>::ColumnValue]) -> Result<(), ()>,
|
||||
source: Option<&Source<'b, <Storage::Info as DatabaseInfo>::ColumnValue>>)
|
||||
-> Result<(), ()>
|
||||
where R: Fn(&[<Storage::Info as DatabaseInfo>::ColumnValue]) -> Result<(), ()>
|
||||
{
|
||||
match expr {
|
||||
&SExpression::Scan { table, source_id, ref yield_fn } => {
|
||||
@ -68,7 +68,7 @@ where <Storage::Info as DatabaseInfo>::Table: 'a
|
||||
Ok(())
|
||||
},
|
||||
&SExpression::Map { source_id, ref yield_in_fn, ref yield_out_fn } => {
|
||||
self.execute(yield_in_fn, &|row| {
|
||||
self.execute(yield_in_fn, &mut |row| {
|
||||
let new_source = Source {
|
||||
parent: source,
|
||||
source_id: source_id,
|
||||
@ -122,11 +122,44 @@ where <Storage::Info as DatabaseInfo>::Table: 'a
|
||||
let r = try!(self.resolve_value(rhs, source));
|
||||
|
||||
Ok(match op {
|
||||
// Equal => l.equal(&r),
|
||||
// NotEqual => l.not_equal(&r),
|
||||
BinaryOp::Equal => l.equals(&r),
|
||||
BinaryOp::NotEqual => l.not_equals(&r),
|
||||
BinaryOp::And => l.and(&r),
|
||||
BinaryOp::Or => l.or(&r),
|
||||
BinaryOp::Concatenate => l.concat(&r),
|
||||
_ => unimplemented!()
|
||||
})
|
||||
},
|
||||
&SExpression::Map { source_id, ref yield_in_fn, ref yield_out_fn } => {
|
||||
trace!("resolve_value; map {}", source_id);
|
||||
|
||||
// yield_in_fn is expected to yield exactly one row
|
||||
// yield_out_fn is expected to return a single resolved value
|
||||
let mut r = None;
|
||||
let mut row_count = 0;
|
||||
|
||||
try!(self.execute(yield_in_fn, &mut |row| {
|
||||
if row_count == 0 {
|
||||
r = Some(row.to_vec());
|
||||
}
|
||||
row_count += 1;
|
||||
Ok(())
|
||||
}, source));
|
||||
|
||||
if row_count == 1 {
|
||||
let row = r.unwrap();
|
||||
|
||||
let new_source = Source {
|
||||
parent: source,
|
||||
source_id: source_id,
|
||||
row: &row
|
||||
};
|
||||
|
||||
self.resolve_value(yield_out_fn, Some(&new_source))
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
},
|
||||
_ => Err(())
|
||||
}
|
||||
}
|
||||
|
@ -6,7 +6,9 @@
|
||||
use std::borrow::Cow;
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use columnvalueops::ColumnValueOps;
|
||||
use databaseinfo::{DatabaseInfo, TableInfo, ColumnInfo};
|
||||
use databasestorage::DatabaseStorage;
|
||||
use identifier::Identifier;
|
||||
use types::{DbType, Variant};
|
||||
use sqlsyntax::ast;
|
||||
@ -39,6 +41,60 @@ impl DatabaseInfo for TempDb {
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseStorage for TempDb {
|
||||
type Info = TempDb;
|
||||
|
||||
fn scan_table<'a>(&'a self, table: &'a Table)
|
||||
-> Box<Iterator<Item=Box<[Variant]>> + 'a>
|
||||
{
|
||||
let columns: &'a [self::table::Column] = &table.columns;
|
||||
|
||||
Box::new(table.rowid_index.iter().map(move |key_v| {
|
||||
use byteutils;
|
||||
use std::borrow::IntoCow;
|
||||
|
||||
let raw_key: &[u8] = &key_v;
|
||||
trace!("KEY: {:?}", raw_key);
|
||||
|
||||
let variable_column_count = columns.iter().filter(|column| {
|
||||
column.dbtype.is_variable_length()
|
||||
}).count();
|
||||
|
||||
let variable_lengths: Vec<_> = (0..variable_column_count).map(|i| {
|
||||
let o = raw_key.len() - (variable_column_count + i) * 8;
|
||||
byteutils::read_udbinteger(&raw_key[o..o+8])
|
||||
}).collect();
|
||||
|
||||
trace!("variable lengths: {:?}", variable_lengths);
|
||||
|
||||
let _rowid: u64 = byteutils::read_udbinteger(&raw_key[0..8]);
|
||||
|
||||
let mut variable_length_offset = 0;
|
||||
let mut key_offset = 8;
|
||||
|
||||
let v: Vec<_> = table.columns.iter().map(|column| {
|
||||
let size = match column.dbtype.get_fixed_length() {
|
||||
Some(l) => l as usize,
|
||||
None => {
|
||||
let l = variable_lengths[variable_length_offset];
|
||||
variable_length_offset += 1;
|
||||
l as usize
|
||||
}
|
||||
};
|
||||
|
||||
let bytes = &raw_key[key_offset..key_offset + size];
|
||||
|
||||
trace!("from bytes: {:?}, {:?}", column.dbtype, bytes);
|
||||
let value = ColumnValueOps::from_bytes(column.dbtype, bytes.into_cow()).unwrap();
|
||||
key_offset += size;
|
||||
value
|
||||
}).collect();
|
||||
|
||||
v.into_boxed_slice()
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl TempDb {
|
||||
pub fn new() -> TempDb {
|
||||
TempDb {
|
||||
@ -163,12 +219,19 @@ impl TempDb {
|
||||
}
|
||||
|
||||
fn select(&self, stmt: ast::SelectStatement) -> ExecuteStatementResult {
|
||||
use queryplan::QueryPlan;
|
||||
use queryplan::{ExecuteQueryPlan, QueryPlan};
|
||||
|
||||
let plan = try!(QueryPlan::compile_select(self, stmt).map_err(|e| format!("{}", e)));
|
||||
|
||||
debug!("{}", plan);
|
||||
|
||||
let execute = ExecuteQueryPlan::new(self);
|
||||
let result = execute.execute_query_plan(&plan.expr, &mut |rows| {
|
||||
debug!("ROW: {:?}", rows);
|
||||
Ok(())
|
||||
});
|
||||
|
||||
trace!("{:?}", result);
|
||||
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
@ -203,7 +266,6 @@ impl TempDb {
|
||||
|
||||
fn ast_expression_to_data(expr: &ast::Expression, column_type: DbType, buf: &mut Vec<u8>) {
|
||||
use sqlsyntax::ast::Expression::*;
|
||||
use columnvalueops::ColumnValueOps;
|
||||
use std::borrow::IntoCow;
|
||||
|
||||
let value: Variant = match expr {
|
||||
|
@ -97,14 +97,21 @@ impl DbType {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_variable_length(&self) -> bool {
|
||||
pub fn get_fixed_length(&self) -> Option<u64> {
|
||||
match self {
|
||||
&DbType::Null => false,
|
||||
&DbType::ByteDynamic => true,
|
||||
&DbType::ByteFixed(_) => false,
|
||||
&DbType::Integer {..} => false,
|
||||
&DbType::F64 => false,
|
||||
&DbType::String => true
|
||||
&DbType::Null => Some(0),
|
||||
&DbType::ByteDynamic => None,
|
||||
&DbType::ByteFixed(n) => Some(n),
|
||||
&DbType::Integer { bytes, ..} => Some(bytes as u64),
|
||||
&DbType::F64 => Some(8),
|
||||
&DbType::String => None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_variable_length(&self) -> bool {
|
||||
match self.get_fixed_length() {
|
||||
Some(_) => false,
|
||||
None => true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,12 @@ impl fmt::Display for Variant {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Variant {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
write!(f, "{}", self)
|
||||
}
|
||||
}
|
||||
|
||||
fn from_bool(value: bool) -> Variant {
|
||||
Variant::UnsignedInteger(if value { 1 } else { 0 })
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user