Separate out the server
authorRichard Whitehouse <github@richardwhiuk.com>
Mon, 22 Oct 2018 12:04:33 +0000 (08:04 -0400)
committerRichard Whitehouse <github@richardwhiuk.com>
Mon, 22 Oct 2018 12:04:33 +0000 (08:04 -0400)
src/codec.rs
src/lib.rs
src/server.rs [new file with mode: 0644]

index 91120d57cfd2e2d4883be1480c64f348b8a479c0..6921f7a9020f2d87c50149fd9dacaf018410e765 100644 (file)
@@ -1,18 +1,12 @@
 use bytes::BytesMut;
-
-use futures::future;
-use futures::{Future, Stream};
 use nom;
 use std;
 use std::io;
-use std::sync::Arc;
-use tokio_core::net::TcpListener;
-use tokio_core::reactor::Handle;
 use tokio_io::codec::{Decoder, Encoder};
 
 use parser::header;
 use parser::top_line;
-use types::{Header, RequestLine, StatusLine, TopLine};
+use types::{Header, Method, RequestLine, StatusLine, TopLine};
 
 const SPACE: u8 = b' ';
 const TAB: u8 = b'\t';
@@ -261,6 +255,10 @@ impl Request {
     fn as_bytes(&self) -> &[u8] {
         return self.request_line.1.as_bytes();
     }
+
+    pub fn method(&self) -> &Method {
+        &self.request_line.0.method
+    }
 }
 
 impl Message {
@@ -334,10 +332,6 @@ impl Encoder for SipCodec {
     }
 }
 
-pub struct Sip {
-    listeners: Vec<TcpListener>,
-}
-
 enum CharType {
     Line,
     Whitespace,
@@ -351,195 +345,3 @@ fn chartype(char: Option<&u8>) -> CharType {
         Some(_) => CharType::Other,
     }
 }
