diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala index 8cb01821..f9fc52f8 100644 --- a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala +++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/Kafka.scala @@ -155,6 +155,7 @@ trait Kafka { this: Docker => s"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${dockerNetwork.ip}:$kafkaPort", s"KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:$kafkaControllerPort", s"KAFKA_LOG_RETENTION_HOURS=${Int.MaxValue}", + s"KAFKA_MESSAGE_MAX_BYTES=${32_000_000}", "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1" ) .build() @@ -177,6 +178,8 @@ trait Kafka { this: Docker => private def createKafkaProducer(): KafkaProducer[Array[Byte], Array[Byte]] = { val producerProps = new Properties() + + producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 32_000_000) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.endpoint) producerProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, diff --git a/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala b/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala index 64bb13f0..d1bb4bf4 100644 --- a/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala +++ b/stream-loader-vertica/src/main/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoder.scala @@ -48,7 +48,7 @@ trait NativeVerticaRecordEncoder[R] { object NativeVerticaRecordEncoder { private val MAX_COLUMN_BYTES = 65000 - private val MAX_COLUMN_LENGTH = MaxLength(MAX_COLUMN_BYTES, truncate = true) + private val DEFAULT_MAX_COLUMN_LENGTH = MaxLength(MAX_COLUMN_BYTES, truncate = true) /** * A macro derivation of the encoder for arbitrary case classes. @@ -63,7 +63,17 @@ object NativeVerticaRecordEncoder { case class Column(isNull: c.Tree, staticSize: c.Tree, writeExpr: c.Tree) def columnExpressions(getter: MethodSymbol, typeAnnotations: Seq[DataTypeEncodingAnnotation]): Column = { - def primitiveColumnExpressions(tpe: c.Type): Column = + + def maxLengthColumn(maxLength: MaxLength, suffix: String): Column = { + val method = TermName(s"writeVar$suffix") + val maxLen = maxLength.length + if (maxLength eq DEFAULT_MAX_COLUMN_LENGTH) { + c.warning(c.enclosingPosition, s"Column ${getter.name} of $recordType uses default max length of $maxLen.") + } + Column(q"false", q"-1", q"pw.$method(r, ${maxLength.length}, ${maxLength.truncate})") + } + + def primitiveColumnExpressions(tpe: c.Type): Column = { tpe match { case t if t =:= c.weakTypeOf[Boolean] => Column(q"false", q"1", q"pw.writeByte(if (r) 1 else 0)") case t if t =:= c.weakTypeOf[Byte] => Column(q"false", q"1", q"pw.writeByte(r)") @@ -79,34 +89,24 @@ object NativeVerticaRecordEncoder { case t if t =:= c.weakTypeOf[UUID] => Column(q"false", q"16", q"pw.writeUUID(r)") case t if t =:= c.weakTypeOf[String] => - val fl = typeAnnotations.collectFirst { case f: FixedLength => f } - if (fl.isDefined) { - if (fl.get.length > MAX_COLUMN_BYTES) - c.abort(c.enclosingPosition, s"String length can not exceed $MAX_COLUMN_BYTES") - Column(q"false", q"${fl.get.length}", q"pw.writeFixedString(r, ${fl.get.length}, ${fl.get.truncate})") - } else { - val ml = typeAnnotations.collectFirst { case m: MaxLength => m }.getOrElse(MAX_COLUMN_LENGTH) - if (ml.length > MAX_COLUMN_BYTES) - c.abort(c.enclosingPosition, s"String length can not exceed $MAX_COLUMN_BYTES") - Column(q"false", q"-1", q"pw.writeVarString(r, ${ml.length}, ${ml.truncate})") - } + typeAnnotations + .collectFirst { + case fl: FixedLength => + Column(q"false", q"${fl.length}", q"pw.writeFixedString(r, ${fl.length}, ${fl.truncate})") + case ml: MaxLength => maxLengthColumn(ml, "String") + case un => c.abort(c.enclosingPosition, s"Unexpected $un on field ${getter.name} for $recordType") + } + .getOrElse(maxLengthColumn(DEFAULT_MAX_COLUMN_LENGTH, "String")) case t if t =:= c.weakTypeOf[Array[Byte]] => - val fl = typeAnnotations.collectFirst { case f: FixedLength => f } - if (fl.isDefined) { - if (fl.get.length > MAX_COLUMN_BYTES) - c.abort(c.enclosingPosition, s"Byte array length can not exceed $MAX_COLUMN_BYTES") - Column( - q"false", - q"${fl.get.length}", - q"pw.writeFixedByteArray(r, ${fl.get.length}, ${fl.get.truncate}, 0)" - ) - } else { - val ml = typeAnnotations.collectFirst { case m: MaxLength => m }.getOrElse(MAX_COLUMN_LENGTH) - if (ml.length > MAX_COLUMN_BYTES) - c.abort(c.enclosingPosition, s"Byte array length can not exceed $MAX_COLUMN_BYTES") - Column(q"false", q"-1", q"pw.writeVarByteArray(r, ${ml.length}, ${ml.truncate})") - } + typeAnnotations + .collectFirst { + case fl: FixedLength => + Column(q"false", q"${fl.length}", q"pw.writeFixedByteArray(r, ${fl.length}, ${fl.truncate}, 0)") + case ml: MaxLength => maxLengthColumn(ml, "ByteArray") + case un => c.abort(c.enclosingPosition, s"Unexpected $un on field ${getter.name} for $recordType") + } + .getOrElse(maxLengthColumn(DEFAULT_MAX_COLUMN_LENGTH, "ByteArray")) case t if t =:= c.weakTypeOf[BigDecimal] => val enc = typeAnnotations.collectFirst { case e @ DecimalEncoding(_, _) => e } @@ -119,6 +119,7 @@ object NativeVerticaRecordEncoder { q"implicitly[_root_.com.adform.streamloader.vertica.file.native.NativeVerticaTypeEncoder[${t.finalResultType}]]" Column(q"false", q"$nte.staticSize", q"$nte.write(r, pw)") } + } getter.returnType match { case t if t.typeConstructor =:= c.weakTypeOf[Option[_]].typeConstructor => diff --git a/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala b/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala index f65ad922..9b0e59cb 100644 --- a/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala +++ b/stream-loader-vertica/src/test/scala/com/adform/streamloader/vertica/file/native/NativeVerticaRecordEncoderTest.scala @@ -206,4 +206,51 @@ class NativeVerticaRecordEncoderTest extends AnyFunSpec with Matchers { ) } } + case class LargeFieldRecord( + largeStringFixed: String @FixedLength(100000), + largeStringMax: String @MaxLength(100000), + largeStringMaxOpt: Option[String] @MaxLength(100000), + largeByteArrayFixed: Array[Byte] @FixedLength(100000), + largeByteArrayMax: Array[Byte] @MaxLength(100000), + largeByteArrayMaxOpt: Option[Array[Byte]] @MaxLength(100000) + ) + + it("should handle large string and byte array fields exceeding default 65000 byte limit (if annotated)") { + val encoder = encoderFor[LargeFieldRecord] + + encoder.staticColumnSizes shouldEqual Array(100000, -1, -1, 100000, -1, -1) + } + + it("should correctly write large string fields with @FixedLength annotation") { + val (testWriter, expectedWriter) = (new BufferPrimitiveWriter, new BufferPrimitiveWriter) + val largeString = "x" * 80000 + + encoderFor[LargeFieldRecord].write( + LargeFieldRecord( + largeString, + largeString, + Some(largeString), + largeString.getBytes("UTF-8"), + largeString.getBytes("UTF-8"), + Some(largeString.getBytes("UTF-8")) + ), + testWriter + ) + + expectedWriter.writeFixedString(largeString, 100000, truncate = true) + expectedWriter.writeVarString(largeString, 100000, truncate = true) + expectedWriter.writeVarString(largeString, 100000, truncate = true) + expectedWriter.writeFixedByteArray(largeString.getBytes("UTF-8"), 100000, truncate = true, padWith = 0) + expectedWriter.writeVarByteArray(largeString.getBytes("UTF-8"), 100000, truncate = true) + expectedWriter.writeVarByteArray(largeString.getBytes("UTF-8"), 100000, truncate = true) + + testWriter.buffer.toByteArray shouldEqual expectedWriter.buffer.toByteArray + } + + it("should fail deriving encoder when field has unexpected annotation") { + assertTypeError(""" + case class UnexpectedAnnotationRecord(a: String @DecimalEncoding(18, 6)) + implicitly[NativeVerticaRecordEncoder[UnexpectedAnnotationRecord]] + """) + } }