From ae7f066bd31f034eebf5035a5cc4a2abf4faee80 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Tue, 10 Jun 2025 13:19:57 +0200 Subject: [PATCH] fix: add retry policy to grpc calls. Some error codes are not on the stream, eg. istio is sending some special ones, and we already used to implement the same logic in java to bypass this issue. Signed-off-by: Simon Schrottner --- .../contrib/provider/flagd/resolvers/grpc.py | 39 +++++++++++++++++++ .../process/connector/grpc_watcher.py | 38 ++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index 3d8ebe1f..03a87f3b 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -1,3 +1,4 @@ +import json import logging import threading import time @@ -80,6 +81,44 @@ def _generate_channel(self, config: Config) -> grpc.Channel: ("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms), ("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms), ("grpc.min_reconnect_backoff_ms", config.deadline_ms), + ( + "grpc.service_config", + json.dumps( + { + "methodConfig": [ + { + "name": [ + {"service": "flagd.sync.v1.FlagSyncService"}, + {"service": "flagd.evaluation.v1.Service"}, + ], + "retryPolicy": { + "maxAttempts": 3, + "initialBackoff": "1s", + "maxBackoff": "5s", + "backoffMultiplier": 2.0, + "retryableStatusCodes": [ + "CANCELLED", + "UNKNOWN", + "INVALID_ARGUMENT", + "NOT_FOUND", + "ALREADY_EXISTS", + "PERMISSION_DENIED", + "RESOURCE_EXHAUSTED", + "FAILED_PRECONDITION", + "ABORTED", + "OUT_OF_RANGE", + "UNIMPLEMENTED", + "INTERNAL", + "UNAVAILABLE", + "DATA_LOSS", + "UNAUTHENTICATED", + ], + }, + } + ] + } + ), + ), ] if config.tls: channel_args = { diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py index 66429666..6e87d147 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py @@ -62,6 +62,44 @@ def _generate_channel(self, config: Config) -> grpc.Channel: ("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms), ("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms), ("grpc.min_reconnect_backoff_ms", config.stream_deadline_ms), + ( + "grpc.service_config", + json.dumps( + { + "methodConfig": [ + { + "name": [ + {"service": "flagd.sync.v1.FlagSyncService"}, + {"service": "flagd.evaluation.v1.Service"}, + ], + "retryPolicy": { + "maxAttempts": 3, + "initialBackoff": "1s", + "maxBackoff": "5s", + "backoffMultiplier": 2.0, + "retryableStatusCodes": [ + "CANCELLED", + "UNKNOWN", + "INVALID_ARGUMENT", + "NOT_FOUND", + "ALREADY_EXISTS", + "PERMISSION_DENIED", + "RESOURCE_EXHAUSTED", + "FAILED_PRECONDITION", + "ABORTED", + "OUT_OF_RANGE", + "UNIMPLEMENTED", + "INTERNAL", + "UNAVAILABLE", + "DATA_LOSS", + "UNAUTHENTICATED", + ], + }, + } + ] + } + ), + ), ] if config.default_authority is not None: options.append(("grpc.default_authority", config.default_authority))