Testing with Dagster dg #97

Merged
2 changes: 1 addition & 1 deletion .devcontainer/dagster-testing/Dockerfile
@@ -1,7 +1,7 @@
FROM mcr.microsoft.com/devcontainers/python:0-3.11-bullseye
ENV PYTHONUNBUFFERED 1

COPY --from=ghcr.io/astral-sh/uv:0.4.7 /uv /bin/uv
COPY --from=ghcr.io/astral-sh/uv:0.6.10 /uv /bin/uv

COPY dagster_university/dagster_testing/pyproject.toml .
RUN uv pip install -r pyproject.toml --system
29 changes: 15 additions & 14 deletions course/pages/dagster-testing/lesson-2/1-set-up-local.md
@@ -6,6 +6,8 @@ lesson: '2'

# Set up local

This will set up Dagster on your local machine. If you would prefer to do this course in GitHub Codespaces, please follow [that guide](/dagster-testing/lesson-2/2-set-up-codespace).

- **To install git.** Refer to the [Git documentation](https://github.com/git-guides/install-git) if you don’t have this installed.
- **To have Python installed.** Dagster supports Python 3.9 - 3.12.
- **To install a package manager**. To manage the Python packages, we recommend [`uv`](https://docs.astral.sh/uv/), which Dagster uses internally.
@@ -34,38 +36,37 @@ After cloning the Dagster University project, you’ll want to navigate to speci
cd dagster_university/dagster_testing
```

## Install the dependencies
## Install uv and dg

**uv**
Now we want to install `dg`. This is the command line interface that makes it easy to interact with Dagster. Throughout the course we will use `dg` to scaffold our project and streamline the development process.

To install the python dependencies with [uv](https://docs.astral.sh/uv/).
In order to best use `dg` we will need the Python package manager [`uv`](https://docs.astral.sh/uv/). `uv` will allow us to install `dg` globally and more easily build our virtual environments.
Collaborator

Just for your awareness, we hope to get dg available via homebrew and curl.

I actually already did the homebrew part but it's not being updated automatically via CI, so not recommended yet.

Collaborator

I wonder if we can create a partial to share across courses for this.

Collaborator Author

Yeah. These steps should get reviewed and standardized when we get closer on dg

https://linear.app/dagster-labs/issue/DR-1058/standardize-dagster-u-install-instructions


If you do not have `uv` installed already, you can install it with:
```bash
uv sync
brew install uv
```

This will create a virtual environment that you can now use.

Now you can use `uv` to install `dg` globally:
```bash
source .venv/bin/activate
uv tool install dagster-dg
```

**pip**

Create the virtual environment.
## Install the dependencies

With `uv` and `dg` set up, we can create the virtual environment specific to this course. All of the dependencies are maintained in the `pyproject.toml` (you will not need to edit anything in that file for this course). To create the virtual environment, run:
```bash
python3 -m venv .venv
uv sync
```

Enter the virtual environment.
This will create a virtual environment and install all the necessary dependencies. To activate this virtual environment:

```bash
source .venv/bin/activate
```

Install the packages.
To ensure everything is working you can launch the Dagster UI.

```bash
pip install -e ".[dev]"
dg dev
```
@@ -41,7 +41,7 @@ cd dagster_university/dagster_testing
To ensure everything is working you can launch the Dagster UI.

```bash
dagster dev
dg dev
```

After Dagster starts running you will be prompted to open the Dagster UI within your browser. Click "Open in Browser".
2 changes: 1 addition & 1 deletion course/pages/dagster-testing/lesson-3/1-unit-tests.md
@@ -34,7 +34,7 @@ Dagster assets are good candidates for unit tests. Since an asset is responsible
We will begin with the following asset:

```python
# /dagster_testing/assets/lesson_3.py
# /dagster_testing/defs/assets/lesson_3.py
@dg.asset
def state_population_file() -> list[dict]:
    file_path = Path(__file__).absolute().parent / "../data/ny.csv"
@@ -11,7 +11,7 @@ Most Dagster asset graphs contain multiple assets that depend on the output of o
We will add an additional asset downstream of `state_population_file` that takes in its output:

```python
# /dagster_testing/assets/lesson_3.py
# /dagster_testing/defs/assets/lesson_3.py
@dg.asset
def total_population(state_population_file: list[dict]) -> int:
    return sum([int(x["Population"]) for x in state_population_file])
@@ -11,7 +11,7 @@ Some assets in Dagster pipelines may take in parameters defined outside of asset
If we think about the `state_population_file` it can currently only parse a single file. Let's create a new asset called `state_population_file_config` with a run configuration. This asset will be able to process any file:

```python
# /dagster_testing/assets/lesson_3.py
# /dagster_testing/defs/assets/lesson_3.py
class FilepathConfig(dg.Config):
    path: str

@@ -9,7 +9,7 @@ lesson: '3'
In standard Python, a function does not need to match its type annotation in order to execute properly. For example:

```python
# /dagster_testing/assets/lesson_3.py
# /dagster_testing/defs/assets/lesson_3.py
def func_wrong_type() -> str:
    return 2
```
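To make the point concrete, here is a minimal sketch in plain Python (no Dagster involved) that calls the function above and inspects what comes back. The use of `typing.get_type_hints` is an addition for illustration only: it shows that the annotation is recorded as metadata even though nothing enforces it at call time.

```python
from typing import get_type_hints


def func_wrong_type() -> str:
    # Annotated as returning str, but actually returns an int.
    return 2


# Python does not check the annotation at call time, so this runs without error.
result = func_wrong_type()

# The annotation is only stored as metadata on the function.
declared = get_type_hints(func_wrong_type)["return"]

print(type(result).__name__)  # int
print(declared.__name__)  # str
```

The mismatch between `declared` and the actual type of `result` is exactly what goes unnoticed in standard Python unless something checks it explicitly.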
@@ -13,7 +13,7 @@ If your assets do not access any of the context APIs, you will not need to worry
However if we rewrite the `state_population_file` asset to include context logging, we will need to update our tests:

```python
# /dagster_testing/assets/lesson_3.py
# /dagster_testing/defs/assets/lesson_3.py
@dg.asset()
def state_population_file_logging(context: dg.AssetExecutionContext) -> list[dict]:
    file_path = Path(__file__).absolute().parent / "../data/ny.csv"
@@ -30,7 +30,7 @@ Imagine that an API exists to query city populations by state (sadly this API do
We want to rewrite the `state_population_file` asset to use this endpoint instead of reading a file to retrieve the necessary data. This is what the new asset will look like.

```python
# /dagster_testing/assets/lesson_4.py
# /dagster_testing/defs/assets/lesson_4.py
API_URL = "https://fake.com/population.json"


@@ -13,7 +13,7 @@ Resources are objects that provide access to external systems, databases, or ser
We can refactor the API asset code into a resource. A resource is just a class that inherits from `dg.ConfigurableResource`. It can have any number of methods which assets can use. This resource will only include a single method for `get_cities`.

```python
# /dagster_testing/assets/lesson_4.py
# /dagster_testing/defs/assets/lesson_4.py
class StatePopulation(dg.ConfigurableResource):
    def get_cities(self, state: str) -> list[dict]:
        output = []
@@ -9,7 +9,7 @@ lesson: '4'
When we discussed unit tests we showed how you can execute one or more assets together using `dg.materialize()`. We can still materialize our assets this way using mocks.

```python
# /dagster_testing/assets/lesson_4.py
# /dagster_testing/defs/assets/lesson_4.py
@patch("requests.get")
def test_state_population_api_assets(mock_get, example_response, api_output):
    mock_response = Mock()
@@ -19,7 +19,7 @@ Assume a table exists in our production data warehouse, `data.city_population`,
We will hardcode our asset to look for the cities in New York and return the results from Snowflake using a resource.

```python
# /dagster_testing/assets/lesson_5.py
# /dagster_testing/defs/assets/lesson_5.py
@dg.asset
def state_population_database(database: SnowflakeResource) -> list[tuple]:
    query = """
2 changes: 1 addition & 1 deletion course/pages/dagster-testing/lesson-6/1-asset-checks.md
@@ -23,7 +23,7 @@ When the asset runs, we can see that its associated asset checks also run and va
To define an asset check we first need an asset. `total_population` is a slightly modified version of the asset we have used throughout the course. Now it will take in the output of several assets and sum their populations.

```python
# /dagster_testing/assets/lesson_6.py
# /dagster_testing/defs/assets/lesson_6.py
@dg.asset
def total_population(
    state_population_file_config: list[dict],
4 changes: 3 additions & 1 deletion course/pages/dagster-testing/lesson-6/2-definitions.md
@@ -6,7 +6,9 @@ lesson: '6'

# Definitions

Within your Dagster project the most important object is the definition. This defines all the objects that will deployed into your code location. Because of its importance we will want to write a test for it.
Within your Dagster project the most important object is the `Definitions`. This defines all the objects that will be deployed into your code location. If you are using `dg` you may already be in the habit of checking that your `Definitions` is valid by running `dg check defs`.

This is a great habit and you can build out workflows (such as pre-commit hooks) to always run that check. But it is also good to get in the habit of writing a specific test for this to live alongside your other Dagster tests.

Luckily this is a very easy test to write.

4 changes: 2 additions & 2 deletions course/pages/dagster-testing/lesson-6/3-dagster-objects.md
@@ -128,7 +128,7 @@ In order to write a reliable test for this sensor, we will go back to our lesson
First we can set a test where our sensor will skip.

```python
@patch("dagster_testing.sensors.check_for_new_files", return_value=[])
@patch("dagster_testing.defs.sensors.check_for_new_files", return_value=[])
def test_sensor_skip(mock_check_new_files):
    instance = dg.DagsterInstance.ephemeral()
    context = dg.build_sensor_context(instance=instance)
@@ -152,7 +152,7 @@ What would it look like to write a test to ensure the sensor picks up a new file

```python {% obfuscated="true" %}
@patch(
    "dagster_testing.sensors.check_for_new_files",
    "dagster_testing.defs.sensors.check_for_new_files",
    return_value=["test_file"],
)
def test_sensor_run(mock_check_new_files):
@@ -1,25 +1,5 @@
import dagster as dg

import dagster_testing.jobs as jobs
import dagster_testing.resources as resources
import dagster_testing.schedules as schedules
import dagster_testing.sensors as sensors
from dagster_testing.assets import lesson_3, lesson_4, lesson_5, lesson_6
import dagster_testing.defs

lesson_3_assets = dg.load_assets_from_modules([lesson_3])
lesson_4_assets = dg.load_assets_from_modules([lesson_4])
lesson_5_assets = dg.load_assets_from_modules([lesson_5])
lesson_6_assets = dg.load_assets_from_modules([lesson_6])


defs = dg.Definitions(
    assets=lesson_3_assets + lesson_4_assets + lesson_5_assets + lesson_6_assets,
    asset_checks=[lesson_6.non_negative],
    jobs=[jobs.my_job, jobs.my_job_configured],
    resources={
        "state_population_resource": resources.StatePopulation(),
        "database": dg.ResourceDefinition.mock_resource(),
    },
    schedules=[schedules.my_schedule],
    sensors=[sensors.my_sensor],
)
defs = dg.components.load_defs(dagster_testing.defs)
@@ -3,7 +3,7 @@

import dagster as dg

import dagster_testing.resources as resources
import dagster_testing.defs.resources as resources


class FilepathConfig(dg.Config):
@@ -3,7 +3,7 @@
import dagster as dg
import yaml

from dagster_testing.assets import lesson_6
from dagster_testing.defs.assets import lesson_6

my_job = dg.define_asset_job(
    name="jobs",
@@ -13,3 +13,11 @@ def get_cities(self, state: str) -> list[dict]:
"Population": 269840,
},
]


defs = dg.Definitions(
    resources={
        "state_population_resource": StatePopulation(),
        "database": dg.ResourceDefinition.mock_resource(),
    },
)
@@ -1,7 +1,7 @@
import dagster as dg

import dagster_testing.jobs as jobs
from dagster_testing.assets import lesson_6
import dagster_testing.defs.jobs as jobs
from dagster_testing.defs.assets import lesson_6

my_schedule = dg.ScheduleDefinition(
    name="my_schedule",
@@ -2,7 +2,7 @@

import dagster as dg

import dagster_testing.jobs as jobs
import dagster_testing.defs.jobs as jobs


def check_for_new_files() -> list[str]:
@@ -5,7 +5,7 @@
import yaml
from dagster._core.errors import DagsterTypeCheckDidNotPass

from dagster_testing.assets import lesson_3
from dagster_testing.defs.assets import lesson_3


@pytest.fixture()
@@ -3,7 +3,7 @@
import dagster as dg
import pytest

from dagster_testing.assets import lesson_4
from dagster_testing.defs.assets import lesson_4


@pytest.fixture
@@ -5,7 +5,7 @@
import pytest
from dagster_snowflake import SnowflakeResource

from dagster_testing.assets import lesson_5
from dagster_testing.defs.assets import lesson_5

from ..fixtures import docker_compose # noqa: F401

@@ -4,12 +4,12 @@
import dagster as dg
import pytest

import dagster_testing.jobs as jobs
import dagster_testing.resources as resources
import dagster_testing.schedules as schedules
import dagster_testing.sensors as sensors
from dagster_testing.assets import lesson_6
import dagster_testing.defs.jobs as jobs
import dagster_testing.defs.resources as resources
import dagster_testing.defs.schedules as schedules
import dagster_testing.defs.sensors as sensors
from dagster_testing.definitions import defs
from dagster_testing.defs.assets import lesson_6


@pytest.fixture()
@@ -128,15 +128,15 @@ def test_sensors():
    assert sensors.my_sensor


@patch("dagster_testing.sensors.check_for_new_files", return_value=[])
@patch("dagster_testing.defs.sensors.check_for_new_files", return_value=[])
def test_sensor_skip(mock_check_new_files):
    instance = dg.DagsterInstance.ephemeral()
    context = dg.build_sensor_context(instance=instance)
    assert sensors.my_sensor(context).__next__() == dg.SkipReason("No new files found")


@patch(
    "dagster_testing.sensors.check_for_new_files",
    "dagster_testing.defs.sensors.check_for_new_files",
    return_value=["test_file"],
)
def test_sensor_run(mock_check_new_files):
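
The patch targets in these tests change because `unittest.mock.patch` replaces a name on whichever module the target string points to, so the string has to follow the module when it moves under `defs`. The following is a minimal, stdlib-only sketch of that behavior. The `demo_pkg` modules and the simplified `my_sensor` function are hypothetical stand-ins built in memory; they are not the course code and do not use Dagster.

```python
import sys
import types
from unittest.mock import patch

# Hypothetical in-memory modules standing in for a real package layout.
pkg = types.ModuleType("demo_pkg")
defs_pkg = types.ModuleType("demo_pkg.defs")
sensors = types.ModuleType("demo_pkg.defs.sensors")
pkg.defs = defs_pkg
defs_pkg.sensors = sensors
sys.modules.update(
    {"demo_pkg": pkg, "demo_pkg.defs": defs_pkg, "demo_pkg.defs.sensors": sensors}
)


def check_for_new_files() -> list[str]:
    return ["real_file"]


def my_sensor() -> str:
    # Look the helper up on the module at call time, as module-level code does.
    files = sensors.check_for_new_files()
    return f"run for {files}" if files else "No new files found"


sensors.check_for_new_files = check_for_new_files
sensors.my_sensor = my_sensor


# The target string names the module where the helper currently lives.
@patch("demo_pkg.defs.sensors.check_for_new_files", return_value=[])
def run_with_patch(mock_check_new_files):
    return sensors.my_sensor()


patched_result = run_with_patch()
unpatched_result = sensors.my_sensor()

# A stale target (the pre-move path) cannot be resolved, so patching fails.
try:
    with patch("demo_pkg.sensors.check_for_new_files", return_value=[]):
        stale_path_ok = True
except (AttributeError, ImportError):
    stale_path_ok = False

print(patched_result)
print(unpatched_result)
print(stale_path_ok)
```

With the current path the helper is replaced only for the duration of the patched call, while the stale path raises before the test body ever runs. A failure like that is a quick signal that a module has moved.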
@@ -4,7 +4,7 @@
import yaml # noqa: F401
from dagster._core.errors import DagsterTypeCheckDidNotPass # noqa: F401

from dagster_testing.assets import lesson_3
from dagster_testing.defs.assets import lesson_3


@pytest.fixture()
@@ -3,7 +3,7 @@
import dagster as dg # noqa: F401
import pytest

from dagster_testing.assets import lesson_4 # noqa: F401
from dagster_testing.defs.assets import lesson_4 # noqa: F401


@pytest.fixture
@@ -5,7 +5,7 @@
import pytest
from dagster_snowflake import SnowflakeResource

from dagster_testing.assets import lesson_5
from dagster_testing.defs.assets import lesson_5

from .fixtures import docker_compose # noqa: F401

@@ -1,10 +1,10 @@
import pytest

import dagster_testing.jobs as jobs # noqa: F401
import dagster_testing.schedules as schedules # noqa: F401
import dagster_testing.sensors as sensors # noqa: F401
from dagster_testing.assets import lesson_6 # noqa: F401
import dagster_testing.defs.jobs as jobs # noqa: F401
import dagster_testing.defs.schedules as schedules # noqa: F401
import dagster_testing.defs.sensors as sensors # noqa: F401
from dagster_testing.definitions import defs
from dagster_testing.defs.assets import lesson_6 # noqa: F401


@pytest.fixture()