Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement passthrough mod #2

Merged
merged 32 commits into from
Feb 8, 2019
Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
8e9ef2c
passthrough implementation
bparli Jan 14, 2019
67fa75e
connection tracker size
bparli Jan 19, 2019
0821472
backend failures and RST replies
bparli Jan 21, 2019
4906fc5
fix update status bug
bparli Jan 23, 2019
72711a0
update stats page
bparli Jan 24, 2019
e4519d4
limit conn tracker lock
bparli Jan 25, 2019
ce96344
perf and dsr
bparli Jan 27, 2019
bfe5d2a
use multiple transmitters
bparli Jan 27, 2019
aed1ec8
test perf improvements
bparli Jan 27, 2019
83f788c
dsr testing
bparli Jan 30, 2019
a19074b
use to_owned for ethernetpacket
bparli Jan 30, 2019
f648537
update stats counters only every so often
bparli Jan 31, 2019
0fd1168
add unit tests passthrough backend
bparli Feb 1, 2019
bae92d5
add tx thread and channel
bparli Feb 2, 2019
ef6275a
unit tests and re-use tcp_header
bparli Feb 4, 2019
4c4d9a3
unit tests
bparli Feb 4, 2019
898334f
unit tests
bparli Feb 5, 2019
473499d
re-use tcp packet
bparli Feb 5, 2019
47efd00
perf tweaks
bparli Feb 6, 2019
141aa29
perf tweaks
bparli Feb 6, 2019
4818f5f
perf tweaks
bparli Feb 6, 2019
fe6c394
some clean up
bparli Feb 6, 2019
af8cb38
make port mapper a rw lock
bparli Feb 6, 2019
6fd7f22
improve health checking
bparli Feb 7, 2019
d78a16c
more perf tweaking
bparli Feb 8, 2019
87739c4
fix tests
bparli Feb 8, 2019
cf39025
bump version
bparli Feb 8, 2019
036c8f5
fix cmd line opts
bparli Feb 8, 2019
84bf410
Update README.md
bparli Feb 8, 2019
4d66145
update samples
bparli Feb 8, 2019
0c96373
update readme with samples
bparli Feb 8, 2019
394cc18
extra spaces
bparli Feb 8, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
re-use tcp packet
  • Loading branch information
bparli committed Feb 5, 2019
commit 473499dd2d147ae62934d4ed4a058771fe5fcf4e
69 changes: 22 additions & 47 deletions src/passthrough/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use crate::stats::StatsMssg;
use pnet::packet::ip::IpNextHeaderProtocols;
use pnet::transport::{transport_channel};
use pnet::transport::TransportChannelType::{Layer3};
use pnet::packet::tcp::{TcpPacket, MutableTcpPacket};
use pnet::packet::tcp::{MutableTcpPacket};
use pnet::packet::{tcp};
use pnet::packet::ipv4::{checksum, Ipv4Packet, MutableIpv4Packet};
use pnet::packet::{MutablePacket, Packet};
use pnet::packet::{Packet};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use pnet::datalink::{self, NetworkInterface};
use pnet::packet::ethernet::{EtherTypes, EthernetPacket};
Expand Down Expand Up @@ -240,15 +240,7 @@ impl LB {
}

