
IcebergUtil.readDataFileAsIterator leaks the parent CloseableIterable / S3InputStream per data-file read #5143

@mengw15

Description

What happened?

IcebergUtil.readDataFileAsIterator builds a CloseableIterable[Record] from Parquet.read(...).build() but returns only .iterator().asScala:

def readDataFileAsIterator(...): Iterator[Record] = {
  ...
  val closeableIterable: CloseableIterable[Record] =
    Parquet.read(inputFile).project(schema).createReaderFunc(readerFunc).build()
  closeableIterable.iterator().asScala     // ← parent ref dropped
}

The closeableIterable local goes out of scope when the method returns. The caller receives a bare scala.collection.Iterator[Record] that has no close() and holds no reference to its parent, so nothing can explicitly release the underlying Parquet reader / S3InputStream.
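A dependency-free way to see the type-level loss: the sketch below uses a hypothetical ResourceIterator as a stand-in for Iceberg's CloseableIterator and mirrors the shape of readDataFileAsIterator (convert with asScala, return only the wrapper). The Scala wrapper produced by asScala is not AutoCloseable, so the caller has nothing to close even though the Java iterator owns a resource.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._

object DroppedParentDemo {
  // Hypothetical stand-in for a CloseableIterator: a Java iterator that also owns a resource.
  final class ResourceIterator(closeCount: AtomicInteger)
      extends java.util.Iterator[Int]
      with AutoCloseable {
    private val backing = Iterator(1, 2, 3)
    override def hasNext: Boolean = backing.hasNext
    override def next(): Int = backing.next()
    override def close(): Unit = { closeCount.incrementAndGet(); () }
  }

  // Same shape as the leaky method: only the asScala wrapper escapes, close() is unreachable.
  def leakyRead(closeCount: AtomicInteger): Iterator[Int] =
    new ResourceIterator(closeCount).asScala
}
```

Draining leakyRead(...) completely still leaves the close count at 0: the same structural defect the reproduction spec checks for the parent CloseableIterable, here at the iterator level.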

Symptom chain under S3FileIO:

  1. Every read leaks one S3InputStream (it stays open until GC because nothing in the call graph can close it).
  2. Iceberg-AWS S3InputStream.finalize() logs WARN S3InputStream - Unclosed input stream created by: ... when the GC'd stream is finalized, flooding amber's stdout.
  3. Each leaked stream holds one slot in the AWS SDK's ApacheHttpClient connection pool (default maximum of 50 connections). Texera does not override this default: grep -rn maxConnections across common/, amber/, and computing-unit-managing-service/ returns zero hits.
  4. After ~50 leaked reads the pool saturates; new S3 reads block indefinitely on acquireConnection. JVM restart is the only known recovery.
  5. IcebergCatalogInstance (IcebergCatalogInstance.scala) is a process-lifetime singleton and is never recreated, so leaked pool slots accumulate over the JVM lifetime.
  6. ComputingUnitManagingResource @DELETE /{cuid}/terminate only deletes the K8s pod and writes a DB timestamp; it does not close in-flight Iceberg readers and leaves the catalog / S3FileIO untouched. Frequent CU delete/recreate cycles amplify the leak.
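To make the arithmetic in steps 3 and 4 concrete, here is a toy model, not the AWS SDK: a hypothetical ToyConnectionPool backed by a java.util.concurrent.Semaphore, where each open stream holds one permit until close(). The steps above report real reads blocking on acquireConnection; the toy instead gives up after 50 ms so that saturation is observable in a test.

```scala
import java.util.concurrent.{Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Toy model, NOT the AWS SDK: each open "stream" holds one pool permit until closed.
final class ToyConnectionPool(maxConnections: Int) {
  private val permits = new Semaphore(maxConnections)

  def openStream(): AutoCloseable = {
    // Real reads reportedly wait on acquireConnection; the toy times out quickly instead.
    if (!permits.tryAcquire(50, TimeUnit.MILLISECONDS))
      throw new IllegalStateException("connection pool exhausted")
    val released = new AtomicBoolean(false)
    new AutoCloseable {
      // Idempotent: a second close() does not return an extra permit to the pool.
      override def close(): Unit =
        if (released.compareAndSet(false, true)) permits.release()
    }
  }

  def available: Int = permits.availablePermits()
}
```

With 50 permits, 50 never-closed streams leave 0 available and the 51st openStream() fails; wrapping each stream in scala.util.Using.resource leaves all 50 permits available after 200 sequential reads.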

The same anti-pattern recurs in IcebergDocument.scala at lines 128, 207-219, 339, and 462, where table.newScan().planFiles().iterator() likewise drops the parent CloseableIterable<FileScanTask>.
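For scan planning, where the result is a list of FileScanTask metadata rather than row data, one option is to materialize the tasks and close the CloseableIterable immediately. A sketch, assuming a hypothetical helper named collectAndClose; it relies only on scala.util.Using and accepts anything that is both a java.lang.Iterable and AutoCloseable, which Iceberg's CloseableIterable is (it extends Iterable and java.io.Closeable).

```scala
import scala.jdk.CollectionConverters._
import scala.util.Using

object ScanTasks {
  /** Hypothetical helper: drain an Iterable that is also a resource into a List,
    * closing the resource on both the normal and the exceptional path. */
  def collectAndClose[T](source: java.lang.Iterable[T] with AutoCloseable): List[T] =
    Using.resource(source)(s => s.asScala.toList)
}
```

Under that assumption a call site would read ScanTasks.collectAndClose(table.newScan().planFiles()). Materializing trades memory for simplicity; for very large scans, handing the closeable handle to the caller, as the fix sketch proposes, is the better fit.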

Fix sketch:

  • Return an AutoCloseable wrapper that owns both the iterator and the parent CloseableIterable; expose close() to the caller.
  • Propagate the close handle through IcebergDocument's public read methods (Using.resource(...) or equivalent at every call site).
  • Optionally set s3.connection-pool-max on the catalog properties as a defense in depth.
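The first two bullets could look roughly like the sketch below. ClosingIterator is a hypothetical name, not an existing Texera or Iceberg class. It owns the parent (anything that is both a java.lang.Iterable and a java.io.Closeable, which Iceberg's CloseableIterable satisfies), closes itself once exhausted, and is AutoCloseable so scala.util.Using can release it when a caller stops early.

```scala
import java.io.Closeable

/** Hypothetical wrapper: owns the parent resource and the iterator derived from it. */
final class ClosingIterator[T] private (parent: Closeable, underlying: java.util.Iterator[T])
    extends Iterator[T]
    with AutoCloseable {

  private var closed = false

  // Closes eagerly once exhausted, so draining the iterator alone releases the stream.
  override def hasNext: Boolean = {
    val more = !closed && underlying.hasNext
    if (!more) close()
    more
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("iterator exhausted or closed")
    underlying.next()
  }

  // Idempotent; closes the child iterator if it is itself closeable, then always the parent.
  override def close(): Unit =
    if (!closed) {
      closed = true
      try {
        underlying match {
          case c: AutoCloseable => c.close()
          case _                => ()
        }
      } finally parent.close()
    }
}

object ClosingIterator {
  def apply[T](parent: java.lang.Iterable[T] with Closeable): ClosingIterator[T] =
    new ClosingIterator(parent, parent.iterator())
}
```

Assuming readDataFileAsIterator were changed to return ClosingIterator(closeableIterable) instead of closeableIterable.iterator().asScala, a call site would become Using.resource(IcebergUtil.readDataFileAsIterator(...)) { records => ... }. Closing on exhaustion covers callers that drain the iterator; the explicit close() is still needed for callers that stop early, for example via take(n) or an exception mid-read.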

How to reproduce?

Add this ScalaTest spec at common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilLeakSpec.scala:

package org.apache.amber.util

import org.apache.iceberg.io.{CloseableIterable, CloseableIterator}
import org.scalatest.flatspec.AnyFlatSpec
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._

class IcebergUtilLeakSpec extends AnyFlatSpec {

  it should "close the parent CloseableIterable after the iterator is consumed" in {
    val parentCloseCount = new AtomicInteger(0)

    val ci: CloseableIterable[Int] = new CloseableIterable[Int] {
      override def iterator(): CloseableIterator[Int] = {
        val backing = java.util.Arrays.asList(1, 2, 3).iterator()
        new CloseableIterator[Int] {
          override def hasNext: Boolean = backing.hasNext
          override def next(): Int      = backing.next()
          override def close(): Unit    = ()
        }
      }
      override def close(): Unit = parentCloseCount.incrementAndGet()
    }

    // Exercise the exact leaky pattern from IcebergUtil.scala:426-432
    val leakyEscape: Iterator[Int] = ci.iterator().asScala
    leakyEscape.toList

    assert(parentCloseCount.get() >= 1,
      s"Parent CloseableIterable was never closed (close count = ${parentCloseCount.get()}).")
  }
}

Run:

sbt "WorkflowCore/testOnly org.apache.amber.util.IcebergUtilLeakSpec"

Output on current master:

[info] IcebergUtilLeakSpec:
[info] - should close the parent CloseableIterable after the iterator is consumed *** FAILED ***
[info]   0 was not greater than or equal to 1 Parent CloseableIterable was never closed (close count = 0).
[info] *** 1 TEST FAILED ***

The test exercises the exact anti-pattern (CloseableIterable.iterator().asScala with the parent reference dropped) without needing real Parquet / S3, so it reproduces the structural defect deterministically. The user-visible WARN S3InputStream - Unclosed input stream created by: ... flood and the eventual stall come from running the buggy method under S3FileIO repeatedly until the bounded Apache HTTP pool saturates.

Version

1.1.0-incubating (Pre-release/Master)

Metadata

Labels: none
Type: Bug
Projects: none
Milestone: none
Relationships: none yet
Development: no branches or pull requests