最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

javascript - Angular 8 - handling SSE reconnect on error - Stack Overflow

programmeradmin1浏览0评论

I'm working on an Angular 8 (with Electron 6 and Ionic 4) project and right now we are having evaluation phase where we are deciding whether to replace polling with SSE (Server-sent events) or Web Sockets. My part of the job is to research SSE.

I created small express application which generates random numbers and it all works fine. The only thing that bugs me is correct way to reconnect on server error.

My implementation looks like this:

  private createSseSource(): Observable<MessageEvent> {
    return Observable.create(observer => {
      this.eventSource = new EventSource(SSE_URL);
      this.eventSource.onmessage = (event) => {
        this.zone.run(() => observer.next(event));
      };

    this.eventSource.onopen = (event) => {
      console.log('connection open');
    };

    this.eventSource.onerror = (error) => {
      console.log('looks like the best thing to do is to do nothing');
      // this.zone.run(() => observer.error(error));
      // this.closeSseConnection();
      // this.reconnectOnError();
    };
  });
}

I tried to implement reconnectOnError() function following this answer, but I just wasn't able to make it work. Then I ditched the reconnectOnError() function and it seems like it's a better thing to do. Do not try to close and reconnect nor propagate error to observable. Just sit and wait and when the server is running again it will reconnect automatically.

Question is, is this really the best thing to do? Important thing to mention is, that the FE application municates with it's own server which can't be accessed by another instance of the app (built-in device).

I'm working on an Angular 8 (with Electron 6 and Ionic 4) project and right now we are having evaluation phase where we are deciding whether to replace polling with SSE (Server-sent events) or Web Sockets. My part of the job is to research SSE.

I created small express application which generates random numbers and it all works fine. The only thing that bugs me is correct way to reconnect on server error.

My implementation looks like this:

  private createSseSource(): Observable<MessageEvent> {
    return Observable.create(observer => {
      this.eventSource = new EventSource(SSE_URL);
      this.eventSource.onmessage = (event) => {
        this.zone.run(() => observer.next(event));
      };

    this.eventSource.onopen = (event) => {
      console.log('connection open');
    };

    this.eventSource.onerror = (error) => {
      console.log('looks like the best thing to do is to do nothing');
      // this.zone.run(() => observer.error(error));
      // this.closeSseConnection();
      // this.reconnectOnError();
    };
  });
}

I tried to implement reconnectOnError() function following this answer, but I just wasn't able to make it work. Then I ditched the reconnectOnError() function and it seems like it's a better thing to do. Do not try to close and reconnect nor propagate error to observable. Just sit and wait and when the server is running again it will reconnect automatically.

Question is, is this really the best thing to do? Important thing to mention is, that the FE application municates with it's own server which can't be accessed by another instance of the app (built-in device).

Share Improve this question asked Sep 24, 2019 at 7:46 mat.hudakmat.hudak 3,2784 gold badges29 silver badges50 bronze badges
Add a ment  | 

1 Answer 1

Reset to default 4

I see that my question is getting some attention so I decided to post my solution. To answer my question: "Is this really the best thing to do, to omit reconnect function?" I don't know :). But this solution works for me and it was proven in production, that it offers way how to actually control SSE reconnect to some extent.

Here's what I did:

  1. Rewritten createSseSource function so the return type is void
  2. Instead of returning observable, data from SSE are fed to subjects/NgRx actions
  3. Added public openSseChannel and private reconnectOnError functions for better control
  4. Added private function processSseEvent to handle custom message types

Since I'm using NgRx on this project every SSE message dispatches corresponding action, but this can be replaced by ReplaySubject and exposed as observable.

// Public function, initializes connection, returns true if successful
openSseChannel(): boolean {
  this.createSseEventSource();
  return !!this.eventSource;
}

// Creates SSE event source, handles SSE events
protected createSseEventSource(): void {
  // Close event source if current instance of SSE service has some
  if (this.eventSource) {
    this.closeSseConnection();
    this.eventSource = null;
  }
  // Open new channel, create new EventSource
  this.eventSource = new EventSource(this.sseChannelUrl);

  // Process default event
  this.eventSource.onmessage = (event: MessageEvent) => {
    this.zone.run(() => this.processSseEvent(event));
  };

  // Add custom events
  Object.keys(SSE_EVENTS).forEach(key => {
    this.eventSource.addEventListener(SSE_EVENTS[key], event => {
      this.zone.run(() => this.processSseEvent(event));
    });
  });

  // Process connection opened
  this.eventSource.onopen = () => {
    this.reconnectFrequencySec = 1;
  };

  // Process error
  this.eventSource.onerror = (error: any) => {
    this.reconnectOnError();
  };
}

// Processes custom event types
private processSseEvent(sseEvent: MessageEvent): void {
  const parsed = sseEvent.data ? JSON.parse(sseEvent.data) : {};
  switch (sseEvent.type) {
    case SSE_EVENTS.STATUS: {
      this.store.dispatch(StatusActions.setStatus({ status: parsed }));
      // or
      // this.someReplaySubject.next(parsed);
      break;
    }
    // Add others if neccessary
    default: {
      console.error('Unknown event:', sseEvent.type);
      break;
    }
  }
}

// Handles reconnect attempts when the connection fails for some reason.
// const SSE_RECONNECT_UPPER_LIMIT = 64;
private reconnectOnError(): void {
  const self = this;
  this.closeSseConnection();
  clearTimeout(this.reconnectTimeout);
  this.reconnectTimeout = setTimeout(() => {
    self.openSseChannel();
    self.reconnectFrequencySec *= 2;
    if (self.reconnectFrequencySec >= SSE_RECONNECT_UPPER_LIMIT) {
      self.reconnectFrequencySec = SSE_RECONNECT_UPPER_LIMIT;
    }
  }, this.reconnectFrequencySec * 1000);
}

Since the SSE events are fed to subject/actions it doesn't matter if the connection is lost since at least last event is preserved within subject or store. Attempts to reconnect can then happen silently and when new data are send, there are processed seamlessly.

发布评论

评论列表(0)

  1. 暂无评论