Replies: 6 comments 11 replies
-
Hi @sryza, thanks for this, really useful feature! Just a sanity check: am I right in thinking that this won't work with multi-dimensional partitions, since they don't have partition ranges but use subsets instead? Just wanting to make sure before I start throwing weird strings into the tag to try and trick it into working!
-
Thanks for providing this example! Is this warning expected when running a partition range using this method? This is with a custom IO manager and I don't get this warning when I run for a single partition.
-
@sryza do they even support non-contiguous ranges?
-
When using this feature, how does one handle AssetMaterializations and Outputs? Yield one Output for each partition? One AssetMaterialization for each partition?
-
An example of how you could programmatically launch runs across a partition range, e.g. from within a schedule:
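A minimal sketch of what that could look like, using the documented single-run backfill tags (`dagster/asset_partition_range_start` / `dagster/asset_partition_range_end`); the job name, cron string, and partition keys here are hypothetical:

```python
from dagster import RunRequest, define_asset_job, schedule

# Hypothetical job selecting the partitioned asset(s) to materialize.
users_job = define_asset_job("users_job", selection="users")


@schedule(job=users_job, cron_schedule="0 0 * * *")
def daily_catchup_schedule(context):
    # Tagging the run with a partition range start/end causes a single run
    # to cover every partition in that range (per the single-run backfill docs).
    yield RunRequest(
        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d"),
        tags={
            "dagster/asset_partition_range_start": "2023-04-11-00:00",
            "dagster/asset_partition_range_end": "2023-04-11-23:00",
        },
    )
```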
-
An example of a project that implements this capability, specifically for two assets:

```python
from datetime import timedelta

import pandas as pd
from dagster import HourlyPartitionsDefinition, OpExecutionContext, asset

hourly_partitions = HourlyPartitionsDefinition(start_date="2023-04-11-00:00")


def _hourly_partition_seq(start, end):
    # Expand a [start, end) time window into the hourly partition keys it covers.
    start = pd.to_datetime(start)
    end = pd.to_datetime(end)
    hourly_diffs = int((end - start) / timedelta(hours=1))
    return [str(start + timedelta(hours=i)) for i in range(hourly_diffs)]


@asset(
    compute_kind="api",
    required_resource_keys={"data_api"},
    partitions_def=hourly_partitions,
    metadata={"partition_expr": "created_at"},
)
def users(context: OpExecutionContext) -> pd.DataFrame:
    """A table containing all users data"""
    api = context.resources.data_api
    # During a backfill the partition time window spans multiple hours;
    # during a regular run it covers a single hour.
    first_partition, last_partition = context.asset_partitions_time_window_for_output()
    partition_seq = _hourly_partition_seq(first_partition, last_partition)
    all_users = []
    for partition in partition_seq:
        resp = api.get_users(partition)
        users = pd.read_json(resp.json())
        all_users.append(users)
    return pd.concat(all_users)
```

In this example the asset is written so that it always uses `context.asset_partitions_time_window_for_output()`, so the same code path handles a regular single-partition run and a multi-partition backfill run. The example also includes a unit test to ensure the asset works whether it is called with a single partition (regular incremental runs) or a backfill (multiple partitions supplied in one run): https://github.com/dagster-io/hooli-data-eng-pipelines/blob/master/hooli_data_eng_tests/test_assets.py
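For a flavor of what such a test can check, here is a minimal pytest-style sketch (not the repository's actual test) that exercises the `_hourly_partition_seq` helper for both a single-hour window and a multi-hour backfill window; the import path is hypothetical:

```python
# Hypothetical import path; adjust to wherever _hourly_partition_seq lives.
from my_project.assets import _hourly_partition_seq


def test_hourly_partition_seq_single_hour():
    # A regular run: the time window covers exactly one hourly partition.
    keys = _hourly_partition_seq("2023-04-11 00:00", "2023-04-11 01:00")
    assert keys == ["2023-04-11 00:00:00"]


def test_hourly_partition_seq_backfill_range():
    # A single-run backfill: the window spans several hours, so the helper
    # expands it into one key per partition (end-exclusive).
    keys = _hourly_partition_seq("2023-04-11 00:00", "2023-04-11 04:00")
    assert len(keys) == 4
    assert keys[0] == "2023-04-11 00:00:00"
    assert keys[-1] == "2023-04-11 03:00:00"
```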
-
Refer to the Dagster documentation: https://docs.dagster.io/concepts/partitions-schedules-sensors/backfills#single-run-backfills
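On newer Dagster versions you can also declare this intent directly on the asset with a backfill policy, so backfills launched from the UI collapse into a single run. A minimal sketch, assuming `BackfillPolicy` is available in your version (the asset and partitions here are hypothetical):

```python
from dagster import BackfillPolicy, DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-04-11"),
    # Ask Dagster to materialize an entire backfilled range in one run
    # instead of launching one run per partition.
    backfill_policy=BackfillPolicy.single_run(),
)
def events():
    ...
```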