Skip to content

Commit 304aff1

Browse files
fix(connection): unpack_from error caused by network issues (#185)
* fix(connection): Raise InterfaceError for network issue related to closed server side socket (#164) * add unit test and integration test * rectify unit test * rectify integration test * raise InterfaceError with Broken Pipe for timeout and blocking modes * fix(connection): add empty buffer check upon connect and unit tests * fix(connection): unpack_from caused by network issues * rectify unit tests when network disabled --------- Co-authored-by: Soksamnang Lim <soklim@amazon.com>
1 parent f619f30 commit 304aff1

File tree

3 files changed

+208
-12
lines changed

3 files changed

+208
-12
lines changed

redshift_connector/core.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -739,7 +739,21 @@ def get_calling_module() -> str:
739739
# Each time will read 5 bytes, the first byte, the code, inform the type of message
740740
# following 4 bytes inform the message's length
741741
# then can use this length to minus 4 to get the real data.
742-
code, data_len = ci_unpack(self._read(5))
742+
buffer = self._read(5)
743+
744+
if len(buffer) == 0:
745+
if self._usock.timeout is not None:
746+
raise InterfaceError(
747+
"BrokenPipe: server socket closed. We noticed a timeout is set for this connection. Consider "
748+
"raising the timeout or defaulting timeout to none."
749+
)
750+
else:
751+
raise InterfaceError(
752+
"BrokenPipe: server socket closed. Please check that client side networking configurations such "
753+
"as Proxies, firewalls, VPN, etc. are not affecting your network connection."
754+
)
755+
756+
code, data_len = ci_unpack(buffer)
743757
self.message_types[code](self._read(data_len - 4), None)
744758
if self.error is not None:
745759
raise self.error
@@ -2007,7 +2021,22 @@ def handle_messages(self: "Connection", cursor: Cursor) -> None:
20072021
code = self.error = None
20082022

20092023
while code != READY_FOR_QUERY:
2010-
code, data_len = ci_unpack(self._read(5))
2024+
buffer = self._read(5)
2025+
2026+
if len(buffer) == 0:
2027+
if self._usock.timeout is not None:
2028+
raise InterfaceError(
2029+
"BrokenPipe: server socket closed. We noticed a timeout is set for this connection. Consider "
2030+
"raising the timeout or defaulting timeout to none."
2031+
)
2032+
else:
2033+
raise InterfaceError(
2034+
"BrokenPipe: server socket closed. Please check that client side networking configurations such "
2035+
"as Proxies, firewalls, VPN, etc. are not affecting your network connection."
2036+
)
2037+
2038+
code, data_len = ci_unpack(buffer)
2039+
20112040
self.message_types[code](self._read(data_len - 4), cursor)
20122041

20132042
if self.error is not None:
@@ -2028,7 +2057,20 @@ def handle_messages_merge_socket_read(self: "Connection", cursor: Cursor):
20282057
"""
20292058
code = self.error = None
20302059
# read 5 bytes of message firstly
2031-
code, data_len = ci_unpack(self._read(5))
2060+
buffer = self._read(5)
2061+
if len(buffer) == 0:
2062+
if self._usock.timeout is not None:
2063+
raise InterfaceError(
2064+
"BrokenPipe: server socket closed. We noticed a timeout is set for this connection. Consider "
2065+
"raising the timeout or defaulting timeout to none."
2066+
)
2067+
else:
2068+
raise InterfaceError(
2069+
"BrokenPipe: server socket closed. Please check that client side networking configurations such "
2070+
"as Proxies, firewalls, VPN, etc. are not affecting your network connection."
2071+
)
2072+
2073+
code, data_len = ci_unpack(buffer)
20322074

20332075
while True:
20342076
if code == READY_FOR_QUERY:

test/integration/test_connection.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22
import logging
33
import os
44
import random
5-
import socket
65
import string
7-
import struct
86
import sys
97

108
import pytest # type: ignore
@@ -178,10 +176,31 @@ def test_broken_pipe(con, db_kwargs):
178176
pid1 = cur1.fetchone()[0]
179177

180178
cur2.execute("select pg_terminate_backend(%s)", (pid1,))
181-
try:
179+
with pytest.raises(
180+
redshift_connector.InterfaceError,
181+
match="BrokenPipe: server socket closed. Please check that client side networking configurations such "
182+
"as Proxies, firewalls, VPN, etc. are not affecting your network connection.",
183+
):
184+
cur1.execute("select 1")
185+
186+
187+
# case 2: same connector configuration, but should throw an error since the timeout is set,
188+
def test_broken_pipe_timeout(con, db_kwargs):
189+
db_kwargs["timeout"] = 60
190+
with redshift_connector.connect(**db_kwargs) as db1:
191+
with db1.cursor() as cur1, con.cursor() as cur2:
192+
print(db1._usock.timeout)
193+
cur1.execute("select pg_backend_pid()")
194+
pid1 = cur1.fetchone()[0]
195+
196+
cur2.execute("select pg_terminate_backend(%s)", (pid1,))
197+
with pytest.raises(
198+
redshift_connector.InterfaceError,
199+
match="BrokenPipe: server socket closed. We noticed a timeout is "
200+
"set for this connection. Consider raising the timeout or "
201+
"defaulting timeout to none.",
202+
):
182203
cur1.execute("select 1")
183-
except Exception as e:
184-
assert isinstance(e, (socket.error, struct.error))
185204

186205

187206
def test_application_name_integer(db_kwargs):

test/unit/test_connection.py

Lines changed: 139 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
import socket
12
import typing
23
from collections import deque
34
from decimal import Decimal
4-
from unittest.mock import patch
5-
import socket
65
from unittest import mock
6+
from unittest.mock import patch
77

88
import pytest # type: ignore
99

@@ -13,8 +13,8 @@
1313
Error,
1414
IntegrityError,
1515
InterfaceError,
16+
OperationalError,
1617
ProgrammingError,
17-
OperationalError
1818
)
1919
from redshift_connector.config import (
2020
ClientProtocolVersion,
@@ -28,7 +28,9 @@
2828
from redshift_connector.utils.type_utils import py_types as PY_TYPES
2929
from redshift_connector.utils.type_utils import redshift_types as REDSHIFT_TYPES
3030

31-
test_error_responses_data: typing.List[typing.Tuple[bytes, typing.Dict, typing.Type[Error]]] = [
31+
test_error_responses_data: typing.List[
32+
typing.Tuple[bytes, typing.Dict, typing.Type[Error]]
33+
] = [
3234
(
3335
(
3436
b"SERROR\x00"
@@ -332,8 +334,141 @@ def test_client_os_version_is_not_present():
332334
with patch("platform.platform", side_effect=Exception("not for you")):
333335
assert mock_connection.client_os_version == "unknown"
334336

337+
335338
def test_socket_timeout_error():
336339
with mock.patch('socket.socket.connect') as mock_socket:
337340
mock_socket.side_effect = (socket.timeout)
338341
with pytest.raises(OperationalError):
339342
Connection(user='mock_user', password='mock_password', host='localhost', port=8080, database='mocked')
343+
344+
345+
def mock_read(*args, **kwargs):
346+
return b""
347+
348+
349+
def test_handle_messages_broken_pipe_blocking():
350+
# mock the connection and mock the read attribute
351+
mock_connection: Connection = Connection.__new__(Connection)
352+
mock_connection._read = mock_read
353+
354+
# we only need to mock the raw socket
355+
mock_usock = mock.Mock()
356+
mock_usock.timeout = None
357+
mock_connection._usock = mock_usock
358+
359+
mock_cursor: Cursor = Cursor.__new__(Cursor)
360+
mock_cursor.ps = None
361+
362+
with pytest.raises(
363+
InterfaceError,
364+
match="BrokenPipe: server socket closed. Please check that client side networking configurations such "
365+
"as Proxies, firewalls, VPN, etc. are not affecting your network connection.",
366+
):
367+
mock_connection.handle_messages(mock_cursor)
368+
369+
370+
def test_handle_messages_broken_pipe_timeout():
371+
# mock the connection and mock the read attribute
372+
mock_connection: Connection = Connection.__new__(Connection)
373+
mock_connection._read = mock_read
374+
375+
# we only need to mock the raw socket
376+
mock_usock = mock.Mock()
377+
mock_usock.timeout = 47
378+
mock_connection._usock = mock_usock
379+
380+
mock_cursor: Cursor = Cursor.__new__(Cursor)
381+
mock_cursor.ps = None
382+
383+
with pytest.raises(
384+
InterfaceError,
385+
match="BrokenPipe: server socket closed. We noticed a timeout is set for this connection. Consider "
386+
"raising the timeout or defaulting timeout to none.",
387+
):
388+
mock_connection.handle_messages(mock_cursor)
389+
390+
391+
def test_handle_messages_merge_socket_read_broken_pipe_blocking():
392+
# mock the connection and mock the read attribute
393+
mock_connection: Connection = Connection.__new__(Connection)
394+
mock_connection._read = mock_read
395+
396+
# we only need to mock the raw socket
397+
mock_usock = mock.Mock()
398+
mock_usock.timeout = None
399+
mock_connection._usock = mock_usock
400+
401+
mock_cursor: Cursor = Cursor.__new__(Cursor)
402+
mock_cursor.ps = None
403+
404+
with pytest.raises(
405+
InterfaceError,
406+
match="BrokenPipe: server socket closed. Please check that client side networking configurations such "
407+
"as Proxies, firewalls, VPN, etc. are not affecting your network connection.",
408+
):
409+
mock_connection.handle_messages_merge_socket_read(mock_cursor)
410+
411+
412+
def test_handle_messages_merge_socket_read_broken_pipe_timeout():
413+
# mock the connection and mock the read attribute
414+
mock_connection: Connection = Connection.__new__(Connection)
415+
mock_connection._read = mock_read
416+
417+
# we only need to mock the raw socket
418+
mock_usock = mock.Mock()
419+
mock_usock.timeout = 47
420+
mock_connection._usock = mock_usock
421+
422+
mock_cursor: Cursor = Cursor.__new__(Cursor)
423+
mock_cursor.ps = None
424+
425+
with pytest.raises(
426+
InterfaceError,
427+
match="BrokenPipe: server socket closed. We noticed a timeout is set for this connection. Consider "
428+
"raising the timeout or defaulting timeout to none.",
429+
):
430+
mock_connection.handle_messages_merge_socket_read(mock_cursor)
431+
432+
433+
def test_broken_pipe_on_connect(db_kwargs):
434+
db_kwargs["ssl"] = False
435+
436+
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
437+
addr_tuple = [(0, 1, 2, "", ('3.226.18.73', 5439)), (2, 1, 6, '', ('3.226.18.73', 5439))]
438+
mock_getaddrinfo.return_value = addr_tuple
439+
with mock.patch('socket.socket.connect') as mock_usock:
440+
mock_usock.side_effect = lambda *args, **kwargs: None
441+
with mock.patch("socket.socket.makefile") as mock_sock:
442+
mock_file = mock_sock.return_value
443+
mock_file._read.return_value = b""
444+
with pytest.raises(
445+
InterfaceError,
446+
match="BrokenPipe: server socket closed. Please check that client side networking configurations such "
447+
"as Proxies, firewalls, VPN, etc. are not affecting your network connection.",
448+
):
449+
db_kwargs.pop("region")
450+
db_kwargs.pop("cluster_identifier")
451+
Connection(**db_kwargs)
452+
453+
def test_broken_pipe_timeout_on_connect(db_kwargs):
454+
db_kwargs["ssl"] = False
455+
db_kwargs["timeout"] = 60
456+
457+
458+
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
459+
addr_tuple =[(0, 1, 2, "", ('3.226.18.73', 5439)), (2, 1, 6, '', ('3.226.18.73', 5439))]
460+
mock_getaddrinfo.return_value= addr_tuple
461+
with mock.patch('socket.socket.connect') as mock_usock:
462+
mock_usock.side_effect = lambda *args, **kwargs: None
463+
464+
with mock.patch("socket.socket.makefile") as mock_sock:
465+
mock_file = mock_sock.return_value
466+
mock_file._read.return_value = b""
467+
with pytest.raises(
468+
InterfaceError,
469+
match="BrokenPipe: server socket closed. We noticed a timeout is set for this connection. Consider "
470+
"raising the timeout or defaulting timeout to none.",
471+
):
472+
db_kwargs.pop("region")
473+
db_kwargs.pop("cluster_identifier")
474+
Connection(**db_kwargs)

0 commit comments

Comments
 (0)