1. 概要

コルーチンは、Kotlinでノンブロッキングの並行アプリケーションを構築するための推奨される方法です。 このチュートリアルでは、チャネルについて学習します。 コルーチンが相互に通信できるようにします

2. チャネルとは何ですか?

チャネルは概念的にキューに似ています。 1つ以上のプロデューサーコルーチンがチャネルに書き込みます。 1つ以上のconsumerコルーチンが同じチャネルから読み取ることができます。 チャネルには、サスペンドsend機能とサスペンドreceive機能があります。 これは、複数のコルーチンがチャネルを使用して、非ブロッキング方式で相互にデータを渡すことができることを意味します。

チャネルの例を見てみましょう。

@Test
fun should_pass_data_from_one_coroutine_to_another() {
    runBlocking {
        // given
        val channel = Channel<String>()

        // when
        launch { // coroutine1
            channel.send("Hello World!")
        }
        val result = async { // coroutine2
            channel.receive()
        }

        // then
        assertThat(result.await()).isEqualTo("Hello World!")
    }
}

まず、チャンネルを作成します。 次に、 coroutine1 を起動し、値「HelloWorld!」を送信します。 チャネルに。 最後に、asyncコルーチンビルダーを使用してcoroutine2を作成します。 Coroutine2 は、終了時に結果を返します。 coroutine2内のchannel.receive()呼び出しは、coroutine1によって書き込まれた値を返します。

3. チャネルの種類

チャネルには4つのタイプがあり、一度に保持できる値の数が異なります。 それぞれのタイプを詳しく見ていきましょう。

3.1. ランデブーチャンネル

ランデブーチャネルにはバッファがありません。 送信コルーチンは、受信コルーチンがチャネルで受信を呼び出すまで中断します。 同様に、消費コルーチンは、プロデューサーコルーチンがチャネルで送信を呼び出すまで中断します 。 引数なしでデフォルトのChannelコンストラクターを使用してランデブーチャネルを作成します。

このタイプのチャネルの例を見てみましょう。

val basket = Channel<String>()

launch { // coroutine1
    val fruits = listOf("Apple", "Orange")
    for (fruit in fruits) {
        println("coroutine1: Sending $fruit")
        basket.send(fruit)
    }
}

launch { // coroutine2
    repeat(2) {
        delay(100)
        println("coroutine2: Received ${basket.receive()}")
    }
}

このプログラムの出力を見てみましょう。

coroutine1: Sending Apple
coroutine2: Received Apple
coroutine1: Sending Orange
coroutine2: Received Orange

Coroutine1 は、値「Apple」を送信しようとし、受信者がいないため、すぐに一時停止します。 Coroutine2 はこの値を受け取り、チャネルから受け取る値がなくなったため、この値を一時停止します。 Coroutine1 は一時停止を解除し、次の値をチャネルに送信するようになりました。

3.2. バッファチャネル

名前が示すように、バッファリングされたチャネルには事前定義されたバッファがあります。 Channelコンストラクターでバッファーの容量を指定できます。

前の例を変更して、バッファリングされたチャネルの例を見てみましょう。

val basket = Channel<String>(1)

launch { // coroutine1
    val fruits = listOf("Apple", "Orange", "Banana")
    for (fruit in fruits) {
        println("coroutine1: Sending $fruit")
        basket.send(fruit)
    }
}

launch { // coroutine2
    repeat(3) {
        delay(100)
        println("coroutine2: Received ${basket.receive()}")
    }
}

このプログラムの出力を見てみましょう。

coroutine1: Sending Apple
coroutine1: Sending Orange
coroutine2: Received Apple
coroutine1: Sending Banana
coroutine2: Received Orange
coroutine2: Received Banana

今回は、coroutine1が中断せずに「Apple」と書き込みます。 ただし、「オレンジ」を書き込もうとすると中断します。 これは、バッファ容量が1のチャネルを作成したためです。

したがって、現時点でこの値を受信するレシーバーがない場合でも、バッファーに1つの値を保持できますが、 coroutine1 は、バッファーがいっぱいであるため、チャネルにさらに値を書き込む前に待機(一時停止)する必要があります。

coroutine2 がバッファーから値を読み取ると、 coroutine1 は一時停止を解除し、次の値をチャネルに書き込みます。

3.3. 無制限のチャネル

無制限のチャネルには無制限の容量のバッファがあります。 ただし、バッファが過負荷になり、使用可能なすべてのメモリが使い果たされると、OutOfMemoryErrorが発生する可能性があることに注意してください。 チャネルコンストラクターに特別な定数UNLIMITEDを提供することにより、無制限のチャネルを作成できます。

このチャネルの例を見てみましょう。

val channel = Channel<Int>(UNLIMITED)

launch { // coroutine1
    repeat(100) {
        println("coroutine1: Sending $it")
        channel.send(it)
    }
}

launch { // coroutine2
    repeat(100) {
        println("coroutine2: Received ${channel.receive()}")
    }
}

このプログラムの出力を調べてみましょう。

coroutine1: Sending 0
coroutine1: Sending 1


...

coroutine1: Sending 98
coroutine1: Sending 99
coroutine2: Received 0
coroutine2: Received 1

