개발/코틀린

(코틀린) 동시성

DinoDev 2022. 12. 11. 20:48
728x90
반응형

동시성(concurrent) 주제로 코루틴에 대해 알아보겠습니다.

코루틴

코틀린 프로그램에서도 자바 동시성 기본 요소를 쉽게 사용해 스레드 안전성을 이용할 수 있습니다. 하지만 동시성 연산인 Thread.sleep(), Thread.join(), Object.wait()등이 스레드를 블럭하기 때문에 문제가 남아있습니다. 스레드를 블럭하고 나중에 재개하려면 문맥 전환(context switch)를 하면서 성능이슈가 발생할 수 있습니다. 또한 자원을 많이 사용하기 때문에 비효율적입니다.

더 효율적인 접근 방법은 비동기(Asynchronous) 프로그래밍입니다. 동시성 연산에 대해 연산이 완료될 때 호출될 수 있는 람다를 제공할 수 있고, 원래 스레드는 블럭된 상태로 작업 완료를 기다리는 대신 다른 유용한 작업을 계속 수행할 수 있습니다. 하지만 이런 방법은 일반적인 명령형 제어 흐름을 사용할 수 없어서 코드 복잡도가 올라갑니다.

 

코틀린에서는 코루틴을 사용하면 명령형 스타일 코드로 비동기 계산을 할 수 있도록 처리 해줍니다.

Coroutine과 Suspend function

코루틴 라이브러리의 기본 요소는 suspend function 입니다. 이 함수는 원하는 지점에서 런타임 문맥을 저장하고 함수 실행을 중단한 다음 나중에 다시 재개 할 수 있게 합니다. susepnd라는 변경자를 사용합니다.

import kotlinx.coroutines.delay

suspend fun foo() {
    println("Task started")
    delay(1000)
    println("Task finished")
}

delay()는 코루틴 라이브러리에 정의된 일시 중단 함수입니다. 자바의 Thread.sleep()과 비슷하지만 현재 스레드를 블럭하지 않고 자신을 호출한 함수를 일시 중단시키며 스레드는 다른 작업을 수행할 수 있게 합니다.

suspend function은 일반 함수와 일시 중단 함수 둘 다 호출할 수 있지만 일반 함수는 일시 중단 함수를 호출 할 수 없습니다.

import kotlinx.coroutines.delay

fun foo() {
    println("Task started")
    delay(1000) // Suspend function 'delay' should be called only from a coroutine or another suspend function
    println("Task finished")
}

suspend function이 아니라 일반 함수에서 suspend function을 호출하고 싶다면 코루틴 빌더를 사용해야 합니다.

코루틴 빌더

launch() 함수는 코루틴을 시작하고, 코루틴을 실행 중인 작업의 상태를 추적하고 변경할 수 있는 Job 객체를 리턴합니다. 이 함수는 CoroutineScope.() -> Unit 타입의 일시 중단 람다를 받습니다.

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import java.lang.System.currentTimeMillis

fun main() {
    val time = currentTimeMillis()

    GlobalScope.launch {
        delay(100)
        println("Task 1 finished in ${currentTimeMillis() - time} ms")
    }

    GlobalScope.launch {
        delay(100)
        println("Task 2 finished in ${currentTimeMillis() - time} ms")
    }

    Thread.sleep(200)
}

두 작업이 거의 동시에 끝났다는 점으로 보면 이 두 작업은 병렬적으로 실행됐다는 것을 알 수 있습니다. 항상 Task1이 먼저 끝나지 않을 수 있기 때문에 작업의 순서를 보장할 수 없습니다.

코루틴은 스레드보다 훨씬 가볍습니다. 코루틴은 유지해야 하는 상태가 더 간단하며 일시 중단되고 재개될 때 완전한 문맥 전환을 사용하지 않아도 되기 때문에 엄청난 수의 코루틴을 충분히 동시에 실행할 수 있습니다.

 

launch() 빌더는 결과를 만들어내지 않는 경우에 적합합니다. 결과를 만들어내야 하는 작업인 경우는 async()를 사용합니다.

async()는 Deferred 인스턴스를 만들고 await() 함수를 호출해서 결과를 전달받습니다.

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.delay

suspend fun main() {
    val message = GlobalScope.async {
        delay(100)
        "abc"
    }

    val count = GlobalScope.async {
        delay(100)
        1 + 2
    }

    delay(200)

    val result = message.await().repeat(count.await())
    println(result) // abcabcabc
}

launch()와 async() 빌더의 경우 스레드를 블럭시키지 않지만 백그라운드 스레드를 공유하는 풀(pool)을 통해 작업을 실행합니다.

