Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 59 additions & 32 deletions src/main/java/io/ringbroker/broker/ingress/ClusteredIngress.java
Original file line number Diff line number Diff line change
Expand Up @@ -1180,50 +1180,77 @@ private void backfillTick() {

for (final EpochMetadata em : cfg.epochs()) {
final long epoch = em.epoch();
final long sealedEnd = em.endSeq();
if (!em.isSealed()) continue;
if (!em.placement().getStorageNodes().contains(myNodeId)) continue;
if (ing.getVirtualLog().hasEpoch(epoch)) continue;

final boolean epochExists = ing.getVirtualLog().hasEpoch(epoch);
final long localHwm = epochExists ? ing.getVirtualLog().forEpoch(epoch).getHighWaterMark() : -1L;
if (localHwm >= sealedEnd) {
backfillPlanner.markPresent(pid, epoch);
continue;
}

long nextOffset = Math.max(0L, localHwm + 1);
boolean completed = false;

for (final int target : em.placement().getStorageNodesArray()) {
if (target == myNodeId) continue;
final RemoteBrokerClient client = clusterNodes.get(target);
if (client == null) continue;
try {
final BrokerApi.Envelope req = BrokerApi.Envelope.newBuilder()
.setBackfill(BrokerApi.BackfillRequest.newBuilder()
.setPartitionId(pid)
.setEpoch(epoch)
.setOffset(0)
.setMaxBytes(256 * 1024)
.build())
.build();
final BrokerApi.BackfillReply reply = client.sendBackfill(req).get(5, TimeUnit.SECONDS);
if (!reply.getRedirectNodesList().isEmpty()) continue;
final byte[] payload = reply.getPayload().toByteArray();
if (payload.length == 0) continue;

int pos = 0;
int count = 0;
final byte[][] batch = new byte[backfillBatchSize][];
while (pos + Integer.BYTES <= payload.length && count < backfillBatchSize) {
final int len = (payload[pos] & 0xFF) |
((payload[pos + 1] & 0xFF) << 8) |
((payload[pos + 2] & 0xFF) << 16) |
((payload[pos + 3] & 0xFF) << 24);
pos += Integer.BYTES;
if (pos + len > payload.length) break;
final byte[] rec = new byte[len];
System.arraycopy(payload, pos, rec, 0, len);
batch[count++] = rec;
pos += len;
}
if (count > 0) {
while (true) {
if (nextOffset > sealedEnd) {
completed = true;
break;
}
final BrokerApi.Envelope req = BrokerApi.Envelope.newBuilder()
.setBackfill(BrokerApi.BackfillRequest.newBuilder()
.setPartitionId(pid)
.setEpoch(epoch)
.setOffset(nextOffset)
.setMaxBytes(256 * 1024)
.build())
.build();
final BrokerApi.BackfillReply reply = client.sendBackfill(req).get(5, TimeUnit.SECONDS);
if (!reply.getRedirectNodesList().isEmpty()) break;
final byte[] payload = reply.getPayload().toByteArray();
if (payload.length == 0) {
if (reply.getEndOfEpoch()) completed = true;
break;
}

int pos = 0;
int count = 0;
final byte[][] batch = new byte[backfillBatchSize][];
while (pos + Integer.BYTES <= payload.length && count < backfillBatchSize) {
final int len = (payload[pos] & 0xFF) |
((payload[pos + 1] & 0xFF) << 8) |
((payload[pos + 2] & 0xFF) << 16) |
((payload[pos + 3] & 0xFF) << 24);
pos += Integer.BYTES;
if (pos + len > payload.length) break;
final byte[] rec = new byte[len];
System.arraycopy(payload, pos, rec, 0, len);
batch[count++] = rec;
pos += len;
}
if (count == 0) break;

ing.appendBackfillBatch(epoch, batch, count);
backfillPlanner.markPresent(pid, epoch);
nextOffset += count;

if (reply.getEndOfEpoch()) {
completed = true;
break;
}
}
if (reply.getEndOfEpoch()) break;
} catch (final Exception ignored) {
}
if (completed) {
backfillPlanner.markPresent(pid, epoch);
break;
}
}
}
}
Expand Down
142 changes: 142 additions & 0 deletions src/test/java/io/ringbroker/broker/ingress/ClusteredIngressTest.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
package io.ringbroker.broker.ingress;

import com.google.protobuf.ByteString;
import io.ringbroker.api.BrokerApi;
import io.ringbroker.broker.role.BrokerRole;
import io.ringbroker.cluster.client.RemoteBrokerClient;
import io.ringbroker.cluster.membership.member.Member;
import io.ringbroker.cluster.membership.replicator.AdaptiveReplicator;
import io.ringbroker.cluster.membership.resolver.ReplicaSetResolver;
import io.ringbroker.cluster.metadata.EpochPlacement;
import io.ringbroker.cluster.metadata.JournaledLogMetadataStore;
import io.ringbroker.core.wait.Blocking;
import io.ringbroker.offset.InMemoryOffsetStore;
import io.ringbroker.registry.TopicRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.ByteArrayOutputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.*;
Expand Down Expand Up @@ -55,6 +59,26 @@ void replicationTimeoutSurfacesToCaller(@TempDir final Path dir) throws Exceptio
c.close();
}

