@@ -197,14 +197,43 @@ class TestOAuthFlow:
197
197
"""Test OAuth flow methods."""
198
198
199
199
@pytest .mark .anyio
200
- async def test_discover_protected_resource_request (self , oauth_provider ):
201
- """Test protected resource discovery request building."""
202
- request = await oauth_provider ._discover_protected_resource ()
200
+ async def test_discover_protected_resource_request (self , client_metadata , mock_storage ):
201
+ """Test protected resource discovery request building maintains backward compatibility."""
203
202
203
+ async def redirect_handler (url : str ) -> None :
204
+ pass
205
+
206
+ async def callback_handler () -> tuple [str , str | None ]:
207
+ return "test_auth_code" , "test_state"
208
+
209
+ provider = OAuthClientProvider (
210
+ server_url = "https://api.example.com" ,
211
+ client_metadata = client_metadata ,
212
+ storage = mock_storage ,
213
+ redirect_handler = redirect_handler ,
214
+ callback_handler = callback_handler ,
215
+ )
216
+
217
+ # Test without WWW-Authenticate (fallback)
218
+ init_response = httpx .Response (
219
+ status_code = 401 , headers = {}, request = httpx .Request ("GET" , "https://request-api.example.com" )
220
+ )
221
+
222
+ request = await provider ._discover_protected_resource (init_response )
204
223
assert request .method == "GET"
205
224
assert str (request .url ) == "https://api.example.com/.well-known/oauth-protected-resource"
206
225
assert "mcp-protocol-version" in request .headers
207
226
227
+ # Test with WWW-Authenticate header
228
+ init_response .headers ["WWW-Authenticate" ] = (
229
+ 'Bearer resource_metadata="https://prm.example.com/.well-known/oauth-protected-resource/path"'
230
+ )
231
+
232
+ request = await provider ._discover_protected_resource (init_response )
233
+ assert request .method == "GET"
234
+ assert str (request .url ) == "https://prm.example.com/.well-known/oauth-protected-resource/path"
235
+ assert "mcp-protocol-version" in request .headers
236
+
208
237
@pytest .mark .anyio
209
238
async def test_discover_oauth_metadata_request (self , oauth_provider ):
210
239
"""Test OAuth metadata discovery request building."""
@@ -660,3 +689,114 @@ def test_build_metadata(
660
689
"code_challenge_methods_supported" : ["S256" ],
661
690
}
662
691
)
692
+
693
+
694
+ class TestProtectedResourceWWWAuthenticate :
695
+ """Test RFC9728 WWW-Authenticate header parsing functionality for protected resource."""
696
+
697
+ @pytest .mark .parametrize (
698
+ "www_auth_header,expected_url" ,
699
+ [
700
+ # Quoted URL
701
+ (
702
+ 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource"' ,
703
+ "https://api.example.com/.well-known/oauth-protected-resource" ,
704
+ ),
705
+ # Unquoted URL
706
+ (
707
+ "Bearer resource_metadata=https://api.example.com/.well-known/oauth-protected-resource" ,
708
+ "https://api.example.com/.well-known/oauth-protected-resource" ,
709
+ ),
710
+ # Complex header with multiple parameters
711
+ (
712
+ 'Bearer realm="api", resource_metadata="https://api.example.com/.well-known/oauth-protected-resource", '
713
+ 'error="insufficient_scope"' ,
714
+ "https://api.example.com/.well-known/oauth-protected-resource" ,
715
+ ),
716
+ # Different URL format
717
+ ('Bearer resource_metadata="https://custom.domain.com/metadata"' , "https://custom.domain.com/metadata" ),
718
+ # With path and query params
719
+ (
720
+ 'Bearer resource_metadata="https://api.example.com/auth/metadata?version=1"' ,
721
+ "https://api.example.com/auth/metadata?version=1" ,
722
+ ),
723
+ ],
724
+ )
725
+ def test_extract_resource_metadata_from_www_auth_valid_cases (
726
+ self , client_metadata , mock_storage , www_auth_header , expected_url
727
+ ):
728
+ """Test extraction of resource_metadata URL from various valid WWW-Authenticate headers."""
729
+
730
+ async def redirect_handler (url : str ) -> None :
731
+ pass
732
+
733
+ async def callback_handler () -> tuple [str , str | None ]:
734
+ return "test_auth_code" , "test_state"
735
+
736
+ provider = OAuthClientProvider (
737
+ server_url = "https://api.example.com/v1/mcp" ,
738
+ client_metadata = client_metadata ,
739
+ storage = mock_storage ,
740
+ redirect_handler = redirect_handler ,
741
+ callback_handler = callback_handler ,
742
+ )
743
+
744
+ init_response = httpx .Response (
745
+ status_code = 401 ,
746
+ headers = {"WWW-Authenticate" : www_auth_header },
747
+ request = httpx .Request ("GET" , "https://api.example.com/test" ),
748
+ )
749
+
750
+ result = provider ._extract_resource_metadata_from_www_auth (init_response )
751
+ assert result == expected_url
752
+
753
+ @pytest .mark .parametrize (
754
+ "status_code,www_auth_header,description" ,
755
+ [
756
+ # No header
757
+ (401 , None , "no WWW-Authenticate header" ),
758
+ # Empty header
759
+ (401 , "" , "empty WWW-Authenticate header" ),
760
+ # Header without resource_metadata
761
+ (401 , 'Bearer realm="api", error="insufficient_scope"' , "no resource_metadata parameter" ),
762
+ # Malformed header
763
+ (401 , "Bearer resource_metadata=" , "malformed resource_metadata parameter" ),
764
+ # Non-401 status code
765
+ (
766
+ 200 ,
767
+ 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource"' ,
768
+ "200 OK response" ,
769
+ ),
770
+ (
771
+ 500 ,
772
+ 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource"' ,
773
+ "500 error response" ,
774
+ ),
775
+ ],
776
+ )
777
+ def test_extract_resource_metadata_from_www_auth_invalid_cases (
778
+ self , client_metadata , mock_storage , status_code , www_auth_header , description
779
+ ):
780
+ """Test extraction returns None for invalid cases."""
781
+
782
+ async def redirect_handler (url : str ) -> None :
783
+ pass
784
+
785
+ async def callback_handler () -> tuple [str , str | None ]:
786
+ return "test_auth_code" , "test_state"
787
+
788
+ provider = OAuthClientProvider (
789
+ server_url = "https://api.example.com/v1/mcp" ,
790
+ client_metadata = client_metadata ,
791
+ storage = mock_storage ,
792
+ redirect_handler = redirect_handler ,
793
+ callback_handler = callback_handler ,
794
+ )
795
+
796
+ headers = {"WWW-Authenticate" : www_auth_header } if www_auth_header is not None else {}
797
+ init_response = httpx .Response (
798
+ status_code = status_code , headers = headers , request = httpx .Request ("GET" , "https://api.example.com/test" )
799
+ )
800
+
801
+ result = provider ._extract_resource_metadata_from_www_auth (init_response )
802
+ assert result is None , f"Should return None for { description } "
0 commit comments