看啥推荐读物
专栏名称: 易冬
OpenCV or Android
目录
相关文章推荐
鸿洋  ·  Android Jetpack ...·  1 周前  
今天看啥  ›  专栏  ›  易冬

RxJava2小白手册(2)- 线程管理和流程浅析

易冬  · 掘金  · android  · 2018-01-06 08:22

介绍

承接上文,结合使用场景,讨论一下如何告别AsyncTask,就是因为RxJava的强大线程管理功能。

举个栗子

认识RxJava之前,我们处理异步任务的方式主要有两种:
1. AsyncTask
2. Thread + Runnable.
涉及的代码量相比较RxJava而言打太多,针对Handler处理不好,可能存在内存泄漏的风险。不赘述,看看如何使用RxJava处理异步任务。

1. 异步处理

1.1 代码示例

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Logger("Emit 111");
        e.onNext(111);
        Logger("Emit 222");
        e.onNext(222);
        Logger("Emit onComplete");
        e.onComplete();

    }
});

Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Logger("onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Logger("onNext integer = " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Logger("onError e = " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Logger("onComplete");
    }
};
observable.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);

1.2 运行结果

这里写图片描述
可以清楚的看到,Observer下的操作是在主线程下完成的,而Observable下发射器的发射动作却是在一个新的线程中完成的。通过这种操作,我们可以在subscribe方法中执行耗时操作,然后结果用过onNext()方法返回给主线程,实现异步处理的目的。
常用的场景:访问数据库,网络请求数据,后台计算操作等等。

1.3 Schedulers 和AndroidSchedulers

AndroidSchedulers是RxAndroid中的线程调度器,主要用途如上所示,AndroidSchedulers.mainThread,代表Android中的主线程(UI线程)。

方法 解释
Schedulers.computation() 用于计算任务
Schedulers.from(Executor executor) 使用指定的Executor作为调度器
Schedulers.io() 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长
Scheduler.newThread() 为每个任务创建一个新线程
Scheduler.shutdown() 停止调度器
Scheduler.single() 单独线程
Scheduler.start() 启动调度器
Scheduler.trampoline() 在当前线程中,但是会等到当前线程任务执行完毕之后再去执行
AndroidScheduler.mainThread() 主线程

看看源码

1. Observable.create()

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

先执行非空检查,然后通过ObservableCreate来创建Observable。而ObservableCreate继承Observable。看下代码

//ObservableCreate继承Observable
public final class ObservableCreate<T> extends Observable<T> {
    //ObservableOnSubscribe接口只有一个subscribe方法
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //赋值,结合上面Observable的create方法,这个source应该是我们new出来的ObservableOnSubscribe
        this.source = source;
    }

    //这个方法在Observable执行subscribe(Observer)的时候使用到
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //创建CreateEmitter,传入observer,内部使用
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //执行observer的onSubscribe方法,parent是CreateEmitter,实现了Disposable,和我们创建Observer时实现的onSubscribe方法一致,没毛病
        observer.onSubscribe(parent);

        try {
            //执行subscribe方法,source为ObservableOnSubscribe对象,parent为CreateEmitter,而CreateEmitter实现ObservableEmitter接口,没毛病
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            //CreateEmitter执行onError方法
            parent.onError(ex);
        }
    }

    //CreateEmitter类继承AtomicReference<Disposable>实现ObservableEmitter和Disposable 接口
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        //创建过程中传入的Observer
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        //OnNext方法
        @Override
        public void onNext(T t) {
            //非空检查,onNext在2.0之后允许传入null值作为参数
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //这个对应上我们的上一篇博客,一次性水管,如果isDisposed为true,则发射器发出的事件,将不会被观察者执行
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        //onError方法,当tryOnErro返回false的时候,执行RxJavaPlugins.onError(t),何时tryOnError会返回false呢?看下面
        @Override
        public void onError(Throwable t) {
            //当isDisposed()为true后,会执行RxJavaPlugins.onError(t)操作,也就是说如果在isDisposed()为true的情况下,发射器还发出onError()事件,回导致程序崩溃。具体看下面的运行示例。
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            //也是不允许传入null
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            //如果isDisposed为false,执行观察者的onError方法,然后执行dispose()操作,也就是观察者不处理后面发射器发送的事件了。估计onComplete()方法中也会有类似的操作流程
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            //只有当isDisposed为true的时候回返回false,也就是上一个方法回执行RxJavaPlugins.onError(t);操作
            return false;
        }

        @Override
        public void onComplete() {
            //和上面onError()操作类似,不同的是没有非空检查,因为onComplete没有参数。
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        ……
    }
    ……
    //这部分介绍的是SerializedEmitter,暂无涉及
}

