Kotlin Flow 操作符源码解析

drop

  • 返回忽略前 [count] 个元素的流。
public fun <T> Flow<T>.drop(count: Int): Flow<T> {
 // 如果 [count] 小于0,则抛出 [IllegalArgumentException]。
 require(count >= 0) { "Drop count should be non-negative, but had $count" }
 return flow {
 var skipped = 0 // 默认忽略次数0
 collect { value -> // 如果忽略次数大于等于count则发射元素、否则忽略次数自增1
 if (skipped >= count) emit(value) else ++skipped
 }
 }
}

使用示例

private suspend fun dropUse() {
 flowOf(0, 1, 2, 3, 4, 5)
 .drop(2)
 .collect { print("$it ") }
}

2 3 4 5

dropWhile

  • 忽略满足条件的元素,返回一个包含之后所有元素的流。
  • 若第一个元素就不满足,则返回一个包含所有元素的流
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
 var matched = false
 collect { value ->
 if (matched) {
 emit(value)
 } else if (!predicate(value)) {
 matched = true
 emit(value)
 }
 }
}

使用示例

private suspend fun dropWhileUse() {
 flowOf(0, 1, 2, 3, 4, 5)
 //.dropWhile { it == 0 } //结果输出 1 2 3 4 5
 //.dropWhile { it < 2 } //结果输出 2 3 4 5
 .dropWhile { it > 3 } //结果输出 0 1 2 3 4 5
 .collect { print("$it ") }
}

0 1 2 3 4 5

take

  • 返回包含前 [count] 个元素的流。
  • 当 [count] 个元素被消耗时,原Flow被取消。
public fun <T> Flow<T>.take(count: Int): Flow<T> {
 // 如果 [count] 小于0,则抛出 [IllegalArgumentException]。
 require(count > 0) { "Requested element count $count should be positive" }
 return flow {
 var consumed = 0 // 默认消耗数0
 try {
 collect { value ->
 // 消耗数自增1后、如果此时消耗数小于[count]发射元素、否则抛出Flow中断异常
 // AbortFlowException是flow内部自定义的异常、继承自CancellationException
 // 如果需要我们也可以自定义这样的异常。
 if (++consumed < count) {
 return@collect emit(value)
 } else {
 return@collect emitAbort(value)
 }
 }
 } catch (e: AbortFlowException) {
 e.checkOwnership(owner = this)
 }
 }
}

使用示例

private suspend fun takeUse() {
 flowOf(0, 1, 2, 3, 4, 5)
 .take(2)
 .collect { print("$it ") }
}

0 1

takeWhile

  • 返回满足条件元素的流。
  • 若第一个元素就不满足,则返回一个空的流
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
 // This return is needed to work around a bug in JS BE: KT-39227
 return@flow collectWhile { value ->
 if (predicate(value)) {// 满足条件则发射元素、并将 collectWhile 接收的值置为 true
 emit(value)
 true
 } else {// 否则 collectWhile 接收的值置为 false, collectWhile 内部会抛出Flow中断异常
 false
 }
 }
}

使用示例

private suspend fun takeWhileUse() {
 flowOf(0, 1, 2, 3, 4, 5)
 .takeWhile { it<4 }
 .collect { print("$it ") }
}

0 1 2 3

transformWhile

  • 根据入参代码块最后返回的布尔值决定是否后续的转换,True继续执行 False不再进行后续变换。
public fun <T, R> Flow<T>.transformWhile(
 @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Boolean
): Flow<R> =
 safeFlow {
 return@safeFlow collectWhile { value ->
 transform(value)
 }
 }

使用示例

private suspend fun transformWhileUse() {
 var isMaxSize = false
 flow {
 for (i in 0..10) {
 if (i == 2) isMaxSize = true
 emit(i)
 }
 }.transformWhile { value: Int ->
 emit("transform to string $value")
 !isMaxSize
 }.collect { println("$it") }
}

transform to string 0
transform to string 1
transform to string 2

collectWhile

  • 收集时,满足条件则正常发射元素,否则抛出异常中断Flow
  • predicate 谓词:就是接收一个最终返回布尔值的代码块
internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) {
 val collector = object : FlowCollector<T> {
 override suspend fun emit(value: T) {
 if (!predicate(value)) {
 throw AbortFlowException(this)
 }
 }
 }
 try {
 collect(collector)
 } catch (e: AbortFlowException) {
 e.checkOwnership(collector)
 }
}
作者:星野無上

%s 个评论

要回复文章请先登录注册