Skip to content

Commit d190cba

Browse files
committed
feat: add client debug logging support for unary-stream gRPC calls
1 parent 629cf19 commit d190cba

File tree

24 files changed

+750
-24
lines changed

24 files changed

+750
-24
lines changed

gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,12 @@ except ImportError: # pragma: NO COVER
5959
_LOGGER = std_logging.getLogger(__name__)
6060

6161

62-
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
62+
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER
63+
def intercept_unary_stream(self, continuation, client_call_details, request):
64+
{{ shared_macros.unary_request_interceptor_common(service) }}
65+
response_it = continuation(client_call_details, request)
66+
return response_it
67+
6368
def intercept_unary_unary(self, continuation, client_call_details, request):
6469
{{ shared_macros.unary_request_interceptor_common(service) }}
6570
response = continuation(client_call_details, request)

gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,12 @@ except ImportError: # pragma: NO COVER
6464
_LOGGER = std_logging.getLogger(__name__)
6565

6666

67-
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER
67+
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER
68+
async def intercept_unary_stream(self, continuation, client_call_details, request):
69+
{{ shared_macros.unary_request_interceptor_common(service) }}
70+
response_it = await continuation(client_call_details, request)
71+
return response_it
72+
6873
async def intercept_unary_unary(self, continuation, client_call_details, request):
6974
{{ shared_macros.unary_request_interceptor_common(service) }}
7075
response = await continuation(client_call_details, request)
@@ -296,6 +301,7 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport):
296301

297302
self._interceptor = _LoggingClientAIOInterceptor()
298303
self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
304+
self._grpc_channel._unary_stream_interceptors.append(self._interceptor)
299305
self._logged_channel = self._grpc_channel
300306
self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
301307
# Wrap messages. This must be done after self._logged_channel exists

tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,39 @@
4545
_LOGGER = std_logging.getLogger(__name__)
4646

4747

48-
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
48+
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER
49+
def intercept_unary_stream(self, continuation, client_call_details, request):
50+
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
51+
if logging_enabled: # pragma: NO COVER
52+
request_metadata = client_call_details.metadata
53+
if isinstance(request, proto.Message):
54+
request_payload = type(request).to_json(request)
55+
elif isinstance(request, google.protobuf.message.Message):
56+
request_payload = MessageToJson(request)
57+
else:
58+
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
59+
60+
request_metadata = {
61+
key: value.decode("utf-8") if isinstance(value, bytes) else value
62+
for key, value in request_metadata
63+
}
64+
grpc_request = {
65+
"payload": request_payload,
66+
"requestMethod": "grpc",
67+
"metadata": dict(request_metadata),
68+
}
69+
_LOGGER.debug(
70+
f"Sending request for {client_call_details.method}",
71+
extra = {
72+
"serviceName": "google.cloud.asset.v1.AssetService",
73+
"rpcName": str(client_call_details.method),
74+
"request": grpc_request,
75+
"metadata": grpc_request["metadata"],
76+
},
77+
)
78+
response_it = continuation(client_call_details, request)
79+
return response_it
80+
4981
def intercept_unary_unary(self, continuation, client_call_details, request):
5082
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
5183
if logging_enabled: # pragma: NO COVER

tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,39 @@
4949
_LOGGER = std_logging.getLogger(__name__)
5050

5151

