Skip to content

refactor: drop posix_ipc and use native python shared_memory #659

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cloudvolume/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
from . import exceptions
from . import secrets

__version__ = '12.1.1'
__version__ = '12.2.0'

# Register plugins
from .datasource.precomputed import register as register_precomputed
Expand Down
25 changes: 14 additions & 11 deletions cloudvolume/datasource/precomputed/image/tx.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,19 @@ def upload_aligned(
secrets=secrets
)

parallel_execution(
cup, chunk_ranges, parallel,
progress, desc="Upload",
cleanup_shm=location
)

# If manual mode is enabled, it's the
# responsibilty of the user to clean up
if not use_shared_memory:
array_like.close()
shm.unlink(location)
try:
parallel_execution(
cup, chunk_ranges, parallel,
progress, desc="Upload",
cleanup_shm=location
)
finally:
# If manual mode is enabled, it's the
# responsibilty of the user to clean up
if not use_shared_memory:
del renderbuffer
array_like.close()
shm.unlink(location)

def child_upload_process(
meta, cache,
Expand Down Expand Up @@ -267,6 +269,7 @@ def updatefn():
secrets=secrets,
)
finally:
del renderbuffer
array_like.close()

def threaded_upload_chunks(
Expand Down
4 changes: 3 additions & 1 deletion cloudvolume/frontends/precomputed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,8 @@ def upload_from_shared_memory(self, location, bbox, order='F', cutout_bbox=None)
order=order,
use_shared_memory=True,
)
mmap_handle.close()
del shared_image
# mmap_handle.close()

def upload_from_file(self, location, bbox, order='F', cutout_bbox=None):
"""
Expand Down Expand Up @@ -1190,6 +1191,7 @@ def upload_from_file(self, location, bbox, order='F', cutout_bbox=None):
order=order,
use_file=True,
)
del shared_image
mmap_handle.close()

def viewer(self, port=1337):
Expand Down
22 changes: 10 additions & 12 deletions cloudvolume/sharedmemory.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ def allocate_shm_file(filename, nbytes, dbytes, readonly):

def ndarray_shm(shape, dtype, location, readonly=False, order='F', **kwargs):
"""Create a shared memory numpy array. Requires /dev/shm to exist."""
import posix_ipc
from posix_ipc import O_CREAT
from multiprocessing import shared_memory
import psutil

nbytes = Vec(*shape).rectVolume() * np.dtype(dtype).itemsize
Expand Down Expand Up @@ -170,32 +169,31 @@ def ndarray_shm(shape, dtype, location, readonly=False, order='F', **kwargs):
# a threading condition where the condition of the shared memory
# was adjusted between the check above and now. Better to make sure
# that we don't accidently change anything if readonly is set.
flags = 0 if readonly else O_CREAT
size = 0 if readonly else int(nbytes)

try:
shared = posix_ipc.SharedMemory(location, flags=flags, size=size)
array_like = mmap.mmap(shared.fd, shared.size)
os.close(shared.fd)
renderbuffer = np.ndarray(buffer=array_like, dtype=dtype, shape=shape, order=order, **kwargs)
shm = shared_memory.SharedMemory(name=location, create=(not readonly), size=size)
renderbuffer = np.frombuffer(buffer=shm.buf, dtype=dtype)
renderbuffer = renderbuffer.reshape(shape, order=order)
except OSError as err:
if err.errno == errno.ENOMEM: # Out of Memory
posix_ipc.unlink_shared_memory(location)
unlink_shm(location)
raise

renderbuffer.setflags(write=(not readonly))
return array_like, renderbuffer
return shm, renderbuffer

def unlink(location):
if EMULATE_SHM:
return unlink_fs(location)
return unlink_shm(location)

def unlink_shm(location):
import posix_ipc
from multiprocessing import shared_memory
try:
posix_ipc.unlink_shared_memory(location)
except posix_ipc.ExistentialError:
shm = shared_memory.SharedMemory(name=location, create=False)
shm.unlink()
except FileNotFoundError:
return False
return True

Expand Down
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def requirements():

setuptools.setup(
name="cloud_volume",
version="12.1.1",
version="12.2.0",
setup_requires=[
'numpy<1.17; python_version<"3.5"',
'numpy; python_version>="3.5"',
Expand All @@ -50,7 +50,6 @@ def requirements():
"blosc",
],
':sys_platform!="win32"': [
"posix_ipc>=1.0.4",
"psutil>=5.4.3",
],
"mesh_viewer": [ 'vtk' ],
Expand Down