use bytes::BytesMut;
-
-use futures::future;
-use futures::{Future, Stream};
use nom;
use std;
use std::io;
-use std::sync::Arc;
-use tokio_core::net::TcpListener;
-use tokio_core::reactor::Handle;
use tokio_io::codec::{Decoder, Encoder};
use parser::header;
use parser::top_line;
-use types::{Header, RequestLine, StatusLine, TopLine};
+use types::{Header, Method, RequestLine, StatusLine, TopLine};
const SPACE: u8 = b' ';
const TAB: u8 = b'\t';
fn as_bytes(&self) -> &[u8] {
return self.request_line.1.as_bytes();
}
+
+ pub fn method(&self) -> &Method {
+ &self.request_line.0.method
+ }
}
impl Message {
}
}
-pub struct Sip {
- listeners: Vec<TcpListener>,
-}
-
enum CharType {
Line,
Whitespace,
Some(_) => CharType::Other,
}
}
-
-impl Sip {
- pub fn new() -> Sip {
- Sip {
- listeners: Vec::new(),
- }
- }
-
- pub fn add_listener(&mut self, listener: TcpListener) {
- self.listeners.push(listener);
- }
-
- pub fn run<F>(
- self,
- handle: Handle,
- caller: Arc<Box<F>>,
- ) -> Box<Future<Item = (), Error = std::io::Error>>
- where
- F: 'static + Fn(Message) -> Result<(), std::io::Error>,
- {
- let mut future: Box<Future<Item = (), Error = std::io::Error>> = Box::new(future::ok(()));
-
- for listener in self.listeners {
- let handle = handle.clone();
- let caller = caller.clone();
-
- // Iterate incoming connections
- future = Box::new(
- future
- .join(listener.incoming().for_each(move |(tcp, _)| {
- let caller = caller.clone();
-
- // Split up the read and write halves
- let decoder = SipCodec::new();
-
- let (_sink, stream) = decoder.framed(tcp).split();
-
- let future = stream.for_each(move |message| caller(message));
-
- let future = future.map_err(|err| error!("Error {:?}", err));
-
- // Spawn the future as a concurrent task
- handle.spawn(future);
-
- Ok(())
- })).map(|_| ()),
- );
- }
-
- future
- }
-}
-
-#[cfg(test)]
-mod tests {
-
- extern crate env_logger;
-
- use codec::Message;
- use codec::Sip;
- use futures;
- use futures::future::Either;
- use futures::Future;
- use std::net::Shutdown;
- use std::ops::DerefMut;
- use std::sync::Arc;
- use std::sync::Mutex;
- use tokio_core::net::TcpListener;
- use tokio_core::net::TcpStream;
- use tokio_core::reactor::Core;
- use tokio_io::io;
- use types::Method;
-
- fn decode_sip_message<S: Into<String>>(message: S) -> Message {
- // Create the event loop that will drive this server
- let mut core = Core::new().unwrap();
- let handle = core.handle();
-
- let mut sip = Sip::new();
-
- // Bind the server's socket
- let addr = "127.0.0.1:12345".parse().unwrap();
- sip.add_listener(TcpListener::bind(&addr, &handle).unwrap());
-
- let m = Arc::new(Mutex::new(Option::None));
-
- let server = {
- let m = m.clone();
-
- sip.run(
- core.handle(),
- Arc::new(Box::new(move |message| {
- let mut m = m.lock().unwrap();
- *m = Some(message);
- Ok(())
- })),
- )
- };
-
- // Send a request
- let socket = TcpStream::connect(&addr, &handle);
-
- let message = message.into();
-
- let request = socket.and_then(|socket| io::write_all(socket, message));
-
- let finished = request.and_then(|(socket, _request)| {
- futures::done(socket.shutdown(Shutdown::Write).map(|_| socket))
- });
-
- let response = finished.and_then(|socket| io::read_to_end(socket, Vec::new()));
-
- // Spin up the server on the event loop, and connect the client to it
- let res = core.run(server.select2(response));
-
- {
- let mut message = m.lock().unwrap();
-
- // Response
- match (res, message.deref_mut()) {
- (
- Ok(Either::B(((ref _socket, ref _response), _))),
- ref mut message @ &mut Some(_),
- ) => message.take().unwrap(),
- (Err(Either::A((err, _))), _) => panic!("{:?}", err),
- (Err(Either::B((err, _))), _) => panic!("{:?}", err),
- (_, &mut None) => panic!("No message received"),
- (Ok(_), _) => panic!("Expected client to complete"),
- }
- }
- }
-
- #[test]
- fn test_message_decode() {
- env_logger::init().unwrap();
-
- let message = decode_sip_message(
- "MESSAGE sip:test.com SIP/2.0\r\n\
- Accept: text/plain\r\n\
- Accept-Encoding: *\r\n\
- Accept-Language: en-gb\r\n\
- Alert-Info: <http://www.example.com/sounds/moo.wav>\r\n\
- Allow: MESSAGE\r\n\
- Authentication-Info: qop=auth\r\n\
- Authorization: Digest username=\"Alice\"\r\n\
- Call-ID: f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.bar.com\r\n\
- Call-Info: <http://wwww.example.com/alice/photo.jpg>; purpose=icon\r\n\
- Contact: *\r\n\
- Content-Disposition: session\r\n\
- Content-Encoding: gzip\r\n\
- Content-Language: en-gb\r\n\
- Content-Length: 0\r\n\
- Content-Type: text/plain\r\n\
- CSeq: 1 MESSAGE\r\n\
- Date: Sat, 13 Nov 2010 23:29:00 GMT\r\n\
- Error-Info: <sip:not-in-service-recording@atlanta.com>\r\n\
- Expires: 30\r\n\
- From: sip:+12125551212@server.phone2net.com; tag=887s\r\n\
- In-Reply-To: 70710@saturn.bell-tel.com,17320@saturn.bell-tel. com\r\n\
- Max-Forwards: 32\r\n\
- MIME-Version:2.0\r\n\
- Min-Expires: 30\r\n\
- Organization: Foobar\r\n\
- Priority: normal\r\n\
- Proxy-Authenticate: Digest realm=\"atlanta.com\"\r\n\
- Proxy-Authorization: Digest username=\"Bob\"\r\n\
- Proxy-Require: foo\r\n\
- Record-Route: <sip:server10.biloxi.com;lr>\r\n\
- Reply-To: <sip:bob@biloxi.com>\r\n\
- Require: baz\r\n\
- Retry-After:18000;duration=3600\r\n\
- Route: <sip:bigbox3.site3.atlanta.com;lr>\r\n\
- Server: rust-sip tokio\r\n\
- Subject: Foobaz\r\n\
- Supported: rec\r\n\
- Timestamp: 1 2\r\n\
- To: <sip:operator@cs.columbia.edu>; tag=287447\r\n\
- Unsupported: 100rel\r\n\
- User-Agent: rust-sip\r\n\
- Via: SIP/2.0/UDP pc33.atlanta.com;branch=z9hG4bK776asdhds\r\n\
- Warning: 370 devnull \"Failure\"\r\n\
- WWW-Authenticate: Digest realm=\"biloxi.com\"\r\n\
- X-Extension: test\r\n\r\n",
- );
-
- if let Message::Request(request) = message {
- assert_eq!(request.request_line.0.method, Method::MESSAGE);
- } else {
- panic!("Unexpected message: {:?}", message);
- }
- }
-}
--- /dev/null
+use codec::{Message, SipCodec};
+use futures::future;
+use futures::{Future, Stream};
+use std;
+use std::sync::Arc;
+use tokio_core::net::TcpListener;
+use tokio_core::reactor::Handle;
+use tokio_io::codec::Decoder;
+
+pub struct Sip {
+ listeners: Vec<TcpListener>,
+}
+
+impl Sip {
+ pub fn new() -> Sip {
+ Sip {
+ listeners: Vec::new(),
+ }
+ }
+
+ pub fn add_listener(&mut self, listener: TcpListener) {
+ self.listeners.push(listener);
+ }
+
+ pub fn run<F>(
+ self,
+ handle: Handle,
+ caller: Arc<Box<F>>,
+ ) -> Box<Future<Item = (), Error = std::io::Error>>
+ where
+ F: 'static + Fn(Message) -> Result<(), std::io::Error>,
+ {
+ let mut future: Box<Future<Item = (), Error = std::io::Error>> = Box::new(future::ok(()));
+
+ for listener in self.listeners {
+ let handle = handle.clone();
+ let caller = caller.clone();
+
+ // Iterate incoming connections
+ future = Box::new(
+ future
+ .join(listener.incoming().for_each(move |(tcp, _)| {
+ let caller = caller.clone();
+
+ // Split up the read and write halves
+ let decoder = SipCodec::new();
+
+ let (_sink, stream) = decoder.framed(tcp).split();
+
+ let future = stream.for_each(move |message| caller(message));
+
+ let future = future.map_err(|err| error!("Error {:?}", err));
+
+ // Spawn the future as a concurrent task
+ handle.spawn(future);
+
+ Ok(())
+ })).map(|_| ()),
+ );
+ }
+
+ future
+ }
+}
+
+#[cfg(test)]
+mod tests {
+
+ extern crate env_logger;
+
+ use super::Sip;
+ use codec::Message;
+ use futures;
+ use futures::future::Either;
+ use futures::Future;
+ use std::net::Shutdown;
+ use std::ops::DerefMut;
+ use std::sync::Arc;
+ use std::sync::Mutex;
+ use tokio_core::net::TcpListener;
+ use tokio_core::net::TcpStream;
+ use tokio_core::reactor::Core;
+ use tokio_io::io;
+ use types::Method;
+
+ fn decode_sip_message<S: Into<String>>(message: S) -> Message {
+ // Create the event loop that will drive this server
+ let mut core = Core::new().unwrap();
+ let handle = core.handle();
+
+ let mut sip = Sip::new();
+
+ // Bind the server's socket
+ let addr = "127.0.0.1:12345".parse().unwrap();
+ sip.add_listener(TcpListener::bind(&addr, &handle).unwrap());
+
+ let m = Arc::new(Mutex::new(Option::None));
+
+ let server = {
+ let m = m.clone();
+
+ sip.run(
+ core.handle(),
+ Arc::new(Box::new(move |message| {
+ let mut m = m.lock().unwrap();
+ *m = Some(message);
+ Ok(())
+ })),
+ )
+ };
+
+ // Send a request
+ let socket = TcpStream::connect(&addr, &handle);
+
+ let message = message.into();
+
+ let request = socket.and_then(|socket| io::write_all(socket, message));
+
+ let finished = request.and_then(|(socket, _request)| {
+ futures::done(socket.shutdown(Shutdown::Write).map(|_| socket))
+ });
+
+ let response = finished.and_then(|socket| io::read_to_end(socket, Vec::new()));
+
+ // Spin up the server on the event loop, and connect the client to it
+ let res = core.run(server.select2(response));
+
+ {
+ let mut message = m.lock().unwrap();
+
+ // Response
+ match (res, message.deref_mut()) {
+ (
+ Ok(Either::B(((ref _socket, ref _response), _))),
+ ref mut message @ &mut Some(_),
+ ) => message.take().unwrap(),
+ (Err(Either::A((err, _))), _) => panic!("{:?}", err),
+ (Err(Either::B((err, _))), _) => panic!("{:?}", err),
+ (_, &mut None) => panic!("No message received"),
+ (Ok(_), _) => panic!("Expected client to complete"),
+ }
+ }
+ }
+
+ #[test]
+ fn test_message_decode() {
+ env_logger::init().unwrap();
+
+ let message = decode_sip_message(
+ "MESSAGE sip:test.com SIP/2.0\r\n\
+ Accept: text/plain\r\n\
+ Accept-Encoding: *\r\n\
+ Accept-Language: en-gb\r\n\
+ Alert-Info: <http://www.example.com/sounds/moo.wav>\r\n\
+ Allow: MESSAGE\r\n\
+ Authentication-Info: qop=auth\r\n\
+ Authorization: Digest username=\"Alice\"\r\n\
+ Call-ID: f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.bar.com\r\n\
+ Call-Info: <http://wwww.example.com/alice/photo.jpg>; purpose=icon\r\n\
+ Contact: *\r\n\
+ Content-Disposition: session\r\n\
+ Content-Encoding: gzip\r\n\
+ Content-Language: en-gb\r\n\
+ Content-Length: 0\r\n\
+ Content-Type: text/plain\r\n\
+ CSeq: 1 MESSAGE\r\n\
+ Date: Sat, 13 Nov 2010 23:29:00 GMT\r\n\
+ Error-Info: <sip:not-in-service-recording@atlanta.com>\r\n\
+ Expires: 30\r\n\
+ From: sip:+12125551212@server.phone2net.com; tag=887s\r\n\
+ In-Reply-To: 70710@saturn.bell-tel.com,17320@saturn.bell-tel.com\r\n\
+ Max-Forwards: 32\r\n\
+ MIME-Version:2.0\r\n\
+ Min-Expires: 30\r\n\
+ Organization: Foobar\r\n\
+ Priority: normal\r\n\
+ Proxy-Authenticate: Digest realm=\"atlanta.com\"\r\n\
+ Proxy-Authorization: Digest username=\"Bob\"\r\n\
+ Proxy-Require: foo\r\n\
+ Record-Route: <sip:server10.biloxi.com;lr>\r\n\
+ Reply-To: <sip:bob@biloxi.com>\r\n\
+ Require: baz\r\n\
+ Retry-After:18000;duration=3600\r\n\
+ Route: <sip:bigbox3.site3.atlanta.com;lr>\r\n\
+ Server: rust-sip tokio\r\n\
+ Subject: Foobaz\r\n\
+ Supported: rec\r\n\
+ Timestamp: 1 2\r\n\
+ To: <sip:operator@cs.columbia.edu>; tag=287447\r\n\
+ Unsupported: 100rel\r\n\
+ User-Agent: rust-sip\r\n\
+ Via: SIP/2.0/UDP pc33.atlanta.com;branch=z9hG4bK776asdhds\r\n\
+ Warning: 370 devnull \"Failure\"\r\n\
+ WWW-Authenticate: Digest realm=\"biloxi.com\"\r\n\
+ X-Extension: test\r\n\r\n",
+ );
+
+ if let Message::Request(request) = message {
+ assert_eq!(*request.method(), Method::MESSAGE);
+ } else {
+ panic!("Unexpected message: {:?}", message);
+ }
+ }
+}