Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
fd6ea9b
feat: add native Elasticsearch vector search support
joaopamaral Apr 6, 2026
e080173
fix: correct VectorSearchQueryBuilder.build() call in test to pass al…
joaopamaral Apr 8, 2026
a0cf7e5
fix: align ElasticSearchVectorService with VectorIndexService interfa…
joaopamaral Apr 10, 2026
f97048b
fix: replace brittle string-replace in loadIndexMapping with ObjectMa…
joaopamaral Apr 10, 2026
420e028
fix: fully initialize ElasticSearchVectorService before publishing in…
joaopamaral Apr 10, 2026
493e0cb
fix: mirror OpenSearch pagination logic in ElasticSearchVectorService…
joaopamaral Apr 10, 2026
4b6ef83
fix: use parentId/chunkIndex (camelCase) in bulkIndex to match Vector…
joaopamaral Apr 10, 2026
0d78e3b
fix: move copyExistingVectorDocuments to VectorIndexService interface
joaopamaral Apr 10, 2026
b557756
fix: use instanceof pattern match instead of interface method for cop…
joaopamaral Apr 10, 2026
f81a21e
refactor: make Elasticsearch store embeddings inline like OpenSearch
joaopamaral Apr 25, 2026
7d82e95
fix: make kNN num_candidates configurable and improve default recall
joaopamaral Apr 25, 2026
5a9a0a5
Update generated TypeScript types
joaopamaral Apr 25, 2026
41f28f5
fix: remove resolveNumCandidatesMultiplier() that required uninstalle…
joaopamaral Apr 25, 2026
4ee2a93
fix: wire knnNumCandidatesMultiplier from config into ElasticSearchVe…
joaopamaral Apr 25, 2026
0bf1a6e
fix: gate getFingerprint endpoint to admin users only
joaopamaral Apr 25, 2026
f94c790
fix: check HTTP status code in executeGenericRequest
joaopamaral Apr 25, 2026
96eaf36
Update openmetadata-service/src/test/java/org/openmetadata/service/se…
joaopamaral Apr 25, 2026
a593b1f
fix: replace stale reformatVectorIndexWithDimension test with current…
joaopamaral Apr 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.search.vector.OpenSearchVectorService;
import org.openmetadata.service.search.vector.VectorIndexService;
import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchResponse;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.auth.CatalogSecurityContext;
Expand Down Expand Up @@ -42,7 +42,7 @@ public Map<String, Object> execute(
"Semantic search is not enabled. Configure vector embeddings in the OpenMetadata server settings.");
}

