32
32
import os
33
33
import stat
34
34
import sys
35
+ import unittest
35
36
36
- from tests .support import MIG_BASE , MigTestCase , testmain
37
+ from tests .support import MIG_BASE , TEST_BASE , TEST_DATA_DIR , MigTestCase , testmain
37
38
from mig .shared .output import format_output
38
39
import mig .shared .returnvalues as returnvalues
39
40
@@ -66,14 +67,35 @@ def _is_return_value(return_value):
66
67
return return_value in defined_return_values
67
68
68
69
69
- def _trigger_and_unpack_result (application_result ):
70
+ def _trigger_and_unpack_result (application_result , result_kind = 'textual' ):
71
+ assert result_kind in ('textual' , 'binary' )
72
+
70
73
chunks = list (application_result )
71
74
assert len (chunks ) > 0 , "invocation returned no output"
72
75
complete_value = b'' .join (chunks )
73
- decoded_value = codecs .decode (complete_value , 'utf8' )
76
+ if result_kind == 'binary' :
77
+ decoded_value = complete_value
78
+ else :
79
+ decoded_value = codecs .decode (complete_value , 'utf8' )
74
80
return decoded_value
75
81
76
82
83
+ def create_instrumented_fieldstorage_to_dict ():
84
+ def _instrumented_fieldstorage_to_dict (fieldstorage ):
85
+ return _instrumented_fieldstorage_to_dict ._result
86
+
87
+ _instrumented_fieldstorage_to_dict ._result = {
88
+ 'output_format' : ('html' ,)
89
+ }
90
+
91
+ def set_result (result ):
92
+ _instrumented_fieldstorage_to_dict ._result = result
93
+
94
+ _instrumented_fieldstorage_to_dict .set_result = set_result
95
+
96
+ return _instrumented_fieldstorage_to_dict
97
+
98
+
77
99
def create_instrumented_format_output ():
78
100
def _instrumented_format_output (
79
101
configuration ,
@@ -88,6 +110,16 @@ def _instrumented_format_output(
88
110
call_args = (configuration , backend , ret_val , ret_msg , call_args_out_obj , outputformat ,)
89
111
_instrumented_format_output .calls .append ({ 'args' : call_args })
90
112
113
+ if _instrumented_format_output ._file :
114
+ return format_output (
115
+ configuration ,
116
+ backend ,
117
+ ret_val ,
118
+ ret_msg ,
119
+ out_obj ,
120
+ outputformat ,
121
+ )
122
+
91
123
# FIXME: the following is a workaround for a bug that exists between the WSGI wrapper
92
124
# and the output formatter - specifically, the latter adds default header and
93
125
# title if start does not exist, but the former ensures that start always exists
@@ -122,11 +154,18 @@ def _instrumented_format_output(
122
154
outputformat ,
123
155
)
124
156
_instrumented_format_output .calls = []
157
+ _instrumented_format_output ._file = False
125
158
_instrumented_format_output .values = dict (
126
159
title_text = '' ,
127
160
header_text = '' ,
128
161
)
129
162
163
+
164
+ def _set_file (is_enabled ):
165
+ _instrumented_format_output ._file = is_enabled
166
+
167
+ setattr (_instrumented_format_output , 'set_file' , _set_file )
168
+
130
169
def _program_values (** kwargs ):
131
170
_instrumented_format_output .values .update (kwargs )
132
171
@@ -185,13 +224,15 @@ def before_each(self):
185
224
self .fake_start_response = create_wsgi_start_response ()
186
225
187
226
# MiG WSGI wrapper specific setup
227
+ self .instrumented_fieldstorage_to_dict = create_instrumented_fieldstorage_to_dict ()
188
228
self .instrumented_format_output = create_instrumented_format_output ()
189
229
self .instrumented_retrieve_handler = create_instrumented_retrieve_handler ()
190
230
191
231
self .application_args = (self .configuration , self .fake_wsgi_environ , self .fake_start_response ,)
192
232
self .application_kwargs = dict (
193
233
_wrap_wsgi_errors = noop ,
194
234
_format_output = self .instrumented_format_output ,
235
+ _fieldstorage_to_dict = self .instrumented_fieldstorage_to_dict ,
195
236
_retrieve_handler = self .instrumented_retrieve_handler ,
196
237
_set_environ = noop ,
197
238
)
@@ -236,6 +277,29 @@ def test_return_value_ok_returns_expected_title(self):
236
277
self .assertInstrumentation ()
237
278
self .assertHtmlElementTextContent (output , 'title' , 'TEST' , trim_newlines = True )
238
279
280
+ def test_return_value_ok_serving_a_binary_file (self ):
281
+ test_binary_file = os .path .join (TEST_DATA_DIR , 'loading.gif' )
282
+ with open (test_binary_file , 'rb' ) as f :
283
+ test_binary_data = f .read ()
284
+
285
+ self .instrumented_fieldstorage_to_dict .set_result ({
286
+ 'output_format' : ('file' ,)
287
+ })
288
+ self .instrumented_format_output .set_file (True )
289
+
290
+ file_obj = { 'object_type' : 'binary' , 'data' : test_binary_data }
291
+ self .instrumented_retrieve_handler .program ([file_obj ], returnvalues .OK )
292
+
293
+ application_result = migwsgi ._application (
294
+ * self .application_args ,
295
+ ** self .application_kwargs
296
+ )
297
+
298
+ output = _trigger_and_unpack_result (application_result , 'binary' )
299
+
300
+ self .assertInstrumentation ()
301
+ self .assertEqual (output , test_binary_data )
302
+
239
303
240
304
if __name__ == '__main__' :
241
305
testmain ()
0 commit comments