diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml index 9a430b3..07efe04 100644 --- a/.github/workflows/codecov.yml +++ b/.github/workflows/codecov.yml @@ -1,3 +1,86 @@ +#name: Codecov Rust +# +#on: +# push: +# branches: ["main"] +# pull_request: +# branches: ["main"] +# +#env: +# CARGO_TERM_COLOR: always +# +#jobs: +# coverage: +# runs-on: ubuntu-latest +# steps: +# - uses: actions/checkout@v4 +# +# - name: Install Rust toolchain +# uses: dtolnay/rust-toolchain@stable +# with: +# components: llvm-tools-preview +# +# - name: Build test image (with grcov included) +# run: docker compose -f docker-compose.test.yml build agent-test +# +# - name: Run tests in container +# env: +# CARGO_TARGET_DIR: /app/target +# CARGO_INCREMENTAL: 0 +# RUSTFLAGS: "-C instrument-coverage -C link-dead-code" +# LLVM_PROFILE_FILE: "/app/coverage/cargo-test-%p-%m.profraw" +# run: | +# docker compose -f docker-compose.test.yml run \ +# -e CARGO_TARGET_DIR \ +# -e CARGO_INCREMENTAL \ +# -e RUSTFLAGS \ +# -e LLVM_PROFILE_FILE \ +# agent-test bash -c "cargo clean && cargo test --verbose && sync" +# +# - name: Verify profraw files exist +# run: | +# docker compose -f docker-compose.test.yml run agent-test \ +# find /app/coverage -type f -name "*.profraw" | wc -l || true +# +# - name: Generate coverage report inside container +# run: | +# docker compose -f docker-compose.test.yml run agent-test bash -c " +# rustup component add llvm-tools && +# grcov /app/coverage \ +# --binary-path /app/target/debug \ +# -s /app \ +# --llvm \ +# -t lcov \ +# --branch \ +# --ignore-not-existing \ +# --ignore '/app/target/*' \ +# --ignore '/*' \ +# -o /app/lcov.info +# " +# +# - name: Copy lcov.info from container to host +# run: | +# docker compose -f docker-compose.test.yml cp agent-test:/app/lcov.info ./lcov.info +# +# - name: Remove container +# run: | +# docker rm agent-test-run +# +# - name: Show basic coverage report info (debug) +# run: | +# echo "lcov.info size:" $(wc -c ./lcov.info | awk '{print $1}') +# head -n 30 ./lcov.info || true +# +# - name: Upload coverage to Codecov +# uses: codecov/codecov-action@v5 +# with: +# files: ./lcov.info +# flags: unittests +# name: rust-unit-coverage +# verbose: true +# fail_ci_if_error: true +# env: +# CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} name: Codecov Rust on: @@ -8,51 +91,82 @@ on: env: CARGO_TERM_COLOR: always + CARGO_TARGET_DIR: /app/target + CARGO_INCREMENTAL: 0 + RUSTFLAGS: "-C instrument-coverage -C link-dead-code" + LLVM_PROFILE_FILE: "/app/coverage/cargo-test-%p-%m.profraw" jobs: - build: + coverage: runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - - uses: actions-rs/toolchain@v1 + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable with: - toolchain: stable - override: true components: llvm-tools-preview - - name: Install grcov - run: cargo install grcov + - name: Build test image (with grcov included) + run: docker compose -f docker-compose.test.yml build agent-test - - name: Install test requirements - run: bash scripts/tests/requirements.sh + - name: Start agent-test container + run: docker compose -f docker-compose.test.yml up -d agent-test - - name: Build - run: cargo build --verbose + - name: Run tests inside container + run: | + docker compose -f docker-compose.test.yml exec \ + -e CARGO_TARGET_DIR \ + -e CARGO_INCREMENTAL \ + -e RUSTFLAGS \ + -e LLVM_PROFILE_FILE \ + agent-test bash -c " + cargo clean && + cargo test --verbose && + sync + " - - name: Run tests - env: - CARGO_INCREMENTAL: 0 - RUSTFLAGS: "-C instrument-coverage" - LLVM_PROFILE_FILE: "cargo-test-%p-%m.profraw" - run: cargo test --verbose + - name: Verify profraw files exist + run: | + docker compose -f docker-compose.test.yml exec \ + -e LLVM_PROFILE_FILE \ + agent-test find /app/coverage -type f -name "*.profraw" | wc -l || true - - name: Generate coverage + - name: Generate coverage report inside container run: | - grcov . \ - --binary-path ./target/debug/ \ - -s . \ - -t lcov \ - --branch \ - --ignore-not-existing \ - -o lcov.info - - - name: Upload to Codecov + docker compose -f docker-compose.test.yml exec \ + agent-test bash -c " + rustup component add llvm-tools && + grcov /app/coverage \ + --binary-path /app/target/debug \ + -s /app \ + --llvm \ + -t lcov \ + --branch \ + --ignore-not-existing \ + --ignore '/app/target/*' \ + --ignore '/*' \ + -o /app/lcov.info + " + + - name: Copy lcov.info from container to host + run: docker compose -f docker-compose.test.yml cp agent-test:/app/lcov.info ./lcov.info + + - name: Show basic coverage report info (debug) + run: | + echo "lcov.info size:" $(wc -c ./lcov.info | awk '{print $1}') + head -n 30 ./lcov.info || true + + - name: Upload coverage to Codecov uses: codecov/codecov-action@v5 with: - files: lcov.info + files: ./lcov.info + flags: unittests + name: rust-unit-coverage verbose: true fail_ci_if_error: true env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} \ No newline at end of file + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + + - name: Stop and remove container + run: docker compose -f docker-compose.test.yml down \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 3c32d59..26c3908 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ tokio-stream = "0.1.18" aes = "0.9.0-rc.4" typenum = "1.19.0" testcontainers = "0.27.1" -testcontainers-modules = { version = "0.15.0", features = ["postgres", "redis", "valkey"] } +testcontainers-modules = { version = "0.15.0", features = ["postgres", "redis", "valkey", "mysql", "mariadb", "mongo"] } postgres = "0.19.12" url = "2.5.8" @@ -57,7 +57,7 @@ url = "2.5.8" tokio = { version = "1", features = ["full"] } mockall = "0.13" testcontainers = "0.27.1" -testcontainers-modules = { version = "0.15.0", features = ["postgres", "redis"] } +testcontainers-modules = { version = "0.15.0", features = ["postgres", "redis", "mysql", "mariadb", "mongo"] } wiremock = "0.6" [profile.release] diff --git a/assets/tools/amd64/mongodb/bin/bsondump b/assets/tools/amd64/mongodb/bin/bsondump old mode 100644 new mode 100755 diff --git a/assets/tools/amd64/mongodb/bin/mongodump b/assets/tools/amd64/mongodb/bin/mongodump old mode 100644 new mode 100755 diff --git a/assets/tools/amd64/mongodb/bin/mongoexport b/assets/tools/amd64/mongodb/bin/mongoexport old mode 100644 new mode 100755 diff --git a/assets/tools/amd64/mongodb/bin/mongofiles b/assets/tools/amd64/mongodb/bin/mongofiles old mode 100644 new mode 100755 diff --git a/assets/tools/amd64/mongodb/bin/mongoimport b/assets/tools/amd64/mongodb/bin/mongoimport old mode 100644 new mode 100755 diff --git a/assets/tools/amd64/mongodb/bin/mongorestore b/assets/tools/amd64/mongodb/bin/mongorestore old mode 100644 new mode 100755 diff --git a/assets/tools/amd64/mongodb/bin/mongostat b/assets/tools/amd64/mongodb/bin/mongostat old mode 100644 new mode 100755 diff --git a/assets/tools/amd64/mongodb/bin/mongotop b/assets/tools/amd64/mongodb/bin/mongotop old mode 100644 new mode 100755 diff --git a/assets/tools/arm64/mongodb/bin/bsondump b/assets/tools/arm64/mongodb/bin/bsondump old mode 100644 new mode 100755 diff --git a/assets/tools/arm64/mongodb/bin/mongodump b/assets/tools/arm64/mongodb/bin/mongodump old mode 100644 new mode 100755 diff --git a/assets/tools/arm64/mongodb/bin/mongoexport b/assets/tools/arm64/mongodb/bin/mongoexport old mode 100644 new mode 100755 diff --git a/assets/tools/arm64/mongodb/bin/mongofiles b/assets/tools/arm64/mongodb/bin/mongofiles old mode 100644 new mode 100755 diff --git a/assets/tools/arm64/mongodb/bin/mongoimport b/assets/tools/arm64/mongodb/bin/mongoimport old mode 100644 new mode 100755 diff --git a/assets/tools/arm64/mongodb/bin/mongorestore b/assets/tools/arm64/mongodb/bin/mongorestore old mode 100644 new mode 100755 diff --git a/assets/tools/arm64/mongodb/bin/mongostat b/assets/tools/arm64/mongodb/bin/mongostat old mode 100644 new mode 100755 diff --git a/assets/tools/arm64/mongodb/bin/mongotop b/assets/tools/arm64/mongodb/bin/mongotop old mode 100644 new mode 100755 diff --git a/docker-compose.test.yml b/docker-compose.test.yml new file mode 100644 index 0000000..4022ede --- /dev/null +++ b/docker-compose.test.yml @@ -0,0 +1,22 @@ +services: + agent-test: + build: + context: . + dockerfile: docker/Dockerfile + target: dev + working_dir: /app + container_name: agent-test + volumes: + - .:/app + - cargo-registry:/usr/local/cargo/registry + - cargo-git:/usr/local/cargo/git + - /var/run/docker.sock:/var/run/docker.sock + environment: + APP_ENV: test + LOG: debug + TZ: Europe/Paris + EDGE_KEY: "" + +volumes: + cargo-registry: + cargo-git: diff --git a/docker-compose.yml b/docker-compose.yml index 51e7d19..7b19a5e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,14 +11,16 @@ services: # - ./databases.toml:/config/config.toml - cargo-registry:/usr/local/cargo/registry - cargo-git:/usr/local/cargo/git -# - cargo-target:/app/target + - /var/run/docker.sock:/var/run/docker.sock + + # - cargo-target:/app/target # - sqlite-data:/sqlite-data/workspace/data # - ./scripts/sqlite/test-db:/sqlite-data-2/workspace/data environment: APP_ENV: development LOG: debug TZ: "Europe/Paris" - EDGE_KEY: "eyJzZXJ2ZXJVcmwiOiJodHRwOi8vbG9jYWxob3N0Ojg4ODciLCJhZ2VudElkIjoiNWU1OGU2MGEtODhiMy00YTBjLWI0NDktNTQ3OWZhOTQzZDBkIiwibWFzdGVyS2V5QjY0IjoiQlhWM1hvbEM2NTZTVjdkTmdjV1BHUWxrKytycExJNmxHRGk3Q1BCNWllbz0ifQ==" + EDGE_KEY: "eyJzZXJ2ZXJVcmwiOiJodHRwOi8vbG9jYWxob3N0Ojg4ODciLCJhZ2VudElkIjoiNGI1OTM2MGItNTNkMi00ZTZmLWE1ODctODcyMmQ1NDc1MTNmIiwibWFzdGVyS2V5QjY0IjoiQlhWM1hvbEM2NTZTVjdkTmdjV1BHUWxrKytycExJNmxHRGk3Q1BCNWllbz0ifQ==" #POOLING: 1 #DATABASES_CONFIG_FILE: "config.toml" extra_hosts: diff --git a/docker/Dockerfile b/docker/Dockerfile index 5a714fa..0006a89 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -60,6 +60,7 @@ WORKDIR /app FROM base AS dev RUN cargo install cargo-watch +RUN cargo install grcov --locked COPY Cargo.toml Cargo.lock ./ RUN mkdir src && echo "fn main() {}" > src/main.rs diff --git a/scripts/README.md b/scripts/README.md index 9045b17..f94a41e 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -24,3 +24,7 @@ docker exec -it db-sqlite sqlite3 /workspace/data/app.db "SELECT * FROM users LI ```bash docker exec -it db-sqlite sqlite3 /workspace/data/app.db "SELECT name FROM sqlite_master WHERE type='table';" ``` + +```bash +docker compose -f docker-compose.test.yml run agent-test bash -c "cargo clean && cargo test" +``` \ No newline at end of file diff --git a/scripts/tests/requirements.sh b/scripts/tests/requirements.sh deleted file mode 100755 index 324244e..0000000 --- a/scripts/tests/requirements.sh +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env bash -set -e - -POSTGRES_BASE="/usr/local/postgresql" -echo "Detecting OS and architecture..." -OS_TYPE="$(uname -s)" -ARCH="$(uname -m)" - -install_pg_binaries() { - echo "Installing PostgreSQL binaries for versions 12-18..." - - for v in 12 13 14 15 16 17 18; do - TARGET_DIR="$POSTGRES_BASE/$v/bin" - sudo mkdir -p "$TARGET_DIR" - - if [[ "$OS_TYPE" == "Linux" ]]; then - if [[ "$ARCH" == "x86_64" ]]; then - SRC_DIR="./assets/tools/amd64/postgresql/postgresql-$v/bin" - elif [[ "$ARCH" == "aarch64" ]]; then - SRC_DIR="./assets/tools/arm64/postgresql/postgresql-$v/bin" - else - echo "Unsupported architecture: $ARCH" - continue - fi - - if [[ -d "$SRC_DIR" ]]; then - echo "Copying PostgreSQL $v binaries from $SRC_DIR to $TARGET_DIR" - sudo cp -r "$SRC_DIR"/* "$TARGET_DIR/" - else - echo "Binaries for PostgreSQL $v not found for Linux, skipping..." - continue - fi - - elif [[ "$OS_TYPE" == "Darwin" ]]; then - PG_SRC="$(brew --prefix postgresql@$v)/bin" 2>/dev/null || true - - if [[ ! -d "$PG_SRC" ]]; then - echo "PostgreSQL $v not installed via Homebrew. Trying to install..." - if ! brew install postgresql@$v; then - echo "PostgreSQL $v not available, skipping..." - continue - fi - PG_SRC="$(brew --prefix postgresql@$v)/bin" - fi - - echo "Copying PostgreSQL $v binaries from $PG_SRC to $TARGET_DIR" - sudo cp -r "$PG_SRC"/* "$TARGET_DIR/" - fi - - sudo chown -R "$(whoami)" "$TARGET_DIR" - chmod +x "$TARGET_DIR"/* - done - - echo "PostgreSQL binaries installed under $POSTGRES_BASE" -} - -if [[ "$OS_TYPE" == "Linux" ]]; then - if command -v apt >/dev/null 2>&1; then - echo "Linux detected with apt. Installing prerequisites..." - sudo apt update - sudo apt install -y wget gnupg lsb-release redis-tools valkey - install_pg_binaries - else - echo "Unsupported Linux distribution. Only apt-based distros are supported." - exit 1 - fi - -elif [[ "$OS_TYPE" == "Darwin" ]]; then - if command -v brew >/dev/null 2>&1; then - echo "macOS detected. Installing prerequisites..." - brew install redis - brew install valkey - - sudo mkdir -p "$POSTGRES_BASE" - sudo chown -R "$(whoami)" "$POSTGRES_BASE" - - install_pg_binaries - else - echo "Homebrew not found. Please install Homebrew first: https://brew.sh/" - exit 1 - fi - -else - echo "Unsupported OS: $OS_TYPE" - exit 1 -fi - -echo "Tools installation completed successfully." \ No newline at end of file diff --git a/src/core/agent.rs b/src/core/agent.rs index 23e56f1..bd20108 100644 --- a/src/core/agent.rs +++ b/src/core/agent.rs @@ -4,11 +4,11 @@ use crate::core::context::Context; use crate::services::backup::BackupService; use crate::services::config::ConfigService; use crate::services::cron::CronService; +use crate::services::restore::RestoreService; use crate::services::status::StatusService; use crate::utils::common::BackupMethod; use std::sync::Arc; use tracing::info; -use crate::services::restore::RestoreService; pub struct Agent { ctx: Arc, @@ -43,23 +43,30 @@ impl Agent { let ping_result = self.status_service.ping(&config.databases).await?; for db in ping_result.databases.iter() { - let database = config.databases.iter().find(|cfg_db|cfg_db.generated_id == db.generated_id).unwrap(); + let database = config + .databases + .iter() + .find(|cfg_db| cfg_db.generated_id == db.generated_id) + .unwrap(); info!( "Generated Id: {} | backup action: {} | restore action: {} | Database Name: {}", - db.generated_id, db.data.backup.action, db.data.restore.action, database.name, + db.generated_id, db.data.backup.action, db.data.restore.action, database.name, ); let _ = self.cron_service.sync(db).await; if db.data.backup.action { let _ = self .backup_service - .dispatch(&db.generated_id, &config, method.clone(), &db.storages, db.encrypt) + .dispatch( + &db.generated_id, + &config, + method.clone(), + &db.storages, + db.encrypt, + ) .await; } else if db.data.restore.action { - let _ = self - .restore_service - .dispatch(db, &config) - .await; + let _ = self.restore_service.dispatch(db, &config).await; } } diff --git a/src/core/context.rs b/src/core/context.rs index 00f7988..c37823d 100644 --- a/src/core/context.rs +++ b/src/core/context.rs @@ -35,7 +35,7 @@ impl Context { panic!("Cannot initialize AgentContext due to invalid EDGE_KEY"); } }; - + let server_url = format!("{}/api", edge_key.server_url); let api_client = ApiClient::new(server_url); diff --git a/src/core/mod.rs b/src/core/mod.rs index 869472d..6cc8135 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,2 +1,2 @@ -pub mod context; pub mod agent; +pub mod context; diff --git a/src/domain/factory.rs b/src/domain/factory.rs index eadd24c..d3b532a 100644 --- a/src/domain/factory.rs +++ b/src/domain/factory.rs @@ -4,18 +4,18 @@ use crate::domain::postgres::database::PostgresDatabase; use crate::domain::postgres::{detect_format_from_file, detect_format_from_size}; use crate::domain::redis::database::RedisDatabase; use crate::domain::sqlite::database::SqliteDatabase; +use crate::domain::valkey::database::ValkeyDatabase; use crate::services::config::{DatabaseConfig, DbType}; use anyhow::Result; use std::path::{Path, PathBuf}; use std::sync::Arc; -use crate::domain::valkey::database::ValkeyDatabase; #[async_trait::async_trait] pub trait Database: Send + Sync { fn file_extension(&self) -> &'static str; async fn ping(&self) -> Result; - async fn backup(&self, backup_dir: &Path, is_test: Option) -> Result; - async fn restore(&self, restore_file: &Path, is_test: Option) -> Result<()>; + async fn backup(&self, backup_dir: &Path) -> Result; + async fn restore(&self, restore_file: &Path) -> Result<()>; } pub struct DatabaseFactory; @@ -32,7 +32,7 @@ impl DatabaseFactory { DbType::MongoDB => Arc::new(MongoDatabase::new(cfg)), DbType::Sqlite => Arc::new(SqliteDatabase::new(cfg)), DbType::Redis => Arc::new(RedisDatabase::new(cfg)), - DbType::Valkey => Arc::new(ValkeyDatabase::new(cfg)) + DbType::Valkey => Arc::new(ValkeyDatabase::new(cfg)), } } @@ -47,8 +47,7 @@ impl DatabaseFactory { DbType::MongoDB => Arc::new(MongoDatabase::new(cfg)), DbType::Sqlite => Arc::new(SqliteDatabase::new(cfg)), DbType::Redis => Arc::new(RedisDatabase::new(cfg)), - DbType::Valkey => Arc::new(ValkeyDatabase::new(cfg)) - + DbType::Valkey => Arc::new(ValkeyDatabase::new(cfg)), } } } diff --git a/src/domain/mod.rs b/src/domain/mod.rs index ea49a82..534c955 100644 --- a/src/domain/mod.rs +++ b/src/domain/mod.rs @@ -1,8 +1,7 @@ pub mod factory; -pub mod postgres; -pub mod mysql; mod mongodb; -mod sqlite; +pub mod mysql; +pub mod postgres; mod redis; +mod sqlite; mod valkey; - diff --git a/src/domain/mongodb/backup.rs b/src/domain/mongodb/backup.rs index d4724b3..7abb6fd 100644 --- a/src/domain/mongodb/backup.rs +++ b/src/domain/mongodb/backup.rs @@ -15,6 +15,7 @@ pub async fn run( let file_path = backup_dir.join(format!("{}{}", cfg.generated_id, file_extension)); let mongodump = select_mongo_path().join("mongodump"); + info!("{:?}", mongodump); let uri = get_mongo_uri(cfg.clone())?; let output = Command::new(mongodump) diff --git a/src/domain/mongodb/connection.rs b/src/domain/mongodb/connection.rs index 5d3a43c..311895b 100644 --- a/src/domain/mongodb/connection.rs +++ b/src/domain/mongodb/connection.rs @@ -16,9 +16,11 @@ pub fn select_mongo_path() -> std::path::PathBuf { } pub fn get_mongo_uri(cfg: DatabaseConfig) -> Result { - if cfg.username.is_empty() || cfg.password.is_empty() { - Ok(format!("mongodb://{}:{}/{}", cfg.host, cfg.port, cfg.database)) + Ok(format!( + "mongodb://{}:{}/{}", + cfg.host, cfg.port, cfg.database + )) } else { Ok(format!( "mongodb://{}:{}@{}:{}/{}?authSource=admin", @@ -26,5 +28,3 @@ pub fn get_mongo_uri(cfg: DatabaseConfig) -> Result { )) } } - - diff --git a/src/domain/mongodb/database.rs b/src/domain/mongodb/database.rs index f0ec057..e4c40b8 100644 --- a/src/domain/mongodb/database.rs +++ b/src/domain/mongodb/database.rs @@ -27,27 +27,17 @@ impl Database for MongoDatabase { ping::run(self.cfg.clone()).await } - async fn backup(&self, dir: &Path, is_test: Option) -> Result { - let test_mode = is_test.unwrap_or(false); - if !test_mode { - FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; - } + async fn backup(&self, dir: &Path) -> Result { + FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; let res = backup::run(self.cfg.clone(), dir.to_path_buf(), self.file_extension()).await; - if !test_mode { - FileLock::release(&self.cfg.generated_id).await?; - } + FileLock::release(&self.cfg.generated_id).await?; res } - async fn restore(&self, file: &Path, is_test: Option) -> Result<()> { - let test_mode = is_test.unwrap_or(false); - if !test_mode { - FileLock::acquire(&self.cfg.generated_id, DbOpLock::Restore.as_str()).await?; - } + async fn restore(&self, file: &Path) -> Result<()> { + FileLock::acquire(&self.cfg.generated_id, DbOpLock::Restore.as_str()).await?; let res = restore::run(self.cfg.clone(), file.to_path_buf()).await; - if !test_mode { - FileLock::release(&self.cfg.generated_id).await?; - } + FileLock::release(&self.cfg.generated_id).await?; res } } diff --git a/src/domain/mongodb/mod.rs b/src/domain/mongodb/mod.rs index 74afdd4..a4869e1 100644 --- a/src/domain/mongodb/mod.rs +++ b/src/domain/mongodb/mod.rs @@ -1,5 +1,5 @@ mod backup; -mod restore; +mod connection; pub mod database; mod ping; -mod connection; \ No newline at end of file +mod restore; diff --git a/src/domain/mongodb/ping.rs b/src/domain/mongodb/ping.rs index 6a3478b..f70e9ef 100644 --- a/src/domain/mongodb/ping.rs +++ b/src/domain/mongodb/ping.rs @@ -24,7 +24,7 @@ pub async fn run(cfg: DatabaseConfig) -> Result { error!("Full Error: {}", e); error!("Check you database network connectivity"); error!("----------------------------------------"); - Err(anyhow::anyhow!("Ping failed for {}: {}", cfg.name, e)) + Ok(false) } } } diff --git a/src/domain/mysql/backup.rs b/src/domain/mysql/backup.rs index 1413bd4..2ec7531 100644 --- a/src/domain/mysql/backup.rs +++ b/src/domain/mysql/backup.rs @@ -1,4 +1,4 @@ -use crate::domain::mysql::connection::{server_version}; +use crate::domain::mysql::connection::server_version; use crate::services::config::DatabaseConfig; use anyhow::{Context, Result}; use std::collections::HashMap; diff --git a/src/domain/mysql/connection.rs b/src/domain/mysql/connection.rs index db26bbb..f9e14fb 100644 --- a/src/domain/mysql/connection.rs +++ b/src/domain/mysql/connection.rs @@ -1,14 +1,17 @@ use crate::services::config::DatabaseConfig; -use std::process::Command; use anyhow::Result; +use std::process::Command; pub async fn server_version(cfg: &DatabaseConfig) -> Result { - let output = Command::new("mysql") - .arg("--host").arg(&cfg.host) - .arg("--port").arg(cfg.port.to_string()) - .arg("--user").arg(&cfg.username) - .arg("-e").arg("SELECT VERSION();") + .arg("--host") + .arg(&cfg.host) + .arg("--port") + .arg(cfg.port.to_string()) + .arg("--user") + .arg(&cfg.username) + .arg("-e") + .arg("SELECT VERSION();") .env("MYSQL_PWD", &cfg.password) .output()?; @@ -19,11 +22,10 @@ pub async fn server_version(cfg: &DatabaseConfig) -> Result { let version = String::from_utf8_lossy(&output.stdout) .lines() - .nth(1) // skip column header + .nth(1) .unwrap_or_default() .trim() .to_string(); Ok(version) } - diff --git a/src/domain/mysql/database.rs b/src/domain/mysql/database.rs index d616f83..9c39bd6 100644 --- a/src/domain/mysql/database.rs +++ b/src/domain/mysql/database.rs @@ -1,14 +1,11 @@ -use std::collections::HashMap; -use anyhow::Result; -use async_trait::async_trait; -use std::path::{Path, PathBuf}; -use super::{ - backup, - ping, restore, -}; +use super::{backup, ping, restore}; use crate::domain::factory::Database; use crate::services::config::DatabaseConfig; use crate::utils::locks::{DbOpLock, FileLock}; +use anyhow::Result; +use async_trait::async_trait; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; pub struct MySQLDatabase { cfg: DatabaseConfig, @@ -36,28 +33,23 @@ impl Database for MySQLDatabase { ping::run(self.cfg.clone(), self.build_env().clone()).await } - - async fn backup(&self, dir: &Path, is_test: Option) -> Result { - let test_mode = is_test.unwrap_or(false); - if !test_mode { - FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; - } - let res = backup::run(self.cfg.clone(), dir.to_path_buf(), self.build_env().clone(), self.file_extension()).await; - if !test_mode { - FileLock::release(&self.cfg.generated_id).await?; - } + async fn backup(&self, dir: &Path) -> Result { + FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; + let res = backup::run( + self.cfg.clone(), + dir.to_path_buf(), + self.build_env().clone(), + self.file_extension(), + ) + .await; + FileLock::release(&self.cfg.generated_id).await?; res } - async fn restore(&self, file: &Path, is_test: Option) -> Result<()> { - let test_mode = is_test.unwrap_or(false); - if !test_mode { - FileLock::acquire(&self.cfg.generated_id, DbOpLock::Restore.as_str()).await?; - } + async fn restore(&self, file: &Path) -> Result<()> { + FileLock::acquire(&self.cfg.generated_id, DbOpLock::Restore.as_str()).await?; let res = restore::run(self.cfg.clone(), file.to_path_buf()).await; - if !test_mode { - FileLock::release(&self.cfg.generated_id).await?; - } + FileLock::release(&self.cfg.generated_id).await?; res } } diff --git a/src/domain/mysql/mod.rs b/src/domain/mysql/mod.rs index 1bfe59e..33dd5f4 100644 --- a/src/domain/mysql/mod.rs +++ b/src/domain/mysql/mod.rs @@ -1,5 +1,5 @@ pub mod backup; +mod connection; pub mod database; -mod restore; mod ping; -mod connection; \ No newline at end of file +mod restore; diff --git a/src/domain/mysql/ping.rs b/src/domain/mysql/ping.rs index dd2491f..fde0407 100644 --- a/src/domain/mysql/ping.rs +++ b/src/domain/mysql/ping.rs @@ -4,7 +4,6 @@ use tokio::process::Command; use tokio::time::{Duration, timeout}; pub async fn run(cfg: DatabaseConfig, env: HashMap) -> anyhow::Result { - let mut cmd = Command::new("mysqladmin"); cmd.arg("--host") .arg(cfg.host) diff --git a/src/domain/postgres/backup.rs b/src/domain/postgres/backup.rs index f6f71be..9ec9eb4 100644 --- a/src/domain/postgres/backup.rs +++ b/src/domain/postgres/backup.rs @@ -11,7 +11,6 @@ pub async fn run( cfg: DatabaseConfig, format: PostgresDumpFormat, backup_dir: PathBuf, - is_test: Option ) -> Result { tokio::task::spawn_blocking(move || -> Result { debug!("Starting backup for database {}", cfg.name); @@ -27,8 +26,8 @@ pub async fn run( } }; - let pg_dump = select_pg_path(&version, is_test).join("pg_dump"); - + let pg_dump = select_pg_path(&version).join("pg_dump"); + debug!("Using pg_dump at {:?}", pg_dump); match format { diff --git a/src/domain/postgres/connection.rs b/src/domain/postgres/connection.rs index 3145f9a..92648f3 100644 --- a/src/domain/postgres/connection.rs +++ b/src/domain/postgres/connection.rs @@ -28,14 +28,9 @@ pub async fn server_version(cfg: &DatabaseConfig) -> Result { Ok(version) } -pub fn select_pg_path(version: &str, is_test: Option) -> std::path::PathBuf { +pub fn select_pg_path(version: &str) -> std::path::PathBuf { let major = version.split('.').next().unwrap_or("17"); - - if is_test.unwrap_or(false) { - format!("/usr/local/postgresql/{}/bin", major).into() - } else { - format!("/usr/lib/postgresql/{}/bin", major).into() - } + format!("/usr/lib/postgresql/{}/bin", major).into() } pub async fn terminate_connections(cfg: &DatabaseConfig) -> Result<()> { diff --git a/src/domain/postgres/database.rs b/src/domain/postgres/database.rs index 7af5376..12aa7ce 100644 --- a/src/domain/postgres/database.rs +++ b/src/domain/postgres/database.rs @@ -31,27 +31,17 @@ impl Database for PostgresDatabase { ping::run(self.cfg.clone()).await } - async fn backup(&self, dir: &Path, is_test: Option) -> Result { - let test_mode = is_test.unwrap_or(false); - if !test_mode { - FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; - } - let res = backup::run(self.cfg.clone(), self.format, dir.to_path_buf(), is_test).await; - if !test_mode { - FileLock::release(&self.cfg.generated_id).await?; - } + async fn backup(&self, dir: &Path) -> Result { + FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; + let res = backup::run(self.cfg.clone(), self.format, dir.to_path_buf()).await; + FileLock::release(&self.cfg.generated_id).await?; res } - async fn restore(&self, file: &Path, is_test: Option) -> Result<()> { - let test_mode = is_test.unwrap_or(false); - if !test_mode { - FileLock::acquire(&self.cfg.generated_id, DbOpLock::Restore.as_str()).await?; - } - let res = restore::run(self.cfg.clone(), self.format, file.to_path_buf(), is_test).await; - if !test_mode { - FileLock::release(&self.cfg.generated_id).await?; - } + async fn restore(&self, file: &Path) -> Result<()> { + FileLock::acquire(&self.cfg.generated_id, DbOpLock::Restore.as_str()).await?; + let res = restore::run(self.cfg.clone(), self.format, file.to_path_buf()).await; + FileLock::release(&self.cfg.generated_id).await?; res } } diff --git a/src/domain/postgres/format.rs b/src/domain/postgres/format.rs index e76f18a..5415fd7 100644 --- a/src/domain/postgres/format.rs +++ b/src/domain/postgres/format.rs @@ -2,4 +2,4 @@ pub enum PostgresDumpFormat { Fc, Fd, -} \ No newline at end of file +} diff --git a/src/domain/postgres/mod.rs b/src/domain/postgres/mod.rs index f7b39e9..cd42ace 100644 --- a/src/domain/postgres/mod.rs +++ b/src/domain/postgres/mod.rs @@ -1,8 +1,8 @@ pub mod backup; -pub mod database; -mod restore; mod connection; +pub mod database; mod format; mod ping; +mod restore; -pub use connection::{detect_format_from_size, detect_format_from_file}; +pub use connection::{detect_format_from_file, detect_format_from_size}; diff --git a/src/domain/postgres/ping.rs b/src/domain/postgres/ping.rs index b1e5421..5d859f1 100644 --- a/src/domain/postgres/ping.rs +++ b/src/domain/postgres/ping.rs @@ -1,8 +1,6 @@ use super::connection::connect; use crate::services::config::DatabaseConfig; -pub async fn run( - cfg: DatabaseConfig, -) -> anyhow::Result { +pub async fn run(cfg: DatabaseConfig) -> anyhow::Result { Ok(connect(&cfg).await.is_ok()) -} \ No newline at end of file +} diff --git a/src/domain/postgres/restore.rs b/src/domain/postgres/restore.rs index a5181f7..98a7732 100644 --- a/src/domain/postgres/restore.rs +++ b/src/domain/postgres/restore.rs @@ -11,7 +11,6 @@ pub async fn run( cfg: DatabaseConfig, format: PostgresDumpFormat, restore_file: PathBuf, - is_test: Option, ) -> Result<()> { tokio::task::spawn_blocking(move || -> Result<()> { debug!("Starting restore for database {}", cfg.name); @@ -27,7 +26,7 @@ pub async fn run( } }; - let pg_restore = select_pg_path(&version, is_test).join("pg_restore"); + let pg_restore = select_pg_path(&version).join("pg_restore"); debug!("Using pg_restore at {:?}", pg_restore); diff --git a/src/domain/redis/database.rs b/src/domain/redis/database.rs index e536b18..33e67e2 100644 --- a/src/domain/redis/database.rs +++ b/src/domain/redis/database.rs @@ -27,19 +27,14 @@ impl Database for RedisDatabase { ping::run(self.cfg.clone()).await } - async fn backup(&self, dir: &Path, is_test: Option) -> Result { - let test_mode = is_test.unwrap_or(false); - if !test_mode { - FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; - } + async fn backup(&self, dir: &Path) -> Result { + FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; let res = backup::run(self.cfg.clone(), dir.to_path_buf(), self.file_extension()).await; - if !test_mode { - FileLock::release(&self.cfg.generated_id).await?; - } + FileLock::release(&self.cfg.generated_id).await?; res } - async fn restore(&self, _file: &Path, _is_test: Option) -> Result<()> { + async fn restore(&self, _file: &Path) -> Result<()> { bail!("Restore not supported for Redis databases") } } diff --git a/src/domain/redis/ping.rs b/src/domain/redis/ping.rs index d06df71..82b2b9b 100644 --- a/src/domain/redis/ping.rs +++ b/src/domain/redis/ping.rs @@ -1,8 +1,8 @@ -use tracing::{debug, info}; use crate::services::config::DatabaseConfig; +use anyhow::{Context, Result}; use tokio::process::Command; -use tokio::time::{timeout, Duration}; -use anyhow::{Result, Context}; +use tokio::time::{Duration, timeout}; +use tracing::{debug, info}; pub async fn run(cfg: DatabaseConfig) -> Result { let mut cmd = Command::new("redis-cli"); @@ -23,7 +23,6 @@ pub async fn run(cfg: DatabaseConfig) -> Result { debug!("Command Ping: {:?}", cmd); - let result = timeout(Duration::from_secs(10), cmd.output()).await; match result { @@ -52,4 +51,4 @@ pub async fn run(cfg: DatabaseConfig) -> Result { Ok(false) } } -} \ No newline at end of file +} diff --git a/src/domain/sqlite/backup.rs b/src/domain/sqlite/backup.rs index 578cfd6..7b4b21c 100644 --- a/src/domain/sqlite/backup.rs +++ b/src/domain/sqlite/backup.rs @@ -41,5 +41,5 @@ pub async fn run( info!("SQLite backup completed for {}", cfg.name); Ok(file_path) }) - .await? -} \ No newline at end of file + .await? +} diff --git a/src/domain/sqlite/database.rs b/src/domain/sqlite/database.rs index 5bcebab..2579061 100644 --- a/src/domain/sqlite/database.rs +++ b/src/domain/sqlite/database.rs @@ -27,26 +27,16 @@ impl Database for SqliteDatabase { ping::run(self.cfg.clone()).await } - async fn backup(&self, dir: &Path, is_test: Option) -> Result { - let test_mode = is_test.unwrap_or(false); - if !test_mode { - FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; - } + async fn backup(&self, dir: &Path) -> Result { + FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; let res = backup::run(self.cfg.clone(), dir.to_path_buf(), self.file_extension()).await; - if !test_mode { - FileLock::release(&self.cfg.generated_id).await?; - } + FileLock::release(&self.cfg.generated_id).await?; res } - async fn restore(&self, file: &Path, is_test: Option) -> Result<()> { - let test_mode = is_test.unwrap_or(false); - if !test_mode { - FileLock::acquire(&self.cfg.generated_id, DbOpLock::Restore.as_str()).await?; - } + async fn restore(&self, file: &Path) -> Result<()> { + FileLock::acquire(&self.cfg.generated_id, DbOpLock::Restore.as_str()).await?; let res = restore::run(self.cfg.clone(), file.to_path_buf()).await; - if !test_mode { - FileLock::release(&self.cfg.generated_id).await?; - } + FileLock::release(&self.cfg.generated_id).await?; res } } diff --git a/src/domain/sqlite/mod.rs b/src/domain/sqlite/mod.rs index 5efda1c..325a54a 100644 --- a/src/domain/sqlite/mod.rs +++ b/src/domain/sqlite/mod.rs @@ -1,4 +1,4 @@ mod backup; -mod restore; +pub mod database; mod ping; -pub mod database; \ No newline at end of file +mod restore; diff --git a/src/domain/valkey/database.rs b/src/domain/valkey/database.rs index a0677ac..8696179 100644 --- a/src/domain/valkey/database.rs +++ b/src/domain/valkey/database.rs @@ -27,19 +27,14 @@ impl Database for ValkeyDatabase { ping::run(self.cfg.clone()).await } - async fn backup(&self, dir: &Path, is_test: Option) -> Result { - let test_mode = is_test.unwrap_or(false); - if !test_mode { - FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; - } + async fn backup(&self, dir: &Path) -> Result { + FileLock::acquire(&self.cfg.generated_id, DbOpLock::Backup.as_str()).await?; let res = backup::run(self.cfg.clone(), dir.to_path_buf(), self.file_extension()).await; - if !test_mode { - FileLock::release(&self.cfg.generated_id).await?; - } + FileLock::release(&self.cfg.generated_id).await?; res } - async fn restore(&self, _file: &Path, _is_test: Option) -> Result<()> { + async fn restore(&self, _file: &Path) -> Result<()> { bail!("Restore not supported for Valkey databases") } } diff --git a/src/domain/valkey/ping.rs b/src/domain/valkey/ping.rs index eb6f47c..851c6a6 100644 --- a/src/domain/valkey/ping.rs +++ b/src/domain/valkey/ping.rs @@ -1,8 +1,8 @@ -use tracing::{debug, info}; use crate::services::config::DatabaseConfig; +use anyhow::{Context, Result}; use tokio::process::Command; -use tokio::time::{timeout, Duration}; -use anyhow::{Result, Context}; +use tokio::time::{Duration, timeout}; +use tracing::{debug, info}; pub async fn run(cfg: DatabaseConfig) -> Result { let mut cmd = Command::new("valkey-cli"); @@ -23,7 +23,6 @@ pub async fn run(cfg: DatabaseConfig) -> Result { debug!("Command Ping: {:?}", cmd); - let result = timeout(Duration::from_secs(10), cmd.output()).await; match result { @@ -52,4 +51,4 @@ pub async fn run(cfg: DatabaseConfig) -> Result { Ok(false) } } -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index a391593..5c7a3f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,19 +3,18 @@ mod domain; mod services; mod settings; mod tasks; -mod utils; #[cfg(test)] mod tests; +mod utils; use crate::tasks::ping::ping_server; use crate::utils::locks::FileLock; +use crate::utils::logging; use utils::redis_client; use utils::task_manager::scheduler; -use crate::utils::logging; #[tokio::main] async fn main() { - logging::init_logger(); // Remove all locks on startup diff --git a/src/services/api/client.rs b/src/services/api/client.rs index f11ba33..f1e396c 100644 --- a/src/services/api/client.rs +++ b/src/services/api/client.rs @@ -1,9 +1,9 @@ #![allow(dead_code)] +use crate::services::api::ApiError; use reqwest::{Client, Method}; use serde::de::DeserializeOwned; use std::time::Duration; -use crate::services::api::ApiError; #[derive(Clone, Debug)] pub struct ApiClient { @@ -57,12 +57,7 @@ impl ApiClient { { let url = format!("{}{}", self.base_url, path); - let res = self - .http - .request(method, &url) - .json(body) - .send() - .await?; + let res = self.http.request(method, &url).json(body).send().await?; let status = res.status(); let body_text = res.text().await.unwrap_or_default(); diff --git a/src/services/api/endpoints/agent/backup/mod.rs b/src/services/api/endpoints/agent/backup/mod.rs index 114d146..5a489e4 100644 --- a/src/services/api/endpoints/agent/backup/mod.rs +++ b/src/services/api/endpoints/agent/backup/mod.rs @@ -23,7 +23,6 @@ pub struct BackupUpdateRequest { pub generated_id: String, } - impl ApiClient { pub async fn backup_create( &self, diff --git a/src/services/api/endpoints/agent/backup/upload/init.rs b/src/services/api/endpoints/agent/backup/upload/init.rs index 8b3d52f..cd39651 100644 --- a/src/services/api/endpoints/agent/backup/upload/init.rs +++ b/src/services/api/endpoints/agent/backup/upload/init.rs @@ -1,8 +1,8 @@ +use crate::services::api::models::agent::backup::BackupUploadResponse; use crate::services::api::{ApiClient, ApiError}; use anyhow::Result; use reqwest::Method; use serde::Serialize; -use crate::services::api::models::agent::backup::BackupUploadResponse; #[derive(Serialize)] pub struct InitUploadRequest { diff --git a/src/services/api/endpoints/agent/backup/upload/mod.rs b/src/services/api/endpoints/agent/backup/upload/mod.rs index 832c46e..3ed8b62 100644 --- a/src/services/api/endpoints/agent/backup/upload/mod.rs +++ b/src/services/api/endpoints/agent/backup/upload/mod.rs @@ -1,2 +1,2 @@ pub mod init; -pub mod status; \ No newline at end of file +pub mod status; diff --git a/src/services/api/endpoints/agent/mod.rs b/src/services/api/endpoints/agent/mod.rs index 161e4e6..49027be 100644 --- a/src/services/api/endpoints/agent/mod.rs +++ b/src/services/api/endpoints/agent/mod.rs @@ -1,3 +1,3 @@ -pub mod status; pub mod backup; -pub mod restore; \ No newline at end of file +pub mod restore; +pub mod status; diff --git a/src/services/api/endpoints/agent/status.rs b/src/services/api/endpoints/agent/status.rs index fe05790..868a4d4 100644 --- a/src/services/api/endpoints/agent/status.rs +++ b/src/services/api/endpoints/agent/status.rs @@ -31,6 +31,7 @@ impl ApiClient { let agent_id = agent_id.into(); let path = format!("/agent/{}/status", agent_id); - self.request_with_body(Method::POST, path.as_str(), &body).await + self.request_with_body(Method::POST, path.as_str(), &body) + .await } -} \ No newline at end of file +} diff --git a/src/services/api/endpoints/mod.rs b/src/services/api/endpoints/mod.rs index e779f0f..f498649 100644 --- a/src/services/api/endpoints/mod.rs +++ b/src/services/api/endpoints/mod.rs @@ -1,3 +1,3 @@ pub mod agent; -pub use agent::status; \ No newline at end of file +pub use agent::status; diff --git a/src/services/api/error.rs b/src/services/api/error.rs index ac82449..4862ee5 100644 --- a/src/services/api/error.rs +++ b/src/services/api/error.rs @@ -11,10 +11,7 @@ pub enum ApiError { Serialization(#[from] serde_json::Error), #[error("api error: status={status}, body={body}")] - HttpResponse { - status: StatusCode, - body: String, - }, + HttpResponse { status: StatusCode, body: String }, #[error("api returned unexpected response")] UnexpectedResponse, diff --git a/src/services/api/mod.rs b/src/services/api/mod.rs index 5c1e7d9..be0520a 100644 --- a/src/services/api/mod.rs +++ b/src/services/api/mod.rs @@ -1,8 +1,7 @@ pub mod client; +pub mod endpoints; pub mod error; pub mod models; -pub mod endpoints; pub use client::ApiClient; pub use error::ApiError; - diff --git a/src/services/api/models/agent/mod.rs b/src/services/api/models/agent/mod.rs index 161e4e6..49027be 100644 --- a/src/services/api/models/agent/mod.rs +++ b/src/services/api/models/agent/mod.rs @@ -1,3 +1,3 @@ -pub mod status; pub mod backup; -pub mod restore; \ No newline at end of file +pub mod restore; +pub mod status; diff --git a/src/services/api/models/agent/restore.rs b/src/services/api/models/agent/restore.rs index 288d4c8..d9abfb5 100644 --- a/src/services/api/models/agent/restore.rs +++ b/src/services/api/models/agent/restore.rs @@ -4,4 +4,4 @@ use serde::{Deserialize, Serialize}; pub struct ResultRestoreResponse { pub message: String, pub status: bool, -} \ No newline at end of file +} diff --git a/src/services/api/models/agent/status.rs b/src/services/api/models/agent/status.rs index 992ae2c..8b1b145 100644 --- a/src/services/api/models/agent/status.rs +++ b/src/services/api/models/agent/status.rs @@ -1,8 +1,8 @@ #![allow(dead_code)] +use crate::utils::deserializer::deserialize_snake_case; use serde::{Deserialize, Serialize}; use toml::Value; -use crate::utils::deserializer::deserialize_snake_case; #[derive(Debug, Deserialize)] pub struct PingResult { diff --git a/src/services/api/models/mod.rs b/src/services/api/models/mod.rs index 4be98f7..f17bc55 100644 --- a/src/services/api/models/mod.rs +++ b/src/services/api/models/mod.rs @@ -1,2 +1 @@ pub mod agent; - diff --git a/src/services/backup/compressor.rs b/src/services/backup/compressor.rs index d7ca3e3..ad36a09 100644 --- a/src/services/backup/compressor.rs +++ b/src/services/backup/compressor.rs @@ -1,20 +1,14 @@ use super::service::BackupService; use crate::utils::compress::compress_to_tar_gz_large; -use std::path::PathBuf; use anyhow::Result; +use std::path::PathBuf; impl BackupService { - - pub async fn compress_backup( - &self, - backup_file: Option, - ) -> Result { - - let file = backup_file - .ok_or_else(|| anyhow::anyhow!("No backup file generated"))?; + pub async fn compress_backup(&self, backup_file: Option) -> Result { + let file = backup_file.ok_or_else(|| anyhow::anyhow!("No backup file generated"))?; let compression = compress_to_tar_gz_large(&file).await?; Ok(compression.compressed_path) } -} \ No newline at end of file +} diff --git a/src/services/backup/dispatcher.rs b/src/services/backup/dispatcher.rs index b6aa2bc..bf9145a 100644 --- a/src/services/backup/dispatcher.rs +++ b/src/services/backup/dispatcher.rs @@ -1,11 +1,10 @@ use super::service::BackupService; -use crate::services::config::DatabasesConfig; use crate::services::api::models::agent::status::DatabaseStorage; +use crate::services::config::DatabasesConfig; use crate::utils::common::BackupMethod; use tracing::error; impl BackupService { - pub async fn dispatch( &self, generated_id: &String, @@ -14,7 +13,6 @@ impl BackupService { storages: &Vec, encrypt: bool, ) { - let Some(cfg) = config .databases .iter() @@ -41,4 +39,4 @@ impl BackupService { } }); } -} \ No newline at end of file +} diff --git a/src/services/backup/executor.rs b/src/services/backup/executor.rs index 203b21f..4b88c86 100644 --- a/src/services/backup/executor.rs +++ b/src/services/backup/executor.rs @@ -1,14 +1,13 @@ use super::service::BackupService; -use crate::services::config::DatabaseConfig; use crate::services::api::models::agent::status::DatabaseStorage; +use crate::services::config::DatabaseConfig; use crate::utils::common::BackupMethod; use crate::utils::locks::FileLock; -use tempfile::TempDir; use anyhow::Result; +use tempfile::TempDir; impl BackupService { - pub async fn execute_backup( &self, generated_id: String, @@ -17,11 +16,10 @@ impl BackupService { storages: Vec, encrypt: bool, ) -> Result<()> { - if FileLock::is_locked(&generated_id).await? { anyhow::bail!("backup already running"); } - + let backup = self.create_backup_record(&generated_id, &method).await?; let backup_id = backup.backup.id; @@ -46,4 +44,4 @@ impl BackupService { Ok(()) } -} \ No newline at end of file +} diff --git a/src/services/backup/helpers.rs b/src/services/backup/helpers.rs index a5a0130..3d63a82 100644 --- a/src/services/backup/helpers.rs +++ b/src/services/backup/helpers.rs @@ -1,16 +1,14 @@ use super::service::BackupService; +use crate::services::api::models::agent::backup::BackupResponse; use crate::utils::common::BackupMethod; use anyhow::{Result, anyhow}; -use crate::services::api::models::agent::backup::BackupResponse; impl BackupService { - pub async fn create_backup_record( &self, generated_id: &str, method: &BackupMethod, ) -> Result { - let response = self .ctx .api @@ -23,4 +21,4 @@ impl BackupService { response.ok_or_else(|| anyhow!("backup_create returned empty response")) } -} \ No newline at end of file +} diff --git a/src/services/backup/mod.rs b/src/services/backup/mod.rs index 2e1b5f0..98f1aec 100644 --- a/src/services/backup/mod.rs +++ b/src/services/backup/mod.rs @@ -1,11 +1,11 @@ -pub mod service; +pub mod compressor; pub mod dispatcher; pub mod executor; -pub mod compressor; -pub mod uploader; -pub mod result; -pub mod models; pub mod helpers; +pub mod models; +pub mod result; pub mod runner; +pub mod service; +pub mod uploader; -pub use service::BackupService; \ No newline at end of file +pub use service::BackupService; diff --git a/src/services/backup/models.rs b/src/services/backup/models.rs index f9db440..def8787 100644 --- a/src/services/backup/models.rs +++ b/src/services/backup/models.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] -use std::path::PathBuf; use crate::services::config::DbType; +use std::path::PathBuf; #[derive(Debug, Clone)] pub struct BackupResult { @@ -19,4 +19,4 @@ pub struct UploadResult { pub error: Option, pub remote_file_path: Option, pub total_size: Option, -} \ No newline at end of file +} diff --git a/src/services/backup/runner.rs b/src/services/backup/runner.rs index 3e7708c..e2bacd9 100644 --- a/src/services/backup/runner.rs +++ b/src/services/backup/runner.rs @@ -9,12 +9,7 @@ use std::path::Path; use tracing::{error, info}; impl BackupService { - - pub async fn run( - cfg: DatabaseConfig, - tmp_path: &Path, - ) -> Result { - + pub async fn run(cfg: DatabaseConfig, tmp_path: &Path) -> Result { let db = DatabaseFactory::create_for_backup(cfg.clone()).await; let generated_id = cfg.generated_id.clone(); @@ -40,8 +35,7 @@ impl BackupService { }); } - match db.backup(tmp_path, Some(false)).await { - + match db.backup(tmp_path).await { Ok(file) => Ok(BackupResult { generated_id, db_type, @@ -67,4 +61,4 @@ impl BackupService { }), } } -} \ No newline at end of file +} diff --git a/src/services/backup/service.rs b/src/services/backup/service.rs index 043e88d..31f6c51 100644 --- a/src/services/backup/service.rs +++ b/src/services/backup/service.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use crate::core::context::Context as CoreContext; +use std::sync::Arc; pub struct BackupService { pub ctx: Arc, @@ -9,4 +9,4 @@ impl BackupService { pub fn new(ctx: Arc) -> Self { Self { ctx } } -} \ No newline at end of file +} diff --git a/src/services/backup/uploader.rs b/src/services/backup/uploader.rs index 8136f27..35ecdf5 100644 --- a/src/services/backup/uploader.rs +++ b/src/services/backup/uploader.rs @@ -1,16 +1,15 @@ -use super::service::BackupService; use super::models::{BackupResult, UploadResult}; +use super::service::BackupService; -use crate::services::storage; use crate::services::api::models::agent::status::DatabaseStorage; +use crate::services::storage; use crate::utils::common::BackupMethod; -use futures::future::join_all; use anyhow::{Result, bail}; -use tracing::{info, error}; +use futures::future::join_all; +use tracing::{error, info}; impl BackupService { - pub async fn upload( &self, result: BackupResult, @@ -19,7 +18,6 @@ impl BackupService { encrypt: bool, backup_id: &String, ) -> Result> { - if result.code.as_deref() == Some("backup_already_in_progress") { info!("Skipping send: backup already in progress"); bail!("backup_already_in_progress"); @@ -28,7 +26,6 @@ impl BackupService { let ctx = self.ctx.clone(); let futures = storages.into_iter().map(|storage| { - let ctx_clone = ctx.clone(); let result_clone = result.clone(); let provider = storage::get_provider(&storage); @@ -37,13 +34,16 @@ impl BackupService { let generated_id = result_clone.generated_id.clone(); async move { - - info!("Uploading storage -> {:?} for {:?}", storage.provider, storage_id); + info!( + "Uploading storage -> {:?} for {:?}", + storage.provider, storage_id + ); /* INIT STEP */ - let init = match ctx_clone.api + let init = match ctx_clone + .api .backup_upload_init( ctx_clone.edge_key.agent_id.clone(), generated_id.clone(), @@ -107,7 +107,11 @@ impl BackupService { ) .await; - let status = if upload_result.success { "success" } else { "failed" }; + let status = if upload_result.success { + "success" + } else { + "failed" + }; if status != "success" { return upload_result; @@ -115,49 +119,48 @@ impl BackupService { info!( "Storage {} uploaded to remote path {:?}", - storage_id, - upload_result.remote_file_path + storage_id, upload_result.remote_file_path ); /* METADATA VALIDATION */ - let (remote_path, total_size) = match ( - &upload_result.remote_file_path, - upload_result.total_size, - ) { - (Some(path), Some(size)) => (path.clone(), size), - _ => { - return UploadResult { - storage_id, - success: false, - error: Some("remote_file_path or total_size missing".into()), - remote_file_path: None, - total_size: None, - }; - } - }; + let (remote_path, total_size) = + match (&upload_result.remote_file_path, upload_result.total_size) { + (Some(path), Some(size)) => (path.clone(), size), + _ => { + return UploadResult { + storage_id, + success: false, + error: Some("remote_file_path or total_size missing".into()), + remote_file_path: None, + total_size: None, + }; + } + }; /* STATUS UPDATE */ - match ctx_clone.api.backup_upload_status( - ctx_clone.edge_key.agent_id.clone(), - generated_id, - backup_storage_id, - status, - remote_path, - total_size, - backup_id, - ).await { - + match ctx_clone + .api + .backup_upload_status( + ctx_clone.edge_key.agent_id.clone(), + generated_id, + backup_storage_id, + status, + remote_path, + total_size, + backup_id, + ) + .await + { Ok(_) => upload_result, Err(err) => { error!( "backup_upload_status failed (storage_id={}): {}", - storage_id, - err + storage_id, err ); UploadResult { @@ -178,4 +181,4 @@ impl BackupService { Ok(results) } -} \ No newline at end of file +} diff --git a/src/services/config.rs b/src/services/config.rs index 69460dd..d68989c 100644 --- a/src/services/config.rs +++ b/src/services/config.rs @@ -20,7 +20,7 @@ pub enum DbType { MongoDB, Sqlite, Redis, - Valkey + Valkey, } impl DbType { @@ -58,7 +58,6 @@ pub struct DatabasesConfig { pub databases: Vec, } - #[allow(dead_code)] #[derive(Debug, Deserialize, Clone)] pub struct InputDatabaseConfig { @@ -80,7 +79,6 @@ pub struct InputDatabasesConfig { pub databases: Vec, } - pub struct ConfigService { ctx: Arc, } @@ -133,17 +131,27 @@ impl ConfigService { _ => return Err("Unsupported config file format. Use .json or .toml".to_string()), }; - fn required(opt: &Option, db_name: &str, field_name: &str) -> Result { + fn required( + opt: &Option, + db_name: &str, + field_name: &str, + ) -> Result { match opt { Some(v) => Ok(v.clone()), None => { - let msg = format!("Missing required field '{}' for database '{}'", field_name, db_name); + let msg = format!( + "Missing required field '{}' for database '{}'", + field_name, db_name + ); Err(msg) } } } - fn optional(opt: &Option) -> T where T: Default { + fn optional(opt: &Option) -> T + where + T: Default, + { opt.clone().unwrap_or_default() } @@ -155,28 +163,42 @@ impl ConfigService { } let username = match db.db_type { - DbType::Postgresql | DbType::Mysql | DbType::Mariadb => required(&db.username, &db.name, "username")?, + DbType::Postgresql | DbType::Mysql | DbType::Mariadb => { + required(&db.username, &db.name, "username")? + } _ => optional(&db.username), }; let password = match db.db_type { - DbType::Postgresql | DbType::Mysql | DbType::Mariadb => required(&db.password, &db.name, "password")?, + DbType::Postgresql | DbType::Mysql | DbType::Mariadb => { + required(&db.password, &db.name, "password")? + } _ => optional(&db.password), }; let host = match db.db_type { - DbType::Postgresql | DbType::Mysql | DbType::Mariadb | DbType::MongoDB | DbType::Redis | DbType::Valkey => required(&db.host, &db.name, "host")?, + DbType::Postgresql + | DbType::Mysql + | DbType::Mariadb + | DbType::MongoDB + | DbType::Redis + | DbType::Valkey => required(&db.host, &db.name, "host")?, DbType::Sqlite => optional(&db.host), }; let port = match db.db_type { - DbType::Postgresql | DbType::Mysql | DbType::Mariadb | DbType::MongoDB | DbType::Redis | DbType::Valkey => required(&db.port, &db.name, "port")?, + DbType::Postgresql + | DbType::Mysql + | DbType::Mariadb + | DbType::MongoDB + | DbType::Redis + | DbType::Valkey => required(&db.port, &db.name, "port")?, DbType::Sqlite => db.port.unwrap_or(0), }; let database_name = match db.db_type { DbType::Sqlite | DbType::Redis | DbType::Valkey => optional(&db.database), - _ => required(&db.database, &db.name, "database")? + _ => required(&db.database, &db.name, "database")?, }; let path_val = match db.db_type { diff --git a/src/services/cron.rs b/src/services/cron.rs index 9895bb2..87aa1a3 100644 --- a/src/services/cron.rs +++ b/src/services/cron.rs @@ -1,13 +1,13 @@ #![allow(dead_code)] use crate::core::context::Context; +use crate::services::api::models::agent::status::DatabaseStatus; use crate::utils::common::vec_to_option_json; use crate::utils::redis_client; use crate::utils::task_manager::cron::check_and_update_cron; use redis::aio::MultiplexedConnection; use serde_json::{Value, json}; use std::sync::Arc; -use crate::services::api::models::agent::status::DatabaseStatus; pub struct CronService { ctx: Arc, diff --git a/src/services/mod.rs b/src/services/mod.rs index 009cfbc..c4f3f04 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,7 +1,7 @@ +pub mod api; +pub mod backup; pub mod config; pub mod cron; -pub mod backup; pub mod restore; +pub mod status; mod storage; -pub mod api; -pub mod status; \ No newline at end of file diff --git a/src/services/restore/archive.rs b/src/services/restore/archive.rs index 6cb457c..be87159 100644 --- a/src/services/restore/archive.rs +++ b/src/services/restore/archive.rs @@ -7,13 +7,11 @@ use anyhow::Result; use std::path::{Path, PathBuf}; impl RestoreService { - pub async fn prepare_archive( &self, downloaded_file: PathBuf, tmp_path: &Path, ) -> Result { - let filename = downloaded_file .file_name() .unwrap() @@ -31,7 +29,6 @@ impl RestoreService { let mut archive = downloaded_file.clone(); if encrypted { - let new_name = filename.strip_suffix(".enc").unwrap(); let decrypted = tmp_path.join(new_name); @@ -41,7 +38,7 @@ impl RestoreService { decrypted.clone(), self.ctx.edge_key.master_key_b64.clone(), ) - .await?; + .await?; archive = decrypted; } @@ -58,4 +55,4 @@ impl RestoreService { Ok(archive) } } -} \ No newline at end of file +} diff --git a/src/services/restore/dispatcher.rs b/src/services/restore/dispatcher.rs index 12a0fd5..bf0c9c4 100644 --- a/src/services/restore/dispatcher.rs +++ b/src/services/restore/dispatcher.rs @@ -1,13 +1,11 @@ use super::service::RestoreService; -use crate::services::config::DatabasesConfig; use crate::services::api::models::agent::status::DatabaseStatus; +use crate::services::config::DatabasesConfig; use tracing::error; impl RestoreService { - pub async fn dispatch(&self, db: &DatabaseStatus, config: &DatabasesConfig) { - let Some(cfg) = config .databases .iter() @@ -29,14 +27,9 @@ impl RestoreService { let db_cfg = cfg.clone(); tokio::spawn(async move { - - if let Err(e) = service - .execute_restore(db_cfg, file_to_restore) - .await - { + if let Err(e) = service.execute_restore(db_cfg, file_to_restore).await { error!("Restore failed: {}", e); } - }); } -} \ No newline at end of file +} diff --git a/src/services/restore/downloader.rs b/src/services/restore/downloader.rs index 6646b8c..0b6cf41 100644 --- a/src/services/restore/downloader.rs +++ b/src/services/restore/downloader.rs @@ -1,18 +1,12 @@ use super::service::RestoreService; -use reqwest::{Client, Url}; use anyhow::Result; +use reqwest::{Client, Url}; use std::path::{Path, PathBuf}; use tracing::info; impl RestoreService { - - pub async fn download_backup( - &self, - file_url: &str, - tmp_path: &Path, - ) -> Result { - + pub async fn download_backup(&self, file_url: &str, tmp_path: &Path) -> Result { let client = Client::new(); let response = client.get(file_url).send().await?; @@ -28,7 +22,6 @@ impl RestoreService { .and_then(|s| s.split("filename=").nth(1)) .map(|f| f.trim_matches('"').to_string()); - let filename_from_url = Url::parse(file_url).ok().and_then(|u| { u.path_segments()? .last() @@ -50,4 +43,4 @@ impl RestoreService { Ok(path) } -} \ No newline at end of file +} diff --git a/src/services/restore/executor.rs b/src/services/restore/executor.rs index 4cabb5b..b790623 100644 --- a/src/services/restore/executor.rs +++ b/src/services/restore/executor.rs @@ -1,17 +1,11 @@ use super::service::RestoreService; use crate::services::config::DatabaseConfig; -use tempfile::TempDir; use anyhow::Result; +use tempfile::TempDir; use tracing::info; impl RestoreService { - - pub async fn execute_restore( - &self, - cfg: DatabaseConfig, - file_url: String, - ) -> Result<()> { - + pub async fn execute_restore(&self, cfg: DatabaseConfig, file_url: String) -> Result<()> { let temp_dir = TempDir::new()?; let tmp_path = temp_dir.path(); @@ -27,4 +21,4 @@ impl RestoreService { Ok(()) } -} \ No newline at end of file +} diff --git a/src/services/restore/mod.rs b/src/services/restore/mod.rs index 4ca2ebe..72f54ee 100644 --- a/src/services/restore/mod.rs +++ b/src/services/restore/mod.rs @@ -1,10 +1,10 @@ -pub mod service; +pub mod archive; pub mod dispatcher; -pub mod executor; pub mod downloader; -pub mod archive; -pub mod runner; -pub mod result; +pub mod executor; pub mod models; +pub mod result; +pub mod runner; +pub mod service; -pub use service::RestoreService; \ No newline at end of file +pub use service::RestoreService; diff --git a/src/services/restore/models.rs b/src/services/restore/models.rs index e987903..d459114 100644 --- a/src/services/restore/models.rs +++ b/src/services/restore/models.rs @@ -5,4 +5,4 @@ pub struct RestoreResult { #[serde(rename = "generatedId")] pub generated_id: String, pub status: String, -} \ No newline at end of file +} diff --git a/src/services/restore/result.rs b/src/services/restore/result.rs index d7325bf..d3f61c1 100644 --- a/src/services/restore/result.rs +++ b/src/services/restore/result.rs @@ -1,24 +1,25 @@ -use super::service::RestoreService; use super::models::RestoreResult; +use super::service::RestoreService; -use tracing::{info, error}; +use tracing::{error, info}; impl RestoreService { pub async fn send_result(&self, result: RestoreResult) { - info!( "[RestoreService] DB: {} | Status: {}", result.generated_id, result.status ); - match self.ctx + match self + .ctx .api .restore_result( self.ctx.edge_key.agent_id.clone(), &result.generated_id, &result.status, ) - .await { + .await + { Ok(_) => { info!("Restoration result sent successfully"); } @@ -27,4 +28,4 @@ impl RestoreService { } } } -} \ No newline at end of file +} diff --git a/src/services/restore/runner.rs b/src/services/restore/runner.rs index 36751dd..3806865 100644 --- a/src/services/restore/runner.rs +++ b/src/services/restore/runner.rs @@ -1,21 +1,19 @@ -use super::service::RestoreService; use super::models::RestoreResult; +use super::service::RestoreService; use crate::domain::factory::DatabaseFactory; use crate::services::config::DatabaseConfig; use anyhow::Result; use std::path::PathBuf; -use tracing::{info, error}; +use tracing::{error, info}; impl RestoreService { - pub async fn run_restore( &self, cfg: DatabaseConfig, backup_file: PathBuf, ) -> Result { - let generated_id = cfg.generated_id.clone(); let db = DatabaseFactory::create_for_restore(cfg.clone(), &backup_file).await; @@ -31,8 +29,7 @@ impl RestoreService { }); } - match db.restore(&backup_file, Some(false)).await { - + match db.restore(&backup_file).await { Ok(_) => Ok(RestoreResult { generated_id, status: "success".into(), @@ -48,4 +45,4 @@ impl RestoreService { } } } -} \ No newline at end of file +} diff --git a/src/services/restore/service.rs b/src/services/restore/service.rs index 5b6be8d..b2f59bb 100644 --- a/src/services/restore/service.rs +++ b/src/services/restore/service.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use crate::core::context::Context; +use std::sync::Arc; pub struct RestoreService { pub ctx: Arc, @@ -9,4 +9,4 @@ impl RestoreService { pub fn new(ctx: Arc) -> Self { Self { ctx } } -} \ No newline at end of file +} diff --git a/src/services/status.rs b/src/services/status.rs index fe2dba3..0ddfba1 100644 --- a/src/services/status.rs +++ b/src/services/status.rs @@ -1,13 +1,13 @@ #![allow(dead_code)] use crate::core::context::Context; +use crate::services::api::endpoints::status::DatabasePayload; +use crate::services::api::models::agent::status::PingResult; use crate::services::config::DatabaseConfig; use crate::settings::CONFIG; use reqwest::Client; use std::error::Error; use std::sync::Arc; -use crate::services::api::endpoints::status::DatabasePayload; -use crate::services::api::models::agent::status::PingResult; pub struct StatusService { ctx: Arc, @@ -35,7 +35,12 @@ impl StatusService { .collect(); let version_str = CONFIG.app_version.as_str(); - let result = self.ctx.api.agent_status(&edge_key.agent_id, &version_str, databases_payload).await?.unwrap(); + let result = self + .ctx + .api + .agent_status(&edge_key.agent_id, &version_str, databases_payload) + .await? + .unwrap(); Ok(result) } } diff --git a/src/services/storage/mod.rs b/src/services/storage/mod.rs index e44d142..02d3aaf 100644 --- a/src/services/storage/mod.rs +++ b/src/services/storage/mod.rs @@ -1,15 +1,15 @@ pub mod providers; use crate::core::context::Context; +use crate::services::api::models::agent::status::DatabaseStorage; +use crate::services::backup::models::{BackupResult, UploadResult}; use crate::utils::common::BackupMethod; use async_trait::async_trait; +use providers::google_drive; use providers::local; use providers::s3; -use providers::google_drive; use std::sync::Arc; use tracing::{error, info}; -use crate::services::api::models::agent::status::DatabaseStorage; -use crate::services::backup::models::{BackupResult, UploadResult}; #[async_trait] pub trait StorageProvider: Send + Sync { diff --git a/src/services/storage/providers/google_drive/helpers.rs b/src/services/storage/providers/google_drive/helpers.rs index e7744bd..688bfa4 100644 --- a/src/services/storage/providers/google_drive/helpers.rs +++ b/src/services/storage/providers/google_drive/helpers.rs @@ -1,15 +1,15 @@ +use crate::services::storage::providers::google_drive::models::GoogleDriveProviderConfig; use anyhow::{Context, Result, anyhow}; +use bytes::Bytes; +use futures::Stream; use futures::StreamExt; use oauth2::{ AuthUrl, ClientId, ClientSecret, RefreshToken, TokenResponse, TokenUrl, basic::BasicClient, reqwest::Client as OAuth2ReqwestClient, }; use reqwest::{Client as ReqwestClient, StatusCode}; -use serde_json::{Value, json}; -use futures::{Stream}; -use bytes::Bytes; use reqwest::{Client, header}; -use crate::services::storage::providers::google_drive::models::GoogleDriveProviderConfig; +use serde_json::{Value, json}; pub async fn get_google_drive_token(config: &GoogleDriveProviderConfig) -> Result { let http_client = OAuth2ReqwestClient::new(); @@ -34,7 +34,10 @@ pub async fn get_google_drive_token(config: &GoogleDriveProviderConfig) -> Resul Ok(token_result.access_token().secret().clone()) } -pub async fn ensure_folder_path(config: &GoogleDriveProviderConfig, path_parts: &[&str]) -> Result { +pub async fn ensure_folder_path( + config: &GoogleDriveProviderConfig, + path_parts: &[&str], +) -> Result { if path_parts.is_empty() { return Ok(config.folder_id.clone()); } @@ -152,7 +155,10 @@ pub async fn upload_stream_to_google_drive( let folder_id = ensure_folder_path(config, folder_path).await?; - if find_file_by_name(config, file_name, &folder_id).await?.is_some() { + if find_file_by_name(config, file_name, &folder_id) + .await? + .is_some() + { return Err(anyhow::anyhow!("File already exists: {}", full_path)); } @@ -172,7 +178,7 @@ pub async fn upload_stream_to_google_drive( .post("https://www.googleapis.com/upload/drive/v3/files?uploadType=resumable") .bearer_auth(&token) .header("X-Upload-Content-Type", mime) - .header("X-Upload-Content-Length", total_size.to_string()) // Helps a lot + .header("X-Upload-Content-Length", total_size.to_string()) // Helps a lot .json(&metadata) .send() .await @@ -237,12 +243,15 @@ pub async fn upload_stream_to_google_drive( .put(&upload_url) .header("Content-Range", &content_range) .header("Content-Length", chunk_bytes.len().to_string()) - .body(chunk_bytes.clone()) // clone is cheap if small; optimize later if needed + .body(chunk_bytes.clone()) // clone is cheap if small; optimize later if needed .send() .await; match res { - Ok(resp) if resp.status().is_success() || resp.status() == StatusCode::PERMANENT_REDIRECT => { + Ok(resp) + if resp.status().is_success() + || resp.status() == StatusCode::PERMANENT_REDIRECT => + { // 200 or 308 = good uploaded += chunk_bytes.len() as u64; tracing::info!("Uploaded {}/{} bytes", uploaded, total_size); diff --git a/src/services/storage/providers/google_drive/mod.rs b/src/services/storage/providers/google_drive/mod.rs index c5182d5..864cf75 100644 --- a/src/services/storage/providers/google_drive/mod.rs +++ b/src/services/storage/providers/google_drive/mod.rs @@ -1,9 +1,12 @@ mod helpers; -mod models; +mod models; use crate::core::context::Context; use crate::services::api::models::agent::status::DatabaseStorage; +use crate::services::backup::models::{BackupResult, UploadResult}; use crate::services::storage::StorageProvider; +use crate::services::storage::providers::google_drive::helpers::upload_stream_to_google_drive; +use crate::services::storage::providers::google_drive::models::GoogleDriveProviderConfig; use crate::utils::common::BackupMethod; use crate::utils::file::{full_file_name, full_file_path}; use crate::utils::stream::build_stream; @@ -11,9 +14,6 @@ use async_trait::async_trait; use std::sync::Arc; use tokio::fs; use tracing::{error, info}; -use crate::services::backup::models::{BackupResult, UploadResult}; -use crate::services::storage::providers::google_drive::helpers::{upload_stream_to_google_drive}; -use crate::services::storage::providers::google_drive::models::GoogleDriveProviderConfig; pub struct GoogleDriveProvider {} @@ -53,13 +53,7 @@ impl StorageProvider for GoogleDriveProvider { let encrypt = encrypt.unwrap_or(false); - let upload = match build_stream( - &file_path, - encrypt, - &ctx.edge_key.master_key_b64 - ) - .await - { + let upload = match build_stream(&file_path, encrypt, &ctx.edge_key.master_key_b64).await { Ok(u) => u, Err(e) => { error!("Stream build failed: {}", e); @@ -86,7 +80,6 @@ impl StorageProvider for GoogleDriveProvider { } }; - let file_name = full_file_name(encrypt); info!("Uploading file {}", file_name); @@ -96,12 +89,13 @@ impl StorageProvider for GoogleDriveProvider { match upload_stream_to_google_drive( &config, &remote_file_path, - upload.stream, + upload.stream, total_size, Some("application/octet-stream"), - ).await { + ) + .await + { Ok(_file_id) => { - info!("Google Drive upload successful"); UploadResult { @@ -124,6 +118,5 @@ impl StorageProvider for GoogleDriveProvider { } } } - } } diff --git a/src/services/storage/providers/google_drive/models.rs b/src/services/storage/providers/google_drive/models.rs index 98f725b..401edfd 100644 --- a/src/services/storage/providers/google_drive/models.rs +++ b/src/services/storage/providers/google_drive/models.rs @@ -7,5 +7,3 @@ pub struct GoogleDriveProviderConfig { pub refresh_token: String, pub folder_id: String, } - - diff --git a/src/services/storage/providers/local.rs b/src/services/storage/providers/local.rs index 80eae97..b857ad7 100644 --- a/src/services/storage/providers/local.rs +++ b/src/services/storage/providers/local.rs @@ -1,5 +1,6 @@ use crate::core::context::Context; use crate::services::api::models::agent::status::DatabaseStorage; +use crate::services::backup::models::{BackupResult, UploadResult}; use crate::services::storage::StorageProvider; use crate::utils::common::BackupMethod; use crate::utils::file::{full_file_name, full_file_path}; @@ -10,7 +11,6 @@ use reqwest::header::{HeaderMap, HeaderValue}; use std::sync::Arc; use tokio::fs; use tracing::error; -use crate::services::backup::models::{BackupResult, UploadResult}; pub struct LocalProvider; @@ -53,13 +53,7 @@ impl StorageProvider for LocalProvider { } }; - let upload = match build_stream( - &file_path, - encrypt, - &ctx.edge_key.master_key_b64 - ) - .await - { + let upload = match build_stream(&file_path, encrypt, &ctx.edge_key.master_key_b64).await { Ok(u) => u, Err(e) => { error!("Stream build failed: {}", e); @@ -68,7 +62,7 @@ impl StorageProvider for LocalProvider { success: false, error: Some(e.to_string()), remote_file_path: None, - total_size: None + total_size: None, }; } }; @@ -76,7 +70,10 @@ impl StorageProvider for LocalProvider { let mut extra_headers = HeaderMap::new(); extra_headers.insert("X-File-Name", HeaderValue::from_str(&file_name).unwrap()); - extra_headers.insert("X-File-Size", HeaderValue::from_str(&total_size.to_string()).unwrap()); + extra_headers.insert( + "X-File-Size", + HeaderValue::from_str(&total_size.to_string()).unwrap(), + ); extra_headers.insert( "X-File-Path", HeaderValue::from_str(&remote_file_path).unwrap(), @@ -90,10 +87,17 @@ impl StorageProvider for LocalProvider { "X-Method", HeaderValue::from_str(&method.to_string()).unwrap(), ); - + let tus_endpoint = format!("{}/tus/files", ctx.edge_key.server_url); - match upload_to_tus_stream_with_headers(upload.stream, &tus_endpoint, extra_headers, total_size).await { + match upload_to_tus_stream_with_headers( + upload.stream, + &tus_endpoint, + extra_headers, + total_size, + ) + .await + { Ok(_) => UploadResult { storage_id: storage.id.clone(), success: true, diff --git a/src/services/storage/providers/mod.rs b/src/services/storage/providers/mod.rs index 1d01d6d..9dd23be 100644 --- a/src/services/storage/providers/mod.rs +++ b/src/services/storage/providers/mod.rs @@ -1,3 +1,3 @@ +pub mod google_drive; pub mod local; pub mod s3; -pub mod google_drive; \ No newline at end of file diff --git a/src/services/storage/providers/s3/mod.rs b/src/services/storage/providers/s3/mod.rs index dd6fccf..368d021 100644 --- a/src/services/storage/providers/s3/mod.rs +++ b/src/services/storage/providers/s3/mod.rs @@ -2,26 +2,26 @@ mod models; use crate::core::context::Context; use crate::services::api::models::agent::status::DatabaseStorage; +use crate::services::backup::models::{BackupResult, UploadResult}; use crate::services::storage::StorageProvider; use crate::services::storage::providers::s3::models::S3ProviderConfig; use crate::utils::common::BackupMethod; use crate::utils::file::{full_file_name, full_file_path}; use crate::utils::stream::build_stream; use async_trait::async_trait; +use aws_config::retry::RetryConfig; use aws_sdk_s3 as s3; use aws_sdk_s3::config::BehaviorVersion; use aws_sdk_s3::config::Region; +use aws_sdk_s3::config::retry::ReconnectMode; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; use futures::StreamExt; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use aws_config::retry::RetryConfig; -use aws_sdk_s3::config::retry::ReconnectMode; use tokio::fs; use tracing::{error, info}; -use crate::services::backup::models::{BackupResult, UploadResult}; pub struct S3Provider {} @@ -97,15 +97,28 @@ impl StorageProvider for S3Provider { ); let region = Region::new(config.region.clone().unwrap_or("us-east-1".to_string())); - + let endpoint = if let Some(port) = &config.port { if port.trim().is_empty() { - format!("{}://{}", if config.ssl { "https" } else { "http" }, config.end_point_url) + format!( + "{}://{}", + if config.ssl { "https" } else { "http" }, + config.end_point_url + ) } else { - format!("{}://{}:{}", if config.ssl { "https" } else { "http" }, config.end_point_url, port) + format!( + "{}://{}:{}", + if config.ssl { "https" } else { "http" }, + config.end_point_url, + port + ) } } else { - format!("{}://{}", if config.ssl { "https" } else { "http" }, config.end_point_url) + format!( + "{}://{}", + if config.ssl { "https" } else { "http" }, + config.end_point_url + ) }; info!("S3 endpoint to {}", &endpoint); diff --git a/src/services/storage/providers/s3/models.rs b/src/services/storage/providers/s3/models.rs index ab3036c..63b795d 100644 --- a/src/services/storage/providers/s3/models.rs +++ b/src/services/storage/providers/s3/models.rs @@ -1,5 +1,5 @@ -use serde::{Deserialize, Serialize}; use crate::utils::deserializer::string_or_number_to_string; +use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize)] pub struct S3ProviderConfig { @@ -12,4 +12,3 @@ pub struct S3ProviderConfig { #[serde(default, deserialize_with = "string_or_number_to_string")] pub port: Option, } - diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index 924dabf..a766209 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -1 +1 @@ -pub mod ping; \ No newline at end of file +pub mod ping; diff --git a/src/tasks/ping.rs b/src/tasks/ping.rs index 70d57ec..04771e0 100644 --- a/src/tasks/ping.rs +++ b/src/tasks/ping.rs @@ -21,4 +21,4 @@ pub async fn ping_server() { } tokio::time::sleep(Duration::from_secs(CONFIG.pooling as u64)).await; } -} \ No newline at end of file +} diff --git a/src/tests/domain/mariadb.rs b/src/tests/domain/mariadb.rs new file mode 100644 index 0000000..6538151 --- /dev/null +++ b/src/tests/domain/mariadb.rs @@ -0,0 +1,94 @@ +use crate::domain::factory::DatabaseFactory; +use crate::services::config::{DatabaseConfig, DbType}; +use crate::tests::init_tracing_for_test; +use crate::utils::compress::{compress_to_tar_gz_large, decompress_large_tar_gz}; +use oauth2::url; +use std::path::PathBuf; +use tempfile::TempDir; +use testcontainers::runners::AsyncRunner; +use testcontainers::{ContainerAsync, ImageExt}; +use testcontainers_modules::mariadb::Mariadb; +use tracing::{error, info}; +use url::Host; + +async fn create_config() -> (ContainerAsync, DatabaseConfig) { + let container = Mariadb::default().with_tag("11.3").start().await.unwrap(); + + let host = container + .get_host() + .await + .unwrap_or(Host::parse("127.0.0.1").unwrap()); + + let port = container.get_host_port_ipv4(3306).await.unwrap_or(3306); + + let config = DatabaseConfig { + name: "Test MariaDB".to_string(), + database: "test".to_string(), + db_type: DbType::Mariadb, + username: "root".to_string(), + password: "".to_string(), + port, + host: host.to_string(), + generated_id: "3c4b4eb4-c2c6-4bde-a423-ee1385dcf6d2".to_string(), + path: "".to_string(), + }; + + (container, config) +} + +#[tokio::test] +async fn mariadb_ping_test() { + init_tracing_for_test(); + + let (_container, config) = create_config().await; + + let db = DatabaseFactory::create_for_backup(config.clone()).await; + let reachable = db.ping().await.unwrap_or(false); + + assert!(reachable); +} + +#[tokio::test] +async fn mariadb_backup_restore_test() { + init_tracing_for_test(); + + let (_container, config) = create_config().await; + + let temp_dir = TempDir::new().unwrap(); + let backup_path = temp_dir.path(); + + let db = DatabaseFactory::create_for_backup(config.clone()).await; + let file_path = db.backup(backup_path).await.unwrap(); + + assert!(file_path.is_file()); + + let compression = compress_to_tar_gz_large(&file_path).await.unwrap(); + assert!(compression.compressed_path.is_file()); + + let files = decompress_large_tar_gz(compression.compressed_path.as_path(), temp_dir.path()) + .await + .unwrap(); + + let backup_file: PathBuf = if files.len() == 1 { + files[0].clone() + } else { + "".into() + }; + + let db = DatabaseFactory::create_for_restore(config.clone(), &backup_file).await; + let reachable = db.ping().await.unwrap_or(false); + + info!("Reachable: {}", reachable); + assert!(reachable); + + match db.restore(&backup_file).await { + Ok(_) => { + info!("Restore succeeded for {}", config.generated_id); + assert!(true) + } + Err(e) => { + error!("Restore failed for {}: {:?}", config.generated_id, e); + assert!(false) + } + } +} diff --git a/src/tests/domain/mod.rs b/src/tests/domain/mod.rs index 7668a49..bfcb388 100644 --- a/src/tests/domain/mod.rs +++ b/src/tests/domain/mod.rs @@ -1,3 +1,6 @@ +mod mariadb; +mod mongodb; +mod mysql; mod postgres; mod redis; -mod valkey; \ No newline at end of file +mod valkey; diff --git a/src/tests/domain/mongodb.rs b/src/tests/domain/mongodb.rs new file mode 100644 index 0000000..1b86ae2 --- /dev/null +++ b/src/tests/domain/mongodb.rs @@ -0,0 +1,98 @@ +use crate::domain::factory::DatabaseFactory; +use crate::services::config::{DatabaseConfig, DbType}; +use crate::tests::init_tracing_for_test; +use mongodb::{Client, bson::doc}; +use tempfile::TempDir; +use testcontainers::ContainerAsync; +use testcontainers::runners::AsyncRunner; +use testcontainers_modules::mongo::Mongo; +use tracing::{error, info}; +use url::Host; + +async fn create_config() -> (ContainerAsync, DatabaseConfig) { + let container = Mongo::default().start().await.unwrap(); + + let host = container + .get_host() + .await + .unwrap_or(Host::parse("127.0.0.1").unwrap()); + + let port = container.get_host_port_ipv4(27017).await.unwrap_or(27017); + + let config = DatabaseConfig { + name: "Test MongoDB".to_string(), + database: "testdb".to_string(), + db_type: DbType::MongoDB, + username: "".to_string(), + password: "".to_string(), + port, + host: host.to_string(), + generated_id: "96d30a9f-ff4b-47c9-aaab-f3147bb34f16".to_string(), + path: "".to_string(), + }; + + (container, config) +} + +async fn seed_database(config: &DatabaseConfig) { + let client = Client::with_uri_str(format!( + "mongodb://{}:{}/{}", + config.host, config.port, config.database + )) + .await + .unwrap(); + + let collection = client + .database(&config.database) + .collection::("sample"); + + collection + .insert_one(doc! { "name": "hello mongo" }) + .await + .unwrap(); +} + +#[tokio::test] +async fn mongodb_ping_test() { + init_tracing_for_test(); + + let (_container, config) = create_config().await; + + let db = DatabaseFactory::create_for_backup(config.clone()).await; + let reachable = db.ping().await.unwrap_or(false); + + assert!(reachable); +} + +#[tokio::test] +async fn mongodb_backup_restore_test() { + init_tracing_for_test(); + + let (_container, config) = create_config().await; + seed_database(&config).await; + + let temp_dir = TempDir::new().unwrap(); + let backup_path = temp_dir.path(); + + let db = DatabaseFactory::create_for_backup(config.clone()).await; + let file_path = db.backup(backup_path).await.unwrap(); + info!("Backup path: {:?}", file_path); + assert!(file_path.is_file()); + + let db = DatabaseFactory::create_for_restore(config.clone(), &file_path).await; + let reachable = db.ping().await.unwrap_or(false); + + info!("Reachable: {}", reachable); + assert!(reachable); + + match db.restore(&file_path).await { + Ok(_) => { + info!("Restore succeeded for {}", config.generated_id); + assert!(true) + } + Err(e) => { + error!("Restore failed for {}: {:?}", config.generated_id, e); + assert!(false) + } + } +} diff --git a/src/tests/domain/mysql.rs b/src/tests/domain/mysql.rs new file mode 100644 index 0000000..20b8861 --- /dev/null +++ b/src/tests/domain/mysql.rs @@ -0,0 +1,94 @@ +use crate::domain::factory::DatabaseFactory; +use crate::services::config::{DatabaseConfig, DbType}; +use crate::tests::init_tracing_for_test; +use crate::utils::compress::{compress_to_tar_gz_large, decompress_large_tar_gz}; +use oauth2::url; +use std::path::PathBuf; +use tempfile::TempDir; +use testcontainers::runners::AsyncRunner; +use testcontainers::{ContainerAsync, ImageExt}; +use testcontainers_modules::mysql::Mysql; +use tracing::{error, info}; +use url::Host; + +async fn create_config() -> (ContainerAsync, DatabaseConfig) { + let container = Mysql::default().with_tag("8.1").start().await.unwrap(); + + let host = container + .get_host() + .await + .unwrap_or(Host::parse("127.0.0.1").unwrap()); + + let port = container.get_host_port_ipv4(3306).await.unwrap_or(3306); + + let config = DatabaseConfig { + name: "Test MySQL".to_string(), + database: "test".to_string(), + db_type: DbType::Mysql, + username: "root".to_string(), + password: "".to_string(), + port, + host: host.to_string(), + generated_id: "0f1bb8f2-35a0-4c91-8098-e36873d3ce31".to_string(), + path: "".to_string(), + }; + + (container, config) +} + +#[tokio::test] +async fn mysql_ping_test() { + init_tracing_for_test(); + + let (_container, config) = create_config().await; + + let db = DatabaseFactory::create_for_backup(config.clone()).await; + let reachable = db.ping().await.unwrap_or(false); + + assert!(reachable); +} + +#[tokio::test] +async fn mysql_backup_restore_test() { + init_tracing_for_test(); + + let (_container, config) = create_config().await; + + let temp_dir = TempDir::new().unwrap(); + let backup_path = temp_dir.path(); + + let db = DatabaseFactory::create_for_backup(config.clone()).await; + let file_path = db.backup(backup_path).await.unwrap(); + + assert!(file_path.is_file()); + + let compression = compress_to_tar_gz_large(&file_path).await.unwrap(); + assert!(compression.compressed_path.is_file()); + + let files = decompress_large_tar_gz(compression.compressed_path.as_path(), temp_dir.path()) + .await + .unwrap(); + + let backup_file: PathBuf = if files.len() == 1 { + files[0].clone() + } else { + "".into() + }; + + let db = DatabaseFactory::create_for_restore(config.clone(), &backup_file).await; + let reachable = db.ping().await.unwrap_or(false); + + info!("Reachable: {}", reachable); + assert!(reachable); + + match db.restore(&backup_file).await { + Ok(_) => { + info!("Restore succeeded for {}", config.generated_id); + assert!(true) + } + Err(e) => { + error!("Restore failed for {}: {:?}", config.generated_id, e); + assert!(false) + } + } +} diff --git a/src/tests/domain/postgres.rs b/src/tests/domain/postgres.rs index ca0543f..bac4f6d 100644 --- a/src/tests/domain/postgres.rs +++ b/src/tests/domain/postgres.rs @@ -66,7 +66,7 @@ async fn postgres_backup_restore_test() { let db = DatabaseFactory::create_for_backup(config.clone()).await; - let file_path = db.backup(backup_path, Some(true)).await.unwrap(); + let file_path = db.backup(backup_path).await.unwrap(); assert!(file_path.is_file()); @@ -96,7 +96,7 @@ async fn postgres_backup_restore_test() { info!("Running pg_restore: {:?}", backup_file); - match db.restore(&backup_file, Some(true)).await { + match db.restore(&backup_file).await { Ok(_) => { info!("Restore succeeded for {}", config.generated_id); assert!(true) diff --git a/src/tests/domain/redis.rs b/src/tests/domain/redis.rs index f704aab..32e3ebd 100644 --- a/src/tests/domain/redis.rs +++ b/src/tests/domain/redis.rs @@ -1,6 +1,6 @@ use tempfile::TempDir; -use testcontainers::runners::AsyncRunner; use testcontainers::ContainerAsync; +use testcontainers::runners::AsyncRunner; use testcontainers_modules::redis::Redis; use url::Host; @@ -16,10 +16,7 @@ async fn create_config() -> (ContainerAsync, DatabaseConfig) { .await .unwrap_or(Host::parse("127.0.0.1").unwrap()); - let port = container - .get_host_port_ipv4(6379) - .await - .unwrap_or(6379); + let port = container.get_host_port_ipv4(6379).await.unwrap_or(6379); let config = DatabaseConfig { name: "Test Redis".to_string(), @@ -59,7 +56,7 @@ async fn redis_backup_test() { let db = DatabaseFactory::create_for_backup(config.clone()).await; - let file_path = db.backup(backup_path, Some(true)).await.unwrap(); + let file_path = db.backup(backup_path).await.unwrap(); assert!(file_path.is_file()); -} \ No newline at end of file +} diff --git a/src/tests/domain/valkey.rs b/src/tests/domain/valkey.rs index 840f35e..f018541 100644 --- a/src/tests/domain/valkey.rs +++ b/src/tests/domain/valkey.rs @@ -1,11 +1,11 @@ -use tempfile::TempDir; -use testcontainers::runners::AsyncRunner; -use testcontainers::ContainerAsync; -use testcontainers_modules::valkey::{Valkey}; -use url::Host; use crate::domain::factory::DatabaseFactory; use crate::services::config::{DatabaseConfig, DbType}; use crate::tests::init_tracing_for_test; +use tempfile::TempDir; +use testcontainers::ContainerAsync; +use testcontainers::runners::AsyncRunner; +use testcontainers_modules::valkey::Valkey; +use url::Host; async fn create_config() -> (ContainerAsync, DatabaseConfig) { let container = Valkey::default().start().await.unwrap(); @@ -15,10 +15,7 @@ async fn create_config() -> (ContainerAsync, DatabaseConfig) { .await .unwrap_or(Host::parse("127.0.0.1").unwrap()); - let port = container - .get_host_port_ipv4(6379) - .await - .unwrap_or(6379); + let port = container.get_host_port_ipv4(6379).await.unwrap_or(6379); let config = DatabaseConfig { name: "Test Valkey".to_string(), @@ -58,7 +55,7 @@ async fn valkey_backup_test() { let db = DatabaseFactory::create_for_backup(config.clone()).await; - let file_path = db.backup(backup_path, Some(true)).await.unwrap(); + let file_path = db.backup(backup_path).await.unwrap(); assert!(file_path.is_file()); -} \ No newline at end of file +} diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 6ece72c..168c8ef 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -1,7 +1,6 @@ -mod utils; mod domain; - - +mod services; +mod utils; use once_cell::sync::Lazy; use tracing_subscriber; @@ -15,4 +14,4 @@ static TRACING: Lazy<()> = Lazy::new(|| { fn init_tracing_for_test() -> () { Lazy::force(&TRACING); -} \ No newline at end of file +} diff --git a/src/tests/services/api_models_tests.rs b/src/tests/services/api_models_tests.rs new file mode 100644 index 0000000..0cf7bd6 --- /dev/null +++ b/src/tests/services/api_models_tests.rs @@ -0,0 +1,112 @@ +use serde_json::json; + +use crate::services::api::models::agent::backup::{BackupResponse, BackupUploadResponse}; +use crate::services::api::models::agent::restore::ResultRestoreResponse; +use crate::services::api::models::agent::status::PingResult; + +#[test] +fn backup_response_deserializes_nested_backup_id() { + let response: BackupResponse = serde_json::from_value(json!({ + "message": "created", + "backup": { + "id": "backup-123" + } + })) + .unwrap(); + + assert_eq!(response.message, "created"); + assert_eq!(response.backup.id, "backup-123"); +} + +#[test] +fn backup_upload_response_deserializes_storage_payload() { + let response: BackupUploadResponse = serde_json::from_value(json!({ + "message": "uploaded", + "backupStorage": { + "id": "storage-456" + } + })) + .unwrap(); + + assert_eq!(response.message, "uploaded"); + assert_eq!(response.backup_storage.id, "storage-456"); +} + +#[test] +fn restore_response_deserializes_status() { + let response: ResultRestoreResponse = serde_json::from_value(json!({ + "message": "ok", + "status": true + })) + .unwrap(); + + assert_eq!(response.message, "ok"); + assert!(response.status); +} + +#[test] +fn ping_result_deserializes_and_normalizes_storage_config_keys() { + let payload = json!({ + "agent": { + "id": "agent-1", + "lastContact": "2026-03-22T10:00:00Z" + }, + "databases": [{ + "dbms": "postgres", + "generatedId": "db-1", + "storages": [{ + "id": "storage-1", + "provider": "s3", + "config": { + "bucketName": "agent-backups", + "nestedConfig": { + "regionName": "eu-west-3" + }, + "allowedRegions": [ + { "regionCode": "eu-west-3" } + ] + } + }], + "encrypt": true, + "data": { + "backup": { + "action": true, + "cron": "*/5 * * * *" + }, + "restore": { + "action": false, + "file": null, + "metaFile": null + } + } + }] + }); + + let result: PingResult = serde_json::from_value(payload).unwrap(); + let storage = &result.databases[0].storages[0]; + + assert_eq!(result.agent.id, "agent-1"); + assert_eq!(result.agent.last_contact, "2026-03-22T10:00:00Z"); + assert_eq!(result.databases[0].generated_id, "db-1"); + assert_eq!(storage.provider, "s3"); + assert_eq!( + storage.config["bucket_name"].as_str(), + Some("agent-backups") + ); + assert_eq!( + storage.config["nested_config"]["region_name"].as_str(), + Some("eu-west-3") + ); + assert_eq!( + storage.config["allowed_regions"][0]["region_code"].as_str(), + Some("eu-west-3") + ); + assert_eq!( + result.databases[0].data.backup.cron.as_deref(), + Some("*/5 * * * *") + ); + assert!(result.databases[0].data.backup.action); + assert!(!result.databases[0].data.restore.action); + assert!(result.databases[0].data.restore.file.is_none()); + assert!(result.databases[0].data.restore.meta_file.is_none()); +} diff --git a/src/tests/services/mod.rs b/src/tests/services/mod.rs new file mode 100644 index 0000000..4a12318 --- /dev/null +++ b/src/tests/services/mod.rs @@ -0,0 +1 @@ +mod api_models_tests; diff --git a/src/tests/utils/common_tests.rs b/src/tests/utils/common_tests.rs index ad5ccbd..cff10da 100644 --- a/src/tests/utils/common_tests.rs +++ b/src/tests/utils/common_tests.rs @@ -1,5 +1,5 @@ +use crate::utils::common::{BackupMethod, vec_to_option_json}; use serde_json::json; -use crate::utils::common::{vec_to_option_json, BackupMethod}; #[test] fn backup_method_to_string_automatic() { @@ -39,8 +39,11 @@ fn vec_to_option_json_serializes_struct_vector() { let v = vec![Item { id: 1 }, Item { id: 2 }]; let result = vec_to_option_json(v); - assert_eq!(result, Some(json!([ + assert_eq!( + result, + Some(json!([ { "id": 1 }, { "id": 2 } - ]))); -} \ No newline at end of file + ])) + ); +} diff --git a/src/tests/utils/compress_tests.rs b/src/tests/utils/compress_tests.rs index f4d3b42..b735d95 100644 --- a/src/tests/utils/compress_tests.rs +++ b/src/tests/utils/compress_tests.rs @@ -1,7 +1,7 @@ -use tempfile::tempdir; -use tokio::fs::{write, read}; -use anyhow::Result; use crate::utils::compress::{compress_to_tar_gz_large, decompress_large_tar_gz}; +use anyhow::Result; +use tempfile::tempdir; +use tokio::fs::{read, write}; #[tokio::test] async fn compress_creates_tar_gz() -> Result<()> { @@ -39,7 +39,8 @@ async fn decompress_restores_file() -> Result<()> { let output_dir = tmp.path().join("out"); tokio::fs::create_dir_all(&output_dir).await?; - let extracted_files = decompress_large_tar_gz(&compress_result.compressed_path, &output_dir).await?; + let extracted_files = + decompress_large_tar_gz(&compress_result.compressed_path, &output_dir).await?; assert_eq!(extracted_files.len(), 1); let extracted_content = read(&extracted_files[0]).await?; diff --git a/src/tests/utils/deserializer.rs b/src/tests/utils/deserializer.rs index df1fd4e..2a0c8aa 100644 --- a/src/tests/utils/deserializer.rs +++ b/src/tests/utils/deserializer.rs @@ -1,9 +1,9 @@ #[cfg(test)] mod tests { + use crate::utils::deserializer::{camel_to_snake, deserialize_snake_case, to_snake_case}; use serde::Deserialize; - use toml::map::Map; use toml::Value; - use crate::utils::deserializer::{camel_to_snake, deserialize_snake_case, to_snake_case}; + use toml::map::Map; #[test] fn camel_to_snake_simple() { @@ -45,10 +45,7 @@ mod tests { let mut table2 = Map::new(); table2.insert("AnotherKey".into(), Value::Integer(2)); - let value = Value::Array(vec![ - Value::Table(table1), - Value::Table(table2), - ]); + let value = Value::Array(vec![Value::Table(table1), Value::Table(table2)]); let mut expected_table1 = Map::new(); expected_table1.insert("camel_key".into(), Value::Integer(1)); @@ -101,4 +98,4 @@ mod tests { let result = to_snake_case(value.clone()); assert_eq!(result, value); } -} \ No newline at end of file +} diff --git a/src/tests/utils/edge_key_tests.rs b/src/tests/utils/edge_key_tests.rs index 948c61f..e38d1f8 100644 --- a/src/tests/utils/edge_key_tests.rs +++ b/src/tests/utils/edge_key_tests.rs @@ -1,6 +1,6 @@ -use base64::{engine::general_purpose, Engine as _}; +use crate::utils::edge_key::{EdgeKeyError, decode_edge_key}; +use base64::{Engine as _, engine::general_purpose}; use serde_json::json; -use crate::utils::edge_key::{decode_edge_key, EdgeKeyError}; #[test] fn decode_valid_edge_key() { @@ -9,7 +9,10 @@ fn decode_valid_edge_key() { assert_eq!(decoded.server_url, "http://localhost:8887"); assert_eq!(decoded.agent_id, "625043cf-7c00-43c8-bcc9-d35199896dcd"); - assert_eq!(decoded.master_key_b64, "BXV3XolC656SV7dNgcWPGQlk++rpLI6lGDi7CPB5ieo="); + assert_eq!( + decoded.master_key_b64, + "BXV3XolC656SV7dNgcWPGQlk++rpLI6lGDi7CPB5ieo=" + ); } #[test] @@ -19,7 +22,7 @@ fn decode_edge_key_missing_field() { "agentId": "123" // masterKeyB64 is missing }) - .to_string(); + .to_string(); let b64 = general_purpose::URL_SAFE.encode(incomplete_json); let result = decode_edge_key(&b64); diff --git a/src/tests/utils/file_tests.rs b/src/tests/utils/file_tests.rs new file mode 100644 index 0000000..26581b9 --- /dev/null +++ b/src/tests/utils/file_tests.rs @@ -0,0 +1,88 @@ +use anyhow::Result; +use base64::{Engine as _, engine::general_purpose}; +use futures::StreamExt; +use serde_json::Value; +use tempfile::tempdir; +use tokio::fs; +use tokio::io::AsyncWriteExt; + +use crate::utils::file::{ + decrypt_file_stream_gcm, encrypt_file_stream_gcm, full_extension, full_file_name, + full_file_path, +}; + +#[test] +fn full_extension_returns_everything_after_first_dot() { + assert_eq!( + full_extension(std::path::Path::new("archive.tar.gz")), + ".tar.gz" + ); + assert_eq!(full_extension(std::path::Path::new("README")), ""); +} + +#[test] +fn full_file_name_matches_expected_suffix() { + let unencrypted = full_file_name(false); + let encrypted = full_file_name(true); + + assert!(unencrypted.ends_with(".tar.gz")); + assert!(encrypted.ends_with(".tar.gz.enc")); +} + +#[test] +fn full_file_path_prefixes_backups_directory_and_date() { + let file_name = "backup.tar.gz".to_string(); + let full_path = full_file_path(&file_name); + + assert!(full_path.starts_with("backups/")); + assert!(full_path.ends_with("/backup.tar.gz")); +} + +#[tokio::test] +async fn encrypt_and_decrypt_round_trip_restores_original_bytes() -> Result<()> { + let tmp = tempdir()?; + let input_path = tmp.path().join("plain.txt"); + let encrypted_path = tmp.path().join("cipher.bin"); + let decrypted_path = tmp.path().join("plain.out.txt"); + let original = b"encryption round-trip payload"; + + fs::write(&input_path, original).await?; + + let key = general_purpose::STANDARD.encode([7u8; 32]); + let mut encrypted_stream = encrypt_file_stream_gcm(input_path.clone(), key.clone()).await?; + let mut encrypted_file = fs::File::create(&encrypted_path).await?; + + while let Some(chunk) = encrypted_stream.next().await { + encrypted_file.write_all(&chunk?).await?; + } + + decrypt_file_stream_gcm(encrypted_path, decrypted_path.clone(), key).await?; + + let decrypted = fs::read(decrypted_path).await?; + assert_eq!(decrypted, original); + + Ok(()) +} + +#[tokio::test] +async fn encrypt_stream_starts_with_json_header_line() -> Result<()> { + let tmp = tempdir()?; + let input_path = tmp.path().join("plain.txt"); + fs::write(&input_path, b"header test").await?; + + let key = general_purpose::STANDARD.encode([9u8; 32]); + let mut encrypted_stream = encrypt_file_stream_gcm(input_path, key).await?; + let first_chunk = encrypted_stream.next().await.unwrap()?; + + let header_end = first_chunk + .iter() + .position(|byte| *byte == b'\n') + .expect("missing header newline"); + let header: Value = serde_json::from_slice(&first_chunk[..header_end])?; + + assert_eq!(header["version"], 1); + assert_eq!(header["cipher"], "AES-256-GCM"); + assert_eq!(header["chunk_size"], 16 * 1024 * 1024); + + Ok(()) +} diff --git a/src/tests/utils/mod.rs b/src/tests/utils/mod.rs index c942141..858d6c1 100644 --- a/src/tests/utils/mod.rs +++ b/src/tests/utils/mod.rs @@ -1,5 +1,7 @@ -mod normalize_cron_tests; mod common_tests; mod compress_tests; mod deserializer; -mod edge_key_tests; \ No newline at end of file +mod edge_key_tests; +mod file_tests; +mod normalize_cron_tests; +mod stream_tests; diff --git a/src/tests/utils/normalize_cron_tests.rs b/src/tests/utils/normalize_cron_tests.rs index 2c9a1ab..2144e05 100644 --- a/src/tests/utils/normalize_cron_tests.rs +++ b/src/tests/utils/normalize_cron_tests.rs @@ -1,7 +1,7 @@ +use crate::utils::task_manager::cron::next_run_timestamp; use crate::utils::text::normalize_cron; use cron::Schedule; use std::str::FromStr; -use crate::utils::task_manager::cron::next_run_timestamp; #[test] fn normalize_adds_seconds_to_five_field_cron() { diff --git a/src/tests/utils/stream_tests.rs b/src/tests/utils/stream_tests.rs new file mode 100644 index 0000000..b31cc24 --- /dev/null +++ b/src/tests/utils/stream_tests.rs @@ -0,0 +1,54 @@ +use anyhow::Result; +use base64::{Engine as _, engine::general_purpose}; +use futures::StreamExt; +use tempfile::tempdir; +use tokio::fs; + +use crate::utils::file::decrypt_file_stream_gcm; +use crate::utils::stream::build_stream; + +#[tokio::test] +async fn build_stream_without_encryption_yields_original_bytes() -> Result<()> { + let tmp = tempdir()?; + let file_path = tmp.path().join("plain.txt"); + let content = b"plain upload stream"; + fs::write(&file_path, content).await?; + + let mut upload_stream = build_stream(&file_path, false, &String::new()) + .await? + .stream; + let mut collected = Vec::new(); + + while let Some(chunk) = upload_stream.next().await { + collected.extend_from_slice(&chunk?); + } + + assert_eq!(collected, content); + + Ok(()) +} + +#[tokio::test] +async fn build_stream_with_encryption_produces_decryptable_content() -> Result<()> { + let tmp = tempdir()?; + let input_path = tmp.path().join("plain.txt"); + let encrypted_path = tmp.path().join("encrypted.bin"); + let decrypted_path = tmp.path().join("decrypted.txt"); + let content = b"encrypted upload stream"; + fs::write(&input_path, content).await?; + + let key = general_purpose::STANDARD.encode([5u8; 32]); + let mut upload_stream = build_stream(&input_path, true, &key).await?.stream; + let mut encrypted_bytes = Vec::new(); + + while let Some(chunk) = upload_stream.next().await { + encrypted_bytes.extend_from_slice(&chunk?); + } + + fs::write(&encrypted_path, encrypted_bytes).await?; + decrypt_file_stream_gcm(encrypted_path, decrypted_path.clone(), key).await?; + + assert_eq!(fs::read(decrypted_path).await?, content); + + Ok(()) +} diff --git a/src/utils/common.rs b/src/utils/common.rs index afb5017..deb8ec4 100644 --- a/src/utils/common.rs +++ b/src/utils/common.rs @@ -16,11 +16,10 @@ impl ToString for BackupMethod { } } - pub fn vec_to_option_json(v: Vec) -> Option { if v.is_empty() { None } else { Some(serde_json::to_value(v).expect("serialization failed")) } -} \ No newline at end of file +} diff --git a/src/utils/compress.rs b/src/utils/compress.rs index f354b36..64bb945 100644 --- a/src/utils/compress.rs +++ b/src/utils/compress.rs @@ -4,7 +4,7 @@ use async_compression::tokio::write::GzipEncoder as AsyncGzipEncoder; use futures::StreamExt; use std::path::{Path, PathBuf}; use tokio::fs::File; -use tokio::fs::{create_dir_all}; +use tokio::fs::create_dir_all; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; use tokio_tar::Archive; diff --git a/src/utils/deserializer.rs b/src/utils/deserializer.rs index e4d094a..33955ac 100644 --- a/src/utils/deserializer.rs +++ b/src/utils/deserializer.rs @@ -1,7 +1,6 @@ use serde::{Deserialize, Deserializer}; +use serde_json::Value as ValueJson; use toml::Value; -use serde_json::{Value as ValueJson, }; - pub fn deserialize_snake_case<'de, D>(deserializer: D) -> Result where @@ -39,7 +38,6 @@ pub fn camel_to_snake(s: &str) -> String { out } - pub fn string_or_number_to_string<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de>, diff --git a/src/utils/edge_key.rs b/src/utils/edge_key.rs index 43566c0..d18b87d 100644 --- a/src/utils/edge_key.rs +++ b/src/utils/edge_key.rs @@ -34,7 +34,7 @@ pub fn decode_edge_key(edge_key: &str) -> Result { let decoded_str = String::from_utf8_lossy(&decoded_bytes); let parsed: Value = serde_json::from_str(&decoded_str)?; - + if parsed.get("serverUrl").is_some() && parsed.get("agentId").is_some() && parsed.get("masterKeyB64").is_some() diff --git a/src/utils/file.rs b/src/utils/file.rs index f4a1b9b..2cfc31d 100644 --- a/src/utils/file.rs +++ b/src/utils/file.rs @@ -14,8 +14,8 @@ use base64::Engine; use base64::engine::general_purpose; use bytes::Bytes; use futures::Stream; -use rand::rngs::OsRng; use rand::TryRngCore; +use rand::rngs::OsRng; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -77,7 +77,8 @@ pub async fn encrypt_file_stream_gcm( rng.try_fill_bytes(&mut base_nonce).unwrap(); let key = Key::::try_from(master_key_bytes.as_slice()) - .map_err(|_| anyhow::anyhow!("Invalid AES-256 key length")).unwrap(); + .map_err(|_| anyhow::anyhow!("Invalid AES-256 key length")) + .unwrap(); let cipher = Aes256Gcm::new(&key); @@ -105,7 +106,8 @@ pub async fn encrypt_file_stream_gcm( nonce_bytes[..8].copy_from_slice(&base_nonce); nonce_bytes[8..].copy_from_slice(&chunk_index.to_be_bytes()); let nonce = Nonce::try_from(&nonce_bytes[..]) - .map_err(|_| anyhow::anyhow!("Invalid nonce length")).unwrap(); + .map_err(|_| anyhow::anyhow!("Invalid nonce length")) + .unwrap(); let ciphertext = cipher.encrypt(&nonce, &buffer[..n]).unwrap(); let mut out = Vec::with_capacity(4 + ciphertext.len()); diff --git a/src/utils/mod.rs b/src/utils/mod.rs index c6cfcf3..ebe8b00 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,12 +1,12 @@ pub mod common; +pub mod compress; +pub mod deserializer; pub mod edge_key; -pub mod redis_client; -pub mod task_manager; -pub mod text; pub mod file; pub mod locks; pub mod logging; +pub mod redis_client; +pub mod stream; +pub mod task_manager; +pub mod text; pub mod tus; -pub mod deserializer; -pub mod compress; -pub mod stream; \ No newline at end of file diff --git a/src/utils/redis_client.rs b/src/utils/redis_client.rs index e7664fb..24159c3 100644 --- a/src/utils/redis_client.rs +++ b/src/utils/redis_client.rs @@ -1,9 +1,8 @@ -use redis::{aio::MultiplexedConnection, Client}; use crate::settings::CONFIG; +use redis::{Client, aio::MultiplexedConnection}; pub async fn redis_connection() -> MultiplexedConnection { - let client = Client::open(CONFIG.redis_url.clone()) - .expect("Invalid Redis URL"); + let client = Client::open(CONFIG.redis_url.clone()).expect("Invalid Redis URL"); client .get_multiplexed_async_connection() diff --git a/src/utils/task_manager/cron.rs b/src/utils/task_manager/cron.rs index a4e5a7a..381ccd4 100644 --- a/src/utils/task_manager/cron.rs +++ b/src/utils/task_manager/cron.rs @@ -50,18 +50,11 @@ pub async fn check_and_update_cron( let metadata_changed = stored.metadata != metadata; if cron_changed || args_changed || metadata_changed { - upsert_task( - conn, - &task_name, - task, - &cron, - args.clone(), - metadata, - ) - .await - .unwrap_or_else(|e| { - tracing::error!("Failed to update task {}: {:?}", task_name, e); - }); + upsert_task(conn, &task_name, task, &cron, args.clone(), metadata) + .await + .unwrap_or_else(|e| { + tracing::error!("Failed to update task {}: {:?}", task_name, e); + }); info!( "Task {} updated (cron: {}, args: {}, metadata: {})", diff --git a/src/utils/task_manager/scheduler.rs b/src/utils/task_manager/scheduler.rs index c5b5fea..1018c79 100644 --- a/src/utils/task_manager/scheduler.rs +++ b/src/utils/task_manager/scheduler.rs @@ -1,4 +1,5 @@ use crate::core::context::Context; +use crate::services::api::models::agent::status::DatabaseStorage; use crate::services::backup::BackupService; use crate::services::config::ConfigService; use crate::utils::common::BackupMethod; @@ -11,7 +12,6 @@ use serde_json::Value; use std::sync::Arc; use tracing::error; use tracing::info; -use crate::services::api::models::agent::status::DatabaseStorage; pub async fn scheduler_loop(mut conn: MultiplexedConnection) { loop { @@ -80,16 +80,22 @@ pub async fn execute_task( let storages_value: &Value = metadata_obj .get("storages") .ok_or_else(|| anyhow::anyhow!("storages key missing"))?; - + let encrypt_value: &Value = metadata_obj .get("encrypt") .ok_or_else(|| anyhow::anyhow!("encrypt key missing"))?; - + let storages: Vec = serde_json::from_value(storages_value.clone())?; - let encrypt : bool = serde_json::from_value(encrypt_value.clone())?; + let encrypt: bool = serde_json::from_value(encrypt_value.clone())?; backup_service - .dispatch(generated_id, &config, BackupMethod::Automatic, &storages, encrypt) + .dispatch( + generated_id, + &config, + BackupMethod::Automatic, + &storages, + encrypt, + ) .await; Ok(()) diff --git a/src/utils/task_manager/tasks.rs b/src/utils/task_manager/tasks.rs index 1ec0acc..28bb115 100644 --- a/src/utils/task_manager/tasks.rs +++ b/src/utils/task_manager/tasks.rs @@ -23,7 +23,7 @@ pub async fn upsert_task( cron: cron.to_string(), args, enabled: true, - metadata + metadata, }; let payload = serde_json::to_string(&entry).unwrap(); diff --git a/src/utils/text.rs b/src/utils/text.rs index 5772fa5..c72b743 100644 --- a/src/utils/text.rs +++ b/src/utils/text.rs @@ -4,4 +4,4 @@ pub fn normalize_cron(expr: &str) -> String { } else { expr.to_string() } -} \ No newline at end of file +}