javascript - SSE and EventEmitter in NestJS - How to use observables - Stack Overflow

I want to emit an event that happens in the backend and show it in the frontend. I don't need sockets here because it's one-way communication. So I want to push the emitted event to the frontend using SSE (Server-Sent Events) in NestJS. The setup is pretty simple according to the docs:

@Sse('sse')
sse(): Observable<MessageEvent> {
  return interval(1000).pipe(map((_) => ({ data: { hello: 'world' } })));
}

This is all fine and dandy, and it works. However, it should now push the "event" that's happening in the backend, instead of using interval etc.

Here is my current setup:

@Injectable()
export class StocksService {
  public stocks: Stock[] = [
    {
      id: 1,
      symbol: 'Stock #1',
      bid: 500,
      ask: 500,
    }
  ];

  constructor(private eventEmitter: EventEmitter2) {}

  create(createStockDto: CreateStockDto) {
    const stock = {
      id: this.stocks.length + 1,
      ...createStockDto,
    };
    this.stocks.push(stock);

    const stockCreatedEvent = new StockCreatedEvent();
    stockCreatedEvent.symbol = stock.symbol;
    stockCreatedEvent.ask = stock.ask;
    stockCreatedEvent.bid = stock.bid;

    this.eventEmitter.emit('stock.created', stockCreatedEvent);

    return stock;
  }
}

Now this.eventEmitter.emit('stock.created', stockCreatedEvent); emits the event and I can console log it using a small listener, and see it just fine:

@Injectable()
export class StockCreatedListener {
  @OnEvent('stock.created')
  handleStockCreatedEvent(event: StockCreatedEvent) {
    console.log(event);
  }
}

So now, whenever I hit the service with Postman and create a Stock entry, it will console log the event, which is great! Now I want this data pushed towards the frontend using SSE.

However, after digging through the RxJS docs, I am not sure I understand how I am supposed to connect these two. I know I need to make an Observable, which I tried with:

  @Sse('sse')
  @OnEvent('stock.created')
  sse(event: StockCreatedEvent): Observable<MessageEvent> {
    const obj = of(event);
    return obj.pipe(map((_) => ({ data: event})));
  }

However, even if I go to the URL http://localhost:3000/sse, it does not do anything: it neither logs to the console nor returns any stream. Do I need an observable here, or a subject?

Please help a brother out. Here is also the repo, if you need to look at it a bit more specifically

Asked Dec 10, 2022 at 11:27 by TLListenreich, edited Dec 10, 2022 at 11:38

2 Answers

I'm creating something similar.

I needed to send an SSE signal to connected clients only when an asset had its price updated. I used a webhook instead of short polling.

The solution looked roughly like the code below (I removed the less important parts):

LiveQuoteController:

//... other imports
import { Controller, MessageEvent, Sse } from "@nestjs/common";
import { EventEmitter2 } from "@nestjs/event-emitter";
import { Observable, fromEvent } from "rxjs";
import { map } from "rxjs/operators";

@Controller("/livequote")
export class LiveQuoteController {
    constructor (private readonly eventEmitter: EventEmitter2) {}

    // Each connected client subscribes to its own stream of "price.update" events.
    @Sse()
    public liveQuote(): Observable<MessageEvent> {
        return fromEvent(this.eventEmitter, "price.update")
            .pipe(map(price => {
                // Wrap the emitted payload in the { data } shape that @Sse() expects.
                return { data: price } as MessageEvent;
            }));
    }
}

PriceUpdateHookController:

//... other imports
import { Body, Controller, Post } from "@nestjs/common";
import { EventEmitter2 } from "@nestjs/event-emitter";

@Controller("/price")
export class PriceUpdateHookController {
    constructor (private readonly eventEmitter: EventEmitter2) { }

    // Webhook endpoint: forwards the posted price to every "price.update" listener.
    @Post()
    updatePrice(@Body() newPrice: number) {
        this.eventEmitter.emit("price.update", newPrice);
        return true;
    }
}

Hope this helps.
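
The bridging step in liveQuote() can also be tried outside NestJS. Below is a minimal sketch, assuming Node's built-in EventEmitter as a stand-in for the injected EventEmitter2 (fromEvent only needs addListener/removeListener on the target). The SseMessage interface and the sample prices are made up for illustration and are not part of the answer's code.

```typescript
import { EventEmitter } from "events";
import { fromEvent } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical stand-in for the { data } shape returned by an @Sse() handler.
interface SseMessage {
  data: unknown;
}

// Assumption: a plain Node emitter plays the role of the injected EventEmitter2.
const emitter = new EventEmitter();

// Same pipeline as liveQuote(): every "price.update" becomes one message.
const liveQuote$ = fromEvent(emitter, "price.update").pipe(
  map((price): SseMessage => ({ data: price })),
);

// Subscribing plays the role of a client connecting to the SSE route.
const received: SseMessage[] = [];
const subscription = liveQuote$.subscribe((message) => received.push(message));

emitter.emit("price.update", 101.5); // delivered to the subscriber
emitter.emit("price.update", 102); // delivered to the subscriber

// Unsubscribing plays the role of the client disconnecting.
subscription.unsubscribe();
emitter.emit("price.update", 103); // nobody is subscribed, so this one is dropped

console.log(received);
console.log(emitter.listenerCount("price.update"));
```

The stream stays open until the subscriber leaves, and events emitted while nobody is subscribed are not replayed. By contrast, of(event) emits its single value and completes right away, so it cannot carry later events.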

I just changed the URL to http://localhost:3000/sse in the React App.tsx at line 6, and it works for me.

client side

From

const eventSource = new EventSource('http://0.0.0.0:3000/sse');

To

  const eventSource = new EventSource('http://localhost:3000/sse');

server side

It displays the same message; I just added the current time on the server side. In app.controller.ts:

From

  sse(event: StockCreatedEvent): Observable<MessageEvent> {
    return interval(1000).pipe(map((_) => ({ data: { hello: 'world' } })));
  }

To

  sse(event: StockCreatedEvent): Observable<MessageEvent> {
    return interval(1000).pipe(map((_) => ({ data: { hello: 'world, time = ' +  new Date().toUTCString() } })));
  }

Result (screenshots not included): server side - backend (NestJS); client side - client (React)
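
For reference, this is roughly what travels between the two sides. The sketch below frames a { data: ... } object in the text/event-stream format and then parses it back the way a client handler would. formatSseMessage and the SseMessage interface are hypothetical helpers written for this illustration, not NestJS APIs, and the real NestJS serializer may add fields of its own (for example an id line).

```typescript
// Hypothetical message shape, loosely modelled on the { data } objects above.
interface SseMessage {
  data: string | object;
  event?: string;
  id?: string;
}

// Hypothetical helper: serialise one message into the text/event-stream format.
function formatSseMessage(message: SseMessage): string {
  const lines: string[] = [];
  if (message.event !== undefined) lines.push(`event: ${message.event}`);
  if (message.id !== undefined) lines.push(`id: ${message.id}`);
  // Objects are sent as JSON text; strings are sent as they are.
  const payload =
    typeof message.data === "string" ? message.data : JSON.stringify(message.data);
  // A payload containing line breaks is sent as several "data:" lines.
  for (const line of payload.split(/\r\n|\r|\n/)) lines.push(`data: ${line}`);
  // A blank line terminates the message.
  return lines.join("\n") + "\n\n";
}

const frame = formatSseMessage({ data: { hello: "world" } });
console.log(JSON.stringify(frame));

// The client receives the data field as text, so it has to parse it back.
const dataText = frame.split("\n")[0].slice("data: ".length);
const parsed = JSON.parse(dataText) as { hello: string };
console.log(parsed.hello);
```

On the React side this means event.data in an EventSource onmessage handler is a string, and JSON.parse is needed to get the object back.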
