Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion pkg/app/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
ethrpcminer "github.com/chainsafe/canton-middleware/pkg/ethrpc/miner"
ethrpc "github.com/chainsafe/canton-middleware/pkg/ethrpc/service"
ethrpcstore "github.com/chainsafe/canton-middleware/pkg/ethrpc/store"
ethrpcsubmitter "github.com/chainsafe/canton-middleware/pkg/ethrpc/submitter"
indexerclient "github.com/chainsafe/canton-middleware/pkg/indexer/client"
"github.com/chainsafe/canton-middleware/pkg/keys"
"github.com/chainsafe/canton-middleware/pkg/log"
Expand Down Expand Up @@ -229,15 +230,33 @@ func initServices(
transferCache := transfer.NewPreparedTransferCache(transferCacheTTL, transferCacheMaxSize)
go transferCache.Start(ctx)

tokenService := token.NewTokenService(cfg.Token, tokenDataProvider, userStore, cantonClient.Token)

if cfg.EthRPC.Enabled {
m := ethrpcminer.New(evmStore, cfg.EthRPC.ChainID, cfg.EthRPC.GasLimit, cfg.EthRPC.MinerMaxTxsPerBlock, cfg.EthRPC.MinerInterval, logger)
go m.Start(ctx)

// Async submitter: drives pending mempool entries → completed/failed by
// calling Canton. SendRawTransaction returns the tx hash immediately
// after the pending insert; this worker is what actually moves money.
// Runs SubmitterConcurrency transfers in parallel; each Canton call is
// bounded by a package-level timeout so a hung gRPC call can't drain
// the pool.
sub := ethrpcsubmitter.New(
evmStore,
tokenService,
cfg.EthRPC.SubmitterInterval,
cfg.EthRPC.SubmitterBatchSize,
cfg.EthRPC.SubmitterConcurrency,
logger,
)
go sub.Start(ctx)
}

transferSvc := transfer.NewTransferService(cantonClient.Token, userStore, transferCache, cfg.Token, indexerClient)
return &services{
evmStore: evmStore,
tokenService: token.NewTokenService(cfg.Token, tokenDataProvider, userStore, cantonClient.Token),
tokenService: tokenService,
regSvc: userservice.NewLog(registrationService, logger),
transferSvc: transfer.NewLog(transferSvc, logger),
}, nil
Expand Down
15 changes: 15 additions & 0 deletions pkg/ethrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,19 @@ type Config struct {
RequestTimeout time.Duration `yaml:"request_timeout" default:"30s"`
MinerInterval time.Duration `yaml:"miner_interval" default:"2s"`
MinerMaxTxsPerBlock int `yaml:"miner_max_txs_per_block" default:"500"`
// SubmitterInterval controls how often the submitter drains pending
// mempool entries by calling Canton. Kept tight by default so that
// eth_sendRawTransaction → on-Canton latency stays close to Canton's
// own commit time even though the HTTP call returns immediately.
SubmitterInterval time.Duration `yaml:"submitter_interval" default:"500ms"`
// SubmitterBatchSize caps the number of pending entries fetched in a
// single submitter tick (0 = unlimited). Bounded so a backlog never
// loads the entire pending queue into memory; the next tick picks up
// whatever is left.
SubmitterBatchSize int `yaml:"submitter_batch_size" default:"100"`
// SubmitterConcurrency is the number of pending entries the submitter
// processes in parallel within one tick. Canton commits typically take
// 5-15s, so 10 parallel transfers give ~10x throughput vs sequential
// without hammering Canton or saturating the gRPC connection.
SubmitterConcurrency int `yaml:"submitter_concurrency" default:"10"`
}
36 changes: 32 additions & 4 deletions pkg/ethrpc/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,15 @@ func (m *Miner) mine(ctx context.Context) error {
}

blockTimestamp := uint64(time.Now().Unix()) //nolint:gosec // time.Now() is always positive
// logIndex is block-relative and contiguous across all logs in the block —
// it must not skip when a failed tx emits zero logs. Track it separately
// from txIndex so go-ethereum tooling that relies on contiguous indices
// (e.g. for ordering or de-dup) behaves correctly.
logIndex := uint(0)
for i := range entries {
e := &entries[i]
txIndex := uint(i) //nolint:gosec // i is bounded by len(entries) which fits in uint
succeeded := e.Status == ethrpc.MempoolCompleted

evmTx := &ethrpc.EvmTransaction{
TxHash: e.TxHash,
Expand All @@ -97,19 +103,28 @@ func (m *Miner) mine(ctx context.Context) error {
Nonce: e.Nonce,
Input: e.Input,
ValueWei: "0",
Status: 1,
Status: txStatus(succeeded),
BlockNumber: block.Number(),
BlockHash: block.Hash(),
TxIndex: txIndex,
GasUsed: m.gasLimit,
}
if !succeeded {
evmTx.ErrorMessage = e.ErrorMessage
}
if err = block.AddEvmTransaction(ctx, evmTx); err != nil {
return err
}

if err = block.AddEvmLog(ctx, buildTransferLog(e, block, txIndex, blockTimestamp)); err != nil {
// Failed transfers never executed on Canton, so they have no Transfer log.
// Mining the EVM tx with status=0 surfaces the failure via getTransactionReceipt.
if !succeeded {
continue
}
if err = block.AddEvmLog(ctx, buildTransferLog(e, block, txIndex, logIndex, blockTimestamp)); err != nil {
return err
}
logIndex++
}

if err = block.Finalize(ctx); err != nil {
Expand All @@ -123,9 +138,22 @@ func (m *Miner) mine(ctx context.Context) error {
return nil
}

// txStatus maps the entry outcome to the EVM-standard transaction receipt
// status (0x1 success, 0x0 failure) so MetaMask and other wallets render
// failed transfers correctly.
func txStatus(succeeded bool) uint8 {
if succeeded {
return 1
}
return 0
}

// buildTransferLog constructs the synthetic ERC-20 Transfer event log for a
// completed mempool entry. blockTimestamp is Unix seconds captured once per block.
func buildTransferLog(e *ethrpc.MempoolEntry, block ethrpc.PendingBlock, txIndex uint, blockTimestamp uint64) *ethrpc.EvmLog {
// txIndex is the tx position in the block; logIndex is the *log* position in the
// block (block-relative, contiguous across all logs — the two diverge whenever
// a failed tx contributes a status=0 receipt with no logs).
func buildTransferLog(e *ethrpc.MempoolEntry, block ethrpc.PendingBlock, txIndex, logIndex uint, blockTimestamp uint64) *ethrpc.EvmLog {
fromAddr := common.HexToAddress(e.FromAddress)
toAddr := common.HexToAddress(e.RecipientAddress)
fromTopic := common.BytesToHash(common.LeftPadBytes(fromAddr.Bytes(), evmWordSize))
Expand All @@ -135,7 +163,7 @@ func buildTransferLog(e *ethrpc.MempoolEntry, block ethrpc.PendingBlock, txIndex

return &ethrpc.EvmLog{
TxHash: e.TxHash,
LogIndex: txIndex,
LogIndex: logIndex,
Address: contractAddr.Bytes(),
Topics: [][]byte{transferEventTopic.Bytes(), fromTopic.Bytes(), toTopic.Bytes()},
Data: amountData,
Expand Down
105 changes: 102 additions & 3 deletions pkg/ethrpc/miner/miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,103 @@ func TestMine_SingleEntry_CommitsBlock(t *testing.T) {
require.NoError(t, m.mine(context.Background()))
}

func TestMine_FailedEntry_MinesAsStatus0_NoTransferLog(t *testing.T) {
entry := sampleEntry(0x09,
"0xaaaa000000000000000000000000000000000099",
"0xcccc000000000000000000000000000000000099",
"0xdddd000000000000000000000000000000000099",
0, 7,
)
entry.Status = ethrpc.MempoolFailed
entry.ErrorMessage = "canton transfer failed: insufficient balance"

block := setupBlock(t, 6)
block.EXPECT().AddEvmTransaction(mock.Anything, mock.MatchedBy(func(tx *ethrpc.EvmTransaction) bool {
return tx.Status == 0 && tx.ErrorMessage == entry.ErrorMessage && tx.BlockNumber == 6
})).Return(nil).Once()
// No AddEvmLog expectation — failed entries must skip the Transfer event.
block.EXPECT().Finalize(mock.Anything).Return(nil).Once()
block.EXPECT().ClaimMempoolEntries(mock.Anything, testMaxTxsPerBlock).Return([]ethrpc.MempoolEntry{entry}, nil)

store := mocks.NewStore(t)
store.EXPECT().NewBlock(mock.Anything, testChainID).Return(block, nil)

m := newTestMiner(store)
require.NoError(t, m.mine(context.Background()))
block.AssertNotCalled(t, "AddEvmLog", mock.Anything, mock.Anything)
}

// TestMine_LogIndexContiguousAcrossFailures ensures that when failed transactions
// emit zero logs, the surviving Transfer logs still get block-relative contiguous
// LogIndex values (0, 1, 2…) — matching Ethereum's expected log indexing.
// Pattern: success, fail, success → logs should land at LogIndex 0 and 1, not 0 and 2.
func TestMine_LogIndexContiguousAcrossFailures(t *testing.T) {
ok1 := sampleEntry(0x20, "0xaaaa000000000000000000000000000000000020",
"0xcccc000000000000000000000000000000000001",
"0xdddd000000000000000000000000000000000020", 0, 100)
ok1.Status = ethrpc.MempoolCompleted

fail := sampleEntry(0x21, "0xaaaa000000000000000000000000000000000021",
"0xcccc000000000000000000000000000000000001",
"0xdddd000000000000000000000000000000000021", 1, 200)
fail.Status = ethrpc.MempoolFailed

ok2 := sampleEntry(0x22, "0xaaaa000000000000000000000000000000000022",
"0xcccc000000000000000000000000000000000001",
"0xdddd000000000000000000000000000000000022", 2, 300)
ok2.Status = ethrpc.MempoolCompleted

block := setupBlock(t, 9)
block.EXPECT().AddEvmTransaction(mock.Anything, mock.Anything).Return(nil).Times(3)
// Exactly two Transfer logs (only the two successes); LogIndex must be 0, 1.
block.EXPECT().AddEvmLog(mock.Anything, mock.MatchedBy(func(log *ethrpc.EvmLog) bool {
return log.LogIndex == 0 && log.TxIndex == 0
})).Return(nil).Once()
block.EXPECT().AddEvmLog(mock.Anything, mock.MatchedBy(func(log *ethrpc.EvmLog) bool {
return log.LogIndex == 1 && log.TxIndex == 2
})).Return(nil).Once()
block.EXPECT().Finalize(mock.Anything).Return(nil).Once()
block.EXPECT().ClaimMempoolEntries(mock.Anything, testMaxTxsPerBlock).
Return([]ethrpc.MempoolEntry{ok1, fail, ok2}, nil)

store := mocks.NewStore(t)
store.EXPECT().NewBlock(mock.Anything, testChainID).Return(block, nil)

m := newTestMiner(store)
require.NoError(t, m.mine(context.Background()))
}

func TestMine_MixedCompletedAndFailed(t *testing.T) {
completed := sampleEntry(0x10, "0xaaaa000000000000000000000000000000000010",
"0xcccc000000000000000000000000000000000001",
"0xdddd000000000000000000000000000000000010", 0, 100)
completed.Status = ethrpc.MempoolCompleted

failed := sampleEntry(0x11, "0xaaaa000000000000000000000000000000000011",
"0xcccc000000000000000000000000000000000001",
"0xdddd000000000000000000000000000000000011", 1, 200)
failed.Status = ethrpc.MempoolFailed
failed.ErrorMessage = "sender not found"

block := setupBlock(t, 8)
block.EXPECT().AddEvmTransaction(mock.Anything, mock.MatchedBy(func(tx *ethrpc.EvmTransaction) bool {
return tx.TxIndex == 0 && tx.Status == 1 && tx.ErrorMessage == ""
})).Return(nil).Once()
block.EXPECT().AddEvmLog(mock.Anything, mock.Anything).Return(nil).Once() // only for the success
block.EXPECT().AddEvmTransaction(mock.Anything, mock.MatchedBy(func(tx *ethrpc.EvmTransaction) bool {
return tx.TxIndex == 1 && tx.Status == 0 && tx.ErrorMessage == failed.ErrorMessage
})).Return(nil).Once()
block.EXPECT().Finalize(mock.Anything).Return(nil).Once()
block.EXPECT().ClaimMempoolEntries(mock.Anything, testMaxTxsPerBlock).
Return([]ethrpc.MempoolEntry{completed, failed}, nil)

store := mocks.NewStore(t)
store.EXPECT().NewBlock(mock.Anything, testChainID).Return(block, nil)

m := newTestMiner(store)
require.NoError(t, m.mine(context.Background()))
}

func TestMine_MultipleEntries_CorrectTxIndexAndHashes(t *testing.T) {
entries := []ethrpc.MempoolEntry{
sampleEntry(0x01, "0xaaaa000000000000000000000000000000000001",
Expand Down Expand Up @@ -244,12 +341,14 @@ func TestBuildTransferLog_CorrectTopicsAndData(t *testing.T) {
block.EXPECT().Number().Return(uint64(7)).Maybe()
block.EXPECT().Hash().Return(blockHash).Maybe()

log := buildTransferLog(entry, block, 3, 0)
log := buildTransferLog(entry, block, 3, 1, 0)

assert.Equal(t, uint64(7), log.BlockNumber)
assert.Equal(t, blockHash, log.BlockHash)
assert.Equal(t, uint(3), log.TxIndex)
assert.Equal(t, uint(3), log.LogIndex)
// LogIndex is block-relative and must NOT equal TxIndex when prior txs
// failed (and produced no log).
assert.Equal(t, uint(1), log.LogIndex)
assert.False(t, log.Removed)

// Address = contract address.
Expand Down Expand Up @@ -288,7 +387,7 @@ func TestBuildTransferLog_LargeAmount(t *testing.T) {
block.EXPECT().Number().Return(uint64(1)).Maybe()
block.EXPECT().Hash().Return(blockHash).Maybe()

log := buildTransferLog(entry, block, 0, 0)
log := buildTransferLog(entry, block, 0, 0, 0)

got := new(big.Int).SetBytes(log.Data)
assert.Equal(t, 0, got.Cmp(amount), "round-trip amount mismatch: got %s, want %s", got, amount)
Expand Down
95 changes: 0 additions & 95 deletions pkg/ethrpc/service/mocks/mock_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading