|
1 | | -use std::collections::BTreeMap; |
| 1 | +use std::collections::{BTreeMap, HashMap}; |
2 | 2 |
|
3 | 3 | use async_trait::async_trait; |
| 4 | +use graph::components::subgraph::ProofOfIndexingFinisher; |
4 | 5 | use graph::data::query::Trace; |
5 | 6 | use graph::data::store::Id; |
6 | 7 | use graph::prelude::alloy::primitives::Address; |
@@ -454,6 +455,183 @@ where |
454 | 455 | Ok(r::Value::List(public_poi_results)) |
455 | 456 | } |
456 | 457 |
|
| 458 | + async fn resolve_block_for_poi( |
| 459 | + &self, |
| 460 | + field: &a::Field, |
| 461 | + ) -> Result<r::Value, QueryExecutionError> { |
| 462 | + const MAX_BLOCK_RANGE: i32 = 1_000_000; |
| 463 | + const BLOCK_HASH_CHUNK_SIZE: usize = 50_000; |
| 464 | + |
| 465 | + let deployment_id = field |
| 466 | + .get_required::<DeploymentHash>("subgraph") |
| 467 | + .expect("Valid subgraph required"); |
| 468 | + let target_poi_hash = field |
| 469 | + .get_required::<BlockHash>("targetPoi") |
| 470 | + .expect("Valid targetPoi required"); |
| 471 | + let start_block = field |
| 472 | + .get_required::<BlockNumber>("startBlock") |
| 473 | + .expect("Valid startBlock required"); |
| 474 | + let end_block = field |
| 475 | + .get_required::<BlockNumber>("endBlock") |
| 476 | + .expect("Valid endBlock required"); |
| 477 | + |
| 478 | + if end_block <= start_block { |
| 479 | + return Ok(r::Value::Null); |
| 480 | + } |
| 481 | + |
| 482 | + if end_block - start_block > MAX_BLOCK_RANGE { |
| 483 | + return Err(QueryExecutionError::TooExpensive); |
| 484 | + } |
| 485 | + |
| 486 | + let target_bytes: [u8; 32] = match target_poi_hash.as_slice().try_into() { |
| 487 | + Ok(bytes) => bytes, |
| 488 | + Err(_) => { |
| 489 | + error!( |
| 490 | + self.logger, |
| 491 | + "Invalid targetPoi: expected 32 bytes"; |
| 492 | + "got_bytes" => target_poi_hash.as_slice().len() |
| 493 | + ); |
| 494 | + return Ok(r::Value::Null); |
| 495 | + } |
| 496 | + }; |
| 497 | + |
| 498 | + // Resolve the network for this deployment |
| 499 | + let network = match self.store.network_for_deployment(&deployment_id).await { |
| 500 | + Ok(n) => n, |
| 501 | + Err(e) => { |
| 502 | + error!( |
| 503 | + self.logger, |
| 504 | + "Failed to resolve network for deployment"; |
| 505 | + "subgraph" => &deployment_id, |
| 506 | + "error" => format!("{:?}", e) |
| 507 | + ); |
| 508 | + return Ok(r::Value::Null); |
| 509 | + } |
| 510 | + }; |
| 511 | + |
| 512 | + // Fetch the full digest history for the block range |
| 513 | + let history = match self |
| 514 | + .store |
| 515 | + .get_poi_digest_history(&deployment_id, start_block..end_block) |
| 516 | + .await |
| 517 | + { |
| 518 | + Ok(Some(h)) => h, |
| 519 | + Ok(None) => return Ok(r::Value::Null), |
| 520 | + Err(e) => { |
| 521 | + error!( |
| 522 | + self.logger, |
| 523 | + "Failed to fetch POI digest history"; |
| 524 | + "subgraph" => &deployment_id, |
| 525 | + "error" => format!("{:?}", e) |
| 526 | + ); |
| 527 | + return Ok(r::Value::Null); |
| 528 | + } |
| 529 | + }; |
| 530 | + |
| 531 | + let poi_version = history.poi_version; |
| 532 | + |
| 533 | + // Build a lookup structure: for each causality region id, a sorted |
| 534 | + // vec of (start_block, end_block, digest) for binary search. |
| 535 | + let mut region_entries: HashMap<Id, Vec<(BlockNumber, BlockNumber, Vec<u8>)>> = |
| 536 | + HashMap::new(); |
| 537 | + for entry in history.entries { |
| 538 | + region_entries.entry(entry.id).or_default().push(( |
| 539 | + entry.start_block, |
| 540 | + entry.end_block, |
| 541 | + entry.digest, |
| 542 | + )); |
| 543 | + } |
| 544 | + // Entries are already sorted by (id, start_block) from the SQL query, |
| 545 | + // but sort each region's vec to be safe. |
| 546 | + for entries in region_entries.values_mut() { |
| 547 | + entries.sort_by_key(|(start, _, _)| *start); |
| 548 | + } |
| 549 | + |
| 550 | + let chain_store = match self.store.block_store().chain_store(&network).await { |
| 551 | + Some(cs) => cs, |
| 552 | + None => { |
| 553 | + error!( |
| 554 | + self.logger, |
| 555 | + "Chain store not found for network"; |
| 556 | + "network" => &network |
| 557 | + ); |
| 558 | + return Ok(r::Value::Null); |
| 559 | + } |
| 560 | + }; |
| 561 | + |
| 562 | + // Process blocks in chunks to avoid excessive memory usage |
| 563 | + let mut current = start_block; |
| 564 | + while current < end_block { |
| 565 | + let chunk_end = std::cmp::min(current + BLOCK_HASH_CHUNK_SIZE as i32, end_block); |
| 566 | + |
| 567 | + // Batch-fetch block hashes for this chunk using block_ptrs_by_numbers |
| 568 | + let block_numbers: Vec<BlockNumber> = (current..chunk_end).collect(); |
| 569 | + let block_ptrs = match chain_store |
| 570 | + .cheap_clone() |
| 571 | + .block_ptrs_by_numbers(block_numbers) |
| 572 | + .await |
| 573 | + { |
| 574 | + Ok(ptrs) => ptrs, |
| 575 | + Err(e) => { |
| 576 | + error!( |
| 577 | + self.logger, |
| 578 | + "Failed to fetch block hashes"; |
| 579 | + "network" => &network, |
| 580 | + "range" => format!("{}..{}", current, chunk_end), |
| 581 | + "error" => format!("{:?}", e) |
| 582 | + ); |
| 583 | + return Ok(r::Value::Null); |
| 584 | + } |
| 585 | + }; |
| 586 | + |
| 587 | + for block_num in current..chunk_end { |
| 588 | + let ptrs = match block_ptrs.get(&block_num) { |
| 589 | + Some(p) if p.len() == 1 => p, |
| 590 | + _ => continue, // Skip blocks with no hash or ambiguous hashes |
| 591 | + }; |
| 592 | + let block_hash = &ptrs[0].hash; |
| 593 | + let block_ptr = BlockPtr::new(block_hash.clone(), block_num); |
| 594 | + |
| 595 | + // Compute POI for this block using digest entries |
| 596 | + let mut finisher = ProofOfIndexingFinisher::new( |
| 597 | + &block_ptr, |
| 598 | + &deployment_id, |
| 599 | + &Some(Address::ZERO), |
| 600 | + poi_version, |
| 601 | + ); |
| 602 | + |
| 603 | + for (region_id, entries) in ®ion_entries { |
| 604 | + // Binary search for the entry that covers this block number |
| 605 | + let idx = entries.partition_point(|(start, _, _)| *start <= block_num); |
| 606 | + if idx == 0 { |
| 607 | + continue; |
| 608 | + } |
| 609 | + let (start, end, ref digest) = entries[idx - 1]; |
| 610 | + if block_num >= start && block_num < end { |
| 611 | + finisher.add_causality_region(region_id, digest); |
| 612 | + } |
| 613 | + } |
| 614 | + |
| 615 | + let computed_poi = finisher.finish(); |
| 616 | + if computed_poi == target_bytes { |
| 617 | + return Ok(object! { |
| 618 | + __typename: "PoiSearchResult", |
| 619 | + deployment: deployment_id.to_string(), |
| 620 | + block: object! { |
| 621 | + hash: block_hash.hash_hex(), |
| 622 | + number: block_num, |
| 623 | + }, |
| 624 | + proofOfIndexing: format!("0x{}", hex::encode(computed_poi)), |
| 625 | + }); |
| 626 | + } |
| 627 | + } |
| 628 | + |
| 629 | + current = chunk_end; |
| 630 | + } |
| 631 | + |
| 632 | + Ok(r::Value::Null) |
| 633 | + } |
| 634 | + |
457 | 635 | async fn resolve_indexing_status_for_version( |
458 | 636 | &self, |
459 | 637 | field: &a::Field, |
@@ -858,6 +1036,7 @@ where |
858 | 1036 | // The top-level `subgraphVersions` field |
859 | 1037 | (None, "apiVersions") => self.resolve_api_versions(field), |
860 | 1038 | (None, "version") => self.version(), |
| 1039 | + (None, "blockForPoi") => self.resolve_block_for_poi(field).await, |
861 | 1040 |
|
862 | 1041 | // Resolve fields of `Object` values (e.g. the `latestBlock` field of `EthereumBlock`) |
863 | 1042 | (value, _) => Ok(value.unwrap_or(r::Value::Null)), |
|
0 commit comments