Conversation
WalkthroughThe changes introduce pre-validation of SQL queries through a remote Validation API before establishing database connections. The JDBCInterpreter now builds validation requests, processes responses for cluster routing, and applies query transformations. ValidationRequest and ValidationResponse classes are enhanced with Gson serialization support and new fields to handle query metadata and target cluster routing. Changes
Sequence Diagram(s)sequenceDiagram
participant User as Interpreter
participant Validator as Validation API
participant DB as Database
User->>User: Build ValidationRequest<br/>(normalized SQL, user, interpreterName, rawQueryText)
User->>Validator: POST validation request
Validator-->>User: ValidationResponse<br/>(targetCluster, newQueryText)
User->>User: Verify & route target cluster<br/>Update dbPrefix if needed
alt Validation Success
User->>DB: Establish connection<br/>(updated dbPrefix)
User->>DB: Execute query<br/>(newQueryText or sqlToExecute)
DB-->>User: Results
else Validation Failure
User->>User: Fallback to original cluster
User->>DB: Establish connection<br/>(original dbPrefix)
User->>DB: Execute query<br/>(original SQL)
DB-->>User: Results
end
Tip Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java (3)
921-924:⚠️ Potential issue | 🔴 CriticalBug: String comparison using
==instead of.equals().
response.getVersion() == "v1"compares object references, not string content. SincegetVersion()returns a string parsed from JSON, this will almost never betrue, effectively making the entirev1error-handling block dead code.🐛 Proposed fix
- if(response.getVersion() == "v1") { + if("v1".equals(response.getVersion())) {
362-374:⚠️ Potential issue | 🟠 MajorNo connect/read timeout on the HTTP connection to the validation API.
If the validation service at
FAIL_FAST_VALIDATE_URLis slow or unresponsive, this call will block the SQL execution thread indefinitely. The fallback in thecatchblock (line 854) only handles exceptions, not hangs.⏱️ Proposed fix — add timeouts
private static HttpURLConnection createConnection() throws Exception { URL url = new URL(FAIL_FAST_VALIDATE_URL); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setConnectTimeout(5000); // 5 seconds + connection.setReadTimeout(10000); // 10 seconds connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/json"); connection.setDoOutput(true); return connection; }
154-154: 🛠️ Refactor suggestion | 🟠 MajorHardcoded internal service URL should be configurable.
FAIL_FAST_VALIDATE_URLis hardcoded to an internal hostname (spark-event-listener.prd.meesho.int). This prevents use in non-production environments (staging, dev, local) and requires a code change to update the endpoint.Consider reading this from the interpreter properties, similar to how other configuration values are handled.
🤖 Fix all issues with AI agents
In `@jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java`:
- Around line 820-825: Remove the debug lines that write internal config to user
output: delete the context.out.write(...) and context.out.flush() calls that
print basePropertiesMap and dbPrefix after sendValidationRequest; if you need
visibility for debugging, log only non-sensitive, sanitized values via a secure
logger (not context.out) or mask credentials when referencing basePropertiesMap
in any logs, and ensure sendValidationRequest remains unchanged.
- Line 983: The current logic applies response.getNewQueryText() inside the loop
over statements produced by sqlSplitter.splitSql(sql), causing the same
rewritten SQL to replace each split fragment; instead, detect if
response.getNewQueryText() is non-null before splitting (or only replace when
sqlSplitter.splitSql(sql) yields a single statement), set sql =
response.getNewQueryText() (or skip per-fragment replacement) and then call
sqlSplitter.splitSql(sql) so each fragment remains correct; update references in
JDBCInterpreter to remove the in-loop assignment sqlToExecute =
response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute
and apply the replacement at the top-level using response.getNewQueryText() or
conditionally when there is exactly one split statement.
- Around line 817-856: The ValidationResponse variable `response` can remain
null if `sendValidationRequest(request)` throws, leading to an NPE later when
code calls methods like `response.isPreSubmitFail()`; modify the catch block
around `sendValidationRequest` to initialize `response` to a safe default (e.g.,
a new ValidationResponse populated with conservative values such as
preSubmitFail=false and empty/neutral fields) so subsequent uses (in code
referencing `response`, `response.getTargetCluster()`,
`response.isPreSubmitFail()`, etc.) never dereference null; alternatively add a
defensive null-check before any later `response` method calls, but the preferred
fix is to assign a well-defined default ValidationResponse inside the existing
catch for `sendValidationRequest`.
🧹 Nitpick comments (4)
jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java (1)
26-29: Consider reusing a staticGsoninstance.
Gsonis thread-safe and immutable. Allocating a new instance on everytoJson()call is unnecessary.♻️ Suggested fix
public class ValidationRequest { + private static final Gson GSON = new Gson(); + `@SerializedName`("query_text") private String queryText;public String toJson() { - Gson gson = new Gson(); - return gson.toJson(this); + return GSON.toJson(this); }jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java (1)
91-151: Consider using@SerializedName+Gson.fromJson()to eliminate the manual parsing boilerplate.
ValidationRequestalready uses@SerializedNamefor serialization. Applying the same approach for deserialization inValidationResponsewould replace ~50 lines of repetitivehas/get/isJsonNullchecks with a singlegson.fromJson(jsonResponse, ValidationResponse.class)call and field-level annotations with default values. This is not urgent since the current approach is consistent within this file, but worth considering as a follow-up.jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java (2)
807-818: Minor cleanup: redundant variable and commented-out code.
- Line 809:
userNameduplicatesuserdeclared on line 806 (both callgetUser(context)).- Line 818: Commented-out
// String effectiveDbPrefix = dbPrefix;should be removed.♻️ Suggested cleanup
String interpreterName = getInterpreterGroup().getId(); - String userName = getUser(context); String sqlToValidate = sql .replace("\n", " ") .replace("\r", " ") .replace("\t", " "); - ValidationRequest request = new ValidationRequest(sqlToValidate, userName, + ValidationRequest request = new ValidationRequest(sqlToValidate, user, interpreterName, sql); ValidationResponse response = null; - // String effectiveDbPrefix = dbPrefix;
385-405: Resource leak:BufferedReaderis not closed in afinallyblock on error paths.If
responseBuilder.append()or any intermediate operation throws, the reader and connection won't be properly cleaned up. Use try-with-resources for the reader.♻️ Suggested fix
private static ValidationResponse readResponse(HttpURLConnection connection) throws Exception { int statusCode = connection.getResponseCode(); - BufferedReader reader; - - if (statusCode == HttpURLConnection.HTTP_OK) { - reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8")); - } else { - reader = new BufferedReader(new InputStreamReader(connection.getErrorStream(), "utf-8")); - } - - StringBuilder responseBuilder = new StringBuilder(); - String line; - while ((line = reader.readLine()) != null) { - responseBuilder.append(line.trim()); - } - - reader.close(); - connection.disconnect(); - - return ValidationResponse.fromJson(responseBuilder.toString()); + try (BufferedReader reader = new BufferedReader(new InputStreamReader( + statusCode == HttpURLConnection.HTTP_OK + ? connection.getInputStream() + : connection.getErrorStream(), "utf-8"))) { + StringBuilder responseBuilder = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + responseBuilder.append(line.trim()); + } + return ValidationResponse.fromJson(responseBuilder.toString()); + } finally { + connection.disconnect(); + } }
| ValidationResponse response = null; | ||
| // String effectiveDbPrefix = dbPrefix; | ||
|
|
||
| try { | ||
| response = sendValidationRequest(request); | ||
|
|
||
| context.out.write("base properties map: " + basePropertiesMap.toString() + "\n"); | ||
| context.out.write("dbPrefix: " + dbPrefix + "\n"); | ||
| context.out.flush(); | ||
|
|
||
| if (response.getTargetCluster() != null && | ||
| !response.getTargetCluster().isEmpty()) { | ||
| String targetClusterPrefix = response.getTargetCluster(); | ||
|
|
||
| if (!targetClusterPrefix.equals(dbPrefix)) { | ||
| // Verify the target cluster exists in configuration | ||
| if (basePropertiesMap.containsKey(targetClusterPrefix)) { | ||
| LOGGER.info("Long-range query detected. Routing from '{}' to '{}' cluster", | ||
| dbPrefix, targetClusterPrefix); | ||
|
|
||
| try { | ||
| context.out.write("%text Long-range query detected. " + | ||
| "Automatically routing to optimized cluster: " + | ||
| targetClusterPrefix + "\n\n"); | ||
| context.out.flush(); | ||
| } catch (IOException e) { | ||
| LOGGER.warn("Failed to write redirection notification", e); | ||
| } | ||
|
|
||
| dbPrefix = targetClusterPrefix; | ||
| } else { | ||
| LOGGER.warn("Target cluster '{}' not found in configuration. " + | ||
| "Using requested cluster '{}'", | ||
| targetClusterPrefix, dbPrefix); | ||
| } | ||
| } | ||
| } | ||
| } catch (Exception e) { | ||
| LOGGER.warn("Failed to call validation API, using requested cluster: {}", dbPrefix, e); | ||
| } |
There was a problem hiding this comment.
Critical: response is null when validation API call fails, causing NPE at line 922.
If sendValidationRequest throws (line 854 catch block), response remains null (initialized on line 817). Later at line 922, response.isPreSubmitFail() dereferences it, throwing a NullPointerException. While the surrounding catch on line 985 prevents a crash, the user sees a misleading error message and the intended graceful fallback is broken.
🐛 Proposed fix — initialize a safe default response on failure
} catch (Exception e) {
LOGGER.warn("Failed to call validation API, using requested cluster: {}", dbPrefix, e);
+ response = new ValidationResponse(); // safe defaults: preSubmitFail=false, newQueryText=null
}🤖 Prompt for AI Agents
In `@jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java` around
lines 817 - 856, The ValidationResponse variable `response` can remain null if
`sendValidationRequest(request)` throws, leading to an NPE later when code calls
methods like `response.isPreSubmitFail()`; modify the catch block around
`sendValidationRequest` to initialize `response` to a safe default (e.g., a new
ValidationResponse populated with conservative values such as
preSubmitFail=false and empty/neutral fields) so subsequent uses (in code
referencing `response`, `response.getTargetCluster()`,
`response.isPreSubmitFail()`, etc.) never dereference null; alternatively add a
defensive null-check before any later `response` method calls, but the preferred
fix is to assign a well-defined default ValidationResponse inside the existing
catch for `sendValidationRequest`.
| try { | ||
| response = sendValidationRequest(request); | ||
|
|
||
| context.out.write("base properties map: " + basePropertiesMap.toString() + "\n"); | ||
| context.out.write("dbPrefix: " + dbPrefix + "\n"); | ||
| context.out.flush(); |
There was a problem hiding this comment.
Sensitive internal configuration is being written to user-facing output.
basePropertiesMap.toString() likely includes JDBC URLs, usernames, and potentially passwords. Writing this to context.out exposes it directly to end users. This appears to be debug code that should not be in production.
🔒 Proposed fix — remove debug output
- context.out.write("base properties map: " + basePropertiesMap.toString() + "\n");
- context.out.write("dbPrefix: " + dbPrefix + "\n");
- context.out.flush();📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| try { | |
| response = sendValidationRequest(request); | |
| context.out.write("base properties map: " + basePropertiesMap.toString() + "\n"); | |
| context.out.write("dbPrefix: " + dbPrefix + "\n"); | |
| context.out.flush(); | |
| try { | |
| response = sendValidationRequest(request); | |
🤖 Prompt for AI Agents
In `@jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java` around
lines 820 - 825, Remove the debug lines that write internal config to user
output: delete the context.out.write(...) and context.out.flush() calls that
print basePropertiesMap and dbPrefix after sendValidationRequest; if you need
visibility for debugging, log only non-sensitive, sanitized values via a secure
logger (not context.out) or mask credentials when referencing basePropertiesMap
in any logs, and ensure sendValidationRequest remains unchanged.
| context.out.write("%text " + message + "\n\n"); | ||
| context.out.flush(); | ||
| } | ||
| sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; |
There was a problem hiding this comment.
newQueryText replacement applies uniformly to every statement in a multi-statement batch.
sqlSplitter.splitSql(sql) on line 878 may produce multiple statements. Line 983 replaces each sqlToExecute with the same response.getNewQueryText(), discarding the original individual statements. If the validation API returns a rewritten query, it likely corresponds to the full SQL, not each split fragment.
Consider applying the replacement before the split, or only when there's a single statement.
🔧 Proposed fix — apply replacement before splitting
+ // Apply query text replacement from validation before splitting
+ if (response != null && response.getNewQueryText() != null) {
+ sql = response.getNewQueryText();
+ }
+
try {
List<String> sqlArray = sqlSplitter.splitSql(sql);
for (String sqlToExecute : sqlArray) {And remove the in-loop replacement at line 983:
- sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; | |
| // Apply query text replacement from validation before splitting | |
| if (response != null && response.getNewQueryText() != null) { | |
| sql = response.getNewQueryText(); | |
| } | |
| try { | |
| List<String> sqlArray = sqlSplitter.splitSql(sql); | |
| for (String sqlToExecute : sqlArray) { | |
| // ... (loop content - the line below has been removed) | |
| // Previous line removed: sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute; |
🤖 Prompt for AI Agents
In `@jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java` at line
983, The current logic applies response.getNewQueryText() inside the loop over
statements produced by sqlSplitter.splitSql(sql), causing the same rewritten SQL
to replace each split fragment; instead, detect if response.getNewQueryText() is
non-null before splitting (or only replace when sqlSplitter.splitSql(sql) yields
a single statement), set sql = response.getNewQueryText() (or skip per-fragment
replacement) and then call sqlSplitter.splitSql(sql) so each fragment remains
correct; update references in JDBCInterpreter to remove the in-loop assignment
sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() :
sqlToExecute and apply the replacement at the top-level using
response.getNewQueryText() or conditionally when there is exactly one split
statement.
What is this PR for?
A few sentences describing the overall goals of the pull request's commits.
First time? Check out the contributing guide - https://zeppelin.apache.org/contribution/contributions.html
What type of PR is it?
Bug Fix
Improvement
Feature
Documentation
Hot Fix
Refactoring
Please leave your type of PR only
Todos
What is the Jira issue?
How should this be tested?
Screenshots (if appropriate)
Questions:
Summary by CodeRabbit
Release Notes