From 75f0abca28144d7cde500177c780b5821af25dab Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Fri, 21 Nov 2025 15:03:00 +0000 Subject: [PATCH] Updates to use the original ZSTD compression via C Wrapper --- go.mod | 4 +- pkg/cascadekit/cascadekit_test.go | 10 ++-- pkg/cascadekit/ids.go | 29 +++++------- pkg/utils/utils.go | 77 +++++-------------------------- 4 files changed, 30 insertions(+), 90 deletions(-) diff --git a/go.mod b/go.mod index 17a511bc..ff0b4585 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ replace github.com/cosmos/cosmos-sdk => github.com/cosmos/cosmos-sdk v0.50.14 require ( cosmossdk.io/math v1.5.3 github.com/AlecAivazis/survey/v2 v2.3.7 + github.com/DataDog/zstd v1.5.7 github.com/LumeraProtocol/lumera v1.8.4 github.com/LumeraProtocol/rq-go v0.2.1 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce @@ -24,7 +25,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 github.com/jmoiron/sqlx v1.4.0 github.com/json-iterator/go v1.1.12 - github.com/klauspost/compress v1.18.0 github.com/kolesa-team/go-webp v1.0.4 github.com/mattn/go-sqlite3 v1.14.24 github.com/patrickmn/go-cache v2.1.0+incompatible @@ -60,7 +60,6 @@ require ( github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/99designs/keyring v1.2.2 // indirect github.com/DataDog/datadog-go v4.8.3+incompatible // indirect - github.com/DataDog/zstd v1.5.7 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -133,6 +132,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect diff --git 
a/pkg/cascadekit/cascadekit_test.go b/pkg/cascadekit/cascadekit_test.go index d3299705..ce07785f 100644 --- a/pkg/cascadekit/cascadekit_test.go +++ b/pkg/cascadekit/cascadekit_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/LumeraProtocol/supernode/v2/pkg/codec" - "github.com/klauspost/compress/zstd" + "github.com/DataDog/zstd" ) func TestExtractIndexAndCreatorSig_Strict(t *testing.T) { @@ -32,9 +32,7 @@ func TestParseCompressedIndexFile_Strict(t *testing.T) { } payload := []byte(idxB64 + "." + base64.StdEncoding.EncodeToString([]byte("sig2")) + ".0") - enc, _ := zstd.NewWriter(nil) - defer enc.Close() - compressed := enc.EncodeAll(payload, nil) + compressed, _ := zstd.CompressLevel(nil, payload, 3) got, err := ParseCompressedIndexFile(compressed) if err != nil { @@ -45,12 +43,12 @@ func TestParseCompressedIndexFile_Strict(t *testing.T) { } // malformed: only two segments - compressedBad := enc.EncodeAll([]byte("a.b"), nil) + compressedBad, _ := zstd.CompressLevel(nil, []byte("a.b"), 3) if _, err := ParseCompressedIndexFile(compressedBad); err == nil { t.Fatalf("expected error for two segments") } // malformed: four segments - compressedBad4 := enc.EncodeAll([]byte("a.b.c.d"), nil) + compressedBad4, _ := zstd.CompressLevel(nil, []byte("a.b.c.d"), 3) if _, err := ParseCompressedIndexFile(compressedBad4); err == nil { t.Fatalf("expected error for four segments") } diff --git a/pkg/cascadekit/ids.go b/pkg/cascadekit/ids.go index bd9540c9..098c5ce8 100644 --- a/pkg/cascadekit/ids.go +++ b/pkg/cascadekit/ids.go @@ -7,7 +7,7 @@ import ( "github.com/LumeraProtocol/supernode/v2/pkg/errors" "github.com/LumeraProtocol/supernode/v2/pkg/utils" "github.com/cosmos/btcutil/base58" - "github.com/klauspost/compress/zstd" + "github.com/DataDog/zstd" ) // GenerateLayoutIDs computes IDs for redundant layout files (not the final index IDs). 
@@ -36,13 +36,6 @@ func generateIDFiles(base []byte, ic uint32, max uint32) (ids []string, files [] ids = make([]string, 0, max) var buffer bytes.Buffer - // Reuse a single zstd encoder across iterations - enc, zerr := zstd.NewWriter(nil) - if zerr != nil { - return ids, idFiles, errors.Errorf("compress identifiers file: %w", zerr) - } - defer enc.Close() - for i := uint32(0); i < max; i++ { buffer.Reset() counter := ic + i @@ -54,7 +47,11 @@ func generateIDFiles(base []byte, ic uint32, max uint32) (ids []string, files [] cnt := strconv.AppendUint(tmp[:0], uint64(counter), 10) buffer.Write(cnt) - compressedData := enc.EncodeAll(buffer.Bytes(), nil) + // Compress with official zstd C library at level 3 (matches SDK-JS) + compressedData, zerr := zstd.CompressLevel(nil, buffer.Bytes(), 3) + if zerr != nil { + return ids, idFiles, errors.Errorf("compress identifiers file: %w", zerr) + } idFiles = append(idFiles, compressedData) @@ -70,7 +67,7 @@ func generateIDFiles(base []byte, ic uint32, max uint32) (ids []string, files [] } // generateIDs computes base58(blake3(zstd(base + '.' + counter))) for counters ic..ic+max-1. -// It reuses a single zstd encoder and avoids per-iteration heap churn. +// Uses official zstd C library at level 3 to match SDK-JS compression. 
func generateIDs(base []byte, ic, max uint32) ([]string, error) { ids := make([]string, max) @@ -78,12 +75,6 @@ func generateIDs(base []byte, ic, max uint32) ([]string, error) { // Reserve base length + dot + up to 10 digits buffer.Grow(len(base) + 12) - enc, err := zstd.NewWriter(nil) - if err != nil { - return nil, errors.Errorf("zstd encoder init: %w", err) - } - defer enc.Close() - for i := uint32(0); i < max; i++ { buffer.Reset() buffer.Write(base) @@ -92,7 +83,11 @@ func generateIDs(base []byte, ic, max uint32) ([]string, error) { cnt := strconv.AppendUint(tmp[:0], uint64(ic+i), 10) buffer.Write(cnt) - compressed := enc.EncodeAll(buffer.Bytes(), nil) + // Compress with official zstd C library at level 3 (matches SDK-JS) + compressed, err := zstd.CompressLevel(nil, buffer.Bytes(), 3) + if err != nil { + return nil, errors.Errorf("zstd compress (i=%d): %w", i, err) + } h, err := utils.Blake3Hash(compressed) if err != nil { return nil, errors.Errorf("blake3 hash (i=%d): %w", i, err) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 555f0936..720b2f87 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -8,7 +8,6 @@ import ( "encoding/base64" "encoding/binary" "fmt" - "io" "log" "math" "math/big" @@ -16,7 +15,6 @@ import ( "os" "os/exec" "path/filepath" - "runtime" "strconv" "strings" "time" @@ -24,7 +22,7 @@ import ( "github.com/LumeraProtocol/supernode/v2/pkg/errors" "golang.org/x/sync/semaphore" - "github.com/klauspost/compress/zstd" + "github.com/DataDog/zstd" ) const ( @@ -262,49 +260,22 @@ func Compress(data []byte, level int) ([]byte, error) { return nil, fmt.Errorf("invalid compression level: %d - allowed levels are 1 - 4", level) } - numCPU := runtime.NumCPU() - // Create a buffer to store compressed data - var compressedData bytes.Buffer - - // Create a new Zstd encoder with concurrency set to the number of CPU cores - encoder, err := zstd.NewWriter(&compressedData, zstd.WithEncoderConcurrency(numCPU), 
zstd.WithEncoderLevel(zstd.EncoderLevel(level))) - if err != nil { - return nil, fmt.Errorf("failed to create Zstd encoder: %v", err) - } - - // Perform the compression - _, err = io.Copy(encoder, bytes.NewReader(data)) + compressed, err := zstd.CompressLevel(nil, data, level) if err != nil { return nil, fmt.Errorf("failed to compress data: %v", err) } - // Close the encoder to flush any remaining data - if err := encoder.Close(); err != nil { - return nil, fmt.Errorf("failed to close encoder: %v", err) - } - - return compressedData.Bytes(), nil + return compressed, nil } // Decompress decompresses the data func Decompress(data []byte) ([]byte, error) { - // Get the number of CPU cores available - numCPU := runtime.NumCPU() - - // Create a new Zstd decoder with concurrency set to the number of CPU cores - decoder, err := zstd.NewReader(bytes.NewReader(data), zstd.WithDecoderConcurrency(numCPU)) - if err != nil { - return nil, fmt.Errorf("failed to create Zstd decoder: %v", err) - } - defer decoder.Close() - - // Perform the decompression - decompressedData, err := io.ReadAll(decoder) + decompressed, err := zstd.Decompress(nil, data) if err != nil { return nil, fmt.Errorf("failed to decompress data: %v", err) } - return decompressedData, nil + return decompressed, nil } // RandomDuration returns a random duration between min and max @@ -319,28 +290,20 @@ func RandomDuration(min, max int) time.Duration { } func ZstdCompress(data []byte) ([]byte, error) { - encoder, err := zstd.NewWriter(nil) + compressed, err := zstd.CompressLevel(nil, data, 3) if err != nil { - return nil, fmt.Errorf("failed to create zstd encoder: %v", err) + return nil, fmt.Errorf("failed to compress with zstd: %v", err) } - defer encoder.Close() - - return encoder.EncodeAll(data, nil), nil + return compressed, nil } func ZstdDecompress(data []byte) ([]byte, error) { - decoder, err := zstd.NewReader(nil) - if err != nil { - return nil, fmt.Errorf("failed to create zstd decoder: %v", err) - } - 
defer decoder.Close() - - decoded, err := decoder.DecodeAll(data, nil) + decompressed, err := zstd.Decompress(nil, data) if err != nil { return nil, fmt.Errorf("failed to decompress zstd data: %v", err) } - return decoded, nil + return decompressed, nil } // HighCompress compresses the data @@ -354,28 +317,12 @@ func HighCompress(cctx context.Context, data []byte) ([]byte, error) { } defer sem.Release(semaphoreWeight) // Ensure that the semaphore is always released - numCPU := runtime.NumCPU() - // Create a buffer to store compressed data - var compressedData bytes.Buffer - - // Create a new Zstd encoder with concurrency set to the number of CPU cores - encoder, err := zstd.NewWriter(&compressedData, zstd.WithEncoderConcurrency(numCPU), zstd.WithEncoderLevel(zstd.EncoderLevel(highCompressionLevel))) - if err != nil { - return nil, fmt.Errorf("failed to create Zstd encoder: %v", err) - } - - // Perform the compression - _, err = io.Copy(encoder, bytes.NewReader(data)) + compressed, err := zstd.CompressLevel(nil, data, highCompressionLevel) if err != nil { return nil, fmt.Errorf("failed to compress data: %v", err) } - // Close the encoder to flush any remaining data - if err := encoder.Close(); err != nil { - return nil, fmt.Errorf("failed to close encoder: %v", err) - } - - return compressedData.Bytes(), nil + return compressed, nil } // LoadSymbols takes a directory path and a map where keys are filenames. It reads each file in the directory