Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ Calls to `track`, `identify`, etc. are **buffered in-memory** by the proxy and r
- `debug` (Boolean, optional): Enable debug mode
- `flushIntervalSeconds` (Int, optional): Interval in seconds to flush events (default: 10)
- `maxQueueEvents` (Int, optional): Max events stored in memory (default: 2000)
- `maxDiskEvents` (Int, optional): Max events persisted to disk during extended offline periods (default: 10000). Set to `0` to disable disk persistence entirely — the queue then operates as a pure in-memory ring buffer (oldest event dropped at capacity). Negative values are rejected.

**Proxy behavior (quick notes):**

Expand Down Expand Up @@ -306,7 +307,9 @@ Or in Android Studio: Filter by "MetaRouter"

Queue capacity: The SDK keeps up to 2,000 events in memory (configurable via `maxQueueEvents` in `InitOptions`) with an additional 5 MB byte cap. When either limit is reached, the oldest events are dropped first (drop-oldest).

Disk persistence: The SDK writes a snapshot of the in-memory queue to disk when the app goes to background, when a flush threshold is reached (500 events or 2 MB), and on low-memory callbacks. On next launch, events are rehydrated from disk — events older than 7 days are dropped during rehydration. The snapshot is stored in `noBackupFilesDir` and is deleted after successful rehydration or on `reset()`.
Disk persistence: The SDK writes a snapshot of the in-memory queue to disk when the app goes to background, when a flush threshold is reached (500 events or 2 MB), and on low-memory callbacks. On next launch, events are rehydrated from disk — events older than 7 days are dropped during rehydration. The snapshot is stored in `noBackupFilesDir` and is deleted after successful rehydration or on `reset()`. Disk capacity is bounded by `maxDiskEvents` (default 10,000) — when the cap is hit, the oldest persisted events are dropped first.

Opting out of disk persistence: Set `maxDiskEvents = 0` to disable the disk store entirely. In this mode the SDK behaves as a pure in-memory ring buffer — `maxQueueEvents` (and the 5 MB byte cap) still bound the queue, and the oldest event is dropped when full. Background/low-memory crash-safety flushes become no-ops, so events in memory will not survive a process kill. Use this when local persistence is unwanted (privacy constraints, ephemeral environments, tests).

