From 67c388c42ee1f2c91ff0cc76bd11399914f8f8f5 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Fri, 3 Apr 2026 14:19:40 +0530 Subject: [PATCH 1/3] Initial broken commit Signed-off-by: Arpit Bandejiya --- .gitignore | 3 + .../opensearch/be/lucene/LuceneCommitter.java | 168 +++++++++ .../lucene/LuceneIndexingExecutionEngine.java | 138 ++++++++ .../be/lucene/LuceneReaderManager.java | 75 ++-- .../be/lucene/LuceneSearchEnginePlugin.java | 41 ++- .../be/lucene/LuceneCommitterTests.java | 109 ++++++ .../LuceneIndexingExecutionEngineTests.java | 161 +++++++++ .../be/lucene/LuceneReaderManagerTests.java | 319 ++++++++++++++++++ .../lucene/LuceneSearchEnginePluginTests.java | 34 ++ .../exec/DefaultPlanExecutorTests.java | 5 - .../composite/CompositeEnginePlugin.java | 10 +- .../CompositeIndexingExecutionEngine.java | 69 +++- .../composite/CompositeEnginePluginTests.java | 1 + ...CompositeIndexingExecutionEngineTests.java | 274 ++++++++++++++- .../composite/CompositeTestHelper.java | 44 ++- .../parquet/ParquetDataFormatPlugin.java | 8 +- .../index/engine/DataFormatAwareEngine.java | 21 ++ .../engine/dataformat/DataFormatPlugin.java | 18 +- .../engine/dataformat/DataFormatRegistry.java | 65 ++-- .../exec/DataFormatAwareEngineFactory.java | 50 +-- .../index/engine/exec/commit/Committer.java | 82 +++++ .../engine/exec/commit/CommitterSettings.java | 35 ++ .../engine/exec/commit/package-info.java | 10 + .../engine/exec/coord/CatalogSnapshot.java | 3 - .../coord/DataformatAwareCatalogSnapshot.java | 6 - .../coord/SegmentInfosCatalogSnapshot.java | 6 - .../opensearch/index/shard/IndexShard.java | 9 +- .../org/opensearch/plugins/EnginePlugin.java | 13 + .../plugins/SearchBackEndPlugin.java | 12 +- .../DataFormatAwareEngineCommitterTests.java | 68 ++++ .../dataformat/DataFormatPluginTests.java | 1 + .../dataformat/DataFormatRegistryTests.java | 6 +- .../dataformat/stub/MockCatalogSnapshot.java | 6 - .../dataformat/stub/MockDataFormatPlugin.java | 8 +- 
.../stub/MockSearchBackEndPlugin.java | 5 +- .../engine/exec/commit/CommitterTests.java | 193 +++++++++++ 36 files changed, 1923 insertions(+), 153 deletions(-) create mode 100644 sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java create mode 100644 sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngine.java create mode 100644 sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneCommitterTests.java create mode 100644 sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngineTests.java create mode 100644 sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneReaderManagerTests.java create mode 100644 sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/commit/Committer.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/commit/CommitterSettings.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/commit/package-info.java create mode 100644 server/src/test/java/org/opensearch/index/engine/DataFormatAwareEngineCommitterTests.java create mode 100644 server/src/test/java/org/opensearch/index/engine/exec/commit/CommitterTests.java diff --git a/.gitignore b/.gitignore index 1f2fe4ab3a1a8..55b0e5c3e2e15 100644 --- a/.gitignore +++ b/.gitignore @@ -67,6 +67,9 @@ testfixtures_shared/ # These are generated from .ci/jobs.t .ci/jobs/ +# native build files +*.dylib + # build files generated doc-tools/missing-doclet/bin/ **/Cargo.lock diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java new file mode 100644 index 
0000000000000..6e31d61c7b281 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java @@ -0,0 +1,168 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.CommitStats; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.engine.SafeCommitInfo; +import org.opensearch.index.engine.exec.commit.Committer; +import org.opensearch.index.engine.exec.commit.CommitterSettings; +import org.opensearch.index.seqno.SequenceNumbers; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; + +/** + * Lucene-specific {@link Committer} that owns the {@link IndexWriter} lifecycle. + *

+ * Responsibilities: + *

+ *

+ * The IndexWriter is exposed via {@link #getIndexWriter()} so that + * {@link LuceneIndexingExecutionEngine} (which handles {@code addIndexes} during refresh) + * and {@link LuceneReaderManager} (which opens DirectoryReaders) can share the same writer. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class LuceneCommitter implements Committer { + + private static final Logger logger = LogManager.getLogger(LuceneCommitter.class); + + /** Subdirectory under the shard data path where the Lucene index is stored. */ + static final String LUCENE_DIR_NAME = "lucene"; + + private IndexWriter indexWriter; + + /** Creates a new LuceneCommitter. */ + public LuceneCommitter() {} + + @Override + public void init(CommitterSettings settings) throws IOException { + Path luceneDir = settings.shardPath().getDataPath().resolve(LUCENE_DIR_NAME); + Files.createDirectories(luceneDir); + Directory directory = FSDirectory.open(luceneDir); + IndexWriterConfig iwc = createIndexWriterConfig(settings.engineConfig()); + this.indexWriter = new IndexWriter(directory, iwc); + } + + /** + * Creates an {@link IndexWriterConfig} from the engine configuration. + * When an {@link EngineConfig} is provided, the analyzer, codec, index sort, + * and similarity are taken from it. Otherwise a default config is used. 
+ */ + private IndexWriterConfig createIndexWriterConfig(EngineConfig engineConfig) { + if (engineConfig == null) { + return new IndexWriterConfig(); + } + IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); + iwc.setCodec(engineConfig.getCodec()); + iwc.setSimilarity(engineConfig.getSimilarity()); + iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); + iwc.setUseCompoundFile(engineConfig.useCompoundFile()); + if (engineConfig.getIndexSort() != null) { + iwc.setIndexSort(engineConfig.getIndexSort()); + } + iwc.setCommitOnClose(false); + iwc.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND); + return iwc; + } + + @Override + public void commit(Map commitData) throws IOException { + if (indexWriter == null) { + throw new IllegalStateException("LuceneCommitter is closed"); + } + indexWriter.setLiveCommitData(commitData.entrySet()); + indexWriter.commit(); + } + + @Override + public void close() throws IOException { + if (indexWriter != null) { + indexWriter.close(); + indexWriter = null; + } + } + + /** + * Returns the underlying IndexWriter. + * Used by {@link LuceneIndexingExecutionEngine} for {@code addIndexes} and + * by {@link LuceneReaderManager} for opening DirectoryReaders. + * Package-private — only accessible within the analytics-backend-lucene plugin. 
+ * + * @return the IndexWriter, or null if not initialized + */ + IndexWriter getIndexWriter() { + return indexWriter; + } + + @Override + public Map getLastCommittedData() throws IOException { + if (indexWriter == null) { + return Map.of(); + } + Iterable> liveCommitData = indexWriter.getLiveCommitData(); + if (liveCommitData == null) { + return Map.of(); + } + Map result = new HashMap<>(); + for (Map.Entry entry : liveCommitData) { + result.put(entry.getKey(), entry.getValue()); + } + return Map.copyOf(result); + } + + @Override + public CommitStats getCommitStats() { + if (indexWriter == null) { + return null; + } + try { + SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(indexWriter.getDirectory()); + return new CommitStats(segmentInfos); + } catch (IOException e) { + logger.warn("Failed to read segment infos for commit stats", e); + return null; + } + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + if (indexWriter == null) { + return SafeCommitInfo.EMPTY; + } + try { + Map commitData = getLastCommittedData(); + long localCheckpoint = Long.parseLong( + commitData.getOrDefault(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)) + ); + int docCount = indexWriter.getDocStats().numDocs; + return new SafeCommitInfo(localCheckpoint, docCount); + } catch (IOException e) { + logger.warn("Failed to get safe commit info", e); + return SafeCommitInfo.EMPTY; + } + } +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngine.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngine.java new file mode 100644 index 0000000000000..299064b082326 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngine.java @@ -0,0 +1,138 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require 
contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.misc.store.HardlinkCopyDirectoryWrapper; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.NIOFSDirectory; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.index.engine.dataformat.Merger; +import org.opensearch.index.engine.dataformat.RefreshInput; +import org.opensearch.index.engine.dataformat.RefreshResult; +import org.opensearch.index.engine.dataformat.Writer; +import org.opensearch.index.engine.exec.WriterFileSet; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * Lucene-specific {@link IndexingExecutionEngine} that incorporates flushed segments + * during refresh via {@code IndexWriter.addIndexes(Directory...)}. + *

+ * Does not own the {@link IndexWriter} — it receives an optional parent writer + * from the {@link LuceneCommitter}, which owns the writer lifecycle. This separation allows + * the committer to be used standalone (committer-only scenario) while the indexing engine + * is only created when Lucene participates as a per-format engine in the composite engine. + *

