Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public void beforeStart() {
node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=" + debugPort);
debugPort += 1;
}
node.jvmArgs("--enable-native-access=ALL-UNNAMED");
if (keystorePassword.length() > 0) {
node.keystorePassword(keystorePassword);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,4 @@ private static SymbolLookup loadLibrary() {
throw new RuntimeException("Failed to load native library '" + LIBRARY_NAME + "'");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
import org.opensearch.index.engine.exec.EngineReaderManager;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.Plugin;
Expand All @@ -37,6 +38,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -118,7 +120,22 @@ public EngineReaderManager<DatafusionReader> createReaderManager(DataFormat form
* Data formats this plugin can handle. Used by CompositeEngine to route queries.
*/
public List<DataFormat> getSupportedFormats() {
    // Advertise the single data format this plugin can serve so the
    // CompositeEngine can route parquet-backed queries to it.
    DataFormat parquetFormat = new DataFormat() {
        @Override
        public String name() {
            return "parquet";
        }

        @Override
        public long priority() {
            // Default priority; no ordering preference relative to other formats.
            return 0L;
        }

        @Override
        public Set<FieldTypeCapabilities> supportedFields() {
            // No field-type capabilities declared yet.
            return Collections.emptySet();
        }
    };
    return List.of(parquetFormat);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.index.IndexService;
import org.opensearch.index.engine.DataFormatAwareEngine;
import org.opensearch.index.engine.exec.IndexReaderProvider;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;

Expand Down Expand Up @@ -69,14 +69,14 @@ public Iterable<Object[]> execute(RelNode logicalFragment, Object context) {
}

IndexShard shard = resolveShard(tableName);
DataFormatAwareEngine dataFormatAwareEngine = shard.getCompositeEngine();
if (dataFormatAwareEngine == null) {
IndexReaderProvider indexReaderProvider = shard.getReaderProvider();
if (indexReaderProvider == null) {
throw new IllegalStateException("No CompositeEngine on shard [" + shard.shardId() + "]");
}

SearchShardTask task = null; // TODO: init task
List<Object[]> rows = new ArrayList<>();
try (var dataFormatAwareReader = dataFormatAwareEngine.acquireReader()) {
try (var dataFormatAwareReader = indexReaderProvider.acquireReader()) {
ExecutionContext ctx = new ExecutionContext(tableName, task, dataFormatAwareReader.get());
try (SearchExecEngine<ExecutionContext, EngineResultStream> engine = provider.createSearchExecEngine(ctx)) {
logger.info("[DefaultPlanExecutor] Executing via [{}]", provider.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.engine.DataFormatAwareEngine;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineTestCase;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
import org.opensearch.index.engine.exec.EngineReaderManager;
Expand Down Expand Up @@ -114,11 +117,11 @@ public void testEndToEndExecuteWithMockBackend() throws IOException {
readerManager.afterRefresh(true, ref.get());
}

DataFormatAwareEngine engine = new DataFormatAwareEngine(Map.of(format, readerManager), snapshotManager);
DataFormatAwareEngine engine = new DataFormatAwareEngine(null);

// Mock shard + cluster wiring
IndexShard shard = mock(IndexShard.class);
when(shard.getCompositeEngine()).thenReturn(engine);
when(shard.getReaderProvider()).thenReturn(engine);

Index index = new Index("my_index", "uuid");
IndexMetadata indexMetadata = mock(IndexMetadata.class);
Expand Down Expand Up @@ -239,6 +242,11 @@ public void onFilesDeleted(Collection<String> files) {}

@Override
public void onFilesAdded(Collection<String> files) {}

@Override
public void close() throws IOException {
readers.clear();
}
}

static class MockCatalogSnapshot extends CatalogSnapshot {
Expand Down
22 changes: 22 additions & 0 deletions sandbox/plugins/composite-engine/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,31 @@ opensearchplugin {
classname = 'org.opensearch.composite.CompositeEnginePlugin'
}

apply plugin: 'opensearch.internal-cluster-test'

// Test compilation needs JDK 25 to consume parquet-data-format (which targets JDK 25).
// Both test source sets need the identical toolchain settings, so configure them in
// one place instead of duplicating the block per task.
['compileTestJava', 'compileInternalClusterTestJava'].each { taskName ->
    tasks.named(taskName).configure {
        sourceCompatibility = JavaVersion.toVersion(25)
        targetCompatibility = JavaVersion.toVersion(25)
    }
}

// Extra JVM flags for the in-process test cluster:
//  - --add-opens: grants org.apache.arrow.memory.core (and unnamed modules)
//    reflective access to java.nio internals, presumably for Arrow's off-heap
//    memory management — confirm against the Arrow memory module docs.
//  - --enable-native-access: required for the JDK foreign-function/native
//    bindings used by the native data-format library.
tasks.named('internalClusterTest').configure {
    jvmArgs += [
        '--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED',
        '--enable-native-access=ALL-UNNAMED'
    ]
}

dependencies {
    api project(':libs:opensearch-concurrent-queue')
    api project(':sandbox:libs:composite-common')
    // Provided by the OpenSearch distribution at runtime; compile against it only.
    compileOnly project(':server')
    testImplementation project(':test:framework')
    // parquet-data-format is needed by both test source sets (unit tests and
    // the internal-cluster ITs) to exercise the parquet-backed composite index.
    testImplementation project(':sandbox:plugins:parquet-data-format')
    internalClusterTestImplementation project(':sandbox:plugins:parquet-data-format')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.composite;

import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.parquet.ParquetDataFormatPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;

/**
* Integration test that validates a composite index with parquet as the primary data format
* can be created and its settings are correctly persisted.
*
 * Requires JDK 25 and sandbox enabled. This class lives in the internal-cluster-test
 * source set, so run it via the internalClusterTest task:
 * ./gradlew :sandbox:plugins:composite-engine:internalClusterTest \
 *   --tests "*.CompositeParquetIndexIT" \
 *   -Dsandbox.enabled=true
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
public class CompositeParquetIndexIT extends OpenSearchIntegTestCase {

private static final String INDEX_NAME = "test-composite-parquet";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ParquetDataFormatPlugin.class, CompositeEnginePlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG, true)
.build();
}

public void testCreateCompositeParquetIndex() {
Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.pluggable.dataformat.enabled", true)
.put("index.composite.primary_data_format", "parquet")
.putList("index.composite.secondary_data_formats")
.build();

CreateIndexResponse response = client().admin().indices().prepareCreate(INDEX_NAME).setSettings(indexSettings).get();
assertTrue("Index creation should be acknowledged", response.isAcknowledged());

GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings(INDEX_NAME).get();
Settings actual = settingsResponse.getIndexToSettings().get(INDEX_NAME);

assertEquals("1", actual.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS));
assertEquals("0", actual.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS));
assertEquals("true", actual.get("index.pluggable.dataformat.enabled"));
assertEquals("parquet", actual.get("index.composite.primary_data_format"));

ensureGreen(INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,31 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Supplier;

/**
* Sandbox plugin that provides a {@link CompositeIndexingExecutionEngine} for
Expand Down Expand Up @@ -61,6 +71,11 @@ public class CompositeEnginePlugin extends Plugin implements ExtensiblePlugin, D
Setting.Property.Final
);

// NOTE(review): redundant override — this delegates straight to the superclass
// implementation and adds no behavior. Either remove it, or (if it exists as a
// placeholder for upcoming component wiring) mark it with a TODO explaining what
// components will be created here.
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry, Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<RepositoriesService> repositoriesServiceSupplier) {
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry, indexNameExpressionResolver, repositoriesServiceSupplier);
}

/**
* Index setting that lists the secondary data formats for an index.
* Secondary formats receive writes alongside the primary but are not used
Expand Down Expand Up @@ -133,12 +148,12 @@ public List<Setting<?>> getSettings() {
@Override
public DataFormat getDataFormat() {
    // NOTE(review): the stale comment claiming this is "switched off" contradicted
    // the live return value and has been removed. The effective data format for a
    // composite index is configured per index via the index.composite.* settings;
    // this cluster-level value identifies the composite format itself — confirm
    // CompositeDataFormat is the intended descriptor here.
    return new CompositeDataFormat();
}

@Override
public IndexingExecutionEngine<?, ?> indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings) {
return new CompositeIndexingExecutionEngine(dataFormatPlugins, indexSettings, mapperService, shardPath);
public IndexingExecutionEngine<?, ?> indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) {
return new CompositeIndexingExecutionEngine(indexSettings, mapperService, dataFormatRegistry, shardPath);
}

/**
Expand Down
Loading
Loading