diff --git a/.spelling b/.spelling index 3527af725..035ee6526 100644 --- a/.spelling +++ b/.spelling @@ -593,3 +593,10 @@ deterministically dereferences honour MPSC +Azure +SDK +TypeSpec +typespec +Seekable +Spawner +awaitable diff --git a/CHANGELOG.md b/CHANGELOG.md index b6631cdff..6f0976792 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ Please see each crate's change log below: - [`anyspawn`](./crates/anyspawn/CHANGELOG.md) +- [`anyspawn_azure`](./crates/anyspawn_azure/CHANGELOG.md) - [`bytesbuf`](./crates/bytesbuf/CHANGELOG.md) - [`bytesbuf_io`](./crates/bytesbuf_io/CHANGELOG.md) - [`cachet`](./crates/cachet/CHANGELOG.md) @@ -13,6 +14,7 @@ Please see each crate's change log below: - [`data_privacy_macros`](./crates/data_privacy_macros/CHANGELOG.md) - [`data_privacy_macros_impl`](./crates/data_privacy_macros_impl/CHANGELOG.md) - [`fetch`](./crates/fetch/CHANGELOG.md) +- [`fetch_azure`](./crates/fetch_azure/CHANGELOG.md) - [`fetch_hyper`](./crates/fetch_hyper/CHANGELOG.md) - [`fetch_options`](./crates/fetch_options/CHANGELOG.md) - [`fetch_tls`](./crates/fetch_tls/CHANGELOG.md) diff --git a/Cargo.lock b/Cargo.lock index 74407936b..720b7d966 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,6 +91,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "anyspawn_azure" +version = "0.1.0" +dependencies = [ + "anyspawn", + "async-trait", + "azure_identity", + "futures", + "tick", + "tokio", + "typespec_client_core", +] + [[package]] name = "argh" version = "0.1.19" @@ -248,12 +261,45 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-task" version = "4.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -298,6 +344,74 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "azure_core" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe6a26a7d374b440015cbbcbf2d9d8be5a133aa940599f5e5dc569504baa262e" +dependencies = [ + "async-lock", + "async-trait", + "azure_core_macros", + "bytes", + "futures", + "pin-project", + "rustc_version", + "serde", + "serde_json", + "tracing", + "typespec", + "typespec_client_core", +] + +[[package]] +name = "azure_core_macros" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9b52dba6a345f3ad2d42ff8d0d63df9d0994cfa29657bf18ffdbf149f78a4f5" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "tracing", +] + +[[package]] +name = "azure_identity" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32edf96b356ca7c51d7590c4925cc36efc3947a5da4468e8e0b25c56ecbb3de5" +dependencies = [ + "async-lock", + "async-trait", + "azure_core", + "futures", + "pin-project", + "serde", + "serde_json", + "time", + "tracing", + "url", +] + +[[package]] +name = "azure_storage_blob" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1756febbcca86c862ef718b983b505d08bd65a9bc984a915b0a16af4a4c3fe5b" +dependencies = [ + "async-stream", + "async-trait", + "azure_core", + "bytes", + "futures", + "percent-encoding", + "pin-project", + "serde", + "serde_json", + "time", +] + [[package]] name = "base64" version = "0.22.1" @@ -956,6 +1070,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ "powerfmt", + "serde_core", ] [[package]] @@ -1016,6 +1131,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "dynosaur" version = "0.3.0" @@ -1158,6 +1279,27 @@ dependencies = [ "wiremock", ] +[[package]] +name = "fetch_azure" +version = "0.1.0" +dependencies = [ + "anyspawn", + "anyspawn_azure", + "async-trait", + "azure_core", + "azure_identity", + "azure_storage_blob", + "bytesbuf", + "fetch", + "futures", + "http", + "layered", + "mutants", + "tick", + "tokio", + "typespec_client_core", +] + [[package]] name = "fetch_hyper" version = "0.4.0" @@ -2796,6 +2938,16 @@ dependencies = [ "syn", ] +[[package]] +name = "quick-xml" +version = "0.39.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdcc8dd4e2f670d309a5f0e83fe36dfdc05af317008fea29144da1a2ac858e5e" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.45" @@ -3788,6 +3940,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] @@ -3804,6 +3957,17 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.36" @@ -3902,6 +4066,56 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" +[[package]] +name = "typespec" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21666a31293beab8f41d38c2849ddbc342cd9c7cb4d71a9818868287a8934e53" +dependencies = [ + "base64", + "bytes", + "futures", + "quick-xml", + "serde", + "serde_json", + "url", +] + +[[package]] +name = "typespec_client_core" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924f0c734e0ac3b881ab99d032bd28fcc969d2bb73ef1b8dd4772fd8e518a382" +dependencies = [ + "async-trait", + "base64", + "bytes", + "dyn-clone", + "futures", + "pin-project", + "rand 0.10.1", + "serde", + "serde_json", + "time", + "tracing", + "typespec", + "typespec_macros", + "url", + "uuid", +] + +[[package]] +name = "typespec_macros" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c608f4427943f8adb211abc95c87672b1b98847152783507d54e3246e502f60" +dependencies = [ + "proc-macro2", + "quote", + "rustc_version", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/Cargo.toml b/Cargo.toml index 21c47484f..ff6fbf8b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ homepage = "https://github.com/microsoft/oxidizer" # local dependencies anyspawn = { path = "crates/anyspawn", default-features = false, version = "0.5.3" } +anyspawn_azure = { path = "crates/anyspawn_azure", default-features = false, version = "0.1.0" } bytesbuf = { path = "crates/bytesbuf", default-features = false, version = "0.5.3" } bytesbuf_io = { path = "crates/bytesbuf_io", default-features = false, version = "0.5.4" } cachet = { path = "crates/cachet", default-features = false, version = "0.6.6" } @@ -35,6 +36,7 @@ data_privacy_core = { path = "crates/data_privacy_core", default-features = fals data_privacy_macros = { path = "crates/data_privacy_macros", default-features = false, version = "0.10.1" } data_privacy_macros_impl = { path = "crates/data_privacy_macros_impl", default-features = false, version = "0.10.1" } fetch = { path = "crates/fetch", default-features = false, version = "0.11.0" } +fetch_azure = { path = "crates/fetch_azure", default-features = false, version = "0.1.0" } fetch_hyper = { path = "crates/fetch_hyper", default-features = false, version = "0.4.0" } fetch_options = { path = "crates/fetch_options", default-features = false, version = "0.2.1" } fetch_tls = { path = "crates/fetch_tls", default-features = false, version = "0.2.2" } @@ -66,6 +68,10 @@ allocator-api2 = { version = "0.4.0", default-features = false } anyhow = { version = "1.0.100", default-features = false } argh = { version = "0.1.13", default-features = false } async-once-cell = { version = "0.5.0", default-features = false } +async-trait = { version = "0.1.89", default-features = false } +azure_core = { version = "1.0.0", default-features = false } +azure_identity = { version = "1.0.0", default-features = false } +azure_storage_blob = { version = "1.0.0", default-features = false } base64 = { version = "0.22.0", default-features = false, features = ["alloc"] } bolero = { version = "0.13.4", default-features = false } bumpalo = { version = "3.20.2", default-features = false } @@ -151,6 +157,7 @@ tracing-test = { version = "0.2.6", default-features = false } trait-variant = { version = "0.1.2", default-features = false } trybuild = { version = "1.0.114", default-features = false } typeid = { version = "1.0.3", default-features = false } +typespec_client_core = { version = "1.0.0", default-features = false } uuid = { version = "1.21.0", default-features = false } widestring = { version = "1.2.1", default-features = false } windows-sys = { version = "0.61.2", default-features = false } diff --git a/README.md b/README.md index 0f4f9d6c3..c5de8d200 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ This repository contains a set of crates that help you build robust highly scala These are the primary crates built out of this repo: - [`anyspawn`](./crates/anyspawn/README.md) - A generic task spawner compatible with any async runtime. +- [`anyspawn_azure`](./crates/anyspawn_azure/README.md) - Azure SDK async runtime and process executor backed by an anyspawn spawner and a tick clock. - [`bytesbuf`](./crates/bytesbuf/README.md) - Types for creating and manipulating byte sequences. - [`bytesbuf_io`](./crates/bytesbuf_io/README.md) - Asynchronous I/O abstractions expressed via `bytesbuf` types. - [`cachet`](./crates/cachet/README.md) - A composable, customizable multi-tier caching library with rich feature support. @@ -35,6 +36,7 @@ These are the primary crates built out of this repo: - [`cachet_tier`](./crates/cachet_tier/README.md) - Core cache tier trait and abstractions for building cache backends. - [`data_privacy`](./crates/data_privacy/README.md) - Mechanisms to classify, manipulate, and redact sensitive data. - [`fetch`](./crates/fetch/README.md) - "Universal, composable and resilient HTTP client." +- [`fetch_azure`](./crates/fetch_azure/README.md) - Azure SDK HTTP transport backed by the fetch HTTP client. - [`fetch_hyper`](./crates/fetch_hyper/README.md) - Hyper-based HTTP transport utilities for fetch. - [`fetch_options`](./crates/fetch_options/README.md) - Options types for 'fetch' crate. - [`fundle`](./crates/fundle/README.md) - Compile-time safe dependency injection for Rust. diff --git a/crates/anyspawn_azure/CHANGELOG.md b/crates/anyspawn_azure/CHANGELOG.md new file mode 100644 index 000000000..7298e7a85 --- /dev/null +++ b/crates/anyspawn_azure/CHANGELOG.md @@ -0,0 +1,13 @@ +# Changelog + +## [0.1.0] + +- ✨ Features + + - introduce `anyspawn_azure`, adapting Oxidizer primitives to Azure SDK + runtime abstractions: + - `Runtime` implements `azure_core::async_runtime::AsyncRuntime` on top + of an `anyspawn::Spawner` (spawning) and a `tick::Clock` (sleeping). + - with the optional `azure-identity` feature, `Runtime` also implements + `azure_identity::Executor`, running credential commands on the + `anyspawn::Spawner`. diff --git a/crates/anyspawn_azure/Cargo.toml b/crates/anyspawn_azure/Cargo.toml new file mode 100644 index 000000000..2c77e6224 --- /dev/null +++ b/crates/anyspawn_azure/Cargo.toml @@ -0,0 +1,57 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +[package] +name = "anyspawn_azure" +description = "Azure SDK async runtime and process executor backed by an anyspawn spawner and a tick clock." +version = "0.1.0" +readme = "README.md" +keywords = ["oxidizer", "azure", "async", "runtime", "spawner"] +categories = ["asynchronous"] + +edition = { workspace = true } +rust-version = { workspace = true } +authors = { workspace = true } +license = { workspace = true } +homepage = { workspace = true } +repository = "https://github.com/microsoft/oxidizer/tree/main/crates/anyspawn_azure" + +[package.metadata.cargo_check_external_types] +allowed_external_types = [ + # Workspace sibling crates + "anyspawn::*", + "tick::*", + # External dependencies that define the AsyncRuntime and Executor traits + "azure_identity::*", + "typespec_client_core::*", +] + +[package.metadata.docs.rs] +all-features = true + +[features] +## Implement [`azure_identity::Executor`] for [`Runtime`], allowing it to run the +## subprocesses that developer credentials (e.g. the Azure CLI) rely on. +azure-identity = ["dep:azure_identity", "dep:async-trait"] + +[dependencies] +# internal +anyspawn = { workspace = true } +tick = { workspace = true } + +# external +async-trait = { workspace = true, optional = true } +azure_identity = { workspace = true, optional = true } +futures = { workspace = true, features = ["std"] } +typespec_client_core = { workspace = true, features = ["http"] } + +[dev-dependencies] +# internal +anyspawn = { path = "../anyspawn", features = ["tokio"] } +tick = { path = "../tick", features = ["tokio"] } + +# external +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } + +[lints] +workspace = true diff --git a/crates/anyspawn_azure/README.md b/crates/anyspawn_azure/README.md new file mode 100644 index 000000000..669059563 --- /dev/null +++ b/crates/anyspawn_azure/README.md @@ -0,0 +1,61 @@ +
+ Anyspawn Azure Logo + +# Anyspawn Azure + +[![crate.io](https://img.shields.io/crates/v/anyspawn_azure.svg)](https://crates.io/crates/anyspawn_azure) +[![docs.rs](https://docs.rs/anyspawn_azure/badge.svg)](https://docs.rs/anyspawn_azure) +[![MSRV](https://img.shields.io/crates/msrv/anyspawn_azure)](https://crates.io/crates/anyspawn_azure) +[![CI](https://github.com/microsoft/oxidizer/actions/workflows/main.yml/badge.svg?event=push)](https://github.com/microsoft/oxidizer/actions/workflows/main.yml) +[![Coverage](https://codecov.io/gh/microsoft/oxidizer/graph/badge.svg?token=FCUG0EL5TI)](https://codecov.io/gh/microsoft/oxidizer) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)](../../LICENSE) +This crate was developed as part of the Oxidizer project + +
+ +Bundle [`anyspawn`][__link0] and [`tick`][__link1] as Azure SDK runtime abstractions. + +The Azure SDK abstracts its task spawning, sleeping, and yielding behind the +[`typespec_client_core::async_runtime::AsyncRuntime`][__link2] trait, and the process +execution that developer credentials rely on behind the `azure_identity::Executor` +trait. This crate adapts those primitives to both: + +* [`Runtime`][__link3] implements [`typespec_client_core::async_runtime::AsyncRuntime`][__link4] on top of + an [`anyspawn::Spawner`][__link5] (spawning) and a [`tick::Clock`][__link6] (sleeping). +* With the `azure-identity` feature, [`Runtime`][__link7] also implements + `azure_identity::Executor`, running credential commands on the + [`anyspawn::Spawner`][__link8]. + +## Example + +```rust +use std::sync::Arc; + +use anyspawn::Spawner; +use anyspawn_azure::Runtime; +use tick::Clock; +use typespec_client_core::async_runtime::{AsyncRuntime, set_async_runtime}; + +// Install an `anyspawn`-backed async runtime (sleeping on a `tick::Clock`). +fn install_runtime(spawner: Spawner, clock: Clock) { + let runtime: Arc = Runtime::new(spawner, clock).into(); + let _ = set_async_runtime(runtime); +} +``` + + +
+ +This crate was developed as part of The Oxidizer Project. Browse this crate's source code. + + + [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbBIQZjhLbN24b4SQLJr0OB3sbAJBFhMk9gnYbZcuWw6vetXBhZISCaGFueXNwYXduZTAuNS4zgm5hbnlzcGF3bl9henVyZWUwLjEuMIJkdGlja2UwLjMuM4J0dHlwZXNwZWNfY2xpZW50X2NvcmVlMS4wLjA + [__link0]: https://crates.io/crates/anyspawn/0.5.3 + [__link1]: https://crates.io/crates/tick/0.3.3 + [__link2]: https://docs.rs/typespec_client_core/1.0.0/typespec_client_core/?search=async_runtime::AsyncRuntime + [__link3]: https://docs.rs/anyspawn_azure/0.1.0/anyspawn_azure/?search=Runtime + [__link4]: https://docs.rs/typespec_client_core/1.0.0/typespec_client_core/?search=async_runtime::AsyncRuntime + [__link5]: https://docs.rs/anyspawn/0.5.3/anyspawn/?search=Spawner + [__link6]: https://docs.rs/tick/0.3.3/tick/?search=Clock + [__link7]: https://docs.rs/anyspawn_azure/0.1.0/anyspawn_azure/?search=Runtime + [__link8]: https://docs.rs/anyspawn/0.5.3/anyspawn/?search=Spawner diff --git a/crates/anyspawn_azure/favicon.ico b/crates/anyspawn_azure/favicon.ico new file mode 100644 index 000000000..e8aa25ff1 --- /dev/null +++ b/crates/anyspawn_azure/favicon.ico @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9eb58f6eca7bfa5554daab00b06c30bfd0d9d2eabc82d1ed4c9d1cede030f2f3 +size 167984 diff --git a/crates/anyspawn_azure/logo.png b/crates/anyspawn_azure/logo.png new file mode 100644 index 000000000..8fc9e4be7 --- /dev/null +++ b/crates/anyspawn_azure/logo.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:2500d13a5e0017dbfd0c7312411cce10cefdafabcea25692e24b8c0ca283f5f6 +size 10094 diff --git a/crates/anyspawn_azure/src/lib.rs b/crates/anyspawn_azure/src/lib.rs new file mode 100644 index 000000000..2aaab6df9 --- /dev/null +++ b/crates/anyspawn_azure/src/lib.rs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc(html_logo_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/anyspawn_azure/logo.png")] +#![doc(html_favicon_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/anyspawn_azure/favicon.ico")] + +//! Bundle [`anyspawn`] and [`tick`] as Azure SDK runtime abstractions. +//! +//! The Azure SDK abstracts its task spawning, sleeping, and yielding behind the +//! [`typespec_client_core::async_runtime::AsyncRuntime`] trait, and the process +//! execution that developer credentials rely on behind the `azure_identity::Executor` +//! trait. This crate adapts those primitives to both: +//! +//! - [`Runtime`] implements [`typespec_client_core::async_runtime::AsyncRuntime`] on top of +//! an [`anyspawn::Spawner`] (spawning) and a [`tick::Clock`] (sleeping). +//! - With the `azure-identity` feature, [`Runtime`] also implements +//! `azure_identity::Executor`, running credential commands on the +//! [`anyspawn::Spawner`]. +//! +//! # Example +//! +//! ``` +//! use std::sync::Arc; +//! +//! use anyspawn::Spawner; +//! use anyspawn_azure::Runtime; +//! use tick::Clock; +//! use typespec_client_core::async_runtime::{AsyncRuntime, set_async_runtime}; +//! +//! // Install an `anyspawn`-backed async runtime (sleeping on a `tick::Clock`). +//! fn install_runtime(spawner: Spawner, clock: Clock) { +//! let runtime: Arc = Runtime::new(spawner, clock).into(); +//! let _ = set_async_runtime(runtime); +//! } +//! # let _ = install_runtime; +//! ``` + +mod runtime; + +pub use runtime::Runtime; diff --git a/crates/anyspawn_azure/src/runtime.rs b/crates/anyspawn_azure/src/runtime.rs new file mode 100644 index 000000000..ae49b8e07 --- /dev/null +++ b/crates/anyspawn_azure/src/runtime.rs @@ -0,0 +1,134 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! The [`Runtime`] async-runtime adapter. + +use std::future::ready; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use anyspawn::{JoinHandle, Spawner}; +use futures::future::{AbortHandle, Abortable}; +use tick::Clock; +use typespec_client_core::async_runtime::{AbortableTask, AsyncRuntime, SpawnedTask, TaskFuture}; +use typespec_client_core::time::Duration; + +/// An [`AsyncRuntime`] that spawns work on an [`anyspawn::Spawner`] and sleeps +/// on a [`tick::Clock`]. +/// +/// Construct one from an existing [`Spawner`] and [`Clock`] with +/// [`Runtime::new`], then convert it into an `Arc` via +/// [`From`] / [`Into`] and install it with +/// [`typespec_client_core::async_runtime::set_async_runtime`]. +#[derive(Debug, Clone)] +pub struct Runtime { + spawner: Spawner, + clock: Clock, +} + +impl Runtime { + /// Creates a new runtime that spawns work on `spawner` and sleeps on `clock`. + #[must_use] + pub const fn new(spawner: Spawner, clock: Clock) -> Self { + Self { spawner, clock } + } + + /// Returns a reference to the wrapped [`Spawner`]. + pub const fn spawner(&self) -> &Spawner { + &self.spawner + } + + /// Returns a reference to the wrapped [`Clock`]. + #[must_use] + pub const fn clock(&self) -> &Clock { + &self.clock + } +} + +impl From for Arc { + fn from(runtime: Runtime) -> Self { + Arc::new(runtime) + } +} + +#[cfg(feature = "azure-identity")] +impl From for Arc { + fn from(runtime: Runtime) -> Self { + Arc::new(runtime) + } +} + +impl AsyncRuntime for Runtime { + fn spawn(&self, f: TaskFuture) -> SpawnedTask { + // Wrap the task so that `abort` cancels it through `futures`: aborting + // wakes the spawned task, which resolves, which in turn wakes anyone + // awaiting the returned `SpawnedTask`. + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let task = Abortable::new(f, abort_registration); + let handle = self.spawner.spawn(async move { + // The `Aborted` result is expected when cancelled and carries no value. + let _ = task.await; + }); + Box::pin(RuntimeTask { handle, abort_handle }) + } + + fn sleep(&self, duration: Duration) -> TaskFuture { + let clock = self.clock.clone(); + Box::pin(async move { + // `time::Duration` can be negative; clamp such values to zero. + let duration = std::time::Duration::try_from(duration).unwrap_or_default(); + clock.delay(duration).await; + }) + } + + fn yield_now(&self) -> TaskFuture { + std::thread::yield_now(); + Box::pin(ready(())) + } +} + +/// Adapts an [`anyspawn::JoinHandle`] into an [`AbortableTask`]. +/// +/// Holds the [`AbortHandle`] of the spawned [`Abortable`] task so [`abort`] +/// can cancel work that is still pending and wake anyone awaiting it. +/// +/// [`abort`]: AbortableTask::abort +struct RuntimeTask { + handle: JoinHandle<()>, + abort_handle: AbortHandle, +} + +impl Future for RuntimeTask { + type Output = Result<(), Box>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.get_mut().handle).poll(cx).map(Ok) + } +} + +impl AbortableTask for RuntimeTask { + fn abort(&self) { + // Cancels the `Abortable` task, which wakes its executor, completes the + // join handle, and unblocks any pending waiter on this task. + self.abort_handle.abort(); + } +} + +/// Runs developer-credential commands (e.g. the Azure CLI) on the blocking +/// pool of the [`Spawner`], so credentials like `DeveloperToolsCredential` +/// work on the same runtime as the rest of the SDK. +#[cfg(feature = "azure-identity")] +#[async_trait::async_trait] +impl azure_identity::Executor for Runtime { + async fn run(&self, program: &std::ffi::OsStr, args: &[&std::ffi::OsStr]) -> std::io::Result { + // The program and arguments are borrowed, so own them before moving the + // blocking work onto the spawner's pool. + let program = program.to_os_string(); + let args: Vec = args.iter().map(|arg| (*arg).to_os_string()).collect(); + + self.spawner + .spawn_blocking(move || std::process::Command::new(&program).args(&args).output()) + .await + } +} diff --git a/crates/anyspawn_azure/tests/runtime.rs b/crates/anyspawn_azure/tests/runtime.rs new file mode 100644 index 000000000..e395815e0 --- /dev/null +++ b/crates/anyspawn_azure/tests/runtime.rs @@ -0,0 +1,126 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#![allow(missing_docs, reason = "test module")] +#![cfg(not(miri))] // Miri cannot run tokio's runtime or spawn OS processes. + +//! Integration tests for [`anyspawn_azure::Runtime`]. +//! +//! These drive the runtime adapter on a real Tokio spawner and clock. + +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +use anyspawn::Spawner; +use anyspawn_azure::Runtime; +use tick::Clock; +use typespec_client_core::async_runtime::AsyncRuntime; +use typespec_client_core::time::Duration; + +#[tokio::test] +async fn runtime_spawn_runs_task_to_completion() { + let runtime = Runtime::new(Spawner::new_tokio(), Clock::new_tokio()); + let ran = Arc::new(AtomicBool::new(false)); + let ran_in_task = Arc::clone(&ran); + + let task = runtime.spawn(Box::pin(async move { + ran_in_task.store(true, Ordering::SeqCst); + })); + task.await.unwrap(); + + assert!(ran.load(Ordering::SeqCst)); +} + +#[tokio::test] +async fn runtime_abort_resolves_without_waiting() { + let runtime = Runtime::new(Spawner::new_tokio(), Clock::new_tokio()); + + // The task never completes on its own; aborting must wake the waiter so the + // await resolves rather than hanging forever. The timeout bounds the wait so + // a broken `abort` fails the test promptly instead of hanging. + let task = runtime.spawn(Box::pin(std::future::pending::<()>())); + task.abort(); + + tokio::time::timeout(std::time::Duration::from_secs(10), task) + .await + .expect("abort should wake the waiter so the task resolves promptly") + .unwrap(); +} + +#[tokio::test] +async fn runtime_sleep_completes() { + let runtime = Runtime::new(Spawner::new_tokio(), Clock::new_tokio()); + + runtime.sleep(Duration::milliseconds(1)).await; +} + +#[tokio::test] +async fn runtime_yield_now_completes() { + let runtime = Runtime::new(Spawner::new_tokio(), Clock::new_tokio()); + + runtime.yield_now().await; +} + +#[tokio::test] +async fn runtime_converts_into_dyn_runtime() { + let runtime: Arc = Runtime::new(Spawner::new_tokio(), Clock::new_tokio()).into(); + + runtime.spawn(Box::pin(async {})).await.unwrap(); +} + +#[tokio::test] +async fn runtime_accessors_round_trip() { + let runtime = Runtime::new(Spawner::new_tokio(), Clock::new_tokio()); + + // `spawner` and `clock` expose the wrapped components; rebuild from them. + let runtime = Runtime::new(runtime.spawner().clone(), runtime.clock().clone()); + + runtime.yield_now().await; +} + +#[cfg(feature = "azure-identity")] +#[tokio::test] +async fn runtime_executor_runs_command() { + use std::ffi::OsStr; + + use azure_identity::Executor; + + let runtime = Runtime::new(Spawner::new_tokio(), Clock::new_tokio()); + + #[cfg(windows)] + let output = runtime + .run(OsStr::new("cmd"), &[OsStr::new("/C"), OsStr::new("echo hello")]) + .await + .unwrap(); + #[cfg(not(windows))] + let output = runtime + .run(OsStr::new("/bin/sh"), &[OsStr::new("-c"), OsStr::new("echo hello")]) + .await + .unwrap(); + + assert!(output.status.success()); + assert!(String::from_utf8_lossy(&output.stdout).contains("hello")); +} + +#[cfg(feature = "azure-identity")] +#[tokio::test] +async fn runtime_converts_into_dyn_executor() { + use std::ffi::OsStr; + + use azure_identity::Executor; + + let executor: Arc = Runtime::new(Spawner::new_tokio(), Clock::new_tokio()).into(); + + #[cfg(windows)] + let output = executor + .run(OsStr::new("cmd"), &[OsStr::new("/C"), OsStr::new("echo hello")]) + .await + .unwrap(); + #[cfg(not(windows))] + let output = executor + .run(OsStr::new("/bin/sh"), &[OsStr::new("-c"), OsStr::new("echo hello")]) + .await + .unwrap(); + + assert!(output.status.success()); +} diff --git a/crates/fetch/src/lib.rs b/crates/fetch/src/lib.rs index 98798ca1e..217461429 100644 --- a/crates/fetch/src/lib.rs +++ b/crates/fetch/src/lib.rs @@ -2,6 +2,7 @@ // Licensed under the MIT License. #![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))] +#![cfg_attr(docsrs, feature(doc_cfg))] #![cfg_attr( not(feature = "json"), allow( diff --git a/crates/fetch_azure/CHANGELOG.md b/crates/fetch_azure/CHANGELOG.md new file mode 100644 index 000000000..ae595c4bb --- /dev/null +++ b/crates/fetch_azure/CHANGELOG.md @@ -0,0 +1,9 @@ +# Changelog + +## [0.1.0] + +- ✨ Features + + - introduce `fetch_azure`, adapting a `fetch::HttpClient` into an Azure SDK + HTTP transport: `HttpClient` implements `typespec_client_core::http::HttpClient` + on top of a `fetch::HttpClient`. diff --git a/crates/fetch_azure/Cargo.toml b/crates/fetch_azure/Cargo.toml new file mode 100644 index 000000000..b426b46ca --- /dev/null +++ b/crates/fetch_azure/Cargo.toml @@ -0,0 +1,59 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +[package] +name = "fetch_azure" +description = "Azure SDK HTTP transport backed by the fetch HTTP client." +version = "0.1.0" +readme = "README.md" +keywords = ["oxidizer", "azure", "fetch", "http", "transport"] +categories = ["network-programming"] + +edition = { workspace = true } +rust-version = { workspace = true } +authors = { workspace = true } +license = { workspace = true } +homepage = { workspace = true } +repository = "https://github.com/microsoft/oxidizer/tree/main/crates/fetch_azure" + +[package.metadata.cargo_check_external_types] +allowed_external_types = [ + # Workspace sibling crates + "fetch::*", + # External dependency that defines the HttpClient trait + "typespec_client_core::*", +] + +[package.metadata.docs.rs] +all-features = true + +[dependencies] +# internal +bytesbuf = { workspace = true, features = ["bytes-compat"] } +fetch = { workspace = true } +layered = { workspace = true } + +# external +async-trait = { workspace = true } +futures = { workspace = true, features = ["std"] } +http = { workspace = true } +typespec_client_core = { workspace = true, features = ["http"] } + +[dev-dependencies] +# internal +anyspawn = { path = "../anyspawn", features = ["tokio"] } +anyspawn_azure = { path = "../anyspawn_azure", features = ["azure-identity"] } +fetch = { path = "../fetch", features = ["test-util", "tokio", "rustls"] } +tick = { path = "../tick", features = ["tokio"] } + +# external +async-trait = { workspace = true } +azure_core = { workspace = true } +azure_identity = { workspace = true } +azure_storage_blob = { workspace = true } +futures = { workspace = true, features = ["std"] } +mutants = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } + +[lints] +workspace = true diff --git a/crates/fetch_azure/README.md b/crates/fetch_azure/README.md new file mode 100644 index 000000000..416340d26 --- /dev/null +++ b/crates/fetch_azure/README.md @@ -0,0 +1,50 @@ +
+ Fetch Azure Logo + +# Fetch Azure + +[![crate.io](https://img.shields.io/crates/v/fetch_azure.svg)](https://crates.io/crates/fetch_azure) +[![docs.rs](https://docs.rs/fetch_azure/badge.svg)](https://docs.rs/fetch_azure) +[![MSRV](https://img.shields.io/crates/msrv/fetch_azure)](https://crates.io/crates/fetch_azure) +[![CI](https://github.com/microsoft/oxidizer/actions/workflows/main.yml/badge.svg?event=push)](https://github.com/microsoft/oxidizer/actions/workflows/main.yml) +[![Coverage](https://codecov.io/gh/microsoft/oxidizer/graph/badge.svg?token=FCUG0EL5TI)](https://codecov.io/gh/microsoft/oxidizer) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)](../../LICENSE) +This crate was developed as part of the Oxidizer project + +
+ +Adapt a [`fetch::HttpClient`][__link0] into an Azure SDK HTTP transport. + +The Azure SDK abstracts its HTTP transport behind the +[`typespec_client_core::http::HttpClient`][__link1] trait. [`HttpClient`][__link2] implements that +trait on top of a [`fetch::HttpClient`][__link3], so Azure SDK pipelines run over +`fetch` and benefit from its resilience and observability. + +To run the Azure SDK on an `anyspawn`-backed async runtime, see the +`anyspawn_azure` crate. + +## Example + +```rust +use std::sync::Arc; + +use fetch::HttpClient as FetchClient; +use fetch_azure::HttpClient; + +// Adapt a `fetch` client into an Azure SDK transport. +fn transport(client: FetchClient) -> Arc { + HttpClient::from(client).into() +} +``` + + +
+ +This crate was developed as part of The Oxidizer Project. Browse this crate's source code. + + + [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbUKR9me1iXZ8bPNIxVoL75w4bTc-gWeJmBuMbzmMs_QiBJylhZIOCZWZldGNoZjAuMTEuMIJrZmV0Y2hfYXp1cmVlMC4xLjCCdHR5cGVzcGVjX2NsaWVudF9jb3JlZTEuMC4w + [__link0]: https://docs.rs/fetch/0.11.0/fetch/?search=HttpClient + [__link1]: https://docs.rs/typespec_client_core/1.0.0/typespec_client_core/?search=http::HttpClient + [__link2]: https://docs.rs/fetch_azure/0.1.0/fetch_azure/?search=HttpClient + [__link3]: https://docs.rs/fetch/0.11.0/fetch/?search=HttpClient diff --git a/crates/fetch_azure/examples/azure_transport.rs b/crates/fetch_azure/examples/azure_transport.rs new file mode 100644 index 000000000..ae6fcdb8e --- /dev/null +++ b/crates/fetch_azure/examples/azure_transport.rs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Adapts a Tokio-based [`fetch::HttpClient`] into an Azure SDK transport and +//! issues a request through the [`azure_core::http::HttpClient`] trait. +//! +//! Run with: `cargo run --example azure_transport` + +use std::sync::Arc; + +use azure_core::http::{HttpClient as HttpClientTrait, Method, Request, Url}; +use fetch::HttpClient as FetchClient; +use fetch_azure::HttpClient; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Build a `fetch` client (Tokio runtime + rustls TLS) and adapt it so it can + // be used wherever the Azure SDK expects an `Arc` transport. + let transport: Arc = HttpClient::from(FetchClient::new_tokio()).into(); + + // In a real application you would hand `transport` to an Azure SDK client's + // options. Here we drive it directly to show the round-trip. + let request = Request::new(Url::parse("https://example.com")?, Method::Get); + let response = transport.execute_request(&request).await?; + + println!("request completed with status: {}", u16::from(response.status())); + + Ok(()) +} diff --git a/crates/fetch_azure/examples/blob_list.rs b/crates/fetch_azure/examples/blob_list.rs new file mode 100644 index 000000000..f28ee5257 --- /dev/null +++ b/crates/fetch_azure/examples/blob_list.rs @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Lists blobs in an Azure Storage container using `fetch` as the transport and +//! an `anyspawn`/`tick`-backed [`Runtime`] as the credential executor. +//! +//! Set `AZURE_STORAGE_SERVICE_ENDPOINT` (and sign in with `az`/`azd`), then run: +//! `cargo run --example blob_list --features azure-identity` + +use std::env; +use std::sync::Arc; + +use anyspawn::Spawner; +use anyspawn_azure::Runtime; +use azure_core::credentials::TokenCredential; +use azure_core::http::{ClientOptions, Transport, Url}; +use azure_identity::{DeveloperToolsCredential, DeveloperToolsCredentialOptions, Executor}; +use azure_storage_blob::{BlobServiceClient, BlobServiceClientOptions}; +use fetch::HttpClient as FetchClient; +use fetch_azure::HttpClient; +use futures::TryStreamExt as _; +use tick::Clock; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // This example needs a live Storage account and developer sign-in, so it + // no-ops when the endpoint is not configured (e.g. in CI). + let Ok(endpoint) = env::var("AZURE_STORAGE_SERVICE_ENDPOINT") else { + println!("AZURE_STORAGE_SERVICE_ENDPOINT is not set; skipping blob listing."); + return Ok(()); + }; + let service_url: Url = endpoint.parse()?; + + // Run developer-credential subprocesses (e.g. the Azure CLI) on a + // tokio-backed `Runtime` used as the credential's `Executor`. + let executor: Arc = Arc::new(Runtime::new(Spawner::new_tokio(), Clock::new_tokio())); + let credential: Arc = + DeveloperToolsCredential::new(Some(DeveloperToolsCredentialOptions { executor: Some(executor) }))?; + + // Use a tokio `fetch` client as the Azure SDK transport. + let transport = Transport::new(HttpClient::from(FetchClient::new_tokio()).into()); + let options = BlobServiceClientOptions { + client_options: ClientOptions { + transport: Some(transport), + ..Default::default() + }, + ..Default::default() + }; + + let client = BlobServiceClient::new(service_url, Some(credential), Some(options))?.blob_container_client("examples"); + + // Enumerate blobs in the "examples" container. + let mut pager = client.list_blobs(None)?; + while let Some(blob) = pager.try_next().await? { + let name = blob.name.as_deref().unwrap_or("(unknown)"); + let content_type = blob.properties.and_then(|properties| properties.content_type); + let content_type = content_type.as_deref().unwrap_or("(unknown)"); + println!("{name} ({content_type})"); + } + + Ok(()) +} diff --git a/crates/fetch_azure/favicon.ico b/crates/fetch_azure/favicon.ico new file mode 100644 index 000000000..82fe3de36 --- /dev/null +++ b/crates/fetch_azure/favicon.ico @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:140215583c71ac6fd9cf69ab5b44e11cbb053916426104e9dd40eeba6b0ae865 +size 23418 diff --git a/crates/fetch_azure/logo.png b/crates/fetch_azure/logo.png new file mode 100644 index 000000000..cf77a7a66 --- /dev/null +++ b/crates/fetch_azure/logo.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8ac6d1cfa25763c51eb14541fa4defa965e8f16462e9d61e7de7fee6387a2553 +size 67062 diff --git a/crates/fetch_azure/src/client.rs b/crates/fetch_azure/src/client.rs new file mode 100644 index 000000000..0ddf40fb1 --- /dev/null +++ b/crates/fetch_azure/src/client.rs @@ -0,0 +1,148 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! The [`HttpClient`] transport adapter. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use bytesbuf::BytesView; +use futures::{StreamExt as _, TryStreamExt as _}; +use layered::Service as _; +use typespec_client_core::error::{Error, ErrorKind}; +use typespec_client_core::http::headers::{HeaderName, HeaderValue, Headers}; +use typespec_client_core::http::request::{Body, Request}; +use typespec_client_core::http::response::PinnedStream; +use typespec_client_core::http::{AsyncRawResponse, HttpClient as HttpClientTrait}; + +/// A [`typespec_client_core::http::HttpClient`] backed by a [`fetch::HttpClient`] transport. +/// +/// Construct one from an existing `fetch` client with [`HttpClient::new`] +/// (or via [`From`]), then convert it into an `Arc` via [`From`] +/// / [`Into`] to hand to the Azure SDK: +/// +/// ``` +/// # use std::sync::Arc; +/// # use fetch_azure::HttpClient; +/// # fn wrap(client: fetch::HttpClient) -> Arc { +/// HttpClient::from(client).into() +/// # } +/// ``` +#[derive(Debug, Clone)] +pub struct HttpClient { + client: fetch::HttpClient, +} + +impl HttpClient { + /// Creates a new adapter that forwards requests to the given `fetch` client. + #[must_use] + pub const fn new(client: fetch::HttpClient) -> Self { + Self { client } + } + + /// Converts a typespec [`Request`] into a `fetch` request. + fn to_fetch_request(&self, request: &Request) -> typespec_client_core::Result { + // `Method::as_str` yields a canonical token (e.g. "GET") that `fetch`'s + // builder parses into an `http::Method`; this avoids matching on the + // `#[non_exhaustive]` typespec `Method` enum. + let mut builder = self.client.request(request.method().as_str(), request.url().as_str()); + + for (name, value) in request.headers().iter() { + builder = builder.header(name.as_str(), value.as_str()); + } + + builder.body(self.to_fetch_body(request.body())).build().map_err(|error| { + Error::with_error( + ErrorKind::DataConversion, + error, + "failed to convert the Azure request into a fetch request", + ) + }) + } + + /// Converts a typespec request [`Body`] into a `fetch` [`HttpBody`](fetch::HttpBody). + /// + /// Empty byte bodies reuse a shared empty body, and non-empty byte bodies are + /// wrapped without copying. Seekable streams are forwarded as a chunk stream. + // The empty-body fast path yields a body that is observationally identical to + // the general bytes path (both report a zero-length body), so the + // `is_empty()` guard is an equivalent mutant that no test can distinguish. + #[cfg_attr(test, mutants::skip)] + fn to_fetch_body(&self, body: &Body) -> fetch::HttpBody { + let builder: &fetch::HttpBodyBuilder = self.client.as_ref(); + + match body { + Body::Bytes(bytes) if bytes.is_empty() => builder.empty(), + Body::Bytes(bytes) => builder.bytes(BytesView::from(bytes.clone())), + Body::SeekableStream(stream) => { + let stream = stream.clone().map(|chunk| { + chunk + .map(BytesView::from) + .map_err(|error| fetch::HttpError::unavailable(format!("failed to read the Azure request body: {error}"))) + }); + builder.stream(stream, &fetch::options::HttpBodyOptions::default()) + } + } + } +} + +impl From for HttpClient { + fn from(client: fetch::HttpClient) -> Self { + Self::new(client) + } +} + +impl From for Arc { + fn from(client: HttpClient) -> Self { + Arc::new(client) + } +} + +#[async_trait] +impl HttpClientTrait for HttpClient { + async fn execute_request(&self, request: &Request) -> typespec_client_core::Result { + let request = self.to_fetch_request(request)?; + + let response = self + .client + .execute(request) + .await + .map_err(|error| Error::with_error(ErrorKind::Io, error, "the fetch HTTP client failed to execute the request"))?; + + Ok(to_async_raw_response(response)) + } +} + +/// Converts a `fetch` [`HttpResponse`](fetch::HttpResponse) into an [`AsyncRawResponse`]. +fn to_async_raw_response(response: fetch::HttpResponse) -> AsyncRawResponse { + let (parts, body) = response.into_parts(); + let status = parts.status.as_u16().into(); + let headers = to_headers(&parts.headers); + + let body = body + .into_stream() + .map_ok(|view| view.to_bytes()) + .map_err(|error| Error::with_error(ErrorKind::Io, error, "failed to read the response body")); + let body: PinnedStream = Box::pin(body); + + AsyncRawResponse::new(status, headers, body) +} + +/// Converts an [`http::HeaderMap`] into [`Headers`]. +/// +/// Header values that are not valid UTF-8 are skipped, mirroring the behavior +/// of the built-in `reqwest` transport in the Azure SDK. +fn to_headers(map: &http::HeaderMap) -> Headers { + let headers = map + .iter() + .filter_map(|(name, value)| { + value + .to_str() + .ok() + .map(|value| (HeaderName::from(name.as_str().to_owned()), HeaderValue::from(value.to_owned()))) + }) + .collect::>(); + + Headers::from(headers) +} diff --git a/crates/fetch_azure/src/lib.rs b/crates/fetch_azure/src/lib.rs new file mode 100644 index 000000000..5bff3458a --- /dev/null +++ b/crates/fetch_azure/src/lib.rs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc(html_logo_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/fetch_azure/logo.png")] +#![doc(html_favicon_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/fetch_azure/favicon.ico")] + +//! Adapt a [`fetch::HttpClient`] into an Azure SDK HTTP transport. +//! +//! The Azure SDK abstracts its HTTP transport behind the +//! [`typespec_client_core::http::HttpClient`] trait. [`HttpClient`] implements that +//! trait on top of a [`fetch::HttpClient`], so Azure SDK pipelines run over +//! `fetch` and benefit from its resilience and observability. +//! +//! To run the Azure SDK on an `anyspawn`-backed async runtime, see the +//! `anyspawn_azure` crate. +//! +//! # Example +//! +//! ``` +//! use std::sync::Arc; +//! +//! use fetch::HttpClient as FetchClient; +//! use fetch_azure::HttpClient; +//! +//! // Adapt a `fetch` client into an Azure SDK transport. +//! fn transport(client: FetchClient) -> Arc { +//! HttpClient::from(client).into() +//! } +//! # let _ = transport; +//! ``` + +mod client; + +pub use client::HttpClient; diff --git a/crates/fetch_azure/tests/client.rs b/crates/fetch_azure/tests/client.rs new file mode 100644 index 000000000..90dbf8073 --- /dev/null +++ b/crates/fetch_azure/tests/client.rs @@ -0,0 +1,260 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#![allow(missing_docs, reason = "test module")] +#![cfg(not(miri))] // Miri cannot run the tokio-backed fetch transport. + +//! Integration tests for [`fetch_azure::HttpClient`]. +//! +//! These exercise the transport adapter end-to-end using `fetch`'s +//! `FakeHandler`, so no real network access is required. + +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use async_trait::async_trait; +use fetch::fake::FakeHandler; +use fetch::{HttpClient as FetchClient, HttpResponseBuilder}; +use fetch_azure::HttpClient; +use futures::io::AsyncRead; +use typespec_client_core::Bytes; +use typespec_client_core::http::headers::HeaderName; +use typespec_client_core::http::request::{Body, Request}; +use typespec_client_core::http::{HttpClient as HttpClientTrait, Method, Url}; +use typespec_client_core::stream::{BytesStream, SeekableStream}; + +fn request(method: Method) -> Request { + Request::new(Url::parse("https://example.com/path").expect("valid url"), method) +} + +/// A handler that always responds with the given status code and an empty body. +fn status_handler(status: u16) -> FakeHandler { + FakeHandler::from_fn(move |_request| HttpResponseBuilder::new_fake().status(status).build()) +} + +#[tokio::test] +async fn execute_request_maps_status_headers_and_body() { + let handler = FakeHandler::from_fn(|_request| { + HttpResponseBuilder::new_fake() + .status(201u16) + .header("x-test", "hello") + .text("world") + .build() + }); + let client = HttpClient::new(FetchClient::new_fake(handler)); + + let response = client.execute_request(&request(Method::Get)).await.unwrap(); + + assert_eq!(response.status(), 201u16); + assert_eq!(response.headers().get_optional_str(&HeaderName::from("x-test")), Some("hello")); + + let body = response.into_body().collect().await.unwrap(); + assert_eq!(&*body, b"world"); +} + +#[tokio::test] +async fn execute_request_forwards_method_and_bytes_body() { + // The handler echoes the request body back, but only for POST requests. + let handler = FakeHandler::from_async_fn(|request| async move { + if request.method().as_str() != "POST" { + return HttpResponseBuilder::new_fake().status(400u16).build(); + } + + let body = request.into_body().into_bytes().await?; + HttpResponseBuilder::new_fake().status(200u16).bytes(body).build() + }); + let client = HttpClient::new(FetchClient::new_fake(handler)); + + let mut request = request(Method::Post); + request.set_body(Bytes::from_static(b"payload")); + + let response = client.execute_request(&request).await.unwrap(); + + assert_eq!(response.status(), 200u16); + let body = response.into_body().collect().await.unwrap(); + assert_eq!(&*body, b"payload"); +} + +#[tokio::test] +async fn execute_request_forwards_seekable_stream_body() { + let handler = FakeHandler::from_async_fn(|request| async move { + let body = request.into_body().into_bytes().await?; + HttpResponseBuilder::new_fake().status(200u16).bytes(body).build() + }); + let client = HttpClient::new(FetchClient::new_fake(handler)); + + let mut request = request(Method::Put); + request.set_body(BytesStream::new(Bytes::from_static(b"streamed"))); + + let response = client.execute_request(&request).await.unwrap(); + + assert_eq!(response.status(), 200u16); + let body = response.into_body().collect().await.unwrap(); + assert_eq!(&*body, b"streamed"); +} + +#[tokio::test] +async fn execute_request_forwards_request_headers() { + let handler = FakeHandler::from_fn(|request| { + let forwarded = request.headers().get("x-correlation").and_then(|value| value.to_str().ok()) == Some("abc123"); + let status = if forwarded { 200u16 } else { 400u16 }; + HttpResponseBuilder::new_fake().status(status).build() + }); + let client = HttpClient::new(FetchClient::new_fake(handler)); + + let mut request = request(Method::Get); + request.insert_header("x-correlation", "abc123"); + + let response = client.execute_request(&request).await.unwrap(); + + assert_eq!(response.status(), 200u16); +} + +#[tokio::test] +async fn execute_request_maps_all_methods() { + for method in [Method::Delete, Method::Get, Method::Head, Method::Patch, Method::Post, Method::Put] { + let expected = method.as_str(); + let handler = FakeHandler::from_fn(move |request| { + let status = if request.method().as_str() == expected { 200u16 } else { 400u16 }; + HttpResponseBuilder::new_fake().status(status).build() + }); + let client = HttpClient::new(FetchClient::new_fake(handler)); + + let response = client.execute_request(&request(method)).await.unwrap(); + + assert_eq!(response.status(), 200u16, "method {method:?} was not forwarded"); + } +} + +#[tokio::test] +async fn execute_request_maps_transport_error() { + let handler = FakeHandler::from_error_fn(|_request| fetch::HttpError::unavailable("simulated transport failure")); + let client = HttpClient::new(FetchClient::new_fake(handler)); + + let error = client.execute_request(&request(Method::Get)).await.unwrap_err(); + + assert!( + error.to_string().contains("the fetch HTTP client failed to execute the request"), + "unexpected error: {error}" + ); +} + +#[tokio::test] +async fn azure_http_client_converts_into_dyn_client() { + let client: Arc = HttpClient::from(FetchClient::new_fake(status_handler(202))).into(); + + let response = client.execute_request(&request(Method::Get)).await.unwrap(); + + assert_eq!(response.status(), 202u16); +} + +#[tokio::test] +async fn execute_request_maps_request_build_failure() { + let client = HttpClient::new(FetchClient::new_fake(status_handler(200))); + + // A header value containing a control character is rejected by the `http` + // crate when the fetch request is built, exercising the DataConversion path. + let mut request = request(Method::Get); + request.insert_header("x-invalid", "bad\nvalue"); + + let error = client.execute_request(&request).await.unwrap_err(); + + assert!( + error + .to_string() + .contains("failed to convert the Azure request into a fetch request"), + "unexpected error: {error}" + ); +} + +#[tokio::test] +async fn execute_request_skips_non_utf8_response_headers() { + let handler = FakeHandler::from_fn(|_request| { + let binary = fetch::HeaderValue::from_bytes(&[0xff, 0xfe]).expect("valid header value bytes"); + HttpResponseBuilder::new_fake() + .status(200u16) + .header("x-valid", "ok") + .header("x-binary", binary) + .build() + }); + let client = HttpClient::new(FetchClient::new_fake(handler)); + + let response = client.execute_request(&request(Method::Get)).await.unwrap(); + + assert_eq!(response.headers().get_optional_str(&HeaderName::from("x-valid")), Some("ok")); + assert_eq!(response.headers().get_optional_str(&HeaderName::from("x-binary")), None); +} + +#[tokio::test] +async fn execute_request_maps_seekable_stream_read_error() { + let handler = FakeHandler::from_async_fn(|request| async move { + // Reading the body drives the erroring stream, surfacing the failure. + request.into_body().into_bytes().await?; + HttpResponseBuilder::new_fake().status(200u16).build() + }); + let client = HttpClient::new(FetchClient::new_fake(handler)); + + let mut request = request(Method::Post); + request.set_body(Body::SeekableStream(Box::new(ErroringStream))); + + let error = client.execute_request(&request).await.unwrap_err(); + + assert!( + error_chain(&error).contains("failed to read the Azure request body"), + "unexpected error: {error}" + ); +} + +#[tokio::test] +async fn execute_request_maps_response_body_read_error() { + let handler = FakeHandler::from_fn(|_request| { + let body = fetch::HttpBodyBuilder::new_fake().stream( + futures::stream::iter([Err(fetch::HttpError::unavailable("boom"))]), + &fetch::options::HttpBodyOptions::default(), + ); + HttpResponseBuilder::new_fake().status(200u16).body(body).build() + }); + let client = HttpClient::new(FetchClient::new_fake(handler)); + + let response = client.execute_request(&request(Method::Get)).await.unwrap(); + let error = response.into_body().collect().await.unwrap_err(); + + assert!( + error.to_string().contains("failed to read the response body"), + "unexpected error: {error}" + ); +} + +/// A [`SeekableStream`] whose reads always fail, used to cover the request-body error path. +#[derive(Debug, Clone)] +struct ErroringStream; + +impl AsyncRead for ErroringStream { + fn poll_read(self: Pin<&mut Self>, _cx: &mut Context<'_>, _buf: &mut [u8]) -> Poll> { + Poll::Ready(Err(std::io::Error::other("boom"))) + } +} + +#[async_trait] +impl SeekableStream for ErroringStream { + async fn reset(&mut self) -> typespec_client_core::Result<()> { + Ok(()) + } + + fn len(&self) -> Option { + None + } +} + +/// Joins an error and its `source` chain into a single string for assertions. +fn error_chain(error: &dyn std::error::Error) -> String { + let mut chain = error.to_string(); + let mut source = error.source(); + while let Some(cause) = source { + chain.push_str(" | "); + chain.push_str(&cause.to_string()); + source = cause.source(); + } + chain +}