launch()의 경우 메인 스레드가 처리할 일이 없기 때문에 sleep()을 통해 기다리게 만들었습니다. 하지만 runBlocking() 빌더는 디폴트로 현재 스레드에서 실행되는 코루틴을 만들고 완료 될 때 까지 스레드의 실행을 블럭시킵니다. 그리고 그 결과를 반환합니다.

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    GlobalScope.launch {
        delay(100)
        println("Background task: ${Thread.currentThread().name}")
    }
    runBlocking {
        println("Primary task: ${Thread.currentThread().name}")
        delay(200)
    }
    // Primary task: main
    // Background task: DefaultDispatcher-worker-1
}

runBlocking은 main에서 실행되고 launch는 백그라운드 스레드에서 실행되는 것을 볼 수 있습니다.

코루틴 영역과 구조적 동시성

GlobalScope는 전역 영역으로 코루틴의 생명주기가 애플리케이션의 생명주기와 같습니다. 어떤 특정 경우에는 어떤 연산이 수행하는 중에만 동작하기를 원하는데 동시성 작업 사이의 부모 자식 관계로 인해 이런 실행 시간제한이 가능합니다. A 코루틴을 B 코루틴의 문맥에서 실행하면 B가 A의 부모가 됩니다. 이 경우는 자식의 실행이 모두 끝나야 부모가 끝날 수 있습니다.

이런 기능을 구조적 동시성(structured concurrency)이라고 부르고, 지역 변수 영역 안에서 블럭이나 서브루틴을 사용하는 경우와 구조적 동시성을 비교할 수 있습니다.

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        println("Parent task started")

        launch {
            println("Task A started")
            delay(200)
            println("Task A finished")
        }

        launch {
            println("Task B started")
            delay(200)
            println("Task B finished")
        }

        delay(100)
        println("Parent task finished")
    }
    println("Shutting down...")
}
// Parent task started
// Task A started
// Task B started
// Parent task finished
// Task A finished
// Task B finished
// Shutting down...

runBlocking으로 메인스레드를 블럭 했기 때문에 runBlocking의 작업이 모두 끝나고 Shutting down...이 출력되는 것을 확인할 수 있습니다. 

coroutineScope() 호출로 코드 블럭을 감싸면 커스텀 영역을 도입할 수 있습니다. runBlocking()과 마찬가지로 람다의 결과를 반환하고, 자식들이 완료되기 전까지 실행이 완료되지 않습니다.

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        println("Custom scope start")

        coroutineScope {

            launch {
                delay(100)
                println("Task 1 finished")
            }

            launch {
                delay(100)
                println("Task 2 finished")
            }

        }
        println("Custom scope end")
    }
}
// Custom scope start
// Task 1 finished
// Task 2 finished
// Custom scope end

coroutineScope 내부에 두 개의 launch가 모두 끝나기 전까지 coroutineScope의 호출 부분을 일시 중단하게 되고 모두 끝나면 다시 함수를 재개해서 Custom scope end를 호출합니다.

코루틴 문맥

코루틴마다 CoroutineContext 인터페이스로 표현되는 문맥이 연관돼 있으며, 코루틴을 감싸는 변수 영역의 coroutineContext 프로퍼티를 통해 이 문맥에 접근할 수 있습니다. 문맥은 key-value 형태로 이뤄진 불변 컬렉션이고 코루틴에서 사용할 수 있는 여러 가지 데이터가 들어있습니다. 데이터 중 일부는 코루틴 장치에서 특별한 의미를 가지며, 런타임에 코루틴이 실행되는 방식에 영향을 미칩니다.

  • 코루틴이 실행 중인 취소 가능한 작업을 표현하는 잡(Job)
  • 코루틴과 스레드의 연관을 제어하는 디스패처(dispatcher)

일반적으로 문맥은 CoroutineContext.Element를 구현하는 아무 데이터나 저장할 수 있습니다.

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main() {
    GlobalScope.launch {
        // 현재 잡을 얻고 "Task is active: true"를 출력
        println("Task is active: ${coroutineContext[Job.Key]!!.isActive}")
    }
    delay(100)
}

launch()나 async()등 코루틴 빌더에 의해 만들어지는 코루틴은 현재 문맥을 이어 받습니다. 필요하면 빌더 함수에 context 파라미터를 지정해서 새 문맥을 넘길 수 있습니다. 문맥을 만들려면 두 문맥의 데이터를 합쳐주는 + 연산자를 사용하거나, 주어진 키에 해당하는 원소를 문맥에서 제거해주는 minusKey() 함수를 사용하면 됩니다.

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

private fun CoroutineScope.showName() {
    println("Current coroutine: ${coroutineContext[CoroutineName]?.name}")
}

fun main() {
    runBlocking {
        showName() // Current coroutine: null
        launch(coroutineContext + CoroutineName("Worker")) {
            showName() // Current coroutine: Worker
        }
    }
}

코루틴 흐름 제어와 잡 생명 주기

