diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/SemanticSearchTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/SemanticSearchTool.java index a243d483acad..c7548fccd1b7 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/SemanticSearchTool.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/SemanticSearchTool.java @@ -10,7 +10,7 @@ import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.Entity; import org.openmetadata.service.limits.Limits; -import org.openmetadata.service.search.vector.OpenSearchVectorService; +import org.openmetadata.service.search.vector.VectorIndexService; import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchResponse; import org.openmetadata.service.security.Authorizer; import org.openmetadata.service.security.auth.CatalogSecurityContext; @@ -42,7 +42,7 @@ public Map execute( "Semantic search is not enabled. Configure vector embeddings in the OpenMetadata server settings."); } - OpenSearchVectorService vectorService = OpenSearchVectorService.getInstance(); + VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService(); if (vectorService == null) { return errorResponse("Vector search service is not initialized"); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java index fcbd5c143e59..78fad908500a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java @@ -1,6 +1,7 @@ package org.openmetadata.service.apps.bundles.searchIndex; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; +import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.RECREATE_CONTEXT; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TARGET_INDEX_KEY; import com.fasterxml.jackson.databind.ObjectMapper; @@ -18,12 +19,16 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -40,15 +45,20 @@ import org.openmetadata.service.apps.bundles.searchIndex.stats.StatsResult; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.exception.SearchIndexException; +import org.openmetadata.service.search.ReindexContext; import org.openmetadata.service.search.SearchRepository; import org.openmetadata.service.search.elasticsearch.ElasticSearchClient; import org.openmetadata.service.search.elasticsearch.EsUtils; +import org.openmetadata.service.search.vector.VectorDocBuilder; +import org.openmetadata.service.search.vector.VectorIndexService; +import org.openmetadata.service.search.vector.utils.AvailableEntityTypes; /** * Elasticsearch implementation using new Java API client with custom bulk handler */ @Slf4j public class ElasticSearchBulkSink implements BulkSink { + private static final int MAX_VECTOR_THREADS = 10; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final JacksonJsonpMapper JACKSON_JSONP_MAPPER = new JacksonJsonpMapper(OBJECT_MAPPER); @@ -107,6 +117,13 @@ public static synchronized void resetDocBuildPoolSize() { // Failure callback private volatile FailureCallback failureCallback; + // Vector embedding fields + private final ExecutorService vectorExecutor; + private final Phaser phaser; + private final CopyOnWriteArrayList pendingThreads; + private final AtomicLong vectorSuccess = new AtomicLong(0); + private final AtomicLong vectorFailed = new AtomicLong(0); + public ElasticSearchBulkSink( SearchRepository searchRepository, int batchSize, @@ -117,6 +134,10 @@ public ElasticSearchBulkSink( this.searchClient = (ElasticSearchClient) searchRepository.getSearchClient(); this.batchSize = batchSize; this.maxConcurrentRequests = maxConcurrentRequests; + this.vectorExecutor = + Executors.newFixedThreadPool(MAX_VECTOR_THREADS, Thread.ofVirtual().factory()); + this.phaser = new Phaser(1); + this.pendingThreads = new CopyOnWriteArrayList<>(); // Initialize stats stats.withTotalRecords(0).withSuccessRecords(0).withFailedRecords(0); @@ -204,6 +225,10 @@ public void write(List entities, Map contextData) throws Exce CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join(); } else { List entityInterfaces = (List) entities; + ReindexContext reindexContext = + contextData.containsKey(RECREATE_CONTEXT) + ? (ReindexContext) contextData.get(RECREATE_CONTEXT) + : null; // Add entities to search index in parallel List> futures = @@ -216,9 +241,9 @@ public void write(List entities, Map contextData) throws Exce .toList(); CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join(); - // Process vector embeddings in batch (no-op in base class) if (embeddingsEnabled) { - addEntitiesToVectorIndexBatch(bulkProcessor, entityInterfaces, recreateIndex); + addEntitiesToVectorIndexBatch( + bulkProcessor, entityInterfaces, recreateIndex, reindexContext, tracker); } } } catch (Exception e) { @@ -422,15 +447,17 @@ public StepStats getProcessStats() { @Override public void close() { try { - // Flush any pending requests + awaitVectorCompletion(60); + bulkProcessor.flush(); - // Wait for completion boolean terminated = bulkProcessor.awaitClose(60, TimeUnit.SECONDS); if (!terminated) { LOG.warn("Bulk processor did not terminate within timeout"); } + vectorExecutor.shutdown(); + // Final stats update to ensure all processed records are reflected updateStats(); @@ -508,17 +535,161 @@ public void updateConcurrentRequests(int concurrentRequests) { LOG.info("Concurrent requests updated to: {}", concurrentRequests); } - /** - * Checks if vector embeddings are enabled for a specific entity type. - * This combines SearchRepository capability check with job configuration. - */ boolean isVectorEmbeddingEnabledForEntity(String entityType) { - return false; + return searchRepository.isVectorEmbeddingEnabled() + && searchRepository.getVectorIndexService() != null + && AvailableEntityTypes.isVectorIndexable(entityType); } void addEntitiesToVectorIndexBatch( - CustomBulkProcessor bulkProcessor, List entities, boolean recreateIndex) { - // TODO: Implement Elasticsearch vector embedding support + CustomBulkProcessor bulkProcessor, + List entities, + boolean recreateIndex, + ReindexContext reindexContext, + StageStatsTracker tracker) { + if (entities.isEmpty()) { + return; + } + + VectorIndexService vectorService = searchRepository.getVectorIndexService(); + if (vectorService == null) { + return; + } + + String entityType = entities.getFirst().getEntityReference().getType(); + if (!AvailableEntityTypes.isVectorIndexable(entityType)) { + return; + } + + String canonicalIndex = VectorIndexService.getClusteredIndexName(); + String finalTargetIndex = canonicalIndex; + String finalSourceIndex = null; + + if (reindexContext != null) { + String stagedIndex = + reindexContext.getStagedIndex(VectorIndexService.VECTOR_INDEX_KEY).orElse(null); + if (stagedIndex != null) { + finalSourceIndex = canonicalIndex; + finalTargetIndex = stagedIndex; + } + } + + String srcIdx = finalSourceIndex; + String tgtIdx = finalTargetIndex; + + Map existingFingerprints = Map.of(); + if (srcIdx != null) { + List parentIds = new ArrayList<>(entities.size()); + for (EntityInterface entity : entities) { + parentIds.add(entity.getId().toString()); + } + existingFingerprints = vectorService.getExistingFingerprintsBatch(srcIdx, parentIds); + } + + for (EntityInterface entity : entities) { + String parentId = entity.getId().toString(); + String existingFp = existingFingerprints.get(parentId); + String currentFp = VectorDocBuilder.computeFingerprintForEntity(entity); + + if (existingFp != null && existingFp.equals(currentFp) && srcIdx != null) { + submitVectorTask( + () -> + processMigration( + vectorService, srcIdx, tgtIdx, parentId, currentFp, entity, tracker)); + } else { + submitVectorTask(() -> processEmbedding(vectorService, entity, tgtIdx, tracker)); + } + } + } + + private void processMigration( + VectorIndexService vectorService, + String sourceIndex, + String targetIndex, + String parentId, + String fingerprint, + EntityInterface entity, + StageStatsTracker tracker) { + try { + if (vectorService.copyExistingVectorDocuments( + sourceIndex, targetIndex, parentId, fingerprint)) { + vectorSuccess.incrementAndGet(); + if (tracker != null) { + tracker.recordVector(StatsResult.SUCCESS); + } + } else { + processEmbedding(vectorService, entity, targetIndex, tracker); + } + } catch (Exception e) { + LOG.warn( + "Vector migration failed for parent_id={}, falling back to recomputation: {}", + parentId, + e.getMessage()); + processEmbedding(vectorService, entity, targetIndex, tracker); + } + } + + private void processEmbedding( + VectorIndexService vectorService, + EntityInterface entity, + String targetIndex, + StageStatsTracker tracker) { + try { + vectorService.updateVectorEmbeddings(entity, targetIndex); + vectorSuccess.incrementAndGet(); + if (tracker != null) { + tracker.recordVector(StatsResult.SUCCESS); + } + } catch (Exception e) { + vectorFailed.incrementAndGet(); + if (tracker != null) { + tracker.recordVector(StatsResult.FAILED); + } + LOG.error("Vector embedding failed for entity {}: {}", entity.getId(), e.getMessage(), e); + } + } + + private void submitVectorTask(Runnable task) { + phaser.register(); + vectorExecutor.submit( + () -> { + Thread current = Thread.currentThread(); + pendingThreads.add(current); + try { + task.run(); + } finally { + pendingThreads.remove(current); + phaser.arriveAndDeregister(); + } + }); + } + + @Override + public boolean awaitVectorCompletion(int timeoutSeconds) { + try { + int phase = phaser.arrive(); + phaser.awaitAdvanceInterruptibly(phase, timeoutSeconds, TimeUnit.SECONDS); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } catch (TimeoutException e) { + LOG.warn("Timeout waiting for vector completion after {}s", timeoutSeconds); + return false; + } + } + + @Override + public int getPendingVectorTaskCount() { + return Math.max(0, phaser.getUnarrivedParties() - 1); + } + + @Override + public StepStats getVectorStats() { + return new StepStats() + .withTotalRecords((int) (vectorSuccess.get() + vectorFailed.get())) + .withSuccessRecords((int) vectorSuccess.get()) + .withFailedRecords((int) vectorFailed.get()); } public static class CustomBulkProcessor { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/VectorSearchResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/VectorSearchResource.java index 695bc1670d5e..e867b24181a5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/VectorSearchResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/VectorSearchResource.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.openmetadata.service.Entity; import org.openmetadata.service.resources.Collection; -import org.openmetadata.service.search.vector.OpenSearchVectorService; +import org.openmetadata.service.search.vector.VectorIndexService; import org.openmetadata.service.search.vector.utils.DTOs.FingerprintResponse; import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchRequest; import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchResponse; @@ -75,7 +75,7 @@ public Response vectorSearchPost( .build(); } - OpenSearchVectorService vectorService = OpenSearchVectorService.getInstance(); + VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService(); if (vectorService == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE) .entity("{\"error\":\"Vector search service is not initialized\"}") @@ -119,7 +119,7 @@ public Response getFingerprint( .build(); } - OpenSearchVectorService vectorService = OpenSearchVectorService.getInstance(); + VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService(); if (vectorService == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE) .entity("{\"error\":\"Vector search service is not initialized\"}") diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateWithEmbeddings.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateWithEmbeddings.java index ef3f907d02d0..512c1b67155b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateWithEmbeddings.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/RecreateWithEmbeddings.java @@ -5,7 +5,6 @@ import lombok.extern.slf4j.Slf4j; import org.openmetadata.search.IndexMapping; import org.openmetadata.service.Entity; -import org.openmetadata.service.search.vector.OpenSearchVectorService; import org.openmetadata.service.search.vector.VectorIndexService; @Slf4j @@ -17,7 +16,7 @@ public ReindexContext reCreateIndexes(Set entities) { searchRepository.initializeVectorSearchService(); Set allEntities = new HashSet<>(entities); - if (OpenSearchVectorService.getInstance() != null) { + if (searchRepository.getVectorIndexService() != null) { allEntities.add(VectorIndexService.VECTOR_INDEX_KEY); } @@ -28,7 +27,7 @@ public ReindexContext reCreateIndexes(Set entities) { protected void recreateIndexFromMapping( ReindexContext context, IndexMapping indexMapping, String entityType) { if (VectorIndexService.VECTOR_INDEX_KEY.equals(entityType) - && OpenSearchVectorService.getInstance() == null) { + && Entity.getSearchRepository().getVectorIndexService() == null) { LOG.info("Skipping vector index recreation - vector service not initialized"); return; } @@ -38,7 +37,7 @@ protected void recreateIndexFromMapping( @Override public void promoteEntityIndex(EntityReindexContext context, boolean reindexSuccess) { if (VectorIndexService.VECTOR_INDEX_KEY.equals(context.getEntityType()) - && OpenSearchVectorService.getInstance() == null) { + && Entity.getSearchRepository().getVectorIndexService() == null) { return; } super.promoteEntityIndex(context, reindexSuccess); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 5787c1e01cd7..154f51c9cc83 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -134,6 +134,7 @@ import org.openmetadata.service.search.nlq.NLQService; import org.openmetadata.service.search.nlq.NLQServiceFactory; import org.openmetadata.service.search.opensearch.OpenSearchClient; +import org.openmetadata.service.search.vector.ElasticSearchVectorService; import org.openmetadata.service.search.vector.OpenSearchVectorService; import org.openmetadata.service.search.vector.VectorEmbeddingHandler; import org.openmetadata.service.search.vector.VectorIndexService; @@ -410,9 +411,10 @@ public synchronized void initializeVectorSearchService() { OpenSearchVectorService.init(osClient, embeddingClient, language); this.vectorIndexService = OpenSearchVectorService.getInstance(); } else { - LOG.warn( - "Vector embedding is only supported with OpenSearch. Elasticsearch support is planned."); - return; + es.co.elastic.clients.elasticsearch.ElasticsearchClient esClient = + ((ElasticSearchClient) getSearchClient()).getNewClient(); + ElasticSearchVectorService.init(esClient, embeddingClient, language); + this.vectorIndexService = ElasticSearchVectorService.getInstance(); } this.vectorEmbeddingHandler = new VectorEmbeddingHandler(vectorIndexService); @@ -521,10 +523,14 @@ public void deleteIndex(IndexMapping indexMapping) { } private String getIndexMapping(IndexMapping indexMapping) { + String mappingFile = indexMapping.getIndexMappingFile(); + boolean isOpenSearch = getSearchType() == ElasticSearchConfiguration.SearchType.OPENSEARCH; + if (!isOpenSearch && mappingFile != null && mappingFile.contains("vector_search_index.json")) { + mappingFile = + mappingFile.replace("vector_search_index.json", "vector_search_index_es_native.json"); + } try (InputStream in = - getClass() - .getResourceAsStream( - String.format(indexMapping.getIndexMappingFile(), language.toLowerCase()))) { + getClass().getResourceAsStream(String.format(mappingFile, language.toLowerCase()))) { assert in != null; return new String(in.readAllBytes()); } catch (Exception e) { @@ -2102,8 +2108,13 @@ private String reformatVectorIndexWithDimension(String mapping, int dimension) { if (mappings.has("properties")) { JsonNode properties = mappings.get("properties"); if (properties.has("embedding")) { - ((com.fasterxml.jackson.databind.node.ObjectNode) properties.get("embedding")) - .put("dimension", dimension); + com.fasterxml.jackson.databind.node.ObjectNode embeddingNode = + (com.fasterxml.jackson.databind.node.ObjectNode) properties.get("embedding"); + if (embeddingNode.has("dims")) { + embeddingNode.put("dims", dimension); + } else { + embeddingNode.put("dimension", dimension); + } } } JsonNode meta = @@ -2122,7 +2133,9 @@ private String reformatVectorIndexWithDimension(String mapping, int dimension) { .replace("\"dimension\": 768", "\"dimension\": " + dimension) .replace("\"dimension\":768", "\"dimension\":" + dimension) .replace("\"dimension\": 512", "\"dimension\": " + dimension) - .replace("\"dimension\":512", "\"dimension\":" + dimension); + .replace("\"dimension\":512", "\"dimension\":" + dimension) + .replace("\"dims\": 512", "\"dims\": " + dimension) + .replace("\"dims\":512", "\"dims\":" + dimension); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManager.java index 9b451e4ad76a..b00bb66afb4f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManager.java @@ -1,5 +1,7 @@ package org.openmetadata.service.search.elasticsearch; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import es.co.elastic.clients.elasticsearch.ElasticsearchClient; import es.co.elastic.clients.elasticsearch._types.ElasticsearchException; import es.co.elastic.clients.elasticsearch.indices.CreateIndexRequest; @@ -31,6 +33,7 @@ */ @Slf4j public class ElasticSearchIndexManager implements IndexManagementClient { + private static final ObjectMapper MAPPER = new ObjectMapper(); private final ElasticsearchClient client; private final String clusterAlias; private final boolean isClientAvailable; @@ -83,12 +86,13 @@ public void updateIndex(IndexMapping indexMapping, String indexMappingContent) { try { String indexName = indexMapping.getIndexName(clusterAlias); + String mappingsJson = extractMappingsJson(indexMappingContent); PutMappingRequest request = PutMappingRequest.of( builder -> { builder.index(indexName); - if (indexMappingContent != null) { - builder.withJson(new StringReader(indexMappingContent)); + if (mappingsJson != null) { + builder.withJson(new StringReader(mappingsJson)); } return builder; }); @@ -141,6 +145,24 @@ public void createIndex(String indexName, String indexMappingContent) { } } + private String extractMappingsJson(String indexMappingContent) { + if (indexMappingContent == null) { + return null; + } + try { + JsonNode root = MAPPER.readTree(indexMappingContent); + JsonNode mappings = root.get("mappings"); + if (mappings != null) { + return MAPPER.writeValueAsString(mappings); + } + return indexMappingContent; + } catch (IOException e) { + LOG.warn( + "Failed to extract mappings from index content, using full content: {}", e.getMessage()); + return indexMappingContent; + } + } + private void createIndexInternal(String indexName, String indexMappingContent) throws IOException { CreateIndexRequest request = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/SemanticSearchQueryBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/SemanticSearchQueryBuilder.java new file mode 100644 index 000000000000..751ac113aaa3 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/SemanticSearchQueryBuilder.java @@ -0,0 +1,153 @@ +package org.openmetadata.service.search.elasticsearch; + +import es.co.elastic.clients.elasticsearch._types.Script; +import es.co.elastic.clients.elasticsearch._types.ScriptLanguage; +import es.co.elastic.clients.elasticsearch._types.query_dsl.FunctionBoostMode; +import es.co.elastic.clients.elasticsearch._types.query_dsl.FunctionScoreMode; +import es.co.elastic.clients.elasticsearch._types.query_dsl.Query; +import es.co.elastic.clients.elasticsearch._types.query_dsl.TextQueryType; +import es.co.elastic.clients.json.JsonData; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.search.SearchRequest; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.rdf.semantic.EmbeddingService; + +/** + * Builds semantic search queries for Elasticsearch that combine: + * 1. Vector similarity search using k-NN (dense_vector) + * 2. Traditional text search with BM25 + * 3. RDF context boosting + */ +@Slf4j +public class SemanticSearchQueryBuilder { + + private static final String KNN_FIELD = "embedding"; + private static final String RDF_CONTEXT_FIELD = "rdfContext"; + + private final EmbeddingService embeddingService; + + public SemanticSearchQueryBuilder() { + this.embeddingService = EmbeddingService.getInstance(); + } + + public Query buildSemanticQuery(SearchRequest request) { + String queryText = request.getQuery(); + if (!isSemanticSearchEnabled(request)) { + return null; + } + float[] queryEmbedding = embeddingService.generateEmbedding(queryText); + + Query knnQuery = buildKnnQuery(queryEmbedding); + Query textQuery = buildTextQuery(queryText, request); + + Query hybridQuery = + Query.of( + q -> + q.bool( + b -> + b.should(s -> s.constantScore(cs -> cs.filter(knnQuery).boost(0.7f))) + .should(s -> s.constantScore(cs -> cs.filter(textQuery).boost(0.3f))))); + + return Query.of( + q -> + q.functionScore( + fs -> + fs.query(hybridQuery) + .functions(f -> f.scriptScore(ss -> ss.script(buildRdfBoostScript()))) + .scoreMode(FunctionScoreMode.Sum) + .boostMode(FunctionBoostMode.Multiply))); + } + + private Query buildKnnQuery(float[] queryEmbedding) { + Map params = new HashMap<>(); + List vectorList = new ArrayList<>(); + for (float v : queryEmbedding) { + vectorList.add((double) v); + } + params.put("query_vector", vectorList); + + return Query.of( + q -> + q.scriptScore( + ss -> + ss.query(mq -> mq.matchAll(m -> m)) + .script( + Script.of( + s -> + s.source( + src -> + src.scriptString( + "cosineSimilarity(params.query_vector, '" + + KNN_FIELD + + "') + 1.0")) + .lang(ScriptLanguage.Painless) + .params(convertToJsonDataMap(params)))))); + } + + private Query buildTextQuery(String queryText, SearchRequest request) { + List fields = new ArrayList<>(); + fields.add("name^5"); + fields.add("displayName^4"); + fields.add("description^2"); + fields.add("tags.tagFQN^3"); + + if ("table".equalsIgnoreCase(request.getIndex())) { + fields.add("columns.name^3"); + fields.add("columns.description"); + } + + return Query.of( + q -> + q.multiMatch( + m -> + m.query(queryText) + .fields(fields) + .type(TextQueryType.BestFields) + .fuzziness("AUTO"))); + } + + private Script buildRdfBoostScript() { + String scriptSource = + """ + double boost = 1.0; + + if (doc.containsKey('rdfContext.upstreamCount')) { + int upstreamCount = doc['rdfContext.upstreamCount'].value; + boost += Math.min(upstreamCount * 0.01, 0.2); + } + + if (doc.containsKey('rdfContext.downstreamCount')) { + int downstreamCount = doc['rdfContext.downstreamCount'].value; + boost += Math.min(downstreamCount * 0.02, 0.3); + } + + if (doc.containsKey('rdfContext.semanticTypes')) { + int typeCount = doc['rdfContext.semanticTypes'].size(); + boost += Math.min(typeCount * 0.05, 0.2); + } + + return boost; + """; + + return Script.of( + s -> + s.source(src -> src.scriptString(scriptSource)) + .lang(ScriptLanguage.Painless) + .params(Map.of())); + } + + private boolean isSemanticSearchEnabled(SearchRequest request) { + return request.getSemanticSearch() != null && request.getSemanticSearch(); + } + + private Map convertToJsonDataMap(Map map) { + return JsonUtils.getMap(map).entrySet().stream() + .filter(entry -> entry.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, entry -> JsonData.of(entry.getValue()))); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/ElasticSearchVectorService.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/ElasticSearchVectorService.java new file mode 100644 index 000000000000..14a60295a7c1 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/ElasticSearchVectorService.java @@ -0,0 +1,543 @@ +package org.openmetadata.service.search.vector; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import es.co.elastic.clients.elasticsearch.ElasticsearchClient; +import es.co.elastic.clients.elasticsearch._types.Refresh; +import es.co.elastic.clients.elasticsearch._types.mapping.TypeMapping; +import es.co.elastic.clients.elasticsearch.core.BulkRequest; +import es.co.elastic.clients.elasticsearch.core.BulkResponse; +import es.co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import es.co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; +import es.co.elastic.clients.elasticsearch.indices.CreateIndexRequest; +import es.co.elastic.clients.elasticsearch.indices.ExistsRequest; +import es.co.elastic.clients.elasticsearch.indices.IndexSettings; +import es.co.elastic.clients.transport.rest5_client.Rest5ClientTransport; +import es.co.elastic.clients.transport.rest5_client.low_level.Request; +import es.co.elastic.clients.transport.rest5_client.low_level.Response; +import es.co.elastic.clients.transport.rest5_client.low_level.Rest5Client; +import jakarta.json.stream.JsonParser; +import java.io.InputStream; +import java.io.StringReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher; +import org.openmetadata.service.search.vector.client.EmbeddingClient; +import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchResponse; + +@Slf4j +public class ElasticSearchVectorService implements VectorIndexService { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final int OVER_FETCH_MULTIPLIER = 2; + + private static volatile ElasticSearchVectorService instance; + + private final ElasticsearchClient client; + private final Rest5Client restClient; + @Getter private final EmbeddingClient embeddingClient; + private final String language; + + public ElasticSearchVectorService( + ElasticsearchClient client, EmbeddingClient embeddingClient, String language) { + this.client = client; + this.restClient = extractRestClient(client); + this.embeddingClient = embeddingClient; + this.language = language != null ? language.toLowerCase() : "en"; + } + + public ElasticSearchVectorService(ElasticsearchClient client, EmbeddingClient embeddingClient) { + this(client, embeddingClient, "en"); + } + + private static Rest5Client extractRestClient(ElasticsearchClient client) { + Rest5ClientTransport transport = (Rest5ClientTransport) client._transport(); + return transport.restClient(); + } + + public static synchronized void init( + ElasticsearchClient client, EmbeddingClient embeddingClient, String language) { + if (instance != null) { + LOG.warn("ElasticSearchVectorService already initialized, reinitializing"); + } + instance = new ElasticSearchVectorService(client, embeddingClient, language); + instance.registerVectorEmbeddingHandler(); + LOG.info( + "ElasticSearchVectorService initialized with model={}, dimension={}", + embeddingClient.getModelId(), + embeddingClient.getDimension()); + } + + public static ElasticSearchVectorService getInstance() { + return instance; + } + + private void registerVectorEmbeddingHandler() { + try { + VectorEmbeddingHandler handler = new VectorEmbeddingHandler(this); + EntityLifecycleEventDispatcher.getInstance().registerHandler(handler); + LOG.info("Registered VectorEmbeddingHandler for entity lifecycle events"); + } catch (Exception e) { + LOG.error("Failed to register VectorEmbeddingHandler", e); + } + } + + @Override + @SuppressWarnings("unchecked") + public VectorSearchResponse search( + String query, Map> filters, int size, int k, double threshold) { + long start = System.currentTimeMillis(); + try { + float[] queryVector = embeddingClient.embed(query); + int overFetchSize = size * OVER_FETCH_MULTIPLIER; + + String queryJson = + VectorSearchQueryBuilder.buildNativeESQuery(queryVector, overFetchSize, k, filters); + String indexName = getClusteredIndexName(); + String responseBody = executeGenericRequest("POST", "/" + indexName + "/_search", queryJson); + + JsonNode root = MAPPER.readTree(responseBody); + JsonNode hitsNode = root.path("hits").path("hits"); + + LinkedHashMap>> byParent = new LinkedHashMap<>(); + for (JsonNode hit : hitsNode) { + double score = hit.path("_score").asDouble(0.0); + if (score < threshold) { + continue; + } + + Map hitMap = MAPPER.convertValue(hit.path("_source"), Map.class); + hitMap.put("_score", score); + + String parentId = (String) hitMap.get("parent_id"); + if (parentId != null) { + byParent.computeIfAbsent(parentId, kVal -> new ArrayList<>()).add(hitMap); + } + } + + List> results = new ArrayList<>(); + int parentCount = 0; + for (List> chunks : byParent.values()) { + if (parentCount >= size) { + break; + } + results.addAll(chunks); + parentCount++; + } + + long tookMillis = System.currentTimeMillis() - start; + return new VectorSearchResponse(tookMillis, results); + } catch (Exception e) { + LOG.error("Vector search failed: {}", e.getMessage(), e); + throw new RuntimeException("Vector search failed", e); + } + } + + String executeGenericRequest(String method, String endpoint, String body) { + try { + Request request = new Request(method, endpoint); + if (body != null) { + request.setJsonEntity(body); + } + Response response = restClient.performRequest(request); + try (InputStream is = response.getEntity().getContent()) { + return new String(is.readAllBytes(), StandardCharsets.UTF_8); + } + } catch (Exception e) { + LOG.error("Generic request failed: {} {}", method, endpoint, e); + throw new RuntimeException("Elasticsearch generic request failed", e); + } + } + + @Override + public void updateVectorEmbeddings(EntityInterface entity, String targetIndex) { + try { + String parentId = entity.getId().toString(); + String existingFingerprint = getExistingFingerprint(targetIndex, parentId); + String currentFingerprint = VectorDocBuilder.computeFingerprintForEntity(entity); + + if (currentFingerprint.equals(existingFingerprint)) { + LOG.debug("Skipping entity {} - fingerprint unchanged", parentId); + return; + } + + List> docs = VectorDocBuilder.fromEntity(entity, embeddingClient); + deleteByParentId(targetIndex, parentId); + bulkIndex(docs, targetIndex); + } catch (Exception e) { + LOG.error( + "Failed to update vector embeddings for entity {}: {}", + entity.getId(), + e.getMessage(), + e); + } + } + + @Override + public void updateVectorEmbeddingsWithMigration( + EntityInterface entity, String targetIndex, String sourceIndex) { + try { + String parentId = entity.getId().toString(); + String currentFingerprint = VectorDocBuilder.computeFingerprintForEntity(entity); + + if (sourceIndex != null) { + try { + String existingFingerprint = getExistingFingerprint(sourceIndex, parentId); + if (currentFingerprint.equals(existingFingerprint)) { + if (copyExistingVectorDocuments( + sourceIndex, targetIndex, parentId, currentFingerprint)) { + return; + } + } + } catch (Exception ex) { + LOG.warn( + "Migration copy failed for entity {}, falling back to recomputation: {}", + parentId, + ex.getMessage()); + } + } + + List> docs = VectorDocBuilder.fromEntity(entity, embeddingClient); + bulkIndex(docs, targetIndex); + } catch (Exception e) { + LOG.error( + "Failed to update vector embeddings with migration for entity {}: {}", + entity.getId(), + e.getMessage(), + e); + } + } + + @Override + public String getExistingFingerprint(String indexName, String parentId) { + try { + String query = + "{\"size\":1,\"_source\":[\"fingerprint\"]," + + "\"query\":{\"term\":{\"parent_id\":\"" + + VectorSearchQueryBuilder.escape(parentId) + + "\"}}}"; + String response = executeGenericRequest("POST", "/" + indexName + "/_search", query); + JsonNode root = MAPPER.readTree(response); + JsonNode hits = root.path("hits").path("hits"); + if (hits.isArray() && !hits.isEmpty()) { + return hits.get(0).path("_source").path("fingerprint").asText(null); + } + } catch (Exception e) { + LOG.debug( + "Failed to get fingerprint for parent_id={} in index={}: {}", + parentId, + indexName, + e.getMessage()); + } + return null; + } + + @Override + public Map getExistingFingerprintsBatch( + String indexName, List parentIds) { + if (parentIds == null || parentIds.isEmpty()) { + return Collections.emptyMap(); + } + try { + StringBuilder termsArray = new StringBuilder("["); + for (int i = 0; i < parentIds.size(); i++) { + if (i > 0) termsArray.append(','); + termsArray + .append("\"") + .append(VectorSearchQueryBuilder.escape(parentIds.get(i))) + .append("\""); + } + termsArray.append("]"); + + String query = + "{\"size\":" + + parentIds.size() + + ",\"_source\":[\"parent_id\",\"fingerprint\"]" + + ",\"query\":{\"terms\":{\"parent_id\":" + + termsArray + + "}}" + + ",\"collapse\":{\"field\":\"parent_id\"}}"; + + String response = executeGenericRequest("POST", "/" + indexName + "/_search", query); + JsonNode root = MAPPER.readTree(response); + JsonNode hits = root.path("hits").path("hits"); + + Map result = new HashMap<>(); + for (JsonNode hit : hits) { + String pid = hit.path("_source").path("parent_id").asText(); + String fp = hit.path("_source").path("fingerprint").asText(null); + if (pid != null && fp != null) { + result.put(pid, fp); + } + } + return result; + } catch (Exception e) { + LOG.error("Failed to batch get fingerprints in index={}: {}", indexName, e.getMessage(), e); + return Collections.emptyMap(); + } + } + + @Override + @SuppressWarnings("unchecked") + public boolean copyExistingVectorDocuments( + String sourceIndex, String targetIndex, String parentId, String fingerprint) { + try { + String searchQuery = + "{\"size\":1000,\"query\":{\"term\":{\"parent_id\":\"" + + VectorSearchQueryBuilder.escape(parentId) + + "\"}}}"; + String response = executeGenericRequest("POST", "/" + sourceIndex + "/_search", searchQuery); + JsonNode root = MAPPER.readTree(response); + JsonNode hits = root.path("hits").path("hits"); + + if (!hits.isArray() || hits.isEmpty()) { + return false; + } + + List> docs = new ArrayList<>(); + for (JsonNode hit : hits) { + Map source = MAPPER.convertValue(hit.path("_source"), Map.class); + source.put("fingerprint", fingerprint); + docs.add(source); + } + bulkIndex(docs, targetIndex); + return true; + } catch (Exception e) { + LOG.error( + "Failed to copy vector documents from {} to {} for parent_id={}: {}", + sourceIndex, + targetIndex, + parentId, + e.getMessage(), + e); + return false; + } + } + + @Override + public void softDeleteEmbeddings(EntityInterface entity) { + try { + String parentId = entity.getId().toString(); + String indexName = getClusteredIndexName(); + String script = + "{\"script\":{\"source\":\"ctx._source.deleted = true\"}," + + "\"query\":{\"term\":{\"parent_id\":\"" + + VectorSearchQueryBuilder.escape(parentId) + + "\"}}}"; + executeGenericRequest("POST", "/" + indexName + "/_update_by_query", script); + } catch (Exception e) { + LOG.error( + "Failed to soft delete embeddings for entity {}: {}", entity.getId(), e.getMessage(), e); + } + } + + @Override + public void hardDeleteEmbeddings(EntityInterface entity) { + try { + String parentId = entity.getId().toString(); + String indexName = getClusteredIndexName(); + deleteByParentId(indexName, parentId); + } catch (Exception e) { + LOG.error( + "Failed to hard delete embeddings for entity {}: {}", entity.getId(), e.getMessage(), e); + } + } + + @Override + public void restoreEmbeddings(EntityInterface entity) { + try { + String parentId = entity.getId().toString(); + String indexName = getClusteredIndexName(); + String script = + "{\"script\":{\"source\":\"ctx._source.deleted = false\"}," + + "\"query\":{\"term\":{\"parent_id\":\"" + + VectorSearchQueryBuilder.escape(parentId) + + "\"}}}"; + executeGenericRequest("POST", "/" + indexName + "/_update_by_query", script); + } catch (Exception e) { + LOG.error( + "Failed to restore embeddings for entity {}: {}", entity.getId(), e.getMessage(), e); + } + } + + private void deleteByParentId(String indexName, String parentId) { + try { + String query = + "{\"query\":{\"term\":{\"parent_id\":\"" + + VectorSearchQueryBuilder.escape(parentId) + + "\"}}}"; + executeGenericRequest("POST", "/" + indexName + "/_delete_by_query", query); + } catch (Exception e) { + LOG.error( + "Failed to delete by parent_id={} in index={}: {}", + parentId, + indexName, + e.getMessage(), + e); + } + } + + private static String getClusteredIndexName() { + return VectorIndexService.getClusteredIndexName(); + } + + @Override + public void createOrUpdateIndex(int dimension) { + try { + if (indexExists()) { + LOG.info("Vector index {} already exists", VECTOR_INDEX_NAME); + return; + } + + String mappingJson = loadIndexMapping(dimension); + JsonNode rootNode = MAPPER.readTree(mappingJson); + JsonNode mappingsNode = rootNode.get("mappings"); + JsonNode settingsNode = rootNode.get("settings"); + + CreateIndexRequest request = + CreateIndexRequest.of( + builder -> { + builder.index(getClusteredIndexName()); + + if (mappingsNode != null && !mappingsNode.isNull()) { + TypeMapping typeMapping = parseTypeMapping(mappingsNode); + builder.mappings(typeMapping); + } + + if (settingsNode != null && !settingsNode.isNull()) { + IndexSettings settings = parseIndexSettings(settingsNode); + builder.settings(settings); + } + + return builder; + }); + client.indices().create(request); + + LOG.info("Created vector index {} with dimension {}", getClusteredIndexName(), dimension); + } catch (Exception e) { + LOG.error("Failed to create vector index: {}", e.getMessage(), e); + } + } + + @Override + public boolean indexExists() { + try { + ExistsRequest request = ExistsRequest.of(b -> b.index(getClusteredIndexName())); + return client.indices().exists(request).value(); + } catch (Exception e) { + LOG.error("Failed to check if vector index exists: {}", e.getMessage(), e); + return false; + } + } + + @Override + public String getIndexName() { + return getClusteredIndexName(); + } + + @Override + @SuppressWarnings("unchecked") + public void bulkIndex(List> documents, String targetIndex) { + if (documents == null || documents.isEmpty()) { + return; + } + + try { + List operations = new ArrayList<>(); + for (int i = 0; i < documents.size(); i++) { + Map doc = documents.get(i); + String parentId = (String) doc.get("parent_id"); + int chunkIndex = doc.containsKey("chunk_index") ? (int) doc.get("chunk_index") : i; + String docId = parentId + "-" + chunkIndex; + + operations.add( + BulkOperation.of( + op -> op.index(idx -> idx.index(targetIndex).id(docId).document(doc)))); + } + + BulkRequest bulkRequest = + BulkRequest.of(b -> b.operations(operations).refresh(Refresh.False)); + BulkResponse response = client.bulk(bulkRequest); + + if (response.errors()) { + long errorCount = 0; + for (BulkResponseItem item : response.items()) { + if (item.error() != null) { + errorCount++; + LOG.warn( + "Bulk vector indexing error for document [{}] in [{}]: type={}, reason={}", + item.id(), + targetIndex, + item.error().type(), + item.error().reason()); + } + } + LOG.warn( + "Bulk vector indexing completed with {}/{} errors in {}", + errorCount, + documents.size(), + targetIndex); + } else { + LOG.debug( + "Successfully bulk indexed {} vector documents in {}", documents.size(), targetIndex); + } + } catch (Exception e) { + LOG.error("Bulk vector indexing failed in {}: {}", targetIndex, e.getMessage(), e); + } + } + + private TypeMapping parseTypeMapping(JsonNode mappingsNode) { + JsonParser parser = + client + ._transport() + .jsonpMapper() + .jsonProvider() + .createParser(new StringReader(mappingsNode.toString())); + return TypeMapping._DESERIALIZER.deserialize(parser, client._transport().jsonpMapper()); + } + + private IndexSettings parseIndexSettings(JsonNode settingsNode) { + JsonParser parser = + client + ._transport() + .jsonpMapper() + .jsonProvider() + .createParser(new StringReader(settingsNode.toString())); + return IndexSettings._DESERIALIZER.deserialize(parser, client._transport().jsonpMapper()); + } + + private String loadIndexMapping(int dimension) { + String resourcePath = "elasticsearch/" + language + "/vector_search_index_es_native.json"; + try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(resourcePath)) { + if (inputStream == null) { + throw new IllegalStateException("Could not find " + resourcePath + " in classpath"); + } + String template = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); + String result = template.replace("\"dims\": 512", "\"dims\": " + dimension); + if (result.equals(template) && dimension != 512) { + throw new IllegalStateException( + "Failed to replace dimension placeholder in vector index mapping template"); + } + return result; + } catch (Exception e) { + throw new RuntimeException("Failed to load vector search index mapping", e); + } + } + + public void close() { + try { + if (client != null && client._transport() != null) { + client._transport().close(); + } + } catch (Exception e) { + LOG.warn("Error closing Elasticsearch transport: {}", e.getMessage()); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilder.java index 7e785b5f941f..8c76481c8516 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilder.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilder.java @@ -16,7 +16,7 @@ public class VectorSearchQueryBuilder { private static final String NONE = "__NONE__"; public static String build(float[] vector, int size, int k, Map> filters) { - + StringBuilder sb = new StringBuilder(512) .append("{\"size\":") @@ -30,10 +30,42 @@ public static String build(float[] vector, int size, int k, Map> filters) { + int numCandidates = Math.max(k, 100); + StringBuilder sb = + new StringBuilder(512) + .append("{\"size\":") + .append(size) + .append(",\"_source\":{\"excludes\":[\"embedding\"]}") + .append(",\"knn\":{") + .append("\"field\":\"embedding\"") + .append(",\"query_vector\":") + .append(Arrays.toString(vector)) + .append(",\"k\":") + .append(k) + .append(",\"num_candidates\":") + .append(numCandidates); + + sb.append(",\"filter\":{\"bool\":{\"must\":["); + appendFilterMustClauses(sb, filters); + sb.append("]}}"); // close must array and bool + + sb.append("}}"); // close knn object + return sb.toString(); + } + private static void appendFilterMustClauses(StringBuilder sb, Map> filters) { // Only include documents where deleted=false sb.append("{\"term\":{\"deleted\":false}}"); - + // Then add user-specified filters for (var e : filters.entrySet()) { String field = e.getKey(); @@ -78,11 +110,6 @@ public static String build(float[] vector, int size, int k, Map vals) { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSinkSimpleTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSinkSimpleTest.java index e6abd406638b..043ec4983ef1 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSinkSimpleTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSinkSimpleTest.java @@ -91,6 +91,7 @@ void testIsVectorEmbeddingEnabledForEntity() { @Test void testAddEntitiesToVectorIndexBatch() { - elasticSearchBulkSink.addEntitiesToVectorIndexBatch(null, Collections.emptyList(), true); + elasticSearchBulkSink.addEntitiesToVectorIndexBatch( + null, Collections.emptyList(), true, null, null); } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManagerTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManagerTest.java index bbed76ba715b..b809aa625938 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManagerTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/search/elasticsearch/ElasticSearchIndexManagerTest.java @@ -299,6 +299,21 @@ void testUpdateIndex_HandlesInvalidJson() throws IOException { verifyNoInteractions(indicesClient); } + @Test + void testUpdateIndex_ExtractsMappingsFromFullIndexJson() throws IOException { + // putMapping only accepts the mappings sub-object, not a full index JSON with settings/aliases + String fullIndexJson = + "{\"settings\":{\"number_of_shards\":1}," + + "\"mappings\":{\"properties\":{\"field1\":{\"type\":\"text\"}}}," + + "\"aliases\":{}}"; + when(indexMapping.getIndexName(CLUSTER_ALIAS)).thenReturn(TEST_INDEX); + when(indicesClient.putMapping(any(PutMappingRequest.class))).thenReturn(putMappingResponse); + + assertDoesNotThrow(() -> indexManager.updateIndex(indexMapping, fullIndexJson)); + + verify(indicesClient).putMapping(any(PutMappingRequest.class)); + } + @Test void testCreateIndex_ClientNotAvailable() { ElasticSearchIndexManager managerWithNullClient = diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/search/vector/ElasticSearchVectorServiceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/search/vector/ElasticSearchVectorServiceTest.java new file mode 100644 index 000000000000..bc95a9ec3257 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/search/vector/ElasticSearchVectorServiceTest.java @@ -0,0 +1,340 @@ +package org.openmetadata.service.search.vector; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import es.co.elastic.clients.elasticsearch.ElasticsearchClient; +import es.co.elastic.clients.transport.rest5_client.Rest5ClientTransport; +import es.co.elastic.clients.transport.rest5_client.low_level.Request; +import es.co.elastic.clients.transport.rest5_client.low_level.Response; +import es.co.elastic.clients.transport.rest5_client.low_level.Rest5Client; +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.hc.core5.http.HttpEntity; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.openmetadata.service.search.vector.client.EmbeddingClient; +import org.openmetadata.service.search.vector.utils.DTOs; + +class ElasticSearchVectorServiceTest { + + private ElasticSearchVectorService vectorService; + private Rest5Client mockRestClient; + private EmbeddingClient mockEmbeddingClient; + + @BeforeEach + void setup() throws Exception { + ElasticsearchClient mockClient = mock(ElasticsearchClient.class); + Rest5ClientTransport mockTransport = mock(Rest5ClientTransport.class); + mockRestClient = mock(Rest5Client.class); + + when(mockClient._transport()).thenReturn(mockTransport); + when(mockTransport.restClient()).thenReturn(mockRestClient); + + mockEmbeddingClient = mock(EmbeddingClient.class); + when(mockEmbeddingClient.embed(any(String.class))).thenReturn(new float[] {0.1f, 0.2f, 0.3f}); + + vectorService = new ElasticSearchVectorService(mockClient, mockEmbeddingClient); + } + + @Test + void testThresholdFilteringRemovesLowScoreResults() throws Exception { + String esResponse = + """ + { + "hits": { + "total": {"value": 4}, + "hits": [ + { + "_score": 0.9, + "_source": { + "parent_id": "parent1", + "chunk_index": 0, + "text": "High score chunk" + } + }, + { + "_score": 0.7, + "_source": { + "parent_id": "parent2", + "chunk_index": 0, + "text": "Medium score chunk" + } + }, + { + "_score": 0.4, + "_source": { + "parent_id": "parent3", + "chunk_index": 0, + "text": "Low score chunk" + } + }, + { + "_score": 0.2, + "_source": { + "parent_id": "parent4", + "chunk_index": 0, + "text": "Very low score chunk" + } + } + ] + } + } + """; + + mockRestClientResponse(esResponse); + + DTOs.VectorSearchResponse results = vectorService.search("test query", Map.of(), 10, 100, 0.5); + + assertNotNull(results); + assertEquals(2, results.hits.size(), "Should return 2 results (scores 0.9 and 0.7)"); + for (Map result : results.hits) { + double score = (double) result.get("_score"); + assertTrue(score >= 0.5, "All results should have score >= 0.5, got: " + score); + } + } + + @Test + void testScoreFieldIncludedInResults() throws Exception { + String esResponse = + """ + { + "hits": { + "total": {"value": 1}, + "hits": [ + { + "_score": 0.85, + "_source": { + "parent_id": "parent1", + "chunk_index": 0, + "text": "Test chunk" + } + } + ] + } + } + """; + + mockRestClientResponse(esResponse); + + DTOs.VectorSearchResponse results = vectorService.search("test query", Map.of(), 10, 100, 0.0); + + assertEquals(1, results.hits.size()); + assertTrue(results.hits.get(0).containsKey("_score"), "Result should contain _score field"); + assertEquals(0.85, (double) results.hits.get(0).get("_score"), 0.001); + } + + @Test + void testParentGroupingLimitsDistinctParents() throws Exception { + String esResponse = + """ + { + "hits": { + "total": {"value": 8}, + "hits": [ + {"_score": 0.9, "_source": {"parent_id": "parent1", "chunk_index": 0}}, + {"_score": 0.88, "_source": {"parent_id": "parent1", "chunk_index": 1}}, + {"_score": 0.85, "_source": {"parent_id": "parent1", "chunk_index": 2}}, + {"_score": 0.8, "_source": {"parent_id": "parent2", "chunk_index": 0}}, + {"_score": 0.78, "_source": {"parent_id": "parent2", "chunk_index": 1}}, + {"_score": 0.7, "_source": {"parent_id": "parent3", "chunk_index": 0}}, + {"_score": 0.68, "_source": {"parent_id": "parent3", "chunk_index": 1}}, + {"_score": 0.6, "_source": {"parent_id": "parent4", "chunk_index": 0}} + ] + } + } + """; + + mockRestClientResponse(esResponse); + + DTOs.VectorSearchResponse results = vectorService.search("test query", Map.of(), 2, 100, 0.0); + + assertEquals(5, results.hits.size(), "Should return all chunks from first 2 parents (3+2=5)"); + long distinctParents = results.hits.stream().map(r -> r.get("parent_id")).distinct().count(); + assertEquals(2, distinctParents, "Should have chunks from exactly 2 distinct parents"); + } + + @Test + void testZeroThresholdReturnsAllResults() throws Exception { + String esResponse = + """ + { + "hits": { + "total": {"value": 3}, + "hits": [ + {"_score": 0.9, "_source": {"parent_id": "p1", "chunk_index": 0}}, + {"_score": 0.5, "_source": {"parent_id": "p2", "chunk_index": 0}}, + {"_score": 0.1, "_source": {"parent_id": "p3", "chunk_index": 0}} + ] + } + } + """; + + mockRestClientResponse(esResponse); + + DTOs.VectorSearchResponse results = vectorService.search("test query", Map.of(), 10, 100, 0.0); + + assertEquals(3, results.hits.size(), "With threshold 0.0, should return all 3 results"); + } + + @Test + void testHighThresholdFiltersAllResults() throws Exception { + String esResponse = + """ + { + "hits": { + "total": {"value": 3}, + "hits": [ + {"_score": 0.5, "_source": {"parent_id": "p1", "chunk_index": 0}}, + {"_score": 0.3, "_source": {"parent_id": "p2", "chunk_index": 0}}, + {"_score": 0.1, "_source": {"parent_id": "p3", "chunk_index": 0}} + ] + } + } + """; + + mockRestClientResponse(esResponse); + + DTOs.VectorSearchResponse results = vectorService.search("test query", Map.of(), 10, 100, 0.9); + + assertEquals(0, results.hits.size(), "With threshold 0.9, all results should be filtered out"); + } + + @Test + void testChunksWithoutParentIdAreSkipped() throws Exception { + String esResponse = + """ + { + "hits": { + "total": {"value": 3}, + "hits": [ + {"_score": 0.9, "_source": {"parent_id": "p1", "chunk_index": 0}}, + {"_score": 0.8, "_source": {"chunk_index": 0, "text": "orphan chunk"}}, + {"_score": 0.7, "_source": {"parent_id": "p2", "chunk_index": 0}} + ] + } + } + """; + + mockRestClientResponse(esResponse); + + DTOs.VectorSearchResponse results = vectorService.search("test query", Map.of(), 10, 100, 0.0); + + assertEquals(2, results.hits.size(), "Chunks without parent_id should be skipped"); + } + + @Test + void testRequestedSizeLimitsDistinctParents() throws Exception { + String esResponse = + """ + { + "hits": { + "total": {"value": 10}, + "hits": [ + {"_score": 0.9, "_source": {"parent_id": "p1", "chunk_index": 0}}, + {"_score": 0.8, "_source": {"parent_id": "p2", "chunk_index": 0}}, + {"_score": 0.7, "_source": {"parent_id": "p3", "chunk_index": 0}}, + {"_score": 0.6, "_source": {"parent_id": "p4", "chunk_index": 0}}, + {"_score": 0.5, "_source": {"parent_id": "p5", "chunk_index": 0}}, + {"_score": 0.4, "_source": {"parent_id": "p6", "chunk_index": 0}}, + {"_score": 0.3, "_source": {"parent_id": "p7", "chunk_index": 0}}, + {"_score": 0.2, "_source": {"parent_id": "p8", "chunk_index": 0}}, + {"_score": 0.15, "_source": {"parent_id": "p9", "chunk_index": 0}}, + {"_score": 0.1, "_source": {"parent_id": "p10", "chunk_index": 0}} + ] + } + } + """; + + mockRestClientResponse(esResponse); + + DTOs.VectorSearchResponse results = vectorService.search("test query", Map.of(), 3, 100, 0.0); + + assertEquals(3, results.hits.size(), "Should limit to 3 distinct parents"); + long distinctParents = results.hits.stream().map(r -> r.get("parent_id")).distinct().count(); + assertEquals(3, distinctParents, "Should have exactly 3 distinct parents"); + } + + @Test + void testEmptyHitsResponseReturnsEmptyList() throws Exception { + String esResponse = + """ + { + "hits": { + "total": {"value": 0}, + "hits": [] + } + } + """; + + mockRestClientResponse(esResponse); + + DTOs.VectorSearchResponse results = vectorService.search("test query", Map.of(), 10, 100, 0.0); + + assertNotNull(results); + assertTrue(results.hits.isEmpty(), "Empty hits should return empty list"); + } + + @Test + void testGetExistingFingerprintReturnsNullWhenNotFound() throws Exception { + String esResponse = """ + {"hits":{"total":{"value":0},"hits":[]}} + """; + + mockRestClientResponse(esResponse); + + String fingerprint = vectorService.getExistingFingerprint("vector_search_index", "unknown-id"); + + assertTrue(fingerprint == null, "Should return null when no fingerprint found"); + } + + @Test + void testGetExistingFingerprintReturnsValueWhenFound() throws Exception { + String esResponse = + """ + { + "hits": { + "total": {"value": 1}, + "hits": [ + {"_source": {"fingerprint": "abc123"}} + ] + } + } + """; + + mockRestClientResponse(esResponse); + + String fingerprint = + vectorService.getExistingFingerprint("vector_search_index", "some-entity-id"); + + assertEquals("abc123", fingerprint); + } + + @Test + void testGetExistingFingerprintsBatchReturnsEmptyForNullInput() { + Map result = vectorService.getExistingFingerprintsBatch("index", null); + assertTrue(result.isEmpty()); + } + + @Test + void testGetExistingFingerprintsBatchReturnsEmptyForEmptyInput() { + Map result = + vectorService.getExistingFingerprintsBatch("index", java.util.List.of()); + assertTrue(result.isEmpty()); + } + + private void mockRestClientResponse(String responseJson) throws Exception { + Response mockResponse = mock(Response.class); + HttpEntity mockEntity = mock(HttpEntity.class); + + when(mockRestClient.performRequest(any(Request.class))).thenReturn(mockResponse); + when(mockResponse.getEntity()).thenReturn(mockEntity); + when(mockEntity.getContent()) + .thenReturn(new ByteArrayInputStream(responseJson.getBytes(StandardCharsets.UTF_8))); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilderTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilderTest.java index b28b75caa5a9..f8524b0820e2 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilderTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/search/vector/VectorSearchQueryBuilderTest.java @@ -614,9 +614,187 @@ void testIgnoresOnlyUnrecognizedFilterKeys() throws Exception { JsonNode root = MAPPER.readTree(query); JsonNode mustFilters = root.get("query").get("knn").get("embedding").get("filter").get("bool").get("must"); - + // Should have only 1 filter: deleted=false assertEquals(1, mustFilters.size()); assertEquals(false, mustFilters.get(0).get("term").get("deleted").asBoolean()); } + + // ------------------------------------------------------------------------- + // buildNativeESQuery tests (Elasticsearch 8.x/9.x top-level knn format) + // ------------------------------------------------------------------------- + + @Test + void testNativeESQueryTopLevelKnnStructure() throws Exception { + float[] vector = {0.1f, 0.2f, 0.3f}; + int size = 10; + int k = 100; + + String query = VectorSearchQueryBuilder.buildNativeESQuery(vector, size, k, Map.of()); + + JsonNode root = MAPPER.readTree(query); + assertEquals(size, root.get("size").asInt()); + + // Must have top-level "knn", NOT "query" + assertTrue(root.has("knn"), "ES native query must have top-level 'knn'"); + assertTrue(!root.has("query"), "ES native query must not have 'query' key"); + + JsonNode knn = root.get("knn"); + assertEquals("embedding", knn.get("field").asText()); + assertEquals(k, knn.get("k").asInt()); + assertNotNull(knn.get("query_vector")); + assertTrue(knn.get("query_vector").isArray()); + assertEquals(3, knn.get("query_vector").size()); + } + + @Test + void testNativeESQueryNumCandidates() throws Exception { + float[] vector = {0.1f}; + + // k < 100 → num_candidates should be 100 + String query1 = VectorSearchQueryBuilder.buildNativeESQuery(vector, 10, 50, Map.of()); + JsonNode root1 = MAPPER.readTree(query1); + assertEquals(100, root1.get("knn").get("num_candidates").asInt()); + + // k > 100 → num_candidates should equal k + String query2 = VectorSearchQueryBuilder.buildNativeESQuery(vector, 10, 200, Map.of()); + JsonNode root2 = MAPPER.readTree(query2); + assertEquals(200, root2.get("knn").get("num_candidates").asInt()); + } + + @Test + void testNativeESQueryAlwaysHasDeletedFilter() throws Exception { + float[] vector = {0.1f, 0.2f}; + + String query = VectorSearchQueryBuilder.buildNativeESQuery(vector, 10, 100, Map.of()); + + JsonNode root = MAPPER.readTree(query); + JsonNode mustFilters = root.get("knn").get("filter").get("bool").get("must"); + + assertNotNull(mustFilters); + assertTrue(mustFilters.isArray()); + assertTrue(mustFilters.size() >= 1); + assertEquals(false, mustFilters.get(0).get("term").get("deleted").asBoolean()); + } + + @Test + void testNativeESQueryWithEntityTypeFilter() throws Exception { + float[] vector = {0.5f}; + Map> filters = Map.of("entityType", List.of("table", "dashboard")); + + String query = VectorSearchQueryBuilder.buildNativeESQuery(vector, 5, 50, filters); + + JsonNode root = MAPPER.readTree(query); + JsonNode mustFilters = root.get("knn").get("filter").get("bool").get("must"); + + assertEquals(2, mustFilters.size()); + JsonNode entityTypeFilter = mustFilters.get(1); + assertTrue(entityTypeFilter.has("terms")); + JsonNode entityTypes = entityTypeFilter.get("terms").get("entityType"); + assertEquals(2, entityTypes.size()); + assertEquals("table", entityTypes.get(0).asText()); + assertEquals("dashboard", entityTypes.get(1).asText()); + } + + @Test + void testNativeESQueryWithOwnersFilter() throws Exception { + float[] vector = {0.1f}; + Map> filters = Map.of("owners", List.of("user1", "team2")); + + String query = VectorSearchQueryBuilder.buildNativeESQuery(vector, 10, 100, filters); + + JsonNode root = MAPPER.readTree(query); + JsonNode mustFilters = root.get("knn").get("filter").get("bool").get("must"); + + assertEquals(2, mustFilters.size()); + JsonNode ownersFilter = mustFilters.get(1); + assertTrue(ownersFilter.has("bool")); + JsonNode shouldClauses = ownersFilter.get("bool").get("should"); + assertNotNull(shouldClauses); + assertEquals(2, shouldClauses.size()); + + String ownersJson = shouldClauses.toString(); + assertTrue(ownersJson.contains("user1")); + assertTrue(ownersJson.contains("team2")); + } + + @Test + void testNativeESQueryWithTagsFilter() throws Exception { + float[] vector = {0.1f, 0.2f}; + Map> filters = Map.of("tags", List.of("PII.Sensitive")); + + String query = VectorSearchQueryBuilder.buildNativeESQuery(vector, 10, 100, filters); + + JsonNode root = MAPPER.readTree(query); + JsonNode mustFilters = root.get("knn").get("filter").get("bool").get("must"); + + assertEquals(2, mustFilters.size()); + JsonNode tagsFilter = mustFilters.get(1); + assertTrue(tagsFilter.has("nested")); + assertEquals("tags", tagsFilter.get("nested").get("path").asText()); + } + + @Test + void testNativeESQueryWithMultipleFilters() throws Exception { + float[] vector = {0.1f, 0.2f}; + Map> filters = + Map.of( + "entityType", List.of("table"), + "tier", List.of("Tier.Tier1"), + "serviceType", List.of("BigQuery")); + + String query = VectorSearchQueryBuilder.buildNativeESQuery(vector, 10, 100, filters); + + JsonNode root = MAPPER.readTree(query); + JsonNode mustFilters = root.get("knn").get("filter").get("bool").get("must"); + + assertEquals(4, mustFilters.size(), "deleted=false + 3 user filters"); + String filtersJson = mustFilters.toString(); + assertTrue(filtersJson.contains("entityType")); + assertTrue(filtersJson.contains("tier")); + assertTrue(filtersJson.contains("serviceType")); + } + + @Test + void testNativeESQuerySourceExcludesEmbedding() throws Exception { + float[] vector = {0.1f}; + + String query = VectorSearchQueryBuilder.buildNativeESQuery(vector, 10, 100, Map.of()); + + JsonNode root = MAPPER.readTree(query); + JsonNode excludes = root.get("_source").get("excludes"); + assertNotNull(excludes); + assertTrue(excludes.isArray()); + assertEquals("embedding", excludes.get(0).asText()); + } + + @Test + void testNativeESQueryAndOpenSearchQueryProduceSameFilters() throws Exception { + float[] vector = {0.1f, 0.2f}; + Map> filters = + Map.of( + "entityType", List.of("table"), + "owners", List.of("alice"), + "tier", List.of("Tier.Gold")); + + String osQuery = VectorSearchQueryBuilder.build(vector, 10, 100, filters); + String esQuery = VectorSearchQueryBuilder.buildNativeESQuery(vector, 10, 100, filters); + + JsonNode osFilters = + MAPPER + .readTree(osQuery) + .get("query") + .get("knn") + .get("embedding") + .get("filter") + .get("bool") + .get("must"); + JsonNode esFilters = MAPPER.readTree(esQuery).get("knn").get("filter").get("bool").get("must"); + + assertEquals( + osFilters.size(), + esFilters.size(), + "Both queries should produce the same number of filter clauses"); + assertEquals(osFilters.toString(), esFilters.toString(), "Filter clauses should be identical"); + } } diff --git a/openmetadata-spec/src/main/resources/elasticsearch/en/vector_search_index_es_native.json b/openmetadata-spec/src/main/resources/elasticsearch/en/vector_search_index_es_native.json new file mode 100644 index 000000000000..1de030834943 --- /dev/null +++ b/openmetadata-spec/src/main/resources/elasticsearch/en/vector_search_index_es_native.json @@ -0,0 +1,293 @@ +{ + "settings": { + "analysis": { + "normalizer": { + "lowercase_normalizer": { + "type": "custom", + "filter": [ + "lowercase" + ] + } + }, + "filter": { + "om_stemmer": { + "type": "stemmer", + "name": "english" + }, + "word_delimiter_filter": { + "type": "word_delimiter", + "preserve_original": "true" + } + }, + "analyzer": { + "om_analyzer": { + "tokenizer": "standard", + "filter": [ + "lowercase", + "word_delimiter_filter", + "om_stemmer" + ] + } + } + } + }, + "mappings": { + "properties": { + "embedding": { + "type": "dense_vector", + "dims": 512, + "index": true, + "similarity": "cosine" + }, + "text_to_embed": { + "type": "text" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "fullyQualifiedName": { + "type": "keyword" + }, + "entityType": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer", + "ignore_above": 256 + } + } + }, + "serviceType": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "parent_id": { + "type": "keyword" + }, + "chunk_index": { + "type": "integer" + }, + "chunk_count": { + "type": "integer" + }, + "tags": { + "type": "nested", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "tier": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "certification": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "domains": { + "type": "object", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "owners": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "customProperties": { + "type": "object" + }, + "sourceId": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "fingerprint": { + "type": "keyword" + }, + "upVotes": { + "type": "integer" + }, + "downVotes": { + "type": "integer" + }, + "totalVotes": { + "type": "integer" + }, + "followersCount": { + "type": "integer" + }, + "usageSummary": { + "type": "object", + "properties": { + "dailyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + } + } + }, + "weeklyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + }, + "monthlyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + } + } + }, + "synonyms": { + "type": "keyword" + }, + "relatedTerms": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + }, + "fullyQualifiedName": { + "type": "keyword" + } + } + }, + "metricExpression": { + "type": "object", + "properties": { + "language": { + "type": "keyword" + }, + "code": { + "type": "text", + "analyzer": "om_analyzer" + } + } + }, + "metricType": { + "type": "keyword" + }, + "unitOfMeasurement": { + "type": "keyword" + }, + "customUnitOfMeasurement": { + "type": "keyword" + }, + "granularity": { + "type": "keyword" + }, + "relatedMetrics": { + "type": "keyword" + } + } + } +} diff --git a/openmetadata-spec/src/main/resources/elasticsearch/jp/vector_search_index_es_native.json b/openmetadata-spec/src/main/resources/elasticsearch/jp/vector_search_index_es_native.json new file mode 100644 index 000000000000..606bdc0a916c --- /dev/null +++ b/openmetadata-spec/src/main/resources/elasticsearch/jp/vector_search_index_es_native.json @@ -0,0 +1,293 @@ +{ + "settings": { + "analysis": { + "normalizer": { + "lowercase_normalizer": { + "type": "custom", + "filter": [ + "lowercase" + ] + } + }, + "filter": { + "om_stemmer": { + "type": "stemmer", + "name": "english" + }, + "word_delimiter_filter": { + "type": "word_delimiter", + "preserve_original": "true" + } + }, + "analyzer": { + "om_analyzer": { + "tokenizer": "standard", + "filter": [ + "lowercase", + "word_delimiter_filter", + "om_stemmer" + ] + } + } + } + }, + "mappings": { + "properties": { + "embedding": { + "type": "dense_vector", + "dims": 512, + "index": true, + "similarity": "cosine" + }, + "text_to_embed": { + "type": "text" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "fullyQualifiedName": { + "type": "keyword" + }, + "entityType": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer", + "ignore_above": 256 + } + } + }, + "serviceType": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "parent_id": { + "type": "keyword" + }, + "chunk_index": { + "type": "integer" + }, + "chunk_count": { + "type": "integer" + }, + "tags": { + "type": "nested", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "tier": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "certification": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "domains": { + "type": "object", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "owners": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "customProperties": { + "type": "object" + }, + "sourceId": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "fingerprint": { + "type": "keyword" + }, + "upVotes": { + "type": "integer" + }, + "downVotes": { + "type": "integer" + }, + "totalVotes": { + "type": "integer" + }, + "followersCount": { + "type": "integer" + }, + "synonyms": { + "type": "keyword" + }, + "relatedTerms": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + }, + "fullyQualifiedName": { + "type": "keyword" + } + } + }, + "usageSummary": { + "type": "object", + "properties": { + "dailyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + } + } + }, + "weeklyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + }, + "monthlyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + } + } + }, + "metricExpression": { + "type": "object", + "properties": { + "language": { + "type": "keyword" + }, + "code": { + "type": "text", + "analyzer": "om_analyzer" + } + } + }, + "metricType": { + "type": "keyword" + }, + "unitOfMeasurement": { + "type": "keyword" + }, + "customUnitOfMeasurement": { + "type": "keyword" + }, + "granularity": { + "type": "keyword" + }, + "relatedMetrics": { + "type": "keyword" + } + } + } +} diff --git a/openmetadata-spec/src/main/resources/elasticsearch/ru/vector_search_index_es_native.json b/openmetadata-spec/src/main/resources/elasticsearch/ru/vector_search_index_es_native.json new file mode 100644 index 000000000000..6f621f1fdb80 --- /dev/null +++ b/openmetadata-spec/src/main/resources/elasticsearch/ru/vector_search_index_es_native.json @@ -0,0 +1,410 @@ +{ + "settings": { + "index": { + "max_ngram_diff": 17 + }, + "analysis": { + "tokenizer": { + "n_gram_tokenizer": { + "type": "ngram", + "min_gram": 3, + "max_gram": 20, + "token_chars": [ + "letter", + "digit" + ] + } + }, + "normalizer": { + "lowercase_normalizer": { + "type": "custom", + "filter": [ + "lowercase", + "asciifolding" + ] + } + }, + "filter": { + "word_delimiter_filter": { + "type": "word_delimiter", + "preserve_original": true + }, + "compound_word_delimiter_graph": { + "type": "word_delimiter_graph", + "generate_word_parts": true, + "generate_number_parts": true, + "split_on_case_change": true, + "split_on_numerics": true, + "catenate_words": false, + "catenate_numbers": false, + "catenate_all": false, + "preserve_original": true, + "stem_english_possessive": true + }, + "russian_stop": { + "type": "stop", + "stopwords": "_russian_" + }, + "english_stop": { + "type": "stop", + "stopwords": "_english_" + }, + "russian_snowball": { + "name": "russian", + "type": "stemmer" + }, + "om_kstem": { + "type": "kstem" + }, + "asciifolding": { + "type": "asciifolding" + } + }, + "analyzer": { + "om_analyzer": { + "tokenizer": "standard", + "filter": [ + "word_delimiter_filter", + "lowercase", + "asciifolding", + "russian_stop", + "russian_snowball", + "english_stop", + "om_kstem" + ] + }, + "om_ngram": { + "type": "custom", + "tokenizer": "n_gram_tokenizer", + "filter": [ + "lowercase" + ] + }, + "om_compound_analyzer": { + "tokenizer": "standard", + "filter": [ + "compound_word_delimiter_graph", + "lowercase", + "flatten_graph" + ] + } + } + } + }, + "mappings": { + "properties": { + "embedding": { + "type": "dense_vector", + "dims": 512, + "index": true, + "similarity": "cosine" + }, + "text_to_embed": { + "type": "text" + }, + "name": { + "type": "text", + "analyzer": "om_analyzer", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "ngram": { + "type": "text", + "analyzer": "om_ngram" + }, + "compound": { + "type": "text", + "analyzer": "om_compound_analyzer" + } + } + }, + "fullyQualifiedName": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "entityType": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer", + "ignore_above": 256 + } + } + }, + "serviceType": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "parent_id": { + "type": "keyword" + }, + "chunk_index": { + "type": "integer" + }, + "chunk_count": { + "type": "integer" + }, + "tags": { + "type": "nested", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "tier": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "certification": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "domains": { + "type": "object", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "displayName": { + "type": "text", + "analyzer": "om_analyzer", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "ngram": { + "type": "text", + "analyzer": "om_ngram" + }, + "compound": { + "type": "text", + "analyzer": "om_compound_analyzer" + } + } + } + } + }, + "owners": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text", + "analyzer": "om_analyzer", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "ngram": { + "type": "text", + "analyzer": "om_ngram" + }, + "compound": { + "type": "text", + "analyzer": "om_compound_analyzer" + } + } + } + } + }, + "customProperties": { + "type": "object" + }, + "sourceId": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "fingerprint": { + "type": "keyword" + }, + "upVotes": { + "type": "integer" + }, + "downVotes": { + "type": "integer" + }, + "totalVotes": { + "type": "integer" + }, + "followersCount": { + "type": "integer" + }, + "synonyms": { + "type": "keyword" + }, + "relatedTerms": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text", + "analyzer": "om_analyzer", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "ngram": { + "type": "text", + "analyzer": "om_ngram" + }, + "compound": { + "type": "text", + "analyzer": "om_compound_analyzer" + } + } + }, + "fullyQualifiedName": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + } + } + }, + "usageSummary": { + "type": "object", + "properties": { + "dailyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + } + } + }, + "weeklyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + }, + "monthlyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + } + } + }, + "metricExpression": { + "type": "object", + "properties": { + "language": { + "type": "keyword" + }, + "code": { + "type": "text", + "analyzer": "om_analyzer" + } + } + }, + "metricType": { + "type": "keyword" + }, + "unitOfMeasurement": { + "type": "keyword" + }, + "customUnitOfMeasurement": { + "type": "keyword" + }, + "granularity": { + "type": "keyword" + }, + "relatedMetrics": { + "type": "keyword" + } + } + } +} diff --git a/openmetadata-spec/src/main/resources/elasticsearch/zh/vector_search_index_es_native.json b/openmetadata-spec/src/main/resources/elasticsearch/zh/vector_search_index_es_native.json new file mode 100644 index 000000000000..606bdc0a916c --- /dev/null +++ b/openmetadata-spec/src/main/resources/elasticsearch/zh/vector_search_index_es_native.json @@ -0,0 +1,293 @@ +{ + "settings": { + "analysis": { + "normalizer": { + "lowercase_normalizer": { + "type": "custom", + "filter": [ + "lowercase" + ] + } + }, + "filter": { + "om_stemmer": { + "type": "stemmer", + "name": "english" + }, + "word_delimiter_filter": { + "type": "word_delimiter", + "preserve_original": "true" + } + }, + "analyzer": { + "om_analyzer": { + "tokenizer": "standard", + "filter": [ + "lowercase", + "word_delimiter_filter", + "om_stemmer" + ] + } + } + } + }, + "mappings": { + "properties": { + "embedding": { + "type": "dense_vector", + "dims": 512, + "index": true, + "similarity": "cosine" + }, + "text_to_embed": { + "type": "text" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "fullyQualifiedName": { + "type": "keyword" + }, + "entityType": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "normalizer": "lowercase_normalizer", + "ignore_above": 256 + } + } + }, + "serviceType": { + "type": "keyword", + "normalizer": "lowercase_normalizer" + }, + "parent_id": { + "type": "keyword" + }, + "chunk_index": { + "type": "integer" + }, + "chunk_count": { + "type": "integer" + }, + "tags": { + "type": "nested", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "tier": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "certification": { + "type": "object", + "properties": { + "tagFQN": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text", + "analyzer": "om_analyzer" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + }, + "domains": { + "type": "object", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "owners": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + } + } + }, + "customProperties": { + "type": "object" + }, + "sourceId": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "fingerprint": { + "type": "keyword" + }, + "upVotes": { + "type": "integer" + }, + "downVotes": { + "type": "integer" + }, + "totalVotes": { + "type": "integer" + }, + "followersCount": { + "type": "integer" + }, + "synonyms": { + "type": "keyword" + }, + "relatedTerms": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "displayName": { + "type": "text" + }, + "fullyQualifiedName": { + "type": "keyword" + } + } + }, + "usageSummary": { + "type": "object", + "properties": { + "dailyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + } + } + }, + "weeklyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + }, + "monthlyStats": { + "type": "object", + "properties": { + "count": { + "type": "integer" + }, + "percentileRank": { + "type": "double" + } + } + } + } + }, + "metricExpression": { + "type": "object", + "properties": { + "language": { + "type": "keyword" + }, + "code": { + "type": "text", + "analyzer": "om_analyzer" + } + } + }, + "metricType": { + "type": "keyword" + }, + "unitOfMeasurement": { + "type": "keyword" + }, + "customUnitOfMeasurement": { + "type": "keyword" + }, + "granularity": { + "type": "keyword" + }, + "relatedMetrics": { + "type": "keyword" + } + } + } +}