Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 66 additions & 5 deletions cassandra/io/asyncioreactor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import errno
import threading

from cassandra.connection import Connection, ConnectionShutdown
Expand All @@ -12,6 +13,23 @@

log = logging.getLogger(__name__)

# Errno values that indicate the remote peer has disconnected.
_PEER_DISCONNECT_ERRNOS = frozenset((
errno.ENOTCONN, errno.ESHUTDOWN,
errno.ECONNRESET, errno.ECONNABORTED,
))

# Windows winerror codes for the same conditions:
# 10053 = WSAECONNABORTED, 10054 = WSAECONNRESET
_PEER_DISCONNECT_WINERRORS = frozenset((10053, 10054))


def _is_peer_disconnect(err):
"""Return True if *err* indicates the remote peer closed the connection."""
return (isinstance(err, ConnectionError)
or getattr(err, 'winerror', None) in _PEER_DISCONNECT_WINERRORS
or getattr(err, 'errno', None) in _PEER_DISCONNECT_ERRNOS)


# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and
# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the
Expand Down Expand Up @@ -153,11 +171,36 @@ async def _close(self):
if self._read_watcher:
self._read_watcher.cancel()
if self._socket:
self._loop.remove_writer(self._socket.fileno())
self._loop.remove_reader(self._socket.fileno())
self._socket.close()

log.debug("Closed socket to %s" % (self.endpoint,))
# NotImplementedError: remove_reader/remove_writer are not
# supported on Windows ProactorEventLoop (default since
# Python 3.10). ProactorEventLoop uses completion-based
# IOCP, which has no concept of "watching a fd for
# readiness" to remove.
fd = self._socket.fileno()
if fd >= 0:
try:
self._loop.remove_writer(fd)
except NotImplementedError:
pass
except Exception:
log.error("Unexpected error removing writer for %s",
self.endpoint, exc_info=True)
try:
self._loop.remove_reader(fd)
except NotImplementedError:
pass
except Exception:
log.error("Unexpected error removing reader for %s",
self.endpoint, exc_info=True)

try:
self._socket.close()
except OSError:
pass
except Exception:
log.debug("Unexpected error closing socket to %s",
self.endpoint, exc_info=True)
log.debug("Closed socket to %s" % (self.endpoint,))

if not self.is_defunct:
msg = "Connection to %s was closed" % self.endpoint
Expand Down Expand Up @@ -202,6 +245,14 @@ async def handle_write(self):
if next_msg:
await self._loop.sock_sendall(self._socket, next_msg)
except socket.error as err:
if _is_peer_disconnect(err):
log.debug("Connection %s closed by peer during write: %s",
self, err)
self.close()
return
# Connection is already shutting down, just exit
if self.is_closed or self.is_defunct:
return
log.debug("Exception in send for %s: %s", self, err)
self.defunct(err)
return
Expand All @@ -223,6 +274,14 @@ async def handle_read(self):
await asyncio.sleep(0)
continue
except socket.error as err:
if _is_peer_disconnect(err):
log.debug("Connection %s closed by peer during read: %s",
self, err)
self.close()
return
# Connection is already shutting down, just exit
if self.is_closed or self.is_defunct:
return
log.debug("Exception during socket recv for %s: %s",
self, err)
self.defunct(err)
Expand All @@ -234,5 +293,7 @@ async def handle_read(self):
self.process_io_buffer()
else:
log.debug("Connection %s closed by server", self)
self.last_error = ConnectionShutdown(
"Connection to %s was closed by server" % self.endpoint)
self.close()
return
Loading
Loading