Job은 동시성 작업의 생명 주기를 표현하는 객체입니다. Job으로 코루틴 작업을 추적하고 취소할 수 있습니다. Job이 Active 한 상태면 아직 완료나 취소가 되지 않은 활성 상태입니다.

launch()나 async()는 CoroutineStart타입의 인자를 지정해서 잡의 초기 상태를 선택하는 기능을 제공합니다.

  • CoroutineStart.DEFAULT: Job을 즉시 시작합니다.
  • CoroutineStart.LAZY: Job을 자동으로 시작하지 말라는 뜻입니다. Job이 신규 상태가 되고 시작을 기다리게 됩니다. start()나 join()을 호출하면 활성 상태가 됩니다.
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        val job = launch(start = CoroutineStart.LAZY) {
            println("Job started")
        }

        delay(100)

        println("Preparing to start...")
        job.start()
    }
}
// Preparing to start...
// Job started

활성 상태에서는 코루틴 장치가 Job을 반복적으로 일시 중단하고 재개합니다.  Job이 다른 Job을 시작할 수도 있는데, 이 경우 새 Job은 기존 Job의 자식이 됩니다. 따라서 Job의 부모 자식 관계는 동시성 계산 사이에 트리 형태의 의존 구조를 만듭니다. children 프로퍼티를 통해 완료되지 않은 자식 Job들에 접근할 수 있습니다.

import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        val job = coroutineContext[Job.Key]!!

        launch { println("This is task A") }
        launch { println("This is task B") }

        println("${job.children.count()} children running") // 2 children running
    }
}

Job의 join() 메서드를 사용하면 Job이 완료될 때까지 현재 코루틴을 일시 중단시킬 수 있도록 합니다. 위 예제에서 launch들을 join()하게 되면 job.children.count()는 0이 됩니다.

import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        val job = coroutineContext[Job.Key]!!

        val jobA = launch { println("This is task A") }
        val jobB = launch { println("This is task B") }

        jobA.join()
        jobB.join()

        println("${job.children.count()} children running") // 0 children running
    }
}

현재 Job 상태를 isActive, isCancelled, isComplete 프로퍼티로 추적할 수 있습니다.

Job status isActive isCompleted isCancelled
신규 false false false
활성화 true false false
완료 중 true false false
취소 중 false false true
취소됨 false true true
완료됨 false true false

취소됨과 완료됨이 둘 다 isCompleted값이 true라서 isCancelled값도 같이 비교해서 확인해야 합니다.

취소

Job의 cancel() 함수를 호출하면 Job을 취소할 수 있습니다. 

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main() {
    val squarePrinter = GlobalScope.launch(Dispatchers.Default) {
        var i = 1
        while (true) {
            println(i++)
        }
    }

    delay(100)
    squarePrinter.cancel()
    delay(100)
}

100 밀리초 이후에 squarePrinter를 취소했지만 내부 로직이 계속 실행됩니다. 이것은 코루틴이 취소에 적합한 코드로 동작하지 않기 때문입니다. 취소에 적합한 코드로 만들기 위해서는 코루틴이 현재 활성 상태인지 검사하는 isActive 프로퍼티를 사용해야 합니다.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch

suspend fun main() {
    val squarePrinter = GlobalScope.launch(Dispatchers.Default) {
        var i = 1
        while (isActive) {
            println(i++)
        }
    }

    delay(100)
    squarePrinter.cancel()
    delay(100)
}

cancel() 함수를 호출하면 isActive가 false로 변경되기 때문에 취소에 적합한 코드로 동작하게 됩니다.

다른 방법은 CancellationException을 발생시키면서 취소에 반응할 수 있게 일시 중단 함수를 호출하는 것입니다. yield()는 실행 중인 Job을 일시 중단시켜서 자신을 실행 중인 스레드를 다른 코루틴에게 양보해서 일시 중단시킵니다.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield

suspend fun main() {
    val squarePrinter = GlobalScope.launch(Dispatchers.Default) {
        var i = 1
        while (true) {
            yield()
            println(i++)
        }
    }

    delay(100)
    squarePrinter.cancel()
    delay(100)
}

부모 코루틴이 취소되면 자동으로 모든 자식의 실행을 취소합니다.

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        val parentJob = launch {
            println("Parent started")

            launch {
                println("Child 1 started")
                delay(500)
                println("Child 1 completed")
            }

            launch {
                println("Child 2 started")
                delay(500)
                println("Child 2 completed")
            }

            delay(500)
            println("Parent completed")
        }
        delay(100)
        parentJob.cancel()
    }
}
// Parent started
// Child 1 started
// Child 2 started

모든 작업들이 500밀리초를 기다리게 했지만 100밀리초 이후 parentJob을 cancel()했기 때문에 completed는 출력되지 않은 것을 확인할 수 있습니다.

