Carlton file connector #5

Draft · wants to merge 17 commits into master

4 changes: 4 additions & 0 deletions .gitignore
@@ -109,3 +109,7 @@ venv.bak/
# Editor files
*~
*.sw[po]

.aws
.DS_Store
pip-wheel-metadata
1 change: 1 addition & 0 deletions cranial/connectors/__init__.py
@@ -4,3 +4,4 @@
from cranial.connectors.base import Connector
from cranial.connectors.local import Connector as LocalConnector
from cranial.connectors.s3 import InMemoryConnector as S3InMemoryConnector
from cranial.connectors.file import Connector as FileConnector
15 changes: 11 additions & 4 deletions cranial/connectors/base.py
@@ -5,14 +5,21 @@
from tempfile import mkstemp
from typing import Any, Callable, Dict, Iterator, List, Set, Union # noqa

import ujson as json

SomeIO = Union[io.StringIO, io.BytesIO]


class Connector():
def __init__(self, base_address='', binary=True, do_read=False):
def __init__(self,
base_address='',
binary=True,
do_read=False,
executor=ThreadPoolExecutor):
self.base_address = base_address
self.binary = binary
self.do_read = do_read # @TODO Deprecated!
self.poolExecutor = executor

def iterator(self, target: Any) -> Iterator:
"""Get one or more IOstreams and iterate over them yielding serialized
@@ -22,9 +29,9 @@ def iterator(self, target: Any) -> Iterator:
return self.generate_from_list(target)
else:
f = self.getFuture(target)
return self.generate(f)
return self._generate(f)

def generate(self, future_item: Future):
def _generate(self, future_item: Future):
for l in future_item.result():
yield l

@@ -70,7 +77,7 @@ def put(self, stream: io.BytesIO, name: str = None) -> bool:

def executor(self):
if not hasattr(self, 'pool'):
self.pool = ThreadPoolExecutor()
self.pool = self.poolExecutor()
return self.pool

def _doFuture(self, fn: Callable, *args, **kwargs) -> Future:
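Reviewer note: a minimal sketch of what the new executor parameter enables. MyConnector and the ProcessPoolExecutor swap are illustrative assumptions, not part of this PR.

from concurrent.futures import ProcessPoolExecutor

from cranial.connectors import base


class MyConnector(base.Connector):
    """Hypothetical subclass, used only to illustrate executor injection."""

    def get(self, name=None):
        return open(name, 'rb' if self.binary else 'r')


# Futures created through executor() now come from a process pool rather
# than the default ThreadPoolExecutor.
con = MyConnector(binary=False, executor=ProcessPoolExecutor)
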
188 changes: 188 additions & 0 deletions cranial/connectors/file.py
@@ -0,0 +1,188 @@
from glob import iglob
import io
import os
from typing import Any, List, IO, Iterator # noqa

import boto3
from smart_open import open
from toolz import memoize
import ujson as json

from cranial.connectors import base
from cranial.common import logger

log = logger.get()


def file_readlines(fp):
"""
Memory-efficient iterator that reads lines from a file one at a time
(unlike the readlines() method, which reads the whole file into memory).

Parameters
----------
fp
path to a file that can be opened by smart_open.
Returns
-------
generator of lines
"""
for line in open(fp):
if line:
yield line
else:
break


def is_s3(name):
return '://' in name and name.startswith('s3')


class Connector(base.Connector):
def __init__(self, path='', binary=True, do_read=False):
# Delegate to base.Connector so shared attributes (including the
# poolExecutor introduced in this PR) are initialized consistently.
super().__init__(base_address=path, binary=binary, do_read=do_read)
self._open_files = [] # type: List[IO]

def get(self, name=None):
if name is not None and name.startswith('/'):
name = name[1:]
filepath = self.base_address if name is None \
else os.path.join(self.base_address, name)
try:
mode = 'rb' if self.binary else 'r'
res = open(filepath, mode)
self._open_files.append(res)
log.info("Opened \t{}".format(filepath))

except Exception as e:
log.error("{}\tbase_address={}\tname={}\tfilepath={}".format(
e, self.base_address, name, filepath))
raise e

if self.do_read:
res = res.read()

return res

def put(self, source, name: str = None, append=False) -> bool:
filepath = self.base_address if name is None \
else os.path.join(self.base_address, name)

if '://' not in filepath:
# We are writing a local file, so we need to make sure target
# directories exist.
dir_path = os.path.split(filepath)[0]
if len(dir_path) > 0:
os.makedirs(dir_path, exist_ok=True)

if isinstance(source, (io.StringIO, io.BytesIO)):
# @TODO buffer
source = source.read()
elif isinstance(source, (str, bytes)):
pass
else:
raise Exception('Source should be either a string, bytes or a ' +
'readable buffer, got {}'.format(type(source)))

# The target is opened in binary mode ('wb'/'ab') below, so encode
# text sources before writing.
if isinstance(source, str):
source = source.encode('utf-8')

try:
# s3 doesn't support append.
mode = 'ab' if append and not is_s3(filepath) else 'wb'

# first write to file
with open(filepath, mode) as f:
f.write(source)

log.info("wrote to \t{}".format(filepath))
return True

except Exception as e:
log.error("{}\tbase_address={}\tname={}".format(
e, self.base_address, name))
return False

@memoize
def _split_base(self):
base_parts = self.base_address.split('//', 1)
if len(base_parts) == 1:
return 'local', base_parts[0]
else:
return base_parts[0], base_parts[1]

def list_names(self, prefix: str = '') -> Iterator[str]:
"""
Lists all the items under a given filepath, for supported protocols.

Test a listing in a public S3 bucket and a local temp dir:
>>> con = Connector('s3://landsat-pds/test/')
>>> 'test.txt' in con.list_names()
True
>>> import tempfile
>>> from pathlib import Path
>>> from os.path import join
>>> tmpdir = tempfile.TemporaryDirectory()
>>> os.mkdir(join(tmpdir.name, 'pre'))
>>> Path(join(tmpdir.name, 'top.file')).touch()
>>> Path(join(tmpdir.name, 'pre', 'foo.file')).touch()
>>> Path(join(tmpdir.name, 'pre', 'bar.file')).touch()
>>> con = Connector(tmpdir.name)
>>> sorted(con.list_names('pre/'))
['pre/bar.file', 'pre/foo.file']
>>> sorted([x for x in con.list_names()])
['pre/bar.file', 'pre/foo.file', 'top.file']
"""
# Returns a List of paths.
protocol, address = self._split_base()

if protocol.startswith('s3'):
bucket, path = address.split('/', 1)
s3 = boto3.client('s3')
result = s3.list_objects_v2(Bucket=bucket, Prefix=path+prefix)
contents = result.get('Contents')
# @TODO We need to handle the case where there are more than 1000
# keys, and so S3 requires another request to retrieve remaining
# items. We can use the public 'irs-form-990' bucket to test this.
for c in contents:
name = c['Key'].replace(path, '')
if name == '':
continue
yield name
elif protocol in ('local', 'file'):
for x in iglob(
os.path.join(address, prefix, '**'),
recursive=True):
if os.path.isdir(x):
continue
name = x.replace(address + os.path.sep, '')
yield name
else:
raise Exception('list_names is not implemented for %s.' % protocol)

def get_last_id(self, prefix='', id_name='id', serde=json) -> Any:
""" Gets the most recently modified file, parses it's last record
and returns it's ID.
@WIP
"""
protocol, address = self._split_base()
if protocol.startswith('s3'):
keys = self.get_dir_keys(prefix=prefix)
sorted_keys = sorted(
keys, key=lambda item: item['LastModified'], reverse=True)

last_file = self.get(sorted_keys[0])

# @TODO Read last record from last file
last_rec = serde.loads()
return last_rec[id_name]

def __del__(self):
self.close()

def close(self):
for fh in self._open_files:
    fh.close()


if __name__ == "__main__":
import doctest
doctest.testmod()
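
Usage sketch for the new file.Connector / FileConnector; the local path and S3 URL below are placeholders, not tested fixtures.

from cranial.connectors import FileConnector

# Local directory as the base address; put() creates missing directories.
con = FileConnector('/tmp/cranial-example')
con.put(b'hello\n', 'greetings/hello.txt')
print(con.get('greetings/hello.txt').read())  # b'hello\n'
print(sorted(con.list_names()))               # ['greetings/hello.txt']

# The same API accepts S3 URLs via smart_open, e.g.:
# con = FileConnector('s3://some-bucket/some/prefix')
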
File renamed without changes.
9 changes: 7 additions & 2 deletions cranial/connectors/s3.py
@@ -123,11 +123,13 @@ def key_download_decompress(bucket, key, force_decompress=False):

class S3Connector(base.Connector):
def __init__(self, bucket: str, prefix='') -> None:
""" @deprecated """
self.bucket = bucket
self.prefix = prefix
self.cache = {} # type: Dict[str, str]
log.info('s3.S3Connector downloads whole files to local disk '
+ 'before use. Consider s3.InMemoryConnector instead.')
log.warning('s3.S3Connector downloads whole files to local disk '
+ 'before use. Consider file.Connector instead, which '
+ 'supports getting s3://bucket/path URLs.')

def get(self, s3_path, binary=False, redownload=False):
if not redownload:
@@ -162,9 +164,12 @@ def put(self, source, name=None):
class InMemoryConnector(base.Connector):
def __init__(self, bucket, prefix='', binary=True, do_read=False,
credentials={}):
""" @deprecated """
super(InMemoryConnector, self).__init__(base_address=prefix, binary=binary, do_read=do_read)
self.credentials = credentials
self.bucket = bucket
log.warning('s3.InMemoryConnector is deprecated in favor of '
+ 'file.Connector, which supports s3://bucket/path URLs.')

def get(self, name=None):
"""
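A sketch of the migration these deprecation warnings point to; the bucket and prefix names are placeholders.

from cranial.connectors import FileConnector, S3InMemoryConnector

# Deprecated:
old = S3InMemoryConnector('some-bucket', prefix='some/prefix')

# Suggested replacement, assuming the same object layout:
new = FileConnector('s3://some-bucket/some/prefix')
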
15 changes: 12 additions & 3 deletions cranial/messaging/base.py
@@ -24,6 +24,8 @@

StructClass = recordobject

MessageTypes = Union[Dict, str, bytes]


class Serde(metaclass=ABCMeta):
@classmethod
@@ -40,6 +42,10 @@ def __subclasshook__(cls, ClassObject):
class Message():
"""Stores message in it's native form and lazy-converts to required forms
with minimal copying.

Beware that this can be 2-5x slower than directly converting a single
message, so use it only when the message passes through multiple
processors that don't know its type.
"""
raw: Any
b: Optional[bytes] = None
@@ -128,16 +134,19 @@ class NotifyException(Exception):


class Notifier(metaclass=ABCMeta):
def __init__(self, **kwargs):
pass
def __init__(self, serde=json, **kwargs):
self.serde = serde

# Optionally, @staticmethod
@abstractmethod
def send(self,
address: Optional[str],
message: str,
message: MessageTypes,
endpoint: Optional[str],
**kwargs):
""" If message is not already bytes, the Notifier should use
self.serde to convert it.
"""
return False


Expand Down