diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java
index 56bd6e22884a7..afa210a2d3da9 100644
--- a/server/src/main/java/org/opensearch/index/IndexModule.java
+++ b/server/src/main/java/org/opensearch/index/IndexModule.java
@@ -105,6 +105,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
+import org.opensearch.storage.directory.TieredDataFormatAwareStoreDirectoryFactory;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
@@ -1176,6 +1177,14 @@ private static DataFormatAwareStoreDirectoryFactory getDataFormatAwareStoreDirec
if (dataFormatAwareStoreDirectoryFactories.isEmpty()) {
return null;
}
+ if (indexSettings.isWarmIndex() && indexSettings.isPluggableDataFormatEnabled()) {
+ DataFormatAwareStoreDirectoryFactory tiered = dataFormatAwareStoreDirectoryFactories.get(
+ TieredDataFormatAwareStoreDirectoryFactory.FACTORY_KEY
+ );
+ if (tiered != null) {
+ return tiered;
+ }
+ }
return dataFormatAwareStoreDirectoryFactories.get("default");
}
diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java
index 65bcfdcc565c5..16731e43d9d96 100644
--- a/server/src/main/java/org/opensearch/index/IndexService.java
+++ b/server/src/main/java/org/opensearch/index/IndexService.java
@@ -100,6 +100,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.DataFormatAwareStoreDirectory;
import org.opensearch.index.store.DataFormatAwareStoreDirectoryFactory;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.remote.filecache.FileCache;
@@ -773,9 +774,22 @@ protected void closeInternal() {
}
Directory directory = null;
- if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_SETTING) &&
- // TODO : Need to remove this check after support for hot indices is added in Composite Directory
- this.indexSettings.isWarmIndex()) {
+ if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_SETTING)
+ && this.indexSettings.isWarmIndex()
+ && this.indexSettings.isPluggableDataFormatEnabled()
+ && this.dataFormatAwareStoreDirectoryFactory != null) {
+ // Warm + format-aware: use warm-aware factory overload
+ directory = dataFormatAwareStoreDirectoryFactory.newDataFormatAwareStoreDirectory(
+ this.indexSettings,
+ shardId,
+ path,
+ directoryFactory,
+ dataFormatRegistry,
+ (RemoteSegmentStoreDirectory) remoteDirectory,
+ fileCache,
+ threadPool
+ );
+ } else if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_SETTING) && this.indexSettings.isWarmIndex()) {
directory = compositeDirectoryFactory.newDirectory(
this.indexSettings,
path,
@@ -784,7 +798,7 @@ protected void closeInternal() {
fileCache,
threadPool
);
- } else if (!this.indexSettings.isPluggableDataFormatEnabled()) {
+ } else if (this.indexSettings.isPluggableDataFormatEnabled() == false) {
directory = directoryFactory.newDirectory(this.indexSettings, path);
} else {
// Will be enabled in case of formatAware indices.
diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatDescriptor.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatDescriptor.java
index 0df1498a23b41..2f4ceca713253 100644
--- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatDescriptor.java
+++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatDescriptor.java
@@ -13,14 +13,23 @@
/**
* Describes the static capabilities of a data format, including its default checksum
- * strategy and format name. Provided by {@link DataFormatPlugin} implementations and
- * consumed by DataFormatAwareStoreDirectory and DataFormatAwareRemoteDirectory.
+ * strategy, format name, and whether it requires native store for warm reads.
+ * Provided by {@link DataFormatPlugin} implementations and consumed by
+ * DataFormatAwareStoreDirectory, DataFormatAwareRemoteDirectory, and
+ * TieredDataFormatAwareStoreDirectoryFactory.
*
* <p>The checksum strategy here is the default fallback — a full-file scan.
* At runtime, the {@link IndexingExecutionEngine} may override this with a more
* efficient strategy (e.g., {@link org.opensearch.index.store.PrecomputedChecksumStrategy})
* via {@link org.opensearch.index.store.DataFormatAwareStoreDirectory#registerChecksumStrategy}.
*
+ * <p>All formats use the segment store ({@code RemoteSegmentStoreDirectory}) for uploads.
+ * Formats that return {@code true} from {@link #nativeStoreSupported()} get a
+ * {@code NativeStoreRepository}-backed TieredDirectory for warm reads. The native store
+ * is obtained from the repository that's already wired to the shard (via
+ * {@code Repository.getNativeStore()}) — the descriptor only signals the intent,
+ * it doesn't provide the store itself.
+ *
* @opensearch.experimental
*/
@ExperimentalApi
@@ -28,16 +37,32 @@ public class DataFormatDescriptor {
private final String formatName;
private final FormatChecksumStrategy checksumStrategy;
+ private final boolean nativeStoreSupported;
/**
- * Creates a new DataFormatDescriptor.
+ * Creates a new DataFormatDescriptor without native store.
+ * The format uses the standard segment store for both uploads and warm reads.
*
* @param formatName the format name (e.g., "parquet")
* @param checksumStrategy the default checksum strategy for this format
*/
public DataFormatDescriptor(String formatName, FormatChecksumStrategy checksumStrategy) {
+ this(formatName, checksumStrategy, false);
+ }
+
+ /**
+ * Creates a new DataFormatDescriptor.
+ *
+ * @param formatName the format name (e.g., "parquet")
+ * @param checksumStrategy the default checksum strategy for this format
+ * @param nativeStoreSupported if {@code true}, warm reads for this format use the
+ * {@code NativeStoreRepository} from the shard's repository
+ * instead of the standard block-based remote reads
+ */
+ public DataFormatDescriptor(String formatName, FormatChecksumStrategy checksumStrategy, boolean nativeStoreSupported) {
this.formatName = formatName;
this.checksumStrategy = checksumStrategy;
+ this.nativeStoreSupported = nativeStoreSupported;
}
/**
@@ -57,4 +82,22 @@ public String getFormatName() {
public FormatChecksumStrategy getChecksumStrategy() {
return checksumStrategy;
}
+
+ /**
+ * Returns whether this format needs native store for warm reads.
+ *
+ * <p>When {@code true}, the tiered directory factory obtains the
+ * {@code NativeStoreRepository} from the shard's repository (via
+ * {@code Repository.getNativeStore()}) and wires it into this format's
+ * TieredDirectory for Rust-native I/O optimized for sequential
+ * column-chunk reads (e.g., Parquet via native S3/GCS/Azure/FS).
+ *
+ * <p>When {@code false} (the default), warm reads go through the standard
+ * {@code RemoteSegmentStoreDirectory} with block-based on-demand fetching.
+ *
+ * @return true if native store is needed for warm reads
+ */
+ public boolean nativeStoreSupported() {
+ return nativeStoreSupported;
+ }
}
diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java
index ac34836f97e67..b87c04c8fe1fb 100644
--- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java
+++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java
@@ -8,9 +8,11 @@
package org.opensearch.index.engine.dataformat;
+import org.apache.lucene.store.Directory;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.FormatChecksumStrategy;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import java.util.Map;
@@ -54,4 +56,5 @@ public interface DataFormatPlugin {
default Map<String, DataFormatDescriptor> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) {
return Map.of();
}
+
}
diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java
index 5a6254b0ce5ed..7cdee0f88e4be 100644
--- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java
+++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java
@@ -10,15 +10,18 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.lucene.store.Directory;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.EngineReaderManager;
import org.opensearch.index.store.FormatChecksumStrategy;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.plugins.PluginsService;
import org.opensearch.plugins.SearchBackEndPlugin;
import java.io.IOException;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -139,6 +142,37 @@ public Set getRegisteredFormats() {
return Set.copyOf(dataFormatPluginRegistry.keySet());
}
+ /**
+ * Asks each registered format plugin for a tiered directory and returns the non-null results.
+ * Called by TieredDataFormatAwareStoreDirectoryFactory at shard open time on warm nodes.
+ *
+ * @param localDirectory the subdirectory-aware local directory
+ * @param remoteDirectory the remote segment store directory
+ * @param indexSettings the index settings for this shard
+ * @return map of data format to tiered directory (only formats that provide one)
+ */
+ public Map<DataFormat, Directory> getTieredDirectories(
+ Directory localDirectory,
+ RemoteSegmentStoreDirectory remoteDirectory,
+ IndexSettings indexSettings
+ ) {
+ Map<DataFormat, Directory> result = new HashMap<>();
+ String dataformatName = indexSettings.getSettings().get(PLUGGABLE_DATAFORMAT_SETTING);
+ if (dataformatName != null) {
+ DataFormat format = dataFormats.get(dataformatName);
+ if (format != null) {
+ DataFormatPlugin plugin = dataFormatPluginRegistry.get(format);
+ if (plugin != null) {
+ Directory dir = plugin.getTieredDirectory(localDirectory, remoteDirectory, indexSettings);
+ if (dir != null) {
+ result.put(format, dir);
+ }
+ }
+ }
+ }
+ return Collections.unmodifiableMap(result);
+ }
+
/**
* Returns format descriptors for the active data format of the given index.
* Resolves the data format from index settings via the {@code pluggable_dataformat} setting,
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreUploaderService.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreUploaderService.java
index 413316b884e39..a1dc6b1a4f03e 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreUploaderService.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreUploaderService.java
@@ -19,8 +19,8 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.UploadListener;
import org.opensearch.core.action.ActionListener;
-import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.index.store.RemoteSyncAwareDirectory;
import java.util.Collection;
import java.util.Map;
@@ -63,7 +63,6 @@ public void uploadSegments(
logger.debug("Effective new segments files to upload {}", localSegments);
ActionListener<Collection<Void>> mappedListener = ActionListener.map(listener, resp -> null);
GroupedActionListener<Void> batchUploadListener = new GroupedActionListener<>(mappedListener, localSegments.size());
- Directory directory = ((FilterDirectory) (((FilterDirectory) storeDirectory).getDelegate())).getDelegate();
for (String localSegment : localSegments) {
// Initializing listener here to ensure that the stats increment operations are thread-safe
@@ -72,9 +71,7 @@ public void uploadSegments(
statsListener.onSuccess(localSegment);
batchUploadListener.onResponse(resp);
// Once uploaded to Remote, local files become eligible for eviction from FileCache
- if (directory instanceof CompositeDirectory compositeDirectory) {
- compositeDirectory.afterSyncToRemote(localSegment);
- }
+ notifyAfterSyncToRemote(storeDirectory, localSegment);
}, ex -> {
logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", ex), ex);
if (ex instanceof CorruptIndexException) {
@@ -94,4 +91,19 @@ public void uploadSegments(
);
}
}
+
+ private static void notifyAfterSyncToRemote(Directory dir, String file) {
+ Directory current = dir;
+ while (current != null) {
+ if (current instanceof RemoteSyncAwareDirectory) {
+ ((RemoteSyncAwareDirectory) current).afterSyncToRemote(file);
+ return;
+ }
+ if (current instanceof FilterDirectory) {
+ current = ((FilterDirectory) current).getDelegate();
+ } else {
+ break;
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java
index 51aec1c7045e3..44b6677caf780 100644
--- a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java
@@ -28,6 +28,7 @@
import org.opensearch.index.store.remote.filecache.FileCache.RestoredCachedIndexInput;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.index.store.remote.utils.TransferManager;
+import org.opensearch.storage.utils.DirectoryUtils;
import org.opensearch.threadpool.ThreadPool;
import java.io.FileNotFoundException;
@@ -54,7 +55,7 @@
* @opensearch.experimental
*/
@ExperimentalApi
-public class CompositeDirectory extends FilterDirectory {
+public class CompositeDirectory extends FilterDirectory implements RemoteSyncAwareDirectory {
private static final Logger logger = LogManager.getLogger(CompositeDirectory.class);
protected final Directory localDirectory;
protected final RemoteSegmentStoreDirectory remoteDirectory;
@@ -397,15 +398,7 @@ public Path getFilePath(String name) {
}
private FSDirectory getLocalFSDirectory() {
- FSDirectory localFSDirectory;
- if (localDirectory instanceof FSDirectory) {
- localFSDirectory = (FSDirectory) localDirectory;
- } else {
- // In this case it should be a FilterDirectory wrapped over FSDirectory as per above validation.
- localFSDirectory = (FSDirectory) (((FilterDirectory) localDirectory).getDelegate());
- }
-
- return localFSDirectory;
+ return DirectoryUtils.getFSDirectory(localDirectory);
}
/**
@@ -423,9 +416,11 @@ private void validate(Directory localDirectory, Directory remoteDirectory, FileC
if (fileCache == null) throw new IllegalStateException(
"File Cache not initialized on this Node, cannot create Composite Directory without FileCache"
);
- if (localDirectory instanceof FSDirectory == false
- && !(localDirectory instanceof FilterDirectory && ((FilterDirectory) localDirectory).getDelegate() instanceof FSDirectory))
+ try {
+ DirectoryUtils.getFSDirectory(localDirectory);
+ } catch (IllegalStateException e) {
throw new IllegalStateException("For Composite Directory, local directory must be of type FSDirectory");
+ }
if (remoteDirectory instanceof RemoteSegmentStoreDirectory == false) throw new IllegalStateException(
"For Composite Directory, remote directory must be of type RemoteSegmentStoreDirectory"
);
diff --git a/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectory.java
index 24065799c537e..7d4dd86621c3f 100644
--- a/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectory.java
@@ -68,7 +68,7 @@
* @opensearch.api
*/
@PublicApi(since = "3.0.0")
-public class DataFormatAwareStoreDirectory extends FilterDirectory {
+public class DataFormatAwareStoreDirectory extends FilterDirectory implements RemoteSyncAwareDirectory {
private static final Logger logger = LogManager.getLogger(DataFormatAwareStoreDirectory.class);
@@ -81,9 +81,10 @@ public class DataFormatAwareStoreDirectory extends FilterDirectory {
private static final FormatChecksumStrategy DEFAULT_CHECKSUM_STRATEGY = new GenericCRC32ChecksumHandler();
/**
- * Constructs a DataFormatAwareStoreDirectory with a {@link DataFormatRegistry} for format-aware
- * checksum calculation and other format-specific operations.
+ * Constructs a DataFormatAwareStoreDirectory that auto-wraps the delegate in
+ * {@link SubdirectoryAwareDirectory}.
*
+ * @param indexSettings the index settings
* @param delegate the underlying FSDirectory (typically for <shard>/index/)
* @param shardPath the shard path for resolving subdirectories
* @param dataFormatRegistry registry providing format-specific checksum handlers
@@ -94,7 +95,31 @@ public DataFormatAwareStoreDirectory(
ShardPath shardPath,
DataFormatRegistry dataFormatRegistry
) {
- super(new SubdirectoryAwareDirectory(delegate, shardPath));
+ this(indexSettings, new SubdirectoryAwareDirectory(delegate, shardPath), shardPath, dataFormatRegistry, false);
+ }
+
+ /**
+ * Constructs a DataFormatAwareStoreDirectory with a pre-built delegate directory.
+ *
+ * Unlike the primary constructor which auto-wraps the delegate in
+ * {@link SubdirectoryAwareDirectory}, this constructor uses the delegate as-is.
+ * This is intended for warm nodes where the delegate is already a
+ * TieredSubdirectoryAwareDirectory (which wraps SubdirectoryAwareDirectory internally).
+ *
+ * @param indexSettings the index settings
+ * @param delegate the pre-built directory (e.g., TieredSubdirectoryAwareDirectory)
+ * @param shardPath the shard path for resolving subdirectories
+ * @param dataFormatRegistry registry providing format-specific checksum handlers
+ * @param directDelegate marker flag; when {@code true}, delegate is used directly without wrapping
+ */
+ public DataFormatAwareStoreDirectory(
+ IndexSettings indexSettings,
+ Directory delegate,
+ ShardPath shardPath,
+ DataFormatRegistry dataFormatRegistry,
+ boolean directDelegate
+ ) {
+ super(delegate);
this.shardPath = shardPath;
Map<String, DataFormatDescriptor> descriptors = dataFormatRegistry.getFormatDescriptors(indexSettings);
this.checksumStrategies = new HashMap<>();
@@ -104,7 +129,8 @@ public DataFormatAwareStoreDirectory(
this.checksumStrategies.put(DEFAULT_FORMAT, new LuceneChecksumHandler());
logger.debug(
- "Created DataFormatAwareStoreDirectory for shard {} with checksum strategies for formats: {}",
+ "Created DataFormatAwareStoreDirectory (directDelegate={}) for shard {} with checksum strategies for formats: {}",
+ directDelegate,
shardPath.getShardId(),
checksumStrategies.keySet()
);
@@ -141,6 +167,16 @@ private String resolveFileName(String fileName) {
return fileName;
}
+ @Override
+ public void afterSyncToRemote(String file) {
+ Directory inner = getDelegate();
+ if (inner instanceof RemoteSyncAwareDirectory) {
+ ((RemoteSyncAwareDirectory) inner).afterSyncToRemote(file);
+ }
+ // On hot: inner is SubdirectoryAwareDirectory → not RemoteSyncAwareDirectory → no-op
+ // On warm: inner is TieredSubdirectoryAwareDirectory → implements it → delegates
+ }
+
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return in.openInput(resolveFileName(name), context);
diff --git a/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryFactory.java
index b633a00ca67eb..f399d1342b84a 100644
--- a/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryFactory.java
+++ b/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryFactory.java
@@ -13,7 +13,9 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
import org.opensearch.index.shard.ShardPath;
+import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.plugins.IndexStorePlugin;
+import org.opensearch.threadpool.ThreadPool;
import java.io.IOException;
@@ -29,7 +31,6 @@
* @opensearch.experimental
*/
@ExperimentalApi
-@FunctionalInterface
public interface DataFormatAwareStoreDirectoryFactory {
/**
@@ -57,4 +58,38 @@ DataFormatAwareStoreDirectory newDataFormatAwareStoreDirectory(
IndexStorePlugin.DirectoryFactory localDirectoryFactory,
DataFormatRegistry dataFormatRegistry
) throws IOException;
+
+ /**
+ * Creates a new DataFormatAwareStoreDirectory for warm nodes with tiered storage support.
+ *
+ * <p>This overload accepts additional parameters needed for warm node directory creation,
+ * including the remote directory, file cache, and thread pool. The default implementation
+ * delegates to the 5-parameter method, ignoring the warm-specific parameters.
+ *
+ * <p>Implementations that support warm+format (e.g., TieredDataFormatAwareStoreDirectoryFactory)
+ * should override this method to build the full tiered directory stack.
+ *
+ * @param indexSettings the shard's index settings
+ * @param shardId the shard identifier
+ * @param shardPath the path the shard is using for file storage
+ * @param localDirectoryFactory the factory for creating the underlying local directory
+ * @param dataFormatRegistry registry of available data format plugins
+ * @param remoteDirectory the remote segment store directory
+ * @param fileCache the file cache for warm node caching
+ * @param threadPool the thread pool for async operations
+ * @return a new DataFormatAwareStoreDirectory instance
+ * @throws IOException if directory creation fails
+ */
+ default DataFormatAwareStoreDirectory newDataFormatAwareStoreDirectory(
+ IndexSettings indexSettings,
+ ShardId shardId,
+ ShardPath shardPath,
+ IndexStorePlugin.DirectoryFactory localDirectoryFactory,
+ DataFormatRegistry dataFormatRegistry,
+ RemoteSegmentStoreDirectory remoteDirectory,
+ FileCache fileCache,
+ ThreadPool threadPool
+ ) throws IOException {
+ return newDataFormatAwareStoreDirectory(indexSettings, shardId, shardPath, localDirectoryFactory, dataFormatRegistry);
+ }
}
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSyncAwareDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSyncAwareDirectory.java
new file mode 100644
index 0000000000000..0b5f622bac005
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSyncAwareDirectory.java
@@ -0,0 +1,44 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.store;
+
+import org.opensearch.common.annotation.ExperimentalApi;
+
+/**
+ * Interface for directories that need notification after files are synced to the remote store.
+ *
+ * <p>When a file is uploaded to the remote segment store, the uploader service walks the
+ * {@link org.apache.lucene.store.FilterDirectory} chain to find a directory implementing this
+ * interface and calls {@link #afterSyncToRemote(String)}. This allows each directory layer to
+ * react to the sync event — for example, unpinning a file from the local cache so it becomes
+ * eligible for eviction, or updating a file registry to reflect the new remote location.
+ *
+ * <p>Implemented by:
+ *
+ * - {@link CompositeDirectory} — unpins files from FileCache after upload
+ * - TieredSubdirectoryAwareDirectory — delegates to format-specific handlers
+ * - {@link DataFormatAwareStoreDirectory} — pass-through to inner directory
+ *
+ *
+ * @opensearch.experimental
+ */
+@ExperimentalApi
+public interface RemoteSyncAwareDirectory {
+
+ /**
+ * Called after a file has been successfully uploaded to the remote store.
+ *
+ * Implementations should use this callback to update internal state related to the
+ * file's remote availability — such as unpinning from a local cache, marking the file
+ * as remotely available in a registry, or forwarding the notification to a delegate.
+ *
+ * @param file the name of the file that was synced to remote
+ */
+ void afterSyncToRemote(String file);
+}
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index eaa62fb9f9526..ff8a8dae9e49b 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -170,6 +170,7 @@
import org.opensearch.index.remote.RemoteIndexPathUploader;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.DefaultCompositeDirectoryFactory;
+import org.opensearch.index.store.DefaultDataFormatAwareStoreDirectoryFactory;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
@@ -270,6 +271,8 @@
import org.opensearch.snapshots.SnapshotShardsService;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.snapshots.SnapshotsService;
+import org.opensearch.storage.directory.TieredDataFormatAwareStoreDirectoryFactory;
+import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
import org.opensearch.storage.tiering.HotToWarmTieringService;
import org.opensearch.storage.tiering.WarmToHotTieringService;
import org.opensearch.task.commons.clients.TaskManagerClient;
@@ -940,9 +943,15 @@ protected Node(final Environment initialEnvironment, Collection clas
final Map<String, DataFormatAwareStoreDirectoryFactory> dataFormatAwareStoreDirectoryFactories =
new HashMap<>();
// Register default factory
+ dataFormatAwareStoreDirectoryFactories.put("default", new DefaultDataFormatAwareStoreDirectoryFactory());
+
+ // Register tiered factory for warm+format indices
+ final TieredStoragePrefetchSettings tieredStoragePrefetchSettings = new TieredStoragePrefetchSettings(
+ settingsModule.getClusterSettings()
+ );
dataFormatAwareStoreDirectoryFactories.put(
- "default",
- new org.opensearch.index.store.DefaultDataFormatAwareStoreDirectoryFactory()
+ TieredDataFormatAwareStoreDirectoryFactory.FACTORY_KEY,
+ new TieredDataFormatAwareStoreDirectoryFactory(() -> tieredStoragePrefetchSettings)
);
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories = pluginsService.filterPlugins(
diff --git a/server/src/main/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactory.java
new file mode 100644
index 0000000000000..1c317f7e7c6f5
--- /dev/null
+++ b/server/src/main/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactory.java
@@ -0,0 +1,177 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.storage.directory;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.store.Directory;
+import org.opensearch.common.annotation.ExperimentalApi;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
+import org.opensearch.index.engine.dataformat.DataFormatRegistry;
+import org.opensearch.index.shard.ShardPath;
+import org.opensearch.index.store.DataFormatAwareStoreDirectory;
+import org.opensearch.index.store.DataFormatAwareStoreDirectoryFactory;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.index.store.SubdirectoryAwareDirectory;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.plugins.IndexStorePlugin;
+import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * Factory for creating the warm+format directory stack.
+ *
+ * This factory builds the full tiered directory stack for warm nodes with pluggable data format
+ * support. The resulting directory stack is:
+ *
+ * DataFormatAwareStoreDirectory (checksums, format metadata)
+ * → TieredSubdirectoryAwareDirectory (format routing + tiered storage)
+ * ├── wraps: SubdirectoryAwareDirectory → FSDirectory
+ * ├── holds: TieredDirectory(SubdirectoryAwareDirectory, RemoteDir, FileCache, ThreadPool)
+ * └── holds: Map<String, DataFormatDirectoryDelegator>
+ *
+ *
+ * This factory is only used for warm+format indices. It is always registered in Node.java
+ * under the key "dataformat-tiered", but IndexModule only selects it when both
+ * {@code isWarmIndex()} and {@code isPluggableDataFormatEnabled()} are true.
+ *
+ * @opensearch.experimental
+ */
+@ExperimentalApi
+public class TieredDataFormatAwareStoreDirectoryFactory implements DataFormatAwareStoreDirectoryFactory {
+
+ /** Factory key for the warm+format tiered directory stack. */
+ public static final String FACTORY_KEY = "dataformat-tiered";
+
+ private static final Logger logger = LogManager.getLogger(TieredDataFormatAwareStoreDirectoryFactory.class);
+
+ private final Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier;
+
+ /**
+ * Creates a new TieredDataFormatAwareStoreDirectoryFactory with the given prefetch settings supplier.
+ *
+ * @param tieredStoragePrefetchSettingsSupplier supplier for tiered storage prefetch settings
+ */
+ public TieredDataFormatAwareStoreDirectoryFactory(Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier) {
+ this.tieredStoragePrefetchSettingsSupplier = tieredStoragePrefetchSettingsSupplier;
+ }
+
+ /**
+ * Hot path: not supported by this factory. This factory is only for warm+format indices.
+ *
+ * @throws UnsupportedOperationException always — use the warm-aware overload instead
+ */
+ @Override
+ public DataFormatAwareStoreDirectory newDataFormatAwareStoreDirectory(
+ IndexSettings indexSettings,
+ ShardId shardId,
+ ShardPath shardPath,
+ IndexStorePlugin.DirectoryFactory localDirectoryFactory,
+ DataFormatRegistry dataFormatRegistry
+ ) throws IOException {
+ throw new UnsupportedOperationException(
+ "TieredDataFormatAwareStoreDirectoryFactory requires warm parameters "
+ + "(remoteDirectory, fileCache, threadPool). Use the warm-aware overload."
+ );
+ }
+
+ /**
+ * Creates the warm+format directory stack.
+ *
+ * Builds: FSDirectory → SubdirectoryAwareDirectory → TieredSubdirectoryAwareDirectory
+ * → DataFormatAwareStoreDirectory (direct delegate constructor).
+ *
+ * @param indexSettings the shard's index settings
+ * @param shardId the shard identifier
+ * @param shardPath the path the shard is using for file storage
+ * @param localDirectoryFactory the factory for creating the underlying local directory
+ * @param dataFormatRegistry registry of available data format plugins
+ * @param remoteDirectory the remote segment store directory
+ * @param fileCache the file cache for warm node caching
+ * @param threadPool the thread pool for async operations
+ * @return a new DataFormatAwareStoreDirectory wrapping the tiered directory stack
+ * @throws IOException if directory creation fails
+ */
+ @Override
+ public DataFormatAwareStoreDirectory newDataFormatAwareStoreDirectory(
+ IndexSettings indexSettings,
+ ShardId shardId,
+ ShardPath shardPath,
+ IndexStorePlugin.DirectoryFactory localDirectoryFactory,
+ DataFormatRegistry dataFormatRegistry,
+ RemoteSegmentStoreDirectory remoteDirectory,
+ FileCache fileCache,
+ ThreadPool threadPool
+ ) throws IOException {
+ logger.debug("Creating warm+format directory stack for shard [{}]", shardId);
+
+ // 1. Create local directory via factory
+ Directory localDir = localDirectoryFactory.newDirectory(indexSettings, shardPath);
+
+ // 2. Wrap in SubdirectoryAwareDirectory for path routing
+ SubdirectoryAwareDirectory subdirAware = new SubdirectoryAwareDirectory(localDir, shardPath);
+
+ // 3. Build a per-format TieredDirectory for each registered data format.
+ // Each format gets its own TieredDirectory instance so that format-specific
+ // prefetch and caching behavior can be configured independently.
+ // If a format declares a custom remoteStoreType (e.g., "native-s3"), the
+ // factory resolves it to a repository at shard creation time. Otherwise the
+ // format shares the default RemoteSegmentStoreDirectory.
+ Map<String, DataFormatDescriptor> descriptors = dataFormatRegistry.getFormatDescriptors(indexSettings);
+ Map<String, Directory> formatDirectories = new HashMap<>();
+ for (Map.Entry<String, DataFormatDescriptor> entry : descriptors.entrySet()) {
+ String formatName = entry.getKey();
+ DataFormatDescriptor descriptor = entry.getValue();
+
+ // If the format needs native store for warm reads, the factory obtains
+ // the NativeStoreRepository from the shard's repository (which already
+ // has it wired via NativeRemoteObjectStoreProvider at repository creation).
+ // TODO: Get NativeStoreRepository from remoteDirectory's underlying repository
+ // and wire it as the remote directory for this format's TieredDirectory.
+ if (descriptor.nativeStoreSupported()) {
+ logger.debug(
+ "Format [{}] requires native store for warm reads — will be wired when Native Repository integration is implemented",
+ formatName
+ );
+ }
+
+ TieredDirectory formatTiered = new TieredDirectory(
+ subdirAware,
+ remoteDirectory,
+ fileCache,
+ threadPool,
+ tieredStoragePrefetchSettingsSupplier
+ );
+ formatDirectories.put(formatName, formatTiered);
+ logger.debug("Created TieredDirectory for format [{}] on shard [{}]", formatName, shardId);
+ }
+
+ // 4. Create TieredSubdirectoryAwareDirectory
+ TieredSubdirectoryAwareDirectory tieredSubdir = new TieredSubdirectoryAwareDirectory(
+ subdirAware,
+ remoteDirectory,
+ fileCache,
+ threadPool,
+ formatDirectories,
+ tieredStoragePrefetchSettingsSupplier
+ );
+
+ logger.debug("Created warm+format directory stack for shard [{}] with format directories: {}", shardId, formatDirectories.keySet());
+
+ // 5. Wrap in DataFormatAwareStoreDirectory (direct delegate constructor — no double wrapping)
+ return new DataFormatAwareStoreDirectory(indexSettings, tieredSubdir, shardPath, dataFormatRegistry, true);
+ }
+}
diff --git a/server/src/main/java/org/opensearch/storage/directory/TieredDirectory.java b/server/src/main/java/org/opensearch/storage/directory/TieredDirectory.java
index 41d29d4031fe1..39cb5fd6e5c3e 100644
--- a/server/src/main/java/org/opensearch/storage/directory/TieredDirectory.java
+++ b/server/src/main/java/org/opensearch/storage/directory/TieredDirectory.java
@@ -12,7 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.index.store.CompositeDirectory;
@@ -24,6 +23,7 @@
import org.opensearch.storage.indexinput.SwitchableIndexInput;
import org.opensearch.storage.indexinput.SwitchableIndexInputWrapper;
import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
+import org.opensearch.storage.utils.DirectoryUtils;
import org.opensearch.threadpool.ThreadPool;
import java.io.IOException;
@@ -229,7 +229,7 @@ protected void cacheFile(String fileName, boolean cacheFromRemote) throws IOExce
new CachedSwitchableIndexInput(
fileCache,
fileName,
- (FSDirectory) localDirectory,
+ DirectoryUtils.getFSDirectory(localDirectory),
remoteDirectory,
transferManager,
cacheFromRemote,
diff --git a/server/src/main/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectory.java b/server/src/main/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectory.java
new file mode 100644
index 0000000000000..9e25d6a867b7e
--- /dev/null
+++ b/server/src/main/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectory.java
@@ -0,0 +1,176 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.storage.directory;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.opensearch.common.annotation.ExperimentalApi;
+import org.opensearch.common.util.io.IOUtils;
+import org.opensearch.index.store.DataFormatAwareStoreDirectory;
+import org.opensearch.index.store.RemoteSyncAwareDirectory;
+import org.opensearch.index.store.SubdirectoryAwareDirectory;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * A tiered directory that combines subdirectory-aware local storage with remote tiered storage
+ * and per-format directory routing.
+ *
+ * This directory extends {@link FilterDirectory} wrapping a {@link SubdirectoryAwareDirectory}
+ * and implements {@link RemoteSyncAwareDirectory} for remote sync notifications. It routes file
+ * operations based on data format:
+ *
+ * - Files with a format-specific directory (e.g., parquet) are routed to that directory
+ * - Files without a format directory (e.g., Lucene) are routed to the internal {@link TieredDirectory}
+ *
+ *
+ * The directory stack for warm+format nodes:
+ *
+ * DataFormatAwareStoreDirectory (checksums, format metadata)
+ * → TieredSubdirectoryAwareDirectory (this class — format routing)
+ * ├── wraps: SubdirectoryAwareDirectory → FSDirectory (local path routing)
+ * ├── holds: TieredDirectory(SubdirectoryAwareDirectory, RemoteDir, FileCache, ThreadPool)
+ * └── holds: Map<String, Directory> (per-format directories from DataFormatDescriptor)
+ *
+ *
+ * @opensearch.experimental
+ */
+@ExperimentalApi
+public class TieredSubdirectoryAwareDirectory extends FilterDirectory implements RemoteSyncAwareDirectory {
+
+ private static final Logger logger = LogManager.getLogger(TieredSubdirectoryAwareDirectory.class);
+
+ private final TieredDirectory tieredDirectory;
+ private final Map formatDirectories;
+
+ /**
+ * Constructs a TieredSubdirectoryAwareDirectory.
+ *
+ * @param localDirectory the subdirectory-aware local directory (used as FilterDirectory delegate)
+ * @param remoteDirectory the remote segment store directory
+ * @param fileCache the file cache for warm node caching
+ * @param threadPool the thread pool for async operations
+ * @param formatDirectories per-format directories (format name → directory), from DataFormatDescriptor
+ * @param tieredStoragePrefetchSettingsSupplier supplier for prefetch settings
+ */
+ public TieredSubdirectoryAwareDirectory(
+ SubdirectoryAwareDirectory localDirectory,
+ Directory remoteDirectory,
+ FileCache fileCache,
+ ThreadPool threadPool,
+ Map formatDirectories,
+ Supplier tieredStoragePrefetchSettingsSupplier
+ ) {
+ super(localDirectory);
+ this.formatDirectories = formatDirectories;
+ this.tieredDirectory = new TieredDirectory(
+ localDirectory,
+ remoteDirectory,
+ fileCache,
+ threadPool,
+ tieredStoragePrefetchSettingsSupplier
+ );
+ logger.debug("Created TieredSubdirectoryAwareDirectory with format directories: {}", formatDirectories.keySet());
+ }
+
+ @Override
+ public IndexInput openInput(String name, IOContext context) throws IOException {
+ Directory formatDir = resolveFormatDirectory(name);
+ if (formatDir != null) {
+ return formatDir.openInput(name, context);
+ }
+ return tieredDirectory.openInput(name, context);
+ }
+
+ @Override
+ public long fileLength(String name) throws IOException {
+ Directory formatDir = resolveFormatDirectory(name);
+ if (formatDir != null) {
+ return formatDir.fileLength(name);
+ }
+ return tieredDirectory.fileLength(name);
+ }
+
+ @Override
+ public String[] listAll() throws IOException {
+ Set all = new HashSet<>(Arrays.asList(tieredDirectory.listAll()));
+ for (Directory formatDir : formatDirectories.values()) {
+ Collections.addAll(all, formatDir.listAll());
+ }
+ return all.stream().sorted().toArray(String[]::new);
+ }
+
+ @Override
+ public IndexOutput createOutput(String name, IOContext context) throws IOException {
+ Directory formatDir = resolveFormatDirectory(name);
+ if (formatDir != null) {
+ return formatDir.createOutput(name, context);
+ }
+ return tieredDirectory.createOutput(name, context);
+ }
+
+ @Override
+ public void deleteFile(String name) throws IOException {
+ Directory formatDir = resolveFormatDirectory(name);
+ if (formatDir != null) {
+ formatDir.deleteFile(name);
+ } else {
+ tieredDirectory.deleteFile(name);
+ }
+ }
+
+ @Override
+ public void afterSyncToRemote(String file) {
+ Directory formatDir = resolveFormatDirectory(file);
+ if (formatDir != null) {
+ if (formatDir instanceof RemoteSyncAwareDirectory) {
+ ((RemoteSyncAwareDirectory) formatDir).afterSyncToRemote(file);
+ }
+ // else: format directory doesn't support sync notifications — no-op
+ } else {
+ tieredDirectory.afterSyncToRemote(file);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ List toClose = new ArrayList<>(formatDirectories.values());
+ toClose.add(tieredDirectory);
+ IOUtils.close(toClose);
+ }
+
+ /**
+ * Resolves the format-specific directory for the given file name by parsing the data format.
+ *
+ * @param name the file name or identifier
+ * @return the directory for the file's format, or {@code null} if no format directory is registered
+ */
+ @SuppressWarnings("resource")
+ private Directory resolveFormatDirectory(String name) {
+ String format = DataFormatAwareStoreDirectory.toFileMetadata(name).dataFormat();
+ return formatDirectories.get(format);
+ }
+}
diff --git a/server/src/main/java/org/opensearch/storage/utils/DirectoryUtils.java b/server/src/main/java/org/opensearch/storage/utils/DirectoryUtils.java
index 47d6f012d580a..88d15a5e5cecc 100644
--- a/server/src/main/java/org/opensearch/storage/utils/DirectoryUtils.java
+++ b/server/src/main/java/org/opensearch/storage/utils/DirectoryUtils.java
@@ -10,6 +10,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.FilterDirectory;
import java.nio.file.Path;
@@ -20,11 +21,30 @@ public class DirectoryUtils {
public static final String SWITCHABLE_PREFIX = "_switchable";
+    /**
+     * Unwraps the {@link FilterDirectory} chain to locate the backing {@link FSDirectory}.
+     * A directory that is already an FSDirectory is returned as-is.
+     *
+     * @param dir the directory to unwrap
+     * @return the underlying FSDirectory
+     * @throws IllegalStateException if no FSDirectory is found in the chain
+     */
+    public static FSDirectory getFSDirectory(Directory dir) {
+        // FilterDirectory.unwrap walks the delegate chain to the innermost non-filter directory.
+        final Directory leaf = FilterDirectory.unwrap(dir);
+        if (leaf instanceof FSDirectory) {
+            return (FSDirectory) leaf;
+        }
+        throw new IllegalStateException("No FSDirectory found in directory chain: " + dir.getClass().getName());
+    }
+
public static Path getFilePath(Directory localDirectory, String fileName) {
- return ((FSDirectory) localDirectory).getDirectory().resolve(fileName);
+ return getFSDirectory(localDirectory).getDirectory().resolve(fileName);
}
public static Path getFilePathSwitchable(Directory localDirectory, String fileName) {
- return ((FSDirectory) localDirectory).getDirectory().resolve(fileName + SWITCHABLE_PREFIX);
+ return getFSDirectory(localDirectory).getDirectory().resolve(fileName + SWITCHABLE_PREFIX);
}
}
diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java
index 94ca8d727c56a..8832b3cf23f82 100644
--- a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java
@@ -8,6 +8,7 @@
package org.opensearch.index.engine.dataformat;
+import org.apache.lucene.store.Directory;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
@@ -286,4 +287,5 @@ public void testGetRegisteredFormatsIsUnmodifiable() {
expectThrows(UnsupportedOperationException.class, () -> formats.add(new MockDataFormat("new", 1L, Set.of())));
}
+
}
diff --git a/server/src/test/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryTests.java
index ba795396451b5..7f1d7677fa83a 100644
--- a/server/src/test/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryTests.java
@@ -989,4 +989,65 @@ public void testGetDataFormat_comprehensive() {
assertEquals("orc", dataFormatAwareStoreDirectory.getDataFormat("orc/data.orc"));
assertEquals("custom", dataFormatAwareStoreDirectory.getDataFormat("custom/myfile.dat"));
}
+
+    /**
+     * Verifies that afterSyncToRemote is a safe no-op when the wrapped delegate
+     * does not implement RemoteSyncAwareDirectory.
+     */
+    public void testAfterSyncToRemoteWithNonRemoteSyncAwareDelegate() {
+        // Default constructor wraps delegate in SubdirectoryAwareDirectory which does NOT
+        // implement RemoteSyncAwareDirectory → afterSyncToRemote should be a no-op
+        dataFormatAwareStoreDirectory.afterSyncToRemote("_0.cfe");
+        // No exception = pass. The inner SubdirectoryAwareDirectory is not RemoteSyncAwareDirectory.
+    }
+
+    /**
+     * Verifies that afterSyncToRemote is forwarded when the delegate is both a
+     * Directory and a RemoteSyncAwareDirectory.
+     */
+    public void testAfterSyncToRemoteWithRemoteSyncAwareDelegate() {
+        // We need a delegate that is a Directory AND a RemoteSyncAwareDirectory — use the abstract helper
+        RemoteSyncAwareMockDirectory syncAwareDir = mock(RemoteSyncAwareMockDirectory.class);
+
+        PluginsService pluginsService = mock(PluginsService.class);
+        when(pluginsService.filterPlugins(DataFormatPlugin.class)).thenReturn(List.of());
+        when(pluginsService.filterPlugins(SearchBackEndPlugin.class)).thenReturn(List.of());
+        DataFormatRegistry registry = new DataFormatRegistry(pluginsService);
+
+        Settings settings = Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetadata.SETTING_INDEX_UUID, "test-uuid")
+            .build();
+        IndexMetadata metadata = IndexMetadata.builder("test-index").settings(settings).numberOfShards(1).numberOfReplicas(0).build();
+        IndexSettings idxSettings = new IndexSettings(metadata, Settings.EMPTY);
+
+        DataFormatAwareStoreDirectory dir = new DataFormatAwareStoreDirectory(idxSettings, syncAwareDir, shardPath, registry, true);
+        dir.afterSyncToRemote("_0.cfe");
+        // The notification must reach the sync-aware delegate exactly once.
+        org.mockito.Mockito.verify(syncAwareDir).afterSyncToRemote("_0.cfe");
+    }
+
+    /**
+     * Verifies that the directDelegate=true constructor uses the supplied delegate
+     * as-is: unwrapping the chain must reach the FSDirectory leaf, with no extra
+     * SubdirectoryAwareDirectory layer added by the wrapper.
+     */
+    public void testDirectDelegateConstructorDoesNotDoubleWrap() throws IOException {
+        // The directDelegate=true constructor should use the delegate as-is
+        PluginsService pluginsService = mock(PluginsService.class);
+        when(pluginsService.filterPlugins(DataFormatPlugin.class)).thenReturn(List.of());
+        when(pluginsService.filterPlugins(SearchBackEndPlugin.class)).thenReturn(List.of());
+        DataFormatRegistry registry = new DataFormatRegistry(pluginsService);
+
+        Settings settings = Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetadata.SETTING_INDEX_UUID, "test-uuid")
+            .build();
+        IndexMetadata metadata = IndexMetadata.builder("test-index").settings(settings).numberOfShards(1).numberOfReplicas(0).build();
+        IndexSettings idxSettings = new IndexSettings(metadata, Settings.EMPTY);
+
+        SubdirectoryAwareDirectory subdirAware = new SubdirectoryAwareDirectory(fsDirectory, shardPath);
+        DataFormatAwareStoreDirectory dir = new DataFormatAwareStoreDirectory(idxSettings, subdirAware, shardPath, registry, true);
+
+        // The delegate should be the SubdirectoryAwareDirectory directly, not wrapped again
+        org.apache.lucene.store.Directory delegate = org.apache.lucene.store.FilterDirectory.unwrap(dir);
+        // unwrap goes all the way to the leaf — should be FSDirectory
+        assertTrue("Leaf should be FSDirectory", delegate instanceof FSDirectory);
+        dir.close();
+    }
+
+ /**
+ * Helper interface for mocking a Directory that also implements RemoteSyncAwareDirectory.
+ */
+ abstract static class RemoteSyncAwareMockDirectory extends org.apache.lucene.store.Directory implements RemoteSyncAwareDirectory {}
}
diff --git a/server/src/test/java/org/opensearch/storage/directory/GracefulDegradationTests.java b/server/src/test/java/org/opensearch/storage/directory/GracefulDegradationTests.java
new file mode 100644
index 0000000000000..2bcf69a499286
--- /dev/null
+++ b/server/src/test/java/org/opensearch/storage/directory/GracefulDegradationTests.java
@@ -0,0 +1,192 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.storage.directory;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.FilterDirectory;
+import org.opensearch.Version;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.index.Index;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.index.engine.dataformat.DataFormatRegistry;
+import org.opensearch.index.shard.ShardPath;
+import org.opensearch.index.store.DataFormatAwareStoreDirectory;
+import org.opensearch.index.store.RemoteDirectory;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.index.store.SubdirectoryAwareDirectory;
+import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.index.store.remote.filecache.FileCacheFactory;
+import org.opensearch.plugins.IndexStorePlugin;
+import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for graceful degradation when sandbox plugins are not loaded.
+ *
+ * When no data format plugins provide tiered directories (e.g., sandbox not loaded),
+ * the warm directory stack should still function correctly using only the default
+ * TieredDirectory for Lucene files. No errors should occur.
+ */
+public class GracefulDegradationTests extends OpenSearchTestCase {
+
+    /**
+     * Builds a supplier of mocked prefetch settings with library defaults,
+     * so directory construction never touches real cluster settings.
+     */
+    private Supplier<TieredStoragePrefetchSettings> getMockPrefetchSettingsSupplier() {
+        return () -> {
+            TieredStoragePrefetchSettings settings = mock(TieredStoragePrefetchSettings.class);
+            when(settings.getReadAheadBlockCount()).thenReturn(TieredStoragePrefetchSettings.DEFAULT_READ_AHEAD_BLOCK_COUNT);
+            when(settings.getReadAheadEnableFileFormats()).thenReturn(TieredStoragePrefetchSettings.READ_AHEAD_ENABLE_FILE_FORMATS);
+            when(settings.isStoredFieldsPrefetchEnabled()).thenReturn(true);
+            return settings;
+        };
+    }
+
+    /**
+     * Tests that when DataFormatRegistry returns empty tiered directories (simulating
+     * sandbox not loaded), the factory creates a valid directory stack that works
+     * for plain Lucene warm operations without errors.
+     */
+    public void testNoFormatPluginsCreatesValidStack() throws IOException {
+        Path tempDir = createTempDir();
+        Index index = new Index("test-degradation", "test-uuid");
+        ShardId shardId = new ShardId(index, 0);
+
+        Path shardStatePath = tempDir.resolve("state").resolve("test-uuid").resolve("0");
+        Path shardDataPath = tempDir.resolve("data").resolve("test-uuid").resolve("0");
+        Files.createDirectories(shardStatePath);
+        Files.createDirectories(shardDataPath);
+        Files.createDirectories(shardDataPath.resolve("index"));
+
+        ShardPath shardPath = new ShardPath(false, shardDataPath, shardStatePath, shardId);
+
+        Settings settings = Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            .build();
+        IndexMetadata indexMetadata = IndexMetadata.builder("test-degradation").settings(settings).build();
+        IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
+
+        // Simulate sandbox not loaded: DataFormatRegistry returns empty format descriptors
+        DataFormatRegistry registry = mock(DataFormatRegistry.class);
+        when(registry.getFormatDescriptors(any())).thenReturn(Map.of());
+
+        FSDirectory fsDir = FSDirectory.open(shardPath.resolveIndex());
+        IndexStorePlugin.DirectoryFactory localDirFactory = mock(IndexStorePlugin.DirectoryFactory.class);
+        when(localDirFactory.newDirectory(any(), any())).thenReturn(fsDir);
+
+        RemoteSegmentStoreDirectory remoteDir = createRealRemoteDir(shardId);
+        FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(10_000_000, 1);
+
+        TieredDataFormatAwareStoreDirectoryFactory factory = new TieredDataFormatAwareStoreDirectoryFactory(
+            getMockPrefetchSettingsSupplier()
+        );
+
+        // Should not throw — graceful degradation
+        DataFormatAwareStoreDirectory storeDir = factory.newDataFormatAwareStoreDirectory(
+            indexSettings,
+            shardId,
+            shardPath,
+            localDirFactory,
+            registry,
+            remoteDir,
+            fileCache,
+            null
+        );
+
+        assertNotNull("Directory should be created even without format plugins", storeDir);
+
+        // Verify the stack is correct
+        Directory delegate = ((FilterDirectory) storeDir).getDelegate();
+        assertTrue(
+            "Should have TieredSubdirectoryAwareDirectory even without format plugins",
+            delegate instanceof TieredSubdirectoryAwareDirectory
+        );
+
+        Directory innerDelegate = ((FilterDirectory) delegate).getDelegate();
+        assertTrue("Should have SubdirectoryAwareDirectory", innerDelegate instanceof SubdirectoryAwareDirectory);
+
+        storeDir.close();
+    }
+
+    /**
+     * Tests that TieredSubdirectoryAwareDirectory with empty format directories
+     * routes all operations to TieredDirectory without errors.
+     */
+    public void testEmptyFormatDirectoriesRoutesToTieredDirectory() throws IOException {
+        Path tempDir = createTempDir();
+        Index index = new Index("test-empty-formats", "test-uuid");
+        ShardId shardId = new ShardId(index, 0);
+
+        Path shardStatePath = tempDir.resolve("state").resolve("test-uuid").resolve("0");
+        Path shardDataPath = tempDir.resolve("data").resolve("test-uuid").resolve("0");
+        Files.createDirectories(shardStatePath);
+        Files.createDirectories(shardDataPath);
+        Files.createDirectories(shardDataPath.resolve("index"));
+
+        ShardPath shardPath = new ShardPath(false, shardDataPath, shardStatePath, shardId);
+
+        FSDirectory fsDir = FSDirectory.open(shardPath.resolveIndex());
+        SubdirectoryAwareDirectory subdirAware = new SubdirectoryAwareDirectory(fsDir, shardPath);
+        RemoteSegmentStoreDirectory remoteDir = createRealRemoteDir(shardId);
+        FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(10_000_000, 1);
+
+        // Empty format directories — simulates no sandbox plugins
+        Map<String, Directory> emptyFormats = new HashMap<>();
+
+        TieredSubdirectoryAwareDirectory tieredSubdir = new TieredSubdirectoryAwareDirectory(
+            subdirAware,
+            remoteDir,
+            fileCache,
+            null,
+            emptyFormats,
+            getMockPrefetchSettingsSupplier()
+        );
+
+        // listAll should work without errors
+        String[] files = tieredSubdir.listAll();
+        assertNotNull("listAll should return non-null", files);
+
+        // close should not throw
+        tieredSubdir.close();
+    }
+
+    /**
+     * Tests that the factory key constant is correctly defined.
+     */
+    public void testFactoryKeyConstant() {
+        assertEquals(
+            "Factory key should be 'dataformat-tiered'",
+            "dataformat-tiered",
+            TieredDataFormatAwareStoreDirectoryFactory.FACTORY_KEY
+        );
+    }
+
+    /**
+     * Builds a real RemoteSegmentStoreDirectory over mocked inner directories
+     * (the class itself cannot be mocked).
+     */
+    private RemoteSegmentStoreDirectory createRealRemoteDir(ShardId shardId) throws IOException {
+        RemoteDirectory remoteDataDir = mock(RemoteDirectory.class);
+        RemoteDirectory remoteMetadataDir = mock(RemoteDirectory.class);
+        RemoteStoreLockManager lockManager = mock(RemoteStoreLockManager.class);
+        ThreadPool tp = mock(ThreadPool.class);
+        return new RemoteSegmentStoreDirectory(remoteDataDir, remoteMetadataDir, lockManager, tp, shardId, new HashMap<>());
+    }
+}
diff --git a/server/src/test/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactoryTests.java
new file mode 100644
index 0000000000000..ba34274663005
--- /dev/null
+++ b/server/src/test/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactoryTests.java
@@ -0,0 +1,221 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.storage.directory;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.FilterDirectory;
+import org.opensearch.Version;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.index.Index;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.index.engine.dataformat.DataFormatRegistry;
+import org.opensearch.index.shard.ShardPath;
+import org.opensearch.index.store.DataFormatAwareStoreDirectory;
+import org.opensearch.index.store.RemoteDirectory;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.index.store.SubdirectoryAwareDirectory;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.index.store.remote.filecache.FileCacheFactory;
+import org.opensearch.plugins.IndexStorePlugin;
+import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.ThreadPool;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link TieredDataFormatAwareStoreDirectoryFactory}.
+ *
+ * Verifies the factory creates the correct directory stack for warm+format indices
+ * and rejects the hot path (5-param method).
+ */
+public class TieredDataFormatAwareStoreDirectoryFactoryTests extends OpenSearchTestCase {
+
+    private TieredDataFormatAwareStoreDirectoryFactory factory;
+    private IndexSettings indexSettings;
+    private ShardId shardId;
+    private ShardPath shardPath;
+    private IndexStorePlugin.DirectoryFactory localDirectoryFactory;
+    private DataFormatRegistry dataFormatRegistry;
+    private RemoteSegmentStoreDirectory remoteDirectory;
+    private FileCache fileCache;
+    private ThreadPool threadPool;
+
+    /**
+     * Sets up the factory and mock dependencies before each test.
+     */
+    @Before
+    public void setup() throws IOException {
+        Supplier<TieredStoragePrefetchSettings> prefetchSupplier = () -> {
+            TieredStoragePrefetchSettings settings = mock(TieredStoragePrefetchSettings.class);
+            when(settings.getReadAheadBlockCount()).thenReturn(TieredStoragePrefetchSettings.DEFAULT_READ_AHEAD_BLOCK_COUNT);
+            when(settings.getReadAheadEnableFileFormats()).thenReturn(TieredStoragePrefetchSettings.READ_AHEAD_ENABLE_FILE_FORMATS);
+            when(settings.isStoredFieldsPrefetchEnabled()).thenReturn(true);
+            return settings;
+        };
+        factory = new TieredDataFormatAwareStoreDirectoryFactory(prefetchSupplier);
+
+        Path tempDir = createTempDir();
+        Index index = new Index("test-index", "test-uuid");
+        shardId = new ShardId(index, 0);
+
+        // ShardPath requires: dataPath ends with <index-uuid>/<shard-id>
+        Path shardStatePath = tempDir.resolve("state").resolve("test-uuid").resolve("0");
+        Path shardDataPath = tempDir.resolve("data").resolve("test-uuid").resolve("0");
+        Path indexPath = shardDataPath.resolve("index");
+        java.nio.file.Files.createDirectories(shardStatePath);
+        java.nio.file.Files.createDirectories(shardDataPath);
+        java.nio.file.Files.createDirectories(indexPath);
+        shardPath = new ShardPath(false, shardDataPath, shardStatePath, shardId);
+
+        Settings settings = Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            .build();
+        IndexMetadata indexMetadata = IndexMetadata.builder("test-index").settings(settings).build();
+        indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
+
+        FSDirectory fsDir = FSDirectory.open(indexPath);
+        localDirectoryFactory = mock(IndexStorePlugin.DirectoryFactory.class);
+        when(localDirectoryFactory.newDirectory(any(), any())).thenReturn(fsDir);
+
+        dataFormatRegistry = mock(DataFormatRegistry.class);
+        when(dataFormatRegistry.getFormatDescriptors(any())).thenReturn(new HashMap<>());
+
+        remoteDirectory = createRealRemoteSegmentStoreDirectory(shardId);
+        fileCache = FileCacheFactory.createConcurrentLRUFileCache(10_000_000, 1);
+        threadPool = mock(ThreadPool.class);
+    }
+
+    /**
+     * Creates a real RemoteSegmentStoreDirectory with mocked inner directories.
+     * RemoteSegmentStoreDirectory is a final class and cannot be mocked.
+     */
+    private RemoteSegmentStoreDirectory createRealRemoteSegmentStoreDirectory(ShardId shardId) throws IOException {
+        RemoteDirectory remoteDataDir = mock(RemoteDirectory.class);
+        RemoteDirectory remoteMetadataDir = mock(RemoteDirectory.class);
+        org.opensearch.index.store.lockmanager.RemoteStoreLockManager lockManager = mock(
+            org.opensearch.index.store.lockmanager.RemoteStoreLockManager.class
+        );
+        ThreadPool tp = mock(ThreadPool.class);
+        return new RemoteSegmentStoreDirectory(remoteDataDir, remoteMetadataDir, lockManager, tp, shardId, new HashMap<>());
+    }
+
+    /**
+     * Tests that the warm-aware factory method creates the correct directory stack:
+     * DataFormatAwareStoreDirectory wrapping TieredSubdirectoryAwareDirectory.
+     */
+    public void testCreatesCorrectDirectoryStack() throws IOException {
+        DataFormatAwareStoreDirectory result = factory.newDataFormatAwareStoreDirectory(
+            indexSettings,
+            shardId,
+            shardPath,
+            localDirectoryFactory,
+            dataFormatRegistry,
+            remoteDirectory,
+            fileCache,
+            threadPool
+        );
+
+        assertNotNull("Factory should return a non-null directory", result);
+        assertTrue("Outermost directory should be DataFormatAwareStoreDirectory", result instanceof DataFormatAwareStoreDirectory);
+
+        // The delegate of DataFormatAwareStoreDirectory should be TieredSubdirectoryAwareDirectory
+        Directory delegate = ((FilterDirectory) result).getDelegate();
+        assertTrue("Delegate should be TieredSubdirectoryAwareDirectory", delegate instanceof TieredSubdirectoryAwareDirectory);
+
+        // The delegate of TieredSubdirectoryAwareDirectory should be SubdirectoryAwareDirectory
+        Directory innerDelegate = ((FilterDirectory) delegate).getDelegate();
+        assertTrue("Inner delegate should be SubdirectoryAwareDirectory", innerDelegate instanceof SubdirectoryAwareDirectory);
+
+        result.close();
+    }
+
+    /**
+     * Tests that SubdirectoryAwareDirectory appears only once in the directory chain.
+     * The factory should NOT double-wrap with SubdirectoryAwareDirectory.
+     */
+    public void testNoDoubleSubdirectoryAwareDirectoryWrapping() throws IOException {
+        DataFormatAwareStoreDirectory result = factory.newDataFormatAwareStoreDirectory(
+            indexSettings,
+            shardId,
+            shardPath,
+            localDirectoryFactory,
+            dataFormatRegistry,
+            remoteDirectory,
+            fileCache,
+            threadPool
+        );
+
+        int subdirAwareCount = 0;
+        Directory current = result;
+        while (current instanceof FilterDirectory) {
+            if (current instanceof SubdirectoryAwareDirectory) {
+                subdirAwareCount++;
+            }
+            current = ((FilterDirectory) current).getDelegate();
+        }
+
+        assertEquals("SubdirectoryAwareDirectory should appear exactly once in the chain", 1, subdirAwareCount);
+
+        result.close();
+    }
+
+    /**
+     * Tests that when DataFormatRegistry returns empty tiered directories,
+     * the factory still creates a valid directory stack with no format directories.
+     */
+    public void testEmptyFormatDirectoriesWhenNoPluginProvides() throws IOException {
+        when(dataFormatRegistry.getFormatDescriptors(any())).thenReturn(Map.of());
+
+        DataFormatAwareStoreDirectory result = factory.newDataFormatAwareStoreDirectory(
+            indexSettings,
+            shardId,
+            shardPath,
+            localDirectoryFactory,
+            dataFormatRegistry,
+            remoteDirectory,
+            fileCache,
+            threadPool
+        );
+
+        assertNotNull("Factory should return a non-null directory even with no format plugins", result);
+
+        // Verify the stack is still correct
+        Directory delegate = ((FilterDirectory) result).getDelegate();
+        assertTrue("Delegate should still be TieredSubdirectoryAwareDirectory", delegate instanceof TieredSubdirectoryAwareDirectory);
+
+        result.close();
+    }
+
+    /**
+     * Tests that calling the 5-param (hot path) method throws UnsupportedOperationException.
+     */
+    public void testHotPathThrowsUnsupportedOperation() {
+        UnsupportedOperationException exception = expectThrows(
+            UnsupportedOperationException.class,
+            () -> factory.newDataFormatAwareStoreDirectory(indexSettings, shardId, shardPath, localDirectoryFactory, dataFormatRegistry)
+        );
+
+        assertTrue("Exception message should mention warm parameters", exception.getMessage().contains("warm"));
+    }
+}
diff --git a/server/src/test/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectoryTests.java b/server/src/test/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectoryTests.java
new file mode 100644
index 0000000000000..4eac4f618ae34
--- /dev/null
+++ b/server/src/test/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectoryTests.java
@@ -0,0 +1,637 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.storage.directory;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.opensearch.core.index.Index;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.shard.ShardPath;
+import org.opensearch.index.store.RemoteSyncAwareDirectory;
+import org.opensearch.index.store.SubdirectoryAwareDirectory;
+import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.index.store.remote.filecache.FileCacheFactory;
+import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static org.opensearch.storage.utils.DirectoryUtils.getFilePathSwitchable;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Functional tests for {@link TieredSubdirectoryAwareDirectory} exercising real I/O
+ * through the full directory stack (FSDirectory → SubdirectoryAwareDirectory → TieredDirectory).
+ *
+ * Format directories use a non-closing FilterDirectory wrapper around the shared
+ * SubdirectoryAwareDirectory to avoid double-close issues. Parquet files are written
+ * directly to disk via {@link Files#write} to simulate the Rust writer path.
+ */
+@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
+public class TieredSubdirectoryAwareDirectoryTests extends TieredStorageBaseTestCase {
+
+ private FileCache fileCache;
+ private ShardPath shardPath;
+ private FSDirectory localFsDir;
+ private SubdirectoryAwareDirectory subdirAware;
+ private TieredSubdirectoryAwareDirectory directory;
+
+ private static final byte[] TEST_DATA = "hello-tiered".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] PARQUET_DATA = "parquet-payload".getBytes(StandardCharsets.UTF_8);
+
+ @Before
+ public void setup() throws IOException {
+ // Base-class helpers: stand up the remote segment store and seed its metadata so
+ + // init() below can read a consistent remote view. (See TieredStorageBaseTestCase.)
+ setupRemoteSegmentStoreDirectory();
+ populateMetadata();
+ remoteSegmentStoreDirectory.init();
+
+ // Build a realistic on-disk shard layout: data path with an "index" subdir plus a
+ // separate state path, mirroring production ShardPath expectations.
+ Path tempDir = createTempDir();
+ Index index = new Index("test-index", "test-uuid");
+ ShardId shardId = new ShardId(index, 0);
+ Path shardDataPath = tempDir.resolve("data").resolve("test-uuid").resolve("0");
+ Path shardStatePath = tempDir.resolve("state").resolve("test-uuid").resolve("0");
+ Files.createDirectories(shardDataPath.resolve("index"));
+ Files.createDirectories(shardStatePath);
+ shardPath = new ShardPath(false, shardDataPath, shardStatePath, shardId);
+
+ localFsDir = FSDirectory.open(shardPath.resolveIndex());
+ subdirAware = new SubdirectoryAwareDirectory(localFsDir, shardPath);
+ fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY, 1);
+ }
+
+ /**
+ * Returns a supplier of mocked {@link TieredStoragePrefetchSettings} configured with the
+ * default read-ahead block count, default enabled file formats, and stored-fields
+ * prefetching turned on.
+ */
+ private Supplier<TieredStoragePrefetchSettings> getMockPrefetchSettingsSupplier() {
+ return () -> {
+ TieredStoragePrefetchSettings settings = mock(TieredStoragePrefetchSettings.class);
+ when(settings.getReadAheadBlockCount()).thenReturn(TieredStoragePrefetchSettings.DEFAULT_READ_AHEAD_BLOCK_COUNT);
+ when(settings.getReadAheadEnableFileFormats()).thenReturn(TieredStoragePrefetchSettings.READ_AHEAD_ENABLE_FILE_FORMATS);
+ when(settings.isStoredFieldsPrefetchEnabled()).thenReturn(true);
+ return settings;
+ };
+ }
+
+ /**
+ * Creates a non-closing FilterDirectory wrapper around the given SubdirectoryAwareDirectory.
+ * This prevents double-close when the format directory and TieredDirectory share the same
+ * underlying SubdirectoryAwareDirectory.
+ */
+ private Directory createNonClosingFormatDirectory(SubdirectoryAwareDirectory delegate) {
+ return new FilterDirectory(delegate) {
+ @Override
+ public void close() {
+ // Don't close — shared with TieredDirectory, which owns the real close.
+ }
+ };
+ }
+
+ /**
+ * Builds a TieredSubdirectoryAwareDirectory with no format directories (Lucene-only).
+ */
+ private TieredSubdirectoryAwareDirectory buildDirectoryNoFormats() {
+ // Empty format map: every operation should route through the tiered (Lucene) path.
+ return new TieredSubdirectoryAwareDirectory(
+ subdirAware,
+ remoteSegmentStoreDirectory,
+ fileCache,
+ threadPool,
+ new HashMap<>(),
+ getMockPrefetchSettingsSupplier()
+ );
+ }
+
+ /**
+ * Builds a TieredSubdirectoryAwareDirectory with a "parquet" format directory backed by
+ * a non-closing wrapper around the shared SubdirectoryAwareDirectory.
+ */
+ private TieredSubdirectoryAwareDirectory buildDirectoryWithParquetFormat() {
+ Directory formatDir = createNonClosingFormatDirectory(subdirAware);
+ // Typed map (was raw Map) — avoids an unchecked conversion at the constructor call.
+ Map<String, Directory> formatDirs = new HashMap<>();
+ formatDirs.put("parquet", formatDir);
+ return new TieredSubdirectoryAwareDirectory(
+ subdirAware,
+ remoteSegmentStoreDirectory,
+ fileCache,
+ threadPool,
+ formatDirs,
+ getMockPrefetchSettingsSupplier()
+ );
+ }
+
+ /**
+ * Writes a parquet file directly to disk (simulating the Rust writer), not via createOutput.
+ *
+ * @param relativePath path relative to the shard data path, e.g. "parquet/seg.parquet"
+ */
+ private void writeParquetFileToDisk(String relativePath) throws IOException {
+ Path fullPath = shardPath.getDataPath().resolve(relativePath);
+ // Ensure the format subdirectory exists before writing the payload.
+ Files.createDirectories(fullPath.getParent());
+ Files.write(fullPath, PARQUET_DATA);
+ }
+
+ // ═══════════════════════════════════════════════════════════════
+ // Routing tests — openInput
+ // ═══════════════════════════════════════════════════════════════
+
+ /**
+ * Write a Lucene file via createOutput on TieredSubdirectoryAwareDirectory (no format dir
+ * for lucene), read it back via openInput — should go through TieredDirectory → FileCache.
+ */
+ public void testOpenInputLuceneFileRoutesToTieredDirectory() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ populateData();
+ try {
+ String luceneFile = "_0_test.cfe";
+ try (IndexOutput out = directory.createOutput(luceneFile, IOContext.DEFAULT)) {
+ out.writeBytes(TEST_DATA, TEST_DATA.length);
+ }
+
+ // The file should be cached in FileCache via TieredDirectory
+ Path switchablePath = getFilePathSwitchable(localFsDir, luceneFile);
+ assertNotNull("Lucene file should be in FileCache after createOutput", fileCache.get(switchablePath));
+ // Release the reference taken by fileCache.get above — presumably get() pins the
+ // entry; verify against FileCache semantics.
+ fileCache.decRef(switchablePath);
+
+ try (IndexInput in = directory.openInput(luceneFile, IOContext.DEFAULT)) {
+ assertNotNull("openInput should return non-null for Lucene file", in);
+ byte[] buf = new byte[TEST_DATA.length];
+ in.readBytes(buf, 0, buf.length);
+ assertArrayEquals("Data read back should match data written", TEST_DATA, buf);
+ }
+ } finally {
+ directory.close();
+ }
+ }
+
+ /**
+ * Create a SubdirectoryAwareDirectory as format directory for "parquet", write a file to
+ * parquet/ subdir on disk, read via openInput("parquet/seg.parquet") — should route to
+ * format directory.
+ */
+ public void testOpenInputFormatFileRoutesToFormatDirectory() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ try {
+ // Written directly to disk, bypassing createOutput — the file is only visible
+ // through the format directory, never through the tiered/FileCache path.
+ writeParquetFileToDisk("parquet/seg.parquet");
+
+ try (IndexInput in = directory.openInput("parquet/seg.parquet", IOContext.DEFAULT)) {
+ assertNotNull("openInput should return non-null for parquet file", in);
+ byte[] buf = new byte[PARQUET_DATA.length];
+ in.readBytes(buf, 0, buf.length);
+ assertArrayEquals("Parquet data should match what was written to disk", PARQUET_DATA, buf);
+ }
+ } finally {
+ directory.close();
+ }
+ }
+
+ // ═══════════════════════════════════════════════════════════════
+ // Routing tests — fileLength
+ // ═══════════════════════════════════════════════════════════════
+
+ /**
+ * Write a Lucene file, check fileLength routes to TieredDirectory.
+ */
+ public void testFileLengthLuceneFile() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ populateData();
+ try {
+ String luceneFile = "_0_len.cfe";
+ try (IndexOutput out = directory.createOutput(luceneFile, IOContext.DEFAULT)) {
+ out.writeBytes(TEST_DATA, TEST_DATA.length);
+ }
+
+ // No "parquet/" prefix, so fileLength should resolve via the tiered path.
+ long length = directory.fileLength(luceneFile);
+ assertEquals("fileLength should match written data length", TEST_DATA.length, length);
+ } finally {
+ directory.close();
+ }
+ }
+
+ /**
+ * Write parquet file on disk, check fileLength routes to format directory.
+ */
+ public void testFileLengthFormatFile() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ try {
+ // File exists only on disk under parquet/ — length must come from the format dir.
+ writeParquetFileToDisk("parquet/seg_len.parquet");
+
+ long length = directory.fileLength("parquet/seg_len.parquet");
+ assertEquals("fileLength should match parquet data length", PARQUET_DATA.length, length);
+ } finally {
+ directory.close();
+ }
+ }
+
+ // ═══════════════════════════════════════════════════════════════
+ // listAll tests
+ // ═══════════════════════════════════════════════════════════════
+
+ /**
+ * Write Lucene files via createOutput + write parquet files on disk, listAll should include both.
+ */
+ public void testListAllMergesLuceneAndFormatFiles() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ populateData();
+ try {
+ try (IndexOutput out = directory.createOutput("_0_list.cfe", IOContext.DEFAULT)) {
+ out.writeBytes(TEST_DATA, TEST_DATA.length);
+ }
+ writeParquetFileToDisk("parquet/seg_list.parquet");
+
+ String[] files = directory.listAll();
+ // Typed set (was raw Set) — contains() checks below are now type-checked.
+ Set<String> fileSet = new HashSet<>(Arrays.asList(files));
+ assertTrue("listAll should contain Lucene file", fileSet.contains("_0_list.cfe"));
+ assertTrue("listAll should contain parquet file", fileSet.contains("parquet/seg_list.parquet"));
+ } finally {
+ directory.close();
+ }
+ }
+
+ /**
+ * No format dirs, listAll returns only TieredDirectory files.
+ */
+ public void testListAllWithEmptyFormatDirectories() throws IOException {
+ directory = buildDirectoryNoFormats();
+ populateData();
+ try {
+ try (IndexOutput out = directory.createOutput("_0_only.cfe", IOContext.DEFAULT)) {
+ out.writeBytes(TEST_DATA, TEST_DATA.length);
+ }
+
+ String[] files = directory.listAll();
+ // Typed set (was raw Set) — avoids raw-type unchecked usage.
+ Set<String> fileSet = new HashSet<>(Arrays.asList(files));
+ assertTrue("listAll should contain Lucene file", fileSet.contains("_0_only.cfe"));
+
+ // Verify no parquet files appear
+ for (String f : files) {
+ assertFalse("No parquet files should appear without format dirs", f.startsWith("parquet/"));
+ }
+ } finally {
+ directory.close();
+ }
+ }
+
+ /**
+ * Same file visible from both TieredDirectory and format directory, should appear once.
+ */
+ public void testListAllDeduplicates() throws IOException {
+ // Build a directory where the format directory wraps the same SubdirectoryAwareDirectory
+ // Write a Lucene file that will appear in both TieredDirectory.listAll() and the
+ // format directory's listAll() (since they share the same underlying FS)
+ directory = buildDirectoryWithParquetFormat();
+ populateData();
+ try {
+ writeParquetFileToDisk("parquet/dup.parquet");
+
+ String[] files = directory.listAll();
+ // Count occurrences of the parquet file
+ long count = Arrays.stream(files).filter(f -> f.equals("parquet/dup.parquet")).count();
+ assertEquals("Deduplicated file should appear exactly once", 1, count);
+
+ // Verify sorted order
+ // Directory.listAll is documented to return names in sorted order; the merged
+ // view must preserve that contract.
+ for (int i = 1; i < files.length; i++) {
+ assertTrue("listAll should return sorted results", files[i - 1].compareTo(files[i]) <= 0);
+ }
+ } finally {
+ directory.close();
+ }
+ }
+
+ // ═══════════════════════════════════════════════════════════════
+ // deleteFile tests
+ // ═══════════════════════════════════════════════════════════════
+
+ /**
+ * Write Lucene file, delete it, verify gone.
+ */
+ public void testDeleteFileLuceneRoutesToTieredDirectory() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ populateData();
+ try {
+ String luceneFile = "_0_del.cfe";
+ try (IndexOutput out = directory.createOutput(luceneFile, IOContext.DEFAULT)) {
+ out.writeBytes(TEST_DATA, TEST_DATA.length);
+ }
+
+ // Typed sets (were raw Set) — contains() calls are now type-checked.
+ Set<String> beforeDelete = new HashSet<>(Arrays.asList(directory.listAll()));
+ assertTrue("File should exist before delete", beforeDelete.contains(luceneFile));
+
+ directory.deleteFile(luceneFile);
+
+ Set<String> afterDelete = new HashSet<>(Arrays.asList(directory.listAll()));
+ assertFalse("File should be gone after delete", afterDelete.contains(luceneFile));
+ } finally {
+ directory.close();
+ }
+ }
+
+ /**
+ * Write parquet file, delete via format directory path, verify gone.
+ */
+ public void testDeleteFileFormatRoutesToFormatDirectory() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ try {
+ writeParquetFileToDisk("parquet/seg_del.parquet");
+
+ // Typed sets (were raw Set) — contains() calls are now type-checked.
+ Set<String> beforeDelete = new HashSet<>(Arrays.asList(directory.listAll()));
+ assertTrue("Parquet file should exist before delete", beforeDelete.contains("parquet/seg_del.parquet"));
+
+ directory.deleteFile("parquet/seg_del.parquet");
+
+ Set<String> afterDelete = new HashSet<>(Arrays.asList(directory.listAll()));
+ assertFalse("Parquet file should be gone after delete", afterDelete.contains("parquet/seg_del.parquet"));
+ } finally {
+ directory.close();
+ }
+ }
+
+ // ═══════════════════════════════════════════════════════════════
+ // afterSyncToRemote tests
+ // ═══════════════════════════════════════════════════════════════
+
+ /**
+ * Write Lucene file via createOutput (adds to FileCache), call afterSyncToRemote —
+ * should call TieredDirectory.afterSyncToRemote (unpin + switch).
+ */
+ public void testAfterSyncToRemoteLuceneFile() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ populateData();
+ try {
+ String luceneFile = "_0_sync.cfe";
+ try (IndexOutput out = directory.createOutput(luceneFile, IOContext.DEFAULT)) {
+ out.writeBytes(TEST_DATA, TEST_DATA.length);
+ }
+
+ // File should be in FileCache after createOutput
+ Path switchablePath = getFilePathSwitchable(localFsDir, luceneFile);
+ assertNotNull("File should be in FileCache before afterSyncToRemote", fileCache.get(switchablePath));
+ // Release the reference taken by fileCache.get above.
+ fileCache.decRef(switchablePath);
+
+ // afterSyncToRemote should unpin and switch
+ directory.afterSyncToRemote(luceneFile);
+
+ // After sync, the switchable ref count should have been decremented
+ // (the file may still be in cache but unpinned)
+ Integer refCount = fileCache.getRef(switchablePath);
+ assertTrue("Ref count should be 0 or null after afterSyncToRemote", refCount == null || refCount == 0);
+ } finally {
+ directory.close();
+ }
+ }
+
+ /**
+ * Format directory implements RemoteSyncAwareDirectory, afterSyncToRemote should call it.
+ */
+ public void testAfterSyncToRemoteFormatFileWithRemoteSyncAware() throws IOException {
+ RemoteSyncAwareFormatDirectory syncAwareDir = mock(RemoteSyncAwareFormatDirectory.class);
+ // Typed map (was raw Map) — avoids unchecked conversion at the constructor call.
+ Map<String, Directory> syncAwareFormats = new HashMap<>();
+ syncAwareFormats.put("parquet", syncAwareDir);
+
+ TieredSubdirectoryAwareDirectory syncDir = new TieredSubdirectoryAwareDirectory(
+ subdirAware,
+ remoteSegmentStoreDirectory,
+ fileCache,
+ threadPool,
+ syncAwareFormats,
+ getMockPrefetchSettingsSupplier()
+ );
+
+ try {
+ String parquetFile = "parquet/seg_sync.parquet";
+ // The sync notification for a format-prefixed file must be delegated to the
+ // RemoteSyncAwareDirectory format directory.
+ syncDir.afterSyncToRemote(parquetFile);
+ verify(syncAwareDir).afterSyncToRemote(parquetFile);
+ } finally {
+ // Close the directory (previously leaked) so underlying FS handles are released.
+ syncDir.close();
+ }
+ }
+
+ /**
+ * Format directory does NOT implement RemoteSyncAwareDirectory, afterSyncToRemote should
+ * be a no-op for the format file (not fall through to TieredDirectory).
+ */
+ public void testAfterSyncToRemoteFormatFileWithoutRemoteSyncAware() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ try {
+ // The non-closing format directory does NOT implement RemoteSyncAwareDirectory.
+ // With the fix, afterSyncToRemote is a no-op for format files whose directory
+ // doesn't support sync — it does NOT fall through to tieredDirectory.
+ String parquetFile = "parquet/seg_nosync.parquet";
+ writeParquetFileToDisk(parquetFile);
+
+ // Should complete without error — no-op for non-RemoteSyncAwareDirectory format dirs
+ directory.afterSyncToRemote(parquetFile);
+ } finally {
+ directory.close();
+ }
+ }
+
+ // ═══════════════════════════════════════════════════════════════
+ // createOutput tests
+ // ═══════════════════════════════════════════════════════════════
+
+ /**
+ * createOutput for Lucene file goes through TieredDirectory, file is in FileCache after close.
+ */
+ public void testCreateOutputLuceneFile() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ populateData();
+ try {
+ String luceneFile = "_0_create.cfe";
+ try (IndexOutput out = directory.createOutput(luceneFile, IOContext.DEFAULT)) {
+ out.writeBytes(TEST_DATA, TEST_DATA.length);
+ }
+
+ // Verify file is in FileCache
+ Path switchablePath = getFilePathSwitchable(localFsDir, luceneFile);
+ assertNotNull("Lucene file should be cached in FileCache after createOutput", fileCache.get(switchablePath));
+ // Release the reference taken by fileCache.get above.
+ fileCache.decRef(switchablePath);
+
+ // Verify file exists on disk
+ assertTrue("Lucene file should exist on local disk", Arrays.asList(localFsDir.listAll()).contains(luceneFile));
+ } finally {
+ directory.close();
+ }
+ }
+
+ /**
+ * createOutput for format file goes through format directory.
+ */
+ public void testCreateOutputFormatFile() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ try {
+ String parquetFile = "parquet/seg_create.parquet";
+ // Ensure the parquet subdirectory exists
+ Files.createDirectories(shardPath.getDataPath().resolve("parquet"));
+
+ try (IndexOutput out = directory.createOutput(parquetFile, IOContext.DEFAULT)) {
+ out.writeBytes(PARQUET_DATA, PARQUET_DATA.length);
+ }
+
+ // Verify file exists on disk
+ // The format directory resolves "parquet/..." relative to the shard data path.
+ Path fullPath = shardPath.getDataPath().resolve(parquetFile);
+ assertTrue("Parquet file should exist on disk after createOutput", Files.exists(fullPath));
+ } finally {
+ directory.close();
+ }
+ }
+
+ // ═══════════════════════════════════════════════════════════════
+ // Edge case tests
+ // ═══════════════════════════════════════════════════════════════
+
+ /**
+ * openInput on file that doesn't exist anywhere → NoSuchFileException.
+ */
+ public void testOpenInputNonExistentFile() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ populateData();
+ try {
+ // Missing from local FS, remote metadata, and format dirs alike.
+ expectThrows(NoSuchFileException.class, () -> directory.openInput("non_existent_file.cfe", IOContext.DEFAULT));
+ } finally {
+ directory.close();
+ }
+ }
+
+ /**
+ * fileLength on non-existent file → exception.
+ */
+ public void testFileLengthNonExistentFile() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ populateData();
+ try {
+ // Broad Exception expected: the concrete type depends on which layer reports
+ // the missing file — TODO pin down once the contract is fixed.
+ expectThrows(Exception.class, () -> directory.fileLength("non_existent_file.cfe"));
+ } finally {
+ directory.close();
+ }
+ }
+
+ /**
+ * close() closes all format directories and TieredDirectory.
+ */
+ public void testCloseClosesFormatDirectoriesAndTieredDirectory() throws IOException {
+ Directory mockFormatDir = mock(Directory.class);
+ when(mockFormatDir.listAll()).thenReturn(new String[0]);
+
+ // Typed map (was raw Map) — avoids unchecked conversion at the constructor call.
+ Map<String, Directory> formatDirs = new HashMap<>();
+ formatDirs.put("parquet", mockFormatDir);
+
+ TieredSubdirectoryAwareDirectory dir = new TieredSubdirectoryAwareDirectory(
+ subdirAware,
+ remoteSegmentStoreDirectory,
+ fileCache,
+ threadPool,
+ formatDirs,
+ getMockPrefetchSettingsSupplier()
+ );
+
+ dir.close();
+ verify(mockFormatDir).close();
+
+ // TieredDirectory is also closed — attempting to use it should fail
+ // We can't easily verify TieredDirectory.close() was called without mocking,
+ // but the fact that close() completes without error is sufficient.
+ }
+
+ /**
+ * Format directory wraps same SubdirectoryAwareDirectory — close format dir (no-op close)
+ * then close TieredDirectory — no AlreadyClosedException.
+ */
+ public void testCloseDoesNotDoubleCloseSharedSubdirectoryAwareDirectory() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ populateData();
+ try {
+ // Write a file to ensure the directory is in a valid state
+ try (IndexOutput out = directory.createOutput("_0_noclose.cfe", IOContext.DEFAULT)) {
+ out.writeBytes(TEST_DATA, TEST_DATA.length);
+ }
+ } finally {
+ // close() should not throw AlreadyClosedException because the format directory
+ // uses a non-closing wrapper — only TieredDirectory closes the underlying FS
+ directory.close();
+ }
+
+ // If we get here without AlreadyClosedException, the test passes.
+ // The non-closing FilterDirectory wrapper prevents double-close of the shared
+ // SubdirectoryAwareDirectory.
+ }
+
+ // ═══════════════════════════════════════════════════════════════
+ // IOUtils.close tests — partial close safety
+ // ═══════════════════════════════════════════════════════════════
+
+ /**
+ * If one format directory throws on close, the other format directories and
+ * tieredDirectory are still closed (IOUtils.close collects exceptions).
+ */
+ public void testCloseWithThrowingFormatDirectoryStillClosesOthers() throws IOException {
+ Directory throwingDir = mock(Directory.class);
+ when(throwingDir.listAll()).thenReturn(new String[0]);
+ org.mockito.Mockito.doThrow(new IOException("format close failed")).when(throwingDir).close();
+
+ Directory goodDir = mock(Directory.class);
+ when(goodDir.listAll()).thenReturn(new String[0]);
+
+ // Typed map (was raw Map) — avoids unchecked conversion at the constructor call.
+ Map<String, Directory> formatDirs = new HashMap<>();
+ formatDirs.put("bad-format", throwingDir);
+ formatDirs.put("good-format", goodDir);
+
+ TieredSubdirectoryAwareDirectory dir = new TieredSubdirectoryAwareDirectory(
+ subdirAware,
+ remoteSegmentStoreDirectory,
+ fileCache,
+ threadPool,
+ formatDirs,
+ getMockPrefetchSettingsSupplier()
+ );
+
+ // IOUtils.close rethrows the first exception after attempting every close.
+ IOException ex = expectThrows(IOException.class, dir::close);
+ assertEquals("format close failed", ex.getMessage());
+ // Good format directory should still have been closed despite the other throwing
+ verify(goodDir).close();
+ }
+
+ /**
+ * afterSyncToRemote for a format file whose directory is non-null but not RemoteSyncAwareDirectory
+ * should be a no-op — must NOT fall through to tieredDirectory.
+ */
+ public void testAfterSyncToRemoteFormatFileNoopWhenNotRemoteSyncAware() throws IOException {
+ directory = buildDirectoryWithParquetFormat();
+ try {
+ // Write a parquet file directly to disk (not via createOutput, so not in FileCache)
+ String parquetFile = "parquet/seg_noop.parquet";
+ writeParquetFileToDisk(parquetFile);
+
+ // With the fix, this is a no-op because the format directory exists but doesn't
+ // implement RemoteSyncAwareDirectory. Previously this would fall through to
+ // tieredDirectory.afterSyncToRemote and NPE on fileCache.decRef for uncached file.
+ directory.afterSyncToRemote(parquetFile);
+ // If we get here without NPE, the no-op path is working correctly.
+ } finally {
+ directory.close();
+ }
+ }
+
+ // ═══════════════════════════════════════════════════════════════
+ // Helper: interface combining Directory + RemoteSyncAwareDirectory for mocking
+ // ═══════════════════════════════════════════════════════════════
+
+ /**
+ * Helper type for creating mocks that implement both Directory and RemoteSyncAwareDirectory.
+ * Abstract so Mockito supplies all Directory method implementations.
+ */
+ abstract static class RemoteSyncAwareFormatDirectory extends Directory implements RemoteSyncAwareDirectory {}
+}
diff --git a/server/src/test/java/org/opensearch/storage/directory/WarmShardDirectoryStackTests.java b/server/src/test/java/org/opensearch/storage/directory/WarmShardDirectoryStackTests.java
new file mode 100644
index 0000000000000..52228e59d4f5c
--- /dev/null
+++ b/server/src/test/java/org/opensearch/storage/directory/WarmShardDirectoryStackTests.java
@@ -0,0 +1,206 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.storage.directory;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.FilterDirectory;
+import org.opensearch.Version;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.core.index.Index;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.index.engine.dataformat.DataFormatRegistry;
+import org.opensearch.index.shard.ShardPath;
+import org.opensearch.index.store.DataFormatAwareStoreDirectory;
+import org.opensearch.index.store.RemoteDirectory;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.index.store.SubdirectoryAwareDirectory;
+import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.index.store.remote.filecache.FileCacheFactory;
+import org.opensearch.plugins.IndexStorePlugin;
+import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Integration-level tests for the warm shard directory stack.
+ *
+ * These tests verify that the full directory stack (FSDirectory → SubdirectoryAwareDirectory
+ * → TieredSubdirectoryAwareDirectory → DataFormatAwareStoreDirectory) is wired correctly
+ * and that file operations flow through the correct layers.
+ */
+public class WarmShardDirectoryStackTests extends OpenSearchTestCase {
+
+ private Path tempDir;
+ private ShardPath shardPath;
+ private IndexSettings indexSettings;
+ private FileCache fileCache;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ tempDir = createTempDir();
+ Index index = new Index("test-warm-index", "test-uuid");
+ ShardId shardId = new ShardId(index, 0);
+
+ // Create the full shard directory layout (state path, data path, and the
+ // data-path "index" subdirectory) before constructing the ShardPath.
+ Path shardStatePath = tempDir.resolve("state").resolve("test-uuid").resolve("0");
+ Path shardDataPath = tempDir.resolve("data").resolve("test-uuid").resolve("0");
+ Path indexPath = shardDataPath.resolve("index");
+ Files.createDirectories(shardStatePath);
+ Files.createDirectories(shardDataPath);
+ Files.createDirectories(indexPath);
+
+ shardPath = new ShardPath(false, shardDataPath, shardStatePath, shardId);
+
+ Settings settings = Settings.builder()
+ .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+ .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+ .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+ .build();
+ IndexMetadata indexMetadata = IndexMetadata.builder("test-warm-index").settings(settings).build();
+ indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
+
+ // 10 MB cache with a single segment is plenty for these small fixtures.
+ fileCache = FileCacheFactory.createConcurrentLRUFileCache(10_000_000, 1);
+ }
+
+ /**
+ * Returns a supplier of mocked {@link TieredStoragePrefetchSettings} configured with the
+ * default read-ahead block count, default enabled file formats, and stored-fields
+ * prefetching turned on.
+ */
+ private Supplier<TieredStoragePrefetchSettings> getMockPrefetchSettingsSupplier() {
+ return () -> {
+ TieredStoragePrefetchSettings settings = mock(TieredStoragePrefetchSettings.class);
+ when(settings.getReadAheadBlockCount()).thenReturn(TieredStoragePrefetchSettings.DEFAULT_READ_AHEAD_BLOCK_COUNT);
+ when(settings.getReadAheadEnableFileFormats()).thenReturn(TieredStoragePrefetchSettings.READ_AHEAD_ENABLE_FILE_FORMATS);
+ when(settings.isStoredFieldsPrefetchEnabled()).thenReturn(true);
+ return settings;
+ };
+ }
+
+ /**
+ * Tests that the full warm directory stack is created correctly via the factory.
+ *
+ * Verifies: FSDirectory → SubdirectoryAwareDirectory → TieredSubdirectoryAwareDirectory
+ * → DataFormatAwareStoreDirectory wiring. NOTE(review): despite the method name, no
+ * write is performed here — write flow is covered by the routing tests; consider
+ * adding a createOutput round-trip or renaming.
+ */
+ @LockFeatureFlag(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)
+ public void testWarmDirectoryStackCreationAndWrite() throws IOException {
+ TieredDataFormatAwareStoreDirectoryFactory factory = new TieredDataFormatAwareStoreDirectoryFactory(
+ getMockPrefetchSettingsSupplier()
+ );
+
+ FSDirectory fsDir = FSDirectory.open(shardPath.resolveIndex());
+ IndexStorePlugin.DirectoryFactory localDirFactory = mock(IndexStorePlugin.DirectoryFactory.class);
+ when(localDirFactory.newDirectory(any(), any())).thenReturn(fsDir);
+
+ // Empty descriptor map: no plugin-provided data formats in this scenario.
+ DataFormatRegistry registry = mock(DataFormatRegistry.class);
+ when(registry.getFormatDescriptors(any())).thenReturn(new HashMap<>());
+
+ RemoteSegmentStoreDirectory remoteDir = createRealRemoteDir(shardPath.getShardId());
+
+ DataFormatAwareStoreDirectory storeDir = factory.newDataFormatAwareStoreDirectory(
+ indexSettings,
+ shardPath.getShardId(),
+ shardPath,
+ localDirFactory,
+ registry,
+ remoteDir,
+ fileCache,
+ null // threadPool
+ );
+
+ assertNotNull("Directory stack should be created", storeDir);
+
+ // Verify the stack structure
+ Directory delegate = ((FilterDirectory) storeDir).getDelegate();
+ assertTrue("Should have TieredSubdirectoryAwareDirectory", delegate instanceof TieredSubdirectoryAwareDirectory);
+
+ Directory innerDelegate = ((FilterDirectory) delegate).getDelegate();
+ assertTrue("Should have SubdirectoryAwareDirectory", innerDelegate instanceof SubdirectoryAwareDirectory);
+
+ storeDir.close();
+ }
+
+ /**
+ * Tests that a format directory registered in the stack receives file operations
+ * for files with the matching format prefix.
+ */
+ @LockFeatureFlag(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)
+ public void testWarmDirectoryStackWithFormatDirectory() throws IOException {
+ // Create a parquet file on disk in the shard's parquet subdirectory
+ Path parquetPath = shardPath.getDataPath().resolve("parquet");
+ Files.createDirectories(parquetPath);
+ // Write the file directly to disk (simulating Rust writer)
+ Files.write(parquetPath.resolve("seg.parquet"), new byte[] { 1, 2, 3, 4 });
+
+ // Build the directory stack with SubdirectoryAwareDirectory as the format directory
+ // (same as production — ParquetTieredDirectory wraps SubdirectoryAwareDirectory)
+ FSDirectory localFsDir = FSDirectory.open(shardPath.resolveIndex());
+ SubdirectoryAwareDirectory subdirAware = new SubdirectoryAwareDirectory(localFsDir, shardPath);
+
+ // Use the same SubdirectoryAwareDirectory as the format directory
+ // (in production, ParquetTieredDirectory wraps it via FilterDirectory)
+ // Use a non-closing wrapper so the shared SubdirectoryAwareDirectory isn't double-closed
+ // (in production, ParquetTieredDirectory.close() only cleans FFM resources, not super.close())
+ // Typed map (was raw Map) — avoids unchecked conversion at the constructor call.
+ Map<String, Directory> formatDirs = new HashMap<>();
+ formatDirs.put("parquet", new FilterDirectory(subdirAware) {
+ @Override
+ public void close() {
+ // Don't close — SubdirectoryAwareDirectory is shared with TieredDirectory
+ }
+ });
+
+ RemoteSegmentStoreDirectory remoteDir = createRealRemoteDir(shardPath.getShardId());
+
+ TieredSubdirectoryAwareDirectory tieredSubdir = new TieredSubdirectoryAwareDirectory(
+ subdirAware,
+ remoteDir,
+ fileCache,
+ null,
+ formatDirs,
+ getMockPrefetchSettingsSupplier()
+ );
+
+ // Verify the parquet file is accessible through the stack
+ long length = tieredSubdir.fileLength("parquet/seg.parquet");
+ assertEquals("Parquet file should be 4 bytes", 4L, length);
+
+ // Verify listAll includes parquet files
+ String[] allFiles = tieredSubdir.listAll();
+ // Typed set (was raw Set) — contains() check is now type-checked.
+ Set<String> fileSet = new HashSet<>(Arrays.asList(allFiles));
+ assertTrue("listAll should include parquet file", fileSet.contains("parquet/seg.parquet"));
+
+ tieredSubdir.close();
+ }
+
+ /**
+ * Builds a real {@link RemoteSegmentStoreDirectory} over mocked remote data/metadata
+ * directories, lock manager, and thread pool — enough for stack-wiring tests that
+ * never actually touch the remote store.
+ */
+ private RemoteSegmentStoreDirectory createRealRemoteDir(ShardId shardId) throws IOException {
+ RemoteDirectory remoteDataDir = mock(RemoteDirectory.class);
+ RemoteDirectory remoteMetadataDir = mock(RemoteDirectory.class);
+ RemoteStoreLockManager lockManager = mock(RemoteStoreLockManager.class);
+ ThreadPool tp = mock(ThreadPool.class);
+ return new RemoteSegmentStoreDirectory(remoteDataDir, remoteMetadataDir, lockManager, tp, shardId, new HashMap<>());
+ }
+}
diff --git a/server/src/test/java/org/opensearch/storage/utils/DirectoryUtilsTests.java b/server/src/test/java/org/opensearch/storage/utils/DirectoryUtilsTests.java
new file mode 100644
index 0000000000000..b6ca9b34b9e62
--- /dev/null
+++ b/server/src/test/java/org/opensearch/storage/utils/DirectoryUtilsTests.java
@@ -0,0 +1,73 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.storage.utils;
+
+import org.apache.lucene.store.ByteBuffersDirectory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.FilterDirectory;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Tests for {@link DirectoryUtils}.
+ */
+public class DirectoryUtilsTests extends OpenSearchTestCase {
+
+    public void testGetFSDirectoryDirect() throws IOException {
+        // try-with-resources so the directory is closed even when the assertion fails
+        try (FSDirectory fsDir = FSDirectory.open(createTempDir())) {
+            assertSame("Should return the FSDirectory directly", fsDir, DirectoryUtils.getFSDirectory(fsDir));
+        }
+    }
+
+    public void testGetFSDirectoryWrappedInFilterDirectory() throws IOException {
+        FSDirectory fsDir = FSDirectory.open(createTempDir());
+        // closing the outer FilterDirectory also closes its fsDir delegate
+        try (FilterDirectory wrapped = new FilterDirectory(fsDir) {
+        }) {
+            assertSame("Should unwrap FilterDirectory to find FSDirectory", fsDir, DirectoryUtils.getFSDirectory(wrapped));
+        }
+    }
+
+    public void testGetFSDirectoryDeepChain() throws IOException {
+        FSDirectory fsDir = FSDirectory.open(createTempDir());
+        FilterDirectory inner = new FilterDirectory(fsDir) {
+        };
+        // closing the outermost layer closes the whole delegate chain, fsDir included
+        try (FilterDirectory outer = new FilterDirectory(inner) {
+        }) {
+            assertSame("Should unwrap multiple FilterDirectory layers", fsDir, DirectoryUtils.getFSDirectory(outer));
+        }
+    }
+
+    public void testGetFSDirectoryThrowsWhenNoFSDirectory() throws IOException {
+        try (ByteBuffersDirectory ramDir = new ByteBuffersDirectory()) {
+            IllegalStateException ex = expectThrows(IllegalStateException.class, () -> DirectoryUtils.getFSDirectory(ramDir));
+            assertTrue("Should mention class name", ex.getMessage().contains("ByteBuffersDirectory"));
+        }
+    }
+
+    public void testGetFilePathResolves() throws IOException {
+        Path tempDir = createTempDir();
+        try (FSDirectory fsDir = FSDirectory.open(tempDir)) {
+            Path result = DirectoryUtils.getFilePath(fsDir, "test.txt");
+            assertEquals("Should resolve file path", tempDir.resolve("test.txt"), result);
+        }
+    }
+
+    public void testGetFilePathSwitchableResolves() throws IOException {
+        Path tempDir = createTempDir();
+        try (FSDirectory fsDir = FSDirectory.open(tempDir)) {
+            Path result = DirectoryUtils.getFilePathSwitchable(fsDir, "test.txt");
+            assertEquals("Should resolve switchable path", tempDir.resolve("test.txt" + DirectoryUtils.SWITCHABLE_PREFIX), result);
+        }
+    }
+}