Skip to content

Commit fefd46c

Browse files
committed
local testing & code re-structuring
1 parent 20c73ce commit fefd46c

16 files changed

+913
-903
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 0.3.0
2+
### Changed
3+
- Option for local unittest is added. Also GitHub Actions Workflow is modified to use local testing.
4+
- Fixed the issue #37 ([Make wrapper more maintenance-proof with non-breaking refactor](https://github.com/vvaezian/metabase_api_python/issues/37))
5+
16
## 0.2.16
27
### Changed
38
- Fixed the issue #41 ([KeyError: 'sizeX'](https://github.com/vvaezian/metabase_api_python/issues/41))
200 Bytes
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

metabase_api/_helper_methods.py

Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
2+
def get_item_info(self, item_type
3+
, item_id=None, item_name=None
4+
, collection_id=None, collection_name=None
5+
, params=None):
6+
'''
7+
Return the info for the given item.
8+
Use 'params' for providing arguments. E.g. to include tables in the result for databases, use: params={'include':'tables'}
9+
'''
10+
11+
assert item_type in ['database', 'table', 'card', 'collection', 'dashboard', 'pulse', 'segment']
12+
13+
if params:
14+
assert type(params) == dict
15+
16+
if not item_id:
17+
if not item_name:
18+
raise ValueError('Either the name or id of the {} must be provided.'.format(item_type))
19+
item_id = self.get_item_id(item_type, item_name, collection_id=collection_id, collection_name=collection_name)
20+
21+
res = self.get("/api/{}/{}".format(item_type, item_id), params=params)
22+
if res:
23+
return res
24+
else:
25+
raise ValueError('There is no {} with the id "{}"'.format(item_type, item_id))
26+
27+
28+
29+
def get_item_name(self, item_type, item_id):
30+
31+
assert item_type in ['database', 'table', 'card', 'collection', 'dashboard', 'pulse', 'segment']
32+
33+
res = self.get("/api/{}/{}".format(item_type, item_id))
34+
if res:
35+
return res['name']
36+
else:
37+
raise ValueError('There is no {} with the id "{}"'.format(item_type, item_id))
38+
39+
40+
41+
def get_item_id(self, item_type, item_name, collection_id=None, collection_name=None, db_id=None, db_name=None, table_id=None):
42+
43+
assert item_type in ['database', 'table', 'card', 'collection', 'dashboard', 'pulse', 'segment']
44+
45+
if item_type in ['card', 'dashboard', 'pulse']:
46+
if not collection_id:
47+
if not collection_name:
48+
# Collection name/id is not provided. Searching in all collections
49+
item_IDs = [ i['id'] for i in self.get("/api/{}/".format(item_type)) if i['name'] == item_name
50+
and i['archived'] == False ]
51+
else:
52+
collection_id = self.get_item_id('collection', collection_name) if collection_name != 'root' else None
53+
item_IDs = [ i['id'] for i in self.get("/api/{}/".format(item_type)) if i['name'] == item_name
54+
and i['collection_id'] == collection_id
55+
and i['archived'] == False ]
56+
else:
57+
collection_name = self.get_item_name('collection', collection_id)
58+
item_IDs = [ i['id'] for i in self.get("/api/{}/".format(item_type)) if i['name'] == item_name
59+
and i['collection_id'] == collection_id
60+
and i['archived'] == False ]
61+
62+
if len(item_IDs) > 1:
63+
if not collection_name:
64+
raise ValueError('There is more than one {} with the name "{}".\n\
65+
Provide collection id/name to limit the search space'.format(item_type, item_name))
66+
raise ValueError('There is more than one {} with the name "{}" in the collection "{}"'
67+
.format(item_type, item_name, collection_name))
68+
if len(item_IDs) == 0:
69+
if not collection_name:
70+
raise ValueError('There is no {} with the name "{}"'.format(item_type, item_name))
71+
raise ValueError('There is no item with the name "{}" in the collection "{}"'
72+
.format(item_name, collection_name))
73+
74+
return item_IDs[0]
75+
76+
77+
if item_type == 'collection':
78+
collection_IDs = [ i['id'] for i in self.get("/api/collection/") if i['name'] == item_name ]
79+
80+
if len(collection_IDs) > 1:
81+
raise ValueError('There is more than one collection with the name "{}"'.format(item_name))
82+
if len(collection_IDs) == 0:
83+
raise ValueError('There is no collection with the name "{}"'.format(item_name))
84+
85+
return collection_IDs[0]
86+
87+
88+
if item_type == 'database':
89+
res = self.get("/api/database/")
90+
if type(res) == dict: # in Metabase version *.40.0 the format of the returned result for this endpoint changed
91+
res = res['data']
92+
db_IDs = [ i['id'] for i in res if i['name'] == item_name ]
93+
94+
if len(db_IDs) > 1:
95+
raise ValueError('There is more than one DB with the name "{}"'.format(item_name))
96+
if len(db_IDs) == 0:
97+
raise ValueError('There is no DB with the name "{}"'.format(item_name))
98+
99+
return db_IDs[0]
100+
101+
102+
if item_type == 'table':
103+
tables = self.get("/api/table/")
104+
105+
if db_id:
106+
table_IDs = [ i['id'] for i in tables if i['name'] == item_name and i['db']['id'] == db_id ]
107+
elif db_name:
108+
table_IDs = [ i['id'] for i in tables if i['name'] == item_name and i['db']['name'] == db_name ]
109+
else:
110+
table_IDs = [ i['id'] for i in tables if i['name'] == item_name ]
111+
112+
if len(table_IDs) > 1:
113+
raise ValueError('There is more than one table with the name {}. Provide db id/name.'.format(item_name))
114+
if len(table_IDs) == 0:
115+
raise ValueError('There is no table with the name "{}" (in the provided db, if any)'.format(item_name))
116+
117+
return table_IDs[0]
118+
119+
120+
if item_type == 'segment':
121+
segment_IDs = [ i['id'] for i in self.get("/api/segment/") if i['name'] == item_name
122+
and (not table_id or i['table_id'] == table_id) ]
123+
if len(segment_IDs) > 1:
124+
raise ValueError('There is more than one segment with the name "{}"'.format(item_name))
125+
if len(segment_IDs) == 0:
126+
raise ValueError('There is no segment with the name "{}"'.format(item_name))
127+
128+
return segment_IDs[0]
129+
130+
131+
132+
def get_collection_id(self, collection_name):
133+
import warnings
134+
warnings.warn("The function get_collection_id will be removed in the next version. Use get_item_id function instead.", DeprecationWarning)
135+
136+
collection_IDs = [ i['id'] for i in self.get("/api/collection/") if i['name'] == collection_name ]
137+
138+
if len(collection_IDs) > 1:
139+
raise ValueError('There is more than one collection with the name "{}"'.format(collection_name))
140+
if len(collection_IDs) == 0:
141+
raise ValueError('There is no collection with the name "{}"'.format(collection_name))
142+
143+
return collection_IDs[0]
144+
145+
146+
147+
def get_db_id(self, db_name):
148+
import warnings
149+
warnings.warn("The function get_db_id will be removed in the next version. Use get_item_id function instead.", DeprecationWarning)
150+
151+
res = self.get("/api/database/")
152+
if type(res) == dict: # in Metabase version *.40.0 the format of the returned result for this endpoint changed
153+
res = res['data']
154+
db_IDs = [ i['id'] for i in res if i['name'] == db_name ]
155+
156+
if len(db_IDs) > 1:
157+
raise ValueError('There is more than one DB with the name "{}"'.format(db_name))
158+
if len(db_IDs) == 0:
159+
raise ValueError('There is no DB with the name "{}"'.format(db_name))
160+
161+
return db_IDs[0]
162+
163+
164+
165+
def get_table_id(self, table_name, db_name=None, db_id=None):
166+
import warnings
167+
warnings.warn("The function get_table_id will be removed in the next version. Use get_item_id function instead.", DeprecationWarning)
168+
169+
tables = self.get("/api/table/")
170+
171+
if db_id:
172+
table_IDs = [ i['id'] for i in tables if i['name'] == table_name and i['db']['id'] == db_id ]
173+
elif db_name:
174+
table_IDs = [ i['id'] for i in tables if i['name'] == table_name and i['db']['name'] == db_name ]
175+
else:
176+
table_IDs = [ i['id'] for i in tables if i['name'] == table_name ]
177+
178+
if len(table_IDs) > 1:
179+
raise ValueError('There is more than one table with the name {}. Provide db id/name.'.format(table_name))
180+
if len(table_IDs) == 0:
181+
raise ValueError('There is no table with the name "{}" (in the provided db, if any)'.format(table_name))
182+
183+
return table_IDs[0]
184+
185+
186+
187+
def get_segment_id(self, segment_name, table_id=None):
188+
import warnings
189+
warnings.warn("The function get_segment_id will be removed in the next version. Use get_item_id function instead.", DeprecationWarning)
190+
191+
segment_IDs = [ i['id'] for i in self.get("/api/segment/") if i['name'] == segment_name
192+
and (not table_id or i['table_id'] == table_id) ]
193+
if len(segment_IDs) > 1:
194+
raise ValueError('There is more than one segment with the name "{}"'.format(segment_name))
195+
if len(segment_IDs) == 0:
196+
raise ValueError('There is no segment with the name "{}"'.format(segment_name))
197+
198+
return segment_IDs[0]
199+
200+
201+
202+
def get_db_id_from_table_id(self, table_id):
203+
tables = [ i['db_id'] for i in self.get("/api/table/") if i['id'] == table_id ]
204+
205+
if len(tables) == 0:
206+
raise ValueError('There is no DB containing the table with the ID "{}"'.format(table_id))
207+
208+
return tables[0]
209+
210+
211+
212+
def get_db_info(self, db_name=None, db_id=None, params=None):
213+
'''
214+
Return Database info. Use 'params' for providing arguments.
215+
For example to include tables in the result, use: params={'include':'tables'}
216+
'''
217+
import warnings
218+
warnings.warn("The function get_db_info will be removed in the next version. Use get_item_info function instead.", DeprecationWarning)
219+
220+
if params:
221+
assert type(params) == dict
222+
223+
if not db_id:
224+
if not db_name:
225+
raise ValueError('Either the name or id of the DB needs to be provided.')
226+
db_id = self.get_item_id('database', db_name)
227+
228+
return self.get("/api/database/{}".format(db_id), params=params)
229+
230+
231+
232+
def get_table_metadata(self, table_name=None, table_id=None, db_name=None, db_id=None, params=None):
233+
234+
if params:
235+
assert type(params) == dict
236+
237+
if not table_id:
238+
if not table_name:
239+
raise ValueError('Either the name or id of the table needs to be provided.')
240+
table_id = self.get_item_id('table', table_name, db_name=db_name, db_id=db_id)
241+
242+
return self.get("/api/table/{}/query_metadata".format(table_id), params=params)
243+
244+
245+
246+
def get_columns_name_id(self, table_name=None, db_name=None, table_id=None, db_id=None, verbose=False, column_id_name=False):
247+
'''
248+
Return a dictionary with col_name key and col_id value, for the given table_id/table_name in the given db_id/db_name.
249+
If column_id_name is True, return a dictionary with col_id key and col_name value.
250+
'''
251+
if not self.friendly_names_is_disabled():
252+
raise ValueError('Please disable "Friendly Table and Field Names" from Admin Panel > Settings > General, and try again.')
253+
254+
if not table_name:
255+
if not table_id:
256+
raise ValueError('Either the name or id of the table must be provided.')
257+
table_name = self.get_item_name(item_type='table', item_id=table_id)
258+
259+
# Get db_id
260+
if not db_id:
261+
if db_name:
262+
db_id = self.get_item_id('database', db_name)
263+
else:
264+
if not table_id:
265+
table_id = self.get_item_id('table', table_name)
266+
db_id = self.get_db_id_from_table_id(table_id)
267+
268+
# Get column names and IDs
269+
if column_id_name:
270+
return {i['id']: i['name'] for i in self.get("/api/database/{}/fields".format(db_id))
271+
if i['table_name'] == table_name}
272+
else:
273+
return {i['name']: i['id'] for i in self.get("/api/database/{}/fields".format(db_id))
274+
if i['table_name'] == table_name}
275+
276+
277+
278+
def friendly_names_is_disabled(self):
279+
'''
280+
The endpoint /api/database/:db-id/fields which is used in the function get_columns_name_id relies on the display name of fields.
281+
If "Friendly Table and Field Names" (in Admin Panel > Settings > General) is not disabled, it changes the display name of fields.
282+
So it is important to make sure this setting is disabled, before running the get_columns_name_id function.
283+
'''
284+
# checking whether friendly_name is disabled required admin access.
285+
# So to let non-admin users also use this package we skip this step for them.
286+
# There is warning in the __init__ method for these users.
287+
if not self.is_admin:
288+
return True
289+
290+
friendly_name_setting = [ i['value'] for i in self.get('/api/setting') if i['key'] == 'humanization-strategy' ][0]
291+
return friendly_name_setting == 'none' # 'none' means disabled
292+
293+
294+
295+
@staticmethod
296+
def verbose_print(verbose, msg):
297+
if verbose:
298+
print(msg)

metabase_api/_rest_methods.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import requests
2+
3+
def get(self, endpoint, *args, **kwargs):
4+
self.validate_session()
5+
res = requests.get(self.domain + endpoint, headers=self.header, **kwargs, auth=self.auth)
6+
if 'raw' in args:
7+
return res
8+
else:
9+
return res.json() if res.ok else False
10+
11+
12+
def post(self, endpoint, *args, **kwargs):
13+
self.validate_session()
14+
res = requests.post(self.domain + endpoint, headers=self.header, **kwargs, auth=self.auth)
15+
if 'raw' in args:
16+
return res
17+
else:
18+
return res.json() if res.ok else False
19+
20+
21+
def put(self, endpoint, *args, **kwargs):
22+
"""Used for updating objects (cards, dashboards, ...)"""
23+
self.validate_session()
24+
res = requests.put(self.domain + endpoint, headers=self.header, **kwargs, auth=self.auth)
25+
if 'raw' in args:
26+
return res
27+
else:
28+
return res.status_code
29+
30+
31+
def delete(self, endpoint, *args, **kwargs):
32+
self.validate_session()
33+
res = requests.delete(self.domain + endpoint, headers=self.header, **kwargs, auth=self.auth)
34+
if 'raw' in args:
35+
return res
36+
else:
37+
return res.status_code

0 commit comments

Comments
 (0)