From 95a861c38f2091709fb29708669131cb5bf27f24 Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Tue, 26 May 2026 10:30:58 +0600 Subject: [PATCH 1/2] feat: async eth_sendRawTransaction (#278) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Return tx hash immediately on eth_sendRawTransaction and drive the Canton transfer asynchronously, so MetaMask and other wallets stop timing out when Canton commits take 5-15s+. - SendRawTransaction now validates the signed tx (signature, contract whitelist, calldata) and inserts a pending mempool row, returning the hash without waiting for Canton. - New pkg/ethrpc/submitter worker polls pending entries on a ticker, calls Canton in parallel (concurrency 10 by default, configurable), and transitions each entry to completed or failed. Canton's tx-hash command-id makes retries idempotent. Each Canton call is bounded by a 60s timeout so a hung gRPC call can't drain the pool. - Miner now claims both completed and failed terminal entries in a single block, mining failures as status=0 EVM transactions with no Transfer log so eth_getTransactionReceipt surfaces them naturally. - LogIndex is tracked separately from TxIndex to stay block-relative contiguous when failed txs contribute zero logs. - RPCReceipt grows an optional `revertReason` field, populated from the Canton error message so clients see why a tx failed instead of just status=0. Standard clients ignore unknown fields. - pgstore GetMempoolEntriesByStatus takes a limit so a backlog never loads the whole pending queue into memory; submitter forwards its batch size (default 100). - New WaitForAPITxReceipt DSL helper for e2e tests that submit signed EVM transactions via the /eth facade — polls eth_getTransactionReceipt and fails on status=0 with the revertReason rather than timing out. --- pkg/app/api/server.go | 21 +- pkg/ethrpc/config.go | 15 + pkg/ethrpc/miner/miner.go | 36 +- pkg/ethrpc/miner/miner_test.go | 105 ++++- pkg/ethrpc/service/mocks/mock_store.go | 95 ---- pkg/ethrpc/service/service.go | 47 +- pkg/ethrpc/service/service_test.go | 86 ++-- pkg/ethrpc/store/pg.go | 88 ++-- pkg/ethrpc/store/pg_test.go | 34 +- pkg/ethrpc/submitter/mocks/mock_erc20.go | 416 +++++++++++++++++ pkg/ethrpc/submitter/mocks/mock_store.go | 192 ++++++++ .../submitter/mocks/mock_token_service.go | 95 ++++ pkg/ethrpc/submitter/submitter.go | 251 ++++++++++ pkg/ethrpc/submitter/submitter_test.go | 437 ++++++++++++++++++ pkg/ethrpc/types.go | 19 +- tests/e2e/devstack/dsl/helpers.go | 87 ++++ 16 files changed, 1823 insertions(+), 201 deletions(-) create mode 100644 pkg/ethrpc/submitter/mocks/mock_erc20.go create mode 100644 pkg/ethrpc/submitter/mocks/mock_store.go create mode 100644 pkg/ethrpc/submitter/mocks/mock_token_service.go create mode 100644 pkg/ethrpc/submitter/submitter.go create mode 100644 pkg/ethrpc/submitter/submitter_test.go diff --git a/pkg/app/api/server.go b/pkg/app/api/server.go index 4fa711ae..55ad8f8a 100644 --- a/pkg/app/api/server.go +++ b/pkg/app/api/server.go @@ -18,6 +18,7 @@ import ( ethrpcminer "github.com/chainsafe/canton-middleware/pkg/ethrpc/miner" ethrpc "github.com/chainsafe/canton-middleware/pkg/ethrpc/service" ethrpcstore "github.com/chainsafe/canton-middleware/pkg/ethrpc/store" + ethrpcsubmitter "github.com/chainsafe/canton-middleware/pkg/ethrpc/submitter" indexerclient "github.com/chainsafe/canton-middleware/pkg/indexer/client" "github.com/chainsafe/canton-middleware/pkg/keys" "github.com/chainsafe/canton-middleware/pkg/log" @@ -229,15 +230,33 @@ func initServices( transferCache := transfer.NewPreparedTransferCache(transferCacheTTL, transferCacheMaxSize) go transferCache.Start(ctx) + tokenService := token.NewTokenService(cfg.Token, tokenDataProvider, userStore, cantonClient.Token) + if cfg.EthRPC.Enabled { m := ethrpcminer.New(evmStore, cfg.EthRPC.ChainID, cfg.EthRPC.GasLimit, cfg.EthRPC.MinerMaxTxsPerBlock, cfg.EthRPC.MinerInterval, logger) go m.Start(ctx) + + // Async submitter: drives pending mempool entries → completed/failed by + // calling Canton. SendRawTransaction returns the tx hash immediately + // after the pending insert; this worker is what actually moves money. + // Runs SubmitterConcurrency transfers in parallel; each Canton call is + // bounded by a package-level timeout so a hung gRPC call can't drain + // the pool. + sub := ethrpcsubmitter.New( + evmStore, + tokenService, + cfg.EthRPC.SubmitterInterval, + cfg.EthRPC.SubmitterBatchSize, + cfg.EthRPC.SubmitterConcurrency, + logger, + ) + go sub.Start(ctx) } transferSvc := transfer.NewTransferService(cantonClient.Token, userStore, transferCache, cfg.Token, indexerClient) return &services{ evmStore: evmStore, - tokenService: token.NewTokenService(cfg.Token, tokenDataProvider, userStore, cantonClient.Token), + tokenService: tokenService, regSvc: userservice.NewLog(registrationService, logger), transferSvc: transfer.NewLog(transferSvc, logger), }, nil diff --git a/pkg/ethrpc/config.go b/pkg/ethrpc/config.go index fd0d13cb..6bd6ce5a 100644 --- a/pkg/ethrpc/config.go +++ b/pkg/ethrpc/config.go @@ -14,4 +14,19 @@ type Config struct { RequestTimeout time.Duration `yaml:"request_timeout" default:"30s"` MinerInterval time.Duration `yaml:"miner_interval" default:"2s"` MinerMaxTxsPerBlock int `yaml:"miner_max_txs_per_block" default:"500"` + // SubmitterInterval controls how often the submitter drains pending + // mempool entries by calling Canton. Kept tight by default so that + // eth_sendRawTransaction → on-Canton latency stays close to Canton's + // own commit time even though the HTTP call returns immediately. + SubmitterInterval time.Duration `yaml:"submitter_interval" default:"500ms"` + // SubmitterBatchSize caps the number of pending entries fetched in a + // single submitter tick (0 = unlimited). Bounded so a backlog never + // loads the entire pending queue into memory; the next tick picks up + // whatever is left. + SubmitterBatchSize int `yaml:"submitter_batch_size" default:"100"` + // SubmitterConcurrency is the number of pending entries the submitter + // processes in parallel within one tick. Canton commits typically take + // 5-15s, so 10 parallel transfers give ~10x throughput vs sequential + // without hammering Canton or saturating the gRPC connection. + SubmitterConcurrency int `yaml:"submitter_concurrency" default:"10"` } diff --git a/pkg/ethrpc/miner/miner.go b/pkg/ethrpc/miner/miner.go index 17879c50..a0047a3b 100644 --- a/pkg/ethrpc/miner/miner.go +++ b/pkg/ethrpc/miner/miner.go @@ -86,9 +86,15 @@ func (m *Miner) mine(ctx context.Context) error { } blockTimestamp := uint64(time.Now().Unix()) //nolint:gosec // time.Now() is always positive + // logIndex is block-relative and contiguous across all logs in the block — + // it must not skip when a failed tx emits zero logs. Track it separately + // from txIndex so go-ethereum tooling that relies on contiguous indices + // (e.g. for ordering or de-dup) behaves correctly. + logIndex := uint(0) for i := range entries { e := &entries[i] txIndex := uint(i) //nolint:gosec // i is bounded by len(entries) which fits in uint + succeeded := e.Status == ethrpc.MempoolCompleted evmTx := ðrpc.EvmTransaction{ TxHash: e.TxHash, @@ -97,19 +103,28 @@ func (m *Miner) mine(ctx context.Context) error { Nonce: e.Nonce, Input: e.Input, ValueWei: "0", - Status: 1, + Status: txStatus(succeeded), BlockNumber: block.Number(), BlockHash: block.Hash(), TxIndex: txIndex, GasUsed: m.gasLimit, } + if !succeeded { + evmTx.ErrorMessage = e.ErrorMessage + } if err = block.AddEvmTransaction(ctx, evmTx); err != nil { return err } - if err = block.AddEvmLog(ctx, buildTransferLog(e, block, txIndex, blockTimestamp)); err != nil { + // Failed transfers never executed on Canton, so they have no Transfer log. + // Mining the EVM tx with status=0 surfaces the failure via getTransactionReceipt. + if !succeeded { + continue + } + if err = block.AddEvmLog(ctx, buildTransferLog(e, block, txIndex, logIndex, blockTimestamp)); err != nil { return err } + logIndex++ } if err = block.Finalize(ctx); err != nil { @@ -123,9 +138,22 @@ func (m *Miner) mine(ctx context.Context) error { return nil } +// txStatus maps the entry outcome to the EVM-standard transaction receipt +// status (0x1 success, 0x0 failure) so MetaMask and other wallets render +// failed transfers correctly. +func txStatus(succeeded bool) uint8 { + if succeeded { + return 1 + } + return 0 +} + // buildTransferLog constructs the synthetic ERC-20 Transfer event log for a // completed mempool entry. blockTimestamp is Unix seconds captured once per block. -func buildTransferLog(e *ethrpc.MempoolEntry, block ethrpc.PendingBlock, txIndex uint, blockTimestamp uint64) *ethrpc.EvmLog { +// txIndex is the tx position in the block; logIndex is the *log* position in the +// block (block-relative, contiguous across all logs — the two diverge whenever +// a failed tx contributes a status=0 receipt with no logs). +func buildTransferLog(e *ethrpc.MempoolEntry, block ethrpc.PendingBlock, txIndex, logIndex uint, blockTimestamp uint64) *ethrpc.EvmLog { fromAddr := common.HexToAddress(e.FromAddress) toAddr := common.HexToAddress(e.RecipientAddress) fromTopic := common.BytesToHash(common.LeftPadBytes(fromAddr.Bytes(), evmWordSize)) @@ -135,7 +163,7 @@ func buildTransferLog(e *ethrpc.MempoolEntry, block ethrpc.PendingBlock, txIndex return ðrpc.EvmLog{ TxHash: e.TxHash, - LogIndex: txIndex, + LogIndex: logIndex, Address: contractAddr.Bytes(), Topics: [][]byte{transferEventTopic.Bytes(), fromTopic.Bytes(), toTopic.Bytes()}, Data: amountData, diff --git a/pkg/ethrpc/miner/miner_test.go b/pkg/ethrpc/miner/miner_test.go index 1163916b..ebfaf208 100644 --- a/pkg/ethrpc/miner/miner_test.go +++ b/pkg/ethrpc/miner/miner_test.go @@ -97,6 +97,103 @@ func TestMine_SingleEntry_CommitsBlock(t *testing.T) { require.NoError(t, m.mine(context.Background())) } +func TestMine_FailedEntry_MinesAsStatus0_NoTransferLog(t *testing.T) { + entry := sampleEntry(0x09, + "0xaaaa000000000000000000000000000000000099", + "0xcccc000000000000000000000000000000000099", + "0xdddd000000000000000000000000000000000099", + 0, 7, + ) + entry.Status = ethrpc.MempoolFailed + entry.ErrorMessage = "canton transfer failed: insufficient balance" + + block := setupBlock(t, 6) + block.EXPECT().AddEvmTransaction(mock.Anything, mock.MatchedBy(func(tx *ethrpc.EvmTransaction) bool { + return tx.Status == 0 && tx.ErrorMessage == entry.ErrorMessage && tx.BlockNumber == 6 + })).Return(nil).Once() + // No AddEvmLog expectation — failed entries must skip the Transfer event. + block.EXPECT().Finalize(mock.Anything).Return(nil).Once() + block.EXPECT().ClaimMempoolEntries(mock.Anything, testMaxTxsPerBlock).Return([]ethrpc.MempoolEntry{entry}, nil) + + store := mocks.NewStore(t) + store.EXPECT().NewBlock(mock.Anything, testChainID).Return(block, nil) + + m := newTestMiner(store) + require.NoError(t, m.mine(context.Background())) + block.AssertNotCalled(t, "AddEvmLog", mock.Anything, mock.Anything) +} + +// TestMine_LogIndexContiguousAcrossFailures ensures that when failed transactions +// emit zero logs, the surviving Transfer logs still get block-relative contiguous +// LogIndex values (0, 1, 2…) — matching Ethereum's expected log indexing. +// Pattern: success, fail, success → logs should land at LogIndex 0 and 1, not 0 and 2. +func TestMine_LogIndexContiguousAcrossFailures(t *testing.T) { + ok1 := sampleEntry(0x20, "0xaaaa000000000000000000000000000000000020", + "0xcccc000000000000000000000000000000000001", + "0xdddd000000000000000000000000000000000020", 0, 100) + ok1.Status = ethrpc.MempoolCompleted + + fail := sampleEntry(0x21, "0xaaaa000000000000000000000000000000000021", + "0xcccc000000000000000000000000000000000001", + "0xdddd000000000000000000000000000000000021", 1, 200) + fail.Status = ethrpc.MempoolFailed + + ok2 := sampleEntry(0x22, "0xaaaa000000000000000000000000000000000022", + "0xcccc000000000000000000000000000000000001", + "0xdddd000000000000000000000000000000000022", 2, 300) + ok2.Status = ethrpc.MempoolCompleted + + block := setupBlock(t, 9) + block.EXPECT().AddEvmTransaction(mock.Anything, mock.Anything).Return(nil).Times(3) + // Exactly two Transfer logs (only the two successes); LogIndex must be 0, 1. + block.EXPECT().AddEvmLog(mock.Anything, mock.MatchedBy(func(log *ethrpc.EvmLog) bool { + return log.LogIndex == 0 && log.TxIndex == 0 + })).Return(nil).Once() + block.EXPECT().AddEvmLog(mock.Anything, mock.MatchedBy(func(log *ethrpc.EvmLog) bool { + return log.LogIndex == 1 && log.TxIndex == 2 + })).Return(nil).Once() + block.EXPECT().Finalize(mock.Anything).Return(nil).Once() + block.EXPECT().ClaimMempoolEntries(mock.Anything, testMaxTxsPerBlock). + Return([]ethrpc.MempoolEntry{ok1, fail, ok2}, nil) + + store := mocks.NewStore(t) + store.EXPECT().NewBlock(mock.Anything, testChainID).Return(block, nil) + + m := newTestMiner(store) + require.NoError(t, m.mine(context.Background())) +} + +func TestMine_MixedCompletedAndFailed(t *testing.T) { + completed := sampleEntry(0x10, "0xaaaa000000000000000000000000000000000010", + "0xcccc000000000000000000000000000000000001", + "0xdddd000000000000000000000000000000000010", 0, 100) + completed.Status = ethrpc.MempoolCompleted + + failed := sampleEntry(0x11, "0xaaaa000000000000000000000000000000000011", + "0xcccc000000000000000000000000000000000001", + "0xdddd000000000000000000000000000000000011", 1, 200) + failed.Status = ethrpc.MempoolFailed + failed.ErrorMessage = "sender not found" + + block := setupBlock(t, 8) + block.EXPECT().AddEvmTransaction(mock.Anything, mock.MatchedBy(func(tx *ethrpc.EvmTransaction) bool { + return tx.TxIndex == 0 && tx.Status == 1 && tx.ErrorMessage == "" + })).Return(nil).Once() + block.EXPECT().AddEvmLog(mock.Anything, mock.Anything).Return(nil).Once() // only for the success + block.EXPECT().AddEvmTransaction(mock.Anything, mock.MatchedBy(func(tx *ethrpc.EvmTransaction) bool { + return tx.TxIndex == 1 && tx.Status == 0 && tx.ErrorMessage == failed.ErrorMessage + })).Return(nil).Once() + block.EXPECT().Finalize(mock.Anything).Return(nil).Once() + block.EXPECT().ClaimMempoolEntries(mock.Anything, testMaxTxsPerBlock). + Return([]ethrpc.MempoolEntry{completed, failed}, nil) + + store := mocks.NewStore(t) + store.EXPECT().NewBlock(mock.Anything, testChainID).Return(block, nil) + + m := newTestMiner(store) + require.NoError(t, m.mine(context.Background())) +} + func TestMine_MultipleEntries_CorrectTxIndexAndHashes(t *testing.T) { entries := []ethrpc.MempoolEntry{ sampleEntry(0x01, "0xaaaa000000000000000000000000000000000001", @@ -244,12 +341,14 @@ func TestBuildTransferLog_CorrectTopicsAndData(t *testing.T) { block.EXPECT().Number().Return(uint64(7)).Maybe() block.EXPECT().Hash().Return(blockHash).Maybe() - log := buildTransferLog(entry, block, 3, 0) + log := buildTransferLog(entry, block, 3, 1, 0) assert.Equal(t, uint64(7), log.BlockNumber) assert.Equal(t, blockHash, log.BlockHash) assert.Equal(t, uint(3), log.TxIndex) - assert.Equal(t, uint(3), log.LogIndex) + // LogIndex is block-relative and must NOT equal TxIndex when prior txs + // failed (and produced no log). + assert.Equal(t, uint(1), log.LogIndex) assert.False(t, log.Removed) // Address = contract address. @@ -288,7 +387,7 @@ func TestBuildTransferLog_LargeAmount(t *testing.T) { block.EXPECT().Number().Return(uint64(1)).Maybe() block.EXPECT().Hash().Return(blockHash).Maybe() - log := buildTransferLog(entry, block, 0, 0) + log := buildTransferLog(entry, block, 0, 0, 0) got := new(big.Int).SetBytes(log.Data) assert.Equal(t, 0, got.Cmp(amount), "round-trip amount mismatch: got %s, want %s", got, amount) diff --git a/pkg/ethrpc/service/mocks/mock_store.go b/pkg/ethrpc/service/mocks/mock_store.go index 93d63da8..206d7969 100644 --- a/pkg/ethrpc/service/mocks/mock_store.go +++ b/pkg/ethrpc/service/mocks/mock_store.go @@ -22,101 +22,6 @@ func (_m *Store) EXPECT() *Store_Expecter { return &Store_Expecter{mock: &_m.Mock} } -// CompleteMempoolEntry provides a mock function with given fields: ctx, txHash -func (_m *Store) CompleteMempoolEntry(ctx context.Context, txHash []byte) error { - ret := _m.Called(ctx, txHash) - - if len(ret) == 0 { - panic("no return value specified for CompleteMempoolEntry") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []byte) error); ok { - r0 = rf(ctx, txHash) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Store_CompleteMempoolEntry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CompleteMempoolEntry' -type Store_CompleteMempoolEntry_Call struct { - *mock.Call -} - -// CompleteMempoolEntry is a helper method to define mock.On call -// - ctx context.Context -// - txHash []byte -func (_e *Store_Expecter) CompleteMempoolEntry(ctx interface{}, txHash interface{}) *Store_CompleteMempoolEntry_Call { - return &Store_CompleteMempoolEntry_Call{Call: _e.mock.On("CompleteMempoolEntry", ctx, txHash)} -} - -func (_c *Store_CompleteMempoolEntry_Call) Run(run func(ctx context.Context, txHash []byte)) *Store_CompleteMempoolEntry_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]byte)) - }) - return _c -} - -func (_c *Store_CompleteMempoolEntry_Call) Return(_a0 error) *Store_CompleteMempoolEntry_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Store_CompleteMempoolEntry_Call) RunAndReturn(run func(context.Context, []byte) error) *Store_CompleteMempoolEntry_Call { - _c.Call.Return(run) - return _c -} - -// FailMempoolEntry provides a mock function with given fields: ctx, txHash, errMsg -func (_m *Store) FailMempoolEntry(ctx context.Context, txHash []byte, errMsg string) error { - ret := _m.Called(ctx, txHash, errMsg) - - if len(ret) == 0 { - panic("no return value specified for FailMempoolEntry") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []byte, string) error); ok { - r0 = rf(ctx, txHash, errMsg) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Store_FailMempoolEntry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FailMempoolEntry' -type Store_FailMempoolEntry_Call struct { - *mock.Call -} - -// FailMempoolEntry is a helper method to define mock.On call -// - ctx context.Context -// - txHash []byte -// - errMsg string -func (_e *Store_Expecter) FailMempoolEntry(ctx interface{}, txHash interface{}, errMsg interface{}) *Store_FailMempoolEntry_Call { - return &Store_FailMempoolEntry_Call{Call: _e.mock.On("FailMempoolEntry", ctx, txHash, errMsg)} -} - -func (_c *Store_FailMempoolEntry_Call) Run(run func(ctx context.Context, txHash []byte, errMsg string)) *Store_FailMempoolEntry_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]byte), args[2].(string)) - }) - return _c -} - -func (_c *Store_FailMempoolEntry_Call) Return(_a0 error) *Store_FailMempoolEntry_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Store_FailMempoolEntry_Call) RunAndReturn(run func(context.Context, []byte, string) error) *Store_FailMempoolEntry_Call { - _c.Call.Return(run) - return _c -} - // GetBlockNumberByHash provides a mock function with given fields: ctx, blockHash func (_m *Store) GetBlockNumberByHash(ctx context.Context, blockHash []byte) (uint64, error) { ret := _m.Called(ctx, blockHash) diff --git a/pkg/ethrpc/service/service.go b/pkg/ethrpc/service/service.go index 0be7fd93..3924f90f 100644 --- a/pkg/ethrpc/service/service.go +++ b/pkg/ethrpc/service/service.go @@ -31,10 +31,11 @@ type Store interface { GetEvmLogs(ctx context.Context, address []byte, topic0 []byte, fromBlock, toBlock uint64) ([]*ethrpc.EvmLog, error) GetBlockNumberByHash(ctx context.Context, blockHash []byte) (uint64, error) - // Mempool intent log — written by SendRawTransaction, consumed by the miner. + // InsertMempoolEntry records a new transfer intent with status=pending. + // SendRawTransaction returns immediately after this insert; the submitter + // worker drives pending → completed/failed asynchronously, and the miner + // then seals the entry into a synthetic EVM block. InsertMempoolEntry(ctx context.Context, entry *ethrpc.MempoolEntry) error - CompleteMempoolEntry(ctx context.Context, txHash []byte) error - FailMempoolEntry(ctx context.Context, txHash []byte, errMsg string) error } // TokenService is the narrow token-service interface consumed by the EthRPC service. @@ -185,12 +186,21 @@ func (*ethService) Syncing(_ context.Context) bool { return false } +// SendRawTransaction validates an EVM-encoded ERC-20 transfer, records the +// intent in the mempool as pending, and returns the tx hash immediately. The +// Canton transfer is executed asynchronously by the submitter worker, which +// transitions the entry to completed or failed. The miner then seals the entry +// into a synthetic EVM block, at which point eth_getTransactionReceipt yields a +// terminal status=0x1 (success) or status=0x0 (failure) receipt. +// +// This matches standard Ethereum semantics where the node accepts a signed +// transaction synchronously and clients poll the receipt — avoiding 5-15s+ HTTP +// stalls under Canton load that previously broke MetaMask compatibility. func (s *ethService) SendRawTransaction(ctx context.Context, data hexutil.Bytes) (common.Hash, error) { var tx types.Transaction if err := tx.UnmarshalBinary(data); err != nil { return common.Hash{}, apperr.BadRequestError(err, "invalid transaction encoding") } - tx.To() signer := types.LatestSignerForChainID(s.chainID) from, err := types.Sender(signer, &tx) @@ -215,14 +225,13 @@ func (s *ethService) SendRawTransaction(ctx context.Context, data hexutil.Bytes) return common.Hash{}, apperr.BadRequestError(err, "invalid transaction data") } - erc20, err := s.tokenService.ERC20(*contractAddress) - if err != nil { + // Reject unsupported contracts synchronously so wallets surface the error + // before they ever start polling for a receipt that will never arrive. + if _, err = s.tokenService.ERC20(*contractAddress); err != nil { return common.Hash{}, apperr.BadRequestError(err, fmt.Sprintf("contract not supported: %s", contractAddress.Hex())) } txHash := tx.Hash() - - // Record intent in mempool before executing the Canton transfer. entry := ðrpc.MempoolEntry{ TxHash: txHash.Bytes(), FromAddress: from.Hex(), @@ -235,25 +244,6 @@ func (s *ethService) SendRawTransaction(ctx context.Context, data hexutil.Bytes) if err = s.store.InsertMempoolEntry(ctx, entry); err != nil { return common.Hash{}, apperr.DependencyError(err, "insert mempool entry") } - - // Execute the Canton transfer. - transferErr := erc20.TransferFrom(ctx, txHash.Hex(), from, toAddr, *amount) - - // Update mempool status regardless of transfer outcome. - if transferErr != nil { - if updateErr := s.store.FailMempoolEntry(ctx, txHash.Bytes(), transferErr.Error()); updateErr != nil { - // Non-fatal: the miner's reconciler will handle stuck pending entries. - _ = updateErr - } - // Pass categorized errors through so callers receive the correct - // JSON-RPC error code (e.g. -32602 for "sender not found"). - return common.Hash{}, transferErr - } - if updateErr := s.store.CompleteMempoolEntry(ctx, txHash.Bytes()); updateErr != nil { - // Non-fatal: the miner's reconciler will handle stuck pending entries. - _ = updateErr - } - return txHash, nil } @@ -338,6 +328,9 @@ func (s *ethService) GetTransactionReceipt(ctx context.Context, hash common.Hash Status: hexutil.Uint64(row.Status), EffectiveGasPrice: hexutil.Uint64(defaultGasPriceWeiUint64), Type: hexutil.Uint64(2), + // RevertReason is omitted when empty (omitempty), so successful + // receipts keep the standard JSON shape. + RevertReason: row.ErrorMessage, }, nil } diff --git a/pkg/ethrpc/service/service_test.go b/pkg/ethrpc/service/service_test.go index 943735ce..673095f9 100644 --- a/pkg/ethrpc/service/service_test.go +++ b/pkg/ethrpc/service/service_test.go @@ -231,20 +231,18 @@ func TestService_SendRawTransaction(t *testing.T) { recipient := common.HexToAddress("0x3000000000000000000000000000000000000003") amount := big.NewInt(42_000_000_000_000_000) - t.Run("success inserts mempool entry and returns hash", func(t *testing.T) { + t.Run("returns hash immediately after inserting pending mempool entry", func(t *testing.T) { payload, expectedHash := buildSignedTransferTx(t, chainID, tokenAddr, recipient, amount) - mockERC20 := mocks.NewERC20(t) - mockERC20.EXPECT(). - TransferFrom(mock.Anything, mock.Anything, mock.Anything, recipient, mock.Anything). - Return(nil) - store := mocks.NewStore(t) - store.EXPECT().InsertMempoolEntry(mock.Anything, mock.Anything).Return(nil) - store.EXPECT().CompleteMempoolEntry(mock.Anything, mock.Anything).Return(nil) + store.EXPECT().InsertMempoolEntry(mock.Anything, mock.MatchedBy(func(entry *ethrpc.MempoolEntry) bool { + return entry != nil && entry.RecipientAddress == recipient.Hex() && entry.ContractAddress == tokenAddr.Hex() + })).Return(nil) + // ERC20() is called only as a whitelist check; the actual TransferFrom + // happens asynchronously in the submitter, not here. mockTokenSvc := mocks.NewTokenService(t) - mockTokenSvc.EXPECT().ERC20(tokenAddr).Return(mockERC20, nil) + mockTokenSvc.EXPECT().ERC20(tokenAddr).Return(mocks.NewERC20(t), nil) svc := newSvc(t, defaultCfg(), store, mockTokenSvc) got, err := svc.SendRawTransaction(context.Background(), hexutil.Bytes(payload)) @@ -252,53 +250,57 @@ func TestService_SendRawTransaction(t *testing.T) { assert.Equal(t, expectedHash, got) }) - t.Run("unsupported contract returns BadRequestError", func(t *testing.T) { + t.Run("unsupported contract returns BadRequestError without touching mempool", func(t *testing.T) { unsupportedAddr := common.HexToAddress("0x9999999999999999999999999999999999999999") payload, _ := buildSignedTransferTx(t, chainID, unsupportedAddr, recipient, amount) mockTokenSvc := mocks.NewTokenService(t) mockTokenSvc.EXPECT().ERC20(unsupportedAddr).Return(nil, errors.New("token not supported")) - svc := newSvc(t, defaultCfg(), nil, mockTokenSvc) + // Store mock with no expectations — must not be called when whitelist rejects. + store := mocks.NewStore(t) + + svc := newSvc(t, defaultCfg(), store, mockTokenSvc) _, err := svc.SendRawTransaction(context.Background(), hexutil.Bytes(payload)) require.Error(t, err) assert.True(t, apperr.Is(err, apperr.CategoryDataError)) + store.AssertNotCalled(t, "InsertMempoolEntry", mock.Anything, mock.Anything) }) - t.Run("TransferFrom categorized error updates mempool and is passed through", func(t *testing.T) { + t.Run("InsertMempoolEntry error propagates as DependencyFailure", func(t *testing.T) { payload, _ := buildSignedTransferTx(t, chainID, tokenAddr, recipient, amount) - mockERC20 := mocks.NewERC20(t) - mockERC20.EXPECT(). - TransferFrom(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(apperr.BadRequestError(errors.New("user not found"), "failed to get sender")) + mockTokenSvc := mocks.NewTokenService(t) + mockTokenSvc.EXPECT().ERC20(tokenAddr).Return(mocks.NewERC20(t), nil) store := mocks.NewStore(t) - store.EXPECT().InsertMempoolEntry(mock.Anything, mock.Anything).Return(nil) - store.EXPECT().FailMempoolEntry(mock.Anything, mock.Anything, mock.AnythingOfType("string")).Return(nil) - - mockTokenSvc := mocks.NewTokenService(t) - mockTokenSvc.EXPECT().ERC20(tokenAddr).Return(mockERC20, nil) + store.EXPECT().InsertMempoolEntry(mock.Anything, mock.Anything).Return(errors.New("db error")) svc := newSvc(t, defaultCfg(), store, mockTokenSvc) _, err := svc.SendRawTransaction(context.Background(), hexutil.Bytes(payload)) require.Error(t, err) - assert.True(t, apperr.Is(err, apperr.CategoryDataError)) + assert.True(t, apperr.Is(err, apperr.CategoryDependencyFailure)) }) - t.Run("InsertMempoolEntry error propagates", func(t *testing.T) { + t.Run("does not call Canton TransferFrom synchronously", func(t *testing.T) { + // Regression guard: the whole point of the async path is that + // SendRawTransaction must never wait on Canton. If ERC20.TransferFrom + // ever gets wired back in here, this test will fail because the mock + // is left bare with no expectations. payload, _ := buildSignedTransferTx(t, chainID, tokenAddr, recipient, amount) + mockERC20 := mocks.NewERC20(t) + // No TransferFrom expectation — calling it would fail the mock. + mockTokenSvc := mocks.NewTokenService(t) - mockTokenSvc.EXPECT().ERC20(tokenAddr).Return(mocks.NewERC20(t), nil) + mockTokenSvc.EXPECT().ERC20(tokenAddr).Return(mockERC20, nil) store := mocks.NewStore(t) - store.EXPECT().InsertMempoolEntry(mock.Anything, mock.Anything).Return(errors.New("db error")) + store.EXPECT().InsertMempoolEntry(mock.Anything, mock.Anything).Return(nil) svc := newSvc(t, defaultCfg(), store, mockTokenSvc) _, err := svc.SendRawTransaction(context.Background(), hexutil.Bytes(payload)) - require.Error(t, err) - assert.True(t, apperr.Is(err, apperr.CategoryDependencyFailure)) + require.NoError(t, err) }) } @@ -350,6 +352,36 @@ func TestService_GetTransactionReceipt(t *testing.T) { assert.Nil(t, got) }) + t.Run("failed mined entry returns status=0 receipt with revert reason", func(t *testing.T) { + // After the async refactor, failed mempool entries get mined as status=0 + // EVM transactions with ErrorMessage set. The receipt must surface both + // the failure status and the human-readable cause so wallets can show + // it to the user instead of polling forever. + failedRow := ðrpc.EvmTransaction{ + TxHash: txHash.Bytes(), + FromAddress: from.Hex(), + ToAddress: to.Hex(), + Nonce: 4, + Status: 0, + BlockNumber: 43, + BlockHash: blockHashBytes, + TxIndex: 0, + GasUsed: 21000, + ErrorMessage: "canton transfer failed: insufficient balance", + } + + store := mocks.NewStore(t) + store.EXPECT().GetEvmTransaction(mock.Anything, txHash.Bytes()).Return(failedRow, nil) + store.EXPECT().GetEvmLogsByTxHash(mock.Anything, txHash.Bytes()).Return(nil, nil) + svc := newSvc(t, defaultCfg(), store, nil) + + got, err := svc.GetTransactionReceipt(context.Background(), txHash) + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, hexutil.Uint64(0), got.Status) + assert.Equal(t, failedRow.ErrorMessage, got.RevertReason) + }) + t.Run("store error propagates", func(t *testing.T) { store := mocks.NewStore(t) store.EXPECT().GetEvmTransaction(mock.Anything, txHash.Bytes()).Return(nil, errors.New("db error")) diff --git a/pkg/ethrpc/store/pg.go b/pkg/ethrpc/store/pg.go index 3d0a5d89..fbe2d5d7 100644 --- a/pkg/ethrpc/store/pg.go +++ b/pkg/ethrpc/store/pg.go @@ -5,7 +5,6 @@ import ( "database/sql" "errors" "fmt" - "sort" "github.com/chainsafe/canton-middleware/pkg/ethereum" "github.com/chainsafe/canton-middleware/pkg/ethrpc" @@ -72,37 +71,55 @@ type pendingBlock struct { func (b *pendingBlock) Number() uint64 { return b.blockNumber } func (b *pendingBlock) Hash() []byte { return b.blockHash } -// ClaimMempoolEntries atomically marks up to maxTxsPerBlock completed mempool -// entries as mined and returns them, using a single UPDATE … RETURNING within -// the block's transaction. The limit caps block size to prevent excessively -// large blocks after traffic spikes or Canton downtime. +// ClaimMempoolEntries atomically claims up to maxTxsPerBlock terminal mempool +// entries (status = completed or failed) and flips them to mined within the +// block's transaction. The limit caps block size to prevent excessively large +// blocks after traffic spikes or Canton downtime. +// +// The claim is performed as SELECT … FOR UPDATE followed by an UPDATE by ID so +// the returned entries retain their *pre-mined* status (completed vs failed). +// The miner relies on that distinction to synthesize EVM tx status=1 (success) +// vs status=0 (failure) and to skip the Transfer event log for failures. +// // Because the transaction already holds the evm_state row lock, concurrent // miners are serialized: by the time a second miner acquires the lock, the // first miner's commit has already flipped these rows to mined. func (b *pendingBlock) ClaimMempoolEntries(ctx context.Context, maxTxsPerBlock int) ([]ethrpc.MempoolEntry, error) { var daos []MempoolEntryDao - if _, err := b.tx.NewUpdate(). - Model((*MempoolEntryDao)(nil)). - Set("status = ?", string(ethrpc.MempoolMined)). - Set("updated_at = current_timestamp"). - Where("id IN (SELECT id FROM mempool WHERE status = ? ORDER BY id LIMIT ?)", - string(ethrpc.MempoolCompleted), maxTxsPerBlock). - Returning("*"). - Exec(ctx, &daos); err != nil { - return nil, fmt.Errorf("claim mempool entries: %w", err) + if err := b.tx.NewSelect(). + Model(&daos). + Where("status IN (?)", bun.List([]string{ + string(ethrpc.MempoolCompleted), + string(ethrpc.MempoolFailed), + })). + OrderExpr("id ASC"). + Limit(maxTxsPerBlock). + For("UPDATE"). + Scan(ctx); err != nil { + return nil, fmt.Errorf("select claimable mempool entries: %w", err) } if len(daos) == 0 { return nil, nil } - // RETURNING does not guarantee row order; sort by insertion ID so the - // miner assigns deterministic, monotonic txIndex values. - sort.Slice(daos, func(i, j int) bool { return daos[i].ID < daos[j].ID }) + ids := make([]int64, len(daos)) + for i := range daos { + ids[i] = daos[i].ID + } + if _, err := b.tx.NewUpdate(). + Model((*MempoolEntryDao)(nil)). + Set("status = ?", string(ethrpc.MempoolMined)). + Set("updated_at = current_timestamp"). + Where("id IN (?)", bun.List(ids)). + Exec(ctx); err != nil { + return nil, fmt.Errorf("mark mempool entries mined: %w", err) + } entries := make([]ethrpc.MempoolEntry, 0, len(daos)) for i := range daos { dao := &daos[i] - entries = append(entries, ethrpc.MempoolEntry{ + entry := ethrpc.MempoolEntry{ + ID: dao.ID, TxHash: dao.TxHash, FromAddress: dao.FromAddress, ContractAddress: dao.ContractAddress, @@ -110,8 +127,12 @@ func (b *pendingBlock) ClaimMempoolEntries(ctx context.Context, maxTxsPerBlock i Nonce: dao.Nonce, Input: dao.Input, AmountData: dao.AmountData, - Status: ethrpc.MempoolMined, - }) + Status: ethrpc.MempoolStatus(dao.Status), + } + if dao.ErrorMessage != nil { + entry.ErrorMessage = *dao.ErrorMessage + } + entries = append(entries, entry) } return entries, nil } @@ -329,23 +350,28 @@ func (s *PGStore) FailMempoolEntry(ctx context.Context, txHash []byte, errMsg st return nil } -// GetMempoolEntriesByStatus returns all mempool entries with the given status, -// ordered by insertion ID. -func (s *PGStore) GetMempoolEntriesByStatus(ctx context.Context, status ethrpc.MempoolStatus) ([]ethrpc.MempoolEntry, error) { +// GetMempoolEntriesByStatus returns mempool entries with the given status, +// ordered by insertion ID. limit caps how many rows are returned (limit <= 0 +// means no limit). The submitter passes its batch size so a backlog after +// Canton downtime never loads the entire pending queue into memory. +func (s *PGStore) GetMempoolEntriesByStatus(ctx context.Context, status ethrpc.MempoolStatus, limit int) ([]ethrpc.MempoolEntry, error) { var daos []MempoolEntryDao - err := s.db.NewSelect(). + query := s.db.NewSelect(). Model(&daos). Where("status = ?", string(status)). - OrderExpr("id ASC"). - Scan(ctx) - if err != nil { + OrderExpr("id ASC") + if limit > 0 { + query = query.Limit(limit) + } + if err := query.Scan(ctx); err != nil { return nil, fmt.Errorf("get mempool entries by status %q: %w", status, err) } entries := make([]ethrpc.MempoolEntry, 0, len(daos)) for i := range daos { dao := &daos[i] - entries = append(entries, ethrpc.MempoolEntry{ + entry := ethrpc.MempoolEntry{ + ID: dao.ID, TxHash: dao.TxHash, FromAddress: dao.FromAddress, ContractAddress: dao.ContractAddress, @@ -354,7 +380,11 @@ func (s *PGStore) GetMempoolEntriesByStatus(ctx context.Context, status ethrpc.M Input: dao.Input, AmountData: dao.AmountData, Status: status, - }) + } + if dao.ErrorMessage != nil { + entry.ErrorMessage = *dao.ErrorMessage + } + entries = append(entries, entry) } return entries, nil } diff --git a/pkg/ethrpc/store/pg_test.go b/pkg/ethrpc/store/pg_test.go index 11bdf3e9..cbad1aa8 100644 --- a/pkg/ethrpc/store/pg_test.go +++ b/pkg/ethrpc/store/pg_test.go @@ -517,7 +517,7 @@ func TestPGStore_Mempool(t *testing.T) { } // All three start as pending. - pending, err := store.GetMempoolEntriesByStatus(ctx, ethrpc.MempoolPending) + pending, err := store.GetMempoolEntriesByStatus(ctx, ethrpc.MempoolPending, 0) if err != nil { t.Fatalf("GetMempoolEntriesByStatus(pending) failed: %v", err) } @@ -533,7 +533,7 @@ func TestPGStore_Mempool(t *testing.T) { t.Fatalf("FailMempoolEntry(entry3) failed: %v", err) } - pending, err = store.GetMempoolEntriesByStatus(ctx, ethrpc.MempoolPending) + pending, err = store.GetMempoolEntriesByStatus(ctx, ethrpc.MempoolPending, 0) if err != nil { t.Fatalf("GetMempoolEntriesByStatus(pending after updates) failed: %v", err) } @@ -541,7 +541,7 @@ func TestPGStore_Mempool(t *testing.T) { t.Fatalf("expected only entry2 pending, got %d entries", len(pending)) } - failed, err := store.GetMempoolEntriesByStatus(ctx, ethrpc.MempoolFailed) + failed, err := store.GetMempoolEntriesByStatus(ctx, ethrpc.MempoolFailed, 0) if err != nil { t.Fatalf("GetMempoolEntriesByStatus(failed) failed: %v", err) } @@ -549,8 +549,10 @@ func TestPGStore_Mempool(t *testing.T) { t.Fatalf("expected only entry3 failed, got %d entries", len(failed)) } - // ClaimMempoolEntries: entry1 is completed; claiming it must atomically mark it mined - // and return it. The block transaction commits via Finalize. + // ClaimMempoolEntries: entry1 is completed and entry3 is failed; both must be + // claimed in a single block so failures land in the EVM tx table alongside + // successes (status=0 vs status=1 receipts). The returned entries must + // retain their pre-mined status so the miner can branch on it. block, err := store.NewBlock(ctx, testChainID) if err != nil { t.Fatalf("NewBlock for ClaimMempoolEntries failed: %v", err) @@ -559,19 +561,29 @@ func TestPGStore_Mempool(t *testing.T) { if err != nil { t.Fatalf("ClaimMempoolEntries failed: %v", err) } - if len(claimed) != 1 || !bytes.Equal(claimed[0].TxHash, entry1.TxHash) { - t.Fatalf("ClaimMempoolEntries: expected entry1, got %d entries", len(claimed)) + if len(claimed) != 2 { + t.Fatalf("ClaimMempoolEntries: expected 2 entries (entry1 completed + entry3 failed), got %d", len(claimed)) + } + claimedByHash := map[string]*ethrpc.MempoolEntry{} + for i := range claimed { + claimedByHash[string(claimed[i].TxHash)] = &claimed[i] + } + if e := claimedByHash[string(entry1.TxHash)]; e == nil || e.Status != ethrpc.MempoolCompleted { + t.Fatalf("entry1 must be returned with status=completed, got %+v", e) + } + if e := claimedByHash[string(entry3.TxHash)]; e == nil || e.Status != ethrpc.MempoolFailed || e.ErrorMessage == "" { + t.Fatalf("entry3 must be returned with status=failed and error message, got %+v", e) } if err = block.Finalize(ctx); err != nil { t.Fatalf("Finalize after ClaimMempoolEntries failed: %v", err) } - mined, err := store.GetMempoolEntriesByStatus(ctx, ethrpc.MempoolMined) + mined, err := store.GetMempoolEntriesByStatus(ctx, ethrpc.MempoolMined, 0) if err != nil { t.Fatalf("GetMempoolEntriesByStatus(mined) failed: %v", err) } - if len(mined) != 1 || !bytes.Equal(mined[0].TxHash, entry1.TxHash) { - t.Fatalf("expected entry1 mined after Finalize, got %d mined entries", len(mined)) + if len(mined) != 2 { + t.Fatalf("expected 2 mined entries after Finalize, got %d", len(mined)) } // ClaimMempoolEntries inside an aborted block must NOT persist the status change. @@ -590,7 +602,7 @@ func TestPGStore_Mempool(t *testing.T) { } // entry2 must still be completed — the claim was rolled back with the block. - stillCompleted, err := store.GetMempoolEntriesByStatus(ctx, ethrpc.MempoolCompleted) + stillCompleted, err := store.GetMempoolEntriesByStatus(ctx, ethrpc.MempoolCompleted, 0) if err != nil { t.Fatalf("GetMempoolEntriesByStatus(completed after abort) failed: %v", err) } diff --git a/pkg/ethrpc/submitter/mocks/mock_erc20.go b/pkg/ethrpc/submitter/mocks/mock_erc20.go new file mode 100644 index 00000000..104ddb67 --- /dev/null +++ b/pkg/ethrpc/submitter/mocks/mock_erc20.go @@ -0,0 +1,416 @@ +// Code generated by mockery v2.53.6. DO NOT EDIT. + +package mocks + +import ( + context "context" + big "math/big" + + common "github.com/ethereum/go-ethereum/common" + + mock "github.com/stretchr/testify/mock" +) + +// ERC20 is an autogenerated mock type for the ERC20 type +type ERC20 struct { + mock.Mock +} + +type ERC20_Expecter struct { + mock *mock.Mock +} + +func (_m *ERC20) EXPECT() *ERC20_Expecter { + return &ERC20_Expecter{mock: &_m.Mock} +} + +// Allowance provides a mock function with given fields: ctx, owner, spender +func (_m *ERC20) Allowance(ctx context.Context, owner common.Address, spender common.Address) big.Int { + ret := _m.Called(ctx, owner, spender) + + if len(ret) == 0 { + panic("no return value specified for Allowance") + } + + var r0 big.Int + if rf, ok := ret.Get(0).(func(context.Context, common.Address, common.Address) big.Int); ok { + r0 = rf(ctx, owner, spender) + } else { + r0 = ret.Get(0).(big.Int) + } + + return r0 +} + +// ERC20_Allowance_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Allowance' +type ERC20_Allowance_Call struct { + *mock.Call +} + +// Allowance is a helper method to define mock.On call +// - ctx context.Context +// - owner common.Address +// - spender common.Address +func (_e *ERC20_Expecter) Allowance(ctx interface{}, owner interface{}, spender interface{}) *ERC20_Allowance_Call { + return &ERC20_Allowance_Call{Call: _e.mock.On("Allowance", ctx, owner, spender)} +} + +func (_c *ERC20_Allowance_Call) Run(run func(ctx context.Context, owner common.Address, spender common.Address)) *ERC20_Allowance_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Address), args[2].(common.Address)) + }) + return _c +} + +func (_c *ERC20_Allowance_Call) Return(_a0 big.Int) *ERC20_Allowance_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ERC20_Allowance_Call) RunAndReturn(run func(context.Context, common.Address, common.Address) big.Int) *ERC20_Allowance_Call { + _c.Call.Return(run) + return _c +} + +// Approve provides a mock function with given fields: ctx, spender, amount +func (_m *ERC20) Approve(ctx context.Context, spender common.Address, amount big.Int) error { + ret := _m.Called(ctx, spender, amount) + + if len(ret) == 0 { + panic("no return value specified for Approve") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, common.Address, big.Int) error); ok { + r0 = rf(ctx, spender, amount) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ERC20_Approve_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Approve' +type ERC20_Approve_Call struct { + *mock.Call +} + +// Approve is a helper method to define mock.On call +// - ctx context.Context +// - spender common.Address +// - amount big.Int +func (_e *ERC20_Expecter) Approve(ctx interface{}, spender interface{}, amount interface{}) *ERC20_Approve_Call { + return &ERC20_Approve_Call{Call: _e.mock.On("Approve", ctx, spender, amount)} +} + +func (_c *ERC20_Approve_Call) Run(run func(ctx context.Context, spender common.Address, amount big.Int)) *ERC20_Approve_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Address), args[2].(big.Int)) + }) + return _c +} + +func (_c *ERC20_Approve_Call) Return(_a0 error) *ERC20_Approve_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ERC20_Approve_Call) RunAndReturn(run func(context.Context, common.Address, big.Int) error) *ERC20_Approve_Call { + _c.Call.Return(run) + return _c +} + +// BalanceOf provides a mock function with given fields: ctx, address +func (_m *ERC20) BalanceOf(ctx context.Context, address common.Address) big.Int { + ret := _m.Called(ctx, address) + + if len(ret) == 0 { + panic("no return value specified for BalanceOf") + } + + var r0 big.Int + if rf, ok := ret.Get(0).(func(context.Context, common.Address) big.Int); ok { + r0 = rf(ctx, address) + } else { + r0 = ret.Get(0).(big.Int) + } + + return r0 +} + +// ERC20_BalanceOf_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BalanceOf' +type ERC20_BalanceOf_Call struct { + *mock.Call +} + +// BalanceOf is a helper method to define mock.On call +// - ctx context.Context +// - address common.Address +func (_e *ERC20_Expecter) BalanceOf(ctx interface{}, address interface{}) *ERC20_BalanceOf_Call { + return &ERC20_BalanceOf_Call{Call: _e.mock.On("BalanceOf", ctx, address)} +} + +func (_c *ERC20_BalanceOf_Call) Run(run func(ctx context.Context, address common.Address)) *ERC20_BalanceOf_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Address)) + }) + return _c +} + +func (_c *ERC20_BalanceOf_Call) Return(_a0 big.Int) *ERC20_BalanceOf_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ERC20_BalanceOf_Call) RunAndReturn(run func(context.Context, common.Address) big.Int) *ERC20_BalanceOf_Call { + _c.Call.Return(run) + return _c +} + +// Decimals provides a mock function with given fields: ctx +func (_m *ERC20) Decimals(ctx context.Context) uint8 { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Decimals") + } + + var r0 uint8 + if rf, ok := ret.Get(0).(func(context.Context) uint8); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(uint8) + } + + return r0 +} + +// ERC20_Decimals_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Decimals' +type ERC20_Decimals_Call struct { + *mock.Call +} + +// Decimals is a helper method to define mock.On call +// - ctx context.Context +func (_e *ERC20_Expecter) Decimals(ctx interface{}) *ERC20_Decimals_Call { + return &ERC20_Decimals_Call{Call: _e.mock.On("Decimals", ctx)} +} + +func (_c *ERC20_Decimals_Call) Run(run func(ctx context.Context)) *ERC20_Decimals_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *ERC20_Decimals_Call) Return(_a0 uint8) *ERC20_Decimals_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ERC20_Decimals_Call) RunAndReturn(run func(context.Context) uint8) *ERC20_Decimals_Call { + _c.Call.Return(run) + return _c +} + +// Name provides a mock function with given fields: ctx +func (_m *ERC20) Name(ctx context.Context) string { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Name") + } + + var r0 string + if rf, ok := ret.Get(0).(func(context.Context) string); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// ERC20_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name' +type ERC20_Name_Call struct { + *mock.Call +} + +// Name is a helper method to define mock.On call +// - ctx context.Context +func (_e *ERC20_Expecter) Name(ctx interface{}) *ERC20_Name_Call { + return &ERC20_Name_Call{Call: _e.mock.On("Name", ctx)} +} + +func (_c *ERC20_Name_Call) Run(run func(ctx context.Context)) *ERC20_Name_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *ERC20_Name_Call) Return(_a0 string) *ERC20_Name_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ERC20_Name_Call) RunAndReturn(run func(context.Context) string) *ERC20_Name_Call { + _c.Call.Return(run) + return _c +} + +// Symbol provides a mock function with given fields: ctx +func (_m *ERC20) Symbol(ctx context.Context) string { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Symbol") + } + + var r0 string + if rf, ok := ret.Get(0).(func(context.Context) string); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// ERC20_Symbol_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Symbol' +type ERC20_Symbol_Call struct { + *mock.Call +} + +// Symbol is a helper method to define mock.On call +// - ctx context.Context +func (_e *ERC20_Expecter) Symbol(ctx interface{}) *ERC20_Symbol_Call { + return &ERC20_Symbol_Call{Call: _e.mock.On("Symbol", ctx)} +} + +func (_c *ERC20_Symbol_Call) Run(run func(ctx context.Context)) *ERC20_Symbol_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *ERC20_Symbol_Call) Return(_a0 string) *ERC20_Symbol_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ERC20_Symbol_Call) RunAndReturn(run func(context.Context) string) *ERC20_Symbol_Call { + _c.Call.Return(run) + return _c +} + +// TotalSupply provides a mock function with given fields: ctx +func (_m *ERC20) TotalSupply(ctx context.Context) big.Int { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for TotalSupply") + } + + var r0 big.Int + if rf, ok := ret.Get(0).(func(context.Context) big.Int); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(big.Int) + } + + return r0 +} + +// ERC20_TotalSupply_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TotalSupply' +type ERC20_TotalSupply_Call struct { + *mock.Call +} + +// TotalSupply is a helper method to define mock.On call +// - ctx context.Context +func (_e *ERC20_Expecter) TotalSupply(ctx interface{}) *ERC20_TotalSupply_Call { + return &ERC20_TotalSupply_Call{Call: _e.mock.On("TotalSupply", ctx)} +} + +func (_c *ERC20_TotalSupply_Call) Run(run func(ctx context.Context)) *ERC20_TotalSupply_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *ERC20_TotalSupply_Call) Return(_a0 big.Int) *ERC20_TotalSupply_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ERC20_TotalSupply_Call) RunAndReturn(run func(context.Context) big.Int) *ERC20_TotalSupply_Call { + _c.Call.Return(run) + return _c +} + +// TransferFrom provides a mock function with given fields: ctx, idempotencyKey, from, to, amount +func (_m *ERC20) TransferFrom(ctx context.Context, idempotencyKey string, from common.Address, to common.Address, amount big.Int) error { + ret := _m.Called(ctx, idempotencyKey, from, to, amount) + + if len(ret) == 0 { + panic("no return value specified for TransferFrom") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, common.Address, common.Address, big.Int) error); ok { + r0 = rf(ctx, idempotencyKey, from, to, amount) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ERC20_TransferFrom_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TransferFrom' +type ERC20_TransferFrom_Call struct { + *mock.Call +} + +// TransferFrom is a helper method to define mock.On call +// - ctx context.Context +// - idempotencyKey string +// - from common.Address +// - to common.Address +// - amount big.Int +func (_e *ERC20_Expecter) TransferFrom(ctx interface{}, idempotencyKey interface{}, from interface{}, to interface{}, amount interface{}) *ERC20_TransferFrom_Call { + return &ERC20_TransferFrom_Call{Call: _e.mock.On("TransferFrom", ctx, idempotencyKey, from, to, amount)} +} + +func (_c *ERC20_TransferFrom_Call) Run(run func(ctx context.Context, idempotencyKey string, from common.Address, to common.Address, amount big.Int)) *ERC20_TransferFrom_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(common.Address), args[3].(common.Address), args[4].(big.Int)) + }) + return _c +} + +func (_c *ERC20_TransferFrom_Call) Return(_a0 error) *ERC20_TransferFrom_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ERC20_TransferFrom_Call) RunAndReturn(run func(context.Context, string, common.Address, common.Address, big.Int) error) *ERC20_TransferFrom_Call { + _c.Call.Return(run) + return _c +} + +// NewERC20 creates a new instance of ERC20. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewERC20(t interface { + mock.TestingT + Cleanup(func()) +}) *ERC20 { + mock := &ERC20{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/ethrpc/submitter/mocks/mock_store.go b/pkg/ethrpc/submitter/mocks/mock_store.go new file mode 100644 index 00000000..e0461db9 --- /dev/null +++ b/pkg/ethrpc/submitter/mocks/mock_store.go @@ -0,0 +1,192 @@ +// Code generated by mockery v2.53.6. DO NOT EDIT. + +package mocks + +import ( + context "context" + + ethrpc "github.com/chainsafe/canton-middleware/pkg/ethrpc" + mock "github.com/stretchr/testify/mock" +) + +// Store is an autogenerated mock type for the Store type +type Store struct { + mock.Mock +} + +type Store_Expecter struct { + mock *mock.Mock +} + +func (_m *Store) EXPECT() *Store_Expecter { + return &Store_Expecter{mock: &_m.Mock} +} + +// CompleteMempoolEntry provides a mock function with given fields: ctx, txHash +func (_m *Store) CompleteMempoolEntry(ctx context.Context, txHash []byte) error { + ret := _m.Called(ctx, txHash) + + if len(ret) == 0 { + panic("no return value specified for CompleteMempoolEntry") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []byte) error); ok { + r0 = rf(ctx, txHash) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_CompleteMempoolEntry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CompleteMempoolEntry' +type Store_CompleteMempoolEntry_Call struct { + *mock.Call +} + +// CompleteMempoolEntry is a helper method to define mock.On call +// - ctx context.Context +// - txHash []byte +func (_e *Store_Expecter) CompleteMempoolEntry(ctx interface{}, txHash interface{}) *Store_CompleteMempoolEntry_Call { + return &Store_CompleteMempoolEntry_Call{Call: _e.mock.On("CompleteMempoolEntry", ctx, txHash)} +} + +func (_c *Store_CompleteMempoolEntry_Call) Run(run func(ctx context.Context, txHash []byte)) *Store_CompleteMempoolEntry_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]byte)) + }) + return _c +} + +func (_c *Store_CompleteMempoolEntry_Call) Return(_a0 error) *Store_CompleteMempoolEntry_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_CompleteMempoolEntry_Call) RunAndReturn(run func(context.Context, []byte) error) *Store_CompleteMempoolEntry_Call { + _c.Call.Return(run) + return _c +} + +// FailMempoolEntry provides a mock function with given fields: ctx, txHash, errMsg +func (_m *Store) FailMempoolEntry(ctx context.Context, txHash []byte, errMsg string) error { + ret := _m.Called(ctx, txHash, errMsg) + + if len(ret) == 0 { + panic("no return value specified for FailMempoolEntry") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []byte, string) error); ok { + r0 = rf(ctx, txHash, errMsg) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_FailMempoolEntry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FailMempoolEntry' +type Store_FailMempoolEntry_Call struct { + *mock.Call +} + +// FailMempoolEntry is a helper method to define mock.On call +// - ctx context.Context +// - txHash []byte +// - errMsg string +func (_e *Store_Expecter) FailMempoolEntry(ctx interface{}, txHash interface{}, errMsg interface{}) *Store_FailMempoolEntry_Call { + return &Store_FailMempoolEntry_Call{Call: _e.mock.On("FailMempoolEntry", ctx, txHash, errMsg)} +} + +func (_c *Store_FailMempoolEntry_Call) Run(run func(ctx context.Context, txHash []byte, errMsg string)) *Store_FailMempoolEntry_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]byte), args[2].(string)) + }) + return _c +} + +func (_c *Store_FailMempoolEntry_Call) Return(_a0 error) *Store_FailMempoolEntry_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_FailMempoolEntry_Call) RunAndReturn(run func(context.Context, []byte, string) error) *Store_FailMempoolEntry_Call { + _c.Call.Return(run) + return _c +} + +// GetMempoolEntriesByStatus provides a mock function with given fields: ctx, status, limit +func (_m *Store) GetMempoolEntriesByStatus(ctx context.Context, status ethrpc.MempoolStatus, limit int) ([]ethrpc.MempoolEntry, error) { + ret := _m.Called(ctx, status, limit) + + if len(ret) == 0 { + panic("no return value specified for GetMempoolEntriesByStatus") + } + + var r0 []ethrpc.MempoolEntry + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, ethrpc.MempoolStatus, int) ([]ethrpc.MempoolEntry, error)); ok { + return rf(ctx, status, limit) + } + if rf, ok := ret.Get(0).(func(context.Context, ethrpc.MempoolStatus, int) []ethrpc.MempoolEntry); ok { + r0 = rf(ctx, status, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]ethrpc.MempoolEntry) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, ethrpc.MempoolStatus, int) error); ok { + r1 = rf(ctx, status, limit) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Store_GetMempoolEntriesByStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMempoolEntriesByStatus' +type Store_GetMempoolEntriesByStatus_Call struct { + *mock.Call +} + +// GetMempoolEntriesByStatus is a helper method to define mock.On call +// - ctx context.Context +// - status ethrpc.MempoolStatus +// - limit int +func (_e *Store_Expecter) GetMempoolEntriesByStatus(ctx interface{}, status interface{}, limit interface{}) *Store_GetMempoolEntriesByStatus_Call { + return &Store_GetMempoolEntriesByStatus_Call{Call: _e.mock.On("GetMempoolEntriesByStatus", ctx, status, limit)} +} + +func (_c *Store_GetMempoolEntriesByStatus_Call) Run(run func(ctx context.Context, status ethrpc.MempoolStatus, limit int)) *Store_GetMempoolEntriesByStatus_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(ethrpc.MempoolStatus), args[2].(int)) + }) + return _c +} + +func (_c *Store_GetMempoolEntriesByStatus_Call) Return(_a0 []ethrpc.MempoolEntry, _a1 error) *Store_GetMempoolEntriesByStatus_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Store_GetMempoolEntriesByStatus_Call) RunAndReturn(run func(context.Context, ethrpc.MempoolStatus, int) ([]ethrpc.MempoolEntry, error)) *Store_GetMempoolEntriesByStatus_Call { + _c.Call.Return(run) + return _c +} + +// NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewStore(t interface { + mock.TestingT + Cleanup(func()) +}) *Store { + mock := &Store{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/ethrpc/submitter/mocks/mock_token_service.go b/pkg/ethrpc/submitter/mocks/mock_token_service.go new file mode 100644 index 00000000..4100b269 --- /dev/null +++ b/pkg/ethrpc/submitter/mocks/mock_token_service.go @@ -0,0 +1,95 @@ +// Code generated by mockery v2.53.6. DO NOT EDIT. + +package mocks + +import ( + common "github.com/ethereum/go-ethereum/common" + mock "github.com/stretchr/testify/mock" + + token "github.com/chainsafe/canton-middleware/pkg/token" +) + +// TokenService is an autogenerated mock type for the TokenService type +type TokenService struct { + mock.Mock +} + +type TokenService_Expecter struct { + mock *mock.Mock +} + +func (_m *TokenService) EXPECT() *TokenService_Expecter { + return &TokenService_Expecter{mock: &_m.Mock} +} + +// ERC20 provides a mock function with given fields: address +func (_m *TokenService) ERC20(address common.Address) (token.ERC20, error) { + ret := _m.Called(address) + + if len(ret) == 0 { + panic("no return value specified for ERC20") + } + + var r0 token.ERC20 + var r1 error + if rf, ok := ret.Get(0).(func(common.Address) (token.ERC20, error)); ok { + return rf(address) + } + if rf, ok := ret.Get(0).(func(common.Address) token.ERC20); ok { + r0 = rf(address) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(token.ERC20) + } + } + + if rf, ok := ret.Get(1).(func(common.Address) error); ok { + r1 = rf(address) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// TokenService_ERC20_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ERC20' +type TokenService_ERC20_Call struct { + *mock.Call +} + +// ERC20 is a helper method to define mock.On call +// - address common.Address +func (_e *TokenService_Expecter) ERC20(address interface{}) *TokenService_ERC20_Call { + return &TokenService_ERC20_Call{Call: _e.mock.On("ERC20", address)} +} + +func (_c *TokenService_ERC20_Call) Run(run func(address common.Address)) *TokenService_ERC20_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(common.Address)) + }) + return _c +} + +func (_c *TokenService_ERC20_Call) Return(_a0 token.ERC20, _a1 error) *TokenService_ERC20_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *TokenService_ERC20_Call) RunAndReturn(run func(common.Address) (token.ERC20, error)) *TokenService_ERC20_Call { + _c.Call.Return(run) + return _c +} + +// NewTokenService creates a new instance of TokenService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewTokenService(t interface { + mock.TestingT + Cleanup(func()) +}) *TokenService { + mock := &TokenService{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/ethrpc/submitter/submitter.go b/pkg/ethrpc/submitter/submitter.go new file mode 100644 index 00000000..b200907e --- /dev/null +++ b/pkg/ethrpc/submitter/submitter.go @@ -0,0 +1,251 @@ +// Package submitter drains pending mempool entries by submitting the +// corresponding ERC-20 transfer to Canton. It is the asynchronous counterpart +// to ethrpc.service.SendRawTransaction: that handler records intent and +// returns the tx hash immediately, and this worker transitions each entry to +// completed (Canton accepted the transfer) or failed (Canton rejected it). +// The miner then seals the terminal entry into a synthetic EVM block, so +// eth_getTransactionReceipt returns a status=0x1 / 0x0 receipt as usual. +package submitter + +import ( + "context" + "errors" + "fmt" + "math/big" + "sync" + "time" + + apperr "github.com/chainsafe/canton-middleware/pkg/app/errors" + "github.com/chainsafe/canton-middleware/pkg/ethrpc" + "github.com/chainsafe/canton-middleware/pkg/token" + + "github.com/ethereum/go-ethereum/common" + "go.uber.org/zap" +) + +const ( + // defaultConcurrency is the fallback when New is called with a non-positive + // concurrency value, so misconfigurations don't silently disable the worker. + defaultConcurrency = 10 + + // cantonCallTimeout bounds a single Canton transfer call. Canton commits + // typically land in 5-15s; 60s leaves generous headroom for the slow tail + // while still ensuring a hung gRPC call can't park a worker slot + // indefinitely. Deliberately not configurable — the value should be a + // property of the Canton SLO, not per-deployment tuning. + cantonCallTimeout = 60 * time.Second +) + +// Store is the narrow data-access interface the submitter needs. +// +//go:generate mockery --name Store --output mocks --outpkg mocks --filename mock_store.go --with-expecter +type Store interface { + // GetMempoolEntriesByStatus returns up to limit entries with the given + // status, ordered by insertion ID. A limit of 0 means unlimited; the + // submitter passes its batch size so a backlog never loads the entire + // pending queue into memory. + GetMempoolEntriesByStatus(ctx context.Context, status ethrpc.MempoolStatus, limit int) ([]ethrpc.MempoolEntry, error) + CompleteMempoolEntry(ctx context.Context, txHash []byte) error + FailMempoolEntry(ctx context.Context, txHash []byte, errMsg string) error +} + +// TokenService is the narrow token-service interface needed for Canton transfers. +// +//go:generate mockery --name TokenService --output mocks --outpkg mocks --filename mock_token_service.go --with-expecter +type TokenService interface { + ERC20(address common.Address) (token.ERC20, error) +} + +// Submitter polls pending mempool entries and pushes them through Canton. +type Submitter struct { + store Store + tokenSvc TokenService + interval time.Duration + batchSize int + concurrency int + logger *zap.Logger +} + +// New creates a Submitter. +// +// - interval is the tick spacing between drains. +// - batchSize caps how many pending entries are fetched per tick (0 = +// unlimited). Bounded so a backlog never loads the entire pending queue +// into memory; the next tick picks up whatever is left. +// - concurrency is the worker-pool width: how many Canton transfers run in +// parallel within one tick (<= 0 defaults to defaultConcurrency so a +// misconfiguration never silently disables the worker). +// +// The per-call Canton timeout (cantonCallTimeout) is fixed at package level +// — it's a property of the Canton SLO, not a per-deployment knob. +func New( + store Store, + tokenSvc TokenService, + interval time.Duration, + batchSize, concurrency int, + logger *zap.Logger, +) *Submitter { + if concurrency <= 0 { + concurrency = defaultConcurrency + } + return &Submitter{ + store: store, + tokenSvc: tokenSvc, + interval: interval, + batchSize: batchSize, + concurrency: concurrency, + logger: logger, + } +} + +// Start runs the submitter loop until ctx is canceled. +func (s *Submitter) Start(ctx context.Context) { + ticker := time.NewTicker(s.interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := s.drain(ctx); err != nil { + s.logger.Error("ethrpc submitter: drain failed", zap.Error(err)) + } + } + } +} + +// drain processes a bounded batch of pending entries in parallel using a +// worker-pool of size s.concurrency. Each entry runs in its own goroutine with +// its own timed context (see process). drain returns only after every spawned +// goroutine finishes so two ticks never overlap on the same entry — the +// for-select loop in Start already serializes ticks, but drain doing wg.Wait() +// makes the contract explicit and survives any future refactor. +// +// On ctx cancellation: drain stops launching new goroutines, but lets in-flight +// ones finish (their child ctxs are derived from ctx, so they unwind quickly). +func (s *Submitter) drain(ctx context.Context) error { + entries, err := s.store.GetMempoolEntriesByStatus(ctx, ethrpc.MempoolPending, s.batchSize) + if err != nil { + return fmt.Errorf("list pending mempool entries: %w", err) + } + if len(entries) == 0 { + return nil + } + + // Buffered channel acts as a counting semaphore — at most s.concurrency + // goroutines can hold a slot at once. + sem := make(chan struct{}, s.concurrency) + var wg sync.WaitGroup + for i := range entries { + // Two-stage cancellation check: the explicit ctx.Err() up top makes + // cancellation deterministic when both channels are ready (Go's + // select would otherwise pick randomly); the select inside handles + // the case where ctx is canceled while we're blocked on a saturated + // pool. + if ctx.Err() != nil { + break + } + entry := &entries[i] + select { + case <-ctx.Done(): + wg.Wait() + return nil + case sem <- struct{}{}: + } + wg.Add(1) + go func() { + defer wg.Done() + defer func() { <-sem }() + s.process(ctx, entry) + }() + } + wg.Wait() + return nil +} + +// process submits a single pending entry to Canton, recording the outcome on +// the mempool row. Permanent (client-side) failures are recorded as failed so +// they reach the receipt; transient failures (network, gRPC unavailable) — and +// timeouts — leave the entry pending for retry on the next tick. Canton's +// command-id idempotency makes the retry safe. +// +// Each invocation runs under a per-entry timed context (cantonCallTimeout) so +// a single hung gRPC call cannot park a worker slot indefinitely. The same +// timed ctx is used for the mempool update too — if Canton commits at second +// 59.9 and the update can't complete before second 60, the entry simply stays +// pending and gets reconciled on the next tick (the Canton commit itself is +// preserved by Canton's idempotency contract). +func (s *Submitter) process(parent context.Context, entry *ethrpc.MempoolEntry) { + ctx, cancel := context.WithTimeout(parent, cantonCallTimeout) + defer cancel() + + contractAddr := common.HexToAddress(entry.ContractAddress) + fromAddr := common.HexToAddress(entry.FromAddress) + toAddr := common.HexToAddress(entry.RecipientAddress) + amount := new(big.Int).SetBytes(entry.AmountData) + txHash := common.BytesToHash(entry.TxHash) + + erc20, err := s.tokenSvc.ERC20(contractAddr) + if err != nil { + // Contract whitelist is validated synchronously in SendRawTransaction, + // so reaching here means config drifted under us. Mark failed so the + // client sees the error via the receipt rather than polling forever. + s.failEntry(ctx, entry, fmt.Errorf("contract not supported: %w", err)) + return + } + + transferErr := erc20.TransferFrom(ctx, txHash.Hex(), fromAddr, toAddr, *amount) + if transferErr == nil { + if completeErr := s.store.CompleteMempoolEntry(ctx, entry.TxHash); completeErr != nil { + s.logger.Error("ethrpc submitter: complete mempool entry failed", + zap.String("tx", txHash.Hex()), + zap.Error(completeErr), + ) + } + return + } + + if isPermanentError(transferErr) { + s.failEntry(ctx, entry, transferErr) + return + } + // Transient (network error, gRPC unavailable, ctx deadline exceeded): + // leave as pending. Idempotent retry on the next tick. + s.logger.Warn("ethrpc submitter: transient transfer failure, will retry", + zap.String("tx", txHash.Hex()), + zap.Error(transferErr), + ) +} + +func (s *Submitter) failEntry(ctx context.Context, entry *ethrpc.MempoolEntry, cause error) { + if updateErr := s.store.FailMempoolEntry(ctx, entry.TxHash, cause.Error()); updateErr != nil { + s.logger.Error("ethrpc submitter: fail mempool entry update failed", + zap.String("tx", common.BytesToHash(entry.TxHash).Hex()), + zap.Error(updateErr), + ) + } +} + +// isPermanentError returns true when err is a categorized application error +// that would not benefit from retry — input validation, unsupported method, +// not-found, forbidden, conflict, gone. Dependency, recovering, timeout, and +// generic errors are treated as transient. +func isPermanentError(err error) bool { + var svcErr *apperr.ServiceError + if !errors.As(err, &svcErr) { + return false + } + switch svcErr.Category { + case apperr.CategoryDataError, + apperr.CategoryNotSupported, + apperr.CategoryUnauthorized, + apperr.CategoryForbidden, + apperr.CategoryResourceNotFound, + apperr.CategoryDataConflict, + apperr.CategoryGone: + return true + default: + return false + } +} diff --git a/pkg/ethrpc/submitter/submitter_test.go b/pkg/ethrpc/submitter/submitter_test.go new file mode 100644 index 00000000..6a356bfe --- /dev/null +++ b/pkg/ethrpc/submitter/submitter_test.go @@ -0,0 +1,437 @@ +package submitter + +import ( + "context" + "errors" + "math/big" + "sync/atomic" + "testing" + "time" + + apperr "github.com/chainsafe/canton-middleware/pkg/app/errors" + "github.com/chainsafe/canton-middleware/pkg/ethrpc" + "github.com/chainsafe/canton-middleware/pkg/ethrpc/submitter/mocks" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +const ( + testFrom = "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + testRecipient = "0xdddddddddddddddddddddddddddddddddddddddd" + testContract = "0xcccccccccccccccccccccccccccccccccccccccc" +) + +func samplePendingEntry(txHash byte, amount int64) ethrpc.MempoolEntry { + return ethrpc.MempoolEntry{ + ID: int64(txHash), + TxHash: []byte{txHash}, + FromAddress: testFrom, + ContractAddress: testContract, + RecipientAddress: testRecipient, + Nonce: uint64(txHash), + Input: []byte{0xa9, 0x05, 0x9c, 0xbb}, + AmountData: big.NewInt(amount).Bytes(), + Status: ethrpc.MempoolPending, + } +} + +func newTestSubmitter(store Store, tokenSvc TokenService) *Submitter { + // Concurrency 1 keeps tests deterministic for assertions that depend on + // ordering; tests that exercise the worker pool override New directly. + return New(store, tokenSvc, 10*time.Millisecond, 0, 1, zap.NewNop()) +} + +// ─── drain() ───────────────────────────────────────────────────────────────── + +func TestDrain_NoEntries(t *testing.T) { + store := mocks.NewStore(t) + store.EXPECT().GetMempoolEntriesByStatus(mock.Anything, ethrpc.MempoolPending, mock.Anything). + Return(nil, nil) + + s := newTestSubmitter(store, mocks.NewTokenService(t)) + require.NoError(t, s.drain(context.Background())) +} + +func TestDrain_GetEntriesError(t *testing.T) { + store := mocks.NewStore(t) + store.EXPECT().GetMempoolEntriesByStatus(mock.Anything, ethrpc.MempoolPending, mock.Anything). + Return(nil, errors.New("db down")) + + s := newTestSubmitter(store, mocks.NewTokenService(t)) + err := s.drain(context.Background()) + require.Error(t, err) + assert.Contains(t, err.Error(), "db down") +} + +// ─── process(): success path ───────────────────────────────────────────────── + +func TestProcess_Success_MarksCompleted(t *testing.T) { + entry := samplePendingEntry(0x01, 42) + + erc20 := mocks.NewERC20(t) + erc20.EXPECT(). + TransferFrom(mock.Anything, mock.Anything, common.HexToAddress(testFrom), common.HexToAddress(testRecipient), mock.Anything). + Return(nil) + + tokenSvc := mocks.NewTokenService(t) + tokenSvc.EXPECT().ERC20(common.HexToAddress(testContract)).Return(erc20, nil) + + store := mocks.NewStore(t) + store.EXPECT().CompleteMempoolEntry(mock.Anything, entry.TxHash).Return(nil) + + s := newTestSubmitter(store, tokenSvc) + s.process(context.Background(), &entry) +} + +// ─── process(): permanent failure ──────────────────────────────────────────── + +func TestProcess_PermanentFailure_MarksFailed(t *testing.T) { + entry := samplePendingEntry(0x02, 100) + + transferErr := apperr.BadRequestError(errors.New("user not found"), "failed to get sender") + + erc20 := mocks.NewERC20(t) + erc20.EXPECT(). + TransferFrom(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(transferErr) + + tokenSvc := mocks.NewTokenService(t) + tokenSvc.EXPECT().ERC20(common.HexToAddress(testContract)).Return(erc20, nil) + + store := mocks.NewStore(t) + store.EXPECT().FailMempoolEntry(mock.Anything, entry.TxHash, mock.AnythingOfType("string")).Return(nil) + + s := newTestSubmitter(store, tokenSvc) + s.process(context.Background(), &entry) +} + +// ─── process(): transient failure ──────────────────────────────────────────── + +func TestProcess_TransientFailure_LeavesPending(t *testing.T) { + entry := samplePendingEntry(0x03, 50) + + transferErr := apperr.DependencyError(errors.New("gRPC unavailable"), "canton transfer failed") + + erc20 := mocks.NewERC20(t) + erc20.EXPECT(). + TransferFrom(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(transferErr) + + tokenSvc := mocks.NewTokenService(t) + tokenSvc.EXPECT().ERC20(common.HexToAddress(testContract)).Return(erc20, nil) + + store := mocks.NewStore(t) + // No Complete or Fail calls — store must not be touched on transient errors. + + s := newTestSubmitter(store, tokenSvc) + s.process(context.Background(), &entry) + + // Cross-check by AssertExpectations (called via NewStore cleanup). + store.AssertNotCalled(t, "CompleteMempoolEntry", mock.Anything, mock.Anything) + store.AssertNotCalled(t, "FailMempoolEntry", mock.Anything, mock.Anything, mock.Anything) +} + +// Uncategorized error must also be treated as transient — the contract is +// "permanent only if explicitly categorized as a client-side problem". +func TestProcess_UncategorizedError_LeavesPending(t *testing.T) { + entry := samplePendingEntry(0x04, 75) + + erc20 := mocks.NewERC20(t) + erc20.EXPECT(). + TransferFrom(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(errors.New("connection refused")) + + tokenSvc := mocks.NewTokenService(t) + tokenSvc.EXPECT().ERC20(common.HexToAddress(testContract)).Return(erc20, nil) + + store := mocks.NewStore(t) + + s := newTestSubmitter(store, tokenSvc) + s.process(context.Background(), &entry) + + store.AssertNotCalled(t, "CompleteMempoolEntry", mock.Anything, mock.Anything) + store.AssertNotCalled(t, "FailMempoolEntry", mock.Anything, mock.Anything, mock.Anything) +} + +// ─── process(): contract whitelist drift ───────────────────────────────────── + +func TestProcess_ContractNotSupported_MarksFailed(t *testing.T) { + entry := samplePendingEntry(0x05, 1) + + tokenSvc := mocks.NewTokenService(t) + tokenSvc.EXPECT().ERC20(common.HexToAddress(testContract)).Return(nil, errors.New("unknown token")) + + store := mocks.NewStore(t) + store.EXPECT().FailMempoolEntry(mock.Anything, entry.TxHash, mock.MatchedBy(func(msg string) bool { + return msg != "" + })).Return(nil) + + s := newTestSubmitter(store, tokenSvc) + s.process(context.Background(), &entry) +} + +// ─── drain(): batch size is pushed to the store as the SQL limit ───────────── + +func TestDrain_BatchSizePushedToStore(t *testing.T) { + // Submitter must forward its batch size to the store so the LIMIT is + // applied in SQL (preventing a backlog from being loaded into memory). + const batchSize = 2 + + store := mocks.NewStore(t) + store.EXPECT().GetMempoolEntriesByStatus(mock.Anything, ethrpc.MempoolPending, batchSize). + Return([]ethrpc.MempoolEntry{ + samplePendingEntry(0x01, 1), + samplePendingEntry(0x02, 2), + }, nil) + store.EXPECT().CompleteMempoolEntry(mock.Anything, mock.Anything).Return(nil).Times(2) + + erc20 := mocks.NewERC20(t) + erc20.EXPECT(). + TransferFrom(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil).Times(2) + + tokenSvc := mocks.NewTokenService(t) + tokenSvc.EXPECT().ERC20(mock.Anything).Return(erc20, nil).Times(2) + + s := New(store, tokenSvc, time.Second, batchSize, 1, zap.NewNop()) + require.NoError(t, s.drain(context.Background())) +} + +// ─── drain(): ctx cancellation stops mid-batch ─────────────────────────────── + +func TestDrain_ContextCanceledMidBatch(t *testing.T) { + entries := []ethrpc.MempoolEntry{ + samplePendingEntry(0x01, 1), + samplePendingEntry(0x02, 2), + } + + store := mocks.NewStore(t) + store.EXPECT().GetMempoolEntriesByStatus(mock.Anything, ethrpc.MempoolPending, mock.Anything).Return(entries, nil) + + tokenSvc := mocks.NewTokenService(t) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // already canceled + + s := newTestSubmitter(store, tokenSvc) + require.NoError(t, s.drain(ctx)) + + // No ERC20 lookups should occur because the loop exits before processing. + tokenSvc.AssertNotCalled(t, "ERC20", mock.Anything) +} + +// ─── Start() lifecycle ─────────────────────────────────────────────────────── + +func TestStart_StopsOnContextCancel(t *testing.T) { + store := mocks.NewStore(t) + store.EXPECT().GetMempoolEntriesByStatus(mock.Anything, ethrpc.MempoolPending, mock.Anything).Return(nil, nil).Maybe() + + tokenSvc := mocks.NewTokenService(t) + + s := New(store, tokenSvc, 5*time.Millisecond, 0, 1, zap.NewNop()) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + s.Start(ctx) + close(done) + }() + + time.Sleep(30 * time.Millisecond) + cancel() + + select { + case <-done: + // Start returned. + case <-time.After(2 * time.Second): + t.Fatal("Start did not return after context cancellation") + } +} + +// ─── concurrency ───────────────────────────────────────────────────────────── + +// TestDrain_RunsEntriesConcurrently verifies that the worker pool actually +// processes entries in parallel. We block each TransferFrom on a barrier that +// only releases once we observe `concurrency` goroutines in flight at the same +// time — if drain were sequential, the test would deadlock and time out. +func TestDrain_RunsEntriesConcurrently(t *testing.T) { + const concurrency = 3 + entries := make([]ethrpc.MempoolEntry, concurrency) + for i := range entries { + entries[i] = samplePendingEntry(byte(i+1), int64(i+1)) + } + + store := mocks.NewStore(t) + store.EXPECT().GetMempoolEntriesByStatus(mock.Anything, ethrpc.MempoolPending, mock.Anything).Return(entries, nil) + store.EXPECT().CompleteMempoolEntry(mock.Anything, mock.Anything).Return(nil).Times(concurrency) + + // Counts goroutines that have entered TransferFrom; release() unblocks + // them once we've seen all `concurrency` arrive simultaneously. + var inFlight int32 + allArrived := make(chan struct{}) + release := make(chan struct{}) + erc20 := mocks.NewERC20(t) + erc20.EXPECT(). + TransferFrom(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, _ string, _, _ common.Address, _ big.Int) error { + if atomic.AddInt32(&inFlight, 1) == int32(concurrency) { + close(allArrived) + } + <-release + return nil + }).Times(concurrency) + + tokenSvc := mocks.NewTokenService(t) + tokenSvc.EXPECT().ERC20(mock.Anything).Return(erc20, nil).Times(concurrency) + + s := New(store, tokenSvc, time.Second, 0, concurrency, zap.NewNop()) + + drained := make(chan error, 1) + go func() { drained <- s.drain(context.Background()) }() + + // All `concurrency` workers must reach TransferFrom before any returns. + select { + case <-allArrived: + case <-time.After(2 * time.Second): + t.Fatalf("only %d goroutines reached TransferFrom; pool is not concurrent", + atomic.LoadInt32(&inFlight)) + } + close(release) + + select { + case err := <-drained: + require.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal("drain did not return after releasing workers") + } +} + +// TestDrain_RespectsConcurrencyCap verifies that no more than `concurrency` +// TransferFrom calls run simultaneously even when the batch has more entries +// than the pool size. Track peak in-flight count and assert it never exceeds +// the cap. +func TestDrain_RespectsConcurrencyCap(t *testing.T) { + const ( + concurrency = 2 + batch = 6 + ) + entries := make([]ethrpc.MempoolEntry, batch) + for i := range entries { + entries[i] = samplePendingEntry(byte(i+1), int64(i+1)) + } + + store := mocks.NewStore(t) + store.EXPECT().GetMempoolEntriesByStatus(mock.Anything, ethrpc.MempoolPending, mock.Anything).Return(entries, nil) + store.EXPECT().CompleteMempoolEntry(mock.Anything, mock.Anything).Return(nil).Times(batch) + + var ( + inFlight int32 + peak int32 + ) + erc20 := mocks.NewERC20(t) + erc20.EXPECT(). + TransferFrom(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, _ string, _, _ common.Address, _ big.Int) error { + cur := atomic.AddInt32(&inFlight, 1) + // Track the high-water mark of concurrent workers. + for { + p := atomic.LoadInt32(&peak) + if cur <= p || atomic.CompareAndSwapInt32(&peak, p, cur) { + break + } + } + time.Sleep(20 * time.Millisecond) + atomic.AddInt32(&inFlight, -1) + return nil + }).Times(batch) + + tokenSvc := mocks.NewTokenService(t) + tokenSvc.EXPECT().ERC20(mock.Anything).Return(erc20, nil).Times(batch) + + s := New(store, tokenSvc, time.Second, 0, concurrency, zap.NewNop()) + require.NoError(t, s.drain(context.Background())) + + require.LessOrEqual(t, int(atomic.LoadInt32(&peak)), concurrency, + "peak concurrent workers exceeded the cap") +} + +// New() must coerce a non-positive concurrency to the package default rather +// than silently disabling the pool — a zero-buffered semaphore would deadlock. +func TestNew_NonPositiveConcurrencyDefaults(t *testing.T) { + s := New(nil, nil, time.Second, 0, 0, zap.NewNop()) + require.Equal(t, defaultConcurrency, s.concurrency) + + s = New(nil, nil, time.Second, 0, -5, zap.NewNop()) + require.Equal(t, defaultConcurrency, s.concurrency) +} + +// ─── per-process timed context ─────────────────────────────────────────────── + +// TestProcess_CantonContextDone_LeavesPending verifies that a Canton call +// returning ctx-derived cancellation is treated as transient — entry stays +// pending so the next tick retries. The hardcoded cantonCallTimeout is too +// long to wait on in tests, but its derived ctx still inherits cancellation +// from the parent, so we cancel the parent and let propagation drive the +// timeout path with no real delay. +func TestProcess_CantonContextDone_LeavesPending(t *testing.T) { + entry := samplePendingEntry(0x06, 1) + + erc20 := mocks.NewERC20(t) + erc20.EXPECT(). + TransferFrom(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, _ string, _, _ common.Address, _ big.Int) error { + // Block until the per-process ctx fires (propagated from parent). + <-ctx.Done() + return ctx.Err() + }) + + tokenSvc := mocks.NewTokenService(t) + tokenSvc.EXPECT().ERC20(common.HexToAddress(testContract)).Return(erc20, nil) + + store := mocks.NewStore(t) + + ctx, cancel := context.WithCancel(context.Background()) + // Fire cancellation after a brief moment so TransferFrom is definitely + // blocked on <-ctx.Done() rather than racing with the cancel. + go func() { + time.Sleep(20 * time.Millisecond) + cancel() + }() + + s := New(store, tokenSvc, time.Second, 0, 1, zap.NewNop()) + s.process(ctx, &entry) + + // Cancellation/timeout is transient → no Complete/Fail call. + store.AssertNotCalled(t, "CompleteMempoolEntry", mock.Anything, mock.Anything) + store.AssertNotCalled(t, "FailMempoolEntry", mock.Anything, mock.Anything, mock.Anything) +} + +// ─── isPermanentError ──────────────────────────────────────────────────────── + +func TestIsPermanentError(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + {"nil is not permanent", nil, false}, + {"plain error is transient", errors.New("oops"), false}, + {"BadRequest is permanent", apperr.BadRequestError(errors.New(""), "x"), true}, + {"NotSupported is permanent", apperr.NotSupportedError(errors.New(""), "x"), true}, + {"Forbidden is permanent", apperr.ForbiddenError(errors.New(""), "x"), true}, + {"NotFound is permanent", apperr.ResourceNotFoundError(errors.New(""), "x"), true}, + {"Conflict is permanent", apperr.ConflictError(errors.New(""), "x"), true}, + {"Gone is permanent", apperr.GoneError(errors.New(""), "x"), true}, + {"Dependency is transient", apperr.DependencyError(errors.New(""), "x"), false}, + {"General is transient", apperr.GeneralError(errors.New("x")), false}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, isPermanentError(tc.err)) + }) + } +} diff --git a/pkg/ethrpc/types.go b/pkg/ethrpc/types.go index b87085f9..1f9bc36a 100644 --- a/pkg/ethrpc/types.go +++ b/pkg/ethrpc/types.go @@ -140,6 +140,13 @@ type RPCReceipt struct { Status hexutil.Uint64 `json:"status"` EffectiveGasPrice hexutil.Uint64 `json:"effectiveGasPrice"` Type hexutil.Uint64 `json:"type"` + // RevertReason is a non-standard, optional, human-readable failure cause + // surfaced for status=0 receipts. Real EVM nodes don't include this — the + // reason is encoded in revert data — but our Canton-backed receipts have + // no on-chain revert data to encode, so we expose the Canton error message + // directly. Standard Ethereum clients ignore unknown fields; tools that + // care (Loop wallet, our own scripts) can render the cause to the user. + RevertReason string `json:"revertReason,omitempty"` } // RPCTransaction represents a transaction in JSON-RPC format @@ -202,8 +209,8 @@ const ( MempoolMined MempoolStatus = "mined" // Included in a synthetic EVM block ) -// MempoolEntry is the intent log record written by SendRawTransaction -// and consumed by the miner goroutine. +// MempoolEntry is the intent log record written by SendRawTransaction, +// processed by the submitter, and consumed by the miner. type MempoolEntry struct { ID int64 TxHash []byte // EVM transaction hash @@ -213,8 +220,12 @@ type MempoolEntry struct { Nonce uint64 Input []byte // raw EVM calldata AmountData []byte // big.Int.Bytes() of the transfer amount; used in Transfer log - Status MempoolStatus - ErrorMessage string + // Status is the lifecycle state observed when this record was loaded. For + // entries returned by ClaimMempoolEntries it reflects the value the entry + // held immediately before being sealed (completed or failed), so the miner + // can synthesize the correct EVM transaction status (1 vs 0). + Status MempoolStatus + ErrorMessage string // populated for failed entries; surfaced via the receipt } // SyncStatus represents the syncing status response diff --git a/tests/e2e/devstack/dsl/helpers.go b/tests/e2e/devstack/dsl/helpers.go index 8f7d60bc..b31430a2 100644 --- a/tests/e2e/devstack/dsl/helpers.go +++ b/tests/e2e/devstack/dsl/helpers.go @@ -5,14 +5,18 @@ package dsl import ( "context" "encoding/hex" + "errors" "fmt" "math/big" "strings" "testing" "time" + ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + gethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" "github.com/chainsafe/canton-middleware/pkg/indexer" "github.com/chainsafe/canton-middleware/pkg/relayer" @@ -27,6 +31,11 @@ const ( indexerEventTimeout = 60 * time.Second ethBalanceTimeout = 120 * time.Second eventPageSize = 50 + // apiTxReceiptTimeout bounds the wait for an async sendRawTransaction + // to reach a terminal state. Canton commits typically land in 5-15s; the + // submitter ticks every 500ms; the miner ticks every 2s. 90s gives ample + // headroom while still catching genuinely stuck submissions. + apiTxReceiptTimeout = 90 * time.Second ) // WaitForRelayerReady polls until the relayer reports ready or the 60s timeout @@ -341,6 +350,84 @@ func AmountLT(amount, min string) bool { // amountGTE is the internal alias used by DSL polling helpers. func amountGTE(amount, min string) bool { return AmountGTE(amount, min) } +// WaitForAPITxReceipt polls the api-server's /eth JSON-RPC facade until the +// transaction at txHash reaches a terminal state, then asserts success. +// +// This helper exists because the api-server's eth_sendRawTransaction is +// asynchronous: the call returns the tx hash immediately after recording the +// pending intent, and the submitter worker subsequently drives the Canton +// transfer to completion (or failure). Standard Ethereum-tooling flow applies: +// submit, then poll eth_getTransactionReceipt. +// +// Behavior: +// - receipt with Status=1 → returns the receipt. +// - receipt with Status=0 → t.Fatalf, including the RevertReason from the +// receipt (we set this from the Canton error message in the miner). +// - no receipt yet → keeps polling until apiTxReceiptTimeout. +// +// Use this in any e2e test that submits a signed EVM transaction through +// `sys.APIServer.RPC().SendTransaction(...)` and needs to assert it actually +// landed on Canton — relying on WaitForAPIBalance alone hides failure +// reasons (it just times out without telling you why). +func (d *DSL) WaitForAPITxReceipt(ctx context.Context, t *testing.T, txHash common.Hash) *gethtypes.Receipt { + t.Helper() + if d.apiServer == nil { + t.Fatal("WaitForAPITxReceipt not available: APIServer shim not initialized") + return nil + } + rpc := d.apiServer.RPC() + deadline := time.Now().Add(apiTxReceiptTimeout) + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + var lastErr error + for time.Now().Before(deadline) { + receipt, err := rpc.TransactionReceipt(ctx, txHash) + switch { + case err == nil && receipt != nil: + if receipt.Status == gethtypes.ReceiptStatusSuccessful { + return receipt + } + // Try to surface the Canton failure cause. ethclient's standard + // Receipt type drops unknown fields, so we re-query via the raw + // RPC client to pick up the non-standard revertReason field. + t.Fatalf("WaitForAPITxReceipt: tx %s reverted (status=0)%s", + txHash.Hex(), formatRevertReason(ctx, d.apiServer.RPC(), txHash)) + return nil + case err != nil && !errors.Is(err, ethereum.NotFound): + // Genuine RPC failure, not just "tx not yet visible". Surface it + // instead of looping silently until the timeout. + lastErr = err + } + select { + case <-ctx.Done(): + t.Fatal("context canceled waiting for API tx receipt") + return nil + case <-ticker.C: + } + } + if lastErr != nil { + t.Fatalf("WaitForAPITxReceipt: timed out for tx %s: last RPC error: %v", txHash.Hex(), lastErr) + } + t.Fatalf("WaitForAPITxReceipt: timed out waiting for tx %s receipt", txHash.Hex()) + return nil +} + +// formatRevertReason fetches the receipt as a raw JSON map to pull out the +// non-standard revertReason field surfaced by our api-server for status=0 +// receipts. Best-effort: returns "" if anything goes wrong so we never mask +// the underlying Status=0 assertion failure. +func formatRevertReason(ctx context.Context, client *ethclient.Client, txHash common.Hash) string { + var raw map[string]any + if err := client.Client().CallContext(ctx, &raw, "eth_getTransactionReceipt", txHash); err != nil { + return "" + } + if reason, ok := raw["revertReason"].(string); ok && reason != "" { + return ": " + reason + } + return "" +} + // WaitForEthBalance polls the Anvil ERC-20 balance of ownerAddr for tokenAddr // until it is >= minWei, or the 120s timeout is reached. minWei is expressed in // the token's smallest unit (wei for 18-decimal tokens). Use this to confirm From 0a4948cbfd31a045bfd2c7225b81c2ed7e77266c Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Tue, 26 May 2026 10:49:07 +0600 Subject: [PATCH 2/2] fix(submitter): isolate mempool-status writes from Canton ctx Per Gemini review on #281: CompleteMempoolEntry / FailMempoolEntry were sharing the Canton-scoped ctx (60s deadline). A Canton call that nearly exhausted its budget would leave the DB write with an expired deadline, the row would stay pending, and a permanent failure would loop forever against a Canton that already rejected the transfer. Derive the DB-write ctx from `parent` instead, with its own dbWriteTimeout (10s). Extract a completeEntry helper to mirror failEntry. Two new tests pin the property: the ctx CompleteMempoolEntry / FailMempoolEntry receive is distinct from the Canton ctx and has at least 5s of budget left. --- pkg/ethrpc/submitter/submitter.go | 62 +++++++++++++-------- pkg/ethrpc/submitter/submitter_test.go | 74 ++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 21 deletions(-) diff --git a/pkg/ethrpc/submitter/submitter.go b/pkg/ethrpc/submitter/submitter.go index b200907e..3c723c18 100644 --- a/pkg/ethrpc/submitter/submitter.go +++ b/pkg/ethrpc/submitter/submitter.go @@ -34,6 +34,14 @@ const ( // indefinitely. Deliberately not configurable — the value should be a // property of the Canton SLO, not per-deployment tuning. cantonCallTimeout = 60 * time.Second + + // dbWriteTimeout bounds the mempool-status update that follows a Canton + // call. It's a fresh deadline derived from the drain ctx (not the Canton + // ctx) so a Canton call that just barely beats its 60s budget still has + // room to record the outcome — otherwise a permanent failure could leave + // the entry pending and the submitter would retry against a Canton that + // already rejected (or, worse, already accepted) the transfer. + dbWriteTimeout = 10 * time.Second ) // Store is the narrow data-access interface the submitter needs. @@ -170,16 +178,14 @@ func (s *Submitter) drain(ctx context.Context) error { // timeouts — leave the entry pending for retry on the next tick. Canton's // command-id idempotency makes the retry safe. // -// Each invocation runs under a per-entry timed context (cantonCallTimeout) so -// a single hung gRPC call cannot park a worker slot indefinitely. The same -// timed ctx is used for the mempool update too — if Canton commits at second -// 59.9 and the update can't complete before second 60, the entry simply stays -// pending and gets reconciled on the next tick (the Canton commit itself is -// preserved by Canton's idempotency contract). +// The Canton call runs under its own cantonCallTimeout deadline so a hung gRPC +// call can't park a worker slot indefinitely. The follow-up mempool-status +// write is intentionally derived from parent (not from the Canton ctx) with +// its own dbWriteTimeout — otherwise a Canton call that nearly exhausts its +// 60s budget would leave no time for the DB update, the row would stay +// pending, and a permanent failure would loop forever against a Canton that +// already rejected it. func (s *Submitter) process(parent context.Context, entry *ethrpc.MempoolEntry) { - ctx, cancel := context.WithTimeout(parent, cantonCallTimeout) - defer cancel() - contractAddr := common.HexToAddress(entry.ContractAddress) fromAddr := common.HexToAddress(entry.FromAddress) toAddr := common.HexToAddress(entry.RecipientAddress) @@ -191,23 +197,20 @@ func (s *Submitter) process(parent context.Context, entry *ethrpc.MempoolEntry) // Contract whitelist is validated synchronously in SendRawTransaction, // so reaching here means config drifted under us. Mark failed so the // client sees the error via the receipt rather than polling forever. - s.failEntry(ctx, entry, fmt.Errorf("contract not supported: %w", err)) + s.failEntry(parent, entry, fmt.Errorf("contract not supported: %w", err)) return } - transferErr := erc20.TransferFrom(ctx, txHash.Hex(), fromAddr, toAddr, *amount) + cantonCtx, cancel := context.WithTimeout(parent, cantonCallTimeout) + defer cancel() + transferErr := erc20.TransferFrom(cantonCtx, txHash.Hex(), fromAddr, toAddr, *amount) if transferErr == nil { - if completeErr := s.store.CompleteMempoolEntry(ctx, entry.TxHash); completeErr != nil { - s.logger.Error("ethrpc submitter: complete mempool entry failed", - zap.String("tx", txHash.Hex()), - zap.Error(completeErr), - ) - } + s.completeEntry(parent, entry, txHash) return } if isPermanentError(transferErr) { - s.failEntry(ctx, entry, transferErr) + s.failEntry(parent, entry, transferErr) return } // Transient (network error, gRPC unavailable, ctx deadline exceeded): @@ -218,11 +221,28 @@ func (s *Submitter) process(parent context.Context, entry *ethrpc.MempoolEntry) ) } -func (s *Submitter) failEntry(ctx context.Context, entry *ethrpc.MempoolEntry, cause error) { - if updateErr := s.store.FailMempoolEntry(ctx, entry.TxHash, cause.Error()); updateErr != nil { +// completeEntry writes the pending → completed transition under its own short +// deadline derived from parent (see dbWriteTimeout doc). +func (s *Submitter) completeEntry(parent context.Context, entry *ethrpc.MempoolEntry, txHash common.Hash) { + ctx, cancel := context.WithTimeout(parent, dbWriteTimeout) + defer cancel() + if err := s.store.CompleteMempoolEntry(ctx, entry.TxHash); err != nil { + s.logger.Error("ethrpc submitter: complete mempool entry failed", + zap.String("tx", txHash.Hex()), + zap.Error(err), + ) + } +} + +// failEntry writes the pending → failed transition under its own short +// deadline derived from parent (see dbWriteTimeout doc). +func (s *Submitter) failEntry(parent context.Context, entry *ethrpc.MempoolEntry, cause error) { + ctx, cancel := context.WithTimeout(parent, dbWriteTimeout) + defer cancel() + if err := s.store.FailMempoolEntry(ctx, entry.TxHash, cause.Error()); err != nil { s.logger.Error("ethrpc submitter: fail mempool entry update failed", zap.String("tx", common.BytesToHash(entry.TxHash).Hex()), - zap.Error(updateErr), + zap.Error(err), ) } } diff --git a/pkg/ethrpc/submitter/submitter_test.go b/pkg/ethrpc/submitter/submitter_test.go index 6a356bfe..0117a6b9 100644 --- a/pkg/ethrpc/submitter/submitter_test.go +++ b/pkg/ethrpc/submitter/submitter_test.go @@ -410,6 +410,80 @@ func TestProcess_CantonContextDone_LeavesPending(t *testing.T) { store.AssertNotCalled(t, "FailMempoolEntry", mock.Anything, mock.Anything, mock.Anything) } +// TestProcess_DBWriteUsesFreshContext_OnSuccess proves the mempool status +// update is *not* tied to the Canton-scoped ctx — otherwise a Canton call +// that nearly exhausts its 60s budget would leave no room to write the +// outcome, and a permanently-failing entry would loop forever. We assert +// CompleteMempoolEntry lands with a non-expired ctx whose deadline matches +// dbWriteTimeout (within tolerance), even when TransferFrom has just observed +// its own ctx fire. +func TestProcess_DBWriteUsesFreshContext_OnSuccess(t *testing.T) { + entry := samplePendingEntry(0x07, 1) + + // Capture the Canton ctx so we can confirm the DB ctx is a different one. + var cantonCtx context.Context + erc20 := mocks.NewERC20(t) + erc20.EXPECT(). + TransferFrom(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, _ string, _, _ common.Address, _ big.Int) error { + cantonCtx = ctx + return nil + }) + + tokenSvc := mocks.NewTokenService(t) + tokenSvc.EXPECT().ERC20(common.HexToAddress(testContract)).Return(erc20, nil) + + store := mocks.NewStore(t) + store.EXPECT(). + CompleteMempoolEntry(mock.MatchedBy(func(ctx context.Context) bool { + if ctx == cantonCtx { // must be a freshly-derived ctx, not the Canton one + return false + } + if err := ctx.Err(); err != nil { + return false + } + deadline, ok := ctx.Deadline() + // dbWriteTimeout = 10s; allow a generous lower bound so test + // timing jitter doesn't flake the assertion. + return ok && time.Until(deadline) > 5*time.Second + }), entry.TxHash). + Return(nil) + + s := New(store, tokenSvc, time.Second, 0, 1, zap.NewNop()) + s.process(context.Background(), &entry) +} + +// TestFailEntry_UsesFreshContext mirrors the success path: a permanent error +// must record `failed` under its own dbWriteTimeout-bounded ctx, not the +// Canton ctx. This is the bug Gemini flagged: if FailMempoolEntry inherits +// an expired Canton ctx, the entry stays pending and the submitter retries +// the permanently-failing transaction forever. +func TestFailEntry_UsesFreshContext(t *testing.T) { + entry := samplePendingEntry(0x08, 1) + + erc20 := mocks.NewERC20(t) + erc20.EXPECT(). + TransferFrom(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(apperr.BadRequestError(errors.New("nope"), "permanent")) + + tokenSvc := mocks.NewTokenService(t) + tokenSvc.EXPECT().ERC20(common.HexToAddress(testContract)).Return(erc20, nil) + + store := mocks.NewStore(t) + store.EXPECT(). + FailMempoolEntry(mock.MatchedBy(func(ctx context.Context) bool { + if err := ctx.Err(); err != nil { + return false + } + deadline, ok := ctx.Deadline() + return ok && time.Until(deadline) > 5*time.Second + }), entry.TxHash, mock.AnythingOfType("string")). + Return(nil) + + s := New(store, tokenSvc, time.Second, 0, 1, zap.NewNop()) + s.process(context.Background(), &entry) +} + // ─── isPermanentError ──────────────────────────────────────────────────────── func TestIsPermanentError(t *testing.T) {