Skip to content

Commit ad3febc

Browse files
committed
fix: add retry policy to gRPC calls.
Some error codes are not on the stream, e.g. Istio sends some special ones, and we already implemented the same logic in Java to bypass this issue. Signed-off-by: Simon Schrottner <simon.schrottner@dynatrace.com>
1 parent d0ec36f commit ad3febc

File tree

2 files changed

+122
-47
lines changed
  • providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers

2 files changed

+122
-47
lines changed

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 75 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import logging
23
import threading
34
import time
@@ -39,14 +40,14 @@
3940

4041
class GrpcResolver:
4142
def __init__(
42-
self,
43-
config: Config,
44-
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
45-
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
46-
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
47-
emit_provider_configuration_changed: typing.Callable[
48-
[ProviderEventDetails], None
49-
],
43+
self,
44+
config: Config,
45+
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
46+
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
47+
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
48+
emit_provider_configuration_changed: typing.Callable[
49+
[ProviderEventDetails], None
50+
],
5051
):
5152
self.active = False
5253
self.config = config
@@ -80,6 +81,43 @@ def _generate_channel(self, config: Config) -> grpc.Channel:
8081
("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
8182
("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
8283
("grpc.min_reconnect_backoff_ms", config.deadline_ms),
84+
("grpc.service_config", json.dumps({
85+
"methodConfig": [
86+
{
87+
"name": [
88+
{
89+
"service": "flagd.sync.v1.FlagSyncService"
90+
},
91+
{
92+
"service": "flagd.evaluation.v1.Service"
93+
}
94+
],
95+
"retryPolicy": {
96+
"maxAttempts": 3.0,
97+
"initialBackoff": "1s",
98+
"maxBackoff": "5s",
99+
"backoffMultiplier": 2.0,
100+
"retryableStatusCodes": [
101+
"CANCELLED",
102+
"UNKNOWN",
103+
"INVALID_ARGUMENT",
104+
"NOT_FOUND",
105+
"ALREADY_EXISTS",
106+
"PERMISSION_DENIED",
107+
"RESOURCE_EXHAUSTED",
108+
"FAILED_PRECONDITION",
109+
"ABORTED",
110+
"OUT_OF_RANGE",
111+
"UNIMPLEMENTED",
112+
"INTERNAL",
113+
"UNAVAILABLE",
114+
"DATA_LOSS",
115+
"UNAUTHENTICATED"
116+
]
117+
}
118+
}
119+
]
120+
}))
83121
]
84122
if config.tls:
85123
channel_args = {
@@ -138,8 +176,8 @@ def monitor(self) -> None:
138176
def _state_change_callback(self, new_state: ChannelConnectivity) -> None:
139177
logger.debug(f"gRPC state change: {new_state}")
140178
if (
141-
new_state == grpc.ChannelConnectivity.READY
142-
or new_state == grpc.ChannelConnectivity.IDLE
179+
new_state == grpc.ChannelConnectivity.READY
180+
or new_state == grpc.ChannelConnectivity.IDLE
143181
):
144182
if not self.thread or not self.thread.is_alive():
145183
self.thread = threading.Thread(
@@ -193,7 +231,7 @@ def listen(self) -> None:
193231
try:
194232
logger.debug("Setting up gRPC sync flags connection")
195233
for message in self.stub.EventStream(
196-
request, wait_for_ready=True, **call_args
234+
request, wait_for_ready=True, **call_args
197235
):
198236
if message.type == "provider_ready":
199237
self.emit_provider_ready(
@@ -227,51 +265,51 @@ def handle_changed_flags(self, data: typing.Any) -> None:
227265
self.emit_provider_configuration_changed(ProviderEventDetails(changed_flags))
228266

229267
def resolve_boolean_details(
230-
self,
231-
key: str,
232-
default_value: bool,
233-
evaluation_context: typing.Optional[EvaluationContext] = None,
268+
self,
269+
key: str,
270+
default_value: bool,
271+
evaluation_context: typing.Optional[EvaluationContext] = None,
234272
) -> FlagResolutionDetails[bool]:
235273
return self._resolve(key, FlagType.BOOLEAN, default_value, evaluation_context)
236274

237275
def resolve_string_details(
238-
self,
239-
key: str,
240-
default_value: str,
241-
evaluation_context: typing.Optional[EvaluationContext] = None,
276+
self,
277+
key: str,
278+
default_value: str,
279+
evaluation_context: typing.Optional[EvaluationContext] = None,
242280
) -> FlagResolutionDetails[str]:
243281
return self._resolve(key, FlagType.STRING, default_value, evaluation_context)
244282

245283
def resolve_float_details(
246-
self,
247-
key: str,
248-
default_value: float,
249-
evaluation_context: typing.Optional[EvaluationContext] = None,
284+
self,
285+
key: str,
286+
default_value: float,
287+
evaluation_context: typing.Optional[EvaluationContext] = None,
250288
) -> FlagResolutionDetails[float]:
251289
return self._resolve(key, FlagType.FLOAT, default_value, evaluation_context)
252290

253291
def resolve_integer_details(
254-
self,
255-
key: str,
256-
default_value: int,
257-
evaluation_context: typing.Optional[EvaluationContext] = None,
292+
self,
293+
key: str,
294+
default_value: int,
295+
evaluation_context: typing.Optional[EvaluationContext] = None,
258296
) -> FlagResolutionDetails[int]:
259297
return self._resolve(key, FlagType.INTEGER, default_value, evaluation_context)
260298

261299
def resolve_object_details(
262-
self,
263-
key: str,
264-
default_value: typing.Union[dict, list],
265-
evaluation_context: typing.Optional[EvaluationContext] = None,
300+
self,
301+
key: str,
302+
default_value: typing.Union[dict, list],
303+
evaluation_context: typing.Optional[EvaluationContext] = None,
266304
) -> FlagResolutionDetails[typing.Union[dict, list]]:
267305
return self._resolve(key, FlagType.OBJECT, default_value, evaluation_context)
268306

269307
def _resolve( # noqa: PLR0915 C901
270-
self,
271-
flag_key: str,
272-
flag_type: FlagType,
273-
default_value: T,
274-
evaluation_context: typing.Optional[EvaluationContext],
308+
self,
309+
flag_key: str,
310+
flag_type: FlagType,
311+
default_value: T,
312+
evaluation_context: typing.Optional[EvaluationContext],
275313
) -> FlagResolutionDetails[T]:
276314
if self.cache is not None and flag_key in self.cache:
277315
cached_flag: FlagResolutionDetails[T] = self.cache[flag_key]
@@ -342,7 +380,7 @@ def _resolve( # noqa: PLR0915 C901
342380
return result
343381

344382
def _convert_context(
345-
self, evaluation_context: typing.Optional[EvaluationContext]
383+
self, evaluation_context: typing.Optional[EvaluationContext]
346384
) -> Struct:
347385
s = Struct()
348386
if evaluation_context:

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525

2626
class GrpcWatcher(FlagStateConnector):
2727
def __init__(
28-
self,
29-
config: Config,
30-
flag_store: FlagStore,
31-
emit_provider_ready: typing.Callable[[ProviderEventDetails, dict], None],
32-
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
33-
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
28+
self,
29+
config: Config,
30+
flag_store: FlagStore,
31+
emit_provider_ready: typing.Callable[[ProviderEventDetails, dict], None],
32+
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
33+
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
3434
):
3535
self.flag_store = flag_store
3636
self.config = config
@@ -62,6 +62,43 @@ def _generate_channel(self, config: Config) -> grpc.Channel:
6262
("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
6363
("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
6464
("grpc.min_reconnect_backoff_ms", config.stream_deadline_ms),
65+
("grpc.service_config", json.dumps({
66+
"methodConfig": [
67+
{
68+
"name": [
69+
{
70+
"service": "flagd.sync.v1.FlagSyncService"
71+
},
72+
{
73+
"service": "flagd.evaluation.v1.Service"
74+
}
75+
],
76+
"retryPolicy": {
77+
"maxAttempts": 3.0,
78+
"initialBackoff": "1s",
79+
"maxBackoff": "5s",
80+
"backoffMultiplier": 2.0,
81+
"retryableStatusCodes": [
82+
"CANCELLED",
83+
"UNKNOWN",
84+
"INVALID_ARGUMENT",
85+
"NOT_FOUND",
86+
"ALREADY_EXISTS",
87+
"PERMISSION_DENIED",
88+
"RESOURCE_EXHAUSTED",
89+
"FAILED_PRECONDITION",
90+
"ABORTED",
91+
"OUT_OF_RANGE",
92+
"UNIMPLEMENTED",
93+
"INTERNAL",
94+
"UNAVAILABLE",
95+
"DATA_LOSS",
96+
"UNAUTHENTICATED"
97+
]
98+
}
99+
}
100+
]
101+
}))
65102
]
66103
if config.default_authority is not None:
67104
options.append(("grpc.default_authority", config.default_authority))
@@ -120,8 +157,8 @@ def monitor(self) -> None:
120157
def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
121158
logger.debug(f"gRPC state change: {new_state}")
122159
if (
123-
new_state == grpc.ChannelConnectivity.READY
124-
or new_state == grpc.ChannelConnectivity.IDLE
160+
new_state == grpc.ChannelConnectivity.READY
161+
or new_state == grpc.ChannelConnectivity.IDLE
125162
):
126163
if not self.thread or not self.thread.is_alive():
127164
self.thread = threading.Thread(
@@ -199,11 +236,11 @@ def listen(self) -> None:
199236

200237
logger.debug("Setting up gRPC sync flags connection")
201238
for flag_rsp in self.stub.SyncFlags(
202-
request, wait_for_ready=True, **call_args
239+
request, wait_for_ready=True, **call_args
203240
):
204241
flag_str = flag_rsp.flag_configuration
205242
logger.debug(
206-
f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}"
243+
f"Received flag configuration - {abs(hash(flag_str)) % (10 ** 8)}"
207244
)
208245
self.flag_store.update(json.loads(flag_str))
209246

0 commit comments

Comments
 (0)