te')); return $arr; } /* 遍历用户所有主题 * @param $uid 用户ID * @param int $page 页数 * @param int $pagesize 每页记录条数 * @param bool $desc 排序方式 TRUE降序 FALSE升序 * @param string $key 返回的数组用那一列的值作为 key * @param array $col 查询哪些列 */ function thread_tid_find_by_uid($uid, $page = 1, $pagesize = 1000, $desc = TRUE, $key = 'tid', $col = array()) { if (empty($uid)) return array(); $orderby = TRUE == $desc ? -1 : 1; $arr = thread_tid__find($cond = array('uid' => $uid), array('tid' => $orderby), $page, $pagesize, $key, $col); return $arr; } // 遍历栏目下tid 支持数组 $fid = array(1,2,3) function thread_tid_find_by_fid($fid, $page = 1, $pagesize = 1000, $desc = TRUE) { if (empty($fid)) return array(); $orderby = TRUE == $desc ? -1 : 1; $arr = thread_tid__find($cond = array('fid' => $fid), array('tid' => $orderby), $page, $pagesize, 'tid', array('tid', 'verify_date')); return $arr; } function thread_tid_delete($tid) { if (empty($tid)) return FALSE; $r = thread_tid__delete(array('tid' => $tid)); return $r; } function thread_tid_count() { $n = thread_tid__count(); return $n; } // 统计用户主题数 大数量下严谨使用非主键统计 function thread_uid_count($uid) { $n = thread_tid__count(array('uid' => $uid)); return $n; } // 统计栏目主题数 大数量下严谨使用非主键统计 function thread_fid_count($fid) { $n = thread_tid__count(array('fid' => $fid)); return $n; } ?>rust - How do I encapsulate waiting for a `tokio::sync::watch::Receiver` to be `Some`? - Stack Overflow
最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

rust - How do I encapsulate waiting for a `tokio::sync::watch::Receiver` to be `Some`? - Stack Overflow

programmeradmin2浏览0评论

I'd like to write an async function recv_some which takes a watch::Receiver<Option<T>>, waits for the value to be Some, and returns something which Deref's to T. For reasons I sort of understand, when I do this the obvious way, the borrow checker complains. I can't seem to work around the issue either.

use std::ops::Deref;

use tokio::sync::watch;

pub struct WatchSomeRef<'a, T>(watch::Ref<'a, Option<T>>);

impl<T> Deref for WatchSomeRef<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

pub async fn recv_some<T>(
    rx: &mut watch::Receiver<Option<T>>,
) -> Result<WatchSomeRef<T>, watch::error::RecvError> {
    loop {
        {
            let guard = rx.borrow_and_update();
            if guard.is_some() {
                return Ok(WatchSomeRef(guard));
            }
        }
        rx.changed().await?;
    }
}

When I do this, I get the error:

error[E0499]: cannot borrow `*rx` as mutable more than once at a time
  --> src/lib.rs:20:25
   |
16 |     rx: &mut watch::Receiver<Option<T>>,
   |         - let's call the lifetime of this reference `'1`
...
20 |             let guard = rx.borrow_and_update();
   |                         ^^ `*rx` was mutably borrowed here in the previous iteration of the loop
21 |             if guard.is_some() {
22 |                 return Ok(WatchSomeRef(guard));
   |                        ----------------------- returning this value requires that `*rx` is borrowed for `'1`

error[E0499]: cannot borrow `*rx` as mutable more than once at a time
  --> src/lib.rs:25:9
   |
16 |     rx: &mut watch::Receiver<Option<T>>,
   |         - let's call the lifetime of this reference `'1`
...
20 |             let guard = rx.borrow_and_update();
   |                         -- first mutable borrow occurs here
21 |             if guard.is_some() {
22 |                 return Ok(WatchSomeRef(guard));
   |                        ----------------------- returning this value requires that `*rx` is borrowed for `'1`
...
25 |         rx.changed().await?;
   |         ^^ second mutable borrow occurs here

ETA:

