博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
关于RxJava最友好的文章(进阶)
阅读量:6411 次
发布时间:2019-06-23

本文共 8113 字,大约阅读时间需要 27 分钟。

前言

之前就写过一篇,反响很不错,由于那篇文章的定位就是简单友好,因此尽可能的摒弃复杂的概念,只抓住关键的东西来讲,以保证大家都能看懂。

不过那篇文章写完之后,我就觉得应该还得有一篇文章给RxJava做一个深入的讲解才算完美,于是就有了今天的进阶篇。因为一个团队里可能大家都会用RxJava,但是必须要有一个人很懂这个,不然碰到问题可就麻烦了。

在前一篇文章中的最后,我们得出结论:RxJava就是在观察者模式的骨架下,通过丰富的操作符和便捷的异步操作来完成对于复杂业务的处理。今天我们还是就结论中的观察者模式操作符来做深入的拓展。

在进入正题之前,还是希望大家先去看看。

关于观察者模式

前一篇文章首先就重点谈到了观察者模式,我们认为观察者模式RxJava的骨架*。在这里不是要推翻之前的结论,而是希望从深入它的内部的去了解它的实现。

依然使用之前文章中关于开关和台灯的代码

//创建一个被观察者(开关) Observable switcher=Observable.create(new Observable.OnSubscribe
(){ @Override public void call(Subscriber
subscriber) { subscriber.onNext("On"); subscriber.onNext("Off"); subscriber.onNext("On"); subscriber.onNext("On"); subscriber.onCompleted(); } });//创建一个观察者(台灯) Subscriber light=new Subscriber
() { @Override public void onCompleted() { //被观察者的onCompleted()事件会走到这里; Log.d("DDDDDD","结束观察...\n"); } @Override public void onError(Throwable e) { //出现错误会调用这个方法 } @Override public void onNext(String s) { //处理传过来的onNext事件 Log.d("DDDDD","handle this---"+s) }//订阅switcher.subscribe(light);复制代码

以上就是一个RxJava观察者架构,

看到这样的代码不知道你会不会有一些疑惑:

  • 被观察者中的Observable.OnSubscribe是什么,有什么用?
  • call(subscriber)方法中,subscriber哪里来的?
  • 为什么只有在订阅之后,被观察者才会开始发送消息?

其实,这些问题都可以通过了解OnSubscribe来解决。

那我们先来看看关于OnSubscribe的定义

//上一篇文章也提到Acton1这个接口,内部只有一个待实现call()方法//没啥特别,人畜无害public interface Action1
extends Action { void call(T t);}//OnSubscribe继承了这个Action1接口public interface OnSubscribe
extends Action1
> { // OnSubscribe仍然是个接口 }复制代码

那么也就是说,OnSubscribe本质上也是和 Action1一样的接口,只不过它专门用于Observable内部。

而在Observable观察者的类中,OnSubscribe是它唯一的属性,同时也是Observable构造函数中唯一必须传入的参数,也就是说,只要创建了Observable,那么内部也一定有一个OnSubscribe对象。

当然,Observable是没有办法直接new的,我们只能通过create(),just()等等方法创建,当然,这些方法背后去调用了new Observable(onSubscribe)

public class Observable
{ //唯一的属性 final OnSubscribe
onSubscribe; //构造函数,因为protected,我们只能使用create函数 protected Observable(OnSubscribe
f) { this.onSubscribe = f; } //create(onSubscribe) 内部调用构造函数。 public static
Observable
create(OnSubscribe
f) { return new Observable
(RxJavaHooks.onCreate(f)); } .... .... }复制代码

当创建了Observable和Subscribe之后,调用subscribe(subscriber)方法时,发生了什么呢?

//传入了观察者对象    public final Subscription subscribe(final Observer
observer) { .... //往下调用 return subscribe(new ObserverSubscriber
(observer)); } public final Subscription subscribe(Subscriber
subscriber) { //往下调用 return Observable.subscribe(subscriber, this); } //调用到这个函数 static
Subscription subscribe(Subscriber
subscriber, Observable
observable) { // new Subscriber so onStart it subscriber.onStart(); // add a significant depth to already huge call stacks. try { // 在这里简单讲,对onSubscribe进行封装,不必紧张。 OnSubscribe onSubscribe=RxJavaHooks.onObservableStart(observable, observable.onSubscribe); //这个才是重点!!! //这个调用的具体实现方法就是我们创建观察者时 //写在Observable.create()中的call()呀 //而调用了那个call(),就意味着事件开始发送了 onSubscribe.call(subscriber); //不信你往回看 return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { .... .... } return Subscriptions.unsubscribed(); } }复制代码

代码看到这里,我们就可以对上面三个问题做统一的回答了:

  • onSubscribe是Observable内部唯一属性,是连接Observable和subscriber的关键,相当于连接台灯和开关的那根电线
  • call(Subscriber<? super String> subscriber)中的subscriber,就是我们自己创建的那个观察者
  • 只有在订阅的时候,才会发生onSubscribe.call(subscriber),进而才会开始调用onNext(),onComplete()等。

到这里,你是不是对于RxJava的观察者模式了解更加清晰了呢?我们用流程图复习一下刚才的过程。

了解了上面这些,我们就可以更进一步做以下总结:

  • 订阅这个动作,实际上是观察者(subscriber)对象把自己传递给被观察者(observable)内部的onSubscribe
  • onSubscribe的工作就是调用call(subscriber)来通知被观察者发送消息给这个subscriber

以上的结论对于下面我们理解操作符的原理十分有帮助,因此一定要看明白。

观察者模式介绍到这里,才敢说讲完了。

关于操作符

上一篇文章讲了一些操作符,并且在github上放了很多其他的操作符使用范例给大家,因此在这里不会介绍更多操作符的用法,而是讲解操作符的实现原理。他是如何拦截事件,然后变换处理之后,最后传递到观察者手中的呢?

相信了解相关内容的人可能会想到lift()操作符,它本来是其他操作符做变换的基础,不过那已经是好几个版本以前的事情了。但是目前版本中RxJava已经不一样了了,直接把lift()的工作下放到每个操作符中,把lift的弱化了(但是依然保留了lift()操作符)。

因此,我们在这里不必讲解lift,直接拿一个操作符做例子,来了解它的原理即可,因为基本上操作符的实现原理都是一样的。

以map()为例,依然拿之前文章里面的例子:

Observable.just(getFilePath())            //使用map操作来完成类型转换            .map(new Func1
() { @Override public Bitmap call(String s) { //显然自定义的createBitmapFromPath(s)方法,是一个极其耗时的操作 return createBitmapFromPath(s); } }) .subscribe( //创建观察者,作为事件传递的终点处理事件 new Subscriber
() { @Override public void onCompleted() { Log.d("DDDDDD","结束观察...\n"); } @Override public void onError(Throwable e) { //出现错误会调用这个方法 } @Override public void onNext(Bitmap s) { //处理事件 showBitmap(s) } );复制代码

看看map背后到底做了什么

public final 
Observable
map(Func1
func) { //创建了全新代理的的Observable,构造函数传入的参数是OnSubscribe //OnSubscribeMap显然是OnSubscribe的一个实现类, //也就是说,OnSubscribeMap需要实现call()方法 //构造函数传入了真实的Observable对象 //和一个开发者自己实现的Func1的实例 return create(new OnSubscribeMap
(this, func)); }复制代码

看OnSubscribeMap的具体实现:

public final class OnSubscribeMap
implements OnSubscribe
{ //用于保存真实的Observable对象 final Observable
source; //还有我们传入的那个Func1的实例 final Func1
transformer; public OnSubscribeMap(Observable
source, Func1
transformer) { this.source = source; this.transformer = transformer; } //实现了call方法,我们知道call方法传入的Subscriber //就是订阅之后,外部传入真实的的观察者 @Override public void call(final Subscriber
o) { //把外部传入的真实观察者传入到MapSubscriber,构造一个代理的观察者 MapSubscriber
parent = new MapSubscriber
(o, transformer); o.add(parent); //让外部的Observable去订阅这个代理的观察者 source.unsafeSubscribe(parent); } //Subscriber的子类,用于构造一个代理的观察者 static final class MapSubscriber
extends Subscriber
{ //这个Subscriber保存了真实的观察者 final Subscriber
actual; //我们自己在外部自己定义的Func1 final Func1
mapper; boolean done; public MapSubscriber(Subscriber
actual, Func1
mapper) { this.actual = actual; this.mapper = mapper; } //外部的Observable发送的onNext()等事件 //都会首先传递到代理观察者这里 @Override public void onNext(T t) { R result; try { //mapper其实就是开发者自己创建的Func1, //call()开始变换数据 result = mapper.call(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); onError(OnErrorThrowable.addValueAsLastCause(ex, t)); return; } //调用真实的观察者的onNext() //从而在变换数据之后,把数据送到真实的观察者手中 actual.onNext(result); } //onError()方法也是一样 @Override public void onError(Throwable e) { if (done) { RxJavaHooks.onError(e); return; } done = true; actual.onError(e); } @Override public void onCompleted() { if (done) { return; } actual.onCompleted(); } @Override public void setProducer(Producer p) { actual.setProducer(p); } }}复制代码

map操作符的原理基本上就讲完了,其他的操作符和map在原理上是一致的。

假如你想创建自定义的操作符(这其实是不建议的),你应该按照上面的步骤

  • 创建一个代理的被观察者
  • 实现被观察者中onSubscribe的call方法
  • 在call方法中创建一个代理的观察者,让真实的被观察者订阅它。

我知道你会有点晕,没关系,我后面会写一个自定义操作符放在上,可以关注下。

下面,我们先通过一个流程图巩固一下前面学习的成果。

下次你使用操作符时,心里应该清楚,每使用一个操作符,都会创建一个代理观察者和一个代理被观察者,以及他们之间是如何相互调用的。相信有了这一层的理解,今后使用RxJava应该会更加得心应手。

不过最后,我想给大家留一个思考题:使用一个操作符时,流程图是这样的,那么使用多个操作符呢?

勘误

暂无

后记

到这里,关于RxJava的讲解就基本可以告一段落了,

我相信,两篇文章读下来,对于RxJava的理解应该已经到了比较高的一个层次了,我的目标也就达到了。

接下来....

因为RxJava是一个事件的异步处理框架,理论上,他可以封装任何其他的库,那么.....

转载地址:http://ezkra.baihongyu.com/

你可能感兴趣的文章
Backbone.js 使用模板
查看>>
安装xenomai的记实
查看>>
我们为什么需要SDN?---致新人
查看>>
自制VTP实验总结
查看>>
prime_test
查看>>
用python的smtplib发送邮件python2,3都可以使用
查看>>
我在新浪微博的第1000篇,截图纪念。
查看>>
Graphx处理janusGraph数据实现
查看>>
Oracle Study之案例--重建数据库控制文件
查看>>
log4j&slf4j日志框架入门
查看>>
五子棋精髓
查看>>
记一次升级Oracle驱动引发的死锁
查看>>
php加速器eaccelerator概述
查看>>
SER2012安装和激活终端激活
查看>>
微容器:更小的,更轻便的Docker容器
查看>>
TXT 文本阅读器源码
查看>>
java异常
查看>>
SQL2008笔记(一)
查看>>
Linux通过命令发送邮件
查看>>
HttpClient4.4 登录知乎(详细过程)
查看>>