Skip to content

Comments

Rca cluster redirect#31

Open
rajesh-meesho wants to merge 7 commits intomasterfrom
rca-cluster-redirect
Open

Rca cluster redirect#31
rajesh-meesho wants to merge 7 commits intomasterfrom
rca-cluster-redirect

Conversation

@rajesh-meesho
Copy link

@rajesh-meesho rajesh-meesho commented Feb 11, 2026

What is this PR for?

A few sentences describing the overall goals of the pull request's commits.
First time? Check out the contributing guide - https://zeppelin.apache.org/contribution/contributions.html

What type of PR is it?

Bug Fix
Improvement
Feature
Documentation
Hot Fix
Refactoring
Please leave your type of PR only

Todos

  • - Task

What is the Jira issue?

How should this be tested?

  • Strongly recommended: add automated unit tests for any new or changed behavior
  • Outline any manual steps to test the PR here.

Screenshots (if appropriate)

Questions:

  • Does the licenses files need to update?
  • Is there breaking changes for older versions?
  • Does this needs documentation?

Summary by CodeRabbit

Release Notes

  • New Features
    • SQL queries are now pre-validated before database connection for improved reliability
    • Automatic cluster routing based on validation results
    • Query text optimization during validation with updated content applied before execution
    • Enhanced error handling with fallback to originally requested cluster if validation fails

@coderabbitai
Copy link

coderabbitai bot commented Feb 11, 2026

Walkthrough

The changes introduce pre-validation of SQL queries through a remote Validation API before establishing database connections. The JDBCInterpreter now builds validation requests, processes responses for cluster routing, and applies query transformations. ValidationRequest and ValidationResponse classes are enhanced with Gson serialization support and new fields to handle query metadata and target cluster routing.

Changes

Cohort / File(s) Summary
Core Interpreter Logic
jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
Integrated pre-validation workflow that calls a remote Validation API before database connection establishment. Added target cluster routing logic based on validation response, query transformation via newQueryText, and fallback error handling. Moved interpreter name construction to earlier point for reusability.
Validation Models
jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java, jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java
Enhanced both request and response classes to support Gson-based JSON serialization. ValidationRequest now includes rawQueryText field and constructor parameter. ValidationResponse added three new fields (rawQueryText, newQueryText, targetCluster) with accessors and backward-compatible parsing in fromJson.

Sequence Diagram(s)

sequenceDiagram
    participant User as Interpreter
    participant Validator as Validation API
    participant DB as Database
    
    User->>User: Build ValidationRequest<br/>(normalized SQL, user, interpreterName, rawQueryText)
    User->>Validator: POST validation request
    Validator-->>User: ValidationResponse<br/>(targetCluster, newQueryText)
    
    User->>User: Verify & route target cluster<br/>Update dbPrefix if needed
    
    alt Validation Success
        User->>DB: Establish connection<br/>(updated dbPrefix)
        User->>DB: Execute query<br/>(newQueryText or sqlToExecute)
        DB-->>User: Results
    else Validation Failure
        User->>User: Fallback to original cluster
        User->>DB: Establish connection<br/>(original dbPrefix)
        User->>DB: Execute query<br/>(original SQL)
        DB-->>User: Results
    end
Loading

Tip

Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java (3)

921-924: ⚠️ Potential issue | 🔴 Critical

Bug: String comparison using == instead of .equals().

response.getVersion() == "v1" compares object references, not string content. Since getVersion() returns a string parsed from JSON, this will almost never be true, effectively making the entire v1 error-handling block dead code.

