最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

android - RxJava BehaviorSubject.onNext() is called but subscriber is not receiving updates - Stack Overflow

programmeradmin3浏览0评论

I am trying to implement background polling in an RxJava-based Android application where:

  1. A repository periodically fetches data from an API every 10 seconds and emits updates via a BehaviorSubject.
  2. A presenter subscribes to this BehaviorSubject and updates the UI when new data arrives.

The problem:

  • The repository is calling onNext() on the BehaviorSubject.
  • However, the presenter is not receiving the emitted values (the subscribe block never executes).
  • No errors are thrown.

Code Structure (Simplified Example)

Repository - Fetches Data and Emits via BehaviorSubject

@Singleton
class MyRepository @Inject constructor(
    private val apiService: ApiService
) {
    private val pollingDisposable = SerialDisposable()
    private val subject = BehaviorSubject.create<Optional<Data>>() 

    fun watchData(): Observable<Optional<Data>> = subject.hide()

    fun startPolling() {
        Observable.interval(10, TimeUnit.SECONDS, Schedulers.io())
            .flatMapSingle {
                fetchDataFromApi()
                    .map { Optional(it) }
                    .onErrorReturn { Optional(null) }
            }
            .subscribe(
                { data -> 
                    subject.onNext(data) // This is getting called
                },
                { error -> error.printStackTrace() }
            )
            .into(pollingDisposable)
    }

    private fun fetchDataFromApi(): Single<Data> {
        return apiService.getData().subscribeOn(Schedulers.io())
    }
}

Presenter - Subscribes to Watch the Data

class MyPresenter @Inject constructor(
    private val repository: MyRepository
) : RxPresenter() {

    override fun onStart() {
        super.onStart()

        repository.startPolling() // Start polling when presenter starts

        disposeBag += repository.watchData()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ optionalData ->
                Log.d("PRESENTER_DEBUG", "Received new data: $optionalData") // Never gets called
                if (optionalData.value != null) {
                    updateUI(optionalData.value)
                }
            }, { error ->
                Log.e("PRESENTER_DEBUG", "Error receiving data", error) // Never gets called
            })
    }

    private fun updateUI(data: Data) {
        // UI update logic
    }
}

What I've Tried

  1. Confirmed that onNext() is being called
  2. Called subject.hasObservers() before calling onNext() and it returns false. (weird)

Question is:

  • Why is hasObservers() returning false, even though the Presenter should be subscribed?

I should note that I am fairly new to Rx (forced lol) so it might be a very silly issue.

Any help or debugging suggestions would be greatly appreciated!

发布评论

评论列表(0)

  1. 暂无评论