Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agate, util: fix varint decode and add header #29

Merged
merged 12 commits into from
Oct 29, 2020
51 changes: 1 addition & 50 deletions src/entry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::util::{encode_varint_uncheck, varint_len};
use bytes::{BufMut, Bytes, BytesMut};
use std::mem::MaybeUninit;
use std::ptr;

// Single-bit flag occupying bit 0 (value 1).
// NOTE(review): presumably OR-ed into `Entry::meta` to mark a deletion — confirm against its users.
const DELETE: u8 = 1 << 0;
Expand All @@ -11,55 +11,6 @@ pub struct Entry {
pub meta: u8,
}

/// Returns how many bytes the varint encoding of `l` occupies.
///
/// Each encoded byte carries 7 payload bits, so the result grows by one every
/// time `l` crosses another multiple-of-7 bit boundary; a 64-bit value needs
/// at most 10 bytes.
fn varint_len(l: usize) -> usize {
    // Every branch must compare `l`; comparing the literal `1` is always true
    // and would report 4 bytes for every value of 2^21 or more.
    if l < 0x80 {
        1
    } else if l < (1usize << 14) {
        2
    } else if l < (1usize << 21) {
        3
    } else if l < (1usize << 28) {
        4
    } else if l < (1usize << 35) {
        5
    } else if l < (1usize << 42) {
        6
    } else if l < (1usize << 49) {
        7
    } else if l < (1usize << 56) {
        8
    } else if l < (1usize << 63) {
        9
    } else {
        10
    }
}

/// Writes `u` into `bytes` as a varint (7 payload bits per byte, low groups
/// first, high bit set on every byte except the last) and returns the number
/// of bytes written.
///
/// # Safety
///
/// No bounds check is performed: `bytes` must have room for the whole
/// encoding (up to 10 bytes for a 64-bit value).
unsafe fn encode_varint_uncheck(bytes: &mut [MaybeUninit<u8>], mut u: u64) -> usize {
    let out = bytes.as_mut_ptr() as *mut u8;
    let mut idx = 0usize;
    // Emit continuation bytes while more than 7 bits remain.
    while u > 0x7f {
        *out.add(idx) = 0x80 | (u as u8 & 0x7f);
        idx += 1;
        u >>= 7;
    }
    // Final byte: high bit clear marks the end of the varint.
    *out.add(idx) = u as u8;
    idx + 1
}

/// Decodes a varint from the start of `bytes` and returns `(value, bytes_read)`.
///
/// The encoding stores the least-significant 7-bit group first, so each group
/// is shifted into place by 7 bits more than the previous one. (Accumulating
/// with `(a << 7) | group` would treat the first byte as the most significant
/// group and mis-decode every multi-byte value.)
///
/// # Safety
///
/// No bounds check is performed: the memory behind `bytes` must contain a
/// complete varint of at most 10 bytes, i.e. a byte below `0x80` must appear
/// within the first 10 bytes.
unsafe fn decode_varint_uncheck(bytes: &[u8]) -> (u64, usize) {
    let mut value: u64 = 0;
    let mut shift: u32 = 0;
    let mut p = bytes.as_ptr();
    let mut count = 0;
    // Continuation bytes have the high bit set; strip it and merge the payload.
    while *p >= 0x80 {
        value |= ((*p & 0x7f) as u64) << shift;
        shift += 7;
        p = p.add(1);
        count += 1;
    }
    // The terminating byte contributes the most significant group.
    (value | ((*p as u64) << shift), count + 1)
}

