32
32
import os
33
33
import stat
34
34
import sys
35
+ import unittest
35
36
36
- from tests .support import MIG_BASE , MigTestCase , testmain
37
+ from tests .support import MIG_BASE , TEST_BASE , TEST_DATA_DIR , MigTestCase , testmain
37
38
from mig .shared .output import format_output
38
39
import mig .shared .returnvalues as returnvalues
39
40
@@ -88,14 +89,35 @@ def _is_return_value(return_value):
88
89
return return_value in defined_return_values
89
90
90
91
91
- def _trigger_and_unpack_result (application_result ):
92
+ def _trigger_and_unpack_result (application_result , result_kind = 'textual' ):
93
+ assert result_kind in ('textual' , 'binary' )
94
+
92
95
chunks = list (application_result )
93
96
assert len (chunks ) > 0 , "invocation returned no output"
94
97
complete_value = b'' .join (chunks )
95
- decoded_value = codecs .decode (complete_value , 'utf8' )
98
+ if result_kind == 'binary' :
99
+ decoded_value = complete_value
100
+ else :
101
+ decoded_value = codecs .decode (complete_value , 'utf8' )
96
102
return decoded_value
97
103
98
104
105
+ def create_instrumented_fieldstorage_to_dict ():
106
+ def _instrumented_fieldstorage_to_dict (fieldstorage ):
107
+ return _instrumented_fieldstorage_to_dict ._result
108
+
109
+ _instrumented_fieldstorage_to_dict ._result = {
110
+ 'output_format' : ('html' ,)
111
+ }
112
+
113
+ def set_result (result ):
114
+ _instrumented_fieldstorage_to_dict ._result = result
115
+
116
+ _instrumented_fieldstorage_to_dict .set_result = set_result
117
+
118
+ return _instrumented_fieldstorage_to_dict
119
+
120
+
99
121
def create_instrumented_format_output ():
100
122
def _instrumented_format_output (
101
123
configuration ,
@@ -110,6 +132,16 @@ def _instrumented_format_output(
110
132
call_args = (configuration , backend , ret_val , ret_msg , call_args_out_obj , outputformat ,)
111
133
_instrumented_format_output .calls .append ({ 'args' : call_args })
112
134
135
+ if _instrumented_format_output ._file :
136
+ return format_output (
137
+ configuration ,
138
+ backend ,
139
+ ret_val ,
140
+ ret_msg ,
141
+ out_obj ,
142
+ outputformat ,
143
+ )
144
+
113
145
# FIXME: the following is a workaround for a bug that exists between the WSGI wrapper
114
146
# and the output formatter - specifically, the latter adds default header and
115
147
# title if start does not exist, but the former ensures that start always exists
@@ -144,11 +176,18 @@ def _instrumented_format_output(
144
176
outputformat ,
145
177
)
146
178
_instrumented_format_output .calls = []
179
+ _instrumented_format_output ._file = False
147
180
_instrumented_format_output .values = dict (
148
181
title_text = '' ,
149
182
header_text = '' ,
150
183
)
151
184
185
+
186
+ def _set_file (is_enabled ):
187
+ _instrumented_format_output ._file = is_enabled
188
+
189
+ setattr (_instrumented_format_output , 'set_file' , _set_file )
190
+
152
191
def _program_values (** kwargs ):
153
192
_instrumented_format_output .values .update (kwargs )
154
193
@@ -254,12 +293,14 @@ def fake_set_environ(value):
254
293
path_info = '/' ,
255
294
))
256
295
296
+ self .instrumented_fieldstorage_to_dict = create_instrumented_fieldstorage_to_dict ()
257
297
self .instrumented_format_output = create_instrumented_format_output ()
258
298
self .instrumented_retrieve_handler = create_instrumented_retrieve_handler ()
259
299
260
300
self .application_args = (fake_wsgi_environ , fake_start_response ,)
261
301
self .application_kwargs = dict (
262
302
_format_output = self .instrumented_format_output ,
303
+ _fieldstorage_to_dict = self .instrumented_fieldstorage_to_dict ,
263
304
_retrieve_handler = self .instrumented_retrieve_handler ,
264
305
_set_environ = fake_set_environ ,
265
306
)
@@ -313,6 +354,32 @@ def test_return_value_ok_returns_expected_title(self):
313
354
self .assertInstrumentation ()
314
355
self .assertHtmlElementTextContent (output , 'title' , 'TEST' , trim_newlines = True )
315
356
357
+ def test_xxxx (self ):
358
+ test_binary_file = os .path .join (TEST_DATA_DIR , 'loading.gif' )
359
+ with open (test_binary_file , 'rb' ) as f :
360
+ test_binary_data = f .read ()
361
+
362
+ self .instrumented_fieldstorage_to_dict .set_result ({
363
+ 'output_format' : ('file' ,)
364
+ })
365
+ self .instrumented_format_output .set_file (True )
366
+
367
+ file_obj = { 'object_type' : 'binary' , 'data' : test_binary_data }
368
+ self .instrumented_retrieve_handler .program ([file_obj ], returnvalues .OK )
369
+
370
+ application_result = migwsgi ._application (
371
+ * self .application_args ,
372
+ _wrap_wsgi_errors = noop ,
373
+ _config_file = _TEST_CONF_FILE ,
374
+ _skip_log = True ,
375
+ ** self .application_kwargs
376
+ )
377
+
378
+ output = _trigger_and_unpack_result (application_result , 'binary' )
379
+
380
+ self .assertInstrumentation ()
381
+ self .assertEqual (output , test_binary_data )
382
+
316
383
317
384
if __name__ == '__main__' :
318
385
testmain ()
0 commit comments