I have an observer that gets different values published to it over time. For example:
sub = new Subject<any>();
sub.next(1); sub.next(2); sub.next(3);
#hack 1
sub.next(4); sub.next(5); sub.next(6);
#hack 2
If there is a subscription at #hack 1, it should get all the values 1, 2, 3, and if there is a subscription at #hack 2, it should get all the values 1, 2, 3, 4, 5, 6. ReplaySubject replays old values, sure, but I want all the values on every publish. So on every sub.next(7), I need all the values 1, 2, 3, 4, 5, 6 and 7. Basically, the observable should store all published ('next'-ed) values. How do I achieve this?
asked Apr 5, 2018 at 20:14 by Ankur
3 Answers
I like @TomaszKula's answer, but I think the question is about successive subscriptions picking up previous (and future) emissions.
As far as I can tell, ReplaySubject does what is asked for...
console.clear()
const sub = new Rx.ReplaySubject();
sub.next(1);
console.log('1st subscribe')
sub.subscribe(x => console.log('1st', x))
sub.next(2);
console.log('2nd subscribe')
sub.subscribe(x => console.log('2nd', x))
sub.next(3)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.js"></script>
Use BehaviorSubject with scan :) It's basically like Array's reduce, but it emits the accumulated value on every update.
import { Component } from '@angular/core';
import { scan } from 'rxjs/operators';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';

@Component({
  selector: 'my-app',
  template: `
    <input (keydown.enter)="onMessage(input.value); input.value =''" #input />
    <div *ngFor="let message of messages$ | async">
      {{ message }}
    </div>
  `,
  styleUrls: ['./app.component.css']
})
export class AppComponent {
  // Note: the initial value [] is also passed through scan as the first item.
  private messagesSource = new BehaviorSubject<any>([]);

  // Accumulate every emitted value into a growing array.
  messages$ = this.messagesSource.asObservable()
    .pipe(scan((acc, curr) => [...acc, curr], []));

  onMessage(val: string) {
    this.messagesSource.next(val);
  }
}
This line does all the work: scan((acc, curr) => [...acc, curr], [])
It appends every value emitted by the subject to an array and emits that array each time.
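To make the difference between reduce and scan concrete, here is a minimal, dependency-free sketch. The helper `scanToArrays` is hypothetical and not part of RxJS; it only mimics, for a finite list of inputs, what the accumulator `(acc, curr) => [...acc, curr]` with an empty-array seed would emit step by step.

```typescript
// Hypothetical helper for illustration; not an RxJS function.
// It records what a scan with the accumulator (acc, curr) => [...acc, curr]
// and seed [] would emit for each input value.
function scanToArrays<T>(values: T[]): T[][] {
  const emissions: T[][] = [];
  let acc: T[] = [];
  for (const curr of values) {
    acc = [...acc, curr];  // same accumulator as in the answer
    emissions.push(acc);   // scan emits after every input; reduce would keep only the last
  }
  return emissions;
}

const emitted = scanToArrays([1, 2, 3]);
console.log(JSON.stringify(emitted)); // [[1],[1,2],[1,2,3]]
```

Each emission contains everything seen so far, which is the "all the values on every publish" behaviour the question asks for.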
The answer is: you can't with a simple Subject, as you only have access to its value inside a subscription but never locally in your service. Using a BehaviorSubject you can, because it exposes its current value through getValue().
But first of all, make your subject private and accessible via methods only. Then you can do magic like this:
import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Observable } from 'rxjs/Observable';

@Injectable()
export class DataService {
  // BehaviorSubject requires an initial value; start with an empty string.
  private subject = new BehaviorSubject<string>('');

  // Append the new value to everything published so far.
  setData(data: string) {
    const current = this.subject.getValue();
    this.subject.next(current ? current + ',' + data : data);
  }

  // Reset the stored values.
  clearData() {
    this.subject.next('');
  }

  getData(): Observable<string> {
    return this.subject.asObservable();
  }
}
Your subscribers then access this BehaviorSubject like this:
constructor(private dataService: DataService) {}

ngOnInit(): void {
  this.dataService.getData().subscribe(data => {
    // do something with data
  });
}

private aMethod(aString: string): void {
  this.dataService.setData(aString);
}
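For readers who want to see the requested behaviour in isolation, the following is a rough, hand-rolled sketch that does not depend on any RxJS version. The class `HistorySubject` and the type `HistoryListener` are hypothetical names invented for this illustration; they only model the idea of storing every published value and handing the full history to subscribers, both on subscribe and on each later `next`.

```typescript
// Hypothetical types for illustration only; these are not RxJS classes.
type HistoryListener<T> = (history: T[]) => void;

class HistorySubject<T> {
  private history: T[] = [];
  private listeners: HistoryListener<T>[] = [];

  // Store the value, then hand every listener a copy of the full history.
  next(value: T): void {
    this.history = [...this.history, value];
    for (const listener of this.listeners) {
      listener([...this.history]);
    }
  }

  // A new listener immediately receives whatever has been stored so far.
  subscribe(listener: HistoryListener<T>): () => void {
    this.listeners.push(listener);
    if (this.history.length > 0) {
      listener([...this.history]);
    }
    // Return a function that removes this listener again.
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }
}

const sub = new HistorySubject<number>();
const seen: number[][] = [];
sub.next(1); sub.next(2); sub.next(3);
const unsubscribe = sub.subscribe(h => seen.push(h)); // like "#hack 1": gets [1, 2, 3]
sub.next(4);                                          // gets [1, 2, 3, 4]
unsubscribe();
sub.next(5);                                          // no longer delivered
console.log(JSON.stringify(seen)); // [[1,2,3],[1,2,3,4]]
```

In an RxJS code base the same effect would more likely be built from the library's own subjects and operators, as the answers above suggest; this sketch is only meant to pin down the expected sequence of values.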