본문 바로가기
코틀린

코루틴을 사용한 동시성, 병렬처리 - 1

by RWriter 2022. 1. 14.
반응형

비동기, 논블록킹을 다룰 수 있는 기술은 매우 중요하다.

 

모바일 웹 화면의 경우 기본은 싱글 스레드로 작동하는데 비동기,논블록킹을 적용하지 않으면 화면의 여러 인터렉션을 동시에 처리할 수가 없다. (버튼을 누르면 모든게 멈추고 하나씩만 처리한다고 생각해보면 된다.)

 

탐캣 같은 웹 서버는 동시에 여러 요청을 받기 위해서 스레드 풀을 만들어 각 요청당 하나의 스레드가 비즈니스 로직을 처리하게 되어있지만 모든 로직을 스레드 하나로 처리하기에는 비효율적인 부분이 많다. (예를 들어 서로 무관한 네트워크 요청을 동시에 5개를 요청해야 하는데, 한번에 할 수가 없어서 순서대로 5번을 요청해야 하는 경우)

 

 

코루틴은 이런 비동기, 논블록킹 패러다임을 다룰 수 있게 해주는 개념이고

코틀린을 사용한다면 코루틴에 대해 학습해보는 것이 좋다. 

 

 

코루틴을 묘사하는 말들이 있다.

  • 경량화된 스레드 (ightweight thread)
  • 자바스크립트의 async, await 와 같다.
  • 코루틴을 사용한 코드는 마치 동기식으로 짠듯하다. 

 

경량화된 스레드라는 표현은 마치 스레드처럼 사용할 수 있지만 훨씬 가볍다라는 뜻인데, 아래와 같은 예시처럼 이다.

Thread.sleep(1000) // thread
delay(1000) // coroutine

 

그렇다고 코루틴이 스레드를 대체하는 자리는 아니다.

 

스레드는 JVM 단계에서의 개념이고

코루틴은 컴파일 단계(코드레벨)에서의 개념이다.

 

코루틴은 suspend 라는 키워드를 사용하는데 (async, await 같은) 이는 컴파일 단계에 감쪽같이 사라지고, 대신에 코드가 조금 조작되는 것이라고 보면 된다. 

 

정확하게는 suspend라는 키워드를 사용해 Continuation Passing Style (CPS) 이 가능하도록 컴파일러가 도와주는 것이라고 볼 수 있다.

메서드(함수) 특정 위치에서 코드 실행의 제어(continuation)를 협력코드블럭(aka. 코루틴) 한테 넘길 수 있다.

return 이 여러개 라고 비유할 수도 있다.

 

 

적절한지 모르겠지만.. 예시..

  • b코드블럭: a야 그다음은 너가 실행해~ 나는 내 일이나 하고 있을게
  • a코드블럭: 알았어~ 내가 어디까지만 실행하고 다시 넘겨줄게~

코드레벨에서의 변화이기 때문에 코루틴도 IO 성격에 따라 스레드 풀을 사용하기도 하고, 사용하지 않기도 한다.

 

https://resources.jetbrains.com/storage/products/kotlinconf2017/slides/2017+KotlinConf+-+Deep+dive+into+Coroutines+on+JVM.pdf 

 

Λrrow

Functional companion to Kotlin's Standard Library

arrow-kt.io

 

 

 

코루틴을 사용한 병렬처리

아래는 동시에 두 가지 코루틴을 실행시키고 프린트를 찍는 예시이다.

(직접 실행하면서 이리저리 바꿔보는 것을 추천)

suspend fun parallelLaunchDelay() = coroutineScope {
    launch {
        delay(1000) // 1. 실행흐름 밖으로 넘기고, 자기 할일
        println("print after 1 second same time. job1")
    }
    launch {
        delay(1000) // 2. 실행흐름 밖으로 넘기고, 자기 할일
        println("print after 1 second same time. job2")
    }
    println("print first") // 3. 위에 두개가 다 넘겨서 이쪽을 실행
    delay(2000) // 4. 두개 launch 밖의 코루틴 범위에서 기다림
    
}

fun main() = runBlocking<Unit> {
    parallelLaunchDelay()
}