-
-impl Sip {
-    pub fn new() -> Sip {
-        Sip {
-            listeners: Vec::new(),
-        }
-    }
-
-    pub fn add_listener(&mut self, listener: TcpListener) {
-        self.listeners.push(listener);
-    }
-
-    pub fn run<F>(
-        self,
-        handle: Handle,
-        caller: Arc<Box<F>>,
-    ) -> Box<Future<Item = (), Error = std::io::Error>>
-    where
-        F: 'static + Fn(Message) -> Result<(), std::io::Error>,
-    {
-        let mut future: Box<Future<Item = (), Error = std::io::Error>> = Box::new(future::ok(()));
-
-        for listener in self.listeners {
-            let handle = handle.clone();
-            let caller = caller.clone();
-
-            // Iterate incoming connections
-            future = Box::new(
-                future
-                    .join(listener.incoming().for_each(move |(tcp, _)| {
-                        let caller = caller.clone();
-
-                        // Split up the read and write halves
-                        let decoder = SipCodec::new();
-
-                        let (_sink, stream) = decoder.framed(tcp).split();
-
-                        let future = stream.for_each(move |message| caller(message));
-
-                        let future = future.map_err(|err| error!("Error {:?}", err));
-
-                        // Spawn the future as a concurrent task
-                        handle.spawn(future);
-
-                        Ok(())
-                    })).map(|_| ()),
-            );
-        }
-
-        future
-    }
-}
-
-#[cfg(test)]
-mod tests {
-
-    extern crate env_logger;
-
-    use codec::Message;
-    use codec::Sip;
-    use futures;
-    use futures::future::Either;
-    use futures::Future;
-    use std::net::Shutdown;
-    use std::ops::DerefMut;
-    use std::sync::Arc;
-    use std::sync::Mutex;
-    use tokio_core::net::TcpListener;
-    use tokio_core::net::TcpStream;
-    use tokio_core::reactor::Core;
-    use tokio_io::io;
-    use types::Method;
-
-    fn decode_sip_message<S: Into<String>>(message: S) -> Message {
-        // Create the event loop that will drive this server
-        let mut core = Core::new().unwrap();
-        let handle = core.handle();
-
-        let mut sip = Sip::new();
-
-        // Bind the server's socket
-        let addr = "127.0.0.1:12345".parse().unwrap();
-        sip.add_listener(TcpListener::bind(&addr, &handle).unwrap());
-
-        let m = Arc::new(Mutex::new(Option::None));
-
-        let server = {
-            let m = m.clone();
-
-            sip.run(
-                core.handle(),
-                Arc::new(Box::new(move |message| {
-                    let mut m = m.lock().unwrap();
-                    *m = Some(message);
-                    Ok(())
-                })),
-            )
-        };
-
-        // Send a request
-        let socket = TcpStream::connect(&addr, &handle);
-
-        let message = message.into();
-
-        let request = socket.and_then(|socket| io::write_all(socket, message));
-
-        let finished = request.and_then(|(socket, _request)| {
-            futures::done(socket.shutdown(Shutdown::Write).map(|_| socket))
-        });
-
-        let response = finished.and_then(|socket| io::read_to_end(socket, Vec::new()));
-
-        // Spin up the server on the event loop, and connect the client to it
-        let res = core.run(server.select2(response));
-
-        {
-            let mut message = m.lock().unwrap();
-
-            // Response
-            match (res, message.deref_mut()) {
-                (
-                    Ok(Either::B(((ref _socket, ref _response), _))),
-                    ref mut message @ &mut Some(_),
-                ) => message.take().unwrap(),
-                (Err(Either::A((err, _))), _) => panic!("{:?}", err),
-                (Err(Either::B((err, _))), _) => panic!("{:?}", err),
-                (_, &mut None) => panic!("No message received"),
-                (Ok(_), _) => panic!("Expected client to complete"),
-            }
-        }
-    }
-
-    #[test]
-    fn test_message_decode() {
-        env_logger::init().unwrap();
-
-        let message = decode_sip_message(
-            "MESSAGE sip:test.com SIP/2.0\r\n\
-             Accept: text/plain\r\n\
-             Accept-Encoding: *\r\n\
-             Accept-Language: en-gb\r\n\
-             Alert-Info: <http://www.example.com/sounds/moo.wav>\r\n\
-             Allow: MESSAGE\r\n\
-             Authentication-Info: qop=auth\r\n\
-             Authorization: Digest username=\"Alice\"\r\n\
-             Call-ID: f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.bar.com\r\n\
-             Call-Info: <http://wwww.example.com/alice/photo.jpg>; purpose=icon\r\n\
-             Contact: *\r\n\
-             Content-Disposition: session\r\n\
-             Content-Encoding: gzip\r\n\
-             Content-Language: en-gb\r\n\
-             Content-Length: 0\r\n\
-             Content-Type: text/plain\r\n\
-             CSeq: 1 MESSAGE\r\n\
-             Date: Sat, 13 Nov 2010 23:29:00 GMT\r\n\
-             Error-Info: <sip:not-in-service-recording@atlanta.com>\r\n\
-             Expires: 30\r\n\
-             From: sip:+12125551212@server.phone2net.com; tag=887s\r\n\
-             In-Reply-To: 70710@saturn.bell-tel.com,17320@saturn.bell-tel. com\r\n\
-             Max-Forwards: 32\r\n\
-             MIME-Version:2.0\r\n\
-             Min-Expires: 30\r\n\
-             Organization: Foobar\r\n\
-             Priority: normal\r\n\
-             Proxy-Authenticate: Digest realm=\"atlanta.com\"\r\n\
-             Proxy-Authorization: Digest username=\"Bob\"\r\n\
-             Proxy-Require: foo\r\n\
-             Record-Route: <sip:server10.biloxi.com;lr>\r\n\
-             Reply-To: <sip:bob@biloxi.com>\r\n\
-             Require: baz\r\n\
-             Retry-After:18000;duration=3600\r\n\
-             Route: <sip:bigbox3.site3.atlanta.com;lr>\r\n\
-             Server: rust-sip tokio\r\n\
-             Subject: Foobaz\r\n\
-             Supported: rec\r\n\
-             Timestamp: 1 2\r\n\
-             To: <sip:operator@cs.columbia.edu>; tag=287447\r\n\
-             Unsupported: 100rel\r\n\
-             User-Agent: rust-sip\r\n\
-             Via: SIP/2.0/UDP pc33.atlanta.com;branch=z9hG4bK776asdhds\r\n\
-             Warning: 370 devnull \"Failure\"\r\n\
-             WWW-Authenticate: Digest realm=\"biloxi.com\"\r\n\
-             X-Extension: test\r\n\r\n",
-        );
-
-        if let Message::Request(request) = message {
-            assert_eq!(request.request_line.0.method, Method::MESSAGE);
-        } else {
-            panic!("Unexpected message: {:?}", message);
-        }
-    }
-}
index bc3d89343111ad9c92505b9a73b69d2d55e80fa2..10161b32734143d54e7f6c0d9db48b08b59d98af 100644 (file)
@@ -11,6 +11,7 @@ extern crate log;
 #[macro_use]
 extern crate nom;
 
