@@ -32,7 +32,9 @@ def __init__(
32
32
):
33
33
super ().__init__ (target , stream_name , schema , key_properties )
34
34
self .client = self ._authenticated_client ()
35
- self .index_schema_fields = self .config .get ("index_schema_fields" , {}).get (self .stream_name , {})
35
+ self .index_schema_fields = self .config .get ("index_schema_fields" , {}).get (
36
+ self .stream_name , {}
37
+ )
36
38
self .metadata_fields = self .config .get ("metadata_fields" , {}).get (self .stream_name , {})
37
39
self .index_mappings = self .config .get ("index_mappings" , {}).get (self .stream_name , {})
38
40
self .index_name = None
@@ -97,7 +99,9 @@ def _build_fields(
97
99
for k , v in mapping .items ():
98
100
match = jsonpath_ng .parse (v ).find (record )
99
101
if len (match ) == 0 :
100
- self .logger .warning (f"schema key { k } with json path { v } could not be found in record: { record } " )
102
+ self .logger .warning (
103
+ f"schema key { k } with json path { v } could not be found in record: { record } "
104
+ )
101
105
schemas [k ] = v
102
106
else :
103
107
if len (match ) > 1 :
@@ -153,9 +157,13 @@ def create_index(self, index: str) -> None:
153
157
index = index , fields = list (self .index_mappings .keys ())
154
158
)[index ]["mappings" ].items ()
155
159
}
156
- if not all (self .index_mappings [key ]["type" ] == value for key , value in mappings .items ()):
160
+ if not all (
161
+ self .index_mappings [key ]["type" ] == value for key , value in mappings .items ()
162
+ ):
157
163
try :
158
- self .client .indices .put_mapping (index = index , body = {"properties" : self .index_mappings })
164
+ self .client .indices .put_mapping (
165
+ index = index , body = {"properties" : self .index_mappings }
166
+ )
159
167
except elasticsearch .exceptions .BadRequestError as e :
160
168
if e .message == "illegal_argument_exception" :
161
169
self .logger .warning (
@@ -209,7 +217,9 @@ def process_batch(self, context: dict[str, Any]) -> None:
209
217
Args:
210
218
context: Dictionary containing batch processing context including records.
211
219
"""
212
- updated_records , distinct_indices = self .build_request_body_and_distinct_indices (context ["records" ])
220
+ updated_records , distinct_indices = self .build_request_body_and_distinct_indices (
221
+ context ["records" ]
222
+ )
213
223
for index in distinct_indices :
214
224
self .create_index (index )
215
225
try :
0 commit comments