kotlin - Some coroutines shenanigans in a custom flow operator with select clause - Stack Overflow

I have the following flow operator:

fun <T> Flow<T>.bufferedWithTimeout(
  maxBufferSize: Int,
  timeout: Duration
): Flow<List<T>> {
  require(maxBufferSize > 0) {
    "Max buffer size has to be greater than zero (got $maxBufferSize)"
  }
  require(timeout.isPositive()) {
    "Timeout has to be positive (got $timeout)"
  }

  return flow {
    coroutineScope {
      val buffer = ArrayList<T>(maxBufferSize)

      suspend fun flush() {
        if (buffer.isEmpty()) return
        emit(buffer.toList())
        println("Flushing ${buffer.size}")
        buffer.clear()
      }

      try {
        val overflowMarker = produce {
          // Suspend the producer in case it's too quick.
          buffer(RENDEZVOUS).collect {
            buffer.add(it)
            println("Received $it. Size ${buffer.size}")
            if (buffer.size == maxBufferSize) {
              println("Overflow sent")
              send(Unit)
            }
          }
        }

        while (isActive) {
          select {
            overflowMarker.onReceive {
              println("Overflow received")
              flush()
            }

            onTimeout(timeout) {
              println("Timeout")
              flush()
            }
          }
        }
      } catch (e: ClosedReceiveChannelException) {
        println("Cancellation")
        flush()
      }
    }
  }
}

It receives emissions from upstream, buffers them either up to a specified maxBufferSize or up to a specified timeout, and emits the whole buffer downstream as a batch.

I have a failing test:

@Test
fun `overflowing batch is emitted immediately, next batch is emitted after timeout`() = runTest {
  val upstream = flow {
    repeat(5) {
      emit(it)
    }
    emit(Unit)
    delay(5.seconds)
  }

  val batches = upstream
    .bufferedWithTimeout(
      maxBufferSize = 5,
      timeout = 5.seconds
    )
    .toList()

  Assertions.assertThat(batches.map(List<Any>::size))
    .isEqualTo(
      listOf(5, 1)
    )

  Assertions.assertThat(batches).isEqualTo(
    listOf(
      listOf(0, 1, 2, 3, 4),
      listOf(Unit)
    )
  )
}

It fails with the following logs:

Received 0. Size 1
Received 1. Size 2
Received 2. Size 3
Received 3. Size 4
Received 4. Size 5
Overflow sent
Received kotlin.Unit. Size 6
Overflow received
Flushing 6
Cancellation

Expected :[5, 1]
Actual   :[6]

I cannot for the life of me figure out exactly why this is happening. It's quite clear that the overflowMarker.onReceive clause isn't quick enough to be selected before the next upstream emission, but what can be done about it? How can I guarantee the expected outcome regardless of how quickly upstream emits?
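
A likely explanation, offered as an assumption rather than a confirmed diagnosis: when `select` is already waiting on `overflowMarker`, `send(Unit)` on the rendezvous channel completes the hand-off without suspending the sender. The collector can then carry on and add the next upstream item before the resumed `onReceive` branch gets to run `flush()`. One way to avoid the race is to let a single loop own the buffer, so that upstream can only hand over the next item once the previous batch has been emitted. The sketch below illustrates that idea; the name `batchedWithTimeout` is hypothetical, and unlike the original operator the timeout here restarts after every received item rather than after every flush.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlin.time.Duration

// Hypothetical variant of bufferedWithTimeout: only the loop below touches the batch.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T> Flow<T>.batchedWithTimeout(
  maxBufferSize: Int,
  timeout: Duration
): Flow<List<T>> {
  require(maxBufferSize > 0) { "Max buffer size has to be greater than zero (got $maxBufferSize)" }
  require(timeout.isPositive()) { "Timeout has to be positive (got $timeout)" }

  return flow {
    coroutineScope {
      // Rendezvous channel: upstream suspends until this loop asks for the next item.
      val upstream = buffer(Channel.RENDEZVOUS).produceIn(this)
      val batch = ArrayList<T>(maxBufferSize)

      suspend fun flush() {
        if (batch.isEmpty()) return
        emit(batch.toList())
        batch.clear()
      }

      var open = true
      while (open) {
        // null means the timeout fired; otherwise we got an item or a close signal.
        val result = select<ChannelResult<T>?> {
          upstream.onReceiveCatching { it }
          onTimeout(timeout) { null }
        }
        when {
          result == null -> flush()
          result.isSuccess -> {
            batch.add(result.getOrThrow())
            // The next item cannot arrive before this flush has finished.
            if (batch.size == maxBufferSize) flush()
          }
          else -> {
            // Upstream finished (or failed): rethrow a failure, else emit the remainder.
            result.exceptionOrNull()?.let { throw it }
            flush()
            open = false
          }
        }
      }
    }
  }
}
```

Because the size check and the flush happen in the same coroutine that receives items, a batch should never grow past `maxBufferSize`, however quickly upstream emits. If the timeout must be measured from the last flush, as in the original operator, the loop would additionally need to track a deadline.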
