Why does my onEach intermediate operator suspend for so long? I have a web socket that emits a flow of events:
localJob = coroutineScope.async(Dispatchers.IO + coroutineExceptionHandler) {
    websocketDataSource
        .websocketFlow()
        .buffer(256)
        .chunked(5, 300) // We have 5 events or 300 milliseconds have passed
        .onEach { handleMessages(it) }
        .onCompletion { _ -> log.i("Websocket flow closed") }
        .catch { cause -> log.e("Unexpected message flow error.", cause) }
        .collect()
}
...
private suspend fun handleMessages(messages: List<Message>) {
    log.v("handleMessages (count: ${messages.size})")
    messages.forEach { msg ->
        try {
            when (val name = json.decodeFromJsonElement<String>(msg.json.getValue("name"))) {
                "audio_start_event" -> {
                    _audioEvents.emit(json.decodeFromString<AudioStartEvent>(msg.raw))
                }
                "audio_data_event" -> {
                    _audioEvents.emit(json.decodeFromString<AudioDataEvent>(msg.raw))
                }
                "audio_end_event" -> {
                    _audioEvents.emit(json.decodeFromString<AudioEndEvent>(msg.raw))
                }
                else -> {
                    log.v("ignoring: '$name'")
                }
            }
        } catch (e: Exception) {
            log.e("Parse failed", e)
        }
    }
}
These are audio events, so they need to be processed with minimal delay. In some cases the suspending handleMessages() function stays suspended for so long that my audio player gives up: I've seen it suspend for 10 seconds or more while events are arriving over the web socket. From my investigation, the buffer is not filling up, and setting a smaller buffer size doesn't seem to have any effect. I've read a lot of documentation, but I'm not sure how to fix this so that handleMessages() gets called without such a long delay.
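One thing that may be relevant: the declaration of `_audioEvents` isn't shown above, but if it is a `MutableSharedFlow` created with default settings (an assumption), then `emit` suspends until every subscriber has taken the value, so a slow subscriber would stall handleMessages() even while the upstream buffer stays empty. The sketch below is not my production code, only a minimal reproduction of that behavior using kotlinx.coroutines; `timeEmits` and the 200 ms slow subscriber are hypothetical names and values chosen for illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical helper: emits three values into a shared flow that has one
// deliberately slow subscriber, and returns how long the emit calls took.
fun timeEmits(extraBufferCapacity: Int): Long = runBlocking {
    // Default overflow strategy is SUSPEND, as with a plain MutableSharedFlow().
    val events = MutableSharedFlow<Int>(extraBufferCapacity = extraBufferCapacity)

    // Stand-in for a slow consumer such as an audio player (assumption).
    val subscriber = launch {
        events.collect { delay(200) }
    }

    // Wait until the subscriber is registered; emits with no subscribers
    // return immediately and would hide the effect.
    events.subscriptionCount.first { it > 0 }

    val elapsed = measureTimeMillis {
        repeat(3) { events.emit(it) }
    }

    subscriber.cancel()
    elapsed
}

fun main() {
    // With no extra buffer, each emit waits for the slow subscriber.
    println("no buffer: ${timeEmits(0)} ms")
    // With spare buffer capacity, the emits return without waiting.
    println("buffer of 64: ${timeEmits(64)} ms")
}
```

If this is what is happening, the time spent inside `emit` should track the slowest subscriber rather than the web socket, which would match the buffer(256) never filling up.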
Any ideas on how to give equal priority to the web socket events and the logic to handle them in handleMessages()?