Skip to content

Commit a15d79d

Browse files
committed
support for arrow file transfers
1 parent 55aff39 commit a15d79d

File tree

2 files changed

+70
-23
lines changed

2 files changed

+70
-23
lines changed

sliderule/sliderule.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464

6565
recdef_table = {}
6666

67+
arrow_file_table = {}
68+
6769
gps_epoch = datetime(1980, 1, 6)
6870
tai_epoch = datetime(1970, 1, 1, 0, 0, 10)
6971

@@ -361,10 +363,32 @@ def __exceptrec(rec):
361363
else:
362364
eventlogger[rec["level"]]("%s", rec["text"])
363365

366+
#
367+
# _arrowrec
368+
#
369+
def __arrowrec(rec):
370+
global arrow_file_table
371+
try :
372+
filename = rec["filename"]
373+
if rec["__rectype"] == 'arrowrec.meta':
374+
if filename in arrow_file_table:
375+
raise FatalError("file transfer already in progress")
376+
arrow_file_table[filename] = { "fp": open(filename, "wb"), "size": rec["size"], "progress": 0 }
377+
else: # rec["__rectype"] == 'arrowrec.data'
378+
data = rec['data']
379+
file = arrow_file_table[filename]
380+
file["fp"].write(bytearray(data))
381+
file["progress"] += len(data)
382+
if file["progress"] >= file["size"]:
383+
file["fp"].close()
384+
del arrow_file_table[filename]
385+
except Exception as e:
386+
raise FatalError("Failed to process arrow file: {}".format(e))
387+
364388
#
365389
# Globals
366390
#
367-
__callbacks = {'eventrec': __logeventrec, 'exceptrec': __exceptrec}
391+
__callbacks = {'eventrec': __logeventrec, 'exceptrec': __exceptrec, 'arrowrec.meta': __arrowrec, 'arrowrec.data': __arrowrec }
368392

369393
###############################################################################
370394
# APIs

utils/utils.py

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,28 +56,37 @@ def initialize_client(args):
5656

5757
# Set Script Defaults
5858
cfg = {
59-
"url": 'localhost',
60-
"organization": None,
61-
"asset": 'atlas-local',
62-
"region": 'examples/grandmesa.geojson',
63-
"resource": 'ATL03_20181017222812_02950102_005_01.h5',
64-
"raster": True,
65-
"atl08_class": [],
66-
"srt": icesat2.SRT_LAND,
67-
"cnf": icesat2.CNF_SURFACE_HIGH,
68-
"ats": 10.0,
69-
"cnt": 10,
70-
"len": 40.0,
71-
"res": 20.0,
72-
"maxi": 1,
73-
"atl03_geo_fields": [],
74-
"atl03_ph_fields": [],
75-
"profile": True,
76-
"verbose": True,
77-
"timeout": 0,
78-
"rqst-timeout": 0,
79-
"node-timeout": 0,
80-
"read-timeout": 0
59+
"url": 'localhost',
60+
"organization": None,
61+
"asset": 'atlas-local',
62+
"region": 'examples/grandmesa.geojson',
63+
"resource": 'ATL03_20181017222812_02950102_005_01.h5',
64+
"raster": True,
65+
"atl08_class": [],
66+
"yapc.score": 0,
67+
"yapc.knn": 0,
68+
"yapc.min_knn": 5,
69+
"yapc.win_h": 6.0,
70+
"yapc.win_x": 15.0,
71+
"yapc.version": 0,
72+
"srt": icesat2.SRT_LAND,
73+
"cnf": icesat2.CNF_SURFACE_HIGH,
74+
"ats": 10.0,
75+
"cnt": 10,
76+
"len": 40.0,
77+
"res": 20.0,
78+
"maxi": 1,
79+
"atl03_geo_fields": [],
80+
"atl03_ph_fields": [],
81+
"profile": True,
82+
"verbose": True,
83+
"timeout": 0,
84+
"rqst-timeout": 0,
85+
"node-timeout": 0,
86+
"read-timeout": 0,
87+
"output.path": None,
88+
"output.format": "native",
89+
"output.open_on_complete": False
8190
}
8291

8392
# Parse Configuration Parameters
@@ -114,6 +123,15 @@ def initialize_client(args):
114123
if len(cfg['atl08_class']) > 0:
115124
parms['atl08_class'] = cfg['atl08_class']
116125

126+
# Add YAPC Parameters
127+
if cfg["yapc.version"] > 0:
128+
parms["yapc"] = { "score": cfg["yapc.score"],
129+
"knn": cfg["yapc.knn"],
130+
"min_knn": cfg["yapc.min_knn"],
131+
"win_h": cfg["yapc.win_h"],
132+
"win_x": cfg["yapc.win_x"],
133+
"version": cfg["yapc.version"] }
134+
117135
# Provide Timeouts
118136
if cfg["timeout"] > 0:
119137
parms["timeout"] = cfg["timeout"]
@@ -127,6 +145,11 @@ def initialize_client(args):
127145
if cfg["read-timeout"] > 0:
128146
parms["read-timeout"] = cfg["read-timeout"]
129147

148+
# Add Output Options
149+
if cfg["output.path"]:
150+
parms["output"] = { "path": cfg["output.path"],
151+
"format": cfg["output.format"],
152+
"open_on_complete": cfg["output.open_on_complete"] }
130153
# Latch Start Time
131154
tstart = time.perf_counter()
132155

0 commit comments

Comments
 (0)