OpenSearchVectorService vectorService = OpenSearchVectorService.getInstance();
VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService();
if (vectorService == null) {
return errorResponse("Vector search service is not initialized");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -51,6 +52,9 @@
import org.openmetadata.service.search.elasticsearch.ElasticSearchClient;
import org.openmetadata.service.search.elasticsearch.EsUtils;
import org.openmetadata.service.search.indexes.ColumnSearchIndex;
import org.openmetadata.service.search.vector.ElasticSearchVectorService;
import org.openmetadata.service.search.vector.VectorDocBuilder;
import org.openmetadata.service.search.vector.utils.AvailableEntityTypes;

/**
* Elasticsearch implementation using new Java API client with custom bulk handler
Expand Down Expand Up @@ -125,6 +129,10 @@ public static synchronized void resetDocBuildPoolSize() {
private final ConcurrentLinkedDeque<CompletableFuture<Void>> pendingColumnFutures =
new ConcurrentLinkedDeque<>();

// Vector embedding stats (incremented inline during addEntity)
private final AtomicLong vectorSuccess = new AtomicLong(0);
private final AtomicLong vectorFailed = new AtomicLong(0);

public ElasticSearchBulkSink(
SearchRepository searchRepository,
int batchSize,
Expand Down Expand Up @@ -243,13 +251,28 @@ public void write(List<?> entities, Map<String, Object> contextData) throws Exce
} else {
List<EntityInterface> entityInterfaces = (List<EntityInterface>) entities;

// Add entities to search index in parallel
boolean embeddingsEnabled = isVectorEmbeddingEnabledForEntity(entityType);

Map<String, String> existingFingerprints = Collections.emptyMap();
if (embeddingsEnabled && !recreateIndex) {
existingFingerprints = fetchExistingFingerprints(entityInterfaces, indexName);
}

Map<String, String> finalFingerprints = existingFingerprints;
List<CompletableFuture<Void>> futures =
entityInterfaces.stream()
.map(
entity ->
CompletableFuture.runAsync(
() -> addEntity(entity, indexName, recreateIndex, tracker),
() ->
addEntity(
entity,
indexName,
recreateIndex,
reindexContext,
tracker,
embeddingsEnabled,
finalFingerprints),
DOC_BUILD_EXECUTOR))
.toList();
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
Expand Down Expand Up @@ -300,11 +323,22 @@ protected StageStatsTracker extractTracker(Map<String, Object> contextData) {
private static final int BULK_OPERATION_METADATA_OVERHEAD = 150;

private void addEntity(
EntityInterface entity, String indexName, boolean recreateIndex, StageStatsTracker tracker) {
EntityInterface entity,
String indexName,
boolean recreateIndex,
ReindexContext reindexContext,
StageStatsTracker tracker,
boolean embeddingsEnabled,
Map<String, String> existingFingerprints) {
try {
String entityType = Entity.getEntityTypeFromObject(entity);
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
String json = JsonUtils.pojoToJson(searchIndexDoc);

if (embeddingsEnabled) {
json = enrichWithEmbedding(entity, json, recreateIndex, existingFingerprints, tracker);
}

String docId = entity.getId().toString();
long rawDocSize = (long) json.getBytes(StandardCharsets.UTF_8).length;
long estimatedSize = rawDocSize + BULK_OPERATION_METADATA_OVERHEAD;
Expand Down Expand Up @@ -749,6 +783,83 @@ public void updateConcurrentRequests(int concurrentRequests) {
LOG.info("Concurrent requests updated to: {}", concurrentRequests);
}

/**
 * Decides whether documents of the given entity type should be enriched with embeddings.
 *
 * <p>All three conditions must hold, checked in this order: the search repository reports vector
 * embeddings as enabled, the {@link ElasticSearchVectorService} singleton is available, and the
 * entity type is vector-indexable according to {@link AvailableEntityTypes}.
 *
 * @param entityType entity type name to check
 * @return {@code true} only when every condition above is satisfied
 */
boolean isVectorEmbeddingEnabledForEntity(String entityType) {
  if (!searchRepository.isVectorEmbeddingEnabled()) {
    return false;
  }
  if (ElasticSearchVectorService.getInstance() == null) {
    return false;
  }
  return AvailableEntityTypes.isVectorIndexable(entityType);
}

/**
 * Adds vector embedding fields to an entity's search document JSON.
 *
 * <p>The input JSON is returned untouched when the vector service is unavailable, when the
 * entity's fingerprint matches the one already stored (only checked when the index is not being
 * recreated), or when anything fails along the way. Every outcome except "service unavailable"
 * is counted in the sink's vector stats and, if a tracker is supplied, reported to it.
 *
 * @param entity entity whose embedding should be generated
 * @param json serialized search document for the entity
 * @param recreateIndex when {@code true} the fingerprint comparison is skipped entirely
 * @param existingFingerprints stored fingerprints keyed by entity id string
 * @param tracker optional stats tracker; may be {@code null}
 * @return the JSON with embedding fields merged in, or the original {@code json}
 */
@SuppressWarnings("unchecked")
private String enrichWithEmbedding(
    EntityInterface entity,
    String json,
    boolean recreateIndex,
    Map<String, String> existingFingerprints,
    StageStatsTracker tracker) {
  try {
    ElasticSearchVectorService embeddingService = ElasticSearchVectorService.getInstance();
    if (embeddingService == null) {
      return json;
    }

    // NOTE(review): an unchanged fingerprint counts as success without re-embedding; this
    // presumably relies on the stored document keeping its previous embedding — confirm.
    boolean fingerprintUnchanged = false;
    if (!recreateIndex) {
      String freshFingerprint = VectorDocBuilder.computeFingerprintForEntity(entity);
      String storedFingerprint = existingFingerprints.get(entity.getId().toString());
      fingerprintUnchanged =
          storedFingerprint != null && storedFingerprint.equals(freshFingerprint);
    }

    Map<String, Object> document = null;
    if (!fingerprintUnchanged) {
      Map<String, Object> vectorFields = embeddingService.generateEmbeddingFields(entity);
      document = OBJECT_MAPPER.readValue(json, Map.class);
      document.putAll(vectorFields);
    }

    vectorSuccess.incrementAndGet();
    if (tracker != null) {
      tracker.recordVector(StatsResult.SUCCESS);
    }
    return document == null ? json : OBJECT_MAPPER.writeValueAsString(document);
  } catch (Exception failure) {
    // Best effort: an embedding failure must not prevent the plain document from being indexed.
    LOG.warn(
        "Failed to generate embeddings for entity {}: {}",
        entity.getId(),
        failure.getMessage(),
        failure);
    vectorFailed.incrementAndGet();
    if (tracker != null) {
      tracker.recordVector(StatsResult.FAILED);
    }
    return json;
  }
}

/**
 * Looks up the stored embedding fingerprints for a batch of entities.
 *
 * <p>This is best effort: if the vector service is unavailable or the lookup throws, an empty
 * map is returned. The caller then finds no stored fingerprint for any entity in the batch.
 *
 * @param entities entities whose ids are looked up
 * @param indexName index to query
 * @return stored fingerprints as returned by the vector service, or an empty map when there is
 *     nothing to look up or the lookup failed
 */
private Map<String, String> fetchExistingFingerprints(
    List<EntityInterface> entities, String indexName) {
  try {
    ElasticSearchVectorService vectorService = ElasticSearchVectorService.getInstance();
    // No service or no ids to look up: skip the lookup altogether.
    if (vectorService == null || entities.isEmpty()) {
      return Collections.emptyMap();
    }
    List<String> entityIds = new ArrayList<>(entities.size());
    for (EntityInterface entity : entities) {
      entityIds.add(entity.getId().toString());
    }
    return vectorService.getExistingFingerprintsBatch(indexName, entityIds);
  } catch (Exception e) {
    // Pass the throwable as the last argument so the stack trace is logged, matching the
    // error handling in enrichWithEmbedding.
    LOG.warn("Failed to fetch existing fingerprints: {}", e.getMessage(), e);
    return Collections.emptyMap();
  }
}

/**
 * Reports the vector embedding counters accumulated by this sink.
 *
 * @return stats whose total is the sum of the success and failed counts in the same snapshot
 */
@Override
public StepStats getVectorStats() {
  // Read each counter exactly once so the reported total always equals success + failed,
  // even if the counters are incremented concurrently while the snapshot is taken.
  long succeeded = vectorSuccess.get();
  long failed = vectorFailed.get();
  return new StepStats()
      .withTotalRecords((int) (succeeded + failed))
      .withSuccessRecords((int) succeeded)
      .withFailedRecords((int) failed);
}

public static class CustomBulkProcessor {
private final ElasticsearchAsyncClient asyncClient;
private final List<BulkOperation> buffer = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
package org.openmetadata.service.resources.search;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
import java.util.Collections;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.Collection;
import org.openmetadata.service.search.vector.OpenSearchVectorService;
import org.openmetadata.service.search.vector.VectorIndexService;
import org.openmetadata.service.search.vector.utils.DTOs.FingerprintResponse;
import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchRequest;
import org.openmetadata.service.search.vector.utils.DTOs.VectorSearchResponse;
import org.openmetadata.service.security.Authorizer;
Expand Down Expand Up @@ -70,7 +75,7 @@ public Response vectorSearchPost(
.build();
}

OpenSearchVectorService vectorService = OpenSearchVectorService.getInstance();
VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService();
if (vectorService == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
.entity("{\"error\":\"Vector search service is not initialized\"}")
Expand All @@ -97,4 +102,57 @@ public Response vectorSearchPost(
.build();
}
}

@GET
@Path("/fingerprint")
@Operation(
operationId = "getFingerprint",
summary = "Get vector fingerprint",
description = "Returns the existing fingerprint for a given entity.")
public Response getFingerprint(
@Context SecurityContext securityContext,
@Parameter(description = "Parent entity ID", required = true) @QueryParam("parentId")
String parentId) {
authorizer.authorizeAdmin(securityContext);

if (!Entity.getSearchRepository().isVectorEmbeddingEnabled()) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
.entity("{\"error\":\"Vector search is not enabled\"}")
.build();
}

VectorIndexService vectorService = Entity.getSearchRepository().getVectorIndexService();
if (vectorService == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
.entity("{\"error\":\"Vector search service is not initialized\"}")
.build();
}
Comment thread
joaopamaral marked this conversation as resolved.

if (parentId == null || parentId.isBlank()) {
return Response.status(Response.Status.BAD_REQUEST)
.entity("{\"error\":\"parentId is required\"}")
.build();
}
try {
UUID.fromString(parentId);
} catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity("{\"error\":\"Invalid parentId format\"}")
.build();
}

try {
String indexName = vectorService.getIndexAlias();
String fingerprint = vectorService.getExistingFingerprint(indexName, parentId);
FingerprintResponse response =
new FingerprintResponse(
parentId, indexName, fingerprint, fingerprint != null ? "Found" : "Not found");
return Response.ok(response).build();
} catch (Exception e) {
LOG.error("Failed to get fingerprint for {}: {}", parentId, e.getMessage(), e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity("{\"error\":\"An internal error occurred\"}")
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,18 @@ public class RecreateWithEmbeddings extends DefaultRecreateHandler {

@Override
public ReindexContext reCreateIndexes(Set<String> entities) {
SearchRepository searchRepository = Entity.getSearchRepository();
searchRepository.initializeVectorSearchService();
Entity.getSearchRepository().initializeVectorSearchService();
return super.reCreateIndexes(entities);
}

@Override
public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess) {
super.finalizeReindex(context, reindexSuccess);

if (reindexSuccess) {
SearchRepository searchRepository = Entity.getSearchRepository();
if (searchRepository.isVectorEmbeddingEnabled()) {
LOG.info(
"Reindex finalized for entity type '{}' with vector embeddings enabled",
context.getEntityType());
}
if (reindexSuccess && Entity.getSearchRepository().isVectorEmbeddingEnabled()) {
LOG.info(
"Reindex finalized for entity type '{}' with vector embeddings enabled",
context.getEntityType());
}
}
}
Loading
Loading