3
3
4
4
from datetime import datetime
5
5
from nose .tools import assert_equal
6
- from twilio .jwt import decode
7
- from twilio .jwt .access_token import AccessToken , ConversationsGrant , IpMessagingGrant , SyncGrant , VoiceGrant , VideoGrant
6
+
7
+ from twilio .jwt .access_token import AccessToken
8
+ from twilio .jwt .access_token .grants import (
9
+ ConversationsGrant ,
10
+ IpMessagingGrant ,
11
+ SyncGrant ,
12
+ VoiceGrant ,
13
+ VideoGrant
14
+ )
8
15
9
16
ACCOUNT_SID = 'AC123'
10
17
SIGNING_KEY_SID = 'SK123'
@@ -38,102 +45,109 @@ def _validate_claims(self, payload):
38
45
39
46
def test_empty_grants (self ):
40
47
scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' )
41
- token = str ( scat )
48
+ token = scat . to_jwt ( )
42
49
43
50
assert_is_not_none (token )
44
- payload = decode (token , 'secret' )
45
- self ._validate_claims (payload )
46
- assert_equal ({}, payload ['grants' ])
51
+ decoded_token = AccessToken . from_jwt (token , 'secret' )
52
+ self ._validate_claims (decoded_token . payload )
53
+ assert_equal ({}, decoded_token . payload ['grants' ])
47
54
48
55
def test_nbf (self ):
49
56
now = int (time .mktime (datetime .now ().timetuple ()))
50
57
scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' , nbf = now )
51
- token = str (scat )
58
+ token = scat .to_jwt ()
59
+
60
+ assert_is_not_none (token )
61
+ decoded_token = AccessToken .from_jwt (token , 'secret' )
62
+ self ._validate_claims (decoded_token .payload )
63
+ assert_equal (now , decoded_token .nbf )
52
64
65
+ def test_headers (self ):
66
+ scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' )
67
+ token = scat .to_jwt ()
53
68
assert_is_not_none (token )
54
- payload = decode (token , 'secret' )
55
- self ._validate_claims (payload )
56
- assert_equal (now , payload ['nbf' ])
69
+ decoded_token = AccessToken .from_jwt (token , 'secret' )
70
+ self .assertEqual (decoded_token .headers ['cty' ], 'twilio-fpa;v=1' )
57
71
58
72
def test_identity (self ):
59
73
scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' , identity = 'test@twilio.com' )
60
- token = str ( scat )
74
+ token = scat . to_jwt ( )
61
75
62
76
assert_is_not_none (token )
63
- payload = decode (token , 'secret' )
64
- self ._validate_claims (payload )
77
+ decoded_token = AccessToken . from_jwt (token , 'secret' )
78
+ self ._validate_claims (decoded_token . payload )
65
79
assert_equal ({
66
80
'identity' : 'test@twilio.com'
67
- }, payload ['grants' ])
81
+ }, decoded_token . payload ['grants' ])
68
82
69
83
def test_conversations_grant (self ):
70
84
scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' )
71
85
scat .add_grant (ConversationsGrant (configuration_profile_sid = 'CP123' ))
72
86
73
- token = str ( scat )
87
+ token = scat . to_jwt ( )
74
88
assert_is_not_none (token )
75
- payload = decode (token , 'secret' )
76
- self ._validate_claims (payload )
77
- assert_equal (1 , len (payload ['grants' ]))
89
+ decoded_token = AccessToken . from_jwt (token , 'secret' )
90
+ self ._validate_claims (decoded_token . payload )
91
+ assert_equal (1 , len (decoded_token . payload ['grants' ]))
78
92
assert_equal ({
79
93
'configuration_profile_sid' : 'CP123'
80
- }, payload ['grants' ]['rtc' ])
94
+ }, decoded_token . payload ['grants' ]['rtc' ])
81
95
82
96
def test_video_grant (self ):
83
97
scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' )
84
98
scat .add_grant (VideoGrant (configuration_profile_sid = 'CP123' ))
85
99
86
- token = str ( scat )
100
+ token = scat . to_jwt ( )
87
101
assert_is_not_none (token )
88
- payload = decode (token , 'secret' )
89
- self ._validate_claims (payload )
90
- assert_equal (1 , len (payload ['grants' ]))
102
+ decoded_token = AccessToken . from_jwt (token , 'secret' )
103
+ self ._validate_claims (decoded_token . payload )
104
+ assert_equal (1 , len (decoded_token . payload ['grants' ]))
91
105
assert_equal ({
92
106
'configuration_profile_sid' : 'CP123'
93
- }, payload ['grants' ]['video' ])
107
+ }, decoded_token . payload ['grants' ]['video' ])
94
108
95
109
def test_ip_messaging_grant (self ):
96
110
scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' )
97
111
scat .add_grant (IpMessagingGrant (service_sid = 'IS123' , push_credential_sid = 'CR123' ))
98
112
99
- token = str ( scat )
113
+ token = scat . to_jwt ( )
100
114
assert_is_not_none (token )
101
- payload = decode (token , 'secret' )
102
- self ._validate_claims (payload )
103
- assert_equal (1 , len (payload ['grants' ]))
115
+ decoded_token = AccessToken . from_jwt (token , 'secret' )
116
+ self ._validate_claims (decoded_token . payload )
117
+ assert_equal (1 , len (decoded_token . payload ['grants' ]))
104
118
assert_equal ({
105
119
'service_sid' : 'IS123' ,
106
120
'push_credential_sid' : 'CR123'
107
- }, payload ['grants' ]['ip_messaging' ])
121
+ }, decoded_token . payload ['grants' ]['ip_messaging' ])
108
122
109
123
def test_sync_grant (self ):
110
124
scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' )
111
125
scat .identity = "bender"
112
126
scat .add_grant (SyncGrant (service_sid = 'IS123' , endpoint_id = 'blahblahendpoint' ))
113
127
114
- token = str ( scat )
128
+ token = scat . to_jwt ( )
115
129
assert_is_not_none (token )
116
- payload = decode (token , 'secret' )
117
- self ._validate_claims (payload )
118
- assert_equal (2 , len (payload ['grants' ]))
119
- assert_equal ("bender" , payload ['grants' ]['identity' ])
130
+ decoded_token = AccessToken . from_jwt (token , 'secret' )
131
+ self ._validate_claims (decoded_token . payload )
132
+ assert_equal (2 , len (decoded_token . payload ['grants' ]))
133
+ assert_equal ("bender" , decoded_token . payload ['grants' ]['identity' ])
120
134
assert_equal ({
121
135
'service_sid' : 'IS123' ,
122
136
'endpoint_id' : 'blahblahendpoint'
123
- }, payload ['grants' ]['data_sync' ])
137
+ }, decoded_token . payload ['grants' ]['data_sync' ])
124
138
125
139
def test_grants (self ):
126
140
scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' )
127
141
scat .add_grant (ConversationsGrant ())
128
142
scat .add_grant (IpMessagingGrant ())
129
143
130
- token = str ( scat )
144
+ token = scat . to_jwt ( )
131
145
assert_is_not_none (token )
132
- payload = decode (token , 'secret' )
133
- self ._validate_claims (payload )
134
- assert_equal (2 , len (payload ['grants' ]))
135
- assert_equal ({}, payload ['grants' ]['rtc' ])
136
- assert_equal ({}, payload ['grants' ]['ip_messaging' ])
146
+ decoded_token = AccessToken . from_jwt (token , 'secret' )
147
+ self ._validate_claims (decoded_token . payload )
148
+ assert_equal (2 , len (decoded_token . payload ['grants' ]))
149
+ assert_equal ({}, decoded_token . payload ['grants' ]['rtc' ])
150
+ assert_equal ({}, decoded_token . payload ['grants' ]['ip_messaging' ])
137
151
138
152
def test_programmable_voice_grant (self ):
139
153
grant = VoiceGrant (
@@ -146,16 +160,42 @@ def test_programmable_voice_grant(self):
146
160
scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' )
147
161
scat .add_grant (grant )
148
162
149
- token = str ( scat )
163
+ token = scat . to_jwt ( )
150
164
assert_is_not_none (token )
151
- payload = decode (token , 'secret' )
152
- self ._validate_claims (payload )
153
- assert_equal (1 , len (payload ['grants' ]))
165
+ decoded_token = AccessToken . from_jwt (token , 'secret' )
166
+ self ._validate_claims (decoded_token . payload )
167
+ assert_equal (1 , len (decoded_token . payload ['grants' ]))
154
168
assert_equal ({
155
169
'outgoing' : {
156
170
'application_sid' : 'AP123' ,
157
171
'params' : {
158
172
'foo' : 'bar'
159
173
}
160
174
}
161
- }, payload ['grants' ]['voice' ])
175
+ }, decoded_token .payload ['grants' ]['voice' ])
176
+
177
+ def test_pass_grants_in_constructor (self ):
178
+ grants = [
179
+ ConversationsGrant (),
180
+ IpMessagingGrant ()
181
+ ]
182
+ scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' , grants = grants )
183
+
184
+ token = scat .to_jwt ()
185
+ assert_is_not_none (token )
186
+
187
+ decoded_token = AccessToken .from_jwt (token , 'secret' )
188
+ self ._validate_claims (decoded_token .payload )
189
+ assert_equal (2 , len (decoded_token .payload ['grants' ]))
190
+ assert_equal ({}, decoded_token .payload ['grants' ]['rtc' ])
191
+ assert_equal ({}, decoded_token .payload ['grants' ]['ip_messaging' ])
192
+
193
+ def test_constructor_validates_grants (self ):
194
+ grants = [ConversationsGrant , 'GrantMeAccessToEverything' ]
195
+ self .assertRaises (ValueError , AccessToken , ACCOUNT_SID , SIGNING_KEY_SID , 'secret' ,
196
+ grants = grants )
197
+
198
+ def test_add_grant_validates_grant (self ):
199
+ scat = AccessToken (ACCOUNT_SID , SIGNING_KEY_SID , 'secret' )
200
+ scat .add_grant (ConversationsGrant ())
201
+ self .assertRaises (ValueError , scat .add_grant , 'GrantRootAccess' )
0 commit comments