Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ slog-stdlog = "4.1.0"
num-bigint = "0.4.6"
bigdecimal = "0.4.8"
bitvec = "1.0.1"
config = "0.13.4"
config = "0.15.9"
tempfile = "3.21.0"
md-5 = "0.10.6"
bitflags = "2.10.0"
Expand Down
81 changes: 42 additions & 39 deletions curvine-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ use curvine_client::unified::UnifiedFileSystem;
use curvine_common::conf::ClusterConf;
use curvine_common::version;
use orpc::common::{Logger, Utils};
use orpc::io::net::InetAddr;
use orpc::runtime::RpcRuntime;
use orpc::{err_box, CommonResult};
use orpc::CommonResult;
use serde::Serialize;
use serde_json::json;
use std::sync::Arc;

#[derive(Parser, Debug)]
#[derive(Parser, Debug, Serialize)]
#[command(author, version = version::VERSION, about, long_about = None)]
pub struct CurvineArgs {
/// Configuration file path (optional)
Expand All @@ -48,58 +49,60 @@ pub struct CurvineArgs {
)]
pub master_addrs: Option<String>,

#[serde(skip_serializing)]
#[command(subcommand)]
command: Commands,
}

impl CurvineArgs {
/// Get cluster configuration with priority: CLI args > config file > env vars > defaults
pub fn get_conf(&self) -> CommonResult<ClusterConf> {
// Priority 1: Try to load from config file (CLI arg or env var)
let conf_path = self
.conf
.clone()
.or_else(|| std::env::var(ClusterConf::ENV_CONF_FILE).ok());

let mut conf = if let Some(path) = conf_path {
match ClusterConf::from(&path) {
Ok(c) => c,
Err(e) => {
eprintln!("Warning: Failed to load config file '{}': {}", path, e);
eprintln!("Using default configuration");
Self::create_default_conf()
}
}
let args_value = serde_json::to_value(self).unwrap();

// Change the format of master_addrs from "m1:8995,..." to [{"hostname": "m1", "port": 8995}, ...]
let master_addrs = if let Some(master_addrs_str) =
args_value.get("master_addrs").and_then(|v| v.as_str())
{
master_addrs_str
.split(',')
.filter(|s| !s.trim().is_empty())
.map(|addr| {
let addr = addr.trim();
let parts: Vec<&str> = addr.split(':').collect();

if parts.len() == 2 {
let hostname = parts[0];
let port = parts[1]
.parse::<u16>()
.map_err(|_| format!("Invalid port: {}", parts[1]))?;

Ok(json!({
"hostname": hostname,
"port": port
}))
} else {
Err(format!("Invalid address format: {}", addr))
}
})
.collect::<Result<Vec<_>, _>>()
.unwrap()
} else {
println!("No config file specified, using default configuration");
Self::create_default_conf()
Vec::new()
};

// Priority 2: Override with CLI master_addrs if provided
if let Some(master_addrs) = &self.master_addrs {
let mut vec = vec![];
for node in master_addrs.split(',') {
let tmp: Vec<&str> = node.split(':').collect();
if tmp.len() != 2 {
return err_box!("Invalid master_addrs format: '{}'. Expected format: 'host1:port1,host2:port2'", master_addrs);
}
let hostname = tmp[0].to_string();
let port: u16 = tmp[1]
.parse()
.map_err(|_| format!("Invalid port number in master_addrs: '{}'", tmp[1]))?;
vec.push(InetAddr::new(hostname, port));
let args_json = json!({
"cli": args_value,
"client": {
"master_addrs": master_addrs
}
conf.client.master_addrs = vec;
}

// Initialize configuration (parse string values to actual types)
conf.client.init()?;

Ok(conf)
}
})
.to_string();

fn create_default_conf() -> ClusterConf {
ClusterConf::default()
ClusterConf::from(conf_path.unwrap(), Some(&args_json))
}
}

Expand Down
1 change: 1 addition & 0 deletions curvine-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ slog = { workspace = true }
slog-stdlog = { workspace = true }
chrono = { workspace = true }
trait-variant = { workspace = true }
config = { workspace = true }

anyhow = { workspace = true }
tracing = { workspace = true }
Expand Down
8 changes: 7 additions & 1 deletion curvine-common/src/conf/cli_conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use orpc::common::LogConf;
use orpc::{common::LogConf, CommonResult};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -21,6 +21,12 @@ pub struct CliConf {
pub log: LogConf,
}

impl CliConf {
pub fn init(&mut self) -> CommonResult<()> {
Ok(())
}
}

impl Default for CliConf {
fn default() -> Self {
CliConf {
Expand Down
100 changes: 71 additions & 29 deletions curvine-common/src/conf/cluster_conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@ use crate::conf::CliConf;
use crate::conf::{ClientConf, FuseConf, JobConf, JournalConf, MasterConf, WorkerConf};
use crate::rocksdb::DBConf;
use crate::version;
use config::{Config, Environment, File, FileFormat};
use log::info;
use orpc::client::{ClientConf as RpcConf, ClientFactory, SyncClient};
use orpc::common::{LogConf, Utils};
use orpc::io::net::{InetAddr, NodeAddr};
use orpc::io::retry::TimeBondedRetryBuilder;
use orpc::server::ServerConf;
use orpc::{err_box, try_err, CommonResult};
use orpc::{err_box, CommonResult};
use serde::{Deserialize, Serialize};
use std::env;
use std::fmt::{Display, Formatter};
use std::fs::read_to_string;
use std::time::Duration;

// Cluster configuration files.
Expand Down Expand Up @@ -75,38 +74,71 @@ impl ClusterConf {
pub const ENV_CLIENT_HOSTNAME: &'static str = "CURVINE_CLIENT_HOSTNAME";
pub const ENV_CONF_FILE: &'static str = "CURVINE_CONF_FILE";

pub fn from<T: AsRef<str>>(path: T) -> CommonResult<Self> {
let str = try_err!(read_to_string(path.as_ref()));
let mut conf = try_err!(toml::from_str::<Self>(&str));

if let Ok(v) = env::var(Self::ENV_MASTER_HOSTNAME) {
conf.master.hostname = v.to_owned();
conf.journal.hostname = v;
pub fn from<T: AsRef<str>>(path: T, args: Option<&str>) -> CommonResult<Self> {
let cli_source = args.unwrap_or("{}").to_string();

// Get cluster configuration with priority: CLI args > env vars > config file > defaults
let config = Config::builder()
.add_source(File::new(path.as_ref(), FileFormat::Toml))
.add_source(Environment::with_prefix("CURVINE").separator("_"))
.add_source(File::from_str(&cli_source, FileFormat::Json));

let mut conf;
match config.build() {
Ok(config) => {
conf = Self {
format_master: true,
format_worker: true,
testing: false,
cluster_id: "curvine".to_string(),
master: config.get::<MasterConf>("master").unwrap_or_default(),
journal: config.get::<JournalConf>("journal").unwrap_or_default(),
worker: config.get::<WorkerConf>("worker").unwrap_or_default(),
log: config.get::<LogConf>("log").unwrap_or_default(),
client: config.get::<ClientConf>("client").unwrap_or_default(),
fuse: config.get::<FuseConf>("fuse").unwrap_or_default(),
s3_gateway: config
.get::<S3GatewayConf>("s3_gateway")
.unwrap_or_default(),
job: config.get::<JobConf>("job").unwrap_or_default(),
cli: config.get::<CliConf>("cli").unwrap_or_default(),
};
}
Err(e) => {
eprintln!(
"Warning: Failed to load config file '{}': {}",
path.as_ref(),
e
);
eprintln!("Using default configuration");
conf = ClusterConf::default();
}
}

// Apply worker hostname from environment variable (used by worker process)
if let Ok(v) = env::var(Self::ENV_WORKER_HOSTNAME) {
conf.worker.hostname = v;
}
conf.init()?;

// Apply client hostname from environment variable
if let Ok(v) = env::var(Self::ENV_CLIENT_HOSTNAME) {
conf.client.hostname = v;
}
Ok(conf)
}

conf.master.init()?;
conf.client.init()?;
conf.fuse.init()?;
conf.job.init()?;
pub fn init(&mut self) -> CommonResult<()> {
self.master.init()?;
self.journal.init()?;
self.worker.init()?;
self.log.init()?;
self.client.init()?;
self.fuse.init()?;
self.s3_gateway.init()?;
self.job.init()?;
self.cli.init()?;

if conf.client.master_addrs.is_empty() {
for peer in &mut conf.journal.journal_addrs {
let node = InetAddr::new(&peer.hostname, conf.master.rpc_port);
conf.client.master_addrs.push(node);
if self.client.master_addrs.is_empty() {
for peer in &mut self.journal.journal_addrs {
let node = InetAddr::new(&peer.hostname, self.master.rpc_port);
self.client.master_addrs.push(node);
}
}

Ok(conf)
Ok(())
}

pub fn check_master_hostname(&mut self) -> CommonResult<()> {
Expand Down Expand Up @@ -268,7 +300,7 @@ impl ClusterConf {

impl Default for ClusterConf {
fn default() -> Self {
Self {
let mut conf = Self {
format_master: true,
format_worker: true,
testing: false,
Expand All @@ -282,7 +314,11 @@ impl Default for ClusterConf {
s3_gateway: Default::default(),
job: Default::default(),
cli: Default::default(),
}
};

conf.init().unwrap();

conf
}
}

Expand Down Expand Up @@ -310,6 +346,12 @@ pub struct S3GatewayConf {
pub web_port: u16,
}

impl S3GatewayConf {
pub fn init(&mut self) -> CommonResult<()> {
Ok(())
}
}

impl Default for S3GatewayConf {
fn default() -> Self {
Self {
Expand Down
5 changes: 5 additions & 0 deletions curvine-common/src/conf/journal_conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use orpc::client::ClientConf;
use orpc::common::Utils;
use orpc::io::net::{InetAddr, NetUtils};
use orpc::runtime::Runtime;
use orpc::CommonResult;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::vec;
Expand Down Expand Up @@ -108,6 +109,10 @@ pub struct JournalConf {
impl JournalConf {
pub const DEFAULT_NODE_ID: u64 = 0;

pub fn init(&mut self) -> CommonResult<()> {
Ok(())
}

// Create a test configuration, which will also randomly select a server port.
pub fn with_test() -> Self {
let mut conf = Self::default();
Expand Down
2 changes: 1 addition & 1 deletion curvine-common/src/conf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ mod tests {
#[test]
fn cluster() {
let path = "../etc/curvine-cluster.toml";
let conf = ClusterConf::from(path).unwrap();
let conf = ClusterConf::from(path, None).unwrap();
println!("conf = {:#?}", conf)
}

Expand Down
4 changes: 4 additions & 0 deletions curvine-common/src/conf/worker_conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ pub struct WorkerConf {
}

impl WorkerConf {
pub fn init(&mut self) -> CommonResult<()> {
Ok(())
}

pub fn io_slow_us(&self) -> u64 {
let dur = DurationUnit::from_str(&self.io_slow_threshold).unwrap();
dur.as_millis() * 1000
Expand Down
3 changes: 3 additions & 0 deletions curvine-fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ mini-moka = { workspace = true }
fxhash = { workspace = true }
dashmap = { workspace = true }
bitflags = { workspace = true }
serde = { workspace = true }
serde_with = { workspace = true }
serde_json = { workspace = true }
axum = { workspace = true }
tracing = { workspace = true }
tracing-appender = { workspace = true }
Expand Down
Loading