0%

LiveDataBus的封装


随着JetPack的火热,google首推的Kotlin+LiveData+ViewModel+DataBinding的MVVM框架也越来越流行,在MVP的基础上,解决了P层带来的接口地狱和内存泄漏的问题,通信框架也从传统的handler,broadcast,interface到EventBus再到rxBus,最后到LiveDataBus,目前为止LiveDataBus确实是众多通信方案中最优的,在LiveData的加持下,拥有体积小,易封装,易维护且可感知生命周期防止内存泄漏的特点,接下来记录一下LiveDataBus的封装过程


通信框架对比

Handler 高耦合,不利于维护,内存泄漏

BroadCast 性能差,传输数据有限,打乱代码的执行逻辑

Interface 实现复杂,不利于维护

RxBus 基于RxJava,学习成本高且依赖大

EventBus 需解决混淆问题,无法感知生命周期,实现复杂

发布订阅模式和观察者模式区别

  • 观察者模式:观察者和被观察者相互知道对方的存在

  • 发布订阅模式:发布者和订阅者互相不知道对方的存在

什么是LiveData

数据持有类,持有数据并且这个数据可以被观察者监听,它是和LifeCycle绑定的,在生命周期内使用有效,减少内存泄漏和引用问题

LiveData的特点

  1. UI和数据保持一致:LiveData采用观察者模式,在数据变化时得到通知更新UI
  2. 避免内存泄漏:观察者被绑定到组件的生命周期上,组件销毁时,观察者会立刻清理数据
  3. 不会在Activity的stop状态下崩溃:当Activity处于后台,不会收到LiveData的延迟消息
  4. 解决屏幕旋转重启问题:能收到最新的数

LiveDataBus的封装

  1. 通过map维护一个消息事件和MutableLiveData的映射关系,MutableLiveData的类型默认为Object,接收任意类型,实现总线通信
  2. 将LiveDataBus封装为一个单例类
  3. 消息注册时,如果当前map中不存在,则先将消息和对应的MutableLiveData对象放入维护的map中,添加映射关系,返回当前map中缓存的MutableLiveData对象

粘性消息问题解决

具体现象:当前Activity给未启动的Activity发送一个消息,Activity在启动时能收到之前发送的消息

LiveData源码

LifecycleBoundObserver

1
2
3
4
5
6
7
8
9
@Override
public void onStateChanged(@NonNull LifecycleOwner source,
@NonNull Lifecycle.Event event) {
if (mOwner.getLifecycle().getCurrentState() == DESTROYED) {
removeObserver(mObserver);
return;
}
activeStateChanged(shouldBeActive());
}

LiveData的version初始化是-1,每次LiveData设置值都会version加1

1
2
3
4
5
6
7
8
private int mVersion = START_VERSION;
@MainThread
protected void setValue(T value) {
assertMainThread("setValue");
mVersion++;
mData = value;
dispatchingValue(null);
}

LifeCircleOwner的状态变化时,会调LiveData.ObserverWrapper的activeStateChanged方法,如果这个时候ObserverWrapper的状态是active,就会调用LiveData的dispatchingValue,继续跟踪considerNotify,如果ObserverWrapper的mLastVersion小于LiveData的mVersion,会调mObserver的onChanged方法。所以LiveDataBus注册一个新的订阅者就会收到消息,即使消息发生在订阅之前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
void activeStateChanged(boolean newActive) {
if (newActive == mActive) {
return;
}
// immediately set active state, so we'd never dispatch anything to inactive
// owner
mActive = newActive;
boolean wasInactive = LiveData.this.mActiveCount == 0;
LiveData.this.mActiveCount += mActive ? 1 : -1;
if (wasInactive && mActive) {
onActive();
}
if (LiveData.this.mActiveCount == 0 && !mActive) {
onInactive();
}
if (mActive) {
dispatchingValue(this);
}
}

void dispatchingValue(@Nullable ObserverWrapper initiator) {
if (mDispatchingValue) {
mDispatchInvalidated = true;
return;
}
mDispatchingValue = true;
do {
mDispatchInvalidated = false;
if (initiator != null) {
considerNotify(initiator);
initiator = null;
} else {
for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
considerNotify(iterator.next().getValue());
if (mDispatchInvalidated) {
break;
}
}
}
} while (mDispatchInvalidated);
mDispatchingValue = false;
}

private void considerNotify(ObserverWrapper observer) {
if (!observer.mActive) {
return;
}
// Check latest state b4 dispatch. Maybe it changed state but we didn't get the event yet.
//
// we still first check observer.active to keep it as the entrance for events. So even if
// the observer moved to an active state, if we've not received that event, we better not
// notify for a more predictable notification order.
if (!observer.shouldBeActive()) {
observer.activeStateChanged(false);
return;
}
if (observer.mLastVersion >= mVersion) {
return;
}
observer.mLastVersion = mVersion;
//noinspection unchecked
observer.mObserver.onChanged((T) mData);
}

Hook

