コトリンのクエーサー入門

1. 前書き

  • Quasarは、管理しやすい方法で非同期の概念をKotlinにもたらすKotlinライブラリです*。 これには、軽量スレッド、チャンネル、アクターなどが含まれます。

2. ビルドのセットアップ

  • Quasarの最新バージョンを使用するには、JDKバージョン11以降で実行する必要があります*。 古いバージョンでは、まだJava 11にアップグレードできない状況向けにJDK 7をサポートしています。

  • Quasarには、使用している機能に応じて、必要な4つの依存関係が付属しています*。 これらを組み合わせるときは、それぞれに同じバージョンを使用することが不可欠です。

  • co.paralleluniverse:quasar-core
    –クエーサーの中核。

  • co.paralleluniverse:quasar-kotlin
    –クエーサーのKotlin拡張機能

  • co.paralleluniverse:quasar-actors
    –クエーサーのアクターのサポート。 これらについては、今後の記事で説明します。

  • co.paralleluniverse:quasar-reactive-streams
    – Quasarのリアクティブストリームのサポート。 これらについては、今後の記事で説明します。

    *正しく動作するには、Quasarはバイトコードのインストルメンテーションを実行する必要があります*。 これは、Javaエージェントを使用した実行時またはコンパイル時に実行できます。 Javaエージェントは、特別なビルド要件がなく、どのセットアップでも機能するため、推奨されるアプローチです。 ただし、Javaには一度に1つのJavaエージェントしかサポートされないため、これには欠点があります。

* 2.1。 コマンドラインから実行*

Quasarを使用してアプリケーションを実行する場合、JVMに対して_-javaagent_フラグを使用してJavaエージェントを指定します。 これは、パラメータとして_quasar-core.jar_ファイルへのフルパスを取ります:
$ java -javaagent:quasar-core.jar -cp quasar-core.jar:quasar-kotlin.jar:application.jar fully.qualified.main.Class

* 2.2。 Maven *からアプリケーションを実行する

必要に応じて、* Mavenを使用してJavaエージェントを追加することもできます。*
数ステップでMavenを使用してこれを達成できます。
最初に、依存関係プラグインをセットアップして、_quasar-core.jar_ファイルを指すプロパティを生成します。
<plugin>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>3.1.1</version>
    <executions>
        <execution>
            <id>getClasspathFilenames</id>
            <goals>
               <goal>properties</goal>
            </goals>
        </execution>
    </executions>
</plugin>
次に、Execプラグインを使用して、実際にアプリケーションを起動します。
<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.3.2</version>
    <configuration>
        <workingDirectory>target/classes</workingDirectory>
        <executable>echo</executable>
        <arguments>
            <argument>-javaagent:${co.paralleluniverse:quasar-core:jar}</argument>
            <argument>-classpath</argument> <classpath/>
            <argument>com.baeldung.quasar.QuasarHelloWorldKt</argument>
        </arguments>
    </configuration>
</plugin>
次に、これを利用するために、正しい呼び出しでMavenを実行する必要があります。
mvn compile dependency:properties exec:exec
これにより、アプリケーションを実行する前に、最新のコードがコンパイルされ、Javaエージェントを指すプロパティが使用可能になります。

* 2.3。 単体テストの実行*

Quasarエージェントから得られる単体テストでも同じ利点を得ることができればうれしいです。*
テストを実行するときにこの同じプロパティを使用するようにSurefireをセットアップできます。
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>2.22.1</version>
    <configuration>
        <argLine>-javaagent:${co.paralleluniverse:quasar-core:jar}</argLine>
    </configuration>
</plugin>
統合テストにも使用している場合は、フェイルセーフでも同じことができます。

3. 繊維

*クエーサーのコア機能はファイバー*です。 これらは概念的にはスレッドに似ていますが、微妙に異なる目的を果たします。 ファイバーはスレッドよりも大幅に軽量です。標準スレッドが必要とするよりも劇的に少ないメモリとCPU時間を使用します。
*ファイバーは、スレッドを直接置き換えるものではありません*。 ある状況ではより良い選択であり、他の状況ではより悪い選択です。
具体的には、*実行中のコードが他のファイバー、スレッド、またはプロセスをブロックするのに多くの時間を費やしているシナリオ向けに設計されています。たとえば、データベースからの結果を待っています。
繊維は似ていますが、緑色の糸と同じではありません。 グリーンスレッドは、OSスレッドと同じように動作するように設計されていますが、OSスレッドに直接マップしません。 これは、通常はブロックしている状況で使用されるように設計されたファイバーとは対照的に、常に処理している状況でグリーンスレッドが最適に使用されることを意味します。
必要に応じて、必要な結果を達成するために、繊維と糸を一緒に使用することが可能です。

