Skip to content

Half-blocking method. Is it supported/possible? #271

Description

@MageSlayer

Hi

I am trying to implement a "half-blocking" mode,
that is, blocking writes and non-blocking reads.
Currently I use the following code together with tungstenite. Blocking writes are done using "write_inner". TcpStream is from the "mio" crate.

Unfortunately, it does not work reliably.
I am getting various errors during the handshake and at later times.

I'd like to ask whether rust-websocket could be used to implement a working scheme like that.

/// TCP stream wrapper that bundles the socket with its own polling state and
/// a flag selecting whether reads retry or return on would-block errors.
struct TcpCustomStream {
    /// Underlying socket that the `Read`/`Write` impls delegate to.
    s: TcpStream,

    /// While `true`, `read` keeps retrying on would-block errors; cleared by
    /// `set_nonblocking`.
    block: bool,
    /// Poll instance the socket is registered with (token 0, writable interest).
    poll: Poll,
    /// Event buffer that `wait_write` hands to `poll`.
    events: Events,
}

impl TcpCustomStream {
    /// Connects to the first address `addr` resolves to and registers the new
    /// socket for writable-readiness notifications under token 0.
    ///
    /// The returned stream starts in blocking mode (`block == true`).
    ///
    /// # Errors
    /// Returns an error if resolution fails or yields no address, if the
    /// connect call fails, or if the poll instance cannot be created or the
    /// socket cannot be registered with it.
    ///
    /// NOTE(review): the surrounding issue text says `TcpStream` comes from
    /// `mio`; presumably the connection may still be in progress when this
    /// returns — confirm against the mio documentation.
    pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
        let target = addr.to_socket_addrs()?.next().ok_or_else(|| {
            std::io::Error::new(std::io::ErrorKind::Other, "Tcp connect call failed")
        })?;

        let mut stream = TcpStream::connect(target)?;
        let poll = Poll::new()?;
        let events = Events::with_capacity(128);

        // Only writable readiness is tracked; reads are never polled for.
        poll.registry()
            .register(&mut stream, Token(0), Interest::WRITABLE)?;

        Ok(Self {
            s: stream,
            block: true,
            poll,
            events,
        })
    }

    /// Switches the stream to non-blocking mode by clearing the `block` flag.
    ///
    /// Always returns `Ok(())`.
    pub fn set_nonblocking(&mut self) -> io::Result<()> {
        self.block = false;
        Ok(())
    }

    /// Blocks (no timeout) until the poll reports a writable event for token 0.
    ///
    /// A reported event may still be a spurious wakeup, so callers should be
    /// ready to see a would-block error again on the following write.
    ///
    /// # Errors
    /// Propagates any error returned by the underlying poll call.
    pub fn wait_write(&mut self) -> io::Result<()> {
        loop {
            self.poll.poll(&mut self.events, None)?;

            let writable = self
                .events
                .iter()
                .any(|ev| ev.token() == Token(0) && ev.is_writable());
            if writable {
                return Ok(());
            }
        }
    }

}

impl std::io::Read for TcpCustomStream {
    /// Reads from the underlying socket into `buf`.
    ///
    /// In blocking mode (`block == true`) a result that `would_block`
    /// classifies as retryable is not returned; the thread yields and the read
    /// is attempted again. Every other result, and every result in
    /// non-blocking mode, is returned to the caller unchanged.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            let outcome = self.s.read(buf);

            let retry = match &outcome {
                Err(e) => self.block && would_block(e),
                Ok(_) => false,
            };
            if !retry {
                return outcome;
            }

            // Busy-wait politely: give other threads a chance before retrying.
            thread::yield_now();
        }
    }
}

impl std::io::Write for TcpCustomStream {
    /// Writes `buf` to the underlying socket.
    ///
    /// In blocking mode (`block == true`, i.e. before `set_nonblocking` is
    /// called) this mirrors what `read` does: a write that cannot make
    /// progress is retried instead of being reported to the caller.
    /// `WouldBlock` waits for writable readiness via `wait_write` first;
    /// `Interrupted` is retried immediately, because no new readiness event
    /// is guaranteed to follow it.
    ///
    /// In non-blocking mode every result is returned unchanged, so callers
    /// that handle would-block themselves keep working as before.
    ///
    /// # Errors
    /// Returns any non-retryable error from the socket, or an error from the
    /// poll call made while waiting for writability.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        loop {
            match self.s.write(buf) {
                Err(e) if self.block && e.kind() == std::io::ErrorKind::Interrupted => {
                    // Nothing to wait for; just try again.
                }
                Err(e) if self.block && e.kind() == std::io::ErrorKind::WouldBlock => {
                    // The socket is not writable yet; sleep until the poll
                    // reports it writable.
                    self.wait_write()?;
                }
                other => return other,
            }
        }
    }

    /// Flushes the underlying socket.
    fn flush(&mut self) -> std::io::Result<()> {
        self.s.flush()
    }
}

... skipped
/// WebSocket client over a `TcpCustomStream`.
pub struct Client {
    /// The established WebSocket; `None` until `connect` succeeds.
    socket: Option<WebSocket<TcpCustomStream>>,
}

/// Returns `true` when `err` is a transient condition worth retrying: its
/// kind is either `WouldBlock` or `Interrupted`. Any other kind yields `false`.
fn would_block(err: &std::io::Error) -> bool {
    matches!(
        err.kind(),
        io::ErrorKind::WouldBlock | std::io::ErrorKind::Interrupted
    )
}

impl Client {
    /// Creates a client that has no WebSocket connection yet.
    pub fn new() -> Self {
        Self { socket: None }
    }

    pub fn connect<'a>(&mut self, uri_str: &'a str) -> Result<(), std::io::Error> {
        println!("Client started");

        let (mut client, _) = {
            let mut c = 10;
            loop {
                let req = Uri::from_maybe_shared(uri_str.to_string()).map_err(|_| {
                    std::io::Error::new(std::io::ErrorKind::InvalidInput, "Invalid connect URI")
                })?;

                let host_port = req
                    .authority()
                    .ok_or(std::io::Error::new(
                        std::io::ErrorKind::InvalidInput,
                        "Invalid connect URI",
                    ))?
                    .as_str();

                let stream = TcpCustomStream::connect(host_port)?;
                println!("Client connected");

                match client(req, stream) {
                    Err(x) => {
                        println!("Connect {:?}", x);
                        if c == 1 {
                            return Err(std::io::Error::new(
                                std::io::ErrorKind::Other,
                                format!("Cannot connect to {}", uri_str),
                            ));
                        }
                        c -= 1;
                        thread::sleep(std::time::Duration::from_millis(1000));
                    }
                    Ok(x) => break x,
                }
            }
        };
        println!("WSClient connected");

        client.get_mut().set_nonblocking()?;
        println!("Set nonblocking mode");

        self.socket = Some(client);

        Ok(())
    }

    fn write_inner(socket: &mut WebSocket<TcpCustomStream>, m: Message) -> Result<(), Error> {
        match socket.write_message(m) {
            Err(tungstenite::Error::Io(e)) if would_block(&e) => {
                //println!("cl write would_block");
                return loop {
                    socket.get_mut().wait_write()?;
                    match socket.write_pending() {
                        Err(tungstenite::Error::Io(e)) if would_block(&e) => continue,
                        x => break x,
                    }
                };
            }
            x => {
                //println!("cl write {:?}", x);
                x
            }
        }
    }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Fields

    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions