diff --git a/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareStore.java b/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareStore.java index 46667f173b80e..1efe0f0a532e4 100644 --- a/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareStore.java +++ b/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareStore.java @@ -27,6 +27,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.NativeStoreRepository; import java.io.IOException; import java.nio.file.Path; @@ -103,6 +104,40 @@ public SubdirectoryAwareStore( ); } + /** + * Constructor for SubdirectoryAwareStore with native store support. + * + * @param shardId the shard ID + * @param indexSettings the index settings + * @param directory the directory to use for the store + * @param shardLock the shard lock + * @param onClose the on close callback + * @param shardPath the shard path + * @param directoryFactory the directory factory + * @param nativeStoreRepository the native object store repository + */ + public SubdirectoryAwareStore( + ShardId shardId, + IndexSettings indexSettings, + Directory directory, + ShardLock shardLock, + OnClose onClose, + ShardPath shardPath, + IndexStorePlugin.DirectoryFactory directoryFactory, + NativeStoreRepository nativeStoreRepository + ) { + super( + shardId, + indexSettings, + new SubdirectoryAwareDirectory(directory, shardPath), + shardLock, + onClose, + shardPath, + directoryFactory, + nativeStoreRepository + ); + } + @Override public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException { long totalNumDocs = 0; diff --git a/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryStorePlugin.java b/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryStorePlugin.java index 7c1836256469f..41f1a3dcc17cb 100644 --- a/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryStorePlugin.java +++ b/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryStorePlugin.java @@ -16,6 +16,7 @@ import org.opensearch.index.store.Store; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.NativeStoreRepository; import java.util.Collections; import java.util.HashMap; @@ -60,7 +61,7 @@ public Map getStoreFactories() { */ static class SubdirectoryStoreFactory implements StoreFactory { /** - * Creates a new {@link SubdirectoryAwareStore} instance. + * Creates a new {@link SubdirectoryAwareStore} instance with native store support. * * @param shardId the shard identifier * @param indexSettings the index settings @@ -68,31 +69,8 @@ static class SubdirectoryStoreFactory implements StoreFactory { * @param shardLock the shard lock * @param onClose callback to execute when the store is closed * @param shardPath the path information for the shard - * @return a new SubdirectoryAwareStore instance - */ - @Override - public Store newStore( - ShardId shardId, - IndexSettings indexSettings, - Directory directory, - ShardLock shardLock, - Store.OnClose onClose, - ShardPath shardPath - ) { - return new SubdirectoryAwareStore(shardId, indexSettings, directory, shardLock, onClose, shardPath); - } - - /** - * Creates a new {@link SubdirectoryAwareStore} instance. - * - * @param shardId the shard identifier - * @param indexSettings the index settings - * @param directory the underlying Lucene directory - * @param shardLock the shard lock - * @param onClose callback to execute when the store is closed - * @param shardPath the path information for the shard - * @param directoryFactory the directory factory to create child level directory. - * Used for Context Aware Segments enabled indices. + * @param directoryFactory the directory factory + * @param nativeStoreRepository the native object store repository * @return a new SubdirectoryAwareStore instance */ @Override @@ -103,9 +81,19 @@ public Store newStore( ShardLock shardLock, Store.OnClose onClose, ShardPath shardPath, - DirectoryFactory directoryFactory + DirectoryFactory directoryFactory, + NativeStoreRepository nativeStoreRepository ) { - return new SubdirectoryAwareStore(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory); + return new SubdirectoryAwareStore( + shardId, + indexSettings, + directory, + shardLock, + onClose, + shardPath, + directoryFactory, + nativeStoreRepository + ); } } } diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java index e271ee574e815..20001fcf62169 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java @@ -17,6 +17,7 @@ import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.dataformat.DataFormatPlugin; import org.opensearch.index.engine.dataformat.DataFormatRegistry; +import org.opensearch.plugins.NativeStoreHandle; import org.opensearch.index.engine.dataformat.DocumentInput; import org.opensearch.index.engine.dataformat.IndexingEngineConfig; import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; @@ -109,7 +110,7 @@ public CompositeIndexingExecutionEngine( validateFormatsRegistered(dataFormatRegistry, primaryFormatName, secondaryFormatNames); Map strategies = checksumStrategies != null ? checksumStrategies : Map.of(); - IndexingEngineConfig engineSettings = new IndexingEngineConfig(committer, mapperService, indexSettings, store, dataFormatRegistry); + IndexingEngineConfig engineSettings = new IndexingEngineConfig(committer, mapperService, indexSettings, store, dataFormatRegistry, NativeStoreHandle.EMPTY); List allFormats = new ArrayList<>(); DataFormat primaryFormat = dataFormatRegistry.format(primaryFormatName); diff --git a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java index fc5da5742adf6..3f94c62864d84 100644 --- a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java +++ b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java @@ -26,9 +26,11 @@ import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; import org.opensearch.index.store.FormatChecksumStrategy; import org.opensearch.index.store.PrecomputedChecksumStrategy; +import org.opensearch.index.store.Store; import org.opensearch.parquet.engine.ParquetDataFormat; import org.opensearch.parquet.engine.ParquetIndexingEngine; import org.opensearch.parquet.fields.ArrowSchemaBuilder; +import org.opensearch.plugins.NativeStoreHandle; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; @@ -98,6 +100,40 @@ public DataFormat getDataFormat() { return dataFormat; } + @Override + public NativeStoreHandle createNativeStore(Store store) { + // Phase 2 implementation outline: + // + // 1. Get the repository-level native store from the Store + // NativeStoreRepository repoStore = store.getNativeStoreRepository(); + // + // 2. If a repository-level native store exists (remote-backed index), + // create a PrefixObjectStore scoped to this shard's path within the repository. + // This ensures the shard can only access its own files. + // if (repoStore.isLive()) { + // String shardPrefix = "indices/" + indexUUID + "/" + shardId + "/parquet/"; + // long scopedPtr = RustBridge.createScopedStore(repoStore.getPointer(), shardPrefix); + // return new NativeStoreHandle(scopedPtr, RustBridge::destroyStore); + // } + // + // 3. Otherwise (local-only index), create a LocalFileSystem ObjectStore + // rooted at the shard's data path. + // long localPtr = RustBridge.createLocalStore(store.shardPath().getDataPath().toString()); + // if (localPtr <= 0) return NativeStoreHandle.EMPTY; + // return new NativeStoreHandle(localPtr, RustBridge::destroyStore); + // + // TODO: Implement RustBridge.createLocalStore(String rootPath) — FFM call to + // parquet_create_local_store which creates LocalFileSystem::new_with_prefix(root) + // TODO: Implement RustBridge.createScopedStore(long parentHandle, String prefix) — FFM call to + // parquet_create_scoped_store which creates PrefixStore wrapping the parent store + // TODO: Implement RustBridge.destroyStore(long handle) — FFM call to + // parquet_destroy_store which drops the Arc + // TODO: Implement Rust object_store_handle.rs with create_local_store, create_scoped_store, drop_store + // TODO: Add FFM exports in ffm.rs for the above functions + + return NativeStoreHandle.EMPTY; + } + @Override public IndexingExecutionEngine indexingEngine(IndexingEngineConfig engineConfig, FormatChecksumStrategy checksumStrategy) { return new ParquetIndexingEngine( diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 47ce26abc948c..d7796c1c5f812 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -101,6 +101,7 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.NativeStoreRepository; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; @@ -1127,18 +1128,6 @@ private static IndexStorePlugin.StoreFactory resolveStoreFactory( if (key == null || key.isEmpty()) { return new IndexStorePlugin.StoreFactory() { - @Override - public Store newStore( - ShardId shardId, - IndexSettings indexSettings, - Directory directory, - ShardLock shardLock, - Store.OnClose onClose, - ShardPath shardPath - ) throws IOException { - return new Store(shardId, indexSettings, directory, shardLock, onClose, shardPath); - } - @Override public Store newStore( ShardId shardId, @@ -1147,9 +1136,19 @@ public Store newStore( ShardLock shardLock, Store.OnClose onClose, ShardPath shardPath, - IndexStorePlugin.DirectoryFactory directoryFactory + IndexStorePlugin.DirectoryFactory directoryFactory, + NativeStoreRepository nativeStoreRepository ) throws IOException { - return new Store(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory); + return new Store( + shardId, + indexSettings, + directory, + shardLock, + onClose, + shardPath, + directoryFactory, + nativeStoreRepository + ); } }; } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 5e579227cbcdd..146c218415c4a 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -117,7 +117,9 @@ import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.NativeStoreRepository; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.threadpool.ThreadPool; @@ -790,7 +792,8 @@ protected void closeInternal() { lock, new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)), path, - directoryFactory + directoryFactory, + resolveNativeStoreRepository(repositoriesService) ); eventListener.onStoreCreated(shardId); indexShard = new IndexShard( @@ -1353,6 +1356,23 @@ private DataFormatAwareStoreDirectory createDataFormatAwareStoreDirectory(ShardI return null; } + /** + * Resolves the native (Rust) object store repository for this index, if available. + * For indices with a configured remote store repository, returns the repository's + * native store. Otherwise returns {@link NativeStoreRepository#EMPTY}. + */ + private NativeStoreRepository resolveNativeStoreRepository(RepositoriesService repositoriesService) { + String repoName = this.indexSettings.getRemoteStoreRepository(); + if (repoName != null) { + try { + return repositoriesService.repository(repoName).getNativeStore(); + } catch (RepositoryMissingException e) { + logger.warn("Native store not available for repository [{}]", repoName); + } + } + return NativeStoreRepository.EMPTY; + } + private void updateFsyncTaskIfNecessary() { if (indexSettings.getTranslogDurability() == Translog.Durability.REQUEST) { try { diff --git a/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java b/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java index 23f4a873cf0ee..5e50239b29a08 100644 --- a/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java @@ -32,6 +32,7 @@ import org.opensearch.index.engine.dataformat.FileInfos; import org.opensearch.index.engine.dataformat.IndexingEngineConfig; import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.plugins.NativeStoreHandle; import org.opensearch.index.engine.dataformat.RefreshInput; import org.opensearch.index.engine.dataformat.RefreshResult; import org.opensearch.index.engine.dataformat.WriteResult; @@ -137,6 +138,9 @@ public class DataFormatAwareEngine implements Indexer { private final IndexingThrottler throttle; private final AtomicInteger throttleRequestCount = new AtomicInteger(); + // Native object store handle for this shard (owned, closed in closeNoLock) + private final NativeStoreHandle shardNativeStore; + // Timestamps and seq-no markers private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); @@ -208,17 +212,22 @@ public DataFormatAwareEngine(EngineConfig engineConfig) { // Move to data format aware writers and readers. DataFormatRegistry registry = engineConfig.getDataFormatRegistry(); - // Create indexing engine - // Pass committer here as well. + DataFormat format = registry.format(config().getIndexSettings().pluggableDataFormat()); + + // Plugin creates shard-scoped native store from the Store's repository reference + this.shardNativeStore = registry.createNativeStore(format, config().getStore()); + + // Create indexing engine with the native store handle this.indexingExecutionEngine = registry.getIndexingEngine( new IndexingEngineConfig( committer, config().getMapperService(), config().getIndexSettings(), config().getStore(), - registry + registry, + shardNativeStore ), - registry.format(config().getIndexSettings().pluggableDataFormat()) + format ); this.writerGenerationCounter = new AtomicLong(1L);// committer.getCommitStats().getGeneration()); this.writerPool = new LockablePool<>( @@ -233,7 +242,8 @@ public DataFormatAwareEngine(EngineConfig engineConfig) { Optional.ofNullable(indexingExecutionEngine.getProvider()), engineConfig.getMapperService(), engineConfig.getIndexSettings(), - store.shardPath() + store.shardPath(), + shardNativeStore ); this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker); @@ -1015,6 +1025,7 @@ private void closeNoLock(String reason) { } catch (Exception e) { logger.warn("failed to close engine resources", e); } finally { + shardNativeStore.close(); try { store.decRef(); logger.debug("engine closed [{}]", reason); diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java index ac34836f97e67..9c4977d53f660 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java @@ -11,6 +11,8 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.IndexSettings; import org.opensearch.index.store.FormatChecksumStrategy; +import org.opensearch.index.store.Store; +import org.opensearch.plugins.NativeStoreHandle; import java.util.Map; @@ -54,4 +56,24 @@ public interface DataFormatPlugin { default Map getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) { return Map.of(); } + + /** + * Creates a shard-scoped native object store handle for this data format. + * Called once per shard at engine creation time. The returned handle is + * shared between the indexing engine (writes) and reader managers (reads). + * + *

The {@link Store} provides access to the shard path and the repository-level + * {@link org.opensearch.repositories.NativeStoreRepository} (via + * {@link Store#getNativeStoreRepository()}) for scoping. + * + *

The default implementation returns {@link NativeStoreHandle#EMPTY}. + * Data format plugins that use native (Rust) object stores for I/O should + * override this method. + * + * @param store the shard's store with shard path and native store repository + * @return a live handle, or {@link NativeStoreHandle#EMPTY} if not supported + */ + default NativeStoreHandle createNativeStore(Store store) { + return NativeStoreHandle.EMPTY; + } } diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java index d5ee37dd9c35d..88269e7a73645 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java @@ -16,6 +16,8 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.FormatChecksumStrategy; +import org.opensearch.index.store.Store; +import org.opensearch.plugins.NativeStoreHandle; import org.opensearch.plugins.PluginsService; import org.opensearch.plugins.SearchBackEndPlugin; @@ -108,6 +110,22 @@ public DataFormat format(String name) { return format; } + /** + * Creates a shard-scoped native object store handle for the given data format. + * Delegates to {@link DataFormatPlugin#createNativeStore(Store)}. + * + * @param format the data format + * @param store the shard's store (carries ShardPath and NativeStoreRepository) + * @return a live handle, or {@link NativeStoreHandle#EMPTY} if not supported + */ + public NativeStoreHandle createNativeStore(DataFormat format, Store store) { + DataFormatPlugin plugin = dataFormatPluginRegistry.get(format); + if (plugin == null) { + return NativeStoreHandle.EMPTY; + } + return plugin.createNativeStore(store); + } + /** * Returns all registered data formats that support a specific capability for a field type. * @@ -174,13 +192,14 @@ public Map> getReaderManagers( Optional indexStoreProvider, MapperService mapperService, IndexSettings indexSettings, - ShardPath shardPath + ShardPath shardPath, + NativeStoreHandle nativeStoreHandle ) throws IOException { // TODO: Filter based on index settings Map> readerManagers = new HashMap<>(); for (Map.Entry, IOException>> entry : readerManagerBuilders .entrySet()) { - ReaderManagerConfig settings = new ReaderManagerConfig(indexStoreProvider, entry.getKey(), shardPath); + ReaderManagerConfig settings = new ReaderManagerConfig(indexStoreProvider, entry.getKey(), shardPath, nativeStoreHandle); readerManagers.put(entry.getKey(), entry.getValue().apply(settings)); } return readerManagers; diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/IndexingEngineConfig.java b/server/src/main/java/org/opensearch/index/engine/dataformat/IndexingEngineConfig.java index 0e417d9b5c3e7..51eb9efc5eead 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/IndexingEngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/IndexingEngineConfig.java @@ -13,6 +13,7 @@ import org.opensearch.index.engine.exec.commit.Committer; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.store.Store; +import org.opensearch.plugins.NativeStoreHandle; /** * Initialization parameters for creating an {@link IndexingExecutionEngine} via @@ -24,10 +25,11 @@ * @param indexSettings the index-level settings * @param store the shard's store, or null if not available * @param registry DataFormatRegistry containing information about registered data formats. + * @param nativeStoreHandle the shard-scoped native object store handle, or {@link NativeStoreHandle#EMPTY} * * @opensearch.experimental */ @ExperimentalApi public record IndexingEngineConfig(Committer committer, MapperService mapperService, IndexSettings indexSettings, Store store, - DataFormatRegistry registry) { + DataFormatRegistry registry, NativeStoreHandle nativeStoreHandle) { } diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/ReaderManagerConfig.java b/server/src/main/java/org/opensearch/index/engine/dataformat/ReaderManagerConfig.java index 97afc26730155..f8d03110d07b8 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/ReaderManagerConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/ReaderManagerConfig.java @@ -11,6 +11,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.engine.exec.commit.IndexStoreProvider; import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.NativeStoreHandle; import java.util.Optional; @@ -22,9 +23,11 @@ * @param indexStoreProvider the store provider, or empty if not available * @param format the data format to create a reader manager for * @param shardPath the shard path for file storage + * @param nativeStoreHandle the shard-scoped native object store handle, or {@link NativeStoreHandle#EMPTY} * * @opensearch.experimental */ @ExperimentalApi -public record ReaderManagerConfig(Optional indexStoreProvider, DataFormat format, ShardPath shardPath) { +public record ReaderManagerConfig(Optional indexStoreProvider, DataFormat format, ShardPath shardPath, + NativeStoreHandle nativeStoreHandle) { } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 624523ca6e24c..f5e87f7cb53c3 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -89,6 +89,7 @@ import org.opensearch.env.ShardLock; import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.index.BucketedCompositeDirectory; +import org.opensearch.repositories.NativeStoreRepository; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.CombinedDeletionPolicy; import org.opensearch.index.engine.Engine; @@ -186,6 +187,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref private final boolean isParentFieldEnabledVersion; private final boolean isIndexSortEnabled; private final IndexStorePlugin.DirectoryFactory directoryFactory; + private final NativeStoreRepository nativeStoreRepository; // used to ref count files when a new Reader is opened for PIT/Scroll queries // prevents segment files deletion until the PIT/Scroll expires or is discarded @@ -199,7 +201,7 @@ protected void closeInternal() { }; public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) { - this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY, null, null); + this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY, null, null, NativeStoreRepository.EMPTY); } public Store( @@ -210,7 +212,7 @@ public Store( OnClose onClose, ShardPath shardPath ) { - this(shardId, indexSettings, directory, shardLock, onClose, shardPath, null); + this(shardId, indexSettings, directory, shardLock, onClose, shardPath, null, NativeStoreRepository.EMPTY); } public Store( @@ -221,6 +223,19 @@ public Store( OnClose onClose, ShardPath shardPath, IndexStorePlugin.DirectoryFactory directoryFactory + ) { + this(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory, NativeStoreRepository.EMPTY); + } + + public Store( + ShardId shardId, + IndexSettings indexSettings, + Directory directory, + ShardLock shardLock, + OnClose onClose, + ShardPath shardPath, + IndexStorePlugin.DirectoryFactory directoryFactory, + NativeStoreRepository nativeStoreRepository ) { super(shardId, indexSettings); final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING); @@ -233,12 +248,23 @@ public Store( this.isIndexSortEnabled = indexSettings.getIndexSortConfig().hasIndexSort(); this.isParentFieldEnabledVersion = indexSettings.getIndexVersionCreated().onOrAfter(org.opensearch.Version.V_3_2_0); this.directoryFactory = directoryFactory; + this.nativeStoreRepository = nativeStoreRepository != null ? nativeStoreRepository : NativeStoreRepository.EMPTY; assert onClose != null; assert shardLock != null; assert shardLock.getShardId().equals(shardId); } + /** + * Returns the native (Rust) object store repository for this shard's storage backend. + * Returns {@link NativeStoreRepository#EMPTY} if no native store is configured. + * + * @return the native store repository, never null + */ + public NativeStoreRepository getNativeStoreRepository() { + return nativeStoreRepository; + } + public Directory directory() { ensureOpen(); return directory; diff --git a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java index b83880eac7533..e9df82ad16ba7 100644 --- a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java @@ -47,6 +47,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.repositories.NativeStoreRepository; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -179,14 +180,16 @@ interface StoreFactory { * @param shardPath the shard path * @return a new Store instance */ - Store newStore( + default Store newStore( ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock, Store.OnClose onClose, ShardPath shardPath - ) throws IOException; + ) throws IOException { + return newStore(shardId, indexSettings, directory, shardLock, onClose, shardPath, null, NativeStoreRepository.EMPTY); + } /** * Creates a new Store per shard. This method is called once per shard on shard creation. @@ -196,10 +199,10 @@ Store newStore( * @param shardLock the shard lock to associate with the store * @param onClose listener invoked on store close * @param shardPath the shard path - * @param directoryFactory the directory path. + * @param directoryFactory the directory factory * @return a new Store instance */ - Store newStore( + default Store newStore( ShardId shardId, IndexSettings indexSettings, Directory directory, @@ -207,6 +210,31 @@ Store newStore( Store.OnClose onClose, ShardPath shardPath, DirectoryFactory directoryFactory + ) throws IOException { + return newStore(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory, NativeStoreRepository.EMPTY); + } + + /** + * Creates a new Store per shard with a native object store repository for the storage backend. + * @param shardId the shard id + * @param indexSettings the shard's index settings + * @param directory the Lucene directory selected for this shard + * @param shardLock the shard lock to associate with the store + * @param onClose listener invoked on store close + * @param shardPath the shard path + * @param directoryFactory the directory factory + * @param nativeStoreRepository the native object store repository, or {@link NativeStoreRepository#EMPTY} + * @return a new Store instance + */ + Store newStore( + ShardId shardId, + IndexSettings indexSettings, + Directory directory, + ShardLock shardLock, + Store.OnClose onClose, + ShardPath shardPath, + DirectoryFactory directoryFactory, + NativeStoreRepository nativeStoreRepository ) throws IOException; } diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 5ecabffe35c70..26e97a25efa34 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -115,6 +115,7 @@ import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.NativeStoreRepository; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; import org.opensearch.search.internal.ReaderContext; @@ -861,21 +862,10 @@ public Store newStore( ShardLock shardLock, Store.OnClose onClose, ShardPath shardPath, - IndexStorePlugin.DirectoryFactory directoryFactory + IndexStorePlugin.DirectoryFactory directoryFactory, + NativeStoreRepository nativeStoreRepository ) throws IOException { - return new Store(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory); - } - - @Override - public Store newStore( - ShardId shardId, - IndexSettings indexSettings, - Directory directory, - ShardLock shardLock, - Store.OnClose onClose, - ShardPath shardPath - ) throws IOException { - return new Store(shardId, indexSettings, directory, shardLock, onClose, shardPath); + return new Store(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory, nativeStoreRepository); } } diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java index 2cd4008fe3bfb..3f81ed631458a 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java @@ -13,6 +13,7 @@ import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; +import org.opensearch.plugins.NativeStoreHandle; import org.opensearch.index.engine.dataformat.stub.MockCatalogSnapshot; import org.opensearch.index.engine.dataformat.stub.MockDataFormat; import org.opensearch.index.engine.dataformat.stub.MockDataFormatPlugin; @@ -86,7 +87,8 @@ public void testFullDataFormatLifecycle() throws IOException { mock(MapperService.class), new IndexSettings(IndexMetadata.builder("index").settings(settings).build(), settings), null, - null + null, + NativeStoreHandle.EMPTY ), null ); diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java index e8ced73980e71..32d8c6551a631 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java @@ -19,6 +19,7 @@ import org.opensearch.index.engine.exec.EngineReaderManager; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.NativeStoreHandle; import org.opensearch.plugins.PluginsService; import org.opensearch.plugins.SearchBackEndPlugin; import org.opensearch.test.OpenSearchTestCase; @@ -146,7 +147,7 @@ public void testGetIndexingEngine() { DataFormatRegistry registry = new DataFormatRegistry(pluginsService); IndexingExecutionEngine engine = registry.getIndexingEngine( - new IndexingEngineConfig(null, mapperService, indexSettings, null, null), + new IndexingEngineConfig(null, mapperService, indexSettings, null, null, NativeStoreHandle.EMPTY), format ); assertNotNull(engine); @@ -162,7 +163,10 @@ public void testGetIndexingEngineForUnregisteredFormatThrows() { IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> registry.getIndexingEngine(new IndexingEngineConfig(null, mapperService, indexSettings, null, null), unregistered) + () -> registry.getIndexingEngine( + new IndexingEngineConfig(null, mapperService, indexSettings, null, null, NativeStoreHandle.EMPTY), + unregistered + ) ); assertTrue(e.getMessage().contains("unknown")); } @@ -271,7 +275,8 @@ public void testGetReaderManagers() throws IOException { Optional.empty(), mapperService, indexSettings, - shardPath + shardPath, + NativeStoreHandle.EMPTY ); assertEquals(1, managers.size()); assertNotNull(managers.get(format)); diff --git a/server/src/test/java/org/opensearch/plugins/IndexStorePluginTests.java b/server/src/test/java/org/opensearch/plugins/IndexStorePluginTests.java index c43b5592f1be1..7fc9dd7b6d982 100644 --- a/server/src/test/java/org/opensearch/plugins/IndexStorePluginTests.java +++ b/server/src/test/java/org/opensearch/plugins/IndexStorePluginTests.java @@ -42,6 +42,7 @@ import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.FsDirectoryFactory; import org.opensearch.index.store.Store; +import org.opensearch.repositories.NativeStoreRepository; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.node.MockNode; import org.opensearch.test.OpenSearchTestCase; @@ -137,21 +138,10 @@ public Store newStore( ShardLock shardLock, Store.OnClose onClose, ShardPath shardPath, - IndexStorePlugin.DirectoryFactory directoryFactory + IndexStorePlugin.DirectoryFactory directoryFactory, + NativeStoreRepository nativeStoreRepository ) throws IOException { - return new Store(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory); - } - - @Override - public Store newStore( - ShardId shardId, - IndexSettings indexSettings, - org.apache.lucene.store.Directory directory, - ShardLock shardLock, - Store.OnClose onClose, - ShardPath shardPath - ) throws IOException { - return new Store(shardId, indexSettings, directory, shardLock, onClose, shardPath); + return new Store(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory, nativeStoreRepository); } }