Launching a single RunRequest for a set of partitions from a schedule or sensor #19457
Replies: 3 comments 5 replies
-
Maybe this #14622?
-
I have the same issue. The quite ugly workaround for me is the following:

```python
from dagster import RunRequest, SensorEvaluationContext, sensor

# `partitions_def` and `get_partition_keys_from_some_resource` are defined elsewhere.


@sensor(job=...)
def sensor_do_job(
    context: SensorEvaluationContext,
):
    partition_keys = get_partition_keys_from_some_resource()
    if len(partition_keys) > 1:
        # Delete and re-add all partitions so they are stored as one
        # contiguous range that can be targeted via run tags.
        context.log.info(f"Clearing {len(partition_keys)} partitions")
        for partition in partition_keys:
            context.instance.delete_dynamic_partition(
                partitions_def.name, partition
            )
        context.log.info(f"Adding {len(partition_keys)} partitions")
        context.instance.add_dynamic_partitions(
            partitions_def.name,
            partition_keys=partition_keys,
        )
        return RunRequest(
            tags={
                "dagster/asset_partition_range_start": partition_keys[0],
                "dagster/asset_partition_range_end": partition_keys[-1],
            }
        )
    elif len(partition_keys) == 1:
        return RunRequest(partition_key=partition_keys[0])
```

By deleting and then reinserting, the partition keys are appended as a continuous range and can therefore be run using tags. However, this solution comes with the drawback that if, while the partition keys are being deleted and re-added, another application is using the same partitions, it may momentarily see them as missing.

I really think that assets which fill multiple partitions with a single run are missing an important feature here. In my specific use case the initialization cost stems from a machine learning model that is loaded from cluster storage into memory for inference. One possibility would be to serve this model from an endpoint in our Kubernetes cluster and thereby eliminate the resource setup cost. However, this would come with some additional structural overhead and would partly eliminate the ease of use of Dagster.

I would be very interested in how others solved the problem of initialization cost, or in other potential workarounds for the mentioned problem. Thanks!
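To make the range-tag part of the workaround above concrete, here is a small Dagster-free sketch of how the two tags are derived from an ordered list of partition keys. The tag names come from the snippet above; the helper function is my own and not a Dagster API:

```python
def single_run_range_tags(partition_keys: list[str]) -> dict[str, str]:
    """Build the run tags that ask Dagster to materialize a contiguous
    partition range in one run. Assumes the keys are registered in
    insertion order, so the first and last key bound the whole range."""
    if not partition_keys:
        raise ValueError("need at least one partition key")
    return {
        "dagster/asset_partition_range_start": partition_keys[0],
        "dagster/asset_partition_range_end": partition_keys[-1],
    }


tags = single_run_range_tags(["2024-01-01", "2024-01-02", "2024-01-03"])
# tags["dagster/asset_partition_range_start"] == "2024-01-01"
# tags["dagster/asset_partition_range_end"] == "2024-01-03"
```

This is exactly why the delete-and-reinsert trick matters: the tags only describe a start and an end, so the keys in between must actually be contiguous in the partition store.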
-
Anyone aware of a workaround or fix for this that doesn't involve deleting and reinserting? I've just gone through the process of writing code to automatically create dynamic assets to deal with the 25k partition limit; I have ~50,000 total partitions. I reuse these dynamic partitions across multiple assets, so deleting and re-appending is certain to cause issues.

I preferred not to use single_run as well, but in my case it's even more severe than for @AdrianTheopold: it's around 90% initialization and 10% actual work. So without it I'm looking at a ~10x increase in run time, probably even more once you add the extra load on the Dagster daemons.

One option is to use a different partitions_def on each asset, but then I'm adding even more duplication. That's probably the route I'll go with if there isn't a better alternative, though.

EDIT: I decided to still share the partitions, but to just clear all of them out and re-insert them now. Fortunately, the main order in which I want to keep inserting is by date+id ascending, so I should be able to get away with making sure that I always create them in that order from now on.
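The "always insert in date+id ascending order" idea from the edit above can be sketched without Dagster. The `YYYY-MM-DD|id` key format below is an assumption for illustration, not something from the thread:

```python
def ordered_partition_keys(raw_keys: list[str]) -> list[str]:
    """Sort dynamic-partition keys of the (assumed) form 'YYYY-MM-DD|id'
    by date ascending, then numeric id ascending, so that appending them
    in this order keeps the partition list contiguous and range-runnable."""
    def sort_key(key: str) -> tuple[str, int]:
        date_part, id_part = key.split("|", 1)
        # Parse the id numerically so '2' sorts before '11'.
        return (date_part, int(id_part))

    return sorted(raw_keys, key=sort_key)


keys = ordered_partition_keys(["2024-01-02|10", "2024-01-01|2", "2024-01-01|11"])
# ["2024-01-01|2", "2024-01-01|11", "2024-01-02|10"]
```

Passing the result to `add_dynamic_partitions` in this order would keep the stored range consistent without ever deleting shared partitions.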
-
Is there a way to launch a single RunRequest for a set of partitions from a schedule or sensor, just like you can do a single-run backfill in the UI?

I have a partitioned asset with dynamic partitions (700-800 partitions) and I want to periodically refresh the table for some or all of the available partitions (essentially to keep my own replica of a dataset provided via an API). The asset itself can work with `partition_keys` (it can handle `BackfillPolicy.single_run()`), since refreshing a single partition only takes a relatively lightweight API call, and I have a neat resource wrapping the API, so it's easy to do all partitions within a single run (maybe even async).

But I can't seem to find a way to launch just one run for multiple partitions from a schedule. All the examples I saw iterate over the partition keys and create a `RunRequest` for each, but that won't work for me here, because it creates too much overhead: hundreds of runs that in turn trigger hundreds of unnecessary auto-materializations of downstream assets and checks, a flurry of Slack notifications, etc., while the whole job could be finished in a matter of seconds with multiple partitions in a single run.
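Building on the range tags from the workaround earlier in the thread, a middle ground between one run per partition and one run total would be to chunk the ordered keys into a handful of contiguous ranges, each of which could back one RunRequest. A minimal sketch; the chunker is hypothetical, not a Dagster API:

```python
def chunk_ranges(keys: list[str], max_runs: int) -> list[tuple[str, str]]:
    """Split an ordered list of partition keys into at most `max_runs`
    contiguous (start, end) ranges of roughly equal size."""
    if not keys:
        return []
    size = -(-len(keys) // max_runs)  # ceiling division
    return [
        (chunk[0], chunk[-1])
        for chunk in (keys[i:i + size] for i in range(0, len(keys), size))
    ]


chunk_ranges(["a", "b", "c", "d", "e"], 2)
# [("a", "c"), ("d", "e")]
```

Each `(start, end)` pair could then feed the `dagster/asset_partition_range_start` and `dagster/asset_partition_range_end` tags shown above, capping the number of runs (and downstream auto-materializations) instead of launching hundreds.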