Skip to content

Commit bddee13

Browse files
authored
chore: move connect to json pipe (#1580)
1 parent d2dad56 commit bddee13

File tree

8 files changed

+106
-88
lines changed

8 files changed

+106
-88
lines changed

meta.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ requirements:
2525
- python
2626
- greenlet ==1.1.3
2727
- pyee ==8.1.0
28-
- websockets ==10.1
2928
- typing_extensions # [py<39]
3029
test:
3130
requires:

playwright/_impl/_browser_type.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
ServiceWorkersPolicy,
4343
locals_to_params,
4444
)
45-
from playwright._impl._transport import WebSocketTransport
45+
from playwright._impl._json_pipe import JsonPipeTransport
4646
from playwright._impl._wait_helper import throw_on_timeout
4747

4848
if TYPE_CHECKING:
@@ -188,12 +188,22 @@ async def connect(
188188
) -> Browser:
189189
if timeout is None:
190190
timeout = 30000
191+
if slow_mo is None:
192+
slow_mo = 0
191193

192194
headers = {**(headers if headers else {}), "x-playwright-browser": self.name}
193-
194-
transport = WebSocketTransport(
195-
self._connection._loop, ws_endpoint, headers, slow_mo
195+
local_utils = self._connection.local_utils
196+
pipe_channel = await local_utils._channel.send(
197+
"connect",
198+
{
199+
"wsEndpoint": ws_endpoint,
200+
"headers": headers,
201+
"slowMo": slow_mo,
202+
"timeout": timeout,
203+
},
196204
)
205+
transport = JsonPipeTransport(self._connection._loop, pipe_channel)
206+
197207
connection = Connection(
198208
self._connection._dispatcher_fiber,
199209
self._connection._object_factory,

playwright/_impl/_connection.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,16 +316,25 @@ def dispatch(self, msg: ParsedMessagePayload) -> None:
316316
self._objects[guid]._dispose()
317317
return
318318
object = self._objects[guid]
319+
should_replace_guids_with_channels = "jsonPipe@" not in guid
319320
try:
320321
if self._is_sync:
321322
for listener in object._channel.listeners(method):
322323
# Each event handler is a potentilly blocking context, create a fiber for each
323324
# and switch to them in order, until they block inside and pass control to each
324325
# other and then eventually back to dispatcher as listener functions return.
325326
g = greenlet(listener)
326-
g.switch(self._replace_guids_with_channels(params))
327+
if should_replace_guids_with_channels:
328+
g.switch(self._replace_guids_with_channels(params))
329+
else:
330+
g.switch(params)
327331
else:
328-
object._channel.emit(method, self._replace_guids_with_channels(params))
332+
if should_replace_guids_with_channels:
333+
object._channel.emit(
334+
method, self._replace_guids_with_channels(params)
335+
)
336+
else:
337+
object._channel.emit(method, params)
329338
except BaseException as exc:
330339
print("Error occurred in event listener", file=sys.stderr)
331340
traceback.print_exc()

playwright/_impl/_json_pipe.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Copyright (c) Microsoft Corporation.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
from typing import Dict, Optional, cast
17+
18+
from pyee import AsyncIOEventEmitter
19+
20+
from playwright._impl._api_types import Error
21+
from playwright._impl._connection import Channel
22+
from playwright._impl._helper import ParsedMessagePayload, parse_error
23+
from playwright._impl._transport import Transport
24+
25+
26+
class JsonPipeTransport(AsyncIOEventEmitter, Transport):
27+
def __init__(
28+
self,
29+
loop: asyncio.AbstractEventLoop,
30+
pipe_channel: Channel,
31+
) -> None:
32+
super().__init__(loop)
33+
Transport.__init__(self, loop)
34+
self._stop_requested = False
35+
self._pipe_channel = pipe_channel
36+
37+
def request_stop(self) -> None:
38+
self._stop_requested = True
39+
self._loop.create_task(self._pipe_channel.send("close", {}))
40+
41+
def dispose(self) -> None:
42+
self.on_error_future.cancel()
43+
self._stopped_future.cancel()
44+
45+
async def wait_until_stopped(self) -> None:
46+
await self._stopped_future
47+
48+
async def connect(self) -> None:
49+
self._stopped_future: asyncio.Future = asyncio.Future()
50+
51+
def handle_message(message: Dict) -> None:
52+
if not self._stop_requested:
53+
self.on_message(cast(ParsedMessagePayload, message))
54+
55+
def handle_closed(error: Optional[Dict]) -> None:
56+
self.emit("close")
57+
self.on_error_future.set_exception(
58+
parse_error(error["error"])
59+
if error
60+
else Error("Playwright connection closed")
61+
)
62+
self._stopped_future.set_result(None)
63+
64+
self._pipe_channel.on(
65+
"message",
66+
lambda params: handle_message(params["message"]),
67+
)
68+
self._pipe_channel.on(
69+
"closed",
70+
lambda params: handle_closed(params.get("error")),
71+
)
72+
73+
async def run(self) -> None:
74+
await self._stopped_future
75+
76+
def send(self, message: Dict) -> None:
77+
if self._stop_requested:
78+
raise Error("Playwright connection closed")
79+
self._loop.create_task(self._pipe_channel.send("send", {"message": message}))

playwright/_impl/_transport.py

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,6 @@
2222
from pathlib import Path
2323
from typing import Callable, Dict, Optional, Union
2424

25-
import websockets
26-
import websockets.exceptions
27-
from pyee import AsyncIOEventEmitter
28-
from websockets.client import connect as websocket_connect
29-
30-
from playwright._impl._api_types import Error
3125
from playwright._impl._driver import get_driver_env
3226
from playwright._impl._helper import ParsedMessagePayload
3327

@@ -178,75 +172,3 @@ def send(self, message: Dict) -> None:
178172
self._output.write(
179173
len(data).to_bytes(4, byteorder="little", signed=False) + data
180174
)
181-
182-
183-
class WebSocketTransport(AsyncIOEventEmitter, Transport):
184-
def __init__(
185-
self,
186-
loop: asyncio.AbstractEventLoop,
187-
ws_endpoint: str,
188-
headers: Dict[str, str] = None,
189-
slow_mo: float = None,
190-
) -> None:
191-
super().__init__(loop)
192-
Transport.__init__(self, loop)
193-
194-
self._stopped = False
195-
self.ws_endpoint = ws_endpoint
196-
self.headers = headers
197-
self.slow_mo = slow_mo
198-
199-
def request_stop(self) -> None:
200-
self._stopped = True
201-
self.emit("close")
202-
self._loop.create_task(self._connection.close())
203-
204-
def dispose(self) -> None:
205-
self.on_error_future.cancel()
206-
207-
async def wait_until_stopped(self) -> None:
208-
await self._connection.wait_closed()
209-
210-
async def connect(self) -> None:
211-
try:
212-
self._connection = await websocket_connect(
213-
self.ws_endpoint,
214-
extra_headers=self.headers,
215-
max_size=256 * 1024 * 1024, # 256Mb
216-
)
217-
except Exception as exc:
218-
self.on_error_future.set_exception(Error(f"websocket.connect: {str(exc)}"))
219-
raise exc
220-
221-
async def run(self) -> None:
222-
while not self._stopped:
223-
try:
224-
message = await self._connection.recv()
225-
if self.slow_mo is not None:
226-
await asyncio.sleep(self.slow_mo / 1000)
227-
if self._stopped:
228-
self.on_error_future.set_exception(
229-
Error("Playwright connection closed")
230-
)
231-
break
232-
obj = self.deserialize_message(message)
233-
self.on_message(obj)
234-
except (
235-
websockets.exceptions.ConnectionClosed,
236-
websockets.exceptions.ConnectionClosedError,
237-
):
238-
if not self._stopped:
239-
self.emit("close")
240-
self.on_error_future.set_exception(
241-
Error("Playwright connection closed")
242-
)
243-
break
244-
except Exception as exc:
245-
self.on_error_future.set_exception(exc)
246-
break
247-
248-
def send(self, message: Dict) -> None:
249-
if self._stopped or (hasattr(self, "_connection") and self._connection.closed):
250-
raise Error("Playwright connection closed")
251-
data = self.serialize_message(message)
252-
self._loop.create_task(self._connection.send(data))

setup.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,6 @@ def _download_and_extract_local_driver(
211211
packages=["playwright"],
212212
include_package_data=True,
213213
install_requires=[
214-
"websockets==10.1",
215214
"greenlet==1.1.3",
216215
"pyee==8.1.0",
217216
"typing-extensions;python_version<='3.8'",

tests/async/test_browsertype_connect.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ async def test_connect_to_closed_server_without_hangs(
213213
remote_server.kill()
214214
with pytest.raises(Error) as exc:
215215
await browser_type.connect(remote_server.ws_endpoint)
216-
assert "websocket.connect: " in exc.value.message
216+
assert "WebSocket error: " in exc.value.message
217217

218218

219219
async def test_should_fulfill_with_global_fetch_result(

tests/sync/test_browsertype_connect.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ def test_connect_to_closed_server_without_hangs(
192192
remote_server.kill()
193193
with pytest.raises(Error) as exc:
194194
browser_type.connect(remote_server.ws_endpoint)
195-
assert "websocket.connect: " in exc.value.message
195+
assert "WebSocket error: " in exc.value.message
196196

197197

198198
def test_browser_type_connect_should_fulfill_with_global_fetch_result(

0 commit comments

Comments
 (0)