타임아웃

특정 경우에는 Job을 무한정 기다릴 수 없기 때문에 시간제한을 걸어두는 경우가 있습니다. 이런 경우에는 withTimeout() 함수를 사용합니다. 이 함수는 TimeoutCancellationException을 throw 하기 때문에 코루틴이 취소 될 수 있습니다. 비슷한 함수로 withTimeoutOrNull()함수가 있는데 이것은 throw하지는 않고 null을 반환합니다.

import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import java.io.File

fun main() {
    runBlocking {
        val asyncData = async { File("data.txt").readText() }
        try {
            val text = withTimeout(50) { asyncData.await() }
            println("Data loaded: $text")
        } catch (e: Exception) {
            println("Timeout exceeded")
        }
    }
}

코루틴 디스패치하기

코루틴은 스레드와 무관하게 일시 중단 가능한 계산을 구현할 수 있게 해주지만, 코루틴을 실행하려면 여전히 스레드와 연관시켜야 합니다. 코루틴을 실행할 때 사용할 스레드를 제어하는 작업을 담당하는 코루틴 디스패처(dispatcher)를 사용하면 됩니다.

디스패처는 Coroutine context의 일부입니다. 그래서 launch()나 runBlocking() 등의 코루틴 빌더 함수에 지정할 수 있습니다.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        launch(Dispatchers.Default) {
            println(Thread.currentThread().name) // DefaultDispatcher-worker-1
        }
    }
}

코루틴 라이브러리에는 기본적으로 몇 가지 디스패처를 제공합니다.

  • Dispatchers.Default: 공유 스레드 풀로, 풀 크기는 디폴트로 사용 가능한 CPU 코어 수거나 2다(둘 중 큰 값). 이 구현은 일반적으로 작업 성능이 주로 CPU 속도에 의해 결정되는 CPU 위주의 작업에 적합합니다.
  • Dispatchers.IO: 스레드 풀 기반이며 디폴트 구현과 비슷하지만, 파일을 읽고 쓰는 것처럼 잠재적으로 블러킹될 수 있는 I/O를 많이 사용하는 작업에 최적화되어 있습니다. 이 디스패처는 스레드 풀을 디폴트 구현과 함께 공유하지만, 필요에 따라 스레드를 추가하거나 종료시켜줍니다.
  • Dispatchers.Main: 사용자 입력이 처리되는 UI 스레드에서만 배타적으로 작동하는 디스패처입니다.

newFixedThreadPoolContext()나 newSingleThreadPoolContext()를 사용하면 직접 스레드 풀을 사용하거나 심지어는 스레드를 하나만 사용하는 디스패처도 만들 수 있습니다.

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import kotlinx.coroutines.runBlocking

fun main() {
    newFixedThreadPoolContext(5, "WorkerThread").use { dispatcher ->
        runBlocking {
            for (i in 1..3) {
                launch(dispatcher) {
                    println(Thread.currentThread().name)
                    delay(1000)
                }
            }
        }
    }
}
// WorkerThread-1
// WorkerThread-2
// WorkerThread-3

디스패처를 명시적으로 지정하지 않으면 코루틴을 시작한 영역으로부터 디스패처가 자동으로 상속됩니다.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        println("Root: ${Thread.currentThread().name}")

        launch {
            println("Nested, inherited: ${Thread.currentThread().name}")
        }

        launch(Dispatchers.Default) {
            println("Nested, explicit: ${Thread.currentThread().name}")
        }
    }
}
// Root: main
// Nested, explicit: DefaultDispatcher-worker-1
// Nested, inherited: main

예외 처리

코루틴 빌더들은 두 가지 기본 전략 중에 하나를 따릅니다. 첫 번째는 예외를 부모 코루틴으로 전달하는 것입니다.

  • 부모 코루틴이 똑같은 오류로 취소됩니다. 이로 인해 부모의 나머지 자식도 모두 취소됩니다.
  • 자식들이 모두 취소되고 나면 부모는 예외를 코루틴 트리의 윗부분으로 전달합니다.

전역 영역에 있는 코루틴에 도달할 때까지 이 과정이 반복됩니다. 그 후 예외가 CoroutineExceptionHandler.Consider에 의해 처리됩니다.

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        launch {
            throw Exception("Error in task A")
            println("Task A completed")
        }

        launch {
            delay(1000)
            println("Task B completed")
        }

        println("Root")
    }
}
// Exception in thread "main" java.lang.Exception: Error in task A

첫 번째 launch에서 exception이 발생했기 때문에 그 exception이 부모로 전달됐고, 부모가 가지고 있는 자식들의 모든 코루틴이 취소됩니다.

에러를 핸들링하기 위해서는 CoroutineExceptionHandler를 사용해야 합니다. 이것은 현재 CoroutineContext와 같은 인자로 전달받습니다.

