Chunking input files #1951
-
I have been comparing runs with an unchunked and a chunked input file:

With the unchunked file: [timings/plot not shown]
With the chunked file: [timings/plot not shown]
I am really confused about what is happening here, because the chunked file with more cores makes the run slower. Of course, I understand that this can happen depending on how you chunk and parallelize the work: for example, if you chunk too finely, communication between workers can cost more than the work itself, or there may simply not be enough work to parallelize. But neither seems obviously the case here. I could work on a better MWE if you think it is useful, but this is the script I have been running for these benchmarks:

```python
import os
import sys
from datetime import datetime, timedelta
from glob import glob

import numpy as np
import pandas as pd
import xarray as xr
from tqdm import tqdm

from parcels import logger
import parcels

from tools import SmagorinskyDiff

# Set logger level to WARNING
logger.setLevel("WARNING")

### DEFINE THE LOCATION OF THE PARTICLES ###
# This is basically just to define all possible options for particle release

# Define the central coordinates (Icapuí, CE - Brasil)
clat, clon = -4.645110, -37.316935

# Define the bounding box dimensions (degrees)
dx, dy = 8, 6
box = [
    clon - dx * 2 / 3, clon + dx / 3,
    clat - dy / 3, clat + dy * 2 / 3
]

# Load and preprocess the bathymetry dataset
bat = xr.open_dataset("../../data/external/gebco/gebco_subset.nc").elevation
bat = bat.sel(lon=slice(*box[:2]), lat=slice(*box[2:]))
window = 30  # Smoothing window size
bat = bat.rolling(lon=window, lat=window, center=True, min_periods=1).mean()

# Calculate distances from the central point
dist = np.sqrt((bat.lat - clat)**2 + (bat.lon - clon)**2) * 112

# Define conditions for particle placement
where = (bat > -50) & (bat < 0) & (dist < 50)
plon, plat = xr.broadcast(bat.lon, bat.lat)
plon = plon.where(where).stack(p=["lon", "lat"]).dropna("p").values
plat = plat.where(where).stack(p=["lon", "lat"]).dropna("p").values

### THE LAGRANGIAN SIMULATION ###
# Simulation parameters
nparticles = 50  # number of particles to be released every 5 days
runtime = 30  # days
total_time = 100  # days

# Load the ocean current dataset
ds = xr.open_dataset("../../data/external/nemo/cmems_reanalysis.nc")
ds = ds.sel(time=slice(None, ds.time.min() + np.timedelta64(total_time, 'D')))

# Create output directory
path = "../../data/processed/parcels"
os.makedirs(path, exist_ok=True)

# Define particle behavior functions
# We are excluding particles after the predefined maximum number of days
def Age(particle, fieldset, time):
    """Update particle age."""
    particle.age += particle.dt / 86400

def DeleteParticle(particle, fieldset, time):
    """Delete particle if its age exceeds the runtime."""
    if particle.age > fieldset.runtime:
        particle.delete()

# Add an 'age' variable to the particle class
AgeParticle = parcels.JITParticle.add_variable(parcels.Variable("age", initial=0))

# Define fieldset variables and dimensions
variables = {"U": "uo", "V": "vo"}
dimensions = {"lat": "latitude", "lon": "longitude", "time": "time"}

# Define chunk size for fieldset
cs = 128
if cs not in ["auto", False]:
    cs = {"time": ("time", 1), "lat": ("latitude", cs), "lon": ("longitude", cs)}

# Create the fieldset
fieldset = parcels.FieldSet.from_xarray_dataset(
    ds, variables, dimensions, allow_time_extrapolation=True, chunksize=cs
)

# Add constants and fields to the fieldset
fieldset.add_constant("runtime", runtime)
x = fieldset.U.grid.lon
y = fieldset.U.grid.lat
cell_areas = parcels.Field(name="cell_areas", data=fieldset.U.cell_areas(), lon=x, lat=y)
fieldset.add_field(cell_areas)
fieldset.add_constant("Cs", 0.1)

# Generate initial particle positions and times
ploni, plati, timei = [], [], []
for di in np.arange(0, total_time, 5):
    ind = np.random.randint(0, plon.size - 1, nparticles)
    ploni.append(plon[ind])
    plati.append(plat[ind])
    timei.append([timedelta(days=float(di)).total_seconds()] * nparticles)
ploni, plati, timei = np.concatenate(ploni), np.concatenate(plati), np.concatenate(timei)

# Create the particle set
pset = parcels.ParticleSet(
    fieldset=fieldset, pclass=AgeParticle, lon=ploni, lat=plati, time=timei
)

# Define output file for particle trajectories
output_fname = "parcels.zarr"
output_file = pset.ParticleFile(
    name=os.path.join(path, output_fname), outputdt=timedelta(hours=3)
)

# Execute the particle simulation
pset.execute(
    [Age, parcels.AdvectionRK4, SmagorinskyDiff, DeleteParticle],
    endtime=timei.max() + runtime * 86400,
    dt=timedelta(hours=0.5),
    output_file=output_file,
    verbose_progress=True
)
```
-
@iuryt Interesting. I presume you double checked with … My guess is that your problem is much, much smaller than mine. My grid of (time, depth, y, x) is (365, 36, 3059, 4322), so even a single time point is about 2627 times larger than yours ((36 x 3059 x 4322)/(1 x 383 x 473)). Another sign of the difference in scale is that I gain considerably from running in parallel, while you lose. I think our problems lie on different ends of the spectrum when trading off the complexity of parallelization and chunking against the potential gain. I guess this underscores the importance of benchmarking problems at the scale relevant to our own work. Jamie
-
To confirm with some benchmarks what I said above: running with a NEMO grid from the Mercator GLORYS V12R3 model, global domain, on a (time, depth, y, x) dataset of size (365, 36, 3059, 4322), with about 1,549,325 start locations, each released 30 times for a total of 46,479,750 particles, chunking both the input data files and explicitly providing a chunksize to the fieldset makes a big difference. In this case, a 30-day run on 8 cores takes 204 minutes with chunking and 280 minutes without it, so chunking reduces the run time by about 27%. For runs that often take a week, this can bring the run time down from 7 days to 5 days. In general, the more cores I use, the bigger the difference chunking makes. But again, this depends on the details of your system and the parcels run. Also, if the particles are released in a much smaller domain than the model domain, chunking can have a very big impact. Jamie
-
I really don't understand what is going on, and I am not sure how to proceed with investigating this. My simulation is small because I am only running it for 100 days; however, if I change that to 20 years of reanalysis, it does not scale linearly. For example, if 100 days takes about 2.5 minutes to run, I would expect 20 years to take about 3 hours, but the …
-
I was asked whether chunking the input circulation fields could make a difference, and whether that entailed converting the netCDF data into a zarr data store. In short, chunking and compressing the circulation data can give a very substantial speedup, especially if the model is run in parallel with MPI, but even otherwise. There is no need to write the data as zarr, and I usually find it better to leave it as netCDF; this also lets the input circulation data be used by any other analysis tools without altering their code. I have found that appropriate chunking and compression of the data decreases run times by a factor of 1.4 to 4, depending on circumstances.
In what follows, I will assume that your system is typical and that IO speed is a more important bottleneck than the CPU overhead of uncompressing the data. This has almost always been the case for me, but in all of this, a certain amount of benchmarking can be very, very useful.
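As one example of the kind of benchmarking that helps here, a quick sketch like the one below (the file and variable names are placeholders) times how long it takes just to read a single time slice of a velocity field from disk; running it against both the original and a rechunked/compressed copy gives a feel for the raw IO difference:

```python
import time

import xarray as xr

# Time the read of one time slice of one velocity component from disk.
fname = "cmems_reanalysis.nc"   # placeholder: your circulation file
varname = "uo"                  # placeholder: your zonal-velocity variable

t0 = time.perf_counter()
with xr.open_dataset(fname) as ds:
    _ = ds[varname].isel(time=0).load()
print(f"Read one time slice in {time.perf_counter() - t0:.2f} s")
```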
First, you want to see if your circulation data is already chunked and compressed. For a netCDF file, run
```
ncdump -h -s YourFile.nc
```
and look at the output for each variable. You should see something like the example below. The important data are in the last five lines: "_Storage" is "chunked", and the chunk sizes are given on the following line, which here shows that a chunk of data is 1 time, 1 depth, 256 in y and 256 in x. In general you can use smaller chunk sizes in netCDF than in zarr, because all the data are stored in a single file. You can also see that the "_DeflateLevel", i.e. the compression, is 1, which indicates that the data are compressed. It is very important that if you compress data you also chunk it, since it is harder for netCDF to seek to arbitrary locations in compressed data.
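For reference, here is an illustrative reconstruction of the relevant part of that output for one variable (the variable name, dimensions and chunk sizes below are made up, but the five storage attributes at the end are the ones to look for):

```
    float vozocrtx(time_counter, deptht, y, x) ;
        vozocrtx:long_name = "Zonal velocity" ;
        vozocrtx:units = "m/s" ;
        vozocrtx:_Storage = "chunked" ;
        vozocrtx:_ChunkSizes = 1, 1, 256, 256 ;
        vozocrtx:_DeflateLevel = 1 ;
        vozocrtx:_Shuffle = "true" ;
        vozocrtx:_Endianness = "little" ;
```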
If your data is not compressed and chunked, you should experiment with doing so. Start with a scheme like the one above, but do experiment for your own case. In the run with this data, chunking and compressing the input netCDF sped up the particle tracking by a factor of two and left the netCDF files usable by all other programs. I would set the chunking in parcels to match the chunking of the netCDF file, or some multiple of it; for some mysterious reason, I have found that my code runs fastest when the chunksize in parcels is 1, 1, 512, 512. Neither the netCDF nor the parcels chunk sizes need to divide evenly into the array dimensions.
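For reference, a sketch of what passing such a chunksize to parcels can look like, reusing the dictionary form and the `ds`, `variables` and `dimensions` objects from the script earlier in this thread (the 512 x 512 horizontal chunking is only an example; pick the on-disk chunk size or a multiple of it):

```python
# Match the parcels chunksize to (a multiple of) the on-disk netCDF chunking,
# e.g. 512 x 512 in the horizontal when the file is chunked 256 x 256.
cs = {"time": ("time", 1), "lat": ("latitude", 512), "lon": ("longitude", 512)}
fieldset = parcels.FieldSet.from_xarray_dataset(
    ds, variables, dimensions, allow_time_extrapolation=True, chunksize=cs
)
```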
So how do you chunk and compress the data? If you are downloading it with xarray and then saving it to disk with `.to_netcdf()`, or if you want to process existing files in Python, you can do something like the sketch below, where `varName` is the name of the 4D variable, `nav_lon` and `nav_lat` are the names of 2D variables, and all the variables are in the xarray dataset `dataIn`.
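Here is a minimal sketch of that step; the chunk sizes, compression level and file names are illustrative, and `dataIn` is assumed to be an `xarray.Dataset` opened or downloaded as described above:

```python
import xarray as xr

# Open (or download) the original dataset; the file name is a placeholder.
dataIn = xr.open_dataset("original.nc")

# The per-variable "encoding" controls the on-disk netCDF4 chunking and
# deflate compression when the dataset is written back out.
encoding = {
    "varName": {
        "zlib": True,                    # enable deflate compression
        "complevel": 1,                  # light compression is usually enough
        "chunksizes": (1, 1, 256, 256),  # (time, depth, y, x) chunks
    },
    "nav_lon": {"zlib": True, "complevel": 1},
    "nav_lat": {"zlib": True, "complevel": 1},
}
dataIn.to_netcdf("rechunked_compressed.nc", encoding=encoding)
```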
However, it is often easier to use a command-line tool to rechunk and compress the data. Here the netCDF kitchen-sink command, `ncks`, is very useful. You can run a command like the one below.
Here `deptht`, `time_counter`, `x` and `y` are the dimensions you want to chunk, and the number after the comma is the chunk size. The `-4` makes sure the output is netCDF4, so that it can be compressed. `-O` overwrites existing files, so use it with care. `-L 4` sets the compression level. I hope this is helpful, @iuryt and @erikvansebille