Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
Add auth scheme and write option support.
Preserve explicit ports and strip userinfo from normalized hosts.
Remove the legacy `bucket` and `INFLUX_BUCKET` aliases in favor of `database` and `INFLUX_DATABASE`.
2. [#19](https://github.com/InfluxCommunity/influxdb3-rust/pull/19): Default writes to the V2 API endpoint. Add builder methods for write defaults.
`no_sync` requires `use_v2_api=false` to write to the V3 API endpoint.
`accept_partial` applies only when writes are sent to the V3 API endpoint and is ignored otherwise.

### Bug Fixes

Expand Down
76 changes: 66 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ Or add it to `Cargo.toml`:

```toml
[dependencies]
influxdb3-client = "0.1"
influxdb3-client = "0.2"
tokio = { version = "1", features = ["full"] }
```

The optional `polars` feature adds DataFrame writes and query-to-DataFrame
conversion:

```toml
influxdb3-client = { version = "0.1", features = ["polars"] }
influxdb3-client = { version = "0.2", features = ["polars"] }
```

## Configuring a client
Expand Down Expand Up @@ -93,6 +93,18 @@ environment:
let client = influxdb3_client::Client::from_env().await?;
```

Optional environment variables configure the same write defaults used by the
builder:

- `INFLUX_AUTH_SCHEME`: authentication scheme, such as `Bearer` or `Token`.
- `INFLUX_ORG`: organization name for V2 write compatibility.
- `INFLUX_PRECISION`: write precision (`ns`, `us`, `ms`, or `s`).
- `INFLUX_GZIP_THRESHOLD`: gzip write bodies larger than this many bytes.
- `INFLUX_WRITE_NO_SYNC`: skip WAL synchronization on V3 writes; requires
`INFLUX_WRITE_USE_V2_API=false`.
- `INFLUX_WRITE_ACCEPT_PARTIAL`: allow partial success on V3 writes.
- `INFLUX_WRITE_USE_V2_API`: use the V2 write endpoint.

Or parse a connection string:

```rust
Expand All @@ -101,6 +113,10 @@ let client = influxdb3_client::Client::from_connection_string(
).await?;
```

Connection strings support the matching query parameters: `token`, `database`,
`org`, `authScheme`, `precision`, `gzipThreshold`, `writeNoSync`,
`writeAcceptPartial`, and `writeUseV2Api`.

The Arrow Flight channel used for queries is opened lazily on the first query,
so constructing a client never blocks on query connectivity.

Expand Down Expand Up @@ -142,18 +158,26 @@ client.write(points)
.batch_size(10_000) // points per HTTP request
.max_inflight(8) // concurrent in-flight requests
.default_tag("region", "us-east")
.no_sync() // acknowledge before WAL sync
.tag_order(["region", "host"])
.await?;
```

Large inputs are split into batches and sent as multiple pipelined requests; one
batch buffer is held in memory at a time.

The first write defines physical tag column order, which can affect query
performance. Use `.tag_order(...)` to serialize frequently filtered tags first.
Listed tags are emitted first when present; remaining tags are appended in
deterministic lexicographic order. For background, see
[Sort tags by query priority](https://docs.influxdata.com/influxdb3/core/write-data/best-practices/optimize-writes/#sort-tags-by-query-priority).

### High-throughput ingest

For sustained, high-volume writes the throughput levers are `batch_size` (points
per request), `max_inflight` (concurrent requests per call), and `no_sync()`
(acknowledge before the WAL is synced, trading durability for speed).
per request) and `max_inflight` (concurrent requests per call). Writes use the
V2 endpoint by default. Set `use_v2_api=false` to use the V3 endpoint, where
`no_sync()` can acknowledge writes before the WAL is synced, trading durability
for speed.

A single `write` call serialises its batches on one task. To use more CPU cores
and connections, run several `write` calls concurrently. `Client` is cheap to
Expand All @@ -177,7 +201,7 @@ for chunk in chunks { // each chunk is a Vec<Point>
.write(chunk)
.batch_size(10_000)
.max_inflight(8)
.no_sync()
.no_sync() // V3 endpoint only
.await
});
}
Expand Down Expand Up @@ -280,12 +304,26 @@ Set a default policy for all requests with `ClientConfig::builder().retry(...)`.

