最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

javascript - Using React useEffect hook with rxjs mergeMap operator - Stack Overflow

programmeradmin2浏览0评论

I'm trying to implement a data stream that has to use inner observables, where I use one from mergeMap, concatMap etc.

e.g.:

const output$$ = input$$.pipe(
    mergeMap(str => of(str).pipe(delay(10))),
    share()
  );

  output$$.subscribe(console.log);

This works fine when logging into console. But when I try to use it in React like below utilizing useEffect and useState hooks to update some text:

function App() {
  const input$ = new Subject<string>();
  const input$$ = input$.pipe(share());
  const output$$ = input$$.pipe(
    mergeMap(str => of(str).pipe(delay(10))),
    share()
  );

  output$$.subscribe(console.log);
  // This works

  const [input, setInput] = useState("");
  const [output, setOutput] = useState("");

  useEffect(() => {
    const subscription = input$$.subscribe(setInput);

    return () => {
      subscription.unsubscribe();
    };
  }, [input$$]);

  useEffect(() => {
    const subscription = output$$.subscribe(setOutput);
    // This doesn't

    return () => {
      subscription.unsubscribe();
    };
  }, [output$$]);

  return (
    <div className="App">
      <input
        onChange={event => input$.next(event.target.value)}
        value={input}
      />
      <p>{output}</p>
    </div>
  );
}

it starts acting weird/unpredictable (e.g.: sometimes the text is updated in the middle of typing, sometimes it doesn't update at all).

Things I have noticed:

  • If the inner observable pletes immediately/is a promise that resolves immediately, it works fine.
  • If we print to console instead of useEffect, it works fine.

I believe this has to do something with the inner workings of useEffect and how it captures and notices outside changes, but cannot get it working.
Any help is much appreciated.

Minimal reproduction of the case:

I'm trying to implement a data stream that has to use inner observables, where I use one from mergeMap, concatMap etc.

e.g.:

const output$$ = input$$.pipe(
    mergeMap(str => of(str).pipe(delay(10))),
    share()
  );

  output$$.subscribe(console.log);

This works fine when logging into console. But when I try to use it in React like below utilizing useEffect and useState hooks to update some text:

function App() {
  const input$ = new Subject<string>();
  const input$$ = input$.pipe(share());
  const output$$ = input$$.pipe(
    mergeMap(str => of(str).pipe(delay(10))),
    share()
  );

  output$$.subscribe(console.log);
  // This works

  const [input, setInput] = useState("");
  const [output, setOutput] = useState("");

  useEffect(() => {
    const subscription = input$$.subscribe(setInput);

    return () => {
      subscription.unsubscribe();
    };
  }, [input$$]);

  useEffect(() => {
    const subscription = output$$.subscribe(setOutput);
    // This doesn't

    return () => {
      subscription.unsubscribe();
    };
  }, [output$$]);

  return (
    <div className="App">
      <input
        onChange={event => input$.next(event.target.value)}
        value={input}
      />
      <p>{output}</p>
    </div>
  );
}

it starts acting weird/unpredictable (e.g.: sometimes the text is updated in the middle of typing, sometimes it doesn't update at all).

Things I have noticed:

  • If the inner observable pletes immediately/is a promise that resolves immediately, it works fine.
  • If we print to console instead of useEffect, it works fine.

I believe this has to do something with the inner workings of useEffect and how it captures and notices outside changes, but cannot get it working.
Any help is much appreciated.

Minimal reproduction of the case:
https://codesandbox.io/s/hooks-and-observables-1-7ygd8

Share Improve this question asked Aug 23, 2019 at 18:02 WickramarangaWickramaranga 1,1912 gold badges21 silver badges39 bronze badges 1
  • input$$ and output$$ are being created on each render. Also, useEffect is depending on them, which means the effects will execute on every render. – Steve Commented Aug 24, 2019 at 14:36
Add a ment  | 

2 Answers 2

Reset to default 9

I'm not quite sure what you're trying to achieve, but I found a number of problems which hopefully the following code fixes:

function App() {
    // Create these observables only once.
    const [input$] = useState(() => new Subject<string>());
    const [input$$] = useState(() => input$.pipe(share()));
    const [output$$] = useState(() => input$$.pipe(
        mergeMap(str => of(str).pipe(delay(10))),
        share()
    ));

    const [input, setInput] = useState("");
    const [output, setOutput] = useState("");

    // Create the subscription to input$$ on ponent mount, not on every render.
    useEffect(() => {
        const subscription = input$$.subscribe(setInput);

        return () => {
            subscription.unsubscribe();
        };
    }, []);

    // Create the subscription to output$$ on ponent mount, not on every render.
    useEffect(() => {
        const subscription = output$$.subscribe(setOutput);

        return () => {
            subscription.unsubscribe();
        };
    }, []);

    return (
        <div className="App">
            <input
                onChange={event => input$.next(event.target.value)}
                value={input}
            />
            <p>{output}</p>
        </div>
    );
}

I had a similar task but the goal was to pipe and debounce the input test and execute ajax call. The simple answer that you should init RxJS subject with arrow function in the react hook 'useState' in order to init subject once per init.

Then you should useEffect with empty array [] in order to create a pipe once on ponent init.

import React, { useEffect, useState } from "react";
import { ajax } from "rxjs/ajax";
import { debounceTime, delay, takeUntil } from "rxjs/operators";
import { Subject } from "rxjs/internal/Subject";

const App = () => {
  const [items, setItems] = useState([]);
  const [loading, setLoading] = useState(true);
  const [filterChangedSubject] = useState(() => {
    // Arrow function is used to init Singleton Subject. (in a scope of a current ponent)
    return new Subject<string>();
  });

  useEffect(() => {
    // Effect that will be initialized once on a react ponent init. 
    // Define your pipe here.
    const subscription = filterChangedSubject
      .pipe(debounceTime(200))
      .subscribe((filter) => {
        if (!filter) {
          setLoading(false);
          setItems([]);
          return;
        }
        ajax(`https://swapi.dev/api/people?search=${filter}`)
          .pipe(
            // current running ajax is canceled on filter change.
            takeUntil(filterChangedSubject)
          )
          .subscribe(
            (results) => {
              // Set items will cause render:
              setItems(results.response.results);
            },
            () => {
              setLoading(false);
            },
            () => {
              setLoading(false);
            }
          );
      });

    return () => {
      // On Component destroy. notify takeUntil to unsubscribe from current running ajax request
      filterChangedSubject.next("");
      // unsubscribe filter change listener
      subscription.unsubscribe();
    };
  }, []);

  const onFilterChange = (e) => {
    // Notify subject about the filter change
    filterChangedSubject.next(e.target.value);
  };
  return (
    <div>
      Cards
      {loading && <div>Loading...</div>}
      <input onChange={onFilterChange}></input>
      {items && items.map((item, index) => <div key={index}>{item.name}</div>)}
    </div>
  );
};

export default App;
发布评论

评论列表(0)

  1. 暂无评论