RxJava3

Single.just的工作流程

var single = Single.just("1")//被观察对象
single = single.subscribeOn(Schedulers.io())
  single.observeOn(AndroidSchedulers.mainThread())
    .subscribe(object: SingleObserver<MutableList<Repo>?> {//内部为观察者,subscribe表示关联
        override fun onSuccess(repos: MutableList<Repo>?) {
        
        }
        override fun onSubscribe(d: Disposable?) {
            //    网络请求前会调用 Disposable可以让上游停止工作
        }
        override fun onSuccess(e: Throwable) {
        
        }
    })

Single.just

RxJavaPlugins.onAssembly(new SingleJust<T>(item))

onAssembly是一个钩子函数,可以对传入数据加工后再传回,默认啥也不干

SingleJust是Single的一个子类

#SingleJust#
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {//核心方法 在single.subscribe时最后就会进入这里
    observer.onSubscribe(Disposables.disposed());//结束标记,因为Single.just瞬间就会结束
    observer.onSuccess(value);
}

操作符

map

.map(new Function<String, String>() {
    @Override
    public String apply(@NonNull String s) throws Exception {
        return "2";
    }
})

map里传入mapper对象

.map时会生成一个新的SingleMap对象传入single自己和mapper

.subscribe –> subscribeActual –> new MapSingleObserver<T, R>(t, mapper)

MapSingleObserver在onSuccess时会调用 新数据 = mapper.apply(原始数据)

最后SingleObserver.onSuccess(新数据)

Disposable

核心问题在于如何取消的

分类:

  1. 有没有来自上游的后续任务,也就是非sigle,有多个任务,这种情况要通知上游取消任务
  2. 自身是否生产任务,比如自己是最上游,或自身有延时,这种情况要取消自身任务

interval

无上游有延时

停止倒计时,停止向下发送事件

#ObservableInterval#
@Override
public void subscribeActual(Observer<? super Long> observer) {
    IntervalObserver is = new IntervalObserver(observer);
    observer.onSubscribe(is);

    Scheduler sch = scheduler;//定时器

    if (sch instanceof TrampolineScheduler) {
        Worker worker = sch.createWorker();//主线程会进入这里,一般不用
        is.setResource(worker);
        worker.schedulePeriodically(is, initialDelay, period, unit);
    } else {
        Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);//开始计时
        is.setResource(d);
    }
}
#ObservableInterval#
  //即在内部维护了定时器,又是定时向下发送的地方,所以即实现了Disposable接口又继承AtomicReference<Disposable>
static final class IntervalObserver
extends AtomicReference<Disposable>
implements Disposable, Runnable {  //注意这里的实现,接口说明了自己是diposable,而继承则可以真正使用disposable的功能

    private static final long serialVersionUID = 346773832286157679L;

    final Observer<? super Long> downstream;

    long count;

    IntervalObserver(Observer<? super Long> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);//这里的this就是来自继承的AtomicReference<Disposable>
    }

    @Override
    public boolean isDisposed() {
        return get() == DisposableHelper.DISPOSED;
    }

    @Override
    public void run() {
        if (get() != DisposableHelper.DISPOSED) {
            downstream.onNext(count++);
        }
    }

    public void setResource(Disposable d) {
        DisposableHelper.setOnce(this, d);//把IntervalObserver里的disposable设置为当前的
    }
}

Single.map

有上游无延时,直接拿上游的

Single.delay

有上游有延时

#SingleDelay#
protected void subscribeActual(final SingleObserver<? super T> observer) {
    final SequentialDisposable sd = new SequentialDisposable();//一个空的Disposable
    observer.onSubscribe(sd);
    source.subscribe(new Delay(sd, observer));
}
final class Delay implements SingleObserver<T> {
    private final SequentialDisposable sd;
    final SingleObserver<? super T> downstream;

    Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
        this.sd = sd;
        this.downstream = observer;
    }

    @Override
    public void onSubscribe(Disposable d) {
        sd.replace(d);//把自己的Disposable替换为上游的
    }

    @Override
    public void onSuccess(final T value) {
        sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));//启用计时器并置换
    }

    @Override
    public void onError(final Throwable e) {
        sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
    }
}

Observable.map

无延迟有后续

MapObserver<T, U> extends BasicFuseableObserver<T, U>

#BasicFuseableObserver
public final void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {//对上游下游的Disposable做合规判断

        this.upstream = d;//传给上游
        if (d instanceof QueueDisposable) {
            this.qd = (QueueDisposable<T>)d;
        }

        if (beforeDownstream()) {

            downstream.onSubscribe(this);//调动下游

            afterDownstream();
        }

    }
}

Observable.delay

延时有下游,把自己的Disposable传给上游同时自己执行延时,然后调动下游

切换线程

subscribeOn

#SingleSubscribeOn#
protected void subscribeActual(final SingleObserver<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer, source);
    observer.onSubscribe(parent);

    Disposable f = scheduler.scheduleDirect(parent);//线程发生切换的地方,scheduler内部维护了一个线程池工具

    parent.task.replace(f);

}
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {

    @Override
    public void run() {
        source.subscribe(this);//这里线程就已经发生变化了,所以从这个往上所有的操作符线程都会变
    }
}

observeOn

在onSuccess的地方切换,后面的所有Observer都会变为此线程,使用mainhandler.sendMessage切换成主线程

总结

原始的被观察者(Single)每经过一个操作符(map)就会生成一个新的继承于观察者的新对象(SingleMap),同时把自身传入,这个对象(SingleMap)内部会有一个subscribeActual方法,在subscribe(也就是调用链的最后)时被调用,这个方法会生成并subscribe一个内部Observer(MapSingleObserver)的对象,这个对象里有subscribe进来的observer,然后会在这个方法里用上层对象调用subscribe并重复操作知道最顶层,最顶层不会生成内部Observer,而是会直接调用onSuccess方法传递数据,然后数据经历一层层的Observer一直传输到最后。

总体是一个蛇形的调用链,从上到下(生成各种被观察者实例),从下在到上(在subscribe后依次生成观察者实例)然后再从上到下(数据流)