最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

python - Asyncio StreamWriter wait_closed blocks my code and hangs - Stack Overflow

programmeradmin5浏览0评论

I wrote a very crude and simple version of my server program to demonstrate that when I try to disconnect a client, it just hangs forever and I have no idea why. I first suspected that the issue was my "read" method, but even after adding a timeout implementation it keeps happening.

import asyncio
from asyncio import StreamReader, StreamWriter, wait_for, start_server
import logging
import socket

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
)


class Client:
    def __init__(self, reader: StreamReader, writer: StreamWriter) -> None:
        self.reader = reader
        self.writer = writer
        self.address: str = ":".join(map(str, writer.get_extra_info("peername")))
        self._configure_tcp()
        logging.info(f"New connection: {self.address}")

    async def write(self, data: bytes) -> None:
        logging.debug(f"Sending to {self.address}: {data}")
        self.writer.write(data)
        await self.writer.drain()

    async def read(self) -> bytes:
        try:
            if data := await wait_for(self.reader.read(1024), timeout=1):
                logging.debug(f"Received from {self.address}: {data}")
                self.last_msg = data
                return data
            else:
                return b""
        except TimeoutError:
            return b""

    async def close(self) -> None:
        if not self.writer.is_closing():
            self.writer.close()
            await self.writer.wait_closed()
            logging.info(f"Connection closed: {self.address}")

    def _configure_tcp(self) -> None:
        _socket = self.writer.get_extra_info("socket")
        _socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        _socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
        _socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
        _socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)


class ServerProtocol:
    def __init__(self, name: str, port: int):
        self.name: str = name
        self.port: int = port
        self.clients: list[Client] = []
        self.server = None

    async def start_server(self) -> None:
        self.server = await start_server(self.connection, "0.0.0.0", self.port)
        logging.info(f"Serving at 0.0.0.0:{self.port}")

    async def stop_server(self) -> None:
        if self.server:
            logging.info("Stoping server...")
            self.server.close()
            await self.server.wait_closed()
            logging.info("Server finalized")

        for client in self.clients:
            await self.disconnect(client)
        logging.info("All connections closed")

    async def connection(self, reader: StreamReader, writer: StreamWriter) -> None:
        client = Client(reader, writer)
        self.clients.append(client)
        try:
            while True:
                data = await client.read()
                if data:
                    await self._process(client, data)
        except TimeoutError:
            logging.info(f"Timeout: {client.address}")
        except Exception as e:
            logging.error(f"Exception with {client.address}: {e}")
        finally:
            await self.disconnect(client)

    async def disconnect(self, client: Client) -> None:
        logging.info(f"Disconnecting {client.address}")
        if client in self.clients:
            self.clients.remove(client)
            await client.close()
            logging.info(f"{client.address} disconnected.")

    async def _process(self, client: Client, data: bytes) -> None:
        await client.write(data)



async def main():
    server = ServerProtocol(name="TestServer", port=8888)

    try:
        await server.start_server()
        await asyncio.sleep(30)
        await server.disconnect(server.clients[0])
        await server.stop_server()
    except KeyboardInterrupt:
        await server.stop_server()


if __name__ == "__main__":
    asyncio.run(main())

All this example does is open a port at 8888, and wait for 30 seconds. After 30 seconds, it grabs the first client who connected there and try to disconnect it before shutting down. The issue also happens if I connect to the server and disconnect (close netcat). Any help is appreciated.

I wrote a very crude and simple version of my server program to demonstrate that when I try to disconnect a client, it just hangs forever and I have no idea why. I first suspected that the issue was my "read" method, but even after adding a timeout implementation it keeps happening.

import asyncio
from asyncio import StreamReader, StreamWriter, wait_for, start_server
import logging
import socket

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
)


class Client:
    def __init__(self, reader: StreamReader, writer: StreamWriter) -> None:
        self.reader = reader
        self.writer = writer
        self.address: str = ":".join(map(str, writer.get_extra_info("peername")))
        self._configure_tcp()
        logging.info(f"New connection: {self.address}")

    async def write(self, data: bytes) -> None:
        logging.debug(f"Sending to {self.address}: {data}")
        self.writer.write(data)
        await self.writer.drain()

    async def read(self) -> bytes:
        try:
            if data := await wait_for(self.reader.read(1024), timeout=1):
                logging.debug(f"Received from {self.address}: {data}")
                self.last_msg = data
                return data
            else:
                return b""
        except TimeoutError:
            return b""

    async def close(self) -> None:
        if not self.writer.is_closing():
            self.writer.close()
            await self.writer.wait_closed()
            logging.info(f"Connection closed: {self.address}")

    def _configure_tcp(self) -> None:
        _socket = self.writer.get_extra_info("socket")
        _socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        _socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
        _socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
        _socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)


class ServerProtocol:
    def __init__(self, name: str, port: int):
        self.name: str = name
        self.port: int = port
        self.clients: list[Client] = []
        self.server = None

    async def start_server(self) -> None:
        self.server = await start_server(self.connection, "0.0.0.0", self.port)
        logging.info(f"Serving at 0.0.0.0:{self.port}")

    async def stop_server(self) -> None:
        if self.server:
            logging.info("Stoping server...")
            self.server.close()
            await self.server.wait_closed()
            logging.info("Server finalized")

        for client in self.clients:
            await self.disconnect(client)
        logging.info("All connections closed")

    async def connection(self, reader: StreamReader, writer: StreamWriter) -> None:
        client = Client(reader, writer)
        self.clients.append(client)
        try:
            while True:
                data = await client.read()
                if data:
                    await self._process(client, data)
        except TimeoutError:
            logging.info(f"Timeout: {client.address}")
        except Exception as e:
            logging.error(f"Exception with {client.address}: {e}")
        finally:
            await self.disconnect(client)

    async def disconnect(self, client: Client) -> None:
        logging.info(f"Disconnecting {client.address}")
        if client in self.clients:
            self.clients.remove(client)
            await client.close()
            logging.info(f"{client.address} disconnected.")

    async def _process(self, client: Client, data: bytes) -> None:
        await client.write(data)



async def main():
    server = ServerProtocol(name="TestServer", port=8888)

    try:
        await server.start_server()
        await asyncio.sleep(30)
        await server.disconnect(server.clients[0])
        await server.stop_server()
    except KeyboardInterrupt:
        await server.stop_server()


if __name__ == "__main__":
    asyncio.run(main())

All this example does is open a port at 8888, and wait for 30 seconds. After 30 seconds, it grabs the first client who connected there and try to disconnect it before shutting down. The issue also happens if I connect to the server and disconnect (close netcat). Any help is appreciated.

Share Improve this question asked Mar 17 at 18:10 IamRichterIamRichter 552 silver badges10 bronze badges 1
  • I did not study the code much, but I guess maybe you might have overlooked some details of a normal TCP connection teardown. A TCP connection must be closed from both sides. An application level communication protocol is normally responsible for that. There is also an option to force a RESET to break the connection. – VPfB Commented Mar 18 at 10:18
Add a comment  | 

1 Answer 1

Reset to default 2

My issue was very simple. I keep a infinite loop running asking to read the client. All I had to do was replace the while True: for while not writer.is_closing(): and the problem would be fix.

发布评论

评论列表(0)

  1. 暂无评论