Skip to content

Commit 964098c

Browse files
Add GitHub permission checking utilities
1 parent d738804 commit 964098c

File tree

2 files changed

+321
-0
lines changed

2 files changed

+321
-0
lines changed

src/django_github_app/permissions.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
from __future__ import annotations
2+
3+
from enum import Enum
4+
from typing import NamedTuple
5+
6+
import cachetools
7+
import gidgethub
8+
9+
from django_github_app.github import AsyncGitHubAPI
10+
from django_github_app.github import SyncGitHubAPI
11+
12+
13+
class PermissionCacheKey(NamedTuple):
14+
owner: str
15+
repo: str
16+
username: str
17+
18+
19+
class Permission(int, Enum):
20+
NONE = 0
21+
READ = 1
22+
TRIAGE = 2
23+
WRITE = 3
24+
MAINTAIN = 4
25+
ADMIN = 5
26+
27+
@classmethod
28+
def from_string(cls, permission: str) -> Permission:
29+
permission_map = {
30+
"none": cls.NONE,
31+
"read": cls.READ,
32+
"triage": cls.TRIAGE,
33+
"write": cls.WRITE,
34+
"maintain": cls.MAINTAIN,
35+
"admin": cls.ADMIN,
36+
}
37+
38+
normalized = permission.lower().strip()
39+
if normalized not in permission_map:
40+
raise ValueError(f"Unknown permission level: {permission}")
41+
42+
return permission_map[normalized]
43+
44+
45+
cache: cachetools.LRUCache[PermissionCacheKey, Permission] = cachetools.LRUCache(
46+
maxsize=128
47+
)
48+
49+
50+
async def aget_user_permission(
51+
gh: AsyncGitHubAPI, owner: str, repo: str, username: str
52+
) -> Permission:
53+
cache_key = PermissionCacheKey(owner, repo, username)
54+
55+
if cache_key in cache:
56+
return cache[cache_key]
57+
58+
permission = Permission.NONE
59+
60+
try:
61+
# Check if user is a collaborator and get their permission
62+
data = await gh.getitem(
63+
f"/repos/{owner}/{repo}/collaborators/{username}/permission"
64+
)
65+
permission_str = data.get("permission", "none")
66+
permission = Permission.from_string(permission_str)
67+
except gidgethub.HTTPException as e:
68+
if e.status_code == 404:
69+
# User is not a collaborator, they have read permission if repo is public
70+
# Check if repo is public
71+
try:
72+
repo_data = await gh.getitem(f"/repos/{owner}/{repo}")
73+
if not repo_data.get("private", True):
74+
permission = Permission.READ
75+
except gidgethub.HTTPException:
76+
pass
77+
78+
cache[cache_key] = permission
79+
return permission
80+
81+
82+
def get_user_permission(
83+
gh: SyncGitHubAPI, owner: str, repo: str, username: str
84+
) -> Permission:
85+
cache_key = PermissionCacheKey(owner, repo, username)
86+
87+
if cache_key in cache:
88+
return cache[cache_key]
89+
90+
permission = Permission.NONE
91+
92+
try:
93+
# Check if user is a collaborator and get their permission
94+
data = gh.getitem(f"/repos/{owner}/{repo}/collaborators/{username}/permission")
95+
permission_str = data.get("permission", "none")
96+
permission = Permission.from_string(permission_str)
97+
except gidgethub.HTTPException as e:
98+
if e.status_code == 404:
99+
# User is not a collaborator, they have read permission if repo is public
100+
# Check if repo is public
101+
try:
102+
repo_data = gh.getitem(f"/repos/{owner}/{repo}")
103+
if not repo_data.get("private", True):
104+
permission = Permission.READ
105+
except gidgethub.HTTPException:
106+
pass
107+
108+
cache[cache_key] = permission
109+
return permission