// handle repsonse packets from a backend server passing back through the loadbalancer
fn server_response_handler(&mut self, ip_header: &Ipv4Packet, client_addr: &SocketAddr, tx: Sender<MutableIpv4Packet>) -> Option<StatsMssg> {
let mut tcp_header = match MutableTcpPacket::owned(ip_header.payload().to_owned()) {
Some(tcp_header) => tcp_header,
None => {
error!("Unable to decapsulate tcp header");
return None
}
};

fn server_response_handler(&mut self, tcp_header: &mut MutableTcpPacket, client_addr: &SocketAddr, tx: Sender<MutableIpv4Packet>) -> Option<StatsMssg> {
match client_addr.ip() {
IpAddr::V4(client_ipv4) => {
let mut mssg = StatsMssg{frontend: None,
Expand Down Expand Up @@ -294,15 +286,7 @@ impl LB {
}

// handle requests packets from a client
fn client_handler(&mut self, ip_header: &Ipv4Packet, tx: Sender<MutableIpv4Packet>) -> Option<StatsMssg> {
let mut tcp_header = match MutableTcpPacket::owned(ip_header.payload().to_owned()) {
Some(tcp_header) => tcp_header,
None => {
error!("Unable to decapsulate tcp header");
return None
}
};

fn client_handler(&mut self, ip_header: &Ipv4Packet, tcp_header: &mut MutableTcpPacket, tx: Sender<MutableIpv4Packet>) -> Option<StatsMssg> {
let client_port = tcp_header.get_source();

// setup stats update return
Expand All @@ -316,7 +300,6 @@ impl LB {
let ipbuf: Vec<u8> = vec!(0; tcp_header.packet().len() + IPV4_HEADER_LEN);
let mut new_ipv4 = MutableIpv4Packet::owned(ipbuf).unwrap();

new_ipv4.clone_from(ip_header);
new_ipv4.set_total_length(tcp_header.packet().len() as u16 + IPV4_HEADER_LEN as u16);
new_ipv4.set_version(4);
new_ipv4.set_ttl(225);
Expand All @@ -326,6 +309,8 @@ impl LB {
// leave original ip source if dsr
if !self.dsr {
new_ipv4.set_source(self.listen_ip);
} else{
new_ipv4.set_source(ip_header.get_source());
}

//check if we are already tracking this connection
Expand Down Expand Up @@ -510,10 +495,10 @@ fn process_packets(lb: &mut LB, rx: crossbeam_channel::Receiver<EthernetPacket>,
Some(mut ip_header) => {
let ip_addr = ip_header.get_destination();
if ip_addr == lb.listen_ip {
match TcpPacket::new(ip_header.payload()) {
Some(tcp_header) => {
match MutableTcpPacket::new(&mut ip_header.payload().to_owned()) {
Some(mut tcp_header) => {
if tcp_header.get_destination() == lb.listen_port {
if let Some(stats_update) = lb.client_handler(&mut ip_header, loop_tx.clone()) {
if let Some(stats_update) = lb.client_handler(&mut ip_header, &mut tcp_header, loop_tx.clone()) {
stats.connections += &stats_update.connections;
stats.bytes_rx += &stats_update.bytes_rx;
stats.bytes_tx += &stats_update.bytes_tx;
Expand All @@ -522,7 +507,7 @@ fn process_packets(lb: &mut LB, rx: crossbeam_channel::Receiver<EthernetPacket>,
// only handling server repsonses if not using dsr
if let Some(client_addr) = lb.port_mapper.lock().unwrap().get_mut(&tcp_header.get_destination()) {
// if true the client socketaddr is in portmapper and the connection/response from backend server is relevant
if let Some(stats_update) = lb.clone().server_response_handler(&ip_header, &SocketAddr::new( client_addr.ip, client_addr.port), loop_tx.clone()) {
if let Some(stats_update) = lb.clone().server_response_handler(&mut tcp_header, &SocketAddr::new( client_addr.ip, client_addr.port), loop_tx.clone()) {
stats.connections += &stats_update.connections;
stats.bytes_rx += &stats_update.bytes_rx;
stats.bytes_tx += &stats_update.bytes_tx;
Expand Down Expand Up @@ -679,17 +664,14 @@ mod tests {
use std::thread;
use crate::config::{Config};
use crate::passthrough;
use hyper::{Body, Request, Response, Server};
use hyper::service::service_fn_ok;
use hyper::rt::{self, Future};
use std::fs::File;
use std::io::{Read, Write};
use std::{time};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use self::passthrough::utils::{build_dummy_eth, build_dummy_ip};
use self::passthrough::{EPHEMERAL_PORT_LOWER, EPHEMERAL_PORT_UPPER, Node, process_packets};
use pnet::packet::tcp::{TcpPacket};
use pnet::packet::ipv4::{Ipv4Packet, MutableIpv4Packet};
use pnet::packet::tcp::{TcpPacket, MutableTcpPacket};
use pnet::packet::ipv4::{MutableIpv4Packet};
use pnet::packet::ethernet::EthernetPacket;
use pnet::packet::Packet;
use crossbeam_channel::unbounded;
Expand All @@ -710,18 +692,6 @@ mod tests {

#[test]
fn test_new_passthrough() {
// thread::spawn( move ||{
// let addr = format!("{}{}", "127.0.0.1", ":3080").parse().unwrap();
// let server = Server::bind(&addr)
// .serve(|| {
// service_fn_ok(move |_: Request<Body>| {
// Response::new(Body::from("Success DummyA Server"))
// })
// })
// .map_err(|e| eprintln!("server error: {}", e));
// rt::run(server);
// });

let conf = Config::new("testdata/passthrough_test.toml").unwrap();
let srv = passthrough::Server::new(conf.clone(), false);
let mut lb = srv.lbs[0].clone();
Expand Down Expand Up @@ -752,7 +722,8 @@ mod tests {
for i in 0..5 {
let tx = tx.clone();
let ip_header = build_dummy_ip(dummy_ip, dummy_ip, 35000 + i, 3000);
lb.client_handler(&mut ip_header.to_immutable(), tx);
let mut tcp_header = MutableTcpPacket::owned(ip_header.payload().to_owned()).unwrap();
lb.client_handler(&mut ip_header.to_immutable(), &mut tcp_header, tx);
}

assert_eq!(lb.conn_tracker.lock().unwrap().len(), 2);
Expand Down Expand Up @@ -810,8 +781,9 @@ mod tests {

// simulate response from server at port 80 to local (ephemeral) port 35000
let resp_header = build_dummy_ip(backend_srv_ip, lb_ip, 80, 35000);
let mut tcp_header = MutableTcpPacket::owned(resp_header.payload().to_owned()).unwrap();
// server should respond to client ip at client's port
lb.server_response_handler(&mut resp_header.to_immutable(), &SocketAddr::new(IpAddr::V4(client_ip), 55000), tx);
lb.server_response_handler(&mut tcp_header, &SocketAddr::new(IpAddr::V4(client_ip), 55000), tx);
let srv_resp: MutableIpv4Packet = rx.recv().unwrap();
assert_eq!(srv_resp.get_destination(), client_ip);
assert_eq!(srv_resp.get_source(), lb_ip);
Expand Down Expand Up @@ -844,9 +816,10 @@ mod tests {

// gen test ip/tcp packet with simulated client
let req_header = build_dummy_ip(client_ip, lb_ip, 43000, 3000);
let mut tcp_header = MutableTcpPacket::owned(req_header.payload().to_owned()).unwrap();

// call client_handler and verify packet being sent out to healthy backend server
lb.client_handler(&mut req_header.to_immutable(), tx.clone());
lb.client_handler(&mut req_header.to_immutable(), &mut tcp_header, tx.clone());
let fwd_pkt: MutableIpv4Packet = rx.recv().unwrap();
assert_eq!(fwd_pkt.get_destination(), backend_srv_ip);
assert_eq!(fwd_pkt.get_source(), lb_ip);
Expand All @@ -872,7 +845,8 @@ mod tests {

{
// check same client again to verify connection tracker is used
lb.client_handler(&mut req_header.to_immutable(), tx.clone());
let mut tcp_header = MutableTcpPacket::owned(req_header.payload().to_owned()).unwrap();
lb.client_handler(&mut req_header.to_immutable(), &mut tcp_header, tx.clone());
// next port should not have incremented
assert_eq!(*lb.next_port.lock().unwrap(), EPHEMERAL_PORT_LOWER + 1);

Expand All @@ -896,7 +870,8 @@ mod tests {
}

// check same client again to verify connection is failed
lb.client_handler(&mut req_header.to_immutable(), tx.clone());
let mut tcp_header = MutableTcpPacket::owned(req_header.payload().to_owned()).unwrap();
lb.client_handler(&mut req_header.to_immutable(), &mut tcp_header, tx.clone());
// since there are not healthy backend servers there should be no connections added to map
assert_eq!(lb.conn_tracker.lock().unwrap().len(), 0);
}
Expand Down