32
32
import os
33
33
import stat
34
34
import sys
35
+ import unittest
35
36
36
- from tests .support import MIG_BASE , MigTestCase , testmain
37
+ from tests .support import MIG_BASE , TEST_BASE , TEST_DATA_DIR , MigTestCase , testmain
37
38
from mig .shared .output import format_output
38
39
import mig .shared .returnvalues as returnvalues
39
40
@@ -65,14 +66,35 @@ def _is_return_value(return_value):
65
66
return return_value in defined_return_values
66
67
67
68
68
- def _trigger_and_unpack_result (application_result ):
69
+ def _trigger_and_unpack_result (application_result , result_kind = 'textual' ):
70
+ assert result_kind in ('textual' , 'binary' )
71
+
69
72
chunks = list (application_result )
70
73
assert len (chunks ) > 0 , "invocation returned no output"
71
74
complete_value = b'' .join (chunks )
72
- decoded_value = codecs .decode (complete_value , 'utf8' )
75
+ if result_kind == 'binary' :
76
+ decoded_value = complete_value
77
+ else :
78
+ decoded_value = codecs .decode (complete_value , 'utf8' )
73
79
return decoded_value
74
80
75
81
82
+ def create_instrumented_fieldstorage_to_dict ():
83
+ def _instrumented_fieldstorage_to_dict (fieldstorage ):
84
+ return _instrumented_fieldstorage_to_dict ._result
85
+
86
+ _instrumented_fieldstorage_to_dict ._result = {
87
+ 'output_format' : ('html' ,)
88
+ }
89
+
90
+ def set_result (result ):
91
+ _instrumented_fieldstorage_to_dict ._result = result
92
+
93
+ _instrumented_fieldstorage_to_dict .set_result = set_result
94
+
95
+ return _instrumented_fieldstorage_to_dict
96
+
97
+
76
98
def create_instrumented_format_output ():
77
99
def _instrumented_format_output (
78
100
configuration ,
@@ -87,6 +109,16 @@ def _instrumented_format_output(
87
109
call_args = (configuration , backend , ret_val , ret_msg , call_args_out_obj , outputformat ,)
88
110
_instrumented_format_output .calls .append ({ 'args' : call_args })
89
111
112
+ if _instrumented_format_output ._file :
113
+ return format_output (
114
+ configuration ,
115
+ backend ,
116
+ ret_val ,
117
+ ret_msg ,
118
+ out_obj ,
119
+ outputformat ,
120
+ )
121
+
90
122
# FIXME: the following is a workaround for a bug that exists between the WSGI wrapper
91
123
# and the output formatter - specifically, the latter adds default header and
92
124
# title if start does not exist, but the former ensures that start always exists
@@ -121,11 +153,18 @@ def _instrumented_format_output(
121
153
outputformat ,
122
154
)
123
155
_instrumented_format_output .calls = []
156
+ _instrumented_format_output ._file = False
124
157
_instrumented_format_output .values = dict (
125
158
title_text = '' ,
126
159
header_text = '' ,
127
160
)
128
161
162
+
163
+ def _set_file (is_enabled ):
164
+ _instrumented_format_output ._file = is_enabled
165
+
166
+ setattr (_instrumented_format_output , 'set_file' , _set_file )
167
+
129
168
def _program_values (** kwargs ):
130
169
_instrumented_format_output .values .update (kwargs )
131
170
@@ -179,6 +218,7 @@ def before_each(self):
179
218
self .fake_wsgi = prepare_wsgi (self .configuration , 'http://localhost/' )
180
219
181
220
# MiG WSGI wrapper specific setup
221
+ self .instrumented_fieldstorage_to_dict = create_instrumented_fieldstorage_to_dict ()
182
222
self .instrumented_format_output = create_instrumented_format_output ()
183
223
self .instrumented_retrieve_handler = create_instrumented_retrieve_handler ()
184
224
@@ -190,6 +230,7 @@ def before_each(self):
190
230
self .application_kwargs = dict (
191
231
_wrap_wsgi_errors = noop ,
192
232
_format_output = self .instrumented_format_output ,
233
+ _fieldstorage_to_dict = self .instrumented_fieldstorage_to_dict ,
193
234
_retrieve_handler = self .instrumented_retrieve_handler ,
194
235
_set_environ = noop ,
195
236
)
@@ -230,6 +271,29 @@ def test_return_value_ok_returns_expected_title(self):
230
271
self .assertHtmlElementTextContent (output , 'title' , 'TEST' , trim_newlines = True )
231
272
self .assertInstrumentation ()
232
273
274
+ def test_return_value_ok_serving_a_binary_file (self ):
275
+ test_binary_file = os .path .join (TEST_DATA_DIR , 'loading.gif' )
276
+ with open (test_binary_file , 'rb' ) as f :
277
+ test_binary_data = f .read ()
278
+
279
+ self .instrumented_fieldstorage_to_dict .set_result ({
280
+ 'output_format' : ('file' ,)
281
+ })
282
+ self .instrumented_format_output .set_file (True )
283
+
284
+ file_obj = { 'object_type' : 'binary' , 'data' : test_binary_data }
285
+ self .instrumented_retrieve_handler .program ([file_obj ], returnvalues .OK )
286
+
287
+ application_result = migwsgi ._application (
288
+ * self .application_args ,
289
+ ** self .application_kwargs
290
+ )
291
+
292
+ output = _trigger_and_unpack_result (application_result , 'binary' )
293
+
294
+ self .assertInstrumentation ()
295
+ self .assertEqual (output , test_binary_data )
296
+
233
297
234
298
if __name__ == '__main__' :
235
299
testmain ()
0 commit comments