support a staging bucket in bulk storage #6018

Open: isegall-da wants to merge 43 commits into main from isegall/bulk-staging-2

isegall-da (Contributor) commented Jun 17, 2026:

Fixes #6162
(See #5884 for context)

Could benefit from some more unit testing of the update history part, but this PR is a behemoth as it is (sorry, couldn't break it up in a very sensible way), so I'm leaving that as a follow-up PR (and hence not completing #6126 here). Edit: on closer look, the two pipelines share 99% of the code now, and I don't actually see significant coverage gaps.

Note that the changes are not backward compatible, so they require disabling bulk storage and deleting the data on CILR and DevNet, where we enabled it (tracked in #5884).

Pull Request Checklist

Cluster Testing

  • If a cluster test is required, comment /cluster_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.
  • If an upgrade test is required, comment /upgrade_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.
  • If a hard-migration test is required (from the latest release), comment /hdm_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.
  • If a logical synchronizer upgrade test is required (from canton-3.5), comment /lsu_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.

PR Guidelines

  • Include any change that might be observable by our partners or affect their deployment in the release notes.
  • Specify fixed issues with Fixes #n, and mention issues worked on using #n
  • Include a screenshot for frontend-related PRs - see README or use your favorite screenshot tool

Merge Guidelines

  • Make the git commit message look sensible when squash-merging on GitHub (most likely: just copy your PR description).

Signed-off-by: Itai Segall <itai.segall@digitalasset.com>

isegall-da changed the title from "WIP: bulk staging" to "support a staging bucket in bulk storage" on Jun 30, 2026
isegall-da marked this pull request as ready for review on June 30, 2026 02:22
isegall-da (Author):

@rautenrieth-da @ray-roestenburg-da I know this PR is a bit of a monster (sorry..), but can I get a review still please? 🙏

automationConfig: AutomationConfig,
backoffClock: Clock,
retryProvider: RetryProvider,
)(implicit tracer: Tracer): PekkoRetryingService[S]

ray-roestenburg-da (Contributor) commented Jul 2, 2026:

should this not return RetryingService[S]? or rename the trait to PekkoRetryableService

isegall-da (Author):

renamed the trait, and also the method.

ray-roestenburg-da (Contributor) left a comment:

Thanks! looks good, I only had one minor question

.sourceKey(key)
.build()

s3Client.copyObject(copyReq).asScala.map(_ => ())

ray-roestenburg-da (Contributor):

Is unwrapCompletionException also useful here? (and in deleteObject)

isegall-da (Author):

Not sure what you mean, and in what way you think it would be useful.
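
For context on the question above: a `CompletableFuture` can report a failure wrapped in `java.util.concurrent.CompletionException`, so a handler that matches on the underlying exception type may not fire unless the wrapper is removed first. The sketch below shows one way such a helper could look. Its body is an assumption (the PR's actual `unwrapCompletionException` is not shown in this conversation), and `copyLike` is a hypothetical stand-in for an asynchronous S3 call.

```scala
import java.util.concurrent.CompletionException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object UnwrapSketch {
  // Hypothetical helper: re-fail the Future with the underlying cause when
  // the failure is a CompletionException that carries one.
  def unwrapCompletionException[A](f: Future[A]): Future[A] =
    f.recoverWith {
      case e: CompletionException if e.getCause != null =>
        Future.failed(e.getCause)
    }

  // Hypothetical stand-in for an async client call that fails with a
  // wrapped exception.
  def copyLike(): Future[Unit] =
    Future.failed(new CompletionException(new IllegalStateException("no such key")))

  def main(args: Array[String]): Unit = {
    // `.failed` turns the failure into a successful Future[Throwable].
    val cause = Await.result(unwrapCompletionException(copyLike()).failed, 5.seconds)
    println(cause.getClass.getSimpleName)
  }
}
```

Whether this matters for `copyObject` and `deleteObject` depends on whether any caller inspects the concrete exception type; if failures are only propagated, the wrapper is harmless.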

else { client.deleteObjects(deleteObjRequest).asScala }
} yield {
()
}).futureValue

ray-roestenburg-da (Contributor):

Looks like you are ignoring the future value?

ray-roestenburg-da (Contributor):

Oh wait, this is ScalaTest, so this will just fail. Never mind.

ray-roestenburg-da (Contributor):

(Maybe slightly better to map over the buckets, combine the resulting futures with Future.sequence, and then call futureValue on that.)
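
A minimal sketch of that suggestion, using only the Scala standard library. `deleteBucketContents` and the bucket names are hypothetical stand-ins for the per-bucket cleanup in the test, and `Await.result` stands in for ScalaTest's `futureValue` so that the snippet has no test-framework dependency.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BucketCleanupSketch {
  // Hypothetical stand-in for the asynchronous per-bucket cleanup.
  def deleteBucketContents(bucket: String): Future[Unit] =
    Future {
      require(bucket.nonEmpty, "bucket name must not be empty")
    }

  // Start one cleanup per bucket, then combine them into a single Future.
  // The combined Future fails if any individual cleanup fails.
  def cleanAll(buckets: Seq[String]): Future[Unit] =
    Future.sequence(buckets.map(deleteBucketContents)).map(_ => ())

  def main(args: Array[String]): Unit = {
    // In a ScalaTest suite this would be cleanAll(...).futureValue.
    Await.result(cleanAll(Seq("staging", "final")), 10.seconds)
    println("all buckets cleaned")
  }
}
```

Unlike a for-comprehension, which runs the steps one after another, mapping first and sequencing afterwards starts all the cleanups before waiting on any of them. That may or may not be desirable in the test.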

NotUsed,
] =
Flow[(T, Seq[ObjectKeyAndChecksum])]
.mapAsync(parallelism = 1) { case (ts, objs) =>

ray-roestenburg-da (Contributor):

nit: you could set a val in this class to 1 and pass it to all the mapAsync calls here, because it's important that it's always one: val noParallelism = 1 or something.

isegall-da (Author):

I'm actually not sure whether all these parallelism = 1 settings are necessary, or whether we can tolerate parallelism. I left a TODO to revisit that. Clearly 1 is safest, so I'm leaving it as 1 for now.

)
.filter(identity)
.take(1)
// // Wait for update history to initialize and for history backfilling to complete before starting bulk storage dumps

ray-roestenburg-da (Contributor) commented Jul 2, 2026:

nit: Prefer no commented code, rather remove it.

isegall-da (Author):

whoops, for sure, this is just a leftover

isegall-da (Author):

(Will merge on Monday. I haven't yet merged the PR on cn-internal, and don't want to do that right before logging off for the long weekend.)

Development

Successfully merging this pull request may close these issues.

Introduce the basic mechanism for the dual-bucket approach, including moving objects, but without BFT checks still

2 participants