Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions file/src/main/java/org/apache/pulsar/io/file/FileSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,21 @@
public class FileSource extends PushSource<byte[]> {

private ExecutorService executor;
private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<File> inProcess = new LinkedBlockingQueue<>();
private final BlockingQueue<File> recentlyProcessed = new LinkedBlockingQueue<>();
private BlockingQueue<File> workQueue;
private BlockingQueue<File> inProcess;
private BlockingQueue<File> recentlyProcessed;

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
FileSourceConfig fileConfig = FileSourceConfig.load(config);
fileConfig.validate();

// Initialize bounded queues based on configuration to prevent OOM
int queueCapacity = fileConfig.getMaxQueueSize();
workQueue = new LinkedBlockingQueue<>(queueCapacity);
inProcess = new LinkedBlockingQueue<>(queueCapacity);
recentlyProcessed = new LinkedBlockingQueue<>(queueCapacity);

// One extra for the File listing thread, and another for the cleanup thread
executor = Executors.newFixedThreadPool(fileConfig.getNumWorkers() + 2);
executor.execute(new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed));
Expand Down
10 changes: 10 additions & 0 deletions file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ public class FileSourceConfig implements Serializable {
*/
private Integer numWorkers = 1;

/**
* The maximum capacity of the internal queues. This provides backpressure
* to prevent OutOfMemoryErrors when processing is slower than file listing.
*/
private Integer maxQueueSize = 1000;

/**
* If set, do not delete but only rename file that has been processed.
* This config only work when 'keepFile' property is false.
Expand Down Expand Up @@ -170,6 +176,10 @@ public void validate() {
throw new IllegalArgumentException("The property numWorkers must be greater than zero");
}

if (maxQueueSize != null && maxQueueSize <= 0) {
throw new IllegalArgumentException("The property maxQueueSize must be greater than zero");
}

if (processedFileSuffix != null && keepFile) {
throw new IllegalArgumentException(
"The property keepFile must be false if the property processedFileSuffix is set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.pulsar.io.file;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -142,6 +144,52 @@ public final void invalidKeepFileTest() throws IOException {
config.validate();
}

@Test
public void testDefaultMaxQueueSize() throws Exception {
Map<String, Object> map = new HashMap<>();
map.put("inputDirectory", "/tmp"); // Dummy directory just to pass basic validation

FileSourceConfig config = FileSourceConfig.load(map);

// Assert that if the user provides no value, it defaults to 1000
assertEquals(config.getMaxQueueSize(), Integer.valueOf(1000));
}

@Test
public void testValidMaxQueueSize() throws Exception {
Map<String, Object> map = new HashMap<>();
map.put("inputDirectory", "/tmp");
map.put("maxQueueSize", 5000);

FileSourceConfig config = FileSourceConfig.load(map);

// Assert that the custom value is loaded correctly
assertEquals(config.getMaxQueueSize(), Integer.valueOf(5000));

// Should not throw any exceptions
config.validate();
}

@Test
public void testInvalidMaxQueueSize() {
Map<String, Object> map = new HashMap<>();
map.put("inputDirectory", "/tmp");

// Test 0
map.put("maxQueueSize", 0);
assertThrows(IllegalArgumentException.class, () -> {
FileSourceConfig config = FileSourceConfig.load(map);
config.validate();
});

// Test Negative Number
map.put("maxQueueSize", -5);
assertThrows(IllegalArgumentException.class, () -> {
FileSourceConfig config = FileSourceConfig.load(map);
config.validate();
});
}

private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());
Expand Down