From be2218f9de0514d2aaa6d77121a8f53dfb096181 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Thu, 11 Jun 2026 20:33:21 +0900 Subject: [PATCH] feat(binding/go): Add Copier Signed-off-by: PoAn Yang --- bindings/c/include/opendal.h | 133 ++++++++++ bindings/c/src/copier.rs | 105 ++++++++ bindings/c/src/lib.rs | 5 + bindings/c/src/operator.rs | 96 ++++++++ bindings/c/src/result.rs | 27 ++ bindings/go/copier.go | 233 ++++++++++++++++++ bindings/go/copy.go | 75 ++++-- bindings/go/tests/behavior_tests/copy_test.go | 135 +++++++++- bindings/go/types.go | 32 +++ 9 files changed, 815 insertions(+), 26 deletions(-) create mode 100644 bindings/c/src/copier.rs create mode 100644 bindings/go/copier.go diff --git a/bindings/c/include/opendal.h b/bindings/c/include/opendal.h index 4bc85d59765c..be3d843f7f1b 100644 --- a/bindings/c/include/opendal.h +++ b/bindings/c/include/opendal.h @@ -784,6 +784,42 @@ typedef struct opendal_copy_options { uintptr_t chunk; } opendal_copy_options; +/** + * \brief BlockingCopier is designed to drive a long-running copy operation in a + * blocking manner. + * + * Users can construct a copier by `opendal_operator_copier` or + * `opendal_operator_copier_with`. Each `opendal_copier_next` call drives the copy + * forward by one step and reports the number of bytes copied; the copy is complete + * once `has_next` is false. + * + * @see opendal_operator_copier() + */ +typedef struct opendal_copier { + /** + * The pointer to the opendal::blocking::Copier in the Rust code. + * Only touch this on judging whether it is NULL. + */ + void *inner; +} opendal_copier; + +/** + * \brief The result type returned by opendal_operator_copier(). + * The result type for opendal_operator_copier(), the field `copier` contains the copier + * used to drive a long-running copy operation. The field `error` represents whether the + * operation is successful. If successful, the `error` field is null. + */ +typedef struct opendal_result_operator_copier { + /** + * The pointer for opendal_copier + */ + struct opendal_copier *copier; + /** + * The error, if ok, it is null + */ + struct opendal_error *error; +} opendal_result_operator_copier; + /** * \brief Metadata for **operator**, users can use this metadata to get information * of operator. @@ -1131,6 +1167,28 @@ typedef struct opendal_result_writer_write { struct opendal_error *error; } opendal_result_writer_write; +/** + * \brief The result type returned by opendal_copier_next(). + * The result type contains a `size` field, which is the number of bytes copied in this + * step (zero on error or completion), and a `has_next` field, which is true when the copy + * made progress and should be driven again. When `has_next` is false and `error` is null, + * the copy has completed. The error field is the error code and error message. + */ +typedef struct opendal_result_copier_next { + /** + * The number of bytes copied in this step. + */ + uintptr_t size; + /** + * Whether the copy operation made progress and should be driven again. + */ + bool has_next; + /** + * The error, if ok, it is null + */ + struct opendal_error *error; +} opendal_result_copier_next; + #ifdef __cplusplus extern "C" { #endif // __cplusplus @@ -2102,6 +2160,57 @@ struct opendal_error *opendal_operator_copy_with(const struct opendal_operator * struct opendal_error *opendal_operator_check(const struct opendal_operator *op); +/** + * \brief Blocking create a copier to copy a file from `src` to `dest`. + * + * The returned copier drives a long-running copy operation. Call + * `opendal_copier_next` repeatedly to make progress, and `opendal_copier_free` + * to release it once finished. + * + * @param op The opendal_operator created previously + * @param src The designated source path you want to copy + * @param dest The designated destination path you want to copy + * @see opendal_operator + * @see opendal_copier + * @see opendal_result_operator_copier + * @return opendal_result_operator_copier, containing a copier and an opendal_error. + * If the operation succeeds, the `copier` field holds a valid copier and the `error` + * field is null. Otherwise, the `copier` will be null and the `error` will be set + * correspondingly. + * + * # Safety + * + * It is **safe** under the cases below + * * The memory pointed to by `src` and `dest` must contain a valid nul terminator at the + * end of the string. + * + * # Panic + * + * * If the `src` or `dest` points to NULL, this function panics + */ +struct opendal_result_operator_copier opendal_operator_copier(const struct opendal_operator *op, + const char *src, + const char *dest); + +/** + * \brief Blocking create a copier to copy a file from `src` to `dest` with options. + * + * This is the same as `opendal_operator_copier` but accepts an `opendal_copy_options` + * to control the behavior, e.g. `concurrent` or `chunk`. Pass NULL to use defaults. + * + * @param op The opendal_operator created previously + * @param src The designated source path you want to copy + * @param dest The designated destination path you want to copy + * @param opts The options for the copy operation; pass NULL to use defaults + * @see opendal_operator_copier + * @see opendal_copy_options + * @return opendal_result_operator_copier, containing a copier and an opendal_error. + */ +struct opendal_result_operator_copier opendal_operator_copier_with(const struct opendal_operator *op, + const char *src, + const char *dest, + const struct opendal_copy_options *opts); + /** * \brief Get information of underlying accessor. * @@ -2685,6 +2794,30 @@ struct opendal_error *opendal_writer_close(struct opendal_writer *ptr); */ void opendal_writer_free(struct opendal_writer *ptr); +/** + * \brief Drive the copy operation forward by one step. + * + * Returns the number of bytes copied in this step. When `has_next` is true the + * caller should keep calling this function to drive the copy. When `has_next` is + * false and `error` is null the copy has completed. + * + * @see opendal_operator_copier() + */ +struct opendal_result_copier_next opendal_copier_next(struct opendal_copier *self); + +/** + * \brief Abort the pending copy operation. + * + * Returns NULL if the abort succeeds, otherwise it contains the error code and + * error message. + */ +struct opendal_error *opendal_copier_abort(struct opendal_copier *self); + +/** + * \brief Free the heap memory used by the opendal_copier. + */ +void opendal_copier_free(struct opendal_copier *ptr); + #ifdef __cplusplus } // extern "C" #endif // __cplusplus diff --git a/bindings/c/src/copier.rs b/bindings/c/src/copier.rs new file mode 100644 index 000000000000..fe9ba32f36ce --- /dev/null +++ b/bindings/c/src/copier.rs @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use ::opendal as core; +use std::ffi::c_void; + +use super::*; + +/// \brief BlockingCopier is designed to drive a long-running copy operation in a +/// blocking manner. +/// +/// Users can construct a copier by `opendal_operator_copier` or +/// `opendal_operator_copier_with`. Each `opendal_copier_next` call drives the copy +/// forward by one step and reports the number of bytes copied; the copy is complete +/// once `has_next` is false. +/// +/// @see opendal_operator_copier() +#[repr(C)] +pub struct opendal_copier { + /// The pointer to the opendal::blocking::Copier in the Rust code. + /// Only touch this on judging whether it is NULL. + inner: *mut c_void, +} + +impl opendal_copier { + fn deref_mut(&mut self) -> &mut core::blocking::Copier { + // Safety: the inner should never be null once constructed + // The use-after-free is undefined behavior + unsafe { &mut *(self.inner as *mut core::blocking::Copier) } + } +} + +impl opendal_copier { + pub(crate) fn new(copier: core::blocking::Copier) -> Self { + Self { + inner: Box::into_raw(Box::new(copier)) as _, + } + } + + /// \brief Drive the copy operation forward by one step. + /// + /// Returns the number of bytes copied in this step. When `has_next` is true the + /// caller should keep calling this function to drive the copy. When `has_next` is + /// false and `error` is null the copy has completed. + /// + /// @see opendal_operator_copier() + #[no_mangle] + pub unsafe extern "C" fn opendal_copier_next(&mut self) -> opendal_result_copier_next { + match self.deref_mut().next() { + Some(Ok(n)) => opendal_result_copier_next { + size: n, + has_next: true, + error: std::ptr::null_mut(), + }, + None => opendal_result_copier_next { + size: 0, + has_next: false, + error: std::ptr::null_mut(), + }, + Some(Err(e)) => opendal_result_copier_next { + size: 0, + has_next: false, + error: opendal_error::new(e), + }, + } + } + + /// \brief Abort the pending copy operation. + /// + /// Returns NULL if the abort succeeds, otherwise it contains the error code and + /// error message. + #[no_mangle] + pub unsafe extern "C" fn opendal_copier_abort(&mut self) -> *mut opendal_error { + if let Err(e) = self.deref_mut().abort() { + opendal_error::new(e) + } else { + std::ptr::null_mut() + } + } + + /// \brief Free the heap memory used by the opendal_copier. + #[no_mangle] + pub unsafe extern "C" fn opendal_copier_free(ptr: *mut opendal_copier) { + unsafe { + if !ptr.is_null() { + drop(Box::from_raw((*ptr).inner as *mut core::blocking::Copier)); + drop(Box::from_raw(ptr)); + } + } + } +} diff --git a/bindings/c/src/lib.rs b/bindings/c/src/lib.rs index 1ef4148a2d83..245bf5c90f12 100644 --- a/bindings/c/src/lib.rs +++ b/bindings/c/src/lib.rs @@ -58,10 +58,12 @@ pub use presign::opendal_presigned_request; pub use presign::opendal_result_presign; mod result; +pub use result::opendal_result_copier_next; pub use result::opendal_result_exists; pub use result::opendal_result_is_exist; pub use result::opendal_result_list; pub use result::opendal_result_lister_next; +pub use result::opendal_result_operator_copier; pub use result::opendal_result_operator_new; pub use result::opendal_result_operator_reader; pub use result::opendal_result_operator_writer; @@ -89,3 +91,6 @@ pub use reader::opendal_reader; mod writer; pub use writer::opendal_writer; + +mod copier; +pub use copier::opendal_copier; diff --git a/bindings/c/src/operator.rs b/bindings/c/src/operator.rs index cb610640242f..0663bc1fe497 100644 --- a/bindings/c/src/operator.rs +++ b/bindings/c/src/operator.rs @@ -1307,3 +1307,99 @@ pub unsafe extern "C" fn opendal_operator_check(op: &opendal_operator) -> *mut o std::ptr::null_mut() } } + +/// \brief Blocking create a copier to copy a file from `src` to `dest`. +/// +/// The returned copier drives a long-running copy operation. Call +/// `opendal_copier_next` repeatedly to make progress, and `opendal_copier_free` +/// to release it once finished. +/// +/// @param op The opendal_operator created previously +/// @param src The designated source path you want to copy +/// @param dest The designated destination path you want to copy +/// @see opendal_operator +/// @see opendal_copier +/// @see opendal_result_operator_copier +/// @return opendal_result_operator_copier, containing a copier and an opendal_error. +/// If the operation succeeds, the `copier` field holds a valid copier and the `error` +/// field is null. Otherwise, the `copier` will be null and the `error` will be set +/// correspondingly. +/// +/// # Safety +/// +/// It is **safe** under the cases below +/// * The memory pointed to by `src` and `dest` must contain a valid nul terminator at the +/// end of the string. +/// +/// # Panic +/// +/// * If the `src` or `dest` points to NULL, this function panics +#[no_mangle] +pub unsafe extern "C" fn opendal_operator_copier( + op: &opendal_operator, + src: *const c_char, + dest: *const c_char, +) -> opendal_result_operator_copier { + assert!(!src.is_null()); + assert!(!dest.is_null()); + let src = std::ffi::CStr::from_ptr(src) + .to_str() + .expect("malformed src"); + let dest = std::ffi::CStr::from_ptr(dest) + .to_str() + .expect("malformed dest"); + match op.deref().copier(src, dest) { + Ok(copier) => opendal_result_operator_copier { + copier: Box::into_raw(Box::new(opendal_copier::new(copier))), + error: std::ptr::null_mut(), + }, + Err(err) => opendal_result_operator_copier { + copier: std::ptr::null_mut(), + error: opendal_error::new(err), + }, + } +} + +/// \brief Blocking create a copier to copy a file from `src` to `dest` with options. +/// +/// This is the same as `opendal_operator_copier` but accepts an `opendal_copy_options` +/// to control the behavior, e.g. `concurrent` or `chunk`. Pass NULL to use defaults. +/// +/// @param op The opendal_operator created previously +/// @param src The designated source path you want to copy +/// @param dest The designated destination path you want to copy +/// @param opts The options for the copy operation; pass NULL to use defaults +/// @see opendal_operator_copier +/// @see opendal_copy_options +/// @return opendal_result_operator_copier, containing a copier and an opendal_error. +#[no_mangle] +pub unsafe extern "C" fn opendal_operator_copier_with( + op: &opendal_operator, + src: *const c_char, + dest: *const c_char, + opts: *const opendal_copy_options, +) -> opendal_result_operator_copier { + assert!(!src.is_null()); + assert!(!dest.is_null()); + let src = std::ffi::CStr::from_ptr(src) + .to_str() + .expect("malformed src"); + let dest = std::ffi::CStr::from_ptr(dest) + .to_str() + .expect("malformed dest"); + let copy_opts = if opts.is_null() { + core::options::CopyOptions::default() + } else { + core::options::CopyOptions::from(&*opts) + }; + match op.deref().copier_options(src, dest, copy_opts) { + Ok(copier) => opendal_result_operator_copier { + copier: Box::into_raw(Box::new(opendal_copier::new(copier))), + error: std::ptr::null_mut(), + }, + Err(err) => opendal_result_operator_copier { + copier: std::ptr::null_mut(), + error: opendal_error::new(err), + }, + } +} diff --git a/bindings/c/src/result.rs b/bindings/c/src/result.rs index 601a80138943..9f58bfea8bcd 100644 --- a/bindings/c/src/result.rs +++ b/bindings/c/src/result.rs @@ -180,3 +180,30 @@ pub struct opendal_result_writer_write { /// The error, if ok, it is null pub error: *mut opendal_error, } + +/// \brief The result type returned by opendal_operator_copier(). +/// The result type for opendal_operator_copier(), the field `copier` contains the copier +/// used to drive a long-running copy operation. The field `error` represents whether the +/// operation is successful. If successful, the `error` field is null. +#[repr(C)] +pub struct opendal_result_operator_copier { + /// The pointer for opendal_copier + pub copier: *mut opendal_copier, + /// The error, if ok, it is null + pub error: *mut opendal_error, +} + +/// \brief The result type returned by opendal_copier_next(). +/// The result type contains a `size` field, which is the number of bytes copied in this +/// step (zero on error or completion), and a `has_next` field, which is true when the copy +/// made progress and should be driven again. When `has_next` is false and `error` is null, +/// the copy has completed. The error field is the error code and error message. +#[repr(C)] +pub struct opendal_result_copier_next { + /// The number of bytes copied in this step. + pub size: usize, + /// Whether the copy operation made progress and should be driven again. + pub has_next: bool, + /// The error, if ok, it is null + pub error: *mut opendal_error, +} diff --git a/bindings/go/copier.go b/bindings/go/copier.go new file mode 100644 index 000000000000..369557a183a1 --- /dev/null +++ b/bindings/go/copier.go @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package opendal + +import ( + "context" + "runtime" + "unsafe" + + "github.com/jupiterrider/ffi" +) + +// Copier drives a long-running copy operation from a source to a destination. +// +// Unlike Copy, which performs the whole copy in a single call, a Copier lets the +// caller advance the copy step by step via Next, which is useful for reporting +// progress on large copies. The copy is complete once Next reports that no more +// progress is available. Always call Close to release the resources held by the +// Copier. +// +// # Example +// +// func exampleCopier(op *opendal.Operator) { +// copier, err := op.Copier("path/from/file", "path/to/file") +// if err != nil { +// log.Fatal(err) +// } +// defer copier.Close() +// for { +// n, ok, err := copier.Next() +// if err != nil { +// log.Fatal(err) +// } +// if !ok { +// break +// } +// log.Printf("copied %d bytes", n) +// } +// } +type Copier struct { + inner *opendalCopier + ctx context.Context +} + +// Copier creates a Copier to copy a file from src to dest. +// +// Copier is a wrapper around the C-binding function `opendal_operator_copier`. +// When options are provided, it uses `opendal_operator_copier_with`. +// +// # Parameters +// +// - src: The source file path. +// - dest: The destination file path. +// - opts: Optional copy options (shared with Copy). +// +// # Returns +// +// - *Copier: A Copier used to drive the copy, or an error if the operation fails. +// +// # Behavior +// +// - Both src and dest must be file paths, not directories. +// - The returned Copier must be released with Close. +func (op *Operator) Copier(src, dest string, opts ...WithCopyFn) (*Copier, error) { + if len(opts) == 0 { + inner, err := ffiOperatorCopier.symbol(op.ctx)(op.inner, src, dest) + if err != nil { + return nil, err + } + return &Copier{inner: inner, ctx: op.ctx}, nil + } + + o := parseCopyOptions(opts...) + cOpts, keepAlive, err := newOpendalCopyOptions(op.ctx, o) + if err != nil { + return nil, err + } + defer ffiCopyOptionsFree.symbol(op.ctx)(cOpts) + inner, err := ffiOperatorCopierWith.symbol(op.ctx)(op.inner, src, dest, cOpts) + runtime.KeepAlive(keepAlive) + if err != nil { + return nil, err + } + return &Copier{inner: inner, ctx: op.ctx}, nil +} + +// Next drives the copy forward by one step. +// +// It returns the number of bytes copied in this step. ok is false when the copy +// has completed; once that happens, further calls keep returning (0, false, nil). +// A non-nil error reports a failed copy. +func (c *Copier) Next() (size uint, ok bool, err error) { + return ffiCopierNext.symbol(c.ctx)(c.inner) +} + +// Abort aborts the pending copy operation. +// +// After Abort, the Copier should still be released with Close. +func (c *Copier) Abort() error { + return ffiCopierAbort.symbol(c.ctx)(c.inner) +} + +// Close releases the resources held by the Copier. It must be called once the +// Copier is no longer needed. Close is safe to call more than once; subsequent +// calls are no-ops. +func (c *Copier) Close() error { + if c.inner == nil { + return nil + } + ffiCopierFree.symbol(c.ctx)(c.inner) + c.inner = nil + return nil +} + +var ffiOperatorCopier = newFFI(ffiOpts{ + sym: "opendal_operator_copier", + rType: &typeResultOperatorCopier, + aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer, &ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(op *opendalOperator, src, dest string) (*opendalCopier, error) { + return func(op *opendalOperator, src, dest string) (*opendalCopier, error) { + byteSrc, err := BytePtrFromString(src) + if err != nil { + return nil, err + } + byteDest, err := BytePtrFromString(dest) + if err != nil { + return nil, err + } + var result resultOperatorCopier + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&op), + unsafe.Pointer(&byteSrc), + unsafe.Pointer(&byteDest), + ) + if result.error != nil { + return nil, parseError(ctx, result.error) + } + return result.copier, nil + } +}) + +var ffiOperatorCopierWith = newFFI(ffiOpts{ + sym: "opendal_operator_copier_with", + rType: &typeResultOperatorCopier, + aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer, &ffi.TypePointer, &ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(op *opendalOperator, src, dest string, opts *opendalCopyOptions) (*opendalCopier, error) { + return func(op *opendalOperator, src, dest string, opts *opendalCopyOptions) (*opendalCopier, error) { + byteSrc, err := BytePtrFromString(src) + if err != nil { + return nil, err + } + byteDest, err := BytePtrFromString(dest) + if err != nil { + return nil, err + } + var result resultOperatorCopier + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&op), + unsafe.Pointer(&byteSrc), + unsafe.Pointer(&byteDest), + unsafe.Pointer(&opts), + ) + if result.error != nil { + return nil, parseError(ctx, result.error) + } + return result.copier, nil + } +}) + +var ffiCopierNext = newFFI(ffiOpts{ + sym: "opendal_copier_next", + rType: &typeResultCopierNext, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(c *opendalCopier) (uint, bool, error) { + return func(c *opendalCopier) (uint, bool, error) { + var result resultCopierNext + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&c), + ) + if result.error != nil { + return 0, false, parseError(ctx, result.error) + } + return result.size, result.has_next == 1, nil + } +}) + +var ffiCopierAbort = newFFI(ffiOpts{ + sym: "opendal_copier_abort", + rType: &ffi.TypePointer, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(c *opendalCopier) error { + return func(c *opendalCopier) error { + var e *opendalError + ffiCall( + unsafe.Pointer(&e), + unsafe.Pointer(&c), + ) + return parseError(ctx, e) + } +}) + +var ffiCopierFree = newFFI(ffiOpts{ + sym: "opendal_copier_free", + rType: &ffi.TypeVoid, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(_ context.Context, ffiCall ffiCall) func(c *opendalCopier) { + return func(c *opendalCopier) { + ffiCall( + nil, + unsafe.Pointer(&c), + ) + } +}) diff --git a/bindings/go/copy.go b/bindings/go/copy.go index 39cb4be0bcc1..d8d5e98b5f29 100644 --- a/bindings/go/copy.go +++ b/bindings/go/copy.go @@ -126,46 +126,73 @@ func (op *Operator) Copy(src, dest string, opts ...WithCopyFn) error { return ffiOperatorCopy.symbol(op.ctx)(op.inner, src, dest) } + o := parseCopyOptions(opts...) + cOpts, keepAlive, err := newOpendalCopyOptions(op.ctx, o) + if err != nil { + return err + } + defer ffiCopyOptionsFree.symbol(op.ctx)(cOpts) + err = ffiOperatorCopyWith.symbol(op.ctx)(op.inner, src, dest, cOpts) + runtime.KeepAlive(keepAlive) + return err +} + +func parseCopyOptions(opts ...WithCopyFn) *copyOptions { o := ©Options{} for _, opt := range opts { opt(o) } - cOpts := ffiCopyOptionsNew.symbol(op.ctx)() - free := ffiCopyOptionsFree.symbol(op.ctx) + return o +} - ffiCopyOptionsSetIfNotExists.symbol(op.ctx)(cOpts, o.ifNotExists) - var ifMatchData []byte - if o.ifMatch != "" { - data, err := ffiCopyOptionsSetIfMatch.symbol(op.ctx)(cOpts, o.ifMatch) - if err != nil { - free(cOpts) - return err - } - ifMatchData = data +// copyOptionsKeepAlive holds byte slices that must outlive the FFI call that +// references them, since the C side borrows the underlying memory. +type copyOptionsKeepAlive struct { + strings [][]byte +} + +// newOpendalCopyOptions builds a C-side opendal_copy_options from o. On error it +// frees the allocated options before returning. The caller is responsible for +// freeing the returned options and for keeping keepAlive reachable until after +// the FFI call that consumes the options. +func newOpendalCopyOptions(ctx context.Context, o *copyOptions) (*opendalCopyOptions, copyOptionsKeepAlive, error) { + cOpts := ffiCopyOptionsNew.symbol(ctx)() + keepAlive := copyOptionsKeepAlive{} + + fail := func(err error) (*opendalCopyOptions, copyOptionsKeepAlive, error) { + ffiCopyOptionsFree.symbol(ctx)(cOpts) + return nil, copyOptionsKeepAlive{}, err } - var sourceVersionData []byte - if o.sourceVersion != "" { - data, err := ffiCopyOptionsSetSourceVersion.symbol(op.ctx)(cOpts, o.sourceVersion) + + setString := func(value string, set func(*opendalCopyOptions, string) ([]byte, error)) error { + if value == "" { + return nil + } + data, err := set(cOpts, value) if err != nil { - free(cOpts) return err } - sourceVersionData = data + keepAlive.strings = append(keepAlive.strings, data) + return nil + } + + ffiCopyOptionsSetIfNotExists.symbol(ctx)(cOpts, o.ifNotExists) + if err := setString(o.ifMatch, ffiCopyOptionsSetIfMatch.symbol(ctx)); err != nil { + return fail(err) + } + if err := setString(o.sourceVersion, ffiCopyOptionsSetSourceVersion.symbol(ctx)); err != nil { + return fail(err) } if o.sourceContentLengthHint != 0 { - ffiCopyOptionsSetSourceContentLengthHint.symbol(op.ctx)(cOpts, o.sourceContentLengthHint) + ffiCopyOptionsSetSourceContentLengthHint.symbol(ctx)(cOpts, o.sourceContentLengthHint) } if o.concurrent != 0 { - ffiCopyOptionsSetConcurrent.symbol(op.ctx)(cOpts, o.concurrent) + ffiCopyOptionsSetConcurrent.symbol(ctx)(cOpts, o.concurrent) } if o.chunk != 0 { - ffiCopyOptionsSetChunk.symbol(op.ctx)(cOpts, o.chunk) + ffiCopyOptionsSetChunk.symbol(ctx)(cOpts, o.chunk) } - err := ffiOperatorCopyWith.symbol(op.ctx)(op.inner, src, dest, cOpts) - free(cOpts) - runtime.KeepAlive(ifMatchData) - runtime.KeepAlive(sourceVersionData) - return err + return cOpts, keepAlive, nil } var ffiCopyOptionsNew = newFFI(ffiOpts{ diff --git a/bindings/go/tests/behavior_tests/copy_test.go b/bindings/go/tests/behavior_tests/copy_test.go index eaf812694f3b..32c2daac5bbe 100644 --- a/bindings/go/tests/behavior_tests/copy_test.go +++ b/bindings/go/tests/behavior_tests/copy_test.go @@ -38,12 +38,16 @@ func testsCopy(cap *opendal.Capability) []behaviorTest { testCopySelf, testCopyNested, testCopyOverwrite, + testCopierFile, + testCopierCompletion, + testCopierAbort, } if cap.CreateDir() { tests = append(tests, testCopySourceDir, testCopyTargetDir) } if isCapEnabled(cap.CopyWithIfNotExists, "copy_with_if_not_exists") { - tests = append(tests, testCopyWithIfNotExistsToNewFile, testCopyWithIfNotExistsToExistingFile) + tests = append(tests, testCopyWithIfNotExistsToNewFile, testCopyWithIfNotExistsToExistingFile, + testCopierWithIfNotExistsToNewFile, testCopierWithIfNotExistsToExistingFile) } if isCapEnabled(cap.CopyWithIfMatch, "copy_with_if_match") { tests = append(tests, testCopyWithIfMatchMatch, testCopyWithIfMatchMismatch) @@ -52,7 +56,7 @@ func testsCopy(cap *opendal.Capability) []behaviorTest { tests = append(tests, testCopyWithSourceVersionToNewFile, testCopyWithSourceVersionToSameFile) } if isCapEnabled(cap.CopyCanMulti, "copy_can_multi") { - tests = append(tests, testCopyWithChunk, testCopyWithChunkAndConcurrent) + tests = append(tests, testCopyWithChunk, testCopyWithChunkAndConcurrent, testCopierMultipart) } return tests } @@ -163,6 +167,98 @@ func testCopyOverwrite(assert *require.Assertions, op *opendal.Operator, fixture assert.Equal(sourceContent, targetContent) } +func driveCopier(copier *opendal.Copier) error { + for { + _, ok, err := copier.Next() + if err != nil { + return err + } + if !ok { + return nil + } + } +} + +func testCopierFile(assert *require.Assertions, op *opendal.Operator, fixture *fixture) { + sourcePath, sourceContent, _ := fixture.NewFile() + assert.Nil(op.Write(sourcePath, sourceContent)) + + targetPath := fixture.NewFilePath() + copier, err := op.Copier(sourcePath, targetPath) + assert.Nil(err, "create copier must succeed") + defer copier.Close() + + assert.Nil(driveCopier(copier), "copier must drive to completion") + + targetContent, err := op.Read(targetPath) + assert.Nil(err, "read must succeed") + assert.Equal(sourceContent, targetContent) +} + +func testCopierCompletion(assert *require.Assertions, op *opendal.Operator, fixture *fixture) { + sourcePath, sourceContent, _ := fixture.NewFile() + assert.Nil(op.Write(sourcePath, sourceContent)) + + targetPath := fixture.NewFilePath() + copier, err := op.Copier(sourcePath, targetPath) + assert.Nil(err, "create copier must succeed") + defer copier.Close() + + assert.Nil(driveCopier(copier), "copier must drive to completion") + + _, ok, err := copier.Next() + assert.Nil(err, "next on completed copier must not error") + assert.False(ok, "completed copier must keep reporting no progress") +} + +func testCopierAbort(assert *require.Assertions, op *opendal.Operator, fixture *fixture) { + sourcePath, sourceContent, _ := fixture.NewFile() + assert.Nil(op.Write(sourcePath, sourceContent)) + + targetPath := fixture.NewFilePath() + copier, err := op.Copier(sourcePath, targetPath) + assert.Nil(err, "create copier must succeed") + defer copier.Close() + + assert.Nil(copier.Abort(), "abort must succeed") +} + +func testCopierWithIfNotExistsToNewFile(assert *require.Assertions, op *opendal.Operator, fixture *fixture) { + sourcePath, sourceContent, _ := fixture.NewFile() + assert.Nil(op.Write(sourcePath, sourceContent)) + + targetPath := fixture.NewFilePath() + copier, err := op.Copier(sourcePath, targetPath, opendal.CopyWithIfNotExists(true)) + assert.Nil(err, "create copier must succeed") + defer copier.Close() + + assert.Nil(driveCopier(copier), "copier must drive to completion") + + bs, err := op.Read(targetPath) + assert.Nil(err, "read must succeed") + assert.Equal(sourceContent, bs) +} + +func testCopierWithIfNotExistsToExistingFile(assert *require.Assertions, op *opendal.Operator, fixture *fixture) { + sourcePath, sourceContent, _ := fixture.NewFile() + assert.Nil(op.Write(sourcePath, sourceContent)) + + targetPath, targetContent, _ := fixture.NewFile() + assert.Nil(op.Write(targetPath, targetContent)) + + copier, err := op.Copier(sourcePath, targetPath, opendal.CopyWithIfNotExists(true)) + assert.Nil(err, "create copier must succeed") + defer copier.Close() + + err = driveCopier(copier) + assert.NotNil(err, "copier must fail when target exists") + assert.Equal(opendal.CodeConditionNotMatch, assertErrorCode(err)) + + bs, err := op.Read(targetPath) + assert.Nil(err, "read must succeed") + assert.Equal(targetContent, bs, "target must not be overwritten") +} + func testCopyWithIfNotExistsToNewFile(assert *require.Assertions, op *opendal.Operator, fixture *fixture) { sourcePath, sourceContent, _ := fixture.NewFile() assert.Nil(op.Write(sourcePath, sourceContent)) @@ -317,3 +413,38 @@ func testCopyWithChunkAndConcurrent(assert *require.Assertions, op *opendal.Oper assert.Nil(err, "read must succeed") assert.Equal(sourceContent, bs) } + +func testCopierMultipart(assert *require.Assertions, op *opendal.Operator, fixture *fixture) { + chunk, sourceSize := copyMultiChunkSize(op.Info().GetFullCapability()) + if sourceSize == 0 { + return + } + + sourcePath := fixture.NewFilePath() + sourceContent := genFixedBytes(sourceSize) + assert.Nil(op.Write(sourcePath, sourceContent)) + + targetPath := fixture.NewFilePath() + copier, err := op.Copier(sourcePath, targetPath, opendal.CopyWithChunk(uint(chunk))) + assert.Nil(err, "create copier must succeed") + defer copier.Close() + + var steps int + var total uint + for { + n, ok, err := copier.Next() + assert.Nil(err, "copier next must succeed") + if !ok { + break + } + steps++ + total += n + } + + assert.Greater(steps, 0, "multipart copier must report incremental progress") + assert.Equal(uint(sourceSize), total, "reported progress must sum to source size") + + bs, err := op.Read(targetPath) + assert.Nil(err, "read must succeed") + assert.Equal(sourceContent, bs) +} diff --git a/bindings/go/types.go b/bindings/go/types.go index bd39c4177cee..f29642c147ba 100644 --- a/bindings/go/types.go +++ b/bindings/go/types.go @@ -108,6 +108,25 @@ var ( }[0], } + typeResultOperatorCopier = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, + &ffi.TypePointer, + nil, + }[0], + } + + typeResultCopierNext = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, // size (usize) + &ffi.TypeUint8, // has_next (bool) + &ffi.TypePointer, // error + nil, + }[0], + } + typeResultReaderRead = ffi.Type{ Type: ffi.Struct, Elements: &[]*ffi.Type{ @@ -300,6 +319,19 @@ type resultWriterWrite struct { error *opendalError } +type opendalCopier struct{} + +type resultOperatorCopier struct { + copier *opendalCopier + error *opendalError +} + +type resultCopierNext struct { + size uint + has_next uint8 + error *opendalError +} + type resultReaderRead struct { size uint error *opendalError