Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bindings/c/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ cbindgen = "0.29.0"

[dependencies]
bytes = "1.4.0"
futures-util = "0.3.32"
# this crate won't be published, we always use the local version
opendal = { version = ">=0", path = "../../core", features = ["blocking"] }
opendal = { version = ">=0", path = "../../core" }
tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] }

[features]
Expand Down
310 changes: 306 additions & 4 deletions bindings/c/include/opendal.h

Large diffs are not rendered by default.

197 changes: 197 additions & 0 deletions bindings/c/src/cancel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::ffi::c_void;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

use ::opendal as core;

use crate::runtime::RUNTIME;

struct CancelState {
cancelled: AtomicBool,
waker: Mutex<Option<Waker>>,
}

impl CancelState {
fn new() -> Self {
Self {
cancelled: AtomicBool::new(false),
waker: Mutex::new(None),
}
}

fn cancel(&self) {
self.cancelled.store(true, Ordering::SeqCst);
if let Some(waker) = self.waker.lock().expect("cancel waker poisoned").take() {
waker.wake();
}
}
}

#[derive(Clone)]
pub(crate) struct CancelToken {
state: Arc<CancelState>,
}

impl CancelToken {
fn new() -> Self {
Self {
state: Arc::new(CancelState::new()),
}
}

fn cancel(&self) {
self.state.cancel();
}

fn cancelled(&self) -> Cancelled {
Cancelled {
state: self.state.clone(),
}
}
}

pub(crate) struct Cancelled {
state: Arc<CancelState>,
}

impl Future for Cancelled {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.state.cancelled.load(Ordering::SeqCst) {
return Poll::Ready(());
}

*self.state.waker.lock().expect("cancel waker poisoned") = Some(cx.waker().clone());

if self.state.cancelled.load(Ordering::SeqCst) {
Poll::Ready(())
} else {
Poll::Pending
}
}
}

/// \brief A cancellation token for cancellable OpenDAL operations.
#[repr(C)]
pub struct opendal_cancel_token {
/// The pointer to the Rust cancellation token.
/// Only touch this on judging whether it is NULL.
inner: *mut c_void,
}

impl opendal_cancel_token {
fn deref(&self) -> &CancelToken {
// Safety: inner is initialized by opendal_cancel_token_new.
unsafe { &*(self.inner as *mut CancelToken) }
}

/// \brief Construct a cancellation token.
#[no_mangle]
pub unsafe extern "C" fn opendal_cancel_token_new() -> *mut Self {
Box::into_raw(Box::new(Self {
inner: Box::into_raw(Box::new(CancelToken::new())) as _,
}))
}

/// \brief Cancel operations using this token.
#[no_mangle]
pub unsafe extern "C" fn opendal_cancel_token_cancel(ptr: *const Self) {
if !ptr.is_null() {
unsafe { (*ptr).deref().cancel() };
}
}

/// \brief Free a cancellation token.
#[no_mangle]
pub unsafe extern "C" fn opendal_cancel_token_free(ptr: *mut Self) {
if !ptr.is_null() {
unsafe {
drop(Box::from_raw((*ptr).inner as *mut CancelToken));
drop(Box::from_raw(ptr));
}
}
}
}

pub(crate) unsafe fn clone_token(ptr: *const opendal_cancel_token) -> Option<CancelToken> {
if ptr.is_null() {
None
} else {
Some(unsafe { (*ptr).deref().clone() })
}
}

pub(crate) async fn run<T, F>(token: Option<CancelToken>, fut: F) -> core::Result<T>
where
F: Future<Output = core::Result<T>>,
{
match token {
Some(token) => {
tokio::select! {
biased;
_ = token.cancelled() => Err(cancelled_error()),
result = fut => result,
}
}
None => fut.await,
}
}

pub(crate) fn cancelled_error() -> core::Error {
core::Error::new(core::ErrorKind::Unexpected, "operation cancelled")
}

/// Block on a cancellable future on the current runtime thread.
///
/// Use this for futures that capture non-`Send` borrows, such as reader/writer/lister
/// handles accessed through `&mut self`.
pub(crate) fn block_on_cancelable<T, F>(
token: *const opendal_cancel_token,
fut: F,
) -> core::Result<T>
where
F: Future<Output = core::Result<T>>,
{
let token = unsafe { clone_token(token) };
RUNTIME.block_on(run(token, fut))
}

