Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 25 additions & 34 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name: docker
on:
push:
branches:
- optional-proofs
- unstable
- stable
tags:
Expand All @@ -13,10 +14,9 @@ concurrency:
cancel-in-progress: true

env:
DOCKER_PASSWORD: ${{ secrets.DH_KEY }}
DOCKER_USERNAME: ${{ secrets.DH_ORG }}
# Enable self-hosted runners for the sigp repo only.
SELF_HOSTED_RUNNERS: ${{ github.repository == 'sigp/lighthouse' }}
REGISTRY: ghcr.io

jobs:
# Extract the VERSION which is either `latest` or `vX.Y.Z`, and the VERSION_SUFFIX
Expand All @@ -38,6 +38,11 @@ jobs:
run: |
echo "VERSION=latest" >> $GITHUB_ENV
echo "VERSION_SUFFIX=-unstable" >> $GITHUB_ENV
- name: Extract version (if optional-proofs)
if: github.event.ref == 'refs/heads/optional-proofs'
run: |
echo "VERSION=latest" >> $GITHUB_ENV
echo "VERSION_SUFFIX=-optional-proofs" >> $GITHUB_ENV
- name: Extract version (if tagged release)
if: startsWith(github.event.ref, 'refs/tags')
run: |
Expand All @@ -52,8 +57,7 @@ jobs:
runs-on: ${{ github.repository == 'sigp/lighthouse' && fromJson('["self-hosted", "linux", "release"]') || 'ubuntu-22.04' }}
strategy:
matrix:
binary: [lighthouse,
lcli]
binary: [lighthouse]
cpu_arch: [aarch64,
x86_64]
include:
Expand All @@ -68,9 +72,12 @@ jobs:
- name: Update Rust
if: env.SELF_HOSTED_RUNNERS == 'false'
run: rustup update stable
- name: Dockerhub login
run: |
echo "${DOCKER_PASSWORD}" | docker login --username ${DOCKER_USERNAME} --password-stdin
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Sets env vars for Lighthouse
if: startsWith(matrix.binary, 'lighthouse')
Expand All @@ -82,11 +89,6 @@ jobs:
run: |
echo "MAKE_CMD=build-${{ matrix.cpu_arch }}" >> $GITHUB_ENV

- name: Set `make` command for lcli
if: startsWith(matrix.binary, 'lcli')
run: |
echo "MAKE_CMD=build-lcli-${{ matrix.cpu_arch }}" >> $GITHUB_ENV

- name: Cross build binaries
run: |
cargo install cross
Expand Down Expand Up @@ -123,28 +125,14 @@ jobs:
platforms: linux/${{ env.SHORT_ARCH }}
push: true
tags: |
${{ github.repository_owner}}/${{ matrix.binary }}:${{ env.VERSION }}-${{ env.SHORT_ARCH }}${{ env.VERSION_SUFFIX }}

- name: Build and push (lcli)
if: startsWith(matrix.binary, 'lcli')
uses: docker/build-push-action@v5
with:
file: ./lcli/Dockerfile.cross
context: .
platforms: linux/${{ env.SHORT_ARCH }}
push: true

tags: |
${{ github.repository_owner}}/${{ matrix.binary }}:${{ env.VERSION }}-${{ env.SHORT_ARCH }}${{ env.VERSION_SUFFIX }}

${{ env.REGISTRY }}/${{ github.repository }}/${{ matrix.binary }}:${{ env.VERSION }}-${{ env.SHORT_ARCH }}${{ env.VERSION_SUFFIX }}

build-docker-multiarch:
name: build-docker-${{ matrix.binary }}-multiarch
runs-on: ubuntu-22.04
strategy:
matrix:
binary: [lighthouse,
lcli]
binary: [lighthouse]
needs: [build-docker-single-arch, extract-version]
env:
VERSION: ${{ needs.extract-version.outputs.VERSION }}
Expand All @@ -153,13 +141,16 @@ jobs:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Dockerhub login
run: |
echo "${DOCKER_PASSWORD}" | docker login --username ${DOCKER_USERNAME} --password-stdin
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Create and push multiarch manifests
run: |
docker buildx imagetools create -t ${{ github.repository_owner}}/${{ matrix.binary }}:${VERSION}${VERSION_SUFFIX} \
${{ github.repository_owner}}/${{ matrix.binary }}:${VERSION}-arm64${VERSION_SUFFIX} \
${{ github.repository_owner}}/${{ matrix.binary }}:${VERSION}-amd64${VERSION_SUFFIX};
docker buildx imagetools create -t ${{ env.REGISTRY }}/${{ github.repository }}/${{ matrix.binary }}:${VERSION}${VERSION_SUFFIX} \
${{ env.REGISTRY }}/${{ github.repository }}/${{ matrix.binary }}:${VERSION}-arm64${VERSION_SUFFIX} \
${{ env.REGISTRY }}/${{ github.repository }}/${{ matrix.binary }}:${VERSION}-amd64${VERSION_SUFFIX};

