use futures::future;
use futures::{Future, Stream};
use std;
+use std::error::Error;
use std::sync::Arc;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Handle;
pub struct Sip {
listeners: Vec<TcpListener>,
+ handle: Handle,
}
impl Sip {
- pub fn new() -> Sip {
+ pub fn new(handle : Handle) -> Sip {
Sip {
listeners: Vec::new(),
+ handle: handle,
}
}
+ pub fn listen_tcp(handle: Handle, addr: &str) -> Result<Sip, Box<Error>> {
+ let mut s = Sip::new(handle);
+ s.add_tcp_listen_address(addr)?;
+ Ok(s)
+ }
+
+ pub fn add_tcp_listen_address(&mut self, addr: &str) -> Result<(), Box<Error>> {
+ let addr = addr.parse()?;
+ let t = TcpListener::bind(&addr, &self.handle)?;
+ self.add_listener(t);
+ Ok(())
+ }
+
pub fn add_listener(&mut self, listener: TcpListener) {
self.listeners.push(listener);
}
pub fn run<F>(
self,
- handle: Handle,
caller: Arc<Box<F>>,
) -> Box<Future<Item = (), Error = std::io::Error>>
where
let mut future: Box<Future<Item = (), Error = std::io::Error>> = Box::new(future::ok(()));
for listener in self.listeners {
- let handle = handle.clone();
+ let handle = self.handle.clone();
let caller = caller.clone();
// Iterate incoming connections
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::Mutex;
- use tokio_core::net::TcpListener;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use tokio_io::io;
fn decode_sip_message<S: Into<String>>(message: S) -> Message {
// Create the event loop that will drive this server
let mut core = Core::new().unwrap();
- let handle = core.handle();
- let mut sip = Sip::new();
-
- // Bind the server's socket
- let addr = "127.0.0.1:12345".parse().unwrap();
- sip.add_listener(TcpListener::bind(&addr, &handle).unwrap());
+ // Address to listen on, and connect to.
+ let addr = "127.0.0.1:12345";
+ // Create a SIP server which listens on TCP, and stores a message
+ // when it arrives.
let m = Arc::new(Mutex::new(Option::None));
+ let sip = Sip::listen_tcp(core.handle(), addr).unwrap();
let server = {
let m = m.clone();
sip.run(
- core.handle(),
Arc::new(Box::new(move |message| {
let mut m = m.lock().unwrap();
*m = Some(message);
)
};
- // Send a request
+ // Send the request
+ let handle = core.handle();
+ let addr = "127.0.0.1:12345".parse().unwrap();
let socket = TcpStream::connect(&addr, &handle);
-
let message = message.into();
+ // Wait for the send to complete, and any response to arrive
let request = socket.and_then(|socket| io::write_all(socket, message));
let finished = request.and_then(|(socket, _request)| {
let response = finished.and_then(|socket| io::read_to_end(socket, Vec::new()));
- // Spin up the server on the event loop, and connect the client to it
+ // Run the event loop to completion
let res = core.run(server.select2(response));
+ // Check what message was decoded
{
let mut message = m.lock().unwrap();