/// Block on a cancellable `Send` future by spawning it on the runtime.
///
/// Use this for operator-level calls where the future owns cloned `Operator` state.
pub(crate) fn block_on_cancelable_spawn<T, F>(
token: *const opendal_cancel_token,
fut: F,
) -> core::Result<T>
where
T: Send + 'static,
F: Future<Output = core::Result<T>> + Send + 'static,
{
let token = unsafe { clone_token(token) };
RUNTIME
.block_on(RUNTIME.spawn(run(token, fut)))
.map_err(|err| {
core::Error::new(core::ErrorKind::Unexpected, "cancellable task failed").set_source(err)
})?
}
5 changes: 5 additions & 0 deletions bindings/c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ mod error;
pub use error::opendal_code;
pub use error::opendal_error;

mod runtime;

mod cancel;
pub use cancel::opendal_cancel_token;

mod lister;
pub use lister::opendal_lister;

Expand Down
62 changes: 37 additions & 25 deletions bindings/c/src/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use ::opendal as core;
use futures_util::StreamExt;
use std::ffi::c_void;

use super::*;
Expand All @@ -35,20 +36,51 @@ pub struct opendal_lister {
}

impl opendal_lister {
fn deref_mut(&mut self) -> &mut core::blocking::Lister {
fn deref_mut(&mut self) -> &mut core::Lister {
// Safety: the inner should never be null once constructed
// The use-after-free is undefined behavior
unsafe { &mut *(self.inner as *mut core::blocking::Lister) }
unsafe { &mut *(self.inner as *mut core::Lister) }
}
}

impl opendal_lister {
pub(crate) fn new(lister: core::blocking::Lister) -> Self {
pub(crate) fn new_async(lister: core::Lister) -> Self {
Self {
inner: Box::into_raw(Box::new(lister)) as _,
}
}

fn result_next(result: core::Result<Option<core::Entry>>) -> opendal_result_lister_next {
match result {
Ok(Some(e)) => opendal_result_lister_next {
entry: Box::into_raw(Box::new(opendal_entry::new(e))),
error: std::ptr::null_mut(),
},
Ok(None) => opendal_result_lister_next {
entry: std::ptr::null_mut(),
error: std::ptr::null_mut(),
},
Err(e) => opendal_result_lister_next {
entry: std::ptr::null_mut(),
error: opendal_error::new(e),
},
}
}

/// \brief Like `opendal_lister_next` with cooperative cancellation.
///
/// Pass NULL for `token` to block until completion.
#[no_mangle]
pub unsafe extern "C" fn opendal_lister_next_with_cancel(
&mut self,
token: *const opendal_cancel_token,
) -> opendal_result_lister_next {
let lister = self.deref_mut();
Self::result_next(cancel::block_on_cancelable(token, async move {
lister.next().await.transpose()
}))
}

/// \brief Return the next object to be listed
///
/// Lister is an iterator of the objects under its path, this method is the same as
Expand All @@ -58,35 +90,15 @@ impl opendal_lister {
/// @see opendal_operator_list()
#[no_mangle]
pub unsafe extern "C" fn opendal_lister_next(&mut self) -> opendal_result_lister_next {
let e = self.deref_mut().next();
if e.is_none() {
return opendal_result_lister_next {
entry: std::ptr::null_mut(),
error: std::ptr::null_mut(),
};
}

match e.unwrap() {
Ok(e) => {
let ent = Box::into_raw(Box::new(opendal_entry::new(e)));
opendal_result_lister_next {
entry: ent,
error: std::ptr::null_mut(),
}
}
Err(e) => opendal_result_lister_next {
entry: std::ptr::null_mut(),
error: opendal_error::new(e),
},
}
unsafe { Self::opendal_lister_next_with_cancel(self, std::ptr::null()) }
}

/// \brief Free the heap-allocated metadata used by opendal_lister
#[no_mangle]
pub unsafe extern "C" fn opendal_lister_free(ptr: *mut opendal_lister) {
unsafe {
if !ptr.is_null() {
drop(Box::from_raw((*ptr).inner as *mut core::blocking::Lister));
drop(Box::from_raw((*ptr).inner as *mut core::Lister));
drop(Box::from_raw(ptr));
}
}
Expand Down
Loading