use std::io;
use std;
use nom;
+use std::sync::Arc;
use types::{RequestLine, StatusLine, TopLine, Header};
use parser::top_line;
self.listeners.push(listener);
}
- pub fn run(self, handle: Handle) -> Box<Future<Item = (), Error = std::io::Error>> {
+ pub fn run<F>(self,
+ handle: Handle,
+ caller: Arc<Box<F>>)
+ -> Box<Future<Item = (), Error = std::io::Error>>
+ where F: 'static + Fn(Message) -> Result<(), std::io::Error>
+ {
let mut future: Box<Future<Item = (), Error = std::io::Error>> = Box::new(future::ok(()));
for listener in self.listeners {
- let lis_handle = handle.clone();
+ let handle = handle.clone();
+ let caller = caller.clone();
// Iterate incoming connections
future = Box::new(future.join(listener.incoming().for_each(move |(tcp, _)| {
+ let caller = caller.clone();
+
// Split up the read and write halves
let (_sink, stream) = tcp.framed(SipCodec::new()).split();
- let future = stream.for_each(|message| {
- println!("recevied sip message: {:?}", message);
+ let future = stream.for_each(move |message| caller(message));
- Ok(())
- })
- .map_err(|err| println!("Error {:?}", err));
+ let future = future.map_err(|err| println!("Error {:?}", err));
// Spawn the future as a concurrent task
- lis_handle.spawn(future);
+ handle.spawn(future);
Ok(())
}))
#[cfg(test)]
mod tests {
+ use std::ops::DerefMut;
+ use std::sync::Arc;
+ use std::sync::Mutex;
use codec::Sip;
+ use codec::Message;
use futures;
use futures::Future;
use futures::future::Either;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use std::net::Shutdown;
+ use types::Method;
- #[test]
- fn it_works() {
+ fn decode_sip_message<S: Into<String>>(message: S) -> Message {
// Create the event loop that will drive this server
let mut core = Core::new().unwrap();
let handle = core.handle();
let addr = "127.0.0.1:12345".parse().unwrap();
sip.add_listener(TcpListener::bind(&addr, &handle).unwrap());
- let server = sip.run(core.handle());
+ let m = Arc::new(Mutex::new(Option::None));
+
+ let server = {
+ let m = m.clone();
+
+ sip.run(core.handle(),
+ Arc::new(Box::new(move |message| {
+ let mut m = m.lock().unwrap();
+ *m = Some(message);
+ Ok(())
+ })))
+ };
// Send a request
let socket = TcpStream::connect(&addr, &handle);
- let request = socket.and_then(|socket| {
- io::write_all(socket,
- "MESSAGE sip:test.com \
- SIP/2.0\r\nAccept:text/plain\r\nAccept-Encoding:*\r\nAccept-Language:\
- en-gb\r\nAlert-Info:<http://www.example.com/sounds/moo.wav>\r\nAllow:\
- MESSAGE\r\nAuthentication-Info:qop=auth\r\nAuthorization:Digest \
- username=\"Alice\"\r\nCall-ID:f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.\
- bar.com\r\nCall-Info:<http://wwww.example.com/alice/photo.jpg>;\
- purpose=icon\r\nContact:*\r\nContent-Disposition:\
- session\r\nContent-Encoding:gzip\r\nContent-Language:\
- en-gb\r\nContent-Length:0\r\nContent-Type:text/plain\r\nCSeq:1 \
- MESSAGE\r\nDate:Sat, 13 Nov 2010 23:29:00 \
- GMT\r\nError-Info:<sip:not-in-service-recording@atlanta.\
- com>\r\nExpires:30\r\nFrom:sip:+12125551212@server.phone2net.com;\
- tag=887s\r\nIn-Reply-To:70710@saturn.bell-tel.com,17320@saturn.\
- bell-tel.com\r\nMax-Forwards:32\r\nMIME-Version:2.0\r\nMin-Expires:\
- 30\r\nOrganization:Foobar\r\nPriority:normal\r\nProxy-Authenticate:\
- Digest realm=\"atlanta.com\"\r\nProxy-Authorization:Digest \
- username=\"Bob\"\r\nProxy-Require:foo\r\nRecord-Route:<sip:server10.\
- biloxi.com;lr>\r\nReply-To:<sip:bob@biloxi.com>\r\nRequire:\
- baz\r\nRetry-After:18000;duration=3600\r\nRoute:<sip:bigbox3.site3.\
- atlanta.com;lr>\r\nServer:rust-sip \
- tokio\r\nSubject:Foobaz\r\nSupported:rec\r\nTimestamp:1 \
- 2\r\nTo:<sip:operator@cs.columbia.edu>;tag=287447\r\nUnsupported:\
- 100rel\r\nUser-Agent:rust-sip\r\nVia:SIP/2.0/UDP \
- pc33.atlanta.com;branch=z9hG4bK776asdhds\r\nWarning:370 devnull \
- \"Failure\"\r\nWWW-Authenticate:Digest \
- realm=\"biloxi.com\"\r\nX-Extension:test\r\n\r\n")
- });
+ let message = message.into();
+
+ let request = socket.and_then(|socket| io::write_all(socket, message));
let finished = request.and_then(|(socket, _request)| {
futures::done(socket.shutdown(Shutdown::Write).map(|_| socket))
// Spin up the server on the event loop, and connect the client to it
let res = core.run(server.select2(response));
- // Response
- match res {
- Ok(Either::B(((_socket, response), _))) => {
- // assert_eq!("echo", String::from_utf8_lossy(&response));
+ {
+ let mut message = m.lock().unwrap();
+
+ // Response
+ match (res, message.deref_mut()) {
+ (Ok(Either::B(((ref _socket, ref _response), _))),
+ ref mut message @ &mut Some(_)) => message.take().unwrap(),
+ (Err(Either::A((err, _))), _) => panic!("{:?}", err),
+ (Err(Either::B((err, _))), _) => panic!("{:?}", err),
+ (_, &mut None) => panic!("No message received"),
+ (Ok(_), _) => panic!("Expected client to complete"),
}
- Ok(_) => panic!("Expected client to complete"),
- Err(Either::A((err, _))) => panic!("{:?}", err),
- Err(Either::B((err, _))) => panic!("{:?}", err),
+ }
+ }
+
+ #[test]
+ fn test_message_decode() {
+ let message =
+ decode_sip_message("MESSAGE sip:test.com \
+ SIP/2.0\r\nAccept:text/plain\r\nAccept-Encoding:\
+ *\r\nAccept-Language:en-gb\r\nAlert-Info:<http://www.example.\
+ com/sounds/moo.wav>\r\nAllow:MESSAGE\r\nAuthentication-Info:\
+ qop=auth\r\nAuthorization:Digest \
+ username=\"Alice\"\r\nCall-ID:\
+ f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.bar.com\r\nCall-Info:\
+ <http://wwww.example.com/alice/photo.jpg>;\
+ purpose=icon\r\nContact:*\r\nContent-Disposition:\
+ session\r\nContent-Encoding:gzip\r\nContent-Language:\
+ en-gb\r\nContent-Length:0\r\nContent-Type:text/plain\r\nCSeq:1 \
+ MESSAGE\r\nDate:Sat, 13 Nov 2010 23:29:00 \
+ GMT\r\nError-Info:<sip:not-in-service-recording@atlanta.\
+ com>\r\nExpires:30\r\nFrom:sip:+12125551212@server.phone2net.com;\
+ tag=887s\r\nIn-Reply-To:70710@saturn.bell-tel.com,17320@saturn.\
+ bell-tel.com\r\nMax-Forwards:32\r\nMIME-Version:2.\
+ 0\r\nMin-Expires:30\r\nOrganization:Foobar\r\nPriority:\
+ normal\r\nProxy-Authenticate:Digest \
+ realm=\"atlanta.com\"\r\nProxy-Authorization:Digest \
+ username=\"Bob\"\r\nProxy-Require:foo\r\nRecord-Route:<sip:\
+ server10.biloxi.com;lr>\r\nReply-To:<sip:bob@biloxi.\
+ com>\r\nRequire:baz\r\nRetry-After:18000;duration=3600\r\nRoute:\
+ <sip:bigbox3.site3.atlanta.com;lr>\r\nServer:rust-sip \
+ tokio\r\nSubject:Foobaz\r\nSupported:rec\r\nTimestamp:1 \
+ 2\r\nTo:<sip:operator@cs.columbia.edu>;tag=287447\r\nUnsupported:\
+ 100rel\r\nUser-Agent:rust-sip\r\nVia:SIP/2.0/UDP \
+ pc33.atlanta.com;branch=z9hG4bK776asdhds\r\nWarning:370 devnull \
+ \"Failure\"\r\nWWW-Authenticate:Digest \
+ realm=\"biloxi.com\"\r\nX-Extension:test\r\n\r\n");
+
+ if let Message::Request(request) = message {
+ assert_eq!(request.request_line.0.method, Method::MESSAGE);
+ } else {
+ panic!("Unexpected message: {:?}", message);
}
}
}