Skip to content

Commit 37e3349

Browse files
authored
Optionally clean up server connections (#444)
* Optionally clean up server connections * move setting to pool * fix test * Print setting to screen * fmt * Fix pool_settings override in tests
1 parent 7f57a89 commit 37e3349

File tree

6 files changed

+97
-24
lines changed

6 files changed

+97
-24
lines changed

src/config.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,10 +487,15 @@ pub struct Pool {
487487
#[serde(default)] // False
488488
pub primary_reads_enabled: bool,
489489

490+
/// Maximum time to allow for establishing a new server connection.
490491
pub connect_timeout: Option<u64>,
491492

493+
/// Close idle connections that have been opened for longer than this.
492494
pub idle_timeout: Option<u64>,
493495

496+
/// Close server connections that have been opened for longer than this.
497+
/// Only applied to idle connections. If the connection is actively used for
498+
/// longer than this period, the pool will not interrupt it.
494499
pub server_lifetime: Option<u64>,
495500

496501
#[serde(default = "Pool::default_sharding_function")]
@@ -507,6 +512,9 @@ pub struct Pool {
507512
pub auth_query_user: Option<String>,
508513
pub auth_query_password: Option<String>,
509514

515+
#[serde(default = "Pool::default_cleanup_server_connections")]
516+
pub cleanup_server_connections: bool,
517+
510518
pub plugins: Option<Plugins>,
511519
pub shards: BTreeMap<String, Shard>,
512520
pub users: BTreeMap<String, User>,
@@ -548,6 +556,10 @@ impl Pool {
548556
ShardingFunction::PgBigintHash
549557
}
550558

559+
pub fn default_cleanup_server_connections() -> bool {
560+
true
561+
}
562+
551563
pub fn validate(&mut self) -> Result<(), Error> {
552564
match self.default_role.as_ref() {
553565
"any" => (),
@@ -637,6 +649,7 @@ impl Default for Pool {
637649
auth_query_password: None,
638650
server_lifetime: None,
639651
plugins: None,
652+
cleanup_server_connections: true,
640653
}
641654
}
642655
}
@@ -1066,6 +1079,10 @@ impl Config {
10661079
None => "default".to_string(),
10671080
}
10681081
);
1082+
info!(
1083+
"[pool: {}] Cleanup server connections: {}",
1084+
pool_name, pool_config.cleanup_server_connections
1085+
);
10691086
info!(
10701087
"[pool: {}] Plugins: {}",
10711088
pool_name,

src/mirrors.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ impl MirroredClient {
4444
Arc::new(PoolStats::new(identifier, cfg.clone())),
4545
Arc::new(RwLock::new(None)),
4646
None,
47+
true,
4748
);
4849

4950
Pool::builder()

src/pool.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ impl ConnectionPool {
364364
Some(ref plugins) => Some(plugins.clone()),
365365
None => config.plugins.clone(),
366366
},
367+
pool_config.cleanup_server_connections,
367368
);
368369

369370
let connect_timeout = match pool_config.connect_timeout {
@@ -914,13 +915,29 @@ impl ConnectionPool {
914915

915916
/// Wrapper for the bb8 connection pool.
916917
pub struct ServerPool {
918+
/// Server address.
917919
address: Address,
920+
921+
/// Server Postgres user.
918922
user: User,
923+
924+
/// Server database.
919925
database: String,
926+
927+
/// Client/server mapping.
920928
client_server_map: ClientServerMap,
929+
930+
/// Server statistics.
921931
stats: Arc<PoolStats>,
932+
933+
/// Server auth hash (for auth passthrough).
922934
auth_hash: Arc<RwLock<Option<String>>>,
935+
936+
/// Server plugins.
923937
plugins: Option<Plugins>,
938+
939+
/// Should we clean up dirty connections before putting them into the pool?
940+
cleanup_connections: bool,
924941
}
925942

926943
impl ServerPool {
@@ -932,6 +949,7 @@ impl ServerPool {
932949
stats: Arc<PoolStats>,
933950
auth_hash: Arc<RwLock<Option<String>>>,
934951
plugins: Option<Plugins>,
952+
cleanup_connections: bool,
935953
) -> ServerPool {
936954
ServerPool {
937955
address,
@@ -941,6 +959,7 @@ impl ServerPool {
941959
stats,
942960
auth_hash,
943961
plugins,
962+
cleanup_connections,
944963
}
945964
}
946965
}
@@ -970,6 +989,7 @@ impl ManageConnection for ServerPool {
970989
self.client_server_map.clone(),
971990
stats.clone(),
972991
self.auth_hash.clone(),
992+
self.cleanup_connections,
973993
)
974994
.await
975995
{

src/server.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,13 +188,16 @@ pub struct Server {
188188
/// Application name using the server at the moment.
189189
application_name: String,
190190

191-
// Last time that a successful server send or response happened
191+
/// Last time that a successful server send or response happened
192192
last_activity: SystemTime,
193193

194194
mirror_manager: Option<MirroringManager>,
195195

196-
// Associated addresses used
196+
/// Associated addresses used
197197
addr_set: Option<AddrSet>,
198+
199+
/// Should clean up dirty connections?
200+
cleanup_connections: bool,
198201
}
199202

200203
impl Server {
@@ -207,6 +210,7 @@ impl Server {
207210
client_server_map: ClientServerMap,
208211
stats: Arc<ServerStats>,
209212
auth_hash: Arc<RwLock<Option<String>>>,
213+
cleanup_connections: bool,
210214
) -> Result<Server, Error> {
211215
let cached_resolver = CACHED_RESOLVER.load();
212216
let mut addr_set: Option<AddrSet> = None;
@@ -687,6 +691,7 @@ impl Server {
687691
address.mirrors.clone(),
688692
)),
689693
},
694+
cleanup_connections,
690695
};
691696

692697
server.set_name("pgcat").await?;
@@ -1004,7 +1009,7 @@ impl Server {
10041009
// to avoid leaking state between clients. For performance reasons we only
10051010
// send `DISCARD ALL` if we think the session is altered instead of just sending
10061011
// it before each checkin.
1007-
if self.cleanup_state.needs_cleanup() {
1012+
if self.cleanup_state.needs_cleanup() && self.cleanup_connections {
10081013
warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
10091014
self.query("DISCARD ALL").await?;
10101015
self.query("RESET ROLE").await?;
@@ -1084,6 +1089,7 @@ impl Server {
10841089
client_server_map,
10851090
Arc::new(ServerStats::default()),
10861091
Arc::new(RwLock::new(None)),
1092+
true,
10871093
)
10881094
.await?;
10891095
debug!("Connected!, sending query.");

tests/ruby/helpers/pgcat_helper.rb

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb
118118
end
119119
end
120120

121-
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info")
121+
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info", pool_settings={})
122122
user = {
123123
"password" => "sharding_user",
124124
"pool_size" => pool_size,
@@ -134,28 +134,32 @@ def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mo
134134
replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0")
135135
replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0")
136136

137+
pool_config = {
138+
"default_role" => "any",
139+
"pool_mode" => pool_mode,
140+
"load_balancing_mode" => lb_mode,
141+
"primary_reads_enabled" => false,
142+
"query_parser_enabled" => false,
143+
"sharding_function" => "pg_bigint_hash",
144+
"shards" => {
145+
"0" => {
146+
"database" => "shard0",
147+
"servers" => [
148+
["localhost", primary.port.to_s, "primary"],
149+
["localhost", replica0.port.to_s, "replica"],
150+
["localhost", replica1.port.to_s, "replica"],
151+
["localhost", replica2.port.to_s, "replica"]
152+
]
153+
},
154+
},
155+
"users" => { "0" => user }
156+
}
157+
158+
pool_config = pool_config.merge(pool_settings)
159+
137160
# Main proxy configs
138161
pgcat_cfg["pools"] = {
139-
"#{pool_name}" => {
140-
"default_role" => "any",
141-
"pool_mode" => pool_mode,
142-
"load_balancing_mode" => lb_mode,
143-
"primary_reads_enabled" => false,
144-
"query_parser_enabled" => false,
145-
"sharding_function" => "pg_bigint_hash",
146-
"shards" => {
147-
"0" => {
148-
"database" => "shard0",
149-
"servers" => [
150-
["localhost", primary.port.to_s, "primary"],
151-
["localhost", replica0.port.to_s, "replica"],
152-
["localhost", replica1.port.to_s, "replica"],
153-
["localhost", replica2.port.to_s, "replica"]
154-
]
155-
},
156-
},
157-
"users" => { "0" => user }
158-
}
162+
"#{pool_name}" => pool_config,
159163
}
160164
pgcat_cfg["general"]["port"] = pgcat.port
161165
pgcat.update_config(pgcat_cfg)

tests/ruby/misc_spec.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,31 @@
320320
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
321321
end
322322
end
323+
324+
context "server cleanup disabled" do
325+
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "random", "info", { "cleanup_server_connections" => false }) }
326+
327+
it "will not clean up connection state" do
328+
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
329+
processes.primary.reset_stats
330+
conn.async_exec("SET statement_timeout TO 1000")
331+
conn.close
332+
333+
puts processes.pgcat.logs
334+
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
335+
end
336+
337+
it "will not clean up prepared statements" do
338+
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
339+
processes.primary.reset_stats
340+
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
341+
342+
conn.close
343+
344+
puts processes.pgcat.logs
345+
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
346+
end
347+
end
323348
end
324349

325350
describe "Idle client timeout" do

0 commit comments

Comments
 (0)