### Partial writes

When a batch contains invalid lines, the server accepts the valid ones and
reports the rest. This surfaces as `Error::PartialWrite`, which lists the
rejected lines:
Partial writes apply when writes use the V3 `/api/v3/write_lp` endpoint
(`use_v2_api=false`). When a batch contains invalid lines, the server accepts
the valid ones and reports the rest. This surfaces as `Error::PartialWrite`,
which lists the rejected lines:

```rust
use influxdb3_client::Error;
use influxdb3_client::{Client, ClientConfig, Error};

let client = Client::new(
ClientConfig::builder()
.host("http://localhost:8181")
.token("token")
.database("db")
.write_use_v2_api(false)
.build()?,
)
.await?;

let line_protocol =
"home,room=Sunroom temp=96 1735545600\nhome,room=Sunroom temp=\"hi\" 1735549200";

if let Err(Error::PartialWrite(e)) = client.write(line_protocol).await {
for line_error in &e.line_errors {
Expand All @@ -294,6 +332,19 @@ if let Err(Error::PartialWrite(e)) = client.write(line_protocol).await {
}
```

Set `accept_partial` to `false` in `WriteOptions` to reject the full batch when
any line fails.

### Write API compatibility

Writes use the V2 `/api/v2/write` endpoint by default for compatibility with
InfluxDB Clustered and InfluxDB Cloud Dedicated/Serverless.

Set `use_v2_api` to `false`, set `INFLUX_WRITE_USE_V2_API=false`, or use
`writeUseV2Api=false` in a connection string to send writes through the V3
endpoint. The V3 endpoint supports `accept_partial` and `no_sync`; those options
are not sent when the V2 endpoint is used.

## Polars integration

