From 91a07a11235be19409724f97cb4b570bec031ccf Mon Sep 17 00:00:00 2001 From: "j.zielinski" Date: Thu, 2 Jul 2026 01:34:53 +0200 Subject: [PATCH 1/3] MAX_COLUMN_BYTES is applied only by default only if no annotation is applied on a field --- .../native/NativeVerticaRecordEncoder.scala | 42 +++++++------------ .../NativeVerticaRecordEncoderTest.scala | 41 ++++++++++++++++++ 2 files changed, 56 insertions(+), 27 deletions(-) diff --git a/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala b/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala index 64bb13f0..42d7d726 100644 --- a/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala +++ b/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala @@ -48,7 +48,7 @@ trait NativeVerticaRecordEncoder[R] { object NativeVerticaRecordEncoder { private val MAX_COLUMN_BYTES = 65000 - private val MAX_COLUMN_LENGTH = MaxLength(MAX_COLUMN_BYTES, truncate = true) + private val DEFAULT_MAX_COLUMN_LENGTH = MaxLength(MAX_COLUMN_BYTES, truncate = true) /** * A macro derivation of the encoder for arbitrary case classes. @@ -79,34 +79,22 @@ object NativeVerticaRecordEncoder { case t if t =:= c.weakTypeOf[UUID] => Column(q"false", q"16", q"pw.writeUUID(r)") case t if t =:= c.weakTypeOf[String] => - val fl = typeAnnotations.collectFirst { case f: FixedLength => f } - if (fl.isDefined) { - if (fl.get.length > MAX_COLUMN_BYTES) - c.abort(c.enclosingPosition, s"String length can not exceed $MAX_COLUMN_BYTES") - Column(q"false", q"${fl.get.length}", q"pw.writeFixedString(r, ${fl.get.length}, ${fl.get.truncate})") - } else { - val ml = typeAnnotations.collectFirst { case m: MaxLength => m }.getOrElse(MAX_COLUMN_LENGTH) - if (ml.length > MAX_COLUMN_BYTES) - c.abort(c.enclosingPosition, s"String length can not exceed $MAX_COLUMN_BYTES") - Column(q"false", q"-1", q"pw.writeVarString(r, ${ml.length}, ${ml.truncate})") - } + val annotations = if (typeAnnotations.isEmpty) Seq(DEFAULT_MAX_COLUMN_LENGTH) else typeAnnotations + annotations.collectFirst { + case fl: FixedLength => + Column(q"false", q"${fl.length}", q"pw.writeFixedString(r, ${fl.length}, ${fl.truncate})") + case ml: MaxLength => + Column(q"false", q"-1", q"pw.writeVarString(r, ${ml.length}, ${ml.truncate})") + }.get case t if t =:= c.weakTypeOf[Array[Byte]] => - val fl = typeAnnotations.collectFirst { case f: FixedLength => f } - if (fl.isDefined) { - if (fl.get.length > MAX_COLUMN_BYTES) - c.abort(c.enclosingPosition, s"Byte array length can not exceed $MAX_COLUMN_BYTES") - Column( - q"false", - q"${fl.get.length}", - q"pw.writeFixedByteArray(r, ${fl.get.length}, ${fl.get.truncate}, 0)" - ) - } else { - val ml = typeAnnotations.collectFirst { case m: MaxLength => m }.getOrElse(MAX_COLUMN_LENGTH) - if (ml.length > MAX_COLUMN_BYTES) - c.abort(c.enclosingPosition, s"Byte array length can not exceed $MAX_COLUMN_BYTES") - Column(q"false", q"-1", q"pw.writeVarByteArray(r, ${ml.length}, ${ml.truncate})") - } + val annotations = if (typeAnnotations.isEmpty) Seq(DEFAULT_MAX_COLUMN_LENGTH) else typeAnnotations + annotations.collectFirst { + case fl: FixedLength => + Column(q"false", q"${fl.length}", q"pw.writeFixedByteArray(r, ${fl.length}, ${fl.truncate}, 0)") + case ml: MaxLength => + Column(q"false", q"-1", q"pw.writeVarByteArray(r, ${ml.length}, ${ml.truncate})") + }.get case t if t =:= c.weakTypeOf[BigDecimal] => val enc = typeAnnotations.collectFirst { case e @ DecimalEncoding(_, _) => e } diff --git a/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala b/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala index f65ad922..13ba6b99 100644 --- a/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala +++ b/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala @@ -206,4 +206,45 @@ class NativeVerticaRecordEncoderTest extends AnyFunSpec with Matchers { ) } } + + case class LargeFieldRecord( + largeStringFixed: String @FixedLength(100000), + largeStringMax: String @MaxLength(100000), + largeStringMaxOpt: Option[String] @MaxLength(100000), + largeByteArrayFixed: Array[Byte] @FixedLength(100000), + largeByteArrayMax: Array[Byte] @MaxLength(100000), + largeByteArrayMaxOpt: Option[Array[Byte]] @MaxLength(100000) + ) + + it("should handle large string and byte array fields exceeding default 65000 byte limit (if annotated)") { + val encoder = encoderFor[LargeFieldRecord] + + encoder.staticColumnSizes shouldEqual Array(100000, -1, -1, 100000, -1, -1) + } + + it("should correctly write large string fields with @FixedLength annotation") { + val (testWriter, expectedWriter) = (new BufferPrimitiveWriter, new BufferPrimitiveWriter) + val largeString = "x" * 80000 + + encoderFor[LargeFieldRecord].write( + LargeFieldRecord( + largeString, + largeString, + Some(largeString), + largeString.getBytes("UTF-8"), + largeString.getBytes("UTF-8"), + Some(largeString.getBytes("UTF-8")) + ), + testWriter + ) + + expectedWriter.writeFixedString(largeString, 100000, truncate = true) + expectedWriter.writeVarString(largeString, 100000, truncate = true) + expectedWriter.writeVarString(largeString, 100000, truncate = true) + expectedWriter.writeFixedByteArray(largeString.getBytes("UTF-8"), 100000, truncate = true, padWith = 0) + expectedWriter.writeVarByteArray(largeString.getBytes("UTF-8"), 100000, truncate = true) + expectedWriter.writeVarByteArray(largeString.getBytes("UTF-8"), 100000, truncate = true) + + testWriter.buffer.toByteArray shouldEqual expectedWriter.buffer.toByteArray + } } From 58108ecd126ad127822a63923c8059c10e796a47 Mon Sep 17 00:00:00 2001 From: "j.zielinski" Date: Fri, 3 Jul 2026 14:58:38 +0200 Subject: [PATCH 2/3] native vertica encoder - handle an unexpected annotation --- .../native/NativeVerticaRecordEncoder.scala | 43 ++++++++++++------- .../NativeVerticaRecordEncoderTest.scala | 8 +++- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala b/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala index 42d7d726..d1bb4bf4 100644 --- a/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala +++ b/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala @@ -63,7 +63,17 @@ object NativeVerticaRecordEncoder { case class Column(isNull: c.Tree, staticSize: c.Tree, writeExpr: c.Tree) def columnExpressions(getter: MethodSymbol, typeAnnotations: Seq[DataTypeEncodingAnnotation]): Column = { - def primitiveColumnExpressions(tpe: c.Type): Column = + + def maxLengthColumn(maxLength: MaxLength, suffix: String): Column = { + val method = TermName(s"writeVar$suffix") + val maxLen = maxLength.length + if (maxLength eq DEFAULT_MAX_COLUMN_LENGTH) { + c.warning(c.enclosingPosition, s"Column ${getter.name} of $recordType uses default max length of $maxLen.") + } + Column(q"false", q"-1", q"pw.$method(r, ${maxLength.length}, ${maxLength.truncate})") + } + + def primitiveColumnExpressions(tpe: c.Type): Column = { tpe match { case t if t =:= c.weakTypeOf[Boolean] => Column(q"false", q"1", q"pw.writeByte(if (r) 1 else 0)") case t if t =:= c.weakTypeOf[Byte] => Column(q"false", q"1", q"pw.writeByte(r)") @@ -79,22 +89,24 @@ object NativeVerticaRecordEncoder { case t if t =:= c.weakTypeOf[UUID] => Column(q"false", q"16", q"pw.writeUUID(r)") case t if t =:= c.weakTypeOf[String] => - val annotations = if (typeAnnotations.isEmpty) Seq(DEFAULT_MAX_COLUMN_LENGTH) else typeAnnotations - annotations.collectFirst { - case fl: FixedLength => - Column(q"false", q"${fl.length}", q"pw.writeFixedString(r, ${fl.length}, ${fl.truncate})") - case ml: MaxLength => - Column(q"false", q"-1", q"pw.writeVarString(r, ${ml.length}, ${ml.truncate})") - }.get + typeAnnotations + .collectFirst { + case fl: FixedLength => + Column(q"false", q"${fl.length}", q"pw.writeFixedString(r, ${fl.length}, ${fl.truncate})") + case ml: MaxLength => maxLengthColumn(ml, "String") + case un => c.abort(c.enclosingPosition, s"Unexpected $un on field ${getter.name} for $recordType") + } + .getOrElse(maxLengthColumn(DEFAULT_MAX_COLUMN_LENGTH, "String")) case t if t =:= c.weakTypeOf[Array[Byte]] => - val annotations = if (typeAnnotations.isEmpty) Seq(DEFAULT_MAX_COLUMN_LENGTH) else typeAnnotations - annotations.collectFirst { - case fl: FixedLength => - Column(q"false", q"${fl.length}", q"pw.writeFixedByteArray(r, ${fl.length}, ${fl.truncate}, 0)") - case ml: MaxLength => - Column(q"false", q"-1", q"pw.writeVarByteArray(r, ${ml.length}, ${ml.truncate})") - }.get + typeAnnotations + .collectFirst { + case fl: FixedLength => + Column(q"false", q"${fl.length}", q"pw.writeFixedByteArray(r, ${fl.length}, ${fl.truncate}, 0)") + case ml: MaxLength => maxLengthColumn(ml, "ByteArray") + case un => c.abort(c.enclosingPosition, s"Unexpected $un on field ${getter.name} for $recordType") + } + .getOrElse(maxLengthColumn(DEFAULT_MAX_COLUMN_LENGTH, "ByteArray")) case t if t =:= c.weakTypeOf[BigDecimal] => val enc = typeAnnotations.collectFirst { case e @ DecimalEncoding(_, _) => e } @@ -107,6 +119,7 @@ object NativeVerticaRecordEncoder { q"implicitly[_root_.com.adform.streamloader.vertica.file.native.NativeVerticaTypeEncoder[${t.finalResultType}]]" Column(q"false", q"$nte.staticSize", q"$nte.write(r, pw)") } + } getter.returnType match { case t if t.typeConstructor =:= c.weakTypeOf[Option[_]].typeConstructor => diff --git a/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala b/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala index 13ba6b99..9b0e59cb 100644 --- a/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala +++ b/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala @@ -206,7 +206,6 @@ class NativeVerticaRecordEncoderTest extends AnyFunSpec with Matchers { ) } } - case class LargeFieldRecord( largeStringFixed: String @FixedLength(100000), largeStringMax: String @MaxLength(100000), @@ -247,4 +246,11 @@ class NativeVerticaRecordEncoderTest extends AnyFunSpec with Matchers { testWriter.buffer.toByteArray shouldEqual expectedWriter.buffer.toByteArray } + + it("should fail deriving encoder when field has unexpected annotation") { + assertTypeError(""" + case class UnexpectedAnnotationRecord(a: String @DecimalEncoding(18, 6)) + implicitly[NativeVerticaRecordEncoder[UnexpectedAnnotationRecord]] + """) + } } From 55480be6af312b44a9009f699561810cb6072ce1 Mon Sep 17 00:00:00 2001 From: "j.zielinski" Date: Sun, 5 Jul 2026 01:32:52 +0200 Subject: [PATCH 3/3] allow for bigger kafka msg size (32MB) --- .../test/scala/com/adform/streamloader/fixtures/Kafka.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala index 8cb01821..f9fc52f8 100644 --- a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala +++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala @@ -155,6 +155,7 @@ trait Kafka { this: Docker => s"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${dockerNetwork.ip}:$kafkaPort", s"KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:$kafkaControllerPort", s"KAFKA_LOG_RETENTION_HOURS=${Int.MaxValue}", + s"KAFKA_MESSAGE_MAX_BYTES=${32_000_000}", "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1" ) .build() @@ -177,6 +178,8 @@ trait Kafka { this: Docker => private def createKafkaProducer(): KafkaProducer[Array[Byte], Array[Byte]] = { val producerProps = new Properties() + + producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 32_000_000) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.endpoint) producerProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,