diff --git a/contracts/platform/sources/forwarder.move b/contracts/platform/sources/forwarder.move index 7d2f54bf..0a3a0377 100644 --- a/contracts/platform/sources/forwarder.move +++ b/contracts/platform/sources/forwarder.move @@ -53,6 +53,19 @@ module platform::forwarder { oracles: vector } + /* + struct Transmission { + address transmitter; + // with an increased gas limit. + bool success; // this can actually hold the transmission state (notAttempted, inProgress, successful, failed) + // or we can derive it in getTransmissionState(), do check chainlink-evm/contracts/cre/src/keystone/KeystoneForwarder.sol + // The amount of gas allocated for the `IReceiver.onReport` call. uint80 allows storing gas for known EVM block + // gas limits. Ensures that the minimum gas requested by the user is available during the transmission attempt. + // If the transmission fails (indicated by a `false` success state), it can be retried with an increased gas limit. + uint80 gasLimit; + } + */ + #[event] struct ConfigSet has drop, store { don_id: u32, @@ -212,6 +225,20 @@ module platform::forwarder { ); } + // TODO: split this into report_validation() and report_delivery() + // both called with same params and payload + // report_validation() verifies config/payload/sigs and updates transmission info with transmitter address and state to InProgress + // report_delivery() will revert if transmission not in progress + // report_delivery() will revert (or return success) ? if transmission already successful + // report_delivery() updates transmisison info with transmitter address (because different nodes can be calling report_validation() and report_delivery()) + // so we want to look at the AccountTransactions of the correct node + // evm also does the following and it would be nice if aptos had a similar mechanism, this can help send retry attempts with increased gas limit + /* + uint256 gasLimit = gasleft() - INTERNAL_GAS_REQUIREMENTS; + if (gasLimit < MINIMUM_GAS_LIMIT) revert InsufficientGasForRouting(transmissionId); + s_transmissions[transmissionId].gasLimit = uint80(gasLimit); + */ + entry fun report( transmitter: &signer, receiver: address, @@ -322,6 +349,34 @@ module platform::forwarder { (metadata, data) } + /* + function getTransmissionInfo( + address receiver, + bytes32 workflowExecutionId, + bytes2 reportId + ) external view returns (TransmissionInfo memory) { + bytes32 transmissionId = getTransmissionId(receiver, workflowExecutionId, reportId); + + Transmission memory transmission = s_transmissions[transmissionId]; + + TransmissionState state; + + if (transmission.transmitter == address(0)) { + state = IRouter.TransmissionState.NOT_ATTEMPTED; + } else { + state = transmission.success ? IRouter.TransmissionState.SUCCEEDED : IRouter.TransmissionState.FAILED; + } + + return TransmissionInfo({ + gasLimit: transmission.gasLimit, + state: state, + success: transmission.success, + transmissionId: transmissionId, + transmitter: transmission.transmitter + }); + } + */ + #[view] public fun get_transmission_state( receiver: address, workflow_execution_id: vector, report_id: u16 diff --git a/go.mod b/go.mod index 31a92c34..4a9c0e8a 100644 --- a/go.mod +++ b/go.mod @@ -16,8 +16,8 @@ require ( github.com/prometheus/client_golang v1.22.0 github.com/shopspring/decimal v1.4.0 github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 - github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e - github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963 + github.com/smartcontractkit/chainlink-common v0.10.1-0.20260218165451-e87d34b6ea14 + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217103918-db5a4ede0b6f github.com/stretchr/testify v1.11.1 github.com/valyala/fastjson v1.6.4 go.opentelemetry.io/otel v1.39.0 diff --git a/go.sum b/go.sum index 3105237d..19c9cd41 100644 --- a/go.sum +++ b/go.sum @@ -324,12 +324,12 @@ github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 h1:6C8qej6f github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82/go.mod h1:xe4pgH49k4SsmkQq5OT8abwhWmnzkhpgnXeekbx2efw= github.com/smartcontractkit/chain-selectors v1.0.89 h1:L9oWZGqQXWyTPnC6ODXgu3b0DFyLmJ9eHv+uJrE9IZY= github.com/smartcontractkit/chain-selectors v1.0.89/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e h1:3zBkN2h2JzgjEntuV/YqqqCC9vNrBdwC5/FKfJi+1G8= -github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e/go.mod h1:TDyLV7/Y+lnZegvfeZXj5myOG0cKrsmuGnJ8OQQuPWo= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260218165451-e87d34b6ea14 h1:k22Otc6zoXwV3C5bOU/UDXZqUUH0QTGvcjBl0xbyQ8c= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260218165451-e87d34b6ea14/go.mod h1:DOY8qT1C6yBA+lmJFijYdCSlUsrCKM00OG7Lenf92Xo= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963 h1:DCLvEn4KkFzYbK/AYl4vJmf6EHaskPYvGDGdd0kOma0= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217103918-db5a4ede0b6f h1:wIi0g1zvEIIMe9fd6GNar0HzSGYr4bKPn4AGFhuHe3U= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217103918-db5a4ede0b6f/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b h1:36knUpKHHAZ86K4FGWXtx8i/EQftGdk2bqCoEu/Cha8= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index bfd499a5..cb189dee 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -14,7 +14,7 @@ require ( github.com/rs/zerolog v1.34.0 github.com/smartcontractkit/chain-selectors v1.0.89 github.com/smartcontractkit/chainlink-aptos v0.0.0-20251212131933-e5e85d6fa4d3 - github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e + github.com/smartcontractkit/chainlink-common v0.10.1-0.20260218143149-f4e93991b518 github.com/smartcontractkit/chainlink-deployments-framework v0.74.2 github.com/smartcontractkit/chainlink-testing-framework/framework v0.13.0 github.com/smartcontractkit/chainlink/core/scripts v0.0.0-20260114190217-6f3f008c67a6 @@ -348,7 +348,7 @@ require ( github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d // indirect github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0.0.0-20251211142334-5c3421fe2c8d // indirect github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d // indirect - github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963 // indirect + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f // indirect github.com/smartcontractkit/chainlink-protos/job-distributor v0.17.0 // indirect github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index b28a2ce1..528a6684 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1307,8 +1307,8 @@ github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20251222203705-84e9 github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20251222203705-84e93cab86b5/go.mod h1:kDMTKjZB4pnhQVAdwVMzA0THXAxjaON58JSO+CYLYBg= github.com/smartcontractkit/chainlink-ccv v0.0.0-20260106165728-3d896e87cc56 h1:M6eS2r11Vbbll/bve5Us17cNYDlgs+dvrggPFVnhrgQ= github.com/smartcontractkit/chainlink-ccv v0.0.0-20260106165728-3d896e87cc56/go.mod h1:6N8NSPmsy+sxtRBmBUwWlDyxPyauS7HMDzUl/lyJw7Y= -github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e h1:3zBkN2h2JzgjEntuV/YqqqCC9vNrBdwC5/FKfJi+1G8= -github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e/go.mod h1:TDyLV7/Y+lnZegvfeZXj5myOG0cKrsmuGnJ8OQQuPWo= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260218143149-f4e93991b518 h1:wPdjEUQjw+cp/tOeJDI32u1MDkRGwK3ZLaD5a9tm0qI= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260218143149-f4e93991b518/go.mod h1:HXgSKzmZ/bhSx8nHU7hHW6dR+BHSXkdcpFv2T8qJcS8= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20251211140724-319861e514c4 h1:NOUsjsMzNecbjiPWUQGlRSRAutEvCFrqqyETDJeh5q4= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20251211140724-319861e514c4/go.mod h1:Zpvul9sTcZNAZOVzt5vBl1XZGNvQebFpnpn3/KOQvOQ= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20251215152504-b1e41f508340 h1:PsjEI+5jZIz9AS4eOsLS5VpSWJINf38clXV3wryPyMk= @@ -1341,8 +1341,8 @@ github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0. github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:ATjAPIVJibHRcIfiG47rEQkUIOoYa6KDvWj3zwCAw6g= github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d h1:AJy55QJ/pBhXkZjc7N+ATnWfxrcjq9BI9DmdtdjwDUQ= github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:5JdppgngCOUS76p61zCinSCgOhPeYQ+OcDUuome5THQ= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963 h1:DCLvEn4KkFzYbK/AYl4vJmf6EHaskPYvGDGdd0kOma0= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260206000552-087e235a7963/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f h1:MHlgzqiDPyDV397bZkzS9TtWXb3FR9Pb8FR9cP9h0As= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= github.com/smartcontractkit/chainlink-protos/job-distributor v0.17.0 h1:xHPmFDhff7QpeFxKsZfk+24j4AlnQiFjjRh5O87Peu4= github.com/smartcontractkit/chainlink-protos/job-distributor v0.17.0/go.mod h1:/dVVLXrsp+V0AbcYGJo3XMzKg3CkELsweA/TTopCsKE= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= diff --git a/relayer/aptos_service.go b/relayer/aptos_service.go new file mode 100644 index 00000000..8e2000cc --- /dev/null +++ b/relayer/aptos_service.go @@ -0,0 +1,310 @@ +package relayer + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "time" + + aptosgosdk "github.com/aptos-labs/aptos-go-sdk" + "github.com/aptos-labs/aptos-go-sdk/bcs" + "github.com/google/uuid" + + "github.com/smartcontractkit/chainlink-aptos/relayer/chain" + "github.com/smartcontractkit/chainlink-aptos/relayer/utils" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + commonaptos "github.com/smartcontractkit/chainlink-common/pkg/types/chains/aptos" + "github.com/smartcontractkit/chainlink-common/pkg/utils/retry" +) + +type aptosService struct { + commontypes.UnimplementedAptosService + chain chain.Chain + logger logger.Logger +} + +func (s *aptosService) AccountAPTBalance(ctx context.Context, req commonaptos.AccountAPTBalanceRequest) (*commonaptos.AccountAPTBalanceReply, error) { + client, err := s.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + sdkAddr := aptosgosdk.AccountAddress(req.Address[:]) + reply, err := client.AccountAPTBalance(sdkAddr) + if err != nil { + return nil, fmt.Errorf("failed to get account APT balance: %w", err) + } + return &commonaptos.AccountAPTBalanceReply{Value: reply}, nil +} + +func (s *aptosService) View(ctx context.Context, req commonaptos.ViewRequest) (*commonaptos.ViewReply, error) { + if req.Payload == nil { + return nil, fmt.Errorf("view payload is required") + } + + client, err := s.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + + sdkPayload := &aptosgosdk.ViewPayload{ + Module: aptosgosdk.ModuleId{ + Address: aptosgosdk.AccountAddress(req.Payload.Module.Address), + Name: req.Payload.Module.Name, + }, + Function: req.Payload.Function, + ArgTypes: convertTypeTagsToSDK(req.Payload.ArgTypes), + Args: req.Payload.Args, + } + + result, err := client.View(sdkPayload) + if err != nil { + return nil, fmt.Errorf("failed to call view function: %w", err) + } + + data, err := json.Marshal(result) + if err != nil { + return nil, fmt.Errorf("failed to marshal view result: %w", err) + } + + return &commonaptos.ViewReply{Data: data}, nil +} + +func (s *aptosService) TransactionByHash(ctx context.Context, req commonaptos.TransactionByHashRequest) (*commonaptos.TransactionByHashReply, error) { + client, err := s.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + + tx, err := client.TransactionByHash(req.Hash) + if err != nil { + return nil, fmt.Errorf("failed to get transaction by hash: %w", err) + } + + data, err := json.Marshal(tx.Inner) + if err != nil { + return nil, fmt.Errorf("failed to marshal transaction data: %w", err) + } + + return &commonaptos.TransactionByHashReply{ + Transaction: &commonaptos.Transaction{ + Type: commonaptos.TransactionVariant(tx.Type), + Hash: string(tx.Hash()), + Version: tx.Version(), + Success: tx.Success(), + Data: data, + }, + }, nil +} + +func (s *aptosService) SubmitTransaction(ctx context.Context, req commonaptos.SubmitTransactionRequest) (*commonaptos.SubmitTransactionReply, error) { + // Deserialize the BCS-encoded TransactionPayload (containing an EntryFunction) + var txPayload aptosgosdk.TransactionPayload + if err := bcs.Deserialize(&txPayload, req.EncodedPayload); err != nil { + return nil, fmt.Errorf("failed to deserialize transaction payload: %w", err) + } + + entryFn, ok := txPayload.Payload.(*aptosgosdk.EntryFunction) + if !ok { + return nil, fmt.Errorf("expected EntryFunction payload, got %T", txPayload.Payload) + } + + gasLimit := big.NewInt(int64(req.GasConfig.MaxGasAmount)) + accounts, err := s.chain.KeyStore().Accounts(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get accounts: %w", err) + } + + if len(accounts) == 0 { + return nil, errors.New("no enabled accounts available") + } + + // Find account with highest balance + publicKey, err := s.getAccountWithHighestBalance(ctx, accounts) + if err != nil { + return nil, fmt.Errorf("failed to determine account for SubmitTransaction: %w", err) + } + txID := uuid.New().String() + err = s.chain.TxManager().EnqueueCRE( + txID, + &commontypes.TxMeta{ + GasLimit: gasLimit, + }, + publicKey, + entryFn, + true, // simulateTx + ) + if err != nil { + return nil, fmt.Errorf("failed to enqueue transaction: %w", err) + } + // TODO: dont use txmgr config, create and use workflow/cre config + maximumWaitTime := time.Duration(s.chain.Config().TransactionManager.TxExpirationSecs) * time.Second + + retryCtx, cancel := context.WithTimeout(ctx, maximumWaitTime) + defer cancel() + txStatus, err := retry.Do(retryCtx, s.logger, func(ctx context.Context) (commonaptos.TransactionStatus, error) { + txStatus, txStatusErr := s.chain.TxManager().GetStatus(txID) + if txStatusErr != nil { + return commonaptos.TxFatal, txStatusErr + } + switch txStatus { + case commontypes.Fatal, commontypes.Failed: + return commonaptos.TxFatal, nil + case commontypes.Unconfirmed, commontypes.Finalized: + return commonaptos.TxSuccess, nil + case commontypes.Pending, commontypes.Unknown: + return commonaptos.TxFatal, fmt.Errorf("tx still in state pending or unknown, tx status is %d for tx with ID %s", txStatus, txID) + default: + return commonaptos.TxFatal, fmt.Errorf("unexpected transaction status %d for tx with ID %s", txStatus, txID) + } + }) + + if err != nil { + return nil, fmt.Errorf("failed getting transaction status: %w", err) + } + + if txStatus == commonaptos.TxFatal { + return &commonaptos.SubmitTransactionReply{ + TxStatus: commonaptos.TxFatal, + TxIdempotencyKey: txID, + }, nil + } else { + return &commonaptos.SubmitTransactionReply{ + TxStatus: commonaptos.TxSuccess, + TxIdempotencyKey: txID, + }, nil + } + // TODO: + // get tx hash + // make write report get tx by hash and check for success or revert reason + // but then we also need to poll for transmission info because some other node might have done a success + // so we need to go through all possible cases and then figure out how to handle retries + + /* + receipt, err := retry.Do(retryContext, e.logger, func(ctx context.Context) (*evmtxmgr.ChainReceipt, error) { + receipt, receiptErr := e.chain.TxManager().GetTransactionReceipt(ctx, txID) + if receiptErr != nil { + return nil, fmt.Errorf("failed to get TX receipt for tx with ID %s: %w", txID, receiptErr) + } + if receipt == nil { + return nil, fmt.Errorf("receipt was nil for TX with ID %s", txID) + } + return receipt, nil + }) + + if err != nil { + return nil, fmt.Errorf("failed getting transaction receipt. %w", err) + } + + return &evm.TransactionResult{ + TxStatus: evm.TxSuccess, + TxHash: (*receipt).GetTxHash(), + TxIdempotencyKey: txID, + }, nil + */ +} + +// getAccountWithHighestBalance returns the public key of the account with the highest APT balance. +func (s *aptosService) getAccountWithHighestBalance(ctx context.Context, accounts []string) (string, error) { + if len(accounts) == 0 { + return "", errors.New("no accounts provided") + } + if len(accounts) == 1 { + s.logger.Debugw("only one enabled account for chain", "account", accounts[0]) + return accounts[0], nil + } + + client, err := s.chain.GetClient() + if err != nil { + return "", fmt.Errorf("failed to get client: %w", err) + } + + var highestBalance uint64 + var selectedAccount string + var foundAny bool + + for _, account := range accounts { + addr, err := utils.HexPublicKeyToAddress(account) + if err != nil { + s.logger.Warnw("failed to convert public key to address, skipping", "account", account, "error", err) + continue + } + + balance, err := client.AccountAPTBalance(addr) + if err != nil { + s.logger.Warnw("failed to get balance for account, skipping", "account", account, "address", addr.String(), "error", err) + continue + } + + if !foundAny || balance > highestBalance { + highestBalance = balance + selectedAccount = account + foundAny = true + } + } + + if !foundAny { + // Fallback to first account if all balance queries failed + return accounts[0], nil + } + + s.logger.Debugw("selected account with highest balance for chain", + "account", selectedAccount, + "balance", highestBalance, + "totalAccounts", len(accounts)) + + return selectedAccount, nil +} + +// convertTypeTagsToSDK converts common TypeTags to SDK TypeTags. +func convertTypeTagsToSDK(tags []commonaptos.TypeTag) []aptosgosdk.TypeTag { + out := make([]aptosgosdk.TypeTag, len(tags)) + for i, tag := range tags { + out[i] = aptosgosdk.TypeTag{Value: convertTypeTagImplToSDK(tag.Value)} + } + return out +} + +func convertTypeTagImplToSDK(impl commonaptos.TypeTagImpl) aptosgosdk.TypeTagImpl { + switch v := impl.(type) { + case commonaptos.BoolTag: + return &aptosgosdk.BoolTag{} + case commonaptos.U8Tag: + return &aptosgosdk.U8Tag{} + case commonaptos.U16Tag: + return &aptosgosdk.U16Tag{} + case commonaptos.U32Tag: + return &aptosgosdk.U32Tag{} + case commonaptos.U64Tag: + return &aptosgosdk.U64Tag{} + case commonaptos.U128Tag: + return &aptosgosdk.U128Tag{} + case commonaptos.U256Tag: + return &aptosgosdk.U256Tag{} + case commonaptos.AddressTag: + return &aptosgosdk.AddressTag{} + case commonaptos.SignerTag: + return &aptosgosdk.SignerTag{} + case commonaptos.VectorTag: + return &aptosgosdk.VectorTag{ + TypeParam: aptosgosdk.TypeTag{Value: convertTypeTagImplToSDK(v.ElementType.Value)}, + } + case commonaptos.StructTag: + typeParams := make([]aptosgosdk.TypeTag, len(v.TypeParams)) + for i, tp := range v.TypeParams { + typeParams[i] = aptosgosdk.TypeTag{Value: convertTypeTagImplToSDK(tp.Value)} + } + return &aptosgosdk.StructTag{ + Address: aptosgosdk.AccountAddress(v.Address), + Module: v.Module, + Name: v.Name, + TypeParams: typeParams, + } + case commonaptos.GenericTag: + return &aptosgosdk.GenericTag{Num: uint64(v.Index)} + default: + return nil + } +} diff --git a/relayer/chain/chain.go b/relayer/chain/chain.go index ad53db34..9ae975ce 100644 --- a/relayer/chain/chain.go +++ b/relayer/chain/chain.go @@ -39,6 +39,7 @@ type Chain interface { TxManager() *txm.AptosTxm LogPoller() *logpoller.AptosLogPoller GetClient() (aptos.AptosRpcClient, error) + KeyStore() loop.Keystore } type ChainOpts struct { @@ -73,10 +74,11 @@ var _ Chain = (*chain)(nil) type chain struct { starter commonutils.StartStopOnce - id string - cfg *config.TOMLConfig - lggr logger.Logger - ds sqlutil.DataSource + id string + cfg *config.TOMLConfig + lggr logger.Logger + ds sqlutil.DataSource + keyStore loop.Keystore // Sub-services txm *txm.AptosTxm @@ -113,10 +115,11 @@ func newChain(cfg *config.TOMLConfig, loopKs loop.Keystore, lggr logger.Logger, } ch := &chain{ - id: cfg.ChainID, - cfg: cfg, - lggr: logger.Named(lggr, "Chain"), - ds: ds, + id: cfg.ChainID, + cfg: cfg, + lggr: logger.Named(lggr, "Chain"), + ds: ds, + keyStore: loopKs, } ch.txm, err = txm.New(lggr, loopKs, *cfg.TransactionManager, ch.GetClient) @@ -169,6 +172,10 @@ func (c *chain) ChainID() string { return c.id } +func (c *chain) KeyStore() loop.Keystore { + return c.keyStore +} + // GetClient returns a client, randomly selecting one from available and valid nodes func (c *chain) GetClient() (aptos.AptosRpcClient, error) { var node *config.Node diff --git a/relayer/relay.go b/relayer/relay.go index 06f7f799..875b2dd3 100644 --- a/relayer/relay.go +++ b/relayer/relay.go @@ -21,6 +21,7 @@ import ( ) var _ types.Relayer = (*relayer)(nil) //nolint:staticcheck +var _ types.AptosService = (*relayer)(nil) type relayer struct { chain chain.Chain @@ -28,11 +29,13 @@ type relayer struct { starter utils.StartStopOnce stopCh services.StopChan + aptosService } func NewRelayer(lggr logger.Logger, chain chain.Chain, capRegistry core.CapabilitiesRegistry) (*relayer, error) { ctx := context.TODO() + // CAN I REMOVE THIS ? if chain.Config().Workflow != nil { capability, err := write_target.NewAptosWriteTarget(ctx, chain, lggr) if err != nil { @@ -49,6 +52,10 @@ func NewRelayer(lggr logger.Logger, chain chain.Chain, capRegistry core.Capabili chain: chain, lggr: lggr, stopCh: make(chan struct{}), + aptosService: aptosService{ + chain: chain, + logger: lggr, + }, }, nil } @@ -56,6 +63,10 @@ func (r *relayer) Name() string { return r.lggr.Name() } +func (r *relayer) Replay(ctx context.Context, fromBlock string, args map[string]any) error { + return errors.ErrUnsupported +} + // Start starts the relayer respecting the given context. func (r *relayer) Start(ctx context.Context) error { return r.starter.StartOnce("AptosRelayer", func() error { @@ -180,10 +191,6 @@ func (r *relayer) Solana() (types.SolanaService, error) { return nil, errors.New("SolanaService is not supported for aptos") } -func (r *relayer) Replay(ctx context.Context, fromBlock string, args map[string]any) error { - return errors.ErrUnsupported -} - // ChainService interface func (r *relayer) GetChainStatus(ctx context.Context) (types.ChainStatus, error) { return r.chain.GetChainStatus(ctx) @@ -204,3 +211,7 @@ func (r *relayer) ListNodeStatuses(ctx context.Context, pageSize int32, pageToke func (r *relayer) Transact(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error { return r.chain.Transact(ctx, from, to, amount, balanceCheck) } + +func (r *relayer) Aptos() (types.AptosService, error) { + return r, nil +} diff --git a/relayer/txm/txm.go b/relayer/txm/txm.go index 4c8f4e1b..fb7ea48f 100644 --- a/relayer/txm/txm.go +++ b/relayer/txm/txm.go @@ -221,6 +221,93 @@ func (a *AptosTxm) Enqueue(transactionID string, txMetadata *commontypes.TxMeta, return nil } +// EnqueueCRE is like Enqueue but accepts a deserialized EntryFunction directly, +// skipping the string-based function parsing and BCS serialisation of parameters. +// The EntryFunction already contains the module, function name, type tags, and +// pre-encoded BCS args. +func (a *AptosTxm) EnqueueCRE(transactionID string, txMetadata *commontypes.TxMeta, publicKey string, entryFunction *aptos.EntryFunction, simulateTx bool) error { + if entryFunction == nil { + return errors.New("entry function is required") + } + + if transactionID == "" { + transactionID = uuid.New().String() + } else { + a.transactionsLock.Lock() + _, transactionExists := a.transactions[transactionID] + a.transactionsLock.Unlock() + if transactionExists { + return errors.New("transaction already exists") + } + } + + ctxLogger := GetContexedTxLogger(a.baseLogger, transactionID, txMetadata) + + ed25519PublicKey, err := utils.HexPublicKeyToEd25519PublicKey(publicKey) + if err != nil { + return fmt.Errorf("failed to convert public key: %+w", err) + } + + acc := utils.Ed25519PublicKeyToAddress(ed25519PublicKey) + fromAddress := acc.String() + + fromAccountAddress := &aptos.AccountAddress{} + err = fromAccountAddress.ParseStringRelaxed(fromAddress) + if err != nil { + return fmt.Errorf("failed to parse from address: %+w", err) + } + + currentTimestamp := getTimestampSecs() + tx := &AptosTx{ + ID: transactionID, + Metadata: txMetadata, + Timestamp: currentTimestamp, + FromAddress: *fromAccountAddress, + PublicKey: ed25519PublicKey, + ContractAddress: entryFunction.Module.Address, + ModuleName: entryFunction.Module.Name, + FunctionName: entryFunction.Function, + TypeTags: entryFunction.ArgTypes, + BcsValues: entryFunction.Args, + Status: commontypes.Pending, + Simulate: simulateTx, + } + + a.transactionsLock.Lock() + if (currentTimestamp - a.transactionsLastPruneTime) > a.config.PruneIntervalSecs { + for txID, tx := range a.transactions { + if tx.Status != commontypes.Finalized && tx.Status != commontypes.Failed && tx.Status != commontypes.Fatal { + continue + } + if (currentTimestamp - tx.Timestamp) < a.config.PruneTxExpirationSecs { + continue + } + ctxLogger.Debugw("Pruning transaction", "status", tx.Status) + delete(a.transactions, txID) + } + a.transactionsLastPruneTime = currentTimestamp + } + a.transactions[transactionID] = tx + a.transactionsLock.Unlock() + + select { + case a.broadcastChan <- transactionID: + ctxLogger.Debugw("Tx enqueued", "fromAddr", fromAddress) + default: + // if the channel is full, we drop the transaction. + // we do this instead of setting the tx in `a.transactions` post-broadcast to avoid a race + // with the broadcastLoop, which expects to find the tx in `a.transactions` upon reception of + // the id. + a.transactionsLock.Lock() + delete(a.transactions, transactionID) + a.transactionsLock.Unlock() + + return fmt.Errorf("failed to enqueue tx: %+v", tx) + } + + return nil +} + func (a *AptosTxm) GetStatus(transactionID string) (commontypes.TransactionStatus, error) { if transactionID == "" { return commontypes.Unknown, errors.New("nil tx id") diff --git a/relayer/txm/txm_local_test.go b/relayer/txm/txm_local_test.go index 55b021b3..ff2fa303 100644 --- a/relayer/txm/txm_local_test.go +++ b/relayer/txm/txm_local_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/aptos-labs/aptos-go-sdk" + "github.com/aptos-labs/aptos-go-sdk/bcs" "github.com/google/uuid" "github.com/stretchr/testify/require" "golang.org/x/crypto/sha3" @@ -152,6 +153,75 @@ func runTxmTest(t *testing.T, logger logger.Logger, config Config, rpcURL string logger.Debugw("Counter value after test", "value", counterValue) require.Equal(t, expectedValue, counterValue) + + // submit all txs at once and wait for all afterwards + // helps testing reties and failure recoveries + var txIDsCRE []string + + accountBytes, err := bcs.Serialize(&accountAddress) + require.NoError(t, err) + + threeBytes, err := bcs.SerializeU64(3) + require.NoError(t, err) + fourBytes, err := bcs.SerializeU64(4) + require.NoError(t, err) + + for i := 0; i < iterations; i++ { + incrementId := uuid.New().String() + err := txm.EnqueueCRE( + incrementId, + getSampleTxMetadata(), + publicKeyHex, + &aptos.EntryFunction{ + Module: aptos.ModuleId{ + Address: accountAddress, + Name: "counter", + }, + Function: "increment", + ArgTypes: []aptos.TypeTag{}, + Args: [][]byte{ + accountBytes, + }, + }, + true, + ) + require.NoError(t, err) + expectedValue += 1 + txIDsCRE = append(txIDsCRE, incrementId) + + incrementMultId := uuid.New().String() + err = txm.EnqueueCRE( + incrementMultId, + getSampleTxMetadata(), + publicKeyHex, + &aptos.EntryFunction{ + Module: aptos.ModuleId{ + Address: accountAddress, + Name: "counter", + }, + Function: "increment_mult", + ArgTypes: []aptos.TypeTag{}, + Args: [][]byte{ + accountBytes, + threeBytes, + fourBytes, + }, + }, + true, + ) + require.NoError(t, err) + expectedValue += 3 * 4 + txIDsCRE = append(txIDsCRE, incrementMultId) + } + + for _, txId := range txIDsCRE { + waitForTxmId(t, txm, txId, time.Minute*2) + } + + counterValueCRE := testutils.ReadCounterValue(t, client, accountAddress) + logger.Debugw("Counter value after test", "value", counterValueCRE) + + require.Equal(t, expectedValue, counterValueCRE) } func deployTestModule(t *testing.T, txm *AptosTxm, fromAddress aptos.AccountAddress, publicKeyHex string) {