diff --git a/src/main/java/io/ringbroker/broker/ingress/ClusteredIngress.java b/src/main/java/io/ringbroker/broker/ingress/ClusteredIngress.java index 75e94f6..d53c5ed 100644 --- a/src/main/java/io/ringbroker/broker/ingress/ClusteredIngress.java +++ b/src/main/java/io/ringbroker/broker/ingress/ClusteredIngress.java @@ -1180,50 +1180,77 @@ private void backfillTick() { for (final EpochMetadata em : cfg.epochs()) { final long epoch = em.epoch(); + final long sealedEnd = em.endSeq(); if (!em.isSealed()) continue; if (!em.placement().getStorageNodes().contains(myNodeId)) continue; - if (ing.getVirtualLog().hasEpoch(epoch)) continue; + + final boolean epochExists = ing.getVirtualLog().hasEpoch(epoch); + final long localHwm = epochExists ? ing.getVirtualLog().forEpoch(epoch).getHighWaterMark() : -1L; + if (localHwm >= sealedEnd) { + backfillPlanner.markPresent(pid, epoch); + continue; + } + + long nextOffset = Math.max(0L, localHwm + 1); + boolean completed = false; for (final int target : em.placement().getStorageNodesArray()) { if (target == myNodeId) continue; final RemoteBrokerClient client = clusterNodes.get(target); if (client == null) continue; try { - final BrokerApi.Envelope req = BrokerApi.Envelope.newBuilder() - .setBackfill(BrokerApi.BackfillRequest.newBuilder() - .setPartitionId(pid) - .setEpoch(epoch) - .setOffset(0) - .setMaxBytes(256 * 1024) - .build()) - .build(); - final BrokerApi.BackfillReply reply = client.sendBackfill(req).get(5, TimeUnit.SECONDS); - if (!reply.getRedirectNodesList().isEmpty()) continue; - final byte[] payload = reply.getPayload().toByteArray(); - if (payload.length == 0) continue; - - int pos = 0; - int count = 0; - final byte[][] batch = new byte[backfillBatchSize][]; - while (pos + Integer.BYTES <= payload.length && count < backfillBatchSize) { - final int len = (payload[pos] & 0xFF) | - ((payload[pos + 1] & 0xFF) << 8) | - ((payload[pos + 2] & 0xFF) << 16) | - ((payload[pos + 3] & 0xFF) << 24); - pos += Integer.BYTES; - if (pos + len > payload.length) break; - final byte[] rec = new byte[len]; - System.arraycopy(payload, pos, rec, 0, len); - batch[count++] = rec; - pos += len; - } - if (count > 0) { + while (true) { + if (nextOffset > sealedEnd) { + completed = true; + break; + } + final BrokerApi.Envelope req = BrokerApi.Envelope.newBuilder() + .setBackfill(BrokerApi.BackfillRequest.newBuilder() + .setPartitionId(pid) + .setEpoch(epoch) + .setOffset(nextOffset) + .setMaxBytes(256 * 1024) + .build()) + .build(); + final BrokerApi.BackfillReply reply = client.sendBackfill(req).get(5, TimeUnit.SECONDS); + if (!reply.getRedirectNodesList().isEmpty()) break; + final byte[] payload = reply.getPayload().toByteArray(); + if (payload.length == 0) { + if (reply.getEndOfEpoch()) completed = true; + break; + } + + int pos = 0; + int count = 0; + final byte[][] batch = new byte[backfillBatchSize][]; + while (pos + Integer.BYTES <= payload.length && count < backfillBatchSize) { + final int len = (payload[pos] & 0xFF) | + ((payload[pos + 1] & 0xFF) << 8) | + ((payload[pos + 2] & 0xFF) << 16) | + ((payload[pos + 3] & 0xFF) << 24); + pos += Integer.BYTES; + if (pos + len > payload.length) break; + final byte[] rec = new byte[len]; + System.arraycopy(payload, pos, rec, 0, len); + batch[count++] = rec; + pos += len; + } + if (count == 0) break; + ing.appendBackfillBatch(epoch, batch, count); - backfillPlanner.markPresent(pid, epoch); + nextOffset += count; + + if (reply.getEndOfEpoch()) { + completed = true; + break; + } } - if (reply.getEndOfEpoch()) break; } catch (final Exception ignored) { } + if (completed) { + backfillPlanner.markPresent(pid, epoch); + break; + } } } } diff --git a/src/test/java/io/ringbroker/broker/ingress/ClusteredIngressTest.java b/src/test/java/io/ringbroker/broker/ingress/ClusteredIngressTest.java index 61a7a34..4217ef8 100644 --- a/src/test/java/io/ringbroker/broker/ingress/ClusteredIngressTest.java +++ b/src/test/java/io/ringbroker/broker/ingress/ClusteredIngressTest.java @@ -1,11 +1,13 @@ package io.ringbroker.broker.ingress; +import com.google.protobuf.ByteString; import io.ringbroker.api.BrokerApi; import io.ringbroker.broker.role.BrokerRole; import io.ringbroker.cluster.client.RemoteBrokerClient; import io.ringbroker.cluster.membership.member.Member; import io.ringbroker.cluster.membership.replicator.AdaptiveReplicator; import io.ringbroker.cluster.membership.resolver.ReplicaSetResolver; +import io.ringbroker.cluster.metadata.EpochPlacement; import io.ringbroker.cluster.metadata.JournaledLogMetadataStore; import io.ringbroker.core.wait.Blocking; import io.ringbroker.offset.InMemoryOffsetStore; @@ -13,11 +15,13 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.io.ByteArrayOutputStream; import java.nio.file.Path; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeoutException; import static org.junit.jupiter.api.Assertions.*; @@ -55,6 +59,26 @@ void replicationTimeoutSurfacesToCaller(@TempDir final Path dir) throws Exceptio c.close(); } + @Test + void backfillTickResumesFromLocalHighWatermark(@TempDir final Path dir) throws Exception { + final RecordingBackfillClient peer = new RecordingBackfillClient(List.of( + "m0".getBytes(), + "m1".getBytes(), + "m2".getBytes() + )); + + final Components harness = backfillHarness(dir, peer); + + invokeBackfillTick(harness.ingress); + + final Ingress local = harness.ingress.getIngressMap().get(0); + assertNotNull(local); + assertEquals(2L, local.getVirtualLog().forEpoch(0L).getHighWaterMark(), "should backfill full sealed epoch"); + assertEquals(List.of(0L, 1L, 2L), peer.offsets(), "offset should advance between backfill batches"); + + harness.close(); + } + private Components singleNode(final Path base) throws Exception { final TopicRegistry registry = TopicRegistry.builder() .topic("t", BrokerApi.Message.getDescriptor()) @@ -141,6 +165,124 @@ public CompletableFuture sendEnvelopeWithAck(final Bro return new Components(ingress, offsets); } + private Components backfillHarness(final Path base, final RecordingBackfillClient peer) throws Exception { + final TopicRegistry registry = TopicRegistry.builder() + .topic("t", BrokerApi.Message.getDescriptor()) + .build(); + final InMemoryOffsetStore offsets = new InMemoryOffsetStore(base.resolve("offsets")); + final JournaledLogMetadataStore metadata = new JournaledLogMetadataStore(base.resolve("meta")); + + final EpochPlacement p0 = new EpochPlacement(0L, List.of(0, 1), 1); + metadata.bootstrapIfAbsent(0, p0, 0L); + final EpochPlacement p1 = new EpochPlacement(1L, List.of(0, 1), 1); + metadata.sealAndCreateEpoch(0, 0L, peer.sealedEnd(), p1, 1L, 1L); + + final Map clients = Map.of(1, peer); + final AdaptiveReplicator replicator = new AdaptiveReplicator(1, clients, 50); + + final List members = List.of( + new Member(0, BrokerRole.PERSISTENCE, new java.net.InetSocketAddress("localhost", 0), System.currentTimeMillis(), 1), + new Member(1, BrokerRole.PERSISTENCE, new java.net.InetSocketAddress("localhost", 0), System.currentTimeMillis(), 1) + ); + final ReplicaSetResolver resolver = new ReplicaSetResolver(2, () -> members); + + final ClusteredIngress ingress = ClusteredIngress.create( + registry, + (key, total) -> 0, + 1, + 0, + 2, + clients, + base.resolve("data"), + 8, + new Blocking(), + 512, + 4, + false, + offsets, + BrokerRole.PERSISTENCE, + resolver, + replicator, + metadata + ); + return new Components(ingress, offsets); + } + + private void invokeBackfillTick(final ClusteredIngress ingress) throws Exception { + final var m = ClusteredIngress.class.getDeclaredMethod("backfillTick"); + m.setAccessible(true); + m.invoke(ingress); + } + + private static final class RecordingBackfillClient implements RemoteBrokerClient { + private final List data; + private final long sealedEnd; + private final CopyOnWriteArrayList offsets = new CopyOnWriteArrayList<>(); + + RecordingBackfillClient(final List data) { + this.data = data; + this.sealedEnd = data.size() - 1L; + } + + long sealedEnd() { + return sealedEnd; + } + + List offsets() { + return offsets; + } + + @Override + public void sendMessage(final String topic, final byte[] key, final byte[] payload) { + // no-op for tests + } + + @Override + public CompletableFuture sendEnvelopeWithAck(final BrokerApi.Envelope envelope) { + return CompletableFuture.completedFuture( + BrokerApi.ReplicationAck.newBuilder() + .setStatus(BrokerApi.ReplicationAck.Status.SUCCESS) + .build() + ); + } + + @Override + public CompletableFuture sendBackfill(final BrokerApi.Envelope envelope) { + final BrokerApi.BackfillRequest req = envelope.getBackfill(); + offsets.add(req.getOffset()); + + final long off = req.getOffset(); + if (off >= data.size()) { + return CompletableFuture.completedFuture( + BrokerApi.BackfillReply.newBuilder() + .setPayload(ByteString.EMPTY) + .setEndOfEpoch(true) + .build() + ); + } + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + int count = 0; + for (long i = off; i < data.size() && count < 1; i++) { + final byte[] rec = data.get((int) i); + out.write(rec.length & 0xFF); + out.write((rec.length >>> 8) & 0xFF); + out.write((rec.length >>> 16) & 0xFF); + out.write((rec.length >>> 24) & 0xFF); + out.write(rec, 0, rec.length); + count++; + } + + final boolean end = (off + count) > sealedEnd; + return CompletableFuture.completedFuture( + BrokerApi.BackfillReply.newBuilder() + .setPayload(ByteString.copyFrom(out.toByteArray())) + .setEndOfEpoch(end) + .build() + ); + } + } + private record Components(ClusteredIngress ingress, AutoCloseable offsets) implements AutoCloseable { @Override public void close() throws Exception {