support a staging bucket in bulk storage#6018
Conversation
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
…ging-2 Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
|
@rautenrieth-da @ray-roestenburg-da I know this PR is a bit of a monster (sorry..), but can I get a review still please? 🙏 |
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
| automationConfig: AutomationConfig, | ||
| backoffClock: Clock, | ||
| retryProvider: RetryProvider, | ||
| )(implicit tracer: Tracer): PekkoRetryingService[S] |
There was a problem hiding this comment.
should this not return RetryingService[S]? or rename the trait to PekkoRetryableService
There was a problem hiding this comment.
renamed the trait, and also the method.
ray-roestenburg-da
left a comment
There was a problem hiding this comment.
Thanks! looks good, I only had one minor question
| .sourceKey(key) | ||
| .build() | ||
|
|
||
| s3Client.copyObject(copyReq).asScala.map(_ => ()) |
There was a problem hiding this comment.
Is unwrapCompletionException also useful here? (and in deleteObject)
There was a problem hiding this comment.
Not sure what you mean, and in what way you think it would be useful.
| else { client.deleteObjects(deleteObjRequest).asScala } | ||
| } yield { | ||
| () | ||
| }).futureValue |
There was a problem hiding this comment.
Looks like you are ignoring the future value?
There was a problem hiding this comment.
Oh wait scalatest, so this will just fail, never mind
There was a problem hiding this comment.
(maybe slightly better to map over the buckets and future sequence them and then get the futureValue on that)
| NotUsed, | ||
| ] = | ||
| Flow[(T, Seq[ObjectKeyAndChecksum])] | ||
| .mapAsync(parallelism = 1) { case (ts, objs) => |
There was a problem hiding this comment.
nit: you could set a val in this class to 1 and pass it to all mapAsyncs here cause it's important that's always one. val noParallelism = 1 or something
There was a problem hiding this comment.
I actually am not sure if all these parallelism = 1 are actually necessary, or we can tolerate parallelism. Left a TODO to revisit that. Clearly 1 is safest, so leaving as 1 for now.
| ) | ||
| .filter(identity) | ||
| .take(1) | ||
| // // Wait for update history to initialize and for history backfilling to complete before starting bulk storage dumps |
There was a problem hiding this comment.
nit: Prefer no commented code, rather remove it.
There was a problem hiding this comment.
whoops, for sure, this is just a leftover
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
|
(will merge on Monday, didn't yet merge the PR on cn-internal, and don't want to do that right before logging off for the long weekend) |
Fixes #6162
(See #5884 for context)
Could benefit from some more unit testing of the update history part, but this PR is a behemoth as it is (sorry, couldn't break it in a very sensible way), so I'm leaving that as a followup PR (and hence not competing #6126 here)on closer look, the two pipelines share 99% of the code now, and I don't see significant coverage gaps actually.Note that the changes are not bwd compatible, so require disabling bulk storage and deleting the data on CILR&DevNet where we enabled it (tracked in #5884)
Pull Request Checklist
Cluster Testing
/cluster_teston this PR to request it, and ping someone with access to the DA-internal system to approve it./upgrade_teston this PR to request it, and ping someone with access to the DA-internal system to approve it./hdm_teston this PR to request it, and ping someone with access to the DA-internal system to approve it./lsu_teston this PR to request it, and ping someone with access to the DA-internal system to approve it.PR Guidelines
Fixes #n, and mention issues worked on using#nMerge Guidelines