I think this happens because which lifetime we want on the guard depends on the underlying value. If it's Some, it needs to outlast the function call, but if it's None it needs to be dropped right away so we can wait for the next change.

I still have no idea how to work around it though (except using an unsafe mem::transmute to change the lifetime).

I'd like to write an async function recv_some which takes a watch::Receiver<Option<T>>, waits for the value to be Some, and returns something which Deref's to T. For reasons I sort of understand, when I do this the obvious way, the borrow checker complains. I can't seem to work around the issue either.

use std::ops::Deref;

use tokio::sync::watch;

pub struct WatchSomeRef<'a, T>(watch::Ref<'a, Option<T>>);

impl<T> Deref for WatchSomeRef<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

pub async fn recv_some<T>(
    rx: &mut watch::Receiver<Option<T>>,
) -> Result<WatchSomeRef<T>, watch::error::RecvError> {
    loop {
        {
            let guard = rx.borrow_and_update();
            if guard.is_some() {
                return Ok(WatchSomeRef(guard));
            }
        }
        rx.changed().await?;
    }
}

When I do this, I get the error:

error[E0499]: cannot borrow `*rx` as mutable more than once at a time
  --> src/lib.rs:20:25
   |
16 |     rx: &mut watch::Receiver<Option<T>>,
   |         - let's call the lifetime of this reference `'1`
...
20 |             let guard = rx.borrow_and_update();
   |                         ^^ `*rx` was mutably borrowed here in the previous iteration of the loop
21 |             if guard.is_some() {
22 |                 return Ok(WatchSomeRef(guard));
   |                        ----------------------- returning this value requires that `*rx` is borrowed for `'1`

error[E0499]: cannot borrow `*rx` as mutable more than once at a time
  --> src/lib.rs:25:9
   |
16 |     rx: &mut watch::Receiver<Option<T>>,
   |         - let's call the lifetime of this reference `'1`
...
20 |             let guard = rx.borrow_and_update();
   |                         -- first mutable borrow occurs here
21 |             if guard.is_some() {
22 |                 return Ok(WatchSomeRef(guard));
   |                        ----------------------- returning this value requires that `*rx` is borrowed for `'1`
...
25 |         rx.changed().await?;
   |         ^^ second mutable borrow occurs here

ETA:

I think this happens because which lifetime we want on the guard depends on the underlying value. If it's Some, it needs to outlast the function call, but if it's None it needs to be dropped right away so we can wait for the next change.

I still have no idea how to work around it though (except using an unsafe mem::transmute to change the lifetime).

Share Improve this question edited Feb 17 at 20:28 dspyz asked Feb 17 at 20:14 dspyzdspyz 5,5142 gold badges29 silver badges69 bronze badges 1
  • 2 This is problem case #3 (a known limitation of the borrow checker) which polonius ought to solve. Though as opposed to other occurances you can't simply redo the work. You probably need something like polonius_the_crab – cafce25 Commented Feb 17 at 20:28
Add a comment  | 

1 Answer 1

Reset to default 6

It seems to be a false positive by the borrow checker, probably closely related to its known limitations discussed here.

If you use watch::Receiver::wait_for, it compiles fine:

use std::ops::Deref;

use tokio::sync::watch;

pub struct WatchSomeRef<'a, T>(watch::Ref<'a, Option<T>>);

impl<T> Deref for WatchSomeRef<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

pub async fn recv_some<T>(
    rx: &mut watch::Receiver<Option<T>>,
) -> Result<WatchSomeRef<T>, watch::error::RecvError> {
    let value = rx.wait_for(|val| val.is_some()).await?;
    Ok(WatchSomeRef(value))
}

Or its more compact version:

pub async fn recv_some<T>(
    rx: &mut watch::Receiver<Option<T>>,
) -> Result<WatchSomeRef<T>, watch::error::RecvError> {
    rx.wait_for(Option::is_some).await.map(WatchSomeRef)
}
发布评论

评论列表(0)

  1. 暂无评论