* 3.1。 ファイバーの起動*

*スレッドを起動する方法と非常に似た方法でファイバーを起動します*。 実行するコードをラップする_Fiber <V> _クラスのインスタンスを_SuspendableRunnable_の形式で作成し、_start_メソッドを呼び出します。
class MyRunnable : SuspendableRunnable {
    override fun run() {
        println("Inside Fiber")
    }
}
Fiber<Void>(MyRunnable()).start()
Kotlinでは、必要に応じて_SuspendableRunnable_インスタンスをラムダに置き換えることができます。
val fiber = Fiber<Void> {
    println("Inside Fiber Lambda")
}
fiber.start()
さらに、上記のすべてをさらに簡単な形式で実行する特別なヘルパーDSLもあります。
fiber @Suspendable {
    println("Inside Fiber DSL")
}
これにより、ファイバーが作成され、提供されたブロックをラップする_SuspendableRunnable_が作成され、実行が開始されます。
DSLをインプレースで実行する場合は、ラムダよりもDSLを使用することをお勧めします。 lambdaオプションを使用すると、必要に応じて変数としてラムダを渡すことができます。

* 3.2。 ファイバーから値を返す*

繊維でa__SuspendableRunnable_を使用することは、スレッドで_Runnable_を直接使用することと同等です。 *また、ファイバーで_SuspensableCallable <V> _を使用できます。これは、スレッドで_Callable_に相当します*。
明示的な型、ラムダ、またはDSLを使用して、上記と同じ方法でこれを行うことができます。
class MyCallable : SuspendableCallable<String> {
    override fun run(): String {
        println("Inside Fiber")
        return "Hello"
    }
}
Fiber<String>(MyCallable()).start()

fiber @Suspendable {
    println("Inside Fiber DSL")
    "Hello"
}
  • a _SuspendableRunnable_の代わりにa _SuspendableCallable_を使用することは、ファイバーに汎用戻り型があることを意味します*上記では、a__Fiber <Unit> _の代わりに_Fiber <String> _を取得しました。

    *いったん_Fiber <V> _を手に入れたら、それから値を抽出することができます* –これは_SuspendableCallable_によって返される値です–ファイバーの_get()_メソッドを使用して:
val pi = fiber @Suspendable {
    computePi()
}.get()
_get()_メソッドはa _java.util.concurrent.Future_と同じように機能し、1つの点で直接機能します。 これは、値が存在するまでブロックすることを意味します。*

* 3.3。 ファイバーで待機中*

他の場合には、*ファイバーの実行が完了するのを待つことができます*。 これは通常、非同期コードを使用する理由に反しますが、そうする必要がある場合があります。
Javaスレッドと同じように、実行が完了するまでブロックする__Fiber <V> __で呼び出すことができる_join()_メソッドがあります*:
val fiber = Fiber<Void>(Runnable()).start()
fiber.join()
*タイムアウトを提供することもできます。そのため、ファイバーが予想よりも終了するのに時間がかかる場合、無期限にブロックすることはありません。*
fiber @Suspendable {
    TimeUnit.SECONDS.sleep(5)
}.join(2, TimeUnit.SECONDS)
*ファイバーに時間がかかりすぎる場合、_join()_メソッドは_TimeoutException_ *をスローして、これが発生したことを示します。 同じ方法で、前に見た_get()_メソッドにこれらのタイムアウトを提供することもできます。

* 3.4。 ファイバーのスケジューリング*

