Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `post-tx` command and force inclusion server to submit transaction directly to the DA layer. ([#2888](https://github.com/evstack/ev-node/pull/2888))
Additionally, modified the core package to support marking transactions as forced included transactions.
The execution client ought to perform basic validation on those transactions as they have skipped the execution client's mempool.
- Add batching strategies (the default stays time-based, unchanged from previous betas). Currently available strategies are `time`, `size`, `immediate` and `adaptive`.

### Changed

Expand Down
4 changes: 2 additions & 2 deletions block/internal/cache/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func BenchmarkManager_GetPendingHeaders(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
hs, err := m.GetPendingHeaders(ctx)
hs, _, err := m.GetPendingHeaders(ctx)
if err != nil {
b.Fatal(err)
}
Expand All @@ -93,7 +93,7 @@ func BenchmarkManager_GetPendingData(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
ds, err := m.GetPendingData(ctx)
ds, _, err := m.GetPendingData(ctx)
if err != nil {
b.Fatal(err)
}
Expand Down
20 changes: 11 additions & 9 deletions block/internal/cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ type CacheManager interface {

// PendingManager provides operations for managing pending headers and data
type PendingManager interface {
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error)
GetPendingData(ctx context.Context) ([]*types.SignedData, error)
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error)
GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error)
SetLastSubmittedHeaderHeight(ctx context.Context, height uint64)
SetLastSubmittedDataHeight(ctx context.Context, height uint64)
NumPendingHeaders() uint64
Expand Down Expand Up @@ -318,20 +318,21 @@ func (m *implementation) DeleteHeight(blockHeight uint64) {
}

// Pending operations
func (m *implementation) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) {
func (m *implementation) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error) {
return m.pendingHeaders.GetPendingHeaders(ctx)
}

func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedData, error) {
// Get pending raw data
dataList, err := m.pendingData.GetPendingData(ctx)
func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error) {
// Get pending raw data with marshalled bytes
dataList, marshalledData, err := m.pendingData.GetPendingData(ctx)
if err != nil {
return nil, err
return nil, nil, err
}

// Convert to SignedData (this logic was in manager.go)
signedDataList := make([]*types.SignedData, 0, len(dataList))
for _, data := range dataList {
marshalledSignedData := make([][]byte, 0, len(dataList))
for i, data := range dataList {
if len(data.Txs) == 0 {
continue // Skip empty data
}
Expand All @@ -342,9 +343,10 @@ func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedDat
Data: *data,
// Signature and Signer will be set by executing component
})
marshalledSignedData = append(marshalledSignedData, marshalledData[i])
}

return signedDataList, nil
return signedDataList, marshalledSignedData, nil
}

