diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java index 1d2289c40481..ca58cd686a51 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/ColumnGridResourceIT.java @@ -7,6 +7,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; import org.junit.jupiter.api.BeforeAll; @@ -14,6 +16,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.ResourceAccessMode; +import org.junit.jupiter.api.parallel.ResourceLock; +import org.junit.jupiter.api.parallel.Resources; import org.openmetadata.it.factories.DashboardServiceTestFactory; import org.openmetadata.it.factories.DatabaseSchemaTestFactory; import org.openmetadata.it.factories.DatabaseServiceTestFactory; @@ -1471,6 +1476,581 @@ private void waitForSearchIndexRefresh() { .until(() -> true); } + @Test + void test_getColumnGrid_patternSearchIsCaseInsensitive(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + String colName = ns.prefix("CaseMixCol"); + Column col = Columns.build(colName).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Tables.create() + .name(ns.prefix("case_test_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col)) + .execute(); + + waitForSearchIndexRefresh(); + + await("Wait for lowercase pattern search to find mixed-case column") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse lowerResponse = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=casemixcol&serviceName=" + + service.getName()); + + assertNotNull(lowerResponse); + assertTrue( + lowerResponse.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(colName)), + "Lowercase search should find the mixed-case column"); + }); + + await("Wait for uppercase pattern search to find mixed-case column") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse upperResponse = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=CASEMIXCOL&serviceName=" + + service.getName()); + + assertNotNull(upperResponse); + assertTrue( + upperResponse.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(colName)), + "Uppercase search should find the mixed-case column"); + }); + } + + @Test + void test_getColumnGrid_patternSearchExcludesNonMatching(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + String matchCol = ns.prefix("regex_target"); + String noMatchCol = ns.prefix("other_field"); + Column col1 = Columns.build(matchCol).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Column col2 = + Columns.build(noMatchCol).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Tables.create() + .name(ns.prefix("regex_exclude_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col1, col2)) + .execute(); + + waitForSearchIndexRefresh(); + + await("Wait for pattern search to exclude non-matching columns") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=regex_target&serviceName=" + + service.getName()); + + assertNotNull(response); + assertTrue( + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(matchCol)), + "Matching column should be in results"); + assertFalse( + response.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(noMatchCol)), + "Non-matching column from same table should be excluded"); + }); + } + + @Test + void test_getColumnGrid_patternSearchWithSpecialChars(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + String colWithDot = ns.prefix("col.with.dots"); + String colNoDot = ns.prefix("colXwithXdots"); + Column col1 = + Columns.build(colWithDot).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Column col2 = Columns.build(colNoDot).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Tables.create() + .name(ns.prefix("special_char_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col1, col2)) + .execute(); + + waitForSearchIndexRefresh(); + + // Search for "col.with" — dot should be literal, not wildcard + await("Wait for pattern search with special chars") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=col.with&serviceName=" + + service.getName()); + + assertNotNull(response); + assertTrue( + response.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(colWithDot)), + "Column with literal dot should match"); + assertFalse( + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(colNoDot)), + "Column without dot should not match — dot must be literal, not wildcard"); + }); + } + + @Test + void test_getColumnGrid_patternPlusTagFilter(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + TagLabel piiTag = new TagLabel(); + piiTag.setTagFQN("PII.Sensitive"); + piiTag.setSource(TagLabel.TagSource.CLASSIFICATION); + piiTag.setLabelType(TagLabel.LabelType.MANUAL); + piiTag.setState(TagLabel.State.CONFIRMED); + + String taggedMatchCol = ns.prefix("pat_tag_match"); + String taggedNoMatchCol = ns.prefix("pat_tag_other"); + String untaggedMatchCol = ns.prefix("pat_tag_match_notag"); + + // Table 1: tagged column matching pattern + tagged column NOT matching pattern + Column col1 = + Columns.build(taggedMatchCol) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withTags(List.of(piiTag)) + .create(); + Column col2 = + Columns.build(taggedNoMatchCol) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withTags(List.of(piiTag)) + .create(); + Tables.create() + .name(ns.prefix("pat_tag_table_1")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col1, col2)) + .execute(); + + // Table 2: untagged column whose name also matches the pattern + Column col3 = + Columns.build(untaggedMatchCol).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Tables.create() + .name(ns.prefix("pat_tag_table_2")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col3)) + .execute(); + + waitForSearchIndexRefresh(); + + await("Wait for pattern + tag filter result") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&tags=PII.Sensitive&columnNamePattern=pat_tag_match&serviceName=" + + service.getName()); + + assertNotNull(response); + + // Should find taggedMatchCol (matches pattern AND has tag) + // Should NOT find taggedNoMatchCol (has tag but doesn't match pattern) + // Should NOT find untaggedMatchCol (matches pattern but no tag) + boolean foundTaggedMatch = false; + boolean foundTaggedNoMatch = false; + boolean foundUntaggedMatch = false; + + for (ColumnGridItem item : response.getColumns()) { + if (item.getColumnName().equals(taggedMatchCol)) { + foundTaggedMatch = true; + } + if (item.getColumnName().equals(taggedNoMatchCol)) { + foundTaggedNoMatch = true; + } + if (item.getColumnName().equals(untaggedMatchCol)) { + foundUntaggedMatch = true; + } + } + + assertTrue( + foundTaggedMatch, "Column with tag AND matching pattern should be in results"); + assertFalse( + foundTaggedNoMatch, + "Column with tag but NOT matching pattern should be excluded"); + assertFalse( + foundUntaggedMatch, "Column matching pattern but WITHOUT tag should be excluded"); + }); + } + + @Test + void test_getColumnGrid_patternPlusGlossaryFilter(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + Glossary glossary = createGlossary(client, ns, "PG"); + GlossaryTerm term = createGlossaryTerm(client, glossary, ns, "PT"); + + TagLabel glossaryTag = new TagLabel(); + glossaryTag.setTagFQN(term.getFullyQualifiedName()); + glossaryTag.setSource(TagLabel.TagSource.GLOSSARY); + glossaryTag.setLabelType(TagLabel.LabelType.MANUAL); + glossaryTag.setState(TagLabel.State.CONFIRMED); + + String matchCol = ns.prefix("pg_match_col"); + String noMatchCol = ns.prefix("pg_other_col"); + + Column col1 = + Columns.build(matchCol) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withTags(List.of(glossaryTag)) + .create(); + Column col2 = + Columns.build(noMatchCol) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withTags(List.of(glossaryTag)) + .create(); + Tables.create() + .name(ns.prefix("pg_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col1, col2)) + .execute(); + + waitForSearchIndexRefresh(); + + await("Wait for pattern + glossary filter result") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&glossaryTerms=" + + term.getFullyQualifiedName() + + "&columnNamePattern=pg_match&serviceName=" + + service.getName()); + + assertNotNull(response); + + assertTrue( + response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(matchCol)), + "Column matching both pattern and glossary should be in results"); + assertFalse( + response.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(noMatchCol)), + "Column with glossary but not matching pattern should be excluded"); + }); + } + + @Test + void test_getColumnGrid_tagFilterPaginationConsistency(TestNamespace ns) throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + TagLabel piiTag = new TagLabel(); + piiTag.setTagFQN("PII.Sensitive"); + piiTag.setSource(TagLabel.TagSource.CLASSIFICATION); + piiTag.setLabelType(TagLabel.LabelType.MANUAL); + piiTag.setState(TagLabel.State.CONFIRMED); + + // Create 5 tables, each with a uniquely-named tagged column + for (int i = 0; i < 5; i++) { + Column col = + Columns.build(ns.prefix("pagcon_col_" + i)) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withTags(List.of(piiTag)) + .create(); + Tables.create() + .name(ns.prefix("pagcon_table_" + i)) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(col)) + .execute(); + } + + waitForSearchIndexRefresh(); + + // Page through with size=2 — should get 2, 2, 1 + // Use serviceName to scope to this test's data, raw pattern prefix to match column names + String baseQuery = + "entityTypes=table&tags=PII.Sensitive&columnNamePattern=pagcon&serviceName=" + + service.getName() + + "&size=2"; + + await("Wait for all 5 tagged columns to be indexed") + .atMost(Duration.ofSeconds(45)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse first = getColumnGrid(client, baseQuery); + assertNotNull(first); + assertEquals(5, first.getTotalUniqueColumns(), "Should report 5 unique columns"); + }); + + ColumnGridResponse page1 = getColumnGrid(client, baseQuery); + assertEquals(2, page1.getColumns().size(), "Page 1 should have exactly 2 columns"); + assertNotNull(page1.getCursor(), "Page 1 should have a cursor for next page"); + + ColumnGridResponse page2 = + getColumnGrid( + client, + baseQuery + "&cursor=" + URLEncoder.encode(page1.getCursor(), StandardCharsets.UTF_8)); + assertEquals(2, page2.getColumns().size(), "Page 2 should have exactly 2 columns"); + assertNotNull(page2.getCursor(), "Page 2 should have a cursor for next page"); + + ColumnGridResponse page3 = + getColumnGrid( + client, + baseQuery + "&cursor=" + URLEncoder.encode(page2.getCursor(), StandardCharsets.UTF_8)); + assertEquals(1, page3.getColumns().size(), "Page 3 (last) should have exactly 1 column"); + + // Verify no duplicates across pages + java.util.Set allNames = new java.util.HashSet<>(); + for (ColumnGridItem item : page1.getColumns()) { + assertTrue(allNames.add(item.getColumnName()), "Duplicate found: " + item.getColumnName()); + } + for (ColumnGridItem item : page2.getColumns()) { + assertTrue(allNames.add(item.getColumnName()), "Duplicate found: " + item.getColumnName()); + } + for (ColumnGridItem item : page3.getColumns()) { + assertTrue(allNames.add(item.getColumnName()), "Duplicate found: " + item.getColumnName()); + } + assertEquals(5, allNames.size(), "Should have collected all 5 unique columns across pages"); + } + + @Test + void test_getColumnGrid_glossaryFilter_onlyReturnsGlossaryOccurrences(TestNamespace ns) + throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + Glossary glossary = createGlossary(client, ns, "OG"); + GlossaryTerm term = createGlossaryTerm(client, glossary, ns, "OT"); + + TagLabel glossaryTag = new TagLabel(); + glossaryTag.setTagFQN(term.getFullyQualifiedName()); + glossaryTag.setSource(TagLabel.TagSource.GLOSSARY); + glossaryTag.setLabelType(TagLabel.LabelType.MANUAL); + glossaryTag.setState(TagLabel.State.CONFIRMED); + + String sharedName = ns.prefix("gocc_col"); + + // Table 1: column WITH glossary term + Column withGlossary = + Columns.build(sharedName) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withDescription("Has glossary") + .withTags(List.of(glossaryTag)) + .create(); + Tables.create() + .name(ns.prefix("gocc_t1")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(withGlossary)) + .execute(); + + // Table 2: same column name WITHOUT glossary term + Column withoutGlossary = + Columns.build(sharedName) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .withDescription("No glossary") + .create(); + Tables.create() + .name(ns.prefix("gocc_t2")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(withoutGlossary)) + .execute(); + + waitForSearchIndexRefresh(); + + await("Wait for glossary-filtered column to return the tagged occurrence only") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&glossaryTerms=" + + term.getFullyQualifiedName() + + "&serviceName=" + + service.getName()); + + assertNotNull(response); + assertNotNull(response.getColumns()); + + ColumnGridItem sharedItem = + response.getColumns().stream() + .filter(item -> item.getColumnName().equals(sharedName)) + .findFirst() + .orElse(null); + + assertNotNull( + sharedItem, + "Expected '" + sharedName + "' to be present in the glossary-filtered response"); + assertEquals( + 1, + sharedItem.getTotalOccurrences(), + "Should only return the occurrence WITH the glossary term, not all with same name"); + }); + } + + @Test + void test_getColumnGrid_patternSearchAcrossEntityTypesDedupesNames(TestNamespace ns) + throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + + DatabaseService dbService = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, dbService); + + String sharedName = ns.prefix("multi_type_col"); + + Column tableCol = + Columns.build(sharedName).withType(ColumnDataType.VARCHAR).withLength(255).create(); + Tables.create() + .name(ns.prefix("multi_type_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(List.of(tableCol)) + .execute(); + + DashboardService dashService = DashboardServiceTestFactory.createMetabase(ns); + Column dashCol = + Columns.build(sharedName).withType(ColumnDataType.VARCHAR).withLength(255).create(); + DashboardDataModels.create() + .name(ns.prefix("multi_type_datamodel")) + .in(dashService.getFullyQualifiedName()) + .withColumns(List.of(dashCol)) + .withDataModelType(DataModelType.MetabaseDataModel) + .execute(); + + waitForSearchIndexRefresh(); + + await("Wait for both entities to be indexed and dedupe correctly") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table,dashboardDataModel&columnNamePattern=multi_type_col"); + + assertNotNull(response); + + long matches = + response.getColumns().stream() + .filter(c -> c.getColumnName().equals(sharedName)) + .count(); + + assertEquals( + 1, matches, "Same column name in two entity types must dedupe to one grid entry"); + + ColumnGridItem item = + response.getColumns().stream() + .filter(c -> c.getColumnName().equals(sharedName)) + .findFirst() + .orElseThrow(); + + assertEquals( + 2, + item.getTotalOccurrences(), + "Per-column occurrences must include both entity types"); + assertTrue( + response.getTotalOccurrences() >= 2, + "Response totalOccurrences must include both entity-type buckets"); + }); + } + + @Test + @ResourceLock(value = Resources.GLOBAL, mode = ResourceAccessMode.READ_WRITE) + void test_getColumnGrid_patternSearchFindsAlphabeticallyLateColumn(TestNamespace ns) + throws Exception { + OpenMetadataClient client = SdkClients.adminClient(); + DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); + DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service); + + // Match (zzz_target) at position 50 with size=25 — old code returns 0 on page 1, new code finds + // it. + int columnCount = 50; + String matchedColumn = ns.prefix("zzz_target"); + + java.util.List columns = new java.util.ArrayList<>(); + for (int i = 0; i < columnCount - 1; i++) { + columns.add( + Columns.build(ns.prefix(String.format("aaa_filler_%02d", i))) + .withType(ColumnDataType.VARCHAR) + .withLength(255) + .create()); + } + columns.add( + Columns.build(matchedColumn).withType(ColumnDataType.VARCHAR).withLength(255).create()); + + Table table = + Tables.create() + .name(ns.prefix("scale_search_table")) + .inSchema(schema.getFullyQualifiedName()) + .withColumns(columns) + .execute(); + + try { + waitForSearchIndexRefresh(); + + await("Wait for first-page search to surface alphabetically-late match (size=25)") + .atMost(Duration.ofSeconds(45)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ColumnGridResponse response = + getColumnGrid( + client, + "entityTypes=table&columnNamePattern=zzz_target&size=25&serviceName=" + + service.getName()); + + assertNotNull(response); + assertTrue( + response.getColumns().stream() + .anyMatch(c -> c.getColumnName().equals(matchedColumn)), + "First page must contain the alphabetically-late matching column " + + "(this exercises the original bug fix — composite agg would have hidden it)"); + assertEquals( + 1, + response.getTotalUniqueColumns(), + "Only one unique column matches the pattern"); + }); + } finally { + java.util.Map params = new java.util.HashMap<>(); + params.put("hardDelete", "true"); + try { + SdkClients.adminClient().tables().delete(table.getId().toString(), params); + } catch (Exception ignored) { + } + } + } + private void waitForColumnToBeIndexed( OpenMetadataClient client, String columnName, String serviceName) { await() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java index 4788046e3169..f673b20c33e2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/ColumnAggregator.java @@ -13,14 +13,111 @@ package org.openmetadata.service.search; +import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.List; +import java.util.Map; import org.openmetadata.schema.api.data.ColumnGridResponse; +import org.openmetadata.schema.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public interface ColumnAggregator { + Logger LOG = LoggerFactory.getLogger(ColumnAggregator.class); + + /** Max column names to retrieve in the names-only query during pattern search. */ + int MAX_PATTERN_SEARCH_NAMES = 10000; + + /** + * Number of sample docs pulled per column-name bucket to populate occurrences. Caps + * {@code ColumnGridItem.totalOccurrences}; columns appearing in more entities than this + * undercount. + */ + int SAMPLE_DOCS_PER_COLUMN = 100; + + /** Aggregation names used in pattern-search queries (ES + OS). */ + String AGG_MATCHING_COLUMNS = "matching_columns"; + + String AGG_PAGE_COLUMNS = "page_columns"; + String AGG_SAMPLE_DOCS = "sample_docs"; + String AGG_KEY_ORDER = "_key"; + + /** Cursor payload key for the offset-based search/tag pagination cursor. */ + String CURSOR_SEARCH_OFFSET = "searchOffset"; + + TypeReference> CURSOR_TYPE = new TypeReference<>() {}; + ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) throws IOException; + /** + * Convert a plain text pattern to a case-insensitive regex for ES/OS terms include. Lucene regex + * does not support (?i), so each letter is expanded to a character class: "MAT" → [mM][aA][tT]. + */ + static String toCaseInsensitiveRegex(String pattern) { + StringBuilder sb = new StringBuilder(".*"); + for (char c : pattern.toCharArray()) { + if (Character.isLetter(c)) { + sb.append('[') + .append(Character.toLowerCase(c)) + .append(Character.toUpperCase(c)) + .append(']'); + } else if (".+*?|[](){}^$\\~@&#<>\"".indexOf(c) >= 0) { + sb.append('\\').append(c); + } else { + sb.append(c); + } + } + sb.append(".*"); + return sb.toString(); + } + + /** Encode an offset into the search/tag pagination cursor (base64 JSON). */ + static String encodeSearchOffset(int offset) { + try { + String json = JsonUtils.pojoToJson(Map.of(CURSOR_SEARCH_OFFSET, offset)); + return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + LOG.error("Failed to encode search offset", e); + return null; + } + } + + /** Decode the search/tag pagination cursor; restart at 0 for malformed input. */ + static int decodeSearchOffset(String cursor) { + if (cursor == null) { + return 0; + } + try { + String json = new String(Base64.getDecoder().decode(cursor), StandardCharsets.UTF_8); + Map map = JsonUtils.readValue(json, CURSOR_TYPE); + Object offset = map.get(CURSOR_SEARCH_OFFSET); + if (offset instanceof Number num) { + return num.intValue(); + } + return 0; + } catch (Exception e) { + LOG.debug("Failed to decode search offset cursor, restarting from page 1", e); + return 0; + } + } + + /** Saturating long → int cast for response totals. Caps at Integer.MAX_VALUE. */ + static int toIntSaturating(long value) { + if (value > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + if (value < 0) { + return 0; + } + return (int) value; + } + + /** Phase 1 result: matching column names and the total doc_count summed across buckets. */ + record NamesWithCount(List names, long totalDocCount) {} + class ColumnAggregationRequest { private int size = 1000; private String cursor; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java index 187235b11ea1..acb6804eb2c4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchColumnAggregator.java @@ -15,6 +15,7 @@ import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import es.co.elastic.clients.elasticsearch.ElasticsearchClient; import es.co.elastic.clients.elasticsearch._types.ElasticsearchException; @@ -24,6 +25,8 @@ import es.co.elastic.clients.elasticsearch._types.aggregations.CompositeAggregate; import es.co.elastic.clients.elasticsearch._types.aggregations.CompositeAggregationSource; import es.co.elastic.clients.elasticsearch._types.aggregations.CompositeBucket; +import es.co.elastic.clients.elasticsearch._types.aggregations.StringTermsAggregate; +import es.co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket; import es.co.elastic.clients.elasticsearch._types.aggregations.TopHitsAggregate; import es.co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery; import es.co.elastic.clients.elasticsearch._types.query_dsl.Query; @@ -31,6 +34,7 @@ import es.co.elastic.clients.elasticsearch.core.SearchResponse; import es.co.elastic.clients.elasticsearch.core.search.Hit; import es.co.elastic.clients.json.JsonData; +import es.co.elastic.clients.util.NamedValue; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -38,8 +42,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.api.data.ColumnGridItem; import org.openmetadata.schema.api.data.ColumnGridResponse; @@ -87,24 +94,33 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr List entityTypes = getEntityTypesForRequest(request); - // Two-phase query for tags/glossaryTerms filtering: - // Phase 1: Find entityFQN#columnName pairs that have the tag - // Phase 2: Filter to only return those specific occurrences + // Tag/glossary filter path: we must read _source to check which specific column has + // the tag (ES flat object mapping can't tell us). Since we're already reading _source, + // we extract full column metadata in the same pass — no separate data-fetch query needed. boolean hasTagFilter = !nullOrEmpty(request.getTags()) || !nullOrEmpty(request.getGlossaryTerms()); - Set entityColumnPairsWithTags = null; - Set columnNamesWithTags = null; if (hasTagFilter) { - entityColumnPairsWithTags = getEntityColumnPairsWithTags(request, entityTypes); - if (entityColumnPairsWithTags.isEmpty()) { + Map> taggedColumns = + getColumnsWithTagsFromSource(request, entityTypes); + if (taggedColumns.isEmpty()) { return buildResponse(new ArrayList<>(), null, false, 0, 0); } - // Also keep just the column names for the Phase 2 query filter - columnNamesWithTags = - entityColumnPairsWithTags.stream() - .map(pair -> pair.substring(pair.indexOf('#') + 1)) - .collect(java.util.stream.Collectors.toSet()); + + // Pattern + tag combined: filter the already-fetched columns by pattern in Java + if (!nullOrEmpty(request.getColumnNamePattern())) { + String pattern = request.getColumnNamePattern().toLowerCase(Locale.ROOT); + taggedColumns + .entrySet() + .removeIf(e -> !e.getKey().toLowerCase(Locale.ROOT).contains(pattern)); + } + + return aggregateColumnsWithKnownNames(request, taggedColumns); + } + + // Pattern-only path (no tag filter): use terms agg with include regex + if (!nullOrEmpty(request.getColumnNamePattern())) { + return aggregateColumnsWithPattern(request, entityTypes); } Map> allColumnsByName = new HashMap<>(); @@ -124,43 +140,15 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr String columnFieldPath = INDEX_CONFIGS.get(groupEntityTypes.getFirst()).columnFieldPath(); - // Phase 2: Build query WITHOUT tag filter but WITH column names filter - List columnNamesList = - columnNamesWithTags != null ? new ArrayList<>(columnNamesWithTags) : null; - Query query = buildFilters(request, columnNameKeyword, columnNamesList); + Query query = buildFilters(request, columnNameKeyword, null); try { SearchResponse response = executeSearch(request, query, indexes, columnNameKeyword); Map> columnsByName = - parseAggregationResults(response, columnFieldPath); - - // Post-filter columns by name pattern since ES aggregation returns all columns from matched - // documents - String columnNamePattern = request.getColumnNamePattern(); - if (!nullOrEmpty(columnNamePattern)) { - columnsByName - .entrySet() - .removeIf(e -> !matchesColumnNamePattern(e.getKey(), columnNamePattern)); - } + parseCompositeAggResults(response, columnFieldPath); - // Post-filter for tag/glossary terms filtering: Only keep occurrences that were - // identified in Phase 1 as having the tag (not just same column name) - if (entityColumnPairsWithTags != null && !entityColumnPairsWithTags.isEmpty()) { - final Set allowedPairs = entityColumnPairsWithTags; - for (List occurrences : columnsByName.values()) { - occurrences.removeIf( - ctx -> { - String key = ctx.entityFQN + "#" + ctx.column.getName(); - return !allowedPairs.contains(key); - }); - } - // Remove column entries that have no occurrences left - columnsByName.entrySet().removeIf(e -> e.getValue().isEmpty()); - } - - // Merge results for (Map.Entry> colEntry : columnsByName.entrySet()) { allColumnsByName .computeIfAbsent(colEntry.getKey(), k -> new ArrayList<>()) @@ -173,9 +161,7 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr hasMore = true; } - // Get totals only on first page and only when no column name pattern - // (ES aggregation counts all columns from matched docs, not just filtered ones) - if (request.getCursor() == null && nullOrEmpty(request.getColumnNamePattern())) { + if (request.getCursor() == null) { Map totals = getTotalCounts(query, indexes, columnNameKeyword); totalUniqueColumns += totals.get("uniqueColumns"); totalOccurrences += totals.get("totalOccurrences"); @@ -191,10 +177,7 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr List gridItems = ColumnMetadataGrouper.groupColumns(allColumnsByName); - // Calculate totals from actual filtered data when: - // - On subsequent pages (cursor is set) - // - When column name pattern is specified (ES aggregation includes non-matching columns) - if (request.getCursor() != null || !nullOrEmpty(request.getColumnNamePattern())) { + if (request.getCursor() != null) { totalUniqueColumns = allColumnsByName.size(); totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); } @@ -204,13 +187,128 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr } /** - * Phase 1: Get entityFQN#columnName pairs that have the specified tags. Since ES flattens - * arrays, we must fetch column data and filter in Java to find columns that actually have the - * tag. + * Pattern-only search path (no tag filter): uses terms aggregation with include regex to filter + * column names at the aggregation level. Two queries per entity-type group: (1) lightweight names + * query to get all matching names and total count, (2) targeted data query with top_hits for the + * current page. */ - private Set getEntityColumnPairsWithTags( + private ColumnGridResponse aggregateColumnsWithPattern( ColumnAggregationRequest request, List entityTypes) throws IOException { - Set entityColumnPairs = new HashSet<>(); + + Map> fieldPathToEntityTypes = groupByFieldPath(entityTypes); + String regex = ColumnAggregator.toCaseInsensitiveRegex(request.getColumnNamePattern()); + + Set allMatchingNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + long totalOccurrencesAcrossGroups = 0; + + for (Map.Entry> entry : fieldPathToEntityTypes.entrySet()) { + String columnNameKeyword = entry.getKey(); + List indexes = resolveIndexNames(entry.getValue()); + Query query = buildFilters(request, columnNameKeyword, null); + + try { + NamesWithCount result = executeNamesQuery(query, indexes, columnNameKeyword, regex); + allMatchingNames.addAll(result.names()); + totalOccurrencesAcrossGroups += result.totalDocCount(); + } catch (ElasticsearchException e) { + if (!isIndexNotFoundException(e)) { + throw e; + } + } + } + + int totalUniqueColumns = allMatchingNames.size(); + int totalOccurrences = ColumnAggregator.toIntSaturating(totalOccurrencesAcrossGroups); + int offset = ColumnAggregator.decodeSearchOffset(request.getCursor()); + int pageSize = request.getSize(); + + List sortedNames = new ArrayList<>(allMatchingNames); + int fromIndex = Math.min(offset, sortedNames.size()); + int toIndex = Math.min(offset + pageSize, sortedNames.size()); + List pageNames = sortedNames.subList(fromIndex, toIndex); + + if (pageNames.isEmpty()) { + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences); + } + + Map> allColumnsByName = new HashMap<>(); + + for (Map.Entry> entry : fieldPathToEntityTypes.entrySet()) { + String columnNameKeyword = entry.getKey(); + List indexes = resolveIndexNames(entry.getValue()); + String columnFieldPath = INDEX_CONFIGS.get(entry.getValue().getFirst()).columnFieldPath(); + Query query = buildFilters(request, columnNameKeyword, null); + + try { + Map> columnsByName = + executePageDataQuery(query, indexes, columnNameKeyword, columnFieldPath, pageNames); + + for (Map.Entry> colEntry : columnsByName.entrySet()) { + allColumnsByName + .computeIfAbsent(colEntry.getKey(), k -> new ArrayList<>()) + .addAll(colEntry.getValue()); + } + } catch (ElasticsearchException e) { + if (!isIndexNotFoundException(e)) { + throw e; + } + } + } + + List gridItems = ColumnMetadataGrouper.groupColumns(allColumnsByName); + + boolean hasMore = toIndex < totalUniqueColumns; + String cursor = hasMore ? ColumnAggregator.encodeSearchOffset(toIndex) : null; + + return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); + } + + /** + * Tag/glossary filter path: the tag-check pass already extracted full column metadata from + * _source (only tagged columns are in the map). Just paginate over the in-memory result. + */ + private ColumnGridResponse aggregateColumnsWithKnownNames( + ColumnAggregationRequest request, Map> taggedColumns) { + + int totalUniqueColumns = taggedColumns.size(); + int totalOccurrences = taggedColumns.values().stream().mapToInt(List::size).sum(); + int offset = ColumnAggregator.decodeSearchOffset(request.getCursor()); + int pageSize = request.getSize(); + + List sortedNames = new ArrayList<>(taggedColumns.keySet()); + int fromIndex = Math.min(offset, sortedNames.size()); + int toIndex = Math.min(offset + pageSize, sortedNames.size()); + List pageNames = sortedNames.subList(fromIndex, toIndex); + + if (pageNames.isEmpty()) { + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences); + } + + Map> pageColumns = new HashMap<>(); + for (String name : pageNames) { + List occurrences = taggedColumns.get(name); + if (occurrences != null) { + pageColumns.put(name, occurrences); + } + } + + List gridItems = ColumnMetadataGrouper.groupColumns(pageColumns); + + boolean hasMore = toIndex < totalUniqueColumns; + String cursor = hasMore ? ColumnAggregator.encodeSearchOffset(toIndex) : null; + + return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); + } + + /** + * Fetch columns with matching tags from _source. ES flat object mapping means we can't filter + * "column X has tag Y" at query level, so we read _source and check in Java. Since we already + * have the full document, we extract column metadata here — avoiding a separate data-fetch query. + */ + private Map> getColumnsWithTagsFromSource( + ColumnAggregationRequest request, List entityTypes) throws IOException { + Map> columnsByName = + new TreeMap<>(String.CASE_INSENSITIVE_ORDER); Map> fieldPathToEntityTypes = groupByFieldPath(entityTypes); Set targetTags = buildTargetTagSet(request); @@ -224,9 +322,7 @@ private Set getEntityColumnPairsWithTags( Query query = buildTagFilterQuery(request, columnNameKeyword); try { - Set matchingPairs = - fetchEntityColumnPairsWithTags(indexes, query, columnFieldPath, targetTags); - entityColumnPairs.addAll(matchingPairs); + fetchColumnsWithTagsFromSource(indexes, query, columnFieldPath, targetTags, columnsByName); } catch (ElasticsearchException e) { if (!isIndexNotFoundException(e)) { throw e; @@ -234,7 +330,7 @@ private Set getEntityColumnPairsWithTags( } } - return entityColumnPairs; + return columnsByName; } private Set buildTargetTagSet(ColumnAggregationRequest request) { @@ -248,27 +344,33 @@ private Set buildTargetTagSet(ColumnAggregationRequest request) { return targetTags; } - private Set fetchEntityColumnPairsWithTags( - List indexes, Query query, String columnFieldPath, Set targetTags) + private void fetchColumnsWithTagsFromSource( + List indexes, + Query query, + String columnFieldPath, + Set targetTags, + Map> columnsByName) throws IOException { - Set entityColumnPairs = new HashSet<>(); SearchRequest searchRequest = SearchRequest.of(s -> s.index(indexes).query(query).size(10000)); SearchResponse response = client.search(searchRequest, JsonData.class); + long totalHits = response.hits().total() != null ? response.hits().total().value() : 0; + if (totalHits > 10000) { + LOG.warn( + "Tag/glossary source-fetch matched {} entities; only first 10000 scanned.", totalHits); + } for (Hit hit : response.hits().hits()) { - extractMatchingEntityColumnPairs(hit, columnFieldPath, targetTags, entityColumnPairs); + extractMatchingColumnsFromHit(hit, columnFieldPath, targetTags, columnsByName); } - - return entityColumnPairs; } - private void extractMatchingEntityColumnPairs( + private void extractMatchingColumnsFromHit( Hit hit, String columnFieldPath, Set targetTags, - Set entityColumnPairs) { + Map> columnsByName) { if (hit.source() == null) { return; } @@ -279,19 +381,35 @@ private void extractMatchingEntityColumnPairs( return; } + String entityType = getTextField(sourceNode, "entityType"); + String entityDisplayName = getTextField(sourceNode, "displayName"); + String serviceName = getNestedField(sourceNode, "service", "name"); + String databaseName = getNestedField(sourceNode, "database", "name"); + String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); + JsonNode columnsData = getNestedJsonNode(sourceNode, columnFieldPath); if (columnsData != null && columnsData.isArray()) { for (JsonNode columnData : columnsData) { String colName = getTextField(columnData, "name"); - boolean hasTag = columnHasTargetTag(columnData, targetTags); - if (hasTag && colName != null) { - entityColumnPairs.add(entityFQN + "#" + colName); + if (colName != null && columnHasTargetTag(columnData, targetTags)) { + Column column = parseColumn(columnData, entityFQN); + columnsByName + .computeIfAbsent(colName, k -> new ArrayList<>()) + .add( + new ColumnWithContext( + column, + entityType, + entityFQN, + entityDisplayName, + serviceName, + databaseName, + schemaName)); } } } } catch (Exception e) { - LOG.warn("Failed to extract entity column pairs from hit", e); + LOG.warn("Failed to extract columns from hit", e); } } @@ -318,13 +436,28 @@ private boolean containsIgnoreCase(Set set, String value) { return false; } - /** Build query specifically for tag filtering (Phase 1) */ + /** + * Build query for tag filtering source fetch. Includes all scope filters (service, database, + * schema, domain, entityType), column-name pattern, and metadataStatus so the _source fetch is + * scoped to the same data as the main query. Per-column correlation (which specific column has + * the tag + matches the pattern) still happens in Java because flat object mapping prevents + * expressing it at query level. + */ private Query buildTagFilterQuery(ColumnAggregationRequest request, String columnNameKeyword) { BoolQuery.Builder boolBuilder = new BoolQuery.Builder(); String columnFieldPath = columnNameKeyword.replace(".name.keyword", ""); boolBuilder.filter(Query.of(q -> q.exists(e -> e.field(columnFieldPath)))); + addEntityTypeFilter(boolBuilder, request); + addServiceFilter(boolBuilder, request); + addServiceTypeFilter(boolBuilder, request); + addDatabaseFilter(boolBuilder, request); + addSchemaFilter(boolBuilder, request); + addDomainFilter(boolBuilder, request); + addColumnNamePatternFilter(boolBuilder, request, columnNameKeyword); + addMetadataStatusFilter(boolBuilder, request, columnFieldPath); + String tagFQNField = columnNameKeyword.replace(".name.keyword", ".tags.tagFQN"); List allTags = new ArrayList<>(); @@ -356,15 +489,6 @@ private String escapeWildcardPattern(String input) { return input.replace("\\", "\\\\").replace("*", "\\*").replace("?", "\\?"); } - private boolean matchesColumnNamePattern(String columnName, String pattern) { - if (nullOrEmpty(pattern)) { - return true; - } - String lowerColumnName = columnName.toLowerCase(); - String lowerPattern = pattern.toLowerCase(); - return lowerColumnName.contains(lowerPattern); - } - /** Get entity types to query - defaults to table only for performance */ private List getEntityTypesForRequest(ColumnAggregationRequest request) { if (request.getEntityTypes() == null || request.getEntityTypes().isEmpty()) { @@ -573,23 +697,132 @@ private Query hasEmptyOrMissingField(String field) { .minimumShouldMatch("1"))); } + /** Phase 1: Get all matching column names using terms agg with include regex (no top_hits). */ + private ColumnAggregator.NamesWithCount executeNamesQuery( + Query query, List indexes, String columnNameKeyword, String regex) + throws IOException { + + Aggregation termsAgg = + Aggregation.of( + a -> + a.terms( + t -> + t.field(columnNameKeyword) + .include(inc -> inc.regexp(regex)) + .size(ColumnAggregator.MAX_PATTERN_SEARCH_NAMES) + .order( + List.of( + NamedValue.of( + ColumnAggregator.AGG_KEY_ORDER, SortOrder.Asc))))); + + SearchRequest searchRequest = + SearchRequest.of( + s -> + s.index(indexes) + .query(query) + .aggregations(ColumnAggregator.AGG_MATCHING_COLUMNS, termsAgg) + .size(0)); + + SearchResponse response = client.search(searchRequest, JsonData.class); + + List names = new ArrayList<>(); + long totalDocCount = 0; + if (response.aggregations() != null + && response.aggregations().containsKey(ColumnAggregator.AGG_MATCHING_COLUMNS)) { + StringTermsAggregate termsResult = + response.aggregations().get(ColumnAggregator.AGG_MATCHING_COLUMNS).sterms(); + for (StringTermsBucket bucket : termsResult.buckets().array()) { + names.add(bucket.key().stringValue()); + totalDocCount += bucket.docCount(); + } + if (names.size() == ColumnAggregator.MAX_PATTERN_SEARCH_NAMES) { + LOG.warn( + "Column name pattern matched at least {} distinct names; results truncated", + ColumnAggregator.MAX_PATTERN_SEARCH_NAMES); + } + } + return new ColumnAggregator.NamesWithCount(names, totalDocCount); + } + + /** Phase 2: Get data for specific column names using terms agg with exact include + top_hits. */ + private Map> executePageDataQuery( + Query query, + List indexes, + String columnNameKeyword, + String columnFieldPath, + List columnNames) + throws IOException { + + Aggregation topHitsAgg = + Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN))); + + Aggregation termsAgg = + Aggregation.of( + a -> + a.terms( + t -> + t.field(columnNameKeyword) + .include(inc -> inc.terms(columnNames)) + .size(columnNames.size())) + .aggregations(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg)); + + SearchRequest searchRequest = + SearchRequest.of( + s -> + s.index(indexes) + .query(query) + .aggregations(ColumnAggregator.AGG_PAGE_COLUMNS, termsAgg) + .size(0)); + + SearchResponse response = client.search(searchRequest, JsonData.class); + + return parseTermsAggResults(response, columnFieldPath); + } + + private Map> parseTermsAggResults( + SearchResponse response, String columnFieldPath) { + Map> columnsByName = new HashMap<>(); + + if (response.aggregations() == null + || !response.aggregations().containsKey(ColumnAggregator.AGG_PAGE_COLUMNS)) { + return columnsByName; + } + + StringTermsAggregate termsAgg = + response.aggregations().get(ColumnAggregator.AGG_PAGE_COLUMNS).sterms(); + + for (StringTermsBucket bucket : termsAgg.buckets().array()) { + String columnName = bucket.key().stringValue(); + + if (!bucket.aggregations().containsKey(ColumnAggregator.AGG_SAMPLE_DOCS)) { + continue; + } + + TopHitsAggregate topHits = + bucket.aggregations().get(ColumnAggregator.AGG_SAMPLE_DOCS).topHits(); + parseBucketHits(columnName, topHits, columnFieldPath, columnsByName); + } + + return columnsByName; + } + private SearchResponse executeSearch( ColumnAggregationRequest request, Query query, List indexes, String columnNameKeyword) throws IOException { - List> sources = - new ArrayList<>(); + List> sources = new ArrayList<>(); sources.add( - es.co.elastic.clients.util.NamedValue.of( + NamedValue.of( "column_name", CompositeAggregationSource.of( cas -> cas.terms(t -> t.field(columnNameKeyword).order(SortOrder.Asc))))); // Use full _source to avoid top_hits source-filter edge cases where combining root and nested // include paths can produce empty buckets. - Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(10))); + Aggregation topHitsAgg = + Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN))); Map subAggs = new HashMap<>(); - subAggs.put("sample_docs", topHitsAgg); + subAggs.put(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg); Map afterKey = request.getCursor() != null ? decodeCursor(request.getCursor()) : null; @@ -617,7 +850,7 @@ private SearchResponse executeSearch( return client.search(searchRequest, JsonData.class); } - private Map> parseAggregationResults( + private Map> parseCompositeAggResults( SearchResponse response, String columnFieldPath) { Map> columnsByName = new HashMap<>(); @@ -639,69 +872,72 @@ private Map> parseAggregationResults( } TopHitsAggregate topHits = bucket.aggregations().get("sample_docs").topHits(); - if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) { - continue; - } + parseBucketHits(columnName, topHits, columnFieldPath, columnsByName); + } - List occurrences = new ArrayList<>(); - // Track the original case column name from the document source - String originalCaseColumnName = null; - - for (Hit hit : topHits.hits().hits()) { - try { - JsonData source = hit.source(); - if (source == null) continue; - - JsonNode sourceNode = source.to(JsonNode.class); - String entityType = getTextField(sourceNode, "entityType"); - String entityFQN = getTextField(sourceNode, "fullyQualifiedName"); - String entityDisplayName = getTextField(sourceNode, "displayName"); - - String serviceName = getNestedField(sourceNode, "service", "name"); - String databaseName = getNestedField(sourceNode, "database", "name"); - String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); - - // Get columns data from the correct path (e.g., "columns", "dataModel.columns", "fields") - JsonNode columnsData = getNestedJsonNode(sourceNode, columnFieldPath); - - if (columnsData != null && columnsData.isArray()) { - for (JsonNode columnData : columnsData) { - String colName = getTextField(columnData, "name"); - // ES keyword aggregation lowercases the column names, so use case-insensitive - // comparison - if (columnName.equalsIgnoreCase(colName)) { - // Preserve the original case column name from the first match - if (originalCaseColumnName == null) { - originalCaseColumnName = colName; - } - Column column = parseColumn(columnData, entityFQN); - - ColumnWithContext columnCtx = - new ColumnWithContext( - column, - entityType, - entityFQN, - entityDisplayName, - serviceName, - databaseName, - schemaName); + return columnsByName; + } + + /** Parse top_hits from a single bucket (shared by composite and terms agg parsing). */ + private void parseBucketHits( + String columnName, + TopHitsAggregate topHits, + String columnFieldPath, + Map> columnsByName) { + + if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) { + return; + } + + List occurrences = new ArrayList<>(); + String originalCaseColumnName = null; - occurrences.add(columnCtx); - break; + for (Hit hit : topHits.hits().hits()) { + try { + JsonData source = hit.source(); + if (source == null) continue; + + JsonNode sourceNode = source.to(JsonNode.class); + String entityType = getTextField(sourceNode, "entityType"); + String entityFQN = getTextField(sourceNode, "fullyQualifiedName"); + String entityDisplayName = getTextField(sourceNode, "displayName"); + + String serviceName = getNestedField(sourceNode, "service", "name"); + String databaseName = getNestedField(sourceNode, "database", "name"); + String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); + + JsonNode columnsData = getNestedJsonNode(sourceNode, columnFieldPath); + + if (columnsData != null && columnsData.isArray()) { + for (JsonNode columnData : columnsData) { + String colName = getTextField(columnData, "name"); + if (columnName.equalsIgnoreCase(colName)) { + if (originalCaseColumnName == null) { + originalCaseColumnName = colName; } + Column column = parseColumn(columnData, entityFQN); + + occurrences.add( + new ColumnWithContext( + column, + entityType, + entityFQN, + entityDisplayName, + serviceName, + databaseName, + schemaName)); + break; } } - } catch (Exception e) { - LOG.warn("Failed to parse column occurrence from search hit", e); } - } - - if (!occurrences.isEmpty() && originalCaseColumnName != null) { - columnsByName.put(originalCaseColumnName, occurrences); + } catch (Exception e) { + LOG.warn("Failed to parse column occurrence from search hit", e); } } - return columnsByName; + if (!occurrences.isEmpty() && originalCaseColumnName != null) { + columnsByName.put(originalCaseColumnName, occurrences); + } } /** Navigate nested JSON path like "dataModel.columns" or "messageSchema.schemaFields" */ @@ -848,12 +1084,11 @@ private String encodeCursor(Map afterKey) { } } - @SuppressWarnings("unchecked") private Map decodeCursor(String cursor) { try { byte[] decoded = Base64.getDecoder().decode(cursor); String json = new String(decoded, StandardCharsets.UTF_8); - Map stringMap = JsonUtils.readValue(json, Map.class); + Map stringMap = JsonUtils.readValue(json, new TypeReference<>() {}); Map result = new HashMap<>(); for (Map.Entry entry : stringMap.entrySet()) { result.put(entry.getKey(), FieldValue.of(entry.getValue())); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java index 88a98a267f5b..28b5f3be543e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchColumnAggregator.java @@ -25,8 +25,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.api.data.ColumnGridItem; import org.openmetadata.schema.api.data.ColumnGridResponse; @@ -47,6 +50,8 @@ import os.org.opensearch.client.opensearch._types.aggregations.CompositeAggregate; import os.org.opensearch.client.opensearch._types.aggregations.CompositeAggregationSource; import os.org.opensearch.client.opensearch._types.aggregations.CompositeBucket; +import os.org.opensearch.client.opensearch._types.aggregations.StringTermsAggregate; +import os.org.opensearch.client.opensearch._types.aggregations.StringTermsBucket; import os.org.opensearch.client.opensearch._types.aggregations.TopHitsAggregate; import os.org.opensearch.client.opensearch._types.query_dsl.BoolQuery; import os.org.opensearch.client.opensearch._types.query_dsl.Query; @@ -73,61 +78,41 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr request.getTags(), request.getGlossaryTerms()); - // Two-phase query for tags/glossaryTerms filtering: - // Phase 1: Find entityFQN#columnName pairs that have the tag - // Phase 2: Filter to only return those specific occurrences + // Tag/glossary filter path: we must read _source to check which specific column has + // the tag (ES flat object mapping can't tell us). Since we're already reading _source, + // we extract full column metadata in the same pass — no separate data-fetch query needed. boolean hasTagFilter = !nullOrEmpty(request.getTags()) || !nullOrEmpty(request.getGlossaryTerms()); - Set entityColumnPairsWithTags = null; - List columnNamesWithTags = null; - - LOG.info("hasTagFilter={}", hasTagFilter); if (hasTagFilter) { - entityColumnPairsWithTags = getEntityColumnPairsWithTags(request); - LOG.info("Phase1 result: entityColumnPairsWithTags={}", entityColumnPairsWithTags); - if (entityColumnPairsWithTags.isEmpty()) { - LOG.info("No columns found with tags, returning empty response"); + Map> taggedColumns = getColumnsWithTagsFromSource(request); + if (taggedColumns.isEmpty()) { return buildResponse(new ArrayList<>(), null, false, 0, 0); } - // Also keep just the column names for the Phase 2 query filter - columnNamesWithTags = - entityColumnPairsWithTags.stream() - .map(pair -> pair.substring(pair.indexOf('#') + 1)) - .collect(java.util.stream.Collectors.toList()); + + // Pattern + tag combined: filter the already-fetched columns by pattern in Java + if (!nullOrEmpty(request.getColumnNamePattern())) { + String pattern = request.getColumnNamePattern().toLowerCase(Locale.ROOT); + taggedColumns + .entrySet() + .removeIf(e -> !e.getKey().toLowerCase(Locale.ROOT).contains(pattern)); + } + + return aggregateColumnsWithKnownNames(request, taggedColumns); } - // Phase 2: Build query WITHOUT tag filter but WITH column names filter - Query query = buildFilters(request, columnNamesWithTags); + // Pattern-only path (no tag filter): use terms agg with include regex + if (!nullOrEmpty(request.getColumnNamePattern())) { + return aggregateColumnsWithPattern(request); + } + + // Browse path: scope filters + composite agg with after_key cursor. + Query query = buildFilters(request, null); try { SearchResponse response = executeSearch(request, query); - Map> columnsByName = parseAggregationResults(response); - - // Post-filter columns by name pattern since ES aggregation returns all columns from matched - // documents - String columnNamePattern = request.getColumnNamePattern(); - if (!nullOrEmpty(columnNamePattern)) { - columnsByName - .entrySet() - .removeIf(e -> !matchesColumnNamePattern(e.getKey(), columnNamePattern)); - } - - // Post-filter for tag/glossary terms filtering: Only keep occurrences that were - // identified in Phase 1 as having the tag (not just same column name) - if (entityColumnPairsWithTags != null && !entityColumnPairsWithTags.isEmpty()) { - final Set allowedPairs = entityColumnPairsWithTags; - for (List occurrences : columnsByName.values()) { - occurrences.removeIf( - ctx -> { - String key = ctx.entityFQN + "#" + ctx.column.getName(); - return !allowedPairs.contains(key); - }); - } - // Remove column entries that have no occurrences left - columnsByName.entrySet().removeIf(e -> e.getValue().isEmpty()); - } + Map> columnsByName = parseCompositeAggResults(response); List gridItems = ColumnMetadataGrouper.groupColumns(columnsByName); @@ -136,14 +121,11 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr int totalUniqueColumns; int totalOccurrences; - // Get totals from ES aggregation only when no column name pattern - // (ES aggregation counts all columns from matched docs, not just filtered ones) - if (request.getCursor() == null && nullOrEmpty(request.getColumnNamePattern())) { + if (request.getCursor() == null) { Map totals = getTotalCounts(query); totalUniqueColumns = totals.get("uniqueColumns").intValue(); totalOccurrences = totals.get("totalOccurrences").intValue(); } else { - // Calculate from actual filtered data when pattern is specified or on subsequent pages totalUniqueColumns = columnsByName.size(); totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum(); } @@ -159,26 +141,111 @@ public ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) thr } /** - * Phase 1: Get entityFQN#columnName pairs that have the specified tags. Since ES flattens arrays, - * we must fetch column data and filter in Java to find columns that actually have the tag. + * Pattern-only search path (no tag filter): uses terms aggregation with include regex to filter + * column names at the aggregation level. Two queries: (1) lightweight names query to get all + * matching names and total count, (2) targeted data query with top_hits for the current page. */ - private Set getEntityColumnPairsWithTags(ColumnAggregationRequest request) + private ColumnGridResponse aggregateColumnsWithPattern(ColumnAggregationRequest request) throws IOException { - Set entityColumnPairs = new HashSet<>(); + + Query query = buildFilters(request, null); + String regex = ColumnAggregator.toCaseInsensitiveRegex(request.getColumnNamePattern()); + + try { + ColumnAggregator.NamesWithCount phase1 = executeNamesQuery(query, regex); + Set dedupedNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + dedupedNames.addAll(phase1.names()); + + int totalUniqueColumns = dedupedNames.size(); + int totalOccurrences = ColumnAggregator.toIntSaturating(phase1.totalDocCount()); + int offset = ColumnAggregator.decodeSearchOffset(request.getCursor()); + int pageSize = request.getSize(); + + List sortedNames = new ArrayList<>(dedupedNames); + int fromIndex = Math.min(offset, sortedNames.size()); + int toIndex = Math.min(offset + pageSize, sortedNames.size()); + List pageNames = sortedNames.subList(fromIndex, toIndex); + + if (pageNames.isEmpty()) { + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences); + } + + Map> columnsByName = executePageDataQuery(query, pageNames); + + List gridItems = ColumnMetadataGrouper.groupColumns(columnsByName); + + boolean hasMore = toIndex < totalUniqueColumns; + String cursor = hasMore ? ColumnAggregator.encodeSearchOffset(toIndex) : null; + + return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); + } catch (OpenSearchException e) { + if (isIndexNotFoundException(e)) { + LOG.warn("Search index not found, returning empty results"); + return buildResponse(new ArrayList<>(), null, false, 0, 0); + } + throw e; + } + } + + /** + * Tag/glossary filter path: the tag-check pass already extracted full column metadata from + * _source (only tagged columns are in the map). Just paginate over the in-memory result. + */ + private ColumnGridResponse aggregateColumnsWithKnownNames( + ColumnAggregationRequest request, Map> taggedColumns) { + + int totalUniqueColumns = taggedColumns.size(); + int totalOccurrences = taggedColumns.values().stream().mapToInt(List::size).sum(); + int offset = ColumnAggregator.decodeSearchOffset(request.getCursor()); + int pageSize = request.getSize(); + + List sortedNames = new ArrayList<>(taggedColumns.keySet()); + int fromIndex = Math.min(offset, sortedNames.size()); + int toIndex = Math.min(offset + pageSize, sortedNames.size()); + List pageNames = sortedNames.subList(fromIndex, toIndex); + + if (pageNames.isEmpty()) { + return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences); + } + + Map> pageColumns = new HashMap<>(); + for (String name : pageNames) { + List occurrences = taggedColumns.get(name); + if (occurrences != null) { + pageColumns.put(name, occurrences); + } + } + + List gridItems = ColumnMetadataGrouper.groupColumns(pageColumns); + + boolean hasMore = toIndex < totalUniqueColumns; + String cursor = hasMore ? ColumnAggregator.encodeSearchOffset(toIndex) : null; + + return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences); + } + + /** + * Fetch columns with matching tags from _source. ES flat object mapping means we can't filter + * "column X has tag Y" at query level, so we read _source and check in Java. Since we already + * have the full document, we extract column metadata here — avoiding a separate data-fetch query. + */ + private Map> getColumnsWithTagsFromSource( + ColumnAggregationRequest request) throws IOException { + Map> columnsByName = + new TreeMap<>(String.CASE_INSENSITIVE_ORDER); Set targetTags = buildTargetTagSet(request); Query query = buildTagFilterQuery(request); try { - Set matchingPairs = fetchEntityColumnPairsWithTags(query, targetTags); - entityColumnPairs.addAll(matchingPairs); + fetchColumnsWithTagsFromSource(query, targetTags, columnsByName); } catch (OpenSearchException e) { if (!isIndexNotFoundException(e)) { throw e; } } - return entityColumnPairs; + return columnsByName; } private Set buildTargetTagSet(ColumnAggregationRequest request) { @@ -199,37 +266,31 @@ private List resolveIndexNames() { .toList(); } - private Set fetchEntityColumnPairsWithTags(Query query, Set targetTags) + private void fetchColumnsWithTagsFromSource( + Query query, Set targetTags, Map> columnsByName) throws IOException { - Set entityColumnPairs = new HashSet<>(); List resolvedIndexes = resolveIndexNames(); SearchRequest searchRequest = SearchRequest.of(s -> s.index(resolvedIndexes).query(query).size(10000)); SearchResponse response = client.search(searchRequest, JsonData.class); - long totalHits = response.hits().total() != null ? response.hits().total().value() : 0; - LOG.info( - "Phase1 fetchEntityColumnPairsWithTags: indexes={}, targetTags={}, totalHits={}", - resolvedIndexes, - targetTags, - totalHits); + if (totalHits > 10000) { + LOG.warn( + "Tag/glossary source-fetch matched {} entities; only first 10000 scanned.", totalHits); + } for (os.org.opensearch.client.opensearch.core.search.Hit hit : response.hits().hits()) { - extractMatchingEntityColumnPairs(hit, targetTags, entityColumnPairs); + extractMatchingColumnsFromHit(hit, targetTags, columnsByName); } - - LOG.info("Phase1 fetchEntityColumnPairsWithTags: found pairs: {}", entityColumnPairs); - - return entityColumnPairs; } - private void extractMatchingEntityColumnPairs( + private void extractMatchingColumnsFromHit( os.org.opensearch.client.opensearch.core.search.Hit hit, Set targetTags, - Set entityColumnPairs) { + Map> columnsByName) { if (hit.source() == null) { return; } @@ -240,19 +301,35 @@ private void extractMatchingEntityColumnPairs( return; } + String entityType = getTextField(sourceNode, "entityType"); + String entityDisplayName = getTextField(sourceNode, "displayName"); + String serviceName = getNestedField(sourceNode, "service", "name"); + String databaseName = getNestedField(sourceNode, "database", "name"); + String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); + JsonNode columnsData = sourceNode.get("columns"); if (columnsData != null && columnsData.isArray()) { for (JsonNode columnData : columnsData) { String colName = getTextField(columnData, "name"); - boolean hasTag = columnHasTargetTag(columnData, targetTags); - if (hasTag && colName != null) { - entityColumnPairs.add(entityFQN + "#" + colName); + if (colName != null && columnHasTargetTag(columnData, targetTags)) { + Column column = parseColumn(columnData, entityFQN); + columnsByName + .computeIfAbsent(colName, k -> new ArrayList<>()) + .add( + new ColumnWithContext( + column, + entityType, + entityFQN, + entityDisplayName, + serviceName, + databaseName, + schemaName)); } } } } catch (Exception e) { - LOG.warn("Failed to extract entity column pairs from hit", e); + LOG.warn("Failed to extract columns from hit", e); } } @@ -279,12 +356,27 @@ private boolean containsIgnoreCase(Set set, String value) { return false; } - /** Build query specifically for tag filtering (Phase 1) */ + /** + * Build query for tag filtering source fetch. Includes all scope filters (service, database, + * schema, domain, entityType), column-name pattern, and metadataStatus so the _source fetch is + * scoped to the same data as the main query. Per-column correlation (which specific column has + * the tag + matches the pattern) still happens in Java because flat object mapping prevents + * expressing it at query level. + */ private Query buildTagFilterQuery(ColumnAggregationRequest request) { BoolQuery.Builder boolBuilder = new BoolQuery.Builder(); boolBuilder.filter(Query.of(q -> q.exists(e -> e.field("columns")))); + addEntityTypeFilter(boolBuilder, request); + addServiceFilter(boolBuilder, request); + addServiceTypeFilter(boolBuilder, request); + addDatabaseFilter(boolBuilder, request); + addSchemaFilter(boolBuilder, request); + addDomainFilter(boolBuilder, request); + addColumnNamePatternFilter(boolBuilder, request); + addMetadataStatusFilter(boolBuilder, request); + List allTags = new ArrayList<>(); if (!nullOrEmpty(request.getTags())) { allTags.addAll(request.getTags()); @@ -315,15 +407,6 @@ private String escapeWildcardPattern(String input) { return input.replace("\\", "\\\\").replace("*", "\\*").replace("?", "\\?"); } - private boolean matchesColumnNamePattern(String columnName, String pattern) { - if (nullOrEmpty(pattern)) { - return true; - } - String lowerColumnName = columnName.toLowerCase(); - String lowerPattern = pattern.toLowerCase(); - return lowerColumnName.contains(lowerPattern); - } - /** * Build filters for the main query. When columnNamesFromTagFilter is provided (two-phase query), * skip tag/glossaryTerms filters and use column names filter instead. @@ -516,6 +599,106 @@ private Query hasEmptyOrMissingField(String field) { .minimumShouldMatch("1"))); } + /** Phase 1: Get all matching column names using terms agg with include regex (no top_hits). */ + private ColumnAggregator.NamesWithCount executeNamesQuery(Query query, String regex) + throws IOException { + Aggregation termsAgg = + Aggregation.of( + a -> + a.terms( + t -> + t.field("columns.name.keyword") + .include(inc -> inc.regexp(regex)) + .size(ColumnAggregator.MAX_PATTERN_SEARCH_NAMES) + .order( + List.of(Map.of(ColumnAggregator.AGG_KEY_ORDER, SortOrder.Asc))))); + + SearchRequest searchRequest = + SearchRequest.of( + s -> + s.index(resolveIndexNames()) + .query(query) + .aggregations(ColumnAggregator.AGG_MATCHING_COLUMNS, termsAgg) + .size(0)); + + SearchResponse response = client.search(searchRequest, JsonData.class); + + List names = new ArrayList<>(); + long totalDocCount = 0; + if (response.aggregations() != null + && response.aggregations().containsKey(ColumnAggregator.AGG_MATCHING_COLUMNS)) { + StringTermsAggregate termsResult = + response.aggregations().get(ColumnAggregator.AGG_MATCHING_COLUMNS).sterms(); + for (StringTermsBucket bucket : termsResult.buckets().array()) { + names.add(bucket.key()); + totalDocCount += bucket.docCount(); + } + if (names.size() == ColumnAggregator.MAX_PATTERN_SEARCH_NAMES) { + LOG.warn( + "Column name pattern matched at least {} distinct names; results truncated", + ColumnAggregator.MAX_PATTERN_SEARCH_NAMES); + } + } + return new ColumnAggregator.NamesWithCount(names, totalDocCount); + } + + /** Phase 2: Get data for specific column names using terms agg with exact include + top_hits. */ + private Map> executePageDataQuery( + Query query, List columnNames) throws IOException { + + Aggregation topHitsAgg = + Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN))); + + Aggregation termsAgg = + Aggregation.of( + a -> + a.terms( + t -> + t.field("columns.name.keyword") + .include(inc -> inc.terms(columnNames)) + .size(columnNames.size())) + .aggregations(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg)); + + SearchRequest searchRequest = + SearchRequest.of( + s -> + s.index(resolveIndexNames()) + .query(query) + .aggregations(ColumnAggregator.AGG_PAGE_COLUMNS, termsAgg) + .size(0)); + + SearchResponse response = client.search(searchRequest, JsonData.class); + + return parseTermsAggResults(response); + } + + private Map> parseTermsAggResults( + SearchResponse response) { + Map> columnsByName = new HashMap<>(); + + if (response.aggregations() == null + || !response.aggregations().containsKey(ColumnAggregator.AGG_PAGE_COLUMNS)) { + return columnsByName; + } + + StringTermsAggregate termsAgg = + response.aggregations().get(ColumnAggregator.AGG_PAGE_COLUMNS).sterms(); + + for (StringTermsBucket bucket : termsAgg.buckets().array()) { + String columnName = bucket.key(); + + if (!bucket.aggregations().containsKey(ColumnAggregator.AGG_SAMPLE_DOCS)) { + continue; + } + + TopHitsAggregate topHits = + bucket.aggregations().get(ColumnAggregator.AGG_SAMPLE_DOCS).topHits(); + parseBucketHits(columnName, topHits, columnsByName); + } + + return columnsByName; + } + private SearchResponse executeSearch(ColumnAggregationRequest request, Query query) throws IOException { Map sources = new HashMap<>(); @@ -525,14 +708,10 @@ private SearchResponse executeSearch(ColumnAggregationRequest request, cas -> cas.terms(t -> t.field("columns.name.keyword").order(SortOrder.Asc)))); Aggregation topHitsAgg = - Aggregation.of( - a -> - // Use full _source to avoid OpenSearch top_hits source-filter edge cases where - // mixing root + nested include paths can return empty buckets unexpectedly. - a.topHits(th -> th.size(100))); + Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN))); Map subAggs = new HashMap<>(); - subAggs.put("sample_docs", topHitsAgg); + subAggs.put(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg); Map afterKey = request.getCursor() != null ? decodeCursorAsFieldValues(request.getCursor()) : null; @@ -560,7 +739,7 @@ private SearchResponse executeSearch(ColumnAggregationRequest request, return client.search(searchRequest, JsonData.class); } - private Map> parseAggregationResults( + private Map> parseCompositeAggResults( SearchResponse response) { Map> columnsByName = new HashMap<>(); @@ -578,73 +757,77 @@ private Map> parseAggregationResults( FieldValue fieldValue = bucket.key().get("column_name"); String columnName = fieldValue != null ? fieldValue.stringValue() : null; - if (!bucket.aggregations().containsKey("sample_docs")) { + if (!bucket.aggregations().containsKey(ColumnAggregator.AGG_SAMPLE_DOCS)) { continue; } - TopHitsAggregate topHits = bucket.aggregations().get("sample_docs").topHits(); - if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) { - continue; - } + TopHitsAggregate topHits = + bucket.aggregations().get(ColumnAggregator.AGG_SAMPLE_DOCS).topHits(); + parseBucketHits(columnName, topHits, columnsByName); + } - List occurrences = new ArrayList<>(); - // Track the original case column name from the document source - String originalCaseColumnName = null; - - for (Hit hit : topHits.hits().hits()) { - try { - JsonData source = hit.source(); - if (source == null) continue; - - JsonNode sourceNode = source.to(JsonNode.class); - String entityType = getTextField(sourceNode, "entityType"); - String entityFQN = getTextField(sourceNode, "fullyQualifiedName"); - String entityDisplayName = getTextField(sourceNode, "displayName"); - - String serviceName = getNestedField(sourceNode, "service", "name"); - String databaseName = getNestedField(sourceNode, "database", "name"); - String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); - - JsonNode columnsData = sourceNode.get("columns"); - - if (columnsData != null && columnsData.isArray()) { - for (JsonNode columnData : columnsData) { - String colName = getTextField(columnData, "name"); - // ES keyword aggregation lowercases the column names, so use case-insensitive - // comparison - if (columnName.equalsIgnoreCase(colName)) { - // Preserve the original case column name from the first match - if (originalCaseColumnName == null) { - originalCaseColumnName = colName; - } - Column column = parseColumn(columnData, entityFQN); - - ColumnWithContext columnCtx = - new ColumnWithContext( - column, - entityType, - entityFQN, - entityDisplayName, - serviceName, - databaseName, - schemaName); + return columnsByName; + } + + /** Parse top_hits from a single bucket (shared by composite and terms agg parsing). */ + private void parseBucketHits( + String columnName, + TopHitsAggregate topHits, + Map> columnsByName) { + + if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) { + return; + } - occurrences.add(columnCtx); - break; + List occurrences = new ArrayList<>(); + String originalCaseColumnName = null; + + for (Hit hit : topHits.hits().hits()) { + try { + JsonData source = hit.source(); + if (source == null) continue; + + JsonNode sourceNode = source.to(JsonNode.class); + String entityType = getTextField(sourceNode, "entityType"); + String entityFQN = getTextField(sourceNode, "fullyQualifiedName"); + String entityDisplayName = getTextField(sourceNode, "displayName"); + + String serviceName = getNestedField(sourceNode, "service", "name"); + String databaseName = getNestedField(sourceNode, "database", "name"); + String schemaName = getNestedField(sourceNode, "databaseSchema", "name"); + + JsonNode columnsData = sourceNode.get("columns"); + + if (columnsData != null && columnsData.isArray()) { + for (JsonNode columnData : columnsData) { + String colName = getTextField(columnData, "name"); + if (columnName.equalsIgnoreCase(colName)) { + if (originalCaseColumnName == null) { + originalCaseColumnName = colName; } + Column column = parseColumn(columnData, entityFQN); + + occurrences.add( + new ColumnWithContext( + column, + entityType, + entityFQN, + entityDisplayName, + serviceName, + databaseName, + schemaName)); + break; } } - } catch (Exception e) { - LOG.warn("Failed to parse column occurrence from search hit", e); } - } - - if (!occurrences.isEmpty() && originalCaseColumnName != null) { - columnsByName.put(originalCaseColumnName, occurrences); + } catch (Exception e) { + LOG.warn("Failed to parse column occurrence from search hit", e); } } - return columnsByName; + if (!occurrences.isEmpty() && originalCaseColumnName != null) { + columnsByName.put(originalCaseColumnName, occurrences); + } } private String getTextField(JsonNode node, String field) { @@ -794,12 +977,11 @@ private Map decodeCursorAsFieldValues(String cursor) { } } - @SuppressWarnings("unchecked") private Map decodeCursor(String cursor) { try { byte[] decoded = Base64.getDecoder().decode(cursor); String json = new String(decoded, StandardCharsets.UTF_8); - return JsonUtils.readValue(json, Map.class); + return JsonUtils.readValue(json, new TypeReference<>() {}); } catch (Exception e) { LOG.error("Failed to decode cursor", e); return new HashMap<>(); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/search/ColumnAggregatorTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/search/ColumnAggregatorTest.java new file mode 100644 index 000000000000..b0f8259f010e --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/search/ColumnAggregatorTest.java @@ -0,0 +1,112 @@ +/* + * Copyright 2025 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.service.search; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.regex.Pattern; +import org.junit.jupiter.api.Test; + +class ColumnAggregatorTest { + + @Test + void toCaseInsensitiveRegex_simpleAlpha() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("MAT"); + + assertEquals(".*[mM][aA][tT].*", regex); + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("MAT").matches()); + assertTrue(pattern.matcher("mat").matches()); + assertTrue(pattern.matcher("Mat").matches()); + assertTrue(pattern.matcher("MATNR").matches()); + assertTrue(pattern.matcher("some_mat_column").matches()); + assertFalse(pattern.matcher("MBA").matches()); + } + + @Test + void toCaseInsensitiveRegex_mixedCase() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("MaTnR"); + + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("MATNR").matches()); + assertTrue(pattern.matcher("matnr").matches()); + assertTrue(pattern.matcher("MaTnR").matches()); + assertFalse(pattern.matcher("MATMR").matches()); + } + + @Test + void toCaseInsensitiveRegex_withDigits() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("col1"); + + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("COL1").matches()); + assertTrue(pattern.matcher("col1").matches()); + assertTrue(pattern.matcher("my_col1_name").matches()); + assertFalse(pattern.matcher("col2").matches()); + } + + @Test + void toCaseInsensitiveRegex_withUnderscore() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("col_name"); + + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("col_name").matches()); + assertTrue(pattern.matcher("COL_NAME").matches()); + assertTrue(pattern.matcher("my_col_name_here").matches()); + } + + @Test + void toCaseInsensitiveRegex_escapesRegexSpecialChars() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("col.name"); + + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("col.name").matches()); + // Dot should be literal, not wildcard + assertFalse(pattern.matcher("colXname").matches()); + } + + @Test + void toCaseInsensitiveRegex_singleChar() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("a"); + + assertEquals(".*[aA].*", regex); + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("A").matches()); + assertTrue(pattern.matcher("abc").matches()); + assertTrue(pattern.matcher("XAY").matches()); + } + + @Test + void toCaseInsensitiveRegex_emptyString() { + String regex = ColumnAggregator.toCaseInsensitiveRegex(""); + + assertEquals(".*.*", regex); + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("anything").matches()); + assertTrue(pattern.matcher("").matches()); + } + + @Test + void toCaseInsensitiveRegex_specialCharsAreEscaped() { + String regex = ColumnAggregator.toCaseInsensitiveRegex("a+b*c?"); + + Pattern pattern = Pattern.compile(regex); + assertTrue(pattern.matcher("a+b*c?").matches()); + assertTrue(pattern.matcher("prefix_a+b*c?_suffix").matches()); + // Plus and star should be literal, not regex quantifiers + assertFalse(pattern.matcher("abbbbc").matches()); + } +}