From d190cba6ef26b1d632b3003abe6b8877eee98fd7 Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Mon, 31 Mar 2025 09:37:26 +0000 Subject: [PATCH 1/2] feat: add client debug logging support for unary-stream gRPC calls --- .../services/%service/transports/grpc.py.j2 | 7 +++- .../%service/transports/grpc_asyncio.py.j2 | 8 ++++- .../services/asset_service/transports/grpc.py | 34 +++++++++++++++++- .../asset_service/transports/grpc_asyncio.py | 35 ++++++++++++++++++- .../iam_credentials/transports/grpc.py | 34 +++++++++++++++++- .../transports/grpc_asyncio.py | 35 ++++++++++++++++++- .../services/eventarc/transports/grpc.py | 34 +++++++++++++++++- .../eventarc/transports/grpc_asyncio.py | 35 ++++++++++++++++++- .../config_service_v2/transports/grpc.py | 34 +++++++++++++++++- .../transports/grpc_asyncio.py | 35 ++++++++++++++++++- .../logging_service_v2/transports/grpc.py | 34 +++++++++++++++++- .../transports/grpc_asyncio.py | 35 ++++++++++++++++++- .../metrics_service_v2/transports/grpc.py | 34 +++++++++++++++++- .../transports/grpc_asyncio.py | 35 ++++++++++++++++++- .../config_service_v2/transports/grpc.py | 34 +++++++++++++++++- .../transports/grpc_asyncio.py | 35 ++++++++++++++++++- .../logging_service_v2/transports/grpc.py | 34 +++++++++++++++++- .../transports/grpc_asyncio.py | 35 ++++++++++++++++++- .../metrics_service_v2/transports/grpc.py | 34 +++++++++++++++++- .../transports/grpc_asyncio.py | 35 ++++++++++++++++++- .../services/cloud_redis/transports/grpc.py | 34 +++++++++++++++++- .../cloud_redis/transports/grpc_asyncio.py | 35 ++++++++++++++++++- .../services/cloud_redis/transports/grpc.py | 34 +++++++++++++++++- .../cloud_redis/transports/grpc_asyncio.py | 35 ++++++++++++++++++- 24 files changed, 750 insertions(+), 24 deletions(-) diff --git a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 index 7158a4c430..f94c8bdf10 100644 --- a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 +++ b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 @@ -59,7 +59,12 @@ except ImportError: # pragma: NO COVER _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): +{{ shared_macros.unary_request_interceptor_common(service) }} + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): {{ shared_macros.unary_request_interceptor_common(service) }} response = continuation(client_call_details, request) diff --git a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 index 64aeb0abf6..d80987f46c 100644 --- a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 +++ b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 @@ -64,7 +64,12 @@ except ImportError: # pragma: NO COVER _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): +{{ shared_macros.unary_request_interceptor_common(service) }} + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): {{ shared_macros.unary_request_interceptor_common(service) }} response = await continuation(client_call_details, request) @@ -296,6 +301,7 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py index 2f1d30c418..26255bf6fa 100755 --- a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py +++ b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py @@ -45,7 +45,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.asset.v1.AssetService", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py index 96833f741b..45c2c411c0 100755 --- a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py +++ b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py @@ -49,7 +49,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.asset.v1.AssetService", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -302,6 +334,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py index 4472c18352..3e380dd524 100755 --- a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py +++ b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py @@ -42,7 +42,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.iam.credentials.v1.IAMCredentials", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py index c97b00af64..26738407c9 100755 --- a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py +++ b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py @@ -46,7 +46,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.iam.credentials.v1.IAMCredentials", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -307,6 +339,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py index 5ec53389bb..951c4c2094 100755 --- a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py +++ b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py @@ -53,7 +53,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.eventarc.v1.Eventarc", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py index fdbfeab462..aaf558a1a0 100755 --- a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py +++ b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py @@ -57,7 +57,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.eventarc.v1.Eventarc", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -312,6 +344,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py index 8e56dd2231..51b9054674 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py @@ -45,7 +45,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.ConfigServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py index 46dd72d79d..35ce853891 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py @@ -49,7 +49,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.ConfigServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -302,6 +334,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py index ec9737507e..9b32fb0cf6 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py @@ -44,7 +44,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.LoggingServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py index 1604ba62bc..d8ad687b1e 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py @@ -48,7 +48,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.LoggingServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -300,6 +332,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py index 70a0c89da1..3dffb4fdca 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py @@ -44,7 +44,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.MetricsServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py index dcadfbe957..29984993d1 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py @@ -48,7 +48,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.MetricsServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -300,6 +332,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py index 8e56dd2231..51b9054674 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py @@ -45,7 +45,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.ConfigServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py index 46dd72d79d..35ce853891 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py @@ -49,7 +49,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.ConfigServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -302,6 +334,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py index ec9737507e..9b32fb0cf6 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py @@ -44,7 +44,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.LoggingServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py index 1604ba62bc..d8ad687b1e 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py @@ -48,7 +48,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.LoggingServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -300,6 +332,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py index 70a0c89da1..3dffb4fdca 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py @@ -44,7 +44,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.MetricsServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py index dcadfbe957..29984993d1 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py @@ -48,7 +48,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.MetricsServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -300,6 +332,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py index 6267d2c78a..85e09d609e 100755 --- a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py +++ b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py @@ -45,7 +45,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.redis.v1.CloudRedis", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py index 3731518549..94206ff97e 100755 --- a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py +++ b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py @@ -49,7 +49,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.redis.v1.CloudRedis", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -322,6 +354,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py b/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py index fcaefca376..fb247706c1 100755 --- a/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py +++ b/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py @@ -45,7 +45,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.redis.v1.CloudRedis", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py b/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py index 23337db892..56a63a2b72 100755 --- a/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py +++ b/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py @@ -49,7 +49,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.redis.v1.CloudRedis", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -322,6 +354,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists From 84c7986f524d1402c21be9b457f1ab7ca69915cf Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Fri, 9 May 2025 15:05:51 +0000 Subject: [PATCH 2/2] add comment --- .../%name_%version/%sub/services/%service/transports/grpc.py.j2 | 2 ++ .../%sub/services/%service/transports/grpc_asyncio.py.j2 | 2 ++ 2 files changed, 4 insertions(+) diff --git a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 index f94c8bdf10..147b998efd 100644 --- a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 +++ b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 @@ -60,6 +60,8 @@ _LOGGER = std_logging.getLogger(__name__) class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + {# Streaming responses will be logged in `google-api-core`, while all requests and unary responses are logged here. #} + {# The reason is that gRPC returns an iterator, which we provide directly as the response #} def intercept_unary_stream(self, continuation, client_call_details, request): {{ shared_macros.unary_request_interceptor_common(service) }} response_it = continuation(client_call_details, request) diff --git a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 index d80987f46c..e0c0e17c4b 100644 --- a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 +++ b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 @@ -65,6 +65,8 @@ _LOGGER = std_logging.getLogger(__name__) class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + {# Streaming responses will be logged in `google-api-core`, while all requests and unary responses are logged here. #} + {# The reason is that gRPC returns an iterator, which we provide directly as the response #} async def intercept_unary_stream(self, continuation, client_call_details, request): {{ shared_macros.unary_request_interceptor_common(service) }} response_it = await continuation(client_call_details, request)