Kotlin in Action 2판 17장 Flow Operator
Kotlin Coroutines Flow Operator 에 대하여
Kotlin in Action 2판 17장 Flow Operator
17장 Flow 연산자
17.1 Flow 연산자로 Flow 조작
- Flow는 시간에 따라 나타나는 여러 연속적인 값을 처리할 수 있는 고수준 추상화
- Flow도 컬렉션과 마찬가지로 다양한 연산자(map, filter 등)를 사용하여 조작할 수 있음
- Flow 연산자는 중간 연산자와 최종 연산자로 구분됨
17.2 중간 연산자는 Upstream Flow에 적용되고, Downstream Flow를 반환함
- 중간 연산자는 Flow에 적용되어 새로운 Flow를 반환함
- 연산자가 적용되는 Flow를 upstream Flow, 중간 연산자가 반환하는 Flow를 downstream Flow라 부름
- downstream Flow는 또 다른 연산자의 upstream Flow가 될 수 있음
- 중간 연산자가 호출되어도 Flow 코드는 실제로 실행되지 않으며, 반환된 Flow는 콜드 상태임
- map, filter, onEach 등은 컬렉션의 연산자와 유사하게 동작하지만 Flow의 원소에 적용된다는 점이 다르며, Flow에는 컬렉션이나 시퀀스에서 볼 수 없는 특별한 연산자도 존재함
17.2.1 upstream 원소별 임의의 값을 배출: transform 함수
- transform 함수는 upstream Flow의 각 원소에 대해 원하는 만큼의 원소를 downstream Flow에 배출할 수 있음
예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() {
val names = flow {
emit("Jo")
emit("May")
emit("Sue")
}
val uppercasedNames = names.map { it.uppercase() }
runBlocking {
uppercasedNames.collect { print("$it ") }
}
// JO MAY SUE
}
transform 사용 예제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import kotlinx.coroutines.flow.*
fun main() {
val names = flow {
emit("Jo")
emit("May")
emit("Sue")
}
val upperAndLowercasedNames = names.transform {
emit(it.uppercase())
emit(it.lowercase())
}
runBlocking {
upperAndLowercasedNames.collect { print("$it ") }
}
// JO jo MAY may SUE sue
}
- 단순 값 목록의 경우 flowOf(“Jo”, “May”, “Sue”)로 Flow 생성 가능하다는 점을 참고
17.2.2 take나 관련 연산자는 Flow를 취소할 수 있음
- 시퀀스에서 배운 take, takeWhile 등은 Flow에서도 동일하게 사용 가능함
- 이러한 연산자는 조건이 더 이상 유효하지 않을 때 upstream Flow를 취소하고, 더 이상 원소가 배출되지 않게 함
- 예: 5개의 값만 받고 싶을 때 take(5)를 호출하면 다섯 번 배출 후 upstream Flow가 취소됨
예제:
1
2
3
4
5
6
7
8
import kotlinx.coroutines.flow.*
fun main() {
val temps = getTemperatures()
temps
.take(5)
.collect { log(it) }
// 5번 배출 후 Flow 취소
}
- take는 코루틴 스코프 취소 외에도 Flow 수집을 제어적으로 취소하는 수단임
17.2.3 Flow의 각 단계 후킹: onStart, onEach, onCompletion, onEmpty
- onCompletion: Flow가 정상 종료, 취소, 예외 등 어떤 상황에서든 끝날 때 람다를 실행하는 중간 연산자임
- onStart: Flow 수집이 시작될 때, 첫 배출 전에 실행됨
- onEach: upstream Flow에서 각 원소가 배출된 후 실행됨
- onEmpty: 원소를 배출하지 않고 종료되는 경우 기본값 배출 등 실행 가능
예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun process(flow: Flow<Int>) = flow
.onEmpty {
emit(0)
println("Nothing - emitting default value!")
}
.onStart { println("Starting!") }
.onEach { println("On $it!") }
.onCompletion { println("Done!") }
.collect()
runBlocking {
process(flowOf(1, 2, 3))
// Starting!
// On 1!
// On 2!
// On 3!
// Done!
process(flowOf())
// Starting!
// Nothing - emitting default value!
// On 0!
// Done!
}
- onEach는 onEmpty보다 upstream 쪽에 위치함
- 만약 onEmpty 호출을 더 다운스트림으로 옮기면 downstream 연산자에 의해 배출된 값을 onEach가 받지 못함
17.2.4 downstream 연산자와 수집자를 위한 원소 버퍼링: buffer 연산자
- 기본적으로 Cold Flow에서 값 생산자는 수집자가 이전 원소를 처리할 때까지 대기하기 때문에, Flow의 중간연산자나 collect 내에서 많은 작업을 수행하는 경우, 각 원소의 처리가 느릴 수 있음
- buffer 연산자는 upstream과 downstream 사이에 버퍼를 두어, 수집자가 바쁜 동안에도 upstream에서 원소를 미리 생성해 버퍼에 쌓을 수 있도록 함
- 버퍼 크기 및 overflow 동작(onBufferOverflow) 조절 가능
- SUSPEND: 버퍼가 가득 차면 생산자가 대기
- DROP_OLDEST: 가장 오래된 값을 버림
- DROP_LATEST: 추가 중인 마지막 값을 버림
예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun getAllUserIds(): Flow<Int> = flow {
repeat(3) {
delay(200.milliseconds)
log("Emitting!")
emit(it)
}
}
suspend fun getProfileFromNetwork(id: Int): String {
delay(2.seconds)
return "Profile[$id]"
}
fun main() {
val ids = getAllUserIds()
runBlocking {
ids
.buffer(3)
.map { getProfileFromNetwork(it) }
.collect { log("Got $it") }
}
}
- buffer를 사용하면, 전체 처리 시간이 줄어듦
17.2.5 중간 값을 버리는 연산자: conflate 연산자
- conflate 연산자는 수집자가 바쁠 때 upstream에서 배출된 중간 값을 무시하고 최신 값만 전달함
- 빠르게 구식이 되는 값 대신 항상 최신 값만 처리할 때 유용함
예제:
1
2
3
4
5
6
7
8
val temps = getTemperatures() // 16장에서 봤던 온도를 연속적으로 반환하느 Flow 함수
temps
.onEach { log("Read $it from sensor") }
.conflate()
.collect {
log("Collected $it")
delay(1.seconds)
}
- conflate를 쓰면 upstream과 downstream 실행이 분리되며, 최신 값만 보장되며 성능을 유지할 수 있음
17.2.6 일정 시간 동안 값을 필터링하는 연산자: debounce 연산자
- debounce(ms) 연산자는 upstream에서 값이 배출된 뒤, 지정한 시간 동안 추가 값이 없으면 그 값을 downstream에 전달함
- 사용자의 입력 등 이벤트에서 불필요한 중복 처리를 막고, 입력이 멈췄을 때만 처리하는 데 유용함
- 대표적인 예시는 텍스트 입력 시 자동 검색 기능을 구현하는 경우
예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
val searchQuery = flow {
emit("K")
delay(100.milliseconds)
emit("Ko")
delay(200.milliseconds)
emit("Kotl")
delay(500.milliseconds)
emit("Kotlin")
}
fun main() = runBlocking {
searchQuery
.debounce(250.milliseconds)
.collect {
log("Searching for $it")
}
}
// 644 [main @coroutine#1] Searching for Kotl
// 876 [main @coroutine#1] Searching for Kotlin
- 250ms 이상 멈춘 시점의 값만 배출됨
17.2.7 Flow가 실행되는 코루틴 컨텍스트를 바꾸기: flowOn 연산자
- flowOn 연산자는 Flow 연산자 앞(upstream)에서 실행될 코루틴 디스패처(컨텍스트)를 지정함
- 중요한 점은 upstream 플로우의 dispacher에만 영향을 끼침
- 여러 번 사용할 수 있고, 각 flowOn은 자기보다 앞에 있는 연산자와 Flow에만 영향
예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
runBlocking {
flowOf(1)
.onEach { log("A") }
.flowOn(Dispatchers.Default)
.onEach { log("B") }
.flowOn(Dispatchers.IO)
.onEach { log("C") }
.collect()
}
// 36 [DefaultDispatcher-worker-3 @coroutine#3] A
// 44 [DefaultDispatcher-worker-1 @coroutine#2] B
// 44 [main @coroutine#1] C
- “A”는 Default, “B”는 IO, “C”는 collect가 실행된 컨텍스트에서 실행됨
17.3 커스텀 중간 연산자 만들기
- 표준 연산자 외에 사용자가 직접 중간 연산자를 구현할 수 있음
- 중간 연산자는 upstream Flow를 collect로 수집한 뒤, 변환하거나 부수 효과를 추가한 후 downstream에 emit
- 커스텀 연산자도 collect가 호출될 때까지 콜드 상태를 유지함
예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun Flow<Double>.averageOfLast(n: Int): Flow<Double> = flow {
val numbers = mutableListOf<Double>()
collect {
if (numbers.size >= n) numbers.removeFirst()
numbers.add(it)
emit(numbers.average())
}
}
fun main() = runBlocking {
flowOf(1.0, 2.0, 30.0, 121.0)
.averageOfLast(3)
.collect { print("$it ") }
}
// 1.0 1.5 11.0 51.0
- 표준 연산자도 이와 비슷하게 구현되며, 성능 최적화 코드가 추가될 수 있음
17.4 최종 연산자는 upstream Flow를 실행하고 값을 계산함
- 중간 연산자는 주어진 Flow를 변환하지만, 실제 코드는 실행하지 않으며, 최종 연산자가 호출될 때 Flow가 실행되고 값이 계산됨
- 최종 연산자는 단일 값이나 값의 컬렉션을 계산하거나, 지정된 연산과 부수효과를 수행함
- 가장 일반적인 최종 연산자는 collect임
- collect는 Flow의 각 원소에 대해 실행할 람다를 지정할 수 있음
예제:
1
2
3
4
5
fun main() = runBlocking {
getTemperatures()
.onEach { log(it) }
.collect()
}
- collect가 호출되어야 Flow 전체가 수집될 때까지 일시 중단됨
- first, firstOrNull 등의 연산자는 첫 원소만 받고 upstream Flow를 취소할 수 있음
예제:
1
2
3
4
fun main() = runBlocking {
getTemperatures()
.first()
}
17.4.1 프레임워크는 커스텀 연산자를 제공함
- Jetpack Compose나 Compose Multiplatform과 같은 프레임워크는 Flow와 직접 통합된 커스텀 연산자와 변환 함수를 제공함
- 예를 들어 collectAsState 함수는 정수 Flow를 State 객체로 변환해줌으로서 Compose에서 UI 상태로 사용할 수 있음
예제:
1
2
3
4
5
6
7
8
9
@Composable
fun TemperatureDisplay(temps: Flow<Int>) {
val temperature = temps.collectAsState(null)
Box {
temperature.value?.let {
Text("The current temperature is $it!")
}
}
}
- Flow는 코루틴 기반 툴킷에 시간에 따른 값을 우아하게 처리할 수 있는 강력한 기능을 추가해줌
요약
- 중간 연산자는 Flow를 다른 Flow로 변환함
- upstream Flow에 대해 작동하며 downstream Flow를 반환함
- 콜드 상태이며, 최종 연산자가 호출될 때까지 실행되지 않음
- 시퀀스에 사용 가능한 많은 중간 연산자를 Flow에서도 사용할 수 있음
- Flow에는 변환(transform), 실행 콘텍스트 관리(flowOn), 특정 단계별 코드 실행(onStart, onCompletion 등) 중간 연산자가 추가로 제공됨
- collect와 같은 최종 연산자는 Flow 코드를 실제로 실행함
- 핫 Flow의 경우 collect는 구독 처리 역할을 담당함
- Flow 빌더 내에서 upstream Flow를 수집하고 변환된 원소를 배출하는 방식으로 커스텀 중간 연산자를 만들 수 있음
- Jetpack Compose와 같은 일부 외부 프레임워크는 코틀린 Flow와의 직접적 통합을 제공함
This post is licensed under CC BY 4.0 by the author.