javascript - RxJS Buffer until stream is true - Stack Overflow

I want to buffer the values of a stream until one of them satisfies a predicate.

For example, until the number is divisible by five:

//emit value every 1 second
const oneSecondInterval = Rx.Observable.interval(1000);
//return an observable that emits only the numbers divisible by five
const fiveSecondInterval = () => oneSecondInterval.filter(number => number % 5 === 0);

const bufferWhenExample = oneSecondInterval.bufferWhen(fiveSecondInterval);
//log values
const subscribe = bufferWhenExample.subscribe(val => console.log('Emitted Buffer: ', val));

Output

"Emitted Buffer: "
[]
"Emitted Buffer: "
[0, 1]
"Emitted Buffer: "
[2]
"Emitted Buffer: "
[3]
"Emitted Buffer: "
[4]
"Emitted Buffer: "
[5]
"Emitted Buffer: "
[6]
"Emitted Buffer: "
[7]
"Emitted Buffer: "
[8]
"Emitted Buffer: "
[9]
"Emitted Buffer: "
[10]

What I want:

"Emitted Buffer: "
[0]
"Emitted Buffer: "
[1,2,3,4,5]
"Emitted Buffer: "
[6,7,8,9,10]

But this is not working. Why?

DEMO: http://jsbin.com/durerimiju/1/edit?js,console


Update

This is almost right:

const oneSecondInterval = Rx.Observable.interval(1000);
const fiveSecondInterval = oneSecondInterval.filter(time => time % 5 === 0);
const bufferWhenExample = oneSecondInterval.buffer(fiveSecondInterval);

bufferWhenExample.subscribe(console.log)

The only problem is that it emits

[]
[0,1,2,3,4]
[5,6,7,8,9]

Instead I would like to have

[0]
[1,2,3,4,5]
[6,7,8,9,10]
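
The desired grouping can be stated without RxJS or timers. The following is a minimal sketch that assumes a plain array as input; `chunkUntil` is a hypothetical helper name, not an RxJS operator. Each buffer is closed right after the value that satisfies the predicate, so that value ends up as the last element of its buffer.

```typescript
// Hypothetical helper (not part of RxJS): split a list into buffers,
// closing each buffer right after a value that satisfies the predicate.
function chunkUntil<T>(values: T[], predicate: (value: T) => boolean): T[][] {
  const chunks: T[][] = [];
  let current: T[] = [];

  for (const value of values) {
    current.push(value);
    if (predicate(value)) {
      // The matching value is included in the buffer it closes.
      chunks.push(current);
      current = [];
    }
  }

  // Values after the last match are not emitted in this sketch.
  return chunks;
}

const wanted = chunkUntil([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], n => n % 5 === 0);
console.log(JSON.stringify(wanted)); // [[0],[1,2,3,4,5],[6,7,8,9,10]]
```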

Asked Oct 15, 2017 at 22:52 by Gergely Fehérvári; edited Oct 17, 2017 at 22:05.

4 Answers

Here is a solution (in TypeScript) that seems to work for me:

import { interval, OperatorFunction } from 'rxjs';
import { buffer, delay, filter, share, tap } from 'rxjs/operators';

export function bufferUntil<T>(predicate:(value:T) => boolean):OperatorFunction<T, T[]>
{
    return function(source)
    {
        const share$ = source.pipe(share());
        const until$ = share$.pipe(filter(predicate), delay(0));
        return share$.pipe(buffer(until$));
    };
}

interval(1000).pipe(
    tap(console.log),
    bufferUntil(value => value % 5 === 0),
    tap(console.log)
).subscribe();

This results in the following output:

0
[0] // this emits immediately after the previous log
1
2
3
4
5
[1, 2, 3, 4, 5] // this emits immediately after the previous log
6
7
8
9
10
[6, 7, 8, 9, 10] // this emits immediately after the previous log

If anyone has a better solution or if my implementation is dangerous or incorrect in some way, I'd love to hear it as I needed this functionality for myself.

The bufferWhen operator takes a factory function for the closing observable and calls it every time a new buffer starts. In your case, each buffer therefore gets a fresh interval observable that starts again at 0, and since 0 passes the filter, the buffer is closed after one second.

You can simply use the buffer operator, which takes a single closing observable, and it will work as expected:

const oneSecondInterval = Rx.Observable.interval(1000);
const fiveSecondInterval = oneSecondInterval.filter((number) => number % 5 === 0);
oneSecondInterval.buffer(fiveSecondInterval);

I edited your jsbin to show this.
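
The difference between the two operators can be modeled in plain TypeScript, with no RxJS involved. This is only a simplified sketch: `closingSeconds` is a hypothetical helper, and the model assumes that every notifier is a counter that emits 0 one second after it starts, then 1, 2, and so on, as `interval(1000)` does.

```typescript
const isClosingValue = (n: number): boolean => n % 5 === 0;

// Seconds (up to `untilSecond`) at which a counter started at `startSecond`
// emits a value that passes the filter. The counter emits n at startSecond + n + 1.
function closingSeconds(startSecond: number, untilSecond: number): number[] {
  const result: number[] = [];
  for (let n = 0; startSecond + n + 1 <= untilSecond; n++) {
    if (isClosingValue(n)) {
      result.push(startSecond + n + 1);
    }
  }
  return result;
}

// Model of buffer(notifier): one counter, started once at second 0.
const sharedNotifier = closingSeconds(0, 11);

// Model of bufferWhen(factory): a new counter is started whenever a buffer
// closes, and only its first matching value matters.
const perBufferNotifier: number[] = [];
let start = 0;
while (true) {
  const closings = closingSeconds(start, 11);
  if (closings.length === 0) break;
  perBufferNotifier.push(closings[0]);
  start = closings[0];
}

console.log(JSON.stringify(sharedNotifier));    // [1,6,11]
console.log(JSON.stringify(perBufferNotifier)); // [1,2,3,4,5,6,7,8,9,10,11]
```

In this model the per-buffer notifier closes a buffer every second, which is consistent with the eleven small buffers in the question's output, while the shared notifier closes one only at seconds 1, 6 and 11.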

The interval observable doesn't emit a count of the number of times it has gone off, it just emits undefined. So when you do .filter(number => number % 5 === 0), the predicate is always returning false.

To keep a count of how many values have been emitted, you can use the .scan operator:

const fiveSecondInterval = () => 
    oneSecondInterval
        .scan(count => count + 1, 0)
        .filter(count => count % 5 === 0);

Addressing the issue raised by @michalc in the comments to @TroyWeber's answer.

Here's a pretty simple function that (I believe) works in all cases. It doesn't do anything fancy or leverage other operators, but gets the job done!

import { from, Observable, type OperatorFunction } from 'rxjs'

function bufferUntil<T>(predicate: (value: T) => boolean): OperatorFunction<T, T[]> {
  return (source: Observable<T>) => new Observable<T[]>(observer => {
    let buffer: T[] = []

    return source.subscribe({
      next(value) {
        buffer.push(value)

        if (predicate(value)) {
          observer.next(buffer)
          buffer = []
        }
      },
      error(err) { observer.error(err) },
      complete() { observer.complete() },
    })
  })
}

from([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]).pipe(
  bufferUntil(n => n % 5 === 0),
).subscribe(console.log)