
docker - Unix domain sockets in rust with interprocess intermittent 'failed to fill whole buffer'


I have a Rust program that starts up a server process and listens on a specific Unix domain socket for incoming connections. Each client connects to this domain socket, sends a single request, waits for a single response and disconnects. I am using the interprocess crate for managing the socket connections.

This works perfectly until it doesn't.

The server processes may be heavy: startup can take up to 60 seconds and a process can use as much as 10-15 GB of memory. Others are very light, starting in under 1 s and using as little as a few hundred MB.

An external process (Deno in my case) starts the Rust server processes on demand (driven by a message queue), keeps them running, and shuts them down with SIGTERM when the relevant queues are empty and memory is needed for other processes.

At any given time I may have anywhere between 0 and 10 running processes, each listening on a domain socket with a unique name.

This works perfectly on my dev machine (Arch) and on the PROD server (Ubuntu 24.04.1 LTS host with a Debian-based Docker container, denoland/deno:2.0.0) for a while, but eventually incoming connections start failing with "failed to fill whole buffer". After the initial error, the server process seems to become unresponsive and connection attempts fail with "Connection refused (os error 111)", which seems to suggest the socket has been closed on the server side?
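For context on the error text itself: "failed to fill whole buffer" is the message the Rust standard library attaches to the `UnexpectedEof` error that `Read::read_exact` returns when the stream ends before the requested number of bytes has arrived. The sketch below reproduces it with an in-memory `Cursor` standing in for the socket; `read_exactly` is a hypothetical helper written for illustration and is not part of the code in this question.

```rust
use std::io::{self, Cursor, Read};

/// Tries to read exactly `wanted` bytes from a source that only holds `available`.
/// The Cursor plays the role of a peer that closes the connection early.
fn read_exactly(available: &[u8], wanted: usize) -> io::Result<Vec<u8>> {
    let mut source = Cursor::new(available);
    let mut buf = vec![0u8; wanted];
    // read_exact fails with ErrorKind::UnexpectedEof if EOF is hit
    // before `buf` has been completely filled.
    source.read_exact(&mut buf)?;
    Ok(buf)
}
```

Calling `read_exactly(b"abc", 10)` returns an error of kind `UnexpectedEof`, while `read_exactly(b"abc", 3)` succeeds. In other words, the side that reports this error saw its peer close (or stop writing) before a full length prefix or a full payload was delivered.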

So far I've only seen it on the prod server, and only after a few cycles of opening and closing the domain sockets successfully. I've not been able to pinpoint the exact pattern, so I've been unable to replicate the problem on my machine and debug it properly.

Since this only fails in prod, debugging is difficult. I am currently adding more debug logs to both the Rust and Deno processes, but for various reasons the prod release process is slow, and therefore so is progress.

Any suggestions on possible causes, debug steps, etc would be greatly appreciated!

The only clue I've got so far is the output from ss -a, where the problematic socket is in state ESTAB while the working ones are in state LISTEN. In this example the problematic socket is the one for estonia, while the other sockets are working fine.

Rust server code for reference:

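use std::io::{self, BufReader, Read, Write};
use std::time::{SystemTime, UNIX_EPOCH};

use interprocess::local_socket::{prelude::*, GenericNamespaced, ListenerOptions, Name, Stream};
// warn! and trace! are assumed to come from a logging crate such as `log` or `tracing`;
// IpcHandlerError, RequestMessage and ResponseMessage are defined elsewhere in the project.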

pub struct IpcHandler<'a> {
    socket_print_name: String,
    socket_name: Name<'a>,
}

impl<'a> IpcHandler<'a> {
    pub fn init(socket_name: Option<String>) -> Result<Self, IpcHandlerError> {
        let socket_name = socket_name.map_or("1".to_string(), |v| {
            v.chars()
                .map(|c| if c.is_alphanumeric() { c } else { '-' })
                .collect::<String>()
        });
        let socket_print_name = if GenericNamespaced::is_supported() {
            format!("ridi-router-{}.socket", socket_name)
        } else {
            format!("/tmp/ridi-router-{}.socket", socket_name)
        };

        let socket_name = socket_print_name
            .clone()
            .to_ns_name::<GenericNamespaced>()
            .map_err(|error| IpcHandlerError::NamespaceName { error })?;

        Ok(Self {
            socket_print_name,
            socket_name,
        })
    }

    pub fn listen<T>(&self, message_handler: T) -> Result<(), IpcHandlerError>
    where
        T: Fn(RequestMessage) -> ResponseMessage + Sync + Send + Copy + 'static,
    {
        let opts = ListenerOptions::new().name(self.socket_name.clone());

        let listener = match opts.create_sync() {
            Err(e) if e.kind() == io::ErrorKind::AddrInUse => {
                return Err(IpcHandlerError::SocketAddressInUse { error: e });
            }
            x => x.map_err(|error| IpcHandlerError::CreateListener { error })?,
        };

        println!(";RIDI_ROUTER SERVER READY;"); // this is in stdout so calling processes knows the server is ready to accept connections

        for conn in listener.incoming() {
            rayon::spawn(move || match conn {
                Err(e) => {
                    warn!("Incoming connection failed {}", e);
                }
                Ok(conn) => {
                    trace!("received connection");
                    let req = match IpcHandler::process_request(&conn) {
                        Err(err) => {
                            warn!("error from connection {:?}", err);
                            return;
                        }
                        Ok(req) => req,
                    };
                    let resp = message_handler(req);
                    if let Err(error) = IpcHandler::process_response(&conn, &resp) {
                        warn!("error from connection {:?}", error);
                    }
                }
            });
        }

        Ok(())
    }

    fn process_request(conn: &Stream) -> Result<RequestMessage, IpcHandlerError> {
        let start = SystemTime::now();
        let req_timestamp = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_millis();

        let mut conn = BufReader::new(conn);

        let mut mes_len_buf = [0u8; 8];
        conn.read_exact(&mut mes_len_buf)
            .map_err(|error| IpcHandlerError::ReadLine { error })?;

        let mut buffer = vec![0; u64::from_ne_bytes(mes_len_buf) as usize];
        conn.read_exact(&mut buffer[..])
            .map_err(|error| IpcHandlerError::ReadLine { error })?;

        let string_message =
            std::str::from_utf8(&buffer).map_err(|error| IpcHandlerError::Utf8Message { error })?;

        let request_message: RequestMessage = serde_json::from_str(&string_message)
            .map_err(|error| IpcHandlerError::DeserializeMessage { error })?;

        Ok(request_message)
    }
    fn process_response(
        conn: &Stream,
        response_message: &ResponseMessage,
    ) -> Result<(), IpcHandlerError> {
        let mut conn = BufReader::new(conn);

        let string_message = serde_json::to_string(response_message)
            .map_err(|error| IpcHandlerError::SerializeMessage { error })?;

        let buffer = string_message.as_bytes();

        let mes_len_bytes: u64 = buffer.len() as u64;
        conn.get_mut()
            .write_all(&mes_len_bytes.to_ne_bytes()[..])
            .map_err(|error| IpcHandlerError::WriteAll { error })?;

        conn.get_mut()
            .write_all(buffer)
            .map_err(|error| IpcHandlerError::WriteLine { error })?;

        Ok(())
    }


Client code:

    pub fn connect(
        &self,
        routing_mode: &RoutingMode,
        rules: RouterRules,
        route_req_id: Option<String>,
    ) -> Result<ResponseMessage, IpcHandlerError> {
        let conn = Stream::connect(self.socket_name.clone())
            .map_err(|error| IpcHandlerError::Connect { error })?;

        let mut conn = BufReader::new(conn);

        let req_msg = RequestMessage {
            id: route_req_id.map_or(String::from("default-request-id"), |v| v.to_string()),
            routing_mode: routing_mode.clone(),
            rules,
        };
        let string_req = serde_json::to_string(&req_msg)
            .map_err(|error| IpcHandlerError::SerializeMessage { error })?;

        let req_buf = string_req.as_bytes();

        let mes_len_bytes: u64 = req_buf.len() as u64;
        conn.get_mut()
            .write_all(&mes_len_bytes.to_ne_bytes()[..])
            .map_err(|error| IpcHandlerError::WriteAll { error })?;

        conn.get_mut()
            .write_all(req_buf)
            .map_err(|error| IpcHandlerError::WriteAll { error })?;

        let mut mes_len_buf = [0u8; 8];
        conn.read_exact(&mut mes_len_buf)
            .map_err(|error| IpcHandlerError::ReadLine { error })?;

        let mut resp_buf = vec![0; u64::from_ne_bytes(mes_len_buf) as usize];
        conn.read_exact(&mut resp_buf[..])
            .map_err(|error| IpcHandlerError::ReadLine { error })?;

        let string_resp = std::str::from_utf8(&resp_buf)
            .map_err(|error| IpcHandlerError::Utf8Message { error })?;

        let resp_msg: ResponseMessage = serde_json::from_str(string_resp)
            .map_err(|error| IpcHandlerError::DeserializeMessage { error })?;

        Ok(resp_msg)
    }
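The wire format used by both sides above is an 8-byte native-endian length followed by a JSON payload. As a debugging aid, that framing can be isolated into two small functions that work on any `Read`/`Write`, so it can be exercised without a socket. This is a minimal sketch under stated assumptions: `write_frame`, `read_frame` and the `MAX_FRAME_LEN` limit are hypothetical names introduced here, and the length guard is an addition that the original code does not have.

```rust
use std::io::{self, Read, Write};

/// Hypothetical upper bound on a single message; not part of the original code.
const MAX_FRAME_LEN: u64 = 64 * 1024 * 1024;

/// Writes an 8-byte native-endian length prefix followed by the payload.
fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    let len = payload.len() as u64;
    writer.write_all(&len.to_ne_bytes())?;
    writer.write_all(payload)?;
    writer.flush()
}

/// Reads one frame written by `write_frame`.
/// A stream that ends early surfaces as ErrorKind::UnexpectedEof from read_exact.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 8];
    reader.read_exact(&mut len_buf)?;
    let len = u64::from_ne_bytes(len_buf);
    // Reject implausible lengths before allocating, so a corrupted or
    // misaligned prefix does not trigger a huge allocation.
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("frame length {len} exceeds limit {MAX_FRAME_LEN}"),
        ));
    }
    let mut payload = vec![0u8; len as usize];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}
```

Writing a frame into a `Vec<u8>` and reading it back through a `std::io::Cursor` round-trips the payload. Separating the length check from the two `read_exact` calls also makes the logs more specific: a bad prefix shows up as `InvalidData`, while a peer that disconnected mid-message shows up as `UnexpectedEof`.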
