1
1
# Size of a single page in a paginated query.
2
2
from abc import ABC , abstractmethod
3
- from typing import Any , Callable , Dict , List , Optional
3
+ from typing import Any , Callable , Dict , List , Optional , Tuple , Type
4
4
5
5
from typing import TYPE_CHECKING
6
6
if TYPE_CHECKING :
@@ -24,9 +24,9 @@ def __init__(self,
24
24
query : str ,
25
25
params : Dict [str , str ],
26
26
dereferencing : Dict [str , Any ],
27
- obj_class : "DbObject" ,
27
+ obj_class : Type [ "DbObject" ] ,
28
28
cursor_path : Optional [Dict [str , Any ]] = None ,
29
- beta : bool = False ):
29
+ experimental : bool = False ):
30
30
""" Creates a PaginatedCollection.
31
31
32
32
Args:
@@ -43,98 +43,117 @@ def __init__(self,
43
43
cursor_path: If not None, this is used to find the cursor
44
44
experimental: Used to call experimental endpoints
45
45
"""
46
- self .client = client
47
- self .query = query
48
- self .params = params
49
- self .dereferencing = dereferencing
50
- self .obj_class = obj_class
51
- self .cursor_path = cursor_path
52
- self .beta = beta
53
-
54
46
self ._fetched_all = False
55
47
self ._data : List [Dict [str , Any ]] = []
56
48
self ._data_ind = 0
57
49
50
+ pagination_kwargs = {
51
+ 'client' : client ,
52
+ 'obj_class' : obj_class ,
53
+ 'dereferencing' : dereferencing ,
54
+ 'experimental' : experimental ,
55
+ 'query' : query ,
56
+ 'params' : params
57
+ }
58
+
58
59
self .paginator = _CursorPagination (
59
- client , cursor_path ) if cursor_path else _OffsetPagination (client )
60
+ cursor_path , **
61
+ pagination_kwargs ) if cursor_path else _OffsetPagination (
62
+ ** pagination_kwargs )
60
63
61
64
def __iter__ (self ):
62
65
self ._data_ind = 0
63
66
return self
64
67
65
- def get_page_data (self , results ):
66
- for deref in self .dereferencing :
67
- results = results [deref ]
68
- return [self .obj_class (self .client , result ) for result in results ]
69
-
70
68
def __next__ (self ):
71
69
if len (self ._data ) <= self ._data_ind :
72
70
if self ._fetched_all :
73
71
raise StopIteration ()
74
72
75
- results = self .paginator .fetch_results (self .query , self .params ,
76
- self .beta )
77
- page_data = self .get_page_data (results )
73
+ page_data , self ._fetched_all = self .paginator .get_next_page ()
78
74
self ._data .extend (page_data )
79
- n_items = len (page_data )
80
-
81
- if n_items == 0 :
75
+ if len (page_data ) == 0 :
82
76
raise StopIteration ()
83
77
84
- self ._fetched_all = self .paginator .fetched_all (n_items , results )
85
-
86
78
rval = self ._data [self ._data_ind ]
87
79
self ._data_ind += 1
88
80
return rval
89
81
90
82
91
83
class _Pagination (ABC ):
92
84
93
- @abstractmethod
94
- def fetched_all (self , n_items : int , results : List [Dict [str , Any ]]) -> bool :
95
- ...
85
+ def __init__ (self , client : "Client" , obj_class : Type ["DbObject" ],
86
+ dereferencing : Dict [str , Any ], query : str ,
87
+ params : Dict [str , Any ], experimental : bool ):
88
+ self .client = client
89
+ self .obj_class = obj_class
90
+ self .dereferencing = dereferencing
91
+ self .experimental = experimental
92
+ self .query = query
93
+ self .params = params
94
+
95
+ def get_page_data (self , results : Dict [str , Any ]) -> List ["DbObject" ]:
96
+ for deref in self .dereferencing :
97
+ results = results [deref ]
98
+ return [self .obj_class (self .client , result ) for result in results ]
96
99
97
100
@abstractmethod
98
- def fetch_results (self , query : str , params : Dict [str , Any ],
99
- beta : bool ) -> Dict [str , Any ]:
101
+ def get_next_page (self ) -> Tuple [Dict [str , Any ], bool ]:
100
102
...
101
103
102
104
103
105
class _CursorPagination (_Pagination ):
104
106
105
- def __init__ (self , client : "Client" , cursor_path : Dict [str , Any ]):
106
- self . client = client
107
+ def __init__ (self , cursor_path : Dict [str , Any ], * args , ** kwargs ):
108
+ super (). __init__ ( * args , ** kwargs )
107
109
self .cursor_path = cursor_path
108
- self .next_cursor : Optional [str ] = None
110
+ self .next_cursor : Optional [Any ] = None
109
111
110
- def get_next_cursor (self , results ) -> Optional [str ] :
112
+ def increment_page (self , results : Dict [str , Any ]) :
111
113
for path in self .cursor_path :
112
114
results = results [path ]
113
- return results
115
+ self . next_cursor = results
114
116
115
- def fetched_all (self , n_items : int , results : List [Dict [str , Any ]]) -> bool :
116
- self .next_cursor = self .get_next_cursor (results )
117
- return bool (self .next_cursor is None )
117
+ def fetched_all (self ) -> bool :
118
+ return bool (self .next_cursor )
118
119
119
- def fetch_results (self , query : str , params : Dict [str , Any ],
120
- beta : bool ) -> Dict [str , Any ]:
121
- params .update ({'from' : self .next_cursor , 'first' : _PAGE_SIZE })
122
- return self .client .execute (query , params , beta = beta )
120
+ def fetch_results (self ) -> Dict [str , Any ]:
121
+ self .params .update ({'from' : self .next_cursor , 'first' : _PAGE_SIZE })
122
+ return self .client .execute (self .query ,
123
+ self .params ,
124
+ experimental = self .experimental )
125
+
126
+ def get_next_page (self ):
127
+ results = self .fetch_results ()
128
+ page_data = self .get_page_data (results )
129
+ self .increment_page (results )
130
+ done = self .fetched_all ()
131
+ return page_data , done
123
132
124
133
125
134
class _OffsetPagination (_Pagination ):
126
135
127
- def __init__ (self , client : "Client" ):
128
- self . client = client
136
+ def __init__ (self , * args , ** kwargs ):
137
+ super (). __init__ ( * args , ** kwargs )
129
138
self ._fetched_pages = 0
130
139
131
- def fetched_all (self , n_items : int , results : List [ Dict [ str , Any ]]) -> bool :
140
+ def increment_page (self ) :
132
141
self ._fetched_pages += 1
142
+
143
+ def fetched_all (self , n_items : int ) -> bool :
133
144
if n_items < _PAGE_SIZE :
134
145
return True
135
146
return False
136
147
137
- def fetch_results (self , query : str , params : Dict [str , Any ],
138
- beta : bool ) -> Dict [str , Any ]:
139
- query = query % (self ._fetched_pages * _PAGE_SIZE , _PAGE_SIZE )
140
- return self .client .execute (query , params , beta = beta )
148
+ def fetch_results (self ) -> Dict [str , Any ]:
149
+ query = self .query % (self ._fetched_pages * _PAGE_SIZE , _PAGE_SIZE )
150
+ return self .client .execute (query ,
151
+ self .params ,
152
+ experimental = self .experimental )
153
+
154
+ def get_next_page (self ):
155
+ results = self .fetch_results ()
156
+ page_data = self .get_page_data (results )
157
+ self .increment_page ()
158
+ done = self .fetched_all (len (page_data ))
159
+ return page_data , done
0 commit comments