...

coroutine2: Received 98
coroutine2: Received 99

ご覧のとおり、 coroutine1 は、無制限のバッファ容量のおかげで、中断することなく100個すべての値をチャネルに書き込みます。

3.4. 収縮したチャネル

混同されたチャネルでは、最後に書き込まれた値が以前に書き込まれた値をオーバーライドします。 したがって、チャネルのsendメソッドが中断することはありません。 receive メソッドは、最新の値のみを受け取ります。

このタイプのチャネルの例を見てみましょう。

val basket = Channel<String>(CONFLATED)

launch { // coroutine1
    val fruits = listOf("Apple", "Orange", "Banana")
    for (fruit in fruits) {
        println("coroutine1: Sending $fruit")
        basket.send(fruit)
    }
}

launch { // coroutine2
    println("coroutine2: Received ${basket.receive()}")
}

このプログラムの出力を調べてみましょう。

coroutine1: Sending Apple
coroutine1: Sending Orange
coroutine1: Sending Banana
coroutine2: Received Banana

coroutine1 はチャネルに3つの値を送信しますが、coroutine2は最後の値のみを受信することがわかります。 これは、coroutine2の消費が遅いためです。 バスケットから読み取るまでに、coroutine1は以前に書き込まれた値を上書きしました。

4. チャネルを使用したプロデューサー-コンシューマーの実装

並行プログラムでは、値のシーケンスを生成するプログラムを実装する必要があることがよくあります。 別のプログラムは、これらの値が利用可能になると、それらを消費します。

2つのプログラムは同時に実行されますが、相互に値を渡すための通信メカニズムを共有しています。 これは一般にプロデューサー-コンシューマーパターンとして知られています。

Kotlinコルーチンとチャネルを使用して生産者/消費者パターンを実装する方法を見てみましょう。

4.1. 1人の生産者から消費する1人の消費者

produce coroutine builderメソッドを使用して、producercoroutineを作成できます。

プロデューサーを作成する方法を見てみましょう。

fun CoroutineScope.produceFruits(): ReceiveChannel<String> = produce {
    val fruits = listOf("Apple", "Orange", "Apple")
    for (fruit in fruits) send(fruit)
}

ここで、produceコルーチンがReceiveChannelを返すことに注意してください。 ReceiveChannelには受信メソッドのみがあります。 sendメソッドはありません。 これは、別のコルーチンがこの出力チャネルからのみ読み取ることができることを意味します。

定期的にforループ構文を使用して、ReceiveChannelに存在する値を反復処理できます。

ここで、プロデューサーからの値をどのように消費できるかを見てみましょう。

val fruitChannel = produceFruits()
for (fruit in fruitChannel) {
    println(fruit)
}

このコンシューマーの出力を見てみましょう。

Apple
Orange
Apple
End!

ご覧のとおり、コンシューマーコードは、プロデューサーによって生成された順序で値を受け取ります。 生産者と消費者のコルーチンは同時に実行されます。 彼らはfruitChannelを使用して相互に通信します。

4.2. 1つの生産者から消費する複数の消費者

1つのプロデューサーによって生成された値を消費する複数のコンシューマーを作成できます。 このようにして、複数のコンシューマーに作業を分散できます

1秒あたり10個のピザ注文を生成するプロデューサーを作成しましょう。

fun CoroutineScope.producePizzaOrders(): ReceiveChannel<String> = produce {
    var x = 1
    while (true) {
        send("Pizza Order No. ${x++}")
        delay(100)
    }
}

ピザ注文プロセッサを作成しましょう–消費者:

fun CoroutineScope.pizzaOrderProcessor(id: Int, orders: ReceiveChannel<String>) = launch {
    for (order in orders) {
        println("Processor #$id is processing $order")
    }
}

このコルーチンは、ordersチャネルを入力パラメーターとして受け取ります。 新しいピザの注文は、このチャネルに到着します。

次に、ピザ注文プロセッサの3つのインスタンスを実行し、それらの間で作業を分散させましょう。

fun main() = runBlocking {
    val pizzaOrders = producePizzaOrders()
    repeat(3) {
        pizzaOrderProcessor(it + 1, pizzaOrders)
    }

    delay(1000)
    pizzaOrders.cancel()
}

このプログラムの出力を調べてみましょう。

Processor #1 is processing Pizza Order No. 1
Processor #1 is processing Pizza Order No. 2
Processor #2 is processing Pizza Order No. 3
Processor #3 is processing Pizza Order No. 4
Processor #1 is processing Pizza Order No. 5
Processor #2 is processing Pizza Order No. 6
Processor #3 is processing Pizza Order No. 7
Processor #1 is processing Pizza Order No. 8
Processor #2 is processing Pizza Order No. 9
Processor #3 is processing Pizza Order No. 10

注文処理作業は、3つのプロセッサ間でほぼ均等に分散されていることがわかります。

4.3. 複数の生産者から消費する1人の消費者

複数のプロデューサーコルーチンからチャネルに書き込むことができます。 消費者コルーチンは、そのチャネルからのすべてのメッセージを読み取ることができます。

