Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ replace github.com/cosmos/cosmos-sdk => github.com/cosmos/cosmos-sdk v0.50.14
require (
cosmossdk.io/math v1.5.3
github.com/AlecAivazis/survey/v2 v2.3.7
github.com/DataDog/zstd v1.5.7
github.com/LumeraProtocol/lumera v1.8.4
github.com/LumeraProtocol/rq-go v0.2.1
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
Expand All @@ -24,7 +25,6 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3
github.com/jmoiron/sqlx v1.4.0
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.18.0
github.com/kolesa-team/go-webp v1.0.4
github.com/mattn/go-sqlite3 v1.14.24
github.com/patrickmn/go-cache v2.1.0+incompatible
Expand Down Expand Up @@ -60,7 +60,6 @@ require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
github.com/DataDog/datadog-go v4.8.3+incompatible // indirect
github.com/DataDog/zstd v1.5.7 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down Expand Up @@ -133,6 +132,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmhodges/levigo v1.0.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand Down
10 changes: 4 additions & 6 deletions pkg/cascadekit/cascadekit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"

"github.com/LumeraProtocol/supernode/v2/pkg/codec"
"github.com/klauspost/compress/zstd"
"github.com/DataDog/zstd"
)

func TestExtractIndexAndCreatorSig_Strict(t *testing.T) {
Expand All @@ -32,9 +32,7 @@ func TestParseCompressedIndexFile_Strict(t *testing.T) {
}
payload := []byte(idxB64 + "." + base64.StdEncoding.EncodeToString([]byte("sig2")) + ".0")

enc, _ := zstd.NewWriter(nil)
defer enc.Close()
compressed := enc.EncodeAll(payload, nil)
compressed, _ := zstd.CompressLevel(nil, payload, 3)

got, err := ParseCompressedIndexFile(compressed)
if err != nil {
Expand All @@ -45,12 +43,12 @@ func TestParseCompressedIndexFile_Strict(t *testing.T) {
}

// malformed: only two segments
compressedBad := enc.EncodeAll([]byte("a.b"), nil)
compressedBad, _ := zstd.CompressLevel(nil, []byte("a.b"), 3)
if _, err := ParseCompressedIndexFile(compressedBad); err == nil {
t.Fatalf("expected error for two segments")
}
// malformed: four segments
compressedBad4 := enc.EncodeAll([]byte("a.b.c.d"), nil)
compressedBad4, _ := zstd.CompressLevel(nil, []byte("a.b.c.d"), 3)
if _, err := ParseCompressedIndexFile(compressedBad4); err == nil {
t.Fatalf("expected error for four segments")
}
Expand Down
29 changes: 12 additions & 17 deletions pkg/cascadekit/ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/LumeraProtocol/supernode/v2/pkg/errors"
"github.com/LumeraProtocol/supernode/v2/pkg/utils"
"github.com/cosmos/btcutil/base58"
"github.com/klauspost/compress/zstd"
"github.com/DataDog/zstd"
)

// GenerateLayoutIDs computes IDs for redundant layout files (not the final index IDs).
Expand Down Expand Up @@ -36,13 +36,6 @@ func generateIDFiles(base []byte, ic uint32, max uint32) (ids []string, files []
ids = make([]string, 0, max)
var buffer bytes.Buffer

// Reuse a single zstd encoder across iterations
enc, zerr := zstd.NewWriter(nil)
if zerr != nil {
return ids, idFiles, errors.Errorf("compress identifiers file: %w", zerr)
}
defer enc.Close()

for i := uint32(0); i < max; i++ {
buffer.Reset()
counter := ic + i
Expand All @@ -54,7 +47,11 @@ func generateIDFiles(base []byte, ic uint32, max uint32) (ids []string, files []
cnt := strconv.AppendUint(tmp[:0], uint64(counter), 10)
buffer.Write(cnt)

compressedData := enc.EncodeAll(buffer.Bytes(), nil)
// Compress with official zstd C library at level 3 (matches SDK-JS)
compressedData, zerr := zstd.CompressLevel(nil, buffer.Bytes(), 3)
if zerr != nil {
return ids, idFiles, errors.Errorf("compress identifiers file: %w", zerr)
}

idFiles = append(idFiles, compressedData)

Expand All @@ -70,20 +67,14 @@ func generateIDFiles(base []byte, ic uint32, max uint32) (ids []string, files []
}

// generateIDs computes base58(blake3(zstd(base + '.' + counter))) for counters ic..ic+max-1.
// It reuses a single zstd encoder and avoids per-iteration heap churn.
// Uses official zstd C library at level 3 to match SDK-JS compression.
func generateIDs(base []byte, ic, max uint32) ([]string, error) {
ids := make([]string, max)

var buffer bytes.Buffer
// Reserve base length + dot + up to 10 digits
buffer.Grow(len(base) + 12)

enc, err := zstd.NewWriter(nil)
if err != nil {
return nil, errors.Errorf("zstd encoder init: %w", err)
}
defer enc.Close()

for i := uint32(0); i < max; i++ {
buffer.Reset()
buffer.Write(base)
Expand All @@ -92,7 +83,11 @@ func generateIDs(base []byte, ic, max uint32) ([]string, error) {
cnt := strconv.AppendUint(tmp[:0], uint64(ic+i), 10)
buffer.Write(cnt)

compressed := enc.EncodeAll(buffer.Bytes(), nil)
// Compress with official zstd C library at level 3 (matches SDK-JS)
compressed, err := zstd.CompressLevel(nil, buffer.Bytes(), 3)
if err != nil {
return nil, errors.Errorf("zstd compress (i=%d): %w", i, err)
}
h, err := utils.Blake3Hash(compressed)
if err != nil {
return nil, errors.Errorf("blake3 hash (i=%d): %w", i, err)
Expand Down
77 changes: 12 additions & 65 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,21 @@ import (
"encoding/base64"
"encoding/binary"
"fmt"
"io"
"log"
"math"
"math/big"
"net"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"

"github.com/LumeraProtocol/supernode/v2/pkg/errors"
"golang.org/x/sync/semaphore"

"github.com/klauspost/compress/zstd"
"github.com/DataDog/zstd"
)

const (
Expand Down Expand Up @@ -262,49 +260,22 @@ func Compress(data []byte, level int) ([]byte, error) {
return nil, fmt.Errorf("invalid compression level: %d - allowed levels are 1 - 4", level)
}

numCPU := runtime.NumCPU()
// Create a buffer to store compressed data
var compressedData bytes.Buffer

// Create a new Zstd encoder with concurrency set to the number of CPU cores
encoder, err := zstd.NewWriter(&compressedData, zstd.WithEncoderConcurrency(numCPU), zstd.WithEncoderLevel(zstd.EncoderLevel(level)))
if err != nil {
return nil, fmt.Errorf("failed to create Zstd encoder: %v", err)
}

// Perform the compression
_, err = io.Copy(encoder, bytes.NewReader(data))
compressed, err := zstd.CompressLevel(nil, data, level)
if err != nil {
return nil, fmt.Errorf("failed to compress data: %v", err)
}

// Close the encoder to flush any remaining data
if err := encoder.Close(); err != nil {
return nil, fmt.Errorf("failed to close encoder: %v", err)
}

return compressedData.Bytes(), nil
return compressed, nil
}

// Decompress decompresses the data
func Decompress(data []byte) ([]byte, error) {
// Get the number of CPU cores available
numCPU := runtime.NumCPU()

// Create a new Zstd decoder with concurrency set to the number of CPU cores
decoder, err := zstd.NewReader(bytes.NewReader(data), zstd.WithDecoderConcurrency(numCPU))
if err != nil {
return nil, fmt.Errorf("failed to create Zstd decoder: %v", err)
}
defer decoder.Close()

// Perform the decompression
decompressedData, err := io.ReadAll(decoder)
decompressed, err := zstd.Decompress(nil, data)
if err != nil {
return nil, fmt.Errorf("failed to decompress data: %v", err)
}

return decompressedData, nil
return decompressed, nil
}

// RandomDuration returns a random duration between min and max
Expand All @@ -319,28 +290,20 @@ func RandomDuration(min, max int) time.Duration {
}

func ZstdCompress(data []byte) ([]byte, error) {
encoder, err := zstd.NewWriter(nil)
compressed, err := zstd.CompressLevel(nil, data, 3)
if err != nil {
return nil, fmt.Errorf("failed to create zstd encoder: %v", err)
return nil, fmt.Errorf("failed to compress with zstd: %v", err)
}
defer encoder.Close()

return encoder.EncodeAll(data, nil), nil
return compressed, nil
}
Comment on lines 292 to 298
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZstdCompress is functionally equivalent to Compress with level=3. Consider reusing Compress to reduce duplication and centralize logic.

func ZstdCompress(data []byte) ([]byte, error) {
  return Compress(data, 3)
}

Fix it with Roo Code or mention @roomote and request a fix.


func ZstdDecompress(data []byte) ([]byte, error) {
decoder, err := zstd.NewReader(nil)
if err != nil {
return nil, fmt.Errorf("failed to create zstd decoder: %v", err)
}
defer decoder.Close()

decoded, err := decoder.DecodeAll(data, nil)
decompressed, err := zstd.Decompress(nil, data)
if err != nil {
return nil, fmt.Errorf("failed to decompress zstd data: %v", err)
}

return decoded, nil
return decompressed, nil
}

// HighCompress compresses the data
Expand All @@ -354,28 +317,12 @@ func HighCompress(cctx context.Context, data []byte) ([]byte, error) {
}
defer sem.Release(semaphoreWeight) // Ensure that the semaphore is always released

numCPU := runtime.NumCPU()
// Create a buffer to store compressed data
var compressedData bytes.Buffer

// Create a new Zstd encoder with concurrency set to the number of CPU cores
encoder, err := zstd.NewWriter(&compressedData, zstd.WithEncoderConcurrency(numCPU), zstd.WithEncoderLevel(zstd.EncoderLevel(highCompressionLevel)))
if err != nil {
return nil, fmt.Errorf("failed to create Zstd encoder: %v", err)
}

// Perform the compression
_, err = io.Copy(encoder, bytes.NewReader(data))
compressed, err := zstd.CompressLevel(nil, data, highCompressionLevel)
if err != nil {
return nil, fmt.Errorf("failed to compress data: %v", err)
}

// Close the encoder to flush any remaining data
if err := encoder.Close(); err != nil {
return nil, fmt.Errorf("failed to close encoder: %v", err)
}

return compressedData.Bytes(), nil
return compressed, nil
}

// LoadSymbols takes a directory path and a map where keys are filenames. It reads each file in the directory
Expand Down
Loading