52-
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER
52+
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER
53+
async def intercept_unary_stream(self, continuation, client_call_details, request):
54+
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
55+
if logging_enabled: # pragma: NO COVER
56+
request_metadata = client_call_details.metadata
57+
if isinstance(request, proto.Message):
58+
request_payload = type(request).to_json(request)
59+
elif isinstance(request, google.protobuf.message.Message):
60+
request_payload = MessageToJson(request)
61+
else:
62+
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
63+
64+
request_metadata = {
65+
key: value.decode("utf-8") if isinstance(value, bytes) else value
66+
for key, value in request_metadata
67+
}
68+
grpc_request = {
69+
"payload": request_payload,
70+
"requestMethod": "grpc",
71+
"metadata": dict(request_metadata),
72+
}
73+
_LOGGER.debug(
74+
f"Sending request for {client_call_details.method}",
75+
extra = {
76+
"serviceName": "google.cloud.asset.v1.AssetService",
77+
"rpcName": str(client_call_details.method),
78+
"request": grpc_request,
79+
"metadata": grpc_request["metadata"],
80+
},
81+
)
82+
response_it = await continuation(client_call_details, request)
83+
return response_it
84+
5385
async def intercept_unary_unary(self, continuation, client_call_details, request):
5486
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
5587
if logging_enabled: # pragma: NO COVER
@@ -302,6 +334,7 @@ def __init__(self, *,
302334

303335
self._interceptor = _LoggingClientAIOInterceptor()
304336
self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
337+
self._grpc_channel._unary_stream_interceptors.append(self._interceptor)
305338
self._logged_channel = self._grpc_channel
306339
self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
307340
# Wrap messages. This must be done after self._logged_channel exists

tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,39 @@
4242
_LOGGER = std_logging.getLogger(__name__)
4343

4444

45-
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
45+
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER
46+
def intercept_unary_stream(self, continuation, client_call_details, request):
47+
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
48+
if logging_enabled: # pragma: NO COVER
49+
request_metadata = client_call_details.metadata
50+
if isinstance(request, proto.Message):
51+
request_payload = type(request).to_json(request)
52+
elif isinstance(request, google.protobuf.message.Message):
53+
request_payload = MessageToJson(request)
54+
else:
55+
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
56+
57+
request_metadata = {
58+
key: value.decode("utf-8") if isinstance(value, bytes) else value
59+
for key, value in request_metadata
60+
}
61+
grpc_request = {
62+
"payload": request_payload,
63+
"requestMethod": "grpc",
64+
"metadata": dict(request_metadata),
65+
}
66+
_LOGGER.debug(
67+
f"Sending request for {client_call_details.method}",
68+
extra = {
69+
"serviceName": "google.iam.credentials.v1.IAMCredentials",
70+
"rpcName": str(client_call_details.method),
71+
"request": grpc_request,
72+
"metadata": grpc_request["metadata"],
73+
},
74+
)
75+
response_it = continuation(client_call_details, request)
76+
return response_it
77+
4678
def intercept_unary_unary(self, continuation, client_call_details, request):
4779
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
4880
if logging_enabled: # pragma: NO COVER

tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,39 @@
4646
_LOGGER = std_logging.getLogger(__name__)
4747

4848

49-
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER
49+
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER
50+
async def intercept_unary_stream(self, continuation, client_call_details, request):
51+
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
52+
if logging_enabled: # pragma: NO COVER
53+
request_metadata = client_call_details.metadata
54+
if isinstance(request, proto.Message):
55+
request_payload = type(request).to_json(request)
56+
elif isinstance(request, google.protobuf.message.Message):
57+
request_payload = MessageToJson(request)
58+
else:
59+
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
60+
61+
request_metadata = {
62+
key: value.decode("utf-8") if isinstance(value, bytes) else value
63+
for key, value in request_metadata
64+
}
65+
grpc_request = {
66+
"payload": request_payload,
67+
"requestMethod": "grpc",
68+
"metadata": dict(request_metadata),
69+
}
70+
_LOGGER.debug(
71+
f"Sending request for {client_call_details.method}",
72+
extra = {
73+
"serviceName": "google.iam.credentials.v1.IAMCredentials",
74+
"rpcName": str(client_call_details.method),
75+
"request": grpc_request,
76+
"metadata": grpc_request["metadata"],
77+
},
78+
)
79+
response_it = await continuation(client_call_details, request)
80+
return response_it
81+
5082
async def intercept_unary_unary(self, continuation, client_call_details, request):
5183
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
5284
if logging_enabled: # pragma: NO COVER
@@ -307,6 +339,7 @@ def __init__(self, *,
307339

308340
self._interceptor = _LoggingClientAIOInterceptor()
309341
self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
342+
self._grpc_channel._unary_stream_interceptors.append(self._interceptor)
310343
self._logged_channel = self._grpc_channel
311344
self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
312345
# Wrap messages. This must be done after self._logged_channel exists

tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,39 @@
5353
_LOGGER = std_logging.getLogger(__name__)
5454

5555

56-
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
56+
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER
57+
def intercept_unary_stream(self, continuation, client_call_details, request):
58+
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
59+
if logging_enabled: # pragma: NO COVER
60+
request_metadata = client_call_details.metadata
61+
if isinstance(request, proto.Message):
62+
request_payload = type(request).to_json(request)
63+
elif isinstance(request, google.protobuf.message.Message):
64+
request_payload = MessageToJson(request)
65+
else:
66+
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
67+
68+
request_metadata = {
69+
key: value.decode("utf-8") if isinstance(value, bytes) else value
70+
for key, value in request_metadata
71+
}
72+
grpc_request = {
73+
"payload": request_payload,
74+
"requestMethod": "grpc",
75+
"metadata": dict(request_metadata),
76+
}
77+
_LOGGER.debug(
78+
f"Sending request for {client_call_details.method}",
79+
extra = {
80+
"serviceName": "google.cloud.eventarc.v1.Eventarc",
81+
"rpcName": str(client_call_details.method),
82+
"request": grpc_request,
83+
"metadata": grpc_request["metadata"],
84+
},
85+
)
86+
response_it = continuation(client_call_details, request)
87+
return response_it
88+
5789
def intercept_unary_unary(self, continuation, client_call_details, request):
5890
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
5991
if logging_enabled: # pragma: NO COVER

tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,39 @@
5757
_LOGGER = std_logging.getLogger(__name__)
5858

5959

60-
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER
60+
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER
61+
async def intercept_unary_stream(self, continuation, client_call_details, request):
62+
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
63+
if logging_enabled: # pragma: NO COVER
64+
request_metadata = client_call_details.metadata
65+
if isinstance(request, proto.Message):
66+
request_payload = type(request).to_json(request)
67+
elif isinstance(request, google.protobuf.message.Message):
68+
request_payload = MessageToJson(request)
69+
else:
70+
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
71+
72+
request_metadata = {
73+
key: value.decode("utf-8") if isinstance(value, bytes) else value
74+
for key, value in request_metadata
75+
}
76+
grpc_request = {
77+
"payload": request_payload,
78+
"requestMethod": "grpc",
79+
"metadata": dict(request_metadata),
80+
}
81+
_LOGGER.debug(
82+
f"Sending request for {client_call_details.method}",
83+
extra = {
84+
"serviceName": "google.cloud.eventarc.v1.Eventarc",
85+
"rpcName": str(client_call_details.method),
86+
"request": grpc_request,
87+
"metadata": grpc_request["metadata"],
88+
},
89+
)
90+
response_it = await continuation(client_call_details, request)
91+
return response_it
92+
6193
async def intercept_unary_unary(self, continuation, client_call_details, request):
6294
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
6395
if logging_enabled: # pragma: NO COVER
@@ -312,6 +344,7 @@ def __init__(self, *,
312344

313345
self._interceptor = _LoggingClientAIOInterceptor()
314346
self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
347+
self._grpc_channel._unary_stream_interceptors.append(self._interceptor)
315348
self._logged_channel = self._grpc_channel
316349
self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
317350
# Wrap messages. This must be done after self._logged_channel exists

tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,39 @@
4545
_LOGGER = std_logging.getLogger(__name__)
4646

4747

48-
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
48+
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER
49+
def intercept_unary_stream(self, continuation, client_call_details, request):
50+
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
51+
if logging_enabled: # pragma: NO COVER
52+
request_metadata = client_call_details.metadata
53+
if isinstance(request, proto.Message):
54+
request_payload = type(request).to_json(request)
55+
elif isinstance(request, google.protobuf.message.Message):
56+
request_payload = MessageToJson(request)
57+
else:
58+
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
59+
60+
request_metadata = {
61+
key: value.decode("utf-8") if isinstance(value, bytes) else value
62+
for key, value in request_metadata
63+
}
64+
grpc_request = {
65+
"payload": request_payload,
66+
"requestMethod": "grpc",
67+
"metadata": dict(request_metadata),
68+
}
69+
_LOGGER.debug(
70+
f"Sending request for {client_call_details.method}",
71+
extra = {
72+
"serviceName": "google.logging.v2.ConfigServiceV2",
73+
"rpcName": str(client_call_details.method),
74+
"request": grpc_request,
75+
"metadata": grpc_request["metadata"],
76+
},
77+
)
78+
response_it = continuation(client_call_details, request)
79+
return response_it
80+
4981
def intercept_unary_unary(self, continuation, client_call_details, request):
5082
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
5183
if logging_enabled: # pragma: NO COVER

tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,39 @@
4949
_LOGGER = std_logging.getLogger(__name__)
5050

5151

52-
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER
52+
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER
53+
async def intercept_unary_stream(self, continuation, client_call_details, request):
54+
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
55+
if logging_enabled: # pragma: NO COVER
56+
request_metadata = client_call_details.metadata
57+
if isinstance(request, proto.Message):
58+
request_payload = type(request).to_json(request)
59+
elif isinstance(request, google.protobuf.message.Message):
60+
request_payload = MessageToJson(request)
61+
else:
62+
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
63+
64+
request_metadata = {
65+
key: value.decode("utf-8") if isinstance(value, bytes) else value
66+
for key, value in request_metadata
67+
}
68+
grpc_request = {
69+
"payload": request_payload,
70+
"requestMethod": "grpc",
71+
"metadata": dict(request_metadata),
72+
}
73+
_LOGGER.debug(
74+
f"Sending request for {client_call_details.method}",
75+
extra = {
76+
"serviceName": "google.logging.v2.ConfigServiceV2",
77+
"rpcName": str(client_call_details.method),
78+
"request": grpc_request,
79+
"metadata": grpc_request["metadata"],
80+
},
81+
)
82+
response_it = await continuation(client_call_details, request)
83+
return response_it
84+
5385
async def intercept_unary_unary(self, continuation, client_call_details, request):
5486
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
5587
if logging_enabled: # pragma: NO COVER
@@ -302,6 +334,7 @@ def __init__(self, *,
302334

303335
self._interceptor = _LoggingClientAIOInterceptor()
304336
self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
337+
self._grpc_channel._unary_stream_interceptors.append(self._interceptor)
305338
self._logged_channel = self._grpc_channel
306339
self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
307340
# Wrap messages. This must be done after self._logged_channel exists

0 commit comments

Comments
 (0)