Coroutines/Flow

[Coroutine Flow] flatMapConcat을 사용해 flow를 다른 flow로 변환하기

반응형

Flow의 Flattening Operator

flow는 데이터 파이프라인이다. 코드 상에서 데이터 파이프라인은 그 자체로 사용되는 경우는 거의 없으며 보통 다른 데이터 파이프라인들과 합쳐져 하나의 데이터 파이프라인을 완성한다. flow 또한 여러 flow가 합쳐져 하나의 flow로 만들어지기 위한 연산자를 제공하는데 데이터 파이프라인을 합치는(Flatten) 연산자여서 Flattening Operator(하나로 만드는 연산자)라 한다.

 

우리는 이번 글에서 가장 대표적인 Flattening Operator인 flatMapConcat에 대해 다뤄볼 것이다.

 

 

 flatMapConcat 은 무엇을 하는가?

flatMapConcat은 여러 flow를 연결하는(concatenating) 연산자이다. 이름에서 알 수 있듯이 flatMapConcat은 flow를 연결해 새로운 flow를 만드는 역할을 한다. flow간에 연결이 필요할 경우 flatMapConcat을 이용해 연결을 하면 된다.

 

 

flatMapConcat 내부 살펴보기

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

 

flatMapConcat은 내부에서 두 개의 과정을 통해 flow를 flatten(합치기)한다.

1. transform 람다식(변수)을 받아 transform 변수에 대한 map을 수행해 flow에서 발행된 데이터를 flow로 변환한다. 이 단계에서는  flow에서 발행되는 각 데이터가 flow로 변환되어 여러개의 flow가 생성된다.

2. 1에서 생성된 flow들이 flattenConcat()을 통해 합쳐져 하나의 flow가 된다.

 

즉, flatmapConcat은 데이터를 변환하여 flow들을 새로 생성한 후 생성된 flow들을 합쳐 하나의 flow를 만들어낸다. 

 

*참조 : map 내부와 flattenConcat 내부

더보기
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
   return@transform emit(transform(value))
}
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    collect { value -> emitAll(value) }
}

 

flatMapConcat 사용 예시

예를들어 다음과 같은 flow가 있다고 해보자. 이 flow는 단순히 1과 5를 emit 하는 flow이다.

 

val flow = flow<Int> {
    emit(1)
    emit(5)
}

 

 만약 이 flow에서 발행하는 값에에 대해  +1, +2, +3 을 된 값을 생성하는 flow가 있다고 해보자. 우리는 flow에서 발행하는 값을 받아 새로운 flow를 만들어야 하고 새로운 flow에서 발행되는 값은 +1,+2,+3이 수행된 값이다. 즉 1에 대해서는 2,3,4가 발행되는 flow가 생성되고 5에 대해서는 6,7,8이 발행되는 flow가 생성되어야 한다. 마지막으로 새로 생성된 2개의 flow는 하나의 flow로 합쳐져야 한다. 

 

우리는 이를 위해 flatMapConcat을 다음과 같이 사용할 수 있다.

1. CoroutineScope 내에서 flatMapConcat 수행

2. flow로부터 emit되는 데이터를 다시 flow에서 받아서 emit하기

 

fun collectWithFlatMapConcat() {
    viewModelScope.launch {
        flow.flatMapConcat { intValue -> // 1. CoroutineScope 내에서 flatMapConcat 수행
            flow { // 2. flow로부터 emit되는 데이터를 다시 flow에서 받아서 emit하기
                emit(intValue + 1)
                emit(intValue + 2)
                emit(intValue + 3)
            }
        }.collect {
             println("printed value >> $it")
        }
    }
}

 

위를 수행한 결과는 다음과 같다.

 

그림1. flatMapConcat

 1에 대한 변환이 모두 수행된 후 5에 대한 변환이 수행됨(순차적으로 처리됨)을 확인할 수 있다.

 

flatMapConcat의 한계점

flatMapConcat은 원 flow에서 발행된 데이터가 순차적으로 처리되어 새로운 flow를 만들어낸다. 이 말은 데이터 처리하는데 오래 걸리는 연산이 변환 값으로 들어올 경우 데이터가 처리되는데 오래 걸릴 것임을 뜻한다. 예를 들어 아래처럼 flatMapConcat의 변환 연산자에 delay를 줄 경우 변환 처리 과정에서 10초이상 걸리게 된다. 만약 1초에 수십개의 데이터가 flow를 통해 들어올 경우 변환된 데이터의 발행 시점과 변환되기 전 데이터의 발행 시점 사이에 많은 차이가 생길 수 있다. 

flow.flatMapConcat { intValue ->
    flow {
        emit(intValue + 1)
        delay(10000)
        emit(intValue + 2)
        emit(intValue + 3)
    }
}.collect {
     println("printed value >> $it")
}

 

즉, 변환을 하는데 시간이 오래 소모될 경우 아래 그림3과 같이 변환이 완료된 새로운 데이터 파이프라인의 발행 시점이 점점 밀리게 된다. 즉, 첫 데이터 파이프라인의 데이터 발행 시점과 소비자(Consumer)의 데이터 소비 시점 사이에 많은 차이가 생기게 될 수 있다.

그림2. flatMapConcat의 한계점

 

 

반응형
 

Kotlin 사용자 모임 오픈 카톡

오셔서 궁금한 점을 질문해보세요!
비밀번호 : kotlin22

open.kakao.com

 

이 글의 저작권은 Kotlin World 에 있습니다. 글, 이미지 무단 재배포 및 변경을 금지합니다.