diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ad7183..c8b1b66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ Add auth scheme and write option support. Preserve explicit ports and strip userinfo from normalized hosts. Remove the legacy `bucket` and `INFLUX_BUCKET` aliases in favor of `database` and `INFLUX_DATABASE`. +2. [#19](https://github.com/InfluxCommunity/influxdb3-rust/pull/19): Default writes to the V2 API endpoint. Add builder methods for write defaults. + `no_sync` requires `use_v2_api=false` to write to the V3 API endpoint. + `accept_partial` applies only when writes are sent to the V3 API endpoint and is ignored otherwise. ### Bug Fixes diff --git a/README.md b/README.md index 0666fc0..1e47268 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ Or add it to `Cargo.toml`: ```toml [dependencies] -influxdb3-client = "0.1" +influxdb3-client = "0.2" tokio = { version = "1", features = ["full"] } ``` @@ -61,7 +61,7 @@ The optional `polars` feature adds DataFrame writes and query-to-DataFrame conversion: ```toml -influxdb3-client = { version = "0.1", features = ["polars"] } +influxdb3-client = { version = "0.2", features = ["polars"] } ``` ## Configuring a client @@ -93,6 +93,18 @@ environment: let client = influxdb3_client::Client::from_env().await?; ``` +Optional environment variables configure the same write defaults used by the +builder: + +- `INFLUX_AUTH_SCHEME`: authentication scheme, such as `Bearer` or `Token`. +- `INFLUX_ORG`: organization name for V2 write compatibility. +- `INFLUX_PRECISION`: write precision (`ns`, `us`, `ms`, or `s`). +- `INFLUX_GZIP_THRESHOLD`: gzip write bodies larger than this many bytes. +- `INFLUX_WRITE_NO_SYNC`: skip WAL synchronization on V3 writes; requires + `INFLUX_WRITE_USE_V2_API=false`. +- `INFLUX_WRITE_ACCEPT_PARTIAL`: allow partial success on V3 writes. +- `INFLUX_WRITE_USE_V2_API`: use the V2 write endpoint. + Or parse a connection string: ```rust @@ -101,6 +113,10 @@ let client = influxdb3_client::Client::from_connection_string( ).await?; ``` +Connection strings support the matching query parameters: `token`, `database`, +`org`, `authScheme`, `precision`, `gzipThreshold`, `writeNoSync`, +`writeAcceptPartial`, and `writeUseV2Api`. + The Arrow Flight channel used for queries is opened lazily on the first query, so constructing a client never blocks on query connectivity. @@ -142,18 +158,26 @@ client.write(points) .batch_size(10_000) // points per HTTP request .max_inflight(8) // concurrent in-flight requests .default_tag("region", "us-east") - .no_sync() // acknowledge before WAL sync + .tag_order(["region", "host"]) .await?; ``` Large inputs are split into batches and sent as multiple pipelined requests; one batch buffer is held in memory at a time. +The first write defines physical tag column order, which can affect query +performance. Use `.tag_order(...)` to serialize frequently filtered tags first. +Listed tags are emitted first when present; remaining tags are appended in +deterministic lexicographic order. For background, see +[Sort tags by query priority](https://docs.influxdata.com/influxdb3/core/write-data/best-practices/optimize-writes/#sort-tags-by-query-priority). + ### High-throughput ingest For sustained, high-volume writes the throughput levers are `batch_size` (points -per request), `max_inflight` (concurrent requests per call), and `no_sync()` -(acknowledge before the WAL is synced, trading durability for speed). +per request) and `max_inflight` (concurrent requests per call). Writes use the +V2 endpoint by default. Set `use_v2_api=false` to use the V3 endpoint, where +`no_sync()` can acknowledge writes before the WAL is synced, trading durability +for speed. A single `write` call serialises its batches on one task. To use more CPU cores and connections, run several `write` calls concurrently. `Client` is cheap to @@ -177,7 +201,7 @@ for chunk in chunks { // each chunk is a Vec .write(chunk) .batch_size(10_000) .max_inflight(8) - .no_sync() + .no_sync() // V3 endpoint only .await }); } @@ -280,12 +304,26 @@ Set a default policy for all requests with `ClientConfig::builder().retry(...)`. ### Partial writes -When a batch contains invalid lines, the server accepts the valid ones and -reports the rest. This surfaces as `Error::PartialWrite`, which lists the -rejected lines: +Partial writes apply when writes use the V3 `/api/v3/write_lp` endpoint +(`use_v2_api=false`). When a batch contains invalid lines, the server accepts +the valid ones and reports the rest. This surfaces as `Error::PartialWrite`, +which lists the rejected lines: ```rust -use influxdb3_client::Error; +use influxdb3_client::{Client, ClientConfig, Error}; + +let client = Client::new( + ClientConfig::builder() + .host("http://localhost:8181") + .token("token") + .database("db") + .write_use_v2_api(false) + .build()?, +) +.await?; + +let line_protocol = + "home,room=Sunroom temp=96 1735545600\nhome,room=Sunroom temp=\"hi\" 1735549200"; if let Err(Error::PartialWrite(e)) = client.write(line_protocol).await { for line_error in &e.line_errors { @@ -294,6 +332,19 @@ if let Err(Error::PartialWrite(e)) = client.write(line_protocol).await { } ``` +Set `accept_partial` to `false` in `WriteOptions` to reject the full batch when +any line fails. + +### Write API compatibility + +Writes use the V2 `/api/v2/write` endpoint by default for compatibility with +InfluxDB Clustered and InfluxDB Cloud Dedicated/Serverless. + +Set `use_v2_api` to `false`, set `INFLUX_WRITE_USE_V2_API=false`, or use +`writeUseV2Api=false` in a connection string to send writes through the V3 +endpoint. The V3 endpoint supports `accept_partial` and `no_sync`; those options +are not sent when the V2 endpoint is used. + ## Polars integration With the `polars` feature, write a DataFrame directly and read query results back @@ -366,6 +417,11 @@ INFLUX_HOST=http://localhost:8181 INFLUX_TOKEN=token INFLUX_DATABASE=mydb \ cargo run --example quickstart ``` +## Feedback + +For bugs and feature requests, open an issue in +[InfluxCommunity/influxdb3-rust](https://github.com/InfluxCommunity/influxdb3-rust/issues). + ## Contributing Contributions are welcome. To build and check locally: diff --git a/src/client.rs b/src/client.rs index cabdfec..9ed3aa9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -53,8 +53,9 @@ impl Client { Client::new(ClientConfig::from_connection_string(cs)?).await } - /// Read `INFLUX_HOST`, `INFLUX_TOKEN`, and `INFLUX_DATABASE` from the - /// environment and create a client. + /// Read configuration from environment variables and create a client. + /// + /// See [`ClientConfig::from_env`] for the supported variables. pub async fn from_env() -> Result { Client::new(ClientConfig::from_env()?).await } @@ -290,6 +291,7 @@ impl<'a, W: WriteInput + Send + 'a> IntoFuture for WriteRequest<'a, W> { let options = self.options; let policy = self.retry.unwrap_or_else(|| client.config.retry.clone()); Box::pin(async move { + options.validate()?; let max_inflight = options.max_inflight.max(1); let batches = data.into_lp_batches(&options); diff --git a/src/config.rs b/src/config.rs index 9ab77d8..c98b885 100644 --- a/src/config.rs +++ b/src/config.rs @@ -24,7 +24,7 @@ pub struct ClientConfig { /// Database for all operations. Required; validated at construction time. pub database: String, - /// Organization name (used for v2 API compatibility). + /// Organization name (used for V2 API compatibility). pub org: Option, /// Default write options applied to every write call. @@ -96,7 +96,7 @@ impl ClientConfig { /// - `INFLUX_GZIP_THRESHOLD` - gzip threshold in bytes. /// - `INFLUX_WRITE_NO_SYNC` - skip WAL synchronization for writes. /// - `INFLUX_WRITE_ACCEPT_PARTIAL` - accept partial writes. - /// - `INFLUX_WRITE_USE_V2_API` - use the v2 write endpoint. + /// - `INFLUX_WRITE_USE_V2_API` - use the V2 write endpoint. pub fn from_env() -> Result { let host = std::env::var("INFLUX_HOST").map_err(|_| Error::EnvVar("INFLUX_HOST".into()))?; let database = std::env::var("INFLUX_DATABASE") @@ -149,7 +149,7 @@ impl ClientConfig { /// - `gzipThreshold` - gzip threshold in bytes. /// - `writeNoSync` - skip WAL synchronization for writes. /// - `writeAcceptPartial` - accept partial writes. - /// - `writeUseV2Api` - use the v2 write endpoint. + /// - `writeUseV2Api` - use the V2 write endpoint. pub fn from_connection_string(cs: &str) -> Result { let url = Url::parse(cs)?; let mut host_url = url.clone(); @@ -294,6 +294,24 @@ impl ClientConfigBuilder { self } + /// Set whether writes use the V2 `/api/v2/write` endpoint by default. + pub fn write_use_v2_api(mut self, use_v2_api: bool) -> Self { + self.cfg.write_options.use_v2_api = use_v2_api; + self + } + + /// Set whether V3 writes can partially succeed when some lines fail. + pub fn write_accept_partial(mut self, accept_partial: bool) -> Self { + self.cfg.write_options.accept_partial = accept_partial; + self + } + + /// Set whether V3 writes skip WAL synchronization by default. + pub fn write_no_sync(mut self, no_sync: bool) -> Self { + self.cfg.write_options.no_sync = no_sync; + self + } + /// Set the default retry policy for transient write/query failures. pub fn retry(mut self, retry: RetryConfig) -> Self { self.cfg.retry = retry; @@ -351,6 +369,7 @@ impl ClientConfigBuilder { if self.cfg.database.is_empty() { return Err(Error::Config("database is required".into())); } + self.cfg.write_options.validate()?; for (key, value) in self.pending_headers { let name = HeaderName::from_bytes(key.as_bytes()) diff --git a/src/write.rs b/src/write.rs index 0270e0d..99d3c08 100644 --- a/src/write.rs +++ b/src/write.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::{point::Point, precision::Precision}; +use crate::{error::Error, point::Point, precision::Precision}; /// Options controlling a single write operation. /// @@ -32,7 +32,7 @@ pub struct WriteOptions { /// When `true`, a batch is accepted even if some lines are invalid. pub accept_partial: bool, - /// When `true`, use the v2 (`/api/v2/write`) endpoint instead of v3. + /// When `true`, use the V2 (`/api/v2/write`) endpoint instead of V3. pub use_v2_api: bool, /// Optional tag ordering for deterministic line-protocol output. @@ -66,7 +66,7 @@ impl Default for WriteOptions { gzip_threshold: Some(1024), no_sync: false, accept_partial: true, - use_v2_api: false, + use_v2_api: true, tag_order: Vec::new(), batch_size: DEFAULT_BATCH_SIZE, max_inflight: DEFAULT_MAX_INFLIGHT, @@ -74,6 +74,17 @@ impl Default for WriteOptions { } } +impl WriteOptions { + pub(crate) fn validate(&self) -> Result<(), Error> { + if self.use_v2_api && self.no_sync { + return Err(Error::Config( + "invalid write options: no_sync requires use_v2_api=false".into(), + )); + } + Ok(()) + } +} + /// A type that can be lazily serialised to InfluxDB line protocol for writing. /// /// Pass anything that implements this trait to [`crate::Client::write`]. diff --git a/tests/config_tests.rs b/tests/config_tests.rs index d546e0d..ed23cd3 100644 --- a/tests/config_tests.rs +++ b/tests/config_tests.rs @@ -21,6 +21,39 @@ fn construction_and_connection_string() { .unwrap_err(); assert!(err.to_string().contains("invalid"), "got: {err}"); + // Writes use the V2 compatibility endpoint by default. + let cfg = ClientConfig::builder() + .host("http://localhost") + .database("db") + .build() + .unwrap(); + assert!(cfg.write_options.use_v2_api); + + // Builder exposes common write defaults without constructing WriteOptions. + let cfg = ClientConfig::builder() + .host("http://localhost") + .database("db") + .write_no_sync(true) + .write_accept_partial(false) + .write_use_v2_api(false) + .build() + .unwrap(); + assert!(cfg.write_options.no_sync); + assert!(!cfg.write_options.accept_partial); + assert!(!cfg.write_options.use_v2_api); + + let err = ClientConfig::builder() + .host("http://localhost") + .database("db") + .write_no_sync(true) + .build() + .unwrap_err(); + assert!( + err.to_string() + .contains("no_sync requires use_v2_api=false"), + "got: {err}" + ); + // Connection string, full form. let cfg = ClientConfig::from_connection_string( "https://cluster.example.io/?token=TOK&database=DB&org=ORG", @@ -58,7 +91,7 @@ fn construction_and_connection_string() { let cfg = ClientConfig::from_connection_string( "https://cluster.example.io/?token=TOK&database=DB&authScheme=Token\ &precision=ms&gzipThreshold=64&writeNoSync=true\ - &writeAcceptPartial=false&writeUseV2Api=true", + &writeAcceptPartial=false&writeUseV2Api=false", ) .unwrap(); assert_eq!(cfg.auth_scheme, "Token"); @@ -66,7 +99,7 @@ fn construction_and_connection_string() { assert_eq!(cfg.write_options.gzip_threshold, Some(64)); assert!(cfg.write_options.no_sync); assert!(!cfg.write_options.accept_partial); - assert!(cfg.write_options.use_v2_api); + assert!(!cfg.write_options.use_v2_api); for precision in [ "ns", @@ -88,6 +121,7 @@ fn construction_and_connection_string() { "https://h/?token=T&database=db&writeNoSync=invalid", "https://h/?token=T&database=db&writeAcceptPartial=invalid", "https://h/?token=T&database=db&writeUseV2Api=invalid", + "https://h/?token=T&database=db&writeNoSync=true&writeUseV2Api=true", ] { let err = ClientConfig::from_connection_string(cs).unwrap_err(); assert!( @@ -129,7 +163,7 @@ fn from_env() { std::env::set_var("INFLUX_GZIP_THRESHOLD", "64"); std::env::set_var("INFLUX_WRITE_NO_SYNC", "true"); std::env::set_var("INFLUX_WRITE_ACCEPT_PARTIAL", "false"); - std::env::set_var("INFLUX_WRITE_USE_V2_API", "true"); + std::env::set_var("INFLUX_WRITE_USE_V2_API", "false"); std::env::set_var("INFLUX_ORG", "env-org"); let cfg = ClientConfig::from_env().unwrap(); assert_eq!(cfg.host_url(), "https://env-host"); @@ -141,7 +175,16 @@ fn from_env() { assert_eq!(cfg.write_options.gzip_threshold, Some(64)); assert!(cfg.write_options.no_sync); assert!(!cfg.write_options.accept_partial); - assert!(cfg.write_options.use_v2_api); + assert!(!cfg.write_options.use_v2_api); + + std::env::set_var("INFLUX_WRITE_USE_V2_API", "true"); + let err = ClientConfig::from_env().unwrap_err(); + assert!( + err.to_string() + .contains("no_sync requires use_v2_api=false"), + "got: {err}" + ); + std::env::set_var("INFLUX_WRITE_USE_V2_API", "false"); for (name, value) in [ ("INFLUX_PRECISION", "invalid"), diff --git a/tests/retry_tests.rs b/tests/retry_tests.rs index 6d52727..af6198b 100644 --- a/tests/retry_tests.rs +++ b/tests/retry_tests.rs @@ -10,6 +10,7 @@ async fn make_client(server: &Server) -> Client { .host(server.url()) .database("testdb") .token("test-token") + .write_use_v2_api(false) .build() .unwrap(), ) diff --git a/tests/write_tests.rs b/tests/write_tests.rs index e25ffb2..688a8b2 100644 --- a/tests/write_tests.rs +++ b/tests/write_tests.rs @@ -8,6 +8,7 @@ async fn make_client(server: &Server) -> Client { .host(server.url()) .database("testdb") .token("test-token") + .write_use_v2_api(false) .build() .unwrap(), ) @@ -17,7 +18,7 @@ async fn make_client(server: &Server) -> Client { #[tokio::test] async fn lp_string_with_overrides() { - // Covers: v3 endpoint, db param, auth header, content-type, + // Covers: V3 endpoint, db param, auth header, content-type, // precision + no_sync overrides reaching the URL. let mut server = Server::new_async().await; let _m = server @@ -58,11 +59,42 @@ async fn v2_write_uses_bucket_query_parameter() { .create_async() .await; - let client = make_client(&server).await; - client.write("cpu usage=1.0").use_v2_api().await.unwrap(); + let client = Client::new( + ClientConfig::builder() + .host(server.url()) + .database("testdb") + .token("test-token") + .build() + .unwrap(), + ) + .await + .unwrap(); + client.write("cpu usage=1.0").await.unwrap(); m.assert_async().await; } +#[tokio::test] +async fn no_sync_requires_v3_endpoint() { + let server = Server::new_async().await; + let client = Client::new( + ClientConfig::builder() + .host(server.url()) + .database("testdb") + .token("test-token") + .build() + .unwrap(), + ) + .await + .unwrap(); + + let err = client.write("cpu usage=1.0").no_sync().await.unwrap_err(); + assert!( + err.to_string() + .contains("no_sync requires use_v2_api=false"), + "got: {err}" + ); +} + #[tokio::test] async fn points_batch_splitting() { // 5 points at batch_size=2 means 3 sequential requests.