diff --git a/core/services/memcached/src/core.rs b/core/services/memcached/src/core.rs index 1221bb08ed7f..a272a2b5186f 100644 --- a/core/services/memcached/src/core.rs +++ b/core/services/memcached/src/core.rs @@ -23,7 +23,6 @@ use fastpool::bounded; use opendal_core::raw::*; use opendal_core::*; use std::io; -use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; @@ -42,10 +41,7 @@ pub enum SocketStream { impl SocketStream { pub async fn connect_tcp(addr_str: &str) -> io::Result { - let socket_addr: SocketAddr = addr_str - .parse() - .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; - let stream = TcpStream::connect(socket_addr).await?; + let stream = TcpStream::connect(addr_str).await?; Ok(SocketStream::Tcp(stream)) } @@ -223,3 +219,61 @@ impl MemcachedCore { conn.delete(&percent_encode_path(key)).await } } + +#[cfg(test)] +mod tests { + use super::*; + use tokio::net::TcpListener; + + // regression test for connecting to a webdav server. + // Because setting up a dedicated behavior test is expensive, we choose to test `SocketStream::connect_tcp` instead. + // In the future, we could set up a webdav server to test TCP connection properly. + #[tokio::test] + async fn connect_tcp_socket() -> std::io::Result<()> { + for addr in &["127.0.0.1:11211", "localhost:11211", "[::1]:11211"] { + let listener = TcpListener::bind(addr).await?; + let addr = listener.local_addr()?.to_string(); + + let accepted = tokio::spawn(async move { + let _ = listener.accept().await?; + Ok::<_, std::io::Error>(()) + }); + + let _stream = SocketStream::connect_tcp(&addr).await?; + accepted.await.unwrap()?; + } + + Ok(()) + } + + // regression test for connecting to a webdav server. + // Because setting up a dedicated behavior test is expensive, we choose to test `SocketStream::connect_unix` instead. + // In the future, we could set up a webdav server to test UNIX socket connection properly. + #[cfg(unix)] + #[tokio::test] + async fn connect_unix_socket() -> std::io::Result<()> { + use std::time::{SystemTime, UNIX_EPOCH}; + use tokio::net::UnixListener; + + let path = std::env::temp_dir().join(format!( + "opendal-memcached-{}.sock", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + + let listener = UnixListener::bind(&path)?; + + let accepted = tokio::spawn(async move { + let _ = listener.accept().await?; + Ok::<_, std::io::Error>(()) + }); + + let _stream = SocketStream::connect_unix(path.to_str().unwrap()).await?; + accepted.await.unwrap()?; + + let _ = std::fs::remove_file(path); + Ok(()) + } +}