*ファイバーはすべてスケジューラーで実行されます*。 具体的には、a__FiberScheduler_またはそのサブクラスのインスタンスによって。 指定されていない場合は、代わりにデフォルトが使用され、_DefaultFiberScheduler.instance_として直接使用できます。
スケジューラを構成するために使用できるいくつかのシステムプロパティがあります。
  • co.paralleluniverse.fibers.DefaultFiberPool.parallelism –数
    使用するスレッドの数。

  • co.paralleluniverse.fibers.DefaultFiberPool.exceptionHandler
    ファイバーが例外をスローする場合に使用する例外ハンドラー

  • co.paralleluniverse.fibers.DefaultFiberPool.monitor –への手段
    繊維を監視する

  • co.paralleluniverse.fibers.DefaultFiberPool.detailedFiberInfo
    モニターが詳細情報を取得するかどうか。

    *デフォルトでは、これは使用可能なCPUコアごとに1つのスレッドを実行し、JMX経由で簡単な監視情報を提供する_FiberForkJoinScheduler_になります。
    *これはほとんどの場合に適した選択肢*ですが、場合によっては別の選択肢が必要になることがあります。 *他の標準的な選択肢は、_FiberExecutorScheduler_です。これは、提供されたJava _Executor_でファイバーを実行し、スレッドプールで実行します。*または、必要に応じて独自に提供することもできます。またはSwingシナリオ。

* 3.5。 一時停止可能なメソッド*

  • Quasarは、Suspendable Methods *として知られる概念の観点から機能します。 これらは特別にタグ付けされたメソッドであり、一時停止が許可されているため、ファイバー内で実行できます。

    *通常、これらのメソッドは、_SuspendException_ *をスローすることを宣言するメソッドです。 ただし、これは常に可能であるとは限らないため、使用できる特別なケースがいくつかあります。
  • _ @ Suspendable_アノテーションを付けた任意のメソッド

  • 最終的にJava 8ラムダメソッドになるもの-これらはできません
    例外を宣言するため、特別に扱われます

  • これらは実行時に計算され、
    コンパイル時間ではない

    さらに、*コンストラクタまたはクラス初期化子を一時停止可能なメソッドとして使用することは許可されていません*。
    *一時停止可能なメソッドと一緒に_synchronized_ブロックを使用することもできません*。 これは、メソッド自体を_synchronized_としてマークできず、その内部から_synchronized_メソッドを呼び出せず、メソッド内で_synchronized_ブロックを使用できないことを意味します。
    *サスペンド可能なメソッド内で_synchronized_を使用できないのと同じ方法で、他の方法で実行スレッドを直接ブロックしないでください。たとえば、_Thread.sleep()_を使用します。 そうすると、パフォーマンスの問題が発生し、システムが不安定になる可能性があります。
    これらのいずれかを実行すると、Quasar Javaエージェントからエラーが生成されます。 デフォルトの場合、何が起こったかを示すコンソールへの出力が表示されます:
WARNING: fiber [email protected]:fiber-10000004[task: [email protected]([email protected]), target: [email protected], scheduler: [email protected]] is blocking a thread (Thread[ForkJoinPool-default-fiber-pool-worker-3,5,main]).
    at [email protected]/java.lang.Thread.sleep(Native Method)
    at [email protected]/java.lang.Thread.sleep(Thread.java:339)
    at [email protected]/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
    at app//com.baeldung.quasar.SimpleFiberTest$fiberTimeout$1.invoke(SimpleFiberTest.kt:43)
    at app//com.baeldung.quasar.SimpleFiberTest$fiberTimeout$1.invoke(SimpleFiberTest.kt:12)
    at app//co.paralleluniverse.kotlin.KotlinKt$fiber$sc$1.invoke(Kotlin.kt:32)
    at app//co.paralleluniverse.kotlin.KotlinKt$fiber$sc$1.run(Kotlin.kt:65535)
    at app//co.paralleluniverse.fibers.Fiber.run(Fiber.java:1099)

4. より線

*ストランドとは、繊維と糸の両方を組み合わせたクエーサーの概念です*。 アプリケーションのその他の部分を気にすることなく、必要に応じてスレッドとファイバーを交換できます。
  • Strand.of(): *を使用して、Strandクラスのスレッドまたはファイバーインスタンスをラップすることにより、Strandを作成します。

val thread: Thread = ...
val strandThread = Strand.of(thread)

