Skip to content

Commit b00890e

Browse files
authored
Query and Scan Filters using Condition Expressions (#339)
1 parent 1eb388c commit b00890e

File tree

8 files changed

+524
-54
lines changed

8 files changed

+524
-54
lines changed

pynamodb/connection/base.py

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,24 @@ def get_index_hash_keyname(self, index_name):
109109
if schema_key.get(KEY_TYPE) == HASH:
110110
return schema_key.get(ATTR_NAME)
111111

112+
def get_index_range_keyname(self, index_name):
113+
"""
114+
Returns the name of the hash key for a given index
115+
"""
116+
global_indexes = self.data.get(GLOBAL_SECONDARY_INDEXES)
117+
local_indexes = self.data.get(LOCAL_SECONDARY_INDEXES)
118+
indexes = []
119+
if local_indexes:
120+
indexes += local_indexes
121+
if global_indexes:
122+
indexes += global_indexes
123+
for index in indexes:
124+
if index.get(INDEX_NAME) == index_name:
125+
for schema_key in index.get(KEY_SCHEMA):
126+
if schema_key.get(KEY_TYPE) == RANGE:
127+
return schema_key.get(ATTR_NAME)
128+
return None
129+
112130
def get_item_attribute_map(self, attributes, item_key=ITEM, pythonic_key=True):
113131
"""
114132
Builds up a dynamodb compatible AttributeValue map
@@ -1031,6 +1049,7 @@ def get_item(self,
10311049

10321050
def rate_limited_scan(self,
10331051
table_name,
1052+
filter_condition=None,
10341053
attributes_to_get=None,
10351054
page_size=None,
10361055
limit=None,
@@ -1051,6 +1070,7 @@ def rate_limited_scan(self,
10511070
limit the rate of the scan. 'ProvisionedThroughputExceededException' is also handled and retried.
10521071
10531072
:param table_name: Name of the table to perform scan on.
1073+
:param filter_condition: Condition used to restrict the scan results
10541074
:param attributes_to_get: A list of attributes to return.
10551075
:param page_size: Page size of the scan to DynamoDB
10561076
:param limit: Used to limit the number of results returned
@@ -1094,6 +1114,7 @@ def rate_limited_scan(self,
10941114
try:
10951115
data = self.scan(
10961116
table_name,
1117+
filter_condition=filter_condition,
10971118
attributes_to_get=attributes_to_get,
10981119
exclusive_start_key=last_evaluated_key,
10991120
limit=page_size,
@@ -1177,6 +1198,7 @@ def rate_limited_scan(self,
11771198

11781199
def scan(self,
11791200
table_name,
1201+
filter_condition=None,
11801202
attributes_to_get=None,
11811203
limit=None,
11821204
conditional_operator=None,
@@ -1189,10 +1211,15 @@ def scan(self,
11891211
"""
11901212
Performs the scan operation
11911213
"""
1214+
self._check_condition('filter_condition', filter_condition, scan_filter, conditional_operator)
1215+
11921216
operation_kwargs = {TABLE_NAME: table_name}
11931217
name_placeholders = {}
11941218
expression_attribute_values = {}
11951219

1220+
if filter_condition is not None:
1221+
filter_expression = filter_condition.serialize(name_placeholders, expression_attribute_values)
1222+
operation_kwargs[FILTER_EXPRESSION] = filter_expression
11961223
if attributes_to_get is not None:
11971224
projection_expression = create_projection_expression(attributes_to_get, name_placeholders)
11981225
operation_kwargs[PROJECTION_EXPRESSION] = projection_expression
@@ -1226,6 +1253,8 @@ def scan(self,
12261253
def query(self,
12271254
table_name,
12281255
hash_key,
1256+
range_key_condition=None,
1257+
filter_condition=None,
12291258
attributes_to_get=None,
12301259
consistent_read=False,
12311260
exclusive_start_key=None,
@@ -1240,6 +1269,9 @@ def query(self,
12401269
"""
12411270
Performs the Query operation and returns the result
12421271
"""
1272+
self._check_condition('range_key_condition', range_key_condition, key_conditions, conditional_operator)
1273+
self._check_condition('filter_condition', filter_condition, query_filters, conditional_operator)
1274+
12431275
operation_kwargs = {TABLE_NAME: table_name}
12441276
name_placeholders = {}
12451277
expression_attribute_values = {}
@@ -1251,10 +1283,21 @@ def query(self,
12511283
hash_keyname = tbl.get_index_hash_keyname(index_name)
12521284
if not hash_keyname:
12531285
raise ValueError("No hash key attribute for index: {0}".format(index_name))
1286+
range_keyname = tbl.get_index_range_keyname(index_name)
12541287
else:
12551288
hash_keyname = tbl.hash_keyname
1289+
range_keyname = tbl.range_keyname
1290+
1291+
key_condition = self._get_condition(table_name, hash_keyname, '__eq__', hash_key)
1292+
if range_key_condition is not None:
1293+
if range_key_condition.is_valid_range_key_condition(range_keyname):
1294+
key_condition = key_condition & range_key_condition
1295+
elif filter_condition is None:
1296+
# Try to gracefully handle the case where a user passed in a filter as a range key condition
1297+
(filter_condition, range_key_condition) = (range_key_condition, None)
1298+
else:
1299+
raise ValueError("{0} is not a valid range key condition".format(range_key_condition))
12561300

1257-
key_condition_expression = self._get_condition(table_name, hash_keyname, '__eq__', hash_key)
12581301
if key_conditions is None or len(key_conditions) == 0:
12591302
pass # No comparisons on sort key
12601303
elif len(key_conditions) > 1:
@@ -1267,11 +1310,21 @@ def query(self,
12671310
operator = KEY_CONDITION_OPERATOR_MAP[operator]
12681311
values = condition.get(ATTR_VALUE_LIST)
12691312
sort_key_expression = self._get_condition(table_name, key, operator, *values)
1270-
key_condition_expression = key_condition_expression & sort_key_expression
1313+
key_condition = key_condition & sort_key_expression
12711314

1272-
operation_kwargs[KEY_CONDITION_EXPRESSION] = key_condition_expression.serialize(
1315+
operation_kwargs[KEY_CONDITION_EXPRESSION] = key_condition.serialize(
12731316
name_placeholders, expression_attribute_values)
1274-
1317+
if filter_condition is not None:
1318+
filter_expression = filter_condition.serialize(name_placeholders, expression_attribute_values)
1319+
# FilterExpression does not allow key attributes. Check for hash and range key name placeholders
1320+
hash_key_placeholder = name_placeholders.get(hash_keyname)
1321+
range_key_placeholder = range_keyname and name_placeholders.get(range_keyname)
1322+
if (
1323+
hash_key_placeholder in filter_expression or
1324+
(range_key_placeholder and range_key_placeholder in filter_expression)
1325+
):
1326+
raise ValueError("'filter_condition' cannot contain key attributes")
1327+
operation_kwargs[FILTER_EXPRESSION] = filter_expression
12751328
if attributes_to_get:
12761329
projection_expression = create_projection_expression(attributes_to_get, name_placeholders)
12771330
operation_kwargs[PROJECTION_EXPRESSION] = projection_expression
@@ -1383,7 +1436,7 @@ def _check_condition(self, name, condition, expected_or_filter, conditional_oper
13831436
if condition is not None:
13841437
if not isinstance(condition, Condition):
13851438
raise ValueError("'{0}' must be an instance of Condition".format(name))
1386-
if expected_or_filter is not None or conditional_operator is not None:
1439+
if expected_or_filter or conditional_operator is not None:
13871440
raise ValueError("Legacy conditional parameters cannot be used with condition expressions")
13881441

13891442
@staticmethod

pynamodb/connection/table.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ def get_item(self, hash_key, range_key=None, consistent_read=False, attributes_t
138138
attributes_to_get=attributes_to_get)
139139

140140
def rate_limited_scan(self,
141+
filter_condition=None,
141142
attributes_to_get=None,
142143
page_size=None,
143144
limit=None,
@@ -157,6 +158,7 @@ def rate_limited_scan(self,
157158
"""
158159
return self.connection.rate_limited_scan(
159160
self.table_name,
161+
filter_condition=filter_condition,
160162
attributes_to_get=attributes_to_get,
161163
page_size=page_size,
162164
limit=limit,
@@ -173,6 +175,7 @@ def rate_limited_scan(self,
173175
consistent_read=consistent_read)
174176

175177
def scan(self,
178+
filter_condition=None,
176179
attributes_to_get=None,
177180
limit=None,
178181
conditional_operator=None,
@@ -187,6 +190,7 @@ def scan(self,
187190
"""
188191
return self.connection.scan(
189192
self.table_name,
193+
filter_condition=filter_condition,
190194
attributes_to_get=attributes_to_get,
191195
limit=limit,
192196
conditional_operator=conditional_operator,
@@ -199,6 +203,8 @@ def scan(self,
199203

200204
def query(self,
201205
hash_key,
206+
range_key_condition=None,
207+
filter_condition=None,
202208
attributes_to_get=None,
203209
consistent_read=False,
204210
exclusive_start_key=None,
@@ -217,6 +223,8 @@ def query(self,
217223
return self.connection.query(
218224
self.table_name,
219225
hash_key,
226+
range_key_condition=range_key_condition,
227+
filter_condition=filter_condition,
220228
attributes_to_get=attributes_to_get,
221229
consistent_read=consistent_read,
222230
exclusive_start_key=exclusive_start_key,

pynamodb/expressions/condition.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@ def __init__(self, path, operator, *values):
137137
self.operator = operator
138138
self.values = values
139139

140+
# http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html#DDB-Query-request-KeyConditionExpression
141+
def is_valid_range_key_condition(self, path):
142+
return str(self.path) == path and self.operator in ['=', '<', '<=', '>', '>=', BETWEEN, 'begins_with']
143+
140144
def serialize(self, placeholder_names, expression_attribute_values):
141145
path = self._get_path(self.path, placeholder_names)
142146
values = self._get_values(placeholder_names, expression_attribute_values)
@@ -175,7 +179,7 @@ def __invert__(self):
175179
return Not(self)
176180

177181
def __repr__(self):
178-
values = [repr(value) if isinstance(value, Condition) else value.items()[0][1] for value in self.values]
182+
values = [repr(value) if isinstance(value, Condition) else list(value.items())[0][1] for value in self.values]
179183
return self.format_string.format(*values, path=self.path, operator = self.operator)
180184

181185
def __nonzero__(self):

pynamodb/indexes.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,17 @@ def __init__(self):
4545
@classmethod
4646
def count(cls,
4747
hash_key,
48+
range_key_condition=None,
49+
filter_condition=None,
4850
consistent_read=False,
4951
**filters):
5052
"""
5153
Count on an index
5254
"""
5355
return cls.Meta.model.count(
5456
hash_key,
57+
range_key_condition=range_key_condition,
58+
filter_condition=filter_condition,
5559
index_name=cls.Meta.index_name,
5660
consistent_read=consistent_read,
5761
**filters
@@ -60,6 +64,8 @@ def count(cls,
6064
@classmethod
6165
def query(self,
6266
hash_key,
67+
range_key_condition=None,
68+
filter_condition=None,
6369
scan_index_forward=None,
6470
consistent_read=False,
6571
limit=None,
@@ -71,6 +77,8 @@ def query(self,
7177
"""
7278
return self.Meta.model.query(
7379
hash_key,
80+
range_key_condition=range_key_condition,
81+
filter_condition=filter_condition,
7482
index_name=self.Meta.index_name,
7583
scan_index_forward=scan_index_forward,
7684
consistent_read=consistent_read,

pynamodb/models.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,8 @@ def from_raw_data(cls, data):
517517
@classmethod
518518
def count(cls,
519519
hash_key=None,
520+
range_key_condition=None,
521+
filter_condition=None,
520522
consistent_read=False,
521523
index_name=None,
522524
limit=None,
@@ -525,6 +527,8 @@ def count(cls,
525527
Provides a filtered count
526528
527529
:param hash_key: The hash key to query. Can be None.
530+
:param range_key_condition: Condition for range key
531+
:param filter_condition: Condition used to restrict the query results
528532
:param consistent_read: If True, a consistent read is performed
529533
:param index_name: If set, then this index is used
530534
:param filters: A dictionary of filters to be used in the query. Requires a hash_key to be passed.
@@ -562,6 +566,8 @@ def count(cls,
562566
started = True
563567
data = cls._get_connection().query(
564568
hash_key,
569+
range_key_condition=range_key_condition,
570+
filter_condition=filter_condition,
565571
index_name=index_name,
566572
consistent_read=consistent_read,
567573
key_conditions=key_conditions,
@@ -578,6 +584,8 @@ def count(cls,
578584
@classmethod
579585
def query(cls,
580586
hash_key,
587+
range_key_condition=None,
588+
filter_condition=None,
581589
consistent_read=False,
582590
index_name=None,
583591
scan_index_forward=None,
@@ -591,6 +599,8 @@ def query(cls,
591599
Provides a high level query API
592600
593601
:param hash_key: The hash key to query
602+
:param range_key_condition: Condition for range key
603+
:param filter_condition: Condition used to restrict the query results
594604
:param consistent_read: If True, a consistent read is performed
595605
:param index_name: If set, then this index is used
596606
:param limit: Used to limit the number of results returned
@@ -630,6 +640,8 @@ def query(cls,
630640
log.debug("Fetching first query page")
631641

632642
query_kwargs = dict(
643+
range_key_condition=range_key_condition,
644+
filter_condition=filter_condition,
633645
index_name=index_name,
634646
exclusive_start_key=last_evaluated_key,
635647
consistent_read=consistent_read,
@@ -668,6 +680,7 @@ def query(cls,
668680

669681
@classmethod
670682
def rate_limited_scan(cls,
683+
filter_condition=None,
671684
attributes_to_get=None,
672685
segment=None,
673686
total_segments=None,
@@ -686,6 +699,7 @@ def rate_limited_scan(cls,
686699
Scans the items in the table at a definite rate.
687700
Invokes the low level rate_limited_scan API.
688701
702+
:param filter_condition: Condition used to restrict the scan results
689703
:param attributes_to_get: A list of attributes to return.
690704
:param segment: If set, then scans the segment
691705
:param total_segments: If set, then specifies total segments
@@ -717,6 +731,7 @@ def rate_limited_scan(cls,
717731
key_filter.update(scan_filter)
718732

719733
scan_result = cls._get_connection().rate_limited_scan(
734+
filter_condition=filter_condition,
720735
attributes_to_get=attributes_to_get,
721736
page_size=page_size,
722737
limit=limit,
@@ -738,6 +753,7 @@ def rate_limited_scan(cls,
738753

739754
@classmethod
740755
def scan(cls,
756+
filter_condition=None,
741757
segment=None,
742758
total_segments=None,
743759
limit=None,
@@ -749,6 +765,7 @@ def scan(cls,
749765
"""
750766
Iterates through all items in the table
751767
768+
:param filter_condition: Condition used to restrict the scan results
752769
:param segment: If set, then scans the segment
753770
:param total_segments: If set, then specifies total segments
754771
:param limit: Used to limit the number of results returned
@@ -770,6 +787,7 @@ def scan(cls,
770787
page_size = limit
771788

772789
data = cls._get_connection().scan(
790+
filter_condition=filter_condition,
773791
exclusive_start_key=last_evaluated_key,
774792
segment=segment,
775793
limit=page_size,
@@ -790,6 +808,7 @@ def scan(cls,
790808
while last_evaluated_key:
791809
log.debug("Fetching scan page with exclusive start key: %s", last_evaluated_key)
792810
data = cls._get_connection().scan(
811+
filter_condition=filter_condition,
793812
exclusive_start_key=last_evaluated_key,
794813
limit=page_size,
795814
scan_filter=key_filter,

0 commit comments

Comments
 (0)