diff --git a/bindings/java/src/lib.rs b/bindings/java/src/lib.rs index 80296680829e..e6d719ac4fc5 100644 --- a/bindings/java/src/lib.rs +++ b/bindings/java/src/lib.rs @@ -232,6 +232,7 @@ fn make_write_options<'a>( if_none_match: convert::read_string_field(env, options, "ifNoneMatch")?, if_not_exists: convert::read_bool_field(env, options, "ifNotExists").unwrap_or_default(), user_metadata: convert::read_map_field(env, options, "userMetadata")?, + expires: None, concurrent, chunk: convert::read_jlong_field_to_usize(env, options, "chunk")?, }) diff --git a/bindings/nodejs/src/options.rs b/bindings/nodejs/src/options.rs index ccb1cae5b020..fa7e44cf07a3 100644 --- a/bindings/nodejs/src/options.rs +++ b/bindings/nodejs/src/options.rs @@ -501,6 +501,7 @@ impl From for opendal::options::WriteOptions { if_match: value.if_match, if_none_match: value.if_none_match, if_not_exists: value.if_not_exists.unwrap_or_default(), + expires: None, concurrent: value.concurrent.unwrap_or_default() as usize, } } diff --git a/bindings/python/src/options.rs b/bindings/python/src/options.rs index 1d37e0e7112e..a23c36550ecc 100644 --- a/bindings/python/src/options.rs +++ b/bindings/python/src/options.rs @@ -195,6 +195,7 @@ impl From for ocore::options::WriteOptions { if_match: opts.if_match, if_none_match: opts.if_none_match, if_not_exists: opts.if_not_exists.unwrap_or(false), + expires: None, } } } diff --git a/core/core/src/layers/correctness_check.rs b/core/core/src/layers/correctness_check.rs index 020acf4075c5..a7cfe5011e80 100644 --- a/core/core/src/layers/correctness_check.rs +++ b/core/core/src/layers/correctness_check.rs @@ -166,6 +166,13 @@ impl LayeredAccess for CorrectnessAccessor { return Err(err); } } + if args.expires().is_some() && !capability.write_with_expires { + return Err(new_unsupported_error( + &self.info, + Operation::Write, + "expires", + )); + } self.inner.write(path, args).await } @@ -460,6 +467,24 @@ mod tests { }); let res = op.writer_with("path").append(true).await; assert!(res.is_ok()); + + let res = op + .write_with("path", "".as_bytes()) + .expires(Duration::from_secs(60)) + .await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + write: true, + write_with_expires: true, + ..Default::default() + }); + let res = op + .write_with("path", "".as_bytes()) + .expires(Duration::from_secs(60)) + .await; + assert!(res.is_ok()); } #[tokio::test] diff --git a/core/core/src/raw/ops.rs b/core/core/src/raw/ops.rs index 30ca7395c3a5..5b0c53a10502 100644 --- a/core/core/src/raw/ops.rs +++ b/core/core/src/raw/ops.rs @@ -706,6 +706,7 @@ pub struct OpWrite { if_none_match: Option, if_not_exists: bool, user_metadata: Option>, + expires: Option, } impl OpWrite { @@ -833,6 +834,17 @@ impl OpWrite { pub fn user_metadata(&self) -> Option<&HashMap> { self.user_metadata.as_ref() } + + /// Set the expiration duration of the op. + pub fn with_expires(mut self, expires: Duration) -> Self { + self.expires = Some(expires); + self + } + + /// Get the expiration duration from the op. + pub fn expires(&self) -> Option { + self.expires + } } /// Args for `writer` operation. @@ -884,6 +896,7 @@ impl From for (OpWrite, OpWriter) { if_none_match: value.if_none_match, if_not_exists: value.if_not_exists, user_metadata: value.user_metadata, + expires: value.expires, }, OpWriter { chunk: value.chunk }, ) diff --git a/core/core/src/types/capability.rs b/core/core/src/types/capability.rs index 032077765a3c..88366cc6fa94 100644 --- a/core/core/src/types/capability.rs +++ b/core/core/src/types/capability.rs @@ -130,6 +130,8 @@ pub struct Capability { pub write_with_if_not_exists: bool, /// Indicates if custom user metadata can be attached during write operations. pub write_with_user_metadata: bool, + /// Indicates if write operations can expire the object after a duration. + pub write_with_expires: bool, /// Maximum size supported for multipart uploads. /// For example, AWS S3 supports up to 5GiB per part in multipart uploads. pub write_multi_max_size: Option, diff --git a/core/core/src/types/operator/operator_futures.rs b/core/core/src/types/operator/operator_futures.rs index 093395fb3364..1bfefb099dcb 100644 --- a/core/core/src/types/operator/operator_futures.rs +++ b/core/core/src/types/operator/operator_futures.rs @@ -921,6 +921,29 @@ impl>> FutureWrite { self.args.0.user_metadata = Some(HashMap::from_iter(data)); self } + + /// Sets a duration after which the written object should expire. + /// + /// Refer to [`options::WriteOptions::expires`] for more details. + /// + /// ### Example + /// + /// ``` + /// # use opendal_core::Result; + /// # use opendal_core::Operator; + /// # use std::time::Duration; + /// # async fn test(op: Operator) -> Result<()> { + /// let _ = op + /// .write_with("path/to/file", vec![0; 4096]) + /// .expires(Duration::from_secs(60)) + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn expires(mut self, v: Duration) -> Self { + self.args.0.expires = Some(v); + self + } } /// Future that generated by [`Operator::writer_with`]. @@ -1260,6 +1283,31 @@ impl>> FutureWriter { self.args.user_metadata = Some(HashMap::from_iter(data)); self } + + /// Sets a duration after which the written object should expire. + /// + /// Refer to [`options::WriteOptions::expires`] for more details. + /// + /// ### Example + /// + /// ``` + /// # use opendal_core::Result; + /// # use opendal_core::Operator; + /// # use std::time::Duration; + /// # async fn test(op: Operator) -> Result<()> { + /// let mut w = op + /// .writer_with("path/to/file") + /// .expires(Duration::from_secs(60)) + /// .await?; + /// w.write(vec![0; 4096]).await?; + /// w.close().await?; + /// # Ok(()) + /// # } + /// ``` + pub fn expires(mut self, v: Duration) -> Self { + self.args.expires = Some(v); + self + } } /// Future that generated by [`Operator::delete_with`]. diff --git a/core/core/src/types/options.rs b/core/core/src/types/options.rs index b5bdc9102e52..59bdb7564c4b 100644 --- a/core/core/src/types/options.rs +++ b/core/core/src/types/options.rs @@ -17,7 +17,7 @@ //! Options module provides options definitions for operations. -use crate::raw::{BytesRange, Timestamp}; +use crate::raw::{BytesRange, Duration, Timestamp}; use std::collections::HashMap; /// Options for delete operations. @@ -420,6 +420,21 @@ pub struct WriteOptions { /// This metadata can be retrieved later when reading the object. pub user_metadata: Option>, + /// Sets a duration after which the written object should expire. + /// + /// ### Capability + /// + /// Check [`Capability::write_with_expires`] before using this feature. + /// + /// ### Behavior + /// + /// - If supported, the target object will expire after the provided duration. + /// - Services without native expiration support will return an unsupported error. + /// - If a service also has a configured default TTL, this per-write value takes precedence. + /// + /// This operation is useful for cache-like backends such as Redis and Memcached. + pub expires: Option, + /// Sets If-Match header for this write request. /// /// ### Capability diff --git a/core/services/memcached/src/backend.rs b/core/services/memcached/src/backend.rs index 9062267a8a72..068c5205add2 100644 --- a/core/services/memcached/src/backend.rs +++ b/core/services/memcached/src/backend.rs @@ -196,6 +196,7 @@ impl MemcachedBackend { stat: true, write: true, write_can_empty: true, + write_with_expires: true, delete: true, shared: true, ..Default::default() @@ -251,9 +252,12 @@ impl Access for MemcachedBackend { Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize()))) } - async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let p = build_abs_path(&self.root, path); - Ok((RpWrite::new(), MemcachedWriter::new(self.core.clone(), p))) + Ok(( + RpWrite::new(), + MemcachedWriter::new(self.core.clone(), p, args), + )) } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { diff --git a/core/services/memcached/src/core.rs b/core/services/memcached/src/core.rs index 1221bb08ed7f..afc4eef5ceb5 100644 --- a/core/services/memcached/src/core.rs +++ b/core/services/memcached/src/core.rs @@ -203,16 +203,15 @@ impl MemcachedCore { Ok(result.map(Buffer::from)) } - pub async fn set(&self, key: &str, value: Buffer) -> Result<()> { + pub async fn set(&self, key: &str, value: Buffer, expires: Option) -> Result<()> { let mut conn = self.conn().await?; + let ttl = expires.or(self.default_ttl); conn.set( &percent_encode_path(key), &value.to_vec(), // Set expiration to 0 if ttl not set. - self.default_ttl - .map(|v| v.as_secs() as u32) - .unwrap_or_default(), + ttl.map(|v| v.as_secs() as u32).unwrap_or_default(), ) .await } diff --git a/core/services/memcached/src/writer.rs b/core/services/memcached/src/writer.rs index bd5f0e476892..2c9fe888b67f 100644 --- a/core/services/memcached/src/writer.rs +++ b/core/services/memcached/src/writer.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use opendal_core::raw::oio; +use opendal_core::raw::{OpWrite, oio}; use opendal_core::*; use super::core::*; @@ -25,14 +25,16 @@ use super::core::*; pub struct MemcachedWriter { core: Arc, path: String, + op: OpWrite, buffer: oio::QueueBuf, } impl MemcachedWriter { - pub fn new(core: Arc, path: String) -> Self { + pub fn new(core: Arc, path: String, op: OpWrite) -> Self { Self { core, path, + op, buffer: oio::QueueBuf::new(), } } @@ -47,7 +49,7 @@ impl oio::Write for MemcachedWriter { async fn close(&mut self) -> Result { let buf = self.buffer.clone().collect(); let length = buf.len() as u64; - self.core.set(&self.path, buf).await?; + self.core.set(&self.path, buf, self.op.expires()).await?; let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); Ok(meta) diff --git a/core/services/redis/src/backend.rs b/core/services/redis/src/backend.rs index a06012348527..884759607d2f 100644 --- a/core/services/redis/src/backend.rs +++ b/core/services/redis/src/backend.rs @@ -289,6 +289,7 @@ impl RedisBackend { delete: true, stat: true, write_can_empty: true, + write_with_expires: true, shared: true, ..Default::default() }); @@ -361,9 +362,9 @@ impl Access for RedisBackend { Ok((RpRead::new(), buffer)) } - async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let p = build_abs_path(&self.root, path); - Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p))) + Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p, args))) } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { diff --git a/core/services/redis/src/core.rs b/core/services/redis/src/core.rs index 1f3574a563e7..8d202e1670d9 100644 --- a/core/services/redis/src/core.rs +++ b/core/services/redis/src/core.rs @@ -184,10 +184,10 @@ impl RedisCore { Ok(result.map(Buffer::from)) } - pub async fn set(&self, key: &str, value: Buffer) -> Result<()> { + pub async fn set(&self, key: &str, value: Buffer, expires: Option) -> Result<()> { let mut conn = self.conn().await?; let value = value.to_vec(); - if let Some(dur) = self.default_ttl { + if let Some(dur) = expires.or(self.default_ttl) { let _: () = conn .set_ex(key, value, dur.as_secs()) .await diff --git a/core/services/redis/src/writer.rs b/core/services/redis/src/writer.rs index b96b50378fb7..90f8b393742e 100644 --- a/core/services/redis/src/writer.rs +++ b/core/services/redis/src/writer.rs @@ -25,14 +25,16 @@ use super::core::RedisCore; pub struct RedisWriter { core: Arc, path: String, + op: OpWrite, buffer: oio::QueueBuf, } impl RedisWriter { - pub fn new(core: Arc, path: String) -> Self { + pub fn new(core: Arc, path: String, op: OpWrite) -> Self { Self { core, path, + op, buffer: oio::QueueBuf::new(), } } @@ -47,7 +49,7 @@ impl oio::Write for RedisWriter { async fn close(&mut self) -> Result { let buf = self.buffer.clone().collect(); let length = buf.len() as u64; - self.core.set(&self.path, buf).await?; + self.core.set(&self.path, buf, self.op.expires()).await?; let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); Ok(meta) diff --git a/core/tests/behavior/async_write.rs b/core/tests/behavior/async_write.rs index 9c3f0cb4a7d2..8070e63e7405 100644 --- a/core/tests/behavior/async_write.rs +++ b/core/tests/behavior/async_write.rs @@ -16,6 +16,7 @@ // under the License. use std::collections::HashMap; +use std::time::Duration; use anyhow::Result; use bytes::Bytes; @@ -46,6 +47,7 @@ pub fn tests(op: &Operator, tests: &mut Vec) { test_write_with_if_not_exists, test_write_with_if_match, test_write_with_user_metadata, + test_write_with_expires, test_write_returns_metadata, test_writer_write, test_writer_write_with_overwrite, @@ -249,6 +251,27 @@ pub async fn test_write_with_user_metadata(op: Operator) -> Result<()> { Ok(()) } +/// Write a single file with expires should expire the object. +pub async fn test_write_with_expires(op: Operator) -> Result<()> { + if !op.info().full_capability().write_with_expires { + return Ok(()); + } + + let (path, content, size) = TEST_FIXTURE.new_file(op.clone()); + + op.write_with(&path, content) + .expires(Duration::from_secs(1)) + .await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), size as u64); + + tokio::time::sleep(Duration::from_secs(2)).await; + assert!(!op.exists(&path).await?); + + Ok(()) +} + pub async fn test_write_returns_metadata(op: Operator) -> Result<()> { let (path, content, _) = TEST_FIXTURE.new_file(op.clone());