val fiber: Fiber = ...
val strandFiber = Strand.of(fiber)
または、_Strand.currentStrand():_ *を使用して、現在実行中のスレッドまたはファイバーのStrandインスタンスを取得できます
val myFiber = fiber @Suspendable {
    // Strand.of(myFiber) == Strand.currentStrand()
}
完了したら、*同じAPI *を使用して両方とやり取りできます。これにより、ストランドに問い合わせ、実行が完了するまで待つことができます。
strand.id // Returns the ID of the Fiber or Thread
strand.name // Returns the Name of the Fiber or Thread
strand.priority // Returns the Priority of the Fiber or Thread

strand.isAlive // Returns if the Fiber or Thread is currently alive
strand.isFiber // Returns if the Strand is a Fiber

strand.join() // Block until the Fiber or Thread is completed
strand.get() // Returns the result of the Fiber or Thread

5. 折り返しコールバック

*ファイバーの主な用途の1つは、コールバックを使用して呼び出し元にステータスを返す非同期コードをラップすることです*
Quasarは、_FiberAsync <T、E> _というクラスを提供します。これはまさにこの場合に使用できます。 同じコードに対してコールバックベースのAPIではなく、ファイバーベースのAPIを提供するように拡張できます。
これは、コールバックインターフェイスを実装し、__ FiberAsync __classを拡張し、コールバックメソッドをthe_FiberAsync_クラスに委任して処理するクラスを記述することで実行されます。
interface PiCallback {
    fun success(result: BigDecimal)
    fun failure(error: Exception)
}

class PiAsync : PiCallback, FiberAsync<BigDecimal, Exception>() {
    override fun success(result: BigDecimal) {
        asyncCompleted(result)
    }

    override fun failure(error: Exception) {
        asyncFailed(error)
    }

    override fun requestAsync() {
        computePi(this)
    }
}
これで、結果を計算するために使用できるクラスができました。これを、コールバックベースのAPIではなく、単純な呼び出しであるかのように扱うことができます。
val result = PiAsync().run()
*これは、成功値(_asyncCompleted()_に渡した値)を返すか、失敗例外(_asyncFailed_に渡したもの)をスローします。*
*これを使用すると、クエーサーは現在のファイバーに直接結び付けられた新しいファイバーを起動し、結果が利用可能になるまで現在のファイバーを一時停止します* これは、スレッド内ではなく、ファイバー内から使用する必要があることを意味します。 また、_FiberAsync_のインスタンスは、同じファイバー内で作成して実行し、動作させる必要があります。
さらに、*それらは再利用可能ではありません* –完了したら再起動できません。

6. チャンネル

Quasarは、異なるストランド間でメッセージの受け渡しを可能にするチャネルの概念を導入しています。 これらはhttps://gobyexample.com/channels[Goプログラミング言語のチャンネル]と非常によく似ています。

* 6.1。 チャンネルの作成*

*静的メソッド_Channels.newChannel _。*を使用してチャネルを作成できます*
Channels.newChannel(bufferSize, overflowPolicy, singleProducerOptimized, singleConsumerOptimized);
したがって、バッファーがいっぱいになり、単一のプロデューサーとコンシューマーをターゲットにするとブロックする例は次のようになります。
Channels.newChannel<String>(1024, Channels.OverflowPolicy.BLOCK, true, true);
*特定のプリミティブ型*-creating_newIntChannel _、_ newLongChannel _、_ newFloatChannel_、および_newDoubleChannel_のチャンネルを作成するための特別な方法もあります。 これらの特定のタイプのメッセージを送信し、ファイバー間のより効率的なフローを取得する場合、これらを使用できます。 *複数のコンシューマからのこれらのプリミティブチャネルを使用することはできません*-これは、Quasarが提供する効率の一部です。

* 6.2。 チャンネルの使用*

結果の_Channel_オブジェクトは、_SendPort_と_ReceivePort_の2つの異なるインターフェースを実装します。
*メッセージを消費しているストランドから_ReceivePort_インターフェイスを使用できます。*
fiber @Suspendable {
    while (true) {
        val message = channel.receive()
        println("Received: $message")
    }
}
*その後、同じチャネルの_SendPort_インターフェイスを使用して、上記で消費されるメッセージを生成できます*。
channel.send("Hello")
channel.send("World")
明らかな理由により、同じストランドからこれらの両方を使用することはできませんが、*異なるストランド間で同じ_channel_インスタンスを共有して、2つの間でメッセージを共有することができます*。 この場合、ストランドは繊維または糸のいずれかです。

