Skip to content

Commit 465f2ea

Browse files
authored
Allow to ignore missing columns when calling "SDF.drop()" (#434)
1 parent 04dffef commit 465f2ea

File tree

3 files changed

+43
-5
lines changed

3 files changed

+43
-5
lines changed

docs/processing.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ It accepts either one column name as a string or a list of names.
150150

151151
The `.drop()` method updates the existing `StreamingDataFrame` object and returns the same `StreamingDataFrame` instance so that you can chain other methods after the `drop()` call, too.
152152

153+
By default, the `.drop()` fails if one of the columns is missing in the value.
154+
If you expect some columns to be missing, you can pass `errors="ignore"` to the `.drop()` method to suppress the errors.
155+
153156
Internally, it mutates the record's value and deletes the keys in place.
154157

155158
**Example**:
@@ -184,6 +187,9 @@ sdf.drop("metadata")
184187

185188
# You may also drop multiple keys by providing a list of names:
186189
sdf.drop(["metadata", "timestamp"])
190+
191+
# You may suppress KeyErrors if some columns are missing in the value dictionary
192+
sdf.drop(["missing_column"], errors='ignore')
187193
```
188194

189195
> **_NOTE:_** The `StreamingDataFrame.drop()` method works only with mapping-like values like dictionaries.

quixstreams/dataframe/dataframe.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -966,7 +966,11 @@ def hopping_window(
966966
name=name,
967967
)
968968

969-
def drop(self, columns: Union[str, List[str]]) -> Self:
969+
def drop(
970+
self,
971+
columns: Union[str, List[str]],
972+
errors: Literal["ignore", "raise"] = "raise",
973+
) -> Self:
970974
"""
971975
Drop column(s) from the message value (value must support `del`, like a dict).
972976
@@ -985,6 +989,9 @@ def drop(self, columns: Union[str, List[str]]) -> Self:
985989
```
986990
987991
:param columns: a single column name or a list of names, where names are `str`
992+
:param errors: If "ignore", suppress error and only existing labels are dropped.
993+
Default - `"raise"`.
994+
988995
:return: a new StreamingDataFrame instance
989996
"""
990997
if isinstance(columns, list):
@@ -998,7 +1005,10 @@ def drop(self, columns: Union[str, List[str]]) -> Self:
9981005
raise TypeError(
9991006
f"Expected a string or a list of strings, not {type(columns)}"
10001007
)
1001-
return self._add_update(lambda value: _drop(value, columns), metadata=False)
1008+
return self._add_update(
1009+
lambda value: _drop(value, columns, ignore_missing=errors == "ignore"),
1010+
metadata=False,
1011+
)
10021012

10031013
def _produce(
10041014
self,
@@ -1145,14 +1155,19 @@ def __bool__(self):
11451155
)
11461156

11471157

1148-
def _drop(value: Dict, columns: List[str]):
1158+
def _drop(value: Dict, columns: List[str], ignore_missing: bool = False):
11491159
"""
11501160
remove columns from the value, inplace
11511161
:param value: a dict or something that supports `del`
11521162
:param columns: a list of column names
1163+
:param ignore_missing: if True, ignore missing columns
11531164
"""
11541165
for column in columns:
1155-
del value[column]
1166+
try:
1167+
del value[column]
1168+
except KeyError:
1169+
if not ignore_missing:
1170+
raise
11561171

11571172

11581173
def _as_metadata_func(

tests/test_quixstreams/test_dataframe/test_dataframe.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import operator
32
import uuid
43
from collections import namedtuple
@@ -423,6 +422,24 @@ def test_drop_empty_list(self, dataframe_factory):
423422
post_drop_stream = sdf.stream.tree()
424423
assert pre_drop_stream == post_drop_stream
425424

425+
def test_drop_missing_columns_errors_raise(self, dataframe_factory):
426+
value = {"col_a": 1}
427+
key, timestamp, headers = b"key", 0, []
428+
sdf = dataframe_factory()
429+
sdf.drop(["col_b", "col_c"], errors="raise")
430+
with pytest.raises(KeyError):
431+
assert sdf.test(value=value, key=key, timestamp=timestamp, headers=headers)
432+
433+
def test_drop_missing_columns_errors_ignore(self, dataframe_factory):
434+
value = {"col_a": 1}
435+
expected = {"col_a": 1}
436+
key, timestamp, headers = b"key", 0, []
437+
sdf = dataframe_factory()
438+
sdf.drop(["col_b", "col_c"], errors="ignore")
439+
assert sdf.test(value=value, key=key, timestamp=timestamp, headers=headers)[
440+
0
441+
] == (expected, key, timestamp, headers)
442+
426443

427444
class TestStreamingDataFrameApplyExpand:
428445
def test_apply_expand(self, dataframe_factory):

0 commit comments

Comments
 (0)