Skip to content

Commit 91f7dff

Browse files
committed
Merge branch 'master' into 7795-add-osparc-trace-id-to-response-headers
2 parents ab4db0f + 15efb0b commit 91f7dff

File tree

7 files changed

+147
-57
lines changed

7 files changed

+147
-57
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -315,25 +315,25 @@ async def delete_function_job(
315315

316316

317317
@log_decorator(_logger, level=logging.DEBUG)
318-
async def find_cached_function_job(
318+
async def find_cached_function_jobs(
319319
rabbitmq_rpc_client: RabbitMQRPCClient,
320320
*,
321321
user_id: UserID,
322322
product_name: ProductName,
323323
function_id: FunctionID,
324324
inputs: FunctionInputs,
325-
) -> RegisteredFunctionJob | None:
325+
) -> list[RegisteredFunctionJob] | None:
326326
result = await rabbitmq_rpc_client.request(
327327
WEBSERVER_RPC_NAMESPACE,
328-
TypeAdapter(RPCMethodName).validate_python("find_cached_function_job"),
328+
TypeAdapter(RPCMethodName).validate_python("find_cached_function_jobs"),
329329
function_id=function_id,
330330
inputs=inputs,
331331
user_id=user_id,
332332
product_name=product_name,
333333
)
334334
if result is None:
335335
return None
336-
return TypeAdapter(RegisteredFunctionJob).validate_python(result)
336+
return TypeAdapter(list[RegisteredFunctionJob]).validate_python(result)
337337

338338

339339
@log_decorator(_logger, level=logging.DEBUG)

services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
UnsupportedFunctionClassError,
2727
)
2828
from models_library.products import ProductName
29+
from models_library.projects_state import RunningState
2930
from models_library.users import UserID
3031
from servicelib.fastapi.dependencies import get_reverse_url_mapper
3132
from simcore_service_api_server._service_jobs import JobService
@@ -351,6 +352,8 @@ async def run_function( # noqa: PLR0913
351352
job_service: Annotated[JobService, Depends(get_job_service)],
352353
) -> RegisteredFunctionJob:
353354

