Skip to content

Commit eeba485

Browse files
authored
feat: Create users in metadata backend via API (#289)
* Initial commit of rough draft. Allow creating users in metadata db via API. Signed-off-by: Grant Seward <grant@stemma.ai> * Cleaned up commented items Signed-off-by: Grant Seward <grant@stemma.ai> * Added abstract functions to fix make test Signed-off-by: Grant Seward <grant@stemma.ai> * added initial proxy test Signed-off-by: Grant Seward <grant@stemma.ai> * Added user tests, fixes for make file to pass Signed-off-by: Grant Seward <grant@stemma.ai> * Removed extra test mocks per PR feedback Signed-off-by: Grant Seward <grant@stemma.ai>
1 parent c8423c6 commit eeba485

File tree

8 files changed

+194
-2
lines changed

8 files changed

+194
-2
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
Create or update a user
2+
---
3+
tags:
4+
- 'user'
5+
requestBody:
6+
content:
7+
application/json:
8+
schema:
9+
$ref: '#/components/schemas/UserDetailFields'
10+
description: User attribute fields
11+
required: true
12+
responses:
13+
200:
14+
description: 'Existing user found and updated'
15+
content:
16+
application/json:
17+
schema:
18+
$ref: '#/components/schemas/UserDetailFields'
19+
201:
20+
description: 'New user created'
21+
content:
22+
application/json:
23+
schema:
24+
$ref: '#/components/schemas/UserDetailFields'
25+
400:
26+
description: 'Bad Request'
27+
content:
28+
application/json:
29+
schema:
30+
$ref: '#/components/schemas/ErrorResponse'
31+
500:
32+
description: 'Internal server error'
33+
content:
34+
application/json:
35+
schema:
36+
$ref: '#/components/schemas/ErrorResponse'

metadata_service/api/user.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Copyright Contributors to the Amundsen project.
22
# SPDX-License-Identifier: Apache-2.0
33

4+
import json
45
import logging
56
from http import HTTPStatus
67
from typing import (Any, Dict, Iterable, List, Mapping, Optional, # noqa: F401
@@ -11,7 +12,9 @@
1112
from amundsen_common.models.user import UserSchema
1213
from flasgger import swag_from
1314
from flask import current_app as app
15+
from flask import request
1416
from flask_restful import Resource
17+
from marshmallow.exceptions import ValidationError as SchemaValidationError
1518

1619
from metadata_service.api import BaseAPI
1720
from metadata_service.entity.resource_type import (ResourceType,
@@ -44,6 +47,34 @@ def get(self, *, id: Optional[str] = None) -> Iterable[Union[Mapping, int, None]
4447
else:
4548
return super().get(id=id)
4649

50+
@swag_from('swagger_doc/user/detail_put.yml')
51+
def put(self) -> Iterable[Union[Mapping, int, None]]:
52+
"""
53+
Create or update a user. Serializes the data in the request body
54+
using the UserSchema, validating the inputs in the process to ensure
55+
all downstream processes leverage clean data, and passes the User
56+
object to the client to create or update the user record.
57+
"""
58+
if not request.data:
59+
return {'message': 'No user information provided in the request.'}, HTTPStatus.BAD_REQUEST
60+
61+
try:
62+
user_attributes = json.loads(request.data)
63+
schema = UserSchema()
64+
user = schema.load(user_attributes)
65+
66+
new_user, user_created = self.client.create_update_user(user=user)
67+
resp_code = HTTPStatus.CREATED if user_created else HTTPStatus.OK
68+
return schema.dumps(new_user), resp_code
69+
70+
except SchemaValidationError as schema_err:
71+
err_msg = 'User inputs provided are not valid: %s' % schema_err
72+
return {'message': err_msg}, HTTPStatus.BAD_REQUEST
73+
74+
except Exception:
75+
LOGGER.exception('UserDetailAPI PUT Failed')
76+
return {'message': 'Internal server error!'}, HTTPStatus.INTERNAL_SERVER_ERROR
77+
4778

4879
class UserFollowsAPI(Resource):
4980
"""

metadata_service/proxy/atlas_proxy.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,9 @@ def _get_owners(self, data_owners: list, fallback_owner: str = None) -> List[Use
454454
def get_user(self, *, id: str) -> Union[UserEntity, None]:
455455
pass
456456

457+
def create_update_user(self, *, user: User) -> Tuple[User, bool]:
458+
pass
459+
457460
def get_users(self) -> List[UserEntity]:
458461
pass
459462

metadata_service/proxy/base_proxy.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# SPDX-License-Identifier: Apache-2.0
33

44
from abc import ABCMeta, abstractmethod
5-
from typing import Any, Dict, List, Optional, Union
5+
from typing import Any, Dict, List, Optional, Tuple, Union
66

77
from amundsen_common.models.dashboard import DashboardSummary
88
from amundsen_common.models.lineage import Lineage
@@ -27,6 +27,18 @@ class BaseProxy(metaclass=ABCMeta):
2727
def get_user(self, *, id: str) -> Union[User, None]:
2828
pass
2929

30+
@abstractmethod
31+
def create_update_user(self, *, user: User) -> Tuple[User, bool]:
32+
"""
33+
Allows creating and updating users. Returns a tuple of the User
34+
object that has been created or updated as well as a flag that
35+
depicts whether or no the user was created or updated.
36+
37+
:param user: a User object
38+
:return: Tuple of [User object, bool (True = created, False = updated)]
39+
"""
40+
pass
41+
3042
@abstractmethod
3143
def get_users(self) -> List[User]:
3244
pass

metadata_service/proxy/gremlin_proxy.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from datetime import date, datetime, timedelta
99
from operator import attrgetter
1010
from typing import (Any, Callable, Dict, Iterable, List, Mapping, Optional,
11-
Sequence, Set, Type, TypeVar, Union, no_type_check,
11+
Sequence, Set, Tuple, Type, TypeVar, Union, no_type_check,
1212
overload)
1313
from urllib.parse import unquote
1414

@@ -1010,6 +1010,9 @@ def _get_user(self, *, id: str, executor: ExecuteQuery) -> Union[User, None]:
10101010
user.manager_fullname = _safe_get(managers[0], 'full_name', default=None) if managers else None
10111011
return user
10121012

1013+
def create_update_user(self, *, user: User) -> Tuple[User, bool]:
1014+
pass
1015+
10131016
@timer_with_counter
10141017
@overrides
10151018
def get_users(self) -> List[User]:

metadata_service/proxy/neo4j_proxy.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
Reader, Source, Stat, Table, Tag,
1919
User, Watermark)
2020
from amundsen_common.models.user import User as UserEntity
21+
from amundsen_common.models.user import UserSchema
2122
from beaker.cache import CacheManager
2223
from beaker.util import parse_cache_config_options
2324
from flask import current_app, has_app_context
@@ -42,6 +43,12 @@
4243
# Expire cache every 11 hours + jitter
4344
_GET_POPULAR_TABLE_CACHE_EXPIRY_SEC = 11 * 60 * 60 + randint(0, 3600)
4445

46+
47+
CREATED_EPOCH_MS = 'publisher_created_epoch_ms'
48+
LAST_UPDATED_EPOCH_MS = 'publisher_last_updated_epoch_ms'
49+
PUBLISHED_TAG_PROPERTY_NAME = 'published_tag'
50+
51+
4552
LOGGER = logging.getLogger(__name__)
4653

4754

@@ -958,6 +965,61 @@ def get_user(self, *, id: str) -> Union[UserEntity, None]:
958965

959966
return self._build_user_from_record(record=record, manager_name=manager_name)
960967

968+
def create_update_user(self, *, user: User) -> Tuple[User, bool]:
969+
"""
970+
Create a user if it does not exist, otherwise update the user. Required
971+
fields for creating / updating a user are validated upstream to this when
972+
the User object is created.
973+
974+
:param user:
975+
:return:
976+
"""
977+
user_data = UserSchema().dump(user)
978+
user_props = self._create_props_body(user_data, 'usr')
979+
980+
create_update_user_query = textwrap.dedent("""
981+
MERGE (usr:User {key: $user_id})
982+
on CREATE SET %s, usr.%s=timestamp()
983+
on MATCH SET %s
984+
RETURN usr, usr.%s = timestamp() as created
985+
""" % (user_props, CREATED_EPOCH_MS, user_props, CREATED_EPOCH_MS))
986+
987+
try:
988+
tx = self._driver.session().begin_transaction()
989+
result = tx.run(create_update_user_query, user_data)
990+
991+
user_result = result.single()
992+
if not user_result:
993+
raise RuntimeError('Failed to create user with data %s' % user_data)
994+
tx.commit()
995+
996+
new_user = self._build_user_from_record(user_result['usr'])
997+
new_user_created = True if user_result['created'] is True else False
998+
999+
except Exception as e:
1000+
if not tx.closed():
1001+
tx.rollback()
1002+
# propagate the exception back to api
1003+
raise e
1004+
1005+
return new_user, new_user_created
1006+
1007+
def _create_props_body(self,
1008+
record_dict: dict,
1009+
identifier: str) -> str:
1010+
"""
1011+
Creates a Neo4j property body by converting a dictionary into a comma
1012+
separated string of KEY = VALUE.
1013+
"""
1014+
props = []
1015+
for k, v in record_dict.items():
1016+
if v:
1017+
props.append(f'{identifier}.{k} = ${k}')
1018+
1019+
props.append(f"{identifier}.{PUBLISHED_TAG_PROPERTY_NAME} = 'api_create_update_user'")
1020+
props.append(f"{identifier}.{LAST_UPDATED_EPOCH_MS} = timestamp()")
1021+
return ', '.join(props)
1022+
9611023
def get_users(self) -> List[UserEntity]:
9621024
statement = "MATCH (usr:User) WHERE usr.is_active = true RETURN collect(usr) as users"
9631025

tests/unit/api/test_user.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Copyright Contributors to the Amundsen project.
22
# SPDX-License-Identifier: Apache-2.0
33

4+
import json
45
import unittest
56
from http import HTTPStatus
67
from unittest import mock
@@ -37,6 +38,32 @@ def test_gets(self) -> None:
3738
self.assertEqual(list(response)[1], HTTPStatus.OK)
3839
self.mock_client.get_users.assert_called_once()
3940

41+
def test_put(self) -> None:
42+
m = MagicMock()
43+
m.data = json.dumps({'email': 'create_email@email.com'})
44+
with mock.patch("metadata_service.api.user.request", m):
45+
# Test user creation
46+
create_email = {'email': 'test_email'}
47+
self.mock_client.create_update_user.return_value = create_email, True
48+
test_user, test_user_created = self.api.put()
49+
self.assertEqual(test_user, json.dumps(create_email))
50+
self.assertEqual(test_user_created, HTTPStatus.CREATED)
51+
52+
# Test user update
53+
update_email = {'email': 'update_email@email.com'}
54+
self.mock_client.create_update_user.return_value = update_email, False
55+
test_user2, test_user_updated = self.api.put()
56+
self.assertEqual(test_user2, json.dumps(update_email))
57+
self.assertEqual(test_user_updated, HTTPStatus.OK)
58+
59+
def test_put_no_inputs(self) -> None:
60+
# Test no data provided
61+
m2 = MagicMock()
62+
m2.data = {}
63+
with mock.patch("metadata_service.api.user.request", m2):
64+
_, status_code = self.api.put()
65+
self.assertEquals(status_code, HTTPStatus.BAD_REQUEST)
66+
4067

4168
class UserFollowsAPITest(unittest.TestCase):
4269

tests/unit/proxy/test_neo4j_proxy.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,24 @@ def test_get_user_other_key_values(self) -> None:
622622
neo4j_user = neo4j_proxy.get_user(id='test_email')
623623
self.assertEqual(neo4j_user.other_key_values, {'mode_user_id': 'mode_foo_bar'})
624624

625+
def test_put_user_new_user(self) -> None:
626+
"""
627+
Test creating a new user
628+
:return:
629+
"""
630+
with patch.object(GraphDatabase, 'driver') as mock_driver:
631+
mock_transaction = mock_driver.return_value.session.return_value.begin_transaction.return_value
632+
mock_run = mock_transaction.run
633+
mock_commit = mock_transaction.commit
634+
635+
test_user = MagicMock()
636+
637+
neo4j_proxy = Neo4jProxy(host='DOES_NOT_MATTER', port=0000)
638+
neo4j_proxy.create_update_user(user=test_user)
639+
640+
self.assertEqual(mock_run.call_count, 1)
641+
self.assertEqual(mock_commit.call_count, 1)
642+
625643
def test_get_users(self) -> None:
626644
with patch.object(GraphDatabase, 'driver'), patch.object(Neo4jProxy, '_execute_cypher_query') as mock_execute:
627645
test_user = {

0 commit comments

Comments
 (0)