+ * When no parent writer is provided, refresh is a no-op (no segments to incorporate). + * + * @opensearch.experimental + */ +@ExperimentalApi +public class LuceneIndexingExecutionEngine implements IndexingExecutionEngine> { + + private static final Logger logger = LogManager.getLogger(LuceneIndexingExecutionEngine.class); + // TODO:: This will go once we implement the Dataformat plugin in Lucene + private static final String LUCENE_FORMAT_NAME = "lucene"; + + private final IndexWriter parentIndexWriter; + + /** + * Creates a new LuceneIndexingExecutionEngine. + * + * @param parentIndexWriter the IndexWriter from the LuceneCommitter, or null if not available (refresh will be a no-op) + */ + public LuceneIndexingExecutionEngine(IndexWriter parentIndexWriter) { + this.parentIndexWriter = parentIndexWriter; + } + + @Override + public RefreshResult refresh(RefreshInput refreshInput) throws IOException { + if (refreshInput == null || parentIndexWriter == null) { + return new RefreshResult(List.of()); + } + + List directories = new ArrayList<>(); + List sourcePaths = new ArrayList<>(); + try { + for (WriterFileSet wfs : refreshInput.writerFiles()) { + Path dirPath = Path.of(wfs.directory()); + if (dirPath.getFileName().toString().equals(LUCENE_FORMAT_NAME) == false) { + continue; + } + if (Files.isDirectory(dirPath)) { + directories.add(new HardlinkCopyDirectoryWrapper(new NIOFSDirectory(dirPath))); + sourcePaths.add(dirPath); + } + } + if (directories.isEmpty() == false) { + parentIndexWriter.addIndexes(directories.toArray(new Directory[0])); + } + } finally { + // Close directory handles first, then delete source files + for (Directory dir : directories) { + try { + dir.close(); + } catch (IOException e) { + logger.warn("Failed to close directory after addIndexes", e); + } + } + for (Path sourcePath : sourcePaths) { + try { + IOUtils.rm(sourcePath); + } catch (IOException e) { + logger.warn("Failed to delete source directory [{}] after addIndexes", sourcePath, e); + } 
+ } + } + return new RefreshResult(List.of()); + } + + @Override + public Writer> createWriter(long writerGeneration) { + throw new UnsupportedOperationException("createWriter not yet implemented for Lucene engine"); + } + + @Override + public Merger getMerger() { + return null; + } + + @Override + public long getNextWriterGeneration() { + throw new UnsupportedOperationException("getNextWriterGeneration not yet implemented for Lucene engine"); + } + + @Override + public DataFormat getDataFormat() { + throw new UnsupportedOperationException("getDataFormat not yet implemented — LuceneDataFormat deferred to future PR"); + } + + @Override + public void deleteFiles(Map> filesToDelete) throws IOException { + // Stub: file deletion to be implemented in a future iteration + } + + @Override + public DocumentInput newDocumentInput() { + throw new UnsupportedOperationException("newDocumentInput not yet implemented — LuceneDocumentInput deferred to future PR"); + } +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java index d3fe2c6338089..5f7cdb45d01b6 100644 --- a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneReaderManager.java @@ -9,8 +9,8 @@ package org.opensearch.be.lucene; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.search.ReferenceManager; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.EngineReaderManager; import org.opensearch.index.engine.exec.coord.CatalogSnapshot; @@ -19,66 +19,81 @@ import java.util.Collection; import java.util.HashMap; import 
java.util.Map; +import java.util.Objects; /** * Lucene implementation of {@link EngineReaderManager}. *

- * Wraps Lucene's {@link ReferenceManager} for {@link DirectoryReader}. - * Acquire increments the ref count on the current reader; - * release decrements it — same pattern as {@code DatafusionReaderManager}. + * Constructed with a {@link DataFormat} and an initial {@link OpenSearchDirectoryReader} + * (typically opened from an IndexWriter and wrapped via + * {@link OpenSearchDirectoryReader#wrap}). Maintains a map of {@link CatalogSnapshot} + * to {@link OpenSearchDirectoryReader} so each snapshot gets the reader that was current + * at the time of its refresh. On each {@link #afterRefresh}, the current reader is + * refreshed via {@link DirectoryReader#openIfChanged}. * * @opensearch.experimental */ @ExperimentalApi -public class LuceneReaderManager implements EngineReaderManager { +public class LuceneReaderManager implements EngineReaderManager { - Map readers = new HashMap<>(); - DataFormat dataFormat; + private final DataFormat dataFormat; + private final Map readers = new HashMap<>(); + private volatile OpenSearchDirectoryReader currentReader; /** - * Creates a new LuceneReaderManager for the given data format. + * Creates a new LuceneReaderManager. * - * @param dataFormat the data format for this reader manager + * @param dataFormat the data format this reader manager serves + * @param initialReader the initial OpenSearchDirectoryReader, must not be null + * @throws NullPointerException if initialReader is null */ - @SuppressWarnings("unchecked") - public LuceneReaderManager(DataFormat dataFormat) { + public LuceneReaderManager(DataFormat dataFormat, OpenSearchDirectoryReader initialReader) { this.dataFormat = dataFormat; + Objects.requireNonNull(initialReader, "initialReader must not be null"); + this.currentReader = initialReader; } - /** - * Called when files are deleted after merges. 
- * - * @param files the collection of deleted file paths - */ - public void onFilesDeleted(Collection files) throws IOException { - // no-op + @Override + public OpenSearchDirectoryReader getReader(CatalogSnapshot catalogSnapshot) throws IOException { + OpenSearchDirectoryReader reader = readers.get(catalogSnapshot); + if (reader == null) { + throw new IllegalStateException("No reader available for catalog snapshot [gen=" + catalogSnapshot.getGeneration() + "]"); + } + return reader; } @Override - public void onFilesAdded(Collection files) throws IOException { + public void beforeRefresh() throws IOException { // no-op } @Override - public DirectoryReader getReader(CatalogSnapshot catalogSnapshot) throws IOException { - return readers.get(catalogSnapshot); + public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) throws IOException { + if (didRefresh == false || readers.containsKey(catalogSnapshot)) { + return; + } + DirectoryReader refreshed = DirectoryReader.openIfChanged(currentReader); + if (refreshed != null) { + currentReader = (OpenSearchDirectoryReader) refreshed; + } + readers.put(catalogSnapshot, currentReader); } @Override - public void beforeRefresh() throws IOException { - + public void onDeleted(CatalogSnapshot catalogSnapshot) throws IOException { + OpenSearchDirectoryReader reader = readers.remove(catalogSnapshot); + if (reader != null) { + reader.close(); + } } @Override - public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) throws IOException { - if (readers.containsKey(catalogSnapshot)) { - return; - } - readers.put(catalogSnapshot, (DirectoryReader) catalogSnapshot.getReader(dataFormat)); + public void onFilesDeleted(Collection files) throws IOException { + // no-op } @Override - public void onDeleted(CatalogSnapshot catalogSnapshot) throws IOException { - readers.remove(catalogSnapshot).close(); + public void onFilesAdded(Collection files) throws IOException { + // no-op } } diff --git 
a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java index 103ffefdf718a..8c13b994db4fe 100644 --- a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java @@ -9,22 +9,38 @@ package org.opensearch.be.lucene; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.SearchBackEndPlugin; import java.io.IOException; import java.util.List; +import java.util.Optional; /** - * Plugin providing Lucene as an index filter or source provider. + * Plugin providing Lucene as a search back-end and committer for the composite engine. + *

+ * Implements: + *

+ *

+ * {@link #createReaderManager} accepts a {@link Committer}. + * When the committer is a {@link LuceneCommitter}, the IndexWriter is extracted and shared + * with the reader manager on a given shard. * * @opensearch.experimental */ @ExperimentalApi -public class LuceneSearchEnginePlugin implements SearchBackEndPlugin { +public class LuceneSearchEnginePlugin implements SearchBackEndPlugin, EnginePlugin { /** Creates a new LuceneSearchEnginePlugin. */ public LuceneSearchEnginePlugin() {} @@ -34,13 +50,30 @@ public String name() { return "lucene-analytics-backend"; } + // --- SearchBackEndPlugin --- + @Override - public EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) throws IOException { - return new LuceneReaderManager(format); + public EngineReaderManager createReaderManager(Committer committer, DataFormat format, ShardPath shardPath) + throws IOException { + if (committer instanceof LuceneCommitter) { + IndexWriter writer = ((LuceneCommitter) committer).getIndexWriter(); + if (writer != null) { + OpenSearchDirectoryReader osReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardPath.getShardId()); + return new LuceneReaderManager(format, osReader); + } + } + throw new IllegalStateException("Cannot create LuceneReaderManager without an initialized LuceneCommitter"); } @Override public List getSupportedFormats() { return List.of(); } + + // --- EnginePlugin --- + + @Override + public Optional getCommitter(IndexSettings indexSettings) { + return Optional.of(new LuceneCommitter()); + } } diff --git a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneCommitterTests.java b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneCommitterTests.java new file mode 100644 index 0000000000000..1a2c2c7448dc8 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneCommitterTests.java @@ -0,0 +1,109 @@ +/* + 
* SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.apache.lucene.index.IndexWriter; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.exec.commit.CommitterSettings; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; + +/** + * Tests for {@link LuceneCommitter}. + */ +public class LuceneCommitterTests extends OpenSearchTestCase { + + private CommitterSettings createCommitterSettings() throws IOException { + Path baseDir = createTempDir(); + ShardId shardId = new ShardId("test", "_na_", 0); + Path dataPath = baseDir.resolve(shardId.getIndex().getUUID()).resolve(Integer.toString(shardId.id())); + Files.createDirectories(dataPath); + ShardPath shardPath = new ShardPath(false, dataPath, dataPath, shardId); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY); + return new CommitterSettings(shardPath, indexSettings); + } + + public void testInitOpensIndexWriter() throws IOException { + LuceneCommitter committer = new LuceneCommitter(); + try { + assertNull(committer.getIndexWriter()); + committer.init(createCommitterSettings()); + IndexWriter writer = committer.getIndexWriter(); + assertNotNull(writer); + assertTrue(writer.isOpen()); + } finally { + committer.close(); + } + } + + public void testCloseReleasesIndexWriter() throws IOException { + LuceneCommitter committer = new LuceneCommitter(); + committer.init(createCommitterSettings()); + 
assertNotNull(committer.getIndexWriter()); + + committer.close(); + assertNull(committer.getIndexWriter()); + } + + public void testCommitRoundTrip() throws IOException { + LuceneCommitter committer = new LuceneCommitter(); + try { + committer.init(createCommitterSettings()); + + Map commitData = Map.of("key1", "value1", "key2", "value2", "_snapshot_", "serialized-data"); + committer.commit(commitData); + + Map readBack = new HashMap<>(); + for (Map.Entry entry : committer.getIndexWriter().getLiveCommitData()) { + readBack.put(entry.getKey(), entry.getValue()); + } + + assertEquals("value1", readBack.get("key1")); + assertEquals("value2", readBack.get("key2")); + assertEquals("serialized-data", readBack.get("_snapshot_")); + } finally { + committer.close(); + } + } + + public void testCommitWithEmptyData() throws IOException { + LuceneCommitter committer = new LuceneCommitter(); + try { + committer.init(createCommitterSettings()); + + committer.commit(Map.of()); + + Map readBack = new HashMap<>(); + for (Map.Entry entry : committer.getIndexWriter().getLiveCommitData()) { + readBack.put(entry.getKey(), entry.getValue()); + } + + assertTrue(readBack.isEmpty()); + } finally { + committer.close(); + } + } + + public void testCommitAfterCloseThrows() throws IOException { + LuceneCommitter committer = new LuceneCommitter(); + committer.init(createCommitterSettings()); + committer.close(); + + expectThrows(IllegalStateException.class, () -> committer.commit(Map.of("key", "value"))); + } +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngineTests.java b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngineTests.java new file mode 100644 index 0000000000000..4f7e23e26acfb --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngineTests.java @@ -0,0 +1,161 @@ +/* + * 
SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.dataformat.RefreshInput; +import org.opensearch.index.engine.dataformat.RefreshResult; +import org.opensearch.index.engine.exec.WriterFileSet; +import org.opensearch.index.engine.exec.commit.CommitterSettings; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +/** + * Tests for {@link LuceneIndexingExecutionEngine}. 
+ */ +public class LuceneIndexingExecutionEngineTests extends OpenSearchTestCase { + + private LuceneCommitter committer; + + private LuceneCommitter createAndInitCommitter() throws IOException { + Path baseDir = createTempDir(); + ShardId shardId = new ShardId("test", "_na_", 0); + Path dataPath = baseDir.resolve(shardId.getIndex().getUUID()).resolve(Integer.toString(shardId.id())); + Files.createDirectories(dataPath); + ShardPath shardPath = new ShardPath(false, dataPath, dataPath, shardId); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY); + + LuceneCommitter c = new LuceneCommitter(); + c.init(new CommitterSettings(shardPath, indexSettings)); + return c; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + committer = createAndInitCommitter(); + } + + @Override + public void tearDown() throws Exception { + if (committer != null) { + committer.close(); + } + super.tearDown(); + } + + /** + * Property 1: refresh incorporates Lucene segments when parent writer is present. 
+ * Validates: Requirements 2.3 + */ + public void testRefreshIncorporatesLuceneSegments() throws IOException { + LuceneIndexingExecutionEngine engine = new LuceneIndexingExecutionEngine(committer.getIndexWriter()); + IndexWriter writer = committer.getIndexWriter(); + assertEquals(0, writer.getDocStats().numDocs); + + int numDocs = randomIntBetween(1, 20); + // Directory must end with the data format name ("lucene") for the engine to recognize it + Path externalDir = createTempDir().resolve("lucene"); + Files.createDirectories(externalDir); + try ( + Directory extDirectory = FSDirectory.open(externalDir); + IndexWriter extWriter = new IndexWriter(extDirectory, new IndexWriterConfig()) + ) { + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc_" + i, Field.Store.YES)); + extWriter.addDocument(doc); + } + extWriter.commit(); + } + + WriterFileSet writerFileSet = WriterFileSet.builder().directory(externalDir).writerGeneration(1L).addNumRows(numDocs).build(); + RefreshInput refreshInput = RefreshInput.builder().addWriterFileSet(writerFileSet).build(); + + RefreshResult result = engine.refresh(refreshInput); + assertNotNull(result); + assertEquals(numDocs, writer.getDocStats().numDocs); + } + + /** + * Refresh skips WriterFileSets whose directory does not match the Lucene data format name. 
+ */ + public void testRefreshSkipsNonLuceneDirectories() throws IOException { + LuceneIndexingExecutionEngine engine = new LuceneIndexingExecutionEngine(committer.getIndexWriter()); + IndexWriter writer = committer.getIndexWriter(); + + // Create a directory named "parquet" — should be skipped by the Lucene engine + Path parquetDir = createTempDir().resolve("parquet"); + Files.createDirectories(parquetDir); + try ( + Directory extDirectory = FSDirectory.open(parquetDir); + IndexWriter extWriter = new IndexWriter(extDirectory, new IndexWriterConfig()) + ) { + Document doc = new Document(); + doc.add(new StringField("id", "doc_0", Field.Store.YES)); + extWriter.addDocument(doc); + extWriter.commit(); + } + + WriterFileSet writerFileSet = WriterFileSet.builder().directory(parquetDir).writerGeneration(1L).addNumRows(1).build(); + RefreshInput refreshInput = RefreshInput.builder().addWriterFileSet(writerFileSet).build(); + + engine.refresh(refreshInput); + assertEquals("Non-lucene directories should be skipped", 0, writer.getDocStats().numDocs); + } + + /** + * Refresh with no files skips addIndexes. + */ + public void testRefreshWithNoLuceneFilesSkipsAddIndexes() throws IOException { + LuceneIndexingExecutionEngine engine = new LuceneIndexingExecutionEngine(committer.getIndexWriter()); + + RefreshInput emptyInput = RefreshInput.builder().build(); + RefreshResult result = engine.refresh(emptyInput); + assertNotNull(result); + assertEquals(0, committer.getIndexWriter().getDocStats().numDocs); + } + + /** + * Refresh with no parent writer is a no-op. + */ + public void testRefreshWithoutParentWriterIsNoOp() throws IOException { + LuceneIndexingExecutionEngine engine = new LuceneIndexingExecutionEngine(null); + + RefreshInput input = RefreshInput.builder().build(); + RefreshResult result = engine.refresh(input); + assertNotNull(result); + assertTrue(result.refreshedSegments().isEmpty()); + } + + /** + * Refresh with null input returns empty result. 
+ */ + public void testRefreshWithNullInputReturnsEmpty() throws IOException { + LuceneIndexingExecutionEngine engine = new LuceneIndexingExecutionEngine(committer.getIndexWriter()); + + RefreshResult result = engine.refresh(null); + assertNotNull(result); + assertTrue(result.refreshedSegments().isEmpty()); + } +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneReaderManagerTests.java b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneReaderManagerTests.java new file mode 100644 index 0000000000000..7931aa300f29c --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneReaderManagerTests.java @@ -0,0 +1,319 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MMapDirectory; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; +import org.opensearch.index.engine.exec.Segment; +import org.opensearch.index.engine.exec.WriterFileSet; +import org.opensearch.index.engine.exec.coord.CatalogSnapshot; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Path; 
+import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Tests for {@link LuceneReaderManager} lifecycle with CatalogSnapshot interactions. + */ +public class LuceneReaderManagerTests extends OpenSearchTestCase { + + private IndexWriter indexWriter; + private Directory directory; + private ShardId shardId; + private DataFormat dataFormat; + + @Override + public void setUp() throws Exception { + super.setUp(); + Path dir = createTempDir(); + directory = new MMapDirectory(dir); + indexWriter = new IndexWriter(directory, new IndexWriterConfig()); + shardId = new ShardId("test", "_na_", 0); + dataFormat = new DataFormat() { + @Override + public String name() { + return "lucene"; + } + + @Override + public long priority() { + return 0; + } + + @Override + public Set supportedFields() { + return Set.of(); + } + }; + } + + @Override + public void tearDown() throws Exception { + indexWriter.close(); + directory.close(); + super.tearDown(); + } + + private OpenSearchDirectoryReader openReader() throws IOException { + return OpenSearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); + } + + private CatalogSnapshot stubSnapshot(long generation) { + return new CatalogSnapshot("test", generation, 1) { + @Override + protected void closeInternal() {} + + @Override + public Map getUserData() { + return Map.of(); + } + + @Override + public long getId() { + return generation; + } + + @Override + public List getSegments() { + return List.of(); + } + + @Override + public Collection getSearchableFiles(String df) { + return List.of(); + } + + @Override + public Set getDataFormats() { + return Set.of(); + } + + @Override + public long getLastWriterGeneration() { + return 0; + } + + @Override + public String serializeToString() { + return ""; + } + + @Override + public CatalogSnapshot cloneNoAcquire() { + return this; + } + + @Override + public void setUserData(Map userData) {} + + @Override + public CatalogSnapshot 
clone() { + return this; + } + }; + } + + private void addDoc(String id) throws IOException { + Document doc = new Document(); + doc.add(new StringField("id", id, Field.Store.YES)); + indexWriter.addDocument(doc); + indexWriter.commit(); + } + + /** + * afterRefresh with didRefresh=true creates a reader for the snapshot. + */ + public void testAfterRefreshCreatesReader() throws IOException { + LuceneReaderManager rm = new LuceneReaderManager(dataFormat, openReader()); + CatalogSnapshot snap = stubSnapshot(1); + + expectThrows(IllegalStateException.class, () -> rm.getReader(snap)); + rm.afterRefresh(true, snap); + assertNotNull(rm.getReader(snap)); + } + + /** + * afterRefresh with didRefresh=false does not create a reader. + */ + public void testAfterRefreshNoOpWhenDidRefreshFalse() throws IOException { + LuceneReaderManager rm = new LuceneReaderManager(dataFormat, openReader()); + CatalogSnapshot snap = stubSnapshot(1); + + rm.afterRefresh(false, snap); + expectThrows(IllegalStateException.class, () -> rm.getReader(snap)); + } + + /** + * Multiple refreshes after indexing produce readers that see the correct doc counts. 
+ */ + public void testMultipleRefreshesWithIndexing() throws IOException { + LuceneReaderManager rm = new LuceneReaderManager(dataFormat, openReader()); + + // Snapshot 1: 0 docs + CatalogSnapshot snap1 = stubSnapshot(1); + rm.afterRefresh(true, snap1); + OpenSearchDirectoryReader reader1 = rm.getReader(snap1); + assertNotNull(reader1); + assertEquals(0, new IndexSearcher(reader1).count(new MatchAllDocsQuery())); + + // Index a doc and refresh + addDoc("doc1"); + CatalogSnapshot snap2 = stubSnapshot(2); + rm.afterRefresh(true, snap2); + OpenSearchDirectoryReader reader2 = rm.getReader(snap2); + assertNotNull(reader2); + assertEquals(1, new IndexSearcher(reader2).count(new MatchAllDocsQuery())); + + // Snapshot 1 reader still sees 0 docs (point-in-time) + assertEquals(0, new IndexSearcher(reader1).count(new MatchAllDocsQuery())); + + // Index another doc and refresh + addDoc("doc2"); + CatalogSnapshot snap3 = stubSnapshot(3); + rm.afterRefresh(true, snap3); + OpenSearchDirectoryReader reader3 = rm.getReader(snap3); + assertEquals(2, new IndexSearcher(reader3).count(new MatchAllDocsQuery())); + + // Each snapshot has its own reader + assertNotSame(reader1, reader2); + assertNotSame(reader2, reader3); + + // Cleanup + rm.onDeleted(snap1); + rm.onDeleted(snap2); + rm.onDeleted(snap3); + } + + /** + * onDeleted removes the reader and closes it. + */ + public void testOnDeletedClosesReader() throws IOException { + LuceneReaderManager rm = new LuceneReaderManager(dataFormat, openReader()); + CatalogSnapshot snap = stubSnapshot(1); + rm.afterRefresh(true, snap); + + OpenSearchDirectoryReader reader = rm.getReader(snap); + assertNotNull(reader); + // Reader should have positive ref count + assertTrue(reader.getRefCount() > 0); + + rm.onDeleted(snap); + expectThrows(IllegalStateException.class, () -> rm.getReader(snap)); + } + + /** + * onDeleted with unknown snapshot is a no-op. 
+ */ + public void testOnDeletedUnknownSnapshotIsNoOp() throws IOException { + LuceneReaderManager rm = new LuceneReaderManager(dataFormat, openReader()); + CatalogSnapshot unknown = stubSnapshot(99); + rm.onDeleted(unknown); // should not throw + } + + /** + * getReader throws IllegalStateException for a snapshot that was never refreshed. + */ + public void testGetReaderThrowsForUnknownSnapshot() throws IOException { + LuceneReaderManager rm = new LuceneReaderManager(dataFormat, openReader()); + CatalogSnapshot unknown = stubSnapshot(42); + expectThrows(IllegalStateException.class, () -> rm.getReader(unknown)); + } + + /** + * Duplicate afterRefresh for the same snapshot is idempotent. + */ + public void testDuplicateAfterRefreshIsIdempotent() throws IOException { + LuceneReaderManager rm = new LuceneReaderManager(dataFormat, openReader()); + CatalogSnapshot snap = stubSnapshot(1); + + rm.afterRefresh(true, snap); + OpenSearchDirectoryReader first = rm.getReader(snap); + + rm.afterRefresh(true, snap); + assertSame(first, rm.getReader(snap)); + + rm.onDeleted(snap); + } + + /** + * Multiple getReader calls on the same snapshot return the same reader with stable ref count. + * The reader ref count stays at 1 (the reader manager's own reference). + * onDeleted closes the reader, dropping the ref count to 0. 
+ */ + public void testMultipleAcquiresRefCountStaysOne() throws IOException { + LuceneReaderManager rm = new LuceneReaderManager(dataFormat, openReader()); + CatalogSnapshot snap = stubSnapshot(1); + rm.afterRefresh(true, snap); + + OpenSearchDirectoryReader reader = rm.getReader(snap); + int initialRefCount = reader.getRefCount(); + assertEquals("Reader ref count should be 1 after afterRefresh", 1, initialRefCount); + + // Multiple getReader calls should return the same instance without incrementing ref count + for (int i = 0; i < 5; i++) { + OpenSearchDirectoryReader same = rm.getReader(snap); + assertSame(reader, same); + assertEquals("Ref count must not change on getReader", 1, reader.getRefCount()); + } + + // onDeleted closes the reader + rm.onDeleted(snap); + expectThrows(IllegalStateException.class, () -> rm.getReader(snap)); + assertEquals("Ref count should be 0 after onDeleted", 0, reader.getRefCount()); + } + + /** + * Two snapshots sharing the same underlying reader (no index changes between refreshes). + * Deleting one closes the shared reader. The other snapshot still has a map entry + * but the reader is closed. In production, CatalogSnapshotManager ensures ordered deletion. 
+ */ + public void testTwoSnapshotsSameReaderDeleteOneClosesSharedReader() throws IOException { + LuceneReaderManager rm = new LuceneReaderManager(dataFormat, openReader()); + + // No indexing between refreshes — both snapshots get the same reader + CatalogSnapshot snap1 = stubSnapshot(1); + rm.afterRefresh(true, snap1); + OpenSearchDirectoryReader reader1 = rm.getReader(snap1); + + CatalogSnapshot snap2 = stubSnapshot(2); + rm.afterRefresh(true, snap2); + OpenSearchDirectoryReader reader2 = rm.getReader(snap2); + + // Same reader since no changes happened + assertSame(reader1, reader2); + + // Delete snap1 — closes the shared reader + rm.onDeleted(snap1); + expectThrows(IllegalStateException.class, () -> rm.getReader(snap1)); + + // snap2 still has a map entry but the underlying reader is closed + // This is expected — in production, snapshots are deleted in order + OpenSearchDirectoryReader snap2Reader = rm.getReader(snap2); + assertEquals("Shared reader should be closed after snap1 deletion", 0, snap2Reader.getRefCount()); + + // Clean up + rm.onDeleted(snap2); + } +} diff --git a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java new file mode 100644 index 0000000000000..649953953eee2 --- /dev/null +++ b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.be.lucene; + +import org.opensearch.index.engine.exec.commit.Committer; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Optional; + +/** + * Tests for {@link LuceneSearchEnginePlugin}. 
+ */ +public class LuceneSearchEnginePluginTests extends OpenSearchTestCase { + + /** + * Test that getCommitter() returns a non-empty Optional containing + * a LuceneCommitter instance. + * + * Validates: Requirements 4.2 + */ + public void testGetCommitterReturnsLuceneCommitter() { + LuceneSearchEnginePlugin plugin = new LuceneSearchEnginePlugin(); + Optional committer = plugin.getCommitter(null); + + assertTrue("getCommitter() should return a non-empty Optional", committer.isPresent()); + assertTrue("getCommitter() should return a LuceneCommitter instance", committer.get() instanceof LuceneCommitter); + } +} diff --git a/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/DefaultPlanExecutorTests.java b/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/DefaultPlanExecutorTests.java index 14d9dd2a17ed6..d235f6fe4a3e4 100644 --- a/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/DefaultPlanExecutorTests.java +++ b/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/DefaultPlanExecutorTests.java @@ -294,11 +294,6 @@ public String serializeToString() { @Override public void setUserData(Map userData) {} - @Override - public Object getReader(DataFormat dataFormat) { - return null; - } - @Override public MockCatalogSnapshot clone() { return new MockCatalogSnapshot(generation, segments, format); diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java index 182171c499ff0..912bc00ea300d 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java @@ -16,6 +16,7 @@ import org.opensearch.index.engine.dataformat.DataFormat; import 
org.opensearch.index.engine.dataformat.DataFormatPlugin; import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.ExtensiblePlugin; @@ -137,8 +138,13 @@ public DataFormat getDataFormat() { } @Override - public IndexingExecutionEngine indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings) { - return new CompositeIndexingExecutionEngine(dataFormatPlugins, indexSettings, mapperService, shardPath); + public IndexingExecutionEngine indexingEngine( + Committer committer, + MapperService mapperService, + ShardPath shardPath, + IndexSettings indexSettings + ) { + return new CompositeIndexingExecutionEngine(dataFormatPlugins, indexSettings, mapperService, shardPath, committer); } /** diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java index 9383112b7a8b4..c76c6a5022935 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.queue.LockablePool; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; @@ -25,12 +26,17 @@ import org.opensearch.index.engine.dataformat.Writer; import org.opensearch.index.engine.exec.Segment; import org.opensearch.index.engine.exec.WriterFileSet; +import 
org.opensearch.index.engine.exec.commit.Committer; +import org.opensearch.index.engine.exec.commit.CommitterSettings; +import org.opensearch.index.engine.exec.coord.CatalogSnapshot; +import org.opensearch.index.engine.exec.coord.CatalogSnapshotManager; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.IdentityHashMap; import java.util.LinkedList; import java.util.List; @@ -62,6 +68,8 @@ public class CompositeIndexingExecutionEngine implements IndexingExecutionEngine private final CompositeDataFormat compositeDataFormat; private final LockablePool writerPool; private final AtomicLong writerGenerationCounter; + private final Committer committer; + private CatalogSnapshotManager catalogSnapshotManager; /** * Constructs a CompositeIndexingExecutionEngine by reading index settings to @@ -80,16 +88,22 @@ public class CompositeIndexingExecutionEngine implements IndexingExecutionEngine * @param indexSettings the index settings containing composite configuration * @param mapperService the mapper service for field mapping resolution * @param shardPath the shard path for file storage + * @param committer the committer for durable catalog snapshot persistence during flush * @throws IllegalArgumentException if any configured format is not registered + * @throws IllegalStateException if committer is null */ public CompositeIndexingExecutionEngine( Map dataFormatPlugins, IndexSettings indexSettings, MapperService mapperService, - ShardPath shardPath + ShardPath shardPath, + Committer committer ) { Objects.requireNonNull(dataFormatPlugins, "dataFormatPlugins must not be null"); Objects.requireNonNull(indexSettings, "indexSettings must not be null"); + if (committer == null) { + throw new IllegalStateException("Committer must not be null"); + } Settings settings = indexSettings.getSettings(); @@ -100,13 +114,13 
@@ public CompositeIndexingExecutionEngine( List allFormats = new ArrayList<>(); DataFormatPlugin primaryPlugin = dataFormatPlugins.get(primaryFormatName); - this.primaryEngine = primaryPlugin.indexingEngine(mapperService, shardPath, indexSettings); + this.primaryEngine = primaryPlugin.indexingEngine(committer, mapperService, shardPath, indexSettings); allFormats.add(primaryPlugin.getDataFormat()); List> secondaries = new ArrayList<>(); for (String secondaryName : secondaryFormatNames) { DataFormatPlugin secondaryPlugin = dataFormatPlugins.get(secondaryName); - secondaries.add(secondaryPlugin.indexingEngine(mapperService, shardPath, indexSettings)); + secondaries.add(secondaryPlugin.indexingEngine(committer, mapperService, shardPath, indexSettings)); allFormats.add(secondaryPlugin.getDataFormat()); } this.secondaryEngines = Set.copyOf(secondaries); @@ -120,6 +134,13 @@ public CompositeIndexingExecutionEngine( LinkedList::new, Runtime.getRuntime().availableProcessors() ); + + this.committer = committer; + try { + committer.init(new CommitterSettings(shardPath, indexSettings)); + } catch (IOException e) { + throw new RuntimeException("Failed to initialize committer", e); + } } /** @@ -288,6 +309,48 @@ public CompositeDocumentInput newDocumentInput() { return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap); } + /** + * Sets the {@link CatalogSnapshotManager} used by {@link #flush()} to acquire the latest snapshot. + * + * @param catalogSnapshotManager the catalog snapshot manager + */ + public void setCatalogSnapshotManager(CatalogSnapshotManager catalogSnapshotManager) { + this.catalogSnapshotManager = catalogSnapshotManager; + } + + /** + * Durably commits the latest {@link CatalogSnapshot} via the {@link Committer}. + * Acquires the current snapshot from the {@link CatalogSnapshotManager}, passes it + * to the committer, and releases the snapshot reference when done. 
+ * + * @throws IOException if the committer's commit fails + * @throws IllegalStateException if the CatalogSnapshotManager has not been set + */ + public void flush() throws IOException { + if (catalogSnapshotManager == null) { + throw new IllegalStateException("CatalogSnapshotManager not set"); + } + try (GatedCloseable snapshotRef = catalogSnapshotManager.acquireSnapshot()) { + CatalogSnapshot snapshot = snapshotRef.get(); + Map commitData = new HashMap<>(snapshot.getUserData()); + commitData.put(CatalogSnapshot.CATALOG_SNAPSHOT_KEY, snapshot.serializeToString()); + committer.commit(commitData); + } + } + + /** + * Closes the committer, releasing any resources it holds (e.g., IndexWriter). + * If the committer's close throws an IOException, the error is logged and + * shutdown continues. + */ + public void close() { + try { + committer.close(); + } catch (IOException e) { + logger.error("Failed to close committer", e); + } + } + /** * Returns the primary delegate engine. * diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java index c43fc891e3279..f5578e01da823 100644 --- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java @@ -102,6 +102,7 @@ public DataFormat getDataFormat() { @Override public org.opensearch.index.engine.dataformat.IndexingExecutionEngine indexingEngine( + org.opensearch.index.engine.exec.commit.Committer committer, org.opensearch.index.mapper.MapperService mapperService, org.opensearch.index.shard.ShardPath shardPath, IndexSettings indexSettings diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java 
b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java index fb6bc8f08c0f2..64b3200d16d2c 100644 --- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java @@ -12,7 +12,13 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.CommitStats; +import org.opensearch.index.engine.SafeCommitInfo; import org.opensearch.index.engine.dataformat.DataFormatPlugin; +import org.opensearch.index.engine.dataformat.RefreshInput; +import org.opensearch.index.engine.exec.commit.Committer; +import org.opensearch.index.engine.exec.commit.CommitterSettings; +import org.opensearch.index.engine.exec.coord.CatalogSnapshotManager; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -51,7 +57,7 @@ public void testConstructorThrowsWhenPrimaryFormatNotRegistered() { IndexSettings indexSettings = createIndexSettings("parquet"); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, - () -> new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null) + () -> new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, new CompositeTestHelper.StubCommitter()) ); assertTrue(ex.getMessage().contains("parquet")); } @@ -72,19 +78,25 @@ public void testConstructorThrowsWhenSecondaryFormatNotRegistered() { IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, - () -> new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null) + () -> new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, new CompositeTestHelper.StubCommitter()) ); assertTrue(ex.getMessage().contains("parquet")); } public void 
testConstructorRejectsNullDataFormatPlugins() { IndexSettings indexSettings = createIndexSettings("lucene"); - expectThrows(NullPointerException.class, () -> new CompositeIndexingExecutionEngine(null, indexSettings, null, null)); + expectThrows( + NullPointerException.class, + () -> new CompositeIndexingExecutionEngine(null, indexSettings, null, null, new CompositeTestHelper.StubCommitter()) + ); } public void testConstructorRejectsNullIndexSettings() { Map plugins = Map.of("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); - expectThrows(NullPointerException.class, () -> new CompositeIndexingExecutionEngine(plugins, null, null, null)); + expectThrows( + NullPointerException.class, + () -> new CompositeIndexingExecutionEngine(plugins, null, null, null, new CompositeTestHelper.StubCommitter()) + ); } public void testValidateFormatsRegisteredAcceptsValidConfig() { @@ -168,6 +180,260 @@ public void testDeleteFilesDoesNotThrow() throws Exception { engine.deleteFiles(Map.of()); } + // --- Task 8.5: Property test — Committer is required --- + + /** + * Property 2: Committer is required. + * Attempting to construct CompositeIndexingExecutionEngine with a null Committer + * must throw IllegalStateException. + * + * Validates: Requirements 3.2 + */ + public void testConstructorThrowsWhenCommitterNull() { + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + IndexSettings indexSettings = createIndexSettings("lucene"); + + IllegalStateException ex = expectThrows( + IllegalStateException.class, + () -> new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, null) + ); + assertTrue(ex.getMessage().contains("Committer must not be null")); + } + + // --- Task 8.6: Property test — Refresh never calls Committer methods --- + + /** + * Property 5: Refresh never calls Committer methods. + * Running refresh() on the composite engine must not invoke commit() on the Committer. 
+ * + * Validates: Requirements 3.6 + */ + public void testRefreshNeverCallsCommitterMethods() throws IOException { + TrackingCommitter tracking = new TrackingCommitter(); + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + IndexSettings indexSettings = createIndexSettings("lucene"); + + CompositeIndexingExecutionEngine engine = new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, tracking); + + // Reset tracking after construction (init is called during construction) + tracking.commitCalled = false; + + RefreshInput refreshInput = RefreshInput.builder().build(); + engine.refresh(refreshInput); + + assertFalse("commit() must not be called during refresh", tracking.commitCalled); + } + + // --- Task 8.7: Unit tests for flush and Committer lifecycle --- + + public void testInitCalledDuringConstruction() { + CompositeTestHelper.StubCommitter stub = new CompositeTestHelper.StubCommitter(); + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + IndexSettings indexSettings = createIndexSettings("lucene"); + + new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, stub); + assertTrue("init() must be called during construction", stub.initCalled); + } + + public void testCloseCalledDuringShutdown() { + CompositeTestHelper.StubCommitter stub = new CompositeTestHelper.StubCommitter(); + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + IndexSettings indexSettings = createIndexSettings("lucene"); + + CompositeIndexingExecutionEngine engine = new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, stub); + engine.close(); + assertTrue("close() must be called during shutdown", stub.closeCalled); + } + + public void testInitFailurePreventsConstruction() { + Committer failingInit = new Committer() { + @Override + public void init(CommitterSettings settings) throws 
IOException { + throw new IOException("init failed"); + } + + @Override + public void commit(Map commitData) {} + + @Override + public void close() {} + + @Override + public Map getLastCommittedData() { + return Map.of(); + } + + @Override + public CommitStats getCommitStats() { + return null; + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return SafeCommitInfo.EMPTY; + } + }; + + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + IndexSettings indexSettings = createIndexSettings("lucene"); + + RuntimeException ex = expectThrows( + RuntimeException.class, + () -> new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, failingInit) + ); + assertTrue(ex.getMessage().contains("Failed to initialize committer")); + } + + public void testCloseFailureIsLoggedAndShutdownContinues() { + Committer failingClose = new Committer() { + @Override + public void init(CommitterSettings settings) {} + + @Override + public void commit(Map commitData) {} + + @Override + public void close() throws IOException { + throw new IOException("close failed"); + } + + @Override + public Map getLastCommittedData() { + return Map.of(); + } + + @Override + public CommitStats getCommitStats() { + return null; + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return SafeCommitInfo.EMPTY; + } + }; + + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + IndexSettings indexSettings = createIndexSettings("lucene"); + + CompositeIndexingExecutionEngine engine = new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, failingClose); + + // close() should not throw — it logs the error and continues + engine.close(); + } + + public void testFlushCallsCommitterCommit() throws IOException { + TrackingCommitter tracking = new TrackingCommitter(); + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 
1)); + IndexSettings indexSettings = createIndexSettings("lucene"); + + CompositeIndexingExecutionEngine engine = new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, tracking); + + CatalogSnapshotManager csm = new CatalogSnapshotManager(0, 0, 0, List.of(), 0, Map.of()); + engine.setCatalogSnapshotManager(csm); + + engine.flush(); + assertTrue("commit() must be called during flush", tracking.commitCalled); + assertNotNull("commit() must receive commit data", tracking.lastCommitData); + } + + public void testFlushPropagatesIOExceptionFromCommit() { + Committer failingCommit = new Committer() { + @Override + public void init(CommitterSettings settings) {} + + @Override + public void commit(Map commitData) throws IOException { + throw new IOException("commit failed"); + } + + @Override + public void close() {} + + @Override + public Map getLastCommittedData() { + return Map.of(); + } + + @Override + public CommitStats getCommitStats() { + return null; + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return SafeCommitInfo.EMPTY; + } + }; + + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + IndexSettings indexSettings = createIndexSettings("lucene"); + + CompositeIndexingExecutionEngine engine = new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, failingCommit); + + CatalogSnapshotManager csm = new CatalogSnapshotManager(0, 0, 0, List.of(), 0, Map.of()); + engine.setCatalogSnapshotManager(csm); + + IOException ex = expectThrows(IOException.class, engine::flush); + assertTrue(ex.getMessage().contains("commit failed")); + } + + public void testFlushThrowsWhenCatalogSnapshotManagerNotSet() { + CompositeIndexingExecutionEngine engine = CompositeTestHelper.createStubEngine("lucene"); + + IllegalStateException ex = expectThrows(IllegalStateException.class, engine::flush); + assertTrue(ex.getMessage().contains("CatalogSnapshotManager not set")); + } + + /** + * A 
Committer that tracks which methods were called, for test assertions. + */ + private static class TrackingCommitter implements Committer { + boolean initCalled = false; + boolean commitCalled = false; + boolean closeCalled = false; + Map lastCommitData = null; + + @Override + public void init(CommitterSettings settings) { + initCalled = true; + } + + @Override + public void commit(Map commitData) { + commitCalled = true; + lastCommitData = commitData; + } + + @Override + public void close() { + closeCalled = true; + } + + @Override + public Map getLastCommittedData() { + return Map.of(); + } + + @Override + public CommitStats getCommitStats() { + return null; + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return SafeCommitInfo.EMPTY; + } + } + private IndexSettings createIndexSettings(String primaryFormat) { Settings settings = Settings.builder() .put("index.composite.primary_data_format", primaryFormat) diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java index af83555e7b7b3..bee1a07ee7964 100644 --- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java @@ -12,6 +12,8 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.CommitStats; +import org.opensearch.index.engine.SafeCommitInfo; import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.dataformat.DataFormatPlugin; import org.opensearch.index.engine.dataformat.DocumentInput; @@ -23,6 +25,8 @@ import org.opensearch.index.engine.dataformat.RefreshResult; import org.opensearch.index.engine.dataformat.WriteResult; import 
org.opensearch.index.engine.dataformat.Writer; +import org.opensearch.index.engine.exec.commit.Committer; +import org.opensearch.index.engine.exec.commit.CommitterSettings; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; @@ -64,7 +68,7 @@ static CompositeIndexingExecutionEngine createStubEngine(String primaryName, Str IndexMetadata indexMetadata = IndexMetadata.builder("test-index").settings(settings).build(); IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY); - return new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null); + return new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, new StubCommitter()); } static DataFormatPlugin stubPlugin(String formatName, long priority) { @@ -77,6 +81,7 @@ public DataFormat getDataFormat() { @Override public IndexingExecutionEngine indexingEngine( + Committer committer, MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings @@ -96,6 +101,7 @@ public DataFormat getDataFormat() { @Override public IndexingExecutionEngine indexingEngine( + Committer committer, MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings @@ -226,4 +232,40 @@ public void setRowId(String rowIdFieldName, long rowId) {} @Override public void close() {} } + + /** + * Minimal stub Committer that records init/close calls and does nothing on commit. 
+ */ + static class StubCommitter implements Committer { + boolean initCalled = false; + boolean closeCalled = false; + + @Override + public void init(CommitterSettings settings) { + initCalled = true; + } + + @Override + public void commit(Map commitData) {} + + @Override + public void close() { + closeCalled = true; + } + + @Override + public Map getLastCommittedData() { + return Map.of(); + } + + @Override + public CommitStats getCommitStats() { + return null; + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return SafeCommitInfo.EMPTY; + } + } } diff --git a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java index 504722af216d1..d4343f08759d4 100644 --- a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java +++ b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java @@ -21,6 +21,7 @@ import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.dataformat.DataFormatPlugin; import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; import org.opensearch.parquet.engine.ParquetDataFormat; @@ -88,7 +89,12 @@ public DataFormat getDataFormat() { } @Override - public IndexingExecutionEngine indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings) { + public IndexingExecutionEngine indexingEngine( + Committer committer, + MapperService mapperService, + ShardPath shardPath, + IndexSettings indexSettings + ) { return new ParquetIndexingEngine( settings, dataFormat, diff --git a/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java 
b/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java index 8ba14b5fcacf4..477fd9aaf4c88 100644 --- a/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java @@ -14,6 +14,7 @@ import org.opensearch.index.engine.exec.DataFormatAwareEngineFactory; import org.opensearch.index.engine.exec.EngineReaderManager; import org.opensearch.index.engine.exec.IndexReaderProvider; +import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.engine.exec.coord.CatalogSnapshot; import org.opensearch.index.engine.exec.coord.CatalogSnapshotManager; @@ -37,6 +38,7 @@ public class DataFormatAwareEngine implements IndexReaderProvider, Closeable { private final Map> readerManagers; private volatile CatalogSnapshotManager catalogSnapshotManager; + private volatile Committer committer; /** * Constructs a new DataFormatAwareEngine. @@ -59,6 +61,25 @@ public void setCatalogSnapshotManager(CatalogSnapshotManager catalogSnapshotMana this.catalogSnapshotManager = catalogSnapshotManager; } + /** + * Sets the committer for durable catalog snapshot persistence. + * May be null if no committer is configured. + * + * @param committer the committer instance, or null + */ + public void setCommitter(Committer committer) { + this.committer = committer; + } + + /** + * Returns the committer, or null if none has been set. 
+ * + * @return the committer instance, or null + */ + public Committer getCommitter() { + return committer; + } + public EngineReaderManager getReaderManager(DataFormat format) { return readerManagers.get(format); } diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java index 9c28cf4cb825b..e1241857e5b50 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java @@ -10,16 +10,10 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; -/** - * Plugin interface for providing custom data format implementations. - * Plugins implement this to register their data format (e.g., Parquet, Lucene) - * with the DataFormatRegistry during node bootstrap. - * - * @opensearch.experimental - */ /** * Plugin interface for providing custom data format implementations. * Plugins implement this to register their data format (e.g., Parquet, Lucene) @@ -39,11 +33,19 @@ public interface DataFormatPlugin { /** * Creates the indexing engine for the data format. This should be instantiated per shard. + * The {@link Committer} provides access to the backing store (e.g., IndexWriter) so that + * per-format engines can share the same writer for segment incorporation. 
* + * @param committer the committer holding the backing store, or null if not available * @param mapperService the mapper service for field mapping resolution * @param shardPath the shard path for file storage * @param indexSettings the index settings * @return the indexing execution engine instance */ - IndexingExecutionEngine indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings); + IndexingExecutionEngine indexingEngine( + Committer committer, + MapperService mapperService, + ShardPath shardPath, + IndexSettings indexSettings + ); } diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java index 3df6560769fdb..598dd96cfd8a1 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java @@ -8,10 +8,10 @@ package org.opensearch.index.engine.dataformat; -import org.opensearch.common.CheckedFunction; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.PluginsService; @@ -34,61 +34,55 @@ @ExperimentalApi public class DataFormatRegistry { - /** Map from data format to the plugin that provides its indexing engine. */ private final Map dataFormatPluginRegistry; - - /** Map from data format to a factory that creates an {@link EngineReaderManager} for a given shard path. 
*/ - private final Map, IOException>> readerManagerBuilders; - + private final Map> readerManagerPlugins; private final Map dataFormats; /** * Creates a registry by discovering all {@link DataFormatPlugin} and {@link SearchBackEndPlugin} implementations - * from the given {@link PluginsService}. Registers each data format with its indexing plugin and reader manager factory. + * from the given {@link PluginsService}. * * @param pluginsService the plugins service used to discover data format plugins and search back-end plugins - * @throws IllegalArgumentException if a data format is registered by more than one plugin - * @throws IllegalStateException if the set of formats with indexing plugins does not match the set with reader managers */ public DataFormatRegistry(PluginsService pluginsService) { - Map dataFormatPlugiRegistry = new HashMap<>(); - Map, IOException>> readerManagerBuilders = new HashMap<>(); - Map dataFormats = new HashMap<>(); + Map pluginRegistry = new HashMap<>(); + Map> readerPlugins = new HashMap<>(); + Map formats = new HashMap<>(); for (DataFormatPlugin plugin : pluginsService.filterPlugins(DataFormatPlugin.class)) { DataFormat format = plugin.getDataFormat(); - DataFormatPlugin existing = dataFormatPlugiRegistry.putIfAbsent(format, plugin); + DataFormatPlugin existing = pluginRegistry.putIfAbsent(format, plugin); if (existing != null) { throw new IllegalArgumentException("DataFormat [" + format.name() + "] is already registered by plugin [" + existing + "]"); } - dataFormats.put(format.name(), format); + formats.put(format.name(), format); } for (SearchBackEndPlugin plugin : pluginsService.filterPlugins(SearchBackEndPlugin.class)) { for (DataFormat format : plugin.getSupportedFormats()) { - // TODO: use mapperService and indexSettings to filter formats relevant to this index - readerManagerBuilders.put(format, shardPath -> plugin.createReaderManager(format, shardPath)); + readerPlugins.put(format, plugin); } } - if 
(!readerManagerBuilders.keySet().equals(dataFormatPlugiRegistry.keySet())) { + if (readerPlugins.keySet().equals(pluginRegistry.keySet()) == false) { throw new IllegalStateException( "Cannot build registry as data formats have missing indexing engine/reader managers" + " - formats with reader managers: " - + readerManagerBuilders.keySet() + + readerPlugins.keySet() + ", formats with plugins: " - + dataFormatPlugiRegistry.keySet() + + pluginRegistry.keySet() ); } - this.dataFormatPluginRegistry = Map.copyOf(dataFormatPlugiRegistry); - this.dataFormats = Map.copyOf(dataFormats); - this.readerManagerBuilders = Map.copyOf(readerManagerBuilders); + this.dataFormatPluginRegistry = Map.copyOf(pluginRegistry); + this.dataFormats = Map.copyOf(formats); + this.readerManagerPlugins = Map.copyOf(readerPlugins); } /** * Creates an {@link IndexingExecutionEngine} for the given data format. * + * @param committer the committer holding the backing store, or null if not available * @param format the data format * @param mapperService the mapper service for field mapping resolution * @param shardPath the shard path for file storage @@ -97,6 +91,7 @@ public DataFormatRegistry(PluginsService pluginsService) { * @throws IllegalArgumentException if the data format is not registered */ public IndexingExecutionEngine getIndexingEngine( + Committer committer, DataFormat format, MapperService mapperService, ShardPath shardPath, @@ -106,7 +101,7 @@ public DataFormatRegistry(PluginsService pluginsService) { if (plugin == null) { throw new IllegalArgumentException("No plugin registered for DataFormat [" + format.name() + "]"); } - return plugin.indexingEngine(mapperService, shardPath, indexSettings); + return plugin.indexingEngine(committer, mapperService, shardPath, indexSettings); } public DataFormat format(String name) { @@ -119,10 +114,6 @@ public DataFormat format(String name) { /** * Returns all registered data formats that support a specific capability for a field type. 
- * - * @param fieldType the field type name - * @param capability the capability to check - * @return list of data formats supporting the capability for the field type */ public List supportsCapability(String fieldType, FieldTypeCapabilities.Capability capability) { return dataFormatPluginRegistry.keySet() @@ -137,35 +128,31 @@ public List supportsCapability(String fieldType, FieldTypeCapabiliti } /** - * Returns an unmodifiable view of all registered data formats and their plugins. - * - * @return unmodifiable map of data formats to plugins + * Returns an unmodifiable copy of the set of all registered data formats. */ public Set getRegisteredFormats() { return Set.copyOf(dataFormatPluginRegistry.keySet()); } /** - * Creates {@link EngineReaderManager} instances for all applicable data formats based on index settings/mappings. - * Each reader manager is instantiated by applying the shard path to the factory registered - * by the corresponding {@link SearchBackEndPlugin}. + * Creates {@link EngineReaderManager} instances for all applicable data formats. 
* - * @param mapperService the mapper service for field mapping resolution (reserved for future filtering) + * @param committer the committer holding the backing store, or null if not available + * @param mapperService the mapper service (reserved for future filtering) * @param indexSettings the index settings (reserved for future filtering) * @param shardPath the shard path used to create reader managers * @return a map from data format to its reader manager - * @throws RuntimeException wrapping an {@link IOException} if reader manager creation fails + * @throws IOException if reader manager creation fails */ public Map> getReaderManagers( + Committer committer, MapperService mapperService, IndexSettings indexSettings, ShardPath shardPath ) throws IOException { - // TODO: Filter based on index settings Map> readerManagers = new HashMap<>(); - for (Map.Entry, IOException>> entry : readerManagerBuilders - .entrySet()) { - readerManagers.put(entry.getKey(), entry.getValue().apply(shardPath)); + for (Map.Entry> entry : readerManagerPlugins.entrySet()) { + readerManagers.put(entry.getKey(), entry.getValue().createReaderManager(committer, entry.getKey(), shardPath)); } return readerManagers; } diff --git a/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java b/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java index f32e6aa4410fd..8b2570f65db66 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java @@ -9,10 +9,9 @@ package org.opensearch.index.engine.exec; import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.DataFormatAwareEngine; import org.opensearch.index.engine.dataformat.DataFormat; -import org.opensearch.index.mapper.MapperService; +import 
org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.PluginsService; import org.opensearch.plugins.SearchBackEndPlugin; @@ -33,35 +32,42 @@ @ExperimentalApi public class DataFormatAwareEngineFactory { - private final Map> readerManagers = new HashMap<>(); + private final List> searchBackEndPlugins; + private final ShardPath shardPath; private final IndexFileDeleter indexFileDeleter; - @SuppressWarnings("rawtypes") - public DataFormatAwareEngineFactory( - PluginsService pluginsService, - ShardPath shardPath, - MapperService mapperService, - IndexSettings indexSettings - ) throws IOException { - for (SearchBackEndPlugin plugin : pluginsService.filterPlugins(SearchBackEndPlugin.class)) { - List formats = plugin.getSupportedFormats(); - if (formats == null) { - continue; - } - for (DataFormat format : formats) { - // TODO: use mapperService and indexSettings to filter formats relevant to this index - readerManagers.put(format, plugin.createReaderManager(format, shardPath)); + @SuppressWarnings("unchecked") + public DataFormatAwareEngineFactory(PluginsService pluginsService, ShardPath shardPath) throws IOException { + this.searchBackEndPlugins = (List>) (List) pluginsService.filterPlugins(SearchBackEndPlugin.class); + this.shardPath = shardPath; + this.indexFileDeleter = new IndexFileDeleter(null, shardPath); + } + + /** + * Creates reader managers for all discovered search back-end plugins. + * The {@link Committer} is passed through so plugins can access the backing store. 
+ * + * @param committer the committer holding the backing store, or null if not available + * @return a map of data format to reader manager + * @throws IOException if reader manager creation fails + */ + @SuppressWarnings("unchecked") + public Map> createReaderManagers(Committer committer) throws IOException { + Map> readerManagers = new HashMap<>(); + for (SearchBackEndPlugin plugin : searchBackEndPlugins) { + for (DataFormat format : plugin.getSupportedFormats()) { + readerManagers.put(format, plugin.createReaderManager(committer, format, shardPath)); } } - this.indexFileDeleter = new IndexFileDeleter(null, shardPath); + return readerManagers; } /** - * Creates a new {@link DataFormatAwareEngine} populated with the discovered - * reader managers and memoizing suppliers. + * Creates a new {@link DataFormatAwareEngine} backed by a fresh, empty reader-manager map. + * {@link #createReaderManagers} builds and returns a separate map; it does not populate the engine created here. */ public DataFormatAwareEngine create() { - return new DataFormatAwareEngine(readerManagers); + return new DataFormatAwareEngine(new HashMap<>()); } /** diff --git a/server/src/main/java/org/opensearch/index/engine/exec/commit/Committer.java b/server/src/main/java/org/opensearch/index/engine/exec/commit/Committer.java new file mode 100644 index 0000000000000..d6b05a2a5ad43 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/commit/Committer.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.engine.exec.commit; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.CommitStats; +import org.opensearch.index.engine.SafeCommitInfo; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; + +/** + * Abstraction for durably committing data to a backing store. + *

+ * Implementations persist commit data (key-value pairs) so it can be recovered after a restart. + * The canonical implementation stores the data as Lucene commit userData via + * {@code IndexWriter.setLiveCommitData} + {@code IndexWriter.commit()}. + *

+ * The caller is responsible for serializing any higher-level state (e.g., CatalogSnapshot) + * into the commit data before calling {@link #commit}. + *

+ * Lifecycle: {@link #init(CommitterSettings)} is called once during engine construction, + * {@link #commit(Map)} is called during flush, and {@link #close()} is called + * during engine shutdown. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface Committer extends Closeable { + + /** + * Initializes the committer with the given settings. + * Called once during engine construction before any indexing operations. + * + * @param settings initialization parameters (e.g., shard path, index settings) + * @throws IOException if initialization fails + */ + void init(CommitterSettings settings) throws IOException; + + /** + * Durably commits the given data to the backing store's commit metadata. + * Called during the engine's flush path. + * + * @param commitData the key-value pairs to persist as commit metadata + * @throws IOException if the commit fails + */ + void commit(Map commitData) throws IOException; + + /** + * Returns the user data from the last successful commit. + * For Lucene-backed implementations, this is the commit userData from the last + * {@code IndexWriter.commit()} call, which includes the serialized CatalogSnapshot. + * + * @return the last committed user data, or an empty map if no commit has occurred + * @throws IOException if reading the commit data fails + */ + Map getLastCommittedData() throws IOException; + + /** + * Returns statistics about the last commit point. + * Includes generation, user data, commit ID, and document count. + * + * @return the commit stats, or null if no commit has occurred + */ + CommitStats getCommitStats(); + + /** + * Returns information about the safe commit point for recovery decisions. + * The safe commit is the most recent commit that is safe to recover from, + * carrying the local checkpoint and document count. 
+ * + * @return the safe commit info + */ + SafeCommitInfo getSafeCommitInfo(); +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/commit/CommitterSettings.java b/server/src/main/java/org/opensearch/index/engine/exec/commit/CommitterSettings.java new file mode 100644 index 0000000000000..e8c329fb917bc --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/commit/CommitterSettings.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec.commit; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.shard.ShardPath; + +/** + * Initialization parameters for a {@link Committer}. + * Carries the shard path, index settings, and engine configuration needed to set up the backing store. + * + * @param shardPath the shard's file system path + * @param indexSettings the index-level settings + * @param engineConfig the engine configuration (nullable — may be absent in tests or standalone mode) + * + * @opensearch.experimental + */ +@ExperimentalApi +public record CommitterSettings(ShardPath shardPath, IndexSettings indexSettings, EngineConfig engineConfig) { + + /** + * Convenience constructor without engine config (for tests and standalone usage). 
+ */ + public CommitterSettings(ShardPath shardPath, IndexSettings indexSettings) { + this(shardPath, indexSettings, null); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/commit/package-info.java b/server/src/main/java/org/opensearch/index/engine/exec/commit/package-info.java new file mode 100644 index 0000000000000..601f35a2af327 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/commit/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Committer abstraction for durably persisting catalog snapshots during flush. */ +package org.opensearch.index.engine.exec.commit; diff --git a/server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshot.java b/server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshot.java index f10cd55a075e3..ca7169cc535fd 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshot.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshot.java @@ -13,7 +13,6 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.Segment; import org.opensearch.index.engine.exec.WriterFileSet; @@ -202,6 +201,4 @@ public CatalogSnapshot cloneNoAcquire() { * @return a new {@link CatalogSnapshot} with the same logical state */ public abstract CatalogSnapshot clone(); - - public abstract Object getReader(DataFormat dataFormat); } diff --git a/server/src/main/java/org/opensearch/index/engine/exec/coord/DataformatAwareCatalogSnapshot.java 
b/server/src/main/java/org/opensearch/index/engine/exec/coord/DataformatAwareCatalogSnapshot.java index 9426bbeaad47b..86348e82099cf 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/coord/DataformatAwareCatalogSnapshot.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/coord/DataformatAwareCatalogSnapshot.java @@ -14,7 +14,6 @@ import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.Segment; import org.opensearch.index.engine.exec.WriterFileSet; @@ -200,9 +199,4 @@ protected void closeInternal() { public boolean isClosed() { return closed.get(); } - - @Override - public Object getReader(DataFormat dataFormat) { - throw new UnsupportedOperationException("Not implemented"); - } } diff --git a/server/src/main/java/org/opensearch/index/engine/exec/coord/SegmentInfosCatalogSnapshot.java b/server/src/main/java/org/opensearch/index/engine/exec/coord/SegmentInfosCatalogSnapshot.java index 82cf48f07d804..7b43e9d93f616 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/coord/SegmentInfosCatalogSnapshot.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/coord/SegmentInfosCatalogSnapshot.java @@ -16,7 +16,6 @@ import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.Segment; import org.opensearch.index.engine.exec.WriterFileSet; @@ -128,11 +127,6 @@ public void setUserData(Map userData) { // No-op for SegmentInfosCatalogSnapshot } - @Override - public Object getReader(DataFormat dataFormat) { - throw new UnsupportedOperationException("SegmentInfosCatalogSnapshot does not support 
getReader()"); - } - @Override protected void closeInternal() { // No resources to release for SegmentInfos wrapper. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index fa66e8405120a..536d38d48f2f5 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -577,10 +577,13 @@ public boolean shouldCache(Query query) { } this.dataFormatRegistry = dataFormatRegistry; if (dataFormatRegistry != null) { - // TODO: This should go away and we should use indexer directly. - this.currentCompositeEngineReference.set( - new DataFormatAwareEngine(dataFormatRegistry.getReaderManagers(mapperService, indexSettings, path)) + // TODO: Wire the Committer from EnginePlugin discovery and pass it here. + // For now, reader managers are created without a committer — the Lucene reader manager + // will fail if used before the committer is wired at engine construction time. 
+ DataFormatAwareEngine dfaEngine = new DataFormatAwareEngine( + dataFormatRegistry.getReaderManagers(null, mapperService, indexSettings, path) ); + this.currentCompositeEngineReference.set(dfaEngine); } } diff --git a/server/src/main/java/org/opensearch/plugins/EnginePlugin.java b/server/src/main/java/org/opensearch/plugins/EnginePlugin.java index 16556014c4793..c8c226c4ea490 100644 --- a/server/src/main/java/org/opensearch/plugins/EnginePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/EnginePlugin.java @@ -37,6 +37,7 @@ import org.opensearch.index.codec.CodecService; import org.opensearch.index.codec.CodecServiceFactory; import org.opensearch.index.engine.EngineFactory; +import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; @@ -116,4 +117,16 @@ default Optional getAdditionalCodecs(IndexSettings indexSettin default Optional getCustomTranslogDeletionPolicyFactory() { return Optional.empty(); } + + /** + * When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the index settings to determine + * whether or not to provide a {@link Committer} for the given index. A plugin that does not provide a Committer should return + * {@link Optional#empty()}. 
+ * + * @param indexSettings the index settings + * @return an optional committer + */ + default Optional getCommitter(IndexSettings indexSettings) { + return Optional.empty(); + } } diff --git a/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java b/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java index 3b1d562d5f9d3..d3d149fe98189 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java @@ -10,6 +10,7 @@ import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.shard.ShardPath; import java.io.IOException; @@ -38,12 +39,15 @@ public interface SearchBackEndPlugin { List getSupportedFormats(); /** - * Creates a reader manager for the given format and shard. + * Creates a reader manager for the given data format and shard. + * The {@link Committer} provides access to the backing store (e.g., IndexWriter) + * so that the reader manager can open readers from the same writer. 
* + * @param committer the committer holding the backing store, or null if not available * @param format the data format * @param shardPath the shard path - * @return a reader manager that produces readers of type {@code R} - * @throws IOException if creation fails + * @return the reader manager + * @throws IOException if reader creation fails */ - EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) throws IOException; + EngineReaderManager createReaderManager(Committer committer, DataFormat format, ShardPath shardPath) throws IOException; } diff --git a/server/src/test/java/org/opensearch/index/engine/DataFormatAwareEngineCommitterTests.java b/server/src/test/java/org/opensearch/index/engine/DataFormatAwareEngineCommitterTests.java new file mode 100644 index 0000000000000..1091824d67e7c --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/DataFormatAwareEngineCommitterTests.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.opensearch.index.engine.exec.commit.Committer; +import org.opensearch.index.engine.exec.commit.CommitterSettings; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Tests for the optional Committer field on {@link DataFormatAwareEngine}. + */ +public class DataFormatAwareEngineCommitterTests extends OpenSearchTestCase { + + /** + * Verifies that getCommitter() returns null when no committer has been set. 
+ * Validates: Requirements 4.3, 4.4 + */ + public void testGetCommitterReturnsNullByDefault() { + DataFormatAwareEngine engine = new DataFormatAwareEngine(new HashMap<>()); + assertNull("getCommitter() should return null by default", engine.getCommitter()); + } + + /** + * Verifies that setCommitter() followed by getCommitter() returns the same instance. + * Validates: Requirements 4.3, 4.4 + */ + public void testSetCommitterThenGetCommitterReturnsSameInstance() { + DataFormatAwareEngine engine = new DataFormatAwareEngine(new HashMap<>()); + Committer committer = new Committer() { + @Override + public void init(CommitterSettings settings) throws IOException {} + + @Override + public void commit(Map commitData) throws IOException {} + + @Override + public void close() throws IOException {} + + @Override + public Map getLastCommittedData() { + return Map.of(); + } + + @Override + public CommitStats getCommitStats() { + return null; + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return SafeCommitInfo.EMPTY; + } + }; + + engine.setCommitter(committer); + assertSame("getCommitter() should return the exact instance that was set", committer, engine.getCommitter()); + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java index 4c5191ac1af2c..3a1d629266afc 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java @@ -72,6 +72,7 @@ public void testFullDataFormatLifecycle() throws IOException { @SuppressWarnings("unchecked") IndexingExecutionEngine engine = (IndexingExecutionEngine) plugin .indexingEngine( + null, mock(MapperService.class), new ShardPath(false, Path.of("/tmp/uuid/0"), Path.of("/tmp/uuid/0"), new ShardId("index", "uuid", 0)), new 
IndexSettings(IndexMetadata.builder("index").settings(settings).build(), settings) diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java index 0255d0e4046fa..ce1a22bba9bb2 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java @@ -142,7 +142,7 @@ public void testGetIndexingEngine() { DataFormatRegistry registry = new DataFormatRegistry(pluginsService); - IndexingExecutionEngine engine = registry.getIndexingEngine(format, mapperService, shardPath, indexSettings); + IndexingExecutionEngine engine = registry.getIndexingEngine(null, format, mapperService, shardPath, indexSettings); assertNotNull(engine); assertEquals(format, engine.getDataFormat()); } @@ -156,7 +156,7 @@ public void testGetIndexingEngineForUnregisteredFormatThrows() { IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> registry.getIndexingEngine(unregistered, mapperService, shardPath, indexSettings) + () -> registry.getIndexingEngine(null, unregistered, mapperService, shardPath, indexSettings) ); assertTrue(e.getMessage().contains("unknown")); } @@ -261,7 +261,7 @@ public void testGetReaderManagers() throws IOException { DataFormatRegistry registry = new DataFormatRegistry(pluginsService); - Map> managers = registry.getReaderManagers(mapperService, indexSettings, shardPath); + Map> managers = registry.getReaderManagers(null, mapperService, indexSettings, shardPath); assertEquals(1, managers.size()); assertNotNull(managers.get(format)); } diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockCatalogSnapshot.java b/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockCatalogSnapshot.java index 9d619d95ccbcb..1fa2a96a45b31 100644 --- 
a/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockCatalogSnapshot.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockCatalogSnapshot.java @@ -9,7 +9,6 @@ package org.opensearch.index.engine.dataformat.stub; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.Segment; import org.opensearch.index.engine.exec.WriterFileSet; import org.opensearch.index.engine.exec.coord.CatalogSnapshot; @@ -77,11 +76,6 @@ public String serializeToString() { @Override public void setUserData(Map userData) {} - @Override - public Object getReader(DataFormat dataFormat) { - return null; - } - @Override public CatalogSnapshot clone() { return new MockCatalogSnapshot(generation, segments, format); diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockDataFormatPlugin.java b/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockDataFormatPlugin.java index 4a5ca72c5e5dd..fe1e830a48413 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockDataFormatPlugin.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockDataFormatPlugin.java @@ -12,6 +12,7 @@ import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.dataformat.DataFormatPlugin; import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; @@ -35,7 +36,12 @@ public DataFormat getDataFormat() { } @Override - public IndexingExecutionEngine indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings) { + public IndexingExecutionEngine indexingEngine( + Committer committer, + MapperService mapperService, + ShardPath shardPath, + IndexSettings indexSettings + ) { return 
new MockIndexingExecutionEngine(dataFormat); } } diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java b/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java index 48e9b83353451..e5b86232c01d0 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java @@ -10,12 +10,13 @@ import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.SearchBackEndPlugin; import java.util.List; -public class MockSearchBackEndPlugin implements SearchBackEndPlugin { +public class MockSearchBackEndPlugin implements SearchBackEndPlugin { private final List formats; public MockSearchBackEndPlugin(List formats) { @@ -33,7 +34,7 @@ public List getSupportedFormats() { } @Override - public EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) { + public EngineReaderManager createReaderManager(Committer committer, DataFormat format, ShardPath shardPath) { return new MockReaderManager("mock-columnar"); } } diff --git a/server/src/test/java/org/opensearch/index/engine/exec/commit/CommitterTests.java b/server/src/test/java/org/opensearch/index/engine/exec/commit/CommitterTests.java new file mode 100644 index 0000000000000..5c8530f5efaa1 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/exec/commit/CommitterTests.java @@ -0,0 +1,193 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.engine.exec.commit; + +import org.opensearch.index.engine.CommitStats; +import org.opensearch.index.engine.SafeCommitInfo; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Tests for the {@link Committer} interface contract. + */ +public class CommitterTests extends OpenSearchTestCase { + + /** Creates a minimal Committer with no-op implementations for all methods. */ + private static Committer noOpCommitter() { + return new Committer() { + @Override + public void init(CommitterSettings settings) throws IOException {} + + @Override + public void commit(Map commitData) throws IOException {} + + @Override + public void close() throws IOException {} + + @Override + public Map getLastCommittedData() { + return Map.of(); + } + + @Override + public CommitStats getCommitStats() { + return null; + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return SafeCommitInfo.EMPTY; + } + }; + } + + public void testMinimalImplementationCanBeInstantiated() { + Committer committer = noOpCommitter(); + assertNotNull(committer); + } + + public void testCloseFromCloseableIsCallable() throws IOException { + AtomicBoolean closed = new AtomicBoolean(false); + Committer committer = new Committer() { + @Override + public void init(CommitterSettings settings) {} + + @Override + public void commit(Map commitData) {} + + @Override + public void close() { + closed.set(true); + } + + @Override + public Map getLastCommittedData() { + return Map.of(); + } + + @Override + public CommitStats getCommitStats() { + return null; + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return SafeCommitInfo.EMPTY; + } + }; + + committer.close(); + assertTrue("close() should have been called", closed.get()); + } + + public void testCommitterWorksWithTryWithResources() throws IOException { + AtomicBoolean closed = new 
AtomicBoolean(false); + try (Committer committer = new Committer() { + @Override + public void init(CommitterSettings settings) {} + + @Override + public void commit(Map commitData) {} + + @Override + public void close() { + closed.set(true); + } + + @Override + public Map getLastCommittedData() { + return Map.of(); + } + + @Override + public CommitStats getCommitStats() { + return null; + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return SafeCommitInfo.EMPTY; + } + }) { + assertNotNull(committer); + } + assertTrue("close() should have been called by try-with-resources", closed.get()); + } + + public void testInitIsCallable() throws IOException { + AtomicBoolean initialized = new AtomicBoolean(false); + Committer committer = new Committer() { + @Override + public void init(CommitterSettings settings) { + initialized.set(true); + } + + @Override + public void commit(Map commitData) {} + + @Override + public void close() {} + + @Override + public Map getLastCommittedData() { + return Map.of(); + } + + @Override + public CommitStats getCommitStats() { + return null; + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return SafeCommitInfo.EMPTY; + } + }; + + committer.init(null); + assertTrue("init() should have been called", initialized.get()); + } + + public void testCommitIsCallable() throws IOException { + AtomicBoolean committed = new AtomicBoolean(false); + Committer committer = new Committer() { + @Override + public void init(CommitterSettings settings) {} + + @Override + public void commit(Map commitData) { + committed.set(true); + } + + @Override + public void close() {} + + @Override + public Map getLastCommittedData() { + return Map.of(); + } + + @Override + public CommitStats getCommitStats() { + return null; + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return SafeCommitInfo.EMPTY; + } + }; + + committer.commit(Map.of()); + assertTrue("commit() should have been called", committed.get()); + } +} From 
3fe091a7b7c412ad1a25bde85a184a63da52f3b4 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 7 Apr 2026 00:04:57 +0530 Subject: [PATCH 2/3] Simplify committer interface and limit broad usage Signed-off-by: Bukhtawar Khan --- .../opensearch/be/lucene/LuceneCommitter.java | 14 ++-- .../be/lucene/LuceneSearchEnginePlugin.java | 30 ++++--- .../be/lucene/LuceneCommitterTests.java | 18 ++-- .../LuceneIndexingExecutionEngineTests.java | 4 +- .../lucene/LuceneSearchEnginePluginTests.java | 22 ++++- .../CompositeIndexingExecutionEngine.java | 19 +++-- ...CompositeIndexingExecutionEngineTests.java | 83 +------------------ .../composite/CompositeTestHelper.java | 9 +- .../engine/dataformat/DataFormatRegistry.java | 4 +- .../exec/DataFormatAwareEngineFactory.java | 7 +- .../index/engine/exec/commit/Committer.java | 11 +-- .../opensearch/index/shard/IndexShard.java | 2 +- .../org/opensearch/plugins/EnginePlugin.java | 7 +- .../plugins/SearchBackEndPlugin.java | 8 +- .../DataFormatAwareEngineCommitterTests.java | 4 - .../dataformat/DataFormatRegistryTests.java | 2 +- .../stub/MockSearchBackEndPlugin.java | 3 +- .../engine/exec/commit/CommitterTests.java | 46 ---------- 18 files changed, 83 insertions(+), 210 deletions(-) diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java index 6e31d61c7b281..c27c8ca6f582c 100644 --- a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java @@ -34,7 +34,7 @@ *

  * Responsibilities:
  * <ul>
- *   <li>{@link #init} — opens the IndexWriter on the shard's Lucene directory</li>
+ *   <li>Constructor — opens the IndexWriter on the shard's Lucene directory</li>
  *   <li>{@link #commit} — serializes the CatalogSnapshot as Lucene commit userData</li>
  *   <li>{@link #close} — closes the IndexWriter</li>
  * </ul>
@@ -55,11 +55,13 @@ public class LuceneCommitter implements Committer { private IndexWriter indexWriter; - /** Creates a new LuceneCommitter. */ - public LuceneCommitter() {} - - @Override - public void init(CommitterSettings settings) throws IOException { + /** + * Creates a new LuceneCommitter and immediately opens the IndexWriter. + * + * @param settings the committer settings (shard path, index settings, optional engine config) + * @throws IOException if the IndexWriter cannot be opened + */ + public LuceneCommitter(CommitterSettings settings) throws IOException { Path luceneDir = settings.shardPath().getDataPath().resolve(LUCENE_DIR_NAME); Files.createDirectories(luceneDir); Directory directory = FSDirectory.open(luceneDir); diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java index 8c13b994db4fe..5a968f38ddebe 100644 --- a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java @@ -12,10 +12,10 @@ import org.apache.lucene.index.IndexWriter; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; -import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.EngineReaderManager; import org.opensearch.index.engine.exec.commit.Committer; +import org.opensearch.index.engine.exec.commit.CommitterSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.SearchBackEndPlugin; @@ -42,6 +42,8 @@ @ExperimentalApi public class LuceneSearchEnginePlugin implements SearchBackEndPlugin, EnginePlugin { + private 
LuceneCommitter luceneCommitter; + /** Creates a new LuceneSearchEnginePlugin. */ public LuceneSearchEnginePlugin() {} @@ -53,16 +55,17 @@ public String name() { // --- SearchBackEndPlugin --- @Override - public EngineReaderManager createReaderManager(Committer committer, DataFormat format, ShardPath shardPath) + public EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) throws IOException { - if (committer instanceof LuceneCommitter) { - IndexWriter writer = ((LuceneCommitter) committer).getIndexWriter(); - if (writer != null) { - OpenSearchDirectoryReader osReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardPath.getShardId()); - return new LuceneReaderManager(format, osReader); - } + if (luceneCommitter == null) { + throw new IllegalStateException("getCommitter() must be called before createReaderManager()"); + } + IndexWriter writer = luceneCommitter.getIndexWriter(); + if (writer == null) { + throw new IllegalStateException("LuceneCommitter not initialized"); } - throw new IllegalStateException("Cannot create LuceneReaderManager without an initialized LuceneCommitter"); + OpenSearchDirectoryReader osReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardPath.getShardId()); + return new LuceneReaderManager(format, osReader); } @Override @@ -73,7 +76,12 @@ public List getSupportedFormats() { // --- EnginePlugin --- @Override - public Optional getCommitter(IndexSettings indexSettings) { - return Optional.of(new LuceneCommitter()); + public Optional getCommitter(CommitterSettings committerSettings) { + try { + this.luceneCommitter = new LuceneCommitter(committerSettings); + } catch (IOException e) { + throw new RuntimeException("Failed to create LuceneCommitter", e); + } + return Optional.of(luceneCommitter); } } diff --git a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneCommitterTests.java 
b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneCommitterTests.java index 1a2c2c7448dc8..7936cc926ebf2 100644 --- a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneCommitterTests.java +++ b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneCommitterTests.java @@ -39,10 +39,8 @@ private CommitterSettings createCommitterSettings() throws IOException { } public void testInitOpensIndexWriter() throws IOException { - LuceneCommitter committer = new LuceneCommitter(); + LuceneCommitter committer = new LuceneCommitter(createCommitterSettings()); try { - assertNull(committer.getIndexWriter()); - committer.init(createCommitterSettings()); IndexWriter writer = committer.getIndexWriter(); assertNotNull(writer); assertTrue(writer.isOpen()); @@ -52,8 +50,7 @@ public void testInitOpensIndexWriter() throws IOException { } public void testCloseReleasesIndexWriter() throws IOException { - LuceneCommitter committer = new LuceneCommitter(); - committer.init(createCommitterSettings()); + LuceneCommitter committer = new LuceneCommitter(createCommitterSettings()); assertNotNull(committer.getIndexWriter()); committer.close(); @@ -61,10 +58,8 @@ public void testCloseReleasesIndexWriter() throws IOException { } public void testCommitRoundTrip() throws IOException { - LuceneCommitter committer = new LuceneCommitter(); + LuceneCommitter committer = new LuceneCommitter(createCommitterSettings()); try { - committer.init(createCommitterSettings()); - Map commitData = Map.of("key1", "value1", "key2", "value2", "_snapshot_", "serialized-data"); committer.commit(commitData); @@ -82,10 +77,8 @@ public void testCommitRoundTrip() throws IOException { } public void testCommitWithEmptyData() throws IOException { - LuceneCommitter committer = new LuceneCommitter(); + LuceneCommitter committer = new LuceneCommitter(createCommitterSettings()); try { - committer.init(createCommitterSettings()); 
- committer.commit(Map.of()); Map readBack = new HashMap<>(); @@ -100,8 +93,7 @@ public void testCommitWithEmptyData() throws IOException { } public void testCommitAfterCloseThrows() throws IOException { - LuceneCommitter committer = new LuceneCommitter(); - committer.init(createCommitterSettings()); + LuceneCommitter committer = new LuceneCommitter(createCommitterSettings()); committer.close(); expectThrows(IllegalStateException.class, () -> committer.commit(Map.of("key", "value"))); diff --git a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngineTests.java b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngineTests.java index 4f7e23e26acfb..7091958f0e7d8 100644 --- a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngineTests.java +++ b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneIndexingExecutionEngineTests.java @@ -45,9 +45,7 @@ private LuceneCommitter createAndInitCommitter() throws IOException { ShardPath shardPath = new ShardPath(false, dataPath, dataPath, shardId); IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY); - LuceneCommitter c = new LuceneCommitter(); - c.init(new CommitterSettings(shardPath, indexSettings)); - return c; + return new LuceneCommitter(new CommitterSettings(shardPath, indexSettings)); } @Override diff --git a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java index 649953953eee2..10e43adf56c4a 100644 --- a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java +++ 
b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java @@ -8,9 +8,18 @@ package org.opensearch.be.lucene; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.exec.commit.Committer; +import org.opensearch.index.engine.exec.commit.CommitterSettings; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Optional; /** @@ -24,11 +33,20 @@ public class LuceneSearchEnginePluginTests extends OpenSearchTestCase { * * Validates: Requirements 4.2 */ - public void testGetCommitterReturnsLuceneCommitter() { + public void testGetCommitterReturnsLuceneCommitter() throws IOException { + Path baseDir = createTempDir(); + ShardId shardId = new ShardId("test", "_na_", 0); + Path dataPath = baseDir.resolve(shardId.getIndex().getUUID()).resolve(Integer.toString(shardId.id())); + Files.createDirectories(dataPath); + ShardPath shardPath = new ShardPath(false, dataPath, dataPath, shardId); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY); + CommitterSettings committerSettings = new CommitterSettings(shardPath, indexSettings); + LuceneSearchEnginePlugin plugin = new LuceneSearchEnginePlugin(); - Optional committer = plugin.getCommitter(null); + Optional committer = plugin.getCommitter(committerSettings); assertTrue("getCommitter() should return a non-empty Optional", committer.isPresent()); assertTrue("getCommitter() should return a LuceneCommitter instance", committer.get() instanceof LuceneCommitter); + committer.get().close(); } } diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java 
b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java index c76c6a5022935..0ddb41af96a06 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java @@ -27,7 +27,6 @@ import org.opensearch.index.engine.exec.Segment; import org.opensearch.index.engine.exec.WriterFileSet; import org.opensearch.index.engine.exec.commit.Committer; -import org.opensearch.index.engine.exec.commit.CommitterSettings; import org.opensearch.index.engine.exec.coord.CatalogSnapshot; import org.opensearch.index.engine.exec.coord.CatalogSnapshotManager; import org.opensearch.index.mapper.MapperService; @@ -136,11 +135,6 @@ public CompositeIndexingExecutionEngine( ); this.committer = committer; - try { - committer.init(new CommitterSettings(shardPath, indexSettings)); - } catch (IOException e) { - throw new RuntimeException("Failed to initialize committer", e); - } } /** @@ -217,7 +211,18 @@ public RefreshResult refresh(RefreshInput refreshInput) throws IOException { // Flush each writer to disk and build segments from the file infos for (CompositeWriter writer : dataFormatWriters) { - FileInfos fileInfos = writer.flush(); + FileInfos fileInfos; + try { + fileInfos = writer.flush(); + } catch (IOException e) { + logger.error( + "Writer gen={} flush failed - aborting writer and skipping its segments", + writer.getWriterGeneration(), e + ); + writer.abort(); + writer.close(); + continue; + } Segment.Builder segmentBuilder = Segment.builder(writer.getWriterGeneration()); boolean hasFiles = false; for (Map.Entry entry : fileInfos.writerFilesMap().entrySet()) { diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java 
b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java index 64b3200d16d2c..9c97c44641a9a 100644 --- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java @@ -17,7 +17,6 @@ import org.opensearch.index.engine.dataformat.DataFormatPlugin; import org.opensearch.index.engine.dataformat.RefreshInput; import org.opensearch.index.engine.exec.commit.Committer; -import org.opensearch.index.engine.exec.commit.CommitterSettings; import org.opensearch.index.engine.exec.coord.CatalogSnapshotManager; import org.opensearch.test.OpenSearchTestCase; @@ -180,15 +179,8 @@ public void testDeleteFilesDoesNotThrow() throws Exception { engine.deleteFiles(Map.of()); } - // --- Task 8.5: Property test — Committer is required --- + // --- Committer is required --- - /** - * Property 2: Committer is required. - * Attempting to construct CompositeIndexingExecutionEngine with a null Committer - * must throw IllegalStateException. - * - * Validates: Requirements 3.2 - */ public void testConstructorThrowsWhenCommitterNull() { Map plugins = new HashMap<>(); plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); @@ -201,14 +193,8 @@ public void testConstructorThrowsWhenCommitterNull() { assertTrue(ex.getMessage().contains("Committer must not be null")); } - // --- Task 8.6: Property test — Refresh never calls Committer methods --- + // --- Refresh never calls Committer methods --- - /** - * Property 5: Refresh never calls Committer methods. - * Running refresh() on the composite engine must not invoke commit() on the Committer. 
- * - * Validates: Requirements 3.6 - */ public void testRefreshNeverCallsCommitterMethods() throws IOException { TrackingCommitter tracking = new TrackingCommitter(); Map plugins = new HashMap<>(); @@ -217,7 +203,6 @@ public void testRefreshNeverCallsCommitterMethods() throws IOException { CompositeIndexingExecutionEngine engine = new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, tracking); - // Reset tracking after construction (init is called during construction) tracking.commitCalled = false; RefreshInput refreshInput = RefreshInput.builder().build(); @@ -226,17 +211,7 @@ public void testRefreshNeverCallsCommitterMethods() throws IOException { assertFalse("commit() must not be called during refresh", tracking.commitCalled); } - // --- Task 8.7: Unit tests for flush and Committer lifecycle --- - - public void testInitCalledDuringConstruction() { - CompositeTestHelper.StubCommitter stub = new CompositeTestHelper.StubCommitter(); - Map plugins = new HashMap<>(); - plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); - IndexSettings indexSettings = createIndexSettings("lucene"); - - new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, stub); - assertTrue("init() must be called during construction", stub.initCalled); - } + // --- Committer lifecycle tests --- public void testCloseCalledDuringShutdown() { CompositeTestHelper.StubCommitter stub = new CompositeTestHelper.StubCommitter(); @@ -249,51 +224,8 @@ public void testCloseCalledDuringShutdown() { assertTrue("close() must be called during shutdown", stub.closeCalled); } - public void testInitFailurePreventsConstruction() { - Committer failingInit = new Committer() { - @Override - public void init(CommitterSettings settings) throws IOException { - throw new IOException("init failed"); - } - - @Override - public void commit(Map commitData) {} - - @Override - public void close() {} - - @Override - public Map getLastCommittedData() { - return Map.of(); - } - - 
@Override - public CommitStats getCommitStats() { - return null; - } - - @Override - public SafeCommitInfo getSafeCommitInfo() { - return SafeCommitInfo.EMPTY; - } - }; - - Map plugins = new HashMap<>(); - plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); - IndexSettings indexSettings = createIndexSettings("lucene"); - - RuntimeException ex = expectThrows( - RuntimeException.class, - () -> new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null, failingInit) - ); - assertTrue(ex.getMessage().contains("Failed to initialize committer")); - } - public void testCloseFailureIsLoggedAndShutdownContinues() { Committer failingClose = new Committer() { - @Override - public void init(CommitterSettings settings) {} - @Override public void commit(Map commitData) {} @@ -346,9 +278,6 @@ public void testFlushCallsCommitterCommit() throws IOException { public void testFlushPropagatesIOExceptionFromCommit() { Committer failingCommit = new Committer() { - @Override - public void init(CommitterSettings settings) {} - @Override public void commit(Map commitData) throws IOException { throw new IOException("commit failed"); @@ -397,16 +326,10 @@ public void testFlushThrowsWhenCatalogSnapshotManagerNotSet() { * A Committer that tracks which methods were called, for test assertions. 
*/ private static class TrackingCommitter implements Committer { - boolean initCalled = false; boolean commitCalled = false; boolean closeCalled = false; Map lastCommitData = null; - @Override - public void init(CommitterSettings settings) { - initCalled = true; - } - @Override public void commit(Map commitData) { commitCalled = true; diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java index bee1a07ee7964..aacd60f0e3c49 100644 --- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java @@ -26,7 +26,6 @@ import org.opensearch.index.engine.dataformat.WriteResult; import org.opensearch.index.engine.dataformat.Writer; import org.opensearch.index.engine.exec.commit.Committer; -import org.opensearch.index.engine.exec.commit.CommitterSettings; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; @@ -234,17 +233,11 @@ public void close() {} } /** - * Minimal stub Committer that records init/close calls and does nothing on commit. + * Minimal stub Committer that records close calls and does nothing on commit. 
*/ static class StubCommitter implements Committer { - boolean initCalled = false; boolean closeCalled = false; - @Override - public void init(CommitterSettings settings) { - initCalled = true; - } - @Override public void commit(Map commitData) {} diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java index 598dd96cfd8a1..81ea234b6e2ad 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java @@ -137,7 +137,6 @@ public Set getRegisteredFormats() { /** * Creates {@link EngineReaderManager} instances for all applicable data formats. * - * @param committer the committer holding the backing store, or null if not available * @param mapperService the mapper service (reserved for future filtering) * @param indexSettings the index settings (reserved for future filtering) * @param shardPath the shard path used to create reader managers @@ -145,14 +144,13 @@ public Set getRegisteredFormats() { * @throws IOException if reader manager creation fails */ public Map> getReaderManagers( - Committer committer, MapperService mapperService, IndexSettings indexSettings, ShardPath shardPath ) throws IOException { Map> readerManagers = new HashMap<>(); for (Map.Entry> entry : readerManagerPlugins.entrySet()) { - readerManagers.put(entry.getKey(), entry.getValue().createReaderManager(committer, entry.getKey(), shardPath)); + readerManagers.put(entry.getKey(), entry.getValue().createReaderManager(entry.getKey(), shardPath)); } return readerManagers; } diff --git a/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java b/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java index 8b2570f65db66..c094ab789c357 100644 --- 
a/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java @@ -11,7 +11,6 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.engine.DataFormatAwareEngine; import org.opensearch.index.engine.dataformat.DataFormat; -import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.PluginsService; import org.opensearch.plugins.SearchBackEndPlugin; @@ -45,18 +44,16 @@ public DataFormatAwareEngineFactory(PluginsService pluginsService, ShardPath sha /** * Creates reader managers for all discovered search back-end plugins. - * The {@link Committer} is passed through so plugins can access the backing store. * - * @param committer the committer holding the backing store, or null if not available * @return a map of data format to reader manager * @throws IOException if reader manager creation fails */ @SuppressWarnings("unchecked") - public Map> createReaderManagers(Committer committer) throws IOException { + public Map> createReaderManagers() throws IOException { Map> readerManagers = new HashMap<>(); for (SearchBackEndPlugin plugin : searchBackEndPlugins) { for (DataFormat format : plugin.getSupportedFormats()) { - readerManagers.put(format, plugin.createReaderManager(committer, format, shardPath)); + readerManagers.put(format, plugin.createReaderManager(format, shardPath)); } } return readerManagers; diff --git a/server/src/main/java/org/opensearch/index/engine/exec/commit/Committer.java b/server/src/main/java/org/opensearch/index/engine/exec/commit/Committer.java index d6b05a2a5ad43..c24e9ff279bd4 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/commit/Committer.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/commit/Committer.java @@ -26,7 +26,7 @@ * The caller is responsible for serializing any higher-level 
state (e.g., CatalogSnapshot) * into the commit data before calling {@link #commit}. *

- * Lifecycle: {@link #init(CommitterSettings)} is called once during engine construction, + * Lifecycle: implementations receive {@link CommitterSettings} at construction time, * {@link #commit(Map)} is called during flush, and {@link #close()} is called * during engine shutdown. * @@ -35,15 +35,6 @@ @ExperimentalApi public interface Committer extends Closeable { - /** - * Initializes the committer with the given settings. - * Called once during engine construction before any indexing operations. - * - * @param settings initialization parameters (e.g., shard path, index settings) - * @throws IOException if initialization fails - */ - void init(CommitterSettings settings) throws IOException; - /** * Durably commits the given data to the backing store's commit metadata. * Called during the engine's flush path. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 536d38d48f2f5..4532c84727040 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -581,7 +581,7 @@ public boolean shouldCache(Query query) { // For now, reader managers are created without a committer — the Lucene reader manager // will fail if used before the committer is wired at engine construction time. 
DataFormatAwareEngine dfaEngine = new DataFormatAwareEngine( - dataFormatRegistry.getReaderManagers(null, mapperService, indexSettings, path) + dataFormatRegistry.getReaderManagers(mapperService, indexSettings, path) ); this.currentCompositeEngineReference.set(dfaEngine); } diff --git a/server/src/main/java/org/opensearch/plugins/EnginePlugin.java b/server/src/main/java/org/opensearch/plugins/EnginePlugin.java index c8c226c4ea490..ab447fe7d5a08 100644 --- a/server/src/main/java/org/opensearch/plugins/EnginePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/EnginePlugin.java @@ -38,6 +38,7 @@ import org.opensearch.index.codec.CodecServiceFactory; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.engine.exec.commit.Committer; +import org.opensearch.index.engine.exec.commit.CommitterSettings; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; @@ -119,14 +120,14 @@ default Optional getCustomTranslogDeletionPolicyF } /** - * When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the index settings to determine + * When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the settings to determine * whether or not to provide a {@link Committer} for the given index. A plugin that does not provide a Committer should return * {@link Optional#empty()}. 
* - * @param indexSettings the index settings + * @param committerSettings the committer settings (shard path, index settings, engine config) * @return an optional committer */ - default Optional getCommitter(IndexSettings indexSettings) { + default Optional getCommitter(CommitterSettings committerSettings) { return Optional.empty(); } } diff --git a/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java b/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java index d3d149fe98189..8b32c259c5cc5 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java @@ -10,7 +10,6 @@ import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.EngineReaderManager; -import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.shard.ShardPath; import java.io.IOException; @@ -40,14 +39,13 @@ public interface SearchBackEndPlugin { /** * Creates a reader manager for the given data format and shard. - * The {@link Committer} provides access to the backing store (e.g., IndexWriter) - * so that the reader manager can open readers from the same writer. + * Each plugin manages its own internal state (e.g., committer, writer) + * needed to open readers. 
* - * @param committer the committer holding the backing store, or null if not available * @param format the data format * @param shardPath the shard path * @return the reader manager * @throws IOException if reader creation fails */ - EngineReaderManager createReaderManager(Committer committer, DataFormat format, ShardPath shardPath) throws IOException; + EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) throws IOException; } diff --git a/server/src/test/java/org/opensearch/index/engine/DataFormatAwareEngineCommitterTests.java b/server/src/test/java/org/opensearch/index/engine/DataFormatAwareEngineCommitterTests.java index 1091824d67e7c..aff0bdbbc7c45 100644 --- a/server/src/test/java/org/opensearch/index/engine/DataFormatAwareEngineCommitterTests.java +++ b/server/src/test/java/org/opensearch/index/engine/DataFormatAwareEngineCommitterTests.java @@ -9,7 +9,6 @@ package org.opensearch.index.engine; import org.opensearch.index.engine.exec.commit.Committer; -import org.opensearch.index.engine.exec.commit.CommitterSettings; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -37,9 +36,6 @@ public void testGetCommitterReturnsNullByDefault() { public void testSetCommitterThenGetCommitterReturnsSameInstance() { DataFormatAwareEngine engine = new DataFormatAwareEngine(new HashMap<>()); Committer committer = new Committer() { - @Override - public void init(CommitterSettings settings) throws IOException {} - @Override public void commit(Map commitData) throws IOException {} diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java index ce1a22bba9bb2..310e306ad22f9 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java @@ -261,7 +261,7 @@ public 
void testGetReaderManagers() throws IOException { DataFormatRegistry registry = new DataFormatRegistry(pluginsService); - Map> managers = registry.getReaderManagers(null, mapperService, indexSettings, shardPath); + Map> managers = registry.getReaderManagers(mapperService, indexSettings, shardPath); assertEquals(1, managers.size()); assertNotNull(managers.get(format)); } diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java b/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java index e5b86232c01d0..7e810a4713644 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java @@ -10,7 +10,6 @@ import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.EngineReaderManager; -import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.SearchBackEndPlugin; @@ -34,7 +33,7 @@ public List getSupportedFormats() { } @Override - public EngineReaderManager createReaderManager(Committer committer, DataFormat format, ShardPath shardPath) { + public EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) { return new MockReaderManager("mock-columnar"); } } diff --git a/server/src/test/java/org/opensearch/index/engine/exec/commit/CommitterTests.java b/server/src/test/java/org/opensearch/index/engine/exec/commit/CommitterTests.java index 5c8530f5efaa1..966185c36c691 100644 --- a/server/src/test/java/org/opensearch/index/engine/exec/commit/CommitterTests.java +++ b/server/src/test/java/org/opensearch/index/engine/exec/commit/CommitterTests.java @@ -24,9 +24,6 @@ public class CommitterTests extends OpenSearchTestCase { /** Creates a minimal Committer with no-op implementations for all methods. 
*/ private static Committer noOpCommitter() { return new Committer() { - @Override - public void init(CommitterSettings settings) throws IOException {} - @Override public void commit(Map commitData) throws IOException {} @@ -58,9 +55,6 @@ public void testMinimalImplementationCanBeInstantiated() { public void testCloseFromCloseableIsCallable() throws IOException { AtomicBoolean closed = new AtomicBoolean(false); Committer committer = new Committer() { - @Override - public void init(CommitterSettings settings) {} - @Override public void commit(Map commitData) {} @@ -92,9 +86,6 @@ public SafeCommitInfo getSafeCommitInfo() { public void testCommitterWorksWithTryWithResources() throws IOException { AtomicBoolean closed = new AtomicBoolean(false); try (Committer committer = new Committer() { - @Override - public void init(CommitterSettings settings) {} - @Override public void commit(Map commitData) {} @@ -123,46 +114,9 @@ public SafeCommitInfo getSafeCommitInfo() { assertTrue("close() should have been called by try-with-resources", closed.get()); } - public void testInitIsCallable() throws IOException { - AtomicBoolean initialized = new AtomicBoolean(false); - Committer committer = new Committer() { - @Override - public void init(CommitterSettings settings) { - initialized.set(true); - } - - @Override - public void commit(Map commitData) {} - - @Override - public void close() {} - - @Override - public Map getLastCommittedData() { - return Map.of(); - } - - @Override - public CommitStats getCommitStats() { - return null; - } - - @Override - public SafeCommitInfo getSafeCommitInfo() { - return SafeCommitInfo.EMPTY; - } - }; - - committer.init(null); - assertTrue("init() should have been called", initialized.get()); - } - public void testCommitIsCallable() throws IOException { AtomicBoolean committed = new AtomicBoolean(false); Committer committer = new Committer() { - @Override - public void init(CommitterSettings settings) {} - @Override public void commit(Map commitData) 
{ committed.set(true); From 4ae813fc9f49df0f2d01bc78f520e284ccdcde7a Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 7 Apr 2026 12:15:59 +0530 Subject: [PATCH 3/3] Simplify committer interface and limit broad usage Signed-off-by: Bukhtawar Khan --- .../be/datafusion/DataFusionPlugin.java | 7 +++- .../opensearch/be/lucene/LuceneCommitter.java | 5 +-- .../be/lucene/LuceneSearchEnginePlugin.java | 31 ++++++++-------- .../lucene/LuceneSearchEnginePluginTests.java | 35 ++++++++++++++----- .../engine/dataformat/DataFormatRegistry.java | 28 +++++++++++++-- .../exec/DataFormatAwareEngineFactory.java | 5 +-- .../engine/exec/IndexWriterProvider.java | 32 +++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 3 -- .../plugins/SearchBackEndPlugin.java | 11 ++++-- .../stub/MockSearchBackEndPlugin.java | 3 +- 10 files changed, 123 insertions(+), 37 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/IndexWriterProvider.java diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java index c406856e7fa47..491e3f713955b 100644 --- a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java @@ -24,6 +24,7 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.IndexWriterProvider; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SearchBackEndPlugin; @@ -110,7 +111,11 @@ public String name() { } @Override - public EngineReaderManager createReaderManager(DataFormat format, ShardPath 
shardPath) throws IOException { + public EngineReaderManager createReaderManager( + DataFormat format, + ShardPath shardPath, + IndexWriterProvider indexWriterProvider + ) throws IOException { return new DatafusionReaderManager(format, shardPath, dataFusionService); } diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java index c27c8ca6f582c..76b8a25882332 100644 --- a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneCommitter.java @@ -19,6 +19,7 @@ import org.opensearch.index.engine.CommitStats; import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.engine.SafeCommitInfo; +import org.opensearch.index.engine.exec.IndexWriterProvider; import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.engine.exec.commit.CommitterSettings; import org.opensearch.index.seqno.SequenceNumbers; @@ -46,7 +47,7 @@ * @opensearch.experimental */ @ExperimentalApi -public class LuceneCommitter implements Committer { +public class LuceneCommitter implements Committer, IndexWriterProvider { private static final Logger logger = LogManager.getLogger(LuceneCommitter.class); @@ -116,7 +117,7 @@ public void close() throws IOException { * * @return the IndexWriter, or null if not initialized */ - IndexWriter getIndexWriter() { + public IndexWriter getIndexWriter() { return indexWriter; } diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java index 5a968f38ddebe..b43b94c8db0e6 100644 --- 
a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java +++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchEnginePlugin.java @@ -14,6 +14,7 @@ import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.IndexWriterProvider; import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.engine.exec.commit.CommitterSettings; import org.opensearch.index.shard.ShardPath; @@ -33,17 +34,14 @@ *

  • {@link SearchBackEndPlugin} — provides {@link LuceneReaderManager} for search.
  • * *

    - * Both {@link #createReaderManager} accepts a {@link Committer}. - * When the committer is a {@link LuceneCommitter}, the IndexWriter is extracted and shared - * with the reader manager on a given shard. + * {@link #createReaderManager} uses the supplied {@link IndexWriterProvider} to obtain + * the shard's IndexWriter for opening NRT readers. The plugin itself holds no per-shard state. * * @opensearch.experimental */ @ExperimentalApi public class LuceneSearchEnginePlugin implements SearchBackEndPlugin, EnginePlugin { - private LuceneCommitter luceneCommitter; - /** Creates a new LuceneSearchEnginePlugin. */ public LuceneSearchEnginePlugin() {} @@ -55,16 +53,22 @@ public String name() { // --- SearchBackEndPlugin --- @Override - public EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) - throws IOException { - if (luceneCommitter == null) { - throw new IllegalStateException("getCommitter() must be called before createReaderManager()"); + public EngineReaderManager createReaderManager( + DataFormat format, + ShardPath shardPath, + IndexWriterProvider indexWriterProvider + ) throws IOException { + if (indexWriterProvider == null) { + throw new IllegalStateException("IndexWriterProvider is required for Lucene reader manager"); } - IndexWriter writer = luceneCommitter.getIndexWriter(); + IndexWriter writer = indexWriterProvider.getIndexWriter(); if (writer == null) { - throw new IllegalStateException("LuceneCommitter not initialized"); + throw new IllegalStateException("IndexWriterProvider returned null"); } - OpenSearchDirectoryReader osReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardPath.getShardId()); + OpenSearchDirectoryReader osReader = OpenSearchDirectoryReader.wrap( + DirectoryReader.open(writer), + shardPath.getShardId() + ); return new LuceneReaderManager(format, osReader); } @@ -78,10 +82,9 @@ public List getSupportedFormats() { @Override public Optional getCommitter(CommitterSettings 
committerSettings) { try { - this.luceneCommitter = new LuceneCommitter(committerSettings); + return Optional.of(new LuceneCommitter(committerSettings)); } catch (IOException e) { throw new RuntimeException("Failed to create LuceneCommitter", e); } - return Optional.of(luceneCommitter); } } diff --git a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java index 10e43adf56c4a..329a041566e1f 100644 --- a/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java +++ b/sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneSearchEnginePluginTests.java @@ -27,26 +27,43 @@ */ public class LuceneSearchEnginePluginTests extends OpenSearchTestCase { - /** - * Test that getCommitter() returns a non-empty Optional containing - * a LuceneCommitter instance. 
- * - * Validates: Requirements 4.2 - */ - public void testGetCommitterReturnsLuceneCommitter() throws IOException { + private CommitterSettings createCommitterSettings() throws IOException { Path baseDir = createTempDir(); ShardId shardId = new ShardId("test", "_na_", 0); Path dataPath = baseDir.resolve(shardId.getIndex().getUUID()).resolve(Integer.toString(shardId.id())); Files.createDirectories(dataPath); ShardPath shardPath = new ShardPath(false, dataPath, dataPath, shardId); IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY); - CommitterSettings committerSettings = new CommitterSettings(shardPath, indexSettings); + return new CommitterSettings(shardPath, indexSettings); + } + public void testGetCommitterReturnsLuceneCommitter() throws IOException { LuceneSearchEnginePlugin plugin = new LuceneSearchEnginePlugin(); - Optional committer = plugin.getCommitter(committerSettings); + Optional committer = plugin.getCommitter(createCommitterSettings()); assertTrue("getCommitter() should return a non-empty Optional", committer.isPresent()); assertTrue("getCommitter() should return a LuceneCommitter instance", committer.get() instanceof LuceneCommitter); committer.get().close(); } + + public void testGetCommitterReturnsNewInstancePerCall() throws IOException { + LuceneSearchEnginePlugin plugin = new LuceneSearchEnginePlugin(); + + Optional committer1 = plugin.getCommitter(createCommitterSettings()); + Optional committer2 = plugin.getCommitter(createCommitterSettings()); + + assertNotSame("Each call should return a new instance", committer1.get(), committer2.get()); + committer1.get().close(); + committer2.get().close(); + } + + public void testCreateReaderManagerThrowsWithoutProvider() { + LuceneSearchEnginePlugin plugin = new LuceneSearchEnginePlugin(); + + IllegalStateException ex = expectThrows( + IllegalStateException.class, + () -> plugin.createReaderManager(null, null, null) + ); + 
assertTrue(ex.getMessage().contains("IndexWriterProvider is required")); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java index 81ea234b6e2ad..097a145997a33 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java @@ -11,9 +11,12 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.IndexWriterProvider; import org.opensearch.index.engine.exec.commit.Committer; +import org.opensearch.index.engine.exec.commit.CommitterSettings; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.PluginsService; import org.opensearch.plugins.SearchBackEndPlugin; @@ -136,9 +139,14 @@ public Set getRegisteredFormats() { /** * Creates {@link EngineReaderManager} instances for all applicable data formats. + *

    + * If any registered {@link SearchBackEndPlugin} also implements {@link EnginePlugin}, + * it is queried for a {@link Committer}. When the committer implements + * {@link IndexWriterProvider}, it is passed to each backend so that Lucene can open + * NRT readers on the same IndexWriter. * * @param mapperService the mapper service (reserved for future filtering) - * @param indexSettings the index settings (reserved for future filtering) + * @param indexSettings the index settings * @param shardPath the shard path used to create reader managers * @return a map from data format to its reader manager * @throws IOException if reader manager creation fails @@ -148,9 +156,25 @@ public Map> getReaderManagers( IndexSettings indexSettings, ShardPath shardPath ) throws IOException { + // Check if any SearchBackEndPlugin also provides a Committer via EnginePlugin + IndexWriterProvider indexWriterProvider = null; + CommitterSettings committerSettings = new CommitterSettings(shardPath, indexSettings); + for (SearchBackEndPlugin plugin : readerManagerPlugins.values()) { + if (plugin instanceof EnginePlugin) { + var optCommitter = ((EnginePlugin) plugin).getCommitter(committerSettings); + if (optCommitter.isPresent()) { + Committer committer = optCommitter.get(); + if (committer instanceof IndexWriterProvider) { + indexWriterProvider = (IndexWriterProvider) committer; + } + break; + } + } + } + Map> readerManagers = new HashMap<>(); for (Map.Entry> entry : readerManagerPlugins.entrySet()) { - readerManagers.put(entry.getKey(), entry.getValue().createReaderManager(entry.getKey(), shardPath)); + readerManagers.put(entry.getKey(), entry.getValue().createReaderManager(entry.getKey(), shardPath, indexWriterProvider)); } return readerManagers; } diff --git a/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java b/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java index c094ab789c357..49c2245c4d818 100644 --- 
a/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/DataFormatAwareEngineFactory.java @@ -45,15 +45,16 @@ public DataFormatAwareEngineFactory(PluginsService pluginsService, ShardPath sha /** * Creates reader managers for all discovered search back-end plugins. * + * @param indexWriterProvider provider for the shard's IndexWriter, or null if not available * @return a map of data format to reader manager * @throws IOException if reader manager creation fails */ @SuppressWarnings("unchecked") - public Map> createReaderManagers() throws IOException { + public Map> createReaderManagers(IndexWriterProvider indexWriterProvider) throws IOException { Map> readerManagers = new HashMap<>(); for (SearchBackEndPlugin plugin : searchBackEndPlugins) { for (DataFormat format : plugin.getSupportedFormats()) { - readerManagers.put(format, plugin.createReaderManager(format, shardPath)); + readerManagers.put(format, plugin.createReaderManager(format, shardPath, indexWriterProvider)); } } return readerManagers; diff --git a/server/src/main/java/org/opensearch/index/engine/exec/IndexWriterProvider.java b/server/src/main/java/org/opensearch/index/engine/exec/IndexWriterProvider.java new file mode 100644 index 0000000000000..cd7566ba55311 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/IndexWriterProvider.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec; + +import org.apache.lucene.index.IndexWriter; +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * Provides read-only access to the shard's {@link IndexWriter} for opening NRT readers. + *

    + * This abstraction exists so that the {@link org.opensearch.plugins.SearchBackEndPlugin} + * SPI does not leak Lucene types. Implementations wrap a per-shard IndexWriter and + * expose it only to backends that need it (e.g., Lucene NRT reader managers). + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface IndexWriterProvider { + + /** + * Returns the IndexWriter for the current shard. + * + * @return the IndexWriter, never null + */ + IndexWriter getIndexWriter(); +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 4532c84727040..4742e5a22c6b4 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -577,9 +577,6 @@ public boolean shouldCache(Query query) { } this.dataFormatRegistry = dataFormatRegistry; if (dataFormatRegistry != null) { - // TODO: Wire the Committer from EnginePlugin discovery and pass it here. - // For now, reader managers are created without a committer — the Lucene reader manager - // will fail if used before the committer is wired at engine construction time. 
DataFormatAwareEngine dfaEngine = new DataFormatAwareEngine( dataFormatRegistry.getReaderManagers(mapperService, indexSettings, path) ); diff --git a/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java b/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java index 8b32c259c5cc5..5623d95ae461a 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchBackEndPlugin.java @@ -10,6 +10,7 @@ import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.IndexWriterProvider; import org.opensearch.index.shard.ShardPath; import java.io.IOException; @@ -39,13 +40,17 @@ public interface SearchBackEndPlugin { /** * Creates a reader manager for the given data format and shard. - * Each plugin manages its own internal state (e.g., committer, writer) - * needed to open readers. + *
<p>
    + * The optional {@link IndexWriterProvider} gives backends access to the + * shard's IndexWriter for opening NRT readers. Backends that do not need + * a writer (e.g., DataFusion) may ignore this parameter. * * @param format the data format * @param shardPath the shard path + * @param indexWriterProvider provider for the shard's IndexWriter, or null if not available * @return the reader manager * @throws IOException if reader creation fails */ - EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) throws IOException; + EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath, IndexWriterProvider indexWriterProvider) + throws IOException; } diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java b/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java index 7e810a4713644..6905bc3cb3ccd 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/stub/MockSearchBackEndPlugin.java @@ -10,6 +10,7 @@ import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.exec.EngineReaderManager; +import org.opensearch.index.engine.exec.IndexWriterProvider; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.SearchBackEndPlugin; @@ -33,7 +34,7 @@ public List getSupportedFormats() { } @Override - public EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath) { + public EngineReaderManager createReaderManager(DataFormat format, ShardPath shardPath, IndexWriterProvider indexWriterProvider) { return new MockReaderManager("mock-columnar"); } }