func (m *implementation) SetLastSubmittedHeaderHeight(ctx context.Context, height uint64) {
Expand Down
8 changes: 4 additions & 4 deletions block/internal/cache/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,14 @@ func TestPendingHeadersAndData_Flow(t *testing.T) {
require.NoError(t, err)

// headers: all 3 should be pending initially
headers, err := cm.GetPendingHeaders(ctx)
headers, _, err := cm.GetPendingHeaders(ctx)
require.NoError(t, err)
require.Len(t, headers, 3)
assert.Equal(t, uint64(1), headers[0].Height())
assert.Equal(t, uint64(3), headers[2].Height())

// data: empty one filtered, so 2 and 3 only
signedData, err := cm.GetPendingData(ctx)
signedData, _, err := cm.GetPendingData(ctx)
require.NoError(t, err)
require.Len(t, signedData, 2)
assert.Equal(t, uint64(2), signedData[0].Height())
Expand All @@ -200,12 +200,12 @@ func TestPendingHeadersAndData_Flow(t *testing.T) {
cm.SetLastSubmittedHeaderHeight(ctx, 1)
cm.SetLastSubmittedDataHeight(ctx, 2)

headers, err = cm.GetPendingHeaders(ctx)
headers, _, err = cm.GetPendingHeaders(ctx)
require.NoError(t, err)
require.Len(t, headers, 2)
assert.Equal(t, uint64(2), headers[0].Height())

signedData, err = cm.GetPendingData(ctx)
signedData, _, err = cm.GetPendingData(ctx)
require.NoError(t, err)
require.Len(t, signedData, 1)
assert.Equal(t, uint64(3), signedData[0].Height())
Expand Down
30 changes: 30 additions & 0 deletions block/internal/cache/pending_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync"
"sync/atomic"

ds "github.com/ipfs/go-datastore"
Expand All @@ -22,6 +23,9 @@ type pendingBase[T any] struct {
metaKey string
fetch func(ctx context.Context, store store.Store, height uint64) (T, error)
lastHeight atomic.Uint64

// Marshalling cache to avoid redundant marshalling
marshalledCache sync.Map // key: uint64 (height), value: []byte
}

// newPendingBase constructs a new pendingBase for a given type.
Expand Down Expand Up @@ -80,6 +84,9 @@ func (pb *pendingBase[T]) setLastSubmittedHeight(ctx context.Context, newLastSub
if err != nil {
pb.logger.Error().Err(err).Msg("failed to store height of latest item submitted to DA")
}

// Clear marshalled cache for submitted heights
pb.clearMarshalledCacheUpTo(newLastSubmittedHeight)
}
}

Expand All @@ -101,3 +108,26 @@ func (pb *pendingBase[T]) init() error {
pb.lastHeight.CompareAndSwap(0, lsh)
return nil
}

// getMarshalledForHeight returns the cached marshalled bytes for the given
// height, or nil when no entry has been cached for that height.
func (pb *pendingBase[T]) getMarshalledForHeight(height uint64) []byte {
	cached, ok := pb.marshalledCache.Load(height)
	if !ok {
		return nil
	}
	return cached.([]byte)
}

// setMarshalledForHeight records the marshalled representation of the item at
// the given height so later calls can skip re-marshalling it.
func (pb *pendingBase[T]) setMarshalledForHeight(height uint64, marshalled []byte) {
	pb.marshalledCache.Store(height, marshalled)
}

// clearMarshalledCacheUpTo evicts every cached marshalled entry whose height
// is less than or equal to the given height (i.e. entries already submitted).
func (pb *pendingBase[T]) clearMarshalledCacheUpTo(height uint64) {
	pb.marshalledCache.Range(func(key, _ any) bool {
		entryHeight := key.(uint64)
		if entryHeight <= height {
			pb.marshalledCache.Delete(entryHeight)
		}
		return true
	})
}
2 changes: 1 addition & 1 deletion block/internal/cache/pending_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestPendingBase_ErrorConditions(t *testing.T) {
// ensure store height stays lower (0)
ph, err := NewPendingHeaders(st, logger)
require.NoError(t, err)
pending, err := ph.GetPendingHeaders(ctx)
pending, _, err := ph.GetPendingHeaders(ctx)
assert.Error(t, err)
assert.Len(t, pending, 0)

Expand Down
40 changes: 37 additions & 3 deletions block/internal/cache/pending_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"fmt"

"github.com/rs/zerolog"

Expand Down Expand Up @@ -46,9 +47,42 @@ func (pd *PendingData) init() error {
return pd.base.init()
}

// GetPendingData returns a sorted slice of pending Data.
func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, error) {
return pd.base.getPending(ctx)
// GetPendingData returns a sorted slice of pending Data along with their marshalled bytes.
// It uses an internal per-height cache to avoid re-marshalling data on subsequent calls.
// Returns (nil, nil, nil) when nothing is pending; on error no partial results are returned.
func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, [][]byte, error) {
	dataList, err := pd.base.getPending(ctx)
	if err != nil {
		return nil, nil, err
	}

	if len(dataList) == 0 {
		return nil, nil, nil
	}

	marshalled := make([][]byte, len(dataList))

	for i, data := range dataList {
		// Key the cache by the item's own height rather than deriving it as
		// lastHeight + index + 1: lastHeight can advance concurrently (via
		// SetLastSubmittedDataHeight) between getPending and this point,
		// which would silently attribute cached bytes to the wrong height.
		height := data.Height()

		// Serve from the cache when this height was marshalled before.
		if cached := pd.base.getMarshalledForHeight(height); cached != nil {
			marshalled[i] = cached
			continue
		}

		// Marshal on a cache miss and remember the result.
		dataBytes, err := data.MarshalBinary()
		if err != nil {
			return nil, nil, fmt.Errorf("failed to marshal data at height %d: %w", height, err)
		}
		marshalled[i] = dataBytes
		pd.base.setMarshalledForHeight(height, dataBytes)
	}

	return dataList, marshalled, nil
}

func (pd *PendingData) NumPendingData() uint64 {
Expand Down
6 changes: 3 additions & 3 deletions block/internal/cache/pending_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPendingData_BasicFlow(t *testing.T) {

// initially all 3 data items are pending, incl. empty
require.Equal(t, uint64(3), pendingData.NumPendingData())
pendingDataList, err := pendingData.GetPendingData(ctx)
pendingDataList, _, err := pendingData.GetPendingData(ctx)
require.NoError(t, err)
require.Len(t, pendingDataList, 3)
require.Equal(t, uint64(1), pendingDataList[0].Height())
Expand All @@ -53,7 +53,7 @@ func TestPendingData_BasicFlow(t *testing.T) {
require.Equal(t, uint64(1), binary.LittleEndian.Uint64(metadataRaw))

require.Equal(t, uint64(2), pendingData.NumPendingData())
pendingDataList, err = pendingData.GetPendingData(ctx)
pendingDataList, _, err = pendingData.GetPendingData(ctx)
require.NoError(t, err)
require.Len(t, pendingDataList, 2)
require.Equal(t, uint64(2), pendingDataList[0].Height())
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestPendingData_GetPending_PropagatesFetchError(t *testing.T) {
require.NoError(t, err)

// fetching pending should propagate the not-found error from store
pending, err := pendingData.GetPendingData(ctx)
pending, _, err := pendingData.GetPendingData(ctx)
require.Error(t, err)
require.Empty(t, pending)
}
40 changes: 37 additions & 3 deletions block/internal/cache/pending_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"fmt"

"github.com/rs/zerolog"

Expand Down Expand Up @@ -39,9 +40,42 @@ func NewPendingHeaders(store storepkg.Store, logger zerolog.Logger) (*PendingHea
return &PendingHeaders{base: base}, nil
}

// GetPendingHeaders returns a sorted slice of pending headers.
func (ph *PendingHeaders) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) {
return ph.base.getPending(ctx)
// GetPendingHeaders returns a sorted slice of pending headers along with their marshalled bytes.
// It uses an internal per-height cache to avoid re-marshalling headers on subsequent calls.
// Returns (nil, nil, nil) when nothing is pending; on error no partial results are returned.
func (ph *PendingHeaders) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error) {
	headers, err := ph.base.getPending(ctx)
	if err != nil {
		return nil, nil, err
	}

	if len(headers) == 0 {
		return nil, nil, nil
	}

	marshalled := make([][]byte, len(headers))

	for i, header := range headers {
		// Key the cache by the header's own height rather than deriving it as
		// lastHeight + index + 1: lastHeight can advance concurrently (via
		// SetLastSubmittedHeaderHeight) between getPending and this point,
		// which would silently attribute cached bytes to the wrong height.
		height := header.Height()

		// Serve from the cache when this height was marshalled before.
		if cached := ph.base.getMarshalledForHeight(height); cached != nil {
			marshalled[i] = cached
			continue
		}

		// Marshal on a cache miss and remember the result.
		data, err := header.MarshalBinary()
		if err != nil {
			return nil, nil, fmt.Errorf("failed to marshal header at height %d: %w", height, err)
		}
		marshalled[i] = data
		ph.base.setMarshalledForHeight(height, data)
	}

	return headers, marshalled, nil
}

func (ph *PendingHeaders) NumPendingHeaders() uint64 {
Expand Down
6 changes: 3 additions & 3 deletions block/internal/cache/pending_headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPendingHeaders_BasicFlow(t *testing.T) {

// initially all three are pending
require.Equal(t, uint64(3), pendingHeaders.NumPendingHeaders())
headers, err := pendingHeaders.GetPendingHeaders(ctx)
headers, _, err := pendingHeaders.GetPendingHeaders(ctx)
require.NoError(t, err)
require.Len(t, headers, 3)
require.Equal(t, uint64(1), headers[0].Height())
Expand All @@ -53,7 +53,7 @@ func TestPendingHeaders_BasicFlow(t *testing.T) {
require.Equal(t, uint64(2), binary.LittleEndian.Uint64(metadataRaw))

require.Equal(t, uint64(1), pendingHeaders.NumPendingHeaders())
headers, err = pendingHeaders.GetPendingHeaders(ctx)
headers, _, err = pendingHeaders.GetPendingHeaders(ctx)
require.NoError(t, err)
require.Len(t, headers, 1)
require.Equal(t, uint64(3), headers[0].Height())
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestPendingHeaders_EmptyWhenUpToDate(t *testing.T) {
// set last submitted to the current height, so nothing pending
pendingHeaders.SetLastSubmittedHeaderHeight(ctx, 1)
require.Equal(t, uint64(0), pendingHeaders.NumPendingHeaders())
headers, err := pendingHeaders.GetPendingHeaders(ctx)
headers, _, err := pendingHeaders.GetPendingHeaders(ctx)
require.NoError(t, err)
require.Empty(t, headers)
}
2 changes: 1 addition & 1 deletion block/internal/common/consts.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package common

const DefaultMaxBlobSize = 2 * 1024 * 1024 // 2MB fallback blob size limit
const DefaultMaxBlobSize = 7 * 1024 * 1024 // 7MB fallback blob size limit
Loading
Loading