import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main() {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }

    GlobalScope.launch(handler) {
        launch {
            throw Exception("Error in task A")
            println("Task A completed")
        }

        launch {
            delay(1000)
            println("Task B completed")
        }

        println("Root")
    }.join()
}
// Root
// Caught java.lang.Exception: Error in task A

예외를 처리하는 다른 방법은 async() 빌더에서 사용하는 방법으로, 던져진 예외를 저장했다가 예외가 발생한 계산에 대한 await() 호출을 받았을 때 다시 던지는 것입니다.

import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        val deferredA = async {
            throw Exception("Error in task A")
            println("Task A completed")
        }
        val deferredB = async {
            println("Task B completed")
        }

        try {
            deferredA.await()
            deferredB.await()
        } catch (e: Exception) {
            println("Caught $e")
        }
        println("Root")
    }
}
// Caught java.lang.Exception: Error in task A
// Root
// Exception in thread "main" java.lang.Exception: Error in task A

await()하는 시점에 try를 걸어서 예외를 잡긴 했지만 프로그램이 예외로 종료되는 것을 확인할 수 있습니다.

자식인 deferredA가 부모를 취소시키기 위해 자동으로 예외를 다시 던지기 때문입니다. 이 동작을 변경하려면 supervisor Job을 사용해야 합니다.

supervisor Job은 취소를 아래로만 전달하기 때문에 부모나 부모의 자식들에 영향이 없습니다.

import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

fun main() {
    runBlocking {
        supervisorScope {

            val deferredA = async {
                throw Exception("Error in task A")
                println("Task A completed")
            }
            val deferredB = async {
                println("Task B completed")
            }

            try {
                deferredA.await()
                deferredB.await()
            } catch (e: Exception) {
                println("Caught $e")
            }
            println("Root")
        }
    }
}
// Task B completed
// Caught java.lang.Exception: Error in task A
// Root

동시성 통신

스레드 안전성을 유지하면서 여러 동시성 작업 사이에 효율적으로 데이터를 공유할 수 있게 해주는 코루틴 라이브러리 고급 기능에 대해 소개합니다.

Channel

임의의 데이터 스트림을 코루틴 사이에 공유할 수 있는 편리한 방법입니다. Channel 인터페이스에는 기본적으로 보내는 메서드인 send()와 받는 메서드인 receive() 메서드가 있습니다.

제네릭 Channel() 함수를 사용해 채널을 만듭니다. 이 함수는 채널의 용량을 지정하는 인자를 받습니다. 채널은 기본적으로 크기가 정해진 내부 버퍼를 사용하고 이 버퍼가 가득 차면 하나 이상의 채널 원소가 상대방에 의해 수신될 때까지 send() 호출이 일시 중단됩니다. 반대로 버퍼가 비어있으면 누군가 최소 하나 이상의 원소를 채널로 send할 때까지 recevie() 호출이 일시 중단됩니다.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.random.Random

fun main() {
    runBlocking {
        val streamSize = 5
        val channel = Channel<Int>(3) // 채널 용량 = 3

        launch {
            for (n in 1..streamSize) {
                delay(Random.nextLong(100))
                val square = n * n
                println("Sending: $square")
                channel.send(square) // suspend
            }
        }

        launch {
            for (i in 1..streamSize) {
                delay(Random.nextLong(100))
                val n = channel.receive() // suspend
                println("Receiving: $n")
            }
        }
    }
}

// Sending: 1
// Receiving: 1
// Sending: 4
// Receiving: 4
// Sending: 9
// Sending: 16
// Sending: 25
// Receiving: 9
// Receiving: 16
// Receiving: 25

Channel()함수는 채널의 동작을 바꿀 수 있는 여러 특별한 값이 있습니다.

  • Channel.UNLIMITED(= Int.MAX_VALUE): 이 경우 채널의 용량은 제한이 없고, 내부 버퍼는 필요에 의해 증가합니다. send()는 중단되는 일이 없습니다.
  • Channel.RENDEZVOUS(= 0): 이 경우 채널은 아무 내부 버퍼가 없는 랑데부 채널이 됩니다. send() 호출은 receive()를 호출하기 전까지 일시 중단되고 receive() 호출은 send() 호출 전까지 일시 중단됩니다. 채널 생성 시 용량을 지정하지 않으면 이 방식의 채널이 생성됩니다.
  • Channel.CONFLATED(= -1): 이 경우 송신된 값이 합쳐지는 채널입니다. send()로 보낸 원소가 receive()되기 전에 새로운 원소가 send()되면 덮어 씌워집니다. 이 경우 send() 함수는 일시 중단되는 일이 없습니다.
  • Channel.UNLIMITED보다 작은 임의의 양수를 지정하면 버퍼 크기가 일정하게 제한된 채널이 생깁니다.

