Skip to content

Commit 2e20395

Browse files
committed
test(config): created config script to setup test workspace
1 parent 036ad2d commit 2e20395

File tree

2 files changed

+285
-0
lines changed

2 files changed

+285
-0
lines changed

scripts/release

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ ROOT_DIR = Path(__file__).parent.parent
1919
CHANGELOG_PATH = ROOT_DIR / "CHANGELOG.md"
2020
INIT_PATH = ROOT_DIR / "pygitguardian" / "__init__.py"
2121
CASSETTES_DIR = ROOT_DIR / "tests" / "cassettes"
22+
SETUP_WORKSPACE_SCRIPT = ROOT_DIR / "scripts" / "setup_test_workspace.py"
2223

2324
# The branch this script must be run from, except in dev mode.
2425
RELEASE_BRANCH = "master"
@@ -97,6 +98,16 @@ def main(dev_mode: bool) -> int:
9798
return 0
9899

99100

101+
def setup_test_workspace():
102+
log_progress("Setting up workspace")
103+
try:
104+
check_run(
105+
["pdm", "run", SETUP_WORKSPACE_SCRIPT], cwd=ROOT_DIR, stderr=subprocess.PIPE
106+
)
107+
except subprocess.CalledProcessError as exc:
108+
fail(f"There was an error setting up the test workspace :\n{exc.stderr}")
109+
110+
100111
@main.command()
101112
def run_tests() -> None:
102113
"""Run all tests.
@@ -110,6 +121,8 @@ def run_tests() -> None:
110121
shutil.rmtree(CASSETTES_DIR)
111122
CASSETTES_DIR.mkdir()
112123

124+
setup_test_workspace()
125+
113126
log_progress("Running tests")
114127
check_run(["pytest", "tests"], cwd=ROOT_DIR)
115128

scripts/setup_test_workspace.py

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
"""
2+
Notice: This script will attempt to setup a test workspace on GitGuardian.
3+
This will allow the user to run tests without relying on cassettes, note that
4+
there are a few limitations due to actions that cannot be performed through
5+
the API, notably :
6+
- Create the workspace
7+
- Create members
8+
- Add deleted members back to the workspace
9+
- Integrate a source
10+
"""
11+
12+
import os
13+
from typing import Iterable, List, TypeVar
14+
15+
from pygitguardian.client import GGClient
16+
from pygitguardian.models import (
17+
AccessLevel,
18+
CreateInvitation,
19+
CreateTeam,
20+
CreateTeamInvitation,
21+
CreateTeamMember,
22+
Detail,
23+
IncidentPermission,
24+
InvitationParameters,
25+
Member,
26+
MembersParameters,
27+
Source,
28+
Team,
29+
TeamMember,
30+
TeamsParameters,
31+
UpdateMember,
32+
UpdateTeamSource,
33+
)
34+
from pygitguardian.models_utils import FromDictWithBase
35+
from tests.utils import CursorPaginatedResponse
36+
37+
38+
client = GGClient(
39+
api_key=os.environ["GITGUARDIAN_API_KEY"],
40+
base_uri=os.environ.get("GITGUARDIAN_API_URL"),
41+
)
42+
43+
T = TypeVar("T")
44+
PaginatedDataType = TypeVar("PaginatedDataType", bound=FromDictWithBase)
45+
46+
MIN_NB_TEAM = 2
47+
MIN_NB_MEMBER = 3 # 1 owner, 1 manager and at least one member
48+
MIN_NB_TEAM_MEMBER = 2
49+
# This is the team that is created in the tests, it should be deleted before we run the tests
50+
PYGITGUARDIAN_TEST_TEAM = "PyGitGuardian team"
51+
52+
53+
def ensure_not_detail(var: T | Detail) -> T:
54+
if not isinstance(var, Detail):
55+
return var
56+
else:
57+
raise TypeError(var.detail)
58+
59+
60+
def unwrap_paginated_response(
61+
var: CursorPaginatedResponse[PaginatedDataType] | Detail,
62+
) -> List[PaginatedDataType]:
63+
data = ensure_not_detail(var)
64+
65+
return data.data
66+
67+
68+
def ensure_member_coherence():
69+
deactivated_members = unwrap_paginated_response(
70+
client.list_members(MembersParameters(active=False))
71+
)
72+
for member in deactivated_members:
73+
client.update_member(UpdateMember(member.id, AccessLevel.MEMBER, active=True))
74+
75+
admin_members = unwrap_paginated_response(
76+
client.list_members(MembersParameters(access_level=AccessLevel.MANAGER))
77+
)
78+
79+
if len(admin_members) > 1:
80+
for member in admin_members[1:]:
81+
ensure_not_detail(
82+
client.update_member(UpdateMember(member.id, AccessLevel.MEMBER))
83+
)
84+
else:
85+
members = unwrap_paginated_response(
86+
client.list_members(MembersParameters(access_level=AccessLevel.MEMBER))
87+
)
88+
assert (
89+
len(members) > 0
90+
), "There must be at least one member with access level member in the workspace"
91+
92+
ensure_not_detail(
93+
client.update_member(UpdateMember(members[0].id, AccessLevel.MANAGER))
94+
)
95+
96+
members = ensure_not_detail(client.list_members(MembersParameters(per_page=5)))
97+
98+
assert len(members.data) > 3, "There must be at least 3 members in the workspace"
99+
100+
101+
def add_source_to_team(team: Team, available_sources: Iterable[Source] | None = None):
102+
if available_sources is None:
103+
available_sources = ensure_not_detail(client.list_sources()).data
104+
105+
ensure_not_detail(
106+
client.update_team_source(
107+
UpdateTeamSource(team.id, [source.id for source in available_sources], [])
108+
)
109+
)
110+
111+
112+
def add_team_members(
113+
team: Team,
114+
team_members: Iterable[TeamMember],
115+
nb_members: int,
116+
available_members: Iterable[Member] | None = None,
117+
):
118+
print(
119+
f"For team: {team.name}, it has {len(team_members)} team members, we will add {nb_members}"
120+
)
121+
assert nb_members > 0, "We should add at least one member"
122+
if available_members is None:
123+
available_members = unwrap_paginated_response(client.list_members())
124+
125+
# Every manager is by default a team leader
126+
has_admin = any(team_member.is_team_leader for team_member in team_members)
127+
128+
if not has_admin:
129+
admin_member = next(
130+
(
131+
member
132+
for member in available_members
133+
if member.access_level == AccessLevel.MANAGER
134+
),
135+
None,
136+
)
137+
assert admin_member is not None, "There should be at least one admin member"
138+
139+
ensure_not_detail(
140+
client.create_team_member(
141+
team.id,
142+
CreateTeamMember(
143+
admin_member.id,
144+
is_team_leader=True,
145+
incident_permission=IncidentPermission.FULL_ACCESS,
146+
),
147+
)
148+
)
149+
print(f"Added one admin member to the team: {team.name}")
150+
nb_members -= 1
151+
152+
team_member_ids = {team_member.member_id for team_member in team_members}
153+
for _ in range(nb_members):
154+
to_add_member = next(
155+
(
156+
member
157+
for member in available_members
158+
if member.id not in team_member_ids
159+
and member.access_level not in {AccessLevel.OWNER, AccessLevel.MANAGER}
160+
),
161+
None,
162+
)
163+
assert to_add_member is not None, "There is not enough members in the workspace"
164+
is_team_leader = False
165+
permissions = IncidentPermission.FULL_ACCESS
166+
167+
if to_add_member.access_level == AccessLevel.MANAGER:
168+
is_team_leader = True
169+
170+
ensure_not_detail(
171+
client.create_team_member(
172+
team.id,
173+
CreateTeamMember(
174+
to_add_member.id,
175+
is_team_leader=is_team_leader,
176+
incident_permission=permissions,
177+
),
178+
)
179+
)
180+
print(f"Added one member to the team: {team.name}")
181+
182+
183+
def ensure_team_coherence():
184+
pygitguardian_teams = []
185+
try:
186+
pygitguardian_teams = unwrap_paginated_response(
187+
client.list_teams(TeamsParameters(search=PYGITGUARDIAN_TEST_TEAM))
188+
)
189+
except TypeError as exc:
190+
if str(exc) != "Team not found.":
191+
raise
192+
finally:
193+
for team in pygitguardian_teams:
194+
ensure_not_detail(client.delete_team(team.id))
195+
196+
teams = unwrap_paginated_response(
197+
# exclude global team since we can't add sources / members to it
198+
client.list_teams(TeamsParameters(is_global=False))
199+
)
200+
201+
nb_teams = len(teams)
202+
if nb_teams < MIN_NB_TEAM:
203+
for i in range(MIN_NB_TEAM - nb_teams):
204+
new_team = ensure_not_detail(
205+
client.create_team(CreateTeam(name=f"PyGitGuardian Team {i}"))
206+
)
207+
teams.append(new_team)
208+
209+
# Ensure every team has:
210+
# - At least one source
211+
# - At least two members, one with admin access and one with member access
212+
for team in teams:
213+
team_members = unwrap_paginated_response(client.list_team_members(team.id))
214+
nb_team_members = len(team_members)
215+
if nb_team_members < MIN_NB_TEAM_MEMBER:
216+
add_team_members(team, team_members, MIN_NB_TEAM_MEMBER - nb_team_members)
217+
218+
team_sources = unwrap_paginated_response(client.list_team_sources(team.id))
219+
nb_team_sources = len(team_sources)
220+
if nb_team_sources == 0:
221+
add_source_to_team(team)
222+
223+
224+
def ensure_invitation_coherence():
225+
test_invitation = unwrap_paginated_response(
226+
client.list_invitations(InvitationParameters(search="pygitguardian"))
227+
)
228+
229+
for invitation in test_invitation:
230+
ensure_not_detail(client.delete_invitation(invitation.id))
231+
invitations = unwrap_paginated_response(client.list_invitations())
232+
233+
if len(invitations) < 1:
234+
invitation = ensure_not_detail(
235+
client.create_invitation(
236+
CreateInvitation(
237+
email="pygitguardian@invitation.com",
238+
access_level=AccessLevel.MEMBER,
239+
)
240+
)
241+
)
242+
invitations.append(invitation)
243+
244+
teams = unwrap_paginated_response(client.list_teams())
245+
invitation = invitations[0]
246+
for team in teams:
247+
team_invitations = unwrap_paginated_response(
248+
client.list_team_invitations(team.id)
249+
)
250+
if not team_invitations:
251+
ensure_not_detail(
252+
client.create_team_invitation(
253+
team.id,
254+
CreateTeamInvitation(
255+
invitation_id=invitation.id,
256+
is_team_leader=False,
257+
incident_permission=IncidentPermission.FULL_ACCESS,
258+
),
259+
)
260+
)
261+
262+
263+
def main():
264+
ensure_member_coherence()
265+
ensure_team_coherence()
266+
ensure_invitation_coherence()
267+
268+
print("Test workspace has been set up properly")
269+
270+
271+
if __name__ == "__main__":
272+
main()

0 commit comments

Comments
 (0)