6
6
7
7
import asyncio
8
8
import itertools
9
- from typing import Any , Dict , Optional
9
+ import ssl
10
+ import warnings
11
+ from typing import Any , Dict
10
12
from urllib .parse import urlparse
11
13
12
- from aiohttp import ClientSession
14
+ from aiohttp import ClientSession , TCPConnector
13
15
14
16
from .exceptions import BitrixError
15
17
@@ -21,28 +23,40 @@ class Bitrix24:
21
23
Provides an easy way to communicate with Bitrix24 portal over REST without OAuth.
22
24
"""
23
25
24
- def __init__ (self , domain : str , timeout : int = 60 ):
26
+ def __init__ (
27
+ self ,
28
+ domain : str ,
29
+ timeout : int = 60 ,
30
+ safe : bool = True ,
31
+ fetch_all_pages : bool = True ,
32
+ retry_after : int = 3 ,
33
+ ):
25
34
"""
26
35
Create Bitrix24 API object.
27
36
28
37
Parameters
29
38
----------
30
39
domain (str): Bitrix24 webhook domain
31
40
timeout (int): Timeout for API request in seconds
41
+ safe (bool): Set to `False` to ignore the certificate verification
42
+ fetch_all_pages (bool): Fetch all pages for paginated requests
43
+ retry_after (int): Retry after seconds for QUERY_LIMIT_EXCEEDED error
32
44
"""
33
- self .domain = self ._prepare_domain (domain )
34
- self .timeout = timeout
45
+ self ._domain = self ._prepare_domain (domain )
46
+ self ._timeout = int (timeout )
47
+ self ._fetch_all_pages = bool (fetch_all_pages )
48
+ self ._retry_after = int (retry_after )
49
+ self ._verify_ssl = bool (safe )
35
50
36
51
def _prepare_domain (self , domain : str ) -> str :
37
52
"""Normalize user passed domain to a valid one."""
38
- if not domain :
39
- raise BitrixError ("Empty domain" )
40
-
41
53
o = urlparse (domain )
54
+ if not o .scheme or not o .netloc :
55
+ raise BitrixError ("Not a valid domain. Please provide a valid domain." )
42
56
user_id , code = o .path .split ("/" )[2 :4 ]
43
57
return "{0}://{1}/rest/{2}/{3}" .format (o .scheme , o .netloc , user_id , code )
44
58
45
- def _prepare_params (self , params : Dict [str , Any ], prev = "" ) -> str :
59
+ def _prepare_params (self , params : Dict [str , Any ], prev : str = "" ) -> str :
46
60
"""
47
61
Transform list of parameters to a valid bitrix array.
48
62
@@ -81,41 +95,50 @@ def _prepare_params(self, params: Dict[str, Any], prev="") -> str:
81
95
return ret
82
96
83
97
async def request (self , method : str , params : str = None ) -> Dict [str , Any ]:
84
- async with ClientSession () as session :
98
+ ssl_context = ssl .create_default_context ()
99
+ if not self ._verify_ssl :
100
+ ssl_context .check_hostname = False
101
+ ssl_context .verify_mode = ssl .CERT_NONE
102
+ async with ClientSession (connector = TCPConnector (ssl = ssl_context )) as session :
85
103
async with session .get (
86
- f"{ self .domain } /{ method } .json" , params = params , timeout = self .timeout
104
+ f"{ self ._domain } /{ method } .json" , params = params , timeout = self ._timeout
87
105
) as resp :
88
106
if resp .status not in [200 , 201 ]:
89
107
raise BitrixError (f"HTTP error: { resp .status } " )
90
108
response = await resp .json ()
91
109
if "error" in response :
110
+ if response ["error" ] == "QUERY_LIMIT_EXCEEDED" :
111
+ await asyncio .sleep (self ._retry_after )
112
+ return await self .request (method , params )
92
113
raise BitrixError (response ["error_description" ], response ["error" ])
93
114
return response
94
115
95
- async def call (self , method : str , params : Dict [str , Any ] = {}, start : Optional [int ] = None ):
116
+ async def _call (
117
+ self , method : str , params : Dict [str , Any ] = {}, start : int = 0
118
+ ) -> Dict [str , Any ]:
96
119
"""Async call a REST method with specified parameters.
97
120
98
- This method is a replacement for the callMethod method, which is synchronous.
99
-
100
121
Parameters
101
122
----------
102
123
method (str): REST method name
103
124
params (dict): Optional arguments which will be converted to a POST request string
125
+ start (int): Offset for pagination
104
126
"""
105
- if start is not None :
106
- params ["start" ] = start
127
+ params ["start" ] = start
107
128
108
129
payload = self ._prepare_params (params )
109
130
res = await self .request (method , payload )
110
131
111
- if "next" in res and start is None :
112
- tasks = [self .call (method , params , start = start ) for start in range (res ["total" ] // 50 )]
132
+ if "next" in res and not start and self ._fetch_all_pages :
133
+ tasks = [
134
+ self ._call (method , params , (s + 1 ) * 50 ) for s in range (res ["total" ] // 50 - 1 )
135
+ ]
113
136
items = await asyncio .gather (* tasks )
114
137
result = list (itertools .chain (* items ))
115
138
return res ["result" ] + result
116
139
return res ["result" ]
117
140
118
- def callMethod (self , method : str , params : Dict [str , Any ] = {}, ** kwargs ):
141
+ def callMethod (self , method : str , params : Dict [str , Any ] = {}, ** kwargs ) -> Dict [ str , Any ] :
119
142
"""Call a REST method with specified parameters.
120
143
121
144
Parameters
@@ -133,10 +156,16 @@ def callMethod(self, method: str, params: Dict[str, Any] = {}, **kwargs):
133
156
try :
134
157
loop = asyncio .get_running_loop ()
135
158
except RuntimeError :
159
+ warnings .warn (
160
+ "You are using `callMethod` method in a synchronous way. "
161
+ "Starting from version 3, this method will be completly asynchronous."
162
+ "Please consider updating your code" ,
163
+ DeprecationWarning ,
164
+ )
136
165
loop = asyncio .new_event_loop ()
137
166
asyncio .set_event_loop (loop )
138
- result = loop .run_until_complete (self .call (method , params or kwargs ))
167
+ result = loop .run_until_complete (self ._call (method , params or kwargs ))
139
168
loop .close ()
140
169
else :
141
- result = asyncio .ensure_future (self .call (method , params or kwargs ))
170
+ result = asyncio .ensure_future (self ._call (method , params or kwargs ))
142
171
return result
0 commit comments