Skip to content

Commit 94960c2

Browse files
authored
DBOS Client should not run migrations (#343)
This PR makes sure that DBOS Client doesn't run migrations on either the system database or the application database. Added a test.
1 parent d7ee4de commit 94960c2

File tree

7 files changed

+119
-83
lines changed

7 files changed

+119
-83
lines changed

dbos/_app_db.py

Lines changed: 48 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from . import _serialization
1010
from ._error import DBOSUnexpectedStepError, DBOSWorkflowConflictIDError
11+
from ._logger import dbos_logger
1112
from ._schemas.application_database import ApplicationSchema
1213
from ._sys_db import StepInfo
1314

@@ -39,62 +40,68 @@ def __init__(
3940
):
4041
app_db_url = sa.make_url(database_url).set(drivername="postgresql+psycopg")
4142

42-
# If the application database does not already exist, create it
43-
if not debug_mode:
44-
postgres_db_engine = sa.create_engine(
45-
app_db_url.set(database="postgres"),
46-
**engine_kwargs,
47-
)
48-
with postgres_db_engine.connect() as conn:
49-
conn.execution_options(isolation_level="AUTOCOMMIT")
50-
if not conn.execute(
51-
sa.text("SELECT 1 FROM pg_database WHERE datname=:db_name"),
52-
parameters={"db_name": app_db_url.database},
53-
).scalar():
54-
conn.execute(sa.text(f"CREATE DATABASE {app_db_url.database}"))
55-
postgres_db_engine.dispose()
56-
5743
if engine_kwargs is None:
5844
engine_kwargs = {}
5945

6046
self.engine = sa.create_engine(
6147
app_db_url,
6248
**engine_kwargs,
6349
)
50+
self._engine_kwargs = engine_kwargs
6451
self.sessionmaker = sessionmaker(bind=self.engine)
6552
self.debug_mode = debug_mode
6653

54+
def run_migrations(self) -> None:
55+
if self.debug_mode:
56+
dbos_logger.warning(
57+
"Application database migrations are skipped in debug mode."
58+
)
59+
return
60+
# Check if the database exists
61+
app_db_url = self.engine.url
62+
postgres_db_engine = sa.create_engine(
63+
app_db_url.set(database="postgres"),
64+
**self._engine_kwargs,
65+
)
66+
with postgres_db_engine.connect() as conn:
67+
conn.execution_options(isolation_level="AUTOCOMMIT")
68+
if not conn.execute(
69+
sa.text("SELECT 1 FROM pg_database WHERE datname=:db_name"),
70+
parameters={"db_name": app_db_url.database},
71+
).scalar():
72+
conn.execute(sa.text(f"CREATE DATABASE {app_db_url.database}"))
73+
postgres_db_engine.dispose()
74+
6775
# Create the dbos schema and transaction_outputs table in the application database
68-
if not debug_mode:
69-
with self.engine.begin() as conn:
70-
schema_creation_query = sa.text(
71-
f"CREATE SCHEMA IF NOT EXISTS {ApplicationSchema.schema}"
72-
)
73-
conn.execute(schema_creation_query)
76+
with self.engine.begin() as conn:
77+
schema_creation_query = sa.text(
78+
f"CREATE SCHEMA IF NOT EXISTS {ApplicationSchema.schema}"
79+
)
80+
conn.execute(schema_creation_query)
7481

75-
inspector = inspect(self.engine)
76-
if not inspector.has_table(
82+
inspector = inspect(self.engine)
83+
if not inspector.has_table(
84+
"transaction_outputs", schema=ApplicationSchema.schema
85+
):
86+
ApplicationSchema.metadata_obj.create_all(self.engine)
87+
else:
88+
columns = inspector.get_columns(
7789
"transaction_outputs", schema=ApplicationSchema.schema
78-
):
79-
ApplicationSchema.metadata_obj.create_all(self.engine)
80-
else:
81-
columns = inspector.get_columns(
82-
"transaction_outputs", schema=ApplicationSchema.schema
83-
)
84-
column_names = [col["name"] for col in columns]
90+
)
91+
column_names = [col["name"] for col in columns]
8592

86-
if "function_name" not in column_names:
87-
# Column missing, alter table to add it
88-
with self.engine.connect() as conn:
89-
conn.execute(
90-
text(
91-
f"""
92-
ALTER TABLE {ApplicationSchema.schema}.transaction_outputs
93-
ADD COLUMN function_name TEXT NOT NULL DEFAULT '';
94-
"""
95-
)
93+
if "function_name" not in column_names:
94+
# Column missing, alter table to add it
95+
with self.engine.connect() as conn:
96+
conn.execute(
97+
text(
98+
f"""
99+
ALTER TABLE {ApplicationSchema.schema}.transaction_outputs
100+
ADD COLUMN function_name TEXT NOT NULL DEFAULT '';
101+
"""
96102
)
97-
conn.commit()
103+
)
104+
conn.commit()
98105

99106
def destroy(self) -> None:
100107
self.engine.dispose()

dbos/_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ async def get_status(self) -> WorkflowStatus:
9999

100100
class DBOSClient:
101101
def __init__(self, database_url: str, *, system_database: Optional[str] = None):
102+
# We only create database connections but do not run migrations
102103
self._sys_db = SystemDatabase(
103104
database_url=database_url,
104105
engine_kwargs={

dbos/_dbos.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,10 @@ def _launch(self, *, debug_mode: bool = False) -> None:
433433
if debug_mode:
434434
return
435435

436+
# Run migrations for the system and application databases
437+
self._sys_db.run_migrations()
438+
self._app_db.run_migrations()
439+
436440
admin_port = self._config.get("runtimeConfig", {}).get("admin_port")
437441
if admin_port is None:
438442
admin_port = 3001

dbos/_sys_db.py

Lines changed: 48 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -241,34 +241,63 @@ def __init__(
241241
sysdb_name = system_db_url.database + SystemSchema.sysdb_suffix
242242
system_db_url = system_db_url.set(database=sysdb_name)
243243

244-
if not debug_mode:
245-
# If the system database does not already exist, create it
246-
engine = sa.create_engine(
247-
system_db_url.set(database="postgres"), **engine_kwargs
248-
)
249-
with engine.connect() as conn:
250-
conn.execution_options(isolation_level="AUTOCOMMIT")
251-
if not conn.execute(
252-
sa.text("SELECT 1 FROM pg_database WHERE datname=:db_name"),
253-
parameters={"db_name": sysdb_name},
254-
).scalar():
255-
dbos_logger.info(f"Creating system database {sysdb_name}")
256-
conn.execute(sa.text(f"CREATE DATABASE {sysdb_name}"))
257-
engine.dispose()
258-
259244
self.engine = sa.create_engine(
260245
system_db_url,
261246
**engine_kwargs,
262247
)
248+
self._engine_kwargs = engine_kwargs
249+
250+
self.notification_conn: Optional[psycopg.connection.Connection] = None
251+
self.notifications_map: Dict[str, threading.Condition] = {}
252+
self.workflow_events_map: Dict[str, threading.Condition] = {}
253+
254+
# Now we can run background processes
255+
self._run_background_processes = True
256+
self._debug_mode = debug_mode
257+
258+
# Run migrations
259+
def run_migrations(self) -> None:
260+
if self._debug_mode:
261+
dbos_logger.warning("System database migrations are skipped in debug mode.")
262+
return
263+
system_db_url = self.engine.url
264+
sysdb_name = system_db_url.database
265+
# If the system database does not already exist, create it
266+
engine = sa.create_engine(
267+
system_db_url.set(database="postgres"), **self._engine_kwargs
268+
)
269+
with engine.connect() as conn:
270+
conn.execution_options(isolation_level="AUTOCOMMIT")
271+
if not conn.execute(
272+
sa.text("SELECT 1 FROM pg_database WHERE datname=:db_name"),
273+
parameters={"db_name": sysdb_name},
274+
).scalar():
275+
dbos_logger.info(f"Creating system database {sysdb_name}")
276+
conn.execute(sa.text(f"CREATE DATABASE {sysdb_name}"))
277+
engine.dispose()
263278

264279
# Run a schema migration for the system database
265-
if not debug_mode:
266-
migration_dir = os.path.join(
267-
os.path.dirname(os.path.realpath(__file__)), "_migrations"
280+
migration_dir = os.path.join(
281+
os.path.dirname(os.path.realpath(__file__)), "_migrations"
282+
)
283+
alembic_cfg = Config()
284+
alembic_cfg.set_main_option("script_location", migration_dir)
285+
logging.getLogger("alembic").setLevel(logging.WARNING)
286+
# Alembic requires the % in URL-escaped parameters to itself be escaped to %%.
287+
escaped_conn_string = re.sub(
288+
r"%(?=[0-9A-Fa-f]{2})",
289+
"%%",
290+
self.engine.url.render_as_string(hide_password=False),
291+
)
292+
alembic_cfg.set_main_option("sqlalchemy.url", escaped_conn_string)
293+
try:
294+
command.upgrade(alembic_cfg, "head")
295+
except Exception as e:
296+
dbos_logger.warning(
297+
f"Exception during system database construction. This is most likely because the system database was configured using a later version of DBOS: {e}"
268298
)
269299
alembic_cfg = Config()
270300
alembic_cfg.set_main_option("script_location", migration_dir)
271-
logging.getLogger("alembic").setLevel(logging.WARNING)
272301
# Alembic requires the % in URL-escaped parameters to itself be escaped to %%.
273302
escaped_conn_string = re.sub(
274303
r"%(?=[0-9A-Fa-f]{2})",
@@ -282,29 +311,6 @@ def __init__(
282311
dbos_logger.warning(
283312
f"Exception during system database construction. This is most likely because the system database was configured using a later version of DBOS: {e}"
284313
)
285-
alembic_cfg = Config()
286-
alembic_cfg.set_main_option("script_location", migration_dir)
287-
# Alembic requires the % in URL-escaped parameters to itself be escaped to %%.
288-
escaped_conn_string = re.sub(
289-
r"%(?=[0-9A-Fa-f]{2})",
290-
"%%",
291-
self.engine.url.render_as_string(hide_password=False),
292-
)
293-
alembic_cfg.set_main_option("sqlalchemy.url", escaped_conn_string)
294-
try:
295-
command.upgrade(alembic_cfg, "head")
296-
except Exception as e:
297-
dbos_logger.warning(
298-
f"Exception during system database construction. This is most likely because the system database was configured using a later version of DBOS: {e}"
299-
)
300-
301-
self.notification_conn: Optional[psycopg.connection.Connection] = None
302-
self.notifications_map: Dict[str, threading.Condition] = {}
303-
self.workflow_events_map: Dict[str, threading.Condition] = {}
304-
305-
# Now we can run background processes
306-
self._run_background_processes = True
307-
self._debug_mode = debug_mode
308314

309315
# Destroy the pool when finished
310316
def destroy(self) -> None:

dbos/cli/cli.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,8 @@ def migrate(
258258
"pool_size": 2,
259259
},
260260
)
261+
sys_db.run_migrations()
262+
app_db.run_migrations()
261263
except Exception as e:
262264
typer.echo(f"DBOS system schema migration failed: {e}")
263265
finally:

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def sys_db(config: DBOSConfig) -> Generator[SystemDatabase, Any, None]:
4949
"connect_args": {"connect_timeout": 30},
5050
},
5151
)
52+
sys_db.run_migrations()
5253
yield sys_db
5354
sys_db.destroy()
5455

@@ -65,6 +66,7 @@ def app_db(config: DBOSConfig) -> Generator[ApplicationDatabase, Any, None]:
6566
"connect_args": {"connect_timeout": 30},
6667
},
6768
)
69+
app_db.run_migrations()
6870
yield app_db
6971
app_db.destroy()
7072

tests/test_client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,20 @@ def run_client_collateral() -> None:
3131
runpy.run_path(filename)
3232

3333

34+
def test_client_no_migrate(dbos: DBOS, config: DBOSConfig) -> None:
35+
# Drop the system database
36+
DBOS.destroy()
37+
DBOS(config=config)
38+
DBOS.reset_system_database()
39+
40+
# The client should not be able to connect to the system database
41+
with pytest.raises(Exception) as exc_info:
42+
assert config["database_url"] is not None
43+
client = DBOSClient(config["database_url"])
44+
client.list_workflows()
45+
assert f'database "dbostestpy_dbos_sys" does not exist' in str(exc_info.value)
46+
47+
3448
def test_client_enqueue_and_get_result(dbos: DBOS, client: DBOSClient) -> None:
3549
run_client_collateral()
3650

0 commit comments

Comments
 (0)