소비자 쪽에서는 명시적인 이터레이션을 사용하지 않고 consumeEach() 함수를 통해 모든 채널 콘텐츠를 얻어서 사용할 수 있습니다.

channel.consumeEach {
    println("Receiving: $it")
    delay(200)
}

채널이 닫힌 후 send()를 호출하면 ClosedSendChannelException 예외가 발생합니다.

채널이 닫힌 후 receive()를 호출하면 버퍼에 있는 원소가 소진될 때까지 정상 적으로 원소가 전달되지만 그 이후에 ClosedSendChannelException이 발생합니다.

 

채널 통신에 참여하는 생산자와 소비자가 꼭 하나씩일 필요는 없습니다. 이런 경우를 팬아웃(fan out)라고 합니다.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.random.Random

fun main() {
    runBlocking {
        val streamSize = 5
        val channel = Channel<Int>(2)

        launch {
            for (n in 1..streamSize) {
                val square = n * n
                println("Sending: $square")
                channel.send(square)
            }
            channel.close()
        }

        for (i in 1..3) {
            launch {
                for (n in channel) {
                    println("Receiving by consumer #$i: $n")
                    delay(Random.nextLong(100))
                }
            }
        }
    }
}

// Sending: 1
// Sending: 4
// Sending: 9
// Receiving by consumer #1: 1
// Receiving by consumer #2: 4
// Receiving by consumer #3: 9
// Sending: 16
// Sending: 25
// Receiving by consumer #1: 16
// Receiving by consumer #1: 25

생산자

동시성 데이터 스트림을 생성할 수 있는 produce()라는 코루틴 빌더가 있습니다. 이 빌더는 채널과 비슷한 send() 메서드를 제공하는 ProducerScope 영역을 도입해줍니다.

import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        val channel = produce {
            for (n in 1..5) {
                val square = n * n
                println("Sending: $square")
                send(square)
            }
        }

        launch {
            channel.consumeEach {
                println("Receiving: $it")
            }
        }
    }
}

// Sending: 1
// Receiving: 1
// Sending: 4
// Sending: 9
// Receiving: 4
// Receiving: 9
// Sending: 16
// Sending: 25
// Receiving: 16
// Receiving: 25

produce()는 명시적으로 채널을 닫을 필요는 없고 코루틴이 종료되면 produce() 빌더가 채널을 자동으로 닫아줍니다.

예외 처리 관점에서 볼 때 produce()는 async()/await() 정책을 따릅니다. produce() 안에서 예외가 발생하면 예외를 저장했다가 해당 채널에 대해 receive()를 가장 처음 호출한 코루틴 쪽에 예외가 다시 던져집니다.

티커

coroutine 라이브러리에는 Ticker라고 특별한 랑데부 채널이 있습니다. 이 채널은 Unit 값을 계속 발생시키고 원소를 일정 시간만큼 지연한 다음 전달하도록 합니다. 티커를 만들려면 ticker() 함수를 사용합니다. 이 함수에는 아래와 같은 파라미터를 전달합니다.

  • delayMillis: 티커 원소의 발생 시간 간격을 밀리초 단위로 지정합니다.
  • initialDelayMillis: 티커 생성 시점과 원소가 최초로 발생하는 시점 사이의 시간 간격입니다. 디폴트 값은 delayMillis와 같습니다.
  • context: 티커를 실행할 코루틴 문맥입니다.(디폴트는 빈 문맥입니다.)
  • mode: 티커의 행동을 결정하는 TickerMode enum 입니다.
    • TickerMode.FIXED_PERIOD: 생성되는 원소 사이의 시간 간격을 지정된 지연 시간에 최대한 맞추기 위해 실제 지연 시간을 조정합니다.
    • TickerMode.FIXED_DELAY: 실제 흘러간 시간과 관계없이 delayMillis로 지정한 지연 시간만큼 시간을 지연시킨 후 다음 원소를 송신합니다.
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

fun main() = runBlocking {
    val ticker = ticker(100)
    println(withTimeoutOrNull(50) { ticker.receive() }) // 100 millis 전이라 null
    println(withTimeoutOrNull(60) { ticker.receive() }) // 110 millis 안에 100 millis로 값을 전달 받음
    delay(250) // 250 millis 기다리면서 ticker가 값을 send()하고 150 millis 동안 일시 중단
    println(withTimeoutOrNull(1) { ticker.receive() }) // send 된 것을 receive해서 값 전달 받음
    println(withTimeoutOrNull(60) { ticker.receive() }) // receive() 받을 당시 ticker는 50 millis가 지난 상태라서 60 millis 가 가기 전에 값을 전달 받음
    println(withTimeoutOrNull(60) { ticker.receive() }) // receive() 를 호출 했지만 100 millis안에 받지 못해서 null 전달
}

