Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ trait Kafka { this: Docker =>
s"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${dockerNetwork.ip}:$kafkaPort",
s"KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:$kafkaControllerPort",
s"KAFKA_LOG_RETENTION_HOURS=${Int.MaxValue}",
s"KAFKA_MESSAGE_MAX_BYTES=${32_000_000}",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1"
)
.build()
Expand All @@ -177,6 +178,8 @@ trait Kafka { this: Docker =>

private def createKafkaProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
val producerProps = new Properties()

producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 32_000_000)
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.endpoint)
producerProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ trait NativeVerticaRecordEncoder[R] {
object NativeVerticaRecordEncoder {

private val MAX_COLUMN_BYTES = 65000
private val MAX_COLUMN_LENGTH = MaxLength(MAX_COLUMN_BYTES, truncate = true)
private val DEFAULT_MAX_COLUMN_LENGTH = MaxLength(MAX_COLUMN_BYTES, truncate = true)

/**
* A macro derivation of the encoder for arbitrary case classes.
Expand All @@ -63,7 +63,17 @@ object NativeVerticaRecordEncoder {
case class Column(isNull: c.Tree, staticSize: c.Tree, writeExpr: c.Tree)

def columnExpressions(getter: MethodSymbol, typeAnnotations: Seq[DataTypeEncodingAnnotation]): Column = {
def primitiveColumnExpressions(tpe: c.Type): Column =

/**
 * Builds the column expressions for a variable-length value that is written through
 * the `pw.writeVar<suffix>` method (e.g. `writeVarString`, `writeVarByteArray`).
 *
 * The resulting column is never null at this level and has a static size of -1.
 * A compile-time warning is emitted when the supplied `maxLength` is the very same
 * instance as the default max length, i.e. the fallback was used.
 *
 * @param maxLength the length limit and truncation flag passed on to the writer call
 * @param suffix    the suffix appended to `writeVar` to select the writer method
 */
def maxLengthColumn(maxLength: MaxLength, suffix: String): Column = {
  val writerMethod = TermName(s"writeVar$suffix")
  val limit = maxLength.length
  val truncate = maxLength.truncate

  // Reference equality on purpose: only the shared default instance triggers the warning.
  if (maxLength eq DEFAULT_MAX_COLUMN_LENGTH)
    c.warning(c.enclosingPosition, s"Column ${getter.name} of $recordType uses default max length of $limit.")

  Column(q"false", q"-1", q"pw.$writerMethod(r, $limit, $truncate)")
}

def primitiveColumnExpressions(tpe: c.Type): Column = {
tpe match {
case t if t =:= c.weakTypeOf[Boolean] => Column(q"false", q"1", q"pw.writeByte(if (r) 1 else 0)")
case t if t =:= c.weakTypeOf[Byte] => Column(q"false", q"1", q"pw.writeByte(r)")
Expand All @@ -79,34 +89,24 @@ object NativeVerticaRecordEncoder {
case t if t =:= c.weakTypeOf[UUID] => Column(q"false", q"16", q"pw.writeUUID(r)")

case t if t =:= c.weakTypeOf[String] =>
val fl = typeAnnotations.collectFirst { case f: FixedLength => f }
if (fl.isDefined) {
if (fl.get.length > MAX_COLUMN_BYTES)
c.abort(c.enclosingPosition, s"String length can not exceed $MAX_COLUMN_BYTES")
Column(q"false", q"${fl.get.length}", q"pw.writeFixedString(r, ${fl.get.length}, ${fl.get.truncate})")
} else {
val ml = typeAnnotations.collectFirst { case m: MaxLength => m }.getOrElse(MAX_COLUMN_LENGTH)
if (ml.length > MAX_COLUMN_BYTES)
c.abort(c.enclosingPosition, s"String length can not exceed $MAX_COLUMN_BYTES")
Column(q"false", q"-1", q"pw.writeVarString(r, ${ml.length}, ${ml.truncate})")
}
typeAnnotations
.collectFirst {
case fl: FixedLength =>
Column(q"false", q"${fl.length}", q"pw.writeFixedString(r, ${fl.length}, ${fl.truncate})")
case ml: MaxLength => maxLengthColumn(ml, "String")
case un => c.abort(c.enclosingPosition, s"Unexpected $un on field ${getter.name} for $recordType")
}
.getOrElse(maxLengthColumn(DEFAULT_MAX_COLUMN_LENGTH, "String"))

case t if t =:= c.weakTypeOf[Array[Byte]] =>
val fl = typeAnnotations.collectFirst { case f: FixedLength => f }
if (fl.isDefined) {
if (fl.get.length > MAX_COLUMN_BYTES)
c.abort(c.enclosingPosition, s"Byte array length can not exceed $MAX_COLUMN_BYTES")
Column(
q"false",
q"${fl.get.length}",
q"pw.writeFixedByteArray(r, ${fl.get.length}, ${fl.get.truncate}, 0)"
)
} else {
val ml = typeAnnotations.collectFirst { case m: MaxLength => m }.getOrElse(MAX_COLUMN_LENGTH)
if (ml.length > MAX_COLUMN_BYTES)
c.abort(c.enclosingPosition, s"Byte array length can not exceed $MAX_COLUMN_BYTES")
Column(q"false", q"-1", q"pw.writeVarByteArray(r, ${ml.length}, ${ml.truncate})")
}
typeAnnotations
.collectFirst {
case fl: FixedLength =>
Column(q"false", q"${fl.length}", q"pw.writeFixedByteArray(r, ${fl.length}, ${fl.truncate}, 0)")
case ml: MaxLength => maxLengthColumn(ml, "ByteArray")
case un => c.abort(c.enclosingPosition, s"Unexpected $un on field ${getter.name} for $recordType")
}
.getOrElse(maxLengthColumn(DEFAULT_MAX_COLUMN_LENGTH, "ByteArray"))

case t if t =:= c.weakTypeOf[BigDecimal] =>
val enc = typeAnnotations.collectFirst { case e @ DecimalEncoding(_, _) => e }
Expand All @@ -119,6 +119,7 @@ object NativeVerticaRecordEncoder {
q"implicitly[_root_.com.adform.streamloader.vertica.file.native.NativeVerticaTypeEncoder[${t.finalResultType}]]"
Column(q"false", q"$nte.staticSize", q"$nte.write(r, pw)")
}
}

getter.returnType match {
case t if t.typeConstructor =:= c.weakTypeOf[Option[_]].typeConstructor =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,51 @@ class NativeVerticaRecordEncoderTest extends AnyFunSpec with Matchers {
)
}
}
/**
 * Test fixture whose string and byte array fields are all annotated with a length of 100000,
 * which is above the 65000 byte default limit referred to by the tests using it.
 * It covers the fixed-length, max-length and optional max-length variants of both types.
 */
case class LargeFieldRecord(
largeStringFixed: String @FixedLength(100000),
largeStringMax: String @MaxLength(100000),
largeStringMaxOpt: Option[String] @MaxLength(100000),
largeByteArrayFixed: Array[Byte] @FixedLength(100000),
largeByteArrayMax: Array[Byte] @MaxLength(100000),
largeByteArrayMaxOpt: Option[Array[Byte]] @MaxLength(100000)
)

it("should handle large string and byte array fields exceeding default 65000 byte limit (if annotated)") {
  // Expected per-field static sizes, in declaration order of LargeFieldRecord:
  // the @FixedLength fields report their annotated length, the @MaxLength ones report -1.
  val expectedSizes = Array(100000, -1, -1, 100000, -1, -1)

  encoderFor[LargeFieldRecord].staticColumnSizes shouldEqual expectedSizes
}

// Writes a LargeFieldRecord holding 80000-character values through the derived encoder and
// compares the bytes with what the primitive writer produces when called directly with the
// annotated limits (100000). Covers String and Array[Byte] fields with both @FixedLength and
// @MaxLength annotations, including the Option-wrapped variants.
it("should correctly write large string and byte array fields with @FixedLength and @MaxLength annotations") {
  val testWriter = new BufferPrimitiveWriter
  val expectedWriter = new BufferPrimitiveWriter

  val largeString = "x" * 80000
  // A def rather than a val: every use gets its own fresh array, exactly like separate getBytes calls.
  def largeBytes: Array[Byte] = largeString.getBytes("UTF-8")

  val record = LargeFieldRecord(
    largeStringFixed = largeString,
    largeStringMax = largeString,
    largeStringMaxOpt = Some(largeString),
    largeByteArrayFixed = largeBytes,
    largeByteArrayMax = largeBytes,
    largeByteArrayMaxOpt = Some(largeBytes)
  )

  encoderFor[LargeFieldRecord].write(record, testWriter)

  // Expected output, in field declaration order.
  expectedWriter.writeFixedString(largeString, 100000, truncate = true)
  expectedWriter.writeVarString(largeString, 100000, truncate = true)
  expectedWriter.writeVarString(largeString, 100000, truncate = true)
  expectedWriter.writeFixedByteArray(largeBytes, 100000, truncate = true, padWith = 0)
  expectedWriter.writeVarByteArray(largeBytes, 100000, truncate = true)
  expectedWriter.writeVarByteArray(largeBytes, 100000, truncate = true)

  testWriter.buffer.toByteArray shouldEqual expectedWriter.buffer.toByteArray
}

// A String field carrying an annotation other than @FixedLength / @MaxLength (here
// @DecimalEncoding) is expected to make the encoder derivation fail at compile time.
// NOTE(review): assertTypeError succeeds on any type error inside the snippet - confirm the
// failure really originates from the encoder derivation and not e.g. from a name missing in scope.
it("should fail deriving encoder when field has unexpected annotation") {
assertTypeError("""
case class UnexpectedAnnotationRecord(a: String @DecimalEncoding(18, 6))
implicitly[NativeVerticaRecordEncoder[UnexpectedAnnotationRecord]]
""")
}
}
Loading