1
- from typing import List , Tuple , Iterator
1
+ from typing import List , Tuple , Iterator , Iterable , Any , Optional , Union
2
+ from functools import wraps
2
3
3
4
import pandas as pd
4
5
8
9
from exasol_udf_mock_python .udf_context import UDFContext
9
10
10
11
12
def check_context(f):
    """
    Guard a method so that it only runs while a current group context exists.

    The returned wrapper reads ``self.no_context`` before every call. When it is
    truthy a RuntimeError is raised; otherwise all arguments are forwarded to
    ``f`` and its result is returned unchanged.

    :param f: Method to protect. It is invoked as ``f(self, *args, **kwargs)``.
    :return: The guarding wrapper, carrying the metadata of ``f``.
    """
    @wraps(f)
    def guarded(self, *args, **kwargs):
        if not self.no_context:
            return f(self, *args, **kwargs)
        raise RuntimeError('Calling UDFContext interface when the current group context '
                           'is invalid is disallowed')

    return guarded
25
+
26
+
27
def validate_emit(row: Tuple, columns: List[Column]):
    """
    Check a data row against a list of column definitions.

    The row must hold exactly one value per column. Every value that is not
    None must be an instance of the ``type`` attribute of the column at the
    same position; None is accepted for any column.

    :param row: Data row; it must support ``len()`` and integer indexing.
    :param columns: Column definitions, each exposing a ``type`` attribute.
    :raises ValueError: If the row and the column list differ in length.
    :raises TypeError: If a non-None value is not an instance of its column type.
    """
    if len(row) != len(columns):
        raise ValueError(f"row {row} has not the same number of values as columns are defined")
    for position, column in enumerate(columns):
        value = row[position]
        if value is None:
            # None is accepted regardless of the declared column type.
            continue
        if not isinstance(value, column.type):
            raise TypeError(f"Value {value} ({type(value)}) at position {position} is not a {column.type}")
42
+
43
+
11
44
class MockContext(UDFContext):
    """
    Implementation of generic UDF Mock Context interface for a SET UDF with groups.
    This class allows iterating over groups. The functionality of the UDF Context is applicable
    to the current input group.

    Call `next_group` to iterate over groups. The `output_groups` property provides the emit
    output for all groups iterated so far including the output for the current group.

    Calling any function of the UDFContext interface when the group iterator has passed the end
    or before the first call to the `next_group` is illegal and will cause a RuntimeError.
    """

    def __init__(self, input_groups: Iterator[Group], metadata: MockMetaData):
        """
        :param input_groups: Input groups. Each group object should contain input rows for the group.

        :param metadata: The mock metadata object.
        """
        self._input_groups = input_groups
        self._metadata = metadata
        # Mock context for the current group; None before the first call to
        # next_group and after the group iterator has been exhausted.
        self._current_context: Optional[StandaloneMockContext] = None
        # Output of every group that has already been left.
        self._previous_output: List[Group] = []

    @property
    def no_context(self) -> bool:
        """Returns True if the current group context is invalid"""
        return self._current_context is None

    def next_group(self) -> bool:
        """
        Moves group iterator to the next group.
        Returns False if the iterator gets beyond the last group. Returns True otherwise.

        :raises RuntimeError: If the next input group is empty.
        """
        # Save output of the current group before leaving it.
        if self._current_context is not None:
            self._previous_output.append(Group(self._current_context.output))
            self._current_context = None

        # Try to get to the next input group.
        try:
            input_group = next(self._input_groups)
        except StopIteration:
            return False
        if len(input_group) == 0:
            raise RuntimeError("Empty input groups are not allowed")

        # Create Mock Context for the new input group.
        self._current_context = StandaloneMockContext(input_group, self._metadata)
        return True

    @property
    def output_groups(self) -> List[Group]:
        """
        Output of all groups including the current one.

        A new list is returned on every access, so modifying it does not
        affect the output stored in this context.
        """
        groups = list(self._previous_output)
        if self._current_context is not None:
            groups.append(Group(self._current_context.output))
        return groups

    def __getattr__(self, name):
        """
        Forwards the lookup of an otherwise unknown attribute (e.g. an input column name)
        to the context of the current group.

        :raises AttributeError: For dunder names and for instances whose `__init__` has not run.
        :raises RuntimeError: If there is no valid current group context.
        """
        # Python calls this hook only after the regular lookup has failed. Protocol probes
        # such as hasattr(obj, '__deepcopy__') rely on an AttributeError, and an instance
        # without `_current_context` would otherwise re-enter this hook without bound.
        if name.startswith('__') or '_current_context' not in self.__dict__:
            raise AttributeError(name)
        return self._forward_attribute(name)

    @check_context
    def _forward_attribute(self, name):
        """Reads attribute `name` from the context of the current group."""
        return getattr(self._current_context, name)

    @check_context
    def get_dataframe(self, num_rows: Union[str, int] = 'all', start_col: int = 0) -> Optional[pd.DataFrame]:
        """Delegates to `get_dataframe` of the current group context."""
        return self._current_context.get_dataframe(num_rows, start_col)

    @check_context
    def next(self, reset: bool = False) -> bool:
        """Delegates to `next` of the current group context."""
        return self._current_context.next(reset)

    @check_context
    def size(self) -> int:
        """Delegates to `size` of the current group context."""
        return self._current_context.size()

    @check_context
    def reset(self) -> None:
        """Delegates to `reset` of the current group context."""
        self._current_context.reset()

    @check_context
    def emit(self, *args) -> None:
        """Delegates to `emit` of the current group context."""
        self._current_context.emit(*args)
