diff --git a/README.md b/README.md index 9de3b55..8612094 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,7 @@ Instead of a fixed circular buffer, `StreamBuffer` uses a `Deque` that g The `InputStream` and `OutputStream` can be used concurrently from different threads without additional synchronization: - All `Deque` accesses are guarded by a `bufferLock` object. -- State fields (`streamClosed`, `safeWrite`, `availableBytes`, `positionAtCurrentBufferEntry`, `maxBufferElements`) are `volatile`. +- State fields (`streamClosed`, `safeWrite`, `availableBytes`, `positionAtCurrentBufferEntry`, `maxBufferElements`, `maxAllocationSize`, `isTrimRunning`, `maxObservedBytes`, `totalBytesWritten`, `totalBytesRead`) are `volatile`. - A `Semaphore signalModification` blocks reading threads until data is written or the stream is closed, avoiding busy-waiting. External semaphores can be registered via `addSignal` for thread-decoupled notification. ### No Write Deadlock @@ -113,6 +113,10 @@ sb.setMaxBufferElements(0); // disable trimming entirely Trimming is triggered by writes, not by `setMaxBufferElements`. The trim internally bypasses `safeWrite` (via an `ignoreSafeWrite` flag) because the byte arrays it produces are not reachable from outside the buffer. +If `maxAllocationSize` is set (see below), trim may produce multiple smaller chunks instead of one. Trim is also skipped automatically when consolidation would not reduce the chunk count below `maxBufferElements` — for example, when `maxAllocationSize` is small enough that the resulting chunk count would still exceed the threshold. This prevents repeated no-op trims on every write. + +Use `isTrimRunning()` to observe whether a trim is currently executing. This value is `volatile` and can change at any time in concurrent scenarios. + ### Large Buffer Support `available()` returns `Integer.MAX_VALUE` when the number of buffered bytes exceeds `Integer.MAX_VALUE`, correctly handling buffers larger than 2 GB. @@ -142,8 +146,75 @@ while (!done) { | `addSignal(Semaphore)` | Registers an external semaphore; throws `NullPointerException` if null | | `removeSignal(Semaphore)` | Removes a semaphore; returns `false` if not found or null | +For trim lifecycle events specifically, see [Trim Observer Signals](#trim-observer-signals) below. + Signals are stored in a `CopyOnWriteArrayList` for thread-safe iteration. The "max 1 permit" pattern means rapid writes collapse into a single wake-up — the observer should check buffer state (e.g., `isClosed()`, `available()`) after waking to determine what changed. +### Statistics Tracking + +`StreamBuffer` tracks cumulative I/O statistics for the bytes flowing through the user-facing API. Internal reads and writes performed by the trim operation are excluded — `isTrimRunning` is checked before updating the counters so the statistics always reflect user I/O only. + +```java +StreamBuffer sb = new StreamBuffer(); +OutputStream os = sb.getOutputStream(); +InputStream is = sb.getInputStream(); + +os.write(new byte[]{1, 2, 3}); // totalBytesWritten = 3 + +byte[] buf = new byte[2]; +is.read(buf); // totalBytesRead = 2 + +System.out.println(sb.getTotalBytesWritten()); // 3 +System.out.println(sb.getTotalBytesRead()); // 2 +System.out.println(sb.getMaxObservedBytes()); // 3 (peak bytes in buffer) +``` + +**API:** + +| Method | Description | +|--------|-------------| +| `getTotalBytesWritten()` | Cumulative bytes written by user I/O operations (excludes internal trim) | +| `getTotalBytesRead()` | Cumulative bytes consumed by user reads (excludes internal trim) | +| `getMaxObservedBytes()` | Peak value of `availableBytes` ever observed | + +### Configurable Trim Allocation Size + +By default trim consolidates all buffered data into a single byte array. `maxAllocationSize` limits how large each individual byte array can be during consolidation. When `availableBytes > maxAllocationSize`, trim produces multiple smaller chunks: + +```java +sb.setMaxAllocationSize(1024 * 1024); // cap each consolidated chunk at 1 MiB +long current = sb.getMaxAllocationSize(); +``` + +- Default: `Integer.MAX_VALUE` (effectively one chunk per trim pass). +- Throws `IllegalArgumentException` if the value is ≤ 0. +- Trim is skipped automatically when the resulting chunk count would not be smaller than the current `Deque` size — this prevents repeated no-op trims when `maxAllocationSize` is very small. + +### Trim Observer Signals + +Register `Semaphore` objects to be notified when a trim cycle starts or ends. This uses the same "max 1 permit" semaphore pattern as the general modification signals: + +```java +Semaphore trimStarted = new Semaphore(0); +Semaphore trimEnded = new Semaphore(0); +sb.addTrimStartSignal(trimStarted); +sb.addTrimEndSignal(trimEnded); + +// Observer thread: +trimStarted.acquire(); // blocks until trim begins +// ... trim is running ... +trimEnded.acquire(); // blocks until trim finishes +``` + +**API:** + +| Method | Description | +|--------|-------------| +| `addTrimStartSignal(Semaphore)` | Registers a semaphore released when trim starts; throws `NullPointerException` if null | +| `removeTrimStartSignal(Semaphore)` | Removes a trim-start semaphore; returns `false` if not found or null | +| `addTrimEndSignal(Semaphore)` | Registers a semaphore released when trim ends; throws `NullPointerException` if null | +| `removeTrimEndSignal(Semaphore)` | Removes a trim-end semaphore; returns `false` if not found or null | + ## API Reference ### `StreamBuffer` @@ -163,9 +234,20 @@ public class StreamBuffer implements Closeable | `setSafeWrite(boolean)` | Enables or disables safe write (byte array cloning) | | `getMaxBufferElements()` | Returns the current trim threshold | | `setMaxBufferElements(int)` | Sets the trim threshold; `<= 0` disables trimming | -| `getBufferSize()` | Returns the current number of byte array entries in the FIFO | +| `getBufferSize()` | Returns the current number of byte array entries in the FIFO (legacy) | +| `getBufferElementCount()` | Returns the current number of byte arrays in the internal queue (synchronized) | +| `getTotalBytesWritten()` | Cumulative bytes written by user I/O (excludes internal trim) | +| `getTotalBytesRead()` | Cumulative bytes consumed by user reads (excludes internal trim) | +| `getMaxObservedBytes()` | Peak value of available bytes ever observed | +| `getMaxAllocationSize()` | Returns the maximum byte-array size used during trim consolidation | +| `setMaxAllocationSize(long)` | Sets the maximum allocation size; throws `IllegalArgumentException` if ≤ 0 | +| `isTrimRunning()` | `true` while trim is executing; volatile, may change at any time | | `addSignal(Semaphore)` | Registers an external semaphore for thread-decoupled notification | | `removeSignal(Semaphore)` | Removes a registered semaphore | +| `addTrimStartSignal(Semaphore)` | Registers a semaphore released when trim starts | +| `removeTrimStartSignal(Semaphore)` | Removes a trim-start semaphore | +| `addTrimEndSignal(Semaphore)` | Registers a semaphore released when trim ends | +| `removeTrimEndSignal(Semaphore)` | Removes a trim-end semaphore | | `blockDataAvailable()` | **Deprecated.** Blocks until at least one byte is available | ### Static Validation Methods @@ -216,7 +298,7 @@ mvn org.pitest:pitest-maven:mutationCoverage ## Testing -Tests are in `StreamBufferTest` using JUnit 4 with `DataProviderRunner` from `junit-dataprovider`. Most behavioral tests are parameterized across three write strategies: +Tests are in `StreamBufferTest` using JUnit 5 (JUnit Jupiter). Most behavioral tests are parameterized across three write strategies: | `WriteMethod` | Description | |---------------|-------------| @@ -242,8 +324,15 @@ Test coverage includes: - `addSignal(null)` throwing `NullPointerException` - Thread-decoupled signal barrier — observer wakes in its own thread - `correctOffsetAndLengthToRead` and `correctOffsetAndLengthToWrite` — all branches including integer overflow -- `getBufferSize()` on an empty buffer +- `getBufferSize()` and `getBufferElementCount()` on an empty buffer - `blockDataAvailable()` with data written before and after the call +- Statistics tracking: `getTotalBytesWritten`, `getTotalBytesRead`, `getMaxObservedBytes` — user I/O only, excluding internal trim operations +- `setMaxAllocationSize` / `getMaxAllocationSize` — boundary values, trim with chunked allocation +- `isTrimRunning()` flag transitions during concurrent trim execution +- Trim observer signals (`addTrimStartSignal`, `addTrimEndSignal`) — semaphore released at correct lifecycle points +- Configuration changes during active trim (`setMaxBufferElements`, `setMaxAllocationSize`) — verified not to affect running trim +- Concurrent close during active trim — no exceptions or deadlock +- `decideTrimExecution` pure function — comprehensive table-driven tests covering all boundary conditions and the smart-skip edge case ## License