diff --git a/file/src/main/java/org/apache/pulsar/io/file/FileSource.java b/file/src/main/java/org/apache/pulsar/io/file/FileSource.java index c201977d56..ff82ce8854 100644 --- a/file/src/main/java/org/apache/pulsar/io/file/FileSource.java +++ b/file/src/main/java/org/apache/pulsar/io/file/FileSource.java @@ -36,15 +36,21 @@ public class FileSource extends PushSource { private ExecutorService executor; - private final BlockingQueue workQueue = new LinkedBlockingQueue<>(); - private final BlockingQueue inProcess = new LinkedBlockingQueue<>(); - private final BlockingQueue recentlyProcessed = new LinkedBlockingQueue<>(); + private BlockingQueue workQueue; + private BlockingQueue inProcess; + private BlockingQueue recentlyProcessed; @Override public void open(Map config, SourceContext sourceContext) throws Exception { FileSourceConfig fileConfig = FileSourceConfig.load(config); fileConfig.validate(); + // Initialize bounded queues based on configuration to prevent OOM + int queueCapacity = fileConfig.getMaxQueueSize(); + workQueue = new LinkedBlockingQueue<>(queueCapacity); + inProcess = new LinkedBlockingQueue<>(queueCapacity); + recentlyProcessed = new LinkedBlockingQueue<>(queueCapacity); + // One extra for the File listing thread, and another for the cleanup thread executor = Executors.newFixedThreadPool(fileConfig.getNumWorkers() + 2); executor.execute(new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed)); diff --git a/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java b/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java index d9b1e1f3a4..fb2614de86 100644 --- a/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java +++ b/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java @@ -110,6 +110,12 @@ public class FileSourceConfig implements Serializable { */ private Integer numWorkers = 1; + /** + * The maximum capacity of the internal queues. This provides backpressure + * to prevent OutOfMemoryErrors when processing is slower than file listing. + */ + private Integer maxQueueSize = 1000; + /** * If set, do not delete but only rename file that has been processed. * This config only work when 'keepFile' property is false. @@ -170,6 +176,10 @@ public void validate() { throw new IllegalArgumentException("The property numWorkers must be greater than zero"); } + if (maxQueueSize != null && maxQueueSize <= 0) { + throw new IllegalArgumentException("The property maxQueueSize must be greater than zero"); + } + if (processedFileSuffix != null && keepFile) { throw new IllegalArgumentException( "The property keepFile must be false if the property processedFileSuffix is set"); diff --git a/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTest.java b/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTest.java index e7d497f5a9..6097e2d1ae 100644 --- a/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTest.java +++ b/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTest.java @@ -18,8 +18,10 @@ */ package org.apache.pulsar.io.file; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import java.io.File; import java.io.IOException; @@ -142,6 +144,52 @@ public final void invalidKeepFileTest() throws IOException { config.validate(); } + @Test + public void testDefaultMaxQueueSize() throws Exception { + Map map = new HashMap<>(); + map.put("inputDirectory", "/tmp"); // Dummy directory just to pass basic validation + + FileSourceConfig config = FileSourceConfig.load(map); + + // Assert that if the user provides no value, it defaults to 1000 + assertEquals(config.getMaxQueueSize(), Integer.valueOf(1000)); + } + + @Test + public void testValidMaxQueueSize() throws Exception { + Map map = new HashMap<>(); + map.put("inputDirectory", "/tmp"); + map.put("maxQueueSize", 5000); + + FileSourceConfig config = FileSourceConfig.load(map); + + // Assert that the custom value is loaded correctly + assertEquals(config.getMaxQueueSize(), Integer.valueOf(5000)); + + // Should not throw any exceptions + config.validate(); + } + + @Test + public void testInvalidMaxQueueSize() { + Map map = new HashMap<>(); + map.put("inputDirectory", "/tmp"); + + // Test 0 + map.put("maxQueueSize", 0); + assertThrows(IllegalArgumentException.class, () -> { + FileSourceConfig config = FileSourceConfig.load(map); + config.validate(); + }); + + // Test Negative Number + map.put("maxQueueSize", -5); + assertThrows(IllegalArgumentException.class, () -> { + FileSourceConfig config = FileSourceConfig.load(map); + config.validate(); + }); + } + private File getFile(String name) { ClassLoader classLoader = getClass().getClassLoader(); return new File(classLoader.getResource(name).getFile());