|
| 1 | +import typing as t |
| 2 | +from enum import Enum |
| 3 | + |
| 4 | +from ellar.core.context import IExecutionContext |
| 5 | +from ellar.serializer import Serializer, SerializerFilter, serialize_object |
| 6 | + |
| 7 | +from ..response_types import FileResponse, Response, StreamingResponse |
| 8 | +from .base import ResponseModel, ResponseModelField |
| 9 | + |
| 10 | + |
| 11 | +class StreamingResponseModelInvalidContent(RuntimeError): |
| 12 | + pass |
| 13 | + |
| 14 | + |
| 15 | +class ContentDispositionType(str, Enum): |
| 16 | + inline = "inline" |
| 17 | + attachment = "attachment" |
| 18 | + |
| 19 | + |
| 20 | +class FileResponseModelSchema(Serializer): |
| 21 | + path: str |
| 22 | + media_type: t.Optional[str] = None |
| 23 | + filename: t.Optional[str] = None |
| 24 | + method: t.Optional[str] = None |
| 25 | + content_disposition_type: ContentDispositionType = ContentDispositionType.attachment |
| 26 | + |
| 27 | + |
| 28 | +class FileResponseModel(ResponseModel): |
| 29 | + __slots__ = ("_file_init_schema",) |
| 30 | + |
| 31 | + response_type: t.Type[Response] = FileResponse |
| 32 | + file_init_schema_type = FileResponseModelSchema |
| 33 | + |
| 34 | + def __init__(self, *args: t.Any, **kwargs: t.Any) -> None: |
| 35 | + super(FileResponseModel, self).__init__(*args, **kwargs) |
| 36 | + self._file_init_schema = self._get_model_field_from_schema( |
| 37 | + self.file_init_schema_type |
| 38 | + ) |
| 39 | + |
| 40 | + def create_response( |
| 41 | + self, context: IExecutionContext, response_obj: t.Any, status_code: int |
| 42 | + ) -> Response: |
| 43 | + response_args, headers = self.get_context_response( |
| 44 | + context=context, status_code=status_code |
| 45 | + ) |
| 46 | + |
| 47 | + init_kwargs = serialize_object(self.serialize(response_obj)) |
| 48 | + response_args.update(init_kwargs) |
| 49 | + |
| 50 | + response = self._response_type( |
| 51 | + **response_args, |
| 52 | + headers=headers, |
| 53 | + ) |
| 54 | + return response |
| 55 | + |
| 56 | + def get_init_kwargs_schema(self) -> ResponseModelField: |
| 57 | + assert self._file_init_schema |
| 58 | + return self._file_init_schema |
| 59 | + |
| 60 | + def get_model_field(self) -> t.Optional[t.Union[ResponseModelField, t.Any]]: |
| 61 | + # We don't want any schema for this. |
| 62 | + return None |
| 63 | + |
| 64 | + def serialize( |
| 65 | + self, |
| 66 | + response_obj: t.Any, |
| 67 | + serializer_filter: t.Optional[SerializerFilter] = None, |
| 68 | + ) -> t.Union[t.List[t.Dict], t.Dict, t.Any]: |
| 69 | + _response_model_field = self.get_init_kwargs_schema() |
| 70 | + return _response_model_field.serialize( |
| 71 | + response_obj, serializer_filter=serializer_filter |
| 72 | + ) |
| 73 | + |
| 74 | + |
| 75 | +class StreamingResponseModel(ResponseModel): |
| 76 | + response_type = StreamingResponse |
| 77 | + |
| 78 | + def get_model_field(self) -> t.Optional[t.Union[ResponseModelField, t.Any]]: |
| 79 | + # We don't want any schema for this. |
| 80 | + return None |
| 81 | + |
| 82 | + def create_response( |
| 83 | + self, context: IExecutionContext, response_obj: t.Any, status_code: int |
| 84 | + ) -> Response: |
| 85 | + response_args, headers = self.get_context_response( |
| 86 | + context=context, status_code=status_code |
| 87 | + ) |
| 88 | + if not isinstance(response_obj, (t.AsyncGenerator, t.Generator)): |
| 89 | + raise StreamingResponseModelInvalidContent( |
| 90 | + "Content must typing.AsyncIterable OR typing.Iterable" |
| 91 | + ) |
| 92 | + |
| 93 | + response = self._response_type( |
| 94 | + **response_args, headers=headers, content=response_obj |
| 95 | + ) |
| 96 | + return response |
0 commit comments