Understanding Parallel Writes #802
Hi, TL;DR: why is it not possible to open a Repository from a process in a multiprocessing setting?

I am coming from this discussion where I received the recommendation to use Icechunk. After a few hours of refactoring I again got a sync and a threading version running, which is also a lot cleaner thanks to Icechunk's git-like method of updating the datacube. I really like the approach, I think it is a great concept, so thank you a lot for your work! :D

Now I am (again) in a deadlock when trying to use multiprocessing:

```python
from concurrent.futures import ProcessPoolExecutor

import icechunk


class DownloadAndLoadHandler:
    def __init__(self, storage: icechunk.Storage):  # further init args elided
        self.repo = icechunk.Repository.open_or_create(storage)
        ...


def _task(i):
    # each worker process builds its own handler, and hence its own Repository
    storage = icechunk.local_filesystem_storage("arcticdem_32m.zarr")
    h = DownloadAndLoadHandler(storage)
    res = h.do_something(i)
    return i, res


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(_task, range(3)))
    print(results)
```

Since the documentation about Distributed Writes states that one should pass a session to each process instead of creating new ones, this seems to be the expected behavior. But this got me wondering: why must the sessions be created and handled on the main process? Since I don't have control over how users will use their distributed computing setup, I can't assume access to the main thread, or at least I don't want to enforce it through my library design. Hence, the example in the docs won't work for me as-is.

I pushed the current status of my project: https://github.com/relativityhd/smart-geocubes/tree/66651de032437a4ec6ff32a4f67226475242f75f
Quick clarification question: in your intended production environment, will the data be stored in cloud object storage or local file storage?
There are fundamentally two different modes for distributed writes in Icechunk:

- **Cooperative**: one coordinating process creates a single writable session, shares it with the workers, merges their changes back, and commits once.
- **Uncooperative**: each worker independently opens the repo, makes its own session, and commits, retrying on conflict.

I think you want uncooperative. Here's an example of uncooperative mode based on zarr-developers/zarr-python#2868.

```python
import multiprocessing as mp

import icechunk as ic
import zarr


def get_storage():
    # Ironically, local storage is not safe w.r.t. race conditions on commit
    # due to limitations of object_store
    # storage = ic.local_filesystem_storage("data.icechunk")
    storage = ic.s3_storage(bucket="icechunk-test", prefix="zarr_issue_2868", from_env=True)
    return storage


def worker(i):
    print(f"Started worker {i}")
    storage = get_storage()
    repo = ic.Repository.open(storage)
    # keep trying until the commit succeeds
    while True:
        try:
            session = repo.writable_session("main")
            z = zarr.open(session.store, mode="r+")
            print(f"Opened store for {i} | {dict(z.attrs)}")
            # append this worker's id to the shared "done" attribute
            a = z.attrs.get("done", [])
            a.append(i)
            z.attrs["done"] = a
            session.commit(f"wrote from worker {i}")
            break
        except ic.ConflictError:
            # another worker committed first; start over from the new tip
            print(f"Conflict for {i}, retrying")


def main():
    storage = get_storage()
    repo = ic.Repository.create(storage)
    session = repo.writable_session("main")
    zarr.create(
        shape=(10, 10),
        chunks=(5, 5),
        store=session.store,
        overwrite=True,
    )
    session.commit("initialized dataset")
    p1 = mp.Process(target=worker, args=(1,))
    p2 = mp.Process(target=worker, args=(2,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    session = repo.readonly_session(branch="main")
    z = zarr.open(session.store, mode="r")
    print(z.attrs["done"])
    print(list(repo.ancestry(branch="main")))


if __name__ == "__main__":
    main()
```

This outputs the final `done` attribute (containing both worker ids) followed by the commit ancestry of `main`.
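As an aside on the retry loop: it is plain optimistic concurrency, and the version-control API offers a lighter variant, assuming `Session.rebase` and `ConflictDetector` behave as in the Icechunk docs. Note that in this particular example both workers edit the same `done` attribute, so a plain `ConflictDetector` would still fail and you would fall back to the full retry; for disjoint writes it avoids redoing the work.

```python
# hypothetical variant of the commit step inside worker()
try:
    session.commit(f"wrote from worker {i}")
except ic.ConflictError:
    # replay this session's changes on top of the new branch tip;
    # ConflictDetector fails the rebase if both sides touched the same nodes
    session.rebase(ic.ConflictDetector())
    session.commit(f"wrote from worker {i}, rebased")
```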
This is a bit of a red herring, I think. You definitely do not want to attempt to create a Repo from scratch in the same place from multiple uncoordinated processes. Instead, create the repo once from the master process (as in my example above) and then open it from the child processes.
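Concretely, for the handler in the original post that would mean a split along these lines (hypothetical names; the create step runs once up front, workers only open):

```python
import icechunk


def init_datacube(storage: icechunk.Storage) -> None:
    # called exactly once, before any workers are spawned
    icechunk.Repository.open_or_create(storage)


class DownloadAndLoadHandler:
    def __init__(self, storage: icechunk.Storage):
        # workers never race to create the repo, they only open it
        self.repo = icechunk.Repository.open(storage)
```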
Hello, looking at the docs, I have tried a branching approach where at each write I would create a new branch for my dataset, write to the branch, and then try to merge it into main. However, it looks like Icechunk only currently supports [...]. Now the issue here is that by the time [...]. Wondering if anyone has had any luck executing a workflow like that? Thanks a lot for any information!
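For what it's worth, my understanding of the attempt described above, sketched with the branch APIs (`lookup_branch`, `create_branch`, `reset_branch`); treat the final fast-forward step as an assumption, since Icechunk has no true branch merge:

```python
import icechunk as ic

repo = ic.Repository.open(ic.local_filesystem_storage("data.icechunk"))

# branch off the current tip of main for this write
main_tip = repo.lookup_branch("main")
repo.create_branch("write-0001", snapshot_id=main_tip)

session = repo.writable_session("write-0001")
# ... write to session.store ...
session.commit("write on side branch")

# there is no merge; the closest thing is fast-forwarding main to the branch
# tip, which is only safe if main still points at the snapshot we branched from
repo.reset_branch("main", snapshot_id=repo.lookup_branch("write-0001"))
```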
@relativityhd adding `mp.set_start_method('forkserver')` at the start of `main` solved it for me. Could you try it?
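Applied to the script from the original post, that looks roughly like the sketch below (`_task` is the function from the question; the comment on why `fork` can deadlock is my best guess, not something confirmed in this thread):

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

# _task as defined in the original post

if __name__ == "__main__":
    # "forkserver" spawns each worker from a clean helper process instead of
    # fork()-ing the parent, which can deadlock when the parent holds locks
    # (e.g. inside native extensions) at fork time
    mp.set_start_method("forkserver")
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(_task, range(3)))
    print(results)
```

An equivalent, more local alternative is passing `mp_context=mp.get_context("forkserver")` to `ProcessPoolExecutor`, which avoids changing the global start method for the whole program.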