举例说明,isDisposed()为true只有,发射器继续发送onError事件回导致程序崩溃。

 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Logger("Emit 111");
                e.onNext(111);
                Logger("Emit 222");
                e.onNext(222);
                Logger("Emit 333");
                e.onNext(333);
                Logger("Emit onError");
                e.onError(new Throwable("Test Disposable onError"));

            }
        });

        Observer<Integer> observer = new Observer<Integer>() {
            Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                Logger("onSubscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer integer) {
                Logger("onNext integer = " + integer);
                if(mDisposable!=null && !mDisposable.isDisposed() && integer == 222) {
                    mDisposable.dispose();
                }
            }

            @Override
            public void onError(Throwable e) {
                Logger("onError e = " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Logger("onComplete");
            }
        };
        observable.subscribe(observer);

运行结果:
这里写图片描述

2. Observer

创建Observer,无甚特殊,注意onSubscribe,onNext,onError传入的参数不能为空以及Disposable 的使用。

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();
}

3. observable.subscribe(observer);

分析一下subscribe方法

    public final void subscribe(Observer<? super T> observer) {
        //observer非空检查
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //关联observable和observer
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            //这个方法在Observable中是个抽象方法,但是结合上面Observerable的create过程,可以知道这里实际上调用的是ObservableCreate的subscribeActual方法,也就是上面我们分析的过程,没毛病
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

一路看下来,我们就可以很快的和我们做的测试对应上,先调用onSubscribe方法,然后执行subscribe方法中的发射器操作,根据发射器的操作Observer作出对应的处理。

4. subscribeOn

subscribeOn用来指定Observable在哪个线程执行自己的subscribe方法。

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //scheduler非空检查
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

创建了一个ObservableSubscribeOn,这个类千万别和上面创建Observable过程中使用的ObservableOnSubscribe接口弄混淆,结合当前操作为subscribeOn来记住这个类名。

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

Observerable继承自ObservableSource,所以创建ObservableSubscribeOn的时候Observable和scheduler传递过来。
ObservableSubscribeOn继承
AbstractObservableWithUpstream,而后者又继承Observable。所以实际上经过subscribeOn操作之后,后续操作的对象从Observerable变成了ObservableSubscribeOn,所以,当后面执行subscribe时执行的subscribeActual方法为ObservableSubscribeOn重的对应方法

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //封装Observer,实际Observer由其内部actual维护
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //Observer执行onSubscribe方法,SubscribeOnObserver实现Disposable接口,所以上面的例子中onSubscribe传递的是Disposable类型
        s.onSubscribe(parent);
//这里有3个操作:
//1. 创建SubscribeTask,传入封装后的Observer
//2. 调度器执行scheduleDirect操作
//3. 封装后的Observer执行setDisposable操作
 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

关注 scheduler.scheduleDirect(new SubscribeTask(parent)),先看下

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

实现Runnable,终于看到一个有点熟悉的东西。传入的parent为封装后的Observer。而source则是创建ObservableSubscribeOn过程中传入的Observable。 在看scheduleDirect方法之前,我们得先弄清楚这个scheduler是个什么东西?在Schedulers类中,不同的Scheduler已经初始化完成。

    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }

就看下NEW_THREAD 这个,首先new NewThreadTask()

    static final class NewThreadTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return NewThreadHolder.DEFAULT;
        }
    }
    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }
public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

看到了ThreadFactory,RxThreadFactory,而RxThreadFactory实现了ThreadFactory接口,所以最好还是线程的使用,只是RxJava对这些基础的东西做了深度的封装和流程上的优化,让我们更方便的使用。 回溯到上面的scheduleDirect方法,

    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //创建工作线程,以NewThreadScheduler为例,是创建NewThreadWorker
        final Worker w = createWorker();
        //
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        //封装出一个带Dispose的Task,方便控制
        DisposeTask task = new DisposeTask(decoratedRun, w);
        //以NewThreadScheduler为例,这里执行的是NewThreadWorker中的schedule方法
        w.schedule(task, delay, unit);

        return task;
    }
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

忽略disposed的影响,最后执行到scheduleActual

   public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            //直接提交或者进入队列
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

可以看到熟悉的ScheduledExecutorService和Future。 所以,综上所述,其内部也是新创建一个线程,然后使用Runnable。

5. observeOn

原理我觉得和subscribeOn没有太大的差别。不做赘述。

最后

RxJava使用起来方便,其中包含着很多技巧,也不甚了解,只能是慢慢用,慢慢理
【码道长】公众号,最近开始运营,有兴趣的朋友欢迎来访。
这里写图片描述




原文地址:访问原文地址
快照地址: 访问文章快照