Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 59 additions & 5 deletions core/services/memcached/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use fastpool::bounded;
use opendal_core::raw::*;
use opendal_core::*;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
Expand All @@ -42,10 +41,7 @@ pub enum SocketStream {

impl SocketStream {
pub async fn connect_tcp(addr_str: &str) -> io::Result<Self> {
let socket_addr: SocketAddr = addr_str
.parse()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let stream = TcpStream::connect(socket_addr).await?;
let stream = TcpStream::connect(addr_str).await?;
Ok(SocketStream::Tcp(stream))
}

Expand Down Expand Up @@ -223,3 +219,61 @@ impl MemcachedCore {
conn.delete(&percent_encode_path(key)).await
}
}

#[cfg(test)]
mod tests {
use super::*;
use tokio::net::TcpListener;

// regression test for connecting to a webdav server.
// Because setting up a dedicated behavior test is expensive, we choose to test `SocketStream::connect_tcp` instead.
// In the future, we could set up a webdav server to test TCP connection properly.
#[tokio::test]
Comment thread
flip1995 marked this conversation as resolved.
async fn connect_tcp_socket() -> std::io::Result<()> {
for addr in &["127.0.0.1:11211", "localhost:11211", "[::1]:11211"] {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I usually don't use hard-coded port number -- 11211 seems to be a well-known port number for memcached.

let listener = TcpListener::bind(addr).await?;
let addr = listener.local_addr()?.to_string();

let accepted = tokio::spawn(async move {
let _ = listener.accept().await?;
Ok::<_, std::io::Error>(())
});

let _stream = SocketStream::connect_tcp(&addr).await?;
accepted.await.unwrap()?;
}

Ok(())
}

// regression test for connecting to a webdav server.
// Because setting up a dedicated behavior test is expensive, we choose to test `SocketStream::connect_unix` instead.
// In the future, we could set up a webdav server to test UNIX socket connection properly.
#[cfg(unix)]
Comment thread
flip1995 marked this conversation as resolved.
#[tokio::test]
async fn connect_unix_socket() -> std::io::Result<()> {
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::net::UnixListener;

let path = std::env::temp_dir().join(format!(
"opendal-memcached-{}.sock",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos()
));

let listener = UnixListener::bind(&path)?;

let accepted = tokio::spawn(async move {
let _ = listener.accept().await?;
Ok::<_, std::io::Error>(())
});

let _stream = SocketStream::connect_unix(path.to_str().unwrap()).await?;
accepted.await.unwrap()?;

let _ = std::fs::remove_file(path);
Ok(())
}
}
Loading