Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions deploy/docker/venice/docker-compose-single-dc-setup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ services:
retries: 5

venice-controller:
image: venicedb/venice-controller:0.4.340
image: venicedb/venice-controller:0.4.858
container_name: venice-controller
hostname: venice-controller
depends_on:
Expand All @@ -45,7 +45,7 @@ services:
retries: 5

venice-server:
image: venicedb/venice-server:0.4.340
image: venicedb/venice-server:0.4.858
container_name: venice-server
hostname: venice-server
depends_on:
Expand All @@ -59,7 +59,7 @@ services:
retries: 5

venice-router:
image: venicedb/venice-router:0.4.340
image: venicedb/venice-router:0.4.858
container_name: venice-router
hostname: venice-router
depends_on:
Expand All @@ -75,7 +75,7 @@ services:
retries: 5

venice-client:
image: venicedb/venice-client:0.4.340
image: venicedb/venice-client:0.4.858
container_name: venice-client
hostname: venice-client
tty: true
Expand Down
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ slf4j-simple = "org.slf4j:slf4j-simple:2.0.11"
slf4j-api = "org.slf4j:slf4j-api:2.0.11"
sqlline = "sqlline:sqlline:1.12.0"
quidem = "net.hydromatic:quidem:0.11"
venice = "com.linkedin.venice:venice-common:0.4.697"
venice-client = "com.linkedin.venice:venice-thin-client:0.4.697"
venice = "com.linkedin.venice:venice-common:0.4.858"
venice-client = "com.linkedin.venice:venice-thin-client:0.4.858"
yaml = "org.yaml:snakeyaml:1.33"
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import com.linkedin.venice.client.schema.StoreSchemaFetcher;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerClientFactory;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.client.store.RouterBasedStoreMetadataFetcher;
import com.linkedin.venice.client.store.StoreMetadataFetcher;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.utils.SslUtils;
import org.apache.calcite.schema.Table;
Expand All @@ -19,11 +19,14 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static com.linkedin.hoptimator.util.DeploymentService.parseHints;
Expand All @@ -33,27 +36,29 @@ public class ClusterSchema extends AbstractSchema {

protected static final String SSL_FACTORY_CLASS_NAME = "ssl.factory.class.name";
protected static final String DEFAULT_SSL_FACTORY_CLASS_NAME = "com.linkedin.venice.security.DefaultSSLFactory";
protected final Properties properties;
protected static final String ROUTER_URL = "router.url";
protected static final String SSL_CONFIG_PATH = "ssl-config-path";
private static final String STORE_HINT_KEY_PREFIX = "venice.%s.";
private static final String DISCOVER_CLUSTER_PATH = "discover_cluster";
private static final long DISCOVERY_TIMEOUT_SECONDS = 5;

protected final Properties properties;
private final LazyReference<Lookup<Table>> tables = new LazyReference<>();

public ClusterSchema(Properties properties) {
this.properties = properties;
}

protected ControllerClient createControllerClient(String cluster, Optional<SSLFactory> sslFactory) {
String routerUrl = properties.getProperty("router.url");
if (routerUrl.contains("localhost")) {
return new LocalControllerClient(cluster, routerUrl, sslFactory);
} else {
return ControllerClientFactory.getControllerClient(cluster, routerUrl, sslFactory);
}
protected StoreMetadataFetcher createStoreMetadataFetcher() {
return new RouterBasedStoreMetadataFetcher(
new ClientConfig<>()
.setVeniceURL(properties.getProperty(ROUTER_URL)));
}

protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) {
return ClientFactory.createStoreSchemaFetcher(
ClientConfig.defaultGenericClientConfig(storeName)
.setVeniceURL(properties.getProperty("router.url")));
.setVeniceURL(properties.getProperty(ROUTER_URL)));
}