-pub mod codec;
+mod codec;
 mod parser;
+pub mod server;
 mod types;
diff --git a/src/server.rs b/src/server.rs
new file mode 100644 (file)
index 0000000..a40b3ce
--- /dev/null
@@ -0,0 +1,204 @@
+use codec::{Message, SipCodec};
+use futures::future;
+use futures::{Future, Stream};
+use std;
+use std::sync::Arc;
+use tokio_core::net::TcpListener;
+use tokio_core::reactor::Handle;
+use tokio_io::codec::Decoder;
+
+pub struct Sip {
+    listeners: Vec<TcpListener>,
+}
+
+impl Sip {
+    pub fn new() -> Sip {
+        Sip {
+            listeners: Vec::new(),
+        }
+    }
+
+    pub fn add_listener(&mut self, listener: TcpListener) {
+        self.listeners.push(listener);
+    }
+
+    pub fn run<F>(
+        self,
+        handle: Handle,
+        caller: Arc<Box<F>>,
+    ) -> Box<Future<Item = (), Error = std::io::Error>>
+    where
+        F: 'static + Fn(Message) -> Result<(), std::io::Error>,
+    {
+        let mut future: Box<Future<Item = (), Error = std::io::Error>> = Box::new(future::ok(()));
+
+        for listener in self.listeners {
+            let handle = handle.clone();
+            let caller = caller.clone();
+
+            // Iterate incoming connections
+            future = Box::new(
+                future
+                    .join(listener.incoming().for_each(move |(tcp, _)| {
+                        let caller = caller.clone();
+
+                        // Split up the read and write halves
+                        let decoder = SipCodec::new();
+
+                        let (_sink, stream) = decoder.framed(tcp).split();
+
+                        let future = stream.for_each(move |message| caller(message));
+
+                        let future = future.map_err(|err| error!("Error {:?}", err));
+
+                        // Spawn the future as a concurrent task
+                        handle.spawn(future);
+
+                        Ok(())
+                    })).map(|_| ()),
+            );
+        }
+
+        future
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    extern crate env_logger;
+
+    use super::Sip;
+    use codec::Message;
+    use futures;
+    use futures::future::Either;
+    use futures::Future;
+    use std::net::Shutdown;
+    use std::ops::DerefMut;
+    use std::sync::Arc;
+    use std::sync::Mutex;
+    use tokio_core::net::TcpListener;
+    use tokio_core::net::TcpStream;
+    use tokio_core::reactor::Core;
+    use tokio_io::io;
+    use types::Method;
+
+    fn decode_sip_message<S: Into<String>>(message: S) -> Message {
+        // Create the event loop that will drive this server
+        let mut core = Core::new().unwrap();
+        let handle = core.handle();
+
+        let mut sip = Sip::new();
+
+        // Bind the server's socket
+        let addr = "127.0.0.1:12345".parse().unwrap();
+        sip.add_listener(TcpListener::bind(&addr, &handle).unwrap());
+
+        let m = Arc::new(Mutex::new(Option::None));
+
+        let server = {
+            let m = m.clone();
+
+            sip.run(
+                core.handle(),
+                Arc::new(Box::new(move |message| {
+                    let mut m = m.lock().unwrap();
+                    *m = Some(message);
+                    Ok(())
+                })),
+            )
+        };
+
+        // Send a request
+        let socket = TcpStream::connect(&addr, &handle);
+
+        let message = message.into();
+
+        let request = socket.and_then(|socket| io::write_all(socket, message));
+
+        let finished = request.and_then(|(socket, _request)| {
+            futures::done(socket.shutdown(Shutdown::Write).map(|_| socket))
+        });
+
+        let response = finished.and_then(|socket| io::read_to_end(socket, Vec::new()));
+
+        // Spin up the server on the event loop, and connect the client to it
+        let res = core.run(server.select2(response));
+
+        {
+            let mut message = m.lock().unwrap();
+
+            // Response
+            match (res, message.deref_mut()) {
+                (
+                    Ok(Either::B(((ref _socket, ref _response), _))),
+                    ref mut message @ &mut Some(_),
+                ) => message.take().unwrap(),
+                (Err(Either::A((err, _))), _) => panic!("{:?}", err),
+                (Err(Either::B((err, _))), _) => panic!("{:?}", err),
+                (_, &mut None) => panic!("No message received"),
+                (Ok(_), _) => panic!("Expected client to complete"),
+            }
+        }
+    }
+
+    #[test]
+    fn test_message_decode() {
+        env_logger::init().unwrap();
+
+        let message = decode_sip_message(
+            "MESSAGE sip:test.com SIP/2.0\r\n\
+             Accept: text/plain\r\n\
+             Accept-Encoding: *\r\n\
+             Accept-Language: en-gb\r\n\
+             Alert-Info: <http://www.example.com/sounds/moo.wav>\r\n\
+             Allow: MESSAGE\r\n\
+             Authentication-Info: qop=auth\r\n\
+             Authorization: Digest username=\"Alice\"\r\n\
+             Call-ID: f81d4fae-7dec-11d0-a765-00a0c91e6bf6@foo.bar.com\r\n\
+             Call-Info: <http://wwww.example.com/alice/photo.jpg>; purpose=icon\r\n\
+             Contact: *\r\n\
+             Content-Disposition: session\r\n\
+             Content-Encoding: gzip\r\n\
+             Content-Language: en-gb\r\n\
+             Content-Length: 0\r\n\
+             Content-Type: text/plain\r\n\
+             CSeq: 1 MESSAGE\r\n\
+             Date: Sat, 13 Nov 2010 23:29:00 GMT\r\n\
+             Error-Info: <sip:not-in-service-recording@atlanta.com>\r\n\
+             Expires: 30\r\n\
+             From: sip:+12125551212@server.phone2net.com; tag=887s\r\n\
+             In-Reply-To: 70710@saturn.bell-tel.com,17320@saturn.bell-tel.com\r\n\
+             Max-Forwards: 32\r\n\
+             MIME-Version:2.0\r\n\
+             Min-Expires: 30\r\n\
+             Organization: Foobar\r\n\
+             Priority: normal\r\n\
+             Proxy-Authenticate: Digest realm=\"atlanta.com\"\r\n\
+             Proxy-Authorization: Digest username=\"Bob\"\r\n\
+             Proxy-Require: foo\r\n\
+             Record-Route: <sip:server10.biloxi.com;lr>\r\n\
+             Reply-To: <sip:bob@biloxi.com>\r\n\
+             Require: baz\r\n\
+             Retry-After:18000;duration=3600\r\n\
+             Route: <sip:bigbox3.site3.atlanta.com;lr>\r\n\
+             Server: rust-sip tokio\r\n\
+             Subject: Foobaz\r\n\
+             Supported: rec\r\n\
+             Timestamp: 1 2\r\n\
+             To: <sip:operator@cs.columbia.edu>; tag=287447\r\n\
+             Unsupported: 100rel\r\n\
+             User-Agent: rust-sip\r\n\
+             Via: SIP/2.0/UDP pc33.atlanta.com;branch=z9hG4bK776asdhds\r\n\
+             Warning: 370 devnull \"Failure\"\r\n\
+             WWW-Authenticate: Digest realm=\"biloxi.com\"\r\n\
+             X-Extension: test\r\n\r\n",
+        );
+
+        if let Message::Request(request) = message {
+            assert_eq!(*request.method(), Method::MESSAGE);
+        } else {
+            panic!("Unexpected message: {:?}", message);
+        }
+    }
+}