在事件传递过程中拦截并监控事件的传输,修改事件传递流程
只要调用setValue版本号mVersion就会加1,此时版本号已经不一致导致onChange的调用,触发粘性事件,如果将mObservers.observer.mLastVersion修改为mVersion当前版本,就会在mObservers.observer.onChange调用前,也就是数据变化通知前return结束,这样就不调onChange方法
mObservers是Map对象,Map的item是键值对,observer是键值对的value,反射Map获取到Entry并获取到value也就是observer
继承MutableLiveData,重写observe方法,在注册监听时进行hook逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class LiveDataBus {
private final Map<String, BusMutableLiveData<Object>> bus;

private LiveDataBus() {
bus = new HashMap<>();
}

private static class SingletonHolder {
private static final LiveDataBus DEFAULT_BUS = new LiveDataBus();
}

public static LiveDataBus get() {
return SingletonHolder.DEFAULT_BUS;
}

public <T> MutableLiveData<T> with(String key, Class<T> type) {
if (!bus.containsKey(key)) {
bus.put(key, new BusMutableLiveData<>());
}
return (MutableLiveData<T>) bus.get(key);
}

public MutableLiveData<Object> with(String key) {
return with(key, Object.class);
}



private static class BusMutableLiveData<T> extends MutableLiveData<T> {

// 生命周期感知的注册监听处理,去除粘性事件
@Override
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
super.observe(owner, observer);
try {
hook(observer);
} catch (Exception e) {
e.printStackTrace();
}
}


// 去除粘性事件
private void hook(@NonNull Observer<T> observer) throws Exception {
//get wrapper's version
Class<LiveData> classLiveData = LiveData.class;
Field fieldObservers = classLiveData.getDeclaredField("mObservers");
fieldObservers.setAccessible(true);
Object objectObservers = fieldObservers.get(this);
Class<?> classObservers = objectObservers.getClass();
Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
methodGet.setAccessible(true);
Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
Object objectWrapper = null;
if (objectWrapperEntry instanceof Map.Entry) {
objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
}
if (objectWrapper == null) {
throw new NullPointerException("Wrapper can not be bull!");
}
Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
fieldLastVersion.setAccessible(true);
//get livedata's version
Field fieldVersion = classLiveData.getDeclaredField("mVersion");
fieldVersion.setAccessible(true);
Object objectVersion = fieldVersion.get(this);
//set wrapper's version
fieldLastVersion.set(objectWrapper, objectVersion);
}
}
}

对非生命周期感知的observeForever方法,生成的wrapper不是LifecycleBoundObserver而是AlwaysActiveObserver,没有办法在observeForever调用完后再改AlwaysActiveObserver的version,因为注册监听时直接调了wrapper.activeStateChanged(true)而不是在LifeCircleOwner的状态变化时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  @MainThread
public void observeForever(@NonNull Observer<? super T> observer) {
assertMainThread("observeForever");
AlwaysActiveObserver wrapper = new AlwaysActiveObserver(observer);
ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
if (existing != null && existing instanceof LiveData.LifecycleBoundObserver) {
throw new IllegalArgumentException("Cannot add the same observer"
+ " with different lifecycles");
}
if (existing != null) {
return;
}
wrapper.activeStateChanged(true);
}

private class AlwaysActiveObserver extends ObserverWrapper {

AlwaysActiveObserver(Observer<? super T> observer) {
super(observer);
}

@Override
boolean shouldBeActive() {
return true;
}
}

可以用ObserverWrapper,包装真正的回调传给observeForever,回调时检查调用栈,如果回调是observeForever方法,那么就不调真正的回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 包装类包裹真正的Observer,处理非生命周期感知的注册监听
private static class ObserverWrapper<T> implements Observer<T> {

private Observer<T> observer;

public ObserverWrapper(Observer<T> observer) {
this.observer = observer;
}

@Override
public void onChanged(@Nullable T t) {
if (observer != null) {
// 目标方法不调onChanged
if (isCallOnObserve()) {
return;
}
observer.onChanged(t);
}
}

private boolean isCallOnObserve() {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
if (stackTrace != null && stackTrace.length > 0) {
for (StackTraceElement element : stackTrace) {
// 如果当前是LiveData对象且为observeForever方法
if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) &&
"observeForever".equals(element.getMethodName())) {
return true;
}
}
}
return false;
}
}

修改BusMutableLiveData增加对非生命周期感知的注册监听处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
  private static class BusMutableLiveData<T> extends MutableLiveData<T> {

private Map<Observer, Observer> observerMap = new HashMap<>();

// 生命周期感知的注册监听处理,去除粘性事件
@Override
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
super.observe(owner, observer);
try {
hook(observer);
} catch (Exception e) {
e.printStackTrace();
}
}

// 非生命周期感知的注册监听处理,去除粘性事件
@Override
public void observeForever(@NonNull Observer<T> observer) {
if (!observerMap.containsKey(observer)) {
observerMap.put(observer, new ObserverWrapper(observer));
}
super.observeForever(observerMap.get(observer));
}

// 非生命周期感知取消注册监听
@Override
public void removeObserver(@NonNull Observer<T> observer) {
Observer realObserver = null;
if (observerMap.containsKey(observer)) {
realObserver = observerMap.remove(observer);
} else {
realObserver = observer;
}
super.removeObserver(realObserver);
}

// 去除粘性事件
private void hook(@NonNull Observer<T> observer) throws Exception {
//get wrapper's version
Class<LiveData> classLiveData = LiveData.class;
Field fieldObservers = classLiveData.getDeclaredField("mObservers");
fieldObservers.setAccessible(true);
Object objectObservers = fieldObservers.get(this);
Class<?> classObservers = objectObservers.getClass();
Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
methodGet.setAccessible(true);
Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
Object objectWrapper = null;
if (objectWrapperEntry instanceof Map.Entry) {
objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
}
if (objectWrapper == null) {
throw new NullPointerException("Wrapper can not be bull!");
}
Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
fieldLastVersion.setAccessible(true);
//get livedata's version
Field fieldVersion = classLiveData.getDeclaredField("mVersion");
fieldVersion.setAccessible(true);
Object objectVersion = fieldVersion.get(this);
//set wrapper's version
fieldLastVersion.set(objectWrapper, objectVersion);
}
}
}

发送事件

1
LiveDataBus.get().with("test_event").setValue("this is a message");

接收事件

1
2
3
4
5
6
LiveDataBus.get().with("test_event", String.class)
.observe(this, new Observer<String>() {
@Override
public void onChanged(@Nullable String aBoolean) {
}
});