RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。免除了业务逻辑代码的频繁回调,增加了逻辑代码的可读性。
观察者模式:
A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。
观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式
- 观察者接口定义
1
2
3public interface Observer {
void update(Object value);
} - 观察者实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14public class User implements Observer{
private String name;
private String message;
public User(String name){
this,name = name;
}
public void update(Object value){
this.message = (String) value;
readMessage();
}
private void readMessage(){
System.out.println(name + "收到推送消息" + message);
}
} - 被观察者接口定义
1
2
3
4
5
6public interface Observable{
void addObserver(Observer observer);
void removeObserver(Observer observer);
void notifyObservers();
void pushMessage(String message);
} - 被观察者实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class WechatServerObservable implements Observable{
private List<Observer> observers = new ArrayList<>(); // 缓存多个观察者
private String message;
public void addObserver(Observer observer){
observers.add(observer);
}
public void removeObserver(Observer observer){
observers.remove(observer);
}
public void notifyObservers(){
for (Observer observer : observers) {
observer.update(message);
}
}
public void pushMessage(String message){
this,message = message;
notifyObservers();
System.out.println(name + "发送推送消息" + message);
}
} - 使用
1
2
3
4
5
6
7
8
9//创建一个被观察者,微信服务
Observable observable =new WeChatServerObservable();
//创建多个观察者,用户
Observer user1 =new UserPerson("小方");
Observer user2 =new UserPerson("小明");
observable.addObserver(user1);
observable.addObserver(user2);
//推送消息
observable.pushMessage("开始抢票!");RxJava 基本概念:
Observable (可观察者,即被观察者)也称作上游
Observer (观察者)也称作下游
subscribe (订阅)、事件
Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
对于Android本身有一套现成的框架,复杂的逻辑操作在服务端较多,客户端出现的场景较少,长时间不怎么使用,很容易让人忘记那些理论知识,所以根据一些实战例子来进行学习效果更佳。
map操作符
常用于数据列表类型转换
上游读取数据源并发送到下游,Utils.getApiUserList()模拟从服务端获得的dto类型为ApiUser的数据列表
1 | private Observable<List<ApiUser>> getObservable() { |
建立订阅关系,通过map操作符将ApiUser类型的dto转化成User类型的dto,再执行下游的处理逻辑:
1 | //上游处理逻辑 |
下游获取到数据源,进行数据打印:
1 | private Observer<List<User>> getObserver() { |
flatMap操作符
常用于遍历数据列表,按条件进行过滤
与它相同的是concatMap,但是flatMap可能会乱序,concatMap可保证下游的获取到的数据源与上游发送顺序一致
此处上游数据源与map操作符相同
1 | getObservable() |
下游获取到数据源,进行数据打印:
1 | private Observer<ApiUser> getObserver(){ |
zip操作符
常用于对两个数据源列表进行处理,并返回一个结果数据列表
上游读取两个数据源并发送到下游:
1 | private Observable<List<User>> getCricketFansObservable() { |
建立订阅关系,通过zip操作符过滤两个数据源都有的数据,返回结果数据列表
1 | Observable.zip(getCricketFansObservable(), getFootballFansObservable(), |
下游获取到数据源进行数据打印
1 | private Observer<List<User>> getObserver() { |
Interval操作符
常用于周期性的处理逻辑(每隔多少时间后执行)
上游11秒延迟后,每隔2秒,发送long型数据源到下游,从0开始逐渐递增
1 | private Observable<? extends Long> getObservable() { |
建立订阅关系,CompositeDisposable统一存放水管,也就是被观察者,为防止内存泄漏,在activity被destory时切断所有的水管
1 | CompositeDisposable disposables = new CompositeDisposable(); |
下游获取到数据源进行数据打印:
1 | private DisposableObserver<Long> getObserver() { |
Timer操作符
常用于计时器的处理逻辑(多少时间后执行)
上游延迟2秒发送long型数据0到下游,只发送一次
1 | private Observable<? extends Long> getObservable() { |
建立订阅关系
1 | getObservable() |
下游进行打印输出
1 | private Observer<Long> getObserver() { |
Flowable背压式
Rxjava在处理异步操作时,会先将上游的数据放入一个缓存池中,这里称作水缸,一旦下游不能及时处理就会出现内存溢出的问题,抛出MissingBackpressureException
解决方式
- 放慢上游的发送速度
- 减少发送的数据
Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题,
在同一线程中下游没有调用request, 上游就认为下游没有处理事件的能力,抛出MissingBackpressureException,当上下游工作在不同的线程中时, 只有当下游调用request时, 才从水缸里取出事件发给下游。背压策略
BackpressureStrategy.ERROR:
默认策略,有一个大小为128的水缸BackpressureStrategy.BUFFER
增大水缸的容量,仍有内存溢出问题BackpressureStrategy.DROP
水缸存不下时,直接把存不下的事件丢弃BackpressureStrategy.LATEST
水缸存不下时,只保留最新的事件reduce操作符
遍历list,第一次将第一个元素和第二个元素作为入参,返回一个结果值,以后将上一次返回的结果和当前遍历的元素作为入参,返回一个结果值。
与它相同的是scan操作符,但是scan操作符会返回上一次的结果
上游创建数据源,发送到下游建立订阅关系,将上一次返回的结果和当前遍历的元素作为入参,最后返回一个累加值1
2
3private Observable<Integer> getObservable() {
return Observable.just(1, 2, 3, 4);
}下游进行打印输出1
2
3
4
5
6
7
8getObservable()
.reduce(new BiFunction<Integer, Integer, Integer>() {
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
})
.subscribe(getObserver());1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29private MaybeObserver<Integer> getObserver() {
return new MaybeObserver<Integer>() {
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
public void onSuccess(Integer value) {
textView.append(" onSuccess : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onSuccess : value : " + value);
}
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}Filter操作符
遍历list中的每个元素,按条件过滤1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43Observable.just(1, 2, 3, 4, 5, 6)
.filter(new Predicate<Integer>() {
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
})
.subscribe(getObserver());
private Observer<Integer> getObserver() {
return new Observer<Integer>() {
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
public void onNext(Integer value) {
textView.append(" onNext : ");
textView.append(AppConstant.LINE_SEPARATOR);
textView.append(" value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext ");
Log.d(TAG, " value : " + value);
}
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}Contact操作符
将多个数据源结合成一个数据源并发送数据,并且严格按照先后顺序发射数据
与它相同的有merge,但是merge是无序的且是并发的1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39String[] aStrings = {"A1", "A2", "A3", "A4"};
String[] bStrings = {"B1", "B2", "B3"};
Observable<String> aObservable = Observable.fromArray(aStrings);
Observable<String> bObservable = Observable.fromArray(bStrings);
Observable.concat(aObservable, bObservable)
.subscribe(getObserver());
private Observer<String> getObserver() {
return new Observer<String>() {
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}Rxbus的使用
定义Rxbus在Application中初始化Rxbus,做事件的统一管理1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class RxBus {
public RxBus() {
}
private PublishSubject<Object> bus = PublishSubject.create();
//发送event
public void send(Object o) {
bus.onNext(o);
}
//获取bus对象
public Observable<Object> toObservable() {
return bus;
}
public boolean hasObservers() {
return bus.hasObservers();
}
}在activity中发送事件,CompositeDisposable统一存放水管,为避免内存泄漏,destory时切断水管1
2
3
4
5
6
7
8
9
10
11
12public class MyApplication extends Application {
private RxBus bus;
public void onCreate() {
super.onCreate();
bus = new RxBus();
}
public RxBus bus() {
return bus;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25CompositeDisposable disposables = new CompositeDisposable();
protected void onDestroy() {
super.onDestroy();
disposables.clear();
}
//上游是MyApplication的bus,建立订阅关系,绑定下游事件接收回调处理
disposables.add(((MyApplication) getApplication())
.bus()
.toObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Object>() {
public void accept(Object object) throws Exception {
if (object instanceof Events.TapEvent) {
textView.setText("Tap Event Received");
}
}
}));
//发送事件到MyApplication的bus
((MyApplication) getApplication())
.bus()
.send(new Events.TapEvent());操作符组合使用
android搜索功能的实现
绑定searchview的监听建立订阅关系,上游输入文字监听,下游显示结果1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class RxSearchObservable {
public static Observable<String> fromView(SearchView searchView) {
final PublishSubject<String> subject = PublishSubject.create();
searchView.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
public boolean onQueryTextSubmit(String s) {
subject.onComplete();
return true;
}
public boolean onQueryTextChange(String text) {
subject.onNext(text);
return true;
}
});
return subject;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49//上游绑定SearchView输入文字监听
RxSearchObservable.fromView(searchView)
//每隔300毫秒执行
.debounce(300, TimeUnit.MILLISECONDS)
//过滤输入文字不为空
.filter(new Predicate<String>() {
public boolean test(String text) throws Exception {
if (text.isEmpty()) {
textViewResult.setText("");
return false;
} else {
return true;
}
}
})
//与上次的数据对比有变化
.distinctUntilChanged()
//只发射最近的一次observable
.switchMap(new Function<String, ObservableSource<String>>() {
public ObservableSource<String> apply(String query) throws Exception {
//从网络获取数据
return dataFromNetwork(query);
}
})
//在子线程中运行
.subscribeOn(Schedulers.io())
//切到主线程
.observeOn(AndroidSchedulers.mainThread())
//下游显示结果
.subscribe(new Consumer<String>() {
public void accept(String result) throws Exception {
textViewResult.setText(result);
}
});
//模拟网络请求
private Observable<String> dataFromNetwork(final String query) {
return Observable.just(true)
.delay(2, TimeUnit.SECONDS)
.map(new Function<Boolean, String>() {
public String apply(@NonNull Boolean value) throws Exception {
return query;
}
});
}封装线程切换
1
2
3
4
5
6
7
8
9
10
11
12
13private final static <UD> ObservableTransformer <UD, UD> rxud(){
return new ObservableTransformer<UD,UD>(){
public ObservableSource<UD> apply (Observable<UD> upstream){
return upstream.subscribeOn(Schedulers,io())
.observableOn(AndroidSchedulers.mainThread());
}
}
}
Observable.from(getObservable())
.map(data -> dealwith(data))
.compose(rxud())
.subscribe(getObserver());切子线程切主线程再切子线程切主线程
doOnNext 替代subscribeOn连接两次请求之间的切换1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19api.registerAction()
.subscribeOn(Schedulers.io())
.observableOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<RegisterResponse>(){
public void accept(RegisterResponse registerResponse) throws Exception {
}
})
.observableOn(Schedulers.io())
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>(){
public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse){
Observable<LoginResponse> loginResponse = api.loginAction();
return loginResponse;
}
})
.observableOn(AndroidSchedulers.mainThread())
.subscribe(new Observable<LoginResponse>{
})封装RXView
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78public class RxView{
private final static String TAG = RxView.class.getSimpleName();
public static Observable<Object> clicks(View view){
return new ViewClickObservable(view);
}
}
public class ViewClickObservable extends Observable<Object> {
private final View view;
private static final Object EVENT = new Object();
private static Object EVENT2;
public ViewClickObservable(View view){
this.view = view;
EVENT2 = view;
}
protected void subscribeActual(Observer<? super Object> observer){
MyListener myListener = new MyListener(view, observer);
observer.onSubscribe(myListener)
this.view.setOnClickListener(myListener)
}
static final class MyListener implements View.OnClickListener, Disposable{
private final View view;
private Observer<Object> observer;
private final AtomicBoolean isDisposable = new AtomicBoolean(); // 是否中断
public MyListener(View view, Observer<Object> observer){
this.view = view;
this.observer = observer;
}
public void onClick(View v){
// 继续分发
if (isDisposed() == false){
observer.onNext(EVENT);
}
}
public void dispose(){
// 之前没有中断,设置为中断
if (isDisposable.compareAndSet(false, true)){
// 主线程取消监听逻辑
if (Looper.myLoop() == Looper.getMainLooper()){
view.setOnClickListener(null)
}else {
AndroidSchedulers.mainThread().schedulerDirect(new Runnable()){
public void run(){
view.setOnClickListener(null)
}
}
}
}
}
public boolean isDisposed(){
return isDisposable;
}
}
}
RxView.clicks(button)
.throttleFirst(2000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Object>(){
public void accept(Object o) throws Exception {
Observable.create(new ObservableOnSubscribe<String>(){
public void subscribe(ObservableEmitter<String> e) throws Exception{
e.onNext("click");
}
})
}
})
.subscribe(new Consumer<String>(){
public void accept(String s) throws Exception {
Log.d("RxView","accept:" + s)
}
})