From: Richard Whitehouse Date: Mon, 22 Oct 2018 12:04:33 +0000 (-0400) Subject: Separate out the server X-Git-Url: https://git.richardwhiuk.com/?a=commitdiff_plain;h=a38da4563b146935ab5704339712b7c492fa8733;p=rust-sip.git Separate out the server --- diff --git a/src/codec.rs b/src/codec.rs index 91120d5..6921f7a 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -1,18 +1,12 @@ use bytes::BytesMut; - -use futures::future; -use futures::{Future, Stream}; use nom; use std; use std::io; -use std::sync::Arc; -use tokio_core::net::TcpListener; -use tokio_core::reactor::Handle; use tokio_io::codec::{Decoder, Encoder}; use parser::header; use parser::top_line; -use types::{Header, RequestLine, StatusLine, TopLine}; +use types::{Header, Method, RequestLine, StatusLine, TopLine}; const SPACE: u8 = b' '; const TAB: u8 = b'\t'; @@ -261,6 +255,10 @@ impl Request { fn as_bytes(&self) -> &[u8] { return self.request_line.1.as_bytes(); } + + pub fn method(&self) -> &Method { + &self.request_line.0.method + } } impl Message { @@ -334,10 +332,6 @@ impl Encoder for SipCodec { } } -pub struct Sip { - listeners: Vec, -} - enum CharType { Line, Whitespace, @@ -351,195 +345,3 @@ fn chartype(char: Option<&u8>) -> CharType { Some(_) => CharType::Other, } } - -impl Sip { - pub fn new() -> Sip { - Sip { - listeners: Vec::new(), - } - } - - pub fn add_listener(&mut self, listener: TcpListener) { - self.listeners.push(listener); - } - - pub fn run( - self, - handle: Handle, - caller: Arc>, - ) -> Box> - where - F: 'static + Fn(Message) -> Result<(), std::io::Error>, - { - let mut future: Box> = Box::new(future::ok(())); - - for listener in self.listeners { - let handle = handle.clone(); - let caller = caller.clone(); - - // Iterate incoming connections - future = Box::new( - future - .join(listener.incoming().for_each(move |(tcp, _)| { - let caller = caller.clone(); - - // Split up the read and write halves - let decoder = SipCodec::new(); - - let (_sink, stream) = decoder.framed(tcp).split(); - - let future = stream.for_each(move |message| caller(message)); - - let future = future.map_err(|err| error!("Error {:?}", err)); - - // Spawn the future as a concurrent task - handle.spawn(future); - - Ok(()) - })).map(|_| ()), - ); - } - - future - } -} - -#[cfg(test)] -mod tests { - - extern crate env_logger; - - use codec::Message; - use codec::Sip; - use futures; - use futures::future::Either; - use futures::Future; - use std::net::Shutdown; - use std::ops::DerefMut; - use std::sync::Arc; - use std::sync::Mutex; - use tokio_core::net::TcpListener; - use tokio_core::net::TcpStream; - use tokio_core::reactor::Core; - use tokio_io::io; - use types::Method; - - fn decode_sip_message>(message: S) -> Message { - // Create the event loop that will drive this server - let mut core = Core::new().unwrap(); - let handle = core.handle(); - - let mut sip = Sip::new(); - - // Bind the server's socket - let addr = "127.0.0.1:12345".parse().unwrap(); - sip.add_listener(TcpListener::bind(&addr, &handle).unwrap()); - - let m = Arc::new(Mutex::new(Option::None)); - - let server = { - let m = m.clone(); - - sip.run( - core.handle(), - Arc::new(Box::new(move |message| { - let mut m = m.lock().unwrap(); - *m = Some(message); - Ok(()) - })), - ) - }; - - // Send a request - let socket = TcpStream::connect(&addr, &handle); - - let message = message.into(); - - let request = socket.and_then(|socket| io::write_all(socket, message)); - - let finished = request.and_then(|(socket, _request)| { - futures::done(socket.shutdown(Shutdown::Write).map(|_| socket)) - }); - - let response = finished.and_then(|socket| io::read_to_end(socket, Vec::new())); - - // Spin up the server on the event loop, and connect the client to it - let res = core.run(server.select2(response)); - - { - let mut message = m.lock().unwrap(); - - // Response - match (res, message.deref_mut()) { - ( - Ok(Either::B(((ref _socket, ref _response), _))), - ref mut message @ &mut Some(_), - ) => message.take().unwrap(), - (Err(Either::A((err, _))), _) => panic!("{:?}", err), - (Err(Either::B((err, _))), _) => panic!("{:?}", err), - (_, &mut None) => panic!("No message received"), - (Ok(_), _) => panic!("Expected client to complete"), - } - } - } - - #[test] - fn test_message_decode() { - env_logger::init().unwrap(); - - let message = decode_sip_message( - "MESSAGE sip:test.com SIP/2.0\r\n\ - Accept: text/plain\r\n\ - Accept-Encoding: *\r\n\ - Accept-Language: en-gb\r\n\ - Alert-Info: \r\n\ - Allow: MESSAGE\r\n\ - Authentication-Info: qop=auth\r\n\ - Authorization: Digest username=\"Alice\"\r\n\ - Call-ID: f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.bar.com\r\n\ - Call-Info: ; purpose=icon\r\n\ - Contact: *\r\n\ - Content-Disposition: session\r\n\ - Content-Encoding: gzip\r\n\ - Content-Language: en-gb\r\n\ - Content-Length: 0\r\n\ - Content-Type: text/plain\r\n\ - CSeq: 1 MESSAGE\r\n\ - Date: Sat, 13 Nov 2010 23:29:00 GMT\r\n\ - Error-Info: \r\n\ - Expires: 30\r\n\ - From: sip:+12125551212@server.phone2net.com; tag=887s\r\n\ - In-Reply-To: 70710@saturn.bell-tel.com,17320@saturn.bell-tel. com\r\n\ - Max-Forwards: 32\r\n\ - MIME-Version:2.0\r\n\ - Min-Expires: 30\r\n\ - Organization: Foobar\r\n\ - Priority: normal\r\n\ - Proxy-Authenticate: Digest realm=\"atlanta.com\"\r\n\ - Proxy-Authorization: Digest username=\"Bob\"\r\n\ - Proxy-Require: foo\r\n\ - Record-Route: \r\n\ - Reply-To: \r\n\ - Require: baz\r\n\ - Retry-After:18000;duration=3600\r\n\ - Route: \r\n\ - Server: rust-sip tokio\r\n\ - Subject: Foobaz\r\n\ - Supported: rec\r\n\ - Timestamp: 1 2\r\n\ - To: ; tag=287447\r\n\ - Unsupported: 100rel\r\n\ - User-Agent: rust-sip\r\n\ - Via: SIP/2.0/UDP pc33.atlanta.com;branch=z9hG4bK776asdhds\r\n\ - Warning: 370 devnull \"Failure\"\r\n\ - WWW-Authenticate: Digest realm=\"biloxi.com\"\r\n\ - X-Extension: test\r\n\r\n", - ); - - if let Message::Request(request) = message { - assert_eq!(request.request_line.0.method, Method::MESSAGE); - } else { - panic!("Unexpected message: {:?}", message); - } - } -} diff --git a/src/lib.rs b/src/lib.rs index bc3d893..10161b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ extern crate log; #[macro_use] extern crate nom; -pub mod codec; +mod codec; mod parser; +pub mod server; mod types; diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..a40b3ce --- /dev/null +++ b/src/server.rs @@ -0,0 +1,204 @@ +use codec::{Message, SipCodec}; +use futures::future; +use futures::{Future, Stream}; +use std; +use std::sync::Arc; +use tokio_core::net::TcpListener; +use tokio_core::reactor::Handle; +use tokio_io::codec::Decoder; + +pub struct Sip { + listeners: Vec, +} + +impl Sip { + pub fn new() -> Sip { + Sip { + listeners: Vec::new(), + } + } + + pub fn add_listener(&mut self, listener: TcpListener) { + self.listeners.push(listener); + } + + pub fn run( + self, + handle: Handle, + caller: Arc>, + ) -> Box> + where + F: 'static + Fn(Message) -> Result<(), std::io::Error>, + { + let mut future: Box> = Box::new(future::ok(())); + + for listener in self.listeners { + let handle = handle.clone(); + let caller = caller.clone(); + + // Iterate incoming connections + future = Box::new( + future + .join(listener.incoming().for_each(move |(tcp, _)| { + let caller = caller.clone(); + + // Split up the read and write halves + let decoder = SipCodec::new(); + + let (_sink, stream) = decoder.framed(tcp).split(); + + let future = stream.for_each(move |message| caller(message)); + + let future = future.map_err(|err| error!("Error {:?}", err)); + + // Spawn the future as a concurrent task + handle.spawn(future); + + Ok(()) + })).map(|_| ()), + ); + } + + future + } +} + +#[cfg(test)] +mod tests { + + extern crate env_logger; + + use super::Sip; + use codec::Message; + use futures; + use futures::future::Either; + use futures::Future; + use std::net::Shutdown; + use std::ops::DerefMut; + use std::sync::Arc; + use std::sync::Mutex; + use tokio_core::net::TcpListener; + use tokio_core::net::TcpStream; + use tokio_core::reactor::Core; + use tokio_io::io; + use types::Method; + + fn decode_sip_message>(message: S) -> Message { + // Create the event loop that will drive this server + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let mut sip = Sip::new(); + + // Bind the server's socket + let addr = "127.0.0.1:12345".parse().unwrap(); + sip.add_listener(TcpListener::bind(&addr, &handle).unwrap()); + + let m = Arc::new(Mutex::new(Option::None)); + + let server = { + let m = m.clone(); + + sip.run( + core.handle(), + Arc::new(Box::new(move |message| { + let mut m = m.lock().unwrap(); + *m = Some(message); + Ok(()) + })), + ) + }; + + // Send a request + let socket = TcpStream::connect(&addr, &handle); + + let message = message.into(); + + let request = socket.and_then(|socket| io::write_all(socket, message)); + + let finished = request.and_then(|(socket, _request)| { + futures::done(socket.shutdown(Shutdown::Write).map(|_| socket)) + }); + + let response = finished.and_then(|socket| io::read_to_end(socket, Vec::new())); + + // Spin up the server on the event loop, and connect the client to it + let res = core.run(server.select2(response)); + + { + let mut message = m.lock().unwrap(); + + // Response + match (res, message.deref_mut()) { + ( + Ok(Either::B(((ref _socket, ref _response), _))), + ref mut message @ &mut Some(_), + ) => message.take().unwrap(), + (Err(Either::A((err, _))), _) => panic!("{:?}", err), + (Err(Either::B((err, _))), _) => panic!("{:?}", err), + (_, &mut None) => panic!("No message received"), + (Ok(_), _) => panic!("Expected client to complete"), + } + } + } + + #[test] + fn test_message_decode() { + env_logger::init().unwrap(); + + let message = decode_sip_message( + "MESSAGE sip:test.com SIP/2.0\r\n\ + Accept: text/plain\r\n\ + Accept-Encoding: *\r\n\ + Accept-Language: en-gb\r\n\ + Alert-Info: \r\n\ + Allow: MESSAGE\r\n\ + Authentication-Info: qop=auth\r\n\ + Authorization: Digest username=\"Alice\"\r\n\ + Call-ID: f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.bar.com\r\n\ + Call-Info: ; purpose=icon\r\n\ + Contact: *\r\n\ + Content-Disposition: session\r\n\ + Content-Encoding: gzip\r\n\ + Content-Language: en-gb\r\n\ + Content-Length: 0\r\n\ + Content-Type: text/plain\r\n\ + CSeq: 1 MESSAGE\r\n\ + Date: Sat, 13 Nov 2010 23:29:00 GMT\r\n\ + Error-Info: \r\n\ + Expires: 30\r\n\ + From: sip:+12125551212@server.phone2net.com; tag=887s\r\n\ + In-Reply-To: 70710@saturn.bell-tel.com,17320@saturn.bell-tel.com\r\n\ + Max-Forwards: 32\r\n\ + MIME-Version:2.0\r\n\ + Min-Expires: 30\r\n\ + Organization: Foobar\r\n\ + Priority: normal\r\n\ + Proxy-Authenticate: Digest realm=\"atlanta.com\"\r\n\ + Proxy-Authorization: Digest username=\"Bob\"\r\n\ + Proxy-Require: foo\r\n\ + Record-Route: \r\n\ + Reply-To: \r\n\ + Require: baz\r\n\ + Retry-After:18000;duration=3600\r\n\ + Route: \r\n\ + Server: rust-sip tokio\r\n\ + Subject: Foobaz\r\n\ + Supported: rec\r\n\ + Timestamp: 1 2\r\n\ + To: ; tag=287447\r\n\ + Unsupported: 100rel\r\n\ + User-Agent: rust-sip\r\n\ + Via: SIP/2.0/UDP pc33.atlanta.com;branch=z9hG4bK776asdhds\r\n\ + Warning: 370 devnull \"Failure\"\r\n\ + WWW-Authenticate: Digest realm=\"biloxi.com\"\r\n\ + X-Extension: test\r\n\r\n", + ); + + if let Message::Request(request) = message { + assert_eq!(*request.method(), Method::MESSAGE); + } else { + panic!("Unexpected message: {:?}", message); + } + } +}