|
165 | 165 |
|
166 | 166 | describe "Candidate filtering based on `default_pool`" do
|
167 | 167 | let(:processes) {
|
168 |
| - Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", pool_settings) |
| 168 | + Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", pool_settings, general_settings) |
169 | 169 | }
|
170 | 170 |
|
171 | 171 | after do
|
|
175 | 175 |
|
176 | 176 | context("with default_pool set to replicas") do
|
177 | 177 | context("when all replicas are down ") do
|
| 178 | + let(:ban_time) { 60 } |
| 179 | + let(:connect_timeout) { 1000 } |
178 | 180 | let(:pool_settings) do
|
179 | 181 | {
|
180 | 182 | "default_role" => "replica",
|
181 |
| - "replica_to_primary_failover_enabled" => replica_to_primary_failover_enabled |
| 183 | + "replica_to_primary_failover_enabled" => replica_to_primary_failover_enabled, |
| 184 | + "connect_timeout" => connect_timeout, |
182 | 185 | }
|
183 | 186 | end
|
| 187 | + let(:general_settings) { { "ban_time" => ban_time } } |
184 | 188 |
|
185 | 189 | context("with `replica_to_primary_failover_enabled` set to false`") do
|
186 | 190 | let(:replica_to_primary_failover_enabled) { false }
|
|
249 | 253 |
|
250 | 254 | expect(failed_count).to(eq(number_of_replicas))
|
251 | 255 | end
|
| 256 | + context("when banned replicas are tested for availability because they expired the ban time") do |
| 257 | + let(:ban_time) { 2 } |
| 258 | + it "should be done in the background without interfering with traffic" do |
| 259 | + select_server_port = "SELECT setting AS port FROM pg_settings WHERE name = 'port';" |
| 260 | + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) |
| 261 | + primary_port = processes.primary.original_port; |
| 262 | + |
| 263 | + response = conn.async_exec(select_server_port) |
| 264 | + expect(response[0]["port"].to_i).not_to(eq(primary_port)) |
| 265 | + |
| 266 | + failed_count = 0 |
| 267 | + number_of_replicas = processes[:replicas].length |
| 268 | + |
| 269 | + # Take down all replicas |
| 270 | + processes[:replicas].each(&:take_down) |
| 271 | + |
| 272 | + (number_of_replicas).times do |n| |
| 273 | + response = conn.async_exec(select_server_port) |
| 274 | + expect(response[0]["port"].to_i).to(eq(primary_port)) |
| 275 | + rescue |
| 276 | + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) |
| 277 | + failed_count += 1 |
| 278 | + end |
| 279 | + expect(failed_count).to(eq(number_of_replicas)) |
| 280 | + failed_count = 0 |
| 281 | + |
| 282 | + response = conn.async_exec(select_server_port) |
| 283 | + expect(response[0]["port"].to_i).to(eq(primary_port)) |
| 284 | + sleep(ban_time + 1) |
| 285 | + |
| 286 | + begin |
| 287 | + time_before_query = Time.now |
| 288 | + response = conn.async_exec(select_server_port) |
| 289 | + expect(response[0]["port"].to_i).to(eq(primary_port)) |
| 290 | + expect(Time.now - time_before_query).to(be < connect_timeout / 1000.0) |
| 291 | + rescue |
| 292 | + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) |
| 293 | + failed_count += 1 |
| 294 | + end |
| 295 | + expect(response[0]["port"].to_i).to(eq(primary_port)) |
| 296 | + expect(failed_count).to(eq(0)) |
| 297 | + end |
| 298 | + end |
252 | 299 | end
|
253 | 300 | end
|
254 | 301 | end
|
|
0 commit comments