Why does publishReplay(1).refCount() not replay the last value for late subscribers?
a = new Rx.Subject();
b = a.publishReplay(1).refCount();
a.subscribe(function(x){console.log('timely subscriber:',x)});
a.next(1);
b.subscribe(function(x){console.log('late subscriber:',x)});
<script src="http://reactivex.io/rxjs/user/script/0-Rx.js"></script>
Expected output:
timely subscriber: 1
late subscriber: 1
Actual output:
timely subscriber: 1
asked Jan 30, 2017 at 18:14
markmarijnissen
2 Answers
This happens because at the time you call a.next(1), the publishReplay(1) hasn't subscribed to its source Observable (Subject a in this case), and therefore the internal ReplaySubject won't receive the value 1.
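To make the timing concrete, here is a minimal sketch in plain JavaScript. It is a simplified model and not the RxJS source: MiniSubject, MiniReplaySubject and replayRefCount are hypothetical names invented for this illustration, and the model assumes only that the replay buffer connects to its source when the first subscriber arrives.

```javascript
// Simplified model, NOT the RxJS implementation. All names are hypothetical.
class MiniSubject {
  constructor() { this.observers = []; }
  subscribe(fn) {
    this.observers.push(fn);
    // Return an unsubscribe function.
    return () => { this.observers = this.observers.filter(o => o !== fn); };
  }
  next(value) { this.observers.slice().forEach(fn => fn(value)); }
}

// Remembers the last value and replays it to each new subscriber.
class MiniReplaySubject extends MiniSubject {
  constructor() { super(); this.hasValue = false; this.last = undefined; }
  subscribe(fn) {
    if (this.hasValue) fn(this.last);
    return super.subscribe(fn);
  }
  next(value) { this.hasValue = true; this.last = value; super.next(value); }
}

// Hypothetical stand-in for publishReplay(1).refCount(): it connects the
// buffer to the source only when the first subscriber arrives.
function replayRefCount(source) {
  const buffer = new MiniReplaySubject();
  let refs = 0;
  let disconnect = null;
  return {
    subscribe(fn) {
      const unsubscribe = buffer.subscribe(fn);
      if (++refs === 1) disconnect = source.subscribe(v => buffer.next(v));
      return () => {
        unsubscribe();
        if (--refs === 0) { disconnect(); disconnect = null; }
      };
    }
  };
}

const log = [];
const a = new MiniSubject();
const b = replayRefCount(a);
a.subscribe(x => log.push('timely subscriber: ' + x));
a.next(1); // b is not connected yet, so the buffer never sees 1
b.subscribe(x => log.push('late subscriber: ' + x));
a.next(2); // b is connected now, so 2 reaches the late subscriber
console.log(log);
```

In this model the late subscriber never sees 1, because nothing was listening to a on behalf of b when 1 was emitted. It does see 2, which is emitted after the connection exists.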
In RxJS 5, the actual subscription between operators happens when you subscribe at the end of the chain, which is b.subscribe(...) in this example. See:
- https://github./ReactiveX/rxjs/blob/master/src/operator/multicast.ts#L63
- https://github./ReactiveX/rxjs/blob/master/src/Observable.ts#L96
Until you call subscribe(), the operators are only chained together by the lift() method, which just takes an operator instance and assigns it to the new Observable. As the two links above show, the operator.call() method is invoked later, when subscribing. See:
- https://github./ReactiveX/rxjs/blob/master/src/Observable.ts#L67
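The deferred behaviour described above can be sketched as follows. This is a simplified model rather than the RxJS source: MiniObservable and doubleOperator are hypothetical names, and the only assumption carried over is that lift() stores the operator while operator.call() runs at subscription time.

```javascript
// Simplified model of lift-style chaining, NOT the RxJS implementation.
const events = [];

class MiniObservable {
  constructor(onSubscribe) {
    this.onSubscribe = onSubscribe; // used only by root observables
    this.source = null;
    this.operator = null;
  }
  // Stores the operator on a new observable; nothing is subscribed yet.
  lift(operator) {
    const result = new MiniObservable(null);
    result.source = this;
    result.operator = operator;
    return result;
  }
  subscribe(observer) {
    if (this.operator) {
      // Only now is the operator asked to subscribe to the source.
      return this.operator.call(observer, this.source);
    }
    return this.onSubscribe(observer);
  }
}

// Hypothetical operator object exposing call(observer, source).
const doubleOperator = {
  call(observer, source) {
    events.push('operator.call');
    return source.subscribe(value => observer(value * 2));
  }
};

const source = new MiniObservable(observer => {
  events.push('source subscribed');
  observer(21);
});

const doubled = source.lift(doubleOperator);
events.push('chain built');
doubled.subscribe(value => events.push('value ' + value));
console.log(events);
```

The recorded order is 'chain built' first, then 'operator.call', 'source subscribed' and 'value 42', which shows that building the chain alone does not touch the source.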
Your first subscriber subscribes to a, not to b. Since refCount only activates the stream once it has at least one subscriber (and it has none, because a rather than b is subscribed), the stream is not active until your last line of code. Subscribing the first subscriber to b instead gives the output you expect:
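var a = new Rx.Subject();
var b = a.publishReplay(1).refCount();
// Subscribing to b (not a) activates the stream before the value is emitted.
b.subscribe(function(x){console.log('timely subscriber:',x)});
a.next(1);
// The value 1 is now buffered, so it is replayed to this late subscriber.
b.subscribe(function(x){console.log('late subscriber:',x)});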
<script src="https://unpkg./rxjs/bundles/Rx.min.js"></script>