@@ -3,6 +3,7 @@ use crate::aggregator::{
empty_batch_aggregations,
http_handlers::{
test_util::{decode_response_body, take_problem_details, HttpHandlerTest},
tests::aggregate_share::post_aggregate_share_request,
AggregatorHandlerBuilder,
},
test_util::{
@@ -30,10 +31,10 @@ use janus_core::{
};
use janus_messages::{
query_type::{FixedSize, TimeInterval},
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep,
Duration, Extension, ExtensionType, HpkeCiphertext, InputShareAad, Interval,
PartialBatchSelector, PrepareError, PrepareInit, PrepareStepResult, ReportIdChecksum,
ReportMetadata, ReportShare, Time,
AggregateShareReq, AggregationJobId, AggregationJobInitializeReq, AggregationJobResp,
AggregationJobStep, BatchId, BatchSelector, Duration, Extension, ExtensionType, HpkeCiphertext,
InputShareAad, Interval, PartialBatchSelector, PrepareError, PrepareInit, PrepareStepResult,
ReportIdChecksum, ReportMetadata, ReportShare, Time,
};
use prio::{codec::Encode, vdaf::dummy};
use rand::random;
@@ -978,3 +979,144 @@ async fn aggregate_init_duplicated_report_id() {
assert_task_aggregation_counter(&datastore, *task.id(), TaskAggregationCounter::default())
.await;
}

#[tokio::test]
async fn aggregate_init_partially_replayed_aggregation_init() {
// Create 5 reports, 1-5. Send one aggregation job init request containing reports 1 and 2. It
// should succeed normally. Then send another init request containing reports 1-5. We expect:
// - the request overall succeeds (i.e. HTTP 200)
// - the PrepareResps for reports 1 and 2 indicate rejection
// - the PrepareResps for reports 3-5 indicate success
// We then send an aggregate share request for the batch ID. It should succeed and all five
// reports should be included.
let HttpHandlerTest {
clock,
ephemeral_datastore: _ephemeral_datastore,
datastore,
handler,
hpke_keypair,
..
} = HttpHandlerTest::new().await;

let task = TaskBuilder::new(
QueryType::FixedSize {
max_batch_size: None,
batch_time_window_size: None,
},
VdafInstance::Fake { rounds: 1 },
)
.with_min_batch_size(1)
.build();

let batch_id = BatchId::from([12; 32]);
let agg_param = dummy::AggregationParam(0).get_encoded().unwrap();
let partial_batch_selector = PartialBatchSelector::new_fixed_size(batch_id);

let helper_task = task.helper_view().unwrap();
let prep_init_generator = PrepareInitGenerator::new(
clock.clone(),
helper_task.clone(),
hpke_keypair.config().clone(),
dummy::Vdaf::new(1),
dummy::AggregationParam(0),
);

datastore.put_aggregator_task(&helper_task).await.unwrap();

let (prepare_init_1, _) = prep_init_generator.next(&1);
let (prepare_init_2, _) = prep_init_generator.next(&2);
let (prepare_init_3, _) = prep_init_generator.next(&3);
let (prepare_init_4, _) = prep_init_generator.next(&4);
let (prepare_init_5, _) = prep_init_generator.next(&5);
let report_ids: Vec<_> = [
&prepare_init_1,
&prepare_init_2,
&prepare_init_3,
&prepare_init_4,
&prepare_init_5,
]
.iter()
.map(|pi| *pi.report_share().metadata().id())
.collect();

let request = AggregationJobInitializeReq::new(
agg_param.clone(),
partial_batch_selector.clone(),
Vec::from([prepare_init_1.clone(), prepare_init_2.clone()]),
);

let mut test_conn = put_aggregation_job(&task, &random(), &request, &handler).await;
assert_eq!(test_conn.status(), Some(Status::Ok));
let aggregate_resp: AggregationJobResp = decode_response_body(&mut test_conn).await;

    // Sanity-check that the request contains reports 1 and 2, then verify that every report
    // received a successful prepare response.
assert_eq!(
&report_ids[0..2],
request
.prepare_inits()
.iter()
.map(|init| *init.report_share().metadata().id())
.collect::<Vec<_>>()
.as_slice(),
);
for resp in aggregate_resp.prepare_resps() {
assert_matches!(resp.result(), &PrepareStepResult::Continue { .. });
}

let request = AggregationJobInitializeReq::new(
agg_param.clone(),
partial_batch_selector,
Vec::from([
prepare_init_1.clone(),
prepare_init_2.clone(),
prepare_init_3.clone(),
prepare_init_4.clone(),
prepare_init_5.clone(),
]),
);

let mut test_conn = put_aggregation_job(&task, &random(), &request, &handler).await;
assert_eq!(test_conn.status(), Some(Status::Ok));
let aggregate_resp: AggregationJobResp = decode_response_body(&mut test_conn).await;

    // Sanity-check that the request contains all five reports, then verify the per-report
    // results: reports 1 and 2 must be rejected as replays, reports 3-5 must succeed.
assert_eq!(
report_ids,
request
.prepare_inits()
.iter()
.map(|init| *init.report_share().metadata().id())
.collect::<Vec<_>>(),
);
for resp in aggregate_resp.prepare_resps() {
if report_ids[0..2].contains(resp.report_id()) {
assert_matches!(
resp.result(),
&PrepareStepResult::Reject(PrepareError::ReportReplayed),
"first two reports must be rejected as replays",
)
        } else if report_ids[2..5].contains(resp.report_id()) {
assert_matches!(
resp.result(),
&PrepareStepResult::Continue { .. },
"last three reports must be accepted",
);
}
}

let request = AggregateShareReq::new(
BatchSelector::new_fixed_size(batch_id),
agg_param.clone(),
5,
ReportIdChecksum::from_report_ids(&report_ids),
);
    // If the request succeeds, the checksum and report count matched, i.e. the helper agrees
    // that all five reports were included in the batch.
assert_eq!(
post_aggregate_share_request(&task, &request, &handler)
.await
.status(),
Some(Status::Ok)
);
}
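
For context on the final assertion: the helper accepts the `AggregateShareReq` only if the checksum and report count it computed over the aggregated batch match the request. A minimal stand-alone sketch of the XOR-of-SHA-256 construction this appears to rely on, given the `aws_lc_rs` imports in `core/src/report_id.rs` and the DAP checksum definition; this models the idea, not Janus's actual `ReportIdChecksum` internals:

// Stand-alone model of the report-ID checksum (assumption: XOR of SHA-256 digests of the
// 16-byte report IDs; a sketch for illustration only).
use aws_lc_rs::digest::{digest, SHA256, SHA256_OUTPUT_LEN};

fn model_checksum(report_ids: &[[u8; 16]]) -> [u8; SHA256_OUTPUT_LEN] {
    let mut acc = [0u8; SHA256_OUTPUT_LEN];
    for id in report_ids {
        // Hash each report ID and XOR the digest into the accumulator.
        let d = digest(&SHA256, id);
        for (a, b) in acc.iter_mut().zip(d.as_ref()) {
            *a ^= *b;
        }
    }
    acc
}

Because XOR is commutative and associative, the checksum is independent of the order in which reports were aggregated, which is why the value in the request can match the helper's even though the five reports arrived across two aggregation job init requests.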
12 changes: 11 additions & 1 deletion core/src/report_id.rs
@@ -4,10 +4,20 @@ use aws_lc_rs::digest::{digest, SHA256, SHA256_OUTPUT_LEN};
use janus_messages::{ReportId, ReportIdChecksum};

/// Additional methods for working with a [`ReportIdChecksum`].
pub trait ReportIdChecksumExt {
pub trait ReportIdChecksumExt: Sized {
/// Initialize a checksum from a single report ID.
fn for_report_id(report_id: &ReportId) -> Self;

    /// Initialize a checksum from multiple report IDs.
    ///
    /// # Panics
    ///
    /// Panics if `report_ids` is empty.
fn from_report_ids(report_ids: &[ReportId]) -> Self {
let mut ret = Self::for_report_id(&report_ids[0]);
for report_id in &report_ids[1..] {
ret = ret.updated_with(report_id);
}

ret
}

/// Incorporate the provided report ID into this checksum.
fn updated_with(self, report_id: &ReportId) -> Self;

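
A quick usage sketch of the new constructor, showing that it is equivalent to seeding with the first ID and folding `updated_with` over the rest. The IDs are hypothetical, and this assumes `ReportId: From<[u8; 16]>` and `ReportIdChecksum: PartialEq` (which the `BatchId::from([12; 32])` usage in the tests above suggests holds for these message types); the module path is inferred from `core/src/report_id.rs`:

use janus_core::report_id::ReportIdChecksumExt;
use janus_messages::{ReportId, ReportIdChecksum};

fn main() {
    // Hypothetical fixed report IDs for illustration.
    let ids = [
        ReportId::from([1; 16]),
        ReportId::from([2; 16]),
        ReportId::from([3; 16]),
    ];

    let batch = ReportIdChecksum::from_report_ids(&ids);

    // Equivalent incremental construction via the existing trait methods.
    let folded = ids[1..]
        .iter()
        .fold(ReportIdChecksum::for_report_id(&ids[0]), |acc, id| {
            acc.updated_with(id)
        });

    assert_eq!(batch, folded);
}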