I am trying to implement background polling in an RxJava-based Android application where:
- A repository periodically fetches data from an API every 10 seconds and emits updates via a BehaviorSubject.
- A presenter subscribes to this BehaviorSubject and updates the UI when new data arrives.
The problem:
- The repository is calling onNext() on the BehaviorSubject.
- However, the presenter is not receiving the emitted values (the subscribe block never executes).
- No errors are thrown.
Code Structure (Simplified Example)
Repository - Fetches Data and Emits via BehaviorSubject
@Singleton
class MyRepository @Inject constructor(
private val apiService: ApiService
) {
private val pollingDisposable = SerialDisposable()
private val subject = BehaviorSubject.create<Optional<Data>>()
fun watchData(): Observable<Optional<Data>> = subject.hide()
fun startPolling() {
Observable.interval(10, TimeUnit.SECONDS, Schedulers.io())
.flatMapSingle {
fetchDataFromApi()
.map { Optional(it) }
.onErrorReturn { Optional(null) }
}
.subscribe(
{ data ->
subject.onNext(data) // This is getting called
},
{ error -> error.printStackTrace() }
)
.into(pollingDisposable)
}
private fun fetchDataFromApi(): Single<Data> {
return apiService.getData().subscribeOn(Schedulers.io())
}
}
Presenter - Subscribes to Watch the Data
class MyPresenter @Inject constructor(
private val repository: MyRepository
) : RxPresenter() {
override fun onStart() {
super.onStart()
repository.startPolling() // Start polling when presenter starts
disposeBag += repository.watchData()
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ optionalData ->
Log.d("PRESENTER_DEBUG", "Received new data: $optionalData") // Never gets called
if (optionalData.value != null) {
updateUI(optionalData.value)
}
}, { error ->
Log.e("PRESENTER_DEBUG", "Error receiving data", error) // Never gets called
})
}
private fun updateUI(data: Data) {
// UI update logic
}
}
What I've Tried
- Confirmed that onNext() is being called
- Called subject.hasObservers() before calling onNext() and it returns false. (weird)
Question is:
- Why is hasObservers() returning false, even though the Presenter should be subscribed?
I should note that I am fairly new to Rx (forced lol) so it might be a very silly issue.
Any help or debugging suggestions would be greatly appreciated!