RxJS 错误处理指南
错误处理是 RxJS 中的重要组成部分,只要我们使用 RxJS,基本上就绕不开错误处理。
但是由于 RxJS 本身陡峭的学习曲线,以及各种各样的操作符,使得 RxJS 的错误处理并不容易理解,在这篇文章中,我列出了一些常见的错误处理方式,可以涵盖大多数使用场景。
前提
为了理解 RxJS 中的错误处理,我们首先要明确一个前提:
RxJS 是基于流(stream)的,任何给定的流只能出错一次,发出错误通知后,Observable 就会终止执行,后续的订阅也不会再收到任何值。
这是由 Observable 契约规定的,任何流都是这样。一个 stream 要么完成(complete),要么出错(error),不可能两种情况同时出现,并且完成和出错都是可选的。
这个前提对于我们更好地理解 RxJS 错误处理至关重要。
subscribe 的 error 回调函数
第一种错误处理方式是在订阅 Observable 时传入错误处理函数。
我们最常用的订阅可能长这样:
但其实 subscribe 也可以传入一个对象,先来看一下 Observable subscribe 函数的定义:
可以看到除了传入一个函数,也可以传入 Partial<Observer<T>>
作为参数,再来看一下 Observer
的定义:
- next:每次流发出值都会调用这个函数,如果在订阅时只传入了一个函数,相当于只传了这个 next 函数。
- error:错误处理函数,仅在发生错误时调用,参数 err 就是 Observable 发出的错误。
- complete:完成处理函数,仅当流完成时才会调用。
先来看一个正常运行的例子:
运行结果如下:
这个流按顺序 emit 三个值出来,之后会 complete。再来看一下出错的情况:
运行结果如下:
可以看到,如果 Observable 报错,那么会调用我们的 error 回调函数,并且之后不会有任何值 emit 出来,更不会 complete。
这种错误处理方式可能是最常见的,但是在某些情况下并不适用。
比如,我们的 observable 任务是通过网络请求获取数据,当请求出错时想给一个默认值,但 observable 在发出错误的那一刻就终止了。
但别急,RxJS 已经给了方案 - 操作符(operators)。
catchError 操作符
在同步编程中,我们可以通过 try {} catch {}
块来包裹任何错误,然后在 catct
中处理错误。
这样处理错误非常简单,但是 JavaScript 中大部分操作都是异步的,比如远程请求一个 API,RxJS 中提供的 catchError
操作符可以帮助我们处理类似的场景。
catchError 原理
和其他操作符一样,catchError
是一个函数,接受一个 Observable,输出一个 Observable。每次调用 catchError
时需要传入一个错误处理函数。
catchError
操作符将一个可能出错的 Observable 作为输入,并发出和输入一样的 Observable。如果没有错误,catchError
产生的输出 Observable 与输入 Observable 完全相同。
但是如果发生了错误,catchError
就会生效,我们在 catchError
中传入的函数就会被调用,返回一个备用的 Observable,以便继续流程。
来看一个例子:
我们在订阅的代码中加了 catchError
操作符,先打印错误,然后返回了一个新的 Observable。
运行结果如下:
让我们逐步分析一下这段代码的执行过程,了解每次打印的内容:
-
subscriber.next(1);
:Observable 发出值1
,然后next
回调被调用,打印got value 1
。 -
subscriber.next(2);
:Observable 发出值2
,然后next
回调被调用,打印got value 2
。 -
subscriber.error('value cannot be greater than 2');
:在这里发生了错误,Observable 发出错误消息'value cannot be greater than 2'
,然后catchError
操作符捕获了这个错误,因此catchError
中的错误处理函数被调用,打印catchError: value cannot be greater than 2
。然后catchError
返回了一个备用的 Observable,of('default value')
。因此,订阅者会收到'default value'
。接下来不会再有任何值被发出,因为错误已经终止了 Observable。 -
订阅者收到了
'default value'
,然后next
回调被调用,打印got value default value
。 -
由于 Observable 已经终止,不会再有任何值被发出,因此
complete
回调被调用,打印done
。至此,Observable 的生命周期结束。
需要注意的是,我们在文章开头已经讲过,任何给定的流只能出错一次,发出错误通知后,Observable 就会终止执行,后续的订阅也不会再收到任何值。因此在这段代码中,我们在 step 4 和 5 中打印的内容,其实是自动订阅了
catchError
返回的新的 Observable,和原来的 Observable 已经没有关系了。
重新抛出错误
我们也可以在 catchError
中再次抛出错误,一个常见的情景是,在 catchError
做一些逻辑处理,然后把这个错误继续抛出去,这样我们在订阅时传入的 error 回调函数仍然会被执行。
只需要将 catchError
中的 return 部分改为 return throwError(() => err);
,运行结果如下:
可以看到,由于我们在捕获错误之后又抛出了错误,所以 catchError
和 error
函数都会执行。
多次抛出错误
如果有需要,我们也可以在同一个 Observable chain 中多次调用 catchError
。比如这样:
finalize 操作符
我们刚提到了 JavaScript 中同步的 try {} catch {}
,但别忘了后面还有一个 finally
代码块,finally
块中的代码无论如何都会被执行
RxJS 也为我们提供了类似功能:finalize
操作符(因为 finally 是关键字,所以叫 finalize)。
和 catchError
操作符一样,finalize
操作符也可以被调用多次。
运行结果如下:
重试策略
除了使用 catchError
给默认值,或者重新报错之外,我们还可以尝试另一种策略:重试(retry)。
比如由于服务器不稳定导致 API 调用失败,很可能我们重新调用一遍就成功了,这时候就可以尝试 retry。
重试的核心就是:如果流出错了,那就重新订阅流的来源,创建一个新的流。
当然没出错的话就和 catchError
一样,输入什么流就输出什么流。
什么时候重试
重试之前,有个问题需要明确:是否立即重试?如果要等待的话,等待多久重试?
为了回答这歌问题,我们需要一个辅助 Observable,我们将其称为 Notifier Observable,这个 Observable 用来决定什么时候尝试重试。
retryWhen
retryWhen
操作符用于在 Observable 发生错误时进行重试。它允许我们根据错误进行条件性的重试,而不是简单地无限重试。通过 retryWhen
,我们可以指定一个 observable,该 observable 控制重试的频率和条件。
来看一个官网的例子:
让我们逐步分析这段代码的执行过程:
-
const source = interval(1000);
:创建一个 Observable,每隔 1 秒发出一个递增的整数。 -
const result = source.pipe(...)
:创建一个新的 Observableresult
,对源 Observable 进行管道操作。 -
map
操作符用于转换源 Observable 发出的值。在这个示例中,当源 Observable 发出的值大于 3 时,会抛出一个错误。如果值小于或等于 3,则会返回原始值。 -
retryWhen
操作符用于在遇到错误时进行重试。它接收一个函数作为参数,该函数接收一个表示错误的 Observable,并返回一个 Observable,控制重试的行为。 -
在
retryWhen
中,errors
observable 发出了源 Observable 发出的错误值。tap
操作符用于在发生错误时打印错误消息。 -
result.subscribe((value) => console.log(value));
:订阅result
Observable,输出它发出的值。
所以,根据上述分析,当执行代码后,运行结果如下:
现在我们通过使用 retryWhen
实现了重试,试着观察打印出来的当前秒数可以看到,出错之后也是隔了一秒就发出了新的值,说明现在是立即重试的。
现在我们解决立即重试,那如果我们想等待一段时间重试该怎么办呢。这就用到了另一个操作符 - delayWhen
。
delayWhen
delayWhen
操作符用于在 Observable 发出值之前添加延迟。它会接收一个函数作为参数,该函数会接收源 Observable 发出的值,并返回一个 Observable,该 Observable 会发出一个值,用于指示要延迟的时间。
来看一个例子:
和立即重试的唯一区别就是在 retryWhen
加了一行代码:delayWhen(() => timer(3000))
,通过 delayWhen
操作符添加一个延迟来控制重试的频率,使用 timer
来设置每次重试之间的延迟为 3s。
运行结果如下:
通过观察可以看到,在打印错误消息之后,每次重试都延迟了 3s。
总结
RxJS 常用的错误处理方式差不多就是这些,简单分成三大类:
- 使用 subscribe 函数的 error 回调函数
- 使用
catchError
操作符 - 使用
retryWhen
操作符
在实际开发中,可以根据不同的业务场景选择不同的错误处理方式。