I want to use RxJS to set up an ORDERED data stream that emits a number at a random interval (say, every 1-5 seconds), which I want to use as a time-randomized data source for testing other parts of RxJS. The following code emits the items in a random order (because of the delay), but I would like the order preserved and only the timing randomized.
/**
 * Picks a random whole number between two bounds, both inclusive
 * (given integer bounds).
 *
 * @param {number} bottom - Lowest value that may be returned.
 * @param {number} top - Highest value that may be returned.
 * @returns {number} A value n with bottom <= n <= top.
 */
function randomDelay(bottom, top) {
  // The "1 +" makes the upper bound inclusive, since Math.random() is always < 1.
  const choices = 1 + top - bottom;
  const offset = Math.floor(Math.random() * choices);
  return offset + bottom;
}
// Emits the numbers 1..10, each held back by its own random 1-5 s delay, and
// tags every emission with the time elapsed since the previous one.
// NOTE: flatMap does not keep the source order here -- values surface in the
// order their individual delays expire.
var source = Rx.Observable
  .range(1, 10)
  .flatMap((x) => Rx.Observable.of(x).delay(randomDelay(1000, 5000)))
  .timeInterval();
// Subscribe and render every notification into the #result element.
var subscription = source.subscribe(
  // Next: x is the { value, interval } record produced by timeInterval().
  (x) => {
    $("#result").append('Next: ' + JSON.stringify(x) + '<br>');
  },
  // Error: show whatever the stream failed with.
  (err) => {
    $("#result").append('Error: ' + err);
  },
  // Completed: the stream has finished.
  () => {
    $("#result").append('Completed');
  }
);
The code above gives me variants of the following output:
Next: {"value":1,"interval":1229}
Next: {"value":2,"interval":321}
Next: {"value":4,"interval":645}
Next: {"value":5,"interval":28}
Next: {"value":9,"interval":728}
Next: {"value":10,"interval":269}
Next: {"value":3,"interval":107}
Next: {"value":6,"interval":265}
Next: {"value":8,"interval":1038}
Next: {"value":7,"interval":199}
I want to use RxJS to set up an ORDERED data stream that emits a number at a random interval (say, every 1-5 seconds), which I want to use as a time-randomized data source for testing other parts of RxJS. The following code emits the items in a random order (because of the delay), but I would like the order preserved and only the timing randomized.
/**
 * Picks a random whole number between two bounds, both inclusive
 * (given integer bounds).
 *
 * @param {number} bottom - Lowest value that may be returned.
 * @param {number} top - Highest value that may be returned.
 * @returns {number} A value n with bottom <= n <= top.
 */
function randomDelay(bottom, top) {
  // The "1 +" makes the upper bound inclusive, since Math.random() is always < 1.
  const choices = 1 + top - bottom;
  const offset = Math.floor(Math.random() * choices);
  return offset + bottom;
}
// Emits the numbers 1..10, each held back by its own random 1-5 s delay, and
// tags every emission with the time elapsed since the previous one.
// NOTE: flatMap does not keep the source order here -- values surface in the
// order their individual delays expire.
var source = Rx.Observable
  .range(1, 10)
  .flatMap((x) => Rx.Observable.of(x).delay(randomDelay(1000, 5000)))
  .timeInterval();
// Subscribe and render every notification into the #result element.
var subscription = source.subscribe(
  // Next: x is the { value, interval } record produced by timeInterval().
  (x) => {
    $("#result").append('Next: ' + JSON.stringify(x) + '<br>');
  },
  // Error: show whatever the stream failed with.
  (err) => {
    $("#result").append('Error: ' + err);
  },
  // Completed: the stream has finished.
  () => {
    $("#result").append('Completed');
  }
);
The code above gives me variants of the following output:
Next: {"value":1,"interval":1229}
Next: {"value":2,"interval":321}
Next: {"value":4,"interval":645}
Next: {"value":5,"interval":28}
Next: {"value":9,"interval":728}
Next: {"value":10,"interval":269}
Next: {"value":3,"interval":107}
Next: {"value":6,"interval":265}
Next: {"value":8,"interval":1038}
Next: {"value":7,"interval":199}
Share
Improve this question
edited Nov 18, 2016 at 5:16
user3743222
18.7k5 gold badges73 silver badges75 bronze badges
asked Jan 28, 2016 at 11:47
Laurence FassLaurence Fass
1,9425 gold badges28 silver badges56 bronze badges
3 Answers
Reset to default
11
I used this question as the basis for another one and had to update the code to RxJS 6, in case anyone is interested.
// RxJS 6 UMD build: creation functions come from the global `rxjs`,
// pipeable operators from `rxjs.operators`.
const { range, of } = rxjs;
const { concatMap, delay } = rxjs.operators;

range(1, 10)
  .pipe(
    // concatMap (unlike flatMap) keeps the values in source order.
    concatMap((i) => {
      // Random wait from 1000 ms up to (but not including) 5000 ms.
      const waitMs = 1000 + Math.random() * 4000;
      return of(i).pipe(delay(waitMs));
    })
  )
  .subscribe((val) => {
    console.log(val);
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
Use concatMap instead of flatMap.
Documentation here: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/concatmap.md
// Emits the numbers 1..10 in order: concatMap keeps the source order, so only
// the spacing between emissions is random (1-5 s per value, via randomDelay).
// timeInterval() tags every emission with the time since the previous one.
var source = Rx.Observable
  .range(1, 10)
  .concatMap((x) => Rx.Observable.of(x).delay(randomDelay(1000, 5000)))
  .timeInterval();
I would suggest dividing this problem into two subproblems: (i) emitting at random intervals and (ii) generating random values. The first problem can be solved by a suitable custom RxJS observable (that is what crazyObservable is meant to be); the second can be solved by a custom subscription to crazyObservable.
crazyObservable takes three parameters: totalNumberOfEmissions, timeInterval, and totalNumberOfTimeIntervals. It returns a custom observable that emits totalNumberOfEmissions times, at random, over totalNumberOfTimeIntervals intervals.
To get your desired behavior, set totalNumberOfEmissions, timeInterval, and totalNumberOfTimeIntervals as you wish. This marble diagram may help.
// crazyObservable(totalNumberOfEmissions, timeInterval, totalNumberOfTimeIntervals)
// is not defined in this snippet. The arguments here request 1000 emissions
// across 60*60 intervals of 1000 each -- presumably milliseconds, i.e. one hour
// in total, as the variable name suggests (TODO confirm the unit).
var oneHourRandomTenHundredRequisitions$ = crazyObservable(1000,1000,60*60);
// Subscribe with an observer object. The <...> bodies are placeholders, not
// valid JavaScript -- replace each with your own code before running.
let testComponent1 = oneHourRandomTenHundredRequisitions$.subscribe({
next() { <a call to your random generator> } ,
error() { <your custom error call> },
complete() { <your custom complete call> }
});