One of the steps in our pipeline converts two fields (epoch seconds and nanoseconds) into a single timestamp field and casts it to the Spark logical type TimestampType:
convertToMillis(df.col("seconds"), df.col("nanoseconds")).cast(TimestampType)
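The body of convertToMillis is not shown, and it works on Spark Columns. As a hedged, plain-Scala sketch of the underlying arithmetic (the object and method names are hypothetical, and it assumes the nanosecond part is normalized to 0 <= nanos < 1,000,000,000 as in java.time.Instant), this is how the two fields combine into one epoch value at millisecond or microsecond precision. Microseconds since the epoch is the unit Spark uses internally for TimestampType.

```scala
// Hypothetical plain-Scala helpers; not the original convertToMillis.
object EpochConversions {
  private val NanosPerMilli = 1000000L
  private val NanosPerMicro = 1000L

  // Assumes 0 <= nanos < 1000000000 (normalized, as in java.time.Instant);
  // integer division drops any remainder below the target unit.
  def toEpochMillis(seconds: Long, nanos: Long): Long = seconds * 1000L + nanos / NanosPerMilli

  def toEpochMicros(seconds: Long, nanos: Long): Long = seconds * 1000000L + nanos / NanosPerMicro

  def main(args: Array[String]): Unit = {
    println(toEpochMillis(1570000000L, 123456789L)) // 1570000000123
    println(toEpochMicros(1570000000L, 123456789L)) // 1570000000123456
  }
}
```

The same instant differs only in scale: the microsecond value is the millisecond value times 1000 plus the extra sub-millisecond digits.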
The resulting Avro files are then uploaded to Redshift, where the corresponding column has type TIMESTAMP.
After upgrading Spark from 2.2.0 to 2.4.4, uploading to Redshift stopped working.
What changed: Spark 2.4.0 added built-in Avro support (SPARK-24768), replacing the Databricks spark-avro library. The built-in implementation stores a TimestampType field as a long in microseconds, whereas the old library stored it as a long in milliseconds. If the Avro files are only ever processed by Spark (old or new version), the change is invisible. However, if a plain Avro library is used to read the data, the value is 1000 times larger than expected.
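To make the factor of 1000 concrete, here is a minimal plain-Scala sketch (no Spark or Avro dependency) of what a reader has to do with the raw long: interpret it as milliseconds or microseconds depending on which writer produced it. The object and method names are hypothetical; the strings "timestamp-millis" and "timestamp-micros" are the logical type names from the Avro specification.

```scala
import java.time.Instant

// Hypothetical helper: turns the raw long found in an Avro timestamp column into an Instant.
object AvroTimestampDecoding {
  def toInstant(raw: Long, logicalType: String): Instant = logicalType match {
    case "timestamp-millis" => Instant.ofEpochMilli(raw)
    case "timestamp-micros" =>
      // Split into whole seconds and remaining microseconds; floorDiv/floorMod keep pre-1970 values correct
      Instant.ofEpochSecond(Math.floorDiv(raw, 1000000L), Math.floorMod(raw, 1000000L) * 1000L)
    case other => throw new IllegalArgumentException(s"unsupported logical type: $other")
  }

  def main(args: Array[String]): Unit = {
    val epochSeconds = 1570000000L
    val millis = epochSeconds * 1000L    // unit written by spark-avro 4.0.0 (Spark 2.2)
    val micros = epochSeconds * 1000000L // unit written by the built-in Avro source (Spark 2.4)
    println(micros / millis) // 1000
    // Decoded with the matching unit, both values denote the same instant
    println(toInstant(millis, "timestamp-millis") == toInstant(micros, "timestamp-micros")) // true
    // A reader that assumes milliseconds lands tens of thousands of years in the future
    println(Instant.ofEpochMilli(micros))
  }
}
```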
Avro schema
{
  "name": "Event",
  "namespace": "test",
  "type": "record",
  "fields": [
    {"name": "epochSeconds", "type": ["null", "long"], "default": null, "doc" : "epoch seconds"}
  ]
}
For Spark 2.2 with the dependency com.databricks:spark-avro_2.11:4.0.0, the following test passes:
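import java.io.{File, FilenameFilter}
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.sql.types.TimestampType

test("avro -> read avro as spark -> cast timestamp -> write via spark -> read as avro reader") {
  val epochSeconds = System.currentTimeMillis() / 1000
  val schema = getSchema
  val record = createRecordWithEpochSeconds(schema, epochSeconds)
  record.get("epochSeconds") shouldEqual epochSeconds
  val tempSourceFile = File.createTempFile("timestamp-", ".avro")
  writeRecord(schema, record, tempSourceFile)
  // read the avro file with spark (`spark` is the SparkSession provided by the test suite)
  val df = spark.read.format("com.databricks.spark.avro").load(s"file://${tempSourceFile.getPath}")
  val castedDf = df.withColumn("epochSeconds_casted", df.col("epochSeconds").cast(TimestampType))
  // save the casted column as avro; spark writes a directory of part files, so free the temp path first
  val tempCastedAvroDestination = File.createTempFile("timestamp-", ".avro")
  tempCastedAvroDestination.delete()
  castedDf.write.format("com.databricks.spark.avro").save(s"file://${tempCastedAvroDestination.getPath}")
  // read back from the casted avro with the plain avro reader
  val tempCastedAvroDestinationFile = tempCastedAvroDestination.listFiles(new FilenameFilter() {
    override def accept(dir: File, name: String): Boolean = name.endsWith(".avro") && name.startsWith("part")
  }).head
  val castedRecord = readOneRecordFromAvro(tempCastedAvroDestinationFile)
  castedRecord.get("epochSeconds") shouldEqual epochSeconds
  // spark-avro 4.0.0 stores TimestampType as a long in milliseconds
  castedRecord.get("epochSeconds_casted") shouldEqual epochSeconds * 1000
}

private def getSchema = new Schema.Parser().parse(new File(getClass.getResource("/event.avsc").getPath))

private def createRecordWithEpochSeconds(schema: Schema, epochSeconds: Long) =
  new GenericRecordBuilder(schema).set("epochSeconds", epochSeconds).build()

// Assumed implementation: writes a single record to an Avro container file
private def writeRecord(schema: Schema, record: GenericRecord, file: File): Unit = {
  val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
  try { writer.create(schema, file); writer.append(record) } finally writer.close()
}

private def readOneRecordFromAvro(destinationFile: File): GenericRecord = {
  val reader = new DataFileReader[GenericRecord](destinationFile, new GenericDatumReader[GenericRecord]())
  try reader.next() finally reader.close()
}

private def readOneRecordFromParquet(destinationFile: File): GenericRecord = {
  val hadoopInputFile = HadoopInputFile.fromPath(new Path(s"file://${destinationFile.getPath}"), new Configuration)
  val reader = AvroParquetReader.builder[GenericRecord](hadoopInputFile).build()
  try reader.read() finally reader.close()
}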
For Spark 2.4.0 only small code changes are required, and the Databricks library has to be replaced with org.apache.spark:spark-avro_2.11:2.4.0.
After this change the last assertion fails, because the epochSeconds_casted column now holds a value in microseconds rather than milliseconds. After the upgrade, the schema of that field looks like this:
{
  "name" : "epochSeconds_casted",
  "type" : [ {
    "type" : "long",
    "logicalType" : "timestamp-micros"
  }, "null" ]
}
As a workaround, we removed the cast to TimestampType for Avro output, but kept it for Parquet output.