From d24dbd544a497335308671cfe1e656a877c24d15 Mon Sep 17 00:00:00 2001 From: Mamy Ratsimbazafy Date: Wed, 27 Nov 2024 16:17:27 +0000 Subject: [PATCH 1/3] fix(MSM): properly handle edge condition in parallel MSM when bits is exactly divided by c --- .../elliptic/ec_multi_scalar_mul_parallel.nim | 87 ++++++++----------- .../t_ec_shortw_jac_g1_msm_parallel.nim | 2 +- .../t_ec_shortw_prj_g1_msm_parallel.nim | 2 +- 3 files changed, 36 insertions(+), 55 deletions(-) diff --git a/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim b/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim index 82821866a..ba57b5bd2 100644 --- a/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim +++ b/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim @@ -185,32 +185,22 @@ proc msmImpl_vartime_parallel[bits: static int, EC, ECaff]( # Last window is done sync on this thread, directly initializing r const excess = bits mod c const top = bits-excess - - when top != 0: - when excess != 0: - bucketAccumReduce_withInit( - r, - bucketsMatrix[numFullWindows*numBuckets].addr, - bitIndex = top, kTopWindow, c, - coefs, points, N) - else: - r[].setNeutral() - - # 3. Final reduction, r initialized to what would be miniMSMsReady[numWindows-1] - when excess != 0: - for w in countdown(numWindows-2, 0): - for _ in 0 ..< c: - r[].double() - discard sync miniMSMsReady[w] - r[] ~+= miniMSMsResults[w] - elif numWindows >= 2: - discard sync miniMSMsReady[numWindows-2] - r[] = miniMSMsResults[numWindows-2] - for w in countdown(numWindows-3, 0): - for _ in 0 ..< c: - r[].double() - discard sync miniMSMsReady[w] - r[] ~+= miniMSMsResults[w] + const msmKind = if top == 0: kBottomWindow + elif excess == 0: kFullWindow + else: kTopWindow + + bucketAccumReduce_withInit( + r, + bucketsMatrix[numFullWindows*numBuckets].addr, + bitIndex = top, msmKind, c, + coefs, points, N) + + # 3. 
Final reduction + for w in countdown(numFullWindows-1, 0): + for _ in 0 ..< c: + r[].double() + discard sync miniMSMsReady[w] + r[] ~+= miniMSMsResults[w] # Cleanup # ------- @@ -371,33 +361,24 @@ proc msmAffine_vartime_parallel[bits: static int, EC, ECaff]( # Last window is done sync on this thread, directly initializing r const excess = bits mod c const top = bits-excess - - when top != 0: - when excess != 0: - let buckets = allocHeapArray(EC, numBuckets) - for i in 0 ..< numBuckets: - buckets[i].setNeutral() - r[].bucketAccumReduce(buckets, bitIndex = top, kTopWindow, c, - coefs, points, N) - buckets.freeHeap() - else: - r[].setNeutral() - - # 2. Final reduction with latency hiding, r initialized to what would be miniMSMsReady[numWindows-1] - when excess != 0: - for w in countdown(numWindows-2, 0): - for _ in 0 ..< c: - r[].double() - discard sync miniMSMsReady[w] - r[] ~+= miniMSMsResults[w] - elif numWindows >= 2: - discard sync miniMSMsReady[numWindows-2] - r[] = miniMSMsResults[numWindows-2] - for w in countdown(numWindows-3, 0): - for _ in 0 ..< c: - r[].double() - discard sync miniMSMsReady[w] - r[] ~+= miniMSMsResults[w] + const msmKind = if top == 0: kBottomWindow + elif excess == 0: kFullWindow + else: kTopWindow + + let buckets = allocHeapArray(EC, numBuckets) + bucketAccumReduce_withInit( + r, + buckets, + bitIndex = top, msmKind, c, + coefs, points, N) + freeHeapAligned(buckets) + + # 3. 
Final reduction + for w in countdown(numFullWindows-1, 0): + for _ in 0 ..< c: + r[].double() + discard sync miniMSMsReady[w] + r[] ~+= miniMSMsResults[w] # Cleanup # ------- diff --git a/tests/parallel/t_ec_shortw_jac_g1_msm_parallel.nim b/tests/parallel/t_ec_shortw_jac_g1_msm_parallel.nim index fe2604697..6530a331a 100644 --- a/tests/parallel/t_ec_shortw_jac_g1_msm_parallel.nim +++ b/tests/parallel/t_ec_shortw_jac_g1_msm_parallel.nim @@ -14,7 +14,7 @@ import # Test utilities ./t_ec_template_parallel -const numPoints = [1, 2, 8, 16, 32, 64, 128, 1024, 2048, 16384] # 32768, 262144, 1048576] +const numPoints = [1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 1024, 2048, 16384] # 32768, 262144, 1048576] run_EC_multi_scalar_mul_parallel_impl( ec = EC_ShortW_Jac[Fp[BN254_Snarks], G1], diff --git a/tests/parallel/t_ec_shortw_prj_g1_msm_parallel.nim b/tests/parallel/t_ec_shortw_prj_g1_msm_parallel.nim index 5c307ecde..54f7d66e6 100644 --- a/tests/parallel/t_ec_shortw_prj_g1_msm_parallel.nim +++ b/tests/parallel/t_ec_shortw_prj_g1_msm_parallel.nim @@ -14,7 +14,7 @@ import # Test utilities ./t_ec_template_parallel -const numPoints = [1, 2, 8, 16, 128, 1024, 2048, 16384] # 32768, 262144, 1048576] +const numPoints = [1, 2, 3, 4, 5, 6, 7, 8, 16, 128, 1024, 2048, 16384] # 32768, 262144, 1048576] run_EC_multi_scalar_mul_parallel_impl( ec = EC_ShortW_Prj[Fp[BN254_Snarks], G1], From 0aeb09ae479d84327ee8bfeaad038314c522400e Mon Sep 17 00:00:00 2001 From: Mamy Ratsimbazafy Date: Thu, 28 Nov 2024 09:53:03 +0000 Subject: [PATCH 2/3] fix some sanitizer warnings --- benchmarks/bench_eth_eip4844_kzg.nim | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/benchmarks/bench_eth_eip4844_kzg.nim b/benchmarks/bench_eth_eip4844_kzg.nim index 07e38ea7c..0535f4dca 100644 --- a/benchmarks/bench_eth_eip4844_kzg.nim +++ b/benchmarks/bench_eth_eip4844_kzg.nim @@ -74,6 +74,7 @@ proc benchBlobToKzgCommitment(b: BenchSet, ctx: ptr EthereumKZGContext, iters: i ## We require 
`tp` to be unintialized as even idle threads somehow reduce perf of serial benches let tp = Threadpool.new() + let numThreads = tp.numThreads let startParallel = getMonotime() block: @@ -88,7 +89,7 @@ proc benchBlobToKzgCommitment(b: BenchSet, ctx: ptr EthereumKZGContext, iters: i let perfParallel = inNanoseconds((stopParallel-startParallel) div iters) let parallelSpeedup = float(perfSerial) / float(perfParallel) - echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x" + echo &"Speedup ratio parallel {numThreads} threads over serial: {parallelSpeedup:>6.3f}x" proc benchComputeKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) = @@ -102,6 +103,7 @@ proc benchComputeKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) ## We require `tp` to be unintialized as even idle threads somehow reduce perf of serial benches let tp = Threadpool.new() + let numThreads = tp.numThreads let startParallel = getMonotime() block: @@ -117,7 +119,7 @@ proc benchComputeKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) let perfParallel = inNanoseconds((stopParallel-startParallel) div iters) let parallelSpeedup = float(perfSerial) / float(perfParallel) - echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x" + echo &"Speedup ratio parallel {numThreads} threads over serial: {parallelSpeedup:>6.3f}x" proc benchComputeBlobKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) = @@ -130,6 +132,7 @@ proc benchComputeBlobKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: i ## We require `tp` to be unintialized as even idle threads somehow reduce perf of serial benches let tp = Threadpool.new() + let numThreads = tp.numThreads let startParallel = getMonotime() block: @@ -144,7 +147,7 @@ proc benchComputeBlobKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: i let perfParallel = inNanoseconds((stopParallel-startParallel) div iters) let parallelSpeedup = 
float(perfSerial) / float(perfParallel)
-  echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x"
+  echo &"Speedup ratio parallel {numThreads} threads over serial: {parallelSpeedup:>6.3f}x"
 
 proc benchVerifyKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) =