134
+
135
+
136
def get_scalar_input(inp: Any) -> Iterable[Tuple[Any, ...]]:
    """
    Figures out if the SCALAR parameters are provided as a scalar value or a tuple
    and also if there is a wrapping container around.
    Unless the parameters are already in a wrapping container returns parameters as a tuple wrapped
    into a one-item list, e.g [(param1[, param2, ...)]. Otherwise, returns the original input.

    Strings and bytes are treated as scalar values, not as containers. An empty container is
    interpreted as a single row without parameters, i.e. [()]. A one-shot iterator is
    materialised, because peeking at its first element consumes it.

    :param inp: Input parameters.
    :return: Iterable of parameter rows.
    """

    def is_container(obj: Any) -> bool:
        # Text and binary values are iterable, but they are single parameter values.
        return isinstance(obj, Iterable) and not isinstance(obj, (str, bytes))

    if not is_container(inp):
        return [(inp,)]

    rows = iter(inp)
    try:
        first = next(rows)
    except StopIteration:
        # Nothing to inspect: do not let StopIteration escape to the caller.
        return [()]

    # An iterator returns itself from iter(); its first element is already consumed here
    # and has to be put back in front of the remaining ones.
    one_shot = rows is inp
    if is_container(first):
        return [first, *rows] if one_shot else inp
    return [(first, *rows)] if one_shot else [inp]
154
+
155
+
156
+ class StandaloneMockContext (UDFContext ):
157
+ """
158
+ Implementation of generic UDF Mock Context interface a SCALAR UDF or a SET UDF with no groups.
159
+
160
+ For Emit UDFs the output in the form of the list of tuples can be
161
+ accessed by reading the `output` property.
162
+ """
163
+
164
def __init__(self, inp: Any, metadata: MockMetaData):
    """
    Set up the context and position it on the first input row.

    :param inp: Input rows for a SET UDF or parameters for a SCALAR one.
        In the former case the input object must be an iterable of rows. This, for example,
        can be a Group object. It must implement the __len__ method. Each data row must be
        an indexable container, e.g. a tuple.
        In the SCALAR case the input can be a scalar value, or tuple. This can also be wrapped
        in an iterable container, similar to the SET case.

    :param metadata: The mock metadata object.
    """
    # SCALAR input is normalised to an iterable of rows; any other input is used as given.
    is_scalar = metadata.input_type.upper() == 'SCALAR'
    self._input = get_scalar_input(inp) if is_scalar else inp
    self._metadata = metadata
    # Maps an input column name to the index of its value within a row.
    self._name_position_map = dict(
        (column.name, position)
        for position, column in enumerate(metadata.input_columns))
    self._output = []
    self._data: Optional[Any] = None
    self._iter: Optional[Iterator[Tuple[Any, ...]]] = None
    # Creates the row iterator and loads the first row.
    self.next(reset=True)
