Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
3b72387
externalize the package for cache so it can be imported
nileshsolankimeesho Dec 24, 2025
b8ffbaf
do not start lockless functions if lockless not enabled
nileshsolankimeesho Jan 9, 2026
bfd1337
should rewrite logic off for now
nileshsolankimeesho Jan 9, 2026
90cae0f
change shard cache metric maps to sync maps
nileshsolankimeesho Jan 9, 2026
19887a9
improve metrics package
nileshsolankimeesho Jan 12, 2026
0d014c4
move metrics to pkg
nileshsolankimeesho Jan 12, 2026
8a56a25
full sampling rate
nileshsolankimeesho Jan 13, 2026
f565b32
fix metrics
nileshsolankimeesho Jan 13, 2026
f76e9e8
fix metrics
nileshsolankimeesho Jan 13, 2026
626ded4
fix metrics
nileshsolankimeesho Jan 13, 2026
4eb459d
include fine grained stats
nileshsolankimeesho Jan 15, 2026
54ddd21
correct stats
nileshsolankimeesho Jan 19, 2026
f5a1d6c
remove syncpool changes
nileshsolankimeesho Jan 19, 2026
1788d64
grid search fixes
nileshsolankimeesho Jan 21, 2026
97da61e
clear files at mountpoint on start
nileshsolankimeesho Jan 22, 2026
56a5080
fixed delete manager and added file stats
nileshsolankimeesho Jan 22, 2026
659d8e2
fix filewrite error after first punch hole
nileshsolankimeesho Jan 22, 2026
900845b
console logger fixes
nileshsolankimeesho Feb 10, 2026
7d279f5
console logger fixes
nileshsolankimeesho Feb 10, 2026
e92786d
add direct statsd metrics for latencies
nileshsolankimeesho Feb 10, 2026
1ce67d1
direct statsD metric for write latency
nileshsolankimeesho Feb 10, 2026
903165d
try lockless
nileshsolankimeesho Feb 10, 2026
b0a8e47
return error on trim needed
nileshsolankimeesho Feb 10, 2026
f16d4a6
add pread and pwrite latencies
nileshsolankimeesho Feb 11, 2026
dcac7f9
remove dsync from pwrite
nileshsolankimeesho Feb 11, 2026
0b27e24
remove o-direct from write path
nileshsolankimeesho Feb 11, 2026
aeeff96
error read if more than 1ms
nileshsolankimeesho Feb 11, 2026
aeb8bd3
revert ticker
nileshsolankimeesho Feb 11, 2026
cbc6d3d
add memtable chunking on flush
nileshsolankimeesho Feb 11, 2026
e3abf42
chunk size smaller
nileshsolankimeesho Feb 11, 2026
710c80e
stats time wasted in lock
nileshsolankimeesho Feb 11, 2026
8fb761f
simplify metrics
nileshsolankimeesho Feb 12, 2026
f1d3b26
implement iouring
nileshsolankimeesho Feb 12, 2026
98362c2
change lock position for rlock
nileshsolankimeesho Feb 12, 2026
0afc603
memtable chunk size 16*4
nileshsolankimeesho Feb 12, 2026
02f92f7
fix rb loop
nileshsolankimeesho Feb 12, 2026
fbaa622
correct the mutex used for rlock
nileshsolankimeesho Feb 12, 2026
5036e0b
implement iouring batching
nileshsolankimeesho Feb 12, 2026
8c510a3
add pread metric and change iouring to wait 500microsecs
nileshsolankimeesho Feb 13, 2026
f2126ff
change write iouring batch size
nileshsolankimeesho Feb 13, 2026
a9cefea
fix iouring write
nileshsolankimeesho Feb 13, 2026
737e651
track chunked pwrite and pread latency
nileshsolankimeesho Feb 13, 2026
399d797
iouring no wait fixes
nileshsolankimeesho Feb 17, 2026
93c7164
remove metrics and use statsD only
nileshsolankimeesho Feb 20, 2026
7025143
add more metrics
nileshsolankimeesho Feb 22, 2026
213d00c
parallelize iourings
nileshsolankimeesho Feb 22, 2026
08c3475
add iouring size metrics
nileshsolankimeesho Feb 22, 2026
fa4e31c
disable metrics by default
nileshsolankimeesho Feb 23, 2026
af09e73
disable metrics by default
nileshsolankimeesho Feb 23, 2026
36de0fa
wait 1 millisecond
nileshsolankimeesho Feb 23, 2026
0602210
remove profiling and reduce aligned page count
nileshsolankimeesho Feb 24, 2026
ccb8465
4 iorings
nileshsolankimeesho Feb 24, 2026
cc7b1ce
2 io urings
nileshsolankimeesho Feb 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"mode": "debug",
"program": "${workspaceFolder}/flashring/cmd/flashringtest",
"env": {
"PLAN": "readthrough-batched"
"PLAN": "readthrough"
}
}