29 changes: 24 additions & 5 deletions beacon_node/http_api/src/beacon/pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::{NetworkTxFilter, OptionalConsensusVersionHeaderFilter, ResponseFilter};
use crate::utils::{
NetworkTxFilter, OptionalConsensusVersionHeaderFilter, ResponseFilter, SyncTxFilter,
};
use crate::version::{
ResponseIncludesVersion, V1, V2, add_consensus_version_header, beacon_response,
unsupported_version_rejection,
Expand All @@ -10,10 +12,10 @@ use beacon_chain::execution_proof_verification::{
};
use beacon_chain::observed_data_sidecars::Observe;
use beacon_chain::observed_operations::ObservationOutcome;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
use eth2::types::{AttestationPoolQuery, EndpointVersion, Failure, GenericResponse};
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use network::{NetworkMessage, SyncMessage};
use operation_pool::ReceivedPreCapella;
use slot_clock::SlotClock;
use std::collections::HashSet;
Expand Down Expand Up @@ -533,6 +535,7 @@ pub fn post_beacon_pool_attestations_v2<T: BeaconChainTypes>(
/// If the proof makes a block available, the block will be imported.
pub fn post_beacon_pool_execution_proofs<T: BeaconChainTypes>(
network_tx_filter: &NetworkTxFilter<T>,
sync_tx_filter: &SyncTxFilter<T>,
beacon_pool_path: &BeaconPoolPathFilter<T>,
) -> ResponseFilter {
beacon_pool_path
Expand All @@ -541,12 +544,15 @@ pub fn post_beacon_pool_execution_proofs<T: BeaconChainTypes>(
.and(warp::path::end())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(sync_tx_filter.clone())
.then(
|_task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
proof: ExecutionProof,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| async move {
let result = publish_execution_proof(chain, proof, network_tx).await;
network_tx_filter: UnboundedSender<NetworkMessage<T::EthSpec>>,
sync_tx_filter: UnboundedSender<SyncMessage<T::EthSpec>>| async move {
let result =
publish_execution_proof(chain, proof, network_tx_filter, sync_tx_filter).await;
convert_rejection(result.map(|()| warp::reply::json(&()))).await
},
)
Expand All @@ -558,6 +564,7 @@ async fn publish_execution_proof<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
proof: ExecutionProof,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
sync_tx: UnboundedSender<SyncMessage<T::EthSpec>>,
) -> Result<(), warp::Rejection> {
let proof = Arc::new(proof);

Expand Down Expand Up @@ -614,6 +621,18 @@ async fn publish_execution_proof<T: BeaconChainTypes>(
?status,
"Execution proof submitted and published"
);

if let AvailabilityProcessingStatus::Imported(_) = status {
chain.recompute_head_at_current_slot().await;

// Notify that block was imported via HTTP API
if let Err(e) = sync_tx.send(SyncMessage::GossipBlockProcessResult {
block_root,
imported: true,
}) {
debug!(error = %e, "Could not send message to the sync service")
};
}
}
Err(e) => {
// Log the error but don't fail the request - the proof was already
Expand Down
19 changes: 18 additions & 1 deletion beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,23 @@ pub fn serve<T: BeaconChainTypes>(
})
.boxed();

// Create a `warp` filter that provides access to the sync sender channel.
let sync_tx = ctx
.network_senders
.as_ref()
.map(|senders| senders.sync_send());
let sync_tx_filter = warp::any()
.map(move || sync_tx.clone())
.and_then(|sync_tx| async move {
match sync_tx {
Some(sync_tx) => Ok(sync_tx),
None => Err(warp_utils::reject::custom_not_found(
"The networking stack has not yet started (sync_tx).".to_string(),
)),
}
})
.boxed();

// Create a `warp` filter that rejects requests whilst the node is syncing.
let not_while_syncing_filter =
warp::any()
Expand Down Expand Up @@ -1515,7 +1532,7 @@ pub fn serve<T: BeaconChainTypes>(

// POST beacon/pool/execution_proofs
let post_beacon_pool_execution_proofs =
post_beacon_pool_execution_proofs(&network_tx_filter, &beacon_pool_path);
post_beacon_pool_execution_proofs(&network_tx_filter, &sync_tx_filter, &beacon_pool_path);

let beacon_rewards_path = eth_v1
.clone()
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/http_api/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2::types::EndpointVersion;
use lighthouse_network::PubsubMessage;
use lighthouse_network::rpc::methods::MetaData;
use network::{NetworkMessage, ValidatorSubscriptionMessage};
use network::{NetworkMessage, SyncMessage, ValidatorSubscriptionMessage};
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::mpsc::{Sender, UnboundedSender};
Expand All @@ -20,6 +20,8 @@ pub type TaskSpawnerFilter<T> = BoxedFilter<(TaskSpawner<<T as BeaconChainTypes>
pub type ValidatorSubscriptionTxFilter = BoxedFilter<(Sender<ValidatorSubscriptionMessage>,)>;
pub type NetworkTxFilter<T> =
BoxedFilter<(UnboundedSender<NetworkMessage<<T as BeaconChainTypes>::EthSpec>>,)>;
pub type SyncTxFilter<T> =
BoxedFilter<(UnboundedSender<SyncMessage<<T as BeaconChainTypes>::EthSpec>>,)>;
pub type OptionalConsensusVersionHeaderFilter = BoxedFilter<(Option<ForkName>,)>;

pub fn from_meta_data<E: EthSpec>(
Expand Down
1 change: 1 addition & 0 deletions beacon_node/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ pub use lighthouse_network::NetworkConfig;
pub use service::{
NetworkMessage, NetworkReceivers, NetworkSenders, NetworkService, ValidatorSubscriptionMessage,
};
pub use sync::manager::SyncMessage;
5 changes: 2 additions & 3 deletions beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,13 @@ impl<T: BeaconChainTypes> Router<T> {
invalid_block_storage: InvalidBlockStorage,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
fork_context: Arc<ForkContext>,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
sync_recv: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>,
) -> Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>, String> {
trace!("Service starting");

let (handler_send, handler_recv) = mpsc::unbounded_channel();

// generate the message channel
let (sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<T::EthSpec>>();

let network_beacon_processor = NetworkBeaconProcessor {
beacon_processor_send,
duplicate_cache: DuplicateCache::default(),
Expand Down
13 changes: 13 additions & 0 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::network_beacon_processor::InvalidBlockStorage;
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
use crate::router::{Router, RouterMessage};
use crate::subnet_service::{SubnetService, SubnetServiceMessage, Subscription};
use crate::sync::manager::SyncMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_processor::BeaconProcessorSend;
use futures::channel::mpsc::Sender;
Expand Down Expand Up @@ -138,25 +139,30 @@ pub enum ValidatorSubscriptionMessage {
pub struct NetworkSenders<E: EthSpec> {
network_send: mpsc::UnboundedSender<NetworkMessage<E>>,
validator_subscription_send: mpsc::Sender<ValidatorSubscriptionMessage>,
sync_send: mpsc::UnboundedSender<SyncMessage<E>>,
}

pub struct NetworkReceivers<E: EthSpec> {
pub network_recv: mpsc::UnboundedReceiver<NetworkMessage<E>>,
pub validator_subscription_recv: mpsc::Receiver<ValidatorSubscriptionMessage>,
pub sync_recv: mpsc::UnboundedReceiver<SyncMessage<E>>,
}

impl<E: EthSpec> NetworkSenders<E> {
pub fn new() -> (Self, NetworkReceivers<E>) {
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage<E>>();
let (validator_subscription_send, validator_subscription_recv) =
mpsc::channel(VALIDATOR_SUBSCRIPTION_MESSAGE_QUEUE_SIZE);
let (sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<E>>();
let senders = Self {
network_send,
validator_subscription_send,
sync_send,
};
let receivers = NetworkReceivers {
network_recv,
validator_subscription_recv,
sync_recv,
};
(senders, receivers)
}
Expand All @@ -168,6 +174,10 @@ impl<E: EthSpec> NetworkSenders<E> {
pub fn validator_subscription_send(&self) -> mpsc::Sender<ValidatorSubscriptionMessage> {
self.validator_subscription_send.clone()
}

pub fn sync_send(&self) -> mpsc::UnboundedSender<SyncMessage<E>> {
self.sync_send.clone()
}
}

/// Service that handles communication between internal services and the `lighthouse_network` network service.
Expand Down Expand Up @@ -320,6 +330,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
invalid_block_storage,
beacon_processor_send,
fork_context.clone(),
network_senders.sync_send(),
network_receivers.sync_recv,
)?;

// attestation and sync committee subnet service
Expand All @@ -338,6 +350,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let NetworkReceivers {
network_recv,
validator_subscription_recv,
sync_recv: _,
} = network_receivers;

// create the network service and spawn the task
Expand Down
Loading