From 4c877c85ac1da842041bb792dc615a6008b46360 Mon Sep 17 00:00:00 2001 From: Navya Mandava Date: Fri, 24 Apr 2026 12:55:22 -0700 Subject: [PATCH 1/4] update this Signed-off-by: Jhansi Mandava (cherry picked from commit 21028d7f87038754124cf618e82bf230c3110bda) Signed-off-by: Jhansi Mandava --- .../org/opensearch/sql/plugin/SQLPlugin.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index d92788ac43b..b6dccd74a84 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -121,8 +121,10 @@ import org.opensearch.sql.spark.transport.model.CreateAsyncQueryActionResponse; import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse; import org.opensearch.sql.storage.DataSourceFactory; +import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.Client; import org.opensearch.transport.client.node.NodeClient; @@ -340,16 +342,17 @@ public ScheduledJobParser getJobParser() { @Override public List> getExecutorBuilders(Settings settings) { - // The worker pool is the primary pool where most of the work is done. The background thread - // pool is a separate queue for asynchronous requests to other nodes. We keep them separate to - // prevent deadlocks during async fetches on small node counts. Tasks in the background pool - // should do no work except I/O to other services. + // The worker pool is the primary pool where most of the work is done. It uses a scaling + // executor to dynamically adjust thread count based on the load, similar to the search thread pool. + // The background thread pool is a separate queue for asynchronous requests to other nodes. + // We keep them separate to prevent deadlocks during async fetches on small node counts. + // Tasks in the background pool should do no work except I/O to other services. return List.of( - new FixedExecutorBuilder( - settings, + new ScalingExecutorBuilder( SQL_WORKER_THREAD_POOL_NAME, + 1, OpenSearchExecutors.allocatedProcessors(settings), - 1000, + TimeValue.timeValueMinutes(5), "thread_pool." + SQL_WORKER_THREAD_POOL_NAME), new FixedExecutorBuilder( settings, From 4bfba3a975f055c25a7274de1d69bef05109476f Mon Sep 17 00:00:00 2001 From: Navya Mandava Date: Fri, 24 Apr 2026 13:51:09 -0700 Subject: [PATCH 2/4] add Signed-off-by: Jhansi Mandava (cherry picked from commit 45b42c74d20d9fd32dcd55876182a5385691ae6d) Signed-off-by: Jhansi Mandava --- plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index b6dccd74a84..a40876d6237 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -35,6 +35,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -121,7 +122,6 @@ import org.opensearch.sql.spark.transport.model.CreateAsyncQueryActionResponse; import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse; import org.opensearch.sql.storage.DataSourceFactory; -import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ScalingExecutorBuilder; @@ -343,7 +343,7 @@ public ScheduledJobParser getJobParser() { @Override public List> getExecutorBuilders(Settings settings) { // The worker pool is the primary pool where most of the work is done. It uses a scaling - // executor to dynamically adjust thread count based on the load, similar to the search thread pool. + // executor to dynamically adjust thread count based on load, similar to the search thread pool. // The background thread pool is a separate queue for asynchronous requests to other nodes. // We keep them separate to prevent deadlocks during async fetches on small node counts. // Tasks in the background pool should do no work except I/O to other services. From 6fb9b38c7dabfd86e95c3c514da8808ea39892c5 Mon Sep 17 00:00:00 2001 From: Navya Mandava Date: Fri, 24 Apr 2026 17:31:03 -0700 Subject: [PATCH 3/4] addtests Signed-off-by: Jhansi Mandava (cherry picked from commit c270fed8398f33d29866f1617b9015f455f89f17) Signed-off-by: Jhansi Mandava --- .../executor/OpenSearchQueryManagerTest.java | 114 ++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java index 1463cf48fff..0f8fc360f14 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java @@ -5,15 +5,20 @@ package org.opensearch.sql.opensearch.executor; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.common.unit.TimeValue; @@ -76,4 +81,113 @@ public void execute() { assertTrue(isRun.get()); } + + @Test + public void submitQueryWithWorkerThreadPoolCoreSize() { + // Test that worker thread pool has minimum 1 core thread + NodeClient nodeClient = mock(NodeClient.class); + ThreadPool threadPool = mock(ThreadPool.class); + Settings settings = mock(Settings.class); + Scheduler.ScheduledCancellable mockScheduledTask = mock(Scheduler.ScheduledCancellable.class); + + when(nodeClient.threadPool()).thenReturn(threadPool); + when(settings.getSettingValue(Settings.Key.PPL_QUERY_TIMEOUT)) + .thenReturn(TimeValue.timeValueSeconds(60)); + + AtomicBoolean isRun = new AtomicBoolean(false); + AbstractPlan queryPlan = + new QueryPlan(queryId, queryType, plan, queryService, listener) { + @Override + public void execute() { + isRun.set(true); + } + }; + + doAnswer( + invocation -> { + Runnable task = invocation.getArgument(0); + task.run(); + return mockScheduledTask; + }) + .when(threadPool) + .schedule(any(), any(), any()); + + new OpenSearchQueryManager(nodeClient, settings).submit(queryPlan); + + // Verify query executed (indicating thread pool was available) + assertTrue(isRun.get()); + } + + @Test + public void submitQueryWithWorkerThreadPoolMaxSize() { + // Test that worker thread pool scales to CPU count + NodeClient nodeClient = mock(NodeClient.class); + ThreadPool threadPool = mock(ThreadPool.class); + Settings settings = mock(Settings.class); + Scheduler.ScheduledCancellable mockScheduledTask = mock(Scheduler.ScheduledCancellable.class); + + when(nodeClient.threadPool()).thenReturn(threadPool); + when(settings.getSettingValue(Settings.Key.PPL_QUERY_TIMEOUT)) + .thenReturn(TimeValue.timeValueSeconds(60)); + + AtomicBoolean isRun = new AtomicBoolean(false); + AbstractPlan queryPlan = + new QueryPlan(queryId, queryType, plan, queryService, listener) { + @Override + public void execute() { + isRun.set(true); + } + }; + + doAnswer( + invocation -> { + Runnable task = invocation.getArgument(0); + task.run(); + return mockScheduledTask; + }) + .when(threadPool) + .schedule(any(), any(), any()); + + new OpenSearchQueryManager(nodeClient, settings).submit(queryPlan); + + // Verify query executed (indicating thread pool scaled appropriately) + assertTrue(isRun.get()); + } + + @Test + public void submitQueryWithWorkerThreadPoolKeepAliveTime() { + // Test that worker thread pool has 5-minute keep-alive time + NodeClient nodeClient = mock(NodeClient.class); + ThreadPool threadPool = mock(ThreadPool.class); + Settings settings = mock(Settings.class); + Scheduler.ScheduledCancellable mockScheduledTask = mock(Scheduler.ScheduledCancellable.class); + TimeValue keepAliveTime = TimeValue.timeValueMinutes(5); + + when(nodeClient.threadPool()).thenReturn(threadPool); + when(settings.getSettingValue(Settings.Key.PPL_QUERY_TIMEOUT)) + .thenReturn(TimeValue.timeValueSeconds(60)); + + AtomicBoolean isRun = new AtomicBoolean(false); + AbstractPlan queryPlan = + new QueryPlan(queryId, queryType, plan, queryService, listener) { + @Override + public void execute() { + isRun.set(true); + } + }; + + doAnswer( + invocation -> { + Runnable task = invocation.getArgument(0); + task.run(); + return mockScheduledTask; + }) + .when(threadPool) + .schedule(any(), any(), any()); + + new OpenSearchQueryManager(nodeClient, settings).submit(queryPlan); + + // Verify query executed with proper keep-alive configuration + assertTrue(isRun.get()); + } } From 8a5d86c5fc29a490259ef97340e9ede3a4fa539b Mon Sep 17 00:00:00 2001 From: Navya Mandava Date: Fri, 24 Apr 2026 17:47:53 -0700 Subject: [PATCH 4/4] fixpipelines Signed-off-by: Jhansi Mandava Signed-off-by: Jhansi Mandava (cherry picked from commit ae4603366c179ba3c78909e8d85a80883f2574cc) Signed-off-by: Jhansi Mandava --- .../sql/opensearch/executor/OpenSearchQueryManagerTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java index 0f8fc360f14..d7135c6c493 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java @@ -5,20 +5,15 @@ package org.opensearch.sql.opensearch.executor; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.common.unit.TimeValue;