Skip to content

Commit 842227d

Browse files
TuningAgent interface (#49)
**Summary**: basic `TuningAgent` interface. It provides a common interface between agents and the replay subsystem (which will be pushed in a later PR). **Demo**: Added test cases that pass. <img width="802" alt="Screenshot 2024-12-20 at 17 17 59" src="https://github.com/user-attachments/assets/5c2cf531-d9c8-4326-9e6d-dd519bbec261" /> **Details**: * The core of the interface is the `DBMSConfigDelta` class which represents the change to the DBMS's config after a single step of tuning. * Subclasses only need to override the `TuningAgent._step()` function. * The main functionality of the class is in automatically saving the deltas to files. * CWI's JOB dataset link also stopped working so I made my own: https://drive.google.com/uc?id=19m0zDpphAw0Bu9Irr_ta9EGr5k85hiN1.
1 parent c1f162d commit 842227d

File tree

7 files changed

+191
-24
lines changed

7 files changed

+191
-24
lines changed

benchmark/job/cli.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import click
44

55
from benchmark.constants import DEFAULT_SCALE_FACTOR
6-
from benchmark.job.load_info import JobLoadInfo
76
from util.log import DBGYM_LOGGER_NAME
87
from util.shell import subprocess_run
98
from util.workspace import (
@@ -13,7 +12,8 @@
1312
link_result,
1413
)
1514

16-
JOB_TABLES_URL = "https://homepages.cwi.nl/~boncz/job/imdb.tgz"
15+
# JOB_TABLES_URL = "https://homepages.cwi.nl/~boncz/job/imdb.tgz" # This link stopped working for me
16+
JOB_TABLES_URL = "https://drive.google.com/uc?id=19m0zDpphAw0Bu9Irr_ta9EGr5k85hiN1"
1717
JOB_QUERY_NAMES = [
1818
"1a",
1919
"1b",
@@ -177,7 +177,8 @@ def _download_job_data(dbgym_cfg: DBGymConfig) -> None:
177177

178178
logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloading: {expected_symlink_dpath}")
179179
real_data_path = dbgym_cfg.cur_task_runs_data_path(mkdir=True)
180-
subprocess_run(f"curl -O {JOB_TABLES_URL}", cwd=real_data_path)
180+
# subprocess_run(f"curl -O {JOB_TABLES_URL}", cwd=real_data_path) # This is if we're using a non-Google-Drive link
181+
subprocess_run(f"gdown {JOB_TABLES_URL}", cwd=real_data_path)
181182
job_data_dpath = dbgym_cfg.cur_task_runs_data_path(
182183
default_tables_dname(DEFAULT_SCALE_FACTOR), mkdir=True
183184
)

dependencies/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,3 +135,4 @@ Werkzeug==3.0.1
135135
wrapt==1.14.1
136136
zipp==3.17.0
137137
streamlit==1.39.0
138+
gdown==5.2.0

env/integtest_pg_conn.py

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
import copy
2-
import subprocess
32
import unittest
4-
from pathlib import Path
5-
6-
import yaml
73

4+
from env.integtest_util import IntegtestWorkspace
85
from env.pg_conn import PostgresConn
96
from util.pg import (
107
DEFAULT_POSTGRES_PORT,
@@ -19,27 +16,16 @@
1916
default_pristine_dbdata_snapshot_path,
2017
)
2118

22-
ENV_INTEGTESTS_DBGYM_CONFIG_FPATH = Path("env/env_integtests_dbgym_config.yaml")
2319
BENCHMARK = "tpch"
2420
SCALE_FACTOR = 0.01
2521

2622

27-
def get_unittest_workspace_path() -> Path:
28-
with open(ENV_INTEGTESTS_DBGYM_CONFIG_FPATH) as f:
29-
return Path(yaml.safe_load(f)["dbgym_workspace_path"])
30-
assert False
31-
32-
3323
class PostgresConnTests(unittest.TestCase):
3424
dbgym_cfg: DBGymConfig
3525

3626
@staticmethod
3727
def setUpClass() -> None:
38-
# If you're running the test locally, this check makes runs past the first one much faster.
39-
if not get_unittest_workspace_path().exists():
40-
subprocess.run(["./env/set_up_env_integtests.sh"], check=True)
41-
42-
PostgresConnTests.dbgym_cfg = DBGymConfig(ENV_INTEGTESTS_DBGYM_CONFIG_FPATH)
28+
IntegtestWorkspace.set_up_workspace()
4329

4430
def setUp(self) -> None:
4531
self.assertFalse(
@@ -48,19 +34,23 @@ def setUp(self) -> None:
4834
+ "to ensure this. Be careful about accidentally taking down other people's Postgres instances though.",
4935
)
5036
self.pristine_dbdata_snapshot_path = default_pristine_dbdata_snapshot_path(
51-
self.dbgym_cfg.dbgym_workspace_path, BENCHMARK, SCALE_FACTOR
37+
IntegtestWorkspace.get_dbgym_cfg().dbgym_workspace_path,
38+
BENCHMARK,
39+
SCALE_FACTOR,
5240
)
5341
self.dbdata_parent_dpath = default_dbdata_parent_dpath(
54-
self.dbgym_cfg.dbgym_workspace_path
42+
IntegtestWorkspace.get_dbgym_cfg().dbgym_workspace_path
43+
)
44+
self.pgbin_dpath = default_pgbin_path(
45+
IntegtestWorkspace.get_dbgym_cfg().dbgym_workspace_path
5546
)
56-
self.pgbin_dpath = default_pgbin_path(self.dbgym_cfg.dbgym_workspace_path)
5747

5848
def tearDown(self) -> None:
5949
self.assertFalse(get_is_postgres_running())
6050

6151
def create_pg_conn(self, pgport: int = DEFAULT_POSTGRES_PORT) -> PostgresConn:
6252
return PostgresConn(
63-
PostgresConnTests.dbgym_cfg,
53+
IntegtestWorkspace.get_dbgym_cfg(),
6454
pgport,
6555
self.pristine_dbdata_snapshot_path,
6656
self.dbdata_parent_dpath,

env/integtest_tuning_agent.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import unittest
2+
from typing import Any, Optional
3+
4+
from env.integtest_util import IntegtestWorkspace
5+
from env.tuning_agent import DBMSConfigDelta, TuningAgent
6+
7+
8+
class MockTuningAgent(TuningAgent):
9+
def __init__(self, *args: Any, **kwargs: Any) -> None:
10+
super().__init__(*args, **kwargs)
11+
self.config_to_return: Optional[DBMSConfigDelta] = None
12+
13+
def _step(self) -> DBMSConfigDelta:
14+
assert self.config_to_return is not None
15+
ret = self.config_to_return
16+
# Setting this ensures you must set self.config_to_return every time.
17+
self.config_to_return = None
18+
return ret
19+
20+
21+
class PostgresConnTests(unittest.TestCase):
22+
@staticmethod
23+
def setUpClass() -> None:
24+
IntegtestWorkspace.set_up_workspace()
25+
26+
@staticmethod
27+
def make_config(letter: str) -> DBMSConfigDelta:
28+
return DBMSConfigDelta([letter], {letter: letter}, {letter: [letter]})
29+
30+
def test_get_step_delta(self) -> None:
31+
agent = MockTuningAgent(IntegtestWorkspace.get_dbgym_cfg())
32+
33+
agent.config_to_return = PostgresConnTests.make_config("a")
34+
agent.step()
35+
agent.config_to_return = PostgresConnTests.make_config("b")
36+
agent.step()
37+
agent.config_to_return = PostgresConnTests.make_config("c")
38+
agent.step()
39+
40+
self.assertEqual(agent.get_step_delta(1), PostgresConnTests.make_config("b"))
41+
self.assertEqual(agent.get_step_delta(0), PostgresConnTests.make_config("a"))
42+
self.assertEqual(agent.get_step_delta(1), PostgresConnTests.make_config("b"))
43+
self.assertEqual(agent.get_step_delta(2), PostgresConnTests.make_config("c"))
44+
45+
def test_get_all_deltas(self) -> None:
46+
agent = MockTuningAgent(IntegtestWorkspace.get_dbgym_cfg())
47+
48+
agent.config_to_return = PostgresConnTests.make_config("a")
49+
agent.step()
50+
agent.config_to_return = PostgresConnTests.make_config("b")
51+
agent.step()
52+
agent.config_to_return = PostgresConnTests.make_config("c")
53+
agent.step()
54+
55+
self.assertEqual(
56+
agent.get_all_deltas(),
57+
[
58+
PostgresConnTests.make_config("a"),
59+
PostgresConnTests.make_config("b"),
60+
PostgresConnTests.make_config("c"),
61+
],
62+
)
63+
64+
65+
if __name__ == "__main__":
66+
unittest.main()

env/integtest_util.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import subprocess
2+
from pathlib import Path
3+
from typing import Optional
4+
5+
import yaml
6+
7+
from util.workspace import DBGymConfig
8+
9+
10+
class IntegtestWorkspace:
11+
"""
12+
This is essentially a singleton class. This avoids multiple integtest_*.py files creating
13+
the workspace and/or the DBGymConfig redundantly.
14+
"""
15+
16+
ENV_INTEGTESTS_DBGYM_CONFIG_FPATH = Path("env/env_integtests_dbgym_config.yaml")
17+
INTEGTEST_DBGYM_CFG: Optional[DBGymConfig] = None
18+
19+
@staticmethod
20+
def set_up_workspace() -> None:
21+
# This if statement prevents us from setting up the workspace twice, which saves time.
22+
if not IntegtestWorkspace.get_workspace_path().exists():
23+
subprocess.run(["./env/set_up_env_integtests.sh"], check=True)
24+
25+
# Once we get here, we have an invariant that the workspace exists. We need this
26+
# invariant to be true in order to create the DBGymConfig.
27+
#
28+
# However, it also can't be created more than once so we need to check `is None`.
29+
if IntegtestWorkspace.INTEGTEST_DBGYM_CFG is None:
30+
IntegtestWorkspace.INTEGTEST_DBGYM_CFG = DBGymConfig(
31+
IntegtestWorkspace.ENV_INTEGTESTS_DBGYM_CONFIG_FPATH
32+
)
33+
34+
@staticmethod
35+
def get_dbgym_cfg() -> DBGymConfig:
36+
assert IntegtestWorkspace.INTEGTEST_DBGYM_CFG is not None
37+
return IntegtestWorkspace.INTEGTEST_DBGYM_CFG
38+
39+
@staticmethod
40+
def get_workspace_path() -> Path:
41+
with open(IntegtestWorkspace.ENV_INTEGTESTS_DBGYM_CONFIG_FPATH) as f:
42+
return Path(yaml.safe_load(f)["dbgym_workspace_path"])
43+
assert False

env/tuning_agent.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import json
2+
from dataclasses import asdict, dataclass
3+
from pathlib import Path
4+
5+
from util.workspace import DBGymConfig
6+
7+
8+
@dataclass
9+
class DBMSConfigDelta:
10+
"""
11+
This class represents a DBMS config delta. A "DBMS config" is the indexes, system knobs,
12+
and query knobs set by the tuning agent. A "delta" is the change from the prior config.
13+
14+
`indexes` contains a list of SQL statements for creating indexes. Note that since it's a
15+
config delta, it might contain "DROP ..." statements.
16+
17+
`sysknobs` contains a mapping from knob names to their values.
18+
19+
`qknobs` contains a mapping from query IDs to a list of knobs. Each list contains knobs
20+
to prepend to the start of the query. The knobs are a list[str] instead of a dict[str, str]
21+
because knobs can be settings ("SET (enable_sort on)") or flags ("IndexOnlyScan(it)").
22+
"""
23+
24+
indexes: list[str]
25+
sysknobs: dict[str, str]
26+
qknobs: dict[str, list[str]]
27+
28+
29+
class TuningAgent:
30+
def __init__(self, dbgym_cfg: DBGymConfig) -> None:
31+
self.dbgym_cfg = dbgym_cfg
32+
self.dbms_cfg_deltas_dpath = self.dbgym_cfg.cur_task_runs_artifacts_path(
33+
"dbms_cfg_deltas", mkdir=True
34+
)
35+
self.next_step_num = 0
36+
37+
def step(self) -> None:
38+
"""
39+
This wraps _step() and saves the cfg to a file so that it can be replayed.
40+
"""
41+
curr_step_num = self.next_step_num
42+
self.next_step_num += 1
43+
dbms_cfg_delta = self._step()
44+
with self.get_step_delta_fpath(curr_step_num).open("w") as f:
45+
json.dump(asdict(dbms_cfg_delta), f)
46+
47+
def get_step_delta_fpath(self, step_num: int) -> Path:
48+
return self.dbms_cfg_deltas_dpath / f"step{step_num}_delta.json"
49+
50+
# Subclasses should override this function.
51+
def _step(self) -> DBMSConfigDelta:
52+
"""
53+
This should be overridden by subclasses.
54+
55+
This should return the delta in the config caused by this step.
56+
"""
57+
raise NotImplementedError
58+
59+
def get_step_delta(self, step_num: int) -> DBMSConfigDelta:
60+
assert step_num >= 0 and step_num < self.next_step_num
61+
with self.get_step_delta_fpath(step_num).open("r") as f:
62+
return DBMSConfigDelta(**json.load(f))
63+
assert False
64+
65+
def get_all_deltas(self) -> list[DBMSConfigDelta]:
66+
return [self.get_step_delta(step_num) for step_num in range(self.next_step_num)]

util/workspace.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from datetime import datetime
1111
from enum import Enum
1212
from pathlib import Path
13-
from typing import IO, Any, Callable, Optional, Tuple
13+
from typing import IO, Any, Callable, Optional
1414

1515
import redis
1616
import yaml

0 commit comments

Comments
 (0)