From bd389c510f2b7bdab529739e7f7ba3035f7e9776 Mon Sep 17 00:00:00 2001
From: Jon Morra
Date: Wed, 28 Jun 2017 12:00:47 -0700
Subject: [PATCH 1/3] Getting user defined types working

---
 .../spark/avro/AvroOutputWriterFactory.scala  |  2 +-
 .../databricks/spark/avro/DefaultSource.scala |  3 +--
 .../apache}/spark/avro/AvroOutputWriter.scala | 23 +++++++++----------
 .../apache}/spark/avro/SchemaConverters.scala | 17 ++++++++------
 .../com/databricks/spark/avro/AvroSuite.scala |  2 +-
 5 files changed, 24 insertions(+), 23 deletions(-)
 rename src/main/scala/{com/databricks => org/apache}/spark/avro/AvroOutputWriter.scala (93%)
 rename src/main/scala/{com/databricks => org/apache}/spark/avro/SchemaConverters.scala (98%)

diff --git a/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala b/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala
index 3f3cbf07..d1365e13 100644
--- a/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala
+++ b/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala
@@ -17,7 +17,7 @@ package com.databricks.spark.avro
 
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-
+import org.apache.spark.avro.AvroOutputWriter
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.types.StructType
diff --git a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
index bfbadd7c..4835cdb5 100644
--- a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
+++ b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
@@ -21,7 +21,6 @@ import java.net.URI
 import java.util.zip.Deflater
 
 import scala.util.control.NonFatal
-
 import com.databricks.spark.avro.DefaultSource.{AvroSchema, IgnoreFilesWithoutExtensionProperty, SerializableConfiguration}
 import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
 import com.esotericsoftware.kryo.io.{Input, Output}
@@ -34,8 +33,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
 import org.slf4j.LoggerFactory
-
 import org.apache.spark.TaskContext
+import org.apache.spark.avro.SchemaConverters
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
diff --git a/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala b/src/main/scala/org/apache/spark/avro/AvroOutputWriter.scala
similarity index 93%
rename from src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala
rename to src/main/scala/org/apache/spark/avro/AvroOutputWriter.scala
index 297c39d6..9bab1a21 100644
--- a/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala
+++ b/src/main/scala/org/apache/spark/avro/AvroOutputWriter.scala
@@ -14,31 +14,29 @@
  * limitations under the License.
  */
 
-package com.databricks.spark.avro
+package org.apache.spark.avro
 
 import java.io.{IOException, OutputStream}
 import java.nio.ByteBuffer
-import java.sql.Timestamp
-import java.sql.Date
+import java.sql.{Date, Timestamp}
 import java.util.HashMap
 
-import org.apache.hadoop.fs.Path
-
-import scala.collection.immutable.Map
-
 import org.apache.avro.generic.GenericData.Record
 import org.apache.avro.generic.GenericRecord
-import org.apache.avro.{Schema, SchemaBuilder}
 import org.apache.avro.mapred.AvroKey
 import org.apache.avro.mapreduce.AvroKeyOutputFormat
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, TaskAttemptID}
-
+import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.types._
 
+import scala.collection.immutable.Map
+
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
-private[avro] class AvroOutputWriter(
+class AvroOutputWriter(
     path: String,
     context: TaskAttemptContext,
     schema: StructType,
@@ -87,8 +85,8 @@ private[avro] class AvroOutputWriter(
         case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
       }
       case ByteType | ShortType | IntegerType | LongType |
-           FloatType | DoubleType | StringType | BooleanType => identity
-      case _: DecimalType => (item: Any) => if (item == null) null else item.toString
+           FloatType | DoubleType | BooleanType => identity
+      case _: DecimalType | StringType => (item: Any) => if (item == null) null else item.toString
       case TimestampType => (item: Any) =>
         if (item == null) null else item.asInstanceOf[Timestamp].getTime
       case DateType => (item: Any) =>
@@ -145,6 +143,7 @@ private[avro] class AvroOutputWriter(
           record
         }
       }
+      case t: UserDefinedType[_] => createConverterToAvro(t.sqlType, structName, recordNamespace)
     }
   }
 }
diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/org/apache/spark/avro/SchemaConverters.scala
similarity index 98%
rename from src/main/scala/com/databricks/spark/avro/SchemaConverters.scala
rename to src/main/scala/org/apache/spark/avro/SchemaConverters.scala
index 7f8e20f4..fe498ae1 100644
--- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala
+++ b/src/main/scala/org/apache/spark/avro/SchemaConverters.scala
@@ -13,21 +13,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package com.databricks.spark.avro
+package org.apache.spark.avro
 
 import java.nio.ByteBuffer
 
-import scala.collection.JavaConverters._
-
+import org.apache.avro.Schema.Type._
+import org.apache.avro.SchemaBuilder._
 import org.apache.avro.generic.GenericData.Fixed
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.SchemaBuilder._
-import org.apache.avro.Schema.Type._
-
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.types._
 
+import scala.collection.JavaConverters._
+
 /**
  * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice
  * versa.
@@ -137,7 +136,7 @@ object SchemaConverters {
    * @param targetSqlType Target catalyst sql type after the conversion.
    * @return returns a converter function to convert row in avro format to GenericRow of catalyst.
    */
-  private[avro] def createConverterToSQL(
+  def createConverterToSQL(
     sourceAvroSchema: Schema,
     targetSqlType: DataType): AnyRef => AnyRef = {
@@ -346,6 +345,8 @@ object SchemaConverters {
         schemaBuilder.record(structName).namespace(recordNamespace),
         recordNamespace)
 
+      case t: UserDefinedType[_] => convertTypeToAvro(t.sqlType, schemaBuilder, structName, recordNamespace)
+
       case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
     }
   }
@@ -390,6 +391,8 @@ object SchemaConverters {
         newFieldBuilder.record(structName).namespace(recordNamespace),
         recordNamespace)
 
+      case t: UserDefinedType[_] => convertFieldTypeToAvro(t.sqlType, newFieldBuilder, structName, recordNamespace)
+
       case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
     }
   }
diff --git a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala
index 4843ad46..22fe58e6 100644
--- a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala
+++ b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException
+import org.apache.spark.avro.SchemaConverters.IncompatibleSchemaException
 
 class AvroSuite extends FunSuite with BeforeAndAfterAll {
   val episodesFile = "src/test/resources/episodes.avro"

From 1a1b8d4076ede431a4561e42289dc99c939b9c41 Mon Sep 17 00:00:00 2001
From: Jon Morra
Date: Wed, 28 Jun 2017 12:36:10 -0700
Subject: [PATCH 2/3] Making build pass

---
 .../scala/org/apache/spark/avro/SchemaConverters.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/org/apache/spark/avro/SchemaConverters.scala b/src/main/scala/org/apache/spark/avro/SchemaConverters.scala
index fe498ae1..32a19a3c 100644
--- a/src/main/scala/org/apache/spark/avro/SchemaConverters.scala
+++ b/src/main/scala/org/apache/spark/avro/SchemaConverters.scala
@@ -391,7 +391,12 @@ object SchemaConverters {
         newFieldBuilder.record(structName).namespace(recordNamespace),
         recordNamespace)
 
-      case t: UserDefinedType[_] => convertFieldTypeToAvro(t.sqlType, newFieldBuilder, structName, recordNamespace)
+      case t: UserDefinedType[_] => convertFieldTypeToAvro(
+        t.sqlType,
+        newFieldBuilder,
+        structName,
+        recordNamespace
+      )
 
       case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
     }

From f3b81ce1de8ecaf68a85b14789cf84e6040d2377 Mon Sep 17 00:00:00 2001
From: Jon Morra
Date: Wed, 28 Jun 2017 12:39:16 -0700
Subject: [PATCH 3/3] Making build pass

---
 .../scala/org/apache/spark/avro/SchemaConverters.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/org/apache/spark/avro/SchemaConverters.scala b/src/main/scala/org/apache/spark/avro/SchemaConverters.scala
index 32a19a3c..dfaaf3d8 100644
--- a/src/main/scala/org/apache/spark/avro/SchemaConverters.scala
+++ b/src/main/scala/org/apache/spark/avro/SchemaConverters.scala
@@ -345,7 +345,12 @@ object SchemaConverters {
         schemaBuilder.record(structName).namespace(recordNamespace),
         recordNamespace)
 
-      case t: UserDefinedType[_] => convertTypeToAvro(t.sqlType, schemaBuilder, structName, recordNamespace)
+      case t: UserDefinedType[_] => convertTypeToAvro(
+        t.sqlType,
+        schemaBuilder,
+        structName,
+        recordNamespace
+      )
 
       case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
     }
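
For reviewers of this series, here is a minimal sketch (not part of the patches) of what the new case t: UserDefinedType[_] branches enable, assuming Spark 2.x with spark-mllib on the classpath so a ready-made UDT (the ml VectorUDT) is available. The object name, record name, and namespace below are illustrative only; convertStructToAvro is the SchemaConverters entry point that funnels into the patched convertTypeToAvro/convertFieldTypeToAvro helpers.

import org.apache.avro.SchemaBuilder
import org.apache.spark.avro.SchemaConverters
import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.sql.types._

object UdtAvroSchemaSketch {
  def main(args: Array[String]): Unit = {
    // SQLDataTypes.VectorType is the ml VectorUDT instance that ships with Spark.
    // Before this series, SchemaConverters had no case for a UDT and threw
    // IncompatibleSchemaException("Unexpected type ...") on such a field.
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("features", SQLDataTypes.VectorType, nullable = true)))

    // With the UserDefinedType branches, the "features" field is emitted using the
    // Avro schema derived from VectorUDT.sqlType (a record of type/size/indices/values),
    // because the new cases simply recurse on t.sqlType.
    val avroSchema = SchemaConverters.convertStructToAvro(
      schema, SchemaBuilder.record("topLevelRecord"), "com.example.avro")

    println(avroSchema.toString(true))
  }
}

This sketch only exercises schema conversion; writing actual UDT row values goes through AvroOutputWriter's delegated converter and is not shown here. One likely reason the first commit relocates AvroOutputWriter and SchemaConverters from com/databricks to org/apache is that UserDefinedType[_] is private[spark] in Spark 2.x, so the new match arms appear to compile only from inside an org.apache.spark package.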