websocket - Kotlin flow suspends in onEach for too long - Stack Overflow

Why does my onEach intermediate operator suspend for so long? I have a web socket that emits a flow of events:

localJob = coroutineScope.async(Dispatchers.IO + coroutineExceptionHandler) {
    websocketDataSource
        .websocketFlow()
        .buffer(256)
        .chunked(5, 300) // We have 5 events or 300 milliseconds have passed
        .onEach { handleMessages(it) }
        .onCompletion { _ -> log.i("Websocket flow closed") }
        .catch { cause -> log.e("Unexpected message flow error.", cause) }
        .collect()
}

...

private suspend fun handleMessages(messages: List<Message>) {
    log.v("handleMessages (count: ${messages.size})")
    messages.forEach { msg ->
        try {
            when (val name = json.decodeFromJsonElement<String>(msg.json.getValue("name"))) {
                "audio_start_event" -> {
                    _audioEvents.emit(json.decodeFromString<AudioStartEvent>(msg.raw))
                }
                "audio_data_event" -> {
                    _audioEvents.emit(json.decodeFromString<AudioDataEvent>(msg.raw))
                }
                "audio_end_event" -> {
                    _audioEvents.emit(json.decodeFromString<AudioEndEvent>(msg.raw))
                }
                else -> {
                    log.v("ignoring: '$name'")
                }
            }
        } catch (e: Exception) {
            log.e("Parse failed", e)
        }
    }
}

The events are audio events, so they need to be processed with minimal delay. In some cases, the suspending handleMessages() function stays suspended for so long that my audio player gives up: it can suspend for 10 seconds or more while events are still arriving over the web socket. I've read a lot of documentation, but I'm not sure how to make handleMessages() get called without such a long delay. From my investigation, the buffer is not filling up, and setting the buffer size to a smaller value doesn't seem to have any effect.
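One thing worth checking, as a guess rather than a diagnosis: the declaration of _audioEvents is not shown, but if it is a MutableSharedFlow created with default parameters (no replay, no extra buffer), then every emit() call inside handleMessages() suspends until all current subscribers have taken the value. A slow subscriber would then stall handleMessages() even though the upstream buffer(256) never fills. The sketch below reproduces that effect in isolation; timeEmits, the 100 ms consumer delay, and the buffer size of 64 are all hypothetical stand-ins, not taken from the code above.

```kotlin
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical helper: measures how long `count` emit() calls take
// while one deliberately slow subscriber is attached to `flow`.
suspend fun timeEmits(flow: MutableSharedFlow<Int>, count: Int): Long = coroutineScope {
    val collector = launch {
        flow.collect { delay(100) } // stand-in for a slow audio consumer
    }
    // Wait until the subscriber is actually registered before emitting.
    flow.subscriptionCount.first { it > 0 }

    val elapsed = measureTimeMillis {
        repeat(count) { flow.emit(it) } // emit() may suspend here
    }
    collector.cancel() // stop the subscriber so coroutineScope can return
    elapsed
}

fun main() = runBlocking {
    // Defaults: replay = 0, extraBufferCapacity = 0, so emit() waits for the subscriber.
    val unbuffered = MutableSharedFlow<Int>()
    // Extra capacity lets emit() return immediately while the buffer has room.
    val buffered = MutableSharedFlow<Int>(extraBufferCapacity = 64)

    println("unbuffered: ${timeEmits(unbuffered, 5)} ms")
    println("buffered:   ${timeEmits(buffered, 5)} ms")
}
```

If this matches the real setup, the unbuffered run should take noticeably longer than the buffered one, because the emitter is paced by the slowest subscriber. Adding extraBufferCapacity (optionally with an onBufferOverflow policy) is one way to decouple the two sides, at the cost of deciding what happens when that buffer fills.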

Any ideas on how to give equal priority to the web socket events and the logic to handle them in handleMessages()?