355+
from .function_jobs_routes import function_job_status
356+
354357
to_run_function = await wb_api_rpc.get_function(
355358
function_id=function_id, user_id=user_id, product_name=product_name
356359
)
@@ -371,13 +374,22 @@ async def run_function( # noqa: PLR0913
371374
if not is_valid:
372375
raise FunctionInputsValidationError(error=validation_str)
373376

374-
if cached_function_job := await wb_api_rpc.find_cached_function_job(
377+
if cached_function_jobs := await wb_api_rpc.find_cached_function_jobs(
375378
function_id=to_run_function.uid,
376379
inputs=joined_inputs,
377380
user_id=user_id,
378381
product_name=product_name,
379382
):
380-
return cached_function_job
383+
for cached_function_job in cached_function_jobs:
384+
job_status = await function_job_status(
385+
wb_api_rpc=wb_api_rpc,
386+
director2_api=director2_api,
387+
function_job_id=cached_function_job.uid,
388+
user_id=user_id,
389+
product_name=product_name,
390+
)
391+
if job_status.status == RunningState.SUCCESS:
392+
return cached_function_job
381393

382394
if to_run_function.function_class == FunctionClass.PROJECT:
383395
study_job = await studies_jobs.create_study_job(

services/api-server/src/simcore_service_api_server/services_rpc/wb_api_server.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -468,15 +468,15 @@ async def get_function_output_schema(
468468
function_id=function_id,
469469
)
470470

471-
async def find_cached_function_job(
471+
async def find_cached_function_jobs(
472472
self,
473473
*,
474474
user_id: UserID,
475475
product_name: ProductName,
476476
function_id: FunctionID,
477477
inputs: FunctionInputs,
478-
) -> RegisteredFunctionJob | None:
479-
return await functions_rpc_interface.find_cached_function_job(
478+
) -> list[RegisteredFunctionJob] | None:
479+
return await functions_rpc_interface.find_cached_function_jobs(
480480
self._client,
481481
user_id=user_id,
482482
product_name=product_name,

services/web/server/src/simcore_service_webserver/functions/_controller/_functions_rpc.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,15 +303,15 @@ async def update_function_description(
303303

304304

305305
@router.expose()
306-
async def find_cached_function_job(
306+
async def find_cached_function_jobs(
307307
app: web.Application,
308308
*,
309309
user_id: UserID,
310310
product_name: ProductName,
311311
function_id: FunctionID,
312312
inputs: FunctionInputs,
313-
) -> FunctionJob | None:
314-
return await _functions_service.find_cached_function_job(
313+
) -> list[RegisteredFunctionJob] | None:
314+
return await _functions_service.find_cached_function_jobs(
315315
app=app,
316316
user_id=user_id,
317317
product_name=product_name,

services/web/server/src/simcore_service_webserver/functions/_functions_repository.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -687,15 +687,15 @@ async def delete_function_job(
687687
)
688688

689689

690-
async def find_cached_function_job(
690+
async def find_cached_function_jobs(
691691
app: web.Application,
692692
connection: AsyncConnection | None = None,
693693
*,
694694
user_id: UserID,
695695
function_id: FunctionID,
696696
product_name: ProductName,
697697
inputs: FunctionInputs,
698-
) -> RegisteredFunctionJobDB | None:
698+
) -> list[RegisteredFunctionJobDB] | None:
699699

700700
async with transaction_context(get_asyncpg_engine(app), connection) as conn:
701701
result = await conn.stream(
@@ -704,19 +704,13 @@ async def find_cached_function_job(
704704
cast(function_jobs_table.c.inputs, Text) == json.dumps(inputs),
705705
),
706706
)
707-
708707
rows = await result.all()
709708

710-
if rows is None or len(rows) == 0:
711-
return None
712-
713-
assert len(rows) == 1, (
714-
"More than one function job found with the same function id and inputs."
715-
f" Function id: {function_id}, Inputs: {inputs}"
716-
) # nosec
717-
718-
row = rows[0]
709+
if rows is None or len(rows) == 0:
710+
return None
719711

712+
jobs = []
713+
for row in rows:
720714
job = RegisteredFunctionJobDB.model_validate(dict(row))
721715
try:
722716
await check_user_permissions(
@@ -729,13 +723,14 @@ async def find_cached_function_job(
729723
permissions=["read"],
730724
)
731725
except FunctionJobReadAccessDeniedError:
732-
# If the user does not have read access, return None
733-
return None
726+
continue
734727

735-
if job.inputs == inputs:
736-
return job
728+
jobs.append(job)
737729

738-
return None
730+
if len(jobs) > 0:
731+
return jobs
732+
733+
return None
739734

740735

741736
async def get_function_job_collection(

services/web/server/src/simcore_service_webserver/functions/_functions_service.py

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -340,48 +340,60 @@ async def update_function_description(
340340

341341

342342
@router.expose()
343-
async def find_cached_function_job(
343+
async def find_cached_function_jobs(
344344
app: web.Application,
345345
*,
346346
user_id: UserID,
347347
product_name: ProductName,
348348
function_id: FunctionID,
349349
inputs: FunctionInputs,
350-
) -> FunctionJob | None:
351-
returned_function_job = await _functions_repository.find_cached_function_job(
350+
) -> list[RegisteredFunctionJob] | None:
351+
returned_function_jobs = await _functions_repository.find_cached_function_jobs(
352352
app=app,
353353
user_id=user_id,
354354
product_name=product_name,
355355
function_id=function_id,
356356
inputs=inputs,
357357
)
358-
if returned_function_job is None:
358+
if returned_function_jobs is None or len(returned_function_jobs) == 0:
359359
return None
360360

361-
if returned_function_job.function_class == FunctionClass.PROJECT:
362-
return RegisteredProjectFunctionJob(
363-
uid=returned_function_job.uuid,
364-
title=returned_function_job.title,
365-
description=returned_function_job.description,
366-
function_uid=returned_function_job.function_uuid,
367-
inputs=returned_function_job.inputs,
368-
outputs=None,
369-
project_job_id=returned_function_job.class_specific_data["project_job_id"],
370-
)
371-
if returned_function_job.function_class == FunctionClass.SOLVER:
372-
return RegisteredSolverFunctionJob(
373-
uid=returned_function_job.uuid,
374-
title=returned_function_job.title,
375-
description=returned_function_job.description,
376-
function_uid=returned_function_job.function_uuid,
377-
inputs=returned_function_job.inputs,
378-
outputs=None,
379-
solver_job_id=returned_function_job.class_specific_data["solver_job_id"],
380-
)
381-
382-
raise UnsupportedFunctionJobClassError(
383-
function_job_class=returned_function_job.function_class
384-
)
361+
to_return_function_jobs: list[RegisteredFunctionJob] = []
362+
for returned_function_job in returned_function_jobs:
363+
if returned_function_job.function_class == FunctionClass.PROJECT:
364+
to_return_function_jobs.append(
365+
RegisteredProjectFunctionJob(
366+
uid=returned_function_job.uuid,
367+
title=returned_function_job.title,
368+
description=returned_function_job.description,
369+
function_uid=returned_function_job.function_uuid,
370+
inputs=returned_function_job.inputs,
371+
outputs=None,
372+
project_job_id=returned_function_job.class_specific_data[
373+
"project_job_id"
374+
],
375+
)
376+
)
377+
elif returned_function_job.function_class == FunctionClass.SOLVER:
378+
to_return_function_jobs.append(
379+
RegisteredSolverFunctionJob(
380+
uid=returned_function_job.uuid,
381+
title=returned_function_job.title,
382+
description=returned_function_job.description,
383+
function_uid=returned_function_job.function_uuid,
384+
inputs=returned_function_job.inputs,
385+
outputs=None,
386+
solver_job_id=returned_function_job.class_specific_data[
387+
"solver_job_id"
388+
],
389+
)
390+
)
391+
else:
392+
raise UnsupportedFunctionJobClassError(
393+
function_job_class=returned_function_job.function_class
394+
)
395+
396+
return to_return_function_jobs
385397

386398

387399
@router.expose(reraise_if_error_type=(FunctionIDNotFoundError,))

services/web/server/tests/unit/with_dbs/04/functions_rpc/test_functions_controller_rpc.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,3 +1125,74 @@ async def test_list_function_job_collections_filtered_function_id(
11251125
assert collections[1].uid in [
11261126
collection.uid for collection in registered_collections
11271127
]
1128+
1129+
1130+
@pytest.mark.parametrize(
1131+
"user_role",
1132+
[UserRole.USER],
1133+
)
1134+
async def test_find_cached_function_jobs(
1135+
client: TestClient,
1136+
rpc_client: RabbitMQRPCClient,
1137+
logged_user: UserInfoDict,
1138+
other_logged_user: UserInfoDict,
1139+
osparc_product_name: ProductName,
1140+
mock_function: ProjectFunction,
1141+
clean_functions: None,
1142+
):
1143+
1144+
# Register the function first
1145+
registered_function = await functions_rpc.register_function(
1146+
rabbitmq_rpc_client=rpc_client,
1147+
function=mock_function,
1148+
user_id=logged_user["id"],
1149+
product_name=osparc_product_name,
1150+
)
1151+
1152+
registered_function_jobs = []
1153+
for value in range(5):
1154+
function_job = ProjectFunctionJob(
1155+
function_uid=registered_function.uid,
1156+
title="Test Function Job",
1157+
description="A test function job",
1158+
project_job_id=uuid4(),
1159+
inputs={"input1": value if value < 4 else 1},
1160+
outputs={"output1": "result1"},
1161+
)
1162+
1163+
# Register the function job
1164+
registered_job = await functions_rpc.register_function_job(
1165+
rabbitmq_rpc_client=rpc_client,
1166+
function_job=function_job,
1167+
user_id=logged_user["id"],
1168+
product_name=osparc_product_name,
1169+
)
1170+
registered_function_jobs.append(registered_job)
1171+
1172+
# Find cached function jobs
1173+
cached_jobs = await functions_rpc.find_cached_function_jobs(
1174+
rabbitmq_rpc_client=rpc_client,
1175+
function_id=registered_function.uid,
1176+
inputs={"input1": 1},
1177+
user_id=logged_user["id"],
1178+
product_name=osparc_product_name,
1179+
)
1180+
1181+
# Assert the cached jobs contain the registered job
1182+
assert cached_jobs is not None
1183+
assert len(cached_jobs) == 2
1184+
assert {job.uid for job in cached_jobs} == {
1185+
registered_function_jobs[1].uid,
1186+
registered_function_jobs[4].uid,
1187+
}
1188+
1189+
cached_jobs = await functions_rpc.find_cached_function_jobs(
1190+
rabbitmq_rpc_client=rpc_client,
1191+
function_id=registered_function.uid,
1192+
inputs={"input1": 1},
1193+
user_id=other_logged_user["id"],
1194+
product_name=osparc_product_name,
1195+
)
1196+
1197+
# Assert the cached jobs does not contain the registered job for the other user
1198+
assert cached_jobs is None

0 commit comments

Comments
 (0)