// null
// kotlin.Unit
// kotlin.Unit
// kotlin.Unit
// null

TickerMode를 FIXED_DELAY로 지정하면 결과는 아래와 같이 변합니다.

null
kotlin.Unit
kotlin.Unit
null
kotlin.Unit

실제 흘러간 시간과 상관없이 delayMillis로 지정한 지연 시간만큼 시간을 지연시킨 후 원소를 전달하기 때문에 3번째 receive() 이후 다시 100millis만큼 기다리고 원소를 전달해서 4번째는 timeout이 발생합니다.

액터

가변 상태를 스레드 안전하게 공유하는 방법을 구현하는 일반적인 방법으로 액터(actor) 모델이 있습니다. 액터는 내부 상태와 다른 액터에게 메시지를 보내서 동시성 통신을 진행할 수 있는 수단을 제공하는 객체입니다. 액터는 자신에게 들어오는 메시지를 리슨(listen)하고, 자신의 상태를 바꾸면서 메시지에 응답할 수 있으며, 다른 메시지를 보낼 수 있고, 새로운 액터를 시작할 수 있습니다. 액터의 상태는 액터 내부에 감춰져 있어서 다른 액터가 접근할 수 없고 단지 메시지를 보내고 응답을 받을 뿐입니다. 따라서 액터 모델은 락 기반의 동기화와 관련한 여러 가지 문제로부터 자유롭습니다.

 

코틀린 코루틴 라이브러리에서는 actor() 코루틴 빌더를 사용해 액터를 만듭니다. 액터는 특별한 영역(ActorScope)을 만들며, 이 영역은 기본 코루틴 영역에 자신에게 들어오는 메시지에 접근할 수 있는 수신자 채널이 추가된 것입니다. actor() 빌더는 결과를 생성해내는 것이 목적이 아닌 잡을 시작한다는 점에서 launch()와 비슷하지만 CoroutineExceptionHandler에 의존하는 launch()와 같은 예외 처리 정책을 따릅니다.

import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

sealed class AccountMessage

class GetBalance(
    val amount: CompletableDeferred<Long>,
) : AccountMessage()

class Deposit(val amount: Long) : AccountMessage()

class Withdraw(
    val amount: Long,
    val isPermitted: CompletableDeferred<Boolean>,
) : AccountMessage()

fun CoroutineScope.accountManager(
    initialBalance: Long,
) = actor<AccountMessage> {
    var balance = initialBalance

    for (message in channel) {
        when (message) {
            is GetBalance -> {
                message.amount.complete(balance)
            }

            is Deposit -> {
                balance += message.amount
                println("Deposited ${message.amount}")
            }

            is Withdraw -> {
                val canWithdraw = balance >= message.amount
                if (canWithdraw) {
                    balance -= message.amount
                    println("Withdraw ${message.amount}")
                }
                message.isPermitted.complete(canWithdraw)
            }
        }
    }
}

private suspend fun SendChannel<AccountMessage>.deposit(
    name: String,
    amount: Long,
) {
    send(Deposit(amount))
    println("$name: deposit $amount")
}

private suspend fun SendChannel<AccountMessage>.tryWithdraw(
    name: String,
    amount: Long,
) {
    val status = CompletableDeferred<Boolean>().let {
        send(Withdraw(amount, it))
        if (it.await()) "OK" else "DENIED"
    }
    println("$name: withdraw $amount ($status)")
}

private suspend fun SendChannel<AccountMessage>.printBalance(
    name: String,
) {
    val balance = CompletableDeferred<Long>().let {
        send(GetBalance(it))
        it.await()
    }
    println("$name: balance is $balance")
}

fun main() {
    runBlocking {
        val manager = accountManager(100)
        withContext(Dispatchers.Default) {
            launch {
                manager.deposit("Client #1", 50)
                manager.printBalance("Client #1")
            }

            launch {
                manager.tryWithdraw("Client #2", 100)
                manager.printBalance("Client #2")
            }
        }
        manager.tryWithdraw("Client #0", 1000)
        manager.printBalance("Client #0")
        manager.close()
    }
}

// Client #1: deposit 50
// Deposited 50
// Withdraw 100
// Client #2: withdraw 100 (OK)
// Client #2: balance is 50
// Client #1: balance is 150
// Client #0: withdraw 1000 (DENIED)
// Client #0: balance is 50

자바 동시성 사용하기

스레드 시작하기

