|
1 | 1 | import asyncio
|
2 | 2 | import collections
|
| 3 | +import datetime |
3 | 4 |
|
4 | 5 | import asyncpg
|
5 | 6 | import pytest
|
6 | 7 | from pgcachewatch import cli, decorators, listeners, models, strategies
|
7 | 8 |
|
8 | 9 |
|
| 10 | +def utcnow() -> datetime.datetime: |
| 11 | + return datetime.datetime.now(tz=datetime.timezone.utc) |
| 12 | + |
| 13 | + |
9 | 14 | async def test_1_install_triggers(
|
10 | 15 | monkeypatch: pytest.MonkeyPatch,
|
11 | 16 | pgconn: asyncpg.Connection,
|
@@ -54,7 +59,106 @@ async def fetch_sysconf() -> list:
|
54 | 59 | assert statistics["hit"] == N - 1
|
55 | 60 |
|
56 | 61 |
|
57 |
| -async def test_3_uninstall_triggers( |
| 62 | +async def test_3_cache_invalidation_update( |
| 63 | + pgconn: asyncpg.Connection, |
| 64 | + pgpool: asyncpg.Pool, |
| 65 | +) -> None: |
| 66 | + statistics = collections.Counter[str]() |
| 67 | + listener = await listeners.PGEventQueue.create( |
| 68 | + models.PGChannel("ch_pgcachewatch_table_change"), |
| 69 | + pgconn=pgconn, |
| 70 | + ) |
| 71 | + |
| 72 | + @decorators.cache( |
| 73 | + strategy=strategies.Gready(listener=listener), |
| 74 | + statistics_callback=lambda x: statistics.update([x]), |
| 75 | + ) |
| 76 | + async def fetch_sysconf() -> list: |
| 77 | + return await pgpool.fetch("SELECT * FROM sysconf") |
| 78 | + |
| 79 | + async def blast() -> list: |
| 80 | + before = await fetch_sysconf() |
| 81 | + while (rv := await fetch_sysconf()) == before: |
| 82 | + await asyncio.sleep(0.001) |
| 83 | + return rv |
| 84 | + |
| 85 | + blast_task = asyncio.create_task(blast()) |
| 86 | + await pgpool.execute( |
| 87 | + "UPDATE sysconf set value = $1 where key = 'updated_at'", |
| 88 | + utcnow().isoformat(), |
| 89 | + ) |
| 90 | + await asyncio.wait_for(blast_task, 1) |
| 91 | + # First fetch and update |
| 92 | + assert statistics["miss"] == 2 |
| 93 | + |
| 94 | + |
| 95 | +async def test_3_cache_invalidation_insert( |
| 96 | + pgconn: asyncpg.Connection, |
| 97 | + pgpool: asyncpg.Pool, |
| 98 | +) -> None: |
| 99 | + statistics = collections.Counter[str]() |
| 100 | + listener = await listeners.PGEventQueue.create( |
| 101 | + models.PGChannel("ch_pgcachewatch_table_change"), |
| 102 | + pgconn=pgconn, |
| 103 | + ) |
| 104 | + |
| 105 | + @decorators.cache( |
| 106 | + strategy=strategies.Gready(listener=listener), |
| 107 | + statistics_callback=lambda x: statistics.update([x]), |
| 108 | + ) |
| 109 | + async def fetch_sysconf() -> list: |
| 110 | + return await pgpool.fetch("SELECT * FROM sysconf") |
| 111 | + |
| 112 | + async def blast() -> list: |
| 113 | + before = await fetch_sysconf() |
| 114 | + while (rv := await fetch_sysconf()) == before: |
| 115 | + await asyncio.sleep(0.001) |
| 116 | + return rv |
| 117 | + |
| 118 | + blast_task = asyncio.create_task(blast()) |
| 119 | + await pgpool.execute( |
| 120 | + "INSERT INTO sysconf (key, value) VALUES ($1, $2);", |
| 121 | + utcnow().isoformat(), |
| 122 | + utcnow().isoformat(), |
| 123 | + ) |
| 124 | + await asyncio.wait_for(blast_task, 1) |
| 125 | + # First fetch and insert |
| 126 | + assert statistics["miss"] == 2 |
| 127 | + |
| 128 | + |
| 129 | +async def test_3_cache_invalidation_delete( |
| 130 | + pgconn: asyncpg.Connection, |
| 131 | + pgpool: asyncpg.Pool, |
| 132 | +) -> None: |
| 133 | + statistics = collections.Counter[str]() |
| 134 | + listener = await listeners.PGEventQueue.create( |
| 135 | + models.PGChannel("ch_pgcachewatch_table_change"), |
| 136 | + pgconn=pgconn, |
| 137 | + ) |
| 138 | + |
| 139 | + @decorators.cache( |
| 140 | + strategy=strategies.Gready(listener=listener), |
| 141 | + statistics_callback=lambda x: statistics.update([x]), |
| 142 | + ) |
| 143 | + async def fetch_sysconf() -> list: |
| 144 | + return await pgpool.fetch("SELECT * FROM sysconf") |
| 145 | + |
| 146 | + async def blast() -> list: |
| 147 | + before = await fetch_sysconf() |
| 148 | + while (rv := await fetch_sysconf()) == before: |
| 149 | + await asyncio.sleep(0.001) |
| 150 | + return rv |
| 151 | + |
| 152 | + blast_task = asyncio.create_task(blast()) |
| 153 | + await pgpool.execute( |
| 154 | + "DELETE FROM sysconf WHERE key ~ '^\\d{4}-\\d{2}-\\d{2}';", |
| 155 | + ) |
| 156 | + await asyncio.wait_for(blast_task, 1) |
| 157 | + # First fetch and insert |
| 158 | + assert statistics["miss"] == 2 |
| 159 | + |
| 160 | + |
| 161 | +async def test_4_uninstall_triggers( |
58 | 162 | monkeypatch: pytest.MonkeyPatch,
|
59 | 163 | pgconn: asyncpg.Connection,
|
60 | 164 | ) -> None:
|
|
0 commit comments