Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.storage.directory.TieredDataFormatAwareStoreDirectoryFactory;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;

Expand Down Expand Up @@ -1176,6 +1177,14 @@ private static DataFormatAwareStoreDirectoryFactory getDataFormatAwareStoreDirec
if (dataFormatAwareStoreDirectoryFactories.isEmpty()) {
return null;
}
if (indexSettings.isWarmIndex() && indexSettings.isPluggableDataFormatEnabled()) {
DataFormatAwareStoreDirectoryFactory tiered = dataFormatAwareStoreDirectoryFactories.get(
TieredDataFormatAwareStoreDirectoryFactory.FACTORY_KEY
);
if (tiered != null) {
return tiered;
}
}
return dataFormatAwareStoreDirectoryFactories.get("default");
}

Expand Down
22 changes: 18 additions & 4 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.DataFormatAwareStoreDirectory;
import org.opensearch.index.store.DataFormatAwareStoreDirectoryFactory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.remote.filecache.FileCache;
Expand Down Expand Up @@ -773,9 +774,22 @@ protected void closeInternal() {
}

Directory directory = null;
if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_SETTING) &&
// TODO : Need to remove this check after support for hot indices is added in Composite Directory
this.indexSettings.isWarmIndex()) {
if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_SETTING)
&& this.indexSettings.isWarmIndex()
&& this.indexSettings.isPluggableDataFormatEnabled()
&& this.dataFormatAwareStoreDirectoryFactory != null) {
// Warm + format-aware: use warm-aware factory overload
directory = dataFormatAwareStoreDirectoryFactory.newDataFormatAwareStoreDirectory(
this.indexSettings,
shardId,
path,
directoryFactory,
dataFormatRegistry,
(RemoteSegmentStoreDirectory) remoteDirectory,
fileCache,
threadPool
);
} else if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_SETTING) && this.indexSettings.isWarmIndex()) {
directory = compositeDirectoryFactory.newDirectory(
this.indexSettings,
path,
Expand All @@ -784,7 +798,7 @@ protected void closeInternal() {
fileCache,
threadPool
);
} else if (!this.indexSettings.isPluggableDataFormatEnabled()) {
} else if (this.indexSettings.isPluggableDataFormatEnabled() == false) {
directory = directoryFactory.newDirectory(this.indexSettings, path);
} else {
// Will be enabled in case of formatAware indices.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,56 @@

/**
* Describes the static capabilities of a data format, including its default checksum
* strategy and format name. Provided by {@link DataFormatPlugin} implementations and
* consumed by DataFormatAwareStoreDirectory and DataFormatAwareRemoteDirectory.
* strategy, format name, and whether it requires native store for warm reads.
* Provided by {@link DataFormatPlugin} implementations and consumed by
* DataFormatAwareStoreDirectory, DataFormatAwareRemoteDirectory, and
* TieredDataFormatAwareStoreDirectoryFactory.
*
* <p>The checksum strategy here is the <em>default fallback</em> — a full-file scan.
* At runtime, the {@link IndexingExecutionEngine} may override this with a more
* efficient strategy (e.g., {@link org.opensearch.index.store.PrecomputedChecksumStrategy})
* via {@link org.opensearch.index.store.DataFormatAwareStoreDirectory#registerChecksumStrategy}.
*
* <p>All formats use the segment store ({@code RemoteSegmentStoreDirectory}) for uploads.
* Formats that return {@code true} from {@link #nativeStoreSupported()} get a
* {@code NativeStoreRepository}-backed TieredDirectory for warm reads. The native store
* is obtained from the repository that's already wired to the shard (via
* {@code Repository.getNativeStore()}) — the descriptor only signals the intent,
* it doesn't provide the store itself.
*
* @opensearch.experimental
*/
@ExperimentalApi
public class DataFormatDescriptor {

private final String formatName;
private final FormatChecksumStrategy checksumStrategy;
private final boolean nativeStoreSupported;

/**
* Creates a new DataFormatDescriptor.
* Creates a new DataFormatDescriptor without native store.
* The format uses the standard segment store for both uploads and warm reads.
*
* @param formatName the format name (e.g., "parquet")
* @param checksumStrategy the default checksum strategy for this format
*/
public DataFormatDescriptor(String formatName, FormatChecksumStrategy checksumStrategy) {
this(formatName, checksumStrategy, false);
}

/**
* Creates a new DataFormatDescriptor.
*
* @param formatName the format name (e.g., "parquet")
* @param checksumStrategy the default checksum strategy for this format
* @param nativeStoreSupported if {@code true}, warm reads for this format use the
* {@code NativeStoreRepository} from the shard's repository
* instead of the standard block-based remote reads
*/
public DataFormatDescriptor(String formatName, FormatChecksumStrategy checksumStrategy, boolean nativeStoreSupported) {
this.formatName = formatName;
this.checksumStrategy = checksumStrategy;
this.nativeStoreSupported = nativeStoreSupported;
}

/**
Expand All @@ -57,4 +82,22 @@ public String getFormatName() {
public FormatChecksumStrategy getChecksumStrategy() {
return checksumStrategy;
}

/**
* Returns whether this format needs native store for warm reads.
*
* <p>When {@code true}, the tiered directory factory obtains the
* {@code NativeStoreRepository} from the shard's repository (via
* {@code Repository.getNativeStore()}) and wires it into this format's
* TieredDirectory for Rust-native I/O optimized for sequential
* column-chunk reads (e.g., Parquet via native S3/GCS/Azure/FS).
*
* <p>When {@code false} (the default), warm reads go through the standard
* {@code RemoteSegmentStoreDirectory} with block-based on-demand fetching.
*
* @return true if native store is needed for warm reads
*/
public boolean nativeStoreSupported() {
return nativeStoreSupported;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

package org.opensearch.index.engine.dataformat;

import org.apache.lucene.store.Directory;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.FormatChecksumStrategy;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.util.Map;

Expand Down Expand Up @@ -54,4 +56,5 @@ public interface DataFormatPlugin {
default Map<String, DataFormatDescriptor> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) {
return Map.of();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.EngineReaderManager;
import org.opensearch.index.store.FormatChecksumStrategy;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.plugins.PluginsService;
import org.opensearch.plugins.SearchBackEndPlugin;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -139,6 +142,37 @@ public Set<DataFormat> getRegisteredFormats() {
return Set.copyOf(dataFormatPluginRegistry.keySet());
}

/**
* Asks each registered format plugin for a tiered directory and returns the non-null results.
* Called by TieredDataFormatAwareStoreDirectoryFactory at shard open time on warm nodes.
*
* @param localDirectory the subdirectory-aware local directory
* @param remoteDirectory the remote segment store directory
* @param indexSettings the index settings for this shard
* @return map of format name to tiered directory (only formats that provide one)
*/
public Map<DataFormat, Directory> getTieredDirectories(
Directory localDirectory,
RemoteSegmentStoreDirectory remoteDirectory,
IndexSettings indexSettings
) {
Map<DataFormat, Directory> result = new HashMap<>();
String dataformatName = indexSettings.getSettings().get(PLUGGABLE_DATAFORMAT_SETTING);
if (dataformatName != null) {
DataFormat format = dataFormats.get(dataformatName);
if (format != null) {
DataFormatPlugin plugin = dataFormatPluginRegistry.get(format);
if (plugin != null) {
Directory dir = plugin.getTieredDirectory(localDirectory, remoteDirectory, indexSettings);
if (dir != null) {
result.put(format, dir);
}
}
}
}
return Collections.unmodifiableMap(result);
}

/**
* Returns format descriptors for the active data format of the given index.
* Resolves the data format from index settings via the {@code pluggable_dataformat} setting,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.UploadListener;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSyncAwareDirectory;

import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -63,7 +63,6 @@ public void uploadSegments(
logger.debug("Effective new segments files to upload {}", localSegments);
ActionListener<Collection<Void>> mappedListener = ActionListener.map(listener, resp -> null);
GroupedActionListener<Void> batchUploadListener = new GroupedActionListener<>(mappedListener, localSegments.size());
Directory directory = ((FilterDirectory) (((FilterDirectory) storeDirectory).getDelegate())).getDelegate();

for (String localSegment : localSegments) {
// Initializing listener here to ensure that the stats increment operations are thread-safe
Expand All @@ -72,9 +71,7 @@ public void uploadSegments(
statsListener.onSuccess(localSegment);
batchUploadListener.onResponse(resp);
// Once uploaded to Remote, local files become eligible for eviction from FileCache
if (directory instanceof CompositeDirectory compositeDirectory) {
compositeDirectory.afterSyncToRemote(localSegment);
}
notifyAfterSyncToRemote(storeDirectory, localSegment);
}, ex -> {
logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", ex), ex);
if (ex instanceof CorruptIndexException) {
Expand All @@ -94,4 +91,19 @@ public void uploadSegments(
);
}
}

private static void notifyAfterSyncToRemote(Directory dir, String file) {
Directory current = dir;
while (current != null) {
if (current instanceof RemoteSyncAwareDirectory) {
((RemoteSyncAwareDirectory) current).afterSyncToRemote(file);
return;
}
if (current instanceof FilterDirectory) {
current = ((FilterDirectory) current).getDelegate();
} else {
break;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.store.remote.filecache.FileCache.RestoredCachedIndexInput;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.index.store.remote.utils.TransferManager;
import org.opensearch.storage.utils.DirectoryUtils;
import org.opensearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
Expand All @@ -54,7 +55,7 @@
* @opensearch.experimental
*/
@ExperimentalApi
public class CompositeDirectory extends FilterDirectory {
public class CompositeDirectory extends FilterDirectory implements RemoteSyncAwareDirectory {
private static final Logger logger = LogManager.getLogger(CompositeDirectory.class);
protected final Directory localDirectory;
protected final RemoteSegmentStoreDirectory remoteDirectory;
Expand Down Expand Up @@ -397,15 +398,7 @@ public Path getFilePath(String name) {
}

private FSDirectory getLocalFSDirectory() {
FSDirectory localFSDirectory;
if (localDirectory instanceof FSDirectory) {
localFSDirectory = (FSDirectory) localDirectory;
} else {
// In this case it should be a FilterDirectory wrapped over FSDirectory as per above validation.
localFSDirectory = (FSDirectory) (((FilterDirectory) localDirectory).getDelegate());
}

return localFSDirectory;
return DirectoryUtils.getFSDirectory(localDirectory);
}

/**
Expand All @@ -423,9 +416,11 @@ private void validate(Directory localDirectory, Directory remoteDirectory, FileC
if (fileCache == null) throw new IllegalStateException(
"File Cache not initialized on this Node, cannot create Composite Directory without FileCache"
);
if (localDirectory instanceof FSDirectory == false
&& !(localDirectory instanceof FilterDirectory && ((FilterDirectory) localDirectory).getDelegate() instanceof FSDirectory))
try {
DirectoryUtils.getFSDirectory(localDirectory);
} catch (IllegalStateException e) {
throw new IllegalStateException("For Composite Directory, local directory must be of type FSDirectory");
}
if (remoteDirectory instanceof RemoteSegmentStoreDirectory == false) throw new IllegalStateException(
"For Composite Directory, remote directory must be of type RemoteSegmentStoreDirectory"
);
Expand Down
Loading
Loading