logo头像

你是否为一个人拼过命

RxJava的简单入门

本文于1328天之前发表,文中内容可能已经过时。

Rxjava简单入门

  • Rxjava 的含义
  • Rxjava的外层思路
  • 一些简单的Rxjava源码分析
  • 学习Rxjava源码思路的建议
Rxjava 的含义

Rxjava GitHub的解释:RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. —“一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库”;通俗地说就是异步,和AsyncTask / Handle类似,扩展的观察者模式。
观察者模式:
观察者模式是对象的行为模式,又叫发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。
  观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态上发生变化时,会通知所有观察者对象,使它们能够自动更新自己。比如Button的点击相应
  
代码示例:

1
2
3
4
5
6
view.setOnClickListener(new View.onClickListener(){
@Override
public void onClick(View v) {
}
};)
Rxjava 的外层思路

Observable :被观察者;
Observer :观察者;
subscribe: 实现两者的订阅关系。subscribeOn() 和 observeOn() 两个方法来对线程进行控制。
subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

Scheduler ——调度器,线程控制器。
列举几个默认场景:

  • Schedulers.immediate(): 当前线程
  • Schedulers.newThread():新线程
  • Schedulers.io():
  • Schedulers.computation(): 计算所使用的线程,不能被IO所占
  • Android 还有一个专用的AndroidSchedulers.mainThread(),它指定的操作将在Android 主线程运行。
一些简单的Rxjava源码分析

Observable 被观察者的创建

图形解说:流控制 (淘一张官方图…表示有点晕)
image

Subscriber 是 Obser的一个抽象类,实现了Observer的接口,有onStart(),unsubscribe(){取消订阅}方法。Observable 在 subscribe(new subscriber)[即订阅的时候]方法执行call方法 。

1
public abstract class Subscriber<T> implements Observer<T>, Subscription

RxJava Observable create 源码

1
2
3
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}

hook.onCreate(f)的源码 原参数返回 onSubscribe。

1
2
3
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}

Observable.create()方法构造了一个被观察者Observable对象,同时将new出来的OnSubscribe赋值给了该Observable的成员变量onSubscribe。

subscribe的源码释义 里面的内容剪切部分

1
2
3
4
5
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}

这个SafeSubscriber就是subscriber的代理,让onCompleted和onError只能执行一个

SafeSubscriber的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void onError(Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
if (!done) {
done = true;
_onError(e);
}
}
@Override
public void onCompleted() {
if (!done) {
done = true;
try {
actual.onCompleted();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPluginUtils.handleException(e);
throw new OnCompletedFailedException(e.getMessage(), e);
} finally {
try {
unsubscribe();
} catch (Throwable e) {
RxJavaPluginUtils.handleException(e);
throw new UnsubscribeFailedException(e.getMessage(), e);
}
}
}
}

其中下面的hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber) 相当于 onSubscribe.call(subscriber)—new 出来的观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
if (subscriber.isUnsubscribed()) {
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
hook.onSubscribeError(r);
throw r;
}
}
return Subscriptions.unsubscribed();
}

Observable create例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable<Boolean> mObservable = Observable
.create(new Observable.OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> subscriber) {
subscriber.onNext(writeBytes(data));
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
mObservable.subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
Logger.e("onCompleted");
}
@Override
public void onError(Throwable e) {
Logger.e("onError");
}
@Override
public void onNext(Boolean aBoolean) {
Logger.e("onNext" + aBoolean);
}
});

大概执行思路:调用Observable.crate()创建一个观察者,同时创建一个OnSubscribe作为该方法的入参;用subscribe()来订阅我们自己创建的观察者Subscriber。调用subscribe()方法后,就会触发执行OnSubscribe.call(),然后在call方法调用观察者subscriber里的三个方法。

介绍下FlatMap和Map变换的区别

Map变换 只是对数据类型的转换,传入参数返回一个对象

1
2
3
4
5
6
7
.map(new Func1<String, Boolean>() {
@Override
public Boolean call(String string) {
//做数据对比
return true;
}
});

FlatMap变换 创建的是一个新Observable对象,并激活和发送到同一个Observable,这个 Observable 负责将这些事件统一交给 Subscriber 的回调当中去。

1
2
3
4
5
6
.flatMap(new Func1<Boolean, Observable<?>>() {
@Override
public Observable<?> call(Boolean aBoolean) {
return Observable.timer(3000, TimeUnit.MILLISECONDS);
}
});

应用场景:
当RxJava与Retrofit结合使用的时候,FlatMap变换可以更好地应用到http请求的多次尝试 ,retryWhen变换中就可以应用;Map可以更好地对数据结果进行截取data…等等。
更多的操作符请移步官网:http://reactivex.io/documentation/operators.html

学习Rxjava源码思路的建议
注重源码阅读

Java 观察者模式:http://www.cnblogs.com/java-my-life/archive/2012/05/16/2502279.html
大头鬼收集的:https://github.com/lzyzsd/awesome-rxjava
RxAndroid Tutorial学习网站:https://www.raywenderlich.com/141980/rxandroid-tutorial?utm_source=Android+Weekly&utm_campaign=7128946b09-androidweekly-240&utm_medium=email&utm_term=0_4eb677ad19-7128946b09-338017429
RxJava 官方翻译文档:https://mcxiaoke.gitbooks.io/rxdocs/content/Operators.html
Rxjava 简书:http://www.jianshu.com/p/059907d3e82b
RxJava源码分析:https://segmentfault.com/a/1190000008903789
RxJava执行流程分析:https://zhuanlan.zhihu.com/p/22338235
Rxjava2:http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2016/0907/6604.html
Rxjava2.0和Rxjava1.0的区别:https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-2.0
GitHub - google/agera: Reactive Programming for Android: https://github.com/google/agera —反应式编程