导图社区 RxJava2.0
RxJava2.0操作符整理,有用到的可以参考
编辑于2019-06-11 19:10:47RxJava2.0
基础知识
五种观察者模式
Onservable和Onserver(能够发射0或n个数据,并以成功或者错误事件终止)
Hot Observable & Cold Observable
Hot Observable(无论有没有观察者进行订阅,时间始终都会发生,一对多关系,可以与多个订阅者共享信息 )
Cold Observable(只有订阅了,才开始执行发射数据流的代码,一对一关系,当有多个Observer的时候,他们的事件是各自独立的。)
Flowable和Subscriber(能够发射0或n个数据,并以成功或者错误事件终止,支持背压,可以控制数据源发射的速度)
Single和SingleObserver(只发射单个数据或错误事件)
Compleable和CompletableObserver(从来不发射数据,只处理onComplete和onError事件,可以看成Rx的Runnable)
Maybe和MaybeObserver(能够发射0或者1个数据,要么成功,要么失败,有点类似Optional)
Observable
RxJava使用三步骤
1、创建Observable
2、创建Observer
3、使用Subscribe()进行订阅
Subscribe(onNext)
Subscribe(onNext,onError)
Subscribe(onNext,onError,onComplete)
Subscribe(onNext,onError,onComplete,onSubscribe)
do操作符
doOnSubscribe(一旦观察者订阅了Observable,他就会被调用)
doObLifecycle(可以在观察者订阅之后,设置是否取消订阅)
doOnNext(它产生的Observable每发射一项数据就会调用它一次,它的Consumer接受发射的数据项。一般用于在Subscribe之前对数据进行处理)
doOnEach(它产生的Observable每发射一项数据就会调用它一次,不仅包括onNext,还包括onError和onCompleted)
doAfterNext(在onNext之后执行,而doOnNext是在onNext之前执行)
doOnComplete(当它的Observable在正常终止调用onComplete时会被调用)
doFinally(当它产生的Observable终止之后会被调用,无论是正常终止还是异常终止。的onFinlly有限于doAfterTerminate的调用)
doAfterTerminte(注册一个Action,当Observable调用onComplete或者onError时触发)
创建操作符
just() 将一个或多个对象转换成发射这个或这些对象的一个Observable。
form() 将一个Iterable、一个Future或者一个数组转换成一个Observable。
create() 使用一个函数从头创建一个Observable。
defer() 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable。
range() 创建一个发射指定范围的整数序列的Observable。
interval() 创建一个按照给定的时间间隔发射整数序列的Observable。
timer() 创建一个在给定延迟之后发射单个数据的Observable。
empty() 创建一个什么都不做直接通知完成的Observable。
error() 创建一个什么都不做直接通知错误的Observable。
never() 创建一个不发射任何数据的Observable。
repeat() 创建一个发射特定数据重复多次的Observable。
线程操作
调度器(Scheduler)的种类
single 使用定长为1的线程池(new Scheduled Thread Pool(1),重复利用这个线程)
newThread 每次都启用新的线程,并在新线程中执行操作
computation 使用固定的线程池(Fix Scheduler Pool)大小为CPU核数,适用于CPU密集型计算
io 适合I/O操作(读写文件。读写数据库,网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是一个无数量上线的线程池,可以重用空闲的线程,因此多数情况下,io比newThread()更有效率。
trampoline 直接在当前线程运行,如果当前线程有任务正在执行,则会先暂停其他任务
Schedulers 将java.concurrent.Executor转换成一个调度器实例,即可以自定义一个Executor来作为调度器
变换操作符
map() 对序列的每一项都用一个函数来变换Observable发射的数据序列
flatMap() 、concatMap()、flatMapIterable() 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable中
switchMap() 将Observable发射的数据集合变换为Observable集合,然后只发射这些Observable最近发射过的数据
scan() 对Observable发射的每一项数据应用一个函数,然后按次序依次发射每一个值
groupBy() 将Observable拆分为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射过一组不同的数据
buffer() 定期从Observable收集数据收集到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
window() 定期将来自Observable的数据拆分成一些Observable窗口,然后发射这些窗口,而不是每次发射一项
cast() 在发射之前强制将Observable发射的所有数据转换为指定类型
过滤操作符
filter() 过滤数据
takeLast() 只发射最后的N项数据
last() 只发射最后一项数据
lastOrDefault() 只发射最后一项数据,如果Observable为空,就发射默认值
takeLastBuffer() 将最后的N项数据当做单个数据发射
skip() 跳过开始的N项数据
skipLast() 跳过最后的N项数据
take() 只发射开始的N项数据
first()、takeFirst() 只发射第一项数据,或者满足某种条件的第一项数据
firstOrDefault() 只发射第一项数据,或者满足某种条件的第一项数据
elementAt() 发射第N项数据
elementAtOrDefault() 发射第N项数据,如果Observable数据少于N项,就发射默认值
sample()、throttleLast()定期发射Observable最近的数据
throttleFirst() 定期发射Observable发射的第一项数据
throttleWithTimeout() 、debounce() 只有当Observable在指定的时间段吼还没有发射数据时,才发射一个数据
timeout() 、如果在一个指定的时间段内后还没发射数据,就发射一个异常
distinct() 过滤掉重复的数据
distinctUnitChanged() 过滤掉连续重复的数据
ofType() 只发射指定类型的数据
ignoreElements() 丢弃所有的正常数据只发射错误或完成通知
条件操作符
amb() 给定多个Observable,只让第一个发射数据的Observable发射全部数据
defaultIfEmpty() 发射来自原始Observable的数据,如果原始Observable没有发射数据,则发射一个默认数据
skipUntil() 丢弃原始Observable的数据,如果原始Observable没有发射数据,则发射一个默认数据
skipWhile() 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
takeUntil() 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
takeWhile() 、takeWhileWithIndex() 发射袁女士Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据
布尔型操作符
all() 判断是否所有的数据项都满足某个条件
contains() 判断Observable是否会发射一个特定的值
exists() 、isEmpty() 判断Observable是否发射了一个值
sequenceEqual() 判断两个Observable发射的序列是否相等
合并操作符
startWith() 在数据序列的开头增加一项数据
merge() 将多个Observable 合并为一个
mergeDelayError() 合并多个Observable,让没有错误的Observable都完成后再发射错误通知
zip() 使用一个函数组合多个Observable发射的数据集合,然后发射这个结果
combineLatest() 当两个Observable中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据)然后发射这个函数的结果
join()、groupJoin() 无论何时,如果一个Observable发射了一个数据项,就需要在另一个Observable发射的数据项顶一顶额时间窗口内,将两个Observable发射的数据合并发射
switchOnNexr() 将一个发射Observable色Observable转换成另一个Observable,后者发射这些Observable最近能发射的数据
连接操作符
ConnectableObservable.connect() 指示一个可连接的Observable开始发射数据
Observable.pubilsh() 将一个Observable转换为一个可连接的Observable
Observable.replay() 确保所有的订阅者看到相同的数据序列,即使他们在Observable开始发射数据之后才订阅
ConnectableObservable.refConnt() 让一个可连接的Observable表现的像一个普通的Observable