From 405e268380e1dfdc3f058ce2b8689232ab06818d Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Fri, 21 Nov 2025 12:15:33 +0000 Subject: [PATCH 1/2] Add ZSTD wrapper library --- go.mod | 4 ++-- x/action/v1/keeper/crypto.go | 38 ++++++++---------------------------- 2 files changed, 10 insertions(+), 32 deletions(-) diff --git a/go.mod b/go.mod index 1705ee75..7503091e 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( cosmossdk.io/x/upgrade v0.2.0 github.com/CosmWasm/wasmd v0.55.0-ibc2.0 github.com/CosmWasm/wasmvm/v3 v3.0.0-ibc2.0 + github.com/DataDog/zstd v1.5.7 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/cometbft/cometbft v0.38.18 github.com/cosmos/btcutil v1.0.5 @@ -42,7 +43,6 @@ require ( github.com/golang/protobuf v1.5.4 github.com/gorilla/mux v1.8.1 github.com/grpc-ecosystem/grpc-gateway v1.16.0 - github.com/klauspost/compress v1.18.0 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.10.1 github.com/spf13/pflag v1.0.10 @@ -97,7 +97,6 @@ require ( github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c // indirect github.com/Crocmagnon/fatcontext v0.7.1 // indirect github.com/DataDog/datadog-go v4.8.3+incompatible // indirect - github.com/DataDog/zstd v1.5.7 // indirect github.com/Djarvur/go-err113 v0.0.0-20210108212216-aea10b59be24 // indirect github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.1 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 // indirect @@ -278,6 +277,7 @@ require ( github.com/karamaru-alpha/copyloopvar v1.2.1 // indirect github.com/kisielk/errcheck v1.9.0 // indirect github.com/kkHAIKE/contextcheck v1.1.6 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/klauspost/pgzip v1.2.6 // indirect github.com/kr/pretty v0.3.1 // indirect diff --git a/x/action/v1/keeper/crypto.go b/x/action/v1/keeper/crypto.go index d9416ddb..a0ef0089 100644 --- a/x/action/v1/keeper/crypto.go +++ b/x/action/v1/keeper/crypto.go @@ -1,27 +1,23 @@ package keeper import ( - "bytes" "context" "crypto/rand" "encoding/asn1" "encoding/base64" "encoding/json" "fmt" - "io" - "runtime" + "math/big" "sort" "time" "golang.org/x/sync/semaphore" - "math/big" - errorsmod "cosmossdk.io/errors" + "github.com/DataDog/zstd" actiontypes "github.com/LumeraProtocol/lumera/x/action/v1/types" "github.com/cosmos/btcutil/base58" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/klauspost/compress/zstd" "lukechampine.com/blake3" ) @@ -183,15 +179,13 @@ func CreateKademliaID(signatures string, counter uint64) (string, error) { return base58.Encode(hashedData[:]), nil } -// ZstdCompress Helper function for zstd compression +// ZstdCompress Helper function for zstd compression using official C library at level 3 func ZstdCompress(data []byte) ([]byte, error) { - encoder, err := zstd.NewWriter(nil) + compressed, err := zstd.CompressLevel(nil, data, 3) if err != nil { - return nil, fmt.Errorf("failed to create zstd encoder: %v", err) + return nil, fmt.Errorf("failed to compress with zstd: %v", err) } - defer encoder.Close() - - return encoder.EncodeAll(data, nil), nil + return compressed, nil } func HighCompress(data []byte) ([]byte, error) { @@ -206,28 +200,12 @@ func HighCompress(data []byte) ([]byte, error) { } defer sem.Release(semaphoreWeight) // Ensure that the semaphore is always released - numCPU := runtime.NumCPU() - // Create a buffer to store compressed data - var compressedData bytes.Buffer - - // Create a new Zstd encoder with concurrency set to the number of CPU cores - encoder, err := zstd.NewWriter(&compressedData, zstd.WithEncoderConcurrency(numCPU), zstd.WithEncoderLevel(zstd.EncoderLevel(highCompressionLevel))) - if err != nil { - return nil, fmt.Errorf("failed to create Zstd encoder: %v", err) - } - - // Perform the compression - _, err = io.Copy(encoder, bytes.NewReader(data)) + compressed, err := zstd.CompressLevel(nil, data, highCompressionLevel) if err != nil { return nil, fmt.Errorf("failed to compress data: %v", err) } - // Close the encoder to flush any remaining data - if err := encoder.Close(); err != nil { - return nil, fmt.Errorf("failed to close encoder: %v", err) - } - - return compressedData.Bytes(), nil + return compressed, nil } // --- DER → 64-byte r||s --- From 12ad0eea58f76a3a05c61bcf059584be4b9eba6d Mon Sep 17 00:00:00 2001 From: Matee ullah Date: Sun, 23 Nov 2025 01:40:39 +0500 Subject: [PATCH 2/2] fix sempahore usage --- x/action/v1/keeper/crypto.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/x/action/v1/keeper/crypto.go b/x/action/v1/keeper/crypto.go index a0ef0089..f1666c0f 100644 --- a/x/action/v1/keeper/crypto.go +++ b/x/action/v1/keeper/crypto.go @@ -23,11 +23,13 @@ import ( const ( semaphoreWeight = 1 - maxParallelHighCompressCalls = 5 + maxParallelHighCompressCalls = 10 highCompressionLevel = 4 highCompressTimeout = 30 * time.Minute ) +var highCompressSem = semaphore.NewWeighted(maxParallelHighCompressCalls) + // VerifySignature verifies that a signature is valid for given data and signer. // // The function performs these validation steps: @@ -192,13 +194,11 @@ func HighCompress(data []byte) ([]byte, error) { ctx, cancel := context.WithTimeout(context.Background(), highCompressTimeout) defer cancel() - sem := semaphore.NewWeighted(maxParallelHighCompressCalls) - - // Acquire the semaphore. This will block if 5 other goroutines are already inside this function. - if err := sem.Acquire(ctx, semaphoreWeight); err != nil { + // limit total concurrent high-compress calls across the process + if err := highCompressSem.Acquire(ctx, semaphoreWeight); err != nil { return nil, fmt.Errorf("failed to acquire semaphore: %v", err) } - defer sem.Release(semaphoreWeight) // Ensure that the semaphore is always released + defer highCompressSem.Release(semaphoreWeight) compressed, err := zstd.CompressLevel(nil, data, highCompressionLevel) if err != nil {