From f7b1b982b9fe539ae66dcb8255b10302ccb43674 Mon Sep 17 00:00:00 2001 From: Peyton Date: Thu, 5 Mar 2026 12:02:34 -0800 Subject: [PATCH] refactor: extract KVP into standalone libazureinit-kvp crate --- Cargo.toml | 1 + doc/kvp_design.md | 384 +++++++++++ doc/libazurekvp.md | 406 +++++++++--- libazureinit-kvp/Cargo.toml | 26 + libazureinit-kvp/src/diagnostics.rs | 302 +++++++++ libazureinit-kvp/src/hyperv.rs | 458 +++++++++++++ libazureinit-kvp/src/lib.rs | 209 ++++++ libazureinit-kvp/src/memory.rs | 99 +++ libazureinit-kvp/src/provisioning.rs | 312 +++++++++ libazureinit-kvp/src/tracing_layer.rs | 387 +++++++++++ libazureinit/Cargo.toml | 3 +- libazureinit/src/kvp.rs | 902 -------------------------- libazureinit/src/lib.rs | 1 - libazureinit/src/logging.rs | 161 +---- src/main.rs | 30 +- 15 files changed, 2530 insertions(+), 1151 deletions(-) create mode 100644 doc/kvp_design.md create mode 100644 libazureinit-kvp/Cargo.toml create mode 100644 libazureinit-kvp/src/diagnostics.rs create mode 100644 libazureinit-kvp/src/hyperv.rs create mode 100644 libazureinit-kvp/src/lib.rs create mode 100644 libazureinit-kvp/src/memory.rs create mode 100644 libazureinit-kvp/src/provisioning.rs create mode 100644 libazureinit-kvp/src/tracing_layer.rs delete mode 100644 libazureinit/src/kvp.rs diff --git a/Cargo.toml b/Cargo.toml index 31693d8e..83d7e630 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ path = "tests/functional_tests.rs" [workspace] members = [ "libazureinit", + "libazureinit-kvp", ] [features] diff --git a/doc/kvp_design.md b/doc/kvp_design.md new file mode 100644 index 00000000..391e0121 --- /dev/null +++ b/doc/kvp_design.md @@ -0,0 +1,384 @@ +# Design: azure-init KVP APIs + +## Motivation + +**Need for External Use.** Other Linux distros +that want to emit Hyper-V KVP data had to pull in the full `libazureinit` +crate -- with all its dependencies on tokio, OpenTelemetry, IMDS, and +configuration -- just to write a key-value pair. 
There was no way to use +KVP storage without also setting up the tracing subscriber, the async +runtime, and the graceful shutdown machinery. + +**Unnecessarily complex lifecycle.** The async channel + background writer +pattern required callers to manage a `CancellationToken`, await a +`JoinHandle`, and call `close().await` to drain buffered writes. + +**Unclear boundaries.** Encoding, splitting, tracing, provisioning reports, +and file I/O were interleaved in the same structs and methods. Someone +looking at how diagnostics are structured had to also understand binary +record encoding. Someone looking at provisioning reports had to understand +the tracing layer's `health_report` detection logic. + +### Why the new architecture is better + +The rearchitecture extracts KVP into its own crate (`libazureinit-kvp`) +with a layered design where each layer has a single, clearly scoped +responsibility: + +- Simple external usage, as other distros depend on `libazureinit-kvp`. + +- Synchronous I/O, due to replacing the async channel + background writer with + direct `flock` + write + unlock per operation. No `close()`, no + `CancellationToken`, no `JoinHandle`. Writes go to disk immediately. + +- Testable at every layer. Any layer can be tested against + `InMemoryKvpStore` (a `HashMap`-backed test double) without touching the + filesystem. Binary encoding tests are isolated in `HyperVKvpStore`. + +- There is a clear separation of concerns. Storage knows nothing about + diagnostics, diagnostics knows nothing about tracing, and provisioning + reports know nothing about either. Each layer can be understood, modified, + and tested independently. + +## Implementation Approach + +The new API is designed around two goals: + +1. **Keep external usage simple and explicit.** External callers get a + small, dependency-light crate with straightforward synchronous APIs. +2. 
**Preserve azure-init's internal tracing-based emission path.** + `setup_layers` continues to wire a `TracingKvpLayer` into the tracing + subscriber stack, so `#[instrument]` and `event!` macros emit KVP data + automatically. + +## Crate Structure + +KVP is extracted into its own workspace crate with a clear dependency +graph: + +``` +azure-init ──► libazureinit ──► libazureinit-kvp +``` + +`libazureinit` depends on `libazureinit-kvp` via a workspace path +dependency. `azure-init` (the binary) depends only on `libazureinit`. +External callers can depend on `libazureinit-kvp` directly without +pulling in the rest of the azure-init stack. + +### Workspace layout + +``` +azure-init/ # workspace root +├── Cargo.toml # [workspace] members +├── src/main.rs # azure-init binary +├── libazureinit/ +│ ├── Cargo.toml # depends on libazureinit-kvp +│ └── src/ +│ ├── lib.rs +│ ├── logging.rs # wires TracingKvpLayer into subscriber +│ ├── health.rs # wireserver reporting (unchanged) +│ ├── error.rs # error types with report encoding +│ └── ... +└── libazureinit-kvp/ + ├── Cargo.toml + └── src/ + ├── lib.rs # KvpStore trait, KvpOptions, Kvp + ├── hyperv.rs # HyperVKvpStore, binary encode/decode + ├── memory.rs # InMemoryKvpStore (test double) + ├── diagnostics.rs # DiagnosticEvent, DiagnosticsKvp + ├── tracing_layer.rs # TracingKvpLayer, StringVisitor + └── provisioning.rs # ProvisioningReport +``` + +`libazureinit-kvp` has a minimal dependency footprint: `chrono`, `csv`, +`fs2`, `sysinfo`, `tracing`, `tracing-subscriber`, `uuid`. No tokio, no +OpenTelemetry, no configuration system. + +## Layered Architecture + +The library is organized into four layers, each with a single +responsibility. 
Higher layers depend only on the `KvpStore` trait, never +on a concrete implementation: + +``` +┌─────────────────────────────────────────────────┐ +│ Kvp (top-level client, wires layers) │ +├─────────────────────────────────────────────────┤ +│ Layer 3: ProvisioningReport │ +│ Typed accessor for PROVISIONING_REPORT│ +├─────────────────────────────────────────────────┤ +│ Layer 2: TracingKvpLayer │ +│ tracing_subscriber::Layer impl │ +├─────────────────────────────────────────────────┤ +│ Layer 1: DiagnosticsKvp │ +│ Typed diagnostic events, splitting │ +├─────────────────────────────────────────────────┤ +│ Layer 0: KvpStore trait │ +│ ┌──────────────────┐ ┌─────────────────────┐ │ +│ │ HyperVKvpStore │ │ InMemoryKvpStore │ │ +│ │ (production) │ │ (test double) │ │ +│ └──────────────────┘ └─────────────────────┘ │ +└─────────────────────────────────────────────────┘ +``` + +For detailed API signatures, struct definitions, and code examples for +each layer, see the +[Architecture](./libazurekvp.md#architecture) section in `libazurekvp.md`. + +### Layer 0 -- `KvpStore` trait + +Core storage abstraction (`write`, `read`, `entries`, `delete`). Two +implementations: + +- **`HyperVKvpStore`** -- production. Reads and writes the binary Hyper-V + pool-file format: fixed-size 2,560-byte records (512-byte key + + 2,048-byte value, zero-padded). Concurrency via `flock` (exclusive for + writes, shared for reads). +- **`InMemoryKvpStore`** -- test double. `HashMap`-backed, no filesystem. + Implements `Clone` via `Arc>` so clones share state. + +`write()` at this layer writes exactly one record per call. Value +splitting across multiple records is **not** handled here -- that is the +responsibility of higher layers that understand their data semantics. + +### Layer 1 -- `DiagnosticsKvp` + +Typed diagnostic event emission. 
Handles key generation using the format +`{event_prefix}|{vm_id}|{level}|{name}|{span_id}` +and value splitting at the 1,022-byte Azure platform read limit (UTF-16: +511 characters + null terminator). This keeps `HyperVKvpStore` simple +while preserving the chunking semantics the host expects. + +### Layer 2 -- `TracingKvpLayer` + +A `tracing_subscriber::Layer` that translates `#[instrument]` spans and +`event!` calls into `DiagnosticsKvp::emit()` calls. Detects +`health_report` fields and writes them as `PROVISIONING_REPORT` entries +directly to the store. + +### Layer 3 -- `ProvisioningReport` + +Typed accessor for the `PROVISIONING_REPORT` KVP key with +`write_to()` / `read_from()` and `success()` / `error()` constructors. +The wire format (pipe-delimited `key=value` segments) is fully compatible +with the existing `health.rs` `encode_report` output. + +### `Kvp` client + +Wires all layers together with public fields (`store`, `diagnostics`, +`tracing_layer`). Constructed via `Kvp::with_options(KvpOptions)` +(production) or `Kvp::from_store(store, vm_id, event_prefix)` (testing). + +## Design Principles + +### Non-fatal error handling + +KVP is a telemetry side-channel. It must never block or fail +provisioning. All KVP operations return `io::Result`, but callers +(including azure-init itself) use `let _ =` to discard errors after +logging. If KVP initialization fails entirely, `setup_layers` logs an +error and continues without the KVP layer. + +### Two-consumer model + +The library serves two distinct consumers with different needs: + +1. **External callers** (other distros, provisioning agents) -- use the + `Kvp` client API directly. They get synchronous, explicit calls with + no filtering, no config, no tracing subscriber setup. They own their + own lifecycle. + +2. **azure-init** -- uses the `TracingKvpLayer` wired into the tracing + subscriber via `setup_layers()`. KVP emission is automatic through + `#[instrument]` and `event!`. 
Filtering, config, and subscriber + orchestration are handled by `logging.rs`. + +This separation is documented in detail in +[What the Crate Provides vs. What azure-init Adds](./libazurekvp.md#what-the-crate-provides-vs-what-azure-init-adds) +in `libazurekvp.md`. + +### Testability via trait generics + +Every layer above Layer 0 is generic over `S: KvpStore`. This means +diagnostics, tracing, and provisioning report logic can all be tested +against `InMemoryKvpStore` without touching the filesystem. Only +`HyperVKvpStore` tests need temp files. The test suite has 41 tests +covering all layers. + +### `KvpOptions` + +Configures the production `Kvp` client: + +| Field | Type | Default | +|-------|------|---------| +| `vm_id` | `Option` | `None` (required -- caller must set) | +| `event_prefix` | `Option` | `None` (falls back to `EVENT_PREFIX`) | +| `file_path` | `PathBuf` | `/var/lib/hyperv/.kvp_pool_1` | +| `truncate_on_start` | `bool` | `true` | + +## Initialization Flow + +`Kvp::with_options(options)` performs: + +1. Validate that `vm_id` is present (return error if `None`). +2. Resolve `event_prefix` (use provided value or fall back to + `EVENT_PREFIX`, which is `"azure-init-{version}"`). +3. Create `HyperVKvpStore` pointing at `options.file_path`. +4. If `truncate_on_start` is `true`, call `store.truncate_if_stale()` to + clear records from previous boots. +5. Call `Kvp::from_store(store, vm_id, event_prefix)` to wire layers. +6. Return the initialized client. + +No background task is spawned. No channel is created. No shutdown token +is needed. + +For details on the truncation and flock semantics, see +[Truncation and Locking](./libazurekvp.md#truncation-and-locking) in +`libazurekvp.md`. + +## External Caller Model + +For external callers (other distros, provisioning agents), the API is +intentionally simple -- no async runtime, no tracing setup, no +configuration system: + +1. Construct a client (`Kvp::with_options`) +2. 
Emit diagnostics (`kvp.diagnostics.emit(...)`) and/or write + provisioning reports (`ProvisioningReport::success(...).write_to(...)`) +3. Done. No `close()`, no `await`, no shutdown. + +### Example 1: Minimal external caller + +```rust +use libazureinit_kvp::{Kvp, KvpOptions, ProvisioningReport}; + +fn main() -> std::io::Result<()> { + let vm_id = "00000000-0000-0000-0000-000000000001"; + let kvp = Kvp::with_options( + KvpOptions::default().vm_id(vm_id), + )?; + + ProvisioningReport::success(vm_id).write_to(&kvp.store)?; + Ok(()) +} +``` + +### Example 2: Full provisioning flow with diagnostics + +```rust +use libazureinit_kvp::{Kvp, KvpOptions, DiagnosticEvent, ProvisioningReport}; +use chrono::Utc; + +fn provision_vm(vm_id: &str) -> Result<(), String> { + // ... actual provisioning logic ... + Ok(()) +} + +fn main() { + let vm_id = "00000000-0000-0000-0000-000000000001"; + + let kvp = match Kvp::with_options(KvpOptions::default().vm_id(vm_id)) { + Ok(k) => k, + Err(e) => { + eprintln!("KVP init failed (non-fatal): {e}"); + return; + } + }; + + // Signal that provisioning is in progress + let _ = kvp.diagnostics.emit(&DiagnosticEvent { + level: "INFO".into(), + name: "provision:start".into(), + span_id: "main".into(), + message: format!("Provisioning in progress for vm_id={vm_id}"), + timestamp: Utc::now(), + }); + + match provision_vm(vm_id) { + Ok(()) => { + let _ = kvp.diagnostics.emit(&DiagnosticEvent { + level: "INFO".into(), + name: "provision:complete".into(), + span_id: "main".into(), + message: "Provisioning completed successfully".into(), + timestamp: Utc::now(), + }); + let _ = ProvisioningReport::success(vm_id) + .write_to(&kvp.store); + } + Err(reason) => { + let _ = kvp.diagnostics.emit(&DiagnosticEvent { + level: "ERROR".into(), + name: "provision:failed".into(), + span_id: "main".into(), + message: format!("Provisioning failed: {reason}"), + timestamp: Utc::now(), + }); + let _ = ProvisioningReport::error(vm_id, &reason) + .write_to(&kvp.store); + 
} + } +} +``` + +Note: all KVP operations use `let _ =` because KVP errors are non-fatal +and must never block provisioning. + +### Example 3: Custom identity and file path + +```rust +use libazureinit_kvp::{Kvp, KvpOptions, DiagnosticEvent}; +use chrono::Utc; + +fn main() -> std::io::Result<()> { + let kvp = Kvp::with_options( + KvpOptions::default() + .vm_id("00000000-0000-0000-0000-000000000042") + .event_prefix("my-service-1.0") + .file_path("/tmp/kvp_pool_test") + .truncate_on_start(false), + )?; + + kvp.diagnostics.emit(&DiagnosticEvent { + level: "DEBUG".into(), + name: "test:message".into(), + span_id: "test".into(), + message: "integration test message".into(), + timestamp: Utc::now(), + })?; + + Ok(()) +} +``` + +## azure-init Internal Tracing Path + +azure-init itself does not use the external caller model above. +Instead, it uses `setup_layers()` in `libazureinit::logging` to wire the +`TracingKvpLayer` into the tracing subscriber stack. This means all +existing `#[instrument]` and `event!` instrumentation automatically emits +KVP data without any code changes. + +For details on how `setup_layers` constructs the subscriber, filter +precedence, and the separation between what the kvp crate provides vs. +what azure-init adds on top, see +[What the Crate Provides vs. What azure-init Adds](./libazurekvp.md#what-the-crate-provides-vs-what-azure-init-adds) +and +[Integration with azure-init](./libazurekvp.md#integration-with-azure-init) +in `libazurekvp.md`. + +For configuration knobs (`telemetry.kvp_diagnostics`, +`telemetry.kvp_filter`, `AZURE_INIT_KVP_FILTER`, `AZURE_INIT_LOG`), see +[Configuration](./libazurekvp.md#configuration) in `libazurekvp.md`. 
+ +## Key Differences from Previous Design + +| Concern | Previous | Current | +|---------|----------|---------| +| Storage | Async channel to background writer task | Synchronous `flock` + write + unlock per call | +| External API | Required tokio, `close().await`, shutdown tokens | Synchronous, no runtime, no lifecycle management | +| Tracing coupling | `EmitKVPLayer` owned channel, encoding, and `Layer` impl | `TracingKvpLayer` is a thin adapter over `DiagnosticsKvp` | +| Provisioning reports | Encoded inline in `emit_health_report` | `ProvisioningReport` struct with `write_to()`/`read_from()` | +| Testability | Tests required tempfiles and real binary format | Any layer testable against `InMemoryKvpStore` | +| Crate boundary | Everything in `libazureinit::kvp` (private module) | Standalone `libazureinit-kvp` crate with public API | +| Dependency weight | Full `libazureinit` (tokio, OpenTelemetry, reqwest, etc.) | Minimal (`chrono`, `csv`, `fs2`, `tracing`, `uuid`) | diff --git a/doc/libazurekvp.md b/doc/libazurekvp.md index 476bdcb1..cafecab6 100644 --- a/doc/libazurekvp.md +++ b/doc/libazurekvp.md @@ -1,126 +1,372 @@ -# Azure-init Tracing System +# libazureinit-kvp: Layered KVP Architecture ## Overview -Azure-init implements a comprehensive tracing system that captures detailed information about the provisioning process. -This information is crucial for monitoring, debugging, and troubleshooting VM provisioning issues in Azure environments. -The tracing system is built on a multi-layered architecture that provides flexibility and robustness. +`libazureinit-kvp` is a standalone workspace crate that provides a layered +library for Hyper-V KVP (Key-Value Pair) storage. It replaces the former +`kvp.rs` module in `libazureinit` with independently testable +layers and synchronous, flock-based I/O. + +The crate is consumed by `libazureinit` (via `logging.rs`) and is also +available to external callers who want to emit KVP diagnostics or +provisioning reports. 
## Architecture -The tracing architecture consists of four specialized layers, each handling a specific aspect of the tracing process: +The library is organized into four layers, stacked from low-level storage +up to tracing integration: + +``` +┌─────────────────────────────────────────────────┐ +│ Kvp (top-level client, wires layers) │ +├─────────────────────────────────────────────────┤ +│ Layer 3: ProvisioningReport │ +│ Typed accessor for PROVISIONING_REPORT│ +├─────────────────────────────────────────────────┤ +│ Layer 2: TracingKvpLayer │ +│ tracing_subscriber::Layer impl │ +├─────────────────────────────────────────────────┤ +│ Layer 1: DiagnosticsKvp │ +│ Typed diagnostic events, splitting │ +├─────────────────────────────────────────────────┤ +│ Layer 0: KvpStore trait │ +│ ┌──────────────────┐ ┌─────────────────────┐ │ +│ │ HyperVKvpStore │ │ InMemoryKvpStore │ │ +│ │ (production) │ │ (test double) │ │ +│ └──────────────────┘ └─────────────────────┘ │ +└─────────────────────────────────────────────────┘ +``` + +### Layer 0: `KvpStore` trait + +The fundamental storage abstraction. All higher layers are generic over +`S: KvpStore`, making them testable without the filesystem. + +```rust +pub trait KvpStore: Send + Sync { + fn write(&self, key: &str, value: &str) -> io::Result<()>; + fn read(&self, key: &str) -> io::Result<Option<String>>; + fn entries(&self) -> io::Result<Vec<(String, String)>>; + fn delete(&self, key: &str) -> io::Result<bool>; +} +``` + +`HyperVKvpStore` — Production implementation that reads and writes the +binary Hyper-V pool file (`/var/lib/hyperv/.kvp_pool_1`). Each record is +2,560 bytes (512-byte key + 2,048-byte value). Concurrency is handled via +`flock` (shared locks for reads, exclusive locks for writes). -### 1. EmitKVPLayer +`InMemoryKvpStore` — `HashMap`-backed test double. Implements +`Clone` (via `Arc<Mutex<HashMap<String, String>>>`), so clones share state, matching the +semantics expected by higher layers. 
-**Purpose**: Processes spans and events by capturing metadata, generating key-value pairs (KVPs), and writing to Hyper-V's data exchange file. +### Layer 1: `DiagnosticsKvp` -**Key Functions**: -- Captures span lifecycle events (creation, entry, exit, closing) -- Processes emitted events within spans -- Formats data as KVPs for Hyper-V consumption -- Writes encoded data to `/var/lib/hyperv/.kvp_pool_1` +Provides typed access to diagnostic events. Handles: -Additionally, events emitted with a `health_report` field are written as special provisioning reports using the key `PROVISIONING_REPORT`. +- Key generation using the format + `{event_prefix}|{vm_id}|{level}|{name}|{span_id}` +- Value splitting at the 1,022-byte Azure platform read limit +- Parsing diagnostic keys back into `DiagnosticEvent` structs -**Integration with Azure**: -- The `/var/lib/hyperv/.kvp_pool_1` file is monitored by the Hyper-V `hv_kvp_daemon` service -- This enables key metrics and logs to be transferred from the VM to the Azure platform -- Administrators can access this data through the Azure portal or API +```rust +kvp.diagnostics.emit(&DiagnosticEvent { + level: "INFO".into(), + name: "provision:user".into(), + span_id: "abc-123".into(), + message: "User created".into(), + timestamp: Utc::now(), +})?; +``` -### 2. OpenTelemetryLayer +### Layer 2: `TracingKvpLayer` -**Purpose**: Propagates tracing context and prepares span data for export. +A `tracing_subscriber::Layer` implementation that automatically converts +`tracing` spans and events into KVP diagnostic entries. 
It: -**Key Functions**: -- Maintains distributed tracing context across service boundaries -- Exports standardized trace data to compatible backends -- Enables integration with broader monitoring ecosystems +- Detects events with a `health_report` field and writes them directly + to the store under the `PROVISIONING_REPORT` key +- Converts all other events into `DiagnosticEvent` structs and calls + `DiagnosticsKvp::emit` +- Tracks span start/end times and emits timing entries on span close -### 3. Stderr Layer +This layer is registered alongside the other tracing layers (stderr, +file, OpenTelemetry) in `setup_layers`. -**Purpose**: Formats and logs trace data to stderr. +### Layer 3: `ProvisioningReport` -**Key Functions**: -- Provides human-readable logging for immediate inspection -- Supports debugging during development -- Captures trace events even when other layers might fail +A typed accessor for the `PROVISIONING_REPORT` KVP key used by the +Azure platform. Supports: -### 4. File Layer +- `ProvisioningReport::success(vm_id)` — builds a success report +- `ProvisioningReport::error(vm_id, reason)` — builds + an error report +- `report.write_to(&store)` / `ProvisioningReport::read_from(&store)` — + serialization via the pipe-delimited wire format -**Purpose**: Writes formatted logs to a file (default path: `/var/log/azure-init.log`). +The wire format is fully compatible with the existing `health.rs` +`encode_report` output. -**Key Functions**: -- Provides a persistent log for post-provisioning inspection -- Uses file permissions `0600` when possible -- Log level controlled by `AZURE_INIT_LOG` (defaults to `info` for the file layer) +### `Kvp` Client -## How the Layers Work Together +The top-level struct that wires the layers together: -Despite operating independently, these layers collaborate to provide comprehensive tracing: +```rust +pub struct Kvp { + pub store: S, + pub diagnostics: DiagnosticsKvp, + pub tracing_layer: TracingKvpLayer, +} +``` -1. 
**Independent Processing**: Each layer processes spans and events without dependencies on other layers -2. **Ordered Execution**: Layers are executed in the order they are registered in `setup_layers` (stderr, OpenTelemetry, KVP if enabled, file if available) -3. **Complementary Functions**: Each layer serves a specific purpose in the tracing ecosystem: - - `EmitKVPLayer` focuses on Azure Hyper-V integration - - `OpenTelemetryLayer` handles standardized tracing and exports - - `Stderr Layer` provides immediate visibility for debugging +Constructors: -### Configuration +- `Kvp::with_options(KvpOptions)` — production path, creates a + `Kvp`, requires `vm_id` to be set +- `Kvp::from_store(store, vm_id, event_prefix)` — generic constructor + for any `KvpStore` implementation (useful for testing) -The tracing system's behavior is controlled through configuration files and environment variables, allowing more control over what data is captured and where it's sent: +## What the Crate Provides vs. What azure-init Adds -- `telemetry.kvp_diagnostics` (config): Enables/disables KVP emission. Default: `true`. -- `telemetry.kvp_filter` (config): Optional `EnvFilter`-style directives to select which spans/events go to KVP. -- `azure_init_log_path.path` (config): Target path for the file layer. Default: `/var/log/azure-init.log`. -- `AZURE_INIT_KVP_FILTER` (env): Overrides `telemetry.kvp_filter`. Precedence: env > config > default. -- `AZURE_INIT_LOG` (env): Controls stderr and file fmt layers’ levels (defaults: stderr=`error`, file=`info`). +### libazureinit-kvp (standalone) -The KVP layer uses a conservative default filter aimed at essential provisioning signals; adjust that via the settings above as needed. -For more on how to use these configuration variables, see the [configuration documentation](./configuration.md#complete-configuration-example). 
+External callers depend on `libazureinit-kvp` directly and get: -## Practical Usage +- `KvpStore` trait + `HyperVKvpStore` + `InMemoryKvpStore` +- `DiagnosticsKvp` for typed diagnostic events with value splitting +- `TracingKvpLayer` for automatic tracing-to-KVP bridging +- `ProvisioningReport` for reading/writing provisioning reports +- `Kvp` client that wires the layers together +- `KvpOptions` builder for production construction -### Instrumenting Functions +The crate has **no filtering, no config system, and no awareness of +azure-init's log levels or environment variables**. It emits every +span/event that reaches the `TracingKvpLayer`. Callers are responsible +for applying their own `tracing_subscriber::EnvFilter` (or other filter) +via `.with_filter(...)` on the `TracingKvpLayer` if they want selective +emission. -To instrument code with tracing, use the `#[instrument]` attribute on functions: +Example for an external caller: ```rust -use tracing::{instrument, Level, event}; +use libazureinit_kvp::{Kvp, KvpOptions}; +use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry}; + +let kvp = Kvp::with_options( + KvpOptions::default().vm_id("my-vm-id"), +)?; + +let subscriber = Registry::default().with( + kvp.tracing_layer.with_filter(EnvFilter::new("info")), +); +``` + +### azure-init / libazureinit (via `logging.rs`) + +azure-init adds orchestration and policy on top of the raw kvp crate: + +- `setup_layers()`: wires `TracingKvpLayer` alongside stderr, file, + and OpenTelemetry layers into a single tracing subscriber. +- KVP filter resolution with three-tier precedence: + `AZURE_INIT_KVP_FILTER` env var > `telemetry.kvp_filter` config > + hardcoded default filter (conservative, provisioning-signal-only). +- vm_id resolution via `get_vm_id()` (reads DMI/SMBIOS data) before + constructing the `Kvp` client — the kvp crate itself does not perform + platform-specific ID lookups. 
+- config-driven enable/disable (`telemetry.kvp_diagnostics`): when + `false`, the KVP layer is not registered at all. + +This separation means the kvp crate stays dependency-light and +platform-agnostic (beyond the Hyper-V pool file format), while +azure-init owns the policy decisions about what gets logged where. + +## Integration with azure-init + +### `logging.rs` + +`setup_layers` creates the `Kvp` client and registers +`kvp.tracing_layer` as one of the subscriber layers: + +```rust +pub fn setup_layers( + vm_id: &str, + config: &Config, +) -> Result, anyhow::Error> +``` + +The function no longer requires a `CancellationToken` or returns a +`JoinHandle` — all KVP I/O is synchronous. + +### `main.rs` + +The KVP shutdown block (`graceful_shutdown.cancel()`, `handle.await`) +has been removed. The `main` function simply calls `setup_layers` and +uses the returned subscriber directly. + +### `health.rs` / `error.rs` + +These files are unchanged. The `encode_report` function in `health.rs` +continues to format the pipe-delimited report string that flows through +the tracing layer to KVP via the `health_report` field detection. + +## Configuration + +- `telemetry.kvp_diagnostics` (config): Enables/disables KVP emission. + Default: `true`. +- `telemetry.kvp_filter` (config): Optional `EnvFilter`-style directives + to select which spans/events go to KVP. +- `AZURE_INIT_KVP_FILTER` (env): Overrides `telemetry.kvp_filter`. + Precedence: env > config > default. +- `AZURE_INIT_LOG` (env): Controls stderr and file layer log levels + (defaults: stderr=`error`, file=`info`). + +The KVP layer uses a conservative default filter aimed at essential +provisioning signals. See the +[configuration documentation](./configuration.md#complete-configuration-example) +for details. + +## Truncation and Locking + +On startup, `Kvp::with_options` calls `HyperVKvpStore::truncate_if_stale`, +which checks the pool file's mtime against the system uptime. 
If the file +predates the current boot, it is truncated to discard stale data from a +previous session. This operation uses an exclusive flock; if the lock +cannot be acquired, initialization continues without truncation. + +All subsequent writes use per-operation exclusive flocks to ensure +safe concurrent access from multiple threads or processes. + +## Usage Examples + +### Using the KVP Client API + +```rust +use libazureinit_kvp::{Kvp, KvpOptions, DiagnosticEvent, ProvisioningReport}; +use chrono::Utc; + +fn main() -> std::io::Result<()> { + let vm_id = "00000000-0000-0000-0000-000000000001"; + let kvp = Kvp::with_options( + KvpOptions::default().vm_id(vm_id), + )?; + + // Emit a diagnostic event + kvp.diagnostics.emit(&DiagnosticEvent { + level: "INFO".into(), + name: "provision:start".into(), + span_id: "span-1".into(), + message: "Provisioning started".into(), + timestamp: Utc::now(), + })?; + + // Write a provisioning report + let report = ProvisioningReport::success(vm_id); + report.write_to(&kvp.store)?; -#[instrument(fields(user_id = ?user.id))] -async fn provision_user(user: User) -> Result<(), Error> { - event!(Level::INFO, "Starting user provisioning"); - - // Function logic - - event!(Level::INFO, "User provisioning completed successfully"); Ok(()) } ``` -### Emitting Events +### Full Provisioning Flow Example -To record specific points within a span: +A more realistic example showing how to emit diagnostics and +provisioning reports through a provision-then-report workflow: ```rust -use tracing::{event, Level}; - -fn configure_ssh_keys(user: &str, keys: &[String]) { - event!(Level::INFO, user = user, key_count = keys.len(), "Configuring SSH keys"); - - for (i, key) in keys.iter().enumerate() { - event!(Level::DEBUG, user = user, key_index = i, "Processing SSH key"); - // Process each key +use libazureinit_kvp::{Kvp, KvpOptions, DiagnosticEvent, ProvisioningReport}; +use chrono::Utc; + +fn provision_vm(vm_id: &str) -> Result<(), String> { + // ... 
actual provisioning logic ... + // Return Ok(()) on success, Err("reason") on failure + Ok(()) +} + +fn main() { + let vm_id = "00000000-0000-0000-0000-000000000001"; + + let kvp = match Kvp::with_options(KvpOptions::default().vm_id(vm_id)) { + Ok(k) => k, + Err(e) => { + eprintln!("KVP init failed (non-fatal): {e}"); + return; + } + }; + + // Signal that provisioning is in progress + let _ = kvp.diagnostics.emit(&DiagnosticEvent { + level: "INFO".into(), + name: "provision:start".into(), + span_id: "main".into(), + message: format!("Provisioning in progress for vm_id={vm_id}"), + timestamp: Utc::now(), + }); + + match provision_vm(vm_id) { + Ok(()) => { + let _ = kvp.diagnostics.emit(&DiagnosticEvent { + level: "INFO".into(), + name: "provision:complete".into(), + span_id: "main".into(), + message: "Provisioning completed successfully".into(), + timestamp: Utc::now(), + }); + let report = ProvisioningReport::success(vm_id); + let _ = report.write_to(&kvp.store); + } + Err(reason) => { + let _ = kvp.diagnostics.emit(&DiagnosticEvent { + level: "ERROR".into(), + name: "provision:failed".into(), + span_id: "main".into(), + message: format!("Provisioning failed: {reason}"), + timestamp: Utc::now(), + }); + let report = ProvisioningReport::error(vm_id, &reason); + let _ = report.write_to(&kvp.store); + } } - - event!(Level::INFO, user = user, "SSH keys configured successfully"); } ``` -## Reference Documentation +Note the use of `let _ =` for all KVP operations -- KVP errors are +non-fatal and should never block provisioning. This matches the +principle used throughout azure-init. + +### Using Tracing Instrumentation -For more details on how the Hyper-V Data Exchange Service works, refer to the official documentation: -[Hyper-V Data Exchange Service (KVP)](https://learn.microsoft.com/en-us/virtualization/hyper-v-on-windows/reference/integration-services#hyper-v-data-exchange-service-kvp) +`azure-init` uses `setup_layers` to register the KVP tracing layer. 
+Code instrumented with `#[instrument]` and `event!` automatically +emits KVP entries: + +```rust +use tracing::{event, instrument, Level}; + +#[instrument(fields(user_id = ?user.id))] +fn provision_user(user: &User) -> Result<(), Error> { + event!(Level::INFO, "Starting user provisioning"); + // ... provisioning logic ... + event!(Level::INFO, "User provisioning completed"); + Ok(()) +} +``` + +### Testing with InMemoryKvpStore + +```rust +use libazureinit_kvp::{Kvp, InMemoryKvpStore, ProvisioningReport}; + +let store = InMemoryKvpStore::default(); +let kvp = Kvp::from_store(store.clone(), "test-vm", "test-prefix"); + +let report = ProvisioningReport::success("test-vm"); +report.write_to(&kvp.store).unwrap(); + +let read_back = ProvisioningReport::read_from(&kvp.store).unwrap(); +assert!(read_back.is_some()); +``` + +## Reference Documentation -For OpenTelemetry integration details: -[OpenTelemetry for Rust](https://opentelemetry.io/docs/instrumentation/rust/) \ No newline at end of file +- [Hyper-V Data Exchange Service (KVP)](https://learn.microsoft.com/en-us/virtualization/hyper-v-on-windows/reference/integration-services#hyper-v-data-exchange-service-kvp) +- [OpenTelemetry for Rust](https://opentelemetry.io/docs/instrumentation/rust/) diff --git a/libazureinit-kvp/Cargo.toml b/libazureinit-kvp/Cargo.toml new file mode 100644 index 00000000..5accff7b --- /dev/null +++ b/libazureinit-kvp/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "libazureinit-kvp" +version = "0.1.0" +edition = "2021" +rust-version = "1.88" +repository = "https://github.com/Azure/azure-init/" +homepage = "https://github.com/Azure/azure-init/" +license = "MIT" +description = "Hyper-V KVP (Key-Value Pair) storage library for azure-init." 
+ +[dependencies] +chrono = { version = "0.4", features = ["serde"] } +csv = "1" +fs2 = "0.4" +sysinfo = "0.38" +tracing = "0.1" +tracing-subscriber = "0.3" +uuid = { version = "1", features = ["v4"] } + +[dev-dependencies] +tempfile = "3" +tracing-test = { version = "0.2", features = ["no-env-filter"] } + +[lib] +name = "libazureinit_kvp" +path = "src/lib.rs" diff --git a/libazureinit-kvp/src/diagnostics.rs b/libazureinit-kvp/src/diagnostics.rs new file mode 100644 index 00000000..58e7516e --- /dev/null +++ b/libazureinit-kvp/src/diagnostics.rs @@ -0,0 +1,302 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Typed access to diagnostic key-value entries. +//! +//! Keys follow the `prefix|vm_id|level|name|span_id` convention. +//! Values exceeding the Azure platform's 1,022-byte read limit are +//! split across multiple records. + +use std::fmt; +use std::io; + +use chrono::{DateTime, Utc}; +use uuid::Uuid; + +use crate::KvpStore; + +/// The Azure platform reads at most 1,022 bytes from the value field +/// of each KVP record (UTF-16: 511 characters + null terminator). +/// Values longer than this must be split across multiple records. +pub const HV_KVP_AZURE_MAX_VALUE_SIZE: usize = 1022; + +/// A structured diagnostic event. +/// +/// Each field maps directly to a segment of the KVP key or value: +/// +/// - `level`: Severity level (e.g. "INFO", "WARN", "ERROR", "DEBUG"). +/// - `name`: Logical event name (e.g. "provision:user:create_user"). +/// - `span_id`: Unique identifier tying the event to a span/operation. +/// - `message`: Human-readable message / payload. +/// - `timestamp`: When the event occurred. 
+pub struct DiagnosticEvent {
+    pub level: String,
+    pub name: String,
+    pub span_id: String,
+    pub message: String,
+    pub timestamp: DateTime<Utc>,
+}
+
+impl DiagnosticEvent {
+    pub fn new(
+        level: impl Into<String>,
+        name: impl Into<String>,
+        message: impl Into<String>,
+    ) -> Self {
+        Self {
+            level: level.into(),
+            name: name.into(),
+            span_id: Uuid::new_v4().to_string(),
+            message: message.into(),
+            timestamp: Utc::now(),
+        }
+    }
+}
+
+impl fmt::Display for DiagnosticEvent {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "[{}] {} ({}): {}",
+            self.level, self.name, self.span_id, self.message
+        )
+    }
+}
+
+/// Generates a unique event key by combining the event prefix, VM ID,
+/// level, name, and span ID.
+///
+/// Format: `prefix|vm_id|level|name|span_id`
+pub fn generate_event_key(
+    event_prefix: &str,
+    vm_id: &str,
+    event_level: &str,
+    event_name: &str,
+    span_id: &str,
+) -> String {
+    format!("{event_prefix}|{vm_id}|{event_level}|{event_name}|{span_id}")
+}
+
+/// Typed diagnostic access layered on top of any `KvpStore`.
+///
+/// Handles event key generation and value splitting. The Azure
+/// platform only reads the first 1,022 bytes of the value field per
+/// record, so values exceeding that limit are split across multiple
+/// `store.write()` calls with the same key.
+pub struct DiagnosticsKvp<S: KvpStore> {
+    store: S,
+    vm_id: String,
+    event_prefix: String,
+}
+
+impl<S: KvpStore> DiagnosticsKvp<S> {
+    pub fn new(store: S, vm_id: &str, event_prefix: &str) -> Self {
+        Self {
+            store,
+            vm_id: vm_id.to_string(),
+            event_prefix: event_prefix.to_string(),
+        }
+    }
+
+    /// Write a diagnostic event to the store.
+    ///
+    /// The key is generated from the event's metadata using
+    /// [`generate_event_key`]. If the value exceeds 1,022 bytes it is
+    /// split across multiple records with the same key.
+    pub fn emit(&self, event: &DiagnosticEvent) -> io::Result<()> {
+        let key = generate_event_key(
+            &self.event_prefix,
+            &self.vm_id,
+            &event.level,
+            &event.name,
+            &event.span_id,
+        );
+
+        let value = &event.message;
+
+        if value.len() <= HV_KVP_AZURE_MAX_VALUE_SIZE {
+            self.store.write(&key, value)?;
+        } else {
+            for chunk in value.as_bytes().chunks(HV_KVP_AZURE_MAX_VALUE_SIZE) {
+                let chunk_str = String::from_utf8_lossy(chunk);
+                self.store.write(&key, &chunk_str)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Read all diagnostic entries from the store, parsed into
+    /// `DiagnosticEvent` structs.
+    ///
+    /// Only entries whose key matches the
+    /// `prefix|vm_id|level|name|span_id` pattern (with 5 pipe-separated
+    /// segments) are returned.
+    pub fn entries(&self) -> io::Result<Vec<DiagnosticEvent>> {
+        let all = self.store.entries()?;
+        let mut events = Vec::new();
+
+        for (key, value) in all {
+            let parts: Vec<&str> = key.splitn(5, '|').collect();
+            if parts.len() == 5 {
+                events.push(DiagnosticEvent {
+                    level: parts[2].to_string(),
+                    name: parts[3].to_string(),
+                    span_id: parts[4].to_string(),
+                    message: value,
+                    timestamp: Utc::now(),
+                });
+            }
+        }
+
+        Ok(events)
+    }
+
+    /// Access the underlying store.
+    pub fn store(&self) -> &S {
+        &self.store
+    }
+
+    /// The VM ID this diagnostics instance is configured with.
+    pub fn vm_id(&self) -> &str {
+        &self.vm_id
+    }
+
+    /// The event prefix this diagnostics instance is configured with.
+ pub fn event_prefix(&self) -> &str { + &self.event_prefix + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::InMemoryKvpStore; + + #[test] + fn test_generate_event_key() { + let key = generate_event_key( + "azure-init-0.1.1", + "vm-123", + "INFO", + "provision:user", + "span-abc", + ); + assert_eq!(key, "azure-init-0.1.1|vm-123|INFO|provision:user|span-abc"); + } + + #[test] + fn test_emit_short_value() { + let store = InMemoryKvpStore::default(); + let diag = DiagnosticsKvp::new(store.clone(), "vm-1", "prefix"); + + let event = DiagnosticEvent { + level: "INFO".to_string(), + name: "test_event".to_string(), + span_id: "span-1".to_string(), + message: "hello world".to_string(), + timestamp: Utc::now(), + }; + + diag.emit(&event).unwrap(); + + let key = "prefix|vm-1|INFO|test_event|span-1"; + assert_eq!(store.read(key).unwrap(), Some("hello world".to_string())); + } + + #[test] + fn test_emit_splits_long_value() { + let store = InMemoryKvpStore::default(); + let diag = DiagnosticsKvp::new(store.clone(), "vm-1", "prefix"); + + let long_message = "A".repeat(HV_KVP_AZURE_MAX_VALUE_SIZE * 2 + 50); + let event = DiagnosticEvent { + level: "DEBUG".to_string(), + name: "big_event".to_string(), + span_id: "span-2".to_string(), + message: long_message.clone(), + timestamp: Utc::now(), + }; + + diag.emit(&event).unwrap(); + + // With InMemoryKvpStore (HashMap), only the last chunk is + // retained since write overwrites. Verify the key exists and + // the stored value is at most one chunk long. 
+ let key = "prefix|vm-1|DEBUG|big_event|span-2"; + let stored = store.read(key).unwrap().unwrap(); + assert!(stored.len() <= HV_KVP_AZURE_MAX_VALUE_SIZE); + } + + #[test] + fn test_emit_splits_long_value_on_hyperv_store() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let store = crate::HyperVKvpStore::new(tmp.path()); + let diag = DiagnosticsKvp::new(store, "vm-1", "prefix"); + + let long_message = "B".repeat(HV_KVP_AZURE_MAX_VALUE_SIZE * 3 + 10); + let event = DiagnosticEvent { + level: "INFO".to_string(), + name: "split_test".to_string(), + span_id: "span-3".to_string(), + message: long_message, + timestamp: Utc::now(), + }; + + diag.emit(&event).unwrap(); + + // HyperVKvpStore is append-only, so entries() returns all + // records including the split chunks. + let store2 = crate::HyperVKvpStore::new(tmp.path()); + let entries = store2.entries().unwrap(); + assert_eq!(entries.len(), 4); // ceil((1022*3+10) / 1022) = 4 + + let expected_key = "prefix|vm-1|INFO|split_test|span-3"; + for (k, v) in &entries { + assert_eq!(k, expected_key); + assert!(v.len() <= HV_KVP_AZURE_MAX_VALUE_SIZE); + } + } + + #[test] + fn test_entries_parses_diagnostic_keys() { + let store = InMemoryKvpStore::default(); + store + .write("prefix|vm-1|INFO|my_event|span-1", "msg1") + .unwrap(); + store + .write("prefix|vm-1|ERROR|other|span-2", "msg2") + .unwrap(); + // Non-diagnostic key should be skipped. 
+ store + .write("PROVISIONING_REPORT", "result=success") + .unwrap(); + + let diag = DiagnosticsKvp::new(store, "vm-1", "prefix"); + let events = diag.entries().unwrap(); + + assert_eq!(events.len(), 2); + + let levels: Vec<&str> = + events.iter().map(|e| e.level.as_str()).collect(); + assert!(levels.contains(&"INFO")); + assert!(levels.contains(&"ERROR")); + } + + #[test] + fn test_emit_uses_new_helper() { + let store = InMemoryKvpStore::default(); + let diag = DiagnosticsKvp::new(store.clone(), "vm-1", "prefix"); + + let event = DiagnosticEvent::new("WARN", "test_op", "warning msg"); + diag.emit(&event).unwrap(); + + let entries = store.entries().unwrap(); + assert_eq!(entries.len(), 1); + + let (key, value) = &entries[0]; + assert!(key.contains("WARN")); + assert!(key.contains("test_op")); + assert_eq!(value, "warning msg"); + } +} diff --git a/libazureinit-kvp/src/hyperv.rs b/libazureinit-kvp/src/hyperv.rs new file mode 100644 index 00000000..eaf99a11 --- /dev/null +++ b/libazureinit-kvp/src/hyperv.rs @@ -0,0 +1,458 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Production `KvpStore` implementation backed by the Hyper-V binary +//! pool-file format with flock-based concurrency control. +//! +//! Each record is a fixed-size block of 2,560 bytes: 512 bytes for the +//! key and 2,048 bytes for the value. Unused space is zero-filled. + +use std::fs::{File, OpenOptions}; +use std::io::{self, ErrorKind, Read, Write}; +use std::os::unix::fs::MetadataExt; +use std::path::{Path, PathBuf}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use fs2::FileExt; +use sysinfo::System; + +use crate::KvpStore; + +pub const HV_KVP_EXCHANGE_MAX_KEY_SIZE: usize = 512; +pub const HV_KVP_EXCHANGE_MAX_VALUE_SIZE: usize = 2048; +pub const RECORD_SIZE: usize = + HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE; + +/// Hyper-V KVP pool-file store. 
+///
+/// Reads and writes the binary Hyper-V KVP format: fixed-size records
+/// of [`RECORD_SIZE`] bytes (512-byte key + 2,048-byte value) with
+/// flock-based concurrency control.
+#[derive(Clone)]
+pub struct HyperVKvpStore {
+    path: PathBuf,
+}
+
+impl HyperVKvpStore {
+    /// Open (or create) the pool file at the given path.
+    pub fn new(path: impl Into<PathBuf>) -> Self {
+        Self { path: path.into() }
+    }
+
+    /// Return a reference to the pool file path.
+    pub fn path(&self) -> &Path {
+        &self.path
+    }
+
+    /// Truncate the file when its mtime predates the current boot
+    /// (stale-data guard).
+    ///
+    /// An exclusive `flock` is held while checking metadata and
+    /// truncating so that concurrent processes don't race on the same
+    /// check-then-truncate sequence. If the lock cannot be acquired
+    /// immediately (another client holds it), the call returns `Ok(())`
+    /// without blocking.
+    pub fn truncate_if_stale(&self) -> io::Result<()> {
+        let boot_time = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .map_err(|e| io::Error::other(format!("clock error: {e}")))?
+            .as_secs()
+            - get_uptime().as_secs();
+
+        let file =
+            match OpenOptions::new().read(true).write(true).open(&self.path) {
+                Ok(f) => f,
+                Err(ref e) if e.kind() == ErrorKind::NotFound => {
+                    return Ok(());
+                }
+                Err(e) => return Err(e),
+            };
+
+        if FileExt::try_lock_exclusive(&file).is_err() {
+            return Ok(());
+        }
+
+        let result = (|| -> io::Result<()> {
+            let metadata = file.metadata()?;
+            if metadata.mtime() < boot_time as i64 {
+                file.set_len(0)?;
+            }
+            Ok(())
+        })();
+
+        let _ = FileExt::unlock(&file);
+        result
+    }
+
+    fn open_for_append(&self) -> io::Result<File> {
+        OpenOptions::new()
+            .append(true)
+            .create(true)
+            .open(&self.path)
+    }
+
+    fn open_for_read(&self) -> io::Result<File> {
+        OpenOptions::new().read(true).open(&self.path)
+    }
+
+    fn open_for_read_write(&self) -> io::Result<File> {
+        OpenOptions::new().read(true).write(true).open(&self.path)
+    }
+}
+
+/// Encode a key-value pair into a single fixed-size record.
+///
+/// The key is truncated (if necessary) and zero-padded to 512 bytes.
+/// The value is truncated (if necessary) and zero-padded to 2,048
+/// bytes.
+pub fn encode_record(key: &str, value: &str) -> Vec<u8> {
+    let mut buf = vec![0u8; RECORD_SIZE];
+
+    let key_bytes = key.as_bytes();
+    let key_len = key_bytes.len().min(HV_KVP_EXCHANGE_MAX_KEY_SIZE);
+    buf[..key_len].copy_from_slice(&key_bytes[..key_len]);
+
+    let val_bytes = value.as_bytes();
+    let val_len = val_bytes.len().min(HV_KVP_EXCHANGE_MAX_VALUE_SIZE);
+    buf[HV_KVP_EXCHANGE_MAX_KEY_SIZE..HV_KVP_EXCHANGE_MAX_KEY_SIZE + val_len]
+        .copy_from_slice(&val_bytes[..val_len]);
+
+    buf
+}
+
+/// Decode a fixed-size record into its key and value strings.
+///
+/// Trailing null bytes are stripped from both fields.
+pub fn decode_record(data: &[u8]) -> io::Result<(String, String)> {
+    if data.len() != RECORD_SIZE {
+        return Err(io::Error::other(format!(
+            "record size mismatch: expected {RECORD_SIZE}, got {}",
+            data.len()
+        )));
+    }
+
+    let key = String::from_utf8(data[..HV_KVP_EXCHANGE_MAX_KEY_SIZE].to_vec())
+        .unwrap_or_default()
+        .trim_end_matches('\0')
+        .to_string();
+
+    let value =
+        String::from_utf8(data[HV_KVP_EXCHANGE_MAX_KEY_SIZE..].to_vec())
+            .unwrap_or_default()
+            .trim_end_matches('\0')
+            .to_string();
+
+    Ok((key, value))
+}
+
+/// Read all records from a file that is already open and locked.
+fn read_all_records(file: &mut File) -> io::Result<Vec<(String, String)>> {
+    let mut contents = Vec::new();
+    file.read_to_end(&mut contents)?;
+
+    if contents.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    if contents.len() % RECORD_SIZE != 0 {
+        return Err(io::Error::other(format!(
+            "file size ({}) is not a multiple of record size ({RECORD_SIZE})",
+            contents.len()
+        )));
+    }
+
+    contents
+        .chunks_exact(RECORD_SIZE)
+        .map(decode_record)
+        .collect()
+}
+
+impl KvpStore for HyperVKvpStore {
+    /// Append one fixed-size record to the pool file.
+    ///
+    /// Acquires an exclusive flock, writes the record, flushes, and
+    /// releases the lock.
+    fn write(&self, key: &str, value: &str) -> io::Result<()> {
+        let mut file = self.open_for_append()?;
+        let record = encode_record(key, value);
+
+        FileExt::lock_exclusive(&file).map_err(|e| {
+            io::Error::other(format!("failed to lock KVP file: {e}"))
+        })?;
+
+        let write_result = file.write_all(&record).and_then(|_| file.flush());
+
+        let unlock_result = FileExt::unlock(&file).map_err(|e| {
+            io::Error::other(format!("failed to unlock KVP file: {e}"))
+        });
+
+        if let Err(err) = write_result {
+            let _ = unlock_result;
+            return Err(err);
+        }
+        unlock_result
+    }
+
+    /// Scan all records and return the value of the last record
+    /// matching `key` (append-only semantics).
+    fn read(&self, key: &str) -> io::Result<Option<String>> {
+        let mut file = match self.open_for_read() {
+            Ok(f) => f,
+            Err(ref e) if e.kind() == ErrorKind::NotFound => {
+                return Ok(None);
+            }
+            Err(e) => return Err(e),
+        };
+
+        FileExt::lock_shared(&file).map_err(|e| {
+            io::Error::other(format!("failed to lock KVP file: {e}"))
+        })?;
+
+        let records = read_all_records(&mut file);
+        let _ = FileExt::unlock(&file);
+        let records = records?;
+
+        Ok(records
+            .into_iter()
+            .rev()
+            .find(|(k, _)| k == key)
+            .map(|(_, v)| v))
+    }
+
+    /// Return every record in the pool file, including duplicates.
+    fn entries(&self) -> io::Result<Vec<(String, String)>> {
+        let mut file = match self.open_for_read() {
+            Ok(f) => f,
+            Err(ref e) if e.kind() == ErrorKind::NotFound => {
+                return Ok(Vec::new());
+            }
+            Err(e) => return Err(e),
+        };
+
+        FileExt::lock_shared(&file).map_err(|e| {
+            io::Error::other(format!("failed to lock KVP file: {e}"))
+        })?;
+
+        let records = read_all_records(&mut file);
+        let _ = FileExt::unlock(&file);
+        records
+    }
+
+    /// Rewrite the pool file without the record(s) matching `key`.
+    ///
+    /// Returns `true` if at least one record was removed.
+    fn delete(&self, key: &str) -> io::Result<bool> {
+        let mut file = match self.open_for_read_write() {
+            Ok(f) => f,
+            Err(ref e) if e.kind() == ErrorKind::NotFound => {
+                return Ok(false);
+            }
+            Err(e) => return Err(e),
+        };
+
+        FileExt::lock_exclusive(&file).map_err(|e| {
+            io::Error::other(format!("failed to lock KVP file: {e}"))
+        })?;
+
+        let result = (|| -> io::Result<bool> {
+            let records = read_all_records(&mut file)?;
+            let original_count = records.len();
+            let kept: Vec<_> =
+                records.into_iter().filter(|(k, _)| k != key).collect();
+
+            if kept.len() == original_count {
+                return Ok(false);
+            }
+
+            file.set_len(0)?;
+            // set_len truncates the data but does not reposition the
+            // read cursor (it sits past EOF after the scan above), so
+            // seek back to the start before rewriting the kept records.
+ use std::io::Seek; + file.seek(std::io::SeekFrom::Start(0))?; + + for (k, v) in &kept { + file.write_all(&encode_record(k, v))?; + } + file.flush()?; + Ok(true) + })(); + + let _ = FileExt::unlock(&file); + result + } +} + +fn get_uptime() -> Duration { + let mut system = System::new(); + system.refresh_memory(); + system.refresh_cpu_usage(); + Duration::from_secs(System::uptime()) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::NamedTempFile; + + #[test] + fn test_encode_decode_roundtrip() { + let key = "test_key"; + let value = "test_value"; + let record = encode_record(key, value); + + assert_eq!(record.len(), RECORD_SIZE); + + let (decoded_key, decoded_value) = + decode_record(&record).expect("decode failed"); + assert_eq!(decoded_key, key); + assert_eq!(decoded_value, value); + } + + #[test] + fn test_encode_truncates_long_key() { + let key = "K".repeat(HV_KVP_EXCHANGE_MAX_KEY_SIZE + 100); + let record = encode_record(&key, "v"); + let (decoded_key, _) = decode_record(&record).expect("decode failed"); + assert_eq!(decoded_key.len(), HV_KVP_EXCHANGE_MAX_KEY_SIZE); + } + + #[test] + fn test_encode_truncates_long_value() { + let value = "V".repeat(HV_KVP_EXCHANGE_MAX_VALUE_SIZE + 100); + let record = encode_record("k", &value); + let (_, decoded_value) = decode_record(&record).expect("decode failed"); + assert_eq!(decoded_value.len(), HV_KVP_EXCHANGE_MAX_VALUE_SIZE); + } + + #[test] + fn test_write_and_read() { + let tmp = NamedTempFile::new().unwrap(); + let store = HyperVKvpStore::new(tmp.path()); + + store.write("key1", "value1").unwrap(); + store.write("key2", "value2").unwrap(); + + assert_eq!(store.read("key1").unwrap(), Some("value1".to_string())); + assert_eq!(store.read("key2").unwrap(), Some("value2".to_string())); + assert_eq!(store.read("nonexistent").unwrap(), None); + } + + #[test] + fn test_read_returns_last_match() { + let tmp = NamedTempFile::new().unwrap(); + let store = HyperVKvpStore::new(tmp.path()); + + store.write("key", 
"first").unwrap(); + store.write("key", "second").unwrap(); + store.write("key", "third").unwrap(); + + assert_eq!(store.read("key").unwrap(), Some("third".to_string())); + } + + #[test] + fn test_entries_includes_duplicates() { + let tmp = NamedTempFile::new().unwrap(); + let store = HyperVKvpStore::new(tmp.path()); + + store.write("key", "v1").unwrap(); + store.write("key", "v2").unwrap(); + store.write("other", "v3").unwrap(); + + let entries = store.entries().unwrap(); + assert_eq!(entries.len(), 3); + assert_eq!(entries[0], ("key".to_string(), "v1".to_string())); + assert_eq!(entries[1], ("key".to_string(), "v2".to_string())); + assert_eq!(entries[2], ("other".to_string(), "v3".to_string())); + } + + #[test] + fn test_delete_removes_all_matches() { + let tmp = NamedTempFile::new().unwrap(); + let store = HyperVKvpStore::new(tmp.path()); + + store.write("key", "v1").unwrap(); + store.write("key", "v2").unwrap(); + store.write("other", "v3").unwrap(); + + assert!(store.delete("key").unwrap()); + assert_eq!(store.read("key").unwrap(), None); + assert_eq!(store.read("other").unwrap(), Some("v3".to_string())); + + let entries = store.entries().unwrap(); + assert_eq!(entries.len(), 1); + } + + #[test] + fn test_delete_nonexistent_returns_false() { + let tmp = NamedTempFile::new().unwrap(); + let store = HyperVKvpStore::new(tmp.path()); + + store.write("key", "value").unwrap(); + assert!(!store.delete("nonexistent").unwrap()); + } + + #[test] + fn test_read_missing_file_returns_none() { + let store = HyperVKvpStore::new("/tmp/nonexistent_kvp_pool_test"); + assert_eq!(store.read("key").unwrap(), None); + } + + #[test] + fn test_entries_missing_file_returns_empty() { + let store = HyperVKvpStore::new("/tmp/nonexistent_kvp_pool_test"); + assert!(store.entries().unwrap().is_empty()); + } + + #[test] + fn test_record_size_consistency() { + let record = encode_record("k", "v"); + assert_eq!(record.len(), RECORD_SIZE); + assert_eq!(RECORD_SIZE, 2560); + } + + #[test] + fn 
test_multi_thread_concurrent_writes() { + let tmp = NamedTempFile::new().unwrap(); + let path = tmp.path().to_path_buf(); + + let num_threads: usize = 20; + let iterations: usize = 1_000; + + let handles: Vec<_> = (0..num_threads) + .map(|tid| { + let p = path.clone(); + std::thread::spawn(move || { + let store = HyperVKvpStore::new(&p); + for i in 0..iterations { + let key = format!("thread-{tid}-iter-{i}"); + let value = format!("value-{tid}-{i}"); + store.write(&key, &value).expect("write failed"); + } + }) + }) + .collect(); + + for h in handles { + h.join().expect("thread panicked"); + } + + let store = HyperVKvpStore::new(&path); + let entries = store.entries().unwrap(); + assert_eq!(entries.len(), num_threads * iterations); + } + + #[test] + fn test_provisioning_report_via_store() { + let tmp = NamedTempFile::new().unwrap(); + let store = HyperVKvpStore::new(tmp.path()); + + store + .write("PROVISIONING_REPORT", "result=success|agent=test") + .unwrap(); + + let value = store.read("PROVISIONING_REPORT").unwrap(); + assert_eq!(value, Some("result=success|agent=test".to_string())); + } +} diff --git a/libazureinit-kvp/src/lib.rs b/libazureinit-kvp/src/lib.rs new file mode 100644 index 00000000..d12f4233 --- /dev/null +++ b/libazureinit-kvp/src/lib.rs @@ -0,0 +1,209 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! # libazureinit-kvp +//! +//! A layered library for Hyper-V KVP (Key-Value Pair) storage. +//! +//! The library is organized into four layers: +//! +//! - **Layer 0 (`KvpStore` trait):** Core storage abstraction with +//! `write`, `read`, `entries`, and `delete` operations. +//! - [`HyperVKvpStore`]: Production implementation using the binary +//! Hyper-V pool-file format with flock-based concurrency. +//! - [`InMemoryKvpStore`]: HashMap-backed test double. +//! +//! - **Layer 1 (`DiagnosticsKvp`):** Typed access to diagnostic +//! key-value entries with value splitting for the Azure platform's +//! 
1,022-byte read limit.
+//!
+//! - **Layer 2 (`TracingKvpLayer`):** A `tracing_subscriber::Layer`
+//!   that translates span and event data into diagnostic KVP entries.
+//!
+//! - **Layer 3 (`ProvisioningReport`):** Typed accessor for the
+//!   `PROVISIONING_REPORT` KVP key used by the Azure platform.
+
+use std::io;
+use std::path::{Path, PathBuf};
+
+pub mod diagnostics;
+pub mod hyperv;
+pub mod memory;
+pub mod provisioning;
+pub mod tracing_layer;
+
+pub use diagnostics::{DiagnosticEvent, DiagnosticsKvp};
+pub use hyperv::HyperVKvpStore;
+pub use memory::InMemoryKvpStore;
+pub use provisioning::ProvisioningReport;
+pub use tracing_layer::TracingKvpLayer;
+
+/// The default event prefix used when no custom prefix is provided.
+pub const EVENT_PREFIX: &str =
+    concat!("azure-init-", env!("CARGO_PKG_VERSION"));
+
+const DEFAULT_KVP_FILE_PATH: &str = "/var/lib/hyperv/.kvp_pool_1";
+
+/// The fundamental storage abstraction for KVP.
+///
+/// Implementations handle encoding, persistence, and concurrency
+/// internally. Higher layers build on this trait without knowledge of
+/// the underlying storage mechanism.
+pub trait KvpStore: Send + Sync {
+    /// Write a key-value pair into the store.
+    fn write(&self, key: &str, value: &str) -> io::Result<()>;
+
+    /// Read the value for a given key, returning `None` if absent.
+    fn read(&self, key: &str) -> io::Result<Option<String>>;
+
+    /// Return all key-value pairs currently in the store.
+    fn entries(&self) -> io::Result<Vec<(String, String)>>;
+
+    /// Remove a key. Returns `true` if the key existed.
+    fn delete(&self, key: &str) -> io::Result<bool>;
+}
+
+/// Configuration options for creating a [`Kvp`] client.
+#[derive(Clone, Debug)]
+pub struct KvpOptions {
+    pub vm_id: Option<String>,
+    pub event_prefix: Option<String>,
+    pub file_path: PathBuf,
+    pub truncate_on_start: bool,
+}
+
+impl Default for KvpOptions {
+    fn default() -> Self {
+        Self {
+            vm_id: None,
+            event_prefix: None,
+            file_path: PathBuf::from(DEFAULT_KVP_FILE_PATH),
+            truncate_on_start: true,
+        }
+    }
+}
+
+impl KvpOptions {
+    pub fn vm_id<T: Into<String>>(mut self, vm_id: T) -> Self {
+        self.vm_id = Some(vm_id.into());
+        self
+    }
+
+    pub fn event_prefix<T: Into<String>>(mut self, event_prefix: T) -> Self {
+        self.event_prefix = Some(event_prefix.into());
+        self
+    }
+
+    pub fn file_path<T: AsRef<Path>>(mut self, file_path: T) -> Self {
+        self.file_path = file_path.as_ref().to_path_buf();
+        self
+    }
+
+    pub fn truncate_on_start(mut self, truncate_on_start: bool) -> Self {
+        self.truncate_on_start = truncate_on_start;
+        self
+    }
+}
+
+/// Top-level KVP client that wires together all layers.
+///
+/// Callers interact with the layers through the public fields:
+/// - `store`: raw key-value access (`write`, `read`, `entries`)
+/// - `diagnostics`: typed diagnostic event emission
+/// - `tracing_layer`: tracing subscriber layer for automatic KVP emission
+pub struct Kvp<S: KvpStore> {
+    pub store: S,
+    pub diagnostics: DiagnosticsKvp<S>,
+    pub tracing_layer: TracingKvpLayer<S>,
+}
+
+impl Kvp<HyperVKvpStore> {
+    /// Production constructor from explicit options.
+    ///
+    /// `options.vm_id` must be `Some` -- the caller (typically
+    /// `logging.rs`) is responsible for resolving the VM ID before
+    /// calling this.
+    pub fn with_options(options: KvpOptions) -> io::Result<Self> {
+        let vm_id = options.vm_id.ok_or_else(|| {
+            io::Error::other("vm_id is required in KvpOptions")
+        })?;
+        let event_prefix = options
+            .event_prefix
+            .unwrap_or_else(|| EVENT_PREFIX.to_string());
+
+        let store = HyperVKvpStore::new(&options.file_path);
+
+        if options.truncate_on_start {
+            store.truncate_if_stale()?;
+        }
+
+        Ok(Self::from_store(store, &vm_id, &event_prefix))
+    }
+}
+
+impl<S: KvpStore + Clone> Kvp<S> {
+    /// Construct a `Kvp` client from any `KvpStore` implementation.
+    pub fn from_store(store: S, vm_id: &str, event_prefix: &str) -> Self {
+        let diagnostics =
+            DiagnosticsKvp::new(store.clone(), vm_id, event_prefix);
+        let tracing_layer = TracingKvpLayer::new(DiagnosticsKvp::new(
+            store.clone(),
+            vm_id,
+            event_prefix,
+        ));
+        Self {
+            store,
+            diagnostics,
+            tracing_layer,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_from_store_wires_layers() {
+        let store = InMemoryKvpStore::default();
+        let kvp = Kvp::from_store(store.clone(), "test-vm", "test-prefix");
+
+        kvp.diagnostics
+            .emit(&diagnostics::DiagnosticEvent {
+                level: "INFO".into(),
+                name: "test::event".into(),
+                span_id: "span-1".into(),
+                message: "hello".into(),
+                timestamp: chrono::Utc::now(),
+            })
+            .unwrap();
+
+        let entries = store.entries().unwrap();
+        assert!(!entries.is_empty());
+        assert!(entries.iter().any(|(k, _)| k.contains("test-prefix")));
+    }
+
+    #[test]
+    fn test_provisioning_report_through_kvp_client() {
+        let store = InMemoryKvpStore::default();
+        let kvp = Kvp::from_store(store, "vm-123", "prefix");
+
+        let report = ProvisioningReport::success("vm-123");
+        report.write_to(&kvp.store).unwrap();
+
+        let read_back = ProvisioningReport::read_from(&kvp.store).unwrap();
+        assert!(read_back.is_some());
+        let read_back = read_back.unwrap();
+        assert_eq!(read_back.result, "success");
+        assert_eq!(read_back.vm_id, "vm-123");
+    }
+
+    #[test]
+    fn test_with_options_requires_vm_id() {
+        let opts =
KvpOptions::default(); + let result = Kvp::with_options(opts); + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!(err.to_string().contains("vm_id")); + } +} diff --git a/libazureinit-kvp/src/memory.rs b/libazureinit-kvp/src/memory.rs new file mode 100644 index 00000000..6b1be2ae --- /dev/null +++ b/libazureinit-kvp/src/memory.rs @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! In-memory `KvpStore` implementation for testing. +//! +//! `InMemoryKvpStore` is a HashMap-backed test double with no +//! filesystem access. It implements `KvpStore` so that any higher +//! layer (`DiagnosticsKvp`, `ProvisioningReport`, `TracingKvpLayer`) +//! can be tested without binary encoding, flock, or tempfiles. + +use std::collections::HashMap; +use std::io; +use std::sync::{Arc, Mutex}; + +use crate::KvpStore; + +/// A HashMap-backed KVP store with no filesystem access. +/// +/// Thread-safe via `Arc>`. Drop-in replacement for any layer +/// in unit and integration tests. 
+#[derive(Default, Clone)]
+pub struct InMemoryKvpStore {
+    inner: Arc<Mutex<HashMap<String, String>>>,
+}
+
+impl KvpStore for InMemoryKvpStore {
+    fn write(&self, key: &str, value: &str) -> io::Result<()> {
+        let mut map = self.inner.lock().unwrap();
+        map.insert(key.to_string(), value.to_string());
+        Ok(())
+    }
+
+    fn read(&self, key: &str) -> io::Result<Option<String>> {
+        let map = self.inner.lock().unwrap();
+        Ok(map.get(key).cloned())
+    }
+
+    fn entries(&self) -> io::Result<Vec<(String, String)>> {
+        let map = self.inner.lock().unwrap();
+        Ok(map.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
+    }
+
+    fn delete(&self, key: &str) -> io::Result<bool> {
+        let mut map = self.inner.lock().unwrap();
+        Ok(map.remove(key).is_some())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_write_and_read() {
+        let store = InMemoryKvpStore::default();
+        store.write("key1", "value1").unwrap();
+        assert_eq!(store.read("key1").unwrap(), Some("value1".to_string()));
+        assert_eq!(store.read("missing").unwrap(), None);
+    }
+
+    #[test]
+    fn test_write_overwrites() {
+        let store = InMemoryKvpStore::default();
+        store.write("key", "first").unwrap();
+        store.write("key", "second").unwrap();
+        assert_eq!(store.read("key").unwrap(), Some("second".to_string()));
+    }
+
+    #[test]
+    fn test_entries() {
+        let store = InMemoryKvpStore::default();
+        store.write("a", "1").unwrap();
+        store.write("b", "2").unwrap();
+
+        let entries = store.entries().unwrap();
+        assert_eq!(entries.len(), 2);
+        assert!(entries.contains(&("a".to_string(), "1".to_string())));
+        assert!(entries.contains(&("b".to_string(), "2".to_string())));
+    }
+
+    #[test]
+    fn test_delete() {
+        let store = InMemoryKvpStore::default();
+        store.write("key", "value").unwrap();
+
+        assert!(store.delete("key").unwrap());
+        assert_eq!(store.read("key").unwrap(), None);
+        assert!(!store.delete("key").unwrap());
+    }
+
+    #[test]
+    fn test_clone_shares_state() {
+        let store = InMemoryKvpStore::default();
+        let clone = store.clone();
+
+        store.write("key", "value").unwrap();
+        
assert_eq!(clone.read("key").unwrap(), Some("value".to_string())); + } +} diff --git a/libazureinit-kvp/src/provisioning.rs b/libazureinit-kvp/src/provisioning.rs new file mode 100644 index 00000000..5d13487d --- /dev/null +++ b/libazureinit-kvp/src/provisioning.rs @@ -0,0 +1,312 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Typed accessor for the `PROVISIONING_REPORT` KVP key. +//! +//! The Azure platform reads this key to determine provisioning +//! outcome. The value is a pipe-delimited string of `key=value` +//! segments produced by the `csv` crate for quoting compatibility. + +use std::fmt; +use std::io; + +use chrono::{DateTime, Utc}; + +use crate::KvpStore; + +const DEFAULT_AGENT: &str = concat!("Azure-Init/", env!("CARGO_PKG_VERSION")); + +/// A structured provisioning report entry. +/// +/// - `result`: Outcome string (e.g. "success", "error"). +/// - `agent`: Agent identifier (e.g. "Azure-Init/0.1.1"). +/// - `pps_type`: PPS type (e.g. "None"). +/// - `vm_id`: VM identifier. +/// - `timestamp`: When the report was generated. +/// - `extra`: Optional additional key-value pairs (e.g. origin, +/// reason, documentation_url). +pub struct ProvisioningReport { + pub result: String, + pub agent: String, + pub pps_type: String, + pub vm_id: String, + pub timestamp: DateTime<Utc>, + pub extra: Vec<(String, String)>, +} + +impl ProvisioningReport { + /// Create a success report. + /// + /// Sets `result` to `"success"`, `pps_type` to `"None"`, and + /// `agent` to the crate default. Override any field after + /// construction if needed (all fields are public). + pub fn success(vm_id: &str) -> Self { + Self { + result: "success".to_string(), + agent: DEFAULT_AGENT.to_string(), + pps_type: "None".to_string(), + vm_id: vm_id.to_string(), + timestamp: Utc::now(), + extra: Vec::new(), + } + } + + /// Create a failure/error report. + /// + /// Sets `result` to `"error"` and stores `reason` in the `extra` + /// field.
+ pub fn error(vm_id: &str, reason: &str) -> Self { + Self { + result: "error".to_string(), + agent: DEFAULT_AGENT.to_string(), + pps_type: "None".to_string(), + vm_id: vm_id.to_string(), + timestamp: Utc::now(), + extra: vec![("reason".to_string(), reason.to_string())], + } + } + + /// Encode as a pipe-delimited string for KVP storage. + /// + /// Produces the same wire format as the existing `encode_report()` + /// in `health.rs`: each segment is a `key=value` string, joined + /// by `|` via the `csv` crate with `QuoteStyle::Necessary`. + pub fn encode(&self) -> String { + let mut fields = vec![ + format!("result={}", self.result), + format!("agent={}", self.agent), + format!("pps_type={}", self.pps_type), + format!("vm_id={}", self.vm_id), + format!("timestamp={}", self.timestamp.to_rfc3339()), + ]; + for (k, v) in &self.extra { + fields.push(format!("{k}={v}")); + } + encode_report(&fields) + } + + /// Parse a pipe-delimited string back into a report. + pub fn decode(s: &str) -> io::Result<Self> { + let segments = decode_report(s)?; + + let mut result = None; + let mut agent = None; + let mut pps_type = None; + let mut vm_id = None; + let mut timestamp = None; + let mut extra = Vec::new(); + + for segment in &segments { + if let Some((key, value)) = segment.split_once('=') { + match key { + "result" => result = Some(value.to_string()), + "agent" => agent = Some(value.to_string()), + "pps_type" => pps_type = Some(value.to_string()), + "vm_id" => vm_id = Some(value.to_string()), + "timestamp" => { + timestamp = Some( + DateTime::parse_from_rfc3339(value) + .map(|dt| dt.with_timezone(&Utc)) + .map_err(|e| { + io::Error::other(format!( + "invalid timestamp: {e}" + )) + })?, + ); + } + _ => { + extra.push((key.to_string(), value.to_string())); + } + } + } + } + + Ok(Self { + result: result.unwrap_or_default(), + agent: agent.unwrap_or_default(), + pps_type: pps_type.unwrap_or_default(), + vm_id: vm_id.unwrap_or_default(), + timestamp: timestamp.unwrap_or_else(Utc::now), +
extra, + }) + } + + /// Write this report to the store (key = `"PROVISIONING_REPORT"`). + pub fn write_to(&self, store: &impl KvpStore) -> io::Result<()> { + store.write("PROVISIONING_REPORT", &self.encode()) + } + + /// Read and parse a provisioning report from the store, if present. + pub fn read_from(store: &impl KvpStore) -> io::Result<Option<Self>> { + store + .read("PROVISIONING_REPORT") + .map(|opt| opt.and_then(|s| Self::decode(&s).ok())) + } +} + +impl fmt::Display for ProvisioningReport { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.encode()) + } +} + +/// Encode a slice of strings as a single pipe-delimited record using +/// the `csv` crate, matching the existing `encode_report()` wire +/// format. +fn encode_report(fields: &[String]) -> String { + let mut wtr = csv::WriterBuilder::new() + .delimiter(b'|') + .quote_style(csv::QuoteStyle::Necessary) + .from_writer(vec![]); + wtr.write_record(fields).expect("CSV write failed"); + let mut bytes = wtr.into_inner().unwrap(); + if let Some(b'\n') = bytes.last() { + bytes.pop(); + } + if let Some(b'\r') = bytes.last() { + bytes.pop(); + } + String::from_utf8(bytes).expect("CSV was not utf-8") +} + +/// Decode a pipe-delimited record back into individual segments.
+fn decode_report(s: &str) -> io::Result<Vec<String>> { + let mut rdr = csv::ReaderBuilder::new() + .delimiter(b'|') + .has_headers(false) + .from_reader(s.as_bytes()); + + let record = rdr.records().next().ok_or_else(|| { + io::Error::other("empty provisioning report string") + })??; + + Ok(record.iter().map(|s| s.to_string()).collect()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::InMemoryKvpStore; + + #[test] + fn test_success_report_encode() { + let report = ProvisioningReport::success("vm-123"); + let encoded = report.encode(); + + assert!(encoded.contains("result=success")); + assert!(encoded.contains("agent=Azure-Init/")); + assert!(encoded.contains("pps_type=None")); + assert!(encoded.contains("vm_id=vm-123")); + assert!(encoded.contains("timestamp=")); + assert!(encoded.contains("|")); + assert!(!encoded.contains('\n')); + } + + #[test] + fn test_error_report_encode() { + let report = ProvisioningReport::error("vm-456", "disk full"); + let encoded = report.encode(); + + assert!(encoded.contains("result=error")); + assert!(encoded.contains("vm_id=vm-456")); + assert!(encoded.contains("reason=disk full")); + } + + #[test] + fn test_encode_decode_roundtrip() { + let mut report = ProvisioningReport::success("vm-abc"); + report + .extra + .push(("origin".to_string(), "test".to_string())); + + let encoded = report.encode(); + let decoded = + ProvisioningReport::decode(&encoded).expect("decode failed"); + + assert_eq!(decoded.result, "success"); + assert_eq!(decoded.vm_id, "vm-abc"); + assert_eq!(decoded.pps_type, "None"); + assert_eq!( + decoded.extra, + vec![("origin".to_string(), "test".to_string())] + ); + } + + #[test] + fn test_error_roundtrip() { + let report = ProvisioningReport::error("vm-err", "timeout"); + let encoded = report.encode(); + let decoded = + ProvisioningReport::decode(&encoded).expect("decode failed"); + + assert_eq!(decoded.result, "error"); + assert_eq!(decoded.vm_id, "vm-err"); + assert_eq!( + decoded.extra, +
vec![("reason".to_string(), "timeout".to_string())] + ); + } + + #[test] + fn test_write_to_and_read_from_store() { + let store = InMemoryKvpStore::default(); + + let report = ProvisioningReport::success("vm-store"); + report.write_to(&store).unwrap(); + + let read_back = ProvisioningReport::read_from(&store).unwrap().unwrap(); + assert_eq!(read_back.result, "success"); + assert_eq!(read_back.vm_id, "vm-store"); + } + + #[test] + fn test_read_from_empty_store() { + let store = InMemoryKvpStore::default(); + let result = ProvisioningReport::read_from(&store).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_display_delegates_to_encode() { + let report = ProvisioningReport::success("vm-display"); + assert_eq!(format!("{report}"), report.encode()); + } + + #[test] + fn test_extra_fields_preserved() { + let mut report = ProvisioningReport::error("vm-x", "reason1"); + report.extra.push(( + "documentation_url".to_string(), + "https://example.com".to_string(), + )); + + let encoded = report.encode(); + let decoded = + ProvisioningReport::decode(&encoded).expect("decode failed"); + + assert_eq!(decoded.extra.len(), 2); + assert!(decoded + .extra + .iter() + .any(|(k, v)| k == "reason" && v == "reason1")); + assert!(decoded.extra.iter().any( + |(k, v)| k == "documentation_url" && v == "https://example.com" + )); + } + + #[test] + fn test_wire_format_matches_existing_encode_report() { + // Verify our encoding matches what health.rs encode_report() + // produces: pipe-delimited, csv-quoted, no trailing newline. 
+ let fields = vec![ + "result=success".to_string(), + "agent=Azure-Init/0.1.0".to_string(), + "pps_type=None".to_string(), + ]; + let encoded = encode_report(&fields); + assert_eq!( + encoded, + "result=success|agent=Azure-Init/0.1.0|pps_type=None" + ); + } +} diff --git a/libazureinit-kvp/src/tracing_layer.rs b/libazureinit-kvp/src/tracing_layer.rs new file mode 100644 index 00000000..b53e9505 --- /dev/null +++ b/libazureinit-kvp/src/tracing_layer.rs @@ -0,0 +1,387 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! A `tracing_subscriber::Layer` that translates span and event data +//! into diagnostic KVP entries via `DiagnosticsKvp`. + +use std::fmt::{self, Debug, Write as _}; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; + +use chrono::{DateTime, Utc}; +use tracing::field::Visit; +use tracing::span::{Attributes, Id}; +use tracing::Subscriber; +use tracing_subscriber::layer::Context as TracingContext; +use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::Layer; +use uuid::Uuid; + +use crate::diagnostics::{DiagnosticEvent, DiagnosticsKvp}; +use crate::KvpStore; + +const HV_KVP_AZURE_MAX_VALUE_SIZE: usize = 1022; + +/// A wrapper around `std::time::Instant` for time tracking in spans. +/// +/// Stored as a span extension to measure elapsed time between +/// `on_new_span` and `on_close`. +#[derive(Clone)] +struct MyInstant(Instant); + +impl std::ops::Deref for MyInstant { + type Target = Instant; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl MyInstant { + fn now() -> Self { + MyInstant(Instant::now()) + } +} + +/// A visitor that captures all fields from a tracing event into a +/// single string with `field=value` pairs separated by commas. 
+pub struct StringVisitor<'a> { + string: &'a mut String, +} + +impl Visit for StringVisitor<'_> { + fn record_debug( + &mut self, + field: &tracing::field::Field, + value: &dyn Debug, + ) { + if !self.string.is_empty() { + self.string.push_str(", "); + } + write!(self.string, "{}={:?}", field.name(), value) + .expect("Writing to a string should never fail"); + } +} + +/// Given a span's metadata, constructs a span name in the format +/// `module:function`. +/// +/// Strips common crate prefixes (`libazureinit::`, `azure_init::`) +/// so that span names are concise. +fn format_span_name(metadata: &tracing::Metadata<'_>) -> String { + let target = metadata.target(); + let name = metadata.name(); + + let module_path = target + .strip_prefix("libazureinit::") + .or_else(|| target.strip_prefix("azure_init::")) + .unwrap_or(target); + + if module_path.is_empty() || module_path == target && target == name { + name.to_string() + } else { + format!("{module_path}:{name}") + } +} + +/// Tracing subscriber layer that emits KVP diagnostic entries. +/// +/// Translates span open/close and event occurrences into +/// `DiagnosticsKvp::emit()` calls. For events with a `health_report` +/// field, the report string is written directly to the store as a +/// `PROVISIONING_REPORT` entry. +pub struct TracingKvpLayer<S> { + diagnostics: DiagnosticsKvp<S>, +} + +impl<S> TracingKvpLayer<S> { + pub fn new(diagnostics: DiagnosticsKvp<S>) -> Self { + Self { diagnostics } + } +} + +impl<S, Sub> Layer<Sub> for TracingKvpLayer<S> +where + S: KvpStore + 'static, + Sub: Subscriber + for<'lookup> LookupSpan<'lookup>, +{ + fn on_event( + &self, + event: &tracing::Event<'_>, + ctx: TracingContext<'_, Sub>, + ) { + // Check for health_report events first -- these exist outside + // of a span and are written directly as PROVISIONING_REPORT.
+ let mut health_report = None; + event.record(&mut |field: &tracing::field::Field, + value: &dyn fmt::Debug| { + if field.name() == "health_report" { + health_report = + Some(format!("{value:?}").trim_matches('"').to_string()); + } + }); + + if let Some(report_str) = health_report { + if report_str.len() <= HV_KVP_AZURE_MAX_VALUE_SIZE { + let _ = self + .diagnostics + .store() + .write("PROVISIONING_REPORT", &report_str); + } else { + for chunk in + report_str.as_bytes().chunks(HV_KVP_AZURE_MAX_VALUE_SIZE) + { + let chunk_str = String::from_utf8_lossy(chunk); + let _ = self + .diagnostics + .store() + .write("PROVISIONING_REPORT", &chunk_str); + } + } + return; + } + + // All other events are inside a span. + if let Some(span) = ctx.lookup_current() { + let mut event_message = String::new(); + let mut visitor = StringVisitor { + string: &mut event_message, + }; + event.record(&mut visitor); + + let span_context = span.metadata(); + let span_id = Uuid::new_v4(); + + let event_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| { + span.extensions() + .get::<MyInstant>() + .map(|instant| instant.elapsed()) + .unwrap_or_default() + }); + + let event_time_dt = DateTime::<Utc>::from(UNIX_EPOCH + event_time) + .format("%Y-%m-%dT%H:%M:%S%.3fZ"); + + let event_value = + format!("Time: {event_time_dt} | Event: {event_message}"); + + let formatted_span_name = format_span_name(span_context); + let diag_event = DiagnosticEvent { + level: event.metadata().level().as_str().to_string(), + name: formatted_span_name, + span_id: span_id.to_string(), + message: event_value, + timestamp: Utc::now(), + }; + + let _ = self.diagnostics.emit(&diag_event); + } + } + + fn on_new_span( + &self, + _attrs: &Attributes<'_>, + id: &Id, + ctx: TracingContext<'_, Sub>, + ) { + let start_instant = MyInstant::now(); + if let Some(span) = ctx.span(id) { + span.extensions_mut().insert(start_instant); + } + } + + fn on_close(&self, id: Id, ctx: TracingContext<Sub>) { + if let Some(span) = 
ctx.span(&id) { + let end_time = SystemTime::now(); + + let span_context = span.metadata(); + let span_id = Uuid::new_v4(); + + if let Some(start_instant) = span.extensions().get::<MyInstant>() { + let elapsed = start_instant.elapsed(); + + let start_time = + end_time.checked_sub(elapsed).unwrap_or(UNIX_EPOCH); + + let start_time_dt = DateTime::<Utc>::from(start_time) + .format("%Y-%m-%dT%H:%M:%S%.3fZ"); + + let end_time_dt = DateTime::<Utc>::from(end_time) + .format("%Y-%m-%dT%H:%M:%S%.3fZ"); + + let event_value = + format!("Start: {start_time_dt} | End: {end_time_dt}"); + + let formatted_span_name = format_span_name(span_context); + let diag_event = DiagnosticEvent { + level: span_context.level().as_str().to_string(), + name: formatted_span_name, + span_id: span_id.to_string(), + message: event_value, + timestamp: Utc::now(), + }; + + let _ = self.diagnostics.emit(&diag_event); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{HyperVKvpStore, InMemoryKvpStore, KvpStore}; + use tempfile::NamedTempFile; + use tracing::{event, instrument, Level}; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::Registry; + + fn make_test_layer() -> (InMemoryKvpStore, TracingKvpLayer<InMemoryKvpStore>) + { + let store = InMemoryKvpStore::default(); + let diagnostics = + DiagnosticsKvp::new(store.clone(), "test-vm", "test-prefix"); + let layer = TracingKvpLayer::new(diagnostics); + (store, layer) + } + + #[test] + fn test_on_event_writes_to_store() { + let (store, layer) = make_test_layer(); + let subscriber = Registry::default().with(layer); + + tracing::subscriber::with_default(subscriber, || { + let _span = tracing::info_span!("test_span").entered(); + event!(Level::INFO, msg = "hello from test"); + }); + + let entries = store.entries().unwrap(); + assert!( + !entries.is_empty(), + "Expected at least one entry from the event + span close" + ); + } + + #[test] + fn test_health_report_writes_provisioning_report() { + let (store, layer) = make_test_layer(); + let subscriber 
= Registry::default().with(layer); + + tracing::subscriber::with_default(subscriber, || { + tracing::info!( + health_report = "result=success|agent=test", + "provisioning done" + ); + }); + + let value = store.read("PROVISIONING_REPORT").unwrap(); + assert_eq!(value, Some("result=success|agent=test".to_string())); + } + + #[test] + fn test_health_report_long_value_is_chunked() { + let tmp = NamedTempFile::new().expect("create temp file"); + let store = HyperVKvpStore::new(tmp.path()); + let diagnostics = DiagnosticsKvp::new(store.clone(), "test-vm", "pfx"); + let layer = TracingKvpLayer::new(diagnostics); + let subscriber = Registry::default().with(layer); + + let long_report = "X".repeat(HV_KVP_AZURE_MAX_VALUE_SIZE * 2 + 17); + + tracing::subscriber::with_default(subscriber, || { + tracing::info!(health_report = %long_report, "long report"); + }); + + let entries = store.entries().expect("read entries"); + let report_entries: Vec<(String, String)> = entries + .into_iter() + .filter(|(k, _)| k == "PROVISIONING_REPORT") + .collect(); + + assert_eq!(report_entries.len(), 3); + assert!(report_entries + .iter() + .all(|(_, v)| v.len() <= HV_KVP_AZURE_MAX_VALUE_SIZE)); + + let reconstructed = report_entries + .iter() + .map(|(_, v)| v.as_str()) + .collect::>() + .join(""); + assert_eq!(reconstructed, long_report); + } + + #[test] + fn test_span_close_emits_start_end() { + let (store, layer) = make_test_layer(); + let subscriber = Registry::default().with(layer); + + tracing::subscriber::with_default(subscriber, || { + let _span = tracing::info_span!("my_span").entered(); + }); + + let entries = store.entries().unwrap(); + assert!(!entries.is_empty()); + + let has_start_end = entries + .iter() + .any(|(_, v)| v.contains("Start:") && v.contains("End:")); + assert!( + has_start_end, + "Expected a span close entry with Start/End timestamps" + ); + } + + #[test] + fn test_format_span_name_strips_prefix() { + let expected = vec![ + ( + "libazureinit::provision::user", + 
"create_user", + "provision::user:create_user", + ), + ("azure_init::main", "run", "main:run"), + ("my_crate", "my_func", "my_crate:my_func"), + ]; + + for (target, name, want) in expected { + let module_path = target + .strip_prefix("libazureinit::") + .or_else(|| target.strip_prefix("azure_init::")) + .unwrap_or(target); + + let result = if module_path.is_empty() + || module_path == target && target == name + { + name.to_string() + } else { + format!("{module_path}:{name}") + }; + + assert_eq!(result, want, "target={target}, name={name}"); + } + } + + #[test] + fn test_instrumented_function_emits_entries() { + let (store, layer) = make_test_layer(); + let subscriber = Registry::default().with(layer); + + #[instrument] + fn do_work() { + event!(Level::INFO, msg = "working"); + } + + tracing::subscriber::with_default(subscriber, || { + do_work(); + }); + + let entries = store.entries().unwrap(); + assert!( + entries.len() >= 2, + "Expected at least event + span close, got {}", + entries.len() + ); + } +} diff --git a/libazureinit/Cargo.toml b/libazureinit/Cargo.toml index eb4bb4f2..cb8b4c66 100644 --- a/libazureinit/Cargo.toml +++ b/libazureinit/Cargo.toml @@ -25,7 +25,6 @@ opentelemetry = "0.31.0" opentelemetry_sdk = "0.31.0" tracing-opentelemetry = "0.32" tokio-util = "0.7" -sysinfo = "0.38" anyhow = "1" fstab = "0.4.0" toml = "1.0" @@ -36,7 +35,7 @@ uuid = { version = "1.3", features = ["v4"] } chrono = { version = "0.4", features = ["serde"] } csv = "1" zeroize = "1.8" -fs2 = "0.4" +libazureinit-kvp = { path = "../libazureinit-kvp" } [dev-dependencies] tracing-test = { version = "0.2", features = ["no-env-filter"] } diff --git a/libazureinit/src/kvp.rs b/libazureinit/src/kvp.rs deleted file mode 100644 index ae73be50..00000000 --- a/libazureinit/src/kvp.rs +++ /dev/null @@ -1,902 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -//! This module provides core functionality for handling telemetry tracing -//! 
related to azure-init's interaction with Hyper-V KVP (Key-Value Pair) storage. -//! -//! # Constants -//! - `HV_KVP_EXCHANGE_MAX_KEY_SIZE`: Defines the maximum key size for KVP exchange. -//! - `HV_KVP_EXCHANGE_MAX_VALUE_SIZE`: Defines the maximum value size for KVP exchange. -//! - `HV_KVP_AZURE_MAX_VALUE_SIZE`: Maximum value size before splitting into multiple slices. -//! - -use std::{ - fmt::{self as std_fmt, Write as std_write}, - fs::{File, OpenOptions}, - io::{self, ErrorKind, Write}, - os::unix::fs::MetadataExt, - path::Path, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, -}; - -use fs2::FileExt; - -use tracing::{ - field::Visit, - span::{Attributes, Id}, - Subscriber, -}; - -use tracing_subscriber::{ - layer::Context as TracingContext, registry::LookupSpan, Layer, -}; - -use sysinfo::System; - -use tokio::{ - sync::{mpsc::UnboundedReceiver, mpsc::UnboundedSender}, - task::JoinHandle, -}; -use tokio_util::sync::CancellationToken; - -use chrono::{DateTime, Utc}; -use uuid::Uuid; - -const HV_KVP_EXCHANGE_MAX_KEY_SIZE: usize = 512; -const HV_KVP_EXCHANGE_MAX_VALUE_SIZE: usize = 2048; -const HV_KVP_AZURE_MAX_VALUE_SIZE: usize = 1022; -const EVENT_PREFIX: &str = concat!("azure-init-", env!("CARGO_PKG_VERSION")); - -/// Encapsulates the KVP (Key-Value Pair) tracing infrastructure. -/// -/// This struct holds both the `tracing` layer (`EmitKVPLayer`) that generates -/// telemetry data and the `JoinHandle` for the background task that writes this -/// data to the KVP file. This allows the caller to manage the lifecycle of the -/// writer task separately from the tracing layer. -pub struct Kvp { - /// The `tracing` layer that captures span and event data and sends it - /// to the KVP writer task. - pub(crate) tracing_layer: EmitKVPLayer, - /// The `JoinHandle` for the background task responsible for writing - /// KVP data to the file. The caller can use this handle to wait for - /// the writer to finish. 
- pub(crate) writer: JoinHandle>, -} - -impl Kvp { - /// Creates a new `Kvp` instance, spawning a background task for writing - /// KVP telemetry data to a file. - /// - /// This function initializes the necessary components for KVP logging: - /// - It truncates the KVP file if it contains stale data. - /// - It creates an unbounded channel for passing encoded KVP data from the - /// tracing layer to the writer task. - /// - It spawns the `kvp_writer` task, which listens for data and shutdown signals. - pub(crate) fn new( - file_path: std::path::PathBuf, - vm_id: &str, - graceful_shutdown: CancellationToken, - ) -> Result { - truncate_guest_pool_file(&file_path)?; - - let file = OpenOptions::new() - .append(true) - .create(true) - .open(&file_path)?; - - let (events_tx, events_rx): ( - UnboundedSender>, - UnboundedReceiver>, - ) = tokio::sync::mpsc::unbounded_channel(); - - let writer = - tokio::spawn(Self::kvp_writer(file, events_rx, graceful_shutdown)); - - Ok(Self { - tracing_layer: EmitKVPLayer { - events_tx, - vm_id: vm_id.to_string(), - }, - writer, - }) - } - - /// The background task that writes encoded KVP data to a file. - /// - /// This asynchronous function runs in a loop, waiting for two events: - /// 1. Receiving encoded KVP data from the `events` channel, which it then - /// writes to the specified `file`. - /// 2. A cancellation signal from the `token`. - /// - /// Upon receiving the cancellation signal, it stops accepting new events, - /// drains the `events` channel of any remaining messages, and writes them - /// to the file before exiting gracefully. - /// - /// KVP messages are batched together to minimize the number of lock/unlock - /// and flush operations, improving efficiency. - async fn kvp_writer( - mut file: File, - mut events: UnboundedReceiver>, - token: CancellationToken, - ) -> io::Result<()> { - loop { - tokio::select! 
{ - biased; - - Some(encoded_kvp) = events.recv() => { - // Collect this message and any other immediately available ones - let mut batch = vec![encoded_kvp]; - while let Ok(kvp) = events.try_recv() { - batch.push(kvp); - } - - if let Err(e) = Self::write_kvps(&mut file, &batch) { - eprintln!("Failed to write KVP batch: {e}"); - } - } - - _ = token.cancelled() => { - // Shutdown signal received. - // close the channel and drain remaining messages. - events.close(); - - let mut batch = Vec::new(); - while let Some(encoded_kvp) = events.recv().await { - batch.push(encoded_kvp); - } - - if !batch.is_empty() { - if let Err(e) = Self::write_kvps(&mut file, &batch) { - eprintln!("Failed to write KVP batch during shutdown: {e}"); - } - } - - // All messages are drained, exit the loop. - break; - } - } - } - Ok(()) - } - - /// Writes a batch of KVP messages to the file with a single lock/unlock cycle. - /// - /// This method takes the exclusive lock once, writes all messages in the batch, - /// flushes once, and then unlocks. - /// - /// # Arguments - /// * `file` - A mutable reference to the file to write to. - /// * `kvps` - A slice of encoded KVP messages to write. - fn write_kvps(file: &mut File, kvps: &[Vec]) -> io::Result<()> { - FileExt::lock_exclusive(file).map_err(|e| { - io::Error::other(format!("Failed to lock KVP file: {e}")) - })?; - - let write_result = (|| -> io::Result<()> { - for kvp in kvps { - file.write_all(kvp)?; - } - file.flush() - })(); - - let unlock_result = FileExt::unlock(file).map_err(|e| { - io::Error::other(format!("Failed to unlock KVP file: {e}")) - }); - - // Make sure the exclusive lock is released even when writing or flushing fails. 
- if let Err(err) = write_result { - if let Err(ref unlock_err) = unlock_result { - eprintln!("Failed to unlock KVP file after write failure: {unlock_err}"); - } - return Err(err); - } - - unlock_result - } -} - -/// A wrapper around `std::time::Instant` that provides convenient methods -/// for time tracking in spans and events. Implements the `Deref` trait, allowing -/// access to the underlying `Instant` methods. -/// -/// This struct captures the start time of spans/events and measures the elapsed time. -#[derive(Clone)] -struct MyInstant(Instant); - -impl std::ops::Deref for MyInstant { - type Target = Instant; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} -impl MyInstant { - pub fn now() -> Self { - MyInstant(Instant::now()) - } -} - -/// A custom visitor that captures the value of the `msg` field from a tracing event. -/// It implements the `tracing::field::Visit` trait and records the value into -/// a provided mutable string reference. -/// -/// This visitor is primarily used in the `on_event` method of the `EmitKVPLayer` -/// to extract event messages and log them as key-value pairs. -pub struct StringVisitor<'a> { - string: &'a mut String, -} - -impl Visit for StringVisitor<'_> { - /// Records the debug representation of the event's value and stores it in the provided string. - /// - /// # Arguments - /// * `_field` - A reference to the event's field metadata. - /// * `value` - The debug value associated with the field. - fn record_debug( - &mut self, - field: &tracing::field::Field, - value: &dyn std_fmt::Debug, - ) { - if !self.string.is_empty() { - self.string.push_str(", "); - } - write!(self.string, "{}={:?}", field.name(), value) - .expect("Writing to a string should never fail"); - } -} - -/// A custom tracing layer that emits span and event data as key-value pairs (KVP) -/// to a file for Hyper-V telemetry consumption. The layer manages the asynchronous -/// writing of telemetry data to a specified file in KVP format. 
-/// -/// `EmitKVPLayer` initializes the file at creation, manages a dedicated writer -/// task, and provides functions to send encoded data for logging. -pub struct EmitKVPLayer { - events_tx: UnboundedSender>, - vm_id: String, -} - -impl EmitKVPLayer { - /// Sends encoded KVP data to the writer task for asynchronous logging. - /// - /// # Arguments - /// * `message` - The encoded data to send as a vector of bytes (Vec). - fn send_event(&self, message: Vec) { - let _ = self.events_tx.send(message); - } - - /// Handles the orchestration of key-value pair (KVP) encoding and logging operations - /// by generating a unique event key, encoding it with the provided value, and sending - /// it to the `EmitKVPLayer` for logging. - fn handle_kvp_operation( - &self, - event_level: &str, - event_name: &str, - span_id: &str, - event_value: &str, - ) { - let event_key = - generate_event_key(&self.vm_id, event_level, event_name, span_id); - let encoded_kvp = encode_kvp_item(&event_key, event_value); - let encoded_kvp_flattened: Vec = encoded_kvp.concat(); - self.send_event(encoded_kvp_flattened); - } -} - -impl Layer for EmitKVPLayer -where - S: Subscriber + for<'lookup> LookupSpan<'lookup>, -{ - /// Handles event occurrences within a span, capturing and recording the event's message - /// and context metadata as key-value pairs (KVP) for logging. - /// - /// This function extracts the event's `msg` field using `StringVisitor`, constructs a - /// formatted event string, and then encodes it as KVP data to be sent to the - /// `EmitKVPLayer` for asynchronous file storage. - /// - /// If an `ERROR` level event is encountered, it marks the span's status as a failure, - /// which will be reflected in the span's data upon closure. - /// - /// Additionally, this function checks if the event contains a `health_report` field. - /// If present, the event is delegated to [`handle_health_report`] to be uniquely formatted. 
- /// - /// # Arguments - /// * `event` - The tracing event instance containing the message and metadata. - /// * `ctx` - The current tracing context, which is used to access the span associated - /// with the event. - /// - /// # Example - /// ```rust - /// use tracing::{event, Level}; - /// event!(Level::INFO, msg = "Event message"); - /// ``` - fn on_event(&self, event: &tracing::Event<'_>, ctx: TracingContext<'_, S>) { - // Check for health_report events, as they exist outside of a span. - let mut health_report = None; - event.record( - &mut |field: &tracing::field::Field, - value: &dyn std::fmt::Debug| { - if field.name() == "health_report" { - health_report = Some( - format!("{value:?}").trim_matches('"').to_string(), - ); - } - }, - ); - - if let Some(report_str) = health_report { - let msg = - encode_kvp_item("PROVISIONING_REPORT", &report_str).concat(); - self.send_event(msg); - return; - } - - // All other events are inside a span. - if let Some(span) = ctx.lookup_current() { - let mut event_message = String::new(); - let mut visitor = StringVisitor { - string: &mut event_message, - }; - event.record(&mut visitor); - - let span_context = span.metadata(); - let span_id: Uuid = Uuid::new_v4(); - - let event_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_else(|_| { - span.extensions() - .get::() - .map(|instant| instant.elapsed()) - .unwrap_or_default() - }); - - let event_time_dt = DateTime::::from(UNIX_EPOCH + event_time) - .format("%Y-%m-%dT%H:%M:%S%.3fZ"); - - let event_value = - format!("Time: {event_time_dt} | Event: {event_message}"); - - let formatted_span_name = format_span_name(span_context); - self.handle_kvp_operation( - event.metadata().level().as_str(), - &formatted_span_name, - &span_id.to_string(), - &event_value, - ); - } - } - - /// Called when a new span is created. 
Records the start time of the span - /// and stores it as an extension within the span's context, to be used - /// for generating telemetry data for Hyper-V. - fn on_new_span( - &self, - _attrs: &Attributes<'_>, - id: &Id, - ctx: TracingContext<'_, S>, - ) { - let start_instant = MyInstant::now(); - if let Some(span) = ctx.span(id) { - span.extensions_mut().insert(start_instant); - } - } - /// Called when a span is closed, finalizing and logging the span's data. This method - /// records the span's start and end times, status (e.g., success or failure), and other metadata, - /// then sends it to `EmitKVPLayer` for KVP logging. - /// - /// If any errors were recorded in the span (such as `ERROR` level events), the span - /// status is marked as `Failure`; otherwise, it is marked as `Success`. - /// - /// # Arguments - /// * `id` - The unique identifier for the span. - /// * `ctx` - The current tracing context, used to access the span's metadata and status. - fn on_close(&self, id: Id, ctx: TracingContext) { - if let Some(span) = ctx.span(&id) { - let end_time = SystemTime::now(); - - let span_context = span.metadata(); - let span_id = Uuid::new_v4(); - - if let Some(start_instant) = span.extensions().get::() { - let elapsed = start_instant.elapsed(); - - let start_time = - end_time.checked_sub(elapsed).unwrap_or(UNIX_EPOCH); - - let start_time_dt = DateTime::::from(start_time) - .format("%Y-%m-%dT%H:%M:%S%.3fZ"); - - let end_time_dt = DateTime::::from(end_time) - .format("%Y-%m-%dT%H:%M:%S%.3fZ"); - - let event_value = - format!("Start: {start_time_dt} | End: {end_time_dt}"); - - let formatted_span_name = format_span_name(span_context); - self.handle_kvp_operation( - span_context.level().as_str(), - &formatted_span_name, - &span_id.to_string(), - &event_value, - ); - } - } - } -} - -/// Generates a unique event key by combining the event level, name, and span ID. -/// -/// # Arguments -/// * `event_level` - The logging level (e.g., "INFO", "DEBUG"). 
-/// * `event_name` - The name of the event. -/// * `span_id` - A unique identifier for the span. -fn generate_event_key( - vm_id: &str, - event_level: &str, - event_name: &str, - span_id: &str, -) -> String { - format!("{EVENT_PREFIX}|{vm_id}|{event_level}|{event_name}|{span_id}") -} - -/// Encodes a key-value pair (KVP) into one or more byte slices. If the value -/// exceeds the allowed size, it is split into multiple slices for encoding. -/// This is used for logging events to a KVP file. -/// -/// # Note -/// - The key is zero-padded to `HV_KVP_EXCHANGE_MAX_KEY_SIZE`, and -/// the value is zero-padded to `HV_KVP_AZURE_MAX_VALUE_SIZE` to meet -/// Hyper-V's expected formatting. -/// -/// # Arguments -/// * `key` - The key as a string slice. -/// * `value` - The value associated with the key. -fn encode_kvp_item(key: &str, value: &str) -> Vec> { - let key_buf = key - .as_bytes() - .iter() - .take(HV_KVP_EXCHANGE_MAX_KEY_SIZE) - .chain( - vec![0_u8; HV_KVP_EXCHANGE_MAX_KEY_SIZE.saturating_sub(key.len())] - .iter(), - ) - .copied() - .collect::>(); - - debug_assert!(key_buf.len() == HV_KVP_EXCHANGE_MAX_KEY_SIZE); - - let kvp_slices = value - .as_bytes() - .chunks(HV_KVP_AZURE_MAX_VALUE_SIZE) - .map(|chunk| { - let mut buffer = Vec::with_capacity( - HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE, - ); - buffer.extend_from_slice(&key_buf); - buffer.extend_from_slice(chunk); - while buffer.len() - < HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE - { - buffer.push(0); - } - - buffer - }) - .collect::>>(); - - debug_assert!(kvp_slices.iter().all(|kvp| kvp.len() - == HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE)); - - kvp_slices -} - -/// Decodes a KVP byte slice into its corresponding key and value strings. -/// This is useful for inspecting or logging raw KVP data. 
-#[cfg(test)] -pub fn decode_kvp_item( - record_data: &[u8], -) -> Result<(String, String), &'static str> { - let record_data_len = record_data.len(); - let expected_len = - HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE; - - if record_data_len != expected_len { - return Err("record_data len not correct."); - } - - let key = String::from_utf8( - record_data - .iter() - .take(HV_KVP_EXCHANGE_MAX_KEY_SIZE) - .cloned() - .collect::>(), - ) - .unwrap_or_else(|_| String::new()) - .trim_end_matches('\x00') - .to_string(); - - let value = String::from_utf8( - record_data - .iter() - .skip(HV_KVP_EXCHANGE_MAX_KEY_SIZE) - .take(HV_KVP_AZURE_MAX_VALUE_SIZE) - .cloned() - .collect::>(), - ) - .unwrap_or_else(|_| String::new()) - .trim_end_matches('\x00') - .to_string(); - - Ok((key, value)) -} - -/// Truncates the guest pool KVP file if it contains stale data (i.e., data -/// older than the system's boot time). Logs whether the file was truncated -/// or no action was needed. -fn truncate_guest_pool_file(kvp_file: &Path) -> Result<(), anyhow::Error> { - let boot_time = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() - - get_uptime().as_secs(); - - match kvp_file.metadata() { - Ok(metadata) => { - if metadata.mtime() < boot_time as i64 { - OpenOptions::new() - .write(true) - .truncate(true) - .open(kvp_file)?; - println!("Truncated the KVP file due to stale data."); - } else { - println!( - "File has been truncated since boot, no action taken." - ); - } - } - Err(ref e) if e.kind() == ErrorKind::NotFound => { - println!("File not found: {kvp_file:?}"); - return Ok(()); - } - Err(e) => { - return Err(anyhow::Error::from(e) - .context("Failed to access file metadata")); - } - } - - Ok(()) -} - -/// Retrieves the system's uptime using the `sysinfo` crate, returning the duration -/// since the system booted. This can be useful for time-based calculations or checks, -/// such as determining whether data is stale or calculating the approximate boot time. 
-fn get_uptime() -> Duration { - let mut system = System::new(); - system.refresh_memory(); - system.refresh_cpu_usage(); - - let uptime_seconds = System::uptime(); - Duration::from_secs(uptime_seconds) -} - -/// Given a span's metadata, this constructs a span name in the format -/// `module:function`. -/// -/// # Examples -/// - Target: `azure_init`, Name: `provision` -> `provision` -/// - Target: `libazureinit::provision::user`, Name: `create_user` -> `provision:user:create_user` -fn format_span_name(metadata: &tracing::Metadata<'_>) -> String { - let target = metadata.target(); - let name = metadata.name(); - - // Strip common crate prefixes - let module_path = target - .strip_prefix("libazureinit::") - .or_else(|| target.strip_prefix("azure_init::")) - .unwrap_or(target); - - // If there's a module path after stripping, format as module:name - // Otherwise just use the name - if module_path.is_empty() || module_path == target && target == name { - name.to_string() - } else { - format!("{module_path}:{name}") - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::config::{Config, Telemetry}; - use crate::error::Error as LibError; - use crate::health::encoded_success_report; - use tempfile::NamedTempFile; - use tokio::time::{sleep, Duration}; - use tracing::instrument; - use tracing::{event, Level}; - use tracing_subscriber::{layer::SubscriberExt, Registry}; - - #[instrument] - async fn mock_child_function(index: usize) { - event!( - Level::INFO, - msg = format!("Event in child span for item {}", index) - ); - sleep(Duration::from_millis(200)).await; - } - - #[instrument] - async fn mock_provision(vm_id: &str) -> Result<(), anyhow::Error> { - let mut system = System::new(); - system.refresh_memory(); - system.refresh_cpu_usage(); - - let kernel_version = System::kernel_version() - .unwrap_or("Unknown Kernel Version".to_string()); - let os_version = - System::os_version().unwrap_or("Unknown OS Version".to_string()); - let azure_init_version = 
env!("CARGO_PKG_VERSION"); - - event!( - Level::INFO, - msg = format!( - "Kernel Version: {}, OS Version: {}, Azure-Init Version: {}", - kernel_version, os_version, azure_init_version - ) - ); - - event!(Level::INFO, msg = "Provisioning started"); - - mock_child_function(0).await; - sleep(Duration::from_millis(300)).await; - - let success_report = - encoded_success_report(vm_id, Some(("origin", "mock_source"))); - event!( - Level::INFO, - health_report = %success_report, - "Provisioning completed successfully" - ); - - Ok(()) - } - - #[instrument] - async fn mock_failure_function(vm_id: &str) -> Result<(), anyhow::Error> { - let error_message = "Simulated failure during processing"; - let err = LibError::UnhandledError { - details: error_message.to_string(), - }; - let failure_report = err.as_encoded_report(vm_id); - - event!( - Level::ERROR, - health_report = %failure_report, - "Provisioning failed" - ); - - sleep(Duration::from_millis(100)).await; - Err(anyhow::anyhow!(error_message)) - } - - #[tokio::test] - async fn test_emit_kvp_layer() { - let temp_file = - NamedTempFile::new().expect("Failed to create tempfile"); - let temp_path = temp_file.path().to_path_buf(); - - let test_vm_id = "00000000-0000-0000-0000-000000000001"; - - let graceful_shutdown = CancellationToken::new(); - let kvp = - Kvp::new(temp_path.clone(), test_vm_id, graceful_shutdown.clone()) - .expect("Failed to create Kvp"); - - let subscriber = Registry::default().with(kvp.tracing_layer); - let default_guard = tracing::subscriber::set_default(subscriber); - - let _ = mock_provision(test_vm_id).await; - let _ = mock_failure_function(test_vm_id).await; - - drop(default_guard); - graceful_shutdown.cancel(); - kvp.writer - .await - .expect("KVP writer task panicked") - .expect("KVP writer task returned an IO error"); - - let contents = - std::fs::read(temp_path).expect("Failed to read temp file"); - println!("Contents of the file (in bytes):\n{contents:?}"); - - let slice_size = - 
HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE; - - let num_slices = contents.len().div_ceil(slice_size); - let expected_len = num_slices * slice_size; - - assert_eq!( - contents.len(), - expected_len, - "Encoded buffer length is incorrect. Expected {} but got {}", - expected_len, - contents.len() - ); - - let mut found_success = false; - let mut found_failure = false; - - for i in 0..num_slices { - let start = i * slice_size; - let end = start + slice_size; - let slice = &contents[start..end]; - - println!("Processing slice {i}: start={start}, end={end}"); - println!("Slice length: {}", slice.len()); - - let key_section = &slice[..HV_KVP_EXCHANGE_MAX_KEY_SIZE]; - let value_section = &slice[HV_KVP_EXCHANGE_MAX_KEY_SIZE..]; - - match decode_kvp_item(slice) { - Ok((key, value)) => { - println!("Decoded KVP - Key: {key}"); - println!("Decoded KVP - Value: {value}\n"); - - // Check for success or failure reports - if key == "PROVISIONING_REPORT" - && value.contains("result=success") - { - found_success = true; - } - - if key == "PROVISIONING_REPORT" - && value.contains("result=error") - { - found_failure = true; - } - } - Err(e) => { - panic!("Failed to decode KVP: {e}"); - } - } - - assert!( - key_section.iter().any(|&b| b != 0), - "Key section in slice {i} should contain non-zero bytes" - ); - - assert!( - value_section.iter().any(|&b| b != 0), - "Value section in slice {i} should contain non-zero bytes" - ); - } - - assert!( - found_success, - "Expected to find a 'result=success' entry but did not." - ); - assert!( - found_failure, - "Expected to find a 'result=error' entry but did not." 
- ); - } - - #[tokio::test] - async fn test_truncate_guest_pool_file() { - let temp_file = - NamedTempFile::new().expect("Failed to create tempfile"); - let temp_path = temp_file.path().to_path_buf(); - - std::fs::write(&temp_path, "Some initial data") - .expect("Failed to write initial data"); - - let result = truncate_guest_pool_file(&temp_path); - - assert!( - result.is_ok(), - "truncate_guest_pool_file returned an error: {result:?}", - ); - - if let Ok(contents) = std::fs::read_to_string(&temp_path) { - if contents.is_empty() { - println!("File was truncated as expected."); - } else { - println!("File was not truncated (this is expected if file has been truncated since boot)."); - } - } else { - panic!("Failed to read the temp file after truncation attempt."); - } - } - - #[test] - fn test_encode_kvp_item_value_length() { - let key = "test_key"; - let value = "A".repeat(HV_KVP_AZURE_MAX_VALUE_SIZE * 2 + 50); - - let encoded_slices = encode_kvp_item(key, &value); - - assert!( - !encoded_slices.is_empty(), - "Encoded slices should not be empty" - ); - - for (i, slice) in encoded_slices.iter().enumerate() { - assert_eq!( - slice.len(), - HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE, - "Slice {i} length is incorrect", - ); - - let (decoded_key, decoded_value) = - decode_kvp_item(slice).expect("Failed to decode slice"); - - println!("Slice {i}: Key: {decoded_key}"); - println!( - "Slice {i}: Value (length {}): {decoded_value}", - decoded_value.len() - ); - - assert_eq!(decoded_key, key, "Key mismatch in slice {i}"); - assert!( - decoded_value.len() <= HV_KVP_AZURE_MAX_VALUE_SIZE, - "Value length exceeds limit in slice {i}: {} > {HV_KVP_AZURE_MAX_VALUE_SIZE}", - decoded_value.len() - ); - } - - println!("All slices adhere to Azure's max value size limit."); - } - - #[tokio::test] - async fn test_emit_kvp_layer_disabled() { - let temp_file = - NamedTempFile::new().expect("Failed to create tempfile"); - let temp_path = temp_file.path().to_path_buf(); - 
- let test_vm_id = "00000000-0000-0000-0000-000000000002"; - - let telemetry_config = Telemetry { - kvp_diagnostics: false, - kvp_filter: None, - }; - - let config = Config { - telemetry: telemetry_config, - ..Default::default() - }; - - let kvp_enabled = config.telemetry.kvp_diagnostics; - - let graceful_shutdown = CancellationToken::new(); - let emit_kvp_layer = if kvp_enabled { - Some( - Kvp::new( - temp_path.clone(), - test_vm_id, - graceful_shutdown.clone(), - ) - .expect("Failed to create Kvp") - .tracing_layer, - ) - } else { - None - }; - - let subscriber = Registry::default().with(emit_kvp_layer); - let default_guard = tracing::subscriber::set_default(subscriber); - - let _ = mock_provision(test_vm_id).await; - - sleep(Duration::from_secs(1)).await; - - drop(default_guard); - - let contents = - std::fs::read(temp_path).expect("Failed to read temp file"); - - assert!( - contents.is_empty(), - "KVP file should be empty because kvp_diagnostics is disabled, but found data: {contents:?}", - ); - - println!("KVP file is empty as expected because kvp_diagnostics is disabled."); - } -} diff --git a/libazureinit/src/lib.rs b/libazureinit/src/lib.rs index 00c96283..209ce049 100644 --- a/libazureinit/src/lib.rs +++ b/libazureinit/src/lib.rs @@ -10,7 +10,6 @@ pub mod error; pub mod health; pub(crate) mod http; pub mod imds; -mod kvp; pub mod logging; pub mod media; diff --git a/libazureinit/src/logging.rs b/libazureinit/src/logging.rs index fb5c543a..7572e06b 100644 --- a/libazureinit/src/logging.rs +++ b/libazureinit/src/logging.rs @@ -5,23 +5,15 @@ use opentelemetry::{global, trace::TracerProvider}; use opentelemetry_sdk::trace::{self as sdktrace, Sampler, SdkTracerProvider}; use std::fs::{OpenOptions, Permissions}; use std::os::unix::fs::PermissionsExt; -use tokio::task::JoinHandle; -use tokio_util::sync::CancellationToken; use tracing::{event, Level, Subscriber}; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::fmt::format::FmtSpan; use 
tracing_subscriber::{ - filter::Filtered, fmt, layer::SubscriberExt, registry::LookupSpan, - EnvFilter, Layer, Registry, + fmt, layer::SubscriberExt, EnvFilter, Layer, Registry, }; use crate::config::Config; -use crate::kvp::{EmitKVPLayer, Kvp as KvpInternal}; - -pub type LoggingSetup = ( - Box, - Option>>, -); +pub use libazureinit_kvp::{Kvp, KvpOptions}; fn initialize_tracing() -> sdktrace::Tracer { let provider = SdkTracerProvider::builder() @@ -121,99 +113,6 @@ fn get_kvp_filter( } } -// Public KVP wrapper API for library consumers -struct KvpLayer(Filtered); - -/// Emit tracing data to the Hyper-V KVP. -/// -/// ## KVP Tracing Configuration -/// -/// The KVP tracing layer's filter can be configured at runtime by setting the -/// `AZURE_INIT_KVP_FILTER` environment variable. This allows any application -/// using this library to override the default filter and control which traces -/// are sent to the KVP pool. -/// -/// The value of the variable must be a string that follows the syntax for -/// `tracing_subscriber::EnvFilter`, which is a comma-separated list of -/// logging directives. For example: `warn,my_crate=debug` or `info,my_crate::api=trace`. -/// See `config.rs` for more details. -/// -/// The filter can also be configured via the `kvp_filter` field in the `Config` struct. -/// **Precedence**: Environment variable > Config field > Default filter. -/// If neither is set, a default filter tailored for `azure-init` (WARN level + specific modules) is used. -/// -/// If an invalid filter string is provided (via env or config), a warning is logged -/// and the default filter is used instead. 
-/// -/// # Example -/// -/// ```no_run -/// # use libazureinit::logging::Kvp; -/// use tracing_subscriber::layer::SubscriberExt; -/// -/// # #[tokio::main] -/// # async fn main() -> anyhow::Result<()> { -/// let mut kvp = Kvp::new("a-unique-id")?; -/// let registry = tracing_subscriber::Registry::default().with(kvp.layer()); -/// -/// // When it's time to shut down, doing this ensures all writes are flushed -/// kvp.halt().await?; -/// # Ok(()) -/// # } -/// ``` -pub struct Kvp { - layer: Option>, - /// The `JoinHandle` for the background task responsible for writing - /// KVP data to the file. The caller can use this handle to wait for - /// the writer to finish. - writer: JoinHandle>, - shutdown: CancellationToken, -} - -impl LookupSpan<'lookup>> Kvp { - /// Create a new tracing layer for KVP. - /// - /// Refer to [`libazureinit::get_vm_id`] to retrieve the VM's unique identifier. - pub fn new>(vm_id: T) -> Result { - let shutdown = CancellationToken::new(); - let inner = KvpInternal::new( - std::path::PathBuf::from("/var/lib/hyperv/.kvp_pool_1"), - vm_id.as_ref(), - shutdown.clone(), - )?; - - let kvp_filter = get_kvp_filter(None)?; - let layer = Some(KvpLayer(inner.tracing_layer.with_filter(kvp_filter))); - - Ok(Self { - layer, - writer: inner.writer, - shutdown, - }) - } - - /// Get a tracing [`Layer`] to use with a [`Registry`]. - /// - /// # Panics if this function is called more than once. - pub fn layer(&mut self) -> Filtered { - assert!( - self.layer.is_some(), - "Kvp::layer cannot be called multiple times!" - ); - self.layer.take().unwrap().0 - } - - /// Gracefully shut down the KVP writer. - /// - /// This will stop new KVP logs from being queued and wait for all pending writes to the KVP - /// pool to complete. After this returns, no further logs will be written to KVP. 
- pub async fn halt(self) -> Result<(), anyhow::Error> { - self.shutdown.cancel(); - self.writer.await??; - Ok(()) - } -} - /// Builds a `tracing` subscriber that can optionally write azure-init.log /// to a specific location if `Some(&Config)` is provided. /// @@ -227,8 +126,7 @@ impl LookupSpan<'lookup>> Kvp { pub fn setup_layers( vm_id: &str, config: &Config, - graceful_shutdown: CancellationToken, -) -> Result { +) -> Result, anyhow::Error> { let tracer = initialize_tracing(); let otel_layer = OpenTelemetryLayer::new(tracer).with_filter( EnvFilter::try_from_env("AZURE_INIT_LOG") @@ -237,22 +135,13 @@ pub fn setup_layers( let kvp_filter = get_kvp_filter(config.telemetry.kvp_filter.as_deref())?; - let (emit_kvp_layer, kvp_writer_handle) = if config - .telemetry - .kvp_diagnostics - { - match KvpInternal::new( - std::path::PathBuf::from("/var/lib/hyperv/.kvp_pool_1"), - vm_id, - graceful_shutdown, - ) { - Ok(kvp) => { - let layer = kvp.tracing_layer.with_filter(kvp_filter); - (Some(layer), Some(kvp.writer)) - } + let emit_kvp_layer = if config.telemetry.kvp_diagnostics { + let options = KvpOptions::default().vm_id(vm_id); + match Kvp::with_options(options) { + Ok(kvp) => Some(kvp.tracing_layer.with_filter(kvp_filter)), Err(e) => { event!(Level::ERROR, "Failed to initialize Kvp: {}. Continuing without KVP logging.", e); - (None, None) + None } } } else { @@ -260,7 +149,7 @@ pub fn setup_layers( Level::INFO, "Hyper-V KVP diagnostics are disabled via config. It is recommended to be enabled for support purposes." 
); - (None, None) + None }; let stderr_layer = fmt::layer() @@ -315,7 +204,7 @@ pub fn setup_layers( .with(emit_kvp_layer) .with(file_layer); - Ok((Box::new(subscriber), kvp_writer_handle)) + Ok(Box::new(subscriber)) } #[cfg(test)] @@ -531,10 +420,9 @@ mod tests { std::env::remove_var("AZURE_INIT_KVP_FILTER"); } - #[tokio::test] + #[test] #[serial] - async fn test_azure_init_log() { - // Redirect stderr to a buffer to keep the main test output clean from ERROR logs. + fn test_azure_init_log() { let _buf = BufferRedirect::stderr().unwrap(); let log_file = NamedTempFile::new().expect("Failed to create tempfile"); @@ -545,11 +433,9 @@ mod tests { config.telemetry.kvp_diagnostics = false; let vm_id = "test-vm-id-for-logging"; - let graceful_shutdown = CancellationToken::new(); - let (subscriber, _kvp_handle) = - setup_layers(vm_id, &config, graceful_shutdown.clone()) - .expect("Failed to setup layers"); + let subscriber = + setup_layers(vm_id, &config).expect("Failed to setup layers"); tracing::subscriber::with_default(subscriber, || { tracing::trace!( @@ -569,8 +455,7 @@ mod tests { ); }); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - graceful_shutdown.cancel(); + std::thread::sleep(std::time::Duration::from_millis(100)); let log_contents = std::fs::read_to_string(&log_path) .expect("Failed to read log file"); @@ -592,21 +477,18 @@ mod tests { && line.contains("should be logged to the file"))); } - #[tokio::test] + #[test] #[serial] - async fn test_stderr_logger_defaults_to_error() { + fn test_stderr_logger_defaults_to_error() { let mut config = Config::default(); config.telemetry.kvp_diagnostics = false; let test_vm_id = "00000000-0000-0000-0000-000000000000"; - let graceful_shutdown = CancellationToken::new(); - // Redirect stderr to a buffer let mut buf = BufferRedirect::stderr().unwrap(); - let (subscriber, _kvp_handle) = - setup_layers(test_vm_id, &config, graceful_shutdown.clone()) - .expect("Failed to setup layers"); + let subscriber = + 
setup_layers(test_vm_id, &config).expect("Failed to setup layers"); tracing::subscriber::with_default(subscriber, || { tracing::info!( @@ -620,14 +502,13 @@ mod tests { ); }); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - graceful_shutdown.cancel(); + std::thread::sleep(std::time::Duration::from_millis(100)); let mut stderr_contents = String::new(); buf.read_to_string(&mut stderr_contents) .expect("Failed to read from stderr buffer"); - drop(buf); // release stderr + drop(buf); assert!(!stderr_contents.contains("This is an info message")); assert!(!stderr_contents.contains("This is a warn message")); diff --git a/src/main.rs b/src/main.rs index 2b3812f5..2915b4cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,6 @@ use libazureinit::{ use std::process::ExitCode; use std::time::Duration; use sysinfo::System; -use tokio_util::sync::CancellationToken; use tracing::instrument; use tracing_subscriber::{prelude::*, Layer}; @@ -249,8 +248,6 @@ async fn main() -> ExitCode { return ExitCode::SUCCESS; } - let graceful_shutdown = CancellationToken::new(); - let temp_layer = tracing_subscriber::fmt::layer() .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NONE) .with_writer(std::io::stderr) @@ -264,17 +261,16 @@ async fn main() -> ExitCode { let setup_result = tracing::subscriber::with_default(temp_subscriber, || { let config = Config::load(opts.config.clone())?; - let (subscriber, rx) = - setup_layers(&vm_id, &config, graceful_shutdown.clone())?; + let subscriber = setup_layers(&vm_id, &config)?; if let Err(e) = tracing::subscriber::set_global_default(subscriber) { eprintln!("Failed to set global default subscriber: {e}"); } - Ok::<_, anyhow::Error>((config, rx)) + Ok::<_, anyhow::Error>(config) }); - let (config, kvp_completion_rx) = match setup_result { - Ok((config, rx)) => (config, rx), + let config = match setup_result { + Ok(config) => config, Err(error) => { eprintln!("Failed to load configuration: {error:?}"); 
eprintln!("Example configuration:\n\n{}", Config::default()); @@ -372,24 +368,6 @@ async fn main() -> ExitCode { } }; - if let Some(handle) = kvp_completion_rx { - graceful_shutdown.cancel(); - - match handle.await { - Ok(Ok(_)) => { - tracing::info!("KVP writer task finished successfully."); - } - Ok(Err(io_err)) => { - tracing::warn!( - "KVP writer task finished with an IO error: {:?}", - io_err - ); - } - Err(join_err) => { - tracing::warn!("KVP writer task panicked: {:?}", join_err); - } - } - } exit_code }