RxJava操作符大全
本文基于RxJava 2.0文档,Github地址:
https://github.com/ReactiveX/RxJava
注意查看RxJava 1.x 与 2.x的不同
1 RxJava2的简单使用
先上一段示例代码
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribe thread = " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onComplete();
.observeOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
Log.e(TAG, "apply thread = " + Thread.currentThread().getName() + ", integer = " + integer);
return String.valueOf(integer);
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe thread = " + Thread.currentThread().getName());
@Override
public void onNext(String s) {
Log.e(TAG, "onNext thread = " + Thread.currentThread().getName() + ", String = " + s);
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError thread = " + Thread.currentThread().getName() + ", Throwable = " + e);
@Override
public void onComplete() {
Log.e(TAG, "onComplete thread = " + Thread.currentThread().getName());
输出日志:
E/RxJavaActivity: onSubscribe thread = main
E/RxJavaActivity: subscribe thread = main
E/RxJavaActivity: apply thread = RxCachedThreadScheduler-1, integer = 1
E/RxJavaActivity: apply thread = RxCachedThreadScheduler-1, integer = 2
E/RxJavaActivity: apply thread = RxCachedThreadScheduler-1, integer = 3
E/RxJavaActivity: apply thread = RxCachedThreadScheduler-1, integer = 4
E/RxJavaActivity: onNext thread = RxNewThreadScheduler-1, String = 1
E/RxJavaActivity: onNext thread = RxNewThreadScheduler-1, String = 2
E/RxJavaActivity: onNext thread = RxNewThreadScheduler-1, String = 3
E/RxJavaActivity: onNext thread = RxNewThreadScheduler-1, String = 4
E/RxJavaActivity: onComplete thread = RxNewThreadScheduler-1
这就是RxJava使用的三部曲 1. 创建
Observable
创建
Observable
时,回调的是
ObservableEmitter
,即发射器,用于发射数据(
onNext
)和通知(
onError/onComplete
) 2. 创建
Observer
创建的
Observer
中有一个回调方法
onSubscribe
,传递参数为
Disposable
,可用于解除订阅。 3. 建立订阅关系
observable.subscribe(observer)
RxJava2中仍然保留了其他简化订阅方法,我们可以根据需求,选择相应的简化订阅(
Consumer
)。
同时,RxJava2引入了新的类
Flowable
,专门用于应对背压(backpressure)问题,但这并不是RxJava2.x中新引入的概念。所谓背压,即生产者的速度大于消费者的速度带来的问题,比如在Android中常见的点击事件,点击过快则会造成点击两次的效果。
在RxJava2.x中将其独立了出来,取名为
Flowable
。因此,
Observable
已经不具备背压处理能力。
关于backpressure,官方文档地址
1
2
2 RxJava中的操作符
当阻塞的
Observables
执行完成后,其他代码才能执行.
-
forEach
- invoke a function on each item emitted by the Observable; block until the Observable completes
-
first/firstOrDefault
- block until the Observable emits an item, then return the first item emitted by the Observable or a default item if the Observable did not emit an item
-
last/lastOrDefault
- block until the Observable completes, then return the last item emitted by the Observable or a default item if there is no last item
-
mostRecent
- returns an iterable that always returns the item most recently emitted by the Observable
-
next
- returns an iterable that blocks until the Observable emits another item, then returns that item
-
latest
- returns an iterable that blocks until or unless the Observable emits an item that has not been returned by the iterable, then returns that item
-
single
- if the Observable completes after emitting a single item, return that item, otherwise throw an exception
-
singleOrDefault
- if the Observable completes after emitting a single item, return that item, otherwise return a default item
-
toFuture
- convert the Observable into a Future
-
toIterable
- convert the sequence emitted by the Observable into an Iterable
-
getIterator
- convert the sequence emitted by the Observable into an Iterator
示例代码
Observable.just(1, 2, 3).observeOn(Schedulers.io()).blockingForEach(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, integer + " - " + Thread.currentThread().getName());
Log.e(TAG, "done - " + Thread.currentThread().getName());
结果
02-12 16:33:13.083 7539-7539/? E/RxJavaActivity: 1 - main
02-12 16:33:13.083 7539-7539/? E/RxJavaActivity: 2 - main
02-12 16:33:13.083 7539-7539/? E/RxJavaActivity: 3 - main
02-12 16:33:13.087 7539-7539/? E/RxJavaActivity: done - main
注意
: RxJava2中针对此部分有了变化:
toBlocking().y - inlined as blockingY() operators, except toFuture
也就是说,在RxJava2中使用上述操作符,应该是这样的
Observable.just(...).blockingForEach
。即使用时加上前缀
blockingXXX
-
startWith
- emit a specified sequence of items before beginning to emit the items from the Observable
-
merge
- combine multiple Observables into one
-
mergeDelayError
- combine multiple Observables into one, allowing error-free Observables to continue before propagating errors
-
zip
- combine sets of items emitted by two or more Observables together via a specified function and emit items based on the results of this function
-
combineLatest
- when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
-
join and groupJoin
- combine the items emitted by two Observables whenever one item from one Observable falls within a window of duration specified by an item emitted by the other Observable
如果一个Observable发射了一条数据,只要在另一个Observable发射的数据定义的时间窗口内,就结合两个Observable发射的数据,然后发射结合后的数据。
目标Observable和源Observable发射的数据都有一个有效时间限制,比如目标发射了一条数据(a)有效期为3s,过了2s后,源发射了一条数据(b),因为2s<3s,目标的那条数据还在有效期,所以可以组合为ab;再过2s,源又发射了一条数据(c),这时候一共过去了4s,目标的数据a已经过期,所以不能组合了…
-
switchOnNext
- convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently emitted of those Observables
将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。
Switch订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个,Switch返回的这个Observable取消订阅前一个发射数据的Observable,开始发射最近的Observable发射的数据。
注意:当原始Observable发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在后来那个Observable产生之后,前一个Observable发射的数据将被丢弃(就像图例上的那个黄色圆圈一样)。
组合操作例子:
Observable.just(1, 2, 3)
.startWith(0)
.subscribe(new Consumer<Integer>() {