32
32
import os
33
33
import stat
34
34
import sys
35
+ import unittest
35
36
36
- from tests .support import MIG_BASE , MigTestCase , testmain
37
+ from tests .support import MIG_BASE , TEST_BASE , TEST_DATA_DIR , MigTestCase , testmain
37
38
from mig .shared .output import format_output
38
39
import mig .shared .returnvalues as returnvalues
39
40
@@ -90,14 +91,35 @@ def _is_return_value(return_value):
90
91
return return_value in defined_return_values
91
92
92
93
93
- def _trigger_and_unpack_result (application_result ):
94
+ def _trigger_and_unpack_result (application_result , result_kind = 'textual' ):
95
+ assert result_kind in ('textual' , 'binary' )
96
+
94
97
chunks = list (application_result )
95
98
assert len (chunks ) > 0 , "invocation returned no output"
96
99
complete_value = b'' .join (chunks )
97
- decoded_value = codecs .decode (complete_value , 'utf8' )
100
+ if result_kind == 'binary' :
101
+ decoded_value = complete_value
102
+ else :
103
+ decoded_value = codecs .decode (complete_value , 'utf8' )
98
104
return decoded_value
99
105
100
106
107
+ def create_instrumented_fieldstorage_to_dict ():
108
+ def _instrumented_fieldstorage_to_dict (fieldstorage ):
109
+ return _instrumented_fieldstorage_to_dict ._result
110
+
111
+ _instrumented_fieldstorage_to_dict ._result = {
112
+ 'output_format' : ('html' ,)
113
+ }
114
+
115
+ def set_result (result ):
116
+ _instrumented_fieldstorage_to_dict ._result = result
117
+
118
+ _instrumented_fieldstorage_to_dict .set_result = set_result
119
+
120
+ return _instrumented_fieldstorage_to_dict
121
+
122
+
101
123
def create_instrumented_format_output ():
102
124
def _instrumented_format_output (
103
125
configuration ,
@@ -112,6 +134,16 @@ def _instrumented_format_output(
112
134
call_args = (configuration , backend , ret_val , ret_msg , call_args_out_obj , outputformat ,)
113
135
_instrumented_format_output .calls .append ({ 'args' : call_args })
114
136
137
+ if _instrumented_format_output ._file :
138
+ return format_output (
139
+ configuration ,
140
+ backend ,
141
+ ret_val ,
142
+ ret_msg ,
143
+ out_obj ,
144
+ outputformat ,
145
+ )
146
+
115
147
# FIXME: the following is a workaround for a bug that exists between the WSGI wrapper
116
148
# and the output formatter - specifically, the latter adds default header and
117
149
# title if start does not exist, but the former ensures that start always exists
@@ -146,11 +178,18 @@ def _instrumented_format_output(
146
178
outputformat ,
147
179
)
148
180
_instrumented_format_output .calls = []
181
+ _instrumented_format_output ._file = False
149
182
_instrumented_format_output .values = dict (
150
183
title_text = '' ,
151
184
header_text = '' ,
152
185
)
153
186
187
+
188
+ def _set_file (is_enabled ):
189
+ _instrumented_format_output ._file = is_enabled
190
+
191
+ setattr (_instrumented_format_output , 'set_file' , _set_file )
192
+
154
193
def _program_values (** kwargs ):
155
194
_instrumented_format_output .values .update (kwargs )
156
195
@@ -209,6 +248,7 @@ def before_each(self):
209
248
self .fake_start_response = create_wsgi_start_response ()
210
249
211
250
# MiG WSGI wrapper specific setup
251
+ self .instrumented_fieldstorage_to_dict = create_instrumented_fieldstorage_to_dict ()
212
252
self .instrumented_format_output = create_instrumented_format_output ()
213
253
self .instrumented_retrieve_handler = create_instrumented_retrieve_handler ()
214
254
@@ -218,6 +258,7 @@ def before_each(self):
218
258
_config_file = _TEST_CONF_FILE ,
219
259
_skip_log = True ,
220
260
_format_output = self .instrumented_format_output ,
261
+ _fieldstorage_to_dict = self .instrumented_fieldstorage_to_dict ,
221
262
_retrieve_handler = self .instrumented_retrieve_handler ,
222
263
_set_environ = noop ,
223
264
)
@@ -262,6 +303,29 @@ def test_return_value_ok_returns_expected_title(self):
262
303
self .assertInstrumentation ()
263
304
self .assertHtmlElementTextContent (output , 'title' , 'TEST' , trim_newlines = True )
264
305
306
+ def test_xxxx (self ):
307
+ test_binary_file = os .path .join (TEST_DATA_DIR , 'loading.gif' )
308
+ with open (test_binary_file , 'rb' ) as f :
309
+ test_binary_data = f .read ()
310
+
311
+ self .instrumented_fieldstorage_to_dict .set_result ({
312
+ 'output_format' : ('file' ,)
313
+ })
314
+ self .instrumented_format_output .set_file (True )
315
+
316
+ file_obj = { 'object_type' : 'binary' , 'data' : test_binary_data }
317
+ self .instrumented_retrieve_handler .program ([file_obj ], returnvalues .OK )
318
+
319
+ application_result = migwsgi ._application (
320
+ * self .application_args ,
321
+ ** self .application_kwargs
322
+ )
323
+
324
+ output = _trigger_and_unpack_result (application_result , 'binary' )
325
+
326
+ self .assertInstrumentation ()
327
+ self .assertEqual (output , test_binary_data )
328
+
265
329
266
330
if __name__ == '__main__' :
267
331
testmain ()
0 commit comments