Skip to content

Commit 2b4a6ea

Browse files
committed
Add keyword preprocessing step
Sometimes an array field arrives as one long comma-separated string; make sure to parse it into a list before processing the record.
1 parent 3695717 commit 2b4a6ea

File tree

1 file changed

+25
-15
lines changed

1 file changed

+25
-15
lines changed

target_elasticsearch/sinks.py

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,7 @@ def __init__(
3232
):
3333
super().__init__(target, stream_name, schema, key_properties)
3434
self.client = self._authenticated_client()
35-
self.index_schema_fields = self.config.get("index_schema_fields", {}).get(
36-
self.stream_name, {}
37-
)
35+
self.index_schema_fields = self.config.get("index_schema_fields", {}).get(self.stream_name, {})
3836
self.metadata_fields = self.config.get("metadata_fields", {}).get(self.stream_name, {})
3937
self.index_mappings = self.config.get("index_mappings", {}).get(self.stream_name, {})
4038
self.index_name = None
@@ -50,6 +48,26 @@ def setup(self) -> None:
5048
self.index_name = self._template_index()
5149
self.create_index(self.index_name)
5250

51+
def preprocess_record(self, record: dict, context: dict) -> dict:  # noqa: PLR6301, ARG002
    """Split comma-separated strings into lists for ``keyword``-mapped fields.

    For every field in ``self.index_mappings`` whose mapping ``type`` is
    ``"keyword"``, a non-empty string value in the record is split on
    commas; each item is stripped of surrounding whitespace and empty
    items (from stray or trailing commas) are dropped. Values that are
    falsy or not strings are left untouched, as are fields with any other
    mapping type.

    Args:
        record: Individual record in the stream. It is modified in place.
        context: Stream partition or context dictionary (unused here).

    Returns:
        The same ``record`` object, with keyword string values replaced by lists.
    """
    for field, mapping in self.index_mappings.items():
        if mapping.get("type") != "keyword":
            continue

        value = record.get(field)
        # Missing/empty values and values that are already lists (or any
        # other non-string type) need no conversion.
        if not value or not isinstance(value, str):
            continue

        stripped_items = (item.strip() for item in value.split(","))
        # Skip empty fragments so "a,,b," does not index "" as a keyword.
        record[field] = [item for item in stripped_items if item]
    return record
70+
5371
def _template_index(self, schemas: dict = {}) -> str:
5472
"""Template the input index config for Elasticsearch indexing.
5573
@@ -99,9 +117,7 @@ def _build_fields(
99117
for k, v in mapping.items():
100118
match = jsonpath_ng.parse(v).find(record)
101119
if len(match) == 0:
102-
self.logger.warning(
103-
f"schema key {k} with json path {v} could not be found in record: {record}"
104-
)
120+
self.logger.warning(f"schema key {k} with json path {v} could not be found in record: {record}")
105121
schemas[k] = v
106122
else:
107123
if len(match) > 1:
@@ -157,13 +173,9 @@ def create_index(self, index: str) -> None:
157173
index=index, fields=list(self.index_mappings.keys())
158174
)[index]["mappings"].items()
159175
}
160-
if not all(
161-
self.index_mappings[key]["type"] == value for key, value in mappings.items()
162-
):
176+
if not all(self.index_mappings[key]["type"] == value for key, value in mappings.items()):
163177
try:
164-
self.client.indices.put_mapping(
165-
index=index, body={"properties": self.index_mappings}
166-
)
178+
self.client.indices.put_mapping(index=index, body={"properties": self.index_mappings})
167179
except elasticsearch.exceptions.BadRequestError as e:
168180
if e.message == "illegal_argument_exception":
169181
self.logger.warning(
@@ -217,9 +229,7 @@ def process_batch(self, context: dict[str, Any]) -> None:
217229
Args:
218230
context: Dictionary containing batch processing context including records.
219231
"""
220-
updated_records, distinct_indices = self.build_request_body_and_distinct_indices(
221-
context["records"]
222-
)
232+
updated_records, distinct_indices = self.build_request_body_and_distinct_indices(context["records"])
223233
for index in distinct_indices:
224234
self.create_index(index)
225235
try:

0 commit comments

Comments
 (0)