Skip to content

Commit a71cd97

Browse files
committed
feat: QueryList: Filter lists of dictionaries w/ nested support
1 parent 8a65bf2 commit a71cd97

File tree

4 files changed

+478
-0
lines changed

4 files changed

+478
-0
lines changed

docs/contributing/internals.md

+5
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,8 @@ stability policy.
1818
.. autoapimodule:: libvcs.types
1919
:members:
2020
```
21+
22+
```{eval-rst}
23+
.. autoapimodule:: libvcs.utils.query_list
24+
:members:
25+
```

libvcs/utils/__init__.py

Whitespace-only changes.

libvcs/utils/query_list.py

+233
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
import dataclasses
2+
import re
3+
import traceback
4+
from typing import Any, Callable, Generic, Optional, Protocol, Sequence, TypeVar, Union
5+
6+
T = TypeVar("T", Any, Any)
7+
8+
9+
def keygetter(obj, path):
10+
"""obj, "foods__breakfast", obj['foods']['breakfast']
11+
12+
>>> keygetter({ "foods": { "breakfast": "cereal" } }, "foods__breakfast")
13+
'cereal'
14+
>>> keygetter({ "foods": { "breakfast": "cereal" } }, "foods")
15+
{'breakfast': 'cereal'}
16+
17+
"""
18+
try:
19+
sub_fields = path.split("__")
20+
dct = obj
21+
for sub_field in sub_fields:
22+
dct = dct[sub_field]
23+
return dct
24+
except Exception as e:
25+
traceback.print_exception(e)
26+
return None
27+
28+
29+
def parse_lookup(obj, path, lookup):
30+
"""Check if field lookup key, e.g. "my__path__contains" has comparator, return val.
31+
32+
If comparator not used or value not found, return None.
33+
34+
mykey__endswith("mykey") -> "mykey" else None
35+
36+
>>> parse_lookup({ "food": "red apple" }, "food__istartswith", "__istartswith")
37+
'red apple'
38+
"""
39+
try:
40+
if path.endswith(lookup):
41+
if field_name := path.rsplit(lookup)[0]:
42+
return keygetter(obj, field_name)
43+
except Exception as e:
44+
traceback.print_exception(e)
45+
return None
46+
47+
48+
class LookupProtocol(Protocol):
49+
"""Protocol for :class:`QueryList` filtering operators."""
50+
51+
def __call__(self, data: Union[list[str], str], rhs: Union[list[str], str]):
52+
"""Callback for :class:`QueryList` filtering operators."""
53+
54+
55+
def lookup_exact(data, rhs):
56+
return rhs == data
57+
58+
59+
def lookup_iexact(data, rhs):
60+
return rhs.lower() == data.lower()
61+
62+
63+
def lookup_contains(data, rhs):
64+
return rhs in data
65+
66+
67+
def lookup_icontains(data, rhs):
68+
return rhs.lower() in data.lower()
69+
70+
71+
def lookup_startswith(data, rhs):
72+
return data.startswith(rhs)
73+
74+
75+
def lookup_istartswith(data, rhs):
76+
return data.lower().startswith(rhs.lower())
77+
78+
79+
def lookup_endswith(data, rhs):
80+
return data.endswith(rhs)
81+
82+
83+
def lookup_iendswith(data, rhs):
84+
return data.lower().endswith(rhs.lower())
85+
86+
87+
def lookup_in(data, rhs):
88+
if isinstance(rhs, list):
89+
return data in rhs
90+
return rhs in data
91+
92+
93+
def lookup_nin(data, rhs):
94+
if isinstance(rhs, list):
95+
return data not in rhs
96+
return rhs not in data
97+
98+
99+
def lookup_regex(data, rhs):
100+
return re.search(rhs, data)
101+
102+
103+
def lookup_iregex(data, rhs):
104+
return re.search(rhs, data, re.IGNORECASE)
105+
106+
107+
LOOKUP_NAME_MAP: dict[str, LookupProtocol] = {
108+
"eq": lookup_exact,
109+
"exact": lookup_exact,
110+
"iexact": lookup_iexact,
111+
"contains": lookup_contains,
112+
"icontains": lookup_icontains,
113+
"startswith": lookup_startswith,
114+
"istartswith": lookup_istartswith,
115+
"endswith": lookup_endswith,
116+
"iendswith": lookup_iendswith,
117+
"in": lookup_in,
118+
"nin": lookup_nin,
119+
"regex": lookup_regex,
120+
"iregex": lookup_iregex,
121+
}
122+
123+
124+
@dataclasses.dataclass(eq=False)
125+
class QueryList(Generic[T]):
126+
"""Filter list of object/dicts. For small, local datasets. *Experimental, unstable*.
127+
128+
:py:func:`dataclasses.dataclass` is only used for ``__repr__`` and pytest comparison
129+
details.
130+
131+
>>> query = QueryList(
132+
... [
133+
... {
134+
... "place": "Largo",
135+
... "city": "Tampa",
136+
... "state": "Florida",
137+
... "foods": {"fruit": ["banana", "orange"], "breakfast": "cereal"},
138+
... },
139+
... {
140+
... "place": "Chicago suburbs",
141+
... "city": "Elmhurst",
142+
... "state": "Illinois",
143+
... "foods": {"fruit": ["apple", "cantelope"], "breakfast": "waffles"},
144+
... },
145+
... ]
146+
... )
147+
>>> query.filter(place="Chicago suburbs").data[0]['city']
148+
'Elmhurst'
149+
>>> query.filter(place__icontains="chicago").data[0]['city']
150+
'Elmhurst'
151+
>>> query.filter(foods__breakfast="waffles").data[0]['city']
152+
'Elmhurst'
153+
>>> query.filter(foods__fruit__in="cantelope").data[0]['city']
154+
'Elmhurst'
155+
>>> query.filter(foods__fruit__in="orange").data[0]['city']
156+
'Tampa'
157+
"""
158+
159+
__slots__ = ("data", "pk_key")
160+
data: Sequence[T]
161+
162+
# def __init__(self, data, pk_key: Optional[str] = None):
163+
# self.data: Sequence[T] = data
164+
# #: Primary key for objects, optional.
165+
# #: Use for .get(), .items()
166+
# self.pk_key: Optional[Any] = pk_key
167+
168+
def items(self):
169+
data: Sequence[T]
170+
171+
if self.pk_key is None:
172+
raise Exception("items() require a pk_key exists")
173+
return [(getattr(item, self.pk_key), item) for item in self.data]
174+
175+
def __eq__(self, other):
176+
data = other
177+
if hasattr(data, "data"):
178+
data = getattr(data, "data")
179+
180+
if not isinstance(self.data, list) or not isinstance(data, list):
181+
return False
182+
183+
if len(self.data) == len(data):
184+
for (a, b) in zip(self.data, data):
185+
if isinstance(a, dict):
186+
a_keys = a.keys()
187+
if a.keys == b.keys():
188+
for key in a_keys:
189+
if abs(a[key] - b[key]) > 1:
190+
return False
191+
else:
192+
if a != b:
193+
return False
194+
195+
return True
196+
return False
197+
198+
def filter(self, matcher: Optional[Union[Callable[[T], bool], T]] = None, **kwargs):
199+
def filter_lookup(obj) -> bool:
200+
for path, v in kwargs.items():
201+
try:
202+
lhs, op = path.rsplit("__", 1)
203+
204+
if op not in LOOKUP_NAME_MAP:
205+
raise ValueError(f"{op} not in LOOKUP_NAME_MAP")
206+
except ValueError:
207+
lhs = path
208+
op = "exact"
209+
210+
assert op in LOOKUP_NAME_MAP
211+
path = lhs
212+
data = keygetter(obj, path)
213+
214+
if not LOOKUP_NAME_MAP[op](data, v):
215+
return False
216+
217+
return True
218+
219+
if callable(matcher):
220+
_filter = matcher
221+
elif matcher is not None:
222+
223+
def val_match(obj):
224+
if isinstance(matcher, list):
225+
return obj in matcher
226+
else:
227+
return obj == matcher
228+
229+
_filter = val_match
230+
else:
231+
_filter = filter_lookup
232+
233+
return self.__class__(data=[k for k in self.data if _filter(k)])

0 commit comments

Comments
 (0)