* 6.3。 閉会チャンネル*

上記では、チャネルからの無限ループ読み取りがありました。 これは明らかに理想的ではありません。
*チャンネルがアクティブにメッセージを生成している間はすべてループし、チャンネルが終了したら停止する*ことをお勧めします*。 これを行うには、_close()_を使用してチャネルを閉じているとマークし、_isClosed_プロパティを使用してチャネルが閉じているかどうかを確認します。
fiber @Suspendable {
    while (!channel.isClosed) {
        val message = channel.receive()
        println("Received: $message")
    }
    println("Stopped receiving messages")
}

channel.send("Hello")
channel.send("World")

channel.close()

* 6.4。 ブロッキングチャネル*

*チャネルは、その性質上、概念をブロックします*。 _ReceivePort_は、メッセージが処理できるようになるまでブロックします。メッセージがバッファリングされるまでブロックするように_SendPort_を構成できます。
これは、繊維の重要な概念を活用します-繊維は吊り下げ可能です。 *これらのブロックアクションのいずれかが発生すると、Quasarは、チャネルを繰り返しポーリングする代わりに、非常に軽量なメカニズムを使用して、作業を継続できるまでファイバーを一時停止します*。 これにより、システムリソースを他の場所で(たとえば、他のファイバーの処理に)使用できます。

6.5. 複数のチャネルで待機する

Quasarは、アクションを実行できるようになるまで、単一のチャネルでブロックできることを確認しました。 * Quasarは、複数のチャネルで待機する機能も提供しています*。
これは、_Selector.select_ステートメントを使用して行います。 この概念は、Goとlink:/java-nio-selector[Java NIO]の両方でおなじみの場合があります。
  • _select()_メソッドは_SelectAction_インスタンスのコレクションを取り、これらのアクションのいずれかが実行されるまでブロックします:*

fiber @Suspendable {
    while (!channel1.isClosed && !channel2.isClosed) {
        val received = Selector.select(
          Selector.receive(channel1),
          Selector.receive(channel2)
        )

        println("Received: $received")
    }
}
上記では、*複数のチャネルに書き込むことができ、メッセージが利用可能なチャネルのいずれかでファイバがすぐに読み取れます*。 セレクタは、利用可能な最初のメッセージのみを消費するため、メッセージはドロップされません。
*複数チャンネルへの送信にも使用できます:*
fiber @Suspendable {
    for (i in 0..10) {
        Selector.select(
          Selector.send(channel1, "Channel 1: $i"),
          Selector.send(channel2, "Channel 2: $i")
        )
    }
}
_receive_と同様に、これは最初のアクションを実行できるまでブロックし、その後そのアクションを実行します。 これには、*メッセージが正確に1つのチャネル*に送信されるという興味深い副作用がありますが、送信先のチャネルはたまたま使用可能なバッファスペースがある最初のチャネルです。 *これにより、複数のチャネルにメッセージを配信することができます*それらのチャネルの受信端からのバックプレッシャーに正確に基づいています。

* 6.6 ティッカーチャンネル*

*作成できる特別な種類のチャネルは、ティッカーチャネルです*。 これらは証券取引所のティッカーと概念が似ています。新しいメッセージが古いメッセージを置き換えるため、消費者がすべてのメッセージを見ることが重要ではありません。
*これらは、ステータスの更新の流れが一定である場合に役立ちます*-例えば、証券取引所の価格や完了した割合など。
*これらを通常のチャネルとして作成しますが、_OverflowPolicy.DISPLACE_設定を使用します*。 この場合、新しいメッセージを作成するときにバッファがいっぱいになると、最も古いメッセージが静かにドロップされて、そのスペースが確保されます。
これらのチャネルは、単一のストランドからのみ使用できます。 ただし、* TickerChannelConsumer_を作成して、このチャネルから複数のストランドにわたって読み取ることができます:*
val channel = Channels.newChannel<String>(3, Channels.OverflowPolicy.DISPLACE)