Expand Down
2 changes: 1 addition & 1 deletion flashring/cmd/flashringtest/plan_badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"strings"
"sync"

cachepkg "github.com/Meesho/BharatMLStack/flashring/internal/cache"
cachepkg "github.com/Meesho/BharatMLStack/flashring/pkg/cache"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand Down
2 changes: 1 addition & 1 deletion flashring/cmd/flashringtest/plan_freecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"strings"
"sync"

cachepkg "github.com/Meesho/BharatMLStack/flashring/internal/cache"
cachepkg "github.com/Meesho/BharatMLStack/flashring/pkg/cache"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand Down
34 changes: 12 additions & 22 deletions flashring/cmd/flashringtest/plan_lockless.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"sync"
"time"

cachepkg "github.com/Meesho/BharatMLStack/flashring/internal/cache"
cachepkg "github.com/Meesho/BharatMLStack/flashring/pkg/cache"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand All @@ -35,13 +35,13 @@ func planLockless() {
cpuProfile string
)

flag.StringVar(&mountPoint, "mount", "/media/a0d00kc/trishul/", "data directory for shard files")
flag.IntVar(&numShards, "shards", 500, "number of shards")
flag.IntVar(&keysPerShard, "keys-per-shard", 10_00_00, "keys per shard")
flag.IntVar(&memtableMB, "memtable-mb", 16, "memtable size in MiB")
flag.StringVar(&mountPoint, "mount", "/mnt/disks/nvme", "data directory for shard files")
flag.IntVar(&numShards, "shards", 100, "number of shards")
flag.IntVar(&keysPerShard, "keys-per-shard", 3_00_000, "keys per shard")
flag.IntVar(&memtableMB, "memtable-mb", 2, "memtable size in MiB")
flag.IntVar(&fileSizeMultiplier, "file-size-multiplier", 2, "file size in GiB per shard")
flag.IntVar(&readWorkers, "readers", 8, "number of read workers")
flag.IntVar(&writeWorkers, "writers", 8, "number of write workers")
flag.IntVar(&readWorkers, "readers", 16, "number of read workers")
flag.IntVar(&writeWorkers, "writers", 16, "number of write workers")
flag.IntVar(&sampleSecs, "sample-secs", 30, "predictor sampling window in seconds")
flag.Int64Var(&iterations, "iterations", 100_000_000, "number of iterations")
flag.Float64Var(&aVal, "a", 0.4, "a value for the predictor")
Expand Down Expand Up @@ -84,7 +84,7 @@ func planLockless() {
}

memtableSizeInBytes := int32(memtableMB) * 1024 * 1024
fileSizeInBytes := int64(fileSizeMultiplier) * int64(memtableSizeInBytes)
fileSizeInBytes := int64(fileSizeMultiplier) * 1024 * 1024 * 1024 // fileSizeMultiplier in GiB

