I'm working with the 4 km NOAA Analysis of Record for Calibration (AORC) dataset, which contains 41 years of hourly data (time: 359,160) across the contiguous US (lat: 885, lon: 1770) with two float32 data variables. Originally the data was split into one NetCDF file per hour per variable over 12 geographic zones (~8.6 million files), but I've combined these into 492 Zarr stores, which I load through xr.open_mfdataset.
I need to create a tab-delimited text file for every location (~1.5 million files!) as input to a model (it was written many years ago; unfortunately, this format is a requirement). Single-threaded Python would take about a year to finish! I have a local server with 20 cores (2 threads each) and 1 TB of RAM, and I've attempted to parallelize with Dask, but with no success so far. I create one task per output file: a single function that selects the data for one location and saves it. Each worker (20 total) computes a task, as intended, but it seems that Dask does not actually execute the tasks until all of them have been computed. As a result, RAM is never freed and the computation fails. I've tested this with only 20-40 tasks and that works (a sketch of that test is below, after the full code); beyond that point the workers start spilling over the memory limit. In the dashboard I can see that the tasks are distributed properly (for 40 tasks, each worker gets 2) and computed, but the functions' effects don't appear until the very end: the file timestamps confirm that all 40 files were saved at the same time, instead of the expected two batches of 20.
Does anyone have an idea how I can get Dask to execute each task (and release its memory) as soon as it is computed, rather than holding everything until the end? Or are there other packages or workflows better suited than Dask? The code is below; I appreciate any thoughts!
import xarray as xr
from dask import compute, delayed
from dask.distributed import Client

# 20 workers x 2 threads each, with a 32 GB memory limit per worker
client = Client(n_workers=20, threads_per_worker=2, memory_limit='32GB')
# Lazily open the 492 Zarr stores; each chunk holds the full time series for a 6 x 6 block of cells
data = xr.open_mfdataset('filenames*?.zarr', concat_dim="time", combine="nested", parallel=True,
                         engine='zarr', data_vars='minimal', coords='minimal', compat='override'
                         ).chunk(chunks={'time': 359424, 'latitude': 6, 'longitude': 6})

def save_csvs(i, j):
    # Select one grid cell and write its full time series as a tab-delimited file
    vals = data.isel(latitude=i, longitude=j)
    lat = str(round(float(vals.latitude.values), 4))
    lon = str(round(float(vals.longitude.values), 4))
    fn = 'name_' + lat + '_' + lon + '.csv'
    vals.to_pandas().to_csv(fn, sep='\t')

# One delayed task per grid cell (885 x 1770 ≈ 1.5 million tasks)
tasks = [delayed(save_csvs)(i, j) for i in range(885) for j in range(1770)]
compute(*tasks)
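For reference, this is roughly how I ran the small-batch test mentioned above (the specific indices are arbitrary and only for illustration):

# Hypothetical small-batch test: ~40 tasks instead of ~1.5 million (indices chosen arbitrarily)
small_batch = [delayed(save_csvs)(i, j) for i in range(2) for j in range(20)]
compute(*small_batch)  # completes, but all 40 files get the same timestamp rather than two waves of 20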