Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ testfixtures_shared/
# These are generated from .ci/jobs.t
.ci/jobs/

# native build files
*.dylib

# build files generated
doc-tools/missing-doclet/bin/
**/Cargo.lock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.exec.EngineReaderManager;
import org.opensearch.index.engine.exec.IndexWriterProvider;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchBackEndPlugin;
Expand Down Expand Up @@ -110,7 +111,11 @@ public String name() {
}

@Override
public EngineReaderManager<DatafusionReader> createReaderManager(DataFormat format, ShardPath shardPath) throws IOException {
public EngineReaderManager<DatafusionReader> createReaderManager(
DataFormat format,
ShardPath shardPath,
IndexWriterProvider indexWriterProvider
) throws IOException {
return new DatafusionReaderManager(format, shardPath, dataFusionService);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.be.lucene;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.engine.CommitStats;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.SafeCommitInfo;
import org.opensearch.index.engine.exec.IndexWriterProvider;
import org.opensearch.index.engine.exec.commit.Committer;
import org.opensearch.index.engine.exec.commit.CommitterSettings;
import org.opensearch.index.seqno.SequenceNumbers;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/**
* Lucene-specific {@link Committer} that owns the {@link IndexWriter} lifecycle.
* <p>
* Responsibilities:
* <ul>
* <li>Constructor — opens the IndexWriter on the shard's Lucene directory</li>
* <li>{@link #commit} — serializes the CatalogSnapshot as Lucene commit userData</li>
* <li>{@link #close} — closes the IndexWriter</li>
* </ul>
* <p>
* The IndexWriter is exposed via {@link #getIndexWriter()} so that
* {@link LuceneIndexingExecutionEngine} (which handles {@code addIndexes} during refresh)
* and {@link LuceneReaderManager} (which opens DirectoryReaders) can share the same writer.
*
* @opensearch.experimental
*/
@ExperimentalApi
public class LuceneCommitter implements Committer, IndexWriterProvider {

    private static final Logger logger = LogManager.getLogger(LuceneCommitter.class);

    /** Subdirectory under the shard data path where the Lucene index is stored. */
    static final String LUCENE_DIR_NAME = "lucene";

    /** Directory the writer was opened on; owned by this committer and closed in {@link #close()}. */
    private Directory directory;

    private IndexWriter indexWriter;

    /**
     * Creates a new LuceneCommitter and immediately opens the IndexWriter.
     *
     * @param settings the committer settings (shard path, index settings, optional engine config)
     * @throws IOException if the IndexWriter cannot be opened
     */
    public LuceneCommitter(CommitterSettings settings) throws IOException {
        Path luceneDir = settings.shardPath().getDataPath().resolve(LUCENE_DIR_NAME);
        Files.createDirectories(luceneDir);
        this.directory = FSDirectory.open(luceneDir);
        IndexWriterConfig iwc = createIndexWriterConfig(settings.engineConfig());
        this.indexWriter = new IndexWriter(directory, iwc);
    }

    /**
     * Creates an {@link IndexWriterConfig} from the engine configuration.
     * When an {@link EngineConfig} is provided, the analyzer, codec, index sort,
     * and similarity are taken from it. Otherwise a default config is used.
     */
    private IndexWriterConfig createIndexWriterConfig(EngineConfig engineConfig) {
        if (engineConfig == null) {
            return new IndexWriterConfig();
        }
        IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
        iwc.setCodec(engineConfig.getCodec());
        iwc.setSimilarity(engineConfig.getSimilarity());
        iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
        iwc.setUseCompoundFile(engineConfig.useCompoundFile());
        if (engineConfig.getIndexSort() != null) {
            iwc.setIndexSort(engineConfig.getIndexSort());
        }
        // Commits are driven explicitly through commit(); closing must not commit implicitly.
        iwc.setCommitOnClose(false);
        iwc.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
        return iwc;
    }

    /**
     * Commits the given user data to the Lucene index.
     *
     * @param commitData key/value pairs persisted as Lucene commit userData
     * @throws IOException if the commit fails
     * @throws IllegalStateException if this committer has been closed
     */
    @Override
    public void commit(Map<String, String> commitData) throws IOException {
        if (indexWriter == null) {
            throw new IllegalStateException("LuceneCommitter is closed");
        }
        indexWriter.setLiveCommitData(commitData.entrySet());
        indexWriter.commit();
    }

    /**
     * Closes the IndexWriter and the underlying Directory.
     * <p>
     * {@link IndexWriter#close()} does not close the {@link Directory} it was
     * opened on, so the directory handle opened in the constructor must be
     * closed here as well to avoid leaking it. Idempotent.
     *
     * @throws IOException if closing the writer or directory fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (indexWriter != null) {
                indexWriter.close();
            }
        } finally {
            indexWriter = null;
            if (directory != null) {
                Directory toClose = directory;
                directory = null;
                toClose.close();
            }
        }
    }

    /**
     * Returns the underlying IndexWriter.
     * Used by {@link LuceneIndexingExecutionEngine} for {@code addIndexes} and
     * by {@link LuceneReaderManager} for opening DirectoryReaders.
     *
     * @return the IndexWriter, or null if this committer has been closed
     */
    public IndexWriter getIndexWriter() {
        return indexWriter;
    }

    /**
     * Returns the user data of the most recent commit as an immutable map.
     *
     * @return the last committed key/value data, or an empty map if the writer
     *         is closed or no commit data has been set
     * @throws IOException declared for interface compatibility
     */
    @Override
    public Map<String, String> getLastCommittedData() throws IOException {
        if (indexWriter == null) {
            return Map.of();
        }
        Iterable<Map.Entry<String, String>> liveCommitData = indexWriter.getLiveCommitData();
        if (liveCommitData == null) {
            return Map.of();
        }
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : liveCommitData) {
            result.put(entry.getKey(), entry.getValue());
        }
        return Map.copyOf(result);
    }

    /**
     * Reads the latest commit point and returns its stats.
     *
     * @return commit stats for the latest commit, or null if the writer is
     *         closed or the segment infos cannot be read
     */
    @Override
    public CommitStats getCommitStats() {
        if (indexWriter == null) {
            return null;
        }
        try {
            SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(indexWriter.getDirectory());
            return new CommitStats(segmentInfos);
        } catch (IOException e) {
            logger.warn("Failed to read segment infos for commit stats", e);
            return null;
        }
    }

    /**
     * Derives safe-commit information from the last committed data.
     * The local checkpoint is parsed from the committed userData, defaulting to
     * {@link SequenceNumbers#NO_OPS_PERFORMED} when absent.
     *
     * @return the safe commit info, or {@link SafeCommitInfo#EMPTY} if the
     *         writer is closed or the data cannot be read
     */
    @Override
    public SafeCommitInfo getSafeCommitInfo() {
        if (indexWriter == null) {
            return SafeCommitInfo.EMPTY;
        }
        try {
            Map<String, String> commitData = getLastCommittedData();
            long localCheckpoint = Long.parseLong(
                commitData.getOrDefault(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED))
            );
            int docCount = indexWriter.getDocStats().numDocs;
            return new SafeCommitInfo(localCheckpoint, docCount);
        } catch (IOException e) {
            logger.warn("Failed to get safe commit info", e);
            return SafeCommitInfo.EMPTY;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.be.lucene;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.misc.store.HardlinkCopyDirectoryWrapper;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NIOFSDirectory;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.dataformat.DocumentInput;
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
import org.opensearch.index.engine.dataformat.Merger;
import org.opensearch.index.engine.dataformat.RefreshInput;
import org.opensearch.index.engine.dataformat.RefreshResult;
import org.opensearch.index.engine.dataformat.Writer;
import org.opensearch.index.engine.exec.WriterFileSet;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
* Lucene-specific {@link IndexingExecutionEngine} that incorporates flushed segments
* during refresh via {@code IndexWriter.addIndexes(Directory...)}.
* <p>
* Does not own the {@link IndexWriter} — it receives an optional parent writer
* from the {@link LuceneCommitter} which owns the writer lifecycle. This separation allows
* the committer to be used standalone (committer-only scenario) while the indexing engine
* is only created when Lucene participates as a per-format engine in the composite engine.
* <p>
* When no parent writer is provided, refresh is a no-op (no segments to incorporate).
*
* @opensearch.experimental
*/
@ExperimentalApi
public class LuceneIndexingExecutionEngine implements IndexingExecutionEngine<DataFormat, DocumentInput<?>> {

    private static final Logger logger = LogManager.getLogger(LuceneIndexingExecutionEngine.class);
    // TODO: This will go once we implement the Dataformat plugin in Lucene
    private static final String LUCENE_FORMAT_NAME = "lucene";

    private final IndexWriter parentIndexWriter;

    /**
     * Creates a new LuceneIndexingExecutionEngine.
     *
     * @param parentIndexWriter the IndexWriter from the LuceneCommitter, or null if not available (refresh will be a no-op)
     */
    public LuceneIndexingExecutionEngine(IndexWriter parentIndexWriter) {
        this.parentIndexWriter = parentIndexWriter;
    }

    /**
     * Incorporates flushed Lucene segment directories into the parent writer via
     * {@code IndexWriter.addIndexes(Directory...)}.
     * <p>
     * Source directories are deleted only after {@code addIndexes} has succeeded;
     * deleting them on failure would permanently lose the flushed segments.
     * Directory handles are always released, success or failure.
     *
     * @param refreshInput the writer file sets produced by the flush, may be null
     * @return an empty refresh result
     * @throws IOException if addIndexes fails
     */
    @Override
    public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
        if (refreshInput == null || parentIndexWriter == null) {
            return new RefreshResult(List.of());
        }

        List<Directory> directories = new ArrayList<>();
        List<Path> sourcePaths = new ArrayList<>();
        boolean addSucceeded = false;
        try {
            for (WriterFileSet wfs : refreshInput.writerFiles()) {
                Path dirPath = Path.of(wfs.directory());
                // Only directories produced by the Lucene format participate in this refresh.
                if (dirPath.getFileName().toString().equals(LUCENE_FORMAT_NAME) == false) {
                    continue;
                }
                if (Files.isDirectory(dirPath)) {
                    // Hardlink wrapper lets addIndexes link files instead of copying where supported.
                    directories.add(new HardlinkCopyDirectoryWrapper(new NIOFSDirectory(dirPath)));
                    sourcePaths.add(dirPath);
                }
            }
            if (directories.isEmpty() == false) {
                parentIndexWriter.addIndexes(directories.toArray(new Directory[0]));
            }
            addSucceeded = true;
        } finally {
            // Always close directory handles first, even if addIndexes threw.
            for (Directory dir : directories) {
                try {
                    dir.close();
                } catch (IOException e) {
                    logger.warn("Failed to close directory after addIndexes", e);
                }
            }
            // Delete source files only on success; on failure they are the sole
            // remaining copy of the flushed segments and must be preserved.
            if (addSucceeded) {
                for (Path sourcePath : sourcePaths) {
                    try {
                        IOUtils.rm(sourcePath);
                    } catch (IOException e) {
                        logger.warn("Failed to delete source directory [{}] after addIndexes", sourcePath, e);
                    }
                }
            }
        }
        return new RefreshResult(List.of());
    }

    /**
     * Not yet supported; documents are written through the composite engine.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Writer<DocumentInput<?>> createWriter(long writerGeneration) {
        throw new UnsupportedOperationException("createWriter not yet implemented for Lucene engine");
    }

    /**
     * @return null — merging is not handled by this engine yet
     */
    @Override
    public Merger getMerger() {
        return null;
    }

    /**
     * Not yet supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public long getNextWriterGeneration() {
        throw new UnsupportedOperationException("getNextWriterGeneration not yet implemented for Lucene engine");
    }

    /**
     * Not yet supported; a LuceneDataFormat is deferred to a future PR.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public DataFormat getDataFormat() {
        throw new UnsupportedOperationException("getDataFormat not yet implemented — LuceneDataFormat deferred to future PR");
    }

    /**
     * Stub: file deletion to be implemented in a future iteration.
     */
    @Override
    public void deleteFiles(Map<String, Collection<String>> filesToDelete) throws IOException {
        // Intentionally a no-op for now.
    }

    /**
     * Not yet supported; a LuceneDocumentInput is deferred to a future PR.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public DocumentInput<?> newDocumentInput() {
        throw new UnsupportedOperationException("newDocumentInput not yet implemented — LuceneDocumentInput deferred to future PR");
    }
}
Loading
Loading