diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksHttpTTransport.java b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksHttpTTransport.java index e4c239bba..9b52029c7 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksHttpTTransport.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksHttpTTransport.java @@ -92,7 +92,16 @@ public void write(byte[] buf, int off, int len) { @Override public void flush() throws TTransportException { + long refreshHeadersStartTime = System.currentTimeMillis(); refreshHeadersIfRequired(); + long refreshHeadersEndTime = System.currentTimeMillis(); + long refreshHeadersLatency = refreshHeadersEndTime - refreshHeadersStartTime; + LOGGER.debug( + "Connection [" + + connectionContext.getConnectionUuid() + + "] Header refresh latency: " + + refreshHeadersLatency + + "ms"); HttpPost request = new HttpPost(this.url); DEFAULT_HEADERS.forEach(request::addHeader); @@ -112,7 +121,17 @@ public void flush() throws TTransportException { request.setEntity(new ByteArrayEntity(requestBuffer.toByteArray())); // Execute the request and handle the response + long httpRequestStartTime = System.currentTimeMillis(); try (CloseableHttpResponse response = httpClient.execute(request)) { + long httpRequestEndTime = System.currentTimeMillis(); + long httpRequestLatency = httpRequestEndTime - httpRequestStartTime; + LOGGER.debug( + "Connection [" + + connectionContext.getConnectionUuid() + + "] HTTP request latency: " + + httpRequestLatency + + "ms"); + ValidationUtil.checkHTTPError(response); // Read the response @@ -122,6 +141,15 @@ public void flush() throws TTransportException { responseBuffer = new ByteArrayInputStream(responseBytes); } } catch (DatabricksHttpException | IOException e) { + long httpRequestEndTime = System.currentTimeMillis(); + long httpRequestLatency = httpRequestEndTime - httpRequestStartTime; + LOGGER.debug( + "Connection [" + + connectionContext.getConnectionUuid() + + "] HTTP request latency (with error): " + + httpRequestLatency + + "ms"); + String errorMessage = "Failed to flush data to server: " + e.getMessage(); LOGGER.error(e, errorMessage); throw new TTransportException(TTransportException.UNKNOWN, errorMessage); diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java index 8459b58a5..16fe3ca18 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java @@ -55,6 +55,7 @@ final class DatabricksThriftAccessor { private final boolean enableDirectResults; private final int asyncPollIntervalMillis; private final int maxRowsPerBlock; + private final String connectionUuid; private TProtocolVersion serverProtocolVersion = JDBC_THRIFT_VERSION; DatabricksThriftAccessor(IDatabricksConnectionContext connectionContext) @@ -67,6 +68,7 @@ final class DatabricksThriftAccessor { String endPointUrl = connectionContext.getEndpointURL(); this.asyncPollIntervalMillis = connectionContext.getAsyncExecPollInterval(); this.maxRowsPerBlock = connectionContext.getRowsFetchedPerBlock(); + this.connectionUuid = connectionContext.getConnectionUuid(); if (!DriverUtil.isRunningAgainstFake()) { // Create a new thrift client for each thread as client state is not thread safe. Note that @@ -89,34 +91,38 @@ final class DatabricksThriftAccessor { this.enableDirectResults = connectionContext.getDirectResultMode(); this.asyncPollIntervalMillis = connectionContext.getAsyncExecPollInterval(); this.maxRowsPerBlock = connectionContext.getRowsFetchedPerBlock(); + this.connectionUuid = connectionContext.getConnectionUuid(); } @SuppressWarnings("rawtypes") TBase getThriftResponse(TBase request) throws DatabricksSQLException { LOGGER.debug("Fetching thrift response for request {}", request.toString()); + + long thriftRequestStartTime = System.currentTimeMillis(); try { + TBase result; if (request instanceof TOpenSessionReq) { - return getThriftClient().OpenSession((TOpenSessionReq) request); + result = getThriftClient().OpenSession((TOpenSessionReq) request); } else if (request instanceof TCloseSessionReq) { - return getThriftClient().CloseSession((TCloseSessionReq) request); + result = getThriftClient().CloseSession((TCloseSessionReq) request); } else if (request instanceof TGetPrimaryKeysReq) { - return listPrimaryKeys((TGetPrimaryKeysReq) request); + result = listPrimaryKeys((TGetPrimaryKeysReq) request); } else if (request instanceof TGetFunctionsReq) { - return listFunctions((TGetFunctionsReq) request); + result = listFunctions((TGetFunctionsReq) request); } else if (request instanceof TGetSchemasReq) { - return listSchemas((TGetSchemasReq) request); + result = listSchemas((TGetSchemasReq) request); } else if (request instanceof TGetColumnsReq) { - return listColumns((TGetColumnsReq) request); + result = listColumns((TGetColumnsReq) request); } else if (request instanceof TGetCatalogsReq) { - return getCatalogs((TGetCatalogsReq) request); + result = getCatalogs((TGetCatalogsReq) request); } else if (request instanceof TGetTablesReq) { - return getTables((TGetTablesReq) request); + result = getTables((TGetTablesReq) request); } else if (request instanceof TGetTableTypesReq) { - return getTableTypes((TGetTableTypesReq) request); + result = getTableTypes((TGetTableTypesReq) request); } else if (request instanceof TGetTypeInfoReq) { - return getTypeInfo((TGetTypeInfoReq) request); + result = getTypeInfo((TGetTypeInfoReq) request); } else if (request instanceof TGetCrossReferenceReq) { - return listCrossReferences((TGetCrossReferenceReq) request); + result = listCrossReferences((TGetCrossReferenceReq) request); } else { String errorMessage = String.format( @@ -124,7 +130,32 @@ TBase getThriftResponse(TBase request) throws DatabricksSQLException { LOGGER.error(errorMessage); throw new DatabricksSQLFeatureNotSupportedException(errorMessage); } + + // TODO (PECOBLR-389): remove these latency logs once DatabricksMetricsTimedProcessor is ready + long thriftRequestEndTime = System.currentTimeMillis(); + long thriftRequestLatency = thriftRequestEndTime - thriftRequestStartTime; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Thrift request latency (" + + request.getClass().getSimpleName() + + "): " + + thriftRequestLatency + + "ms"); + + return result; } catch (TException | SQLException e) { + long thriftRequestEndTime = System.currentTimeMillis(); + long thriftRequestLatency = thriftRequestEndTime - thriftRequestStartTime; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Thrift request latency (" + + request.getClass().getSimpleName() + + ") (with error): " + + thriftRequestLatency + + "ms"); + Throwable cause = e; while (cause != null) { if (cause instanceof HttpException) { @@ -203,21 +234,25 @@ DatabricksResultSet execute( StatementType statementType) throws SQLException { - // Set direct result configuration - if (enableDirectResults) { - // if getDirectResults.maxRows > 0, the server will immediately call FetchResults. Fetch - // initial rows limited by maxRows. - // if = 0, server does not call FetchResults. - TSparkGetDirectResults directResults = - new TSparkGetDirectResults().setMaxBytes(DEFAULT_BYTE_LIMIT).setMaxRows(maxRowsPerBlock); - request.setGetDirectResults(directResults); - } - TExecuteStatementResp response; - TFetchResultsResp resultSet; - int timeoutInSeconds = - (parentStatement == null) ? 0 : parentStatement.getStatement().getQueryTimeout(); + long executeStartTime = System.currentTimeMillis(); try { + // Set direct result configuration + if (enableDirectResults) { + // if getDirectResults.maxRows > 0, the server will immediately call FetchResults. Fetch + // initial rows limited by maxRows. + // if = 0, server does not call FetchResults. + TSparkGetDirectResults directResults = + new TSparkGetDirectResults() + .setMaxBytes(DEFAULT_BYTE_LIMIT) + .setMaxRows(maxRowsPerBlock); + request.setGetDirectResults(directResults); + } + TExecuteStatementResp response; + TFetchResultsResp resultSet; + int timeoutInSeconds = + (parentStatement == null) ? 0 : parentStatement.getStatement().getQueryTimeout(); + response = getThriftClient().ExecuteStatement(request); checkResponseForErrors(response); @@ -239,6 +274,7 @@ DatabricksResultSet execute( TimeoutHandler timeoutHandler = getTimeoutHandler(response, timeoutInSeconds); // Polling until query operation state is finished + long pollingStartTime = System.currentTimeMillis(); TGetOperationStatusReq statusReq = new TGetOperationStatusReq() .setOperationHandle(response.getOperationHandle()) @@ -260,6 +296,19 @@ DatabricksResultSet execute( "Query execution interrupted", e, DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR); } } + long pollingEndTime = System.currentTimeMillis(); + long pollingLatency = pollingEndTime - pollingStartTime; + String sessionInfo = session.getSessionId() + " (" + session.getComputeResource() + ")"; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Statement [" + + statementId + + "] Session [" + + sessionInfo + + "] Thrift polling latency: " + + pollingLatency + + "ms"); if (hasResultDataInDirectResults(response)) { // The first response has result data @@ -268,6 +317,7 @@ DatabricksResultSet execute( resultSet.setResultSetMetadata(response.getDirectResults().getResultSetMetadata()); } else { // Fetch the result data after polling + long fetchStartTime = System.currentTimeMillis(); resultSet = getResultSetResp( response.getStatus(), @@ -275,8 +325,35 @@ DatabricksResultSet execute( response.toString(), maxRowsPerBlock, true); + long fetchEndTime = System.currentTimeMillis(); + long fetchLatency = fetchEndTime - fetchStartTime; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Statement [" + + statementId + + "] Session [" + + sessionInfo + + "] Thrift fetch latency: " + + fetchLatency + + "ms"); } + long executeEndTime = System.currentTimeMillis(); + long executeLatency = executeEndTime - executeStartTime; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Statement [" + + statementId + + "] Session [" + + sessionInfo + + "] Thrift execute latency (" + + statementType + + "): " + + executeLatency + + "ms"); + return new DatabricksResultSet( getStatementStatus(statusResp), statementId, @@ -285,6 +362,17 @@ DatabricksResultSet execute( parentStatement, session); } catch (TException e) { + long executeEndTime = System.currentTimeMillis(); + long executeLatency = executeEndTime - executeStartTime; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Thrift execute latency (" + + statementType + + ") (with error): " + + executeLatency + + "ms"); + String errorMessage = String.format( "Error while receiving response from Thrift server. Request {%s}, Error {%s}", @@ -300,6 +388,8 @@ DatabricksResultSet executeAsync( IDatabricksSession session, StatementType statementType) throws SQLException { + long executeAsyncStartTime = System.currentTimeMillis(); + TExecuteStatementResp response; try { response = getThriftClient().ExecuteStatement(request); @@ -312,6 +402,20 @@ DatabricksResultSet executeAsync( throw new DatabricksSQLException(response.status.errorMessage, response.status.sqlState); } } catch (DatabricksSQLException | TException e) { + long executeAsyncEndTime = System.currentTimeMillis(); + long executeAsyncLatency = executeAsyncEndTime - executeAsyncStartTime; + String sessionInfo = session.getSessionId() + " (" + session.getComputeResource() + ")"; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Session [" + + sessionInfo + + "] Thrift executeAsync latency (" + + statementType + + ") (with error): " + + executeAsyncLatency + + "ms"); + String errorMessage = String.format( "Error while receiving response from Thrift server. Request {%s}, Error {%s}", @@ -329,6 +433,23 @@ DatabricksResultSet executeAsync( parentStatement.setStatementId(statementId); } StatementStatus statementStatus = getAsyncStatus(response.getStatus()); + + long executeAsyncEndTime = System.currentTimeMillis(); + long executeAsyncLatency = executeAsyncEndTime - executeAsyncStartTime; + String sessionInfo = session.getSessionId() + " (" + session.getComputeResource() + ")"; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Statement [" + + statementId + + "] Session [" + + sessionInfo + + "] Thrift executeAsync latency (" + + statementType + + "): " + + executeAsyncLatency + + "ms"); + return new DatabricksResultSet( statementStatus, statementId, null, statementType, parentStatement, session); } @@ -339,19 +460,50 @@ DatabricksResultSet getStatementResult( IDatabricksSession session) throws SQLException { LOGGER.debug("Operation handle {}", operationHandle); + + long getStatementResultStartTime = System.currentTimeMillis(); + StatementId statementId = new StatementId(operationHandle.getOperationId()); + String sessionInfo = session.getSessionId() + " (" + session.getComputeResource() + ")"; + TGetOperationStatusReq request = new TGetOperationStatusReq() .setOperationHandle(operationHandle) .setGetProgressUpdate(false); TGetOperationStatusResp response; TFetchResultsResp resultSet = null; - StatementId statementId = new StatementId(operationHandle.getOperationId()); try { response = getThriftClient().GetOperationStatus(request); TOperationState operationState = response.getOperationState(); if (operationState == TOperationState.FINISHED_STATE) { + long fetchStartTime = System.currentTimeMillis(); resultSet = getResultSetResp(response.getStatus(), operationHandle, response.toString(), -1, true); + long fetchEndTime = System.currentTimeMillis(); + long fetchLatency = fetchEndTime - fetchStartTime; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Statement [" + + statementId + + "] Session [" + + sessionInfo + + "] Thrift getStatementResult fetch latency: " + + fetchLatency + + "ms"); + + long getStatementResultEndTime = System.currentTimeMillis(); + long getStatementResultLatency = getStatementResultEndTime - getStatementResultStartTime; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Statement [" + + statementId + + "] Session [" + + sessionInfo + + "] Thrift getStatementResult latency: " + + getStatementResultLatency + + "ms"); + return new DatabricksResultSet( new StatementStatus().setState(StatementState.SUCCEEDED), statementId, @@ -361,6 +513,19 @@ DatabricksResultSet getStatementResult( session); } } catch (TException e) { + long getStatementResultEndTime = System.currentTimeMillis(); + long getStatementResultLatency = getStatementResultEndTime - getStatementResultStartTime; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Statement [" + + statementId + + "] Session [" + + sessionInfo + + "] Thrift getStatementResult latency (with error): " + + getStatementResultLatency + + "ms"); + String errorMessage = String.format( "Error while receiving response from Thrift server. Request {%s}, Error {%s}", @@ -369,6 +534,20 @@ DatabricksResultSet getStatementResult( throw new DatabricksHttpException(errorMessage, e, DatabricksDriverErrorCode.INVALID_STATE); } StatementStatus executionStatus = getStatementStatus(response); + + long getStatementResultEndTime = System.currentTimeMillis(); + long getStatementResultLatency = getStatementResultEndTime - getStatementResultStartTime; + LOGGER.debug( + "Connection [" + + connectionUuid + + "] Statement [" + + statementId + + "] Session [" + + sessionInfo + + "] Thrift getStatementResult latency: " + + getStatementResultLatency + + "ms"); + return new DatabricksResultSet( executionStatus, statementId, resultSet, StatementType.SQL, parentStatement, session); }