I have the following flow operator:
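```kotlin
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlin.time.Duration

fun <T> Flow<T>.bufferedWithTimeout(
    maxBufferSize: Int,
    timeout: Duration
): Flow<List<T>> {
    require(maxBufferSize > 0) {
        "Max buffer size has to be greater than zero (got $maxBufferSize)"
    }
    require(timeout.isPositive()) {
        "Timeout has to be positive (got $timeout)"
    }
    return flow {
        coroutineScope {
            val buffer = ArrayList<T>(maxBufferSize)
            suspend fun flush() {
                if (buffer.isEmpty()) return
                emit(buffer.toList())
                println("Flushing ${buffer.size}")
                buffer.clear()
            }
            try {
                val overflowMarker = produce {
                    // Suspend the producer in case it's too quick.
                    buffer(RENDEZVOUS).collect {
                        buffer.add(it)
                        println("Received $it. Size ${buffer.size}")
                        if (buffer.size == maxBufferSize) {
                            println("Overflow sent")
                            send(Unit)
                        }
                    }
                }
                while (isActive) {
                    select {
                        overflowMarker.onReceive {
                            println("Overflow received")
                            flush()
                        }
                        onTimeout(timeout) {
                            println("Timeout")
                            flush()
                        }
                    }
                }
            } catch (e: ClosedReceiveChannelException) {
                println("Cancellation")
                flush()
            }
        }
    }
}
```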
It collects emissions from upstream, buffers them until either `maxBufferSize` elements have accumulated or `timeout` has elapsed, and then emits the whole buffer downstream as a single batch.
I have a failing test:
```kotlin
@Test
fun `overflowing batch is emitted immediately, next batch is emitted after timeout`() = runTest {
    val upstream = flow {
        repeat(5) {
            emit(it)
        }
        emit(Unit)
        delay(5.seconds)
    }
    val batches = upstream
        .bufferedWithTimeout(
            maxBufferSize = 5,
            timeout = 5.seconds
        )
        .toList()
    Assertions.assertThat(batches.map(List<Any>::size))
        .isEqualTo(
            listOf(5, 1)
        )
    Assertions.assertThat(batches).isEqualTo(
        listOf(
            listOf(0, 1, 2, 3, 4),
            listOf(Unit)
        )
    )
}
```
It fails with the following logs:
```
Received 0. Size 1
Received 1. Size 2
Received 2. Size 3
Received 3. Size 4
Received 4. Size 5
Overflow sent
Received kotlin.Unit. Size 6
Overflow received
Flushing 6
Cancellation
Expected :[5, 1]
Actual   :[6]
```
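The interleaving in these logs (`Received kotlin.Unit. Size 6` printed before `Overflow received`) can be reproduced without flows or `select`. The sketch below is a hypothetical minimal reproduction (`sizeSeenByReceiver` and `shared` are illustration-only names) that assumes a single-threaded dispatcher such as the one `runBlocking` provides. When a receiver is already waiting, `send` on a rendezvous channel hands the value over without suspending the sender. The receiver is only scheduled to resume, so the sender keeps running and adds a sixth element first.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the size of the shared list as observed by the
// receiver right after it gets the overflow marker.
fun sizeSeenByReceiver(): Int = runBlocking {
    val shared = mutableListOf<Int>()        // mutated by the sender, read by the receiver
    val overflowMarker = Channel<Unit>()     // RENDEZVOUS, like the default `produce` channel
    launch {
        repeat(6) { i ->
            shared.add(i)
            // The receiver is already suspended in receive(), so this send completes
            // without suspending the sender; the receiver is merely scheduled to resume.
            if (shared.size == 5) overflowMarker.send(Unit)
            // The sender therefore keeps running and adds the next element right away.
        }
    }
    overflowMarker.receive()                 // suspends until the marker is handed over
    shared.size
}

fun main() {
    println(sizeSeenByReceiver())
}
```

On that dispatcher, `sizeSeenByReceiver()` should return 6 rather than 5, mirroring `Flushing 6` in the logs.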
I cannot for the life of me figure out exactly why this is happening. It's quite clear that the `overflowMarker.onReceive` clause isn't selected before the next upstream emission is added to the buffer, but what can be done about it? How can I guarantee the expected outcome regardless of how quickly upstream emits?
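One possible direction is sketched here under stated assumptions, not as a definitive fix. Both the producer and the `select` loop touch `buffer`, so the idea is to let a single coroutine own it. In this hypothetical variant (`bufferedWithTimeoutSingleOwner` and `demoBatches` are illustration-only names), the producer only forwards upstream elements over a rendezvous channel. The consuming loop `select`s between the next element and a timer, and checks `maxBufferSize` in the same coroutine that flushes. It assumes the timer should restart after every flush, as the original `while` loop does. A per-iteration `onTimeout` would restart on every element here, so the sketch swaps in an `async { delay(timeout) }` job as the timer.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

// Hypothetical variant of bufferedWithTimeout in which only one coroutine touches the buffer.
@OptIn(ExperimentalCoroutinesApi::class) // `produce` is experimental
fun <T> Flow<T>.bufferedWithTimeoutSingleOwner(
    maxBufferSize: Int,
    timeout: Duration,
): Flow<List<T>> {
    require(maxBufferSize > 0) { "Max buffer size has to be greater than zero (got $maxBufferSize)" }
    require(timeout.isPositive()) { "Timeout has to be positive (got $timeout)" }
    val upstream = this
    return flow {
        coroutineScope {
            // The producer only forwards elements; it never sees the buffer.
            val elements = produce<T> { upstream.collect { send(it) } }
            val buffer = ArrayList<T>(maxBufferSize)

            // Completes once `timeout` has elapsed; replaced after every flush.
            fun startTimer() = async { delay(timeout) }
            var timer = startTimer()

            suspend fun flush() {
                if (buffer.isNotEmpty()) {
                    val batch = buffer.toList()
                    buffer.clear() // safe: no other coroutine can add to the buffer meanwhile
                    emit(batch)
                }
                timer.cancel()
                timer = startTimer()
            }

            var upstreamDone = false
            while (!upstreamDone) {
                select<Unit> {
                    elements.onReceiveCatching { result ->
                        if (result.isClosed) {
                            // A failed upstream closes the channel with a cause; rethrow it.
                            result.exceptionOrNull()?.let { throw it }
                            upstreamDone = true
                        } else {
                            buffer.add(result.getOrThrow())
                            // Checked in the coroutine that flushes, so nothing sneaks in first.
                            if (buffer.size == maxBufferSize) flush()
                        }
                    }
                    timer.onAwait { flush() }
                }
            }
            timer.cancel() // otherwise coroutineScope would wait for the pending timer
            if (buffer.isNotEmpty()) emit(buffer.toList())
        }
    }
}

// Hypothetical usage helper.
fun demoBatches(): List<List<Int>> = runBlocking {
    flowOf(0, 1, 2, 3, 4, 5)
        .bufferedWithTimeoutSingleOwner(maxBufferSize = 5, timeout = 1.seconds)
        .toList()
}

fun main() {
    println(demoBatches())
}
```

Here `demoBatches()` should return `[[0, 1, 2, 3, 4], [5]]`. Clearing the buffer before `emit` is safe only because no other coroutine can touch it. In the original, elements the producer adds while `emit` is suspended would be discarded by the subsequent `buffer.clear()`. Since the timer is built on `delay`, it should also follow virtual time under `runTest`.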