for (i in 0..10) {
    val tickerConsumer = Channels.newTickerConsumerFor(channel)
    fiber @Suspendable {
        while (!tickerConsumer.isClosed) {
            val message = tickerConsumer.receive()
            println("Received on $i: $message")
        }
        println("Stopped receiving messages on $i")
    }
}

for (i in 0..50) {
    channel.send("Message $i")
}

channel.close()
  • _TickerChannelConsumer_のすべてのインスタンスは、ラップされたチャネルに送信されたすべてのメッセージを受信する可能性があります*-オーバーフローポリシーによるドロップを許可します。

    メッセージは常に正しい順序で受信され、各_TickerChannelConsumer_を作業に必要な速度で消費できます。
    *ラップされたチャンネルがいつ閉じられるかを知ることもできます* __これにより、プロデューサーは、コンシューマーがメッセージを読んでいる方法や、使用されているチャネルのタイプを気にする必要がなくなります。

* 6.7。 チャネルへの機能変換*

私たちはみな、https://www.baeldung.com/java-8-streams-introduction [streams]を使用したJavaでの関数変換に慣れています。 *これらの同じ標準変換をチャネルに適用できます*-送信と受信の両方のバリエーションとして。
  • filter –指定されたラムダに適合しないメッセージを除外します

  • map –チャネルを流れるメッセージを変換します

  • flatMap –マップと同じですが、1つのメッセージを複数に変換します
    メッセージ

  • reduce –適用
    reduction function to channel

    たとえば、次を使用して、Port_ReceivePort <String> _を、それを流れるすべての文字列を反転するものに変換できます。
val transformOnReceive = Channels.map(channel, Function<String, String> { msg: String? -> msg?.reversed() })
*これは元のチャネルのメッセージには影響しません。また、この変換の効果を見ることなく、他の場所でメッセージを消費できます。*
または、次のようにa__SendPort <String> _をチャネルに書き込むときにすべての文字列を強制的に大文字に変換できます。
val transformOnSend = Channels.mapSend(channel, Function<String, String> { msg: String? -> msg?.toUpperCase() })
*これは、メッセージが書き込まれたときに影響します。この場合、ラップされたチャネルは変換されたメッセージのみを表示します*。 ただし、必要に応じて、この変換をバイパスするために、ラップされているチャネルに直接書き込むこともできます。

7. データフロー

  • Quasar Coreは、リアクティブプログラミングをサポートするいくつかのツールを提供します*。 これらはRxJavaのようなものほど強力ではありませんが、ほとんどの場合には十分です。

    _Val_と_Var_という2つの概念にアクセスできます。 * _Val_は定数値を表し、_Var_は変動する値を表します*。
    どちらのタイプも、値を計算するためにファイバーで使用される値なしまたは_SuspendableCallable_のいずれかで構築されます。
val a = Var<Int>()
val b = Val<Int>()

val c = Var<Int> { a.get() + b.get() }
val d = Var<Int> { a.get() * b.get() }

// (a*b) - (a+b)
val initialResult = Val<Int> { d.get() - c.get() }
val currentResult = Var<Int> { d.get() - c.get() }
最初は、_initialResult_と_currentResult_には値がなく、それらから値を取得しようとすると、現在のストランドがブロックされます。 * _a_と_b_の値を指定するとすぐに、_initialResult_と__ResultcurrentResult__の両方から値を読み取ることができます。*
これに加えて、*さらに変更すると_a_それから_currentResult_はこれを反映して更新されますが、_initialResult_は更新されません*:
a.set(2)
b.set(4)

Assert.assertEquals(2, initialResult.get())
Assert.assertEquals(2, currentResult.get())

a.set(3)

Assert.assertEquals(2, initialResult.get()) // Unchanged
Assert.assertEquals(5, currentResult.get()) // New Value
_b_を変更しようとすると、代わりに例外がスローされます。* a _Val_には単一の値しか割り当てられないためです*。

8. 結論

この記事では、非同期プログラミングに使用できるQuasarライブラリの概要を説明しました。 ここで見てきたことは、クエーサーで達成できることの基本にすぎません。 次のプロジェクトで試してみませんか?
ここで取り上げたいくつかの概念の例https://github.com/eugenp/tutorials/tree/master/kotlin-quasar[GitHubで見つけることができます。]