Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 93 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Instead of a fixed circular buffer, `StreamBuffer` uses a `Deque<byte[]>` that g
The `InputStream` and `OutputStream` can be used concurrently from different threads without additional synchronization:

- All `Deque` accesses are guarded by a `bufferLock` object.
- State fields (`streamClosed`, `safeWrite`, `availableBytes`, `positionAtCurrentBufferEntry`, `maxBufferElements`) are `volatile`.
- State fields (`streamClosed`, `safeWrite`, `availableBytes`, `positionAtCurrentBufferEntry`, `maxBufferElements`, `maxAllocationSize`, `isTrimRunning`, `maxObservedBytes`, `totalBytesWritten`, `totalBytesRead`) are `volatile`.
- A `Semaphore signalModification` blocks reading threads until data is written or the stream is closed, avoiding busy-waiting. External semaphores can be registered via `addSignal` for thread-decoupled notification.

### No Write Deadlock
Expand Down Expand Up @@ -113,6 +113,10 @@ sb.setMaxBufferElements(0); // disable trimming entirely

Trimming is triggered by writes, not by `setMaxBufferElements`. The trim internally bypasses `safeWrite` (via an `ignoreSafeWrite` flag) because the byte arrays it produces are not reachable from outside the buffer.

If `maxAllocationSize` is set (see below), trim may produce multiple smaller chunks instead of one. Trim is also skipped automatically when consolidation would not reduce the chunk count below `maxBufferElements` — for example, when `maxAllocationSize` is small enough that the resulting chunk count would still exceed the threshold. This prevents repeated no-op trims on every write.

Use `isTrimRunning()` to observe whether a trim is currently executing. This value is `volatile` and can change at any time in concurrent scenarios.

### Large Buffer Support

