From bf489f85e32cb1c0e81c037a8cd869519e51be70 Mon Sep 17 00:00:00 2001 From: Jes Bak Hansen Date: Fri, 4 Jun 2021 10:55:21 +0200 Subject: [PATCH] Use both ends of the pipe in parallel By splitting the pipe into the writing end and reading end it seems to be able to slightly boost performance. On my system it benchmarks as roughly ~10% faster to have the file being parsed while it is loading in. ``` ini BenchmarkDotNet=v0.13.0, OS=Windows 10.0.19042.985 (20H2/October2020Update) AMD Ryzen 7 5800X, 1 CPU, 16 logical and 8 physical cores .NET SDK=5.0.203 [Host] : .NET 5.0.6 (5.0.621.22011), X64 RyuJIT DefaultJob : .NET 5.0.6 (5.0.621.22011), X64 RyuJIT ``` | Method | Mean | Error | StdDev | Rank | Gen 0 | Gen 1 | Gen 2 | Allocated | |------------ |----------:|---------:|---------:|-----:|----------:|----------:|----------:|----------:| | FullPipe | 51.77 ms | 0.363 ms | 0.303 ms | 1 | 1000.0000 | 500.0000 | - | 17 MB | | PipeLines | 56.97 ms | 0.400 ms | 0.355 ms | 2 | 1000.0000 | 444.4444 | - | 17 MB | | CsvHelper | 136.83 ms | 1.827 ms | 1.709 ms | 3 | 5250.0000 | 2250.0000 | 1500.0000 | 77 MB | | AsyncStream | 153.04 ms | 2.995 ms | 4.483 ms | 4 | 4250.0000 | 2500.0000 | 750.0000 | 64 MB | --- src/FileIO/WithPipeLines.cs | 31 ++++++++++++++++++++++++--- tests/FileIO.Benchmarks/FileIOTest.cs | 17 +++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/src/FileIO/WithPipeLines.cs b/src/FileIO/WithPipeLines.cs index 5a49eb4..a33a0e7 100644 --- a/src/FileIO/WithPipeLines.cs +++ b/src/FileIO/WithPipeLines.cs @@ -23,6 +23,25 @@ public async Task ProcessFileAsync(string filePath, Employee[] employeeReco if (!File.Exists(filePath)) return position; await using var fileStream = File.OpenRead(filePath); var pipeReader = PipeReader.Create(fileStream); + return await ReadFromPipe(pipeReader, employeeRecords, position); + } + + public async Task ProcessWithFullPipeAsync(string filePath, Employee[] employeeRecords) + { + var position = 0; + if (!File.Exists(filePath)) return position; + await using var fileStream = File.OpenRead(filePath); + Pipe p = new(); + var fillPipe = FillPipe(fileStream, p.Writer); + var readPipe = ReadFromPipe(p.Reader, employeeRecords, position); + await Task.WhenAll(fillPipe, readPipe); + + return await readPipe; + } + + private static async Task ReadFromPipe(PipeReader pipeReader, Employee[] employeeRecords, int position) + { + int pos = position; while (true) { var fileData = await pipeReader.ReadAsync(); @@ -30,7 +49,7 @@ public async Task ProcessFileAsync(string filePath, Employee[] employeeReco // convert to Buffer var fileDataBuffer = fileData.Buffer; - var sequencePosition = ParseLines(employeeRecords, fileDataBuffer, ref position); + var sequencePosition = ParseLines(employeeRecords, fileDataBuffer, ref pos); pipeReader.AdvanceTo(sequencePosition, fileDataBuffer.End); @@ -40,8 +59,14 @@ public async Task ProcessFileAsync(string filePath, Employee[] employeeReco } } - await pipeReader.CompleteAsync(); // marking pipereader as Completed - return position; + await pipeReader.CompleteAsync(); // marking pipe reader as Completed + return pos; + } + + private async Task FillPipe(FileStream fileStream, PipeWriter writer) + { + await fileStream.CopyToAsync(writer.AsStream()); + await writer.CompleteAsync(); } private static SequencePosition ParseLines(Employee[] employeeRecords, in ReadOnlySequence buffer, ref int position) diff --git a/tests/FileIO.Benchmarks/FileIOTest.cs b/tests/FileIO.Benchmarks/FileIOTest.cs index 2d55254..4b97fc5 100644 --- a/tests/FileIO.Benchmarks/FileIOTest.cs +++ b/tests/FileIO.Benchmarks/FileIOTest.cs @@ -39,6 +39,23 @@ public async Task PipeLines() } } + [Benchmark] + public async Task FullPipe() + { + var pool = ArrayPool.Shared; + var employeeRecords = pool.Rent(100000); + var pipeLinesTest = new WithPipeLines(); + + try + { + await pipeLinesTest.ProcessWithFullPipeAsync(_filePath, employeeRecords); + } + finally + { + pool.Return(employeeRecords, clearArray: true); + } + } + [Benchmark] public async Task> AsyncStream() {