|
19 | 19 | ExecuteResponse,
|
20 | 20 | ParamEscaper,
|
21 | 21 | named_parameters_to_tsparkparams,
|
| 22 | + inject_parameters, |
| 23 | + ParameterApproach, |
22 | 24 | )
|
23 | 25 | from databricks.sql.types import Row
|
24 | 26 | from databricks.sql.auth.auth import get_python_sql_connector_auth_provider
|
25 | 27 | from databricks.sql.experimental.oauth_persistence import OAuthPersistence
|
26 | 28 |
|
| 29 | +from databricks.sql.thrift_api.TCLIService.ttypes import ( |
| 30 | + TSparkParameter, |
| 31 | +) |
| 32 | + |
| 33 | + |
27 | 34 | logger = logging.getLogger(__name__)
|
28 | 35 |
|
29 | 36 | DEFAULT_RESULT_BUFFER_SIZE_BYTES = 104857600
|
30 | 37 | DEFAULT_ARRAY_SIZE = 100000
|
31 | 38 |
|
| 39 | +NO_NATIVE_PARAMS: List = [] |
| 40 | + |
32 | 41 |
|
33 | 42 | class Connection:
|
34 | 43 | def __init__(
|
@@ -65,6 +74,12 @@ def __init__(
|
65 | 74 | :param schema: An optional initial schema to use. Requires DBR version 9.0+
|
66 | 75 |
|
67 | 76 | Other Parameters:
|
| 77 | + use_inline_params: `boolean`, optional (default is True) |
| 78 | + When True, parameterized calls to cursor.execute() will try to render parameter values inline with the |
| 79 | + query text instead of using native bound parameters supported in DBR 14.1 and above. This connector will attempt to |
| 80 | + sanitise parameterized inputs to prevent SQL injection. Before you can switch this to False, you must |
| 81 | + update your queries to use the PEP-249 `named` paramstyle instead of the `pyformat` paramstyle used |
| 82 | + in INLINE mode. |
68 | 83 | auth_type: `str`, optional
|
69 | 84 | `databricks-oauth` : to use oauth with fine-grained permission scopes, set to `databricks-oauth`.
|
70 | 85 | This is currently in private preview for Databricks accounts on AWS.
|
@@ -207,6 +222,9 @@ def read(self) -> Optional[OAuthToken]:
|
207 | 222 | logger.info("Successfully opened session " + str(self.get_session_id_hex()))
|
208 | 223 | self._cursors = [] # type: List[Cursor]
|
209 | 224 |
|
| 225 | + self._suppress_inline_warning = "use_inline_params" in kwargs |
| 226 | + self.use_inline_params = kwargs.get("use_inline_params", True) |
| 227 | + |
210 | 228 | def __enter__(self):
|
211 | 229 | return self
|
212 | 230 |
|
@@ -358,6 +376,100 @@ def __iter__(self):
|
358 | 376 | else:
|
359 | 377 | raise Error("There is no active result set")
|
360 | 378 |
|
| 379 | + def _determine_parameter_approach( |
| 380 | + self, params: Optional[Union[List, Dict[str, Any]]] = None |
| 381 | + ) -> ParameterApproach: |
| 382 | + """Encapsulates the logic for choosing whether to send parameters in native vs inline mode |
| 383 | +
|
| 384 | + If params is None then ParameterApproach.NONE is returned. |
| 385 | + If self.use_inline_params is True then inline mode is used. |
| 386 | + If self.use_inline_params is False, then check if the server supports them and proceed. |
| 387 | + Else raise an exception. |
| 388 | +
|
| 389 | + Returns a ParameterApproach enumeration or raises an exception |
| 390 | +
|
| 391 | + If inline approach is used when the server supports native approach, a warning is logged |
| 392 | + """ |
| 393 | + |
| 394 | + if params is None: |
| 395 | + return ParameterApproach.NONE |
| 396 | + |
| 397 | + server_supports_native_approach = ( |
| 398 | + self.connection.server_parameterized_queries_enabled( |
| 399 | + self.connection.protocol_version |
| 400 | + ) |
| 401 | + ) |
| 402 | + |
| 403 | + if self.connection.use_inline_params: |
| 404 | + if ( |
| 405 | + server_supports_native_approach |
| 406 | + and not self.connection._suppress_inline_warning |
| 407 | + ): |
| 408 | + logger.warning( |
| 409 | + "This query will be executed with inline parameters." |
| 410 | + "Consider using native parameters." |
| 411 | + "Learn more: https://github.com/databricks/databricks-sql-python/tree/main/docs/parameters.md" |
| 412 | + "To suppress this warning, pass use_inline_params=True when creating the connection." |
| 413 | + ) |
| 414 | + return ParameterApproach.INLINE |
| 415 | + |
| 416 | + elif server_supports_native_approach: |
| 417 | + return ParameterApproach.NATIVE |
| 418 | + else: |
| 419 | + raise NotSupportedError( |
| 420 | + "Parameterized operations are not supported by this server. DBR 14.1 is required." |
| 421 | + ) |
| 422 | + |
| 423 | + def _prepare_inline_parameters( |
| 424 | + self, stmt: str, params: Optional[Union[List, Dict[str, Any]]] |
| 425 | + ) -> Tuple[str, List]: |
| 426 | + """Return a statement and list of native parameters to be passed to thrift_backend for execution |
| 427 | +
|
| 428 | + :stmt: |
| 429 | + A string SQL query containing parameter markers of PEP-249 paramstyle `pyformat`. |
| 430 | + For example `%(param)s`. |
| 431 | +
|
| 432 | + :params: |
| 433 | + An iterable of parameter values to be rendered inline. If passed as a Dict, the keys |
| 434 | + must match the names of the markers included in :stmt:. If passed as a List, its length |
| 435 | + must equal the count of parameter markers in :stmt:. |
| 436 | +
|
| 437 | + Returns a tuple of: |
| 438 | + stmt: the passed statement with the param markers replaced by literal rendered values |
| 439 | + params: an empty list representing the native parameters to be passed with this query. |
| 440 | + The list is always empty because native parameters are never used under the inline approach |
| 441 | + """ |
| 442 | + |
| 443 | + escaped_values = self.escaper.escape_args(params) |
| 444 | + rendered_statement = inject_parameters(stmt, escaped_values) |
| 445 | + |
| 446 | + return rendered_statement, NO_NATIVE_PARAMS |
| 447 | + |
| 448 | + def _prepare_native_parameters( |
| 449 | + self, stmt: str, params: Optional[Union[List[Any], Dict[str, Any]]] |
| 450 | + ) -> Tuple[str, List[TSparkParameter]]: |
| 451 | + """Return a statement and a list of native parameters to be passed to thrift_backend for execution |
| 452 | +
|
| 453 | + :stmt: |
| 454 | + A string SQL query containing parameter markers of PEP-249 paramstyle `named`. |
| 455 | + For example `:param`. |
| 456 | +
|
| 457 | + :params: |
| 458 | + An iterable of parameter values to be sent natively. If passed as a Dict, the keys |
| 459 | + must match the names of the markers included in :stmt:. If passed as a List, its length |
| 460 | + must equal the count of parameter markers in :stmt:. In list form, any member of the list |
| 461 | + can be wrapped in a DbsqlParameter class. |
| 462 | +
|
| 463 | + Returns a tuple of: |
| 464 | + stmt: the passed statement` with the param markers replaced by literal rendered values |
| 465 | + params: a list of TSparkParameters that will be passed in native mode |
| 466 | + """ |
| 467 | + |
| 468 | + stmt = stmt |
| 469 | + params = named_parameters_to_tsparkparams(params) # type: ignore |
| 470 | + |
| 471 | + return stmt, params |
| 472 | + |
361 | 473 | def _close_and_clear_active_result_set(self):
|
362 | 474 | try:
|
363 | 475 | if self.active_result_set:
|
@@ -515,40 +627,62 @@ def _handle_staging_remove(self, presigned_url: str, headers: dict = None):
|
515 | 627 | def execute(
|
516 | 628 | self,
|
517 | 629 | operation: str,
|
518 |
| - parameters: Optional[Union[List[Any], Dict[str, str]]] = None, |
| 630 | + parameters: Optional[Union[List[Any], Dict[str, Any]]] = None, |
519 | 631 | ) -> "Cursor":
|
520 | 632 | """
|
521 | 633 | Execute a query and wait for execution to complete.
|
522 |
| - Parameters should be given in extended param format style: %(...)<s|d|f>. |
523 |
| - For example: |
524 |
| - operation = "SELECT * FROM table WHERE field = %(some_value)s" |
525 |
| - parameters = {"some_value": "foo"} |
526 |
| - Will result in the query "SELECT * FROM table WHERE field = 'foo' being sent to the server |
| 634 | +
|
| 635 | + The parameterisation behaviour of this method depends on which parameter approach is used: |
| 636 | + - With INLINE mode (default), parameters are rendered inline with the query text |
| 637 | + - With NATIVE mode, parameters are sent to the server separately for binding |
| 638 | +
|
| 639 | + This behaviour is controlled by the `use_inline_params` argument passed when building a connection. |
| 640 | +
|
| 641 | + The syntax for these approaches is different: |
| 642 | +
|
| 643 | + If the connection was instantiated with use_inline_params=False, then parameters |
| 644 | + should be given in PEP-249 `named` paramstyle like :param_name |
| 645 | +
|
| 646 | + If the connection was instantiated with use_inline_params=True (default), then parameters |
| 647 | + should be given in PEP-249 `pyformat` paramstyle like %(param_name)s |
| 648 | +
|
| 649 | + ```python |
| 650 | + inline_operation = "SELECT * FROM table WHERE field = %(some_value)s" |
| 651 | + native_operation = "SELECT * FROM table WHERE field = :some_value" |
| 652 | + parameters = {"some_value": "foo"} |
| 653 | + ``` |
| 654 | +
|
| 655 | + Both will result in the query equivalent to "SELECT * FROM table WHERE field = 'foo' |
| 656 | + being sent to the server |
| 657 | +
|
527 | 658 | :returns self
|
528 | 659 | """
|
529 |
| - if parameters is None: |
530 |
| - parameters = [] |
531 | 660 |
|
532 |
| - elif not Connection.server_parameterized_queries_enabled( |
533 |
| - self.connection.protocol_version |
534 |
| - ): |
535 |
| - raise NotSupportedError( |
536 |
| - "Parameterized operations are not supported by this server. DBR 14.1 is required." |
| 661 | + param_approach = self._determine_parameter_approach(parameters) |
| 662 | + if param_approach == ParameterApproach.NONE: |
| 663 | + prepared_params = NO_NATIVE_PARAMS |
| 664 | + prepared_operation = operation |
| 665 | + |
| 666 | + elif param_approach == ParameterApproach.INLINE: |
| 667 | + prepared_operation, prepared_params = self._prepare_inline_parameters( |
| 668 | + operation, parameters |
| 669 | + ) |
| 670 | + elif param_approach == ParameterApproach.NATIVE: |
| 671 | + prepared_operation, prepared_params = self._prepare_native_parameters( |
| 672 | + operation, parameters |
537 | 673 | )
|
538 |
| - else: |
539 |
| - parameters = named_parameters_to_tsparkparams(parameters) |
540 | 674 |
|
541 | 675 | self._check_not_closed()
|
542 | 676 | self._close_and_clear_active_result_set()
|
543 | 677 | execute_response = self.thrift_backend.execute_command(
|
544 |
| - operation=operation, |
| 678 | + operation=prepared_operation, |
545 | 679 | session_handle=self.connection._session_handle,
|
546 | 680 | max_rows=self.arraysize,
|
547 | 681 | max_bytes=self.buffer_size_bytes,
|
548 | 682 | lz4_compression=self.connection.lz4_compression,
|
549 | 683 | cursor=self,
|
550 | 684 | use_cloud_fetch=self.connection.use_cloud_fetch,
|
551 |
| - parameters=parameters, |
| 685 | + parameters=prepared_params, |
552 | 686 | )
|
553 | 687 | self.active_result_set = ResultSet(
|
554 | 688 | self.connection,
|
|
0 commit comments