cfg := cachepkg.WrapCacheConfig{
NumShards: numShards,
Expand All @@ -95,21 +95,11 @@ func planLockless() {
GridSearchEpsilon: 0.0001,
SampleDuration: time.Duration(sampleSecs) * time.Second,

// Pass the metrics collector to record cache metrics
MetricsRecorder: InitMetricsCollector(),
//lockless mode for PutLL/GetLL
EnableLockless: true,
}

// Set additional input parameters that the cache doesn't know about
metricsCollector.SetShards(numShards)
metricsCollector.SetKeysPerShard(keysPerShard)
metricsCollector.SetReadWorkers(readWorkers)
metricsCollector.SetWriteWorkers(writeWorkers)
metricsCollector.SetPlan("lockless")

// Start background goroutine to wait for shutdown signal and export CSV
go RunmetricsWaitForShutdown()

pc, err := cachepkg.NewWrapCache(cfg, mountPoint, logStats)
pc, err := cachepkg.NewWrapCache(cfg, mountPoint)
if err != nil {
panic(err)
}
Expand All @@ -121,7 +111,7 @@ func planLockless() {
missedKeyChanList[i] = make(chan int)
}

totalKeys := keysPerShard * numShards
totalKeys := 30_000_000
str1kb := strings.Repeat("a", 1024)
str1kb = "%d" + str1kb

Expand Down
6 changes: 3 additions & 3 deletions flashring/cmd/flashringtest/plan_random_gausian.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"sync"
"time"

cachepkg "github.com/Meesho/BharatMLStack/flashring/internal/cache"
cachepkg "github.com/Meesho/BharatMLStack/flashring/pkg/cache"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -83,7 +83,7 @@ func planRandomGaussian() {
}

memtableSizeInBytes := int32(memtableMB) * 1024 * 1024
fileSizeInBytes := int64(fileSizeMultiplier) * int64(memtableSizeInBytes)
fileSizeInBytes := int64(fileSizeMultiplier) * 1024 * 1024 * 1024 // fileSizeMultiplier in GiB

cfg := cachepkg.WrapCacheConfig{
NumShards: numShards,
Expand All @@ -95,7 +95,7 @@ func planRandomGaussian() {
SampleDuration: time.Duration(sampleSecs) * time.Second,
}

pc, err := cachepkg.NewWrapCache(cfg, mountPoint, logStats)
pc, err := cachepkg.NewWrapCache(cfg, mountPoint)
if err != nil {
panic(err)
}
Expand Down
46 changes: 17 additions & 29 deletions flashring/cmd/flashringtest/plan_readthrough_gausian.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"sync"
"time"

cachepkg "github.com/Meesho/BharatMLStack/flashring/internal/cache"
cachepkg "github.com/Meesho/BharatMLStack/flashring/pkg/cache"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand All @@ -24,7 +24,7 @@ func planReadthroughGaussian() {
numShards int
keysPerShard int
memtableMB int
fileSizeMultiplier int
fileSizeMultiplier float64
readWorkers int
writeWorkers int
sampleSecs int
Expand All @@ -35,13 +35,13 @@ func planReadthroughGaussian() {
cpuProfile string
)

flag.StringVar(&mountPoint, "mount", "/media/a0d00kc/trishul/", "data directory for shard files")
flag.IntVar(&numShards, "shards", 500, "number of shards")
flag.IntVar(&keysPerShard, "keys-per-shard", 4_00_00, "keys per shard")
flag.IntVar(&memtableMB, "memtable-mb", 16, "memtable size in MiB")
flag.IntVar(&fileSizeMultiplier, "file-size-multiplier", 2, "file size in GiB per shard")
flag.IntVar(&readWorkers, "readers", 8, "number of read workers")
flag.IntVar(&writeWorkers, "writers", 8, "number of write workers")
flag.StringVar(&mountPoint, "mount", "/mnt/disks/nvme/", "data directory for shard files")
flag.IntVar(&numShards, "shards", 50, "number of shards")
flag.IntVar(&keysPerShard, "keys-per-shard", 6_00_000, "keys per shard")
flag.IntVar(&memtableMB, "memtable-mb", 2, "memtable size in MiB")
flag.Float64Var(&fileSizeMultiplier, "file-size-multiplier", 0.25, "file size in GiB per shard")
flag.IntVar(&readWorkers, "readers", 16, "number of read workers")
flag.IntVar(&writeWorkers, "writers", 16, "number of write workers")
flag.IntVar(&sampleSecs, "sample-secs", 30, "predictor sampling window in seconds")
flag.Int64Var(&iterations, "iterations", 100_000_000, "number of iterations")
flag.Float64Var(&aVal, "a", 0.4, "a value for the predictor")
Expand Down Expand Up @@ -84,7 +84,7 @@ func planReadthroughGaussian() {
}

memtableSizeInBytes := int32(memtableMB) * 1024 * 1024
fileSizeInBytes := int64(fileSizeMultiplier) * int64(memtableSizeInBytes)
fileSizeInBytes := int64(float64(fileSizeMultiplier) * 1024 * 1024 * 1024) // fileSizeMultiplier in GiB

cfg := cachepkg.WrapCacheConfig{
NumShards: numShards,
Expand All @@ -94,22 +94,9 @@ func planReadthroughGaussian() {
ReWriteScoreThreshold: 0.8,
GridSearchEpsilon: 0.0001,
SampleDuration: time.Duration(sampleSecs) * time.Second,

// Pass the metrics collector to record cache metrics
MetricsRecorder: InitMetricsCollector(),
}

// Set additional input parameters that the cache doesn't know about
metricsCollector.SetShards(numShards)
metricsCollector.SetKeysPerShard(keysPerShard)
metricsCollector.SetReadWorkers(readWorkers)
metricsCollector.SetWriteWorkers(writeWorkers)
metricsCollector.SetPlan("readthrough")

// Start background goroutine to wait for shutdown signal and export CSV
go RunmetricsWaitForShutdown()

pc, err := cachepkg.NewWrapCache(cfg, mountPoint, logStats)
pc, err := cachepkg.NewWrapCache(cfg, mountPoint)
if err != nil {
panic(err)
}
Expand All @@ -121,7 +108,7 @@ func planReadthroughGaussian() {
missedKeyChanList[i] = make(chan int)
}

totalKeys := keysPerShard * numShards
totalKeys := 30_000_000
str1kb := strings.Repeat("a", 1024)
str1kb = "%d" + str1kb

Expand All @@ -139,7 +126,7 @@ func planReadthroughGaussian() {
key := fmt.Sprintf("key%d", k)
val := []byte(fmt.Sprintf(str1kb, k))
if err := pc.Put(key, val, 60); err != nil {
panic(err)
log.Error().Err(err).Msgf("error putting key %s", key)
}
if k%5000000 == 0 {
fmt.Printf("----------------------------------------------prepopulated %d keys\n", k)
Expand All @@ -158,7 +145,7 @@ func planReadthroughGaussian() {
key := fmt.Sprintf("key%d", mk)
val := []byte(fmt.Sprintf(str1kb, mk))
if err := pc.Put(key, val, 60); err != nil {
panic(err)
log.Error().Err(err).Msgf("error putting key %s", key)
}
}
}(w)
Expand All @@ -183,13 +170,14 @@ func planReadthroughGaussian() {
}

if expired {
panic("key expired")
log.Error().Msgf("key %s expired", key)
// panic("key expired")

}
if found && string(val) != fmt.Sprintf(str1kb, randomval) {
panic("value mismatch")
}
if k%5000000 == 0 {
if k%50000 == 0 {
fmt.Printf("----------------------------------------------read %d keys %d readerid\n", k, workerID)
}
}
Expand Down
19 changes: 3 additions & 16 deletions flashring/cmd/flashringtest/plan_readthrough_gausian_batched.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"sync"
"time"

cachepkg "github.com/Meesho/BharatMLStack/flashring/internal/cache"
cachepkg "github.com/Meesho/BharatMLStack/flashring/pkg/cache"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -93,7 +93,7 @@ func planReadthroughGaussianBatched() {
}

memtableSizeInBytes := int32(memtableMB) * 1024 * 1024
fileSizeInBytes := int64(fileSizeMultiplier) * int64(memtableSizeInBytes)
fileSizeInBytes := int64(fileSizeMultiplier) * 1024 * 1024 * 1024 // fileSizeMultiplier in GiB

cfg := cachepkg.WrapCacheConfig{
NumShards: numShards,
Expand All @@ -108,22 +108,9 @@ func planReadthroughGaussianBatched() {
EnableBatching: enableBatching,
BatchWindowMicros: batchWindowMicros,
MaxBatchSize: maxBatchSize,

// Pass the metrics collector to record cache metrics
MetricsRecorder: InitMetricsCollector(),
}

// Set additional input parameters that the cache doesn't know about
metricsCollector.SetShards(numShards)
metricsCollector.SetKeysPerShard(keysPerShard)
metricsCollector.SetReadWorkers(readWorkers)
metricsCollector.SetWriteWorkers(writeWorkers)
metricsCollector.SetPlan("readthrough-batched")

// Start background goroutine to wait for shutdown signal and export CSV
go RunmetricsWaitForShutdown()

pc, err := cachepkg.NewWrapCache(cfg, mountPoint, logStats)
pc, err := cachepkg.NewWrapCache(cfg, mountPoint)
if err != nil {
panic(err)
}
Expand Down
Loading