/**
print first
print after 1 second same time. job1
print after 1 second same time. job2
/**

 

처음 보는 키워드, 함수에 대해서 정리를 하면

  • suspend : 코루틴 안에서만 호출될 수 있는 키워드.  coroutineScope, delay 도 suspend 함수이다. 코드의 실행흐름이 suspend 함수를 만나면 실행 제어흐름이 호출자한테로(밖으로) 넘어간다. 
  • coroutineScope : 코루틴의 범위(scope) 를 나타낸다. 코루틴은 스코프에 어떤지에 따라서 예외를 전파시킬 수도, 아닐 수도 있다. 올바른 에러 전파 범위를 지정하기 위해서는 필수이다. 메서드를 리팩토링 하듯이 사용해야 한다고 생각한다.
  • launch : 코루틴 builder라고 한다. 코루틴을 만들어 실행시키는 함수이며 응답값으로 Job 객체를 받을 수 있다. suspend 함수가 아니다.
  • delay : 코루틴을 일시적으로 멈춘다. suspend 함수이다.
  • runBlocking : suspend 세계와 블록킹 세계와 연결해주는 함수. runBlocking 을 호출한 스레드는 runBlocking 이 다 끝날때 까지 기다리게 된다. 

runBlocking 의 바디에 작성하는 함수는 CoroutineScope 를 receiver로 하는 함수를 넣는 것이며 suspend 이기 때문에 runBlocking 안에서 suspend 함수를 호출할 수 있다.

 

어떻게 코루틴이 동시에 두개의 잡을 실행시켰는지는 위 주석을 따라 읽어보면 된다. 코드에서 suspend는 delay 뿐이므로 delay를 만날 때마다 실행의 제어권을 호출부로 다시 넘겼다.

 

 

 

스레드 Sleep 이 포함된 병렬처리

delay를 sleep 으로 바꾸었다. 하지만 이것은 print가 동시에 찍히지 않고 1초 2초 후에 찍히게 된다. 왜 그럴까?

suspend fun noParallelLaunchSleep() = coroutineScope {
    launch {
        Thread.sleep(1000)
        println("print after 1 second. job1")
    }
    launch {
        Thread.sleep(1000)
        println("print after 2 second. job2")
    }
    println("print first")
    Thread.sleep(1000)
}

fun main() = runBlocking<Unit> {
    noParallelLaunchSleep()
}

/**
print first
print after 1 second. job1
print after 2 second. job2
**/

 

 

 

 

코루틴 Context, Dispatcher

코루틴이 실행되는 Context 를 정의할 수 있다. (context는 해석하면 맥락 인데, 프로그래밍 세계에서 context라는 개념이 참 많다.)

그리고 Context 에 포함되는 Dispatcher 가 있는데, 코루틴의 Worker 를 할당하고 값을 받아오는 역할을 한다. 

 

어떤 context 이냐에 따라서 코루틴이 동작하는 방식을 제어할 수 있다.

 

결론적으로 아래 두가지 방식으로 Context, 여기서는 Dispatcher를 지정해줄 수 있다.


suspend fun parallelLaunchSleep_1() = coroutineScope {
    launch(Dispatchers.IO) {
        Thread.sleep(1000)
        println("print after 1 second same time. job1. ${Thread.currentThread()}")
    }
    launch(Dispatchers.IO) {
        Thread.sleep(1000)
        println("print after 1 second same time. job2. ${Thread.currentThread()}")
    }
    Thread.sleep(1000)
}

suspend fun parallelLaunchSleep_2() = coroutineScope {
    withContext(Dispatchers.IO) {
        launch {
            Thread.sleep(1000)
            println("print after 1 second same time. job1. ${Thread.currentThread()}")
        }
        launch {
            Thread.sleep(1000)
            println("print after 1 second same time. job2. ${Thread.currentThread()}")
        }
    }
    Thread.sleep(1000)
}

 

코루틴 가이드에 의하면 Blocking 되는 로직이 있을 때 Dispatcher.IO 를 사용하라고 권고하고 있다. 

쉽게 스레드 풀을 할당해주는 것인데, 풀 사이즈를 제어하고 싶으면 시스템 프로퍼티를 바꿔주거나, 스레드풀을 직접 만들어 사용하라고 한다.

 

TASK_PROBABLY_BLOCKING 파라미터를 넘겨주어 블록킹 연산이라는 힌트를 주고 있다.

 

직접 디스패처 만들기

val myDispatcher = newFixedThreadPoolContext(100, "my-pool")

 

 

디스패처를 지정하지 않으면 기본적으로 하나의 스레드가 여러 코루틴을 처리하려 하기 때문에 억지로 Thread.sleep 을 하게 되면 어쩔 수 없이 스레드가 블록킹 된다.

 

db io, network io, file io 등 실제 io가 일어나는 경우에는 꼭 디스패처를 지정해야 한다.

 

delay를 사용한 예제에서의 코루틴 로그

VM 파라미터를 아래와 같이 주게 되면 Thread.currentThread().name 에서 코루틴 정보를 볼 수 있다.

/**
print first
print after 1 second same time. job1. main @coroutine#2
print after 1 second same time. job2. main @coroutine#3
**/

 

 

Launch Job 사용하기

launch 는 코루틴을 실행하는 빌더이며, 응답으로 Job 객체를 얻어오는데 이를 활용하여 불필요한 delay 를 없앨 수 있다.

suspend fun parallelLaunchDelay_Join() = coroutineScope {
    val job1: Job = launch {
        delay(1000)
        println("print after 1 second same time. job1")
    }
    val job2: Job = launch {
        delay(1000)
        println("print after 1 second same time. job2")
    }
    println("print first")
    joinAll(job1, job2) // blocking point. 잡들이 끝날때까지 기다린다.
    println("print after complete two jobs")
}

/**
print first
print after 1 second same time. job1
print after 1 second same time. job2
print after complete two jobs
**/

 

 

반응형

댓글