I’m given an observable that may either emit values and complete, or error. The observable can be repeated.
Consumers of this observable need to be informed if the source errors, but they also should have a mechanism to retry the observable. The consumer may choose not to retry the observable.
The problem I’m trying to work around is that an error, once handled, cannot be handled by a different handler down the line. My solution involves splitting the pipeline in two, and I’m wondering if there is a better way. Splitting and re-merging a pipe is nothing I’ve seen so far.
This is my solution:
import { catchError, EMPTY, merge, Observable, of, retry, Subject, switchMap, tap } from "rxjs";
/** A source that will only emit only one value, but can be retried. */
declare const source$: Observable<unknown>
/** Flag to indicate that the source last emitted an error. */
let needsRetrying = false
/** Emits when the consumer requests retrying after the source errors */
const retrier$ = new Subject<void>()
// At this point we fork the stream to apply two error handlers,
// one that facilitets the consumer to retry on error,
// and one that communicates the error to the consumer.
// Later we merge them again to present the consumer with a single observable.
// The fork duplicates the event emissions,
// so we must take precautions against the consumer’s receiving them twice.
const retrying$ = source$.pipe(
tap({ error: () => { needsRetrying = true }}),
retry({ delay: () => retrier$ }), // Listen to retry requests
)
const errorCommunicating$ = source$.pipe(
catchError((error) => of({ data: null, error})), // Inform the consumer
switchMap(() => EMPTY) // Discard the regular events
)
// Re-join the split pipe, so the consumer gets informed, and can retry.
const foo$ = merge([retrying$, errorCommunicating$])
export function getFoo$() {
if (needsRetrying) {
retrier$.next()
}
return foo$
}
The final observable, foo$
, will be used in multiple places across the application. All these places must use the same observable. Therefore re-assigning foo$
is not an option.
I’m given an observable that may either emit values and complete, or error. The observable can be repeated.
Consumers of this observable need to be informed if the source errors, but they also should have a mechanism to retry the observable. The consumer may choose not to retry the observable.
The problem I’m trying to work around is that an error, once handled, cannot be handled by a different handler down the line. My solution involves splitting the pipeline in two, and I’m wondering if there is a better way. Splitting and re-merging a pipe is nothing I’ve seen so far.
This is my solution:
import { catchError, EMPTY, merge, Observable, of, retry, Subject, switchMap, tap } from "rxjs";
/** A source that will only emit only one value, but can be retried. */
declare const source$: Observable<unknown>
/** Flag to indicate that the source last emitted an error. */
let needsRetrying = false
/** Emits when the consumer requests retrying after the source errors */
const retrier$ = new Subject<void>()
// At this point we fork the stream to apply two error handlers,
// one that facilitets the consumer to retry on error,
// and one that communicates the error to the consumer.
// Later we merge them again to present the consumer with a single observable.
// The fork duplicates the event emissions,
// so we must take precautions against the consumer’s receiving them twice.
const retrying$ = source$.pipe(
tap({ error: () => { needsRetrying = true }}),
retry({ delay: () => retrier$ }), // Listen to retry requests
)
const errorCommunicating$ = source$.pipe(
catchError((error) => of({ data: null, error})), // Inform the consumer
switchMap(() => EMPTY) // Discard the regular events
)
// Re-join the split pipe, so the consumer gets informed, and can retry.
const foo$ = merge([retrying$, errorCommunicating$])
export function getFoo$() {
if (needsRetrying) {
retrier$.next()
}
return foo$
}
The final observable, foo$
, will be used in multiple places across the application. All these places must use the same observable. Therefore re-assigning foo$
is not an option.
1 Answer
Reset to default 0does this work for you?
im assuming what u want to achieve was to flatten error result as normal emit, while still able to specify it to retry on error by the consumer
function getFoo$(retry = false) {
const foo$ = source$.pipe(
catchError((error) => {
const optionalPipes = []
if(retry) optionalPipes.push(mergeMap(() => source$))
return of({ data: null, error}).pipe(...optionalPipes)
}),
)
return foo$
}