Kotlin Flow 操作符源码解析
drop
- 返回忽略前 [count] 个元素的流。
public fun <T> Flow<T>.drop(count: Int): Flow<T> {
// 如果 [count] 小于0,则抛出 [IllegalArgumentException]。
require(count >= 0) { "Drop count should be non-negative, but had $count" }
return flow {
var skipped = 0 // 默认忽略次数0
collect { value -> // 如果忽略次数大于等于count则发射元素、否则忽略次数自增1
if (skipped >= count) emit(value) else ++skipped
}
}
}
使用示例
private suspend fun dropUse() {
flowOf(0, 1, 2, 3, 4, 5)
.drop(2)
.collect { print("$it ") }
}
2 3 4 5
dropWhile
- 忽略满足条件的元素,返回一个包含之后所有元素的流。
- 若第一个元素就不满足,则返回一个包含所有元素的流
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
var matched = false
collect { value ->
if (matched) {
emit(value)
} else if (!predicate(value)) {
matched = true
emit(value)
}
}
}
使用示例
private suspend fun dropWhileUse() {
flowOf(0, 1, 2, 3, 4, 5)
//.dropWhile { it == 0 } //结果输出 1 2 3 4 5
//.dropWhile { it < 2 } //结果输出 2 3 4 5
.dropWhile { it > 3 } //结果输出 0 1 2 3 4 5
.collect { print("$it ") }
}
0 1 2 3 4 5
take
- 返回包含前 [count] 个元素的流。
- 当 [count] 个元素被消耗时,原Flow被取消。
public fun <T> Flow<T>.take(count: Int): Flow<T> {
// 如果 [count] 小于0,则抛出 [IllegalArgumentException]。
require(count > 0) { "Requested element count $count should be positive" }
return flow {
var consumed = 0 // 默认消耗数0
try {
collect { value ->
// 消耗数自增1后、如果此时消耗数小于[count]发射元素、否则抛出Flow中断异常
// AbortFlowException是flow内部自定义的异常、继承自CancellationException
// 如果需要我们也可以自定义这样的异常。
if (++consumed < count) {
return@collect emit(value)
} else {
return@collect emitAbort(value)
}
}
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this)
}
}
}
使用示例
private suspend fun takeUse() {
flowOf(0, 1, 2, 3, 4, 5)
.take(2)
.collect { print("$it ") }
}
0 1
takeWhile
- 返回满足条件元素的流。
- 若第一个元素就不满足,则返回一个空的流
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
// This return is needed to work around a bug in JS BE: KT-39227
return@flow collectWhile { value ->
if (predicate(value)) {// 满足条件则发射元素、并将 collectWhile 接收的值置为 true
emit(value)
true
} else {// 否则 collectWhile 接收的值置为 false, collectWhile 内部会抛出Flow中断异常
false
}
}
}
使用示例
private suspend fun takeWhileUse() {
flowOf(0, 1, 2, 3, 4, 5)
.takeWhile { it<4 }
.collect { print("$it ") }
}
0 1 2 3
transformWhile
- 根据入参代码块最后返回的布尔值决定是否后续的转换,True继续执行 False不再进行后续变换。
public fun <T, R> Flow<T>.transformWhile(
@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Boolean
): Flow<R> =
safeFlow {
return@safeFlow collectWhile { value ->
transform(value)
}
}
使用示例
private suspend fun transformWhileUse() {
var isMaxSize = false
flow {
for (i in 0..10) {
if (i == 2) isMaxSize = true
emit(i)
}
}.transformWhile { value: Int ->
emit("transform to string $value")
!isMaxSize
}.collect { println("$it") }
}
transform to string 0
transform to string 1
transform to string 2
collectWhile
- 收集时,满足条件则正常发射元素,否则抛出异常中断Flow
- predicate 谓词:就是接收一个最终返回布尔值的代码块
internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) {
val collector = object : FlowCollector<T> {
override suspend fun emit(value: T) {
if (!predicate(value)) {
throw AbortFlowException(this)
}
}
}
try {
collect(collector)
} catch (e: AbortFlowException) {
e.checkOwnership(collector)
}
}