Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

### IMPROVEMENTS

- `[blocksync]` validate blocksync response sender and signature count
([\#5860](https://github.com/cometbft/cometbft/pull/5860))

### FEATURES

### BUG-FIXES
Expand Down
13 changes: 13 additions & 0 deletions blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,19 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
}

// HasPendingRequestFrom reports whether we have at least one outstanding block
// request directed at the given peer.
func (pool *BlockPool) HasPendingRequestFrom(peerID p2p.ID) bool {
pool.mtx.Lock()
defer pool.mtx.Unlock()
for _, r := range pool.requesters {
if r.didRequestFrom(peerID) {
return true
}
}
return false
}

// IsCaughtUp returns true if this node is caught up, false - otherwise.
// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool {
Expand Down
49 changes: 49 additions & 0 deletions blocksync/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,3 +558,52 @@ func TestBlockPoolMaxPeerHeightRefreshesOnPopRequest(t *testing.T) {
require.EqualValues(t, 100, pool.MaxPeerHeight(),
"peer B must contribute to maxPeerHeight once pool.height reaches its base")
}

func TestBlockPoolHasPendingRequestFrom(t *testing.T) {
requestsCh := make(chan BlockRequest, 10)
errorsCh := make(chan peerError, 10)

pool := NewBlockPool(1, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())

const (
primary = p2p.ID("primary")
secondary = p2p.ID("secondary")
stranger = p2p.ID("stranger")
)

// check initial state
require.False(t, pool.HasPendingRequestFrom(primary))
require.False(t, pool.HasPendingRequestFrom(secondary))
require.False(t, pool.HasPendingRequestFrom(stranger))

// Install a requester for height 1 targeting `primary`. We set the
// fields directly so we don't have to spin up the request goroutine.
pool.mtx.Lock()
req1 := newBPRequester(pool, 1)
req1.peerID = primary
pool.requesters[1] = req1
pool.mtx.Unlock()

require.True(t, pool.HasPendingRequestFrom(primary), "requested peer should be reported as pending")
require.False(t, pool.HasPendingRequestFrom(stranger), "non-requested peer must not be reported as pending")

// A second requester at height 2 also covers the secondPeerID slot.
pool.mtx.Lock()
req2 := newBPRequester(pool, 2)
req2.peerID = primary
req2.secondPeerID = secondary
pool.requesters[2] = req2
pool.mtx.Unlock()

require.True(t, pool.HasPendingRequestFrom(secondary), "secondary peer slot should count as pending")

// Removing both requesters drops the pending state.
pool.mtx.Lock()
delete(pool.requesters, 1)
delete(pool.requesters, 2)
pool.mtx.Unlock()

require.False(t, pool.HasPendingRequestFrom(primary))
require.False(t, pool.HasPendingRequestFrom(secondary))
}
112 changes: 91 additions & 21 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blocksync

import (
"errors"
"fmt"
"reflect"
"sync"
Expand Down Expand Up @@ -45,6 +46,9 @@ func (e peerError) Error() string {
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
}

// Reactor implements MsgBytesFilter
var _ p2p.MsgBytesFilter = (*Reactor)(nil)

// Reactor handles long-term catchup syncing.
type Reactor struct {
p2p.BaseReactor
Expand Down Expand Up @@ -238,8 +242,93 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest, src p2p.Peer) (queu
})
}

func (r *Reactor) handlePeerResponse(msg *bcproto.BlockResponse, src p2p.Peer) {
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
r.Logger.Error("Peer sent us invalid block", "peer", src, "msg", msg, "err", err)
r.Switch.StopPeerForError(src, err)
return
}

var extCommit *types.ExtendedCommit
if msg.ExtCommit != nil {
extCommit, err = types.ExtendedCommitFromProto(msg.ExtCommit)
if err != nil {
r.Logger.Error("Failed to convert extended commit from proto", "peer", src, "err", err)
r.Switch.StopPeerForError(src, err)
return
}
}

if err := r.pool.AddBlock(src.ID(), bi, extCommit, msg.Block.Size()); err != nil {
r.Logger.Error("Failed to add block", "peer", src, "err", err)
}
}

// FilterMsgBytes implements p2p.MsgBytesFilter and rejects messages from
// unexpected peers before unmarshalling the request.
func (bcR *Reactor) FilterMsgBytes(chID byte, src p2p.Peer, msgBytes []byte) error {
// do not check invalid messages, will fail unmarshalling
if chID != BlocksyncChannel || len(msgBytes) == 0 {
return nil
}

// unmarshal into custom stub struct that will do no allocations so we can
// quickly and cheaply check the validity of BlockResponse message
var stub bcproto.SigCountMessage
if err := stub.Unmarshal(msgBytes); err != nil {
return fmt.Errorf("malformed blocksync message from peer %s: %w", src.ID(), err)
}
if stub.BlockResponse == nil {
// Not a BlockResponse oneof case, no extra validation to do in this
// case
return nil
}

// blocksync not running, we should not be getting a BlockResponse
if !bcR.pool.IsRunning() {
return errors.New("unsolicited BlockResponse: blocksync not active")
}

// ensure we have an outstanding request to this peer
if !bcR.pool.HasPendingRequestFrom(src.ID()) {
return fmt.Errorf("unsolicited BlockResponse from peer %s", src.ID())
}

// validate the commit count in the response
if err := validateMaxVotes(stub.BlockResponse); err != nil {
return fmt.Errorf("validating max votes in BlockResponse from peer %s: %w", src.ID(), err)
}

return nil
}

// validateMaxVotes validates that the number of commit signatures and extended
// commit signatures are both less than the MaxVotesCount, returns an error if
// not.
func validateMaxVotes(br *bcproto.SigCountBlockResponse) error {
commitSigs, extSigs := 0, 0
if br != nil {
if br.Block != nil && br.Block.LastCommit != nil {
commitSigs = len(br.Block.LastCommit.Signatures)
}
if br.ExtCommit != nil {
extSigs = len(br.ExtCommit.ExtendedSignatures)
}
}

if commitSigs > types.MaxVotesCount {
return fmt.Errorf("too many commit signatures: %d (max %d)", commitSigs, types.MaxVotesCount)
}
if extSigs > types.MaxVotesCount {
return fmt.Errorf("too many extended commit signatures: %d (max %d)", extSigs, types.MaxVotesCount)
}

return nil
}

// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *Reactor) Receive(e p2p.Envelope) { //nolint: dupl // recreated in a test
func (bcR *Reactor) Receive(e p2p.Envelope) {
if err := ValidateMsg(e.Message); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
Expand All @@ -252,26 +341,7 @@ func (bcR *Reactor) Receive(e p2p.Envelope) { //nolint: dupl // recreated in a t
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, e.Src)
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("Block content is invalid", "err", err)
return
}
var extCommit *types.ExtendedCommit
if msg.ExtCommit != nil {
var err error
extCommit, err = types.ExtendedCommitFromProto(msg.ExtCommit)
if err != nil {
bcR.Logger.Error("failed to convert extended commit from proto",
"peer", e.Src,
"err", err)
return
}
}

if err := bcR.pool.AddBlock(e.Src.ID(), bi, extCommit, msg.Block.Size()); err != nil {
bcR.Logger.Error("failed to add block", "peer", e.Src, "err", err)
}
bcR.handlePeerResponse(msg, e.Src)
case *bcproto.StatusRequest:
// Send peer our state.
e.Src.TrySend(p2p.Envelope{
Expand Down
Loading
Loading