diff --git a/asynclogger/.cursor/rules/global/golang/code-quality.mdc b/asynclogger/.cursor/rules/global/golang/code-quality.mdc new file mode 100644 index 00000000..9a48e63e --- /dev/null +++ b/asynclogger/.cursor/rules/global/golang/code-quality.mdc @@ -0,0 +1,17 @@ +--- +description: "Guidelines for maintaining code quality, clarity, and consistency across imports, function usage, and documentation" +globs: +alwaysApply: false +--- + +### Import statements quality +1. Do not change the import order of existing import files and packages, just add the new ones +2. Avoid wild card imports + +### Functions usage +1. Do not change the private functions to public unless we want other packages to use it +2. If the same package is using the function then keep it private + +### Code comments and documentation +1. Add comments only when necessary, do not add comments when the code itself gives the understanding +2. Add comments for architectural decision diff --git a/asynclogger/.cursor/rules/global/golang/compliance.mdc b/asynclogger/.cursor/rules/global/golang/compliance.mdc new file mode 100644 index 00000000..2b802036 --- /dev/null +++ b/asynclogger/.cursor/rules/global/golang/compliance.mdc @@ -0,0 +1,36 @@ +--- +description: "Provide the list of rules which are obeyed in every response" +globs: +alwaysApply: true +--- + +# Rule Compliance Tracking + +## Response Format Requirement +When responding to user queries about code patterns, architecture, or implementation examples: + +1. **Always List Obeyed Rules**: At the end of your response, include a section titled "**Rules Followed:**" that lists the specific rules from the codebase that are being obeyed or demonstrated in your response. + +2. **Rule Identification**: Identify rules from the following files: + - `.cursor/rules/unit-testing.mdc` + - `.cursor/rules/requirement-planning.mdc` + - `.cursor/rules/code-quality.mdc` + - `.cursor/rules/compliance.mdc` + +3. 
**Example Format**: + ``` + **Rules Followed:** + - Dependency Injection Pattern + - Configuration Management + - Interface Design + ``` + +4. **Context Awareness**: Only list rules that are actually relevant to the specific response or examples provided. + +5. **Rule Verification**: When analyzing code examples, verify which rule categories are being followed and explicitly mention the rule headings. + +## Implementation Guidelines +- Scan the codebase for examples that demonstrate rule compliance +- Provide concrete code examples when possible +- Explain how the examples follow the specific rule categories +- Use the rule headings/categories from the source files when listing them diff --git a/asynclogger/.cursor/rules/global/golang/data-interface-concurrency-errors.mdc b/asynclogger/.cursor/rules/global/golang/data-interface-concurrency-errors.mdc new file mode 100644 index 00000000..f359b79e --- /dev/null +++ b/asynclogger/.cursor/rules/global/golang/data-interface-concurrency-errors.mdc @@ -0,0 +1,61 @@ +--- +description: These rules cover data shapes, allocation, interface design, and safe concurrency/error handling—areas where AI can subtly violate idioms or introduce leaks and races. +alwaysApply: false +globs: *.go +--- + +### Effective Go Rules: Data, Allocation, Interfaces, Concurrency, Errors + +These rules cover data shapes, allocation, interface design, and safe concurrency/error handling—areas where AI can subtly violate idioms or introduce leaks and races. + +## Arrays, Slices, Maps +- Reassign the result of `append`; capacity may change. +- Preallocate capacity when known; use `copy` for duplication. +- If copying a slice or map, the pointers nested in these will not be copied. +- Maps: missing keys yield zero values; use comma-ok to test presence. + +Example: +```go +v, ok := m[key] +if !ok { /* handle missing */ } +``` + +## Variadics & Append +- Use `...T` for flexible APIs; forward with `f(v...)`. 
+- Concatenate slices with `append(dst, src...)`.
+
+
+## Methods: Pointer vs Value
+- Pointer receivers for mutation/big copies.
+
+## Interfaces
+- Define small, behavior-driven interfaces near use sites.
+- Constructors should return the narrowest interface needed.
+- Convert types to reuse method sets (e.g., `sort.IntSlice(s).Sort()`).
+
+
+## Concurrency Basics
+- Start goroutines only when beneficial; ensure they can terminate.
+- Use channels for synchronization/communication; avoid shared memory without coordination.
+
+Use Worker pool at appropriate places:
+```go
+jobs := make(chan Job)
+for i := 0; i < N; i++ { go func() { for j := range jobs { handle(j) } }() }
+```
+
+Channel ownership:
+- Close channels only from the sender/owner.
+
+## Context & Cancellation
+- Accept `context.Context` and honor `Done()` for long-lived operations.
+- Avoid `time.Sleep` polling; use timers/tickers with `select`.
+
+## Panic, Recover, Errors
+- Prefer `error` returns; reserve `panic` for unrecoverable programmer errors.
+- At boundaries, `defer` + `recover` to convert internal panics to errors; type-assert expected panic values.
+- Structure error strings without caps/punctuation; prefix with operation or package when helpful.
+- Avoid double-reporting (log and return); choose a single owner.
+- Use rich error types (ex: `*os.PathError`) and `%w` wrapping.
+- Always try to return a meaningful error code and message; the client should be able to find out the failure reason without exposing internal details.
+- Either log an error or return it — not both. Exception: when returning only a sanitized API response to the client, also log the detailed error server-side so the failure is not lost.
diff --git a/asynclogger/.cursor/rules/global/golang/formatting-control.mdc b/asynclogger/.cursor/rules/global/golang/formatting-control.mdc new file mode 100644 index 00000000..e9f6d7ec --- /dev/null +++ b/asynclogger/.cursor/rules/global/golang/formatting-control.mdc @@ -0,0 +1,34 @@ +--- +description: These rules focus on the places where AI often drifts from idiomatic Go,, naming that avoids stutter, brace placement, early-returns, and minimalistic control flow. Follow these to produce idiomatic, readable Go. +alwaysApply: false +globs: *.go +--- + +### Effective Go Rules: Formatting, Comments, Naming, Semicolons, Control Flow + +These rules focus on the places where AI often drifts from idiomatic Go: formatting handled by tools, naming that avoids stutter, brace placement, early-returns, and minimalistic control flow. Follow these to produce idiomatic, readable Go. + + +## Naming +- Do not prefix getters with `Get`; prefer `Owner()` over `GetOwner()`; setters as `SetOwner(x)`. + +## Control Flow Essentials +- Prefer early returns for errors; let the success path flow downward. +- Omit `else` when the `if` body ends in `return/break/continue`. +- Use `if`/`switch` init statements to scope locals. +- Prefer `switch` over long `if-else` chains; avoid implicit fallthrough. + +## `defer` and Resources +- Place `defer` immediately after acquiring a resource; args evaluated at `defer` time. +- Deferred calls run LIFO. Avoid deferring inside hot loops. + +## Printing and Diagnostics +- Prefer `%v` for values, `%+v` to include field names, `%#v` for Go syntax, `%T` for type. + +## Shadowing and Short Decls +- Reuse `err` with `:=` when at least one new variable exists; avoid accidental shadowing across scopes. +- Declare variables at first use; avoid predeclaring far from usage. + + +## Extra +- Prefer `const` for stable magic numbers; otherwise localize and comment literals. 
diff --git a/asynclogger/.cursor/rules/global/golang/requirement-planning.mdc b/asynclogger/.cursor/rules/global/golang/requirement-planning.mdc new file mode 100644 index 00000000..868ffa59 --- /dev/null +++ b/asynclogger/.cursor/rules/global/golang/requirement-planning.mdc @@ -0,0 +1,12 @@ +--- +description: "Promotes thorough requirement analysis, careful planning, and implementation of robust, general-purpose solutions." +alwaysApply: false +--- +### Requirement Understanding + +Before creating a plan you MUST ask questions from the user to get clarity on the problem statement + +1. Break down a problem statement into smaller problem statements +2. Please write a high quality, general purpose solution. Implement a solution that works correctly for all valid inputs, not just the test cases. Do not hard-code values or create solutions that only work for specific test inputs. Instead, implement the actual logic that solves the problem generally. +3. Focus on understanding the problem requirements and implementing the correct algorithm. Tests are there to verify correctness, not to define the solution. Provide a principled implementation that follows best practices and software design principles. +4. If the task is unreasonable or infeasible, or if any of the tests are incorrect, please tell me. The solution should be robust, maintainable, and extendable. 
diff --git a/asynclogger/.cursor/rules/global/golang/unit-testing.mdc b/asynclogger/.cursor/rules/global/golang/unit-testing.mdc new file mode 100644 index 00000000..c159d66f --- /dev/null +++ b/asynclogger/.cursor/rules/global/golang/unit-testing.mdc @@ -0,0 +1,108 @@ +--- +description: "Go testing patterns including test structure, mocking, test data, assertions and more" +globs: *.go +alwaysApply: false +--- + +# Go Testing Patterns + +## Instructions + +### Test Plan Creation + +When writing unit tests, get clarity on the system under test and come up with a list of what all cases can be possible within that system. Generate the unit test for all those cases + +### Coverage Guidelines +- MUST run `go test -cover` to measure coverage, instead of calculating yourself and aim for 80%+ code coverage for critical business logic +- Focus on testing behavior, not just coverage percentage +- Test error paths and edge cases + +## Test Structure and Organization + +### Test File Organization +- Group related tests using subtests with `t.Run()` +- If there is a single test, then don't use `t.Run()` +- Generate Go tests using testify's suite package for structure wherever applicable + +### Test Naming Conventions +Use descriptive test names that clearly indicate the scenario: + +```go +// ✅ Good - Clear scenario description +func TestConfigService_GetConfig_ReturnsConfigWhenFound(t *testing.T) {} +func TestConfigService_GetConfig_ReturnsErrorWhenNotFound(t *testing.T) {} +func TestConfigService_ValidateConfig_ReturnsErrorOnInvalidData(t *testing.T) {} + +// ❌ Avoid - Vague test names +func TestGetConfig(t *testing.T) {} +func TestValidation(t *testing.T) {} +``` + +## Mocking and Dependencies + +### Interface-Based Mocking +Use interfaces for dependencies to enable easy mocking + +### Mock Setup Patterns +mockRepo := mocks.NewMockRepository(ctrl) +mockRepo.EXPECT(). + GetConfig("test-id"). + Return(&Config{ID: "test-id"}, nil). 
+ Times(1) + +## Test Data and Fixtures + +### Test Data Creation +Create helper functions for building test data where the test data is common + +### Table-Driven Tests +Use table-driven tests for testing multiple scenarios + +## Assertion Patterns + +### Using testify/assert +Prefer testify/assert for better error messages + +```go +// ✅ Good - Clear assertions with testify +assert.NoError(t, err) +assert.Equal(t, expectedValue, actualValue) +assert.Contains(t, slice, element) +assert.Len(t, collection, expectedLength) + +// ❌ Avoid - Basic Go testing with poor error messages +if err != nil { + t.Errorf("expected no error, got %v", err) +} +``` + +### Error Testing +```go +// Testing specific error types +assert.ErrorIs(t, err, ErrConfigNotFound) +assert.ErrorAs(t, err, &validationErr) +assert.NoError(t, err) + +// Testing error messages +assert.EqualError(t, err, "expected error message") +assert.Contains(t, err.Error(), "partial error message") +``` + +## HTTP Handler Testing + +### Testing HTTP Handlers +Use `httptest` for testing HTTP handlers + +## Test Quality + +### Test Organization +- Use setup/teardown functions for complex test scenarios +- Keep tests independent - one test should not depend on another +- Use parallel tests where appropriate: `t.Parallel()` + +### Common Pitfalls to Avoid +1. **Testing Implementation Details**: Focus on behavior, not internal implementation +2. **Ignoring Error Cases**: Always test both success and error scenarios +3. **Flaky Tests**: Avoid time-dependent tests, use deterministic test data +4. **Over-Mocking**: Don't mock everything, test real integrations where valuable +5. 
**Poor Test Data**: Use realistic test data that reflects production scenarios diff --git a/asynclogger/.cursor/rules/global/security-golang/secure-golang-rules.mdc b/asynclogger/.cursor/rules/global/security-golang/secure-golang-rules.mdc new file mode 100644 index 00000000..49cffe70 --- /dev/null +++ b/asynclogger/.cursor/rules/global/security-golang/secure-golang-rules.mdc @@ -0,0 +1,211 @@ +--- +description: Rules to ensure secure coding in Golang +globs: **/*.go +alwaysApply: true +--- +These rules apply to all Go code in the repository and aim to prevent common security risks through disciplined input handling, safe APIs, and secure defaults. + +All violations must include a clear explanation of which rule was triggered and why, so developers can fix issues quickly.\ +Generated code must not violate these rules. If a rule is violated, add a code comment that explains the problem and proposes a correction. + +## 1. Decode Untrusted Data Safely + +- Do not deserialize untrusted data with unsafe or permissive decoders. Prefer strict JSON or protobuf with size limits. Reject unknown fields. Avoid `encoding/gob` for untrusted input. Use strict YAML decoding only if required. +- + ```go + // Accepts arbitrarily large input and unknown fields + var in any + _ = json.NewDecoder(r.Body).Decode(&in) + ``` +- + ```go + type CreateUser struct { + Name string `json:"name"` + Email string `json:"email"` + } + + dec := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)) // 1 MB cap + dec.DisallowUnknownFields() + dec.UseNumber() + + var in CreateUser + if err := dec.Decode(&in); err != nil { /* handle */ } + ``` +- **YAML (only if needed):** + ```go + dec := yaml.NewDecoder(bytes.NewReader(b)) + dec.KnownFields(true) // yaml.v3 + if err := dec.Decode(&cfg); err != nil { /* handle */ } + ``` +- **Protobuf JSON:** + ```go + opts := protojson.UnmarshalOptions{DiscardUnknown: false} + if err := opts.Unmarshal(b, msg); err != nil { /* handle */ } + ``` + +## 2. 
Use Parameterized Queries for Database Access + +- Never format SQL or NoSQL queries with user input. Use placeholders and arguments. Use context with timeouts. +- + ```go + query := fmt.Sprintf("SELECT * FROM users WHERE name = '%s'", name) + rows, _ := db.Query(query) + ``` +- + ```go + ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second) + defer cancel() + + row := db.QueryRowContext(ctx, "SELECT id FROM users WHERE name = $1", name) + ``` + +## 3. Prevent Command Injection + +- Do not pass untrusted input to shells. Use `exec.CommandContext` with fixed program and separate args. Validate inputs against allow lists. +- + ```go + exec.Command("sh", "-c", "ls "+userArg).Run() + ``` +- + ```go + // validatedArg must pass strict allow list or regex + cmd := exec.CommandContext(ctx, "ls", validatedArg) + cmd.Stdout = w + cmd.Stderr = w + _ = cmd.Run() + ``` + +## 4. Prevent Path Traversal Vulnerabilities + +- : Do not use untrusted input directly in filesystem APIs (os.Open, os.ReadFile, os.Create). + Always sanitize path input to remove traversal elements and enforce a strict allow-list for filenames. +- : If the filename doesn't need to be user-controlled, use a UUID or another randomly generated string instead. This is the most secure method. +- : + ```go + import "path/filepath" + + fileName := r.URL.Query().Get("filename") // Attacker can provide "../../etc/passwd" as a filename + path := filepath.Join("/var/data/", fileName) + data, err := os.ReadFile(path) + ``` +- (Never use user input directly. Sanitize it with filepath.Base and validate with an allow-list): + ```go + import ( + "path/filepath" + "regexp" + ) + + userInput := r.URL.Query().Get("filename") + + // 1. Sanitize the input to get only the final path component. + // removes and turns "../../etc/passwd" into "passwd". + filename := filepath.Base(userInput) + + // 2. Validate the sanitized filename against a strict allow-list. 
+    isValid, _ := regexp.MatchString(`^[A-Za-z0-9_-]{1,200}\.png$`, filename)
+
+    if !isValid {
+        return
+    }
+
+    // 3. Now it's safe to join with the base directory.
+    safePath := filepath.Join("/var/data/", filename)
+    ```
+
+
+## 5. Template Safety
+
+- Use `html/template` for HTML to get auto-escaping. Never use `text/template` for HTML.
+-
+  ```go
+  t := template.Must(template.New("x").Parse("
<div>{{.UserInput}}</div>
")) + ``` +- + ```go + t := template.Must(htmltemplate.New("x").Parse("
<div>{{.UserInput}}</div>
")) + ``` + +## 6. Log and Error Hygiene + +- Do not log secrets, tokens, personal data, or full request bodies. Redact sensitive fields. Return generic error messages to clients. Use a recover middleware to hide panics. +- + ```go + // Example redaction + logger.Info("login", "user", in.Email, "token", "[redacted]") + + // Recover middleware + func Recover(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer func() { + if rec := recover(); rec != nil { + http.Error(w, "internal error", http.StatusInternalServerError) + } + }() + next.ServeHTTP(w, r) + }) + } + ``` + +## 7. Concurrency and Race Safety + +- Avoid TOCTOU on files and permissions. Guard shared state. +- + ```go + f, err := os.OpenFile(p, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0600) + ``` + +## 8. Cookies and Sessions + +- Set `Secure`, `HttpOnly`, and an appropriate `SameSite`. Do not store secrets in client cookies unless encrypted and signed with a server key. +- + ```go + http.SetCookie(w, &http.Cookie{ + Name: "sid", + Value: token, + Secure: true, + HttpOnly: true, + SameSite: http.SameSiteLaxMode, + Path: "/", + }) + ``` + +## 9. CSRF and CORS + +- Use CSRF protections for state-changing requests in browser-based apps. For CORS, allow list origins and avoid `Access-Control-Allow-Origin: *` with credentials. +- : + ```go + // Pseudocode: only allow https://app.example.com + w.Header().Set("Access-Control-Allow-Origin", "https://app.example.com") + w.Header().Set("Vary", "Origin") + ``` + +## 10. Avoid Reflection, `unsafe`, and Cgo for Untrusted Data + +- Do not use `reflect`, `unsafe`, or Cgo to parse or transform untrusted inputs. Keep type boundaries strict. + +## 11. Operational Hardening + +- Run with least privilege. In containers, avoid root, drop capabilities, mount only what you need. Never expose debug endpoints publicly. 
+- + ```go + // Exposing pprof on 0.0.0.0 in prod + http.ListenAndServe(":6060", http.DefaultServeMux) + ``` +- + ```go + // Bind pprof to localhost only, or protect with auth and network policy + go func() { _ = http.ListenAndServe("127.0.0.1:6060", nil) }() + ``` +## 12. Parsing XML Securly: +- Use encoding/xml for parsing XML. It does not support DTDs or external entities, so it’s safe against XXE by default. Always limit input size and nesting depth to prevent denial-of-service. +- Never use third-party XML libraries that enable DTDs or external entities. Wrap input with io.LimitReader to cap size. Track nesting depth in custom decoders to prevent stack exhaustion. Validate parsed data against business rules (schemas, required fields, etc.). + +--- + +### Additional Guidance + +- Check inputs as early as possible: validate type, length, format, and restrict to safe values. +- Impose strict size limits on untrusted data such as HTTP requests, file uploads, or archives. +- Avoid reflecting user-controlled content in error messages, HTML, or logs. +- Use `context.Context` to manage timeouts and cancellations consistently across I/O operations. +- Clearly explain any deviations from these rules in the code, along with a follow-up task to remove them. diff --git a/asynclogger/.cursor/rules/global/security-golang/ssrf-prevention-rules.mdc b/asynclogger/.cursor/rules/global/security-golang/ssrf-prevention-rules.mdc new file mode 100644 index 00000000..fd8906b7 --- /dev/null +++ b/asynclogger/.cursor/rules/global/security-golang/ssrf-prevention-rules.mdc @@ -0,0 +1,30 @@ +--- +description: "Rules to prevent Server-Side Request Forgery (SSRF) vulnerabilities" +globs: +alwaysApply: true +--- +# SSRF Prevention + +These rules apply to all code that performs outbound network requests, regardless of language or framework, including generated code. 
+ +All violations must include a clear explanation of which rule was triggered and why, to help developers understand and fix the issue effectively. +Generated code must not violate these rules. If a rule is violated, a comment must be added explaining the issue and suggesting a correction. + +## 1. Do Not Allow User Input to Control Target URLs +- **Rule:** Never use raw or unchecked user input as the destination for outbound HTTP requests. +- **Example (unsafe):** + ```python + requests.get(request.args["url"]) + ``` + +## 2. Block Access to Private/Internal IP Ranges +- **Rule:** Outbound requests must not be allowed to reach `localhost`, `127.0.0.1`, `169.254.0.0/16`, `192.168.0.0/16`, `10.0.0.0/8`, or other internal/reserved ranges. + +## 3. Resolve and Validate Hostnames Before Use +- **Rule:** Perform DNS resolution and validate that the resolved IP is in an allowed range before initiating a request. + +## 4. Restrict Allowed Protocols and Ports +- **Rule:** Only allow HTTP/HTTPS protocols and known-safe ports (e.g., 80, 443). Block access to file URLs, gopher, FTP, or custom handlers. + +## 5. Do Not Forward Authorization Headers Automatically +- **Rule:** Never pass internal tokens, cookies, or auth headers when proxying or forwarding outbound requests unless explicitly scoped and audited. 
diff --git a/asynclogger/blob-logger/shard/shard.go b/asynclogger/blob-logger/shard/shard.go new file mode 100644 index 00000000..041a30ee --- /dev/null +++ b/asynclogger/blob-logger/shard/shard.go @@ -0,0 +1,36 @@ +package shard + +import "sync/atomic" + +type Shard struct { + data []byte + offset atomic.Int32 + capacity int32 + id uint32 +} + +func NewShard(capacity int, id uint32) *Shard { + return &Shard{ + data: make([]byte, capacity), + offset: atomic.Int32{}, + capacity: int32(capacity), + id: id, + } +} + +func (b *Shard) Write(p []byte) (int32, bool) { + size := int32(len(p)) + if size == 0 { + return 0, false + } + + newOffset := b.offset.Add(size) + + if newOffset > b.capacity { + return 0, true + } + + copy(b.data[newOffset-size:newOffset], p) + + return size, false +} diff --git a/asynclogger/blob-logger/shard/shard_bench_test.go b/asynclogger/blob-logger/shard/shard_bench_test.go new file mode 100644 index 00000000..b36fa2ca --- /dev/null +++ b/asynclogger/blob-logger/shard/shard_bench_test.go @@ -0,0 +1,116 @@ +package shard + +import ( + "testing" +) + +func BenchmarkShard_Write(b *testing.B) { + const chunkSize = 1024 * 256 // 256 KB + + data := make([]byte, chunkSize) + + // Ensure the shard has enough capacity for all iterations so we stay on the fast path. + shard := NewShard(chunkSize*b.N, 0) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, full := shard.Write(data); full { + b.Fatalf("shard reported full unexpectedly at iteration %d", i) + } + } +} + +func BenchmarkShard_Write_concurrent(b *testing.B) { + const chunkSize = 1024 * 256 // 256 KB payload + + data := make([]byte, chunkSize) + + // Allocate enough capacity for ALL writes across ALL goroutines. + // RunParallel executes exactly b.N iterations total. 
+ shard := NewShard(chunkSize*b.N, 0) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Each Next() == one Write() + if _, full := shard.Write(data); full { + panic("shard reported full unexpectedly") + } + } + }) +} + +func BenchmarkShardV2_Write(b *testing.B) { + const chunkSize = 1024 * 256 // 256 KB + + data := make([]byte, chunkSize) + + // Ensure the shard has enough capacity for all iterations so we stay on the fast path. + shard := NewShardV2(chunkSize*b.N, 0) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, _, full, _ := shard.Write(data); full { + b.Fatalf("shard reported full unexpectedly at iteration %d", i) + } + } +} + +func BenchmarkShardV2_Write_concurrent(b *testing.B) { + const chunkSize = 1024 * 256 // 256 KB payload + + data := make([]byte, chunkSize) + + // Allocate enough capacity for ALL writes across ALL goroutines. + // RunParallel executes exactly b.N iterations total. + shard := NewShardV2(chunkSize*b.N, 0) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Each Next() == one Write() + if _, _, full, _ := shard.Write(data); full { + panic("shard reported full unexpectedly") + } + } + }) +} + +func BenchmarkShardV3_Write(b *testing.B) { + const chunkSize = 1024 * 256 // 256 KB + + data := make([]byte, chunkSize) + + // Ensure the shard has enough capacity for all iterations so we stay on the fast path. + shard := NewShardV3(chunkSize*b.N, 0) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, full := shard.Write(data); full { + b.Fatalf("shard reported full unexpectedly at iteration %d", i) + } + } +} + +func BenchmarkShardV3_Write_concurrent(b *testing.B) { + const chunkSize = 1024 * 256 // 256 KB payload + + data := make([]byte, chunkSize) + + // Allocate enough capacity for ALL writes across ALL goroutines. + // RunParallel executes exactly b.N iterations total. 
+ shard := NewShardV3(chunkSize*b.N, 0) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Each Next() == one Write() + if _, full := shard.Write(data); full { + panic("shard reported full unexpectedly") + } + } + }) +} diff --git a/asynclogger/blob-logger/shard/shard_handler.go b/asynclogger/blob-logger/shard/shard_handler.go new file mode 100644 index 00000000..58f712de --- /dev/null +++ b/asynclogger/blob-logger/shard/shard_handler.go @@ -0,0 +1,128 @@ +package shard + +import ( + "fmt" + "math/rand/v2" + "runtime" + "time" + + "github.com/Meesho/BharatMLStack/asynclogger/blob-logger/ssdio" +) + +type SwapShard struct { + shards []*ShardV2 + active uint64 + writer *ssdio.SSDWriter + semaphore chan int +} + +func NewSwapShard(shards []*ShardV2, writer *ssdio.SSDWriter) *SwapShard { + + return &SwapShard{ + shards: shards, + active: 0, + writer: writer, + semaphore: make(chan int, 1), + } +} + +func (s *SwapShard) Swap() { + s.shards[s.active].readyForFlush.Store(1) + s.active = (s.active + 1) % uint64(len(s.shards)) +} +func (s *SwapShard) SwapBlocking() { + s.shards[s.active].readyForFlush.Store(1) + next := (s.active + 1) % uint64(len(s.shards)) + nextShard := s.shards[next] + + for nextShard.readyForFlush.Load() == 1 { + runtime.Gosched() + } + + s.active = next +} +func (s *SwapShard) GetActive() *ShardV2 { + return s.shards[s.active] +} + +func (s *SwapShard) GetInactive() *ShardV2 { + return s.shards[(s.active+1)%uint64(len(s.shards))] +} + +type ShardHandler struct { + swapShards map[uint32]*SwapShard +} + +func NewShardHandler(shardCount int, capacity int, fileSize int, dirPath string) *ShardHandler { + swapShards := make(map[uint32]*SwapShard) + j := 0 + for i := 0; i < shardCount; i += 2 { + shardA := NewShardV2(capacity, uint32(i)) + shardB := NewShardV2(capacity, uint32(i+1)) + writer, err := ssdio.NewSSDWriter(fileSize, dirPath, j) + if err != nil { + panic(err) + } + swapShards[uint32(j)] = 
NewSwapShard([]*ShardV2{shardA, shardB}, writer) + j++ + } + return &ShardHandler{ + swapShards: swapShards, + } +} + +func (s *ShardHandler) Write(p []byte) bool { + randomNumber := rand.IntN(10000) + swapShardId := uint32(randomNumber % len(s.swapShards)) + swapShard := s.swapShards[swapShardId] + _, id, full, _ := swapShard.GetActive().Write(p) + if !full { + return true + } + swapShard.semaphore <- 1 + currShard := swapShard.GetActive() + if id == currShard.id { + s.flush(swapShardId, id) + swapShard.SwapBlocking() + _, _, full, _ := swapShard.GetActive().Write(p) + <-swapShard.semaphore + if !full { + return true + } else { + return false + } + } else { + <-swapShard.semaphore + _, _, full, _ := currShard.Write(p) + if !full { + return true + } else { + return false + } + } +} + +func (s *ShardHandler) flush(swapShardId, shardId uint32) { + swapShard := s.swapShards[swapShardId] + var flushShard *ShardV2 + if shardId == swapShard.GetActive().id { + flushShard = swapShard.GetActive() + } else { + flushShard = swapShard.GetInactive() + } + + go func() { + for flushShard.inflight.Load() != 0 { + //fmt.Printf("waiting for inflight to be 0 %d %d\n", swapShardId, shardId) + runtime.Gosched() + } + startTime := time.Now() + _, err := swapShard.writer.Write(flushShard.data) + if err != nil { + panic(err) + } + duration := time.Since(startTime) + fmt.Printf("flush duration: %d\n", duration.Nanoseconds()) + flushShard.Reset() + }() +} diff --git a/asynclogger/blob-logger/shard/shard_handler_bench_test.go b/asynclogger/blob-logger/shard/shard_handler_bench_test.go new file mode 100644 index 00000000..2df822fb --- /dev/null +++ b/asynclogger/blob-logger/shard/shard_handler_bench_test.go @@ -0,0 +1,56 @@ +package shard + +import ( + "sync/atomic" + "testing" + "time" +) + +func BenchmarkShardHandler_Write_concurrent(b *testing.B) { + const chunkSize = 1024 * 256 // 256 KB payload + + data := make([]byte, chunkSize) + + // Allocate enough capacity for ALL writes across ALL 
goroutines. + // RunParallel executes exactly b.N iterations total. + shardHandler := NewShardHandler(4, 256*1024*1024, 10*1024*1024*1024, "/Users/adarsha.das/Desktop/open_source/BharatMLStack/asynclogger") + + b.ResetTimer() + // for i := 0; i < b.N; i++ { + // if !shardHandler.Write(data) { + // panic("shard reported full unexpectedly") + // } + // } + + total := atomic.Int64{} + success := atomic.Int64{} + failure := atomic.Int64{} + duration := atomic.Int64{} + b.RunParallel(func(pb *testing.PB) { + successCount := 0 + failureCount := 0 + totalCount := 0 + startTime := time.Now() + for pb.Next() { + // Each Next() == one Write() + if !shardHandler.Write(data) { + failureCount++ + } else { + successCount++ + } + totalCount++ + } + duration.Add(int64(time.Since(startTime).Nanoseconds())) + total.Add(int64(totalCount)) + success.Add(int64(successCount)) + failure.Add(int64(failureCount)) + b.Logf("successCount: %d", successCount) + b.Logf("failureCount: %d", failureCount) + b.Logf("totalCount: %d", totalCount) + }) + + b.Logf("total: %d", total.Load()) + b.Logf("success: %d", success.Load()) + b.Logf("failure: %d", failure.Load()) + b.Logf("throughput: %d", total.Load()*chunkSize*1000000000/duration.Load()) +} diff --git a/asynclogger/blob-logger/shard/shardv2_default.go b/asynclogger/blob-logger/shard/shardv2_default.go new file mode 100644 index 00000000..cde0a01d --- /dev/null +++ b/asynclogger/blob-logger/shard/shardv2_default.go @@ -0,0 +1,126 @@ +package shard + +import ( + "sync/atomic" + "unsafe" + _ "unsafe" // required for go:linkname + + "golang.org/x/sys/unix" +) + +// memmove is linked to the Go runtime's highly optimized memmove implementation. +// We mark it noescape so the compiler knows the pointers won't escape. +// +//go:noescape +//go:linkname memmove runtime.memmove +func memmove(dst, src unsafe.Pointer, n uintptr) + +// Shard is a strictly append-only buffer with atomic offset. 
+// Writes are linear appends; when capacity is exceeded, Write reports "full". +type ShardV2 struct { + data []byte // 64-byte aligned backing buffer + offset int32 // current write offset (in bytes), updated atomically + capacity int32 // total capacity in bytes + id uint32 + inflight atomic.Int64 + readyForFlush atomic.Uint32 +} + +// NewShard allocates a shard with a 64-byte–aligned data slice of the given capacity. +func NewShardV2(capacity int, id uint32) *ShardV2 { + if capacity <= 0 { + panic("NewShard: capacity must be > 0") + } + // Round capacity up to page size (4096) + const page = 4096 + padded := (capacity + page - 1) &^ (page - 1) + + // Create an anonymous private mapping + data, err := unix.Mmap( + -1, 0, + padded, + unix.PROT_READ|unix.PROT_WRITE, + unix.MAP_PRIVATE|unix.MAP_ANONYMOUS, + ) + if err != nil { + panic(err) + } + + // // Allocate a bit extra so that after alignment we still have `capacity` bytes. + // raw := make([]byte, capacity+64) + + // base := uintptr(unsafe.Pointer(&raw[0])) + // aligned := (base + 63) &^ 63 // round up to next multiple of 64 + + // data := unsafe.Slice((*byte)(unsafe.Pointer(aligned)), capacity) + + return &ShardV2{ + data: data, + offset: 0, + capacity: int32(capacity), + id: id, + inflight: atomic.Int64{}, + readyForFlush: atomic.Uint32{}, + } +} + +// Write appends p to the shard. +// Returns (bytesWritten, full): +// - bytesWritten == len(p) and full == false on success +// - bytesWritten == 0 and full == true if there is no space left +func (s *ShardV2) Write(p []byte) (int32, uint32, bool, bool) { + size := int32(len(p)) + if size == 0 { + return 0, s.id, false, false + } + + // Atomically reserve space. + newOffset := atomic.AddInt32(&s.offset, size) + + // If we exceeded capacity, signal "full" to the caller. 
+ if newOffset > s.capacity { + + if s.inflight.Load() == 0 { + return 0, s.id, true, true + } + + return 0, s.id, true, false + } + + s.inflight.Add(1) + + start := newOffset - size + + // Bounds are guaranteed by the capacity check above. + dst := unsafe.Add(unsafe.Pointer(&s.data[0]), uintptr(start)) + // Go 1.20+: unsafe.SliceData(p); fall back to &p[0] for broad compatibility. + src := unsafe.Pointer(&p[0]) + + memmove(dst, src, uintptr(size)) + + s.inflight.Add(-1) + + if s.inflight.Load() == 0 { + return size, s.id, false, true + } + + return size, s.id, false, false +} + +// Remaining returns how many bytes of capacity are left (approximate under concurrency). +func (s *ShardV2) Remaining() int32 { + off := atomic.LoadInt32(&s.offset) + rem := s.capacity - off + if rem < 0 { + return 0 + } + return rem +} + +// Reset zeroes the offset so the shard can be reused. Caller must ensure +// no concurrent writers when calling Reset. +func (s *ShardV2) Reset() { + atomic.StoreInt32(&s.offset, 0) + s.readyForFlush.Store(0) + s.inflight.Store(0) +} diff --git a/asynclogger/blob-logger/shard/shardv3.go b/asynclogger/blob-logger/shard/shardv3.go new file mode 100644 index 00000000..5bbffa0c --- /dev/null +++ b/asynclogger/blob-logger/shard/shardv3.go @@ -0,0 +1,88 @@ +package shard + +import ( + "sync/atomic" + "unsafe" + _ "unsafe" // for go:linkname +) + +const cacheLineSize = 64 + +// Align Shard header so `offset` sits alone in its cache line. +// This reduces false sharing between producers. +type ShardV3 struct { + _ [cacheLineSize]byte // padding before hot fields + basePtr unsafe.Pointer // &data[0] + baseUintptr uintptr // uintptr(basePtr) + capacityBytes uintptr + offset atomic.Uintptr // atomic pointer-sized offset + _ [cacheLineSize]byte // padding after hot fields + alignedStorage []byte // keeps backing array alive + id uint32 +} + +// NewShard allocates a 64-byte aligned region and stores precomputed pointers. 
func NewShardV3(capacity int, id uint32) *ShardV3 {
	if capacity <= 0 {
		panic("invalid shard capacity")
	}

	// Over-allocate by one cache line so that, after rounding the base up to a
	// 64-byte boundary, a full `capacity` bytes still fit inside raw.
	raw := make([]byte, capacity+cacheLineSize)
	rawBase := uintptr(unsafe.Pointer(&raw[0]))

	// Align to next 64 bytes
	aligned := (rawBase + cacheLineSize - 1) &^ (cacheLineSize - 1)

	data := unsafe.Slice((*byte)(unsafe.Pointer(aligned)), capacity)

	// alignedStorage retains `raw`, so the GC keeps the backing array alive and
	// baseUintptr remains a valid address for Write's pointer arithmetic.
	s := &ShardV3{
		basePtr:        unsafe.Pointer(&data[0]),
		baseUintptr:    uintptr(unsafe.Pointer(&data[0])),
		capacityBytes:  uintptr(capacity),
		alignedStorage: raw,
		id:             id,
	}
	return s
}

// Write appends p and returns (bytesWritten, full): on success bytesWritten ==
// len(p) and full == false; out of space returns (0, true). The reserved
// offset is NOT rolled back on the full path, so offset can overshoot
// capacityBytes; Remaining clamps for that.
// NOTE(review): go:nosplit suppresses stack-growth checks, presumably to shave
// hot-path overhead — confirm the frame stays small enough for this to be safe.
//
//go:nosplit
func (s *ShardV3) Write(p []byte) (uintptr, bool) {
	size := uintptr(len(p))
	if size == 0 {
		return 0, false
	}

	// Atomically reserve `size` bytes. (Go's sync/atomic is sequentially
	// consistent; there is no "relaxed" ordering, despite the original comment.)
	newOffset := s.offset.Add(size)

	// Out of space — report full without writing.
	if newOffset > s.capacityBytes {
		return 0, true
	}

	start := newOffset - size

	// Destination computed via precomputed uintptr math; valid only because
	// alignedStorage pins the backing array for the shard's lifetime.
	dst := unsafe.Pointer(s.baseUintptr + start)

	// Source pointer
	src := unsafe.Pointer(&p[0])

	// memmove is the runtime's optimized copy, linked via go:linkname elsewhere
	// in this package.
	memmove(dst, src, size)

	return size, false
}

// Remaining returns how many bytes are left. Not exact under concurrency.
+func (s *ShardV3) Remaining() uintptr { + off := s.offset.Load() + if off >= s.capacityBytes { + return 0 + } + return s.capacityBytes - off +} + +func (s *ShardV3) Reset() { + s.offset.Store(0) +} diff --git a/asynclogger/blob-logger/ssdio/writer_default.go b/asynclogger/blob-logger/ssdio/writer_default.go new file mode 100644 index 00000000..ab908891 --- /dev/null +++ b/asynclogger/blob-logger/ssdio/writer_default.go @@ -0,0 +1,119 @@ +//go:build !linux +// +build !linux + +package ssdio + +import ( + "fmt" + "os" + "path/filepath" + "time" +) + +type SSDWriter struct { + dirPath string + swapShardId int + + filePath string + file *os.File + fileOffset int64 + capacity int64 + + uploadChan chan string + nextFilePath string + nextFile *os.File +} + +func NewSSDWriter(capacity int, dirPath string, shardId int) (*SSDWriter, error) { + capacityAligned := int64(capacity) // no strict alignment needed + + path := fmt.Sprintf("%s/%s/swap_shard_%d.bin", + dirPath, + time.Now().Format("2006-01-02-15-04-05"), + shardId, + ) + + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return nil, err + } + + f, err := os.Create(path) + if err != nil { + return nil, err + } + + return &SSDWriter{ + dirPath: dirPath, + swapShardId: shardId, + filePath: path, + file: f, + fileOffset: 0, + capacity: capacityAligned, + uploadChan: make(chan string, 1000), + }, nil +} + +func (w *SSDWriter) Write(p []byte) (int, error) { + size := int64(len(p)) + + if w.fileOffset+size > w.capacity { + old := w.filePath + if err := w.swapFiles(); err != nil { + return 0, err + } + w.uploadChan <- old + } + + // Pre-create next file at 90% + if w.nextFile == nil && w.fileOffset+size >= int64(float64(w.capacity)*0.9) { + if err := w.createNewFile(); err != nil { + return 0, err + } + } + + n, err := w.file.WriteAt(p, w.fileOffset) + if err != nil { + return 0, err + } + w.fileOffset += int64(n) + return n, nil +} + +func (w *SSDWriter) createNewFile() error { + path := 
fmt.Sprintf("%s/%s/swap_shard_%d.bin", + w.dirPath, + time.Now().Format("2006-01-02-15-04-05"), + w.swapShardId, + ) + + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return err + } + + f, err := os.Create(path) + if err != nil { + return err + } + + w.nextFile = f + w.nextFilePath = path + return nil +} + +func (w *SSDWriter) swapFiles() error { + if w.file != nil { + _ = w.file.Close() + } + + if w.nextFile == nil || w.nextFilePath == "" { + return fmt.Errorf("next file is not set") + } + + w.file = w.nextFile + w.filePath = w.nextFilePath + w.fileOffset = 0 + + w.nextFile = nil + w.nextFilePath = "" + return nil +} diff --git a/asynclogger/blob-logger/ssdio/writer_linux.go b/asynclogger/blob-logger/ssdio/writer_linux.go new file mode 100644 index 00000000..1cf6f3a5 --- /dev/null +++ b/asynclogger/blob-logger/ssdio/writer_linux.go @@ -0,0 +1,135 @@ +//go:build linux +// +build linux + +package ssdio + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "golang.org/x/sys/unix" +) + +type SSDWriter struct { + dirPath string + swapShardId int + filePath string + file *os.File + fd int + fileOffset int + capacity int64 + uploadChan chan string + nextFilePath string + nextFile *os.File + nextFd int + nextFileOffset int +} + +const directIOBlockSize = 4096 // typical; tune if your device needs different + +// alignUp rounds n up to the next multiple of align (power of 2). 
+func alignUp(n, align int64) int64 { + return (n + align - 1) &^ (align - 1) +} + +func NewSSDWriter(capacity int, dirPath string, swapShardId int) (*SSDWriter, error) { + capacityAligned := alignUp(int64(capacity), directIOBlockSize) + path := fmt.Sprintf("%s/%s/swap_shard_%d.bin", dirPath, time.Now().Format("2006-01-02-15-04-05"), swapShardId) + os.MkdirAll(filepath.Dir(path), 0o755) + fd, err := unix.Open( + path, + unix.O_CREAT|unix.O_TRUNC|unix.O_WRONLY|unix.O_DIRECT|unix.O_SYNC, + 0o644, + ) + if err != nil { + panic(err) + } + // Preallocate the file so the extents are ready for direct I/O. + if err := unix.Fallocate(fd, 0, 0, capacityAligned); err != nil { + _ = unix.Close(fd) + return nil, err + } + file := os.NewFile(uintptr(fd), path) + if file == nil { + _ = unix.Close(fd) + return nil, fmt.Errorf("failed to create file descriptor") + } + return &SSDWriter{ + dirPath: dirPath, + swapShardId: swapShardId, + filePath: path, + file: file, + fd: fd, + fileOffset: 0, + capacity: capacityAligned, + uploadChan: make(chan string, 1000), + }, nil +} + +func (w *SSDWriter) Write(p []byte) (int, error) { + size := len(p) + if int64(w.fileOffset+size) > w.capacity { + filePath := w.filePath + err := w.swapFiles() + if err != nil { + return 0, err + } + w.uploadChan <- filePath + } + if w.fileOffset+size >= int(float64(w.capacity)*0.9) { + return 0, w.createNewFile() + } + n, err := unix.Pwrite(w.fd, p, int64(w.fileOffset)) + if err != nil { + return 0, err + } + w.fileOffset += n + return n, nil +} + +func (w *SSDWriter) createNewFile() error { + path := fmt.Sprintf("%s/%s/swap_shard_%d.bin", w.dirPath, time.Now().Format("2006-01-02-15-04-05"), w.swapShardId) + os.MkdirAll(filepath.Dir(path), 0o755) + fd, err := unix.Open( + path, + unix.O_CREAT|unix.O_TRUNC|unix.O_WRONLY|unix.O_DIRECT|unix.O_SYNC, + 0o644, + ) + if err != nil { + panic(err) + } + // Preallocate the file so the extents are ready for direct I/O. 
+ if err := unix.Fallocate(fd, 0, 0, w.capacity); err != nil { + _ = unix.Close(fd) + return err + } + file := os.NewFile(uintptr(fd), path) + if file == nil { + _ = unix.Close(fd) + return fmt.Errorf("failed to create file descriptor") + } + w.nextFile = file + w.nextFd = fd + w.nextFileOffset = 0 + w.nextFilePath = path + return nil +} + +func (w *SSDWriter) swapFiles() error { + unix.Fsync(w.fd) + _ = unix.Close(w.fd) + if w.nextFile == nil || w.nextFd == 0 || w.nextFileOffset != 0 || w.nextFilePath == "" { + return fmt.Errorf("next file is not set") + } + w.file = w.nextFile + w.fd = w.nextFd + w.fileOffset = w.nextFileOffset + w.filePath = w.nextFilePath + w.nextFile = nil + w.nextFd = 0 + w.nextFileOffset = 0 + w.nextFilePath = "" + return nil +}