What happened?
IcebergUtil.readDataFileAsIterator builds a CloseableIterable[Record] from Parquet.read(...).build() but returns only .iterator().asScala:
```scala
def readDataFileAsIterator(...): Iterator[Record] = {
  ...
  val closeableIterable: CloseableIterable[Record] =
    Parquet.read(inputFile).project(schema).createReaderFunc(readerFunc).build()
  closeableIterable.iterator().asScala // ← parent ref dropped
}
```
The closeableIterable local goes out of scope after the method returns. The caller receives a bare scala.collection.Iterator[Record] with no close() and no reference to its parent, so nothing in the caller can ever close the underlying Parquet reader / S3InputStream.
Symptom chain under S3FileIO:
- Every read leaks one S3InputStream (it stays open until GC because nothing in the call graph can close it).
- Iceberg-AWS S3InputStream.finalize() logs WARN S3InputStream - Unclosed input stream created by: ... when the GC'd stream is finalized, flooding the Amber stdout.
- The leaked stream had already borrowed one slot from the AWS SDK's ApacheHttpClient connection pool (default 50; Texera does not override it: grep -rn maxConnections across common/, amber/, computing-unit-managing-service/ returns zero hits).
- After ~50 leaked reads the pool saturates and new S3 reads block indefinitely on acquireConnection. JVM restart is the only known recovery.
IcebergCatalogInstance (IcebergCatalogInstance.scala) is a process-lifetime singleton and is never recreated, so leaked pool slots accumulate over the JVM lifetime.
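The saturation arithmetic in the symptom list can be illustrated with a toy model. This is only a sketch: ToyConnectionPool and LeakModel are hypothetical names, and a java.util.concurrent.Semaphore stands in for the AWS SDK's Apache connection pool. No Iceberg or AWS API is involved.

```scala
import java.util.concurrent.{Semaphore, TimeUnit}

// Toy stand-in (not the AWS SDK): a fixed-size pool where each read borrows
// one connection, and only a properly closed read gives it back.
final class ToyConnectionPool(maxConnections: Int) {
  private val permits = new Semaphore(maxConnections)

  // True if a connection was obtained within the timeout.
  def tryAcquire(timeoutMillis: Long): Boolean =
    permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)

  def release(): Unit = permits.release()

  def available: Int = permits.availablePermits()
}

object LeakModel {
  // Simulates `reads` sequential reads against one long-lived pool (like the
  // process-lifetime catalog). A leaky read acquires and never releases.
  // Returns how many reads obtained a connection.
  def run(pool: ToyConnectionPool, reads: Int, leaky: Boolean): Int = {
    var served = 0
    for (_ <- 1 to reads) {
      if (pool.tryAcquire(timeoutMillis = 10)) {
        served += 1
        if (!leaky) pool.release() // the fixed path closes the stream
      }
    }
    served
  }
}
```

With 50 permits and 60 leaky reads, only the first 50 obtain a connection and the pool is left empty. The same 60 reads on the fixed path all succeed and leave all 50 permits available. The toy gives up after a short timeout, whereas the reporter observes real reads blocking in acquireConnection.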
ComputingUnitManagingResource @DELETE /{cuid}/terminate only deletes the K8s pod and writes a DB timestamp — in-flight Iceberg readers are not closed and the catalog / S3FileIO is untouched. Frequent CU delete/recreate cycles amplify the leak.
The same anti-pattern recurs in IcebergDocument.scala at lines 128, 207-219, 339, 462 — table.newScan().planFiles().iterator() similarly drops the CloseableIterable<FileScanTask>.
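For call sites that only need the planned task list, one option is to drain the iterable inside scala.util.Using.resource so that close() always runs, even if iteration throws. This is a minimal sketch. ScanHelpers.drainAndClose is a hypothetical helper, and it assumes the caller can afford to materialize every element. Iceberg's CloseableIterable matches its parameter type because it is both a java.lang.Iterable and a java.io.Closeable (hence an AutoCloseable).

```scala
import scala.jdk.CollectionConverters._
import scala.util.Using

object ScanHelpers {
  // Hypothetical helper: eagerly drains a closeable iterable (for example the
  // CloseableIterable[FileScanTask] returned by table.newScan().planFiles())
  // into a List, then closes it. Using.resource closes it even on exceptions.
  def drainAndClose[T](source: java.lang.Iterable[T] with AutoCloseable): List[T] =
    Using.resource(source)(s => s.asScala.toList)
}
```

At an IcebergDocument call site this would read roughly ScanHelpers.drainAndClose(table.newScan().planFiles()). That usage has not been checked against the actual code at those lines.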
Fix sketch:
- Return an AutoCloseable wrapper that owns both the iterator and the parent CloseableIterable, and expose close() to the caller.
- Propagate the close handle through IcebergDocument's public read methods (Using.resource(...) or equivalent at every call site).
- Optionally set s3.connection-pool-max on the catalog properties as defense in depth. A larger pool only delays saturation; it does not fix the leak.
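The first two bullets can be sketched as a Scala Iterator that also owns its parent resource. ClosingIterator is a hypothetical name, not existing Texera or Iceberg code. Closing automatically on exhaustion is a design choice assumed here: a fully drained iterator cannot leak even if the caller forgets close(), and an explicit close() (for example via Using.resource) covers callers that stop early.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper, not part of Texera or Iceberg: a Scala Iterator that owns
// the parent resource (e.g. an Iceberg CloseableIterable) it was created from.
final class ClosingIterator[T](underlying: java.util.Iterator[T], parent: AutoCloseable)
    extends Iterator[T]
    with AutoCloseable {

  private val closed = new AtomicBoolean(false)

  override def hasNext: Boolean =
    if (closed.get()) false
    else {
      val more = underlying.hasNext
      // Release resources as soon as the data is exhausted, so a caller that
      // drains the iterator cannot leak even without calling close().
      if (!more) close()
      more
    }

  override def next(): T =
    if (closed.get()) throw new NoSuchElementException("iterator already closed")
    else underlying.next()

  // Idempotent: closes the child iterator (if it is closeable), then the parent.
  override def close(): Unit =
    if (closed.compareAndSet(false, true)) {
      try {
        underlying match {
          case c: AutoCloseable => c.close()
          case _                => ()
        }
      } finally parent.close()
    }
}
```

In readDataFileAsIterator, the last line would then become new ClosingIterator(closeableIterable.iterator(), closeableIterable). The return type would widen to Iterator[Record] with AutoCloseable so that callers can wrap it in Using.resource.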
How to reproduce?
Add this ScalaTest spec at common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilLeakSpec.scala:
```scala
package org.apache.amber.util

import org.apache.iceberg.io.{CloseableIterable, CloseableIterator}
import org.scalatest.flatspec.AnyFlatSpec

import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._

class IcebergUtilLeakSpec extends AnyFlatSpec {
  it should "close the parent CloseableIterable after the iterator is consumed" in {
    val parentCloseCount = new AtomicInteger(0)
    val ci: CloseableIterable[Int] = new CloseableIterable[Int] {
      override def iterator(): CloseableIterator[Int] = {
        val backing = java.util.Arrays.asList(1, 2, 3).iterator()
        new CloseableIterator[Int] {
          override def hasNext: Boolean = backing.hasNext
          override def next(): Int = backing.next()
          override def close(): Unit = ()
        }
      }
      override def close(): Unit = parentCloseCount.incrementAndGet()
    }
    // Exercise the exact leaky pattern from IcebergUtil.scala:426-432
    val leakyEscape: Iterator[Int] = ci.iterator().asScala
    leakyEscape.toList
    assert(parentCloseCount.get() >= 1,
      s"Parent CloseableIterable was never closed (close count = ${parentCloseCount.get()}).")
  }
}
```
Run:
```shell
sbt "WorkflowCore/testOnly org.apache.amber.util.IcebergUtilLeakSpec"
```
Output on current master:
```text
[info] IcebergUtilLeakSpec:
[info] - should close the parent CloseableIterable after the iterator is consumed *** FAILED ***
[info] 0 was not greater than or equal to 1 Parent CloseableIterable was never closed (close count = 0).
[info] *** 1 TEST FAILED ***
```
The test exercises the exact anti-pattern (CloseableIterable.iterator().asScala with the parent reference dropped) without needing real Parquet / S3, so it reproduces the structural defect deterministically. The user-visible WARN S3InputStream - Unclosed input stream created by: ... flood and the eventual stall come from running the buggy method under S3FileIO repeatedly until the bounded Apache HTTP pool saturates.
Version
1.1.0-incubating (Pre-release/Master)