impl Entry {
pub fn new(key: Bytes, value: Bytes) -> Entry {
Entry {
Expand Down
91 changes: 91 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub use skiplist::{FixedLengthSuffixComparator, KeyComparator};
use std::mem::MaybeUninit;
use std::{cmp, ptr};

// Shared comparator instance, built with `FixedLengthSuffixComparator::new(8)`.
// NOTE(review): the argument is presumably the byte length of the fixed key suffix — confirm against `skiplist`.
pub static COMPARATOR: FixedLengthSuffixComparator = FixedLengthSuffixComparator::new(8);

unsafe fn u64(ptr: *const u8) -> u64 {
Expand Down Expand Up @@ -53,3 +55,92 @@ where
}
i
}

/// Returns the number of bytes needed to varint-encode `l`.
///
/// One byte is always required; every additional 7 significant bits cost one
/// more byte, up to 10 bytes for a full 64-bit value.
pub fn varint_len(l: usize) -> usize {
    let mut remaining = l >> 7;
    let mut len = 1;
    // Count how many further 7-bit groups still hold data.
    while remaining != 0 {
        remaining >>= 7;
        len += 1;
    }
    len
}

/// Varint-encodes `u` into the front of `bytes` and returns the encoded length.
///
/// Each output byte holds 7 bits of `u`, least-significant group first; the
/// high bit is set on every byte except the last one.
///
/// # Safety
///
/// The slice length is not consulted: `bytes` must point at enough writable
/// space for the whole encoding (at most 10 bytes).
pub unsafe fn encode_varint_uncheck(bytes: &mut [MaybeUninit<u8>], mut u: u64) -> usize {
    let base = bytes.as_mut_ptr().cast::<u8>();
    let mut written = 0;
    loop {
        let group = (u & 0x7f) as u8;
        u >>= 7;
        if u == 0 {
            // Nothing left above this group: emit it without the continuation bit.
            base.add(written).write(group);
            return written + 1;
        }
        base.add(written).write(group | 0x80);
        written += 1;
    }
}

/// Decodes a varint from the start of `bytes`.
///
/// Returns `(value, bytes_read)` on success. If the encoding does not fit in
/// 64 bits — the tenth byte is a continuation byte or is greater than 1 — the
/// sentinel `(0, std::usize::MAX)` is returned instead.
///
/// # Safety
///
/// The slice length is not consulted; bytes are read through a raw pointer.
/// The memory behind `bytes` must stay readable up to the terminating byte
/// (first byte below `0x80`) or up to 10 bytes, whichever comes first.
// NOTE(review): per the PR discussion this was adapted from an existing
// `codec.rs` implementation — confirm the origin before relying on it.
pub unsafe fn decode_varint_uncheck(bytes: &[u8]) -> (u64, usize) {
    // A u64 needs at most ceil(64 / 7) = 10 varint bytes.
    const MAX_LEN: usize = 10;

    let mut x: u64 = 0;
    let mut s: u32 = 0;
    let mut i = 0;
    let mut p = bytes.as_ptr();
    loop {
        let b = *p;
        if b < 0x80 {
            // The tenth byte may only contribute the single remaining bit.
            if i == MAX_LEN - 1 && b > 1 {
                return (0, std::usize::MAX);
            }
            return (x | ((b as u64) << s), i + 1);
        }
        if i == MAX_LEN - 1 {
            // A continuation bit on the tenth byte means the value overflows
            // 64 bits. Stop here instead of scanning further: going on would
            // read past the varint and shift by 64 or more.
            return (0, std::usize::MAX);
        }
        x |= ((b & 0x7f) as u64) << s;
        s += 7;
        i += 1;
        p = p.add(1);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::{BufMut, BytesMut};

    /// Round-trips values of every interesting width through the varint
    /// encoder and decoder and checks the length reported by `varint_len`.
    #[test]
    fn test_varint_decode_encode() {
        for i in vec![
            2,
            5,
            233,
            2333,
            23333,
            233333,
            2333333,
            23333333333,
            std::u64::MAX - 23333333333,
        ] {
            let size = varint_len(i as usize);
            let mut bytes = BytesMut::new();
            bytes.reserve(size);
            unsafe {
                let buf = bytes.bytes_mut();
                let written = encode_varint_uncheck(buf.get_unchecked_mut(..), i);
                assert_eq!(written, size);
                // Publish the freshly written bytes. Without this the buffer's
                // length stays 0 and `&bytes` below is an empty slice, so the
                // decoder would read outside the slice it was handed.
                bytes.advance_mut(written);
            }
            let (data, count) = unsafe { decode_varint_uncheck(&bytes) };
            assert_eq!(data, i);
            assert_eq!(count, size);
        }
    }
}
78 changes: 76 additions & 2 deletions src/wal.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,60 @@
use super::Result;
use crate::util::{decode_varint_uncheck, encode_varint_uncheck, varint_len};
use bytes::{BufMut, Bytes, BytesMut};
use std::fs::{File, OpenOptions};
use std::path::PathBuf;

/// Fixed set of fields stored in front of an entry.
///
/// `key_len` and `value_len` are declared once, as `u32`; declaring them a
/// second time as `usize` is a duplicate-field error, and the header decoder
/// stores the decoded lengths with `as u32`.
#[derive(Default, Debug, PartialEq)]
struct Header {
    /// Length of the key (units not visible here; presumably bytes).
    key_len: u32,
    /// Length of the value (units not visible here; presumably bytes).
    value_len: u32,
    /// Expiry value; NOTE(review): unit/epoch not visible here — confirm.
    expires_at: u64,
    /// Metadata byte, written first by the encoder.
    meta: u8,
    /// User metadata byte, written second by the encoder.
    user_meta: u8,
}

impl Header {
pub fn encoded_len(&self) -> usize {
skyzh marked this conversation as resolved.
Show resolved Hide resolved
1 + 1
+ varint_len(self.expires_at as usize)
+ varint_len(self.key_len as usize)
+ varint_len(self.value_len as usize)
}

pub fn encode(&self, bytes: &mut BytesMut) {
let encoded_len = self.encoded_len();
bytes.reserve(encoded_len);
let read = unsafe {
let buf = bytes.bytes_mut();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps adding an if check before entering get_unchecked_mut is better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the check assert!(buf.len() >= encoded_len);. Seems that reserve will always ensure the capacity is enough, I'm not sure whether this length check is necessary or not.

*(*buf.get_unchecked_mut(0)).as_mut_ptr() = self.meta;
*(*buf.get_unchecked_mut(1)).as_mut_ptr() = self.user_meta;
let mut read = 2;
read += encode_varint_uncheck(buf.get_unchecked_mut(read..), self.key_len as u64);
read += encode_varint_uncheck(buf.get_unchecked_mut(read..), self.value_len as u64);
read += encode_varint_uncheck(buf.get_unchecked_mut(read..), self.expires_at);
read
};
assert_eq!(read, encoded_len);
skyzh marked this conversation as resolved.
Show resolved Hide resolved
unsafe {
bytes.advance_mut(read);
}
}

pub fn decode(&mut self, bytes: &mut Bytes) -> usize {
self.meta = bytes[0];
self.user_meta = bytes[1];
let mut index = 2;
let (key_len, count) = unsafe { decode_varint_uncheck(&bytes[index..]) };
self.key_len = key_len as u32;
index += count;
let (value_len, count) = unsafe { decode_varint_uncheck(&bytes[index..]) };
self.value_len = value_len as u32;
index += count;
let (expires_at, count) = unsafe { decode_varint_uncheck(&bytes[index..]) };
self.expires_at = expires_at;
index += count;
index
}
}

pub struct Wal {
Expand All @@ -18,3 +68,27 @@ impl Wal {
Ok(Wal { f, path })
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes a header, decodes it into a default-initialised one and checks
    /// that every field survives the round trip.
    #[test]
    fn test_header_encode() {
        let expected = Header {
            meta: b'B',
            user_meta: b'A',
            key_len: 233333,
            value_len: 2333,
            expires_at: std::u64::MAX - 2333333,
        };

        let mut encoded = BytesMut::new();
        expected.encode(&mut encoded);
        let mut frozen = encoded.freeze();

        let mut decoded = Header::default();
        decoded.decode(&mut frozen);
        assert_eq!(decoded, expected);
    }
}