diff --git a/src/main/java/io/ringbroker/broker/ingress/Ingress.java b/src/main/java/io/ringbroker/broker/ingress/Ingress.java index f320403..3a05253 100644 --- a/src/main/java/io/ringbroker/broker/ingress/Ingress.java +++ b/src/main/java/io/ringbroker/broker/ingress/Ingress.java @@ -1,6 +1,9 @@ package io.ringbroker.broker.ingress; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; import io.ringbroker.core.ring.RingBuffer; import io.ringbroker.ledger.orchestrator.LedgerOrchestrator; import io.ringbroker.registry.TopicRegistry; @@ -90,18 +93,22 @@ public void publish(final String topic, final byte[] payload) { */ public void publish(final String topic, final int retries, final byte[] rawPayload) { // 1) validate base topic - if (!registry.contains(topic)) throw new IllegalArgumentException("topic not registered: " + topic); + if (!registry.contains(topic)) { + throw new IllegalArgumentException("topic not registered: " + topic); + } - // 2) DLQ routing + // 2) DLQ routing based on retry count String outTopic = retries > MAX_RETRIES ? topic + ".DLQ" : topic; - if (!registry.contains(outTopic)) throw new IllegalArgumentException("topic not registered: " + outTopic); + if (!registry.contains(outTopic)) { + throw new IllegalArgumentException("topic not registered: " + outTopic); + } - // 3) schema-validate - try { - DynamicMessage.parseFrom(registry.descriptor(outTopic), rawPayload); - } catch (final Exception ex) { + // 3) schema‐validate without throwing out + if (!isWireValid(rawPayload, registry.descriptor(outTopic))) { outTopic = topic + ".DLQ"; - if (!registry.contains(outTopic)) throw new IllegalArgumentException("DLQ not registered: " + outTopic); + if (!registry.contains(outTopic)) { + throw new IllegalArgumentException("DLQ not registered: " + outTopic); + } } // 4) enqueue without allocation; spin if queue is momentarily full @@ -158,6 +165,29 @@ public void close() throws IOException { this.segments.writable().close(); } + /** + * Returns true if payload is a well‐formed instance of the given descriptor’s message + * (i.e. no truncated stream, bad varint, negative length, etc.). Never throws. + */ + private static boolean isWireValid(byte[] raw, final Descriptors.Descriptor descriptor) { + CodedInputStream in = CodedInputStream.newInstance(raw); + try { + // Try to merge into a DynamicMessage.Builder; this will throw on any wire‐format error. + DynamicMessage.newBuilder(descriptor) + .mergeFrom(in) + .buildPartial(); + // Also ensure we consumed exactly all bytes + return in.isAtEnd(); + } catch (InvalidProtocolBufferException e) { + return false; + } + + catch (IOException exception) { + return false; + } + } + + /* * Allocation‑free bounded lock‑free multi‑producer / multi‑consumer queue * (heavily simplified Vyukov algorithm). diff --git a/src/main/java/test/SanityCheckMain.java b/src/main/java/test/SanityCheckMain.java index 44e66df..9523f9d 100644 --- a/src/main/java/test/SanityCheckMain.java +++ b/src/main/java/test/SanityCheckMain.java @@ -7,6 +7,7 @@ import io.ringbroker.offset.InMemoryOffsetStore; import io.ringbroker.proto.test.EventsProto; import io.ringbroker.registry.TopicRegistry; +import lombok.extern.slf4j.Slf4j; import java.io.DataInputStream; import java.io.EOFException; @@ -15,7 +16,10 @@ import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; -import java.nio.file.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -28,6 +32,7 @@ * – verify the in-memory subscription sees every record * – replay every segment on disk and confirm the same IDs partition-by-partition */ +@Slf4j public class SanityCheckMain { /* ---------- test parameters ---------- */ @@ -42,11 +47,7 @@ public class SanityCheckMain { private static final String GROUP = "sanity-latch"; private static final Path DATA = Paths.get("data"); - /* ---------- main ---------- */ - public static void main(String[] args) throws Exception { - - /* 0) clean data dir ------------------------------------------------- */ if (Files.exists(DATA)) { try (Stream w = Files.walk(DATA)) { w.sorted(Comparator.reverseOrder()) @@ -55,7 +56,6 @@ public static void main(String[] args) throws Exception { } Files.createDirectories(DATA); - /* 1) registry + broker --------------------------------------------- */ TopicRegistry registry = new TopicRegistry.Builder() .topic(TOPIC, EventsProto.OrderCreated.getDescriptor()) .build(); @@ -76,17 +76,14 @@ public static void main(String[] args) throws Exception { new InMemoryOffsetStore() ); - /* expected IDs per partition --------------------------------------- */ RoundRobinPartitioner psel = new RoundRobinPartitioner(); Map> expected = new HashMap<>(); for (int p = 0; p < PARTITIONS; p++) expected.put(p, new HashSet<>()); - /* latch for subscriber --------------------------------------------- */ CountDownLatch latch = new CountDownLatch(TOTAL_MSGS); ingress.subscribeTopic(TOPIC, GROUP, (seq, payload) -> latch.countDown()); - /* 2) publish ------------------------------------------------------- */ - System.out.println("=== publishing " + TOTAL_MSGS + " messages ==="); + log.info("=== publishing {} messages ===", TOTAL_MSGS); for (int i = 0; i < TOTAL_MSGS; i++) { String id = "msg-" + i; byte[] key = id.getBytes(StandardCharsets.UTF_8); @@ -102,21 +99,15 @@ public static void main(String[] args) throws Exception { ingress.publish(TOPIC, key, 0, evt.toByteArray()); } - /* writers push tail batches immediately (patch already in place) */ - - /* 3) wait for subscriber ------------------------------------------- */ - System.out.println("waiting for in-memory delivery…"); + log.info("waiting for in-memory delivery…"); if (!latch.await(30, TimeUnit.SECONDS)) { - System.err.printf("❌ saw only %d/%d messages%n", - TOTAL_MSGS - latch.getCount(), TOTAL_MSGS); + log.error("saw only {}/{} messages", TOTAL_MSGS - latch.getCount(), TOTAL_MSGS); System.exit(1); } - System.out.println("✅ all messages delivered in-memory"); + log.info("all messages delivered in-memory"); - /* 4) shutdown (forces fsync) --------------------------------------- */ ingress.shutdown(); - /* 5) replay segments per partition --------------------------------- */ Map> seen = new HashMap<>(); for (int p = 0; p < PARTITIONS; p++) { seen.put(p, new HashSet<>()); @@ -130,27 +121,26 @@ public static void main(String[] args) throws Exception { } } - /* 6) compare ------------------------------------------------------- */ boolean pass = true; - System.out.println("\n=== partition results ==="); + log.info("\n=== partition results ==="); for (int p = 0; p < PARTITIONS; p++) { Set exp = expected.get(p), got = seen.get(p); - System.out.printf("partition-%2d: exp=%3d, got=%3d%n", p, exp.size(), got.size()); + log.info("partition-{}: exp={}, got={}", p, exp.size(), got.size()); if (!exp.equals(got)) { + log.error(" missing: {}", diff(exp, got)); + log.error(" extra : {}", diff(got, exp)); pass = false; - System.err.println(" missing: " + diff(exp, got)); - System.err.println(" extra : " + diff(got, exp)); } } - System.out.println(pass - ? "\n✅ SANITY-PASS: all routing + writes succeeded." - : "\n❌ SANITY-FAIL: see mismatches above."); - if (!pass) System.exit(1); + if (pass) { + log.info("\nSANITY-PASS: all routing + writes succeeded."); + } else { + log.error("\nSANITY-FAIL: see mismatches above."); + System.exit(1); + } } - /* ---------- helpers --------------------------------------------------- */ - /** read a 32-bit little-endian int from the stream */ private static int readIntLE(DataInputStream in) throws IOException { int b0 = in.readUnsignedByte(); @@ -181,7 +171,7 @@ private static void parseSegmentLittleEndian(Path seg, Set outSet) throw CRC32 crc = new CRC32(); crc.update(buf, 0, len); if ((int) crc.getValue() != storedCrc) { - System.err.printf("CRC mismatch in %s%n", seg.getFileName()); + log.error("CRC mismatch in {}", seg.getFileName()); break; }