Skip to content

Commit cc68ed0

Browse files
committed
Simplify trigger queries
1 parent 043f0fc commit cc68ed0

File tree

4 files changed

+40
-32
lines changed

4 files changed

+40
-32
lines changed

src/pgcachewatch/cli.py

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@
33

44
import asyncpg
55

6-
from pgcachewatch import queries
6+
from pgcachewatch import models, queries
77

88

99
def cliparser() -> argparse.Namespace:
10-
common_arguments = argparse.ArgumentParser(add_help=False)
10+
common_arguments = argparse.ArgumentParser(
11+
add_help=False,
12+
prog="pgcachewatch",
13+
)
1114
common_arguments.add_argument(
1215
"--channel-name",
13-
default="ch_pgcachewatch_table_change",
16+
default=models.DEFAULT_PG_CHANNE,
1417
help=(
1518
"The PGNotify channel that will be used by pgcachewatch to listen "
1619
"for changes on tables, this should be uniq to pgcachewatch clients."
@@ -20,23 +23,24 @@ def cliparser() -> argparse.Namespace:
2023
"--function-name",
2124
default="fn_pgcachewatch_table_change",
2225
help=(
23-
"The name of postgres 'helper function' that emits the on change evnets. "
24-
"This must be uniq."
26+
"The prefix of the postgres 'helper function' that emits "
27+
"the on change evnets."
2528
),
2629
)
2730
common_arguments.add_argument(
2831
"--trigger-name",
29-
default="tg_pgcachewatch_table_change_",
32+
default="tg_pgcachewatch_table_change",
3033
help="All triggers installed on tables will start with this prefix.",
3134
)
3235
common_arguments.add_argument(
3336
"--commit",
3437
action="store_true",
35-
help="Commit changes to DB.",
38+
help="Commit changes to database.",
3639
)
3740

3841
parser = argparse.ArgumentParser(
3942
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
43+
prog="pgcachewatch",
4044
)
4145

4246
subparsers = parser.add_subparsers(dest="command", required=True)
@@ -60,29 +64,32 @@ def cliparser() -> argparse.Namespace:
6064
async def main() -> None:
6165
parsed = cliparser()
6266

67+
pg_fn_name = f"{parsed.function_name}_{parsed.channel_name}"
68+
pg_tg_name = f"{parsed.trigger_name}_{parsed.channel_name}"
69+
6370
match parsed.command:
6471
case "install":
65-
install = [
66-
queries.create_notify_function(
67-
channel_name=parsed.channel_name,
68-
function_name=parsed.function_name,
69-
)
70-
]
71-
72-
for table in parsed.tables:
73-
install.append(
72+
install = "\n".join(
73+
[
74+
queries.create_notify_function(
75+
channel_name=parsed.channel_name,
76+
function_name=pg_fn_name,
77+
)
78+
]
79+
+ [
7480
queries.create_after_change_trigger(
81+
trigger_name=pg_tg_name,
7582
table_name=table,
76-
channel_name=parsed.channel_name,
77-
function_name=parsed.function_name,
78-
trigger_name_prefix=parsed.trigger_name,
83+
function_name=pg_fn_name,
7984
)
80-
)
85+
for table in parsed.tables
86+
]
87+
)
88+
89+
print(install, flush=True)
8190

82-
combined = "\n".join(install)
83-
print(combined, flush=True)
8491
if parsed.commit:
85-
await (await asyncpg.connect()).execute(combined)
92+
await (await asyncpg.connect()).execute(install)
8693
else:
8794
print(
8895
"::: Use '--commit' to write changes to db. :::",
@@ -91,15 +98,15 @@ async def main() -> None:
9198

9299
case "uninstall":
93100
trigger_names = await (await asyncpg.connect()).fetch(
94-
queries.fetch_trigger_names(parsed.trigger_name),
101+
queries.fetch_trigger_names(pg_tg_name),
95102
)
96103
combined = "\n".join(
97104
(
98105
"\n".join(
99106
queries.drop_trigger(t["trigger_name"], t["table"])
100107
for t in trigger_names
101108
),
102-
queries.drop_function(parsed.function_name),
109+
queries.drop_function(pg_fn_name),
103110
)
104111
)
105112
print(combined, flush=True)

src/pgcachewatch/listeners.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def __init__(
139139
async def connect(
140140
self,
141141
connection: asyncpg.Connection,
142-
channel: models.PGChannel = models.PGChannel("ch_pgcachewatch_table_change"),
142+
channel: models.PGChannel = models.DEFAULT_PG_CHANNE,
143143
) -> None:
144144
"""
145145
Asynchronously connects the PGEventQueue to a specified
@@ -205,7 +205,7 @@ def __init__(
205205
async def connect(
206206
self,
207207
ws: websockets.WebSocketClientProtocol,
208-
channel: models.PGChannel = models.PGChannel("ch_pgcachewatch_table_change"),
208+
channel: models.PGChannel = models.DEFAULT_PG_CHANNE,
209209
) -> None:
210210
async def _handler(ws: websockets.WebSocketClientProtocol) -> None:
211211
event_handler = create_event_inserter(self, self._max_latency)

src/pgcachewatch/models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
str,
1616
)
1717

18+
DEFAULT_PG_CHANNE = PGChannel("ch_pgcachewatch_table_change")
19+
1820

1921
class DeadlineSetting(pydantic.BaseModel):
2022
"""

src/pgcachewatch/queries.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ def create_notify_function(
33
function_name: str,
44
) -> str:
55
return f"""
6-
CREATE OR REPLACE FUNCTION {function_name}_{channel_name}() RETURNS TRIGGER AS $$
6+
CREATE OR REPLACE FUNCTION {function_name}() RETURNS TRIGGER AS $$
77
BEGIN
88
PERFORM pg_notify(
99
'{channel_name}',
@@ -19,15 +19,14 @@ def create_notify_function(
1919

2020

2121
def create_after_change_trigger(
22+
trigger_name: str,
2223
table_name: str,
23-
channel_name: str,
2424
function_name: str,
25-
trigger_name_prefix: str,
2625
) -> str:
2726
return f"""
28-
CREATE OR REPLACE TRIGGER {trigger_name_prefix}{table_name}
27+
CREATE OR REPLACE TRIGGER {trigger_name}
2928
AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON {table_name}
30-
EXECUTE FUNCTION {function_name}_{channel_name}();
29+
EXECUTE FUNCTION {function_name}();
3130
"""
3231

3332

0 commit comments

Comments
 (0)