tests/test_permissions.py

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
"""Tests for GitHub permission checking utilities."""
2+
3+
from __future__ import annotations
4+
5+
from unittest.mock import AsyncMock, Mock, create_autospec
6+
7+
import gidgethub
8+
import pytest
9+
10+
from django_github_app.github import AsyncGitHubAPI, SyncGitHubAPI
11+
from django_github_app.permissions import (
12+
Permission,
13+
aget_user_permission,
14+
get_user_permission,
15+
cache,
16+
)
17+
18+
19+
@pytest.fixture(autouse=True)
20+
def clear_cache():
21+
"""Clear the permission cache before and after each test."""
22+
cache.clear()
23+
yield
24+
cache.clear()
25+
26+
27+
class TestPermission:
28+
"""Test Permission enum functionality."""
29+
30+
def test_permission_ordering(self):
31+
"""Test that permission levels are correctly ordered."""
32+
assert Permission.NONE < Permission.READ
33+
assert Permission.READ < Permission.TRIAGE
34+
assert Permission.TRIAGE < Permission.WRITE
35+
assert Permission.WRITE < Permission.MAINTAIN
36+
assert Permission.MAINTAIN < Permission.ADMIN
37+
38+
assert Permission.ADMIN > Permission.WRITE
39+
assert Permission.WRITE >= Permission.WRITE
40+
assert Permission.READ <= Permission.TRIAGE
41+
42+
def test_from_string(self):
43+
"""Test converting string permissions to enum."""
44+
assert Permission.from_string("read") == Permission.READ
45+
assert Permission.from_string("READ") == Permission.READ
46+
assert Permission.from_string(" admin ") == Permission.ADMIN
47+
assert Permission.from_string("triage") == Permission.TRIAGE
48+
assert Permission.from_string("write") == Permission.WRITE
49+
assert Permission.from_string("maintain") == Permission.MAINTAIN
50+
assert Permission.from_string("none") == Permission.NONE
51+
52+
def test_from_string_invalid(self):
53+
"""Test that invalid permission strings raise ValueError."""
54+
with pytest.raises(ValueError, match="Unknown permission level: invalid"):
55+
Permission.from_string("invalid")
56+
57+
with pytest.raises(ValueError, match="Unknown permission level: owner"):
58+
Permission.from_string("owner")
59+
60+
61+
@pytest.mark.asyncio
62+
class TestGetUserPermission:
63+
"""Test aget_user_permission function."""
64+
65+
async def test_collaborator_with_admin_permission(self):
66+
"""Test getting permission for a collaborator with admin access."""
67+
gh = create_autospec(AsyncGitHubAPI, instance=True)
68+
gh.getitem = AsyncMock(return_value={"permission": "admin"})
69+
70+
permission = await aget_user_permission(gh, "owner", "repo", "user")
71+
72+
assert permission == Permission.ADMIN
73+
gh.getitem.assert_called_once_with(
74+
"/repos/owner/repo/collaborators/user/permission"
75+
)
76+
77+
async def test_collaborator_with_write_permission(self):
78+
"""Test getting permission for a collaborator with write access."""
79+
gh = create_autospec(AsyncGitHubAPI, instance=True)
80+
gh.getitem = AsyncMock(return_value={"permission": "write"})
81+
82+
permission = await aget_user_permission(gh, "owner", "repo", "user")
83+
84+
assert permission == Permission.WRITE
85+
86+
async def test_non_collaborator_public_repo(self):
87+
"""Test non-collaborator has read access to public repo."""
88+
gh = create_autospec(AsyncGitHubAPI, instance=True)
89+
# First call returns 404 (not a collaborator)
90+
gh.getitem = AsyncMock(side_effect=[
91+
gidgethub.HTTPException(404, "Not found", {}),
92+
{"private": False}, # Repo is public
93+
])
94+
95+
permission = await aget_user_permission(gh, "owner", "repo", "user")
96+
97+
assert permission == Permission.READ
98+
assert gh.getitem.call_count == 2
99+
gh.getitem.assert_any_call("/repos/owner/repo/collaborators/user/permission")
100+
gh.getitem.assert_any_call("/repos/owner/repo")
101+
102+
async def test_non_collaborator_private_repo(self):
103+
"""Test non-collaborator has no access to private repo."""
104+
gh = create_autospec(AsyncGitHubAPI, instance=True)
105+
# First call returns 404 (not a collaborator)
106+
gh.getitem = AsyncMock(side_effect=[
107+
gidgethub.HTTPException(404, "Not found", {}),
108+
{"private": True}, # Repo is private
109+
])
110+
111+
permission = await aget_user_permission(gh, "owner", "repo", "user")
112+
113+
assert permission == Permission.NONE
114+
115+
async def test_api_error_returns_none_permission(self):
116+
"""Test that API errors default to no permission."""
117+
gh = create_autospec(AsyncGitHubAPI, instance=True)
118+
gh.getitem = AsyncMock(side_effect=gidgethub.HTTPException(
119+
500, "Server error", {}
120+
))
121+
122+
permission = await aget_user_permission(gh, "owner", "repo", "user")
123+
124+
assert permission == Permission.NONE
125+
126+
async def test_missing_permission_field(self):
127+
"""Test handling response without permission field."""
128+
gh = create_autospec(AsyncGitHubAPI, instance=True)
129+
gh.getitem = AsyncMock(return_value={}) # No permission field
130+
131+
permission = await aget_user_permission(gh, "owner", "repo", "user")
132+
133+
assert permission == Permission.NONE
134+
135+
136+
class TestGetUserPermissionSync:
137+
"""Test synchronous get_user_permission function."""
138+
139+
def test_collaborator_with_permission(self):
140+
"""Test getting permission for a collaborator."""
141+
gh = create_autospec(SyncGitHubAPI, instance=True)
142+
gh.getitem = Mock(return_value={"permission": "maintain"})
143+
144+
permission = get_user_permission(gh, "owner", "repo", "user")
145+
146+
assert permission == Permission.MAINTAIN
147+
gh.getitem.assert_called_once_with(
148+
"/repos/owner/repo/collaborators/user/permission"
149+
)
150+
151+
def test_non_collaborator_public_repo(self):
152+
"""Test non-collaborator has read access to public repo."""
153+
gh = create_autospec(SyncGitHubAPI, instance=True)
154+
# First call returns 404 (not a collaborator)
155+
gh.getitem = Mock(side_effect=[
156+
gidgethub.HTTPException(404, "Not found", {}),
157+
{"private": False}, # Repo is public
158+
])
159+
160+
permission = get_user_permission(gh, "owner", "repo", "user")
161+
162+
assert permission == Permission.READ
163+
164+
165+
@pytest.mark.asyncio
166+
class TestPermissionCaching:
167+
"""Test permission caching functionality."""
168+
169+
async def test_cache_hit(self):
170+
"""Test that cache returns stored values."""
171+
gh = create_autospec(AsyncGitHubAPI, instance=True)
172+
gh.getitem = AsyncMock(return_value={"permission": "write"})
173+
174+
# First call should hit the API
175+
perm1 = await aget_user_permission(gh, "owner", "repo", "user")
176+
assert perm1 == Permission.WRITE
177+
assert gh.getitem.call_count == 1
178+
179+
# Second call should use cache
180+
perm2 = await aget_user_permission(gh, "owner", "repo", "user")
181+
assert perm2 == Permission.WRITE
182+
assert gh.getitem.call_count == 1 # No additional API call
183+
184+
async def test_cache_different_users(self):
185+
"""Test that cache handles different users correctly."""
186+
gh = create_autospec(AsyncGitHubAPI, instance=True)
187+
gh.getitem = AsyncMock(side_effect=[
188+
{"permission": "write"},
189+
{"permission": "admin"},
190+
])
191+
192+
perm1 = await aget_user_permission(gh, "owner", "repo", "user1")
193+
perm2 = await aget_user_permission(gh, "owner", "repo", "user2")
194+
195+
assert perm1 == Permission.WRITE
196+
assert perm2 == Permission.ADMIN
197+
assert gh.getitem.call_count == 2
198+
199+
def test_sync_cache_hit(self):
200+
"""Test that sync version uses cache."""
201+
gh = create_autospec(SyncGitHubAPI, instance=True)
202+
gh.getitem = Mock(return_value={"permission": "read"})
203+
204+
# First call should hit the API
205+
perm1 = get_user_permission(gh, "owner", "repo", "user")
206+
assert perm1 == Permission.READ
207+
assert gh.getitem.call_count == 1
208+
209+
# Second call should use cache
210+
perm2 = get_user_permission(gh, "owner", "repo", "user")
211+
assert perm2 == Permission.READ
212+
assert gh.getitem.call_count == 1 # No additional API call

0 commit comments

Comments
 (0)