🐛 Proposed fix
-              if(response.getVersion() == "v1") {
+              if("v1".equals(response.getVersion())) {

362-374: ⚠️ Potential issue | 🟠 Major

No connect/read timeout on the HTTP connection to the validation API.

If the validation service at FAIL_FAST_VALIDATE_URL is slow or unresponsive, this call will block the SQL execution thread indefinitely. The fallback in the catch block (line 854) only handles exceptions, not hangs.

⏱️ Proposed fix — add timeouts
   private static HttpURLConnection createConnection() throws Exception {
     URL url = new URL(FAIL_FAST_VALIDATE_URL);
     HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+    connection.setConnectTimeout(5000); // 5 seconds
+    connection.setReadTimeout(10000);   // 10 seconds
     connection.setRequestMethod("POST");
     connection.setRequestProperty("Content-Type", "application/json");
     connection.setDoOutput(true);
     return connection;
   }

154-154: 🛠️ Refactor suggestion | 🟠 Major

Hardcoded internal service URL should be configurable.

FAIL_FAST_VALIDATE_URL is hardcoded to an internal hostname (spark-event-listener.prd.meesho.int). This prevents use in non-production environments (staging, dev, local) and requires a code change to update the endpoint.

Consider reading this from the interpreter properties, similar to how other configuration values are handled.

🤖 Fix all issues with AI agents
In `@jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java`:
- Around line 820-825: Remove the debug lines that write internal config to user
output: delete the context.out.write(...) and context.out.flush() calls that
print basePropertiesMap and dbPrefix after sendValidationRequest; if you need
visibility for debugging, log only non-sensitive, sanitized values via a secure
logger (not context.out) or mask credentials when referencing basePropertiesMap
in any logs, and ensure sendValidationRequest remains unchanged.
- Line 983: The current logic applies response.getNewQueryText() inside the loop
over statements produced by sqlSplitter.splitSql(sql), causing the same
rewritten SQL to replace each split fragment; instead, detect if
response.getNewQueryText() is non-null before splitting (or only replace when
sqlSplitter.splitSql(sql) yields a single statement), set sql =
response.getNewQueryText() (or skip per-fragment replacement) and then call
sqlSplitter.splitSql(sql) so each fragment remains correct; update references in
JDBCInterpreter to remove the in-loop assignment sqlToExecute =
response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute
and apply the replacement at the top-level using response.getNewQueryText() or
conditionally when there is exactly one split statement.
- Around line 817-856: The ValidationResponse variable `response` can remain
null if `sendValidationRequest(request)` throws, leading to an NPE later when
code calls methods like `response.isPreSubmitFail()`; modify the catch block
around `sendValidationRequest` to initialize `response` to a safe default (e.g.,
a new ValidationResponse populated with conservative values such as
preSubmitFail=false and empty/neutral fields) so subsequent uses (in code
referencing `response`, `response.getTargetCluster()`,
`response.isPreSubmitFail()`, etc.) never dereference null; alternatively add a
defensive null-check before any later `response` method calls, but the preferred
fix is to assign a well-defined default ValidationResponse inside the existing
catch for `sendValidationRequest`.
🧹 Nitpick comments (4)
jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java (1)

26-29: Consider reusing a static Gson instance.

Gson is thread-safe and immutable. Allocating a new instance on every toJson() call is unnecessary.

♻️ Suggested fix
 public class ValidationRequest {
+    private static final Gson GSON = new Gson();
+
     `@SerializedName`("query_text")
     private String queryText;
     public String toJson() {
-        Gson gson = new Gson();
-        return gson.toJson(this);
+        return GSON.toJson(this);
     }
jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java (1)

91-151: Consider using @SerializedName + Gson.fromJson() to eliminate the manual parsing boilerplate.

ValidationRequest already uses @SerializedName for serialization. Applying the same approach for deserialization in ValidationResponse would replace ~50 lines of repetitive has/get/isJsonNull checks with a single gson.fromJson(jsonResponse, ValidationResponse.class) call and field-level annotations with default values. This is not urgent since the current approach is consistent within this file, but worth considering as a follow-up.

jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java (2)

807-818: Minor cleanup: redundant variable and commented-out code.

  • Line 809: userName duplicates user declared on line 806 (both call getUser(context)).
  • Line 818: Commented-out // String effectiveDbPrefix = dbPrefix; should be removed.
♻️ Suggested cleanup
     String interpreterName = getInterpreterGroup().getId();
-    String userName = getUser(context);
     String sqlToValidate = sql
             .replace("\n", " ")
             .replace("\r", " ")
             .replace("\t", " ");
     
-    ValidationRequest request = new ValidationRequest(sqlToValidate, userName, 
+    ValidationRequest request = new ValidationRequest(sqlToValidate, user, 
                                                                 interpreterName, sql);
     ValidationResponse response = null;
-    // String effectiveDbPrefix = dbPrefix;

385-405: Resource leak: BufferedReader is not closed in a finally block on error paths.

If responseBuilder.append() or any intermediate operation throws, the reader and connection won't be properly cleaned up. Use try-with-resources for the reader.

♻️ Suggested fix
   private static ValidationResponse readResponse(HttpURLConnection connection) throws Exception {
     int statusCode = connection.getResponseCode();
-    BufferedReader reader;
-
-    if (statusCode == HttpURLConnection.HTTP_OK) {
-      reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8"));
-    } else {
-      reader = new BufferedReader(new InputStreamReader(connection.getErrorStream(), "utf-8"));
-    }
-
-    StringBuilder responseBuilder = new StringBuilder();
-    String line;
-    while ((line = reader.readLine()) != null) {
-      responseBuilder.append(line.trim());
-    }
-
-    reader.close();
-    connection.disconnect();
-
-    return ValidationResponse.fromJson(responseBuilder.toString());
+    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
+            statusCode == HttpURLConnection.HTTP_OK
+                ? connection.getInputStream()
+                : connection.getErrorStream(), "utf-8"))) {
+      StringBuilder responseBuilder = new StringBuilder();
+      String line;
+      while ((line = reader.readLine()) != null) {
+        responseBuilder.append(line.trim());
+      }
+      return ValidationResponse.fromJson(responseBuilder.toString());
+    } finally {
+      connection.disconnect();
+    }
   }

Comment on lines +817 to +856
ValidationResponse response = null;
// String effectiveDbPrefix = dbPrefix;

try {
response = sendValidationRequest(request);

context.out.write("base properties map: " + basePropertiesMap.toString() + "\n");
context.out.write("dbPrefix: " + dbPrefix + "\n");
context.out.flush();

if (response.getTargetCluster() != null &&
!response.getTargetCluster().isEmpty()) {
String targetClusterPrefix = response.getTargetCluster();

if (!targetClusterPrefix.equals(dbPrefix)) {
// Verify the target cluster exists in configuration
if (basePropertiesMap.containsKey(targetClusterPrefix)) {
LOGGER.info("Long-range query detected. Routing from '{}' to '{}' cluster",
dbPrefix, targetClusterPrefix);

try {
context.out.write("%text Long-range query detected. " +
"Automatically routing to optimized cluster: " +
targetClusterPrefix + "\n\n");
context.out.flush();
} catch (IOException e) {
LOGGER.warn("Failed to write redirection notification", e);
}

dbPrefix = targetClusterPrefix;
} else {
LOGGER.warn("Target cluster '{}' not found in configuration. " +
"Using requested cluster '{}'",
targetClusterPrefix, dbPrefix);
}
}
}
} catch (Exception e) {
LOGGER.warn("Failed to call validation API, using requested cluster: {}", dbPrefix, e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: response is null when validation API call fails, causing NPE at line 922.

If sendValidationRequest throws (line 854 catch block), response remains null (initialized on line 817). Later at line 922, response.isPreSubmitFail() dereferences it, throwing a NullPointerException. While the surrounding catch on line 985 prevents a crash, the user sees a misleading error message and the intended graceful fallback is broken.

🐛 Proposed fix — initialize a safe default response on failure
     } catch (Exception e) {
       LOGGER.warn("Failed to call validation API, using requested cluster: {}", dbPrefix, e);
+      response = new ValidationResponse(); // safe defaults: preSubmitFail=false, newQueryText=null
     }
🤖 Prompt for AI Agents
In `@jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java` around
lines 817 - 856, The ValidationResponse variable `response` can remain null if
`sendValidationRequest(request)` throws, leading to an NPE later when code calls
methods like `response.isPreSubmitFail()`; modify the catch block around
`sendValidationRequest` to initialize `response` to a safe default (e.g., a new
ValidationResponse populated with conservative values such as
preSubmitFail=false and empty/neutral fields) so subsequent uses (in code
referencing `response`, `response.getTargetCluster()`,
`response.isPreSubmitFail()`, etc.) never dereference null; alternatively add a
defensive null-check before any later `response` method calls, but the preferred
fix is to assign a well-defined default ValidationResponse inside the existing
catch for `sendValidationRequest`.

Comment on lines +820 to +825
try {
response = sendValidationRequest(request);

context.out.write("base properties map: " + basePropertiesMap.toString() + "\n");
context.out.write("dbPrefix: " + dbPrefix + "\n");
context.out.flush();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Sensitive internal configuration is being written to user-facing output.

basePropertiesMap.toString() likely includes JDBC URLs, usernames, and potentially passwords. Writing this to context.out exposes it directly to end users. This appears to be debug code that should not be in production.

🔒 Proposed fix — remove debug output
-      context.out.write("base properties map: " + basePropertiesMap.toString() + "\n");
-      context.out.write("dbPrefix: " + dbPrefix + "\n");
-      context.out.flush();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try {
response = sendValidationRequest(request);
context.out.write("base properties map: " + basePropertiesMap.toString() + "\n");
context.out.write("dbPrefix: " + dbPrefix + "\n");
context.out.flush();
try {
response = sendValidationRequest(request);
🤖 Prompt for AI Agents
In `@jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java` around
lines 820 - 825, Remove the debug lines that write internal config to user
output: delete the context.out.write(...) and context.out.flush() calls that
print basePropertiesMap and dbPrefix after sendValidationRequest; if you need
visibility for debugging, log only non-sensitive, sanitized values via a secure
logger (not context.out) or mask credentials when referencing basePropertiesMap
in any logs, and ensure sendValidationRequest remains unchanged.

context.out.write("%text " + message + "\n\n");
context.out.flush();
}
sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

newQueryText replacement applies uniformly to every statement in a multi-statement batch.

sqlSplitter.splitSql(sql) on line 878 may produce multiple statements. Line 983 replaces each sqlToExecute with the same response.getNewQueryText(), discarding the original individual statements. If the validation API returns a rewritten query, it likely corresponds to the full SQL, not each split fragment.

Consider applying the replacement before the split, or only when there's a single statement.

🔧 Proposed fix — apply replacement before splitting
+    // Apply query text replacement from validation before splitting
+    if (response != null && response.getNewQueryText() != null) {
+      sql = response.getNewQueryText();
+    }
+
     try {
       List<String>  sqlArray = sqlSplitter.splitSql(sql);
       for (String sqlToExecute : sqlArray) {

And remove the in-loop replacement at line 983:

-              sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute;
// Apply query text replacement from validation before splitting
if (response != null && response.getNewQueryText() != null) {
sql = response.getNewQueryText();
}
try {
List<String> sqlArray = sqlSplitter.splitSql(sql);
for (String sqlToExecute : sqlArray) {
// ... (loop content - the line below has been removed)
// Previous line removed: sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() : sqlToExecute;
🤖 Prompt for AI Agents
In `@jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java` at line
983, The current logic applies response.getNewQueryText() inside the loop over
statements produced by sqlSplitter.splitSql(sql), causing the same rewritten SQL
to replace each split fragment; instead, detect if response.getNewQueryText() is
non-null before splitting (or only replace when sqlSplitter.splitSql(sql) yields
a single statement), set sql = response.getNewQueryText() (or skip per-fragment
replacement) and then call sqlSplitter.splitSql(sql) so each fragment remains
correct; update references in JDBCInterpreter to remove the in-loop assignment
sqlToExecute = response.getNewQueryText() != null ? response.getNewQueryText() :
sqlToExecute and apply the replacement at the top-level using
response.getNewQueryText() or conditionally when there is exactly one split
statement.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant