From 36bade570493bfd8b89969b76425c41c667185f2 Mon Sep 17 00:00:00 2001 From: Richard Whitehouse Date: Sun, 12 Nov 2017 17:40:27 +0000 Subject: [PATCH] Allow handler to be passed into run() --- src/codec.rs | 133 +++++++++++++++++++++++++++++++++------------------ src/types.rs | 2 +- 2 files changed, 87 insertions(+), 48 deletions(-) diff --git a/src/codec.rs b/src/codec.rs index c597461..5d7bb5a 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -9,6 +9,7 @@ use tokio_io::codec::{Encoder, Decoder}; use std::io; use std; use nom; +use std::sync::Arc; use types::{RequestLine, StatusLine, TopLine, Header}; use parser::top_line; @@ -361,29 +362,34 @@ impl Sip { self.listeners.push(listener); } - pub fn run(self, handle: Handle) -> Box> { + pub fn run(self, + handle: Handle, + caller: Arc>) + -> Box> + where F: 'static + Fn(Message) -> Result<(), std::io::Error> + { let mut future: Box> = Box::new(future::ok(())); for listener in self.listeners { - let lis_handle = handle.clone(); + let handle = handle.clone(); + let caller = caller.clone(); // Iterate incoming connections future = Box::new(future.join(listener.incoming().for_each(move |(tcp, _)| { + let caller = caller.clone(); + // Split up the read and write halves let (_sink, stream) = tcp.framed(SipCodec::new()).split(); - let future = stream.for_each(|message| { - println!("recevied sip message: {:?}", message); + let future = stream.for_each(move |message| caller(message)); - Ok(()) - }) - .map_err(|err| println!("Error {:?}", err)); + let future = future.map_err(|err| println!("Error {:?}", err)); // Spawn the future as a concurrent task - lis_handle.spawn(future); + handle.spawn(future); Ok(()) })) @@ -397,7 +403,11 @@ impl Sip { #[cfg(test)] mod tests { + use std::ops::DerefMut; + use std::sync::Arc; + use std::sync::Mutex; use codec::Sip; + use codec::Message; use futures; use futures::Future; use futures::future::Either; @@ -406,9 +416,9 @@ mod tests { use tokio_core::net::TcpStream; use tokio_core::reactor::Core; use std::net::Shutdown; + use types::Method; - #[test] - fn it_works() { + fn decode_sip_message>(message: S) -> Message { // Create the event loop that will drive this server let mut core = Core::new().unwrap(); let handle = core.handle(); @@ -419,40 +429,25 @@ mod tests { let addr = "127.0.0.1:12345".parse().unwrap(); sip.add_listener(TcpListener::bind(&addr, &handle).unwrap()); - let server = sip.run(core.handle()); + let m = Arc::new(Mutex::new(Option::None)); + + let server = { + let m = m.clone(); + + sip.run(core.handle(), + Arc::new(Box::new(move |message| { + let mut m = m.lock().unwrap(); + *m = Some(message); + Ok(()) + }))) + }; // Send a request let socket = TcpStream::connect(&addr, &handle); - let request = socket.and_then(|socket| { - io::write_all(socket, - "MESSAGE sip:test.com \ - SIP/2.0\r\nAccept:text/plain\r\nAccept-Encoding:*\r\nAccept-Language:\ - en-gb\r\nAlert-Info:\r\nAllow:\ - MESSAGE\r\nAuthentication-Info:qop=auth\r\nAuthorization:Digest \ - username=\"Alice\"\r\nCall-ID:f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.\ - bar.com\r\nCall-Info:;\ - purpose=icon\r\nContact:*\r\nContent-Disposition:\ - session\r\nContent-Encoding:gzip\r\nContent-Language:\ - en-gb\r\nContent-Length:0\r\nContent-Type:text/plain\r\nCSeq:1 \ - MESSAGE\r\nDate:Sat, 13 Nov 2010 23:29:00 \ - GMT\r\nError-Info:\r\nExpires:30\r\nFrom:sip:+12125551212@server.phone2net.com;\ - tag=887s\r\nIn-Reply-To:70710@saturn.bell-tel.com,17320@saturn.\ - bell-tel.com\r\nMax-Forwards:32\r\nMIME-Version:2.0\r\nMin-Expires:\ - 30\r\nOrganization:Foobar\r\nPriority:normal\r\nProxy-Authenticate:\ - Digest realm=\"atlanta.com\"\r\nProxy-Authorization:Digest \ - username=\"Bob\"\r\nProxy-Require:foo\r\nRecord-Route:\r\nReply-To:\r\nRequire:\ - baz\r\nRetry-After:18000;duration=3600\r\nRoute:\r\nServer:rust-sip \ - tokio\r\nSubject:Foobaz\r\nSupported:rec\r\nTimestamp:1 \ - 2\r\nTo:;tag=287447\r\nUnsupported:\ - 100rel\r\nUser-Agent:rust-sip\r\nVia:SIP/2.0/UDP \ - pc33.atlanta.com;branch=z9hG4bK776asdhds\r\nWarning:370 devnull \ - \"Failure\"\r\nWWW-Authenticate:Digest \ - realm=\"biloxi.com\"\r\nX-Extension:test\r\n\r\n") - }); + let message = message.into(); + + let request = socket.and_then(|socket| io::write_all(socket, message)); let finished = request.and_then(|(socket, _request)| { futures::done(socket.shutdown(Shutdown::Write).map(|_| socket)) @@ -463,14 +458,58 @@ mod tests { // Spin up the server on the event loop, and connect the client to it let res = core.run(server.select2(response)); - // Response - match res { - Ok(Either::B(((_socket, response), _))) => { - // assert_eq!("echo", String::from_utf8_lossy(&response)); + { + let mut message = m.lock().unwrap(); + + // Response + match (res, message.deref_mut()) { + (Ok(Either::B(((ref _socket, ref _response), _))), + ref mut message @ &mut Some(_)) => message.take().unwrap(), + (Err(Either::A((err, _))), _) => panic!("{:?}", err), + (Err(Either::B((err, _))), _) => panic!("{:?}", err), + (_, &mut None) => panic!("No message received"), + (Ok(_), _) => panic!("Expected client to complete"), } - Ok(_) => panic!("Expected client to complete"), - Err(Either::A((err, _))) => panic!("{:?}", err), - Err(Either::B((err, _))) => panic!("{:?}", err), + } + } + + #[test] + fn test_message_decode() { + let message = + decode_sip_message("MESSAGE sip:test.com \ + SIP/2.0\r\nAccept:text/plain\r\nAccept-Encoding:\ + *\r\nAccept-Language:en-gb\r\nAlert-Info:\r\nAllow:MESSAGE\r\nAuthentication-Info:\ + qop=auth\r\nAuthorization:Digest \ + username=\"Alice\"\r\nCall-ID:\ + f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.bar.com\r\nCall-Info:\ + ;\ + purpose=icon\r\nContact:*\r\nContent-Disposition:\ + session\r\nContent-Encoding:gzip\r\nContent-Language:\ + en-gb\r\nContent-Length:0\r\nContent-Type:text/plain\r\nCSeq:1 \ + MESSAGE\r\nDate:Sat, 13 Nov 2010 23:29:00 \ + GMT\r\nError-Info:\r\nExpires:30\r\nFrom:sip:+12125551212@server.phone2net.com;\ + tag=887s\r\nIn-Reply-To:70710@saturn.bell-tel.com,17320@saturn.\ + bell-tel.com\r\nMax-Forwards:32\r\nMIME-Version:2.\ + 0\r\nMin-Expires:30\r\nOrganization:Foobar\r\nPriority:\ + normal\r\nProxy-Authenticate:Digest \ + realm=\"atlanta.com\"\r\nProxy-Authorization:Digest \ + username=\"Bob\"\r\nProxy-Require:foo\r\nRecord-Route:\r\nReply-To:\r\nRequire:baz\r\nRetry-After:18000;duration=3600\r\nRoute:\ + \r\nServer:rust-sip \ + tokio\r\nSubject:Foobaz\r\nSupported:rec\r\nTimestamp:1 \ + 2\r\nTo:;tag=287447\r\nUnsupported:\ + 100rel\r\nUser-Agent:rust-sip\r\nVia:SIP/2.0/UDP \ + pc33.atlanta.com;branch=z9hG4bK776asdhds\r\nWarning:370 devnull \ + \"Failure\"\r\nWWW-Authenticate:Digest \ + realm=\"biloxi.com\"\r\nX-Extension:test\r\n\r\n"); + + if let Message::Request(request) = message { + assert_eq!(request.request_line.0.method, Method::MESSAGE); + } else { + panic!("Unexpected message: {:?}", message); } } } diff --git a/src/types.rs b/src/types.rs index 88495dc..73cf82c 100644 --- a/src/types.rs +++ b/src/types.rs @@ -64,7 +64,7 @@ pub enum UserParam { Other(Vec), } -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum Method { INVITE, ACK, -- 2.34.1