Skip to content

Commit

Permalink
table: prepare for adding table impl (#19)
Browse files Browse the repository at this point in the history
* fix typo of Comparator
* use little endian in header
* use little endian when encoding offset
* use big endian in builder length

Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh committed Oct 20, 2020
1 parent 38e99a8 commit dbfc35e
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 37 deletions.
2 changes: 2 additions & 0 deletions proto/src/proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ message TableIndex {
repeated BlockOffset offsets = 1;
bytes bloom_filter = 2;
uint32 estimated_size = 3;
uint64 max_version = 4;
uint32 key_count = 5;
}

message Checksum {
Expand Down
6 changes: 3 additions & 3 deletions skiplist/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tikv_jemallocator::Jemalloc;
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

fn skiplist_round(l: &Skiplist<FixedLengthSuffixComparitor>, case: &(Bytes, bool), exp: &Bytes) {
fn skiplist_round(l: &Skiplist<FixedLengthSuffixComparator>, case: &(Bytes, bool), exp: &Bytes) {
if case.1 {
if let Some(v) = l.get(&case.0) {
assert_eq!(v, exp);
Expand All @@ -42,7 +42,7 @@ fn random_key(rng: &mut ThreadRng) -> Bytes {
fn bench_read_write_skiplist_frac(b: &mut Bencher<'_>, frac: &usize) {
let frac = *frac;
let value = Bytes::from_static(b"00123");
let comp = FixedLengthSuffixComparitor::new(8);
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, 512 << 20);
let l = list.clone();
let stop = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -134,7 +134,7 @@ fn bench_read_write_map(c: &mut Criterion) {
}

fn bench_write_skiplist(c: &mut Criterion) {
let comp = FixedLengthSuffixComparitor::new(8);
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, 512 << 21);
let value = Bytes::from_static(b"00123");
let l = list.clone();
Expand Down
27 changes: 21 additions & 6 deletions skiplist/src/key.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,39 @@
use bytes::Bytes;
use std::cmp::Ordering;

pub trait KeyComparitor {
pub trait KeyComparator {
fn compare_key(&self, lhs: &[u8], rhs: &[u8]) -> Ordering;
fn same_key(&self, lhs: &[u8], rhs: &[u8]) -> bool;
}

#[derive(Default, Debug, Clone, Copy)]
pub struct FixedLengthSuffixComparitor {
pub struct FixedLengthSuffixComparator {
len: usize,
}

impl FixedLengthSuffixComparitor {
pub fn new(len: usize) -> FixedLengthSuffixComparitor {
FixedLengthSuffixComparitor { len }
impl FixedLengthSuffixComparator {
pub const fn new(len: usize) -> FixedLengthSuffixComparator {
FixedLengthSuffixComparator { len }
}
}

impl KeyComparitor for FixedLengthSuffixComparitor {
impl KeyComparator for FixedLengthSuffixComparator {
#[inline]
fn compare_key(&self, lhs: &[u8], rhs: &[u8]) -> Ordering {
if lhs.len() < self.len {
panic!(
"cannot compare with suffix {}: {:?}",
self.len,
Bytes::copy_from_slice(lhs)
);
}
if rhs.len() < self.len {
panic!(
"cannot compare with suffix {}: {:?}",
self.len,
Bytes::copy_from_slice(rhs)
);
}
let (l_p, l_s) = lhs.split_at(lhs.len() - self.len);
let (r_p, r_s) = rhs.split_at(rhs.len() - self.len);
let res = l_p.cmp(r_p);
Expand Down
2 changes: 1 addition & 1 deletion skiplist/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ mod list;

const MAX_HEIGHT: usize = 20;

pub use key::{FixedLengthSuffixComparitor, KeyComparitor};
pub use key::{FixedLengthSuffixComparator, KeyComparator};
pub use list::Skiplist;
10 changes: 5 additions & 5 deletions skiplist/src/list.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::arena::Arena;
use super::KeyComparitor;
use super::KeyComparator;
use super::MAX_HEIGHT;
use bytes::Bytes;
use rand::Rng;
Expand Down Expand Up @@ -85,7 +85,7 @@ impl<C> Skiplist<C> {
}
}

impl<C: KeyComparitor> Skiplist<C> {
impl<C: KeyComparator> Skiplist<C> {
unsafe fn find_near(&self, key: &[u8], less: bool, allow_equal: bool) -> *const Node {
let mut cursor: *const Node = self.core.head.as_ptr();
let mut level = self.height();
Expand Down Expand Up @@ -330,7 +330,7 @@ pub struct IterRef<'a, C> {
cursor: *const Node,
}

impl<'a, C: KeyComparitor> IterRef<'a, C> {
impl<'a, C: KeyComparator> IterRef<'a, C> {
pub fn valid(&self) -> bool {
!self.cursor.is_null()
}
Expand Down Expand Up @@ -387,11 +387,11 @@ impl<'a, C: KeyComparitor> IterRef<'a, C> {
#[cfg(test)]
mod tests {
use super::*;
use crate::FixedLengthSuffixComparitor;
use crate::FixedLengthSuffixComparator;

#[test]
fn test_find_near() {
let comp = FixedLengthSuffixComparitor::new(8);
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, 1 << 20);
for i in 0..1000 {
let key = Bytes::from(format!("{:05}{:08}", i * 10 + 5, 0));
Expand Down
14 changes: 7 additions & 7 deletions skiplist/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn key_with_ts(key: &str, ts: u64) -> Bytes {
#[test]
fn test_empty() {
let key = key_with_ts("aaa", 0);
let comp = FixedLengthSuffixComparitor::new(8);
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, ARENA_SIZE);
let v = list.get(&key);
assert!(v.is_none());
Expand All @@ -38,7 +38,7 @@ fn test_empty() {

#[test]
fn test_basic() {
let comp = FixedLengthSuffixComparitor::new(8);
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, ARENA_SIZE);
let table = vec![
("key1", new_value(42)),
Expand All @@ -63,7 +63,7 @@ fn test_basic() {

fn test_concurrent_basic(n: usize, cap: u32, value_len: usize) {
let pool = yatp::Builder::new("concurrent_basic").build_callback_pool();
let comp = FixedLengthSuffixComparitor::new(8);
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, cap);
let kvs: Vec<_> = (0..n)
.map(|i| {
Expand Down Expand Up @@ -115,7 +115,7 @@ fn test_one_key() {
let n = 10000;
let write_pool = yatp::Builder::new("one_key_write").build_callback_pool();
let read_pool = yatp::Builder::new("one_key_read").build_callback_pool();
let comp = FixedLengthSuffixComparitor::new(8);
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, ARENA_SIZE);
let key = key_with_ts("thekey", 0);
let (tx, rx) = mpsc::channel();
Expand Down Expand Up @@ -167,7 +167,7 @@ fn test_one_key() {
#[test]
fn test_iterator_next() {
let n = 100;
let comp = FixedLengthSuffixComparitor::new(8);
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, ARENA_SIZE);
let mut iter_ref = list.iter_ref();
assert!(!iter_ref.valid());
Expand All @@ -190,7 +190,7 @@ fn test_iterator_next() {
#[test]
fn test_iterator_prev() {
let n = 100;
let comp = FixedLengthSuffixComparitor::new(8);
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, ARENA_SIZE);
let mut iter_ref = list.iter_ref();
assert!(!iter_ref.valid());
Expand All @@ -213,7 +213,7 @@ fn test_iterator_prev() {
#[test]
fn test_iterator_seek() {
let n = 100;
let comp = FixedLengthSuffixComparitor::new(8);
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, ARENA_SIZE);
let mut iter_ref = list.iter_ref();
assert!(!iter_ref.valid());
Expand Down
13 changes: 13 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ pub enum Error {
TooLong(String),
#[error("Invalid checksum")]
InvalidChecksum(String),
#[error("Invalid filename")]
InvalidFilename(String),
#[error("Invalid prost data: {0}")]
Decode(#[source] Box<prost::DecodeError>),
#[error("{0}")]
TableRead(String),
}

impl From<io::Error> for Error {
Expand All @@ -24,4 +30,11 @@ impl From<io::Error> for Error {
}
}

/// Lets `?` lift a protobuf decoding failure into the crate's [`Error`] type.
impl From<prost::DecodeError> for Error {
    /// Boxes the decode error and stores it in the `Decode` variant.
    #[inline]
    fn from(err: prost::DecodeError) -> Self {
        let boxed = Box::new(err);
        Self::Decode(boxed)
    }
}

pub type Result<T> = result::Result<T, Error>;
2 changes: 1 addition & 1 deletion src/memtable.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use skiplist::{FixedLengthSuffixComparitor as Flsc, Skiplist};
use skiplist::{FixedLengthSuffixComparator as Flsc, Skiplist};
use std::collections::VecDeque;
use std::mem::{self, ManuallyDrop, MaybeUninit};
use std::ptr;
Expand Down
52 changes: 43 additions & 9 deletions src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,32 @@ use crate::{checksum, util};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use prost::Message;
use proto::meta::{checksum::Algorithm as ChecksumAlg, BlockOffset, Checksum, TableIndex};
use std::{u16, u32};

/// Entry header stores the difference between current key and block base key.
/// `overlap` is the common prefix of key and base key, and diff is the length
/// of different part.
#[repr(C)]
struct Header {
overlap: u16,
diff: u16,
#[derive(Default)]
pub struct Header {
pub overlap: u16,
pub diff: u16,
}

pub const HEADER_SIZE: usize = std::mem::size_of::<Header>();

impl Header {
fn encode(&self, bytes: &mut BytesMut) {
pub fn encode(&self, bytes: &mut BytesMut) {
bytes.put_u32_le((self.overlap as u32) << 16 | self.diff as u32);
}

fn decode(&mut self, bytes: &mut Bytes) {
pub fn decode(&mut self, bytes: &mut Bytes) {
let h = bytes.get_u32_le();
self.overlap = (h >> 16) as u16;
self.diff = h as u16;
}
}

/// Builder builds an SST.
pub struct Builder {
buf: BytesMut,
base_key: Bytes,
Expand All @@ -36,6 +42,7 @@ pub struct Builder {
}

impl Builder {
/// Create new builder from options
pub fn new(options: Options) -> Builder {
Builder {
// approximately 16MB index + table size
Expand All @@ -50,6 +57,7 @@ impl Builder {
}
}

/// Check if the builder is empty.
///
/// Returns `true` when the builder's internal buffer currently holds no
/// bytes.
pub fn is_empty(&self) -> bool {
self.buf.is_empty()
}
Expand Down Expand Up @@ -82,6 +90,7 @@ impl Builder {
h.encode(&mut self.buf);
self.buf.put_slice(diff_key);
v.encode(&mut self.buf);

let sst_size = v.encoded_size() as usize + diff_key.len() + 4;
self.table_index.estimated_size += sst_size as u32 + vlog_len;
}
Expand All @@ -93,7 +102,7 @@ impl Builder {
for offset in &self.entry_offsets {
self.buf.put_u32_le(*offset);
}
self.buf.put_u32_le(self.entry_offsets.len() as u32);
self.buf.put_u32(self.entry_offsets.len() as u32);

let cs = self.build_checksum(&self.buf[self.base_offset as usize..]);
self.write_checksum(cs);
Expand All @@ -119,11 +128,16 @@ impl Builder {
8 + // sum64 in checksum proto
4; // checksum length
assert!(entries_offsets_size < u32::MAX as usize);
let estimated_size = (self.buf.len() as u32) - self.base_offset + 6 /* header size for entry */ + key.len() as u32 + value.encoded_size() as u32 + entries_offsets_size as u32;
let estimated_size = (self.buf.len() as u32)
- self.base_offset + 6 /* header size for entry */
+ key.len() as u32
+ value.encoded_size() as u32
+ entries_offsets_size as u32;
assert!(self.buf.len() + (estimated_size as usize) < u32::MAX as usize);
estimated_size > self.options.block_size as u32
}

/// Add key-value pair to table
pub fn add(&mut self, key: &Bytes, value: Value, vlog_len: u32) {
if self.should_finish_block(&key, &value) {
self.finish_block();
Expand All @@ -135,6 +149,7 @@ impl Builder {
self.add_helper(key, value, vlog_len);
}

/// Check if entries reach its capacity
pub fn reach_capacity(&self, capacity: u64) -> bool {
let block_size = self.buf.len() as u32 + // length of buffer
self.entry_offsets.len() as u32 * 4 + // all entry offsets size
Expand All @@ -147,18 +162,23 @@ impl Builder {
estimated_size as u64 > capacity
}

/// Finalize the table
pub fn finish(&mut self) -> Bytes {
self.finish_block();
if self.buf.is_empty() {
return Bytes::new();
}
let mut bytes = BytesMut::new();
// TODO: move boundaries and build index if we need to encrypt or compress
// append index to buffer
self.table_index.encode(&mut bytes).unwrap();
assert!(bytes.len() < u32::MAX as usize);
self.buf.put_slice(&bytes);
self.buf.put_u32_le(bytes.len() as u32);
self.buf.put_u32(bytes.len() as u32);
// append checksum
let cs = self.build_checksum(&bytes);
self.write_checksum(cs);
// TODO: eliminate clone if we do not need builder any more after finish
self.buf.clone().freeze()
}

Expand Down Expand Up @@ -233,4 +253,18 @@ mod tests {

b.finish();
}

/// Round-trips a `Header` through `encode` / `decode`.
///
/// The decoded values are written into a *separate*, zeroed header so the
/// assertions fail if `decode` leaves its target untouched; decoding back
/// into the already-populated source header would pass vacuously.
#[test]
fn test_header_encode_decode() {
    let original = Header {
        overlap: 23333,
        diff: 23334,
    };
    let mut buf = BytesMut::new();
    original.encode(&mut buf);

    let mut buf = buf.freeze();
    let mut decoded = Header {
        overlap: 0,
        diff: 0,
    };
    decoded.decode(&mut buf);

    assert_eq!(decoded.overlap, 23333);
    assert_eq!(decoded.diff, 23334);
}
}
20 changes: 20 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub use skiplist::{FixedLengthSuffixComparator, KeyComparator};
use std::{cmp, ptr};
pub static COMPARATOR: FixedLengthSuffixComparator = FixedLengthSuffixComparator::new(8);

unsafe fn u64(ptr: *const u8) -> u64 {
ptr::read_unaligned(ptr as *const u64)
Expand Down Expand Up @@ -33,3 +35,21 @@ pub fn bytes_diff<'a, 'b>(base: &'a [u8], target: &'b [u8]) -> &'b [u8] {
target.get_unchecked(end..)
}
}

/// Binary search over the index range `[0, n)`, modelled on Go's `sort.Search`.
///
/// Returns the smallest index `i` in `[0, n)` for which `f(i)` is `true`,
/// assuming `f` is `false` for some (possibly empty) prefix of the range and
/// `true` for every index after that prefix. Returns `n` when `f` is `false`
/// for every index, including when `n == 0`.
///
/// `f` is called O(log n) times and only with indices in `[0, n)`.
pub fn search<F>(n: usize, mut f: F) -> usize
where
    F: FnMut(usize) -> bool,
{
    // Loop invariant: every index below `lo` fails the predicate, and every
    // index at or above `hi` (within range) satisfies it.
    let mut lo = 0;
    let mut hi = n;
    while lo < hi {
        // `lo + (hi - lo) / 2` cannot overflow, whereas `(lo + hi) / 2` does
        // once `lo + hi` exceeds `usize::MAX`.
        let mid = lo + (hi - lo) / 2;
        if f(mid) {
            hi = mid;
        } else {
            lo = mid + 1;
        }
    }
    lo
}
Loading

0 comments on commit dbfc35e

Please sign in to comment.