I wrote a very crude and simple version of my server program to demonstrate that when I try to disconnect a client, it just hangs forever and I have no idea why. I first suspected that the issue was my "read" method, but even after adding a timeout implementation it keeps happening.
import asyncio
from asyncio import StreamReader, StreamWriter, wait_for, start_server
import logging
import socket
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
)
class Client:
def __init__(self, reader: StreamReader, writer: StreamWriter) -> None:
self.reader = reader
self.writer = writer
self.address: str = ":".join(map(str, writer.get_extra_info("peername")))
self._configure_tcp()
logging.info(f"New connection: {self.address}")
async def write(self, data: bytes) -> None:
logging.debug(f"Sending to {self.address}: {data}")
self.writer.write(data)
await self.writer.drain()
async def read(self) -> bytes:
try:
if data := await wait_for(self.reader.read(1024), timeout=1):
logging.debug(f"Received from {self.address}: {data}")
self.last_msg = data
return data
else:
return b""
except TimeoutError:
return b""
async def close(self) -> None:
if not self.writer.is_closing():
self.writer.close()
await self.writer.wait_closed()
logging.info(f"Connection closed: {self.address}")
def _configure_tcp(self) -> None:
_socket = self.writer.get_extra_info("socket")
_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
class ServerProtocol:
def __init__(self, name: str, port: int):
self.name: str = name
self.port: int = port
self.clients: list[Client] = []
self.server = None
async def start_server(self) -> None:
self.server = await start_server(self.connection, "0.0.0.0", self.port)
logging.info(f"Serving at 0.0.0.0:{self.port}")
async def stop_server(self) -> None:
if self.server:
logging.info("Stoping server...")
self.server.close()
await self.server.wait_closed()
logging.info("Server finalized")
for client in self.clients:
await self.disconnect(client)
logging.info("All connections closed")
async def connection(self, reader: StreamReader, writer: StreamWriter) -> None:
client = Client(reader, writer)
self.clients.append(client)
try:
while True:
data = await client.read()
if data:
await self._process(client, data)
except TimeoutError:
logging.info(f"Timeout: {client.address}")
except Exception as e:
logging.error(f"Exception with {client.address}: {e}")
finally:
await self.disconnect(client)
async def disconnect(self, client: Client) -> None:
logging.info(f"Disconnecting {client.address}")
if client in self.clients:
self.clients.remove(client)
await client.close()
logging.info(f"{client.address} disconnected.")
async def _process(self, client: Client, data: bytes) -> None:
await client.write(data)
async def main():
server = ServerProtocol(name="TestServer", port=8888)
try:
await server.start_server()
await asyncio.sleep(30)
await server.disconnect(server.clients[0])
await server.stop_server()
except KeyboardInterrupt:
await server.stop_server()
if __name__ == "__main__":
asyncio.run(main())
All this example does is open a port at 8888, and wait for 30 seconds. After 30 seconds, it grabs the first client who connected there and try to disconnect it before shutting down. The issue also happens if I connect to the server and disconnect (close netcat). Any help is appreciated.
I wrote a very crude and simple version of my server program to demonstrate that when I try to disconnect a client, it just hangs forever and I have no idea why. I first suspected that the issue was my "read" method, but even after adding a timeout implementation it keeps happening.
import asyncio
from asyncio import StreamReader, StreamWriter, wait_for, start_server
import logging
import socket
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
)
class Client:
def __init__(self, reader: StreamReader, writer: StreamWriter) -> None:
self.reader = reader
self.writer = writer
self.address: str = ":".join(map(str, writer.get_extra_info("peername")))
self._configure_tcp()
logging.info(f"New connection: {self.address}")
async def write(self, data: bytes) -> None:
logging.debug(f"Sending to {self.address}: {data}")
self.writer.write(data)
await self.writer.drain()
async def read(self) -> bytes:
try:
if data := await wait_for(self.reader.read(1024), timeout=1):
logging.debug(f"Received from {self.address}: {data}")
self.last_msg = data
return data
else:
return b""
except TimeoutError:
return b""
async def close(self) -> None:
if not self.writer.is_closing():
self.writer.close()
await self.writer.wait_closed()
logging.info(f"Connection closed: {self.address}")
def _configure_tcp(self) -> None:
_socket = self.writer.get_extra_info("socket")
_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
class ServerProtocol:
def __init__(self, name: str, port: int):
self.name: str = name
self.port: int = port
self.clients: list[Client] = []
self.server = None
async def start_server(self) -> None:
self.server = await start_server(self.connection, "0.0.0.0", self.port)
logging.info(f"Serving at 0.0.0.0:{self.port}")
async def stop_server(self) -> None:
if self.server:
logging.info("Stoping server...")
self.server.close()
await self.server.wait_closed()
logging.info("Server finalized")
for client in self.clients:
await self.disconnect(client)
logging.info("All connections closed")
async def connection(self, reader: StreamReader, writer: StreamWriter) -> None:
client = Client(reader, writer)
self.clients.append(client)
try:
while True:
data = await client.read()
if data:
await self._process(client, data)
except TimeoutError:
logging.info(f"Timeout: {client.address}")
except Exception as e:
logging.error(f"Exception with {client.address}: {e}")
finally:
await self.disconnect(client)
async def disconnect(self, client: Client) -> None:
logging.info(f"Disconnecting {client.address}")
if client in self.clients:
self.clients.remove(client)
await client.close()
logging.info(f"{client.address} disconnected.")
async def _process(self, client: Client, data: bytes) -> None:
await client.write(data)
async def main():
server = ServerProtocol(name="TestServer", port=8888)
try:
await server.start_server()
await asyncio.sleep(30)
await server.disconnect(server.clients[0])
await server.stop_server()
except KeyboardInterrupt:
await server.stop_server()
if __name__ == "__main__":
asyncio.run(main())
All this example does is open a port at 8888, and wait for 30 seconds. After 30 seconds, it grabs the first client who connected there and try to disconnect it before shutting down. The issue also happens if I connect to the server and disconnect (close netcat). Any help is appreciated.
Share Improve this question asked Mar 17 at 18:10 IamRichterIamRichter 552 silver badges10 bronze badges 1- I did not study the code much, but I guess maybe you might have overlooked some details of a normal TCP connection teardown. A TCP connection must be closed from both sides. An application level communication protocol is normally responsible for that. There is also an option to force a RESET to break the connection. – VPfB Commented Mar 18 at 10:18
1 Answer
Reset to default 2My issue was very simple. I keep a infinite loop running asking to read the client. All I had to do was replace the while True:
for while not writer.is_closing():
and the problem would be fix.