This SDK uses a circuit breaker around network I/O. It keeps ordering stable, avoids tight retry loops, and backs off cleanly when your cluster is unhealthy or throttling.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import com.metarouter.analytics.utils.Logger
* @property maxQueueEvents Maximum enriched events held in queue (default: 2000).
* This value also determines the incoming event channel capacity (minimum 100).
* When limits are exceeded, oldest events are dropped.
* @property maxOfflineDiskEvents Maximum events stored on disk during extended offline periods (default: 10000).
* @property maxDiskEvents Maximum events stored on disk during extended offline periods
* (default: 10000). Set to `0` to opt out of disk persistence entirely — the queue then
* operates as a purely in-memory ring buffer, dropping the oldest event when full.
* Negative values are rejected.
*
* @throws IllegalArgumentException if validation fails
*/
Expand All @@ -22,13 +25,15 @@ data class InitOptions(
val flushIntervalSeconds: Int = 10,
val debug: Boolean = false,
val maxQueueEvents: Int = 2000,
val maxOfflineDiskEvents: Int = 10000
val maxDiskEvents: Int = 10000
) {
init {
validateWriteKey()
validateIngestionHost()
validateFlushInterval()
validateMaxQueueEvents()
validateMaxOfflineDiskEvents()
warnIfDiskCapBelowMemoryCap()
}

private fun validateWriteKey() {
Expand Down Expand Up @@ -65,6 +70,21 @@ data class InitOptions(
}
}

private fun validateMaxOfflineDiskEvents() {
require(maxDiskEvents >= 0) {
"MetaRouterAnalyticsClient initialization failed: `maxDiskEvents` must be >= 0 (use 0 to disable disk persistence)."
}
}

private fun warnIfDiskCapBelowMemoryCap() {
if (maxDiskEvents in 1 until maxQueueEvents) {
Logger.warn(
"maxDiskEvents ($maxDiskEvents) is less than maxQueueEvents ($maxQueueEvents) — " +
"memory can hold more events than disk can preserve; events may be dropped during background flush."
)
}
}

private fun isValidUrl(url: String): Boolean {
return try {
val parsed = java.net.URL(url)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class MetaRouterAnalyticsClient private constructor(
val pQueue = PersistableEventQueue(
maxCapacity = options.maxQueueEvents,
diskStore = diskStore,
maxOfflineDiskEvents = options.maxOfflineDiskEvents
maxDiskEvents = options.maxDiskEvents
)
pQueue.rehydrate()
eventQueue = pQueue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class PersistableEventQueue(
private val flushThresholdBytes: Long = 2L * 1024 * 1024,
private val maxCapacityBytes: Long = 5L * 1024 * 1024,
private val eventTTLMs: Long = 7L * 24 * 60 * 60 * 1000,
private val maxOfflineDiskEvents: Int = 10000
private val maxDiskEvents: Int = 10000
) : EventQueueInterface {

companion object {
Expand Down Expand Up @@ -92,11 +92,22 @@ class PersistableEventQueue(
* Enqueue an event to memory.
* When at capacity (event count or byte size), flushes the entire memory queue
* to disk. Events are only dropped if capacity is hit and disk operations fail.
*
* If [maxDiskEvents] is 0, disk persistence is disabled and the queue
* acts as a pure in-memory ring buffer: at capacity, the oldest event is dropped.
*/
@Synchronized
override fun enqueue(event: EnrichedEventPayload) {
val eventSize = estimateEventSize(event)

if (maxDiskEvents == 0) {
dropOldestUntilFits(eventSize)
memoryQueue.addLast(event)
eventSizes.addLast(eventSize)
estimatedBytes += eventSize
return
}

// Over byte capacity: flush memory queue to disk
if (estimatedBytes + eventSize > maxCapacityBytes && memoryQueue.isNotEmpty()) {
flushMemoryToDiskInternal()
Expand Down Expand Up @@ -135,6 +146,19 @@ class PersistableEventQueue(
*/
@Synchronized
override fun requeueToFront(events: List<EnrichedEventPayload>) {
if (maxDiskEvents == 0) {
// In-memory-only mode: at cap, drop newest (back of queue) so the
// requeued retry events at the front are preserved.
events.asReversed().forEach { event ->
val eventSize = estimateEventSize(event)
dropNewestUntilFits(eventSize)
memoryQueue.addFirst(event)
eventSizes.addFirst(eventSize)
estimatedBytes += eventSize
}
return
}

// Initial flush: if adding these events would exceed capacity, flush current memory first.
if ((memoryQueue.size + events.size > maxCapacity ||
estimatedBytes + events.sumOf { estimateEventSize(it) } > maxCapacityBytes) &&
Expand Down Expand Up @@ -179,21 +203,24 @@ class PersistableEventQueue(

/**
* Flush memory queue to disk when the dispatcher triggers a flush while offline.
* Returns true if events were flushed to disk.
* Returns true if events were flushed to disk. No-op (returns false) when
* [maxDiskEvents] is 0.
*/
@Synchronized
override fun flushToOfflineStorage(): Boolean {
if (maxDiskEvents == 0) return false
if (memoryQueue.isEmpty()) return false
flushMemoryToDiskInternal()
return true
}

/**
* Best-effort crash-safety flush: appends current memory queue to disk.
* Called on app background / onTrimMemory.
* Called on app background / onTrimMemory. No-op when [maxDiskEvents] is 0.
*/
@Synchronized
fun flushToDisk() {
if (maxDiskEvents == 0) return
if (memoryQueue.isEmpty()) return
flushMemoryToDiskInternal()
}
Expand Down Expand Up @@ -359,7 +386,7 @@ class PersistableEventQueue(

/**
* Flush all events from memory queue to disk. Appends to any existing disk data,
* enforces [maxOfflineDiskEvents] cap, and resets the memory queue.
* enforces [maxDiskEvents] cap, and resets the memory queue.
* Must be called while holding `synchronized(this)`.
*
* Note: performs disk I/O (read + write) while holding both `this` and `diskLock`.
Expand All @@ -383,8 +410,8 @@ class PersistableEventQueue(
}

var combined = existing + batch
if (combined.size > maxOfflineDiskEvents) {
val dropCount = combined.size - maxOfflineDiskEvents
if (combined.size > maxDiskEvents) {
val dropCount = combined.size - maxDiskEvents
combined = combined.drop(dropCount)
Logger.warn("Offline disk cap reached — dropped $dropCount oldest events")
}
Expand All @@ -395,6 +422,44 @@ class PersistableEventQueue(
}
}

/**
* In-memory-only mode: drop oldest events until the new event of [newEventSize]
* fits under both the event count and byte capacity caps.
* Must be called while holding `synchronized(this)`.
*/
private fun dropOldestUntilFits(newEventSize: Long) {
var dropped = 0
while (memoryQueue.isNotEmpty() &&
(memoryQueue.size >= maxCapacity ||
estimatedBytes + newEventSize > maxCapacityBytes)) {
memoryQueue.removeFirst()
estimatedBytes -= eventSizes.removeFirst()
dropped++
}
if (dropped > 0) {
Logger.warn("In-memory queue cap reached — dropped $dropped oldest event(s) (maxDiskEvents=0)")
}
}

/**
* In-memory-only mode (requeue path): drop newest events from the back so that
* older requeued retries at the front are preserved.
* Must be called while holding `synchronized(this)`.
*/
private fun dropNewestUntilFits(newEventSize: Long) {
var dropped = 0
while (memoryQueue.isNotEmpty() &&
(memoryQueue.size >= maxCapacity ||
estimatedBytes + newEventSize > maxCapacityBytes)) {
memoryQueue.removeLast()
estimatedBytes -= eventSizes.removeLast()
dropped++
}
if (dropped > 0) {
Logger.warn("In-memory queue cap reached during requeue — dropped $dropped newest event(s) (maxDiskEvents=0)")
}
}

/** Delete the disk file and clear the hasOverflowData flag. */
private fun deleteDiskStore() {
diskStore.delete()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package com.metarouter.analytics

import android.util.Log
import io.mockk.every
import io.mockk.mockkStatic
import io.mockk.unmockkStatic
import io.mockk.verify
import org.junit.After
import org.junit.Assert.*
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.robolectric.RobolectricTestRunner
Expand Down Expand Up @@ -200,26 +207,125 @@ class InitOptionsTest {
assertTrue(exception?.message?.contains("writeKey") == true)
}

// ===== maxOfflineDiskEvents =====
// ===== maxDiskEvents =====

@Test
fun `maxOfflineDiskEvents defaults to 10000`() {
fun `maxDiskEvents defaults to 10000`() {
val options = InitOptions(
writeKey = "test-key",
ingestionHost = "https://example.com"
)

assertEquals(10000, options.maxOfflineDiskEvents)
assertEquals(10000, options.maxDiskEvents)
}

@Test
fun `custom maxOfflineDiskEvents accepted`() {
fun `custom maxDiskEvents accepted`() {
val options = InitOptions(
writeKey = "test-key",
ingestionHost = "https://example.com",
maxOfflineDiskEvents = 5000
maxDiskEvents = 5000
)

assertEquals(5000, options.maxOfflineDiskEvents)
assertEquals(5000, options.maxDiskEvents)
}

@Test
fun `zero maxDiskEvents is accepted as in-memory-only opt-out`() {
val options = InitOptions(
writeKey = "test-key",
ingestionHost = "https://example.com",
maxDiskEvents = 0
)

assertEquals(0, options.maxDiskEvents)
}

@Test(expected = IllegalArgumentException::class)
fun `negative maxDiskEvents throws exception`() {
InitOptions(
writeKey = "test-key",
ingestionHost = "https://example.com",
maxDiskEvents = -1
)
}

// ===== maxDiskEvents vs maxQueueEvents mismatch warning =====

@Test
fun `warns when maxDiskEvents is less than maxQueueEvents`() {
mockkStatic(Log::class)
every { Log.w(any(), any<String>()) } returns 0
try {
InitOptions(
writeKey = "test-key",
ingestionHost = "https://example.com",
maxQueueEvents = 2000,
maxDiskEvents = 500
)
verify {
Log.w(
any(),
match<String> {
it.contains("maxDiskEvents (500)") &&
it.contains("maxQueueEvents (2000)") &&
it.contains("dropped")
}
)
}
} finally {
unmockkStatic(Log::class)
}
}

@Test
fun `does not warn when maxDiskEvents equals maxQueueEvents`() {
mockkStatic(Log::class)
every { Log.w(any(), any<String>()) } returns 0
try {
InitOptions(
writeKey = "test-key",
ingestionHost = "https://example.com",
maxQueueEvents = 2000,
maxDiskEvents = 2000
)
verify(exactly = 0) { Log.w(any(), any<String>()) }
} finally {
unmockkStatic(Log::class)
}
}

@Test
fun `does not warn when maxDiskEvents exceeds maxQueueEvents`() {
mockkStatic(Log::class)
every { Log.w(any(), any<String>()) } returns 0
try {
InitOptions(
writeKey = "test-key",
ingestionHost = "https://example.com",
maxQueueEvents = 2000,
maxDiskEvents = 10000
)
verify(exactly = 0) { Log.w(any(), any<String>()) }
} finally {
unmockkStatic(Log::class)
}
}

@Test
fun `does not warn when maxDiskEvents is zero (in-memory-only opt-out)`() {
mockkStatic(Log::class)
every { Log.w(any(), any<String>()) } returns 0
try {
InitOptions(
writeKey = "test-key",
ingestionHost = "https://example.com",
maxQueueEvents = 2000,
maxDiskEvents = 0
)
verify(exactly = 0) { Log.w(any(), any<String>()) }
} finally {
unmockkStatic(Log::class)
}
}
}
Loading
Loading