RxJava3
RxJava3
Single.just的工作流程
var single = Single.just("1")//被观察对象
single = single.subscribeOn(Schedulers.io())
single.observeOn(AndroidSchedulers.mainThread())
.subscribe(object: SingleObserver<MutableList<Repo>?> {//内部为观察者,subscribe表示关联
override fun onSuccess(repos: MutableList<Repo>?) {
}
override fun onSubscribe(d: Disposable?) {
// 网络请求前会调用 Disposable可以让上游停止工作
}
override fun onSuccess(e: Throwable) {
}
})
Single.just
RxJavaPlugins.onAssembly(new SingleJust<T>(item))
onAssembly是一个钩子函数,可以对传入数据加工后再传回,默认啥也不干
SingleJust是Single的一个子类
#SingleJust#
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {//核心方法 在single.subscribe时最后就会进入这里
observer.onSubscribe(Disposables.disposed());//结束标记,因为Single.just瞬间就会结束
observer.onSuccess(value);
}
操作符
map
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return "2";
}
})
map里传入mapper对象
.map时会生成一个新的SingleMap对象传入single自己和mapper
.subscribe –> subscribeActual –> new MapSingleObserver<T, R>(t, mapper)
MapSingleObserver在onSuccess时会调用 新数据 = mapper.apply(原始数据)
最后SingleObserver.onSuccess(新数据)
Disposable
核心问题在于如何取消的
分类:
- 有没有来自上游的后续任务,也就是非sigle,有多个任务,这种情况要通知上游取消任务
- 自身是否生产任务,比如自己是最上游,或自身有延时,这种情况要取消自身任务
interval
无上游有延时
停止倒计时,停止向下发送事件
#ObservableInterval#
@Override
public void subscribeActual(Observer<? super Long> observer) {
IntervalObserver is = new IntervalObserver(observer);
observer.onSubscribe(is);
Scheduler sch = scheduler;//定时器
if (sch instanceof TrampolineScheduler) {
Worker worker = sch.createWorker();//主线程会进入这里,一般不用
is.setResource(worker);
worker.schedulePeriodically(is, initialDelay, period, unit);
} else {
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);//开始计时
is.setResource(d);
}
}
#ObservableInterval#
//即在内部维护了定时器,又是定时向下发送的地方,所以即实现了Disposable接口又继承AtomicReference<Disposable>
static final class IntervalObserver
extends AtomicReference<Disposable>
implements Disposable, Runnable { //注意这里的实现,接口说明了自己是diposable,而继承则可以真正使用disposable的功能
private static final long serialVersionUID = 346773832286157679L;
final Observer<? super Long> downstream;
long count;
IntervalObserver(Observer<? super Long> downstream) {
this.downstream = downstream;
}
@Override
public void dispose() {
DisposableHelper.dispose(this);//这里的this就是来自继承的AtomicReference<Disposable>
}
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
@Override
public void run() {
if (get() != DisposableHelper.DISPOSED) {
downstream.onNext(count++);
}
}
public void setResource(Disposable d) {
DisposableHelper.setOnce(this, d);//把IntervalObserver里的disposable设置为当前的
}
}
Single.map
有上游无延时,直接拿上游的
Single.delay
有上游有延时
#SingleDelay#
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SequentialDisposable sd = new SequentialDisposable();//一个空的Disposable
observer.onSubscribe(sd);
source.subscribe(new Delay(sd, observer));
}
final class Delay implements SingleObserver<T> {
private final SequentialDisposable sd;
final SingleObserver<? super T> downstream;
Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
this.sd = sd;
this.downstream = observer;
}
@Override
public void onSubscribe(Disposable d) {
sd.replace(d);//把自己的Disposable替换为上游的
}
@Override
public void onSuccess(final T value) {
sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));//启用计时器并置换
}
@Override
public void onError(final Throwable e) {
sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
}
}
Observable.map
无延迟有后续
MapObserver<T, U> extends BasicFuseableObserver<T, U>
#BasicFuseableObserver
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {//对上游下游的Disposable做合规判断
this.upstream = d;//传给上游
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
if (beforeDownstream()) {
downstream.onSubscribe(this);//调动下游
afterDownstream();
}
}
}
Observable.delay
延时有下游,把自己的Disposable传给上游同时自己执行延时,然后调动下游
切换线程
subscribeOn
#SingleSubscribeOn#
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer, source);
observer.onSubscribe(parent);
Disposable f = scheduler.scheduleDirect(parent);//线程发生切换的地方,scheduler内部维护了一个线程池工具
parent.task.replace(f);
}
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
@Override
public void run() {
source.subscribe(this);//这里线程就已经发生变化了,所以从这个往上所有的操作符线程都会变
}
}
observeOn
在onSuccess的地方切换,后面的所有Observer都会变为此线程,使用mainhandler.sendMessage切换成主线程
总结
原始的被观察者(Single)每经过一个操作符(map)就会生成一个新的继承于观察者的新对象(SingleMap),同时把自身传入,这个对象(SingleMap)内部会有一个subscribeActual方法,在subscribe(也就是调用链的最后)时被调用,这个方法会生成并subscribe一个内部Observer(MapSingleObserver)的对象,这个对象里有subscribe进来的observer,然后会在这个方法里用上层对象调用subscribe并重复操作知道最顶层,最顶层不会生成内部Observer,而是会直接调用onSuccess方法传递数据,然后数据经历一层层的Observer一直传输到最后。
总体是一个蛇形的调用链,从上到下(生成各种被观察者实例),从下在到上(在subscribe后依次生成观察者实例)然后再从上到下(数据流)