RxJava2 线程切换源码分析。原理解析
RxJava是一个异步框架,使用RxJava可以使代码逻辑更加清晰,避免过多的嵌套调用导致代码可读性变差。在实际项目中,由于主线程的绘制机制,我们经常会使用到线程的切换,将耗时的操作放在工作线程,将结果使用Handler返回给主线程处理。而使用RxJava我们可以很方便的实现这一切操作。
首先我们先看一个例子:
1 | Observable.create<Int> { |
首先大概说一下上述例子,可以看到首开始是创建了一个Observable然后发射了一个int类型的数据源,然后分别进行了两次map操作,最后交由Observer处理可以看到分别进行了三次线程切换操作,下面是上述例子的输出:
1 | TAG: 1 source |
可以看到,数据发射源,以及两个map都与运行在single()
线程,但是代码中确实也是切换了一次newThread(),但是没有生效,最后的Observer运行在main线程,说明observeOn生效了,这是为什么呢?下面我们带着这个问题去看看源码:
在开始源码阅读前,我们首先先说明一个概念,就是RxJava的事件流的订阅顺序实际上是自下而上的。按照上述例子来说,首先订阅的是observeOn -> map -> suscribeOn -> map -> subscribeOn -> ObservableCreate,为什么这么说呢?我们知道,真正的订阅是通过subscribe方法,那么先看一下subscribe的源码:
1 |
|
上面的代码就是subscribe的调用步骤,我们可以看出最后一步调用subscriActual(observer)
是真正执行订阅的方法:
1 | protected abstract void subscribeActual(Observer<? super T> observer); |
可以看到subscribeActual是一个抽象方法,它的实现在哪呢?在上述例子中,最后一步调用的是observeOn(AndroidSchedulers.mainThread()).subscribe()
,也就是说明,observeOn()的返回结果,实现了第一个subscribeActual,那么来看一下observeOn的实现:
1 |
|
可以看到observeOn的最后一行,调用了RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
:
1 |
|
从RxJavaPlugin.onAssembly()
是将传入的Observable通过一个转换函数转换为另一个Observable, 但我们这里没有定义这个转换函数,因此,最终得到的还是开始传入的参数ObservableObserveOn。所以observeOn返回的是一个ObservableObserveOn对象,也就是说subscribe首先执行了ObservableObserveOn
的subscribeActual
方法,那么我们首先看一下它的源码:
1 | public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { |
上面就是ObservableObserveOn的源码,我们可以看到这个类首先做的是将source保存起来,这个source就是事件流中上一步生成的Observable,在subscribeActual中,首先是生成了一个线程,这个就是我们想说的切换线程的关键步骤,然后调用source.subscribe去订阅事件流中上一步生成的Observable。这里就解释了为什么RxJava的事件流订阅顺序是自下而上的。
我们可以看到在source.subscribe
中传入了一个ObserverOnObserver,将所生成的线程作为参数传了进去。这个Observer就是Observable的观察者,我们看一下它的onNext方法:
1 |
|
从上面的源码来看,onNext通过调用worker.schedule()运行在worker的线程中,onError()、onComplete()都是一样。这样就实现了线程的切换,那么这个线程又是如何生成的呢?我们来看看scheduler.createWorker()
方法:
1 |
|
createWorker也是一个抽象方法,它的实现就是我们传入的AndroidSchedulers.mainThread()
:
1 | private static final class MainHolder { |
上述代码注释已经说明了生成主线程的步骤,值得注意的是,这里用到的是一个内部类的方式生成的实际上是一个单例的HandlerScheduler对象。当我们知道了在上面的提到的ObservableObserveOn中的worker是HandlerScheduler对象之后,我们来分别看看createWorker()和worker.schedule(this)方法在HandlerScheduler中的实现即可:
createWorker():
1 |
|
由于createWorker返回了一个HandlerWorker对象,因此schedule的实现也就是在HandlerWorker中:
1 |
|
上述代码就是将我们传入的run方法的实现(ObservableObserveOn)传递给了ScheduleRunnable,而ScheduleRunnable也实现了Runnable接口:
1 |
|
到这里可以看出来,真正执行的是ObservableObserveOn.ObserveOnObserver
中的run方法:
1 |
|
到此就完成了一个订阅的流程,那么我们可以总结一下observeOn的工作顺序:
- 首先是调用Observable的subscribeActual方法
- 根据observeOn传入的参数类型,创建不同的线程,newThread()、singleThread()、computation、io()、mainThread,代表了可以生成不同的线程。
- 然后在Observer的onNext中调用workerThread的schedule方法,这个方法会调用到Observer中的run方法,然后在workerThread所在线程调用onNext(),从而实现了线程的切换。
针对subscribeOn来说,流程和observeOn基本差不多,只是区别在于subscribeOn改变的是数据源的运行线程,而Observer是切换Observer所在的线程,这一点我们可以在subscribeOn的源码中看出来:
1 |
|
这里调用了一个scheduleDirect方法并且创建了一个SubscribeTask(parent)
:
1 |
|
scheduleDirect的工作就是创建了指定的线程,并且调用了schedule方法。然后再看看SubscribeTask:
1 | final class SubscribeTask implements Runnable { |
从这里看出source.subscribe(parent)
通过run()运行在指定线程中,从而实现了上游数据源的线程切换,而由于RxJava是由下而上订阅的顺序,因此subscribeOn只有第一个指定的切换有效,多次设置是无效的。而observeOn则是每次切换都有效。