Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.NativeStoreRepository;

import java.io.IOException;
import java.nio.file.Path;
Expand Down Expand Up @@ -103,6 +104,40 @@ public SubdirectoryAwareStore(
);
}

/**
* Constructor for SubdirectoryAwareStore with native store support.
*
* @param shardId the shard ID
* @param indexSettings the index settings
* @param directory the directory to use for the store
* @param shardLock the shard lock
* @param onClose the on close callback
* @param shardPath the shard path
* @param directoryFactory the directory factory
* @param nativeStoreRepository the native object store repository
*/
public SubdirectoryAwareStore(
ShardId shardId,
IndexSettings indexSettings,
Directory directory,
ShardLock shardLock,
OnClose onClose,
ShardPath shardPath,
IndexStorePlugin.DirectoryFactory directoryFactory,
NativeStoreRepository nativeStoreRepository
) {
super(
shardId,
indexSettings,
new SubdirectoryAwareDirectory(directory, shardPath),
shardLock,
onClose,
shardPath,
directoryFactory,
nativeStoreRepository
);
}

@Override
public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
long totalNumDocs = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.NativeStoreRepository;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -60,39 +61,16 @@ public Map<String, StoreFactory> getStoreFactories() {
*/
static class SubdirectoryStoreFactory implements StoreFactory {
/**
* Creates a new {@link SubdirectoryAwareStore} instance.
* Creates a new {@link SubdirectoryAwareStore} instance with native store support.
*
* @param shardId the shard identifier
* @param indexSettings the index settings
* @param directory the underlying Lucene directory
* @param shardLock the shard lock
* @param onClose callback to execute when the store is closed
* @param shardPath the path information for the shard
* @return a new SubdirectoryAwareStore instance
*/
@Override
public Store newStore(
ShardId shardId,
IndexSettings indexSettings,
Directory directory,
ShardLock shardLock,
Store.OnClose onClose,
ShardPath shardPath
) {
return new SubdirectoryAwareStore(shardId, indexSettings, directory, shardLock, onClose, shardPath);
}

/**
* Creates a new {@link SubdirectoryAwareStore} instance.
*
* @param shardId the shard identifier
* @param indexSettings the index settings
* @param directory the underlying Lucene directory
* @param shardLock the shard lock
* @param onClose callback to execute when the store is closed
* @param shardPath the path information for the shard
* @param directoryFactory the directory factory to create child level directory.
* Used for Context Aware Segments enabled indices.
* @param directoryFactory the directory factory
* @param nativeStoreRepository the native object store repository
* @return a new SubdirectoryAwareStore instance
*/
@Override
Expand All @@ -103,9 +81,19 @@ public Store newStore(
ShardLock shardLock,
Store.OnClose onClose,
ShardPath shardPath,
DirectoryFactory directoryFactory
DirectoryFactory directoryFactory,
NativeStoreRepository nativeStoreRepository
) {
return new SubdirectoryAwareStore(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory);
return new SubdirectoryAwareStore(
shardId,
indexSettings,
directory,
shardLock,
onClose,
shardPath,
directoryFactory,
nativeStoreRepository
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
import org.opensearch.plugins.NativeStoreHandle;
import org.opensearch.index.engine.dataformat.DocumentInput;
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
Expand Down Expand Up @@ -109,7 +110,7 @@ public CompositeIndexingExecutionEngine(
validateFormatsRegistered(dataFormatRegistry, primaryFormatName, secondaryFormatNames);

Map<String, FormatChecksumStrategy> strategies = checksumStrategies != null ? checksumStrategies : Map.of();
IndexingEngineConfig engineSettings = new IndexingEngineConfig(committer, mapperService, indexSettings, store, dataFormatRegistry);
IndexingEngineConfig engineSettings = new IndexingEngineConfig(committer, mapperService, indexSettings, store, dataFormatRegistry, NativeStoreHandle.EMPTY);

List<DataFormat> allFormats = new ArrayList<>();
DataFormat primaryFormat = dataFormatRegistry.format(primaryFormatName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
import org.opensearch.index.store.FormatChecksumStrategy;
import org.opensearch.index.store.PrecomputedChecksumStrategy;
import org.opensearch.index.store.Store;
import org.opensearch.parquet.engine.ParquetDataFormat;
import org.opensearch.parquet.engine.ParquetIndexingEngine;
import org.opensearch.parquet.fields.ArrowSchemaBuilder;
import org.opensearch.plugins.NativeStoreHandle;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -98,6 +100,40 @@ public DataFormat getDataFormat() {
return dataFormat;
}

@Override
public NativeStoreHandle createNativeStore(Store store) {
// Phase 2 implementation outline:
//
// 1. Get the repository-level native store from the Store
// NativeStoreRepository repoStore = store.getNativeStoreRepository();
//
// 2. If a repository-level native store exists (remote-backed index),
// create a PrefixObjectStore scoped to this shard's path within the repository.
// This ensures the shard can only access its own files.
// if (repoStore.isLive()) {
// String shardPrefix = "indices/" + indexUUID + "/" + shardId + "/parquet/";
// long scopedPtr = RustBridge.createScopedStore(repoStore.getPointer(), shardPrefix);
// return new NativeStoreHandle(scopedPtr, RustBridge::destroyStore);
// }
//
// 3. Otherwise (local-only index), create a LocalFileSystem ObjectStore
// rooted at the shard's data path.
// long localPtr = RustBridge.createLocalStore(store.shardPath().getDataPath().toString());
// if (localPtr <= 0) return NativeStoreHandle.EMPTY;
// return new NativeStoreHandle(localPtr, RustBridge::destroyStore);
//
// TODO: Implement RustBridge.createLocalStore(String rootPath) — FFM call to
// parquet_create_local_store which creates LocalFileSystem::new_with_prefix(root)
// TODO: Implement RustBridge.createScopedStore(long parentHandle, String prefix) — FFM call to
// parquet_create_scoped_store which creates PrefixStore wrapping the parent store
// TODO: Implement RustBridge.destroyStore(long handle) — FFM call to
// parquet_destroy_store which drops the Arc<dyn ObjectStore>
// TODO: Implement Rust object_store_handle.rs with create_local_store, create_scoped_store, drop_store
// TODO: Add FFM exports in ffm.rs for the above functions

return NativeStoreHandle.EMPTY;
}

@Override
public IndexingExecutionEngine<?, ?> indexingEngine(IndexingEngineConfig engineConfig, FormatChecksumStrategy checksumStrategy) {
return new ParquetIndexingEngine(
Expand Down
27 changes: 13 additions & 14 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.NativeStoreRepository;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -1127,18 +1128,6 @@ private static IndexStorePlugin.StoreFactory resolveStoreFactory(
if (key == null || key.isEmpty()) {
return new IndexStorePlugin.StoreFactory() {

@Override
public Store newStore(
ShardId shardId,
IndexSettings indexSettings,
Directory directory,
ShardLock shardLock,
Store.OnClose onClose,
ShardPath shardPath
) throws IOException {
return new Store(shardId, indexSettings, directory, shardLock, onClose, shardPath);
}

@Override
public Store newStore(
ShardId shardId,
Expand All @@ -1147,9 +1136,19 @@ public Store newStore(
ShardLock shardLock,
Store.OnClose onClose,
ShardPath shardPath,
IndexStorePlugin.DirectoryFactory directoryFactory
IndexStorePlugin.DirectoryFactory directoryFactory,
NativeStoreRepository nativeStoreRepository
) throws IOException {
return new Store(shardId, indexSettings, directory, shardLock, onClose, shardPath, directoryFactory);
return new Store(
shardId,
indexSettings,
directory,
shardLock,
onClose,
shardPath,
directoryFactory,
nativeStoreRepository
);
}
};
}
Expand Down
22 changes: 21 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.NativeStoreRepository;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -790,7 +792,8 @@ protected void closeInternal() {
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)),
path,
directoryFactory
directoryFactory,
resolveNativeStoreRepository(repositoriesService)
);
eventListener.onStoreCreated(shardId);
indexShard = new IndexShard(
Expand Down Expand Up @@ -1353,6 +1356,23 @@ private DataFormatAwareStoreDirectory createDataFormatAwareStoreDirectory(ShardI
return null;
}

/**
* Resolves the native (Rust) object store repository for this index, if available.
* For indices with a configured remote store repository, returns the repository's
* native store. Otherwise returns {@link NativeStoreRepository#EMPTY}.
*/
private NativeStoreRepository resolveNativeStoreRepository(RepositoriesService repositoriesService) {
String repoName = this.indexSettings.getRemoteStoreRepository();
if (repoName != null) {
try {
return repositoriesService.repository(repoName).getNativeStore();
} catch (RepositoryMissingException e) {
logger.warn("Native store not available for repository [{}]", repoName);
}
}
return NativeStoreRepository.EMPTY;
}

private void updateFsyncTaskIfNecessary() {
if (indexSettings.getTranslogDurability() == Translog.Durability.REQUEST) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.index.engine.dataformat.FileInfos;
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
import org.opensearch.plugins.NativeStoreHandle;
import org.opensearch.index.engine.dataformat.RefreshInput;
import org.opensearch.index.engine.dataformat.RefreshResult;
import org.opensearch.index.engine.dataformat.WriteResult;
Expand Down Expand Up @@ -137,6 +138,9 @@ public class DataFormatAwareEngine implements Indexer {
private final IndexingThrottler throttle;
private final AtomicInteger throttleRequestCount = new AtomicInteger();

// Native object store handle for this shard (owned, closed in closeNoLock)
private final NativeStoreHandle shardNativeStore;

// Timestamps and seq-no markers
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
Expand Down Expand Up @@ -208,17 +212,22 @@ public DataFormatAwareEngine(EngineConfig engineConfig) {

// Move to data format aware writers and readers.
DataFormatRegistry registry = engineConfig.getDataFormatRegistry();
// Create indexing engine
// Pass committer here as well.
DataFormat format = registry.format(config().getIndexSettings().pluggableDataFormat());

// Plugin creates shard-scoped native store from the Store's repository reference
this.shardNativeStore = registry.createNativeStore(format, config().getStore());

// Create indexing engine with the native store handle
this.indexingExecutionEngine = registry.getIndexingEngine(
new IndexingEngineConfig(
committer,
config().getMapperService(),
config().getIndexSettings(),
config().getStore(),
registry
registry,
shardNativeStore
),
registry.format(config().getIndexSettings().pluggableDataFormat())
format
);
this.writerGenerationCounter = new AtomicLong(1L);// committer.getCommitStats().getGeneration());
this.writerPool = new LockablePool<>(
Expand All @@ -233,7 +242,8 @@ public DataFormatAwareEngine(EngineConfig engineConfig) {
Optional.ofNullable(indexingExecutionEngine.getProvider()),
engineConfig.getMapperService(),
engineConfig.getIndexSettings(),
store.shardPath()
store.shardPath(),
shardNativeStore
);

this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker);
Expand Down Expand Up @@ -1015,6 +1025,7 @@ private void closeNoLock(String reason) {
} catch (Exception e) {
logger.warn("failed to close engine resources", e);
} finally {
shardNativeStore.close();
try {
store.decRef();
logger.debug("engine closed [{}]", reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.FormatChecksumStrategy;
import org.opensearch.index.store.Store;
import org.opensearch.plugins.NativeStoreHandle;

import java.util.Map;

Expand Down Expand Up @@ -54,4 +56,24 @@ public interface DataFormatPlugin {
default Map<String, DataFormatDescriptor> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) {
return Map.of();
}

/**
* Creates a shard-scoped native object store handle for this data format.
* Called once per shard at engine creation time. The returned handle is
* shared between the indexing engine (writes) and reader managers (reads).
*
* <p>The {@link Store} provides access to the shard path and the repository-level
* {@link org.opensearch.repositories.NativeStoreRepository} (via
* {@link Store#getNativeStoreRepository()}) for scoping.
*
* <p>The default implementation returns {@link NativeStoreHandle#EMPTY}.
* Data format plugins that use native (Rust) object stores for I/O should
* override this method.
*
* @param store the shard's store with shard path and native store repository
* @return a live handle, or {@link NativeStoreHandle#EMPTY} if not supported
*/
default NativeStoreHandle createNativeStore(Store store) {
return NativeStoreHandle.EMPTY;
}
}
Loading