diff --git a/deploy/docker/venice/docker-compose-single-dc-setup.yaml b/deploy/docker/venice/docker-compose-single-dc-setup.yaml index ce9ecc72..9c979b68 100644 --- a/deploy/docker/venice/docker-compose-single-dc-setup.yaml +++ b/deploy/docker/venice/docker-compose-single-dc-setup.yaml @@ -29,7 +29,7 @@ services: retries: 5 venice-controller: - image: venicedb/venice-controller:0.4.340 + image: venicedb/venice-controller:0.4.858 container_name: venice-controller hostname: venice-controller depends_on: @@ -45,7 +45,7 @@ services: retries: 5 venice-server: - image: venicedb/venice-server:0.4.340 + image: venicedb/venice-server:0.4.858 container_name: venice-server hostname: venice-server depends_on: @@ -59,7 +59,7 @@ services: retries: 5 venice-router: - image: venicedb/venice-router:0.4.340 + image: venicedb/venice-router:0.4.858 container_name: venice-router hostname: venice-router depends_on: @@ -75,7 +75,7 @@ services: retries: 5 venice-client: - image: venicedb/venice-client:0.4.340 + image: venicedb/venice-client:0.4.858 container_name: venice-client hostname: venice-client tty: true diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 6641eeba..c7a5a921 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -42,6 +42,6 @@ slf4j-simple = "org.slf4j:slf4j-simple:2.0.11" slf4j-api = "org.slf4j:slf4j-api:2.0.11" sqlline = "sqlline:sqlline:1.12.0" quidem = "net.hydromatic:quidem:0.11" -venice = "com.linkedin.venice:venice-common:0.4.697" -venice-client = "com.linkedin.venice:venice-thin-client:0.4.697" +venice = "com.linkedin.venice:venice-common:0.4.858" +venice-client = "com.linkedin.venice:venice-thin-client:0.4.858" yaml = "org.yaml:snakeyaml:1.33" diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java index fbd1eb32..cfc374b1 100644 --- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java +++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java @@ -4,10 +4,10 @@ import com.linkedin.venice.client.schema.StoreSchemaFetcher; import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.client.store.ClientFactory; -import com.linkedin.venice.controllerapi.ControllerClient; -import com.linkedin.venice.controllerapi.ControllerClientFactory; -import com.linkedin.venice.controllerapi.ControllerResponse; -import com.linkedin.venice.exceptions.ErrorType; +import com.linkedin.venice.client.store.RouterBasedStoreMetadataFetcher; +import com.linkedin.venice.client.store.StoreMetadataFetcher; +import com.linkedin.venice.client.store.transport.TransportClient; +import com.linkedin.venice.client.store.transport.TransportClientResponse; import com.linkedin.venice.security.SSLFactory; import com.linkedin.venice.utils.SslUtils; import org.apache.calcite.schema.Table; @@ -19,11 +19,14 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static com.linkedin.hoptimator.util.DeploymentService.parseHints; @@ -33,27 +36,29 @@ public class ClusterSchema extends AbstractSchema { protected static final String SSL_FACTORY_CLASS_NAME = "ssl.factory.class.name"; protected static final String DEFAULT_SSL_FACTORY_CLASS_NAME = "com.linkedin.venice.security.DefaultSSLFactory"; - protected final Properties properties; + protected static final String ROUTER_URL = "router.url"; + protected static final String SSL_CONFIG_PATH = "ssl-config-path"; private static final String STORE_HINT_KEY_PREFIX = "venice.%s."; + private static final String DISCOVER_CLUSTER_PATH = "discover_cluster"; + private static final long DISCOVERY_TIMEOUT_SECONDS = 5; + + protected final Properties properties; private final LazyReference> tables = new LazyReference<>(); public ClusterSchema(Properties properties) { this.properties = properties; } - protected ControllerClient createControllerClient(String cluster, Optional sslFactory) { - String routerUrl = properties.getProperty("router.url"); - if (routerUrl.contains("localhost")) { - return new LocalControllerClient(cluster, routerUrl, sslFactory); - } else { - return ControllerClientFactory.getControllerClient(cluster, routerUrl, sslFactory); - } + protected StoreMetadataFetcher createStoreMetadataFetcher() { + return new RouterBasedStoreMetadataFetcher( + new ClientConfig<>() + .setVeniceURL(properties.getProperty(ROUTER_URL))); } protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) { return ClientFactory.createStoreSchemaFetcher( ClientConfig.defaultGenericClientConfig(storeName) - .setVeniceURL(properties.getProperty("router.url"))); + .setVeniceURL(properties.getProperty(ROUTER_URL))); } protected VeniceStore createVeniceStore(String store, StoreSchemaFetcher storeSchemaFetcher) { @@ -78,15 +83,14 @@ public Lookup tables() { @Override protected Map loadAll() throws Exception { Map tableMap = new HashMap<>(); - String clusterStr = properties.getProperty("clusters"); - String[] clusters = clusterStr.split(","); - - for (String cluster : clusters) { - try (ControllerClient controllerClient = createControllerClient(cluster, getSslFactory())) { - String[] stores = controllerClient.queryStoreList(false).getStores(); - for (String store : stores) { - StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(store); - tableMap.put(store, createVeniceStore(store, storeSchemaFetcher)); + try (StoreMetadataFetcher fetcher = createStoreMetadataFetcher()) { + Set storeNames = fetcher.getAllStoreNames(); + log.info("Discovered {} Venice stores via router /stores endpoint", storeNames.size()); + for (String storeName : storeNames) { + try { + tableMap.put(storeName, createVeniceStore(storeName, createStoreSchemaFetcher(storeName))); + } catch (Exception e) { + log.warn("Skipping Venice store {} due to setup failure", storeName, e); } } } @@ -95,47 +99,49 @@ protected Map loadAll() throws Exception { @Override protected @Nullable Table load(String name) throws Exception { - String clusterStr = properties.getProperty("clusters"); - String[] clusters = clusterStr.split(","); - - try (ControllerClient controllerClient = createControllerClient(clusters[0], getSslFactory())) { - ControllerResponse controllerResponse = controllerClient.discoverCluster(name); - if (controllerResponse.isError() && controllerResponse.getErrorType().equals(ErrorType.STORE_NOT_FOUND)) { - return null; - } else if (controllerResponse.isError()) { - throw new RuntimeException(String.format("Error fetching store errorType=%s, error=%s", - controllerResponse.getErrorType(), controllerResponse.getError())); - } - - // Venice does not currently have the ability to list all clusters so the "clusters" property - // is required as part of the JDBC driver. To keep loadTable and loadAllTables consistent, we - // check that the fetched store actually belongs to one of these clusters, otherwise we could end up - // with different results. - if (!Arrays.asList(clusters).contains(controllerResponse.getCluster())) { - return null; - } + if (!storeExists(name)) { + return null; } - - StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(name); - return createVeniceStore(name, storeSchemaFetcher); + return createVeniceStore(name, createStoreSchemaFetcher(name)); } @Override protected String getDescription() { - return "Venice clusters " + properties.getProperty("clusters"); - } - - private Optional getSslFactory() throws IOException { - String sslConfigPath = properties.getProperty("ssl-config-path"); - if (sslConfigPath != null) { - log.debug("Using ssl configs at {}", sslConfigPath); - Properties sslProperties = SslUtils.loadSSLConfig(sslConfigPath); - String sslFactoryClassName = - sslProperties.getProperty(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME); - return Optional.of(SslUtils.getSSLFactory(sslProperties, sslFactoryClassName)); - } - return Optional.empty(); + return "Venice router " + properties.getProperty(ROUTER_URL); } }); } + + /** + * Targeted per-store existence check against the router's {@code discover_cluster} endpoint + * over HTTP/HTTPS. A {@code null} response indicates HTTP 404 per the transport callback + * contract, which means the store does not exist. + */ + protected boolean storeExists(String storeName) throws IOException { + try (TransportClient transport = ClientFactory.getTransportClient( + new ClientConfig<>() + .setVeniceURL(properties.getProperty(ROUTER_URL)) + .setSslFactory(getSslFactory().orElse(null)))) { + TransportClientResponse response = transport.get(DISCOVER_CLUSTER_PATH + "/" + storeName) + .get(DISCOVERY_TIMEOUT_SECONDS, TimeUnit.SECONDS); + return response != null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while discovering Venice store " + storeName, e); + } catch (ExecutionException | TimeoutException e) { + throw new IOException("Failed to discover Venice store " + storeName, e); + } + } + + protected Optional getSslFactory() throws IOException { + String sslConfigPath = properties.getProperty(SSL_CONFIG_PATH); + if (sslConfigPath != null) { + log.debug("Using ssl configs at {}", sslConfigPath); + Properties sslProperties = SslUtils.loadSSLConfig(sslConfigPath); + String sslFactoryClassName = + sslProperties.getProperty(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME); + return Optional.of(SslUtils.getSSLFactory(sslProperties, sslFactoryClassName)); + } + return Optional.empty(); + } } diff --git a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java index c5b62272..b5040884 100644 --- a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java +++ b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java @@ -1,23 +1,27 @@ package com.linkedin.hoptimator.venice; import com.linkedin.venice.client.schema.StoreSchemaFetcher; -import com.linkedin.venice.controllerapi.ControllerClient; -import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse; -import com.linkedin.venice.controllerapi.MultiStoreResponse; -import com.linkedin.venice.exceptions.ErrorType; -import com.linkedin.venice.security.SSLFactory; +import com.linkedin.venice.client.store.ClientConfig; +import com.linkedin.venice.client.store.ClientFactory; +import com.linkedin.venice.client.store.StoreMetadataFetcher; +import com.linkedin.venice.client.store.transport.TransportClient; +import com.linkedin.venice.client.store.transport.TransportClientResponse; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.lookup.LikePattern; import org.apache.calcite.schema.lookup.Lookup; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import java.io.IOException; import java.util.Map; -import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -25,36 +29,45 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class ClusterSchemaTest { - @Mock - private ControllerClient mockControllerClient; - @Mock private StoreSchemaFetcher mockSchemaFetcher; @Mock private VeniceStore mockVeniceStore; + @Mock + private MockedStatic clientFactory; + private Properties properties; @BeforeEach void setUp() { properties = new Properties(); properties.setProperty("router.url", "http://localhost:1234"); - properties.setProperty("clusters", "test-cluster"); } - private ClusterSchema createTestableSchema() { + private ClusterSchema createTestableSchema(StoreMetadataFetcher metadataFetcher) { + return createTestableSchema(metadataFetcher, Set.of()); + } + + private ClusterSchema createTestableSchema(StoreMetadataFetcher metadataFetcher, Set existingStores) { return new ClusterSchema(properties) { @Override - protected ControllerClient createControllerClient(String cluster, Optional sslFactory) { - return mockControllerClient; + protected StoreMetadataFetcher createStoreMetadataFetcher() { + return metadataFetcher; } @Override @@ -66,9 +79,16 @@ protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) { protected VeniceStore createVeniceStore(String store, StoreSchemaFetcher storeSchemaFetcher) { return mockVeniceStore; } + + @Override + protected boolean storeExists(String storeName) { + return existingStores.contains(storeName); + } }; } + // --- filterStoreHints --- + @Test void testFilterStoreHintsReturnsMatchingHints() { ClusterSchema schema = new ClusterSchema(properties); @@ -99,149 +119,86 @@ void testFilterStoreHintsReturnsEmptyWhenNoMatch() { assertTrue(result.isEmpty()); } - // --- tables().get() / loadTable() tests --- + // --- tables().get() (load path uses storeExists, NOT the bulk /stores endpoint) --- @Test - void testLoadTableReturnsStoreWhenFound() { - D2ServiceDiscoveryResponse discoverResponse = mock(D2ServiceDiscoveryResponse.class); - when(discoverResponse.isError()).thenReturn(false); - when(discoverResponse.getCluster()).thenReturn("test-cluster"); - when(mockControllerClient.discoverCluster("myStore")).thenReturn(discoverResponse); + void testLoadReturnsTableWhenStoreFoundInDiscovery() { + StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class); - ClusterSchema schema = createTestableSchema(); - Lookup
tables = schema.tables(); - Table result = tables.get("myStore"); + ClusterSchema schema = createTestableSchema(fetcher, Set.of("known-store")); + Table result = schema.tables().get("known-store"); assertNotNull(result); assertEquals(mockVeniceStore, result); } @Test - void testLoadTableReturnsNullWhenStoreNotFound() { - D2ServiceDiscoveryResponse discoverResponse = mock(D2ServiceDiscoveryResponse.class); - when(discoverResponse.isError()).thenReturn(true); - when(discoverResponse.getErrorType()).thenReturn(ErrorType.STORE_NOT_FOUND); - when(mockControllerClient.discoverCluster("unknownStore")).thenReturn(discoverResponse); + void testLoadReturnsNullWhenStoreNotFoundInDiscovery() { + StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class); - ClusterSchema schema = createTestableSchema(); - Lookup
tables = schema.tables(); - Table result = tables.get("unknownStore"); + ClusterSchema schema = createTestableSchema(fetcher, Set.of()); + Table result = schema.tables().get("nonexistent-store"); assertNull(result); } @Test - void testLoadTableThrowsOnOtherError() { - D2ServiceDiscoveryResponse discoverResponse = mock(D2ServiceDiscoveryResponse.class); - when(discoverResponse.isError()).thenReturn(true); - when(discoverResponse.getErrorType()).thenReturn(ErrorType.GENERAL_ERROR); - when(discoverResponse.getError()).thenReturn("something went wrong"); - when(mockControllerClient.discoverCluster("badStore")).thenReturn(discoverResponse); - - ClusterSchema schema = createTestableSchema(); - Lookup
tables = schema.tables(); + void testLoadDoesNotHitMetadataFetcher() { + StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class); + ClusterSchema schema = createTestableSchema(fetcher, Set.of("a", "b", "c")); - RuntimeException ex = assertThrows(RuntimeException.class, () -> tables.get("badStore")); - assertTrue(ex.getMessage().contains("badStore")); - } + schema.tables().get("a"); + schema.tables().get("b"); + schema.tables().get("c"); - @Test - void testLoadTableReturnsNullWhenStoreInDifferentCluster() { - D2ServiceDiscoveryResponse discoverResponse = mock(D2ServiceDiscoveryResponse.class); - when(discoverResponse.isError()).thenReturn(false); - when(discoverResponse.getCluster()).thenReturn("other-cluster"); - when(mockControllerClient.discoverCluster("remoteStore")).thenReturn(discoverResponse); - - ClusterSchema schema = createTestableSchema(); - Lookup
tables = schema.tables(); - Table result = tables.get("remoteStore"); - - assertNull(result); + // load() uses targeted per-store discovery (storeExists), never the bulk /stores endpoint. + verify(fetcher, never()).getAllStoreNames(); } - // --- loadAllTables tests --- + // --- tables().getNames(...) (loadAll path) --- @Test - void testLoadAllTablesPopulatesFromStoreList() { - MultiStoreResponse storeListResponse = mock(MultiStoreResponse.class); - when(storeListResponse.getStores()).thenReturn(new String[]{"store1", "store2"}); - when(mockControllerClient.queryStoreList(false)).thenReturn(storeListResponse); + void testLoadAllTablesPopulatesFromRouterStores() { + StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class); + when(fetcher.getAllStoreNames()).thenReturn(Set.of("store1", "store2")); - ClusterSchema schema = createTestableSchema(); + ClusterSchema schema = createTestableSchema(fetcher); Lookup
tables = schema.tables(); - // getNames triggers loadAllTables - assertNotNull(tables.getNames(LikePattern.any())); - } - - // --- createControllerClient tests --- - - @Test - void testCreateControllerClientWithLocalhostUrl() { - properties.setProperty("router.url", "http://localhost:5555"); - ClusterSchema schema = new ClusterSchema(properties); - ControllerClient client = schema.createControllerClient("test-cluster", Optional.empty()); - assertNotNull(client); - client.close(); + Set names = tables.getNames(LikePattern.any()); + assertEquals(Set.of("store1", "store2"), names); } @Test - void testCreateControllerClientWithNonLocalhostUrl() { - // non-localhost URL takes the else branch → ControllerClientFactory path - // We can't easily call ControllerClientFactory in unit tests, so override it in a subclass - properties.setProperty("router.url", "http://venice.example.com:5555"); - ClusterSchema schema = new ClusterSchema(properties) { - @Override - protected ControllerClient createControllerClient(String cluster, Optional sslFactory) { - // verify the non-localhost branch would be reached (url does not contain localhost) - assertFalse(properties.getProperty("router.url").contains("localhost")); - return mockControllerClient; - } - }; - ControllerClient client = schema.createControllerClient("test-cluster", Optional.empty()); - assertNotNull(client); - } + void testLoadAllTablesAccessibleViaGet() { + StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class); + when(fetcher.getAllStoreNames()).thenReturn(Set.of("storeA", "storeB")); - // --- getSchemaDescription() must return non-empty string --- - @Test - void testLoadAllTablesSchemaDescriptionIsNonEmpty() { - MultiStoreResponse storeListResponse = mock(MultiStoreResponse.class); - when(storeListResponse.getStores()).thenReturn(new String[]{"store1"}); - when(mockControllerClient.queryStoreList(false)).thenReturn(storeListResponse); - - ClusterSchema schema = createTestableSchema(); + ClusterSchema schema = createTestableSchema(fetcher); Lookup
tables = schema.tables(); - // getNames(LikePattern.any()) triggers loadAllTables and calls getSchemaDescription internally - Iterable names = tables.getNames(LikePattern.any()); - assertNotNull(names); + tables.getNames(LikePattern.any()); - // Verify description via the toString contains cluster info - // The description returned is "Venice clusters test-cluster" - assertNotNull(tables.toString()); // non-null confirms the table lookup was constructed + assertNotNull(tables.get("storeA")); + assertNotNull(tables.get("storeB")); } - // --- getSslFactory() no ssl-config-path → returns empty Optional --- - @Test - void testGetSslFactoryReturnsEmptyWhenNoSslConfigPath() { - // Properties without ssl-config-path: getSslFactory() should return Optional.empty() - // We verify this indirectly: loadAllTables() completes without throwing - MultiStoreResponse storeListResponse = mock(MultiStoreResponse.class); - when(storeListResponse.getStores()).thenReturn(new String[]{"store1"}); - when(mockControllerClient.queryStoreList(false)).thenReturn(storeListResponse); - - // properties does NOT have ssl-config-path + void testLoadAllSkipsStoreWhenSchemaFetcherConstructionFails() { + StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class); + when(fetcher.getAllStoreNames()).thenReturn(Set.of("good-store", "bad-store")); + ClusterSchema schema = new ClusterSchema(properties) { @Override - protected ControllerClient createControllerClient(String cluster, Optional sslFactory) { - // ssl-config-path absent → sslFactory must be empty - assertFalse(sslFactory.isPresent()); - return mockControllerClient; + protected StoreMetadataFetcher createStoreMetadataFetcher() { + return fetcher; } @Override protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) { + if ("bad-store".equals(storeName)) { + throw new RuntimeException("schema fetcher setup failed"); + } return mockSchemaFetcher; } @@ -249,59 +206,118 @@ protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) { protected VeniceStore createVeniceStore(String store, StoreSchemaFetcher storeSchemaFetcher) { return mockVeniceStore; } + + @Override + protected boolean storeExists(String storeName) { + return true; + } }; Lookup
tables = schema.tables(); - Iterable names = tables.getNames(LikePattern.any()); - assertNotNull(names); + tables.getNames(LikePattern.any()); + + assertNotNull(tables.get("good-store")); + assertNull(tables.get("bad-store")); + } + + @Test + void testGetNamesTriggersMetadataFetcherOnce() { + StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class); + when(fetcher.getAllStoreNames()).thenReturn(Set.of("a", "b")); + + ClusterSchema schema = createTestableSchema(fetcher); + schema.tables().getNames(LikePattern.any()); + schema.tables().getNames(LikePattern.any()); + + // loadAll() runs at most once per schema lifetime (LazyLookup caches the result). + verify(fetcher).getAllStoreNames(); + } + + // --- storeExists (real impl, HTTP/HTTPS transport) --- + + @Test + void testStoreExistsReturnsTrueOnNonNullResponse() throws Exception { + TransportClient transport = mock(TransportClient.class); + TransportClientResponse response = mock(TransportClientResponse.class); + doReturn(CompletableFuture.completedFuture(response)).when(transport).get(eq("discover_cluster/known")); + clientFactory.when(() -> ClientFactory.getTransportClient(any(ClientConfig.class))).thenReturn(transport); + + ClusterSchema schema = new ClusterSchema(properties); + assertTrue(schema.storeExists("known")); + } + + @Test + void testStoreExistsReturnsFalseOnNullResponse() throws Exception { + TransportClient transport = mock(TransportClient.class); + doReturn(CompletableFuture.completedFuture(null)).when(transport).get(eq("discover_cluster/missing")); + clientFactory.when(() -> ClientFactory.getTransportClient(any(ClientConfig.class))).thenReturn(transport); + + ClusterSchema schema = new ClusterSchema(properties); + assertFalse(schema.storeExists("missing")); + } + + @Test + void testStoreExistsWrapsExecutionFailureAsIOException() { + TransportClient transport = mock(TransportClient.class); + CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(new RuntimeException("router down")); + doReturn(failed).when(transport).get(eq("discover_cluster/boom")); + clientFactory.when(() -> ClientFactory.getTransportClient(any(ClientConfig.class))).thenReturn(transport); + + ClusterSchema schema = new ClusterSchema(properties); + IOException ex = assertThrows(IOException.class, () -> schema.storeExists("boom")); + assertTrue(ex.getMessage().contains("boom")); } - // --- getSslFactory() with invalid ssl-config-path → throws IOException during loadAllTables --- + // --- getSslFactory --- + + @Test + void testGetSslFactoryReturnsEmptyWhenNoSslConfigPath() throws IOException { + ClusterSchema schema = new ClusterSchema(properties); + assertTrue(schema.getSslFactory().isEmpty()); + } @Test void testGetSslFactoryThrowsWhenSslConfigPathInvalid() { properties.setProperty("ssl-config-path", "/nonexistent/ssl.properties"); - - // No need to override createControllerClient — exception happens before reaching it ClusterSchema schema = new ClusterSchema(properties); - Lookup
tables = schema.tables(); - // getNames triggers loadAllTables which calls getSslFactory → IOException - assertThrows(RuntimeException.class, () -> tables.getNames(LikePattern.any())); + assertThrows(Exception.class, schema::getSslFactory); } - // --- loadAllTables() result must be non-empty when stores are registered --- + // --- description --- @Test - void testLoadAllTablesReturnsNonEmptyMapWhenStoresExist() { - MultiStoreResponse storeListResponse = mock(MultiStoreResponse.class); - when(storeListResponse.getStores()).thenReturn(new String[]{"storeA", "storeB"}); - when(mockControllerClient.queryStoreList(false)).thenReturn(storeListResponse); + void testTablesDescriptionMentionsRouterUrl() { + StoreMetadataFetcher fetcher = mock(StoreMetadataFetcher.class); + lenient().when(fetcher.getAllStoreNames()).thenReturn(Set.of("storeX")); - ClusterSchema schema = createTestableSchema(); + ClusterSchema schema = createTestableSchema(fetcher); Lookup
tables = schema.tables(); - // Trigger loadAllTables - Iterable names = tables.getNames(LikePattern.any()); - assertNotNull(names); + assertNotNull(tables.toString()); + } - // Verify that both stores are accessible via get() - assertNotNull(tables.get("storeA")); - assertNotNull(tables.get("storeB")); + // --- real ClientFactory wiring (createStoreSchemaFetcher) --- + + @Test + void testCreateStoreSchemaFetcherBuildsClientConfigFromRouterUrl() { + ArgumentCaptor captor = ArgumentCaptor.forClass(ClientConfig.class); + clientFactory.when(() -> ClientFactory.createStoreSchemaFetcher(captor.capture())).thenReturn(mockSchemaFetcher); + + ClusterSchema schema = new ClusterSchema(properties); + assertNotNull(schema.createStoreSchemaFetcher("myStore")); + assertEquals("myStore", captor.getValue().getStoreName()); + assertEquals("http://localhost:1234", captor.getValue().getVeniceURL()); } - // --- createVeniceStore with real hints --- + // --- createVeniceStore wires hints --- @Test void testCreateVeniceStoreWithFilteredHints() { properties.setProperty("hints", "venice.myStore.valueSchemaId=5"); ClusterSchema schema = new ClusterSchema(properties) { - @Override - protected ControllerClient createControllerClient(String cluster, Optional sslFactory) { - return mockControllerClient; - } - @Override protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) { return mockSchemaFetcher;