Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/auto-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ jobs:

# Generate basic release notes
cat > release_notes.md << EOF
## Release $VERSION

### What's Changed

EOF
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/bump-version.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ jobs:
run: |
gh pr create \
--title "Release: Bump version to ${{ steps.version.outputs.new }}" \
--assignee "${{ github.actor }}" \
--body "This PR bumps the version from ${{ steps.version.outputs.current }} to ${{ steps.version.outputs.new }}.

**Bump type:** ${{ github.event.inputs.bump_type }}
Expand Down
8 changes: 0 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,14 +407,6 @@ See [DEVELOPING](/DEVELOPING.md) for developer instructions.

See [RELEASING.md](/RELEASING.md) for release process instructions.

**Quick Start:**
- **GitHub Actions (Recommended):** Go to [Actions → Bump Version](../../actions/workflows/bump-version.yml) and click "Run workflow"

After merging the version bump PR:
1. Tag is automatically created (`auto-tag.yml`)
2. GitHub release is created (`auto-release.yml`)
3. Artifacts are built and uploaded (`release.yml`)

---

Built with ❤️ by Streamfold.
81 changes: 66 additions & 15 deletions src/tags/mod.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,48 @@
mod cache;
mod cloudwatch;
mod s3;

pub use cache::TagCache;
pub use cache::{CacheEntry, TagCache};
pub use cloudwatch::{CloudWatchError, CloudWatchTagFetcher};
pub use s3::{S3Cache, S3Error};

use crate::s3_cache::{S3Cache, S3CacheError};
use aws_sdk_cloudwatchlogs::Client as CloudWatchLogsClient;
use aws_sdk_s3::Client as S3Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use thiserror::Error;
use tracing::{debug, error, info, warn};

/// S3 cache key for storing tags
const CACHE_KEY: &str = "rotel-lambda-forwarder/cache/log-groups/tags.json.gz";

/// Serializable format for the cache file
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheFile {
/// Version of the cache file format
pub version: u32,
/// Map of log group names to their cache entries (with timestamps)
pub log_groups: HashMap<String, CacheEntry>,
}

impl CacheFile {
/// Create a new cache file
pub fn new(log_groups: HashMap<String, CacheEntry>) -> Self {
Self {
version: 1,
log_groups,
}
}
}

/// Errors that can occur during tag operations
#[derive(Debug, Error)]
pub enum TagError {
#[error("CloudWatch error: {0}")]
CloudWatch(#[from] CloudWatchError),

#[error("S3 error: {0}")]
S3(#[from] S3Error),
S3(#[from] S3CacheError),

#[error("No S3 bucket configured")]
NoS3Bucket,
Expand All @@ -37,7 +59,7 @@ pub struct TagManager {
/// In-memory cache
cache: TagCache,
/// S3 persistence layer (optional)
s3_cache: Option<S3Cache>,
s3_cache: Option<S3Cache<CacheFile>>,
/// CloudWatch client for fetching tags
cw_fetcher: CloudWatchTagFetcher,
/// Whether to persist to S3
Expand All @@ -57,7 +79,9 @@ impl TagManager {
let persist_enabled = s3_client.is_some() && s3_bucket.is_some();

let s3_cache = match (s3_client, s3_bucket) {
(Some(client), Some(bucket)) => Some(S3Cache::new(client, bucket)),
(Some(client), Some(bucket)) => {
Some(S3Cache::new(client, bucket, CACHE_KEY.to_string()))
}
_ => None,
};

Expand All @@ -80,9 +104,12 @@ impl TagManager {
pub async fn initialize(&mut self) -> Result<(), TagError> {
if let Some(s3_cache) = &mut self.s3_cache {
match s3_cache.load().await {
Ok(Some(snapshot)) => {
info!(entry_count = snapshot.len(), "Loaded cache from S3");
self.cache.load_snapshot(snapshot);
Ok(Some(cache_file)) => {
info!(
entry_count = cache_file.log_groups.len(),
"Loaded cache from S3"
);
self.cache.load_snapshot(cache_file.log_groups);
}
Ok(None) => {
warn!("No existing cache found in S3");
Expand Down Expand Up @@ -180,25 +207,49 @@ impl TagManager {
async fn persist_cache(&mut self) -> Result<(), TagError> {
if let Some(s3_cache) = &mut self.s3_cache {
let snapshot = self.cache.get_snapshot();
let cache_file = CacheFile::new(snapshot);

match s3_cache.save(snapshot).await {
match s3_cache.save(cache_file).await {
Ok(_) => {
debug!("Successfully persisted cache to S3");
Ok(())
}
Err(S3Error::ConditionalWriteFailed) => {
Err(S3CacheError::ConditionalWriteFailed) => {
warn!("Conditional write failed, reloading and merging cache");

// Reload from S3 and merge with our current snapshot
let snapshot = self.cache.get_snapshot();
let merged = s3_cache.reload_and_merge(snapshot).await?;
let local_snapshot = self.cache.get_snapshot();
let local_cache_file = CacheFile::new(local_snapshot);

let merged_cache_file = s3_cache
.reload_and_merge(local_cache_file, |from_s3, local| {
// Merge the log groups, keeping the most recent entry for each
let mut merged = from_s3.log_groups;
for (log_group, new_entry) in local.log_groups {
let should_update = if let Some(existing_entry) = merged.get(&log_group) {
// Keep the entry with the most recent last_seen time
new_entry.last_seen_secs > existing_entry.last_seen_secs
} else {
// New entry, always insert
true
};

if should_update {
debug!(log_group = %log_group, "Updating merged cache with more recent entry");
merged.insert(log_group, new_entry);
}
}
CacheFile::new(merged)
})
.await?;

// Merge the result back into our in-memory cache
// This keeps the most recent entry for each log group
self.cache.merge_snapshot(merged.clone());
self.cache
.merge_snapshot(merged_cache_file.log_groups.clone());

// Try to save again with the merged data
s3_cache.save(merged).await?;
s3_cache.save(merged_cache_file).await?;

info!("Successfully merged and persisted cache after conflict");
Ok(())
Expand Down
Loading