From 9f3b66472a12519b1e8335ec96794563f12e3908 Mon Sep 17 00:00:00 2001 From: sonika-shah <58761340+sonika-shah@users.noreply.github.com> Date: Fri, 10 Apr 2026 01:16:06 +0530 Subject: [PATCH 1/8] fix(search): column bulk operations search not returning results at scale MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When searching by column name pattern (e.g., "MAT") in column bulk operations, the composite aggregation returned ALL column names from matching documents, then post-filtered in Java. With 20000+ columns, the first composite page of 25 names rarely contained matches, so users saw 0 results. Switch to terms aggregation with `include` regex when a search pattern is set. This filters at the ES/OS aggregation level — only matching column names produce buckets. Two-phase approach: (1) lightweight names query to get all matching names + accurate total, (2) targeted data query with top_hits for the current page only. --- .../service/search/ColumnAggregator.java | 22 + .../ElasticSearchColumnAggregator.java | 392 +++++++++++++----- .../OpenSearchColumnAggregator.java | 357 +++++++++++----- .../service/search/ColumnAggregatorTest.java | 112 +++++ 4 files changed, 686 insertions(+), 197 deletions(-) create mode 100644 openmetadata-service/src/test/java/org/openmetadata/service/search/ColumnAggregatorTest.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java index 4788046e3169..546298503ac1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java @@ -21,6 +21,28 @@ public interface ColumnAggregator { ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) throws IOException; + /** + * Convert a plain text pattern to a 
case-insensitive regex for ES/OS terms include. Lucene regex + * does not support (?i), so each letter is expanded to a character class: "MAT" → [mM][aA][tT]. + */ + static String toCaseInsensitiveRegex(String pattern) { + StringBuilder sb = new StringBuilder(".*"); + for (char c : pattern.toCharArray()) { + if (Character.isLetter(c)) { + sb.append('[') + .append(Character.toLowerCase(c)) + .append(Character.toUpperCase(c)) + .append(']'); + } else if (".+*?|[](){}^$\\~@&#<>\"".indexOf(c) >= 0) { + sb.append('\\').append(c); + } else { + sb.append(c); + } + } + sb.append(".*"); + return sb.toString(); + } + class ColumnAggregationRequest { private int size = 1000; private String cursor; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java index 187235b11ea1..2163b004b339 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java @@ -24,6 +24,8 @@ import es.co.elastic.clients.elasticsearch._types.aggregations.CompositeAggregate; import es.co.elastic.clients.elasticsearch._types.aggregations.CompositeAggregationSource; import es.co.elastic.clients.elasticsearch._types.aggregations.CompositeBucket; +import es.co.elastic.clients.elasticsearch._types.aggregations.StringTermsAggregate; +import es.co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket; import es.co.elastic.clients.elasticsearch._types.aggregations.TopHitsAggregate; import es.co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery; import es.co.elastic.clients.elasticsearch._types.query_dsl.Query; @@ -40,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; 
import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.api.data.ColumnGridItem; import org.openmetadata.schema.api.data.ColumnGridResponse; @@ -56,6 +59,9 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator { private final ElasticsearchClient client; + /** Max column names to retrieve in the names-only query during pattern search. */ + private static final int MAX_PATTERN_SEARCH_NAMES = 10000; + /** Index configuration with field mappings for each entity type. Uses aliases defined in indexMapping.json */ private static final Map INDEX_CONFIGS = Map.of( @@ -107,6 +113,13 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr .collect(java.util.stream.Collectors.toSet()); } + // When a column name pattern is set, use terms aggregation with include regex + // for efficient server-side filtering instead of composite agg + post-filter + if (!nullOrEmpty(request.getColumnNamePattern())) { + return aggregateColumnsWithPattern( + request, entityTypes, entityColumnPairsWithTags, columnNamesWithTags); + } + Map> allColumnsByName = new HashMap<>(); long totalUniqueColumns = 0; long totalOccurrences = 0; @@ -124,7 +137,6 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr String columnFieldPath = INDEX_CONFIGS.get(groupEntityTypes.getFirst()).columnFieldPath(); - // Phase 2: Build query WITHOUT tag filter but WITH column names filter List columnNamesList = columnNamesWithTags != null ? 
new ArrayList<>(columnNamesWithTags) : null; Query query = buildFilters(request, columnNameKeyword, columnNamesList); @@ -134,33 +146,10 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr executeSearch(request, query, indexes, columnNameKeyword); Map> columnsByName = - parseAggregationResults(response, columnFieldPath); - - // Post-filter columns by name pattern since ES aggregation returns all columns from matched - // documents - String columnNamePattern = request.getColumnNamePattern(); - if (!nullOrEmpty(columnNamePattern)) { - columnsByName - .entrySet() - .removeIf(e -> !matchesColumnNamePattern(e.getKey(), columnNamePattern)); - } + parseCompositeAggResults(response, columnFieldPath); - // Post-filter for tag/glossary terms filtering: Only keep occurrences that were - // identified in Phase 1 as having the tag (not just same column name) - if (entityColumnPairsWithTags != null && !entityColumnPairsWithTags.isEmpty()) { - final Set allowedPairs = entityColumnPairsWithTags; - for (List occurrences : columnsByName.values()) { - occurrences.removeIf( - ctx -> { - String key = ctx.entityFQN + "#" + ctx.column.getName(); - return !allowedPairs.contains(key); - }); - } - // Remove column entries that have no occurrences left - columnsByName.entrySet().removeIf(e -> e.getValue().isEmpty()); - } + applyTagPostFilter(columnsByName, entityColumnPairsWithTags); - // Merge results for (Map.Entry> colEntry : columnsByName.entrySet()) { allColumnsByName .computeIfAbsent(colEntry.getKey(), k -> new ArrayList<>()) @@ -173,9 +162,7 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr hasMore = true; } - // Get totals only on first page and only when no column name pattern - // (ES aggregation counts all columns from matched docs, not just filtered ones) - if (request.getCursor() == null && nullOrEmpty(request.getColumnNamePattern())) { + if (request.getCursor() == null) { Map totals = getTotalCounts(query, 
indexes, columnNameKeyword); totalUniqueColumns += totals.get("uniqueColumns"); totalOccurrences += totals.get("totalOccurrences"); @@ -191,10 +178,7 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr List gridItems = ColumnMetadataGrouper.groupColumns(allColumnsByName); - // Calculate totals from actual filtered data when: - // - On subsequent pages (cursor is set) - // - When column name pattern is specified (ES aggregation includes non-matching columns) - if (request.getCursor() != null || !nullOrEmpty(request.getColumnNamePattern())) { + if (request.getCursor() != null) { totalUniqueColumns = allColumnsByName.size(); totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); } @@ -203,6 +187,107 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr gridItems, lastCursor, hasMore, (int) totalUniqueColumns, (int) totalOccurrences); } + /** + * Search path: uses terms aggregation with include regex to filter column names at the + * aggregation level. Two queries per entity-type group: (1) lightweight names query to get all + * matching names and total count, (2) targeted data query with top_hits for the current page. + */ + private ColumnGridResponse aggregateColumnsWithPattern( + ColumnAggregationRequest request, + List entityTypes, + Set entityColumnPairsWithTags, + Set columnNamesWithTags) + throws IOException { + + Map> fieldPathToEntityTypes = groupByFieldPath(entityTypes); + String regex = ColumnAggregator.toCaseInsensitiveRegex(request.getColumnNamePattern()); + + // Phase 1: Collect all matching column names across entity type groups + Set allMatchingNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + + for (Map.Entry> entry : fieldPathToEntityTypes.entrySet()) { + String columnNameKeyword = entry.getKey(); + List indexes = resolveIndexNames(entry.getValue()); + List columnNamesList = + columnNamesWithTags != null ? 
new ArrayList<>(columnNamesWithTags) : null; + Query query = buildFilters(request, columnNameKeyword, columnNamesList); + + try { + List names = executeNamesQuery(query, indexes, columnNameKeyword, regex); + allMatchingNames.addAll(names); + } catch (ElasticsearchException e) { + if (!isIndexNotFoundException(e)) { + throw e; + } + } + } + + int totalUniqueColumns = allMatchingNames.size(); + int offset = decodeSearchOffset(request.getCursor()); + int pageSize = request.getSize(); + + List sortedNames = new ArrayList<>(allMatchingNames); + int fromIndex = Math.min(offset, sortedNames.size()); + int toIndex = Math.min(offset + pageSize, sortedNames.size()); + List pageNames = sortedNames.subList(fromIndex, toIndex); + + if (pageNames.isEmpty()) { + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, 0); + } + + // Phase 2: Fetch data for this page's column names + Map> allColumnsByName = new HashMap<>(); + + for (Map.Entry> entry : fieldPathToEntityTypes.entrySet()) { + String columnNameKeyword = entry.getKey(); + List indexes = resolveIndexNames(entry.getValue()); + String columnFieldPath = INDEX_CONFIGS.get(entry.getValue().getFirst()).columnFieldPath(); + List columnNamesList = + columnNamesWithTags != null ? 
new ArrayList<>(columnNamesWithTags) : null; + Query query = buildFilters(request, columnNameKeyword, columnNamesList); + + try { + Map> columnsByName = + executePageDataQuery(query, indexes, columnNameKeyword, columnFieldPath, pageNames); + + applyTagPostFilter(columnsByName, entityColumnPairsWithTags); + + for (Map.Entry> colEntry : columnsByName.entrySet()) { + allColumnsByName + .computeIfAbsent(colEntry.getKey(), k -> new ArrayList<>()) + .addAll(colEntry.getValue()); + } + } catch (ElasticsearchException e) { + if (!isIndexNotFoundException(e)) { + throw e; + } + } + } + + List gridItems = ColumnMetadataGrouper.groupColumns(allColumnsByName); + int totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); + + boolean hasMore = toIndex < totalUniqueColumns; + String cursor = hasMore ? encodeSearchOffset(toIndex) : null; + + return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); + } + + private void applyTagPostFilter( + Map> columnsByName, Set entityColumnPairsWithTags) { + if (entityColumnPairsWithTags == null || entityColumnPairsWithTags.isEmpty()) { + return; + } + for (List occurrences : columnsByName.values()) { + occurrences.removeIf( + ctx -> { + String key = ctx.entityFQN + "#" + ctx.column.getName(); + return !entityColumnPairsWithTags.contains(key); + }); + } + columnsByName.entrySet().removeIf(e -> e.getValue().isEmpty()); + } + /** * Phase 1: Get entityFQN#columnName pairs that have the specified tags. 
Since ES flattens * arrays, we must fetch column data and filter in Java to find columns that actually have the @@ -356,15 +441,6 @@ private String escapeWildcardPattern(String input) { return input.replace("\\", "\\\\").replace("*", "\\*").replace("?", "\\?"); } - private boolean matchesColumnNamePattern(String columnName, String pattern) { - if (nullOrEmpty(pattern)) { - return true; - } - String lowerColumnName = columnName.toLowerCase(); - String lowerPattern = pattern.toLowerCase(); - return lowerColumnName.contains(lowerPattern); - } - /** Get entity types to query - defaults to table only for performance */ private List getEntityTypesForRequest(ColumnAggregationRequest request) { if (request.getEntityTypes() == null || request.getEntityTypes().isEmpty()) { @@ -573,6 +649,95 @@ private Query hasEmptyOrMissingField(String field) { .minimumShouldMatch("1"))); } + /** Phase 1: Get all matching column names using terms agg with include regex (no top_hits). */ + private List executeNamesQuery( + Query query, List indexes, String columnNameKeyword, String regex) + throws IOException { + + Aggregation termsAgg = + Aggregation.of( + a -> + a.terms( + t -> + t.field(columnNameKeyword) + .include(inc -> inc.regexp(regex)) + .size(MAX_PATTERN_SEARCH_NAMES) + .order( + List.of( + es.co.elastic.clients.util.NamedValue.of( + "_key", SortOrder.Asc))))); + + SearchRequest searchRequest = + SearchRequest.of( + s -> s.index(indexes).query(query).aggregations("matching_columns", termsAgg).size(0)); + + SearchResponse response = client.search(searchRequest, JsonData.class); + + List names = new ArrayList<>(); + if (response.aggregations() != null + && response.aggregations().containsKey("matching_columns")) { + StringTermsAggregate termsResult = response.aggregations().get("matching_columns").sterms(); + for (StringTermsBucket bucket : termsResult.buckets().array()) { + names.add(bucket.key().stringValue()); + } + } + return names; + } + + /** Phase 2: Get data for specific 
column names using terms agg with exact include + top_hits. */ + private Map> executePageDataQuery( + Query query, + List indexes, + String columnNameKeyword, + String columnFieldPath, + List columnNames) + throws IOException { + + Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(10))); + + Aggregation termsAgg = + Aggregation.of( + a -> + a.terms( + t -> + t.field(columnNameKeyword) + .include(inc -> inc.terms(columnNames)) + .size(columnNames.size())) + .aggregations("sample_docs", topHitsAgg)); + + SearchRequest searchRequest = + SearchRequest.of( + s -> s.index(indexes).query(query).aggregations("page_columns", termsAgg).size(0)); + + SearchResponse response = client.search(searchRequest, JsonData.class); + + return parseTermsAggResults(response, columnFieldPath); + } + + private Map> parseTermsAggResults( + SearchResponse response, String columnFieldPath) { + Map> columnsByName = new HashMap<>(); + + if (response.aggregations() == null || !response.aggregations().containsKey("page_columns")) { + return columnsByName; + } + + StringTermsAggregate termsAgg = response.aggregations().get("page_columns").sterms(); + + for (StringTermsBucket bucket : termsAgg.buckets().array()) { + String columnName = bucket.key().stringValue(); + + if (!bucket.aggregations().containsKey("sample_docs")) { + continue; + } + + TopHitsAggregate topHits = bucket.aggregations().get("sample_docs").topHits(); + parseBucketHits(columnName, topHits, columnFieldPath, columnsByName); + } + + return columnsByName; + } + private SearchResponse executeSearch( ColumnAggregationRequest request, Query query, List indexes, String columnNameKeyword) throws IOException { @@ -617,7 +782,7 @@ private SearchResponse executeSearch( return client.search(searchRequest, JsonData.class); } - private Map> parseAggregationResults( + private Map> parseCompositeAggResults( SearchResponse response, String columnFieldPath) { Map> columnsByName = new HashMap<>(); @@ -639,69 +804,72 @@ private Map> 
parseAggregationResults( } TopHitsAggregate topHits = bucket.aggregations().get("sample_docs").topHits(); - if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) { - continue; - } + parseBucketHits(columnName, topHits, columnFieldPath, columnsByName); + } - List occurrences = new ArrayList<>(); - // Track the original case column name from the document source - String originalCaseColumnName = null; - - for (Hit hit : topHits.hits().hits()) { - try { - JsonData source = hit.source(); - if (source == null) continue; - - JsonNode sourceNode = source.to(JsonNode.class); - String entityType = getTextField(sourceNode, "entityType"); - String entityFQN = getTextField(sourceNode, "fullyQualifiedName"); - String entityDisplayName = getTextField(sourceNode, "displayName"); - - String serviceName = getNestedField(sourceNode, "service", "name"); - String databaseName = getNestedField(sourceNode, "database", "name"); - String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); - - // Get columns data from the correct path (e.g., "columns", "dataModel.columns", "fields") - JsonNode columnsData = getNestedJsonNode(sourceNode, columnFieldPath); - - if (columnsData != null && columnsData.isArray()) { - for (JsonNode columnData : columnsData) { - String colName = getTextField(columnData, "name"); - // ES keyword aggregation lowercases the column names, so use case-insensitive - // comparison - if (columnName.equalsIgnoreCase(colName)) { - // Preserve the original case column name from the first match - if (originalCaseColumnName == null) { - originalCaseColumnName = colName; - } - Column column = parseColumn(columnData, entityFQN); - - ColumnWithContext columnCtx = - new ColumnWithContext( - column, - entityType, - entityFQN, - entityDisplayName, - serviceName, - databaseName, - schemaName); - - occurrences.add(columnCtx); - break; + return columnsByName; + } + + /** Parse top_hits from a single bucket (shared by composite and terms agg 
parsing). */ + private void parseBucketHits( + String columnName, + TopHitsAggregate topHits, + String columnFieldPath, + Map> columnsByName) { + + if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) { + return; + } + + List occurrences = new ArrayList<>(); + String originalCaseColumnName = null; + + for (Hit hit : topHits.hits().hits()) { + try { + JsonData source = hit.source(); + if (source == null) continue; + + JsonNode sourceNode = source.to(JsonNode.class); + String entityType = getTextField(sourceNode, "entityType"); + String entityFQN = getTextField(sourceNode, "fullyQualifiedName"); + String entityDisplayName = getTextField(sourceNode, "displayName"); + + String serviceName = getNestedField(sourceNode, "service", "name"); + String databaseName = getNestedField(sourceNode, "database", "name"); + String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); + + JsonNode columnsData = getNestedJsonNode(sourceNode, columnFieldPath); + + if (columnsData != null && columnsData.isArray()) { + for (JsonNode columnData : columnsData) { + String colName = getTextField(columnData, "name"); + if (columnName.equalsIgnoreCase(colName)) { + if (originalCaseColumnName == null) { + originalCaseColumnName = colName; } + Column column = parseColumn(columnData, entityFQN); + + occurrences.add( + new ColumnWithContext( + column, + entityType, + entityFQN, + entityDisplayName, + serviceName, + databaseName, + schemaName)); + break; } } - } catch (Exception e) { - LOG.warn("Failed to parse column occurrence from search hit", e); } - } - - if (!occurrences.isEmpty() && originalCaseColumnName != null) { - columnsByName.put(originalCaseColumnName, occurrences); + } catch (Exception e) { + LOG.warn("Failed to parse column occurrence from search hit", e); } } - return columnsByName; + if (!occurrences.isEmpty() && originalCaseColumnName != null) { + columnsByName.put(originalCaseColumnName, occurrences); + } } /** Navigate nested JSON path 
like "dataModel.columns" or "messageSchema.schemaFields" */ @@ -909,6 +1077,34 @@ private Map getTotalCounts( return totals; } + private String encodeSearchOffset(int offset) { + try { + String json = JsonUtils.pojoToJson(Map.of("searchOffset", offset)); + return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + LOG.error("Failed to encode search offset", e); + return null; + } + } + + @SuppressWarnings("unchecked") + private int decodeSearchOffset(String cursor) { + if (cursor == null) { + return 0; + } + try { + String json = new String(Base64.getDecoder().decode(cursor), StandardCharsets.UTF_8); + Map map = JsonUtils.readValue(json, Map.class); + Object offset = map.get("searchOffset"); + if (offset instanceof Number num) { + return num.intValue(); + } + return 0; + } catch (Exception e) { + return 0; + } + } + private ColumnGridResponse buildResponse( List gridItems, String cursor, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java index 88a98a267f5b..fd173c50934a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.api.data.ColumnGridItem; import org.openmetadata.schema.api.data.ColumnGridResponse; @@ -47,6 +48,8 @@ import os.org.opensearch.client.opensearch._types.aggregations.CompositeAggregate; import os.org.opensearch.client.opensearch._types.aggregations.CompositeAggregationSource; import os.org.opensearch.client.opensearch._types.aggregations.CompositeBucket; 
+import os.org.opensearch.client.opensearch._types.aggregations.StringTermsAggregate; +import os.org.opensearch.client.opensearch._types.aggregations.StringTermsBucket; import os.org.opensearch.client.opensearch._types.aggregations.TopHitsAggregate; import os.org.opensearch.client.opensearch._types.query_dsl.BoolQuery; import os.org.opensearch.client.opensearch._types.query_dsl.Query; @@ -58,6 +61,9 @@ public class OpenSearchColumnAggregator implements ColumnAggregator { private final OpenSearchClient client; + /** Max column names to retrieve in the names-only query during pattern search. */ + private static final int MAX_PATTERN_SEARCH_NAMES = 10000; + /** Uses aliases defined in indexMapping.json */ private static final List DATA_ASSET_INDEXES = Arrays.asList("table", "dashboardDataModel", "topic", "searchIndex", "container"); @@ -97,37 +103,21 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr .collect(java.util.stream.Collectors.toList()); } - // Phase 2: Build query WITHOUT tag filter but WITH column names filter + // When a column name pattern is set, use terms aggregation with include regex + // for efficient server-side filtering instead of composite agg + post-filter + if (!nullOrEmpty(request.getColumnNamePattern())) { + return aggregateColumnsWithPattern(request, entityColumnPairsWithTags, columnNamesWithTags); + } + + // Non-search path: use composite aggregation Query query = buildFilters(request, columnNamesWithTags); try { SearchResponse response = executeSearch(request, query); - Map> columnsByName = parseAggregationResults(response); + Map> columnsByName = parseCompositeAggResults(response); - // Post-filter columns by name pattern since ES aggregation returns all columns from matched - // documents - String columnNamePattern = request.getColumnNamePattern(); - if (!nullOrEmpty(columnNamePattern)) { - columnsByName - .entrySet() - .removeIf(e -> !matchesColumnNamePattern(e.getKey(), columnNamePattern)); - } - - // 
Post-filter for tag/glossary terms filtering: Only keep occurrences that were - // identified in Phase 1 as having the tag (not just same column name) - if (entityColumnPairsWithTags != null && !entityColumnPairsWithTags.isEmpty()) { - final Set allowedPairs = entityColumnPairsWithTags; - for (List occurrences : columnsByName.values()) { - occurrences.removeIf( - ctx -> { - String key = ctx.entityFQN + "#" + ctx.column.getName(); - return !allowedPairs.contains(key); - }); - } - // Remove column entries that have no occurrences left - columnsByName.entrySet().removeIf(e -> e.getValue().isEmpty()); - } + applyTagPostFilter(columnsByName, entityColumnPairsWithTags); List gridItems = ColumnMetadataGrouper.groupColumns(columnsByName); @@ -136,14 +126,11 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr int totalUniqueColumns; int totalOccurrences; - // Get totals from ES aggregation only when no column name pattern - // (ES aggregation counts all columns from matched docs, not just filtered ones) - if (request.getCursor() == null && nullOrEmpty(request.getColumnNamePattern())) { + if (request.getCursor() == null) { Map totals = getTotalCounts(query); totalUniqueColumns = totals.get("uniqueColumns").intValue(); totalOccurrences = totals.get("totalOccurrences").intValue(); } else { - // Calculate from actual filtered data when pattern is specified or on subsequent pages totalUniqueColumns = columnsByName.size(); totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); } @@ -158,6 +145,75 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr } } + /** + * Search path: uses terms aggregation with include regex to filter column names at the + * aggregation level. Two queries: (1) lightweight names query to get all matching names and total + * count, (2) targeted data query with top_hits for the current page. 
+ */ + private ColumnGridResponse aggregateColumnsWithPattern( + ColumnAggregationRequest request, + Set entityColumnPairsWithTags, + List columnNamesWithTags) + throws IOException { + + Query query = buildFilters(request, columnNamesWithTags); + String regex = ColumnAggregator.toCaseInsensitiveRegex(request.getColumnNamePattern()); + + try { + // Phase 1: Get all matching column names + List matchingNames = executeNamesQuery(query, regex); + Set dedupedNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + dedupedNames.addAll(matchingNames); + + int totalUniqueColumns = dedupedNames.size(); + int offset = decodeSearchOffset(request.getCursor()); + int pageSize = request.getSize(); + + List sortedNames = new ArrayList<>(dedupedNames); + int fromIndex = Math.min(offset, sortedNames.size()); + int toIndex = Math.min(offset + pageSize, sortedNames.size()); + List pageNames = sortedNames.subList(fromIndex, toIndex); + + if (pageNames.isEmpty()) { + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, 0); + } + + // Phase 2: Get data for this page's column names + Map> columnsByName = executePageDataQuery(query, pageNames); + + applyTagPostFilter(columnsByName, entityColumnPairsWithTags); + + List gridItems = ColumnMetadataGrouper.groupColumns(columnsByName); + int totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); + + boolean hasMore = toIndex < totalUniqueColumns; + String cursor = hasMore ? 
encodeSearchOffset(toIndex) : null; + + return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); + } catch (OpenSearchException e) { + if (isIndexNotFoundException(e)) { + LOG.warn("Search index not found, returning empty results"); + return buildResponse(new ArrayList<>(), null, false, 0, 0); + } + throw e; + } + } + + private void applyTagPostFilter( + Map> columnsByName, Set entityColumnPairsWithTags) { + if (entityColumnPairsWithTags == null || entityColumnPairsWithTags.isEmpty()) { + return; + } + for (List occurrences : columnsByName.values()) { + occurrences.removeIf( + ctx -> { + String key = ctx.entityFQN + "#" + ctx.column.getName(); + return !entityColumnPairsWithTags.contains(key); + }); + } + columnsByName.entrySet().removeIf(e -> e.getValue().isEmpty()); + } + /** * Phase 1: Get entityFQN#columnName pairs that have the specified tags. Since ES flattens arrays, * we must fetch column data and filter in Java to find columns that actually have the tag. @@ -315,15 +371,6 @@ private String escapeWildcardPattern(String input) { return input.replace("\\", "\\\\").replace("*", "\\*").replace("?", "\\?"); } - private boolean matchesColumnNamePattern(String columnName, String pattern) { - if (nullOrEmpty(pattern)) { - return true; - } - String lowerColumnName = columnName.toLowerCase(); - String lowerPattern = pattern.toLowerCase(); - return lowerColumnName.contains(lowerPattern); - } - /** * Build filters for the main query. When columnNamesFromTagFilter is provided (two-phase query), * skip tag/glossaryTerms filters and use column names filter instead. @@ -516,6 +563,92 @@ private Query hasEmptyOrMissingField(String field) { .minimumShouldMatch("1"))); } + /** Phase 1: Get all matching column names using terms agg with include regex (no top_hits). 
*/ + private List executeNamesQuery(Query query, String regex) throws IOException { + Aggregation termsAgg = + Aggregation.of( + a -> + a.terms( + t -> + t.field("columns.name.keyword") + .include(inc -> inc.regexp(regex)) + .size(MAX_PATTERN_SEARCH_NAMES) + .order(List.of(Map.of("_key", SortOrder.Asc))))); + + SearchRequest searchRequest = + SearchRequest.of( + s -> + s.index(resolveIndexNames()) + .query(query) + .aggregations("matching_columns", termsAgg) + .size(0)); + + SearchResponse response = client.search(searchRequest, JsonData.class); + + List names = new ArrayList<>(); + if (response.aggregations() != null + && response.aggregations().containsKey("matching_columns")) { + StringTermsAggregate termsResult = response.aggregations().get("matching_columns").sterms(); + for (StringTermsBucket bucket : termsResult.buckets().array()) { + names.add(bucket.key()); + } + } + return names; + } + + /** Phase 2: Get data for specific column names using terms agg with exact include + top_hits. 
*/ + private Map> executePageDataQuery( + Query query, List columnNames) throws IOException { + + Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(100))); + + Aggregation termsAgg = + Aggregation.of( + a -> + a.terms( + t -> + t.field("columns.name.keyword") + .include(inc -> inc.terms(columnNames)) + .size(columnNames.size())) + .aggregations("sample_docs", topHitsAgg)); + + SearchRequest searchRequest = + SearchRequest.of( + s -> + s.index(resolveIndexNames()) + .query(query) + .aggregations("page_columns", termsAgg) + .size(0)); + + SearchResponse response = client.search(searchRequest, JsonData.class); + + return parseTermsAggResults(response); + } + + private Map> parseTermsAggResults( + SearchResponse response) { + Map> columnsByName = new HashMap<>(); + + if (response.aggregations() == null || !response.aggregations().containsKey("page_columns")) { + return columnsByName; + } + + StringTermsAggregate termsAgg = response.aggregations().get("page_columns").sterms(); + + for (StringTermsBucket bucket : termsAgg.buckets().array()) { + String columnName = bucket.key(); + + if (!bucket.aggregations().containsKey("sample_docs")) { + continue; + } + + TopHitsAggregate topHits = bucket.aggregations().get("sample_docs").topHits(); + parseBucketHits(columnName, topHits, columnsByName); + } + + return columnsByName; + } + private SearchResponse executeSearch(ColumnAggregationRequest request, Query query) throws IOException { Map sources = new HashMap<>(); @@ -524,12 +657,7 @@ private SearchResponse executeSearch(ColumnAggregationRequest request, CompositeAggregationSource.of( cas -> cas.terms(t -> t.field("columns.name.keyword").order(SortOrder.Asc)))); - Aggregation topHitsAgg = - Aggregation.of( - a -> - // Use full _source to avoid OpenSearch top_hits source-filter edge cases where - // mixing root + nested include paths can return empty buckets unexpectedly. 
- a.topHits(th -> th.size(100))); + Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(100))); Map subAggs = new HashMap<>(); subAggs.put("sample_docs", topHitsAgg); @@ -560,7 +688,7 @@ private SearchResponse executeSearch(ColumnAggregationRequest request, return client.search(searchRequest, JsonData.class); } - private Map> parseAggregationResults( + private Map> parseCompositeAggResults( SearchResponse response) { Map> columnsByName = new HashMap<>(); @@ -583,68 +711,71 @@ private Map> parseAggregationResults( } TopHitsAggregate topHits = bucket.aggregations().get("sample_docs").topHits(); - if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) { - continue; - } + parseBucketHits(columnName, topHits, columnsByName); + } + + return columnsByName; + } - List occurrences = new ArrayList<>(); - // Track the original case column name from the document source - String originalCaseColumnName = null; - - for (Hit hit : topHits.hits().hits()) { - try { - JsonData source = hit.source(); - if (source == null) continue; - - JsonNode sourceNode = source.to(JsonNode.class); - String entityType = getTextField(sourceNode, "entityType"); - String entityFQN = getTextField(sourceNode, "fullyQualifiedName"); - String entityDisplayName = getTextField(sourceNode, "displayName"); - - String serviceName = getNestedField(sourceNode, "service", "name"); - String databaseName = getNestedField(sourceNode, "database", "name"); - String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); - - JsonNode columnsData = sourceNode.get("columns"); - - if (columnsData != null && columnsData.isArray()) { - for (JsonNode columnData : columnsData) { - String colName = getTextField(columnData, "name"); - // ES keyword aggregation lowercases the column names, so use case-insensitive - // comparison - if (columnName.equalsIgnoreCase(colName)) { - // Preserve the original case column name from the first match - if (originalCaseColumnName == null) 
{ - originalCaseColumnName = colName; - } - Column column = parseColumn(columnData, entityFQN); - - ColumnWithContext columnCtx = - new ColumnWithContext( - column, - entityType, - entityFQN, - entityDisplayName, - serviceName, - databaseName, - schemaName); - - occurrences.add(columnCtx); - break; + /** Parse top_hits from a single bucket (shared by composite and terms agg parsing). */ + private void parseBucketHits( + String columnName, + TopHitsAggregate topHits, + Map> columnsByName) { + + if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) { + return; + } + + List occurrences = new ArrayList<>(); + String originalCaseColumnName = null; + + for (Hit hit : topHits.hits().hits()) { + try { + JsonData source = hit.source(); + if (source == null) continue; + + JsonNode sourceNode = source.to(JsonNode.class); + String entityType = getTextField(sourceNode, "entityType"); + String entityFQN = getTextField(sourceNode, "fullyQualifiedName"); + String entityDisplayName = getTextField(sourceNode, "displayName"); + + String serviceName = getNestedField(sourceNode, "service", "name"); + String databaseName = getNestedField(sourceNode, "database", "name"); + String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); + + JsonNode columnsData = sourceNode.get("columns"); + + if (columnsData != null && columnsData.isArray()) { + for (JsonNode columnData : columnsData) { + String colName = getTextField(columnData, "name"); + if (columnName.equalsIgnoreCase(colName)) { + if (originalCaseColumnName == null) { + originalCaseColumnName = colName; } + Column column = parseColumn(columnData, entityFQN); + + occurrences.add( + new ColumnWithContext( + column, + entityType, + entityFQN, + entityDisplayName, + serviceName, + databaseName, + schemaName)); + break; } } - } catch (Exception e) { - LOG.warn("Failed to parse column occurrence from search hit", e); } - } - - if (!occurrences.isEmpty() && originalCaseColumnName != null) { - 
columnsByName.put(originalCaseColumnName, occurrences); + } catch (Exception e) { + LOG.warn("Failed to parse column occurrence from search hit", e); } } - return columnsByName; + if (!occurrences.isEmpty() && originalCaseColumnName != null) { + columnsByName.put(originalCaseColumnName, occurrences); + } } private String getTextField(JsonNode node, String field) { @@ -864,6 +995,34 @@ private Map getTotalCounts(Query query) throws IOException { return totals; } + private String encodeSearchOffset(int offset) { + try { + String json = JsonUtils.pojoToJson(Map.of("searchOffset", offset)); + return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + LOG.error("Failed to encode search offset", e); + return null; + } + } + + @SuppressWarnings("unchecked") + private int decodeSearchOffset(String cursor) { + if (cursor == null) { + return 0; + } + try { + String json = new String(Base64.getDecoder().decode(cursor), StandardCharsets.UTF_8); + Map map = JsonUtils.readValue(json, Map.class); + Object offset = map.get("searchOffset"); + if (offset instanceof Number num) { + return num.intValue(); + } + return 0; + } catch (Exception e) { + return 0; + } + } + private ColumnGridResponse buildResponse( List gridItems, String cursor, diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/search/ColumnAggregatorTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/search/ColumnAggregatorTest.java new file mode 100644 index 000000000000..b0f8259f010e --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/search/ColumnAggregatorTest.java @@ -0,0 +1,112 @@ +/* + * Copyright 2025 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.service.search; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.regex.Pattern; +import org.junit.jupiter.api.Test; + +class ColumnAggregatorTest { + + @Test + void toCaseInsensitiveRegex_simpleAlpha() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("MAT"); + + assertEquals(".*[mM][aA][tT].*", regex); + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("MAT").matches()); + assertTrue(pattern.matcher("mat").matches()); + assertTrue(pattern.matcher("Mat").matches()); + assertTrue(pattern.matcher("MATNR").matches()); + assertTrue(pattern.matcher("some_mat_column").matches()); + assertFalse(pattern.matcher("MBA").matches()); + } + + @Test + void toCaseInsensitiveRegex_mixedCase() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("MaTnR"); + + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("MATNR").matches()); + assertTrue(pattern.matcher("matnr").matches()); + assertTrue(pattern.matcher("MaTnR").matches()); + assertFalse(pattern.matcher("MATMR").matches()); + } + + @Test + void toCaseInsensitiveRegex_withDigits() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("col1"); + + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("COL1").matches()); + assertTrue(pattern.matcher("col1").matches()); + assertTrue(pattern.matcher("my_col1_name").matches()); + 
assertFalse(pattern.matcher("col2").matches()); + } + + @Test + void toCaseInsensitiveRegex_withUnderscore() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("col_name"); + + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("col_name").matches()); + assertTrue(pattern.matcher("COL_NAME").matches()); + assertTrue(pattern.matcher("my_col_name_here").matches()); + } + + @Test + void toCaseInsensitiveRegex_escapesRegexSpecialChars() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("col.name"); + + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("col.name").matches()); + // Dot should be literal, not wildcard + assertFalse(pattern.matcher("colXname").matches()); + } + + @Test + void toCaseInsensitiveRegex_singleChar() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("a"); + + assertEquals(".*[aA].*", regex); + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("A").matches()); + assertTrue(pattern.matcher("abc").matches()); + assertTrue(pattern.matcher("XAY").matches()); + } + + @Test + void toCaseInsensitiveRegex_emptyString() { + String regex = ColumnAggregator.toCaseInsensitiveRegex(""); + + assertEquals(".*.*", regex); + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("anything").matches()); + assertTrue(pattern.matcher("").matches()); + } + + @Test + void toCaseInsensitiveRegex_specialCharsAreEscaped() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("a+b*c?"); + + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("a+b*c?").matches()); + assertTrue(pattern.matcher("prefix_a+b*c?_suffix").matches()); + // Plus and star should be literal, not regex quantifiers + assertFalse(pattern.matcher("abbbbc").matches()); + } +} From 4d6bc82e5a7fad3841fdba81d9cf4103b77fb86f Mon Sep 17 00:00:00 2001 From: sonika-shah <58761340+sonika-shah@users.noreply.github.com> Date: Mon, 13 Apr 2026 11:42:07 +0530 Subject: [PATCH 2/8] 
test(search): add integration tests for column pattern search regex against ES --- .../it/tests/ColumnGridResourceIT.java | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java index 1d2289c40481..bdbbf0b69a7a 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java @@ -1471,6 +1471,113 @@ private void waitForSearchIndexRefresh() { .until(() -> true); } + @Test + void test_getColumnGrid_patternSearchIsCaseInsensitive(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + String colName = ns.prefix("CaseMixCol"); + Column col = Columns.build(colName).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Tables.create() + .name(ns.prefix("case_test_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col)) + .execute(); + + waitForSearchIndexRefresh(); + + // Search with all lowercase + ColumnGridResponse lowerResponse = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=casemixcol&serviceName=" + service.getName()); + + assertNotNull(lowerResponse); + boolean foundLower = + lowerResponse.getColumns().stream().anyMatch(c -> c.getColumnName().equals(colName)); + assertTrue(foundLower, "Lowercase search should find the mixed-case column"); + + // Search with all uppercase + ColumnGridResponse upperResponse = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=CASEMIXCOL&serviceName=" + service.getName()); + + assertNotNull(upperResponse); + boolean 
foundUpper = + upperResponse.getColumns().stream().anyMatch(c -> c.getColumnName().equals(colName)); + assertTrue(foundUpper, "Uppercase search should find the mixed-case column"); + } + + @Test + void test_getColumnGrid_patternSearchExcludesNonMatching(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + String matchCol = ns.prefix("regex_target"); + String noMatchCol = ns.prefix("other_field"); + Column col1 = Columns.build(matchCol).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Column col2 = + Columns.build(noMatchCol).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Tables.create() + .name(ns.prefix("regex_exclude_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col1, col2)) + .execute(); + + waitForSearchIndexRefresh(); + + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=regex_target&serviceName=" + service.getName()); + + assertNotNull(response); + boolean foundMatch = + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(matchCol)); + boolean foundNoMatch = + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(noMatchCol)); + assertTrue(foundMatch, "Matching column should be in results"); + assertFalse(foundNoMatch, "Non-matching column from same table should be excluded"); + } + + @Test + void test_getColumnGrid_patternSearchWithSpecialChars(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + String colWithDot = ns.prefix("col.with.dots"); + String colNoDot = ns.prefix("colXwithXdots"); + Column col1 = + 
Columns.build(colWithDot).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Column col2 = Columns.build(colNoDot).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Tables.create() + .name(ns.prefix("special_char_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col1, col2)) + .execute(); + + waitForSearchIndexRefresh(); + + // Search for "col.with" — dot should be literal, not wildcard + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=col.with&serviceName=" + service.getName()); + + assertNotNull(response); + boolean foundDotCol = + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(colWithDot)); + boolean foundNoDotCol = + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(colNoDot)); + assertTrue(foundDotCol, "Column with literal dot should match"); + assertFalse( + foundNoDotCol, "Column without dot should not match — dot must be literal, not wildcard"); + } + private void waitForColumnToBeIndexed( OpenMetadataClient client, String columnName, String serviceName) { await() From f8dee2dadfaa8321757aa36b4fe6a235f18512a7 Mon Sep 17 00:00:00 2001 From: sonika-shah <58761340+sonika-shah@users.noreply.github.com> Date: Tue, 14 Apr 2026 00:44:21 +0530 Subject: [PATCH 3/8] fix(search): extract column metadata from _source in tag filter path, eliminating Phase 2 ES query --- .../it/tests/ColumnGridResourceIT.java | 273 ++++++++++++++++++ .../ElasticSearchColumnAggregator.java | 181 +++++++----- .../OpenSearchColumnAggregator.java | 179 +++++++----- 3 files changed, 490 insertions(+), 143 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java index bdbbf0b69a7a..814bf36f72c3 100644 --- 
a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java @@ -1578,6 +1578,279 @@ void test_getColumnGrid_patternSearchWithSpecialChars(TestNamespace ns) throws E foundNoDotCol, "Column without dot should not match — dot must be literal, not wildcard"); } + @Test + void test_getColumnGrid_patternPlusTagFilter(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + TagLabel piiTag = new TagLabel(); + piiTag.setTagFQN("PII.Sensitive"); + piiTag.setSource(TagLabel.TagSource.CLASSIFICATION); + piiTag.setLabelType(TagLabel.LabelType.MANUAL); + piiTag.setState(TagLabel.State.CONFIRMED); + + String taggedMatchCol = ns.prefix("pat_tag_match"); + String taggedNoMatchCol = ns.prefix("pat_tag_other"); + String untaggedMatchCol = ns.prefix("pat_tag_match_notag"); + + // Table 1: tagged column matching pattern + tagged column NOT matching pattern + Column col1 = + Columns.build(taggedMatchCol) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withTags(List.of(piiTag)) + .create(); + Column col2 = + Columns.build(taggedNoMatchCol) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withTags(List.of(piiTag)) + .create(); + Tables.create() + .name(ns.prefix("pat_tag_table_1")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col1, col2)) + .execute(); + + // Table 2: same column name as col1 but WITHOUT tag + Column col3 = + Columns.build(untaggedMatchCol).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Tables.create() + .name(ns.prefix("pat_tag_table_2")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col3)) + .execute(); + + waitForSearchIndexRefresh(); + + ColumnGridResponse 
response = + getColumnGrid( + client, + "entityTypes=table&tags=PII.Sensitive&columnNamePattern=pat_tag_match&serviceName=" + + service.getName()); + + assertNotNull(response); + + // Should find taggedMatchCol (matches pattern AND has tag) + // Should NOT find taggedNoMatchCol (has tag but doesn't match pattern) + // Should NOT find untaggedMatchCol (matches pattern but no tag) + boolean foundTaggedMatch = false; + boolean foundTaggedNoMatch = false; + boolean foundUntaggedMatch = false; + + for (ColumnGridItem item : response.getColumns()) { + if (item.getColumnName().equals(taggedMatchCol)) { + foundTaggedMatch = true; + } + if (item.getColumnName().equals(taggedNoMatchCol)) { + foundTaggedNoMatch = true; + } + if (item.getColumnName().equals(untaggedMatchCol)) { + foundUntaggedMatch = true; + } + } + + assertTrue(foundTaggedMatch, "Column with tag AND matching pattern should be in results"); + assertFalse(foundTaggedNoMatch, "Column with tag but NOT matching pattern should be excluded"); + assertFalse(foundUntaggedMatch, "Column matching pattern but WITHOUT tag should be excluded"); + } + + @Test + void test_getColumnGrid_patternPlusGlossaryFilter(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + Glossary glossary = createGlossary(client, ns, "PG"); + GlossaryTerm term = createGlossaryTerm(client, glossary, ns, "PT"); + + TagLabel glossaryTag = new TagLabel(); + glossaryTag.setTagFQN(term.getFullyQualifiedName()); + glossaryTag.setSource(TagLabel.TagSource.GLOSSARY); + glossaryTag.setLabelType(TagLabel.LabelType.MANUAL); + glossaryTag.setState(TagLabel.State.CONFIRMED); + + String matchCol = ns.prefix("pg_match_col"); + String noMatchCol = ns.prefix("pg_other_col"); + + Column col1 = + Columns.build(matchCol) + .withType(ColumnDataType.VARCHAR) + .withLength(255) 
+ .withTags(List.of(glossaryTag)) + .create(); + Column col2 = + Columns.build(noMatchCol) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withTags(List.of(glossaryTag)) + .create(); + Tables.create() + .name(ns.prefix("pg_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col1, col2)) + .execute(); + + waitForSearchIndexRefresh(); + + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&glossaryTerms=" + + term.getFullyQualifiedName() + + "&columnNamePattern=pg_match&serviceName=" + + service.getName()); + + assertNotNull(response); + + boolean foundMatch = + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(matchCol)); + boolean foundNoMatch = + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(noMatchCol)); + + assertTrue(foundMatch, "Column matching both pattern and glossary should be in results"); + assertFalse(foundNoMatch, "Column with glossary but not matching pattern should be excluded"); + } + + @Test + void test_getColumnGrid_tagFilterPaginationConsistency(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + TagLabel piiTag = new TagLabel(); + piiTag.setTagFQN("PII.Sensitive"); + piiTag.setSource(TagLabel.TagSource.CLASSIFICATION); + piiTag.setLabelType(TagLabel.LabelType.MANUAL); + piiTag.setState(TagLabel.State.CONFIRMED); + + // Create 5 tables, each with a uniquely-named tagged column + for (int i = 0; i < 5; i++) { + Column col = + Columns.build(ns.prefix("pagcon_col_" + i)) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withTags(List.of(piiTag)) + .create(); + Tables.create() + .name(ns.prefix("pagcon_table_" + i)) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col)) + .execute(); + } + + waitForSearchIndexRefresh(); + + // 
Page through with size=2 — should get 2, 2, 1 + // Use serviceName to scope to this test's data, raw pattern prefix to match column names + String baseQuery = + "entityTypes=table&tags=PII.Sensitive&columnNamePattern=pagcon&serviceName=" + + service.getName() + + "&size=2"; + + await("Wait for all 5 tagged columns to be indexed") + .atMost(Duration.ofSeconds(45)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse first = getColumnGrid(client, baseQuery); + assertNotNull(first); + assertEquals(5, first.getTotalUniqueColumns(), "Should report 5 unique columns"); + }); + + ColumnGridResponse page1 = getColumnGrid(client, baseQuery); + assertEquals(2, page1.getColumns().size(), "Page 1 should have exactly 2 columns"); + assertNotNull(page1.getCursor(), "Page 1 should have a cursor for next page"); + + ColumnGridResponse page2 = getColumnGrid(client, baseQuery + "&cursor=" + page1.getCursor()); + assertEquals(2, page2.getColumns().size(), "Page 2 should have exactly 2 columns"); + assertNotNull(page2.getCursor(), "Page 2 should have a cursor for next page"); + + ColumnGridResponse page3 = getColumnGrid(client, baseQuery + "&cursor=" + page2.getCursor()); + assertEquals(1, page3.getColumns().size(), "Page 3 (last) should have exactly 1 column"); + + // Verify no duplicates across pages + java.util.Set allNames = new java.util.HashSet<>(); + for (ColumnGridItem item : page1.getColumns()) { + assertTrue(allNames.add(item.getColumnName()), "Duplicate found: " + item.getColumnName()); + } + for (ColumnGridItem item : page2.getColumns()) { + assertTrue(allNames.add(item.getColumnName()), "Duplicate found: " + item.getColumnName()); + } + for (ColumnGridItem item : page3.getColumns()) { + assertTrue(allNames.add(item.getColumnName()), "Duplicate found: " + item.getColumnName()); + } + assertEquals(5, allNames.size(), "Should have collected all 5 unique columns across pages"); + } + + @Test + void 
test_getColumnGrid_glossaryFilter_onlyReturnsGlossaryOccurrences(TestNamespace ns) + throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + Glossary glossary = createGlossary(client, ns, "OG"); + GlossaryTerm term = createGlossaryTerm(client, glossary, ns, "OT"); + + TagLabel glossaryTag = new TagLabel(); + glossaryTag.setTagFQN(term.getFullyQualifiedName()); + glossaryTag.setSource(TagLabel.TagSource.GLOSSARY); + glossaryTag.setLabelType(TagLabel.LabelType.MANUAL); + glossaryTag.setState(TagLabel.State.CONFIRMED); + + String sharedName = ns.prefix("gocc_col"); + + // Table 1: column WITH glossary term + Column withGlossary = + Columns.build(sharedName) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withDescription("Has glossary") + .withTags(List.of(glossaryTag)) + .create(); + Tables.create() + .name(ns.prefix("gocc_t1")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(withGlossary)) + .execute(); + + // Table 2: same column name WITHOUT glossary term + Column withoutGlossary = + Columns.build(sharedName) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withDescription("No glossary") + .create(); + Tables.create() + .name(ns.prefix("gocc_t2")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(withoutGlossary)) + .execute(); + + waitForSearchIndexRefresh(); + + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&glossaryTerms=" + + term.getFullyQualifiedName() + + "&serviceName=" + + service.getName()); + + assertNotNull(response); + + for (ColumnGridItem item : response.getColumns()) { + if (item.getColumnName().equals(sharedName)) { + assertEquals( + 1, + item.getTotalOccurrences(), + "Should only return the occurrence WITH the glossary term, not all with same name"); + } + } + } + private void 
waitForColumnToBeIndexed( OpenMetadataClient client, String columnName, String serviceName) { await() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java index 2163b004b339..23f2322ed69a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java @@ -93,31 +93,33 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr List entityTypes = getEntityTypesForRequest(request); - // Two-phase query for tags/glossaryTerms filtering: - // Phase 1: Find entityFQN#columnName pairs that have the tag - // Phase 2: Filter to only return those specific occurrences + // Tag/glossary filter path: we must read _source to check which specific column has + // the tag (ES flat object mapping can't tell us). Since we're already reading _source, + // we extract full column metadata in the same pass — no separate data-fetch query needed. 
boolean hasTagFilter = !nullOrEmpty(request.getTags()) || !nullOrEmpty(request.getGlossaryTerms()); - Set entityColumnPairsWithTags = null; - Set columnNamesWithTags = null; if (hasTagFilter) { - entityColumnPairsWithTags = getEntityColumnPairsWithTags(request, entityTypes); - if (entityColumnPairsWithTags.isEmpty()) { + Map> taggedColumns = + getColumnsWithTagsFromSource(request, entityTypes); + if (taggedColumns.isEmpty()) { return buildResponse(new ArrayList<>(), null, false, 0, 0); } - // Also keep just the column names for the Phase 2 query filter - columnNamesWithTags = - entityColumnPairsWithTags.stream() - .map(pair -> pair.substring(pair.indexOf('#') + 1)) - .collect(java.util.stream.Collectors.toSet()); + + // Pattern + tag combined: filter the already-fetched columns by pattern in Java + if (!nullOrEmpty(request.getColumnNamePattern())) { + String pattern = request.getColumnNamePattern().toLowerCase(java.util.Locale.ROOT); + taggedColumns + .entrySet() + .removeIf(e -> !e.getKey().toLowerCase(java.util.Locale.ROOT).contains(pattern)); + } + + return aggregateColumnsWithKnownNames(request, taggedColumns); } - // When a column name pattern is set, use terms aggregation with include regex - // for efficient server-side filtering instead of composite agg + post-filter + // Pattern-only path (no tag filter): use terms agg with include regex if (!nullOrEmpty(request.getColumnNamePattern())) { - return aggregateColumnsWithPattern( - request, entityTypes, entityColumnPairsWithTags, columnNamesWithTags); + return aggregateColumnsWithPattern(request, entityTypes); } Map> allColumnsByName = new HashMap<>(); @@ -137,9 +139,7 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr String columnFieldPath = INDEX_CONFIGS.get(groupEntityTypes.getFirst()).columnFieldPath(); - List columnNamesList = - columnNamesWithTags != null ? 
new ArrayList<>(columnNamesWithTags) : null; - Query query = buildFilters(request, columnNameKeyword, columnNamesList); + Query query = buildFilters(request, columnNameKeyword, null); try { SearchResponse response = @@ -148,8 +148,6 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr Map> columnsByName = parseCompositeAggResults(response, columnFieldPath); - applyTagPostFilter(columnsByName, entityColumnPairsWithTags); - for (Map.Entry> colEntry : columnsByName.entrySet()) { allColumnsByName .computeIfAbsent(colEntry.getKey(), k -> new ArrayList<>()) @@ -188,16 +186,13 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr } /** - * Search path: uses terms aggregation with include regex to filter column names at the - * aggregation level. Two queries per entity-type group: (1) lightweight names query to get all - * matching names and total count, (2) targeted data query with top_hits for the current page. + * Pattern-only search path (no tag filter): uses terms aggregation with include regex to filter + * column names at the aggregation level. Two queries per entity-type group: (1) lightweight names + * query to get all matching names and total count, (2) targeted data query with top_hits for the + * current page. 
*/ private ColumnGridResponse aggregateColumnsWithPattern( - ColumnAggregationRequest request, - List entityTypes, - Set entityColumnPairsWithTags, - Set columnNamesWithTags) - throws IOException { + ColumnAggregationRequest request, List entityTypes) throws IOException { Map> fieldPathToEntityTypes = groupByFieldPath(entityTypes); String regex = ColumnAggregator.toCaseInsensitiveRegex(request.getColumnNamePattern()); @@ -208,9 +203,7 @@ private ColumnGridResponse aggregateColumnsWithPattern( for (Map.Entry> entry : fieldPathToEntityTypes.entrySet()) { String columnNameKeyword = entry.getKey(); List indexes = resolveIndexNames(entry.getValue()); - List columnNamesList = - columnNamesWithTags != null ? new ArrayList<>(columnNamesWithTags) : null; - Query query = buildFilters(request, columnNameKeyword, columnNamesList); + Query query = buildFilters(request, columnNameKeyword, null); try { List names = executeNamesQuery(query, indexes, columnNameKeyword, regex); @@ -242,16 +235,12 @@ private ColumnGridResponse aggregateColumnsWithPattern( String columnNameKeyword = entry.getKey(); List indexes = resolveIndexNames(entry.getValue()); String columnFieldPath = INDEX_CONFIGS.get(entry.getValue().getFirst()).columnFieldPath(); - List columnNamesList = - columnNamesWithTags != null ? 
new ArrayList<>(columnNamesWithTags) : null; - Query query = buildFilters(request, columnNameKeyword, columnNamesList); + Query query = buildFilters(request, columnNameKeyword, null); try { Map> columnsByName = executePageDataQuery(query, indexes, columnNameKeyword, columnFieldPath, pageNames); - applyTagPostFilter(columnsByName, entityColumnPairsWithTags); - for (Map.Entry> colEntry : columnsByName.entrySet()) { allColumnsByName .computeIfAbsent(colEntry.getKey(), k -> new ArrayList<>()) @@ -273,29 +262,55 @@ private ColumnGridResponse aggregateColumnsWithPattern( return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); } - private void applyTagPostFilter( - Map> columnsByName, Set entityColumnPairsWithTags) { - if (entityColumnPairsWithTags == null || entityColumnPairsWithTags.isEmpty()) { - return; + /** + * Tag/glossary filter path: the tag-check pass already extracted full column metadata from + * _source (only tagged columns are in the map). Just paginate over the in-memory result. 
+ */ + private ColumnGridResponse aggregateColumnsWithKnownNames( + ColumnAggregationRequest request, Map> taggedColumns) { + + Set allNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + allNames.addAll(taggedColumns.keySet()); + + int totalUniqueColumns = allNames.size(); + int offset = decodeSearchOffset(request.getCursor()); + int pageSize = request.getSize(); + + List sortedNames = new ArrayList<>(allNames); + int fromIndex = Math.min(offset, sortedNames.size()); + int toIndex = Math.min(offset + pageSize, sortedNames.size()); + List pageNames = sortedNames.subList(fromIndex, toIndex); + + if (pageNames.isEmpty()) { + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, 0); } - for (List occurrences : columnsByName.values()) { - occurrences.removeIf( - ctx -> { - String key = ctx.entityFQN + "#" + ctx.column.getName(); - return !entityColumnPairsWithTags.contains(key); - }); + + // Slice only the current page from the full map + Map> pageColumns = new HashMap<>(); + for (String name : pageNames) { + List occurrences = taggedColumns.get(name); + if (occurrences != null) { + pageColumns.put(name, occurrences); + } } - columnsByName.entrySet().removeIf(e -> e.getValue().isEmpty()); + + List gridItems = ColumnMetadataGrouper.groupColumns(pageColumns); + int totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); + + boolean hasMore = toIndex < totalUniqueColumns; + String cursor = hasMore ? encodeSearchOffset(toIndex) : null; + + return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); } /** - * Phase 1: Get entityFQN#columnName pairs that have the specified tags. Since ES flattens - * arrays, we must fetch column data and filter in Java to find columns that actually have the - * tag. + * Fetch columns with matching tags from _source. ES flat object mapping means we can't filter + * "column X has tag Y" at query level, so we read _source and check in Java. 
Since we already + * have the full document, we extract column metadata here — avoiding a separate data-fetch query. */ - private Set getEntityColumnPairsWithTags( + private Map> getColumnsWithTagsFromSource( ColumnAggregationRequest request, List entityTypes) throws IOException { - Set entityColumnPairs = new HashSet<>(); + Map> columnsByName = new HashMap<>(); Map> fieldPathToEntityTypes = groupByFieldPath(entityTypes); Set targetTags = buildTargetTagSet(request); @@ -309,9 +324,7 @@ private Set getEntityColumnPairsWithTags( Query query = buildTagFilterQuery(request, columnNameKeyword); try { - Set matchingPairs = - fetchEntityColumnPairsWithTags(indexes, query, columnFieldPath, targetTags); - entityColumnPairs.addAll(matchingPairs); + fetchColumnsWithTagsFromSource(indexes, query, columnFieldPath, targetTags, columnsByName); } catch (ElasticsearchException e) { if (!isIndexNotFoundException(e)) { throw e; @@ -319,7 +332,7 @@ private Set getEntityColumnPairsWithTags( } } - return entityColumnPairs; + return columnsByName; } private Set buildTargetTagSet(ColumnAggregationRequest request) { @@ -333,27 +346,28 @@ private Set buildTargetTagSet(ColumnAggregationRequest request) { return targetTags; } - private Set fetchEntityColumnPairsWithTags( - List indexes, Query query, String columnFieldPath, Set targetTags) + private void fetchColumnsWithTagsFromSource( + List indexes, + Query query, + String columnFieldPath, + Set targetTags, + Map> columnsByName) throws IOException { - Set entityColumnPairs = new HashSet<>(); SearchRequest searchRequest = SearchRequest.of(s -> s.index(indexes).query(query).size(10000)); SearchResponse response = client.search(searchRequest, JsonData.class); for (Hit hit : response.hits().hits()) { - extractMatchingEntityColumnPairs(hit, columnFieldPath, targetTags, entityColumnPairs); + extractMatchingColumnsFromHit(hit, columnFieldPath, targetTags, columnsByName); } - - return entityColumnPairs; } - private void 
extractMatchingEntityColumnPairs( + private void extractMatchingColumnsFromHit( Hit hit, String columnFieldPath, Set targetTags, - Set entityColumnPairs) { + Map> columnsByName) { if (hit.source() == null) { return; } @@ -364,19 +378,35 @@ private void extractMatchingEntityColumnPairs( return; } + String entityType = getTextField(sourceNode, "entityType"); + String entityDisplayName = getTextField(sourceNode, "displayName"); + String serviceName = getNestedField(sourceNode, "service", "name"); + String databaseName = getNestedField(sourceNode, "database", "name"); + String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); + JsonNode columnsData = getNestedJsonNode(sourceNode, columnFieldPath); if (columnsData != null && columnsData.isArray()) { for (JsonNode columnData : columnsData) { String colName = getTextField(columnData, "name"); - boolean hasTag = columnHasTargetTag(columnData, targetTags); - if (hasTag && colName != null) { - entityColumnPairs.add(entityFQN + "#" + colName); + if (colName != null && columnHasTargetTag(columnData, targetTags)) { + Column column = parseColumn(columnData, entityFQN); + columnsByName + .computeIfAbsent(colName, k -> new ArrayList<>()) + .add( + new ColumnWithContext( + column, + entityType, + entityFQN, + entityDisplayName, + serviceName, + databaseName, + schemaName)); } } } } catch (Exception e) { - LOG.warn("Failed to extract entity column pairs from hit", e); + LOG.warn("Failed to extract columns from hit", e); } } @@ -403,13 +433,24 @@ private boolean containsIgnoreCase(Set set, String value) { return false; } - /** Build query specifically for tag filtering (Phase 1) */ + /** + * Build query for tag filtering source fetch. Includes all scope filters (service, database, + * schema, domain, entityType) so the _source fetch is scoped to the same data as the main query. 
+ */ private Query buildTagFilterQuery(ColumnAggregationRequest request, String columnNameKeyword) { BoolQuery.Builder boolBuilder = new BoolQuery.Builder(); String columnFieldPath = columnNameKeyword.replace(".name.keyword", ""); boolBuilder.filter(Query.of(q -> q.exists(e -> e.field(columnFieldPath)))); + // Scope filters — must match the main query so we don't fetch columns outside the user's scope + addEntityTypeFilter(boolBuilder, request); + addServiceFilter(boolBuilder, request); + addServiceTypeFilter(boolBuilder, request); + addDatabaseFilter(boolBuilder, request); + addSchemaFilter(boolBuilder, request); + addDomainFilter(boolBuilder, request); + String tagFQNField = columnNameKeyword.replace(".name.keyword", ".tags.tagFQN"); List allTags = new ArrayList<>(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java index fd173c50934a..01e995fb6f95 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java @@ -79,46 +79,42 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr request.getTags(), request.getGlossaryTerms()); - // Two-phase query for tags/glossaryTerms filtering: - // Phase 1: Find entityFQN#columnName pairs that have the tag - // Phase 2: Filter to only return those specific occurrences + // Tag/glossary filter path: we must read _source to check which specific column has + // the tag (ES flat object mapping can't tell us). Since we're already reading _source, + // we extract full column metadata in the same pass — no separate data-fetch query needed. 
boolean hasTagFilter = !nullOrEmpty(request.getTags()) || !nullOrEmpty(request.getGlossaryTerms()); - Set entityColumnPairsWithTags = null; - List columnNamesWithTags = null; - - LOG.info("hasTagFilter={}", hasTagFilter); if (hasTagFilter) { - entityColumnPairsWithTags = getEntityColumnPairsWithTags(request); - LOG.info("Phase1 result: entityColumnPairsWithTags={}", entityColumnPairsWithTags); - if (entityColumnPairsWithTags.isEmpty()) { - LOG.info("No columns found with tags, returning empty response"); + Map> taggedColumns = getColumnsWithTagsFromSource(request); + if (taggedColumns.isEmpty()) { return buildResponse(new ArrayList<>(), null, false, 0, 0); } - // Also keep just the column names for the Phase 2 query filter - columnNamesWithTags = - entityColumnPairsWithTags.stream() - .map(pair -> pair.substring(pair.indexOf('#') + 1)) - .collect(java.util.stream.Collectors.toList()); + + // Pattern + tag combined: filter the already-fetched columns by pattern in Java + if (!nullOrEmpty(request.getColumnNamePattern())) { + String pattern = request.getColumnNamePattern().toLowerCase(java.util.Locale.ROOT); + taggedColumns + .entrySet() + .removeIf(e -> !e.getKey().toLowerCase(java.util.Locale.ROOT).contains(pattern)); + } + + return aggregateColumnsWithKnownNames(request, taggedColumns); } - // When a column name pattern is set, use terms aggregation with include regex - // for efficient server-side filtering instead of composite agg + post-filter + // Pattern-only path (no tag filter): use terms agg with include regex if (!nullOrEmpty(request.getColumnNamePattern())) { - return aggregateColumnsWithPattern(request, entityColumnPairsWithTags, columnNamesWithTags); + return aggregateColumnsWithPattern(request); } - // Non-search path: use composite aggregation - Query query = buildFilters(request, columnNamesWithTags); + // Browse path (no filters): use composite aggregation + Query query = buildFilters(request, null); try { SearchResponse response = 
executeSearch(request, query); Map> columnsByName = parseCompositeAggResults(response); - applyTagPostFilter(columnsByName, entityColumnPairsWithTags); - List gridItems = ColumnMetadataGrouper.groupColumns(columnsByName); String cursor = extractCursor(response); @@ -146,17 +142,14 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr } /** - * Search path: uses terms aggregation with include regex to filter column names at the - * aggregation level. Two queries: (1) lightweight names query to get all matching names and total - * count, (2) targeted data query with top_hits for the current page. + * Pattern-only search path (no tag filter): uses terms aggregation with include regex to filter + * column names at the aggregation level. Two queries: (1) lightweight names query to get all + * matching names and total count, (2) targeted data query with top_hits for the current page. */ - private ColumnGridResponse aggregateColumnsWithPattern( - ColumnAggregationRequest request, - Set entityColumnPairsWithTags, - List columnNamesWithTags) + private ColumnGridResponse aggregateColumnsWithPattern(ColumnAggregationRequest request) throws IOException { - Query query = buildFilters(request, columnNamesWithTags); + Query query = buildFilters(request, null); String regex = ColumnAggregator.toCaseInsensitiveRegex(request.getColumnNamePattern()); try { @@ -181,8 +174,6 @@ private ColumnGridResponse aggregateColumnsWithPattern( // Phase 2: Get data for this page's column names Map> columnsByName = executePageDataQuery(query, pageNames); - applyTagPostFilter(columnsByName, entityColumnPairsWithTags); - List gridItems = ColumnMetadataGrouper.groupColumns(columnsByName); int totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); @@ -199,42 +190,68 @@ private ColumnGridResponse aggregateColumnsWithPattern( } } - private void applyTagPostFilter( - Map> columnsByName, Set entityColumnPairsWithTags) { - if 
(entityColumnPairsWithTags == null || entityColumnPairsWithTags.isEmpty()) { - return; + /** + * Tag/glossary filter path: the tag-check pass already extracted full column metadata from + * _source (only tagged columns are in the map). Just paginate over the in-memory result. + */ + private ColumnGridResponse aggregateColumnsWithKnownNames( + ColumnAggregationRequest request, Map> taggedColumns) { + + Set allNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + allNames.addAll(taggedColumns.keySet()); + + int totalUniqueColumns = allNames.size(); + int offset = decodeSearchOffset(request.getCursor()); + int pageSize = request.getSize(); + + List sortedNames = new ArrayList<>(allNames); + int fromIndex = Math.min(offset, sortedNames.size()); + int toIndex = Math.min(offset + pageSize, sortedNames.size()); + List pageNames = sortedNames.subList(fromIndex, toIndex); + + if (pageNames.isEmpty()) { + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, 0); } - for (List occurrences : columnsByName.values()) { - occurrences.removeIf( - ctx -> { - String key = ctx.entityFQN + "#" + ctx.column.getName(); - return !entityColumnPairsWithTags.contains(key); - }); + + // Slice only the current page from the full map + Map> pageColumns = new HashMap<>(); + for (String name : pageNames) { + List occurrences = taggedColumns.get(name); + if (occurrences != null) { + pageColumns.put(name, occurrences); + } } - columnsByName.entrySet().removeIf(e -> e.getValue().isEmpty()); + + List gridItems = ColumnMetadataGrouper.groupColumns(pageColumns); + int totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); + + boolean hasMore = toIndex < totalUniqueColumns; + String cursor = hasMore ? encodeSearchOffset(toIndex) : null; + + return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); } /** - * Phase 1: Get entityFQN#columnName pairs that have the specified tags. 
Since ES flattens arrays, - * we must fetch column data and filter in Java to find columns that actually have the tag. + * Fetch columns with matching tags from _source. ES flat object mapping means we can't filter + * "column X has tag Y" at query level, so we read _source and check in Java. Since we already + * have the full document, we extract column metadata here — avoiding a separate data-fetch query. */ - private Set getEntityColumnPairsWithTags(ColumnAggregationRequest request) - throws IOException { - Set entityColumnPairs = new HashSet<>(); + private Map> getColumnsWithTagsFromSource( + ColumnAggregationRequest request) throws IOException { + Map> columnsByName = new HashMap<>(); Set targetTags = buildTargetTagSet(request); Query query = buildTagFilterQuery(request); try { - Set matchingPairs = fetchEntityColumnPairsWithTags(query, targetTags); - entityColumnPairs.addAll(matchingPairs); + fetchColumnsWithTagsFromSource(query, targetTags, columnsByName); } catch (OpenSearchException e) { if (!isIndexNotFoundException(e)) { throw e; } } - return entityColumnPairs; + return columnsByName; } private Set buildTargetTagSet(ColumnAggregationRequest request) { @@ -255,9 +272,9 @@ private List resolveIndexNames() { .toList(); } - private Set fetchEntityColumnPairsWithTags(Query query, Set targetTags) + private void fetchColumnsWithTagsFromSource( + Query query, Set targetTags, Map> columnsByName) throws IOException { - Set entityColumnPairs = new HashSet<>(); List resolvedIndexes = resolveIndexNames(); SearchRequest searchRequest = @@ -265,27 +282,16 @@ private Set fetchEntityColumnPairsWithTags(Query query, Set targ SearchResponse response = client.search(searchRequest, JsonData.class); - long totalHits = response.hits().total() != null ? 
response.hits().total().value() : 0; - LOG.info( - "Phase1 fetchEntityColumnPairsWithTags: indexes={}, targetTags={}, totalHits={}", - resolvedIndexes, - targetTags, - totalHits); - for (os.org.opensearch.client.opensearch.core.search.Hit hit : response.hits().hits()) { - extractMatchingEntityColumnPairs(hit, targetTags, entityColumnPairs); + extractMatchingColumnsFromHit(hit, targetTags, columnsByName); } - - LOG.info("Phase1 fetchEntityColumnPairsWithTags: found pairs: {}", entityColumnPairs); - - return entityColumnPairs; } - private void extractMatchingEntityColumnPairs( + private void extractMatchingColumnsFromHit( os.org.opensearch.client.opensearch.core.search.Hit hit, Set targetTags, - Set entityColumnPairs) { + Map> columnsByName) { if (hit.source() == null) { return; } @@ -296,19 +302,35 @@ private void extractMatchingEntityColumnPairs( return; } + String entityType = getTextField(sourceNode, "entityType"); + String entityDisplayName = getTextField(sourceNode, "displayName"); + String serviceName = getNestedField(sourceNode, "service", "name"); + String databaseName = getNestedField(sourceNode, "database", "name"); + String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); + JsonNode columnsData = sourceNode.get("columns"); if (columnsData != null && columnsData.isArray()) { for (JsonNode columnData : columnsData) { String colName = getTextField(columnData, "name"); - boolean hasTag = columnHasTargetTag(columnData, targetTags); - if (hasTag && colName != null) { - entityColumnPairs.add(entityFQN + "#" + colName); + if (colName != null && columnHasTargetTag(columnData, targetTags)) { + Column column = parseColumn(columnData, entityFQN); + columnsByName + .computeIfAbsent(colName, k -> new ArrayList<>()) + .add( + new ColumnWithContext( + column, + entityType, + entityFQN, + entityDisplayName, + serviceName, + databaseName, + schemaName)); } } } } catch (Exception e) { - LOG.warn("Failed to extract entity column pairs from hit", e); + 
LOG.warn("Failed to extract columns from hit", e); } } @@ -335,12 +357,23 @@ private boolean containsIgnoreCase(Set set, String value) { return false; } - /** Build query specifically for tag filtering (Phase 1) */ + /** + * Build query for tag filtering source fetch. Includes all scope filters (service, database, + * schema, domain, entityType) so the _source fetch is scoped to the same data as the main query. + */ private Query buildTagFilterQuery(ColumnAggregationRequest request) { BoolQuery.Builder boolBuilder = new BoolQuery.Builder(); boolBuilder.filter(Query.of(q -> q.exists(e -> e.field("columns")))); + // Scope filters — must match the main query so we don't fetch columns outside the user's scope + addEntityTypeFilter(boolBuilder, request); + addServiceFilter(boolBuilder, request); + addServiceTypeFilter(boolBuilder, request); + addDatabaseFilter(boolBuilder, request); + addSchemaFilter(boolBuilder, request); + addDomainFilter(boolBuilder, request); + List allTags = new ArrayList<>(); if (!nullOrEmpty(request.getTags())) { allTags.addAll(request.getTags()); From f7ad832acbd89550512c56424a3e472f53f1c689 Mon Sep 17 00:00:00 2001 From: sonika-shah <58761340+sonika-shah@users.noreply.github.com> Date: Sun, 19 Apr 2026 10:48:50 +0530 Subject: [PATCH 4/8] fix(columnGrid): stabilize totalOccurrences across pages and scope tag filter --- .../it/tests/ColumnGridResourceIT.java | 175 +++++++++++------- .../ElasticSearchColumnAggregator.java | 61 ++++-- .../OpenSearchColumnAggregator.java | 59 ++++-- 3 files changed, 191 insertions(+), 104 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java index 814bf36f72c3..6376d4c09e73 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java +++ 
b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java @@ -1487,27 +1487,41 @@ void test_getColumnGrid_patternSearchIsCaseInsensitive(TestNamespace ns) throws waitForSearchIndexRefresh(); - // Search with all lowercase - ColumnGridResponse lowerResponse = - getColumnGrid( - client, - "entityTypes=table&columnNamePattern=casemixcol&serviceName=" + service.getName()); + await("Wait for lowercase pattern search to find mixed-case column") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse lowerResponse = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=casemixcol&serviceName=" + + service.getName()); - assertNotNull(lowerResponse); - boolean foundLower = - lowerResponse.getColumns().stream().anyMatch(c -> c.getColumnName().equals(colName)); - assertTrue(foundLower, "Lowercase search should find the mixed-case column"); + assertNotNull(lowerResponse); + assertTrue( + lowerResponse.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(colName)), + "Lowercase search should find the mixed-case column"); + }); - // Search with all uppercase - ColumnGridResponse upperResponse = - getColumnGrid( - client, - "entityTypes=table&columnNamePattern=CASEMIXCOL&serviceName=" + service.getName()); + await("Wait for uppercase pattern search to find mixed-case column") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse upperResponse = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=CASEMIXCOL&serviceName=" + + service.getName()); - assertNotNull(upperResponse); - boolean foundUpper = - upperResponse.getColumns().stream().anyMatch(c -> c.getColumnName().equals(colName)); - assertTrue(foundUpper, "Uppercase search should find the mixed-case column"); + assertNotNull(upperResponse); + assertTrue( + upperResponse.getColumns().stream() + .anyMatch(c -> 
c.getColumnName().equals(colName)), + "Uppercase search should find the mixed-case column"); + }); } @Test @@ -1529,18 +1543,26 @@ void test_getColumnGrid_patternSearchExcludesNonMatching(TestNamespace ns) throw waitForSearchIndexRefresh(); - ColumnGridResponse response = - getColumnGrid( - client, - "entityTypes=table&columnNamePattern=regex_target&serviceName=" + service.getName()); + await("Wait for pattern search to exclude non-matching columns") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=regex_target&serviceName=" + + service.getName()); - assertNotNull(response); - boolean foundMatch = - response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(matchCol)); - boolean foundNoMatch = - response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(noMatchCol)); - assertTrue(foundMatch, "Matching column should be in results"); - assertFalse(foundNoMatch, "Non-matching column from same table should be excluded"); + assertNotNull(response); + assertTrue( + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(matchCol)), + "Matching column should be in results"); + assertFalse( + response.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(noMatchCol)), + "Non-matching column from same table should be excluded"); + }); } @Test @@ -1563,19 +1585,26 @@ void test_getColumnGrid_patternSearchWithSpecialChars(TestNamespace ns) throws E waitForSearchIndexRefresh(); // Search for "col.with" — dot should be literal, not wildcard - ColumnGridResponse response = - getColumnGrid( - client, - "entityTypes=table&columnNamePattern=col.with&serviceName=" + service.getName()); + await("Wait for pattern search with special chars") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + 
client, + "entityTypes=table&columnNamePattern=col.with&serviceName=" + + service.getName()); - assertNotNull(response); - boolean foundDotCol = - response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(colWithDot)); - boolean foundNoDotCol = - response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(colNoDot)); - assertTrue(foundDotCol, "Column with literal dot should match"); - assertFalse( - foundNoDotCol, "Column without dot should not match — dot must be literal, not wildcard"); + assertNotNull(response); + assertTrue( + response.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(colWithDot)), + "Column with literal dot should match"); + assertFalse( + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(colNoDot)), + "Column without dot should not match — dot must be literal, not wildcard"); + }); } @Test @@ -1613,7 +1642,7 @@ void test_getColumnGrid_patternPlusTagFilter(TestNamespace ns) throws Exception .withColumns(List.of(col1, col2)) .execute(); - // Table 2: same column name as col1 but WITHOUT tag + // Table 2: untagged column whose name also matches the pattern Column col3 = Columns.build(untaggedMatchCol).withType(ColumnDataType.VARCHAR).withLength(255).create(); Tables.create() @@ -1624,36 +1653,46 @@ void test_getColumnGrid_patternPlusTagFilter(TestNamespace ns) throws Exception waitForSearchIndexRefresh(); - ColumnGridResponse response = - getColumnGrid( - client, - "entityTypes=table&tags=PII.Sensitive&columnNamePattern=pat_tag_match&serviceName=" - + service.getName()); + await("Wait for pattern + tag filter result") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&tags=PII.Sensitive&columnNamePattern=pat_tag_match&serviceName=" + + service.getName()); - assertNotNull(response); + assertNotNull(response); - // Should find taggedMatchCol (matches pattern AND 
has tag) - // Should NOT find taggedNoMatchCol (has tag but doesn't match pattern) - // Should NOT find untaggedMatchCol (matches pattern but no tag) - boolean foundTaggedMatch = false; - boolean foundTaggedNoMatch = false; - boolean foundUntaggedMatch = false; + // Should find taggedMatchCol (matches pattern AND has tag) + // Should NOT find taggedNoMatchCol (has tag but doesn't match pattern) + // Should NOT find untaggedMatchCol (matches pattern but no tag) + boolean foundTaggedMatch = false; + boolean foundTaggedNoMatch = false; + boolean foundUntaggedMatch = false; - for (ColumnGridItem item : response.getColumns()) { - if (item.getColumnName().equals(taggedMatchCol)) { - foundTaggedMatch = true; - } - if (item.getColumnName().equals(taggedNoMatchCol)) { - foundTaggedNoMatch = true; - } - if (item.getColumnName().equals(untaggedMatchCol)) { - foundUntaggedMatch = true; - } - } + for (ColumnGridItem item : response.getColumns()) { + if (item.getColumnName().equals(taggedMatchCol)) { + foundTaggedMatch = true; + } + if (item.getColumnName().equals(taggedNoMatchCol)) { + foundTaggedNoMatch = true; + } + if (item.getColumnName().equals(untaggedMatchCol)) { + foundUntaggedMatch = true; + } + } - assertTrue(foundTaggedMatch, "Column with tag AND matching pattern should be in results"); - assertFalse(foundTaggedNoMatch, "Column with tag but NOT matching pattern should be excluded"); - assertFalse(foundUntaggedMatch, "Column matching pattern but WITHOUT tag should be excluded"); + assertTrue( + foundTaggedMatch, "Column with tag AND matching pattern should be in results"); + assertFalse( + foundTaggedNoMatch, + "Column with tag but NOT matching pattern should be excluded"); + assertFalse( + foundUntaggedMatch, "Column matching pattern but WITHOUT tag should be excluded"); + }); } @Test diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java 
b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java index 23f2322ed69a..c0b7b4a50459 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java @@ -62,6 +62,12 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator { /** Max column names to retrieve in the names-only query during pattern search. */ private static final int MAX_PATTERN_SEARCH_NAMES = 10000; + /** + * Number of sample docs pulled per column-name bucket to populate occurrences. Caps + * {@code ColumnGridItem.totalOccurrences}; columns appearing in more entities than this are undercounted. + */ + private static final int SAMPLE_DOCS_PER_COLUMN = 100; + /** Index configuration with field mappings for each entity type. Uses aliases defined in indexMapping.json */ private static final Map INDEX_CONFIGS = Map.of( @@ -197,8 +203,8 @@ private ColumnGridResponse aggregateColumnsWithPattern( Map> fieldPathToEntityTypes = groupByFieldPath(entityTypes); String regex = ColumnAggregator.toCaseInsensitiveRegex(request.getColumnNamePattern()); - // Phase 1: Collect all matching column names across entity type groups Set allMatchingNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + long totalOccurrencesAcrossGroups = 0; for (Map.Entry> entry : fieldPathToEntityTypes.entrySet()) { String columnNameKeyword = entry.getKey(); @@ -206,8 +212,9 @@ private ColumnGridResponse aggregateColumnsWithPattern( Query query = buildFilters(request, columnNameKeyword, null); try { - List names = executeNamesQuery(query, indexes, columnNameKeyword, regex); - allMatchingNames.addAll(names); + NamesWithCount result = executeNamesQuery(query, indexes, columnNameKeyword, regex); + allMatchingNames.addAll(result.names()); + totalOccurrencesAcrossGroups += 
result.totalDocCount(); } catch (ElasticsearchException e) { if (!isIndexNotFoundException(e)) { throw e; @@ -216,6 +223,7 @@ private ColumnGridResponse aggregateColumnsWithPattern( } int totalUniqueColumns = allMatchingNames.size(); + int totalOccurrences = (int) totalOccurrencesAcrossGroups; int offset = decodeSearchOffset(request.getCursor()); int pageSize = request.getSize(); @@ -225,10 +233,9 @@ private ColumnGridResponse aggregateColumnsWithPattern( List pageNames = sortedNames.subList(fromIndex, toIndex); if (pageNames.isEmpty()) { - return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, 0); + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences); } - // Phase 2: Fetch data for this page's column names Map> allColumnsByName = new HashMap<>(); for (Map.Entry> entry : fieldPathToEntityTypes.entrySet()) { @@ -254,7 +261,6 @@ private ColumnGridResponse aggregateColumnsWithPattern( } List gridItems = ColumnMetadataGrouper.groupColumns(allColumnsByName); - int totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); boolean hasMore = toIndex < totalUniqueColumns; String cursor = hasMore ? 
encodeSearchOffset(toIndex) : null; @@ -273,6 +279,7 @@ private ColumnGridResponse aggregateColumnsWithKnownNames( allNames.addAll(taggedColumns.keySet()); int totalUniqueColumns = allNames.size(); + int totalOccurrences = taggedColumns.values().stream().mapToInt(List::size).sum(); int offset = decodeSearchOffset(request.getCursor()); int pageSize = request.getSize(); @@ -282,20 +289,19 @@ private ColumnGridResponse aggregateColumnsWithKnownNames( List pageNames = sortedNames.subList(fromIndex, toIndex); if (pageNames.isEmpty()) { - return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, 0); + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences); } - // Slice only the current page from the full map Map> pageColumns = new HashMap<>(); for (String name : pageNames) { - List occurrences = taggedColumns.get(name); - if (occurrences != null) { - pageColumns.put(name, occurrences); + for (Map.Entry> entry : taggedColumns.entrySet()) { + if (entry.getKey().equalsIgnoreCase(name)) { + pageColumns.computeIfAbsent(name, k -> new ArrayList<>()).addAll(entry.getValue()); + } } } List gridItems = ColumnMetadataGrouper.groupColumns(pageColumns); - int totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); boolean hasMore = toIndex < totalUniqueColumns; String cursor = hasMore ? encodeSearchOffset(toIndex) : null; @@ -435,7 +441,10 @@ private boolean containsIgnoreCase(Set set, String value) { /** * Build query for tag filtering source fetch. Includes all scope filters (service, database, - * schema, domain, entityType) so the _source fetch is scoped to the same data as the main query. + * schema, domain, entityType), column-name pattern, and metadataStatus so the _source fetch is + * scoped to the same data as the main query. 
Per-column correlation (which specific column has + * the tag + matches the pattern) still happens in Java because flat object mapping prevents + * expressing it at query level. */ private Query buildTagFilterQuery(ColumnAggregationRequest request, String columnNameKeyword) { BoolQuery.Builder boolBuilder = new BoolQuery.Builder(); @@ -443,13 +452,14 @@ private Query buildTagFilterQuery(ColumnAggregationRequest request, String colum String columnFieldPath = columnNameKeyword.replace(".name.keyword", ""); boolBuilder.filter(Query.of(q -> q.exists(e -> e.field(columnFieldPath)))); - // Scope filters — must match the main query so we don't fetch columns outside the user's scope addEntityTypeFilter(boolBuilder, request); addServiceFilter(boolBuilder, request); addServiceTypeFilter(boolBuilder, request); addDatabaseFilter(boolBuilder, request); addSchemaFilter(boolBuilder, request); addDomainFilter(boolBuilder, request); + addColumnNamePatternFilter(boolBuilder, request, columnNameKeyword); + addMetadataStatusFilter(boolBuilder, request, columnFieldPath); String tagFQNField = columnNameKeyword.replace(".name.keyword", ".tags.tagFQN"); List allTags = new ArrayList<>(); @@ -690,8 +700,15 @@ private Query hasEmptyOrMissingField(String field) { .minimumShouldMatch("1"))); } + /** + * Phase 1 result: matching column names and the total doc_count summed across buckets. doc_count + * is the number of docs that contain each name; with flat-object mapping on the columns array and + * column names unique within an entity, this is a close proxy for total occurrences. + */ + private record NamesWithCount(List names, long totalDocCount) {} + /** Phase 1: Get all matching column names using terms agg with include regex (no top_hits). 
*/ - private List executeNamesQuery( + private NamesWithCount executeNamesQuery( Query query, List indexes, String columnNameKeyword, String regex) throws IOException { @@ -715,14 +732,21 @@ private List executeNamesQuery( SearchResponse response = client.search(searchRequest, JsonData.class); List names = new ArrayList<>(); + long totalDocCount = 0; if (response.aggregations() != null && response.aggregations().containsKey("matching_columns")) { StringTermsAggregate termsResult = response.aggregations().get("matching_columns").sterms(); for (StringTermsBucket bucket : termsResult.buckets().array()) { names.add(bucket.key().stringValue()); + totalDocCount += bucket.docCount(); + } + if (names.size() == MAX_PATTERN_SEARCH_NAMES) { + LOG.warn( + "Column name pattern matched at least {} distinct names; results truncated", + MAX_PATTERN_SEARCH_NAMES); } } - return names; + return new NamesWithCount(names, totalDocCount); } /** Phase 2: Get data for specific column names using terms agg with exact include + top_hits. */ @@ -734,7 +758,7 @@ private Map> executePageDataQuery( List columnNames) throws IOException { - Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(10))); + Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(SAMPLE_DOCS_PER_COLUMN))); Aggregation termsAgg = Aggregation.of( @@ -792,7 +816,7 @@ private SearchResponse executeSearch( // Use full _source to avoid top_hits source-filter edge cases where combining root and nested // include paths can produce empty buckets. 
- Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(10))); + Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(SAMPLE_DOCS_PER_COLUMN))); Map subAggs = new HashMap<>(); subAggs.put("sample_docs", topHitsAgg); @@ -1142,6 +1166,7 @@ private int decodeSearchOffset(String cursor) { } return 0; } catch (Exception e) { + LOG.debug("Failed to decode search offset cursor, restarting from page 1", e); return 0; } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java index 01e995fb6f95..87fdeb9c0de8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java @@ -64,6 +64,12 @@ public class OpenSearchColumnAggregator implements ColumnAggregator { /** Max column names to retrieve in the names-only query during pattern search. */ private static final int MAX_PATTERN_SEARCH_NAMES = 10000; + /** + * Number of sample docs pulled per column-name bucket to populate occurrences. Caps + * {@code ColumnGridItem.totalOccurrences}; columns appearing in more entities than this undercount. 
+ */ + private static final int SAMPLE_DOCS_PER_COLUMN = 100; + /** Uses aliases defined in indexMapping.json */ private static final List DATA_ASSET_INDEXES = Arrays.asList("table", "dashboardDataModel", "topic", "searchIndex", "container"); @@ -153,12 +159,12 @@ private ColumnGridResponse aggregateColumnsWithPattern(ColumnAggregationRequest String regex = ColumnAggregator.toCaseInsensitiveRegex(request.getColumnNamePattern()); try { - // Phase 1: Get all matching column names - List matchingNames = executeNamesQuery(query, regex); + NamesWithCount phase1 = executeNamesQuery(query, regex); Set dedupedNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); - dedupedNames.addAll(matchingNames); + dedupedNames.addAll(phase1.names()); int totalUniqueColumns = dedupedNames.size(); + int totalOccurrences = (int) phase1.totalDocCount(); int offset = decodeSearchOffset(request.getCursor()); int pageSize = request.getSize(); @@ -168,14 +174,12 @@ private ColumnGridResponse aggregateColumnsWithPattern(ColumnAggregationRequest List pageNames = sortedNames.subList(fromIndex, toIndex); if (pageNames.isEmpty()) { - return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, 0); + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences); } - // Phase 2: Get data for this page's column names Map> columnsByName = executePageDataQuery(query, pageNames); List gridItems = ColumnMetadataGrouper.groupColumns(columnsByName); - int totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); boolean hasMore = toIndex < totalUniqueColumns; String cursor = hasMore ? 
encodeSearchOffset(toIndex) : null; @@ -201,6 +205,7 @@ private ColumnGridResponse aggregateColumnsWithKnownNames( allNames.addAll(taggedColumns.keySet()); int totalUniqueColumns = allNames.size(); + int totalOccurrences = taggedColumns.values().stream().mapToInt(List::size).sum(); int offset = decodeSearchOffset(request.getCursor()); int pageSize = request.getSize(); @@ -210,20 +215,19 @@ private ColumnGridResponse aggregateColumnsWithKnownNames( List pageNames = sortedNames.subList(fromIndex, toIndex); if (pageNames.isEmpty()) { - return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, 0); + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences); } - // Slice only the current page from the full map Map> pageColumns = new HashMap<>(); for (String name : pageNames) { - List occurrences = taggedColumns.get(name); - if (occurrences != null) { - pageColumns.put(name, occurrences); + for (Map.Entry> entry : taggedColumns.entrySet()) { + if (entry.getKey().equalsIgnoreCase(name)) { + pageColumns.computeIfAbsent(name, k -> new ArrayList<>()).addAll(entry.getValue()); + } } } List gridItems = ColumnMetadataGrouper.groupColumns(pageColumns); - int totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); boolean hasMore = toIndex < totalUniqueColumns; String cursor = hasMore ? encodeSearchOffset(toIndex) : null; @@ -359,20 +363,24 @@ private boolean containsIgnoreCase(Set set, String value) { /** * Build query for tag filtering source fetch. Includes all scope filters (service, database, - * schema, domain, entityType) so the _source fetch is scoped to the same data as the main query. + * schema, domain, entityType), column-name pattern, and metadataStatus so the _source fetch is + * scoped to the same data as the main query. 
Per-column correlation (which specific column has + * the tag + matches the pattern) still happens in Java because flat object mapping prevents + * expressing it at query level. */ private Query buildTagFilterQuery(ColumnAggregationRequest request) { BoolQuery.Builder boolBuilder = new BoolQuery.Builder(); boolBuilder.filter(Query.of(q -> q.exists(e -> e.field("columns")))); - // Scope filters — must match the main query so we don't fetch columns outside the user's scope addEntityTypeFilter(boolBuilder, request); addServiceFilter(boolBuilder, request); addServiceTypeFilter(boolBuilder, request); addDatabaseFilter(boolBuilder, request); addSchemaFilter(boolBuilder, request); addDomainFilter(boolBuilder, request); + addColumnNamePatternFilter(boolBuilder, request); + addMetadataStatusFilter(boolBuilder, request); List allTags = new ArrayList<>(); if (!nullOrEmpty(request.getTags())) { @@ -596,8 +604,15 @@ private Query hasEmptyOrMissingField(String field) { .minimumShouldMatch("1"))); } + /** + * Phase 1 result: matching column names and the total doc_count summed across buckets. doc_count + * is the number of docs that contain each name; with flat-object mapping on the columns array and + * column names unique within an entity, this is a close proxy for total occurrences. + */ + private record NamesWithCount(List names, long totalDocCount) {} + /** Phase 1: Get all matching column names using terms agg with include regex (no top_hits). 
*/ - private List executeNamesQuery(Query query, String regex) throws IOException { + private NamesWithCount executeNamesQuery(Query query, String regex) throws IOException { Aggregation termsAgg = Aggregation.of( a -> @@ -619,21 +634,28 @@ private List executeNamesQuery(Query query, String regex) throws IOExcep SearchResponse response = client.search(searchRequest, JsonData.class); List names = new ArrayList<>(); + long totalDocCount = 0; if (response.aggregations() != null && response.aggregations().containsKey("matching_columns")) { StringTermsAggregate termsResult = response.aggregations().get("matching_columns").sterms(); for (StringTermsBucket bucket : termsResult.buckets().array()) { names.add(bucket.key()); + totalDocCount += bucket.docCount(); + } + if (names.size() == MAX_PATTERN_SEARCH_NAMES) { + LOG.warn( + "Column name pattern matched at least {} distinct names; results truncated", + MAX_PATTERN_SEARCH_NAMES); } } - return names; + return new NamesWithCount(names, totalDocCount); } /** Phase 2: Get data for specific column names using terms agg with exact include + top_hits. 
*/ private Map> executePageDataQuery( Query query, List columnNames) throws IOException { - Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(100))); + Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(SAMPLE_DOCS_PER_COLUMN))); Aggregation termsAgg = Aggregation.of( @@ -690,7 +712,7 @@ private SearchResponse executeSearch(ColumnAggregationRequest request, CompositeAggregationSource.of( cas -> cas.terms(t -> t.field("columns.name.keyword").order(SortOrder.Asc)))); - Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(100))); + Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(SAMPLE_DOCS_PER_COLUMN))); Map subAggs = new HashMap<>(); subAggs.put("sample_docs", topHitsAgg); @@ -1052,6 +1074,7 @@ private int decodeSearchOffset(String cursor) { } return 0; } catch (Exception e) { + LOG.debug("Failed to decode search offset cursor, restarting from page 1", e); return 0; } } From a2ffde0973f7cf86574d74af71243b507c0db601 Mon Sep 17 00:00:00 2001 From: sonika-shah <58761340+sonika-shah@users.noreply.github.com> Date: Mon, 20 Apr 2026 11:52:23 +0530 Subject: [PATCH 5/8] test(columnGrid): wrap glossary-filter assertions in await(), tighten false-green check --- .../it/tests/ColumnGridResourceIT.java | 80 ++++++++++++------- 1 file changed, 49 insertions(+), 31 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java index 6376d4c09e73..0ec88f365fb0 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java @@ -1733,23 +1733,29 @@ void test_getColumnGrid_patternPlusGlossaryFilter(TestNamespace ns) throws Excep waitForSearchIndexRefresh(); - ColumnGridResponse response = - getColumnGrid( - 
client, - "entityTypes=table&glossaryTerms=" - + term.getFullyQualifiedName() - + "&columnNamePattern=pg_match&serviceName=" - + service.getName()); - - assertNotNull(response); + await("Wait for pattern + glossary filter result") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&glossaryTerms=" + + term.getFullyQualifiedName() + + "&columnNamePattern=pg_match&serviceName=" + + service.getName()); - boolean foundMatch = - response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(matchCol)); - boolean foundNoMatch = - response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(noMatchCol)); + assertNotNull(response); - assertTrue(foundMatch, "Column matching both pattern and glossary should be in results"); - assertFalse(foundNoMatch, "Column with glossary but not matching pattern should be excluded"); + assertTrue( + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(matchCol)), + "Column matching both pattern and glossary should be in results"); + assertFalse( + response.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(noMatchCol)), + "Column with glossary but not matching pattern should be excluded"); + }); } @Test @@ -1870,24 +1876,36 @@ void test_getColumnGrid_glossaryFilter_onlyReturnsGlossaryOccurrences(TestNamesp waitForSearchIndexRefresh(); - ColumnGridResponse response = - getColumnGrid( - client, - "entityTypes=table&glossaryTerms=" - + term.getFullyQualifiedName() - + "&serviceName=" - + service.getName()); + await("Wait for glossary-filtered column to return the tagged occurrence only") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&glossaryTerms=" + + term.getFullyQualifiedName() + + "&serviceName=" + + service.getName()); - 
assertNotNull(response); + assertNotNull(response); + assertNotNull(response.getColumns()); - for (ColumnGridItem item : response.getColumns()) { - if (item.getColumnName().equals(sharedName)) { - assertEquals( - 1, - item.getTotalOccurrences(), - "Should only return the occurrence WITH the glossary term, not all with same name"); - } - } + ColumnGridItem sharedItem = + response.getColumns().stream() + .filter(item -> item.getColumnName().equals(sharedName)) + .findFirst() + .orElse(null); + + assertNotNull( + sharedItem, + "Expected '" + sharedName + "' to be present in the glossary-filtered response"); + assertEquals( + 1, + sharedItem.getTotalOccurrences(), + "Should only return the occurrence WITH the glossary term, not all with same name"); + }); } private void waitForColumnToBeIndexed( From 2172bd3a22d6957cdb03a7950ebd617aa911a4a5 Mon Sep 17 00:00:00 2001 From: sonika-shah <58761340+sonika-shah@users.noreply.github.com> Date: Sun, 26 Apr 2026 19:16:59 +0530 Subject: [PATCH 6/8] fix(search): case-insensitive tag map + 10K cap warnings + int overflow guard --- .../it/tests/ColumnGridResourceIT.java | 120 ++++++++++++++ .../service/search/ColumnAggregator.java | 75 +++++++++ .../ElasticSearchColumnAggregator.java | 156 ++++++++---------- .../OpenSearchColumnAggregator.java | 152 ++++++++--------- 4 files changed, 331 insertions(+), 172 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java index 0ec88f365fb0..29db482b852a 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java @@ -1908,6 +1908,126 @@ void test_getColumnGrid_glossaryFilter_onlyReturnsGlossaryOccurrences(TestNamesp }); } + @Test + void 
test_getColumnGrid_patternSearchAcrossEntityTypesDedupesNames(TestNamespace ns) + throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + + DatabaseService dbService = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, dbService); + + String sharedName = ns.prefix("multi_type_col"); + + Column tableCol = + Columns.build(sharedName).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Tables.create() + .name(ns.prefix("multi_type_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(tableCol)) + .execute(); + + DashboardService dashService = DashboardServiceTestFactory.createMetabase(ns); + Column dashCol = + Columns.build(sharedName).withType(ColumnDataType.VARCHAR).withLength(255).create(); + DashboardDataModels.create() + .name(ns.prefix("multi_type_datamodel")) + .in(dashService.getFullyQualifiedName()) + .withColumns(List.of(dashCol)) + .withDataModelType(DataModelType.MetabaseDataModel) + .execute(); + + waitForSearchIndexRefresh(); + + await("Wait for both entities to be indexed and dedupe correctly") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table,dashboardDataModel&columnNamePattern=multi_type_col"); + + assertNotNull(response); + + long matches = + response.getColumns().stream() + .filter(c -> c.getColumnName().equals(sharedName)) + .count(); + + assertEquals( + 1, matches, "Same column name in two entity types must dedupe to one grid entry"); + + ColumnGridItem item = + response.getColumns().stream() + .filter(c -> c.getColumnName().equals(sharedName)) + .findFirst() + .orElseThrow(); + + assertEquals( + 2, + item.getTotalOccurrences(), + "Per-column occurrences must include both entity types"); + assertTrue( + response.getTotalOccurrences() >= 2, + "Response totalOccurrences must include both entity-type 
buckets"); + }); + } + + @Test + void test_getColumnGrid_patternSearchFindsAlphabeticallyLateColumn(TestNamespace ns) + throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + int columnCount = 200; + String matchPattern = ns.prefix("zzz_target"); + String matchedColumn = matchPattern; + + java.util.List columns = new java.util.ArrayList<>(); + for (int i = 0; i < columnCount - 1; i++) { + columns.add( + Columns.build(ns.prefix(String.format("aaa_filler_%04d", i))) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .create()); + } + columns.add( + Columns.build(matchedColumn).withType(ColumnDataType.VARCHAR).withLength(255).create()); + + Tables.create() + .name(ns.prefix("scale_search_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(columns) + .execute(); + + waitForSearchIndexRefresh(); + + await("Wait for first-page search to surface alphabetically-late match (size=25)") + .atMost(Duration.ofSeconds(45)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=zzz_target&size=25&serviceName=" + + service.getName()); + + assertNotNull(response); + assertTrue( + response.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(matchedColumn)), + "First page must contain the alphabetically-late matching column " + + "(this exercises the original bug fix — composite agg would have hidden it)"); + assertEquals( + 1, + response.getTotalUniqueColumns(), + "Only one unique column matches the pattern"); + }); + } + private void waitForColumnToBeIndexed( OpenMetadataClient client, String columnName, String serviceName) { await() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java 
b/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java index 546298503ac1..f673b20c33e2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java @@ -13,12 +13,43 @@ package org.openmetadata.service.search; +import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.List; +import java.util.Map; import org.openmetadata.schema.api.data.ColumnGridResponse; +import org.openmetadata.schema.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public interface ColumnAggregator { + Logger LOG = LoggerFactory.getLogger(ColumnAggregator.class); + + /** Max column names to retrieve in the names-only query during pattern search. */ + int MAX_PATTERN_SEARCH_NAMES = 10000; + + /** + * Number of sample docs pulled per column-name bucket to populate occurrences. Caps + * {@code ColumnGridItem.totalOccurrences}; columns appearing in more entities than this + * undercount. + */ + int SAMPLE_DOCS_PER_COLUMN = 100; + + /** Aggregation names used in pattern-search queries (ES + OS). */ + String AGG_MATCHING_COLUMNS = "matching_columns"; + + String AGG_PAGE_COLUMNS = "page_columns"; + String AGG_SAMPLE_DOCS = "sample_docs"; + String AGG_KEY_ORDER = "_key"; + + /** Cursor payload key for the offset-based search/tag pagination cursor. */ + String CURSOR_SEARCH_OFFSET = "searchOffset"; + + TypeReference> CURSOR_TYPE = new TypeReference<>() {}; + ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) throws IOException; /** @@ -43,6 +74,50 @@ static String toCaseInsensitiveRegex(String pattern) { return sb.toString(); } + /** Encode an offset into the search/tag pagination cursor (base64 JSON). 
*/ + static String encodeSearchOffset(int offset) { + try { + String json = JsonUtils.pojoToJson(Map.of(CURSOR_SEARCH_OFFSET, offset)); + return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + LOG.error("Failed to encode search offset", e); + return null; + } + } + + /** Decode the search/tag pagination cursor; restart at 0 for malformed input. */ + static int decodeSearchOffset(String cursor) { + if (cursor == null) { + return 0; + } + try { + String json = new String(Base64.getDecoder().decode(cursor), StandardCharsets.UTF_8); + Map map = JsonUtils.readValue(json, CURSOR_TYPE); + Object offset = map.get(CURSOR_SEARCH_OFFSET); + if (offset instanceof Number num) { + return num.intValue(); + } + return 0; + } catch (Exception e) { + LOG.debug("Failed to decode search offset cursor, restarting from page 1", e); + return 0; + } + } + + /** Saturating long → int cast for response totals. Caps at Integer.MAX_VALUE. */ + static int toIntSaturating(long value) { + if (value > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + if (value < 0) { + return 0; + } + return (int) value; + } + + /** Phase 1 result: matching column names and the total doc_count summed across buckets. 
*/ + record NamesWithCount(List names, long totalDocCount) {} + class ColumnAggregationRequest { private int size = 1000; private String cursor; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java index c0b7b4a50459..317e43c9ea59 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java @@ -15,6 +15,7 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import es.co.elastic.clients.elasticsearch.ElasticsearchClient; import es.co.elastic.clients.elasticsearch._types.ElasticsearchException; @@ -33,6 +34,7 @@ import es.co.elastic.clients.elasticsearch.core.SearchResponse; import es.co.elastic.clients.elasticsearch.core.search.Hit; import es.co.elastic.clients.json.JsonData; +import es.co.elastic.clients.util.NamedValue; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -40,8 +42,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.api.data.ColumnGridItem; @@ -59,15 +63,6 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator { private final ElasticsearchClient client; - /** Max column names to retrieve in the names-only query during pattern search. 
*/ - private static final int MAX_PATTERN_SEARCH_NAMES = 10000; - - /** - * Number of sample docs pulled per column-name bucket to populate occurrences. Caps - * {@code ColumnGridItem.totalOccurrences}; columns appearing in more entities than this undercount. - */ - private static final int SAMPLE_DOCS_PER_COLUMN = 100; - /** Index configuration with field mappings for each entity type. Uses aliases defined in indexMapping.json */ private static final Map INDEX_CONFIGS = Map.of( @@ -114,10 +109,10 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr // Pattern + tag combined: filter the already-fetched columns by pattern in Java if (!nullOrEmpty(request.getColumnNamePattern())) { - String pattern = request.getColumnNamePattern().toLowerCase(java.util.Locale.ROOT); + String pattern = request.getColumnNamePattern().toLowerCase(Locale.ROOT); taggedColumns .entrySet() - .removeIf(e -> !e.getKey().toLowerCase(java.util.Locale.ROOT).contains(pattern)); + .removeIf(e -> !e.getKey().toLowerCase(Locale.ROOT).contains(pattern)); } return aggregateColumnsWithKnownNames(request, taggedColumns); @@ -223,8 +218,8 @@ private ColumnGridResponse aggregateColumnsWithPattern( } int totalUniqueColumns = allMatchingNames.size(); - int totalOccurrences = (int) totalOccurrencesAcrossGroups; - int offset = decodeSearchOffset(request.getCursor()); + int totalOccurrences = ColumnAggregator.toIntSaturating(totalOccurrencesAcrossGroups); + int offset = ColumnAggregator.decodeSearchOffset(request.getCursor()); int pageSize = request.getSize(); List sortedNames = new ArrayList<>(allMatchingNames); @@ -263,7 +258,7 @@ private ColumnGridResponse aggregateColumnsWithPattern( List gridItems = ColumnMetadataGrouper.groupColumns(allColumnsByName); boolean hasMore = toIndex < totalUniqueColumns; - String cursor = hasMore ? encodeSearchOffset(toIndex) : null; + String cursor = hasMore ? 
ColumnAggregator.encodeSearchOffset(toIndex) : null; return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); } @@ -271,19 +266,19 @@ private ColumnGridResponse aggregateColumnsWithPattern( /** * Tag/glossary filter path: the tag-check pass already extracted full column metadata from * _source (only tagged columns are in the map). Just paginate over the in-memory result. + * + *

{@code taggedColumns} is a case-insensitive map: when two entities have columns differing + * only in case (e.g. "User" / "user"), occurrences are merged under a single key. */ private ColumnGridResponse aggregateColumnsWithKnownNames( ColumnAggregationRequest request, Map> taggedColumns) { - Set allNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); - allNames.addAll(taggedColumns.keySet()); - - int totalUniqueColumns = allNames.size(); + int totalUniqueColumns = taggedColumns.size(); int totalOccurrences = taggedColumns.values().stream().mapToInt(List::size).sum(); - int offset = decodeSearchOffset(request.getCursor()); + int offset = ColumnAggregator.decodeSearchOffset(request.getCursor()); int pageSize = request.getSize(); - List sortedNames = new ArrayList<>(allNames); + List sortedNames = new ArrayList<>(taggedColumns.keySet()); int fromIndex = Math.min(offset, sortedNames.size()); int toIndex = Math.min(offset + pageSize, sortedNames.size()); List pageNames = sortedNames.subList(fromIndex, toIndex); @@ -294,17 +289,16 @@ private ColumnGridResponse aggregateColumnsWithKnownNames( Map> pageColumns = new HashMap<>(); for (String name : pageNames) { - for (Map.Entry> entry : taggedColumns.entrySet()) { - if (entry.getKey().equalsIgnoreCase(name)) { - pageColumns.computeIfAbsent(name, k -> new ArrayList<>()).addAll(entry.getValue()); - } + List occurrences = taggedColumns.get(name); + if (occurrences != null) { + pageColumns.put(name, occurrences); } } List gridItems = ColumnMetadataGrouper.groupColumns(pageColumns); boolean hasMore = toIndex < totalUniqueColumns; - String cursor = hasMore ? encodeSearchOffset(toIndex) : null; + String cursor = hasMore ? ColumnAggregator.encodeSearchOffset(toIndex) : null; return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); } @@ -313,10 +307,14 @@ private ColumnGridResponse aggregateColumnsWithKnownNames( * Fetch columns with matching tags from _source. 
ES flat object mapping means we can't filter * "column X has tag Y" at query level, so we read _source and check in Java. Since we already * have the full document, we extract column metadata here — avoiding a separate data-fetch query. + * + *

Returns a case-insensitive map so that columns differing only in case (e.g. "User" / "user") + * group together, matching how the search/browse paths display them. */ private Map> getColumnsWithTagsFromSource( ColumnAggregationRequest request, List entityTypes) throws IOException { - Map> columnsByName = new HashMap<>(); + Map> columnsByName = + new TreeMap<>(String.CASE_INSENSITIVE_ORDER); Map> fieldPathToEntityTypes = groupByFieldPath(entityTypes); Set targetTags = buildTargetTagSet(request); @@ -360,9 +358,19 @@ private void fetchColumnsWithTagsFromSource( Map> columnsByName) throws IOException { + // Capped at index.max_result_window (default 10k). For tag/glossary filtering this is the + // max number of *tagged entities* we can scan per group; columns from later entities are + // not considered. Tracked separately — would need search_after / scroll to remove this cap. SearchRequest searchRequest = SearchRequest.of(s -> s.index(indexes).query(query).size(10000)); SearchResponse response = client.search(searchRequest, JsonData.class); + long totalHits = response.hits().total() != null ? response.hits().total().value() : 0; + if (totalHits > 10000) { + LOG.warn( + "Tag/glossary source-fetch matched {} entities; only first 10000 scanned for tagged " + + "columns (index.max_result_window). Later entities will not be included.", + totalHits); + } for (Hit hit : response.hits().hits()) { extractMatchingColumnsFromHit(hit, columnFieldPath, targetTags, columnsByName); @@ -700,15 +708,8 @@ private Query hasEmptyOrMissingField(String field) { .minimumShouldMatch("1"))); } - /** - * Phase 1 result: matching column names and the total doc_count summed across buckets. doc_count - * is the number of docs that contain each name; with flat-object mapping on the columns array and - * column names unique within an entity, this is a close proxy for total occurrences. 
- */ - private record NamesWithCount(List names, long totalDocCount) {} - /** Phase 1: Get all matching column names using terms agg with include regex (no top_hits). */ - private NamesWithCount executeNamesQuery( + private ColumnAggregator.NamesWithCount executeNamesQuery( Query query, List indexes, String columnNameKeyword, String regex) throws IOException { @@ -719,34 +720,39 @@ private NamesWithCount executeNamesQuery( t -> t.field(columnNameKeyword) .include(inc -> inc.regexp(regex)) - .size(MAX_PATTERN_SEARCH_NAMES) + .size(ColumnAggregator.MAX_PATTERN_SEARCH_NAMES) .order( List.of( - es.co.elastic.clients.util.NamedValue.of( - "_key", SortOrder.Asc))))); + NamedValue.of( + ColumnAggregator.AGG_KEY_ORDER, SortOrder.Asc))))); SearchRequest searchRequest = SearchRequest.of( - s -> s.index(indexes).query(query).aggregations("matching_columns", termsAgg).size(0)); + s -> + s.index(indexes) + .query(query) + .aggregations(ColumnAggregator.AGG_MATCHING_COLUMNS, termsAgg) + .size(0)); SearchResponse response = client.search(searchRequest, JsonData.class); List names = new ArrayList<>(); long totalDocCount = 0; if (response.aggregations() != null - && response.aggregations().containsKey("matching_columns")) { - StringTermsAggregate termsResult = response.aggregations().get("matching_columns").sterms(); + && response.aggregations().containsKey(ColumnAggregator.AGG_MATCHING_COLUMNS)) { + StringTermsAggregate termsResult = + response.aggregations().get(ColumnAggregator.AGG_MATCHING_COLUMNS).sterms(); for (StringTermsBucket bucket : termsResult.buckets().array()) { names.add(bucket.key().stringValue()); totalDocCount += bucket.docCount(); } - if (names.size() == MAX_PATTERN_SEARCH_NAMES) { + if (names.size() == ColumnAggregator.MAX_PATTERN_SEARCH_NAMES) { LOG.warn( "Column name pattern matched at least {} distinct names; results truncated", - MAX_PATTERN_SEARCH_NAMES); + ColumnAggregator.MAX_PATTERN_SEARCH_NAMES); } } - return new NamesWithCount(names, totalDocCount); + 
return new ColumnAggregator.NamesWithCount(names, totalDocCount); } /** Phase 2: Get data for specific column names using terms agg with exact include + top_hits. */ @@ -758,7 +764,8 @@ private Map> executePageDataQuery( List columnNames) throws IOException { - Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(SAMPLE_DOCS_PER_COLUMN))); + Aggregation topHitsAgg = + Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN))); Aggregation termsAgg = Aggregation.of( @@ -768,11 +775,15 @@ private Map> executePageDataQuery( t.field(columnNameKeyword) .include(inc -> inc.terms(columnNames)) .size(columnNames.size())) - .aggregations("sample_docs", topHitsAgg)); + .aggregations(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg)); SearchRequest searchRequest = SearchRequest.of( - s -> s.index(indexes).query(query).aggregations("page_columns", termsAgg).size(0)); + s -> + s.index(indexes) + .query(query) + .aggregations(ColumnAggregator.AGG_PAGE_COLUMNS, termsAgg) + .size(0)); SearchResponse response = client.search(searchRequest, JsonData.class); @@ -783,20 +794,23 @@ private Map> parseTermsAggResults( SearchResponse response, String columnFieldPath) { Map> columnsByName = new HashMap<>(); - if (response.aggregations() == null || !response.aggregations().containsKey("page_columns")) { + if (response.aggregations() == null + || !response.aggregations().containsKey(ColumnAggregator.AGG_PAGE_COLUMNS)) { return columnsByName; } - StringTermsAggregate termsAgg = response.aggregations().get("page_columns").sterms(); + StringTermsAggregate termsAgg = + response.aggregations().get(ColumnAggregator.AGG_PAGE_COLUMNS).sterms(); for (StringTermsBucket bucket : termsAgg.buckets().array()) { String columnName = bucket.key().stringValue(); - if (!bucket.aggregations().containsKey("sample_docs")) { + if (!bucket.aggregations().containsKey(ColumnAggregator.AGG_SAMPLE_DOCS)) { continue; } - TopHitsAggregate topHits = 
bucket.aggregations().get("sample_docs").topHits(); + TopHitsAggregate topHits = + bucket.aggregations().get(ColumnAggregator.AGG_SAMPLE_DOCS).topHits(); parseBucketHits(columnName, topHits, columnFieldPath, columnsByName); } @@ -806,20 +820,20 @@ private Map> parseTermsAggResults( private SearchResponse executeSearch( ColumnAggregationRequest request, Query query, List indexes, String columnNameKeyword) throws IOException { - List> sources = - new ArrayList<>(); + List> sources = new ArrayList<>(); sources.add( - es.co.elastic.clients.util.NamedValue.of( + NamedValue.of( "column_name", CompositeAggregationSource.of( cas -> cas.terms(t -> t.field(columnNameKeyword).order(SortOrder.Asc))))); // Use full _source to avoid top_hits source-filter edge cases where combining root and nested // include paths can produce empty buckets. - Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(SAMPLE_DOCS_PER_COLUMN))); + Aggregation topHitsAgg = + Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN))); Map subAggs = new HashMap<>(); - subAggs.put("sample_docs", topHitsAgg); + subAggs.put(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg); Map afterKey = request.getCursor() != null ? 
decodeCursor(request.getCursor()) : null; @@ -1081,12 +1095,11 @@ private String encodeCursor(Map afterKey) { } } - @SuppressWarnings("unchecked") private Map decodeCursor(String cursor) { try { byte[] decoded = Base64.getDecoder().decode(cursor); String json = new String(decoded, StandardCharsets.UTF_8); - Map stringMap = JsonUtils.readValue(json, Map.class); + Map stringMap = JsonUtils.readValue(json, new TypeReference<>() {}); Map result = new HashMap<>(); for (Map.Entry entry : stringMap.entrySet()) { result.put(entry.getKey(), FieldValue.of(entry.getValue())); @@ -1142,35 +1155,6 @@ private Map getTotalCounts( return totals; } - private String encodeSearchOffset(int offset) { - try { - String json = JsonUtils.pojoToJson(Map.of("searchOffset", offset)); - return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8)); - } catch (Exception e) { - LOG.error("Failed to encode search offset", e); - return null; - } - } - - @SuppressWarnings("unchecked") - private int decodeSearchOffset(String cursor) { - if (cursor == null) { - return 0; - } - try { - String json = new String(Base64.getDecoder().decode(cursor), StandardCharsets.UTF_8); - Map map = JsonUtils.readValue(json, Map.class); - Object offset = map.get("searchOffset"); - if (offset instanceof Number num) { - return num.intValue(); - } - return 0; - } catch (Exception e) { - LOG.debug("Failed to decode search offset cursor, restarting from page 1", e); - return 0; - } - } - private ColumnGridResponse buildResponse( List gridItems, String cursor, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java index 87fdeb9c0de8..8964d40d3995 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java +++ 
b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java @@ -25,8 +25,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.api.data.ColumnGridItem; @@ -61,15 +63,6 @@ public class OpenSearchColumnAggregator implements ColumnAggregator { private final OpenSearchClient client; - /** Max column names to retrieve in the names-only query during pattern search. */ - private static final int MAX_PATTERN_SEARCH_NAMES = 10000; - - /** - * Number of sample docs pulled per column-name bucket to populate occurrences. Caps - * {@code ColumnGridItem.totalOccurrences}; columns appearing in more entities than this undercount. - */ - private static final int SAMPLE_DOCS_PER_COLUMN = 100; - /** Uses aliases defined in indexMapping.json */ private static final List DATA_ASSET_INDEXES = Arrays.asList("table", "dashboardDataModel", "topic", "searchIndex", "container"); @@ -99,10 +92,10 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr // Pattern + tag combined: filter the already-fetched columns by pattern in Java if (!nullOrEmpty(request.getColumnNamePattern())) { - String pattern = request.getColumnNamePattern().toLowerCase(java.util.Locale.ROOT); + String pattern = request.getColumnNamePattern().toLowerCase(Locale.ROOT); taggedColumns .entrySet() - .removeIf(e -> !e.getKey().toLowerCase(java.util.Locale.ROOT).contains(pattern)); + .removeIf(e -> !e.getKey().toLowerCase(Locale.ROOT).contains(pattern)); } return aggregateColumnsWithKnownNames(request, taggedColumns); @@ -113,7 +106,9 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr return aggregateColumnsWithPattern(request); } - // Browse path (no filters): use composite aggregation + // Browse path 
with no pattern or tag/glossary filter: still applies request scope filters + // (service/database/schema/domain/entityType/metadataStatus, etc.) and uses composite + // aggregation for engine-side pagination via after_key. Query query = buildFilters(request, null); try { @@ -159,13 +154,13 @@ private ColumnGridResponse aggregateColumnsWithPattern(ColumnAggregationRequest String regex = ColumnAggregator.toCaseInsensitiveRegex(request.getColumnNamePattern()); try { - NamesWithCount phase1 = executeNamesQuery(query, regex); + ColumnAggregator.NamesWithCount phase1 = executeNamesQuery(query, regex); Set dedupedNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); dedupedNames.addAll(phase1.names()); int totalUniqueColumns = dedupedNames.size(); - int totalOccurrences = (int) phase1.totalDocCount(); - int offset = decodeSearchOffset(request.getCursor()); + int totalOccurrences = ColumnAggregator.toIntSaturating(phase1.totalDocCount()); + int offset = ColumnAggregator.decodeSearchOffset(request.getCursor()); int pageSize = request.getSize(); List sortedNames = new ArrayList<>(dedupedNames); @@ -182,7 +177,7 @@ private ColumnGridResponse aggregateColumnsWithPattern(ColumnAggregationRequest List gridItems = ColumnMetadataGrouper.groupColumns(columnsByName); boolean hasMore = toIndex < totalUniqueColumns; - String cursor = hasMore ? encodeSearchOffset(toIndex) : null; + String cursor = hasMore ? ColumnAggregator.encodeSearchOffset(toIndex) : null; return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); } catch (OpenSearchException e) { @@ -197,19 +192,19 @@ private ColumnGridResponse aggregateColumnsWithPattern(ColumnAggregationRequest /** * Tag/glossary filter path: the tag-check pass already extracted full column metadata from * _source (only tagged columns are in the map). Just paginate over the in-memory result. + * + *

{@code taggedColumns} is a case-insensitive map: when two entities have columns differing + * only in case (e.g. "User" / "user"), occurrences are merged under a single key. */ private ColumnGridResponse aggregateColumnsWithKnownNames( ColumnAggregationRequest request, Map> taggedColumns) { - Set allNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); - allNames.addAll(taggedColumns.keySet()); - - int totalUniqueColumns = allNames.size(); + int totalUniqueColumns = taggedColumns.size(); int totalOccurrences = taggedColumns.values().stream().mapToInt(List::size).sum(); - int offset = decodeSearchOffset(request.getCursor()); + int offset = ColumnAggregator.decodeSearchOffset(request.getCursor()); int pageSize = request.getSize(); - List sortedNames = new ArrayList<>(allNames); + List sortedNames = new ArrayList<>(taggedColumns.keySet()); int fromIndex = Math.min(offset, sortedNames.size()); int toIndex = Math.min(offset + pageSize, sortedNames.size()); List pageNames = sortedNames.subList(fromIndex, toIndex); @@ -220,17 +215,16 @@ private ColumnGridResponse aggregateColumnsWithKnownNames( Map> pageColumns = new HashMap<>(); for (String name : pageNames) { - for (Map.Entry> entry : taggedColumns.entrySet()) { - if (entry.getKey().equalsIgnoreCase(name)) { - pageColumns.computeIfAbsent(name, k -> new ArrayList<>()).addAll(entry.getValue()); - } + List occurrences = taggedColumns.get(name); + if (occurrences != null) { + pageColumns.put(name, occurrences); } } List gridItems = ColumnMetadataGrouper.groupColumns(pageColumns); boolean hasMore = toIndex < totalUniqueColumns; - String cursor = hasMore ? encodeSearchOffset(toIndex) : null; + String cursor = hasMore ? ColumnAggregator.encodeSearchOffset(toIndex) : null; return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); } @@ -239,10 +233,14 @@ private ColumnGridResponse aggregateColumnsWithKnownNames( * Fetch columns with matching tags from _source. 
ES flat object mapping means we can't filter * "column X has tag Y" at query level, so we read _source and check in Java. Since we already * have the full document, we extract column metadata here — avoiding a separate data-fetch query. + * + *

Returns a case-insensitive map so that columns differing only in case (e.g. "User" / "user") + * group together, matching how the search/browse paths display them. */ private Map> getColumnsWithTagsFromSource( ColumnAggregationRequest request) throws IOException { - Map> columnsByName = new HashMap<>(); + Map> columnsByName = + new TreeMap<>(String.CASE_INSENSITIVE_ORDER); Set targetTags = buildTargetTagSet(request); Query query = buildTagFilterQuery(request); @@ -281,10 +279,20 @@ private void fetchColumnsWithTagsFromSource( throws IOException { List resolvedIndexes = resolveIndexNames(); + // Capped at index.max_result_window (default 10k). For tag/glossary filtering this is the + // max number of *tagged entities* we can scan; columns from later entities are not + // considered. Tracked separately — would need search_after / scroll to remove this cap. SearchRequest searchRequest = SearchRequest.of(s -> s.index(resolvedIndexes).query(query).size(10000)); SearchResponse response = client.search(searchRequest, JsonData.class); + long totalHits = response.hits().total() != null ? response.hits().total().value() : 0; + if (totalHits > 10000) { + LOG.warn( + "Tag/glossary source-fetch matched {} entities; only first 10000 scanned for tagged " + + "columns (index.max_result_window). Later entities will not be included.", + totalHits); + } for (os.org.opensearch.client.opensearch.core.search.Hit hit : response.hits().hits()) { @@ -604,15 +612,9 @@ private Query hasEmptyOrMissingField(String field) { .minimumShouldMatch("1"))); } - /** - * Phase 1 result: matching column names and the total doc_count summed across buckets. doc_count - * is the number of docs that contain each name; with flat-object mapping on the columns array and - * column names unique within an entity, this is a close proxy for total occurrences. 
- */ - private record NamesWithCount(List names, long totalDocCount) {} - /** Phase 1: Get all matching column names using terms agg with include regex (no top_hits). */ - private NamesWithCount executeNamesQuery(Query query, String regex) throws IOException { + private ColumnAggregator.NamesWithCount executeNamesQuery(Query query, String regex) + throws IOException { Aggregation termsAgg = Aggregation.of( a -> @@ -620,15 +622,16 @@ private NamesWithCount executeNamesQuery(Query query, String regex) throws IOExc t -> t.field("columns.name.keyword") .include(inc -> inc.regexp(regex)) - .size(MAX_PATTERN_SEARCH_NAMES) - .order(List.of(Map.of("_key", SortOrder.Asc))))); + .size(ColumnAggregator.MAX_PATTERN_SEARCH_NAMES) + .order( + List.of(Map.of(ColumnAggregator.AGG_KEY_ORDER, SortOrder.Asc))))); SearchRequest searchRequest = SearchRequest.of( s -> s.index(resolveIndexNames()) .query(query) - .aggregations("matching_columns", termsAgg) + .aggregations(ColumnAggregator.AGG_MATCHING_COLUMNS, termsAgg) .size(0)); SearchResponse response = client.search(searchRequest, JsonData.class); @@ -636,26 +639,28 @@ private NamesWithCount executeNamesQuery(Query query, String regex) throws IOExc List names = new ArrayList<>(); long totalDocCount = 0; if (response.aggregations() != null - && response.aggregations().containsKey("matching_columns")) { - StringTermsAggregate termsResult = response.aggregations().get("matching_columns").sterms(); + && response.aggregations().containsKey(ColumnAggregator.AGG_MATCHING_COLUMNS)) { + StringTermsAggregate termsResult = + response.aggregations().get(ColumnAggregator.AGG_MATCHING_COLUMNS).sterms(); for (StringTermsBucket bucket : termsResult.buckets().array()) { names.add(bucket.key()); totalDocCount += bucket.docCount(); } - if (names.size() == MAX_PATTERN_SEARCH_NAMES) { + if (names.size() == ColumnAggregator.MAX_PATTERN_SEARCH_NAMES) { LOG.warn( "Column name pattern matched at least {} distinct names; results truncated", - 
MAX_PATTERN_SEARCH_NAMES); + ColumnAggregator.MAX_PATTERN_SEARCH_NAMES); } } - return new NamesWithCount(names, totalDocCount); + return new ColumnAggregator.NamesWithCount(names, totalDocCount); } /** Phase 2: Get data for specific column names using terms agg with exact include + top_hits. */ private Map> executePageDataQuery( Query query, List columnNames) throws IOException { - Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(SAMPLE_DOCS_PER_COLUMN))); + Aggregation topHitsAgg = + Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN))); Aggregation termsAgg = Aggregation.of( @@ -665,14 +670,14 @@ private Map> executePageDataQuery( t.field("columns.name.keyword") .include(inc -> inc.terms(columnNames)) .size(columnNames.size())) - .aggregations("sample_docs", topHitsAgg)); + .aggregations(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg)); SearchRequest searchRequest = SearchRequest.of( s -> s.index(resolveIndexNames()) .query(query) - .aggregations("page_columns", termsAgg) + .aggregations(ColumnAggregator.AGG_PAGE_COLUMNS, termsAgg) .size(0)); SearchResponse response = client.search(searchRequest, JsonData.class); @@ -684,20 +689,23 @@ private Map> parseTermsAggResults( SearchResponse response) { Map> columnsByName = new HashMap<>(); - if (response.aggregations() == null || !response.aggregations().containsKey("page_columns")) { + if (response.aggregations() == null + || !response.aggregations().containsKey(ColumnAggregator.AGG_PAGE_COLUMNS)) { return columnsByName; } - StringTermsAggregate termsAgg = response.aggregations().get("page_columns").sterms(); + StringTermsAggregate termsAgg = + response.aggregations().get(ColumnAggregator.AGG_PAGE_COLUMNS).sterms(); for (StringTermsBucket bucket : termsAgg.buckets().array()) { String columnName = bucket.key(); - if (!bucket.aggregations().containsKey("sample_docs")) { + if (!bucket.aggregations().containsKey(ColumnAggregator.AGG_SAMPLE_DOCS)) { continue; } - 
TopHitsAggregate topHits = bucket.aggregations().get("sample_docs").topHits(); + TopHitsAggregate topHits = + bucket.aggregations().get(ColumnAggregator.AGG_SAMPLE_DOCS).topHits(); parseBucketHits(columnName, topHits, columnsByName); } @@ -712,10 +720,11 @@ private SearchResponse executeSearch(ColumnAggregationRequest request, CompositeAggregationSource.of( cas -> cas.terms(t -> t.field("columns.name.keyword").order(SortOrder.Asc)))); - Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(SAMPLE_DOCS_PER_COLUMN))); + Aggregation topHitsAgg = + Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN))); Map subAggs = new HashMap<>(); - subAggs.put("sample_docs", topHitsAgg); + subAggs.put(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg); Map afterKey = request.getCursor() != null ? decodeCursorAsFieldValues(request.getCursor()) : null; @@ -761,11 +770,12 @@ private Map> parseCompositeAggResults( FieldValue fieldValue = bucket.key().get("column_name"); String columnName = fieldValue != null ? 
fieldValue.stringValue() : null; - if (!bucket.aggregations().containsKey("sample_docs")) { + if (!bucket.aggregations().containsKey(ColumnAggregator.AGG_SAMPLE_DOCS)) { continue; } - TopHitsAggregate topHits = bucket.aggregations().get("sample_docs").topHits(); + TopHitsAggregate topHits = + bucket.aggregations().get(ColumnAggregator.AGG_SAMPLE_DOCS).topHits(); parseBucketHits(columnName, topHits, columnsByName); } @@ -980,12 +990,11 @@ private Map decodeCursorAsFieldValues(String cursor) { } } - @SuppressWarnings("unchecked") private Map decodeCursor(String cursor) { try { byte[] decoded = Base64.getDecoder().decode(cursor); String json = new String(decoded, StandardCharsets.UTF_8); - return JsonUtils.readValue(json, Map.class); + return JsonUtils.readValue(json, new TypeReference<>() {}); } catch (Exception e) { LOG.error("Failed to decode cursor", e); return new HashMap<>(); @@ -1050,35 +1059,6 @@ private Map getTotalCounts(Query query) throws IOException { return totals; } - private String encodeSearchOffset(int offset) { - try { - String json = JsonUtils.pojoToJson(Map.of("searchOffset", offset)); - return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8)); - } catch (Exception e) { - LOG.error("Failed to encode search offset", e); - return null; - } - } - - @SuppressWarnings("unchecked") - private int decodeSearchOffset(String cursor) { - if (cursor == null) { - return 0; - } - try { - String json = new String(Base64.getDecoder().decode(cursor), StandardCharsets.UTF_8); - Map map = JsonUtils.readValue(json, Map.class); - Object offset = map.get("searchOffset"); - if (offset instanceof Number num) { - return num.intValue(); - } - return 0; - } catch (Exception e) { - LOG.debug("Failed to decode search offset cursor, restarting from page 1", e); - return 0; - } - } - private ColumnGridResponse buildResponse( List gridItems, String cursor, From ffe2324dfe2396b16d317e8bc012be4680e9bcc9 Mon Sep 17 00:00:00 2001 From: sonika-shah 
<58761340+sonika-shah@users.noreply.github.com> Date: Sun, 26 Apr 2026 22:17:39 +0530 Subject: [PATCH 7/8] address feedback --- .../openmetadata/it/tests/ColumnGridResourceIT.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java index 29db482b852a..2e82842830f4 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java @@ -7,6 +7,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; import org.junit.jupiter.api.BeforeAll; @@ -1808,11 +1810,17 @@ void test_getColumnGrid_tagFilterPaginationConsistency(TestNamespace ns) throws assertEquals(2, page1.getColumns().size(), "Page 1 should have exactly 2 columns"); assertNotNull(page1.getCursor(), "Page 1 should have a cursor for next page"); - ColumnGridResponse page2 = getColumnGrid(client, baseQuery + "&cursor=" + page1.getCursor()); + ColumnGridResponse page2 = + getColumnGrid( + client, + baseQuery + "&cursor=" + URLEncoder.encode(page1.getCursor(), StandardCharsets.UTF_8)); assertEquals(2, page2.getColumns().size(), "Page 2 should have exactly 2 columns"); assertNotNull(page2.getCursor(), "Page 2 should have a cursor for next page"); - ColumnGridResponse page3 = getColumnGrid(client, baseQuery + "&cursor=" + page2.getCursor()); + ColumnGridResponse page3 = + getColumnGrid( + client, + baseQuery + "&cursor=" + URLEncoder.encode(page2.getCursor(), StandardCharsets.UTF_8)); assertEquals(1, page3.getColumns().size(), "Page 3 (last) should have exactly 1 column"); // 
Verify no duplicates across pages From 764b71b8e743eea1c0e0050fb498c1f320aaae02 Mon Sep 17 00:00:00 2001 From: sonika-shah <58761340+sonika-shah@users.noreply.github.com> Date: Sun, 26 Apr 2026 23:17:40 +0530 Subject: [PATCH 8/8] fix test --- .../it/tests/ColumnGridResourceIT.java | 83 +++++++++++-------- .../ElasticSearchColumnAggregator.java | 13 +-- .../OpenSearchColumnAggregator.java | 17 +--- 3 files changed, 52 insertions(+), 61 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java index 2e82842830f4..ca58cd686a51 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java @@ -16,6 +16,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.ResourceAccessMode; +import org.junit.jupiter.api.parallel.ResourceLock; +import org.junit.jupiter.api.parallel.Resources; import org.openmetadata.it.factories.DashboardServiceTestFactory; import org.openmetadata.it.factories.DatabaseSchemaTestFactory; import org.openmetadata.it.factories.DatabaseServiceTestFactory; @@ -1983,20 +1986,22 @@ void test_getColumnGrid_patternSearchAcrossEntityTypesDedupesNames(TestNamespace } @Test + @ResourceLock(value = Resources.GLOBAL, mode = ResourceAccessMode.READ_WRITE) void test_getColumnGrid_patternSearchFindsAlphabeticallyLateColumn(TestNamespace ns) throws Exception { OpenMetadataClient client = SdkClients.adminClient(); DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); - int columnCount = 200; - String matchPattern = 
ns.prefix("zzz_target"); - String matchedColumn = matchPattern; + // Match (zzz_target) at position 50 with size=25 — old code returns 0 on page 1, new code finds + // it. + int columnCount = 50; + String matchedColumn = ns.prefix("zzz_target"); java.util.List columns = new java.util.ArrayList<>(); for (int i = 0; i < columnCount - 1; i++) { columns.add( - Columns.build(ns.prefix(String.format("aaa_filler_%04d", i))) + Columns.build(ns.prefix(String.format("aaa_filler_%02d", i))) .withType(ColumnDataType.VARCHAR) .withLength(255) .create()); @@ -2004,36 +2009,46 @@ void test_getColumnGrid_patternSearchFindsAlphabeticallyLateColumn(TestNamespace columns.add( Columns.build(matchedColumn).withType(ColumnDataType.VARCHAR).withLength(255).create()); - Tables.create() - .name(ns.prefix("scale_search_table")) - .inSchema(schema.getFullyQualifiedName()) - .withColumns(columns) - .execute(); - - waitForSearchIndexRefresh(); - - await("Wait for first-page search to surface alphabetically-late match (size=25)") - .atMost(Duration.ofSeconds(45)) - .pollInterval(Duration.ofSeconds(2)) - .untilAsserted( - () -> { - ColumnGridResponse response = - getColumnGrid( - client, - "entityTypes=table&columnNamePattern=zzz_target&size=25&serviceName=" - + service.getName()); - - assertNotNull(response); - assertTrue( - response.getColumns().stream() - .anyMatch(c -> c.getColumnName().equals(matchedColumn)), - "First page must contain the alphabetically-late matching column " - + "(this exercises the original bug fix — composite agg would have hidden it)"); - assertEquals( - 1, - response.getTotalUniqueColumns(), - "Only one unique column matches the pattern"); - }); + Table table = + Tables.create() + .name(ns.prefix("scale_search_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(columns) + .execute(); + + try { + waitForSearchIndexRefresh(); + + await("Wait for first-page search to surface alphabetically-late match (size=25)") + .atMost(Duration.ofSeconds(45)) + 
.pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=zzz_target&size=25&serviceName=" + + service.getName()); + + assertNotNull(response); + assertTrue( + response.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(matchedColumn)), + "First page must contain the alphabetically-late matching column " + + "(this exercises the original bug fix — composite agg would have hidden it)"); + assertEquals( + 1, + response.getTotalUniqueColumns(), + "Only one unique column matches the pattern"); + }); + } finally { + java.util.Map params = new java.util.HashMap<>(); + params.put("hardDelete", "true"); + try { + SdkClients.adminClient().tables().delete(table.getId().toString(), params); + } catch (Exception ignored) { + } + } } private void waitForColumnToBeIndexed( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java index 317e43c9ea59..acb6804eb2c4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java @@ -266,9 +266,6 @@ private ColumnGridResponse aggregateColumnsWithPattern( /** * Tag/glossary filter path: the tag-check pass already extracted full column metadata from * _source (only tagged columns are in the map). Just paginate over the in-memory result. - * - *

{@code taggedColumns} is a case-insensitive map: when two entities have columns differing - * only in case (e.g. "User" / "user"), occurrences are merged under a single key. */ private ColumnGridResponse aggregateColumnsWithKnownNames( ColumnAggregationRequest request, Map> taggedColumns) { @@ -307,9 +304,6 @@ private ColumnGridResponse aggregateColumnsWithKnownNames( * Fetch columns with matching tags from _source. ES flat object mapping means we can't filter * "column X has tag Y" at query level, so we read _source and check in Java. Since we already * have the full document, we extract column metadata here — avoiding a separate data-fetch query. - * - *

Returns a case-insensitive map so that columns differing only in case (e.g. "User" / "user") - * group together, matching how the search/browse paths display them. */ private Map> getColumnsWithTagsFromSource( ColumnAggregationRequest request, List entityTypes) throws IOException { @@ -358,18 +352,13 @@ private void fetchColumnsWithTagsFromSource( Map> columnsByName) throws IOException { - // Capped at index.max_result_window (default 10k). For tag/glossary filtering this is the - // max number of *tagged entities* we can scan per group; columns from later entities are - // not considered. Tracked separately — would need search_after / scroll to remove this cap. SearchRequest searchRequest = SearchRequest.of(s -> s.index(indexes).query(query).size(10000)); SearchResponse response = client.search(searchRequest, JsonData.class); long totalHits = response.hits().total() != null ? response.hits().total().value() : 0; if (totalHits > 10000) { LOG.warn( - "Tag/glossary source-fetch matched {} entities; only first 10000 scanned for tagged " - + "columns (index.max_result_window). 
Later entities will not be included.", - totalHits); + "Tag/glossary source-fetch matched {} entities; only first 10000 scanned.", totalHits); } for (Hit hit : response.hits().hits()) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java index 8964d40d3995..28b5f3be543e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java @@ -106,9 +106,7 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr return aggregateColumnsWithPattern(request); } - // Browse path with no pattern or tag/glossary filter: still applies request scope filters - // (service/database/schema/domain/entityType/metadataStatus, etc.) and uses composite - // aggregation for engine-side pagination via after_key. + // Browse path: scope filters + composite agg with after_key cursor. Query query = buildFilters(request, null); try { @@ -192,9 +190,6 @@ private ColumnGridResponse aggregateColumnsWithPattern(ColumnAggregationRequest /** * Tag/glossary filter path: the tag-check pass already extracted full column metadata from * _source (only tagged columns are in the map). Just paginate over the in-memory result. - * - *

{@code taggedColumns} is a case-insensitive map: when two entities have columns differing - * only in case (e.g. "User" / "user"), occurrences are merged under a single key. */ private ColumnGridResponse aggregateColumnsWithKnownNames( ColumnAggregationRequest request, Map> taggedColumns) { @@ -233,9 +228,6 @@ private ColumnGridResponse aggregateColumnsWithKnownNames( * Fetch columns with matching tags from _source. ES flat object mapping means we can't filter * "column X has tag Y" at query level, so we read _source and check in Java. Since we already * have the full document, we extract column metadata here — avoiding a separate data-fetch query. - * - *

Returns a case-insensitive map so that columns differing only in case (e.g. "User" / "user") - * group together, matching how the search/browse paths display them. */ private Map> getColumnsWithTagsFromSource( ColumnAggregationRequest request) throws IOException { @@ -279,9 +271,6 @@ private void fetchColumnsWithTagsFromSource( throws IOException { List resolvedIndexes = resolveIndexNames(); - // Capped at index.max_result_window (default 10k). For tag/glossary filtering this is the - // max number of *tagged entities* we can scan; columns from later entities are not - // considered. Tracked separately — would need search_after / scroll to remove this cap. SearchRequest searchRequest = SearchRequest.of(s -> s.index(resolvedIndexes).query(query).size(10000)); @@ -289,9 +278,7 @@ private void fetchColumnsWithTagsFromSource( long totalHits = response.hits().total() != null ? response.hits().total().value() : 0; if (totalHits > 10000) { LOG.warn( - "Tag/glossary source-fetch matched {} entities; only first 10000 scanned for tagged " - + "columns (index.max_result_window). Later entities will not be included.", - totalHits); + "Tag/glossary source-fetch matched {} entities; only first 10000 scanned.", totalHits); } for (os.org.opensearch.client.opensearch.core.search.Hit hit :