RxJava简单源码的示例分析
发表于:2025-01-26 作者:千家信息网编辑
千家信息网最后更新 2025年01月26日,今天就跟大家聊聊有关RxJava简单源码的示例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。demo代码如下public class Ob
千家信息网最后更新 2025年01月26日RxJava简单源码的示例分析
今天就跟大家聊聊有关RxJava简单源码的示例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
demo代码如下
public class ObservableTest { public static void main(String[] args) { Observable
先看第一行代码
Observableobservable = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(ObservableEmitter observer) throws Exception { observer.onNext("处理的数字是:" + Math.random() * 100); observer.onComplete(); }});//Observable.java//第1560行public static Observable create(ObservableOnSubscribe source) { ObjectHelper.requireNonNull(source, "source is null"); //RxJavaPlugins里有很多方法可以设置, //有点类似于Spring的ApplicationListener,在对应的生命周期中会被调用 return RxJavaPlugins.onAssembly(new ObservableCreate (source));}//RxJavaPlugins.java//第1031行public static Observable onAssembly(@NonNull Observable source) { Function super Observable, ? extends Observable> f = onObservableAssembly; //如果设置了对应的方法,就执行,否则原样返回 if (f != null) { return apply(f, source); } return source;}
可以看到RxJavaPlugins中的方法如果不配置的方法,参数就会原样返回,所以Observable.create最终得到的就是ObservableCreate这个类。
再来看第二行代码
observable.subscribe(new Consumer() { @Override public void accept(Object consumer) throws Exception { System.out.println("我处理的元素是:" + consumer); }});//Observable.java//第10869行public final Disposable subscribe(Consumer super T> onNext) { return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());}//Observable.java//第10958行public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError, Action onComplete, Consumer super Disposable> onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); //这里的onNext就是我们自己写的Consumer类 LambdaObserver ls = new LambdaObserver (onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls;}//Observable.java//第10974行public final void subscribe(Observer super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //还记得我们的observable变量是什么类型么?ObservableCreate! subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }}//ObservableCreate.java//第35行protected void subscribeActual(Observer super T> observer) { //这里的observer是LambdaObserver CreateEmitter parent = new CreateEmitter (observer); observer.onSubscribe(parent); //省略部分代码}//LambdaObserver.java//第47行public void onSubscribe(Disposable s) { //设置AtomicReference的值(LambdaObserver继承了AtomicReference) //如果之前已经设置过了(AtomicReference的值不为空),则直接返回false if (DisposableHelper.setOnce(this, s)) { try { //在new LambdaObserver()的时候我们设置了onSubscribe = Functions.emptyConsumer() //所以这里什么都不做 onSubscribe.accept(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); onError(ex); } }}//ObservableCreate.java//第35行protected void subscribeActual(Observer super T> observer) { //省略部分代码 try { //还记得source是啥么,就是你在创建Observable的时候new的ObservableOnSubscribe //于是终于执行到了我们编写的代码中 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}//ObservableOnSubscribe.java//第6行public static void main(String[] args) { Observable observable = Observable.create(new ObservableOnSubscribe () { //开始执行这个方法 //observer是new CreateEmitter (new LambdaObserver()); @Override public void subscribe(ObservableEmitter observer) throws Exception { observer.onNext("处理的数字是:" + Math.random() * 100); observer.onComplete(); } });}//ObservableCreate$CreateEmitter//第61行public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { //这里的observer就是LambdaObserver //t就是《"处理的数字是:" + Math.random() * 100》这段字符串 observer.onNext(t); }}//LambdaObserver.java//第60行public void onNext(T t) { if (!isDisposed()) { try { onNext.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); get().dispose(); onError(e); } }}//ObservableOnSubscribe.java//第13行public static void main(String[] args) { //省略部分代码 observable.subscribe(new Consumer () { @Override public void accept(Object consumer) throws Exception { System.out.println("我处理的元素是:" + consumer); } });}//ObservableOnSubscribe.java//第8行public static void main(String[] args) { Observable observable = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(ObservableEmitter observer) throws Exception { //省略部分代码 observer.onComplete(); } }); //省略部分代码}//ObservableCreate.java//第95行public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { //取消订阅 dispose(); } }}//LambdaObserver.java//第86行public void onComplete() { if (!isDisposed()) { lazySet(DisposableHelper.DISPOSED); try { //new LambdaObserver的时候设置了为空,所以不执行操作 onComplete.run(); } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); } }}
至此,调用流程分析完成,可以看到虽然在main方法里我们只写了几行代码,但是内部调用的流程还是很繁杂的
看完上述内容,你们对RxJava简单源码的示例分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
代码
处理
方法
就是
部分
数字
分析
元素
内容
时候
源码
示例
原样
流程
繁杂
一行
参数
变量
周期
字符
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
专技天下网络安全试卷6
网络安全调查问卷l
软件开发者服务收入合同
网络安全手抄报漂亮字少
网络安全防护横向隔离
大圣网络技术有限公司发薪日
北京大学计算机网络安全
cmcc的开放网络安全么
抢酒软件开发
月报网络安全补丁
java 服务器源码
中国科技论文与引文分析数据库
ip地址怎么控制服务器
数据库只读是怎么回事
opc与数据库
云服务器的密钥在哪
app服务器搭配
服务器ip不正确修复
网络安全保卫大队大队长金谦
三年级网络安全手抄报的内容
中国人民银行网络安全宣传标语
长宁区第三方软件开发供应商简介
java 服务器源码
日本强化网络安全
ios软件开发实验室
什么设备可以管理服务器web
南京网络服务器机柜哪家强
服务器ip不正确修复
网络技术题库软件
学校网络安全应急队伍