I'm trying to create an rxjs Subject that will repeat its output after a period of inactivity. My initial design was to make use of debounceTime, however that doesn't appear to trigger more than once.
I would like the subject to emit immediately when next
is called, and repeat that emission periodically until a new value is provided:
Inputs: ---a---------b---------c----
Outputs: ---a---a---a-b---b---b-c---c
Currently I have something like so:
const subject = new rx.Subject()
subject.debounceTime(5000)
.subscribe(subject)
subject.subscribe(value => console.log(`emitted: ${value}`))
subject.take(1).subscribe(next => next, error => error, () => {
console.log('emitted once')
})
subject.take(2).subscribe(next => next, error => error, () => {
console.log('emitted twice')
})
subject.take(3).subscribe(next => next, error => error, () => {
console.log('emitted thrice')
})
subject.next('a')
However this will only emit 'a' once, and the output 'emitted thrice' is never seen.
Could somebody please help me understand what's going wrong here?
I'm trying to create an rxjs Subject that will repeat its output after a period of inactivity. My initial design was to make use of debounceTime, however that doesn't appear to trigger more than once.
I would like the subject to emit immediately when next
is called, and repeat that emission periodically until a new value is provided:
Inputs: ---a---------b---------c----
Outputs: ---a---a---a-b---b---b-c---c
Currently I have something like so:
const subject = new rx.Subject()
subject.debounceTime(5000)
.subscribe(subject)
subject.subscribe(value => console.log(`emitted: ${value}`))
subject.take(1).subscribe(next => next, error => error, () => {
console.log('emitted once')
})
subject.take(2).subscribe(next => next, error => error, () => {
console.log('emitted twice')
})
subject.take(3).subscribe(next => next, error => error, () => {
console.log('emitted thrice')
})
subject.next('a')
However this will only emit 'a' once, and the output 'emitted thrice' is never seen.
Could somebody please help me understand what's going wrong here?
Share Improve this question asked Apr 18, 2017 at 2:44 Micheal HillMicheal Hill 1,6592 gold badges20 silver badges38 bronze badges3 Answers
Reset to default 6I think you can use the repeatWhen()
operator if I understand your problem correctly:
const subject = new ReplaySubject(1);
subject.next('a');
subject
.take(1)
.repeatWhen(() => Observable.timer(500, 500))
.subscribe(val => console.log(val));
setTimeout(() => subject.next('b'), 1400);
See live demo: https://jsbin./nufasiq/2/edit?js,console
This prints to console the following output in 500ms intervals:
a
a
a
b
b
b
The take(1)
is necessary here to make the chain plete properly which is intercepted by repeatWhen()
that subscribes again to its source Observable.
Another option is to use switchMap
and interval
:
const source = new Rx.Subject();
source
.switchMap((val) => Rx.Observable.interval(5000).map(() => val))
.subscribe(val => console.log(val));
Rx.Observable.interval(11000).subscribe(x => source.next(x));
see a demo in jsbin
You could try something like this jsfiddle (rxjs v4, but I guess replacing flatMapLatest
by switchMap
should do the trick for v5):
const input$ = new Rx.Subject()
const repeatedInput$ = input$.flatMapLatest(input => Rx.Observable.interval(200).map(input))
output$ = Rx.Observable.merge(input$, repeatedInput$)
output$.subscribe(console.log.bind(console, `output`))
input$.onNext(2)
setTimeout(() => input$.onNext(4), 300)