diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index 6947b0b4b88b..138fb8056b61 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -42,6 +42,7 @@ import org.apache.hive.service.cli.operation.ClassicTableTypeMapping.ClassicTableTypes; import org.apache.hive.service.cli.operation.HiveTableTypeMapping; import org.apache.hive.service.cli.operation.TableTypeMappingFactory.TableTypeMappings; +import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -128,9 +129,43 @@ public class TestJdbcDriver2 { private static Connection con; private static final float floatCompareDelta = 0.0001f; + /** + * Required prefix of {@link SQLTimeoutException#getMessage()} for a 1s limit. HS2 may append + * {@code ; Query ID: ...} after the base text from {@code HiveSQLException}. + */ + private static final String QUERY_TIMED_OUT_AFTER_1_SECONDS = "Query timed out after 1 seconds"; + @Rule public ExpectedException thrown = ExpectedException.none(); @Rule public final TestName testName = new TestName(); + /** + * {@code SET hive.query.timeout.seconds} applies to the whole HS2 session. Tests such as + * {@link #testQueryTimeoutFromSetStatement()} must not leave a short limit on the shared + * {@link #con}, or unrelated tests will see {@link SQLTimeoutException}. + */ + @After + public void resetHiveSessionQueryTimeout() { + try { + if (con == null || con.isClosed()) { + return; + } + try (Statement st = con.createStatement()) { + st.execute("set hive.query.timeout.seconds=0s"); + } + } catch (SQLException e) { + LOG.warn("Could not reset hive.query.timeout.seconds after {}", testName.getMethodName(), e); + } + } + + private static void assertTimeoutMessageShowsOneSecond(String context, SQLTimeoutException e) { + String msg = e.getMessage(); + assertNotNull(context + ": message should not be null", msg); + assertTrue( + context + ": should start with " + QUERY_TIMED_OUT_AFTER_1_SECONDS + + " (HS2 may append ; Query ID: ...); actual=" + msg, + msg.startsWith(QUERY_TIMED_OUT_AFTER_1_SECONDS)); + } + private static Connection getConnection(String prefix, String postfix) throws SQLException { Connection con1; String connString = "jdbc:hive2:///" + prefix + "?" + conf.getOverlayOptionsAsQueryString() @@ -336,11 +371,46 @@ private void checkBadUrl(String url) throws SQLException { * @throws SQLException */ public void testURLWithFetchSize() throws SQLException { - Connection con = getConnection(testDbName + ";fetchSize=1234", ""); - Statement stmt = con.createStatement(); + Connection connectionWithFetchSize = getConnection(testDbName + ";fetchSize=1234", ""); + Statement stmt = connectionWithFetchSize.createStatement(); assertEquals(stmt.getFetchSize(), 1234); stmt.close(); - con.close(); + connectionWithFetchSize.close(); + } + + /** + * Same idea as {@link #testURLWithFetchSize}: drive session behavior from the JDBC URL instead of + * only {@link Statement#setQueryTimeout(int)} or an explicit {@code SET}. The timeout is supplied + * in the URL query ({@code ?hive_conf_list}) per the driver format + * {@code jdbc:hive2://.../db;sess?hive_conf#hive_var}. + *

+ * {@link SQLTimeoutException#getMessage()} must reflect the configured limit (1s), + * not {@code after 0 seconds}. + */ + @Test + public void testURLWithHiveQueryTimeoutSeconds() throws Exception { + String udfName = SleepMsUDF.class.getName(); + // Postfix appends to the query string after test overlay / lock manager settings. + Connection connectionWithUrlQueryTimeout = getConnection(testDbName, "hive.query.timeout.seconds=1"); + try { + Statement stmt1 = connectionWithUrlQueryTimeout.createStatement(); + stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'"); + stmt1.close(); + Statement stmt = connectionWithUrlQueryTimeout.createStatement(); + try { + stmt.executeQuery("select sleepMsUDF(t1.under_col, 5) as u0, t1.under_col as u1, " + + "t2.under_col as u2 from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col = t2.under_col"); + fail("Expecting SQLTimeoutException"); + } catch (SQLTimeoutException e) { + assertTimeoutMessageShowsOneSecond("JDBC URL hive.query.timeout.seconds=1 (query string)", e); + } catch (SQLException e) { + fail("Expecting SQLTimeoutException, but got SQLException: " + e); + } + stmt.close(); + } finally { + connectionWithUrlQueryTimeout.close(); + } } @Test @@ -349,13 +419,13 @@ public void testURLWithFetchSize() throws SQLException { * @throws SQLException */ public void testCreateTableAsExternal() throws SQLException { - Connection con = getConnection(testDbName + ";hiveCreateAsExternalLegacy=true", ""); - Statement stmt = con.createStatement(); + Connection connectionWithExternalLegacy = getConnection(testDbName + ";hiveCreateAsExternalLegacy=true", ""); + Statement stmt = connectionWithExternalLegacy.createStatement(); ResultSet res = stmt.executeQuery("set hive.create.as.external.legacy"); assertTrue("ResultSet is empty", res.next()); assertEquals("hive.create.as.external.legacy=true", res.getObject(1)); stmt.close(); - con.close(); + connectionWithExternalLegacy.close(); } @Test @@ -2661,7 +2731,8 @@ public void testQueryTimeout() throws Exception { + " t2 on t1.under_col = t2.under_col"); fail("Expecting SQLTimeoutException"); } catch (SQLTimeoutException e) { - assertNotNull(e); + assertTimeoutMessageShowsOneSecond( + "JDBC query timeout (1s)", e); System.err.println(e.toString()); } catch (SQLException e) { fail("Expecting SQLTimeoutException, but got SQLException: " + e); @@ -2680,6 +2751,83 @@ public void testQueryTimeout() throws Exception { stmt.close(); } + /** + * When the session timeout is configured via a {@code SET hive.query.timeout.seconds=...} + * statement and no {@link Statement#setQueryTimeout(int)} is called, the + * {@link SQLTimeoutException#getMessage()} must still reflect the real limit. + * Message must begin with {@link #QUERY_TIMED_OUT_AFTER_1_SECONDS}; + * HS2 may append {@code ; Query ID: ...}. + */ + @Test + public void testQueryTimeoutFromSetStatement() throws Exception { + String udfName = SleepMsUDF.class.getName(); + Statement stmt1 = con.createStatement(); + stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'"); + stmt1.close(); + + Statement stmt = con.createStatement(); + stmt.execute("set hive.query.timeout.seconds=1s"); + + try { + stmt.executeQuery("select sleepMsUDF(t1.under_col, 5) as u0, t1.under_col as u1, " + + "t2.under_col as u2 from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col = t2.under_col"); + fail("Expecting SQLTimeoutException"); + } catch (SQLTimeoutException e) { + assertTimeoutMessageShowsOneSecond( + "Session query timeout (1s)", e); + } catch (SQLException e) { + fail("Expecting SQLTimeoutException, but got SQLException: " + e); + } + stmt.close(); + } + + /** + * Variant of {@link #testQueryTimeoutFromSetStatement}: the {@code SET} is issued on a separate, + * already-closed statement; the timed-out query runs on a brand-new statement with no + * {@link Statement#setQueryTimeout(int)} call. The tracked session timeout lives on the + * {@link HiveConnection}, so it persists across statement instances and must still drive the + * {@link SQLTimeoutException} message correctly. + */ + @Test + public void testQueryTimeoutMessagePersistedAcrossStatements() throws Exception { + String udfName = SleepMsUDF.class.getName(); + Statement stmt1 = con.createStatement(); + stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'"); + stmt1.close(); + + // SET is issued on stmt2, which is then closed – timeout must survive on the connection + Statement stmt2 = con.createStatement(); + stmt2.execute("set hive.query.timeout.seconds=1s"); + stmt2.close(); + + // Brand-new statement, no setQueryTimeout() call – relies solely on the tracked session value + Statement stmt = con.createStatement(); + System.err.println("Executing query (expecting timeout): "); + try { + stmt.executeQuery("select sleepMsUDF(t1.under_col, 5) as u0, t1.under_col as u1, " + + "t2.under_col as u2 from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col = t2.under_col"); + fail("Expecting SQLTimeoutException"); + } catch (SQLTimeoutException e) { + assertTimeoutMessageShowsOneSecond("SET on closed stmt2, executeQuery on new stmt", e); + System.err.println(e.toString()); + } catch (SQLException e) { + fail("Expecting SQLTimeoutException, but got SQLException: " + e); + e.printStackTrace(); + } + + // A fast query must still complete when the per-statement timeout overrides + stmt.setQueryTimeout(5); + try { + stmt.executeQuery("show tables"); + } catch (SQLException e) { + fail("Unexpected SQLException: " + e); + e.printStackTrace(); + } + stmt.close(); + } + /** * Test the non-null value of the Yarn ATS GUID. * We spawn 2 threads - one running the query and diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 8f7c3ea8acd4..3db9f29ee67a 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -69,6 +69,7 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; @@ -82,6 +83,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.common.auth.HiveAuthUtils; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -155,6 +157,7 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; /** @@ -163,6 +166,14 @@ */ public class HiveConnection implements java.sql.Connection { private static final Logger LOG = LoggerFactory.getLogger(HiveConnection.class); + + /** + * Last effective {@code hive.query.timeout.seconds} in seconds, or {@code -1} if not yet set. + * Seeded from the JDBC URL at connect time; a JDBC {@link java.sql.Connection} may be shared + * across threads with concurrent {@link org.apache.hive.jdbc.HiveStatement}s on one HS2 session, + * so this field uses an {@link AtomicLong} to keep updates well-defined. + */ + private final AtomicLong sessionQueryTimeoutSeconds = new AtomicLong(-1L); private String jdbcUriString; private String host; private int port; @@ -190,6 +201,46 @@ public class HiveConnection implements java.sql.Connection { public TCLIService.Iface getClient() { return client; } + /** + * Updates the tracked {@code hive.query.timeout.seconds} value (in seconds) on this connection. + * Called at connect time from the JDBC URL hive-conf map, and may be called again later if needed. + */ + void setSessionQueryTimeoutSeconds(long seconds) { + sessionQueryTimeoutSeconds.set(seconds); + } + + /** + * If the JDBC URL supplied {@code hive.query.timeout.seconds} (via the {@code ?hive_conf_list} + * segment), parses and stores the value so that {@link #getSessionQueryTimeoutSeconds()} can + * return it for timeout error messages. This runs once at connect time and does not affect the + * server-side configuration, which is applied separately in {@link #openSession()}. + */ + private void applySessionQueryTimeoutFromJdbcUrl() { + Map hiveConfs = connParams.getHiveConfs(); + if (hiveConfs == null || hiveConfs.isEmpty()) { + return; + } + String raw = hiveConfs.get(ConfVars.HIVE_QUERY_TIMEOUT_SECONDS.varname); + if (StringUtils.isBlank(raw)) { + return; + } + try { + long sec = HiveConf.toTime(raw.trim(), TimeUnit.SECONDS, TimeUnit.SECONDS); + if (sec > 0) { + setSessionQueryTimeoutSeconds(sec); + } + } catch (Exception e) { + LOG.debug("Could not parse {} from JDBC URL: {}", ConfVars.HIVE_QUERY_TIMEOUT_SECONDS.varname, raw, e); + } + } + + /** + * @return the tracked {@code hive.query.timeout.seconds} in seconds, or {@code -1} if not set + */ + long getSessionQueryTimeoutSeconds() { + return sessionQueryTimeoutSeconds.get(); + } + /** * Get all direct HiveServer2 URLs from a ZooKeeper based HiveServer2 URL * @param zookeeperBasedHS2Url @@ -332,6 +383,7 @@ protected HiveConnection(String uri, Properties info, // hive_conf_list -> hiveConfMap // hive_var_list -> hiveVarMap sessConfMap = connParams.getSessionVars(); + applySessionQueryTimeoutFromJdbcUrl(); setupLoginTimeout(); if (isKerberosAuthMode()) { // Ensure UserGroupInformation includes any authorized Kerberos principals. @@ -396,18 +448,7 @@ protected HiveConnection(String uri, Properties info, executeInitSql(); } } else { - long retryInterval = 1000L; - try { - String strRetries = sessConfMap.get(JdbcConnectionParams.RETRIES); - if (StringUtils.isNotBlank(strRetries)) { - maxRetries = Integer.parseInt(strRetries); - } - String strRetryInterval = sessConfMap.get(JdbcConnectionParams.RETRY_INTERVAL); - if(StringUtils.isNotBlank(strRetryInterval)){ - retryInterval = Long.parseLong(strRetryInterval); - } - } catch(NumberFormatException e) { // Ignore the exception - } + long retryInterval = readRetryIntervalMillis(); for (int numRetries = 0;;) { try { @@ -469,6 +510,27 @@ protected HiveConnection(String uri, Properties info, client = newSynchronizedClient(client); } + /** + * Reads {@code retries} and {@code retryInterval} from the session configuration, updating + * {@link #maxRetries} as a side-effect, and returns the interval in milliseconds (default 1000). + * Extracted from the constructor to keep its length within Sonar's 150-line limit. + */ + private long readRetryIntervalMillis() { + long retryInterval = 1000L; + try { + String strRetries = sessConfMap.get(JdbcConnectionParams.RETRIES); + if (StringUtils.isNotBlank(strRetries)) { + maxRetries = Integer.parseInt(strRetries); + } + String strRetryInterval = sessConfMap.get(JdbcConnectionParams.RETRY_INTERVAL); + if (StringUtils.isNotBlank(strRetryInterval)) { + retryInterval = Long.parseLong(strRetryInterval); + } + } catch (NumberFormatException e) { // Ignore — bad values are silently skipped + } + return retryInterval; + } + private void executeInitSql() throws SQLException { if (initFile != null) { try (Statement st = createStatement()) { diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index aba982670acb..0a831e75cfbb 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -398,20 +398,93 @@ private TGetOperationStatusResp waitForResultSetStatus() throws SQLException { return statusResp; } + /** + * Returns the timeout message for a {@code TIMEDOUT_STATE} response. + * Uses the server error message when the SQL state is {@code HYT00} ("timeout expired"), + * which indicates that the server set a precise message. Otherwise falls back to a + * locally derived message from {@link #setQueryTimeout(int)} or the URL-seeded + * {@code hive.query.timeout.seconds} value on the connection. + */ + private String sqlTimeoutMessageForTimedOutState(String serverMessage, String sqlState) { + if ("HYT00".equals(sqlState) && StringUtils.isNotBlank(serverMessage)) { + return serverMessage; + } + long effectiveSec = resolveEffectiveTimeoutSecondsForMessage(); + if (effectiveSec > 0) { + return "Query timed out after " + effectiveSec + " seconds"; + } + return "Query timed out"; + } + + private long resolveEffectiveTimeoutSecondsForMessage() { + if (queryTimeout > 0) { + return queryTimeout; + } + long tracked = connection.getSessionQueryTimeoutSeconds(); + if (tracked > 0) { + return tracked; + } + return 0L; + } + + private SQLException sqlExceptionForCanceledState(TGetOperationStatusResp statusResp) { + final String errMsg = statusResp.getErrorMessage(); + final String fullErrMsg; + if (errMsg == null || errMsg.isEmpty()) { + fullErrMsg = QUERY_CANCELLED_MESSAGE; + } else { + fullErrMsg = QUERY_CANCELLED_MESSAGE + " " + errMsg; + } + return new SQLException(fullErrMsg, "01000"); // SQLSTATE 01000 = warning + } + + /** + * Handles one {@code GetOperationStatus} response: applies a progress update if in-place updates + * are enabled, verifies the Thrift status, and dispatches on the operation state. + */ + private void processOperationStatusResponse(TGetOperationStatusResp statusResp) throws SQLException { + if (!isOperationComplete && inPlaceUpdateStream.isPresent()) { + inPlaceUpdateStream.get().update(statusResp.getProgressUpdateResponse()); + } + Utils.verifySuccessWithInfo(statusResp.getStatus()); + if (!statusResp.isSetOperationState()) { + return; + } + switch (statusResp.getOperationState()) { + case CLOSED_STATE: + case FINISHED_STATE: + isOperationComplete = true; + isLogBeingGenerated = false; + break; + case CANCELED_STATE: + throw sqlExceptionForCanceledState(statusResp); + case TIMEDOUT_STATE: + throw new SQLTimeoutException( + sqlTimeoutMessageForTimedOutState(statusResp.getErrorMessage(), statusResp.getSqlState())); + case ERROR_STATE: + throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(), statusResp.getErrorCode()); + case UKNOWN_STATE: + throw new SQLException("Unknown query", "HY000"); + case INITIALIZED_STATE: + case PENDING_STATE: + case RUNNING_STATE: + break; + } + } + TGetOperationStatusResp waitForOperationToComplete() throws SQLException { TGetOperationStatusResp statusResp = null; final TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle.get()); - statusReq.setGetProgressUpdate(inPlaceUpdateStream.isPresent()); + boolean progressUpdates = inPlaceUpdateStream.isPresent(); + statusReq.setGetProgressUpdate(progressUpdates); - // Progress bar is completed if there is nothing to request - if (inPlaceUpdateStream.isPresent()) { + if (progressUpdates) { inPlaceUpdateStream.get().getEventNotifier().progressBarCompleted(); } LOG.debug("Waiting on operation to complete: Polling operation status"); - // Poll on the operation status, till the operation is complete do { try { if (Thread.currentThread().isInterrupted()) { @@ -424,37 +497,7 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException { */ statusResp = client.GetOperationStatus(statusReq); LOG.debug("Status response: {}", statusResp); - if (!isOperationComplete && inPlaceUpdateStream.isPresent()) { - inPlaceUpdateStream.get().update(statusResp.getProgressUpdateResponse()); - } - Utils.verifySuccessWithInfo(statusResp.getStatus()); - if (statusResp.isSetOperationState()) { - switch (statusResp.getOperationState()) { - case CLOSED_STATE: - case FINISHED_STATE: - isOperationComplete = true; - isLogBeingGenerated = false; - break; - case CANCELED_STATE: - // 01000 -> warning - final String errMsg = statusResp.getErrorMessage(); - final String fullErrMsg = - (errMsg == null || errMsg.isEmpty()) ? QUERY_CANCELLED_MESSAGE : QUERY_CANCELLED_MESSAGE + " " + errMsg; - throw new SQLException(fullErrMsg, "01000"); - case TIMEDOUT_STATE: - throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds"); - case ERROR_STATE: - // Get the error details from the underlying exception - throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(), - statusResp.getErrorCode()); - case UKNOWN_STATE: - throw new SQLException("Unknown query", "HY000"); - case INITIALIZED_STATE: - case PENDING_STATE: - case RUNNING_STATE: - break; - } - } + processOperationStatusResponse(statusResp); } catch (SQLException e) { isLogBeingGenerated = false; throw e; @@ -464,8 +507,7 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException { } } while (!isOperationComplete); - // set progress bar to be completed when hive query execution has completed - if (inPlaceUpdateStream.isPresent()) { + if (progressUpdates) { inPlaceUpdateStream.get().getEventNotifier().progressBarCompleted(); } return statusResp; diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 5f781a3bbb66..73e4637e48b2 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -179,6 +179,11 @@ private void prepare(QueryState queryState) throws HiveSQLException { try { final String queryId = queryState.getQueryId(); log.info("Query timed out after: {} seconds. Cancelling the execution now: {}", queryTimeout, queryId); + setOperationException(new HiveSQLException( + "Query timed out after " + queryTimeout + " seconds", + "HYT00", + 0, + queryId)); SQLOperation.this.cancel(OperationState.TIMEDOUT); } catch (HiveSQLException e) { log.error("Error cancelling the query after timeout: {} seconds", queryTimeout, e); @@ -334,7 +339,9 @@ public Object run() throws HiveSQLException { runQuery(); } catch (HiveSQLException e) { // TODO: why do we invent our own error path op top of the one from Future.get? - setOperationException(e); + if (getState() != OperationState.TIMEDOUT) { + setOperationException(e); + } log.error("Error running hive query", e); } finally { if (!embedded) { @@ -353,7 +360,9 @@ public Object run() throws HiveSQLException { try { currentUGI.doAs(doAsAction); } catch (Exception e) { - setOperationException(new HiveSQLException(e)); + if (getState() != OperationState.TIMEDOUT) { + setOperationException(new HiveSQLException(e)); + } log.error("Error running hive query as user : {}", currentUGI.getShortUserName(), e); } finally { /** diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index fbece9c199a7..fa9d5e2e9dd6 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -56,6 +57,9 @@ public class PartitionManagementTask implements MetastoreTaskThread { public static final String DISCOVER_PARTITIONS_TBLPROPERTY = "discover.partitions"; public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = "partition.retention.period"; private static final Lock lock = new ReentrantLock(); + // these are just for testing + private static int completedAttempts; + private static int skippedAttempts; private Configuration conf; @@ -83,6 +87,7 @@ private static boolean partitionDiscoveryEnabled(Map params) { @Override public void run() { if (lock.tryLock()) { + skippedAttempts = 0; String qualifiedTableName = null; IMetaStoreClient msc = null; try { @@ -131,8 +136,10 @@ public void run() { } lock.unlock(); } + completedAttempts++; } else { - LOG.info("Lock is held by some other partition discovery task. Skipping this attempt."); + skippedAttempts++; + LOG.info("Lock is held by some other partition discovery task. Skipping this attempt..#{}", skippedAttempts); } } @@ -193,4 +200,13 @@ public void run() { } } + @VisibleForTesting + public static int getSkippedAttempts() { + return skippedAttempts; + } + + @VisibleForTesting + public static int getCompletedAttempts() { + return completedAttempts; + } }