protected VeniceStore createVeniceStore(String store, StoreSchemaFetcher storeSchemaFetcher) {
Expand All @@ -78,15 +83,14 @@ public Lookup<Table> tables() {
@Override
protected Map<String, Table> loadAll() throws Exception {
Map<String, Table> tableMap = new HashMap<>();
String clusterStr = properties.getProperty("clusters");
String[] clusters = clusterStr.split(",");

for (String cluster : clusters) {
try (ControllerClient controllerClient = createControllerClient(cluster, getSslFactory())) {
String[] stores = controllerClient.queryStoreList(false).getStores();
for (String store : stores) {
StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(store);
tableMap.put(store, createVeniceStore(store, storeSchemaFetcher));
try (StoreMetadataFetcher fetcher = createStoreMetadataFetcher()) {
Set<String> storeNames = fetcher.getAllStoreNames();
log.info("Discovered {} Venice stores via router /stores endpoint", storeNames.size());
for (String storeName : storeNames) {
try {
tableMap.put(storeName, createVeniceStore(storeName, createStoreSchemaFetcher(storeName)));
} catch (Exception e) {
log.warn("Skipping Venice store {} due to setup failure", storeName, e);
}
}
}
Expand All @@ -95,47 +99,49 @@ protected Map<String, Table> loadAll() throws Exception {

@Override
protected @Nullable Table load(String name) throws Exception {
String clusterStr = properties.getProperty("clusters");
String[] clusters = clusterStr.split(",");

try (ControllerClient controllerClient = createControllerClient(clusters[0], getSslFactory())) {
ControllerResponse controllerResponse = controllerClient.discoverCluster(name);
if (controllerResponse.isError() && controllerResponse.getErrorType().equals(ErrorType.STORE_NOT_FOUND)) {
return null;
} else if (controllerResponse.isError()) {
throw new RuntimeException(String.format("Error fetching store errorType=%s, error=%s",
controllerResponse.getErrorType(), controllerResponse.getError()));
}

// Venice does not currently have the ability to list all clusters so the "clusters" property
// is required as part of the JDBC driver. To keep loadTable and loadAllTables consistent, we
// check that the fetched store actually belongs to one of these clusters, otherwise we could end up
// with different results.
if (!Arrays.asList(clusters).contains(controllerResponse.getCluster())) {
return null;
}
if (!storeExists(name)) {
return null;
}

StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(name);
return createVeniceStore(name, storeSchemaFetcher);
return createVeniceStore(name, createStoreSchemaFetcher(name));
}

@Override
protected String getDescription() {
return "Venice clusters " + properties.getProperty("clusters");
}

private Optional<SSLFactory> getSslFactory() throws IOException {
String sslConfigPath = properties.getProperty("ssl-config-path");
if (sslConfigPath != null) {
log.debug("Using ssl configs at {}", sslConfigPath);
Properties sslProperties = SslUtils.loadSSLConfig(sslConfigPath);
String sslFactoryClassName =
sslProperties.getProperty(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME);
return Optional.of(SslUtils.getSSLFactory(sslProperties, sslFactoryClassName));
}
return Optional.empty();
return "Venice router " + properties.getProperty(ROUTER_URL);
}
});
}

/**
* Targeted per-store existence check against the router's {@code discover_cluster} endpoint
* over HTTP/HTTPS. A {@code null} response indicates HTTP 404 per the transport callback
* contract, which means the store does not exist.
*/
protected boolean storeExists(String storeName) throws IOException {
try (TransportClient transport = ClientFactory.getTransportClient(
new ClientConfig<>()
.setVeniceURL(properties.getProperty(ROUTER_URL))
.setSslFactory(getSslFactory().orElse(null)))) {
TransportClientResponse response = transport.get(DISCOVER_CLUSTER_PATH + "/" + storeName)
.get(DISCOVERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
return response != null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while discovering Venice store " + storeName, e);
} catch (ExecutionException | TimeoutException e) {
throw new IOException("Failed to discover Venice store " + storeName, e);
}
}

protected Optional<SSLFactory> getSslFactory() throws IOException {
String sslConfigPath = properties.getProperty(SSL_CONFIG_PATH);
if (sslConfigPath != null) {
log.debug("Using ssl configs at {}", sslConfigPath);
Properties sslProperties = SslUtils.loadSSLConfig(sslConfigPath);
String sslFactoryClassName =
sslProperties.getProperty(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME);
return Optional.of(SslUtils.getSSLFactory(sslProperties, sslFactoryClassName));
}
return Optional.empty();
}
}
Loading
Loading