스레드에서 실행 가능(Runnable) 객체와 대응하는 람다와 스레드 프로퍼티들을 지정해서 thread() 함수를 사용합니다.

  • start: 스레드를 생성하자마자 시작할지 여부(디폴트는 true)
  • isDaemon: 스레드를 데몬 모드로 시작할지 여부(디폴트는 false), 데몬 스레드는 JVM의 종료에 방해하지 않고 메인 스레드가 종료될 때 자동으로 함께 종료됩니다.
  • contextClassLoader: 스레드 코드가 클래스와 자원을 적재할 때 사용할 클래스 로더(디폴트는 null)
  • name: 커스텀 스레드 이름, 디폴트는 null인데 JVM이 (Thread-1, Thread-2)와 같이 자동으로 지정합니다.
  • priority: Thread.MIN_PRIORITY(=1)부터 Thread.MAX_PRIORITY(=10) 사이의 값으로 정해지는 우선순위로, 어떤 스레드가 다른 스레드에 비해 얼마나 많은 CPU 시간을 배정받는지 결정한다. 디폴트 값은 -1이며, 이 값은 자동으로 우선순위를 정하라는 뜻입니다.
  • block: () -> Unit 타입의 함숫값으로 새 스레드가 생성되면 실행할 코드입니다.
import kotlin.concurrent.thread

fun main() {
    println("Starting a thread..")

    thread(name = "Worker", isDaemon = true) {
        for (i in 1..5) {
            println("${Thread.currentThread().name}: $i")
            Thread.sleep(150)
        }
    }

    Thread.sleep(500)
    println("Shutting down...")
}

// Starting a thread..
// Worker: 1
// Worker: 2
// Worker: 3
// Worker: 4
// Shutting down...

새 스레드가 데몬 모드로 시작했으므로, 메인 스레드가 500 밀리초 동안 슬립 한 다음 실행을 끝낼 때 이 스레드도 함께 끝나기 때문에 메시지가 4개만 출력됩니다.

 

지정한 시간 간격으로 동작을 수행하는 자바 타이머 관련 함수로 timer() 함수가 있습니다. 어떤 작업을 이전 작업이 끝난 시점을 기준으로 고정된 시간 간격으로 실행하는 타이머를 설정합니다. 그 결과 어떤 작업이 오래 걸리면 이후의 모든 실행이 연기됩니다. 따라서 이 타이머는 코루틴 티커에서 FIXED_DELAY와 비슷하게 동작합니다.

timer()는 아래 파라미터와 함께 동작합니다.

  • name: 타이머 스레드의 이름(디폴트는 null)
  • daemon: 타이머 스레드를 데몬 스레드로 할지 여부(디폴트는 false)
  • startAt: 최초로 타이머 이벤트가 발생하는 시간을 나타내는 Date 객체
  • period: 연속된 타이머 이벤트 사이의 시간 간격(밀리초 단위)
  • action: 타이머 이벤트가 발생할 때마다 실행될 TimeTask.() -> Unit 타입의 람다
import kotlin.concurrent.timer

fun main() {
    println("Starting a thread...")
    var counter = 0

    timer(period = 150, name = "Worker", daemon = true) {
        println("${Thread.currentThread().name}: ${++counter}")
    }

    Thread.sleep(500)
    println("Shutting down...")
}

// Starting a thread...
// Worker: 1
// Worker: 2
// Worker: 3
// Worker: 4
// Shutting down...

두 타이머 이벤트 사이의 시간 간격을 최대한 일정하게 맞춰주는 fixedRateTimer() 함수들도 있습니다. 이 함수는 코루틴 티커에서 FIXED_PERIOD 모드와 비슷하게 동작합니다.

동기화와 락

동기화는 특정 코드 조각이 한 스레드에서만 실행되도록 보장하기 위한 공통적인 기본 요소입니다. 이런 코드 조각을 다른 스레드가 실행하고 있다면 해당 코드에 진입하려고 시도하는 다른 스레드들은 모두 대기해야 합니다. 락으로 사용하려는 어떤 객체를 지정하는 특별한 동기화 블록을 사용해 동기화해야 하는 코드를 감쌀 수 있습니다. 

import kotlin.concurrent.thread

fun main() {
    var counter = 0
    val lock = Any()

    for (i in 1..5) {
        thread {
            synchronized(lock) {
                counter += i
                println(counter)
            }
        }
    }
}

// 1
// 6
// 10
// 13
// 15

개별 덧셈의 결과는 달라질 수 있어서 중간 결과는 다르지만 동기화로 인해 전체 합계는 항상 15가 됩니다.

 

코틀린에서 @Synchronized annotation을 사용하면 동기화를 할 수 있습니다.

import kotlin.concurrent.thread

class Counter {
    private var value = 0
    @Synchronized fun addAndPrint(value: Int) {
        this.value += value
        println(this.value)
    }
}

fun main() {
    val counter = Counter()
    for (i in 1..5) {
        thread {
            counter.addAndPrint(i)
        }
    }
}

// 1
// 4
// 9
// 13
// 15

 

 

 

 

 

728x90
반응형