188
+
189
@property
def output(self) -> List[Tuple[Any, ...]]:
    """
    Rows emitted so far.

    The list kept by this context is returned itself, not a copy.
    """
    emitted = self._output
    return emitted
193
+
194
@staticmethod
def _is_positive_integer(value):
    """Tell whether ``value`` is an ``int`` instance greater than zero."""
    # None is not an int instance, so it is rejected by the isinstance check.
    return isinstance(value, int) and value > 0
56
197
57
198
def get_dataframe (self , num_rows = 'all' , start_col = 0 ):
@@ -80,26 +221,26 @@ def get_dataframe(self, num_rows='all', start_col=0):
80
221
return df
81
222
82
223
def __getattr__(self, name):
    """
    Return the value of the input column ``name`` in the current row.

    Returns None when there is no current row.

    :param name: Name of an input column.
    :raises AttributeError: For dunder names and for instances whose ``__init__`` has not run.
    :raises KeyError: If a current row exists and ``name`` is not an input column.
    """
    # Python calls this hook only after the regular lookup has failed. Protocol probes
    # such as hasattr(obj, '__deepcopy__') rely on an AttributeError, and reading the
    # state through __dict__ keeps a half-built instance from re-entering this hook.
    state = self.__dict__
    if name.startswith('__') or '_name_position_map' not in state:
        raise AttributeError(name)
    row = state.get('_data')
    return None if row is None else row[state['_name_position_map'][name]]
84
225
85
226
def next(self, reset: bool = False) -> bool:
    """
    Advance to the next input row.

    The row is checked against the input column definitions with ``validate_emit``.
    When no row iterator exists yet, or when ``reset`` is true, iteration restarts
    from the first row instead.

    :param reset: If true, restart from the first row rather than advancing.
    :return: True if a current row is available after the call, False otherwise.
    """
    if self._iter is None or reset:
        self.reset()
        # reset() loads the first row; report whether one exists instead of
        # falling through and returning None.
        return self._data is not None
    try:
        row = next(self._iter)
    except StopIteration:
        self._data = None
        return False
    self._data = row
    validate_emit(row, self._metadata.input_columns)
    return True
97
238
98
239
def size(self):
    """Return the number of input rows, i.e. ``len()`` of the stored input."""
    row_count = len(self._input)
    return row_count
100
241
101
242
def reset(self):
    """Restart iteration over the stored input and move to its first row."""
    rows = iter(self._input)
    self._iter = rows
    # Loads the first row, if there is one.
    self.next()
104
245
105
246
def emit (self , * args ):
@@ -108,13 +249,5 @@ def emit(self, *args):
108
249
else :
109
250
tuples = [args ]
110
251
for row in tuples :
111
- self ._validate_tuples (row , self ._metadata .output_columns )
112
- self ._output_group_list .extend (tuples )
113
- return
114
-
115
- def _validate_tuples (self , row : Tuple , columns : List [Column ]):
116
- if len (row ) != len (columns ):
117
- raise Exception (f"row { row } has not the same number of values as columns are defined" )
118
- for i , column in enumerate (columns ):
119
- if row [i ] is not None and not isinstance (row [i ], column .type ):
120
- raise TypeError (f"Value { row [i ]} ({ type (row [i ])} ) at position { i } is not a { column .type } " )
252
+ validate_emit (row , self ._metadata .output_columns )
253
+ self ._output .extend (tuples )
0 commit comments