Skip to content

Commit b9b06a7

Browse files
committed
Optionnally provide request HTTP headers to processors
1 parent fb642dd commit b9b06a7

File tree

7 files changed

+34
-9
lines changed

7 files changed

+34
-9
lines changed

pygeoapi/api/processes.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,8 @@ def execute_process(api: API, request: APIRequest,
493493
process_id, data_dict, execution_mode=execution_mode,
494494
requested_outputs=requested_outputs,
495495
subscriber=subscriber,
496-
requested_response=requested_response)
496+
requested_response=requested_response,
497+
request_headers=request.headers)
497498
job_id, mime_type, outputs, status, additional_headers = result
498499
headers.update(additional_headers or {})
499500

pygeoapi/process/base.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ def __init__(self, processor_def: dict, process_metadata: dict):
5353
self.name = processor_def['name']
5454
self.metadata = process_metadata
5555
self.supports_outputs = False
56+
self.supports_request_headers = False
5657

5758
def set_job_id(self, job_id: str) -> None:
5859
"""
@@ -70,7 +71,8 @@ def set_job_id(self, job_id: str) -> None:
7071

7172
pass
7273

73-
def execute(self, data: dict, outputs: Optional[dict] = None
74+
def execute(self, data: dict, outputs: Optional[dict] = None,
75+
request_headers: Optional[dict] = None
7476
) -> Tuple[str, Any]:
7577
"""
7678
execute the process
@@ -81,6 +83,8 @@ def execute(self, data: dict, outputs: Optional[dict] = None
8183
required outputs - defaults to all outputs.
8284
The value of any key may be an object and include the
8385
property `transmissionMode` - defaults to `value`.
86+
:param request_headers: `dict` optionally specifying the headers from
87+
the request
8488
:returns: tuple of MIME type and process response
8589
(string or bytes, or dict)
8690
"""

pygeoapi/process/manager/base.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def __init__(self, manager_def: dict):
7777
self.name = manager_def['name']
7878
self.is_async = False
7979
self.supports_subscribing = False
80+
self.supports_request_headers = False
8081
self.connection = manager_def.get('connection')
8182
self.output_dir = manager_def.get('output_dir')
8283

@@ -195,7 +196,8 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
195196
data_dict: dict,
196197
requested_outputs: Optional[dict] = None,
197198
subscriber: Optional[Subscriber] = None,
198-
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
199+
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
200+
request_headers: Optional[dict] = None
199201
) -> Tuple[str, None, JobStatus]:
200202
"""
201203
This private execution handler executes a process in a background
@@ -216,13 +218,15 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
216218
:param subscriber: optional `Subscriber` specifying callback URLs
217219
:param requested_response: `RequestedResponse` optionally specifying
218220
raw or document (default is `raw`)
221+
:param request_headers: `dict` optionally specifying the headers from
222+
the request
219223
220224
:returns: tuple of None (i.e. initial response payload)
221225
and JobStatus.accepted (i.e. initial job status)
222226
"""
223227

224228
args = (p, job_id, data_dict, requested_outputs, subscriber,
225-
requested_response)
229+
requested_response, request_headers)
226230

227231
_process = dummy.Process(target=self._execute_handler_sync, args=args)
228232
_process.start()
@@ -233,7 +237,8 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
233237
data_dict: dict,
234238
requested_outputs: Optional[dict] = None,
235239
subscriber: Optional[Subscriber] = None,
236-
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
240+
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
241+
request_headers: Optional[dict] = None
237242
) -> Tuple[str, Any, JobStatus]:
238243
"""
239244
Synchronous execution handler
@@ -255,16 +260,20 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
255260
:param subscriber: optional `Subscriber` specifying callback URLs
256261
:param requested_response: `RequestedResponse` optionally specifying
257262
raw or document (default is `raw`)
263+
:param request_headers: `dict` optionally specifying the headers from
264+
the request
258265
259266
:returns: tuple of MIME type, response payload and status
260267
"""
261268

262269
extra_execute_parameters = {}
263270

264-
# only pass requested_outputs if supported,
271+
# only pass requested_outputs and request_headers if supported,
265272
# otherwise this breaks existing processes
266273
if p.supports_outputs:
267274
extra_execute_parameters['outputs'] = requested_outputs
275+
if p.supports_request_headers:
276+
extra_execute_parameters['request_headers'] = request_headers
268277

269278
self._send_in_progress_notification(subscriber)
270279

@@ -358,7 +367,8 @@ def execute_process(
358367
execution_mode: Optional[RequestedProcessExecutionMode] = None,
359368
requested_outputs: Optional[dict] = None,
360369
subscriber: Optional[Subscriber] = None,
361-
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
370+
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
371+
request_headers: Optional[dict] = None
362372
) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]:
363373
"""
364374
Default process execution handler
@@ -377,6 +387,8 @@ def execute_process(
377387
:param subscriber: `Subscriber` optionally specifying callback urls
378388
:param requested_response: `RequestedResponse` optionally specifying
379389
raw or document (default is `raw`)
390+
:param request_headers: `dict` optionally specifying the headers from
391+
the request
380392
381393
382394
:raises UnknownProcessError: if the input process_id does not
@@ -442,10 +454,12 @@ def execute_process(
442454
}
443455
self.add_job(job_metadata)
444456

445-
# only pass subscriber if supported, otherwise this breaks
457+
# only pass subscriber and headers if supported, otherwise this breaks
446458
# existing managers
447459
if self.supports_subscribing:
448460
extra_execute_handler_parameters['subscriber'] = subscriber
461+
if self.supports_request_headers:
462+
extra_execute_handler_parameters['request_headers'] = request_headers # noqa
449463

450464
# TODO: handler's response could also be allowed to include more HTTP
451465
# headers

pygeoapi/process/manager/dummy.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ def execute_process(
7979
execution_mode: Optional[RequestedProcessExecutionMode] = None,
8080
requested_outputs: Optional[dict] = None,
8181
subscriber: Optional[Subscriber] = None,
82-
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
82+
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
83+
request_headers: Optional[dict] = None
8384
) -> Tuple[str, str, Any, JobStatus, Optional[Dict[str, str]]]:
8485
"""
8586
Default process execution handler
@@ -95,6 +96,8 @@ def execute_process(
9596
:param subscriber: `Subscriber` optionally specifying callback urls
9697
:param requested_response: `RequestedResponse` optionally specifying
9798
raw or document (default is `raw`)
99+
:param request_headers: `dict` optionally specifying the headers from
100+
the request
98101
99102
:raises UnknownProcessError: if the input process_id does not
100103
correspond to a known process

pygeoapi/process/manager/mongodb_.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def __init__(self, manager_def):
4747
super().__init__(manager_def)
4848
self.is_async = True
4949
self.supports_subscribing = True
50+
self.supports_request_headers = True
5051

5152
def _connect(self):
5253
try:

pygeoapi/process/manager/postgresql.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def __init__(self, manager_def: dict):
7979
self.is_async = True
8080
self.id_field = 'identifier'
8181
self.supports_subscribing = True
82+
self.supports_request_headers = True
8283
self.connection = manager_def['connection']
8384

8485
try:

pygeoapi/process/manager/tinydb_.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def __init__(self, manager_def: dict):
6363
super().__init__(manager_def)
6464
self.is_async = True
6565
self.supports_subscribing = True
66+
self.supports_request_headers = True
6667

6768
@contextmanager
6869
def _db(self):

0 commit comments

Comments
 (0)