This repository was archived by the owner on Jul 11, 2025. It is now read-only.

Disk-based job system and improved analysis #57

Merged 13 commits on Apr 29, 2025
Changes from all commits
25 changes: 12 additions & 13 deletions src/ReefGuideAPI.jl
@@ -24,15 +24,19 @@ using
HTTP,
Oxygen

include("Middleware.jl")
include("admin.jl")

include("job_management/JobInterface.jl")
include("job_management/DiskService.jl")

include("criteria_assessment/criteria.jl")
include("criteria_assessment/query_thresholds.jl")
include("criteria_assessment/regional_assessment.jl")

include("site_assessment/common_functions.jl")
include("site_assessment/best_fit_polygons.jl")

include("Middleware.jl")
include("admin.jl")

function get_regions()
# TODO: Comes from config?
regions = String[
@@ -97,14 +101,6 @@ function initialize_regional_data_cache(reef_data_path::String, reg_cache_fn::St
# flat_table[!, :lats] .= last.(coords)

rst_stack = RasterStack(data_paths; name=data_names, lazy=true)

# Constrain to just the areas with valid data (with a 0.05 degree buffer)
# min_lon = min(minimum(slope_table.lons), minimum(flat_table.lons)) - 0.05
# max_lon = max(maximum(slope_table.lons), maximum(flat_table.lons)) + 0.05
# min_lat = min(minimum(slope_table.lats), minimum(flat_table.lats)) - 0.05
# max_lat = max(maximum(slope_table.lats), maximum(flat_table.lats)) + 0.05
# rst_stack = view(rst_stack, X(min_lon .. max_lon), Y(min_lat .. max_lat))

regional_assessment_data[reg] = RegionalCriteria(
rst_stack,
slope_table,
@@ -223,7 +219,7 @@ Generate a filename for a cache.
- `ext` : file extension to use
"""
function cache_filename(qp::Dict, config::Dict, suffix::String, ext::String)
file_id = string(hash(qp))
file_id = create_job_id(qp)
temp_path = _cache_location(config)
cache_file_path = joinpath(temp_path, "$(file_id)$(suffix).$(ext)")

@@ -301,6 +297,9 @@ function start_server(config_path)
@info "Setting up tile routes..."
setup_tile_routes(config, auth)

@info "Setting up job routes..."
setup_job_routes(config, auth)

@info "Setting up admin routes..."
setup_admin_routes(config)

@@ -309,7 +308,7 @@

return serve(;
middleware=[CorsMiddleware],
host="0.0.0.0",
host="127.0.0.1",
port=port,
parallel=Threads.nthreads() > 1,
is_prioritized=(req::HTTP.Request) -> req.target == "/health"
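Note that `cache_filename` now derives its file ID from `create_job_id(qp)` rather than `string(hash(qp))`, so cache files and disk-based jobs share one identifier per parameter set. The real `create_job_id` lives in the new `job_management` includes and is not shown in this diff; purely as an illustration of the idea, a deterministic ID could be built like this hypothetical sketch:

```julia
# Hypothetical sketch only; the actual create_job_id is defined under
# src/job_management/ and may differ.
function create_job_id_sketch(query_params::Dict)::String
    # Sort keys so identical parameter sets always map to the same ID,
    # independent of Dict ordering.
    canonical = join(
        ("$(k)=$(query_params[k])" for k in sort!(collect(keys(query_params)))),
        "&"
    )
    return string(hash(canonical))
end
```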
13 changes: 7 additions & 6 deletions src/criteria_assessment/criteria.jl
@@ -29,9 +29,11 @@ function criteria_data_map()
:WavesTp => "_waves_Tp",
:Rugosity => "_rugosity",
:ValidSlopes => "_valid_slopes",
:ValidFlats => "_valid_flats",
:PortDistSlopes => "_PortDistSlopes",
:PortDistFlats => "_PortDistFlats"
:ValidFlats => "_valid_flats"

# Unused datasets
# :PortDistSlopes => "_PortDistSlopes",
# :PortDistFlats => "_PortDistFlats"
)
end

@@ -234,11 +236,10 @@ function setup_region_routes(config, auth)
open(suitable_sites_fn, "w") do f
JSON.print(f, nothing)
end

return file(suitable_sites_fn)
else
output_geojson(suitable_sites_fn, best_sites)
end

output_geojson(suitable_sites_fn, best_sites)
return file(suitable_sites_fn)
end

113 changes: 113 additions & 0 deletions src/criteria_assessment/regional_assessment.jl
@@ -0,0 +1,113 @@

function setup_job_routes(config, auth)
reg_assess_data = setup_regional_data(config)

@get auth("/job/details/{job_id}") function (req::Request, job_id::String)
srv = DiskService(_cache_location(config))
return json(job_details(srv, job_id))
end

@get auth("/job/result/{job_id}") function (req::Request, job_id::String)
srv = DiskService(_cache_location(config))
return file(job_result(srv, job_id))
end

@get auth("/submit/region-assess/{reg}/{rtype}") function (
req::Request, reg::String, rtype::String
)
# 127.0.0.1:8000/submit/region-assess/Mackay-Capricorn/slopes?Depth=-12.0:-2.0&Slope=0.0:40.0&Rugosity=0.0:6.0&SuitabilityThreshold=95
qp = queryparams(req)
srv = DiskService(_cache_location(config))
job_id = create_job_id(qp) * "$(reg)_suitable"

details = job_details(srv, job_id)
job_state = job_status(details)
if (job_state == "no job") && (job_state != "processing")
@debug "$(now()) : Submitting $(job_id)"
assessed_fn = cache_filename(
extract_criteria(qp, suitability_criteria()),
config,
"$(reg)_suitable",
"tiff"
)

details = submit_job(srv, job_id, assessed_fn)

# Do job asynchronously...
@async assess_region(
config,
qp,
reg,
rtype,
reg_assess_data
)
end

if job_state == "processing"
@debug details
end

@debug "$(now()) : Job submitted, return polling url"
return json(details.access_url)
end

@get auth("/submit/site-assess/{reg}/{rtype}") function (
req::Request, reg::String, rtype::String
)
qp = queryparams(req)
suitable_sites_fn = cache_filename(
qp, config, "$(reg)_potential_sites", "geojson"
)

srv = DiskService(_cache_location(config))
job_id = create_job_id(qp) * "$(reg)_potential_sites"

details = job_details(srv, job_id)
job_state = job_status(details)
if (job_state != "no job") && (job_state != "completed")
@debug "$(now()) : Submitting $(job_id)"
details = submit_job(srv, job_id, suitable_sites_fn)

# Do job asynchronously...
@async begin
assessed_fn = assess_region(
config,
qp,
reg,
rtype,
reg_assess_data
)

assessed = Raster(assessed_fn; missingval=0)

# Extract criteria and assessment
pixel_criteria = extract_criteria(qp, search_criteria())
deploy_site_criteria = extract_criteria(qp, site_criteria())

best_sites = filter_sites(
assess_sites(
reg_assess_data, reg, rtype, pixel_criteria, deploy_site_criteria,
assessed
)
)

# Specifically clear from memory to invoke garbage collector
assessed = nothing

if nrow(best_sites) == 0
open(suitable_sites_fn, "w") do f
JSON.print(f, nothing)
end
else
output_geojson(suitable_sites_fn, best_sites)
end

details.status = "completed"
update_job!(srv, job_id)
end
end

@debug "$(now()) : Job submitted, return polling url"
return json(details.access_url)
end
end
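For orientation, here is a rough client-side sketch of the intended flow against these new routes. This is a sketch only: the host/port, the omission of auth headers, and the field names in the polling response (`status`, `job_id`) are assumptions; the actual payload comes from the `DiskService` job details and is not pinned down by this diff.

```julia
using HTTP, JSON

base = "http://127.0.0.1:8000"  # placeholder host/port; auth headers omitted

# Submit a regional assessment job (query string as in the example URL above)
resp = HTTP.get(
    "$(base)/submit/region-assess/Mackay-Capricorn/slopes?" *
    "Depth=-12.0:-2.0&Slope=0.0:40.0&Rugosity=0.0:6.0&SuitabilityThreshold=95"
)

# The handler returns the job's polling URL as a JSON string
poll_url = JSON.parse(String(resp.body))

# Poll until the job leaves the processing state
# (assumes the details payload exposes a "status" field)
function poll_until_done(url; wait_time=5.0)
    while true
        details = JSON.parse(String(HTTP.get(url).body))
        get(details, "status", "") in ("completed", "error") && return details
        sleep(wait_time)
    end
end

details = poll_until_done(poll_url)

# Fetch the finished raster once completed
# (assumes the job id is recoverable from the details payload)
if get(details, "status", "") == "completed"
    result = HTTP.get("$(base)/job/result/$(details["job_id"])")
    write("regional_assessment.tiff", result.body)
end
```

Since the server derives both the job ID and the cache filename from the same `create_job_id(qp)` call, resubmitting identical parameters resolves to the same cached artefact.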
57 changes: 43 additions & 14 deletions src/criteria_assessment/site_identification.jl
@@ -180,8 +180,7 @@ function lookup_assess_region(reg_assess_data, reg, qp, rtype; x_dist=100.0, y_d
(assess_locs[1, :lons], assess_locs[1, :lats]), # center point
x_dist, # x distance in meters
y_dist, # y distance in meters
target_crs,
0.0
target_crs
)

# Create KD-tree to identify `n` nearest pixels
@@ -240,6 +239,7 @@ end

Convenience method wrapping around the analysis conducted by `assess_region()`.
Checks for a previous assessment of the indicated region and returns the cached filename if found.
If a corresponding job is already in progress, waits for its results.

"""
function assess_region(
@@ -252,11 +252,45 @@ function assess_region(
return assessed_fn
end

@debug "$(now()) : Assessing region $(reg)"
assessed = assess_region(reg_assess_data, reg, qp, rtype)
srv = DiskService(_cache_location(config))
job_id = create_job_id(qp) * "$(reg)_suitable"

@debug "$(now()) : Writing to $(assessed_fn)"
_write_tiff(assessed_fn, assessed)
job_state = job_status(srv, job_id)
if (job_state != "no job") && (job_state != "completed") && (job_state != "error")
@debug "$(now()) : Waiting for $(reg) job to finish : ($(job_id))"
# Job exists, wait for job to finish
wait_time = 20.0 # seconds
max_wait = 60.0 # max time to wait per loop

while true
st = job_status(srv, job_id)
if st ∈ ["completed", "error"]
break
end

sleep(wait_time)

# Exponential backoff (increase wait time every loop)
wait_time = min(wait_time * 2.0, max_wait)
end

if job_status(srv, job_id) == "error"
throw(ArgumentError("Job $(job_id) errored."))
end
else
@debug "$(now()) : Submitting job for $(reg)"
job_details = submit_job(srv, job_id, assessed_fn)

@debug "$(now()) : Assessing region $(reg)"
assessed = assess_region(reg_assess_data, reg, qp, rtype)

@debug "$(now()) : Writing to $(assessed_fn)"
_write_tiff(assessed_fn, assessed)

@debug "$(now()) : Marking job for $(reg) as completed"
job_details.status = "completed"
update_job!(srv, job_id, job_details)
end

return assessed_fn
end
@@ -316,22 +350,17 @@ function assess_sites(
CriteriaBounds.(criteria_names, lbs, ubs)
)

# Need reef outlines to indicate direction of the reef edge
gdf = REGIONAL_DATA["reef_outlines"]

res = abs(step(dims(assess_locs, X)))
x_dist = parse(Int64, site_criteria["xdist"])
y_dist = parse(Int64, site_criteria["ydist"])
@debug "$(now()) : Assessing site polygons for $(size(target_locs, 1)) locations in $(reg)."
initial_polygons = identify_edge_aligned_sites(
@debug "$(now()) : Assessing $(size(target_locs, 1)) candidate locations in $(reg)."
initial_polygons = find_optimal_site_alignment(
crit_pixels,
target_locs,
res,
gdf,
x_dist,
y_dist,
target_crs,
reg
target_crs
)

return initial_polygons