@Test
void backfillTickResumesFromLocalHighWatermark(@TempDir final Path dir) throws Exception {
final RecordingBackfillClient peer = new RecordingBackfillClient(List.of(
"m0".getBytes(),
"m1".getBytes(),
"m2".getBytes()
));

final Components harness = backfillHarness(dir, peer);

invokeBackfillTick(harness.ingress);

final Ingress local = harness.ingress.getIngressMap().get(0);
assertNotNull(local);
assertEquals(2L, local.getVirtualLog().forEpoch(0L).getHighWaterMark(), "should backfill full sealed epoch");
assertEquals(List.of(0L, 1L, 2L), peer.offsets(), "offset should advance between backfill batches");

harness.close();
}

private Components singleNode(final Path base) throws Exception {
final TopicRegistry registry = TopicRegistry.builder()
.topic("t", BrokerApi.Message.getDescriptor())
Expand Down Expand Up @@ -141,6 +165,124 @@ public CompletableFuture<BrokerApi.ReplicationAck> sendEnvelopeWithAck(final Bro
return new Components(ingress, offsets);
}

private Components backfillHarness(final Path base, final RecordingBackfillClient peer) throws Exception {
final TopicRegistry registry = TopicRegistry.builder()
.topic("t", BrokerApi.Message.getDescriptor())
.build();
final InMemoryOffsetStore offsets = new InMemoryOffsetStore(base.resolve("offsets"));
final JournaledLogMetadataStore metadata = new JournaledLogMetadataStore(base.resolve("meta"));

final EpochPlacement p0 = new EpochPlacement(0L, List.of(0, 1), 1);
metadata.bootstrapIfAbsent(0, p0, 0L);
final EpochPlacement p1 = new EpochPlacement(1L, List.of(0, 1), 1);
metadata.sealAndCreateEpoch(0, 0L, peer.sealedEnd(), p1, 1L, 1L);

final Map<Integer, RemoteBrokerClient> clients = Map.of(1, peer);
final AdaptiveReplicator replicator = new AdaptiveReplicator(1, clients, 50);

final List<Member> members = List.of(
new Member(0, BrokerRole.PERSISTENCE, new java.net.InetSocketAddress("localhost", 0), System.currentTimeMillis(), 1),
new Member(1, BrokerRole.PERSISTENCE, new java.net.InetSocketAddress("localhost", 0), System.currentTimeMillis(), 1)
);
final ReplicaSetResolver resolver = new ReplicaSetResolver(2, () -> members);

final ClusteredIngress ingress = ClusteredIngress.create(
registry,
(key, total) -> 0,
1,
0,
2,
clients,
base.resolve("data"),
8,
new Blocking(),
512,
4,
false,
offsets,
BrokerRole.PERSISTENCE,
resolver,
replicator,
metadata
);
return new Components(ingress, offsets);
}

private void invokeBackfillTick(final ClusteredIngress ingress) throws Exception {
final var m = ClusteredIngress.class.getDeclaredMethod("backfillTick");
m.setAccessible(true);
m.invoke(ingress);
}

private static final class RecordingBackfillClient implements RemoteBrokerClient {
private final List<byte[]> data;
private final long sealedEnd;
private final CopyOnWriteArrayList<Long> offsets = new CopyOnWriteArrayList<>();

RecordingBackfillClient(final List<byte[]> data) {
this.data = data;
this.sealedEnd = data.size() - 1L;
}

long sealedEnd() {
return sealedEnd;
}

List<Long> offsets() {
return offsets;
}

@Override
public void sendMessage(final String topic, final byte[] key, final byte[] payload) {
// no-op for tests
}

@Override
public CompletableFuture<BrokerApi.ReplicationAck> sendEnvelopeWithAck(final BrokerApi.Envelope envelope) {
return CompletableFuture.completedFuture(
BrokerApi.ReplicationAck.newBuilder()
.setStatus(BrokerApi.ReplicationAck.Status.SUCCESS)
.build()
);
}

@Override
public CompletableFuture<BrokerApi.BackfillReply> sendBackfill(final BrokerApi.Envelope envelope) {
final BrokerApi.BackfillRequest req = envelope.getBackfill();
offsets.add(req.getOffset());

final long off = req.getOffset();
if (off >= data.size()) {
return CompletableFuture.completedFuture(
BrokerApi.BackfillReply.newBuilder()
.setPayload(ByteString.EMPTY)
.setEndOfEpoch(true)
.build()
);
}

final ByteArrayOutputStream out = new ByteArrayOutputStream();
int count = 0;
for (long i = off; i < data.size() && count < 1; i++) {
final byte[] rec = data.get((int) i);
out.write(rec.length & 0xFF);
out.write((rec.length >>> 8) & 0xFF);
out.write((rec.length >>> 16) & 0xFF);
out.write((rec.length >>> 24) & 0xFF);
out.write(rec, 0, rec.length);
count++;
}

final boolean end = (off + count) > sealedEnd;
return CompletableFuture.completedFuture(
BrokerApi.BackfillReply.newBuilder()
.setPayload(ByteString.copyFrom(out.toByteArray()))
.setEndOfEpoch(end)
.build()
);
}
}

private record Components(ClusteredIngress ingress, AutoCloseable offsets) implements AutoCloseable {
@Override
public void close() throws Exception {
Expand Down
Loading