diff --git a/CONFIG.md b/CONFIG.md index b5eaaf1354..d897fbc461 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -390,6 +390,7 @@ Enabled = false # Default BlockTime = '10s' # Example CustomURL = 'https://example.api.io' # Example DualBroadcast = false # Example +Bundles = false # Example ``` @@ -417,6 +418,12 @@ DualBroadcast = false # Example ``` DualBroadcast enables DualBroadcast functionality. +### Bundles +```toml +Bundles = false # Example +``` +Bundles enables sending bundles for auctioning (not compatible with all OFAs). + ## BalanceMonitor ```toml [BalanceMonitor] diff --git a/pkg/config/chain_scoped_transactions.go b/pkg/config/chain_scoped_transactions.go index 72220f7471..7a70ce140c 100644 --- a/pkg/config/chain_scoped_transactions.go +++ b/pkg/config/chain_scoped_transactions.go @@ -64,6 +64,10 @@ func (t *transactionManagerV2Config) DualBroadcast() *bool { return t.c.DualBroadcast } +func (t *transactionManagerV2Config) Bundles() *bool { + return t.c.Bundles +} + func (t *transactionsConfig) AutoPurge() AutoPurgeConfig { return &autoPurgeConfig{c: t.c.AutoPurge} } diff --git a/pkg/config/config.go b/pkg/config/config.go index 9593c96d96..97e7025f62 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -133,6 +133,7 @@ type TransactionManagerV2 interface { BlockTime() *time.Duration CustomURL() *url.URL DualBroadcast() *bool + Bundles() *bool } type GasEstimator interface { diff --git a/pkg/config/toml/config.go b/pkg/config/toml/config.go index 6e9cbf4ee3..c3c664c020 100644 --- a/pkg/config/toml/config.go +++ b/pkg/config/toml/config.go @@ -589,6 +589,7 @@ type TransactionManagerV2Config struct { BlockTime *commonconfig.Duration `toml:",omitempty"` CustomURL *commonconfig.URL `toml:",omitempty"` DualBroadcast *bool `toml:",omitempty"` + Bundles *bool `toml:",omitempty"` } func (t *TransactionManagerV2Config) setFrom(f *TransactionManagerV2Config) { @@ -604,6 +605,9 @@ func (t *TransactionManagerV2Config) setFrom(f *TransactionManagerV2Config) { if v := f.DualBroadcast; v != nil { t.DualBroadcast = f.DualBroadcast } + if v := f.Bundles; v != nil { + t.Bundles = f.Bundles + } } func (t *TransactionManagerV2Config) ValidateConfig() (err error) { diff --git a/pkg/config/toml/config_test.go b/pkg/config/toml/config_test.go index 47328b9dda..d329b6715d 100644 --- a/pkg/config/toml/config_test.go +++ b/pkg/config/toml/config_test.go @@ -64,6 +64,7 @@ func TestDefaults_fieldsNotNil(t *testing.T) { unknown.Transactions.TransactionManagerV2.BlockTime = new(config.Duration) unknown.Transactions.TransactionManagerV2.CustomURL = new(config.URL) unknown.Transactions.TransactionManagerV2.DualBroadcast = ptr(false) + unknown.Transactions.TransactionManagerV2.Bundles = ptr(false) unknown.Transactions.AutoPurge.Threshold = ptr(uint32(0)) unknown.Transactions.AutoPurge.MinAttempts = ptr(uint32(0)) unknown.Transactions.AutoPurge.DetectionApiUrl = new(config.URL) @@ -159,6 +160,7 @@ func TestDocs(t *testing.T) { docDefaults.Transactions.TransactionManagerV2.BlockTime = nil docDefaults.Transactions.TransactionManagerV2.CustomURL = nil docDefaults.Transactions.TransactionManagerV2.DualBroadcast = nil + docDefaults.Transactions.TransactionManagerV2.Bundles = nil // Fallback DA oracle is not set docDefaults.GasEstimator.DAOracle = DAOracle{} @@ -283,6 +285,7 @@ var fullConfig = EVMConfig{ TransactionManagerV2: TransactionManagerV2Config{ Enabled: ptr(false), DualBroadcast: ptr(true), + Bundles: ptr(false), BlockTime: config.MustNewDuration(42 * time.Second), CustomURL: config.MustParseURL("http://txs.org"), }, diff --git a/pkg/config/toml/docs.toml b/pkg/config/toml/docs.toml index 72947f8394..ea088098d5 100644 --- a/pkg/config/toml/docs.toml +++ b/pkg/config/toml/docs.toml @@ -171,6 +171,8 @@ BlockTime = '10s' # Example CustomURL = 'https://example.api.io' # Example # DualBroadcast enables DualBroadcast functionality. DualBroadcast = false # Example +# Bundles enables sending bundles for auctioning (not compatible with all OFAs). +Bundles = false # Example [BalanceMonitor] # Enabled balance monitoring for all keys. diff --git a/pkg/config/toml/testdata/config-full.toml b/pkg/config/toml/testdata/config-full.toml index 0284665631..3647cd4cba 100644 --- a/pkg/config/toml/testdata/config-full.toml +++ b/pkg/config/toml/testdata/config-full.toml @@ -47,6 +47,7 @@ Enabled = false BlockTime = '42s' CustomURL = 'http://txs.org' DualBroadcast = true +Bundles = false [BalanceMonitor] Enabled = true diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go index 9313490e2d..e0e24d8878 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go @@ -10,30 +10,51 @@ import ( "math/big" "net/http" "net/url" + "strconv" + "strings" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + evmtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/smartcontractkit/chainlink-evm/pkg/client" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-evm/pkg/keys" - "github.com/smartcontractkit/chainlink-evm/pkg/txm" "github.com/smartcontractkit/chainlink-evm/pkg/txm/types" ) -var _ txm.Client = &FlashbotsClient{} +const flashbotsRPCTimeout = 10 * time.Second + +type FlashbotsTxStore interface { + FetchUnconfirmedTransactions(context.Context, common.Address) ([]*types.Transaction, error) +} + +type FlashbotsClientRPC interface { + BlockByNumber(ctx context.Context, number *big.Int) (*evmtypes.Block, error) + NonceAt(context.Context, common.Address, *big.Int) (uint64, error) + SendTransaction(context.Context, *evmtypes.Transaction) error +} type FlashbotsClient struct { - c client.Client + lggr logger.SugaredLogger + c FlashbotsClientRPC keystore keys.MessageSigner customURL *url.URL + txStore FlashbotsTxStore + bundles bool } -func NewFlashbotsClient(c client.Client, keystore keys.MessageSigner, customURL *url.URL) *FlashbotsClient { +func NewFlashbotsClient(lggr logger.Logger, c FlashbotsClientRPC, keystore keys.MessageSigner, customURL *url.URL, txStore FlashbotsTxStore, bundles *bool) *FlashbotsClient { + b := bundles != nil && *bundles return &FlashbotsClient{ + lggr: logger.Sugared(logger.Named(lggr, "Txm.FlashbotsClient")), c: c, keystore: keystore, customURL: customURL, + txStore: txStore, + bundles: b, } } @@ -42,15 +63,21 @@ func (d *FlashbotsClient) NonceAt(ctx context.Context, address common.Address, b } func (d *FlashbotsClient) PendingNonceAt(ctx context.Context, address common.Address) (uint64, error) { + ctx, cancel := context.WithTimeout(ctx, flashbotsRPCTimeout) + defer cancel() body := []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["%s","pending"], "id":1}`, address.String())) - response, err := d.signAndPostMessage(ctx, address, body, "") + raw, err := d.signAndPostMessage(ctx, address, body, "") if err != nil { return 0, err } - nonce, err := hexutil.DecodeUint64(response) + var resultStr string + if err := json.Unmarshal(raw, &resultStr); err != nil { + return 0, fmt.Errorf("failed to unmarshal response into string: %w", err) + } + nonce, err := hexutil.DecodeUint64(resultStr) if err != nil { - return 0, fmt.Errorf("failed to decode response %v into uint64: %w", response, err) + return 0, fmt.Errorf("failed to decode response %v into uint64: %w", resultStr, err) } return nonce, nil } @@ -72,57 +99,216 @@ func (d *FlashbotsClient) SendTransaction(ctx context.Context, tx *types.Transac } body := []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["%s"], "id":1}`, hexutil.Encode(data))) _, err = d.signAndPostMessage(ctx, tx.FromAddress, body, params) - return err + if err != nil { + return err + } + + // After successfully sending the transaction, send a bundle with all unconfirmed transactions + // Don't act on a bundle error - this is a fire and forget operation but we do want to log the error. + if d.bundles { + if err := d.SendBundle(ctx, tx.FromAddress, params); err != nil { + d.lggr.Error("error sending bundle: ", err) + } + } + return nil } return d.c.SendTransaction(ctx, attempt.SignedTransaction) } -func (d *FlashbotsClient) signAndPostMessage(ctx context.Context, address common.Address, body []byte, urlParams string) (result string, err error) { +func (d *FlashbotsClient) signAndPostMessage(ctx context.Context, address common.Address, body []byte, urlParams string) (json.RawMessage, error) { + ctx, cancel := context.WithTimeout(ctx, flashbotsRPCTimeout) + defer cancel() bodyReader := bytes.NewReader(body) postReq, err := http.NewRequestWithContext(ctx, http.MethodPost, d.customURL.String()+"?"+urlParams, bodyReader) if err != nil { - return + return nil, err } hashedBody := crypto.Keccak256Hash(body).Hex() signedMessage, err := d.keystore.SignMessage(ctx, address, []byte(hashedBody)) if err != nil { - return + return nil, err } postReq.Header.Add("X-Flashbots-signature", address.String()+":"+hexutil.Encode(signedMessage)) + postReq.Header.Add("X-Flashbots-Origin", "chainlink") postReq.Header.Add("Content-Type", "application/json") + reqDesc := fmt.Sprintf("%s %s body: %s", postReq.Method, postReq.URL.String(), string(body)) resp, err := http.DefaultClient.Do(postReq) if err != nil { - return result, fmt.Errorf("request %v failed: %w", postReq, err) + return nil, fmt.Errorf("request %s failed: %w", reqDesc, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return result, fmt.Errorf("request %v failed with status: %d", postReq, resp.StatusCode) + return nil, fmt.Errorf("request %s failed with status: %d", reqDesc, resp.StatusCode) } keyJSON, err := io.ReadAll(resp.Body) if err != nil { - return + return nil, err } var response postResponse err = json.Unmarshal(keyJSON, &response) if err != nil { - return result, fmt.Errorf("failed to unmarshal response into struct: %w: %s", err, string(keyJSON)) + return nil, fmt.Errorf("failed to unmarshal response into struct: %w: %s", err, string(keyJSON)) } if response.Error.Message != "" { - return result, errors.New(response.Error.Message) + return nil, errors.New(response.Error.Message) } return response.Result, nil } type postResponse struct { - Result string `json:"result,omitempty"` + Result json.RawMessage `json:"result,omitempty"` Error postError } type postError struct { Message string `json:"message,omitempty"` } + +// SendBundle sends a bundle of all the in-flight transactions. +func (d *FlashbotsClient) SendBundle(ctx context.Context, fromAddress common.Address, urlParams string) error { + unconfirmedTxs, err := d.txStore.FetchUnconfirmedTransactions(ctx, fromAddress) + if err != nil { + return fmt.Errorf("failed to fetch unconfirmed transactions: %w", err) + } + + // We fetch all the unconfirmed transactions in an ascending nonce order. + // For the bundle we need a signed transaction so we get the first attempt from each transaction. + // TODO: Implement a more sophisticated attempt selection logic if necessary. + attempts := make([]*types.Attempt, 0, len(unconfirmedTxs)) + nonces := make([]uint64, 0, len(unconfirmedTxs)) + for _, unconfirmedTx := range unconfirmedTxs { + if len(unconfirmedTx.Attempts) > 0 { + attempts = append(attempts, unconfirmedTx.Attempts[0]) + if unconfirmedTx.Nonce != nil { + nonces = append(nonces, *unconfirmedTx.Nonce) + } + } + } + + // Need at least 2 transactions to send a bundle + if len(attempts) < 2 { + return nil + } + + // TODO: we don't have a good way to get this other than making an RPC call. Some async caching may help with the overhead. + currentBlock, err := d.c.BlockByNumber(ctx, nil) + if err != nil { + return fmt.Errorf("failed to get current block height: %w", err) + } + maxBlock := currentBlock.NumberU64() + 24 + + bodyItems := make([]map[string]any, 0, len(attempts)) + for _, attempt := range attempts { + if attempt.SignedTransaction == nil { + return fmt.Errorf("attempt with ID %d has nil SignedTransaction", attempt.ID) + } + + txData, err := attempt.SignedTransaction.MarshalBinary() + if err != nil { + return fmt.Errorf("failed to marshal transaction for attempt ID %d: %w", attempt.ID, err) + } + + bodyItems = append(bodyItems, map[string]any{ + "tx": hexutil.Encode(txData), + "revertMode": "allow", // we always want to allow reverts so bundles are valid even if a single transaction within the bundle goes through + }) + } + privacy, refundConfig, err := parseURLParams(urlParams) + if err != nil { + return err + } + + bundleParams := map[string]any{ + "body": bodyItems, + "inclusion": map[string]any{ + "block": hexutil.EncodeBig(new(big.Int).SetUint64(currentBlock.NumberU64())), + "maxBlock": hexutil.EncodeBig(new(big.Int).SetUint64(maxBlock)), + }, + "privacy": privacy, + "version": "v0.1", + } + if refundConfig.Address != "" { + bundleParams["validity"] = map[string]any{ + "refundConfig": []any{refundConfig}, + } + } + + requestBody := map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": "mev_sendBundle", + "params": []any{bundleParams}, + } + + bodyBytes, err := json.Marshal(requestBody) + if err != nil { + return fmt.Errorf("failed to marshal bundle request: %w", err) + } + + raw, err := d.signAndPostMessage(ctx, fromAddress, bodyBytes, "") + if err != nil { + return err + } + + var bundleResult struct { + BundleHash string `json:"bundleHash"` + } + if err := json.Unmarshal(raw, &bundleResult); err != nil { + return fmt.Errorf("failed to decode response %v into bundle result: %w", raw, err) + } + d.lggr.Infow("Broadcasted transaction bundle", "nonces", nonces, "bundleHash", bundleResult.BundleHash) + return nil +} + +func parseURLParams(params string) (Privacy, RefundConfig, error) { + values, err := url.ParseQuery(params) + if err != nil { + return Privacy{}, RefundConfig{}, fmt.Errorf("unable to parse params: %w", err) + } + + privacy := Privacy{} + if timeout, err := strconv.Atoi(values.Get("auctionTimeout")); err == nil { + privacy.AuctionTimeout = timeout + } + + privacy.Builders = append(privacy.Builders, values["builder"]...) + + privacy.Hints = append(privacy.Hints, values["hint"]...) + + refundConfig := RefundConfig{} + refundRaw := values.Get("refund") + if refundRaw != "" { + parts := strings.Split(refundRaw, ":") + if len(parts) == 2 { + address := parts[0] + percentVal, err := strconv.Atoi(parts[1]) + if err != nil { + return Privacy{}, RefundConfig{}, fmt.Errorf("unable to parse percentage: %w", err) + } + + privacy.WantRefund = percentVal + refundConfig = RefundConfig{ + Address: address, + Percent: 100, // wantRefund is an absolute percent of the kickback, and refundConfig.percent=100 means entire refund goes to this address (no longer supported) + } + } + } + return privacy, refundConfig, nil +} + +type Privacy struct { + WantRefund int `json:"wantRefund"` + AuctionTimeout int `json:"auctionTimeout"` + Builders []string `json:"builders"` + Hints []string `json:"hints"` +} + +type RefundConfig struct { + Address string `json:"address"` + Percent int `json:"percent"` +} diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client_test.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client_test.go new file mode 100644 index 0000000000..67bd4476f1 --- /dev/null +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client_test.go @@ -0,0 +1,112 @@ +package dualbroadcast + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseURLParams(t *testing.T) { + tests := []struct { + name string + params string + wantPrivacy Privacy + wantRefund RefundConfig + wantErr bool + wantErrContain string + }{ + { + name: "empty params", + params: "", + wantPrivacy: Privacy{}, + wantRefund: RefundConfig{}, + }, + { + name: "auctionTimeout", + params: "auctionTimeout=60", + wantPrivacy: Privacy{AuctionTimeout: 60}, + wantRefund: RefundConfig{}, + }, + { + name: "auctionTimeout invalid ignored", + params: "auctionTimeout=notanint", + wantPrivacy: Privacy{}, + wantRefund: RefundConfig{}, + }, + { + name: "single builder", + params: "builder=test_builder", + wantPrivacy: Privacy{Builders: []string{"test_builder"}}, + wantRefund: RefundConfig{}, + }, + { + name: "multiple builders", + params: "builder=test_builder_1&builder=test_builder_2", + wantPrivacy: Privacy{Builders: []string{"test_builder_1", "test_builder_2"}}, + wantRefund: RefundConfig{}, + }, + { + name: "single hint", + params: "hint=calldata", + wantPrivacy: Privacy{Hints: []string{"calldata"}}, + wantRefund: RefundConfig{}, + }, + { + name: "multiple hints", + params: "hint=calldata&hint=hash", + wantPrivacy: Privacy{Hints: []string{"calldata", "hash"}}, + wantRefund: RefundConfig{}, + }, + { + name: "refund valid", + params: "refund=0xRefundAddr:50", + wantPrivacy: Privacy{WantRefund: 50}, + wantRefund: RefundConfig{Address: "0xRefundAddr", Percent: 100}, + }, + { + name: "refund invalid percent", + params: "refund=0xRefundAddr:bad", + wantErr: true, + wantErrContain: "unable to parse percentage", + }, + { + name: "refund single part ignored", + params: "refund=0xRefundAddr", + wantPrivacy: Privacy{}, + wantRefund: RefundConfig{}, + }, + { + name: "invalid query", + params: "%", + wantErr: true, + wantErrContain: "unable to parse params", + }, + { + name: "combined params", + params: "auctionTimeout=120&builder=test_builder_1&builder=test_builder_2&hint=h1&refund=0xR:75", + wantPrivacy: Privacy{ + AuctionTimeout: 120, + Builders: []string{"test_builder_1", "test_builder_2"}, + Hints: []string{"h1"}, + WantRefund: 75, + }, + wantRefund: RefundConfig{Address: "0xR", Percent: 100}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + privacy, refund, err := parseURLParams(tt.params) + if tt.wantErr { + require.Error(t, err) + if tt.wantErrContain != "" { + assert.Contains(t, err.Error(), tt.wantErrContain) + } + return + } + require.NoError(t, err) + assert.Equal(t, tt.wantPrivacy, privacy) + assert.Equal(t, tt.wantRefund, refund) + }) + } +} diff --git a/pkg/txm/clientwrappers/dualbroadcast/selector.go b/pkg/txm/clientwrappers/dualbroadcast/selector.go index c99c41962a..f42e187d7f 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/selector.go +++ b/pkg/txm/clientwrappers/dualbroadcast/selector.go @@ -12,11 +12,11 @@ import ( "github.com/smartcontractkit/chainlink-evm/pkg/txm" ) -func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int, txStore MetaClientTxStore) (txm.Client, txm.ErrorHandler, error) { +func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int, txStore txm.TxStore, bundles *bool) (txm.Client, txm.ErrorHandler, error) { urlString := url.String() switch { case strings.Contains(urlString, "flashbots"): - return NewFlashbotsClient(client, keyStore, url), nil, nil + return NewFlashbotsClient(lggr, client, keyStore, url, txStore, bundles), nil, nil default: mc, err := NewMetaClient(lggr, client, keyStore, url, chainID, txStore) if err != nil { diff --git a/pkg/txm/mock_tx_store_test.go b/pkg/txm/mock_tx_store_test.go index 7b87a1aacc..aac0e4f39c 100644 --- a/pkg/txm/mock_tx_store_test.go +++ b/pkg/txm/mock_tx_store_test.go @@ -7,6 +7,8 @@ import ( common "github.com/ethereum/go-ethereum/common" + coretypes "github.com/ethereum/go-ethereum/core/types" + mock "github.com/stretchr/testify/mock" types "github.com/smartcontractkit/chainlink-evm/pkg/txm/types" @@ -369,6 +371,65 @@ func (_c *MockTxStore_FetchUnconfirmedTransactionAtNonceWithCount_Call) RunAndRe return _c } +// FetchUnconfirmedTransactions provides a mock function with given fields: _a0, _a1 +func (_m *MockTxStore) FetchUnconfirmedTransactions(_a0 context.Context, _a1 common.Address) ([]*types.Transaction, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for FetchUnconfirmedTransactions") + } + + var r0 []*types.Transaction + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Address) ([]*types.Transaction, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Address) []*types.Transaction); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*types.Transaction) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Address) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockTxStore_FetchUnconfirmedTransactions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FetchUnconfirmedTransactions' +type MockTxStore_FetchUnconfirmedTransactions_Call struct { + *mock.Call +} + +// FetchUnconfirmedTransactions is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 common.Address +func (_e *MockTxStore_Expecter) FetchUnconfirmedTransactions(_a0 interface{}, _a1 interface{}) *MockTxStore_FetchUnconfirmedTransactions_Call { + return &MockTxStore_FetchUnconfirmedTransactions_Call{Call: _e.mock.On("FetchUnconfirmedTransactions", _a0, _a1)} +} + +func (_c *MockTxStore_FetchUnconfirmedTransactions_Call) Run(run func(_a0 context.Context, _a1 common.Address)) *MockTxStore_FetchUnconfirmedTransactions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Address)) + }) + return _c +} + +func (_c *MockTxStore_FetchUnconfirmedTransactions_Call) Return(_a0 []*types.Transaction, _a1 error) *MockTxStore_FetchUnconfirmedTransactions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockTxStore_FetchUnconfirmedTransactions_Call) RunAndReturn(run func(context.Context, common.Address) ([]*types.Transaction, error)) *MockTxStore_FetchUnconfirmedTransactions_Call { + _c.Call.Return(run) + return _c +} + // MarkConfirmedAndReorgedTransactions provides a mock function with given fields: _a0, _a1, _a2 func (_m *MockTxStore) MarkConfirmedAndReorgedTransactions(_a0 context.Context, _a1 uint64, _a2 common.Address) ([]*types.Transaction, []uint64, error) { ret := _m.Called(_a0, _a1, _a2) @@ -534,6 +595,56 @@ func (_c *MockTxStore_MarkUnconfirmedTransactionPurgeable_Call) RunAndReturn(run return _c } +// UpdateSignedAttempt provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *MockTxStore) UpdateSignedAttempt(_a0 context.Context, _a1 uint64, _a2 uint64, _a3 *coretypes.Transaction, _a4 common.Address) error { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + if len(ret) == 0 { + panic("no return value specified for UpdateSignedAttempt") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, *coretypes.Transaction, common.Address) error); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockTxStore_UpdateSignedAttempt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSignedAttempt' +type MockTxStore_UpdateSignedAttempt_Call struct { + *mock.Call +} + +// UpdateSignedAttempt is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 uint64 +// - _a2 uint64 +// - _a3 *coretypes.Transaction +// - _a4 common.Address +func (_e *MockTxStore_Expecter) UpdateSignedAttempt(_a0 interface{}, _a1 interface{}, _a2 interface{}, _a3 interface{}, _a4 interface{}) *MockTxStore_UpdateSignedAttempt_Call { + return &MockTxStore_UpdateSignedAttempt_Call{Call: _e.mock.On("UpdateSignedAttempt", _a0, _a1, _a2, _a3, _a4)} +} + +func (_c *MockTxStore_UpdateSignedAttempt_Call) Run(run func(_a0 context.Context, _a1 uint64, _a2 uint64, _a3 *coretypes.Transaction, _a4 common.Address)) *MockTxStore_UpdateSignedAttempt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint64), args[2].(uint64), args[3].(*coretypes.Transaction), args[4].(common.Address)) + }) + return _c +} + +func (_c *MockTxStore_UpdateSignedAttempt_Call) Return(_a0 error) *MockTxStore_UpdateSignedAttempt_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTxStore_UpdateSignedAttempt_Call) RunAndReturn(run func(context.Context, uint64, uint64, *coretypes.Transaction, common.Address) error) *MockTxStore_UpdateSignedAttempt_Call { + _c.Call.Return(run) + return _c +} + // UpdateTransactionBroadcast provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 func (_m *MockTxStore) UpdateTransactionBroadcast(_a0 context.Context, _a1 uint64, _a2 uint64, _a3 common.Hash, _a4 common.Address) error { ret := _m.Called(_a0, _a1, _a2, _a3, _a4) diff --git a/pkg/txm/storage/inmemory_store.go b/pkg/txm/storage/inmemory_store.go index decfcb9689..83db34a854 100644 --- a/pkg/txm/storage/inmemory_store.go +++ b/pkg/txm/storage/inmemory_store.go @@ -195,6 +195,26 @@ func (m *InMemoryStore) FetchUnconfirmedTransactionAtNonceWithCount(latestNonce return } +func (m *InMemoryStore) FetchUnconfirmedTransactions() ([]*types.Transaction, error) { + m.RLock() + defer m.RUnlock() + + transactions := make([]*types.Transaction, 0, len(m.UnconfirmedTransactions)) + for _, tx := range m.UnconfirmedTransactions { + if tx.Nonce == nil { + return nil, fmt.Errorf("unconfirmed transaction with ID %d has nil nonce", tx.ID) + } + transactions = append(transactions, tx.DeepCopy()) + } + + // Sort by nonce in ascending order + sort.Slice(transactions, func(i, j int) bool { + return *transactions[i].Nonce < *transactions[j].Nonce + }) + + return transactions, nil +} + func (m *InMemoryStore) MarkConfirmedAndReorgedTransactions(latestNonce uint64) ([]*types.Transaction, []uint64, error) { m.Lock() defer m.Unlock() diff --git a/pkg/txm/storage/inmemory_store_manager.go b/pkg/txm/storage/inmemory_store_manager.go index fde77e9037..4d6d421e18 100644 --- a/pkg/txm/storage/inmemory_store_manager.go +++ b/pkg/txm/storage/inmemory_store_manager.go @@ -83,6 +83,13 @@ func (m *InMemoryStoreManager) FetchUnconfirmedTransactionAtNonceWithCount(_ con return nil, 0, fmt.Errorf(StoreNotFoundForAddress, fromAddress) } +func (m *InMemoryStoreManager) FetchUnconfirmedTransactions(_ context.Context, fromAddress common.Address) ([]*types.Transaction, error) { + if store, exists := m.InMemoryStoreMap[fromAddress]; exists { + return store.FetchUnconfirmedTransactions() + } + return nil, fmt.Errorf(StoreNotFoundForAddress, fromAddress) +} + func (m *InMemoryStoreManager) MarkConfirmedAndReorgedTransactions(_ context.Context, nonce uint64, fromAddress common.Address) (confirmedTxs []*types.Transaction, unconfirmedTxIDs []uint64, err error) { if store, exists := m.InMemoryStoreMap[fromAddress]; exists { confirmedTxs, unconfirmedTxIDs, err = store.MarkConfirmedAndReorgedTransactions(nonce) diff --git a/pkg/txm/txm.go b/pkg/txm/txm.go index 0c2924e3ab..8ce3da1087 100644 --- a/pkg/txm/txm.go +++ b/pkg/txm/txm.go @@ -8,6 +8,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + evmtypes "github.com/ethereum/go-ethereum/core/types" "github.com/jpillora/backoff" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -38,9 +39,11 @@ type TxStore interface { AppendAttemptToTransaction(context.Context, uint64, common.Address, *types.Attempt) (attempts []*types.Attempt, err error) CreateEmptyUnconfirmedTransaction(context.Context, common.Address, uint64, uint64) (*types.Transaction, error) CreateTransaction(context.Context, *types.TxRequest) (*types.Transaction, error) + FetchUnconfirmedTransactions(context.Context, common.Address) ([]*types.Transaction, error) FetchUnconfirmedTransactionAtNonceWithCount(context.Context, uint64, common.Address) (*types.Transaction, int, error) MarkConfirmedAndReorgedTransactions(context.Context, uint64, common.Address) ([]*types.Transaction, []uint64, error) MarkUnconfirmedTransactionPurgeable(context.Context, uint64, common.Address) error + UpdateSignedAttempt(context.Context, uint64, uint64, *evmtypes.Transaction, common.Address) error UpdateTransactionBroadcast(context.Context, uint64, uint64, common.Hash, common.Address) error UpdateUnstartedTransactionWithNonce(context.Context, common.Address, uint64) (*types.Transaction, error) diff --git a/pkg/txmgr/builder.go b/pkg/txmgr/builder.go index 6564a08926..924adf2231 100644 --- a/pkg/txmgr/builder.go +++ b/pkg/txmgr/builder.go @@ -152,7 +152,7 @@ func NewTxmV2( var c txm.Client if txmV2Config.DualBroadcast() != nil && *txmV2Config.DualBroadcast() && txmV2Config.CustomURL() != nil { var err error - c, eh, err = dualbroadcast.SelectClient(lggr, client, keyStore, txmV2Config.CustomURL(), chainID, inMemoryStoreManager) + c, eh, err = dualbroadcast.SelectClient(lggr, client, keyStore, txmV2Config.CustomURL(), chainID, inMemoryStoreManager, txmV2Config.Bundles()) if err != nil { return nil, fmt.Errorf("failed to create dual broadcast client: %w", err) }