`available()` returns `Integer.MAX_VALUE` when the number of buffered bytes exceeds `Integer.MAX_VALUE`, correctly handling buffers larger than 2 GB.
Expand Down Expand Up @@ -142,8 +146,75 @@ while (!done) {
| `addSignal(Semaphore)` | Registers an external semaphore; throws `NullPointerException` if null |
| `removeSignal(Semaphore)` | Removes a semaphore; returns `false` if not found or null |

For trim lifecycle events specifically, see [Trim Observer Signals](#trim-observer-signals) below.

Signals are stored in a `CopyOnWriteArrayList` for thread-safe iteration. The "max 1 permit" pattern means rapid writes collapse into a single wake-up — the observer should check buffer state (e.g., `isClosed()`, `available()`) after waking to determine what changed.

### Statistics Tracking

`StreamBuffer` tracks cumulative I/O statistics for the bytes flowing through the user-facing API. Internal reads and writes performed by the trim operation are excluded — `isTrimRunning` is checked before updating the counters so the statistics always reflect user I/O only.

```java
StreamBuffer sb = new StreamBuffer();
OutputStream os = sb.getOutputStream();
InputStream is = sb.getInputStream();

os.write(new byte[]{1, 2, 3}); // totalBytesWritten = 3

byte[] buf = new byte[2];
is.read(buf); // totalBytesRead = 2

System.out.println(sb.getTotalBytesWritten()); // 3
System.out.println(sb.getTotalBytesRead()); // 2
System.out.println(sb.getMaxObservedBytes()); // 3 (peak bytes in buffer)
```

**API:**

| Method | Description |
|--------|-------------|
| `getTotalBytesWritten()` | Cumulative bytes written by user I/O operations (excludes internal trim) |
| `getTotalBytesRead()` | Cumulative bytes consumed by user reads (excludes internal trim) |
| `getMaxObservedBytes()` | Peak value of `availableBytes` ever observed |

### Configurable Trim Allocation Size

By default trim consolidates all buffered data into a single byte array. `maxAllocationSize` limits how large each individual byte array can be during consolidation. When `availableBytes > maxAllocationSize`, trim produces multiple smaller chunks:

```java
sb.setMaxAllocationSize(1024 * 1024); // cap each consolidated chunk at 1 MiB
long current = sb.getMaxAllocationSize();
```

- Default: `Integer.MAX_VALUE` (effectively one chunk per trim pass).
- Throws `IllegalArgumentException` if the value is ≤ 0.
- Trim is skipped automatically when the resulting chunk count would not be smaller than the current `Deque` size — this prevents repeated no-op trims when `maxAllocationSize` is very small.

### Trim Observer Signals

Register `Semaphore` objects to be notified when a trim cycle starts or ends. This uses the same "max 1 permit" semaphore pattern as the general modification signals:

```java
Semaphore trimStarted = new Semaphore(0);
Semaphore trimEnded = new Semaphore(0);
sb.addTrimStartSignal(trimStarted);
sb.addTrimEndSignal(trimEnded);

// Observer thread:
trimStarted.acquire(); // blocks until trim begins
// ... trim is running ...
trimEnded.acquire(); // blocks until trim finishes
```

**API:**

| Method | Description |
|--------|-------------|
| `addTrimStartSignal(Semaphore)` | Registers a semaphore released when trim starts; throws `NullPointerException` if null |
| `removeTrimStartSignal(Semaphore)` | Removes a trim-start semaphore; returns `false` if not found or null |
| `addTrimEndSignal(Semaphore)` | Registers a semaphore released when trim ends; throws `NullPointerException` if null |
| `removeTrimEndSignal(Semaphore)` | Removes a trim-end semaphore; returns `false` if not found or null |

## API Reference

### `StreamBuffer`
Expand All @@ -163,9 +234,20 @@ public class StreamBuffer implements Closeable
| `setSafeWrite(boolean)` | Enables or disables safe write (byte array cloning) |
| `getMaxBufferElements()` | Returns the current trim threshold |
| `setMaxBufferElements(int)` | Sets the trim threshold; `<= 0` disables trimming |
| `getBufferSize()` | Returns the current number of byte array entries in the FIFO |
| `getBufferSize()` | Returns the current number of byte array entries in the FIFO (legacy) |
| `getBufferElementCount()` | Returns the current number of byte arrays in the internal queue (synchronized) |
| `getTotalBytesWritten()` | Cumulative bytes written by user I/O (excludes internal trim) |
| `getTotalBytesRead()` | Cumulative bytes consumed by user reads (excludes internal trim) |
| `getMaxObservedBytes()` | Peak value of available bytes ever observed |
| `getMaxAllocationSize()` | Returns the maximum byte-array size used during trim consolidation |
| `setMaxAllocationSize(long)` | Sets the maximum allocation size; throws `IllegalArgumentException` if ≤ 0 |
| `isTrimRunning()` | `true` while trim is executing; volatile, may change at any time |
| `addSignal(Semaphore)` | Registers an external semaphore for thread-decoupled notification |
| `removeSignal(Semaphore)` | Removes a registered semaphore |
| `addTrimStartSignal(Semaphore)` | Registers a semaphore released when trim starts |
| `removeTrimStartSignal(Semaphore)` | Removes a trim-start semaphore |
| `addTrimEndSignal(Semaphore)` | Registers a semaphore released when trim ends |
| `removeTrimEndSignal(Semaphore)` | Removes a trim-end semaphore |
| `blockDataAvailable()` | **Deprecated.** Blocks until at least one byte is available |

### Static Validation Methods
Expand Down Expand Up @@ -216,7 +298,7 @@ mvn org.pitest:pitest-maven:mutationCoverage

## Testing

Tests are in `StreamBufferTest` using JUnit 4 with `DataProviderRunner` from `junit-dataprovider`. Most behavioral tests are parameterized across three write strategies:
Tests are in `StreamBufferTest` using JUnit 5 (JUnit Jupiter). Most behavioral tests are parameterized across three write strategies:

| `WriteMethod` | Description |
|---------------|-------------|
Expand All @@ -242,8 +324,15 @@ Test coverage includes:
- `addSignal(null)` throwing `NullPointerException`
- Thread-decoupled signal barrier — observer wakes in its own thread
- `correctOffsetAndLengthToRead` and `correctOffsetAndLengthToWrite` — all branches including integer overflow
- `getBufferSize()` on an empty buffer
- `getBufferSize()` and `getBufferElementCount()` on an empty buffer
- `blockDataAvailable()` with data written before and after the call
- Statistics tracking: `getTotalBytesWritten`, `getTotalBytesRead`, `getMaxObservedBytes` — user I/O only, excluding internal trim operations
- `setMaxAllocationSize` / `getMaxAllocationSize` — boundary values, trim with chunked allocation
- `isTrimRunning()` flag transitions during concurrent trim execution
- Trim observer signals (`addTrimStartSignal`, `addTrimEndSignal`) — semaphore released at correct lifecycle points
- Configuration changes during active trim (`setMaxBufferElements`, `setMaxAllocationSize`) — verified not to affect running trim
- Concurrent close during active trim — no exceptions or deadlock
- `decideTrimExecution` pure function — comprehensive table-driven tests covering all boundary conditions and the smart-skip edge case

## License

Expand Down
Loading