0%

封装FlowEventBus事件总线


  • Kotlin是Android开发的趋势,协程是Kotlin必不可少的一部分,而Flow是协程的一部分,是类似RxJava的基于流的一种链式调用的异步响应式编程框架,可以说用kotlin开发Android是绕不开对Flow的学习和使用,关于组件间的通信,事件总线的实现方案有很多,如handler有内存泄漏问题,高耦合且不好维护,本地Broadcast和EventBus无法感知生命周期需要注册和反注册,EventBus还要配置混淆,直接使用interface又不好维护,RxBus学习成本高还依赖RxJava,自从google推出LiveData后封装了LiveDataBus但是要解决粘性事件问题和onCreate/onStop/onDestroy收不到数据问题以及postValue丢值问题,而这些问题在Flow上是不用考虑的
  • 根据大部分业务需要,采用SharedFlow去封装EventBus是目前比较完美的事件总线解决方案,这里记录一下本次封装的详细过程,同时这次封装还解决了大部分事件总线的痛点,临时事件滥用维护难度大,事件名易重复,发送接收数据类型不一致导致数据转换错误,欢迎交流和讨论~

为什么采用SharedFlow

官方推荐用Flow去替代Livedata

从设计上我们可以看出SharedFlow是高配版的LiveData,理论上LiveData能做的它也可以

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
require(replay >= 0) { "replay cannot be negative, but was $replay" }
require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
"replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
}
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
  • LiveData容量是1,SharedFlow容量支持0到多个
  • LiveData无法应对背压问题,SharedFlow有缓存空间能应对背压问题
  • LiveData固定重播1个数据,SharedFlow支持重播0个到多个数据
  • LiveData只能在主线程订阅,SharedFlow支持在任意线程订阅

    适合大多数业务场景

  • 支持一对多,即一条消息支持多个订阅者
  • 具有时效性,过期的消息没有意义且不应该被延迟发送
    对照SharedFlow本身是热流,支持多个订阅者,默认重播为0,容量为0,不会出现粘性事件,没有订阅直接丢弃

具体实现

定义FlowDataEvent

  • 定义EventBus发送的数据类FlowDataEvent,keyEvent为事件名称,data为发送的任意数据类型
1
class FlowDataEvent(val keyEvent: String, val data: Any): FlowEvent

封装FlowEventBus

  • 定义EventBus的CoroutineScope
  • 定义全局的SharedFlow
  • SharedFlow转冷流与订阅者绑定,LifecycleOwner的扩展用于Fragment中订阅数据,生命周期与Fragment绑定,CoroutineScope的扩展用于ViewModel中订阅数据,生命周期与ViewModel绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
object FlowEventBus {
// flowEventBus对应的scope
private val flowEventBusScope = CoroutineScope(CoroutineName("FlowEventBus"))
// flowEventBus使用全局SharedFlow
private val mutableSharedFlow = MutableSharedFlow<FlowDataEvent>()

private val flowEventBus: Flow<FlowDataEvent> get() = mutableSharedFlow.asSharedFlow()

// SharedFlow绑定scope
init {
mutableSharedFlow.launchIn(flowEventBusScope)
}

/**
* 发送一个事件
*/
fun sendEvent(event: FlowDataEvent) {
flowEventBusScope.launch {
mutableSharedFlow.emit(event)
}
}


/**
* 绑定LifecycleOwner返回FlowDataEvent
*/
fun LifecycleOwner.collectDataEvent(action: suspend (e: FlowDataEvent) -> Unit): Job {
return flowEventBus.collectWhenCreated(this){
action.invoke(it)
}
}


/**
* 绑定Scope返回FlowDataEvent
*/
fun CoroutineScope.collectDataEvent(action: suspend (e: FlowDataEvent) -> Unit): Job {
return launch {
flowEventBus.collect{
action.invoke(it)
}
}
}
}

fun <T> Flow<T>.collectWhenCreated(owner: LifecycleOwner, action: suspend (value: T) -> Unit): Job = owner.lifecycleScope.launch {
owner.lifecycle.repeatOnLifecycle(Lifecycle.State.CREATED) {
collect{action.invoke(it)}
}
}

对外调用扩展

  • 根据事件名做一个String类型的扩展,LifecycleOwner和CoroutineScope适配Fragment/Activity和ViewModel场景的绑定订阅者,监听事件获取发送的数据
  • 根据需要发送的任意对象,传入事件名称构建一个FlowDataEvent对象并调用FlowEventBus发送
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    /**
    * 绑定事件,生命周期为LifecycleOwner
    */
    fun <T> String.bindFlowEvent(lifecycleOwner: LifecycleOwner, block: (T) -> Unit){
    lifecycleOwner.collectDataEvent {
    if (it.keyEvent == this) {
    val result = it.data as? T
    result?.apply {
    block.invoke(this)
    } ?: "lifecycleOwner bindFlowEvent convert T error".logD()
    }
    }
    }

    /**
    * 绑定事件,生命周期为CoroutineScope
    */
    fun <T> String.bindFlowEvent(coroutineScope: CoroutineScope, block: (T) -> Unit) {
    coroutineScope.collectDataEvent {
    if (it.keyEvent == this) {
    val result = it.data as? T
    result?.apply {
    block.invoke(this)
    } ?: "coroutineScope bindFlowEvent convert T error".logD()
    }
    }
    }


    /**
    * 创建一个当前对象为data的并发送
    */
    fun Any.sendFlowEvent(key: String){
    FlowEventBus.sendEvent(FlowDataEvent(key, this))
    }

    解决痛点

  • 临时事件滥用
    收敛到一个文件,只能根据预先定义好的事件发送和接收
  • 事件名易重复
    采用object事件类的类名作为事件名保证唯一性
  • 发送接收数据类型不一致
    抽象出一个泛型抽象类,传入泛型的具体类型为发送和接收的统一类型,保证发送接收数据类型一致

泛型抽象类

所有FlowEventBus需要发送的事件类都要继承这个抽象类,保证发送和接收数据类型一致

1
2
3
4
5
6
7
8
9
10
11
12
13
 abstract class IEvent<T>  {
open fun sendEvent(t: T){
t?.sendFlowEvent(this.javaClass.name)
}

fun bindEvent(lifecycleOwner: LifecycleOwner, block: (T) -> Unit){
this.javaClass.name.bindFlowEvent(lifecycleOwner, block)
}

fun bindEvent(coroutineScope: CoroutineScope, block: (T) -> Unit){
this.javaClass.name.bindFlowEvent(coroutineScope, block)
}
}

定义FlowEventBus的事件类

所有的事件定义在一个文件中方便维护,避免临时事件,事件名重复问题

1
2
3
object ClickItemEvent: IEvent<BottomSheetDialogHelper.SelectEntity>()

object TestEvent: IEvent<String>()

业务场景中使用

1
2
3
4
5
6
7
8
9
10
11
// 发送 
ClickItemEvent.sendEvent(selectEntity)
TestEvent.sendEvent("Test")

// 接收
ClickItemEvent.bindEvent(this) {
"${it.toJson()}".logD()
}
TestEvent.bindEvent(this) {
showToast("$it")
}