2つのプロデューサーを作成しましょう。 1人のプロデューサーがYouTubeビデオをフェッチし、別のプロデューサーがツイートをフェッチします。

suspend fun fetchYoutubeVideos(channel: SendChannel<String>) {
    val videos = listOf("cat video", "food video")
    for (video in videos) {
        delay(100)
        channel.send(video)
    }
}
suspend fun fetchTweets(channel: SendChannel<String>) {
    val tweets = listOf("tweet: Earth is round", "tweet: Coroutines and channels are cool")
    for (tweet in tweets) {
        delay(100)
        channel.send(tweet)
    }
}

次に、両方のプロデューサーを起動して、それらが生成する値を消費しましょう。

fun main() = runBlocking {
    val aggregate = Channel<String>()
    launch { fetchYoutubeVideos(aggregate) }
    launch { fetchTweets(aggregate) }

    repeat(4) {
        println(aggregate.receive())
    }

    coroutineContext.cancelChildren()
}

このプログラムの出力を確認してみましょう。

cat video
tweet: Earth is round
food video
tweet: Coroutines and channels are cool

Aggregateチャネルで両方のプロデューサーによって生成された値を受け取っていることがわかります。

5. チャネルを使用したパイプライン

複数のプロデューサーとコンシューマーをチェーンで組み合わせて、データ処理用のパイプラインを作成できます。 ピザを作るお店を例にとってみましょう。 ピザ作りのプロセスをいくつかのステップに分けることができます。 簡単にするために、ベーキングとトッピングの2つのステップに分けます。

コルーチンを使用してこれらのステップを実装する方法を見てみましょう。 ベーキングコルーチンは、トッピングコルーチンによって消費される基本的なベイクドピザを生成します。 トッピングコルーチンは、必要なトッピングを適用し、出力を提供する準備が整います。

ベーキングおよびトッピングコルーチンの簡単な実装を見てみましょう。

fun CoroutineScope.baking(orders: ReceiveChannel<PizzaOrder>) = produce {
    for (order in orders) {
        delay(200)
        println("Baking ${order.orderNumber}")
        send(order.copy(orderStatus = BAKED))
    }
}

fun CoroutineScope.topping(orders: ReceiveChannel<PizzaOrder>) = produce {
    for (order in orders) {
        delay(50)
        println("Topping ${order.orderNumber}")
        send(order.copy(orderStatus = TOPPED))
    }
}

与えられた数のダミーピザ注文を生成するための別のコルーチンを作成しましょう:

fun CoroutineScope.produceOrders(count: Int) = produce {
    repeat(count) {
        delay(50)
        send(PizzaOrder(orderNumber = it + 1))
    }
}

最後に、これらすべてのコルーチンを組み合わせてパイプラインを作成しましょう。

fun main() = runBlocking {
    val orders = produceOrders(3)

    val readyOrders = topping(baking(orders))

    for (order in readyOrders) {
        println("Serving ${order.orderNumber}")
    }

    delay(3000)
    coroutineContext.cancelChildren()
}

最初に、3つのピザ注文を作成します。 次に、ベーキングトッピングコルーチンを順番に通過します。 最後に、準備ができた注文を繰り返し、到着時にそれぞれにサービスを提供します。

出力を確認しましょう:

Baking 1
Topping 1
Serving 1
Baking 2
Topping 2
Serving 2
Baking 3
Topping 3
Serving 3

ご覧のとおり、ピザの注文準備のすべてのステップは、期待どおりの注文に従います。

6. ティッカーチャネル

Ticker チャネルは、従来のタイマーと同等のコルーチンです。 指定された一定の間隔でユニット値を生成します。 このタイプのチャネルは、一定の間隔でジョブを実行するのに役立ちます

簡単な株価フェッチャーの例を見てみましょう。 私たちのプログラムは、5秒ごとに特定の株式の価格を取得します。 ティッカーチャネルを使用した実装を見てみましょう。

fun stockPrice(stock: String): Double {
    log("Fetching stock price of $stock")
    return Random.nextDouble(2.0, 3.0)
}

fun main() = runBlocking {
    val tickerChannel = ticker(Duration.ofSeconds(5).toMillis())

    repeat(3) {
        tickerChannel.receive()
        log(stockPrice("TESLA"))
    }

    delay(Duration.ofSeconds(11).toMillis())
    tickerChannel.cancel()
}

出力を確認しましょう:

14:11:18 - Fetching stock price of TESLA
14:11:18 - 2.7380844072456583
14:11:23 - Fetching stock price of TESLA
14:11:23 - 2.3459508859536635
14:11:28 - Fetching stock price of TESLA
14:11:28 - 2.3137592916266994

ここでは、新しい株価が5秒ごとに出力されていることがわかります。 完了したら、cancelメソッドを呼び出してティッカーチャネルを停止します。

7. 結論

このチュートリアルでは、チャネルとは何か、およびチャネルをコルーチンで使用して非同期プログラミングを作成する方法を学びました。 さらに、コルーチンとチャネルを使用して、生産者/消費者とパイプラインのパターンも実装しました。

いつものように、すべての例はGitHubから入手できます。