From 194ff8e8eb6caac0fa2a234c4d1db8b8b6ba35b3 Mon Sep 17 00:00:00 2001 From: Dimitris Date: Thu, 22 Jan 2026 15:22:01 +0200 Subject: [PATCH 1/7] First draft --- pkg/txm/clientwrappers/chain_client.go | 3 +- .../dualbroadcast/flashbots_client.go | 78 ++++++++++++++++++- .../dualbroadcast/meta_client.go | 2 +- pkg/txm/mock_client_test.go | 21 ++--- pkg/txm/mock_tx_store_test.go | 59 ++++++++++++++ pkg/txm/storage/inmemory_store.go | 20 +++++ pkg/txm/storage/inmemory_store_manager.go | 7 ++ pkg/txm/txm.go | 5 +- 8 files changed, 179 insertions(+), 16 deletions(-) diff --git a/pkg/txm/clientwrappers/chain_client.go b/pkg/txm/clientwrappers/chain_client.go index 1f05f38165..f0e9330ddc 100644 --- a/pkg/txm/clientwrappers/chain_client.go +++ b/pkg/txm/clientwrappers/chain_client.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/smartcontractkit/chainlink-evm/pkg/client" + "github.com/smartcontractkit/chainlink-evm/pkg/txm" "github.com/smartcontractkit/chainlink-evm/pkg/txm/types" ) @@ -26,6 +27,6 @@ func (c *ChainClient) PendingNonceAt(ctx context.Context, address common.Address return c.c.PendingNonceAt(ctx, address) } -func (c *ChainClient) SendTransaction(ctx context.Context, _ *types.Transaction, attempt *types.Attempt) error { +func (c *ChainClient) SendTransaction(ctx context.Context, _ *types.Transaction, attempt *types.Attempt, txStore txm.TxStore) error { return c.c.SendTransaction(ctx, attempt.SignedTransaction) } diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go index 9313490e2d..42db777e39 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go @@ -55,7 +55,7 @@ func (d *FlashbotsClient) PendingNonceAt(ctx context.Context, address common.Add return nonce, nil } -func (d *FlashbotsClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt) error { +func (d *FlashbotsClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt, txStore txm.TxStore) error { meta, err := tx.GetMeta() if err != nil { return err @@ -72,7 +72,14 @@ func (d *FlashbotsClient) SendTransaction(ctx context.Context, tx *types.Transac } body := []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["%s"], "id":1}`, hexutil.Encode(data))) _, err = d.signAndPostMessage(ctx, tx.FromAddress, body, params) - return err + if err != nil { + return err + } + + // After successfully sending the transaction, send a bundle with all unconfirmed transactions + _ = d.SendBundle(ctx, txStore, tx.FromAddress, params) + // Don't return bundle error - the single transaction was already sent successfully + return nil } return d.c.SendTransaction(ctx, attempt.SignedTransaction) @@ -126,3 +133,70 @@ type postResponse struct { type postError struct { Message string `json:"message,omitempty"` } + +func (d *FlashbotsClient) SendBundle(ctx context.Context, txStore txm.TxStore, fromAddress common.Address, urlParams string) error { + unconfirmedTxs, err := txStore.FetchUnconfirmedTransactions(ctx, fromAddress) + if err != nil { + return fmt.Errorf("failed to fetch unconfirmed transactions: %w", err) + } + + // TODO: Get the first attempt from each transaction for now. + attempts := make([]*types.Attempt, 0, len(unconfirmedTxs)) + for _, unconfirmedTx := range unconfirmedTxs { + if len(unconfirmedTx.Attempts) > 0 { + attempts = append(attempts, unconfirmedTx.Attempts[0]) + } + } + + // Need at least 2 transactions to send a bundle + if len(attempts) < 2 { + return nil + } + + // TODO: we don't have a good way to get this other than making an RPC call. Some async caching may help with the overhead. + currentBlock, err := d.c.LatestBlockHeight(ctx) + if err != nil { + return fmt.Errorf("failed to get current block height: %w", err) + } + targetBlock := new(big.Int).Add(currentBlock, big.NewInt(1)) + + bodyItems := make([]map[string]any, 0, len(attempts)) + for _, attempt := range attempts { + if attempt.SignedTransaction == nil { + return fmt.Errorf("attempt with ID %d has nil SignedTransaction", attempt.ID) + } + + txData, err := attempt.SignedTransaction.MarshalBinary() + if err != nil { + return fmt.Errorf("failed to marshal transaction for attempt ID %d: %w", attempt.ID, err) + } + + bodyItems = append(bodyItems, map[string]interface{}{ + "tx": hexutil.Encode(txData), + "canRevert": false, + }) + } + + bundleParams := map[string]interface{}{ + "version": "v0.1", + "inclusion": map[string]interface{}{ + "block": hexutil.EncodeBig(targetBlock), + }, + "body": bodyItems, + } + + requestBody := map[string]interface{}{ + "jsonrpc": "2.0", + "id": 1, + "method": "mev_sendBundle", + "params": []any{bundleParams}, + } + + bodyBytes, err := json.Marshal(requestBody) + if err != nil { + return fmt.Errorf("failed to marshal bundle request: %w", err) + } + + _, err = d.signAndPostMessage(ctx, fromAddress, bodyBytes, urlParams) + return err +} diff --git a/pkg/txm/clientwrappers/dualbroadcast/meta_client.go b/pkg/txm/clientwrappers/dualbroadcast/meta_client.go index ed120d5244..a4b72433e3 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/meta_client.go +++ b/pkg/txm/clientwrappers/dualbroadcast/meta_client.go @@ -163,7 +163,7 @@ func (a *MetaClient) PendingNonceAt(ctx context.Context, address common.Address) return a.c.PendingNonceAt(ctx, address) } -func (a *MetaClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt) error { +func (a *MetaClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt, txStore txm.TxStore) error { meta, err := tx.GetMeta() if err != nil { return err diff --git a/pkg/txm/mock_client_test.go b/pkg/txm/mock_client_test.go index 549a1c851d..2bef743594 100644 --- a/pkg/txm/mock_client_test.go +++ b/pkg/txm/mock_client_test.go @@ -141,17 +141,17 @@ func (_c *MockClient_PendingNonceAt_Call) RunAndReturn(run func(context.Context, return _c } -// SendTransaction provides a mock function with given fields: ctx, tx, attempt -func (_m *MockClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt) error { - ret := _m.Called(ctx, tx, attempt) +// SendTransaction provides a mock function with given fields: ctx, tx, attempt, store +func (_m *MockClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt, store TxStore) error { + ret := _m.Called(ctx, tx, attempt, store) if len(ret) == 0 { panic("no return value specified for SendTransaction") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *types.Transaction, *types.Attempt) error); ok { - r0 = rf(ctx, tx, attempt) + if rf, ok := ret.Get(0).(func(context.Context, *types.Transaction, *types.Attempt, TxStore) error); ok { + r0 = rf(ctx, tx, attempt, store) } else { r0 = ret.Error(0) } @@ -168,13 +168,14 @@ type MockClient_SendTransaction_Call struct { // - ctx context.Context // - tx *types.Transaction // - attempt *types.Attempt -func (_e *MockClient_Expecter) SendTransaction(ctx interface{}, tx interface{}, attempt interface{}) *MockClient_SendTransaction_Call { - return &MockClient_SendTransaction_Call{Call: _e.mock.On("SendTransaction", ctx, tx, attempt)} +// - store txm.TxStore +func (_e *MockClient_Expecter) SendTransaction(ctx interface{}, tx interface{}, attempt interface{}, store interface{}) *MockClient_SendTransaction_Call { + return &MockClient_SendTransaction_Call{Call: _e.mock.On("SendTransaction", ctx, tx, attempt, store)} } -func (_c *MockClient_SendTransaction_Call) Run(run func(ctx context.Context, tx *types.Transaction, attempt *types.Attempt)) *MockClient_SendTransaction_Call { +func (_c *MockClient_SendTransaction_Call) Run(run func(ctx context.Context, tx *types.Transaction, attempt *types.Attempt, store TxStore)) *MockClient_SendTransaction_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*types.Transaction), args[2].(*types.Attempt)) + run(args[0].(context.Context), args[1].(*types.Transaction), args[2].(*types.Attempt), args[3].(TxStore)) }) return _c } @@ -184,7 +185,7 @@ func (_c *MockClient_SendTransaction_Call) Return(_a0 error) *MockClient_SendTra return _c } -func (_c *MockClient_SendTransaction_Call) RunAndReturn(run func(context.Context, *types.Transaction, *types.Attempt) error) *MockClient_SendTransaction_Call { +func (_c *MockClient_SendTransaction_Call) RunAndReturn(run func(context.Context, *types.Transaction, *types.Attempt, TxStore) error) *MockClient_SendTransaction_Call { _c.Call.Return(run) return _c } diff --git a/pkg/txm/mock_tx_store_test.go b/pkg/txm/mock_tx_store_test.go index 2c2388d93f..440fd0f965 100644 --- a/pkg/txm/mock_tx_store_test.go +++ b/pkg/txm/mock_tx_store_test.go @@ -357,6 +357,65 @@ func (_c *MockTxStore_FetchUnconfirmedTransactionAtNonceWithCount_Call) RunAndRe return _c } +// FetchUnconfirmedTransactions provides a mock function with given fields: _a0, _a1 +func (_m *MockTxStore) FetchUnconfirmedTransactions(_a0 context.Context, _a1 common.Address) ([]*types.Transaction, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for FetchUnconfirmedTransactions") + } + + var r0 []*types.Transaction + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Address) ([]*types.Transaction, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Address) []*types.Transaction); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*types.Transaction) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Address) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockTxStore_FetchUnconfirmedTransactions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FetchUnconfirmedTransactions' +type MockTxStore_FetchUnconfirmedTransactions_Call struct { + *mock.Call +} + +// FetchUnconfirmedTransactions is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 common.Address +func (_e *MockTxStore_Expecter) FetchUnconfirmedTransactions(_a0 interface{}, _a1 interface{}) *MockTxStore_FetchUnconfirmedTransactions_Call { + return &MockTxStore_FetchUnconfirmedTransactions_Call{Call: _e.mock.On("FetchUnconfirmedTransactions", _a0, _a1)} +} + +func (_c *MockTxStore_FetchUnconfirmedTransactions_Call) Run(run func(_a0 context.Context, _a1 common.Address)) *MockTxStore_FetchUnconfirmedTransactions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Address)) + }) + return _c +} + +func (_c *MockTxStore_FetchUnconfirmedTransactions_Call) Return(_a0 []*types.Transaction, _a1 error) *MockTxStore_FetchUnconfirmedTransactions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockTxStore_FetchUnconfirmedTransactions_Call) RunAndReturn(run func(context.Context, common.Address) ([]*types.Transaction, error)) *MockTxStore_FetchUnconfirmedTransactions_Call { + _c.Call.Return(run) + return _c +} + // MarkConfirmedAndReorgedTransactions provides a mock function with given fields: _a0, _a1, _a2 func (_m *MockTxStore) MarkConfirmedAndReorgedTransactions(_a0 context.Context, _a1 uint64, _a2 common.Address) ([]*types.Transaction, []uint64, error) { ret := _m.Called(_a0, _a1, _a2) diff --git a/pkg/txm/storage/inmemory_store.go b/pkg/txm/storage/inmemory_store.go index a1123f8dcc..10832e0423 100644 --- a/pkg/txm/storage/inmemory_store.go +++ b/pkg/txm/storage/inmemory_store.go @@ -190,6 +190,26 @@ func (m *InMemoryStore) FetchUnconfirmedTransactionAtNonceWithCount(latestNonce return } +func (m *InMemoryStore) FetchUnconfirmedTransactions() ([]*types.Transaction, error) { + m.RLock() + defer m.RUnlock() + + transactions := make([]*types.Transaction, 0, len(m.UnconfirmedTransactions)) + for _, tx := range m.UnconfirmedTransactions { + if tx.Nonce == nil { + return nil, fmt.Errorf("unconfirmed transaction with ID %d has nil nonce", tx.ID) + } + transactions = append(transactions, tx.DeepCopy()) + } + + // Sort by nonce in ascending order + sort.Slice(transactions, func(i, j int) bool { + return *transactions[i].Nonce < *transactions[j].Nonce + }) + + return transactions, nil +} + func (m *InMemoryStore) MarkConfirmedAndReorgedTransactions(latestNonce uint64) ([]*types.Transaction, []uint64, error) { m.Lock() defer m.Unlock() diff --git a/pkg/txm/storage/inmemory_store_manager.go b/pkg/txm/storage/inmemory_store_manager.go index 99b4fdca7b..5d8ef25ed6 100644 --- a/pkg/txm/storage/inmemory_store_manager.go +++ b/pkg/txm/storage/inmemory_store_manager.go @@ -82,6 +82,13 @@ func (m *InMemoryStoreManager) FetchUnconfirmedTransactionAtNonceWithCount(_ con return nil, 0, fmt.Errorf(StoreNotFoundForAddress, fromAddress) } +func (m *InMemoryStoreManager) FetchUnconfirmedTransactions(_ context.Context, fromAddress common.Address) ([]*types.Transaction, error) { + if store, exists := m.InMemoryStoreMap[fromAddress]; exists { + return store.FetchUnconfirmedTransactions() + } + return nil, fmt.Errorf(StoreNotFoundForAddress, fromAddress) +} + func (m *InMemoryStoreManager) MarkConfirmedAndReorgedTransactions(_ context.Context, nonce uint64, fromAddress common.Address) (confirmedTxs []*types.Transaction, unconfirmedTxIDs []uint64, err error) { if store, exists := m.InMemoryStoreMap[fromAddress]; exists { confirmedTxs, unconfirmedTxIDs, err = store.MarkConfirmedAndReorgedTransactions(nonce) diff --git a/pkg/txm/txm.go b/pkg/txm/txm.go index eb65317e01..630b80e30e 100644 --- a/pkg/txm/txm.go +++ b/pkg/txm/txm.go @@ -30,7 +30,7 @@ const ( type Client interface { PendingNonceAt(context.Context, common.Address) (uint64, error) NonceAt(context.Context, common.Address, *big.Int) (uint64, error) - SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt) error + SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt, store TxStore) error } type TxStore interface { @@ -39,6 +39,7 @@ type TxStore interface { CreateEmptyUnconfirmedTransaction(context.Context, common.Address, uint64, uint64) (*types.Transaction, error) CreateTransaction(context.Context, *types.TxRequest) (*types.Transaction, error) FetchUnconfirmedTransactionAtNonceWithCount(context.Context, uint64, common.Address) (*types.Transaction, int, error) + FetchUnconfirmedTransactions(context.Context, common.Address) ([]*types.Transaction, error) MarkConfirmedAndReorgedTransactions(context.Context, uint64, common.Address) ([]*types.Transaction, []uint64, error) MarkUnconfirmedTransactionPurgeable(context.Context, uint64, common.Address) error UpdateTransactionBroadcast(context.Context, uint64, uint64, common.Hash, common.Address) error @@ -325,7 +326,7 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio return fmt.Errorf("nonce for txID: %v is empty", tx.ID) } start := time.Now() - txErr := t.client.SendTransaction(ctx, tx, attempt) + txErr := t.client.SendTransaction(ctx, tx, attempt, t.txStore) tx.AttemptCount++ t.lggr.Infow("Broadcasted attempt", "tx", tx, "attempt", attempt, "duration", time.Since(start), "txErr: ", txErr) if txErr != nil && t.errorHandler != nil { From b8d8882b8c4102a39377d2cd874bd4748bc7dc6f Mon Sep 17 00:00:00 2001 From: Dimitris Date: Wed, 4 Feb 2026 15:07:33 +0200 Subject: [PATCH 2/7] Pass txStore in constructor --- pkg/txm/clientwrappers/chain_client.go | 3 +- .../dualbroadcast/flashbots_client.go | 15 ++++-- .../clientwrappers/dualbroadcast/selector.go | 4 +- pkg/txm/mock_client_test.go | 21 ++++---- pkg/txm/mock_tx_store_test.go | 52 +++++++++++++++++++ 5 files changed, 75 insertions(+), 20 deletions(-) diff --git a/pkg/txm/clientwrappers/chain_client.go b/pkg/txm/clientwrappers/chain_client.go index f0e9330ddc..1f05f38165 100644 --- a/pkg/txm/clientwrappers/chain_client.go +++ b/pkg/txm/clientwrappers/chain_client.go @@ -7,7 +7,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/smartcontractkit/chainlink-evm/pkg/client" - "github.com/smartcontractkit/chainlink-evm/pkg/txm" "github.com/smartcontractkit/chainlink-evm/pkg/txm/types" ) @@ -27,6 +26,6 @@ func (c *ChainClient) PendingNonceAt(ctx context.Context, address common.Address return c.c.PendingNonceAt(ctx, address) } -func (c *ChainClient) SendTransaction(ctx context.Context, _ *types.Transaction, attempt *types.Attempt, txStore txm.TxStore) error { +func (c *ChainClient) SendTransaction(ctx context.Context, _ *types.Transaction, attempt *types.Attempt) error { return c.c.SendTransaction(ctx, attempt.SignedTransaction) } diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go index 42db777e39..411ce37709 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go @@ -23,17 +23,22 @@ import ( var _ txm.Client = &FlashbotsClient{} +type FlashbotsTxStore interface { + FetchUnconfirmedTransactions(context.Context, common.Address) ([]*types.Transaction, error) +} type FlashbotsClient struct { c client.Client keystore keys.MessageSigner customURL *url.URL + txStore FlashbotsTxStore } -func NewFlashbotsClient(c client.Client, keystore keys.MessageSigner, customURL *url.URL) *FlashbotsClient { +func NewFlashbotsClient(c client.Client, keystore keys.MessageSigner, customURL *url.URL, txStore FlashbotsTxStore) *FlashbotsClient { return &FlashbotsClient{ c: c, keystore: keystore, customURL: customURL, + txStore: txStore, } } @@ -55,7 +60,7 @@ func (d *FlashbotsClient) PendingNonceAt(ctx context.Context, address common.Add return nonce, nil } -func (d *FlashbotsClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt, txStore txm.TxStore) error { +func (d *FlashbotsClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt) error { meta, err := tx.GetMeta() if err != nil { return err @@ -77,7 +82,7 @@ func (d *FlashbotsClient) SendTransaction(ctx context.Context, tx *types.Transac } // After successfully sending the transaction, send a bundle with all unconfirmed transactions - _ = d.SendBundle(ctx, txStore, tx.FromAddress, params) + _ = d.SendBundle(ctx, tx.FromAddress, params) // Don't return bundle error - the single transaction was already sent successfully return nil } @@ -134,8 +139,8 @@ type postError struct { Message string `json:"message,omitempty"` } -func (d *FlashbotsClient) SendBundle(ctx context.Context, txStore txm.TxStore, fromAddress common.Address, urlParams string) error { - unconfirmedTxs, err := txStore.FetchUnconfirmedTransactions(ctx, fromAddress) +func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Address, urlParams string) error { + unconfirmedTxs, err := d.txStore.FetchUnconfirmedTransactions(ctx, fromAddress) if err != nil { return fmt.Errorf("failed to fetch unconfirmed transactions: %w", err) } diff --git a/pkg/txm/clientwrappers/dualbroadcast/selector.go b/pkg/txm/clientwrappers/dualbroadcast/selector.go index c99c41962a..77ad85ad86 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/selector.go +++ b/pkg/txm/clientwrappers/dualbroadcast/selector.go @@ -12,11 +12,11 @@ import ( "github.com/smartcontractkit/chainlink-evm/pkg/txm" ) -func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int, txStore MetaClientTxStore) (txm.Client, txm.ErrorHandler, error) { +func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int, txStore txm.TxStore) (txm.Client, txm.ErrorHandler, error) { urlString := url.String() switch { case strings.Contains(urlString, "flashbots"): - return NewFlashbotsClient(client, keyStore, url), nil, nil + return NewFlashbotsClient(client, keyStore, url, txStore), nil, nil default: mc, err := NewMetaClient(lggr, client, keyStore, url, chainID, txStore) if err != nil { diff --git a/pkg/txm/mock_client_test.go b/pkg/txm/mock_client_test.go index 2bef743594..549a1c851d 100644 --- a/pkg/txm/mock_client_test.go +++ b/pkg/txm/mock_client_test.go @@ -141,17 +141,17 @@ func (_c *MockClient_PendingNonceAt_Call) RunAndReturn(run func(context.Context, return _c } -// SendTransaction provides a mock function with given fields: ctx, tx, attempt, store -func (_m *MockClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt, store TxStore) error { - ret := _m.Called(ctx, tx, attempt, store) +// SendTransaction provides a mock function with given fields: ctx, tx, attempt +func (_m *MockClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt) error { + ret := _m.Called(ctx, tx, attempt) if len(ret) == 0 { panic("no return value specified for SendTransaction") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *types.Transaction, *types.Attempt, TxStore) error); ok { - r0 = rf(ctx, tx, attempt, store) + if rf, ok := ret.Get(0).(func(context.Context, *types.Transaction, *types.Attempt) error); ok { + r0 = rf(ctx, tx, attempt) } else { r0 = ret.Error(0) } @@ -168,14 +168,13 @@ type MockClient_SendTransaction_Call struct { // - ctx context.Context // - tx *types.Transaction // - attempt *types.Attempt -// - store txm.TxStore -func (_e *MockClient_Expecter) SendTransaction(ctx interface{}, tx interface{}, attempt interface{}, store interface{}) *MockClient_SendTransaction_Call { - return &MockClient_SendTransaction_Call{Call: _e.mock.On("SendTransaction", ctx, tx, attempt, store)} +func (_e *MockClient_Expecter) SendTransaction(ctx interface{}, tx interface{}, attempt interface{}) *MockClient_SendTransaction_Call { + return &MockClient_SendTransaction_Call{Call: _e.mock.On("SendTransaction", ctx, tx, attempt)} } -func (_c *MockClient_SendTransaction_Call) Run(run func(ctx context.Context, tx *types.Transaction, attempt *types.Attempt, store TxStore)) *MockClient_SendTransaction_Call { +func (_c *MockClient_SendTransaction_Call) Run(run func(ctx context.Context, tx *types.Transaction, attempt *types.Attempt)) *MockClient_SendTransaction_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*types.Transaction), args[2].(*types.Attempt), args[3].(TxStore)) + run(args[0].(context.Context), args[1].(*types.Transaction), args[2].(*types.Attempt)) }) return _c } @@ -185,7 +184,7 @@ func (_c *MockClient_SendTransaction_Call) Return(_a0 error) *MockClient_SendTra return _c } -func (_c *MockClient_SendTransaction_Call) RunAndReturn(run func(context.Context, *types.Transaction, *types.Attempt, TxStore) error) *MockClient_SendTransaction_Call { +func (_c *MockClient_SendTransaction_Call) RunAndReturn(run func(context.Context, *types.Transaction, *types.Attempt) error) *MockClient_SendTransaction_Call { _c.Call.Return(run) return _c } diff --git a/pkg/txm/mock_tx_store_test.go b/pkg/txm/mock_tx_store_test.go index f7d6488b97..aac0e4f39c 100644 --- a/pkg/txm/mock_tx_store_test.go +++ b/pkg/txm/mock_tx_store_test.go @@ -7,6 +7,8 @@ import ( common "github.com/ethereum/go-ethereum/common" + coretypes "github.com/ethereum/go-ethereum/core/types" + mock "github.com/stretchr/testify/mock" types "github.com/smartcontractkit/chainlink-evm/pkg/txm/types" @@ -593,6 +595,56 @@ func (_c *MockTxStore_MarkUnconfirmedTransactionPurgeable_Call) RunAndReturn(run return _c } +// UpdateSignedAttempt provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *MockTxStore) UpdateSignedAttempt(_a0 context.Context, _a1 uint64, _a2 uint64, _a3 *coretypes.Transaction, _a4 common.Address) error { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + if len(ret) == 0 { + panic("no return value specified for UpdateSignedAttempt") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, *coretypes.Transaction, common.Address) error); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockTxStore_UpdateSignedAttempt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSignedAttempt' +type MockTxStore_UpdateSignedAttempt_Call struct { + *mock.Call +} + +// UpdateSignedAttempt is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 uint64 +// - _a2 uint64 +// - _a3 *coretypes.Transaction +// - _a4 common.Address +func (_e *MockTxStore_Expecter) UpdateSignedAttempt(_a0 interface{}, _a1 interface{}, _a2 interface{}, _a3 interface{}, _a4 interface{}) *MockTxStore_UpdateSignedAttempt_Call { + return &MockTxStore_UpdateSignedAttempt_Call{Call: _e.mock.On("UpdateSignedAttempt", _a0, _a1, _a2, _a3, _a4)} +} + +func (_c *MockTxStore_UpdateSignedAttempt_Call) Run(run func(_a0 context.Context, _a1 uint64, _a2 uint64, _a3 *coretypes.Transaction, _a4 common.Address)) *MockTxStore_UpdateSignedAttempt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint64), args[2].(uint64), args[3].(*coretypes.Transaction), args[4].(common.Address)) + }) + return _c +} + +func (_c *MockTxStore_UpdateSignedAttempt_Call) Return(_a0 error) *MockTxStore_UpdateSignedAttempt_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTxStore_UpdateSignedAttempt_Call) RunAndReturn(run func(context.Context, uint64, uint64, *coretypes.Transaction, common.Address) error) *MockTxStore_UpdateSignedAttempt_Call { + _c.Call.Return(run) + return _c +} + // UpdateTransactionBroadcast provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 func (_m *MockTxStore) UpdateTransactionBroadcast(_a0 context.Context, _a1 uint64, _a2 uint64, _a3 common.Hash, _a4 common.Address) error { ret := _m.Called(_a0, _a1, _a2, _a3, _a4) From 2bf7ec73ab5800d45745ac07034bde212058bdd6 Mon Sep 17 00:00:00 2001 From: Dimitris Date: Thu, 12 Feb 2026 16:10:27 +0200 Subject: [PATCH 3/7] Update Flashbots client for bundles --- .../dualbroadcast/flashbots_client.go | 86 +++++++++++++------ .../clientwrappers/dualbroadcast/selector.go | 2 +- 2 files changed, 62 insertions(+), 26 deletions(-) diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go index 411ce37709..be8eef5465 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go @@ -10,31 +10,42 @@ import ( "math/big" "net/http" "net/url" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + evmtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/smartcontractkit/chainlink-evm/pkg/client" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-evm/pkg/keys" - "github.com/smartcontractkit/chainlink-evm/pkg/txm" "github.com/smartcontractkit/chainlink-evm/pkg/txm/types" ) -var _ txm.Client = &FlashbotsClient{} +const flashbotsRPCTimeout = 10 * time.Second type FlashbotsTxStore interface { FetchUnconfirmedTransactions(context.Context, common.Address) ([]*types.Transaction, error) } + +type FlashbotsClientRPC interface { + BlockByNumber(ctx context.Context, number *big.Int) (*evmtypes.Block, error) + NonceAt(context.Context, common.Address, *big.Int) (uint64, error) + SendTransaction(context.Context, *evmtypes.Transaction) error +} + type FlashbotsClient struct { - c client.Client + lggr logger.SugaredLogger + c FlashbotsClientRPC keystore keys.MessageSigner customURL *url.URL txStore FlashbotsTxStore } -func NewFlashbotsClient(c client.Client, keystore keys.MessageSigner, customURL *url.URL, txStore FlashbotsTxStore) *FlashbotsClient { +func NewFlashbotsClient(lggr logger.Logger, c FlashbotsClientRPC, keystore keys.MessageSigner, customURL *url.URL, txStore FlashbotsTxStore) *FlashbotsClient { return &FlashbotsClient{ + lggr: logger.Sugared(logger.Named(lggr, "Txm.FlashbotsClient")), c: c, keystore: keystore, customURL: customURL, @@ -47,15 +58,21 @@ func (d *FlashbotsClient) NonceAt(ctx context.Context, address common.Address, b } func (d *FlashbotsClient) PendingNonceAt(ctx context.Context, address common.Address) (uint64, error) { + ctx, cancel := context.WithTimeout(ctx, flashbotsRPCTimeout) + defer cancel() body := []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["%s","pending"], "id":1}`, address.String())) - response, err := d.signAndPostMessage(ctx, address, body, "") + raw, err := d.signAndPostMessage(ctx, address, body, "") if err != nil { return 0, err } - nonce, err := hexutil.DecodeUint64(response) + var resultStr string + if err := json.Unmarshal(raw, &resultStr); err != nil { + return 0, fmt.Errorf("failed to unmarshal response into string: %w", err) + } + nonce, err := hexutil.DecodeUint64(resultStr) if err != nil { - return 0, fmt.Errorf("failed to decode response %v into uint64: %w", response, err) + return 0, fmt.Errorf("failed to decode response %v into uint64: %w", resultStr, err) } return nonce, nil } @@ -82,25 +99,29 @@ func (d *FlashbotsClient) SendTransaction(ctx context.Context, tx *types.Transac } // After successfully sending the transaction, send a bundle with all unconfirmed transactions - _ = d.SendBundle(ctx, tx.FromAddress, params) - // Don't return bundle error - the single transaction was already sent successfully + // Don't act on a bundle error - this is a fire and forget operation but we do want to log the error. + if err := d.SendBundle(ctx, tx.FromAddress, params); err != nil { + d.lggr.Error("error sending bundle: ", err) + } return nil } return d.c.SendTransaction(ctx, attempt.SignedTransaction) } -func (d *FlashbotsClient) signAndPostMessage(ctx context.Context, address common.Address, body []byte, urlParams string) (result string, err error) { +func (d *FlashbotsClient) signAndPostMessage(ctx context.Context, address common.Address, body []byte, urlParams string) (json.RawMessage, error) { + ctx, cancel := context.WithTimeout(ctx, flashbotsRPCTimeout) + defer cancel() bodyReader := bytes.NewReader(body) postReq, err := http.NewRequestWithContext(ctx, http.MethodPost, d.customURL.String()+"?"+urlParams, bodyReader) if err != nil { - return + return nil, err } hashedBody := crypto.Keccak256Hash(body).Hex() signedMessage, err := d.keystore.SignMessage(ctx, address, []byte(hashedBody)) if err != nil { - return + return nil, err } postReq.Header.Add("X-Flashbots-signature", address.String()+":"+hexutil.Encode(signedMessage)) @@ -108,30 +129,30 @@ func (d *FlashbotsClient) signAndPostMessage(ctx context.Context, address common resp, err := http.DefaultClient.Do(postReq) if err != nil { - return result, fmt.Errorf("request %v failed: %w", postReq, err) + return nil, fmt.Errorf("request %v failed: %w", postReq, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return result, fmt.Errorf("request %v failed with status: %d", postReq, resp.StatusCode) + return nil, fmt.Errorf("request %v failed with status: %d", postReq, resp.StatusCode) } keyJSON, err := io.ReadAll(resp.Body) if err != nil { - return + return nil, err } var response postResponse err = json.Unmarshal(keyJSON, &response) if err != nil { - return result, fmt.Errorf("failed to unmarshal response into struct: %w: %s", err, string(keyJSON)) + return nil, fmt.Errorf("failed to unmarshal response into struct: %w: %s", err, string(keyJSON)) } if response.Error.Message != "" { - return result, errors.New(response.Error.Message) + return nil, errors.New(response.Error.Message) } return response.Result, nil } type postResponse struct { - Result string `json:"result,omitempty"` + Result json.RawMessage `json:"result,omitempty"` Error postError } @@ -147,9 +168,13 @@ func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Add // TODO: Get the first attempt from each transaction for now. attempts := make([]*types.Attempt, 0, len(unconfirmedTxs)) + nonces := make([]uint64, 0, len(unconfirmedTxs)) for _, unconfirmedTx := range unconfirmedTxs { if len(unconfirmedTx.Attempts) > 0 { attempts = append(attempts, unconfirmedTx.Attempts[0]) + if unconfirmedTx.Nonce != nil { + nonces = append(nonces, *unconfirmedTx.Nonce) + } } } @@ -159,11 +184,11 @@ func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Add } // TODO: we don't have a good way to get this other than making an RPC call. Some async caching may help with the overhead. - currentBlock, err := d.c.LatestBlockHeight(ctx) + currentBlock, err := d.c.BlockByNumber(ctx, nil) if err != nil { return fmt.Errorf("failed to get current block height: %w", err) } - targetBlock := new(big.Int).Add(currentBlock, big.NewInt(1)) + targetBlock := currentBlock.NumberU64() + 10 bodyItems := make([]map[string]any, 0, len(attempts)) for _, attempt := range attempts { @@ -178,14 +203,14 @@ func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Add bodyItems = append(bodyItems, map[string]interface{}{ "tx": hexutil.Encode(txData), - "canRevert": false, + "canRevert": true, }) } bundleParams := map[string]interface{}{ "version": "v0.1", "inclusion": map[string]interface{}{ - "block": hexutil.EncodeBig(targetBlock), + "block": hexutil.EncodeBig(new(big.Int).SetUint64(targetBlock)), }, "body": bodyItems, } @@ -202,6 +227,17 @@ func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Add return fmt.Errorf("failed to marshal bundle request: %w", err) } - _, err = d.signAndPostMessage(ctx, fromAddress, bodyBytes, urlParams) - return err + raw, err := d.signAndPostMessage(ctx, fromAddress, bodyBytes, urlParams) + if err != nil { + return err + } + + var bundleResult struct { + BundleHash string `json:"bundleHash"` + } + if err := json.Unmarshal(raw, &bundleResult); err != nil { + return fmt.Errorf("failed to decode response %v into bundle result: %w", raw, err) + } + d.lggr.Infow("Broadcasted transaction bundle", "nonces", nonces, "bundleHash", bundleResult.BundleHash) + return nil } diff --git a/pkg/txm/clientwrappers/dualbroadcast/selector.go b/pkg/txm/clientwrappers/dualbroadcast/selector.go index 77ad85ad86..7244dc29d1 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/selector.go +++ b/pkg/txm/clientwrappers/dualbroadcast/selector.go @@ -16,7 +16,7 @@ func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainS urlString := url.String() switch { case strings.Contains(urlString, "flashbots"): - return NewFlashbotsClient(client, keyStore, url, txStore), nil, nil + return NewFlashbotsClient(lggr, client, keyStore, url, txStore), nil, nil default: mc, err := NewMetaClient(lggr, client, keyStore, url, chainID, txStore) if err != nil { From 6fe2367f45414c9fe69ac0f731701187560cf354 Mon Sep 17 00:00:00 2001 From: Dimitris Date: Tue, 17 Feb 2026 13:25:58 +0200 Subject: [PATCH 4/7] Update bundle body --- .../dualbroadcast/flashbots_client.go | 84 ++++++++++++++++--- 1 file changed, 73 insertions(+), 11 deletions(-) diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go index be8eef5465..22b7a0450b 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go @@ -10,6 +10,8 @@ import ( "math/big" "net/http" "net/url" + "strconv" + "strings" "time" "github.com/ethereum/go-ethereum/common" @@ -125,6 +127,7 @@ func (d *FlashbotsClient) signAndPostMessage(ctx context.Context, address common } postReq.Header.Add("X-Flashbots-signature", address.String()+":"+hexutil.Encode(signedMessage)) + postReq.Header.Add("X-Flashbots-Origin", "chainlink") postReq.Header.Add("Content-Type", "application/json") resp, err := http.DefaultClient.Do(postReq) @@ -188,7 +191,7 @@ func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Add if err != nil { return fmt.Errorf("failed to get current block height: %w", err) } - targetBlock := currentBlock.NumberU64() + 10 + targetBlock := currentBlock.NumberU64() + 24 bodyItems := make([]map[string]any, 0, len(attempts)) for _, attempt := range attempts { @@ -201,21 +204,32 @@ func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Add return fmt.Errorf("failed to marshal transaction for attempt ID %d: %w", attempt.ID, err) } - bodyItems = append(bodyItems, map[string]interface{}{ - "tx": hexutil.Encode(txData), - "canRevert": true, + bodyItems = append(bodyItems, map[string]any{ + "tx": hexutil.Encode(txData), + "revertMode": "allow", // we always want to allow reverts so bundles are valid even if a single transaction within the bundle goes through }) } + privacy, refundConfig, err := parseURLParams(urlParams) + if err != nil { + return err + } - bundleParams := map[string]interface{}{ - "version": "v0.1", - "inclusion": map[string]interface{}{ - "block": hexutil.EncodeBig(new(big.Int).SetUint64(targetBlock)), - }, + bundleParams := map[string]any{ "body": bodyItems, + "inclusion": map[string]any{ + "block": hexutil.EncodeBig(new(big.Int).SetUint64(currentBlock.NumberU64())), + "maxBlock": hexutil.EncodeBig(new(big.Int).SetUint64(targetBlock)), + }, + "privacy": privacy, + "version": "v0.1", + } + if refundConfig.Address != "" { + bundleParams["validity"] = map[string]any{ + "refundConfig": []any{refundConfig}, + } } - requestBody := map[string]interface{}{ + requestBody := map[string]any{ "jsonrpc": "2.0", "id": 1, "method": "mev_sendBundle", @@ -227,7 +241,7 @@ func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Add return fmt.Errorf("failed to marshal bundle request: %w", err) } - raw, err := d.signAndPostMessage(ctx, fromAddress, bodyBytes, urlParams) + raw, err := d.signAndPostMessage(ctx, fromAddress, bodyBytes, "") if err != nil { return err } @@ -241,3 +255,51 @@ func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Add d.lggr.Infow("Broadcasted transaction bundle", "nonces", nonces, "bundleHash", bundleResult.BundleHash) return nil } + +func parseURLParams(params string) (Privacy, RefundConfig, error) { + values, err := url.ParseQuery(params) + if err != nil { + return Privacy{}, RefundConfig{}, fmt.Errorf("unable to parse params: %w", err) + } + + privacy := Privacy{} + if timeout, err := strconv.Atoi(values.Get("auctionTimeout")); err == nil { + privacy.AuctionTimeout = timeout + } + + privacy.Builders = append(privacy.Builders, values["builder"]...) + + privacy.Hints = append(privacy.Hints, values["hint"]...) + + refundConfig := RefundConfig{} + refundRaw := values.Get("refund") + if refundRaw != "" { + parts := strings.Split(refundRaw, ":") + if len(parts) == 2 { + address := parts[0] + percentVal, err := strconv.Atoi(parts[1]) + if err != nil { + return Privacy{}, RefundConfig{}, fmt.Errorf("unable to parse percentage: %w", err) + } + + privacy.WantRefund = percentVal + refundConfig = RefundConfig{ + Address: address, + Percent: 100, // wantRefund is an absolute percent of the kickback, and refundConfig.percent=100 means entire refund goes to this address (no longer supported) + } + } + } + return privacy, refundConfig, nil +} + +type Privacy struct { + WantRefund int `json:"wantRefund"` + AuctionTimeout int `json:"auctionTimeout"` + Builders []string `json:"builders"` + Hints []string `json:"hints"` +} + +type RefundConfig struct { + Address string `json:"address"` + Percent int `json:"percent"` +} From a1d5fa828420a647f5da025e3b51ffbcdeb89403 Mon Sep 17 00:00:00 2001 From: Dimitris Date: Wed, 18 Feb 2026 11:59:30 +0200 Subject: [PATCH 5/7] Add bundle config --- CONFIG.md | 7 +++++++ pkg/config/chain_scoped_transactions.go | 4 ++++ pkg/config/config.go | 1 + pkg/config/toml/config.go | 4 ++++ pkg/config/toml/config_test.go | 3 +++ pkg/config/toml/docs.toml | 2 ++ pkg/config/toml/testdata/config-full.toml | 1 + .../clientwrappers/dualbroadcast/flashbots_client.go | 11 ++++++++--- pkg/txm/clientwrappers/dualbroadcast/selector.go | 4 ++-- pkg/txmgr/builder.go | 2 +- 10 files changed, 33 insertions(+), 6 deletions(-) diff --git a/CONFIG.md b/CONFIG.md index b5eaaf1354..d897fbc461 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -390,6 +390,7 @@ Enabled = false # Default BlockTime = '10s' # Example CustomURL = 'https://example.api.io' # Example DualBroadcast = false # Example +Bundles = false # Example ``` @@ -417,6 +418,12 @@ DualBroadcast = false # Example ``` DualBroadcast enables DualBroadcast functionality. +### Bundles +```toml +Bundles = false # Example +``` +Bundles enables sending bundles for auctioning (not compatible with all OFAs). + ## BalanceMonitor ```toml [BalanceMonitor] diff --git a/pkg/config/chain_scoped_transactions.go b/pkg/config/chain_scoped_transactions.go index 72220f7471..7a70ce140c 100644 --- a/pkg/config/chain_scoped_transactions.go +++ b/pkg/config/chain_scoped_transactions.go @@ -64,6 +64,10 @@ func (t *transactionManagerV2Config) DualBroadcast() *bool { return t.c.DualBroadcast } +func (t *transactionManagerV2Config) Bundles() *bool { + return t.c.Bundles +} + func (t *transactionsConfig) AutoPurge() AutoPurgeConfig { return &autoPurgeConfig{c: t.c.AutoPurge} } diff --git a/pkg/config/config.go b/pkg/config/config.go index 9593c96d96..97e7025f62 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -133,6 +133,7 @@ type TransactionManagerV2 interface { BlockTime() *time.Duration CustomURL() *url.URL DualBroadcast() *bool + Bundles() *bool } type GasEstimator interface { diff --git a/pkg/config/toml/config.go b/pkg/config/toml/config.go index 1e70fe46a1..7727b22062 100644 --- a/pkg/config/toml/config.go +++ b/pkg/config/toml/config.go @@ -589,6 +589,7 @@ type TransactionManagerV2Config struct { BlockTime *commonconfig.Duration `toml:",omitempty"` CustomURL *commonconfig.URL `toml:",omitempty"` DualBroadcast *bool `toml:",omitempty"` + Bundles *bool `toml:",omitempty"` } func (t *TransactionManagerV2Config) setFrom(f *TransactionManagerV2Config) { @@ -604,6 +605,9 @@ func (t *TransactionManagerV2Config) setFrom(f *TransactionManagerV2Config) { if v := f.DualBroadcast; v != nil { t.DualBroadcast = f.DualBroadcast } + if v := f.Bundles; v != nil { + t.Bundles = f.Bundles + } } func (t *TransactionManagerV2Config) ValidateConfig() (err error) { diff --git a/pkg/config/toml/config_test.go b/pkg/config/toml/config_test.go index ff92277074..aaaa821cb1 100644 --- a/pkg/config/toml/config_test.go +++ b/pkg/config/toml/config_test.go @@ -64,6 +64,7 @@ func TestDefaults_fieldsNotNil(t *testing.T) { unknown.Transactions.TransactionManagerV2.BlockTime = new(config.Duration) unknown.Transactions.TransactionManagerV2.CustomURL = new(config.URL) unknown.Transactions.TransactionManagerV2.DualBroadcast = ptr(false) + unknown.Transactions.TransactionManagerV2.Bundles = ptr(false) unknown.Transactions.AutoPurge.Threshold = ptr(uint32(0)) unknown.Transactions.AutoPurge.MinAttempts = ptr(uint32(0)) unknown.Transactions.AutoPurge.DetectionApiUrl = new(config.URL) @@ -159,6 +160,7 @@ func TestDocs(t *testing.T) { docDefaults.Transactions.TransactionManagerV2.BlockTime = nil docDefaults.Transactions.TransactionManagerV2.CustomURL = nil docDefaults.Transactions.TransactionManagerV2.DualBroadcast = nil + docDefaults.Transactions.TransactionManagerV2.Bundles = nil // Fallback DA oracle is not set docDefaults.GasEstimator.DAOracle = DAOracle{} @@ -283,6 +285,7 @@ var fullConfig = EVMConfig{ TransactionManagerV2: TransactionManagerV2Config{ Enabled: ptr(false), DualBroadcast: ptr(true), + Bundles: ptr(false), BlockTime: config.MustNewDuration(42 * time.Second), CustomURL: config.MustParseURL("http://txs.org"), }, diff --git a/pkg/config/toml/docs.toml b/pkg/config/toml/docs.toml index 72947f8394..ea088098d5 100644 --- a/pkg/config/toml/docs.toml +++ b/pkg/config/toml/docs.toml @@ -171,6 +171,8 @@ BlockTime = '10s' # Example CustomURL = 'https://example.api.io' # Example # DualBroadcast enables DualBroadcast functionality. DualBroadcast = false # Example +# Bundles enables sending bundles for auctioning (not compatible with all OFAs). +Bundles = false # Example [BalanceMonitor] # Enabled balance monitoring for all keys. diff --git a/pkg/config/toml/testdata/config-full.toml b/pkg/config/toml/testdata/config-full.toml index 0284665631..3647cd4cba 100644 --- a/pkg/config/toml/testdata/config-full.toml +++ b/pkg/config/toml/testdata/config-full.toml @@ -47,6 +47,7 @@ Enabled = false BlockTime = '42s' CustomURL = 'http://txs.org' DualBroadcast = true +Bundles = false [BalanceMonitor] Enabled = true diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go index 22b7a0450b..88235baf5c 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go @@ -43,15 +43,18 @@ type FlashbotsClient struct { keystore keys.MessageSigner customURL *url.URL txStore FlashbotsTxStore + bundles bool } -func NewFlashbotsClient(lggr logger.Logger, c FlashbotsClientRPC, keystore keys.MessageSigner, customURL *url.URL, txStore FlashbotsTxStore) *FlashbotsClient { +func NewFlashbotsClient(lggr logger.Logger, c FlashbotsClientRPC, keystore keys.MessageSigner, customURL *url.URL, txStore FlashbotsTxStore, bundles *bool) *FlashbotsClient { + b := bundles != nil && *bundles return &FlashbotsClient{ lggr: logger.Sugared(logger.Named(lggr, "Txm.FlashbotsClient")), c: c, keystore: keystore, customURL: customURL, txStore: txStore, + bundles: b, } } @@ -102,8 +105,10 @@ func (d *FlashbotsClient) SendTransaction(ctx context.Context, tx *types.Transac // After successfully sending the transaction, send a bundle with all unconfirmed transactions // Don't act on a bundle error - this is a fire and forget operation but we do want to log the error. - if err := d.SendBundle(ctx, tx.FromAddress, params); err != nil { - d.lggr.Error("error sending bundle: ", err) + if d.bundles { + if err := d.SendBundle(ctx, tx.FromAddress, params); err != nil { + d.lggr.Error("error sending bundle: ", err) + } } return nil } diff --git a/pkg/txm/clientwrappers/dualbroadcast/selector.go b/pkg/txm/clientwrappers/dualbroadcast/selector.go index 7244dc29d1..f42e187d7f 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/selector.go +++ b/pkg/txm/clientwrappers/dualbroadcast/selector.go @@ -12,11 +12,11 @@ import ( "github.com/smartcontractkit/chainlink-evm/pkg/txm" ) -func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int, txStore txm.TxStore) (txm.Client, txm.ErrorHandler, error) { +func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int, txStore txm.TxStore, bundles *bool) (txm.Client, txm.ErrorHandler, error) { urlString := url.String() switch { case strings.Contains(urlString, "flashbots"): - return NewFlashbotsClient(lggr, client, keyStore, url, txStore), nil, nil + return NewFlashbotsClient(lggr, client, keyStore, url, txStore, bundles), nil, nil default: mc, err := NewMetaClient(lggr, client, keyStore, url, chainID, txStore) if err != nil { diff --git a/pkg/txmgr/builder.go b/pkg/txmgr/builder.go index 441d7edb97..02cbbb0267 100644 --- a/pkg/txmgr/builder.go +++ b/pkg/txmgr/builder.go @@ -151,7 +151,7 @@ func NewTxmV2( var c txm.Client if txmV2Config.DualBroadcast() != nil && *txmV2Config.DualBroadcast() && txmV2Config.CustomURL() != nil { var err error - c, eh, err = dualbroadcast.SelectClient(lggr, client, keyStore, txmV2Config.CustomURL(), chainID, inMemoryStoreManager) + c, eh, err = dualbroadcast.SelectClient(lggr, client, keyStore, txmV2Config.CustomURL(), chainID, inMemoryStoreManager, txmV2Config.Bundles()) if err != nil { return nil, fmt.Errorf("failed to create dual broadcast client: %w", err) } From b017969c2e567c44619ce1af96094ae7ab89cdeb Mon Sep 17 00:00:00 2001 From: Dimitris Date: Wed, 18 Feb 2026 17:45:48 +0200 Subject: [PATCH 6/7] Fix failed request error message format --- pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go index 88235baf5c..48cafb7f94 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go @@ -135,13 +135,14 @@ func (d *FlashbotsClient) signAndPostMessage(ctx context.Context, address common postReq.Header.Add("X-Flashbots-Origin", "chainlink") postReq.Header.Add("Content-Type", "application/json") + reqDesc := fmt.Sprintf("%s %s body: %s", postReq.Method, postReq.URL.String(), string(body)) resp, err := http.DefaultClient.Do(postReq) if err != nil { - return nil, fmt.Errorf("request %v failed: %w", postReq, err) + return nil, fmt.Errorf("request %s failed: %w", reqDesc, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("request %v failed with status: %d", postReq, resp.StatusCode) + return nil, fmt.Errorf("request %s failed with status: %d", reqDesc, resp.StatusCode) } keyJSON, err := io.ReadAll(resp.Body) From 7111a3e874927dd24e1bc277b7385cc2b6fa4a45 Mon Sep 17 00:00:00 2001 From: Dimitris Date: Thu, 19 Feb 2026 14:20:05 +0200 Subject: [PATCH 7/7] Add parsing tests --- .../dualbroadcast/flashbots_client.go | 9 +- .../dualbroadcast/flashbots_client_test.go | 112 ++++++++++++++++++ 2 files changed, 118 insertions(+), 3 deletions(-) create mode 100644 pkg/txm/clientwrappers/dualbroadcast/flashbots_client_test.go diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go index 48cafb7f94..e0e24d8878 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go @@ -169,13 +169,16 @@ type postError struct { Message string `json:"message,omitempty"` } +// SendBundle sends a bundle of all the in-flight transactions. func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Address, urlParams string) error { unconfirmedTxs, err := d.txStore.FetchUnconfirmedTransactions(ctx, fromAddress) if err != nil { return fmt.Errorf("failed to fetch unconfirmed transactions: %w", err) } - // TODO: Get the first attempt from each transaction for now. + // We fetch all the unconfirmed transactions in an ascending nonce order. + // For the bundle we need a signed transaction so we get the first attempt from each transaction. + // TODO: Implement a more sophisticated attempt selection logic if necessary. attempts := make([]*types.Attempt, 0, len(unconfirmedTxs)) nonces := make([]uint64, 0, len(unconfirmedTxs)) for _, unconfirmedTx := range unconfirmedTxs { @@ -197,7 +200,7 @@ func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Add if err != nil { return fmt.Errorf("failed to get current block height: %w", err) } - targetBlock := currentBlock.NumberU64() + 24 + maxBlock := currentBlock.NumberU64() + 24 bodyItems := make([]map[string]any, 0, len(attempts)) for _, attempt := range attempts { @@ -224,7 +227,7 @@ func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Add "body": bodyItems, "inclusion": map[string]any{ "block": hexutil.EncodeBig(new(big.Int).SetUint64(currentBlock.NumberU64())), - "maxBlock": hexutil.EncodeBig(new(big.Int).SetUint64(targetBlock)), + "maxBlock": hexutil.EncodeBig(new(big.Int).SetUint64(maxBlock)), }, "privacy": privacy, "version": "v0.1", diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client_test.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client_test.go new file mode 100644 index 0000000000..67bd4476f1 --- /dev/null +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client_test.go @@ -0,0 +1,112 @@ +package dualbroadcast + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseURLParams(t *testing.T) { + tests := []struct { + name string + params string + wantPrivacy Privacy + wantRefund RefundConfig + wantErr bool + wantErrContain string + }{ + { + name: "empty params", + params: "", + wantPrivacy: Privacy{}, + wantRefund: RefundConfig{}, + }, + { + name: "auctionTimeout", + params: "auctionTimeout=60", + wantPrivacy: Privacy{AuctionTimeout: 60}, + wantRefund: RefundConfig{}, + }, + { + name: "auctionTimeout invalid ignored", + params: "auctionTimeout=notanint", + wantPrivacy: Privacy{}, + wantRefund: RefundConfig{}, + }, + { + name: "single builder", + params: "builder=test_builder", + wantPrivacy: Privacy{Builders: []string{"test_builder"}}, + wantRefund: RefundConfig{}, + }, + { + name: "multiple builders", + params: "builder=test_builder_1&builder=test_builder_2", + wantPrivacy: Privacy{Builders: []string{"test_builder_1", "test_builder_2"}}, + wantRefund: RefundConfig{}, + }, + { + name: "single hint", + params: "hint=calldata", + wantPrivacy: Privacy{Hints: []string{"calldata"}}, + wantRefund: RefundConfig{}, + }, + { + name: "multiple hints", + params: "hint=calldata&hint=hash", + wantPrivacy: Privacy{Hints: []string{"calldata", "hash"}}, + wantRefund: RefundConfig{}, + }, + { + name: "refund valid", + params: "refund=0xRefundAddr:50", + wantPrivacy: Privacy{WantRefund: 50}, + wantRefund: RefundConfig{Address: "0xRefundAddr", Percent: 100}, + }, + { + name: "refund invalid percent", + params: "refund=0xRefundAddr:bad", + wantErr: true, + wantErrContain: "unable to parse percentage", + }, + { + name: "refund single part ignored", + params: "refund=0xRefundAddr", + wantPrivacy: Privacy{}, + wantRefund: RefundConfig{}, + }, + { + name: "invalid query", + params: "%", + wantErr: true, + wantErrContain: "unable to parse params", + }, + { + name: "combined params", + params: "auctionTimeout=120&builder=test_builder_1&builder=test_builder_2&hint=h1&refund=0xR:75", + wantPrivacy: Privacy{ + AuctionTimeout: 120, + Builders: []string{"test_builder_1", "test_builder_2"}, + Hints: []string{"h1"}, + WantRefund: 75, + }, + wantRefund: RefundConfig{Address: "0xR", Percent: 100}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + privacy, refund, err := parseURLParams(tt.params) + if tt.wantErr { + require.Error(t, err) + if tt.wantErrContain != "" { + assert.Contains(t, err.Error(), tt.wantErrContain) + } + return + } + require.NoError(t, err) + assert.Equal(t, tt.wantPrivacy, privacy) + assert.Equal(t, tt.wantRefund, refund) + }) + } +}