Skip to content

Commit

Permalink
Fixed http codec bug, server gives client IP in stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
illegalprime committed May 25, 2017
1 parent aacd6ab commit 7ee2a0d
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 8 deletions.
5 changes: 3 additions & 2 deletions examples/async-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ fn main() {
let f = server.incoming()
// we don't wanna save the stream if it drops
.map_err(|InvalidConnection { error, .. }| error)
.for_each(|upgrade| {
.for_each(|(upgrade, addr)| {
println!("Got a connection from: {}", addr);
// check if it has the protocol we want
if !upgrade.protocols().iter().any(|s| s == "rust-websocket") {
// reject it if it doesn't
Expand All @@ -41,7 +42,7 @@ fn main() {
// simple echo server impl
.and_then(|s| {
let (sink, stream) = s.split();
stream.filter_map(|m| {
stream.take_while(|m| Ok(!m.is_close())).filter_map(|m| {
println!("Message from Client: {:?}", m);
match m {
OwnedMessage::Ping(p) => Some(OwnedMessage::Pong(p)),
Expand Down
2 changes: 1 addition & 1 deletion src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ impl<'u> ClientBuilder<'u> {
message
.ok_or(WebSocketError::ProtocolError(
"Connection closed before handshake could complete."))
.and_then(|message| builder.validate(&message).map(|_| (message, stream)))
.and_then(|message| builder.validate(&message).map(|()| (message, stream)))
})

// output the final client and metadata
Expand Down
2 changes: 1 addition & 1 deletion src/codec/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Decoder for HttpClientCodec {
// TODO: this is ineffecient, but hyper does not give us a better way to parse
match split_off_http(src) {
Some(buf) => {
let mut reader = BufReader::new(&*src as &[u8]);
let mut reader = BufReader::with_capacity(&*buf as &[u8], buf.len());
let res = match parse_response(&mut reader) {
Err(hyper::Error::Io(ref e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Ok(None)
Expand Down
13 changes: 9 additions & 4 deletions src/server/async.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::io;
use std::net::ToSocketAddrs;
use std::net::SocketAddr;
use server::{WsServer, NoTlsAcceptor};
use tokio_core::net::{TcpListener, TcpStream};
use futures::{Stream, Future};
Expand All @@ -15,7 +16,8 @@ use tokio_tls::{TlsAcceptorExt, TlsStream};

pub type Server<S> = WsServer<S, TcpListener>;

pub type Incoming<S> = Box<Stream<Item = Upgrade<S>, Error = InvalidConnection<S, BytesMut>>>;
pub type Incoming<S> = Box<Stream<Item = (Upgrade<S>, SocketAddr),
Error = InvalidConnection<S, BytesMut>>>;

pub enum AcceptError<E> {
Io(io::Error),
Expand Down Expand Up @@ -43,7 +45,7 @@ impl WsServer<NoTlsAcceptor, TcpListener> {
error: e.into(),
}
})
.and_then(|(stream, _)| {
.and_then(|(stream, a)| {
stream.into_ws()
.map_err(|(stream, req, buf, err)| {
InvalidConnection {
Expand All @@ -53,6 +55,7 @@ impl WsServer<NoTlsAcceptor, TcpListener> {
error: err,
}
})
.map(move |u| (u, a))
});
Box::new(future)
}
Expand Down Expand Up @@ -85,7 +88,7 @@ impl WsServer<TlsAcceptor, TcpListener> {
error: e.into(),
}
})
.and_then(move |(stream, _)| {
.and_then(move |(stream, a)| {
acceptor.accept_async(stream)
.map_err(|e| {
InvalidConnection {
Expand All @@ -96,8 +99,9 @@ impl WsServer<TlsAcceptor, TcpListener> {
error: io::Error::new(io::ErrorKind::Other, e).into(),
}
})
.map(move |s| (s, a))
})
.and_then(|stream| {
.and_then(|(stream, a)| {
stream.into_ws()
.map_err(|(stream, req, buf, err)| {
InvalidConnection {
Expand All @@ -107,6 +111,7 @@ impl WsServer<TlsAcceptor, TcpListener> {
error: err,
}
})
.map(move |u| (u, a))
});
Box::new(future)
}
Expand Down

0 comments on commit 7ee2a0d

Please sign in to comment.