From: Richard Whitehouse Date: Mon, 22 Oct 2018 20:56:20 +0000 (-0400) Subject: Rename Sip to Transport X-Git-Url: https://git.richardwhiuk.com/?a=commitdiff_plain;h=d06a04f807d86d5ec28d31e2522b4b922de1f01c;p=rust-sip.git Rename Sip to Transport --- diff --git a/src/lib.rs b/src/lib.rs index 1446ef7..b3275a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,5 +14,5 @@ extern crate nom; mod codec; mod message; mod parser; -pub mod server; +pub mod transport; mod types; diff --git a/src/server.rs b/src/server.rs deleted file mode 100644 index b84b180..0000000 --- a/src/server.rs +++ /dev/null @@ -1,221 +0,0 @@ -use futures::future; -use futures::{Future, Stream}; -use std; -use std::error::Error; -use std::sync::Arc; -use tokio_core::net::TcpListener; -use tokio_core::reactor::Handle; -use tokio_io::codec::Decoder; - -use codec::SipCodec; -use message::Message; - -pub struct Sip { - listeners: Vec, - handle: Handle, -} - -impl Sip { - pub fn new(handle : Handle) -> Sip { - Sip { - listeners: Vec::new(), - handle: handle, - } - } - - pub fn listen_tcp(handle: Handle, addr: &str) -> Result> { - let mut s = Sip::new(handle); - s.add_tcp_listen_address(addr)?; - Ok(s) - } - - pub fn add_tcp_listen_address(&mut self, addr: &str) -> Result<(), Box> { - let addr = addr.parse()?; - let t = TcpListener::bind(&addr, &self.handle)?; - self.add_listener(t); - Ok(()) - } - - pub fn add_listener(&mut self, listener: TcpListener) { - self.listeners.push(listener); - } - - pub fn run( - self, - caller: Arc>, - ) -> Box> - where - F: 'static + Fn(Message) -> Result<(), std::io::Error>, - { - let mut future: Box> = Box::new(future::ok(())); - - for listener in self.listeners { - let handle = self.handle.clone(); - let caller = caller.clone(); - - // Iterate incoming connections - future = Box::new( - future - .join(listener.incoming().for_each(move |(tcp, _)| { - let caller = caller.clone(); - - // Split up the read and write halves - let decoder = SipCodec::new(); - - let (_sink, stream) = decoder.framed(tcp).split(); - - let future = stream.for_each(move |message| caller(message)); - - let future = future.map_err(|err| error!("Error {:?}", err)); - - // Spawn the future as a concurrent task - handle.spawn(future); - - Ok(()) - })).map(|_| ()), - ); - } - - future - } -} - -#[cfg(test)] -mod tests { - - extern crate env_logger; - - use super::Sip; - use futures; - use futures::future::Either; - use futures::Future; - use message::Message; - use std::net::Shutdown; - use std::ops::DerefMut; - use std::sync::Arc; - use std::sync::Mutex; - use tokio_core::net::TcpStream; - use tokio_core::reactor::Core; - use tokio_io::io; - use types::Method; - - fn decode_sip_message>(message: S) -> Message { - // Create the event loop that will drive this server - let mut core = Core::new().unwrap(); - - // Address to listen on, and connect to. - let addr = "127.0.0.1:12345"; - - // Create a SIP server which listens on TCP, and stores a message - // when it arrives. - let m = Arc::new(Mutex::new(Option::None)); - - let sip = Sip::listen_tcp(core.handle(), addr).unwrap(); - let server = { - let m = m.clone(); - - sip.run( - Arc::new(Box::new(move |message| { - let mut m = m.lock().unwrap(); - *m = Some(message); - Ok(()) - })), - ) - }; - - // Send the request - let handle = core.handle(); - let addr = "127.0.0.1:12345".parse().unwrap(); - let socket = TcpStream::connect(&addr, &handle); - let message = message.into(); - - // Wait for the send to complete, and any response to arrive - let request = socket.and_then(|socket| io::write_all(socket, message)); - - let finished = request.and_then(|(socket, _request)| { - futures::done(socket.shutdown(Shutdown::Write).map(|_| socket)) - }); - - let response = finished.and_then(|socket| io::read_to_end(socket, Vec::new())); - - // Run the event loop to completion - let res = core.run(server.select2(response)); - - // Check what message was decoded - { - let mut message = m.lock().unwrap(); - - // Response - match (res, message.deref_mut()) { - ( - Ok(Either::B(((ref _socket, ref _response), _))), - ref mut message @ &mut Some(_), - ) => message.take().unwrap(), - (Err(Either::A((err, _))), _) => panic!("{:?}", err), - (Err(Either::B((err, _))), _) => panic!("{:?}", err), - (_, &mut None) => panic!("No message received"), - (Ok(_), _) => panic!("Expected client to complete"), - } - } - } - - #[test] - fn test_message_decode() { - env_logger::init().unwrap(); - - let message = decode_sip_message( - "MESSAGE sip:test.com SIP/2.0\r\n\ - Accept: text/plain\r\n\ - Accept-Encoding: *\r\n\ - Accept-Language: en-gb\r\n\ - Alert-Info: \r\n\ - Allow: MESSAGE\r\n\ - Authentication-Info: qop=auth\r\n\ - Authorization: Digest username=\"Alice\"\r\n\ - Call-ID: f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.bar.com\r\n\ - Call-Info: ; purpose=icon\r\n\ - Contact: *\r\n\ - Content-Disposition: session\r\n\ - Content-Encoding: gzip\r\n\ - Content-Language: en-gb\r\n\ - Content-Length: 0\r\n\ - Content-Type: text/plain\r\n\ - CSeq: 1 MESSAGE\r\n\ - Date: Sat, 13 Nov 2010 23:29:00 GMT\r\n\ - Error-Info: \r\n\ - Expires: 30\r\n\ - From: sip:+12125551212@server.phone2net.com; tag=887s\r\n\ - In-Reply-To: 70710@saturn.bell-tel.com,17320@saturn.bell-tel.com\r\n\ - Max-Forwards: 32\r\n\ - MIME-Version:2.0\r\n\ - Min-Expires: 30\r\n\ - Organization: Foobar\r\n\ - Priority: normal\r\n\ - Proxy-Authenticate: Digest realm=\"atlanta.com\"\r\n\ - Proxy-Authorization: Digest username=\"Bob\"\r\n\ - Proxy-Require: foo\r\n\ - Record-Route: \r\n\ - Reply-To: \r\n\ - Require: baz\r\n\ - Retry-After:18000;duration=3600\r\n\ - Route: \r\n\ - Server: rust-sip tokio\r\n\ - Subject: Foobaz\r\n\ - Supported: rec\r\n\ - Timestamp: 1 2\r\n\ - To: ; tag=287447\r\n\ - Unsupported: 100rel\r\n\ - User-Agent: rust-sip\r\n\ - Via: SIP/2.0/UDP pc33.atlanta.com;branch=z9hG4bK776asdhds\r\n\ - Warning: 370 devnull \"Failure\"\r\n\ - WWW-Authenticate: Digest realm=\"biloxi.com\"\r\n\ - X-Extension: test\r\n\r\n", - ); - - if let Message::Request(request) = message { - assert_eq!(*request.method(), Method::MESSAGE); - } else { - panic!("Unexpected message: {:?}", message); - } - } -} diff --git a/src/transport.rs b/src/transport.rs new file mode 100644 index 0000000..804a352 --- /dev/null +++ b/src/transport.rs @@ -0,0 +1,216 @@ +use futures::future; +use futures::{Future, Stream}; +use std; +use std::error::Error; +use std::sync::Arc; +use tokio_core::net::TcpListener; +use tokio_core::reactor::Handle; +use tokio_io::codec::Decoder; + +use codec::SipCodec; +use message::Message; + +pub struct Transport { + listeners: Vec, + handle: Handle, +} + +impl Transport { + pub fn new(handle: Handle) -> Transport { + Transport { + listeners: Vec::new(), + handle: handle, + } + } + + pub fn listen_tcp(handle: Handle, addr: &str) -> Result> { + let mut s = Transport::new(handle); + s.add_tcp_listen_address(addr)?; + Ok(s) + } + + pub fn add_tcp_listen_address(&mut self, addr: &str) -> Result<(), Box> { + let addr = addr.parse()?; + let t = TcpListener::bind(&addr, &self.handle)?; + self.add_listener(t); + Ok(()) + } + + pub fn add_listener(&mut self, listener: TcpListener) { + self.listeners.push(listener); + } + + pub fn run(self, caller: Arc>) -> Box> + where + F: 'static + Fn(Message) -> Result<(), std::io::Error>, + { + let mut future: Box> = Box::new(future::ok(())); + + for listener in self.listeners { + let handle = self.handle.clone(); + let caller = caller.clone(); + + // Iterate incoming connections + future = Box::new( + future + .join(listener.incoming().for_each(move |(tcp, _)| { + let caller = caller.clone(); + + // Split up the read and write halves + let decoder = SipCodec::new(); + + let (_sink, stream) = decoder.framed(tcp).split(); + + let future = stream.for_each(move |message| caller(message)); + + let future = future.map_err(|err| error!("Error {:?}", err)); + + // Spawn the future as a concurrent task + handle.spawn(future); + + Ok(()) + })).map(|_| ()), + ); + } + + future + } +} + +#[cfg(test)] +mod tests { + + extern crate env_logger; + + use super::Transport; + use futures; + use futures::future::Either; + use futures::Future; + use message::Message; + use std::net::Shutdown; + use std::ops::DerefMut; + use std::sync::Arc; + use std::sync::Mutex; + use tokio_core::net::TcpStream; + use tokio_core::reactor::Core; + use tokio_io::io; + use types::Method; + + fn decode_sip_message>(message: S) -> Message { + // Create the event loop that will drive this server + let mut core = Core::new().unwrap(); + + // Address to listen on, and connect to. + let addr = "127.0.0.1:12345"; + + // Create a SIP server which listens on TCP, and stores a message + // when it arrives. + let m = Arc::new(Mutex::new(Option::None)); + + let sip = Transport::listen_tcp(core.handle(), addr).unwrap(); + let server = { + let m = m.clone(); + + sip.run(Arc::new(Box::new(move |message| { + let mut m = m.lock().unwrap(); + *m = Some(message); + Ok(()) + }))) + }; + + // Send the request + let handle = core.handle(); + let addr = "127.0.0.1:12345".parse().unwrap(); + let socket = TcpStream::connect(&addr, &handle); + let message = message.into(); + + // Wait for the send to complete, and any response to arrive + let request = socket.and_then(|socket| io::write_all(socket, message)); + + let finished = request.and_then(|(socket, _request)| { + futures::done(socket.shutdown(Shutdown::Write).map(|_| socket)) + }); + + let response = finished.and_then(|socket| io::read_to_end(socket, Vec::new())); + + // Run the event loop to completion + let res = core.run(server.select2(response)); + + // Check what message was decoded + { + let mut message = m.lock().unwrap(); + + // Response + match (res, message.deref_mut()) { + ( + Ok(Either::B(((ref _socket, ref _response), _))), + ref mut message @ &mut Some(_), + ) => message.take().unwrap(), + (Err(Either::A((err, _))), _) => panic!("{:?}", err), + (Err(Either::B((err, _))), _) => panic!("{:?}", err), + (_, &mut None) => panic!("No message received"), + (Ok(_), _) => panic!("Expected client to complete"), + } + } + } + + #[test] + fn test_message_decode() { + env_logger::init().unwrap(); + + let message = decode_sip_message( + "MESSAGE sip:test.com SIP/2.0\r\n\ + Accept: text/plain\r\n\ + Accept-Encoding: *\r\n\ + Accept-Language: en-gb\r\n\ + Alert-Info: \r\n\ + Allow: MESSAGE\r\n\ + Authentication-Info: qop=auth\r\n\ + Authorization: Digest username=\"Alice\"\r\n\ + Call-ID: f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.bar.com\r\n\ + Call-Info: ; purpose=icon\r\n\ + Contact: *\r\n\ + Content-Disposition: session\r\n\ + Content-Encoding: gzip\r\n\ + Content-Language: en-gb\r\n\ + Content-Length: 0\r\n\ + Content-Type: text/plain\r\n\ + CSeq: 1 MESSAGE\r\n\ + Date: Sat, 13 Nov 2010 23:29:00 GMT\r\n\ + Error-Info: \r\n\ + Expires: 30\r\n\ + From: sip:+12125551212@server.phone2net.com; tag=887s\r\n\ + In-Reply-To: 70710@saturn.bell-tel.com,17320@saturn.bell-tel.com\r\n\ + Max-Forwards: 32\r\n\ + MIME-Version:2.0\r\n\ + Min-Expires: 30\r\n\ + Organization: Foobar\r\n\ + Priority: normal\r\n\ + Proxy-Authenticate: Digest realm=\"atlanta.com\"\r\n\ + Proxy-Authorization: Digest username=\"Bob\"\r\n\ + Proxy-Require: foo\r\n\ + Record-Route: \r\n\ + Reply-To: \r\n\ + Require: baz\r\n\ + Retry-After:18000;duration=3600\r\n\ + Route: \r\n\ + Server: rust-sip tokio\r\n\ + Subject: Foobaz\r\n\ + Supported: rec\r\n\ + Timestamp: 1 2\r\n\ + To: ; tag=287447\r\n\ + Unsupported: 100rel\r\n\ + User-Agent: rust-sip\r\n\ + Via: SIP/2.0/UDP pc33.atlanta.com;branch=z9hG4bK776asdhds\r\n\ + Warning: 370 devnull \"Failure\"\r\n\ + WWW-Authenticate: Digest realm=\"biloxi.com\"\r\n\ + X-Extension: test\r\n\r\n", + ); + + if let Message::Request(request) = message { + assert_eq!(*request.method(), Method::MESSAGE); + } else { + panic!("Unexpected message: {:?}", message); + } + } +}