With the `polars` feature, write a DataFrame directly and read query results back
Expand Down Expand Up @@ -366,6 +417,11 @@ INFLUX_HOST=http://localhost:8181 INFLUX_TOKEN=token INFLUX_DATABASE=mydb \
cargo run --example quickstart
```

## Feedback

For bugs and feature requests, open an issue in
[InfluxCommunity/influxdb3-rust](https://github.com/InfluxCommunity/influxdb3-rust/issues).

## Contributing

Contributions are welcome. To build and check locally:
Expand Down
6 changes: 4 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ impl Client {
Client::new(ClientConfig::from_connection_string(cs)?).await
}

/// Read `INFLUX_HOST`, `INFLUX_TOKEN`, and `INFLUX_DATABASE` from the
/// environment and create a client.
/// Read configuration from environment variables and create a client.
///
/// See [`ClientConfig::from_env`] for the supported variables.
pub async fn from_env() -> Result<Self> {
Client::new(ClientConfig::from_env()?).await
}
Expand Down Expand Up @@ -290,6 +291,7 @@ impl<'a, W: WriteInput + Send + 'a> IntoFuture for WriteRequest<'a, W> {
let options = self.options;
let policy = self.retry.unwrap_or_else(|| client.config.retry.clone());
Box::pin(async move {
options.validate()?;
let max_inflight = options.max_inflight.max(1);
let batches = data.into_lp_batches(&options);

Expand Down
25 changes: 22 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct ClientConfig {
/// Database for all operations. Required; validated at construction time.
pub database: String,

/// Organization name (used for v2 API compatibility).
/// Organization name (used for V2 API compatibility).
pub org: Option<String>,

/// Default write options applied to every write call.
Expand Down Expand Up @@ -96,7 +96,7 @@ impl ClientConfig {
/// - `INFLUX_GZIP_THRESHOLD` - gzip threshold in bytes.
/// - `INFLUX_WRITE_NO_SYNC` - skip WAL synchronization for writes.
/// - `INFLUX_WRITE_ACCEPT_PARTIAL` - accept partial writes.
/// - `INFLUX_WRITE_USE_V2_API` - use the v2 write endpoint.
/// - `INFLUX_WRITE_USE_V2_API` - use the V2 write endpoint.
pub fn from_env() -> Result<Self, Error> {
let host = std::env::var("INFLUX_HOST").map_err(|_| Error::EnvVar("INFLUX_HOST".into()))?;
let database = std::env::var("INFLUX_DATABASE")
Expand Down Expand Up @@ -149,7 +149,7 @@ impl ClientConfig {
/// - `gzipThreshold` - gzip threshold in bytes.
/// - `writeNoSync` - skip WAL synchronization for writes.
/// - `writeAcceptPartial` - accept partial writes.
/// - `writeUseV2Api` - use the v2 write endpoint.
/// - `writeUseV2Api` - use the V2 write endpoint.
pub fn from_connection_string(cs: &str) -> Result<Self, Error> {
let url = Url::parse(cs)?;
let mut host_url = url.clone();
Expand Down Expand Up @@ -294,6 +294,24 @@ impl ClientConfigBuilder {
self
}

/// Set whether writes use the V2 `/api/v2/write` endpoint by default.
pub fn write_use_v2_api(mut self, use_v2_api: bool) -> Self {
self.cfg.write_options.use_v2_api = use_v2_api;
self
}

/// Set whether V3 writes can partially succeed when some lines fail.
pub fn write_accept_partial(mut self, accept_partial: bool) -> Self {
self.cfg.write_options.accept_partial = accept_partial;
self
}

/// Set whether V3 writes skip WAL synchronization by default.
pub fn write_no_sync(mut self, no_sync: bool) -> Self {
self.cfg.write_options.no_sync = no_sync;
self
}

/// Set the default retry policy for transient write/query failures.
pub fn retry(mut self, retry: RetryConfig) -> Self {
self.cfg.retry = retry;
Expand Down Expand Up @@ -351,6 +369,7 @@ impl ClientConfigBuilder {
if self.cfg.database.is_empty() {
return Err(Error::Config("database is required".into()));
}
self.cfg.write_options.validate()?;

for (key, value) in self.pending_headers {
let name = HeaderName::from_bytes(key.as_bytes())
Expand Down
17 changes: 14 additions & 3 deletions src/write.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use crate::{point::Point, precision::Precision};
use crate::{error::Error, point::Point, precision::Precision};

/// Options controlling a single write operation.
///
Expand Down Expand Up @@ -32,7 +32,7 @@ pub struct WriteOptions {
/// When `true`, a batch is accepted even if some lines are invalid.
pub accept_partial: bool,

/// When `true`, use the v2 (`/api/v2/write`) endpoint instead of v3.
/// When `true`, use the V2 (`/api/v2/write`) endpoint instead of V3.
pub use_v2_api: bool,

/// Optional tag ordering for deterministic line-protocol output.
Expand Down Expand Up @@ -66,14 +66,25 @@ impl Default for WriteOptions {
gzip_threshold: Some(1024),
no_sync: false,
accept_partial: true,
use_v2_api: false,
use_v2_api: true,
tag_order: Vec::new(),
batch_size: DEFAULT_BATCH_SIZE,
max_inflight: DEFAULT_MAX_INFLIGHT,
}
}
}

impl WriteOptions {
pub(crate) fn validate(&self) -> Result<(), Error> {
if self.use_v2_api && self.no_sync {
return Err(Error::Config(
"invalid write options: no_sync requires use_v2_api=false".into(),
));
}
Ok(())
}
}

/// A type that can be lazily serialised to InfluxDB line protocol for writing.
///
/// Pass anything that implements this trait to [`crate::Client::write`].
Expand Down
51 changes: 47 additions & 4 deletions tests/config_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,39 @@ fn construction_and_connection_string() {
.unwrap_err();
assert!(err.to_string().contains("invalid"), "got: {err}");

// Writes use the V2 compatibility endpoint by default.
let cfg = ClientConfig::builder()
.host("http://localhost")
.database("db")
.build()
.unwrap();
assert!(cfg.write_options.use_v2_api);

// Builder exposes common write defaults without constructing WriteOptions.
let cfg = ClientConfig::builder()
.host("http://localhost")
.database("db")
.write_no_sync(true)
.write_accept_partial(false)
.write_use_v2_api(false)
.build()
.unwrap();
assert!(cfg.write_options.no_sync);
assert!(!cfg.write_options.accept_partial);
assert!(!cfg.write_options.use_v2_api);

let err = ClientConfig::builder()
.host("http://localhost")
.database("db")
.write_no_sync(true)
.build()
.unwrap_err();
assert!(
err.to_string()
.contains("no_sync requires use_v2_api=false"),
"got: {err}"
);

// Connection string, full form.
let cfg = ClientConfig::from_connection_string(
"https://cluster.example.io/?token=TOK&database=DB&org=ORG",
Expand Down Expand Up @@ -58,15 +91,15 @@ fn construction_and_connection_string() {
let cfg = ClientConfig::from_connection_string(
"https://cluster.example.io/?token=TOK&database=DB&authScheme=Token\
&precision=ms&gzipThreshold=64&writeNoSync=true\
&writeAcceptPartial=false&writeUseV2Api=true",
&writeAcceptPartial=false&writeUseV2Api=false",
)
.unwrap();
assert_eq!(cfg.auth_scheme, "Token");
assert_eq!(cfg.write_options.precision, Precision::Millisecond);
assert_eq!(cfg.write_options.gzip_threshold, Some(64));
assert!(cfg.write_options.no_sync);
assert!(!cfg.write_options.accept_partial);
assert!(cfg.write_options.use_v2_api);
assert!(!cfg.write_options.use_v2_api);

for precision in [
"ns",
Expand All @@ -88,6 +121,7 @@ fn construction_and_connection_string() {
"https://h/?token=T&database=db&writeNoSync=invalid",
"https://h/?token=T&database=db&writeAcceptPartial=invalid",
"https://h/?token=T&database=db&writeUseV2Api=invalid",
"https://h/?token=T&database=db&writeNoSync=true&writeUseV2Api=true",
] {
let err = ClientConfig::from_connection_string(cs).unwrap_err();
assert!(
Expand Down Expand Up @@ -129,7 +163,7 @@ fn from_env() {
std::env::set_var("INFLUX_GZIP_THRESHOLD", "64");
std::env::set_var("INFLUX_WRITE_NO_SYNC", "true");
std::env::set_var("INFLUX_WRITE_ACCEPT_PARTIAL", "false");
std::env::set_var("INFLUX_WRITE_USE_V2_API", "true");
std::env::set_var("INFLUX_WRITE_USE_V2_API", "false");
std::env::set_var("INFLUX_ORG", "env-org");
let cfg = ClientConfig::from_env().unwrap();
assert_eq!(cfg.host_url(), "https://env-host");
Expand All @@ -141,7 +175,16 @@ fn from_env() {
assert_eq!(cfg.write_options.gzip_threshold, Some(64));
assert!(cfg.write_options.no_sync);
assert!(!cfg.write_options.accept_partial);
assert!(cfg.write_options.use_v2_api);
assert!(!cfg.write_options.use_v2_api);

std::env::set_var("INFLUX_WRITE_USE_V2_API", "true");
let err = ClientConfig::from_env().unwrap_err();
assert!(
err.to_string()
.contains("no_sync requires use_v2_api=false"),
"got: {err}"
);
std::env::set_var("INFLUX_WRITE_USE_V2_API", "false");

for (name, value) in [
("INFLUX_PRECISION", "invalid"),
Expand Down
1 change: 1 addition & 0 deletions tests/retry_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ async fn make_client(server: &Server) -> Client {
.host(server.url())
.database("testdb")
.token("test-token")
.write_use_v2_api(false)
.build()
.unwrap(),
)
Expand Down
Loading
Loading