Skip to content

Commit d06302b

Browse files
committed
fix: add retry policy to grpc calls.
Some error codes are not on the stream, eg. istio is sending some special ones, and we already used to implement the same logic in java to bypass this issue. Signed-off-by: Simon Schrottner <simon.schrottner@dynatrace.com>
1 parent d0ec36f commit d06302b

File tree

2 files changed

+85
-10
lines changed
  • providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers

2 files changed

+85
-10
lines changed

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import logging
23
import threading
34
import time
@@ -80,6 +81,43 @@ def _generate_channel(self, config: Config) -> grpc.Channel:
8081
("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
8182
("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
8283
("grpc.min_reconnect_backoff_ms", config.deadline_ms),
84+
("grpc.service_config", json.dumps({
85+
"methodConfig": [
86+
{
87+
"name": [
88+
{
89+
"service": "flagd.sync.v1.FlagSyncService"
90+
},
91+
{
92+
"service": "flagd.evaluation.v1.Service"
93+
}
94+
],
95+
"retryPolicy": {
96+
"maxAttempts": 3.0,
97+
"initialBackoff": "1s",
98+
"maxBackoff": "5s",
99+
"backoffMultiplier": 2.0,
100+
"retryableStatusCodes": [
101+
"CANCELLED",
102+
"UNKNOWN",
103+
"INVALID_ARGUMENT",
104+
"NOT_FOUND",
105+
"ALREADY_EXISTS",
106+
"PERMISSION_DENIED",
107+
"RESOURCE_EXHAUSTED",
108+
"FAILED_PRECONDITION",
109+
"ABORTED",
110+
"OUT_OF_RANGE",
111+
"UNIMPLEMENTED",
112+
"INTERNAL",
113+
"UNAVAILABLE",
114+
"DATA_LOSS",
115+
"UNAUTHENTICATED"
116+
]
117+
}
118+
}
119+
]
120+
})
83121
]
84122
if config.tls:
85123
channel_args = {

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525

2626
class GrpcWatcher(FlagStateConnector):
2727
def __init__(
28-
self,
29-
config: Config,
30-
flag_store: FlagStore,
31-
emit_provider_ready: typing.Callable[[ProviderEventDetails, dict], None],
32-
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
33-
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
28+
self,
29+
config: Config,
30+
flag_store: FlagStore,
31+
emit_provider_ready: typing.Callable[[ProviderEventDetails, dict], None],
32+
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
33+
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
3434
):
3535
self.flag_store = flag_store
3636
self.config = config
@@ -62,6 +62,43 @@ def _generate_channel(self, config: Config) -> grpc.Channel:
6262
("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
6363
("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
6464
("grpc.min_reconnect_backoff_ms", config.stream_deadline_ms),
65+
("grpc.service_config", json.dumps({
66+
"methodConfig": [
67+
{
68+
"name": [
69+
{
70+
"service": "flagd.sync.v1.FlagSyncService"
71+
},
72+
{
73+
"service": "flagd.evaluation.v1.Service"
74+
}
75+
],
76+
"retryPolicy": {
77+
"maxAttempts": 3.0,
78+
"initialBackoff": "1s",
79+
"maxBackoff": "5s",
80+
"backoffMultiplier": 2.0,
81+
"retryableStatusCodes": [
82+
"CANCELLED",
83+
"UNKNOWN",
84+
"INVALID_ARGUMENT",
85+
"NOT_FOUND",
86+
"ALREADY_EXISTS",
87+
"PERMISSION_DENIED",
88+
"RESOURCE_EXHAUSTED",
89+
"FAILED_PRECONDITION",
90+
"ABORTED",
91+
"OUT_OF_RANGE",
92+
"UNIMPLEMENTED",
93+
"INTERNAL",
94+
"UNAVAILABLE",
95+
"DATA_LOSS",
96+
"UNAUTHENTICATED"
97+
]
98+
}
99+
}
100+
]
101+
}))
65102
]
66103
if config.default_authority is not None:
67104
options.append(("grpc.default_authority", config.default_authority))
@@ -120,8 +157,8 @@ def monitor(self) -> None:
120157
def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
121158
logger.debug(f"gRPC state change: {new_state}")
122159
if (
123-
new_state == grpc.ChannelConnectivity.READY
124-
or new_state == grpc.ChannelConnectivity.IDLE
160+
new_state == grpc.ChannelConnectivity.READY
161+
or new_state == grpc.ChannelConnectivity.IDLE
125162
):
126163
if not self.thread or not self.thread.is_alive():
127164
self.thread = threading.Thread(
@@ -199,11 +236,11 @@ def listen(self) -> None:
199236

200237
logger.debug("Setting up gRPC sync flags connection")
201238
for flag_rsp in self.stub.SyncFlags(
202-
request, wait_for_ready=True, **call_args
239+
request, wait_for_ready=True, **call_args
203240
):
204241
flag_str = flag_rsp.flag_configuration
205242
logger.debug(
206-
f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}"
243+
f"Received flag configuration - {abs(hash(flag_str)) % (10 ** 8)}"
207244
)
208245
self.flag_store.update(json.loads(flag_str))
209246

0 commit comments

Comments
 (0)