Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix segmented buf len implementation #11

Merged
merged 3 commits into from
Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "logdna-client"
version = "0.4.0"
version = "0.4.1"
authors = ["[email protected]"]
edition = "2018"
license = "MIT"
Expand Down Expand Up @@ -47,3 +47,6 @@ tokio-test = "0.3"
proptest = "0.10"
flate2 = "1.0"
serial_test = "0.5"

[profile.release]
debug=true
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ test-local:

.PHONY:test
test: ## Run unit tests
$(RUST_COMMAND) "--env RUST_BACKTRACE=full --env RUST_LOG=$(RUST_LOG) --env LOGDNA_HOST=$(LOGDNA_HOST) --env API_KEY=$(LOGDNA_INGESTION_KEY) " "cargo test --no-run && cargo test --lib --release $(TESTS) -- --nocapture"
$(RUST_COMMAND) "--env RUST_BACKTRACE=full --env RUST_LOG=$(RUST_LOG) --env LOGDNA_HOST=$(LOGDNA_HOST) --env API_KEY=$(LOGDNA_INGESTION_KEY) " "cargo test --no-run && cargo test --lib --release $(TESTS) -- --nocapture --test-threads=1"

.PHONY:clean
clean: ## Clean all artifacts from the build process
Expand Down
21 changes: 14 additions & 7 deletions src/segmented_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,15 @@ impl<T> SegmentedBuf<T> {

impl SegmentedBuf<Reusable<Buffer>> {
pub fn len(&self) -> usize {
let mut pos = self.pos;
let mut rem = self.bufs[pos].len() - self.offset;
pos += 1;

while pos < self.bufs.len() {
let mut pos = 0;
let mut rem = 0;
// Count the full buffers
while pos < self.pos {
rem += self.bufs[pos].len();
pos += 1;
}

// Add on the last, partial buffer
rem += self.offset;
rem
}

Expand Down Expand Up @@ -727,6 +727,7 @@ where
mod test {
use super::*;
use serial_test::serial;
use std::sync::atomic::{fence, Ordering};

macro_rules! aw {
($e:expr) => {
Expand All @@ -749,6 +750,7 @@ mod test {
use std::io::Write;
buf.write_all(&inp.1).unwrap();

assert_eq!(buf.len(), inp.0);
assert_eq!(buf.iter()
.zip(inp.1.iter())
.fold(true,
Expand All @@ -758,7 +760,6 @@ mod test {
true);

assert_eq!(inp.0, buf.iter().count());

}

}
Expand All @@ -778,6 +779,7 @@ mod test {
buf
});

assert_eq!(buf.len(), inp.0);
assert_eq!(buf.iter()
.zip(inp.1.iter())
.fold(true,
Expand Down Expand Up @@ -835,6 +837,7 @@ mod test {
{
let b = Buffer::new(BytesMut::new());
drop(b);
fence(Ordering::SeqCst);
// Ensure we haven't allocated any bufs yet
let counts = countme::get::<Buffer>();
assert_eq!(counts.live, 0);
Expand All @@ -849,6 +852,7 @@ mod test {
// Keep a reference to the pool around
let pool = buf.pool.clone();

fence(Ordering::SeqCst);
// Ensure we haven't allocated more bufs than necessary
let counts = countme::get::<Buffer>();
assert!(counts.live > 0);
Expand All @@ -865,6 +869,7 @@ mod test {
);

// Ensure we never allocated more buffers than were needed to hold the total elements
fence(Ordering::SeqCst);
let counts = countme::get::<Buffer>();
assert!(
counts.total - base_total
Expand All @@ -882,12 +887,14 @@ mod test {
}

drop(buf);
fence(Ordering::SeqCst);
let counts = countme::get::<Buffer>();

// Ensure pool is cleared up
assert!(counts.live <= serialization_buf_reserve_segments);

drop(pool);
fence(Ordering::SeqCst);
let counts = countme::get::<Buffer>();
assert!(counts.live == 0);
}
Expand Down