@@ -163,6 +166,7 @@ proc benchVerifyBlobKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: in
 
   ## We require `tp` to be unintialized as even idle threads somehow reduce perf of serial benches
   let tp = Threadpool.new()
+  let numThreads = tp.numThreads
 
   let startParallel = getMonotime()
   block:
@@ -176,7 +180,7 @@ proc benchVerifyBlobKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: in
 
   let perfParallel = inNanoseconds((stopParallel-startParallel) div iters)
   let parallelSpeedup = float(perfSerial) / float(perfParallel)
-  echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x"
+  echo &"Speedup ratio parallel {numThreads} threads over serial: {parallelSpeedup:>6.3f}x"
 
 proc benchVerifyBlobKzgProofBatch(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) =
@@ -201,6 +205,7 @@ proc benchVerifyBlobKzgProofBatch(b: BenchSet, ctx: ptr EthereumKZGContext, iter
 
     ## We require `tp` to be unintialized as even idle threads somehow reduce perf of serial benches
     let tp = Threadpool.new()
+    let numThreads = tp.numThreads
 
     let startParallel = getMonotime()
     block:
@@ -220,7 +225,7 @@ proc benchVerifyBlobKzgProofBatch(b: BenchSet, ctx: ptr EthereumKZGContext, iter
 
     let perfParallel = inNanoseconds((stopParallel-startParallel) div iters)
     let parallelSpeedup = float(perfSerial) / float(perfParallel)
-    echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x"
+    echo &"Speedup ratio parallel {numThreads} threads over serial: {parallelSpeedup:>6.3f}x"
 
     echo ""
     i *= 2
@@ -258,7 +263,7 @@ proc main() =
     echo ""
     benchVerifyBlobKzgProofBatch(b, ctx, Iters)
     separator()
-  
+
   ctx.trusted_setup_delete()
 
 when 
isMainModule: main() From 1b3a486d2c3f2d98e139a9f592c81cf459052c3a Mon Sep 17 00:00:00 2001 From: mMay Date: Tue, 3 Dec 2024 11:00:02 +0100 Subject: [PATCH 3/3] windows: match the aligned alloc to avoid Windows crash, see https://github.com/mratsim/constantine/pull/485/commits/463414cfcd382cbcf104e589d2ac0e3fdd89f331 --- .../math/elliptic/ec_multi_scalar_mul.nim | 32 ++++++------- .../elliptic/ec_multi_scalar_mul_parallel.nim | 48 +++++++++---------- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/constantine/math/elliptic/ec_multi_scalar_mul.nim b/constantine/math/elliptic/ec_multi_scalar_mul.nim index 25c1fc754..26611bfde 100644 --- a/constantine/math/elliptic/ec_multi_scalar_mul.nim +++ b/constantine/math/elliptic/ec_multi_scalar_mul.nim @@ -49,8 +49,8 @@ func multiScalarMulImpl_reference_vartime[bits: static int, EC, ECaff]( const numBuckets = 1 shl c - 1 # bucket 0 is unused const numWindows = bits.ceilDiv_vartime(c) - let miniMSMs = allocHeapArray(EC, numWindows) - let buckets = allocHeapArray(EC, numBuckets) + let miniMSMs = allocHeapArrayAligned(EC, numWindows, alignment = 64) + let buckets = allocHeapArrayAligned(EC, numBuckets, alignment = 64) # Algorithm # --------- @@ -91,8 +91,8 @@ func multiScalarMulImpl_reference_vartime[bits: static int, EC, ECaff]( # Cleanup # ------- - buckets.freeHeap() - miniMSMs.freeHeap() + buckets.freeHeapAligned() + miniMSMs.freeHeapAligned() func multiScalarMul_reference_dispatch_vartime[bits: static int, EC, ECaff]( r: var EC, @@ -151,7 +151,7 @@ func multiScalarMul_reference_vartime*[F, EC, ECaff]( coefs_big.batchFromField(coefs, n) r.multiScalarMul_reference_vartime(coefs_big, points, n) - freeHeapAligned(coefs_big) + coefs_big.freeHeapAligned() func multiScalarMul_reference_vartime*[EC, ECaff]( r: var EC, @@ -264,7 +264,7 @@ func msmImpl_vartime[bits: static int, EC, ECaff]( # ----- const numBuckets = 1 shl (c-1) - let buckets = allocHeapArray(EC, numBuckets) + let buckets = allocHeapArrayAligned(EC, 
numBuckets, alignment = 64) for i in 0 ..< numBuckets: buckets[i].setNeutral() @@ -293,7 +293,7 @@ func msmImpl_vartime[bits: static int, EC, ECaff]( # Cleanup # ------- - buckets.freeHeap() + buckets.freeHeapAligned() # Multi scalar multiplication with batched affine additions # ----------------------------------------------------------------------------------------------------------------------- @@ -357,8 +357,8 @@ func msmAffineImpl_vartime[bits: static int, EC, ECaff]( # Setup # ----- const (numBuckets, queueLen) = c.deriveSchedulerConstants() - let buckets = allocHeap(Buckets[numBuckets, EC, ECaff]) - let sched = allocHeap(Scheduler[numBuckets, queueLen, EC, ECaff]) + let buckets = allocHeapAligned(Buckets[numBuckets, EC, ECaff], alignment = 64) + let sched = allocHeapAligned(Scheduler[numBuckets, queueLen, EC, ECaff], alignment = 64) sched.init(points, buckets, 0, numBuckets.int32) # Algorithm @@ -389,8 +389,8 @@ func msmAffineImpl_vartime[bits: static int, EC, ECaff]( # Cleanup # ------- - sched.freeHeap() - buckets.freeHeap() + sched.freeHeapAligned() + buckets.freeHeapAligned() # Endomorphism acceleration # ----------------------------------------------------------------------------------------------------------------------- @@ -410,8 +410,8 @@ proc applyEndomorphism[bits: static int, ECaff]( else: ECaff.G const L = ECaff.getScalarField().bits().computeEndoRecodedLength(M) - let splitCoefs = allocHeapArray(array[M, BigInt[L]], N) - let endoBasis = allocHeapArray(array[M, ECaff], N) + let splitCoefs = allocHeapArrayAligned(array[M, BigInt[L]], N, alignment = 64) + let endoBasis = allocHeapArrayAligned(array[M, ECaff], N, alignment = 64) for i in 0 ..< N: var negatePoints {.noinit.}: array[M, SecretBool] @@ -448,8 +448,8 @@ template withEndo[coefsBits: static int, EC, ECaff]( # Given that bits and N changed, we are able to use a bigger `c` # but it has no significant impact on performance msmProc(r, endoCoefs, endoPoints, endoN, c) - freeHeap(endoCoefs) - 
freeHeap(endoPoints) + endoCoefs.freeHeapAligned() + endoPoints.freeHeapAligned() else: msmProc(r, coefs, points, N, c) @@ -555,7 +555,7 @@ func multiScalarMul_vartime*[F, EC, ECaff]( coefs_big.batchFromField(coefs, n) r.multiScalarMul_vartime(coefs_big, points, n) - freeHeapAligned(coefs_big) + coefs_big.freeHeapAligned() func multiScalarMul_vartime*[EC, ECaff]( r: var EC, diff --git a/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim b/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim index ba57b5bd2..a33510576 100644 --- a/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim +++ b/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim @@ -160,10 +160,10 @@ proc msmImpl_vartime_parallel[bits: static int, EC, ECaff]( # Instead of storing the result in futures, risking them being scattered in memory # we store them in a contiguous array, and the synchronizing future just returns a bool. # top window is done on this thread - let miniMSMsResults = allocHeapArray(EC, numFullWindows) + let miniMSMsResults = allocHeapArrayAligned(EC, numFullWindows, alignment = 64) let miniMSMsReady = allocStackArray(FlowVar[bool], numFullWindows) - let bucketsMatrix = allocHeapArray(EC, numBuckets*numWindows) + let bucketsMatrix = allocHeapArrayAligned(EC, numBuckets*numWindows, alignment = 64) # Algorithm # --------- @@ -204,8 +204,8 @@ proc msmImpl_vartime_parallel[bits: static int, EC, ECaff]( # Cleanup # ------- - miniMSMsResults.freeHeap() - bucketsMatrix.freeHeap() + miniMSMsResults.freeHeapAligned() + bucketsMatrix.freeHeapAligned() # Parallel MSM Affine - bucket accumulation # ----------------------------------------- @@ -218,8 +218,8 @@ proc bucketAccumReduce_serial[bits: static int, EC, ECaff]( N: int) = const (numBuckets, queueLen) = c.deriveSchedulerConstants() - let buckets = allocHeap(Buckets[numBuckets, EC, ECaff]) - let sched = allocHeap(Scheduler[numBuckets, queueLen, EC, ECaff]) + let buckets = allocHeapAligned(Buckets[numBuckets, EC, 
ECaff], alignment = 64) + let sched = allocHeapAligned(Scheduler[numBuckets, queueLen, EC, ECaff], alignment = 64) sched.init(points, buckets, 0, numBuckets.int32) # 1. Bucket Accumulation @@ -230,8 +230,8 @@ proc bucketAccumReduce_serial[bits: static int, EC, ECaff]( # Cleanup # ---------------- - sched.freeHeap() - buckets.freeHeap() + sched.freeHeapAligned() + buckets.freeHeapAligned() proc bucketAccumReduce_parallel[bits: static int, EC, ECaff]( tp: Threadpool, @@ -253,8 +253,8 @@ proc bucketAccumReduce_parallel[bits: static int, EC, ECaff]( let chunkSize = int32(numBuckets) shr log2_vartime(cast[uint32](numChunks)) # Both are power of 2 so exact division let chunksReadiness = allocStackArray(FlowVar[bool], numChunks-1) # Last chunk is done on this thread - let buckets = allocHeap(Buckets[numBuckets, EC, ECaff]) - let scheds = allocHeapArray(Scheduler[numBuckets, queueLen, EC, ECaff], numChunks) + let buckets = allocHeapAligned(Buckets[numBuckets, EC, ECaff], alignment = 64) + let scheds = allocHeapArrayAligned(Scheduler[numBuckets, queueLen, EC, ECaff], numChunks, alignment = 64) block: # 1. Bucket Accumulation for chunkID in 0'i32 ..< numChunks-1: @@ -307,8 +307,8 @@ proc bucketAccumReduce_parallel[bits: static int, EC, ECaff]( # Cleanup # ---------------- - scheds.freeHeap() - buckets.freeHeap() + scheds.freeHeapAligned() + buckets.freeHeapAligned() # Parallel MSM Affine - window-level only # --------------------------------------- @@ -328,7 +328,7 @@ proc msmAffine_vartime_parallel[bits: static int, EC, ECaff]( # Instead of storing the result in futures, risking them being scattered in memory # we store them in a contiguous array, and the synchronizing future just returns a bool. 
# top window is done on this thread - let miniMSMsResults = allocHeapArray(EC, numFullWindows) + let miniMSMsResults = allocHeapArrayAligned(EC, numFullWindows, alignment = 64) let miniMSMsReady = allocStackArray(Flowvar[bool], numFullWindows) # Algorithm @@ -365,13 +365,13 @@ proc msmAffine_vartime_parallel[bits: static int, EC, ECaff]( elif excess == 0: kFullWindow else: kTopWindow - let buckets = allocHeapArray(EC, numBuckets) + let buckets = allocHeapArrayAligned(EC, numBuckets, alignment = 64) bucketAccumReduce_withInit( r, buckets, bitIndex = top, msmKind, c, coefs, points, N) - freeHeapAligned(buckets) + buckets.freeHeapAligned() # 3. Final reduction for w in countdown(numFullWindows-1, 0): @@ -382,7 +382,7 @@ proc msmAffine_vartime_parallel[bits: static int, EC, ECaff]( # Cleanup # ------- - miniMSMsResults.freeHeap() + miniMSMsResults.freeHeapAligned() proc msmAffine_vartime_parallel_split[bits: static int, EC, ECaff]( tp: Threadpool, @@ -410,7 +410,7 @@ proc msmAffine_vartime_parallel_split[bits: static int, EC, ECaff]( return let chunkingDescriptor = balancedChunksPrioNumber(0, N, msmParallelism) - let splitMSMsResults = allocHeapArray(typeof(r[]), msmParallelism-1) + let splitMSMsResults = allocHeapArrayAligned(typeof(r[]), msmParallelism-1, alignment = 64) let splitMSMsReady = allocStackArray(Flowvar[bool], msmParallelism-1) for (i, start, len) in items(chunkingDescriptor): @@ -429,7 +429,7 @@ proc msmAffine_vartime_parallel_split[bits: static int, EC, ECaff]( discard sync splitMSMsReady[i] r[] ~+= splitMSMsResults[i] - freeHeap(splitMSMsResults) + splitMSMsResults.freeHeapAligned() proc applyEndomorphism_parallel[bits: static int, ECaff]( tp: Threadpool, @@ -447,8 +447,8 @@ proc applyEndomorphism_parallel[bits: static int, ECaff]( else: ECaff.G const L = ECaff.getScalarField().bits().computeEndoRecodedLength(M) - let splitCoefs = allocHeapArray(array[M, BigInt[L]], N) - let endoBasis = allocHeapArray(array[M, ECaff], N) + let splitCoefs = 
allocHeapArrayAligned(array[M, BigInt[L]], N, alignment = 64) + let endoBasis = allocHeapArrayAligned(array[M, ECaff], N, alignment = 64) syncScope: tp.parallelFor i in 0 ..< N: @@ -489,8 +489,8 @@ template withEndo[coefsBits: static int, EC, ECaff]( # Given that bits and N changed, we are able to use a bigger `c` # but it has no significant impact on performance msmProc(tp, r, endoCoefs, endoPoints, endoN, c) - freeHeap(endoCoefs) - freeHeap(endoPoints) + endoCoefs.freeHeapAligned() + endoPoints.freeHeapAligned() else: msmProc(tp, r, coefs, points, N, c) @@ -512,8 +512,8 @@ template withEndo[coefsBits: static int, EC, ECaff]( # Given that bits and N changed, we are able to use a bigger `c` # but it has no significant impact on performance msmProc(tp, r, endoCoefs, endoPoints, endoN, c, useParallelBuckets) - freeHeap(endoCoefs) - freeHeap(endoPoints) + endoCoefs.freeHeapAligned() + endoPoints.freeHeapAligned() else: msmProc(tp, r, coefs, points, N, c, useParallelBuckets)