千家信息网

RxJava简单源码的示例分析

发表于:2025-01-26 作者:千家信息网编辑
千家信息网最后更新 2025年01月26日,今天就跟大家聊聊有关RxJava简单源码的示例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。demo代码如下public class Ob
千家信息网最后更新 2025年01月26日RxJava简单源码的示例分析

今天就跟大家聊聊有关RxJava简单源码的示例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

demo代码如下

public class ObservableTest {    public static void main(String[] args) {        Observable observable = Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(ObservableEmitter observer) throws Exception {                observer.onNext("处理的数字是:" + Math.random() * 100);                observer.onComplete();            }        });        observable.subscribe(new Consumer() {            @Override            public void accept(Object consumer) throws Exception {                System.out.println("我处理的元素是:" + consumer);            }        });    }}

先看第一行代码

Observable observable = Observable.create(new ObservableOnSubscribe() {    @Override    public void subscribe(ObservableEmitter observer) throws Exception {        observer.onNext("处理的数字是:" + Math.random() * 100);        observer.onComplete();    }});//Observable.java//第1560行public static  Observable create(ObservableOnSubscribe source) {    ObjectHelper.requireNonNull(source, "source is null");    //RxJavaPlugins里有很多方法可以设置,    //有点类似于Spring的ApplicationListener,在对应的生命周期中会被调用    return RxJavaPlugins.onAssembly(new ObservableCreate(source));}//RxJavaPlugins.java//第1031行public static  Observable onAssembly(@NonNull Observable source) {    Function f = onObservableAssembly;    //如果设置了对应的方法,就执行,否则原样返回    if (f != null) {        return apply(f, source);    }    return source;}

可以看到RxJavaPlugins中的方法如果不配置的方法,参数就会原样返回,所以Observable.create最终得到的就是ObservableCreate这个类。


再来看第二行代码

observable.subscribe(new Consumer() {    @Override    public void accept(Object consumer) throws Exception {        System.out.println("我处理的元素是:" + consumer);    }});//Observable.java//第10869行public final Disposable subscribe(Consumer onNext) {    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());}//Observable.java//第10958行public final Disposable subscribe(Consumer onNext, Consumer onError,            Action onComplete, Consumer onSubscribe) {    ObjectHelper.requireNonNull(onNext, "onNext is null");    ObjectHelper.requireNonNull(onError, "onError is null");    ObjectHelper.requireNonNull(onComplete, "onComplete is null");    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");    //这里的onNext就是我们自己写的Consumer类    LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);    subscribe(ls);    return ls;}//Observable.java//第10974行public final void subscribe(Observer observer) {    ObjectHelper.requireNonNull(observer, "observer is null");    try {        observer = RxJavaPlugins.onSubscribe(this, observer);        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");        //还记得我们的observable变量是什么类型么?ObservableCreate!        subscribeActual(observer);    } catch (NullPointerException e) { // NOPMD        throw e;    } catch (Throwable e) {        Exceptions.throwIfFatal(e);        // can't call onError because no way to know if a Disposable has been set or not        // can't call onSubscribe because the call might have set a Subscription already        RxJavaPlugins.onError(e);        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");        npe.initCause(e);        throw npe;    }}//ObservableCreate.java//第35行protected void subscribeActual(Observer observer) {    //这里的observer是LambdaObserver    CreateEmitter parent = new CreateEmitter(observer);    observer.onSubscribe(parent);    //省略部分代码}//LambdaObserver.java//第47行public void onSubscribe(Disposable s) {    //设置AtomicReference的值(LambdaObserver继承了AtomicReference)    //如果之前已经设置过了(AtomicReference的值不为空),则直接返回false    if (DisposableHelper.setOnce(this, s)) {        try {            //在new LambdaObserver()的时候我们设置了onSubscribe = Functions.emptyConsumer()            //所以这里什么都不做            onSubscribe.accept(this);        } catch (Throwable ex) {            Exceptions.throwIfFatal(ex);            s.dispose();            onError(ex);        }    }}//ObservableCreate.java//第35行protected void subscribeActual(Observer observer) {    //省略部分代码    try {        //还记得source是啥么,就是你在创建Observable的时候new的ObservableOnSubscribe        //于是终于执行到了我们编写的代码中        source.subscribe(parent);    } catch (Throwable ex) {        Exceptions.throwIfFatal(ex);        parent.onError(ex);    }}//ObservableOnSubscribe.java//第6行public static void main(String[] args) {    Observable observable = Observable.create(new ObservableOnSubscribe() {        //开始执行这个方法        //observer是new CreateEmitter(new LambdaObserver());        @Override        public void subscribe(ObservableEmitter observer) throws Exception {            observer.onNext("处理的数字是:" + Math.random() * 100);            observer.onComplete();        }    });}//ObservableCreate$CreateEmitter//第61行public void onNext(T t) {    if (t == null) {        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));        return;    }    if (!isDisposed()) {        //这里的observer就是LambdaObserver        //t就是《"处理的数字是:" + Math.random() * 100》这段字符串        observer.onNext(t);    }}//LambdaObserver.java//第60行public void onNext(T t) {    if (!isDisposed()) {        try {            onNext.accept(t);        } catch (Throwable e) {            Exceptions.throwIfFatal(e);            get().dispose();            onError(e);        }    }}//ObservableOnSubscribe.java//第13行public static void main(String[] args) {    //省略部分代码        observable.subscribe(new Consumer() {        @Override        public void accept(Object consumer) throws Exception {            System.out.println("我处理的元素是:" + consumer);        }    });}//ObservableOnSubscribe.java//第8行public static void main(String[] args) {    Observable observable = Observable.create(new ObservableOnSubscribe() {        @Override        public void subscribe(ObservableEmitter observer) throws Exception {            //省略部分代码            observer.onComplete();        }    });    //省略部分代码}//ObservableCreate.java//第95行public void onComplete() {    if (!isDisposed()) {        try {            observer.onComplete();        } finally {            //取消订阅            dispose();        }    }}//LambdaObserver.java//第86行public void onComplete() {    if (!isDisposed()) {        lazySet(DisposableHelper.DISPOSED);        try {            //new LambdaObserver的时候设置了为空,所以不执行操作            onComplete.run();        } catch (Throwable e) {            Exceptions.throwIfFatal(e);            RxJavaPlugins.onError(e);        }    }}

至此,调用流程分析完成,可以看到虽然在main方法里我们只写了几行代码,但是内部调用的流程还是很繁杂的

看完上述内容,你们对RxJava简单源码的示例分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

0