1. 序章

QuasarはKotlinライブラリであり、いくつかの非同期の概念を管理しやすい方法でKotlinにもたらします。 これには、軽量スレッド、チャネル、アクターなどが含まれます。

2. ビルドの設定

Quasarの最新バージョンを使用するには、JDKバージョン11以降で実行する必要があります。 古いバージョンは、Java11にまだアップグレードできない状況でJDK7をサポートします。

Quasarには、使用している機能に応じて、必要な4つの依存関係が付属しています。 これらを組み合わせるときは、それぞれに同じバージョンを使用することが不可欠です。

正しく機能するには、Quasarはバイトコードインストルメンテーションを実行する必要があります。 これは、Javaエージェントを使用して実行時に実行することも、コンパイル時に実行することもできます。 Javaエージェントは、特別なビルド要件がなく、どのセットアップでも機能するため、推奨されるアプローチです。 ただし、Javaは一度に1つのJavaエージェントしかサポートしないため、これには欠点があります。

2.1. コマンドラインから実行

Quasarを使用してアプリケーションを実行する場合、JVMに対して-javaagentフラグを使用してJavaエージェントを指定します。 これにより、quasar-core.jarファイルへのフルパスがパラメーターとして使用されます。

$ java -javaagent:quasar-core.jar -cp quasar-core.jar:quasar-kotlin.jar:application.jar fully.qualified.main.Class

2.2. Mavenからアプリケーションを実行する

必要に応じて、Mavenを使用してJavaエージェントを追加することもできます。

いくつかの手順で、Mavenを使用してこれを達成できます。

まず、依存関係プラグインを設定して、quasar-core.jarファイルを指すプロパティを生成します。

<plugin>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>3.1.1</version>
    <executions>
        <execution>
            <id>getClasspathFilenames</id>
            <goals>
               <goal>properties</goal>
            </goals>
        </execution>
    </executions>
</plugin>

次に、Execプラグインを使用して、実際にアプリケーションを起動します。

<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.3.2</version>
    <configuration>
        <workingDirectory>target/classes</workingDirectory>
        <executable>echo</executable>
        <arguments>
            <argument>-javaagent:${co.paralleluniverse:quasar-core:jar}</argument>
            <argument>-classpath</argument> <classpath/>
            <argument>com.baeldung.quasar.QuasarHelloWorldKt</argument>
        </arguments>
    </configuration>
</plugin>

次に、これを利用するために、正しい呼び出しMavenを実行する必要があります。

mvn compile dependency:properties exec:exec

これにより、アプリケーションを実行する前に、最新のコードがコンパイルされ、Javaエージェントを指すプロパティが使用可能になります。

2.3. 単体テストの実行

を取得するのは素晴らしいことですユニットテストでは、Quasarエージェントから得られるのと同じ利点があります。

テストを実行するときにこれと同じプロパティを利用するようにSurefireを設定できます。

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>2.22.1</version>
    <configuration>
        <argLine>-javaagent:${co.paralleluniverse:quasar-core:jar}</argLine>
    </configuration>
</plugin>

統合テストにもFailsafeを使用している場合は、Failsafeでも同じことができます。

3. 繊維

Quasarのコア機能はファイバーの機能です。 これらは概念的にはスレッドに似ていますが、微妙に異なる目的を果たします。 ファイバーはスレッドよりも大幅に軽量であり、標準のスレッドが必要とするよりも劇的に少ないメモリとCPU時間を消費します。

ファイバーはスレッドの直接の代替品ではありません。 それらは、ある状況ではより良い選択であり、他の状況ではより悪い選択です。

具体的には、これらは、実行中のコードが他のファイバー、スレッド、またはプロセスでをブロックするのに多くの時間を費やすシナリオ(データベースからの結果を待つなど)向けに設計されています。

繊維は似ていますが、グリーンスレッドと同じではありません。 グリーンスレッドは、OSスレッドと同じように機能するように設計されていますが、OSスレッドに直接マップされません。 これは、通常はブロッキングしている状況で使用するように設計されたファイバーとは対照的に、グリーンスレッドは常に処理している状況で最もよく使用されることを意味します。

必要に応じて、ファイバーとスレッドを一緒に使用して、必要な結果を得ることができます。

3.1. ファイバーの起動

スレッドを起動する方法と非常によく似た方法でファイバーを起動します。 のインスタンスを作成しますファイバ実行するコードをラップするクラス–の形式で SuspendableRunnable –次に、 始める方法:

class MyRunnable : SuspendableRunnable {
    override fun run() {
        println("Inside Fiber")
    }
}
Fiber<Void>(MyRunnable()).start()

Kotlinでは、必要に応じてSuspendableRunnableインスタンスをラムダに置き換えることができます。

val fiber = Fiber<Void> {
    println("Inside Fiber Lambda")
}
fiber.start()

さらに、上記のすべてをさらに単純な形式で実行する特別なヘルパーDSLもあります。

fiber @Suspendable {
    println("Inside Fiber DSL")
}

これにより、ファイバーが作成され、提供されたブロックをラップする SuspendableRunnable が作成され、実行が開始されます。

インプレースで実行したい場合は、ラムダよりもDSLの使用をお勧めします。 ラムダオプションを使用すると、必要に応じてラムダを変数として渡すことができます。

3.2. ファイバーからの戻り値

ファイバーでのSuspendableRunnableの使用は、スレッドでのRunnableと直接同等です。 SuspensableCallableを使用することもできますファイバーを使用します。これは、スレッドを使用して呼び出し可能に相当します。

これは、上記と同じ方法で、明示的なタイプ、ラムダ、またはDSLを使用して行うことができます。

class MyCallable : SuspendableCallable<String> {
    override fun run(): String {
        println("Inside Fiber")
        return "Hello"
    }
}
Fiber<String>(MyCallable()).start()

fiber @Suspendable {
    println("Inside Fiber DSL")
    "Hello"
}

SuspendableRunnableの代わりにSuspendableCallableを使用するということは、ファイバーが一般的なリターンタイプになったことを意味します。 –上記では、 ファイバの代わりにファイバ

ファイバーを入手したら私たちの手で、私たちはそれから価値を引き出すことができます –これはによって返される値です SuspendableCallable –を使用して得る() ファイバーの方法:

val pi = fiber @Suspendable {
    computePi()
}.get()

get()メソッドは、 java.util.concurrent.Future と同じように機能し、1つの観点から直接機能します。 これは、値が存在するまでブロックされることを意味します。

3.3. 繊維を待っています

また、ファイバーの実行が終了するのを待ちたい場合もあります。 これは通常、非同期コードを使用する理由に反しますが、使用する必要がある場合もあります。

Javaスレッドと同じように、 ファイバーで呼び出すことができるjoin()メソッドがあります実行が完了するまでブロックされます

val fiber = Fiber<Void>(Runnable()).start()
fiber.join()

タイムアウトを提供することもできるので、ファイバーの終了に予想よりも時間がかかる場合でも、無期限にブロックすることはありません。

fiber @Suspendable {
    TimeUnit.SECONDS.sleep(5)
}.join(2, TimeUnit.SECONDS)

ファイバーに時間がかかりすぎる場合、join()メソッドはTimeoutException をスローして、これが発生したことを示します。 同じ方法で、前に見た get()メソッドにこれらのタイムアウトを提供することもできます。

3.4. ファイバーのスケジューリング

ファイバーはすべてスケジューラーで実行されます。 具体的には、FiberSchedulerまたはそのサブクラスのインスタンスによって。 指定しない場合は、代わりにデフォルトが使用されます。これは、DefaultFiberScheduler.instanceとして直接利用できます。

スケジューラーの構成に使用できるシステムプロパティはいくつかあります。

  • co.paralleluniverse.fibers.DefaultFiberPool.parallelism –使用するスレッドの数。
  • co.paralleluniverse.fibers.DefaultFiberPool.exceptionHandler –ファイバーが例外をスローした場合に使用する例外ハンドラー
  • co.paralleluniverse.fibers.DefaultFiberPool.monitor –ファイバーを監視する手段
  • co.paralleluniverse.fibers.DefaultFiberPool.detailedFiberInfo –モニターが詳細情報を取得するかどうか。

デフォルトでは、これは利用可能なCPUコアごとに1つのスレッドを実行するFiberForkJoinScheduler であり、JMXを介して簡単な監視情報を提供します。

これはほとんどの場合に適していますが、場合によっては別の選択肢が必要になることもあります。 他の標準的な選択肢はFiberExecutorSchedulerで、提供されたJava Executorでファイバーを実行してスレッドプールで実行します、または必要に応じて独自のファイバーを提供できます。たとえば、特定のJavaExecutorですべてを実行する必要がある場合があります。 AWTまたはSwingシナリオのスレッド。

3.5. 中断可能なメソッド

Quasarは、SuspendableMethodsとして知られる概念の観点から機能します。 これらは特別にタグ付けされたメソッドであり、中断することが許可されているため、ファイバー内で実行できます。

通常、これらのメソッドは、SuspendExceptionをスローすることを宣言するメソッドです。 ただし、これが常に可能であるとは限らないため、使用できる他の特殊なケースがいくつかあります。

  • @Suspendableアノテーションでアノテーションを付けるメソッド
  • 最終的にJava8ラムダメソッドになるもの–これらは例外を宣言できないため、特別に扱われます
  • リフレクションによって行われた呼び出し。これらは実行時に計算され、コンパイル時ではないためです。

さらに、コンストラクターまたはクラス初期化子をサスペンド可能なメソッドとして使用することは許可されていません

サスペンド可能なメソッドと一緒に同期ブロックを使用することもできません。 つまり、メソッド自体を同期としてマークしたり、内部から同期メソッドを呼び出したり、同期を使用したりすることはできません。メソッド内のブロック。

サスペンド可能なメソッド内で同期を使用できないのと同じように、他の方法で実行のスレッドを直接ブロックしないでください-たとえば、 Thread.sleep() 。 これを行うと、パフォーマンスの問題が発生し、システムが不安定になる可能性があります。

これらのいずれかを実行すると、QuasarJavaエージェントからエラーが生成されます。 デフォルトの場合、何が起こったかを示すコンソールへの出力が表示されます。

WARNING: fiber Fiber@10000004:fiber-10000004[task: ParkableForkJoinTask@40c7e038(Fiber@10000004), target: co.paralleluniverse.kotlin.KotlinKt$fiber$sc$1@7d289a68, scheduler: co.paralleluniverse.fibers.FiberForkJoinScheduler@5319f44e] is blocking a thread (Thread[ForkJoinPool-default-fiber-pool-worker-3,5,main]).
	at java.base@11/java.lang.Thread.sleep(Native Method)
	at java.base@11/java.lang.Thread.sleep(Thread.java:339)
	at java.base@11/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
	at app//com.baeldung.quasar.SimpleFiberTest$fiberTimeout$1.invoke(SimpleFiberTest.kt:43)
	at app//com.baeldung.quasar.SimpleFiberTest$fiberTimeout$1.invoke(SimpleFiberTest.kt:12)
	at app//co.paralleluniverse.kotlin.KotlinKt$fiber$sc$1.invoke(Kotlin.kt:32)
	at app//co.paralleluniverse.kotlin.KotlinKt$fiber$sc$1.run(Kotlin.kt:65535)
	at app//co.paralleluniverse.fibers.Fiber.run(Fiber.java:1099)

4. ストランド

ストランドは、ファイバーとスレッドの両方を組み合わせたQuasarのコンセプトです。 これらにより、アプリケーションの他の部分を気にすることなく、必要に応じてスレッドとファイバーを交換できます。

Strand.of():を使用して、スレッドまたはファイバーインスタンスをStrandクラスでラップすることにより、Strandを作成します。

val thread: Thread = ...
val strandThread = Strand.of(thread)

val fiber: Fiber = ...
val strandFiber = Strand.of(fiber)

または、 Strand.currentStrand():を使用して、現在実行中のスレッドまたはファイバーのStrandインスタンスを取得できます。

val myFiber = fiber @Suspendable {
    // Strand.of(myFiber) == Strand.currentStrand()
}

完了すると、同じAPI を使用して両方と対話できるようになり、ストランドに問い合わせたり、実行が完了するまで待機したりできます。

strand.id // Returns the ID of the Fiber or Thread
strand.name // Returns the Name of the Fiber or Thread
strand.priority // Returns the Priority of the Fiber or Thread

strand.isAlive // Returns if the Fiber or Thread is currently alive
strand.isFiber // Returns if the Strand is a Fiber

strand.join() // Block until the Fiber or Thread is completed
strand.get() // Returns the result of the Fiber or Thread

5. コールバックのラッピング

ファイバーの主な用途の1つは、コールバックを使用してステータスを呼び出し元に返す非同期コードをラップすることです。

Quasarはと呼ばれるクラスを提供します FiberAsync これはまさにこの場合に使用できます。 同じコードに対してコールバックベースのAPIではなく、ファイバーベースのAPIを提供するように拡張できます。

これは、コールバックインターフェイスを実装し、 FiberAsync クラスを拡張し、コールバックメソッドをFiberAsyncクラスに委任して処理するクラスを作成することによって行われます。

interface PiCallback {
    fun success(result: BigDecimal)
    fun failure(error: Exception)
}

class PiAsync : PiCallback, FiberAsync<BigDecimal, Exception>() {
    override fun success(result: BigDecimal) {
        asyncCompleted(result)
    }

    override fun failure(error: Exception) {
        asyncFailed(error)
    }

    override fun requestAsync() {
        computePi(this)
    }
}

これで、結果の計算に使用できるクラスができました。これを、コールバックベースのAPIではなく単純な呼び出しであるかのように扱うことができます。

val result = PiAsync().run()

これにより、成功値( asyncCompleted()に渡した値)が返されるか、失敗例外( asyncFailed に渡した値)がスローされます。

これを使用すると、Quasarは現在のファイバーに直接接続されている新しいファイバーを起動し、結果が利用可能になるまで現在のファイバーを一時停止します。 これは、スレッド内ではなく、ファイバー内から使用する必要があることを意味します。 また、 FiberAsync のインスタンスを作成し、同じファイバー内から実行する必要があることも意味します。

さらに、それらは再利用できません –完了した後は再起動できません。

6. チャネル

Quasarは、異なるストランド間でメッセージを渡すことができるようにするチャネルの概念を導入しています。 これらは、Goプログラミング言語チャネルと非常によく似ています。

6.1. チャネルの作成

静的メソッドChannels.newChannelを使用してチャネルを作成できます。

Channels.newChannel(bufferSize, overflowPolicy, singleProducerOptimized, singleConsumerOptimized);

したがって、バッファがいっぱいになったときにブロックし、単一のプロデューサーとコンシューマーをターゲットにする例は次のようになります。

Channels.newChannel<String>(1024, Channels.OverflowPolicy.BLOCK, true, true);

特定のプリミティブタイプのチャネルを作成するための特別なメソッドもいくつかあります newIntChannel newLongChannel newFloatChannel newDoubleChannel ]。 これらの特定のタイプのメッセージを送信し、ファイバー間のフローをより効率的にする場合は、これらを使用できます。 複数のコンシューマーからのこれらのプリミティブチャネルを使用することはできないことに注意してください-これは、Quasarが提供する効率の一部です。

6.2. チャネルの使用

結果のChannelオブジェクトは、SendPortReceivePortの2つの異なるインターフェイスを実装します。

メッセージを消費しているストランドからReceivePortインターフェイスを使用できます。

fiber @Suspendable {
    while (true) {
        val message = channel.receive()
        println("Received: $message")
    }
}

次に、同じチャネルのSendPortインターフェイスを使用して、上記で消費されるメッセージを生成できます。

channel.send("Hello")
channel.send("World")

明らかな理由で、同じストランドからこれらの両方を使用することはできませんが、異なるストランド間で同じチャネルインスタンスを共有して、2つの間でメッセージを共有できるようにすることができます。 この場合、ストランドはファイバーまたはスレッドのいずれかになります。

6.3. チャンネルを閉じる

上記では、チャネルからの読み取りが無限ループになっています。 これは明らかに理想的ではありません。

チャネルがアクティブにメッセージを生成している間はループし、チャネルが終了したら停止することをお勧めします。 これを行うには、 close()を使用してチャネルを閉じているとマークし、isClosedプロパティを使用してチャネルが閉じているかどうかを確認します。

fiber @Suspendable {
    while (!channel.isClosed) {
        val message = channel.receive()
        println("Received: $message")
    }
    println("Stopped receiving messages")
}

channel.send("Hello")
channel.send("World")

channel.close()

6.4. チャネルのブロック

チャネルは、その性質上、概念をブロックしています ReceivePort は、メッセージを処理できるようになるまでブロックします。メッセージをバッファリングできるようになるまで、SendPortをブロックするように構成できます。

これは、繊維が吊り下げ可能であるという重要な概念を活用しています。 これらのブロッキングアクションのいずれかが発生すると、Quasarは、チャネルを繰り返しポーリングする代わりに、非常に軽量なメカニズムを使用して、ファイバーが動作を継続できるようになるまでファイバーを一時停止します。 これにより、システムリソースを他の場所で使用できるようになります。たとえば、他のファイバーを処理するために使用できます。

6.5. 複数のチャネルで待機中

Quasarは、アクションを実行できるようになるまで、単一のチャネルでブロックできることを確認しました。 Quasarには、複数のチャネル間で待機する機能もあります

これは、Selector.selectステートメントを使用して行います。 この概念は、Goと JavaNIOの両方でおなじみかもしれません。

select()メソッドは、 SelectAction インスタンスのコレクションを取得し、次のいずれかのアクションが実行されるまでブロックします。

fiber @Suspendable {
    while (!channel1.isClosed && !channel2.isClosed) {
        val received = Selector.select(
          Selector.receive(channel1), 
          Selector.receive(channel2)
        )

        println("Received: $received")
    }
}

上記では、複数のチャネルに書き込むことができ、メッセージが利用可能なのチャネルのいずれかでファイバーがすぐに読み取ります。 セレクターは、使用可能な最初のメッセージのみを消費するため、メッセージがドロップされることはありません。

これを使用して、複数のチャネルに送信することもできます。

fiber @Suspendable {
    for (i in 0..10) {
        Selector.select(
          Selector.send(channel1, "Channel 1: $i"),
          Selector.send(channel2, "Channel 2: $i")
        )
    }
}

receive と同様に、これは最初のアクションを実行できるようになるまでブロックされ、その後そのアクションを実行します。 これには、メッセージが正確に1つのチャネルに送信されるという興味深い副作用がありますが、メッセージが送信されるチャネルは、バッファスペースを使用できる最初のチャネルです。 これにより、複数のチャネルに、それらのチャネルの受信側からのバックプレッシャに正確に基づいてメッセージを配信できます。

6.6. ティッカーチャネル

私たちが作成できる特別な種類のチャネルはティッカーチャネルです。 これらは、証券取引所のティッカーと概念が似ています。新しいメッセージが古いメッセージに置き換わるため、消費者がすべてのメッセージを見ることが重要ではありません。

これらは、ステータスの更新が一定の流れである場合に役立ちます-たとえば、証券取引所の価格や完了率などです。

これらを通常のチャネルとして作成しますが、OverflowPolicy.DISPLACE設定を使用します。 この場合、新しいメッセージを生成するときにバッファがいっぱいになると、最も古いメッセージがサイレントにドロップされて、そのためのスペースが確保されます。

これらのチャネルは、単一のストランドからのみ消費できます。 ただし、 TickerChannelConsumerを作成して、このチャネルから複数のストランドにまたがって読み取ることができます:

val channel = Channels.newChannel<String>(3, Channels.OverflowPolicy.DISPLACE)

for (i in 0..10) {
    val tickerConsumer = Channels.newTickerConsumerFor(channel)
    fiber @Suspendable {
        while (!tickerConsumer.isClosed) {
            val message = tickerConsumer.receive()
            println("Received on $i: $message")
        }
        println("Stopped receiving messages on $i")
    }
}

for (i in 0..50) {
    channel.send("Message $i")
}

channel.close()

TickerChannelConsumerのすべてのインスタンスは、ラップされたチャネルに送信されたすべてのメッセージを受信する可能性があります –オーバーフローポリシーによってドロップされたメッセージを許可します。

私たちは常に正しい順序でメッセージを受信し、各 TickerChannelConsumer を必要な速度で消費できます–1本のファイバーの速度が遅いと他のファイバーに影響を与えません

ラップされたチャネルがいつ閉じられるかもわかります私たちからの読書をやめることができるように TickerChannelConsumer。 これにより、プロデューサーは、コンシューマーがメッセージを読み取る方法や、使用されているチャネルのタイプを気にする必要がなくなります。

6.7. チャネルへの機能変換

私たちは皆、ストリームを使用したJavaでの機能変換に慣れています。 これらの同じ標準変換をチャネルに適用できます–送信と受信の両方のバリエーションとして。

適用できるこれらのアクションには、次のものがあります。
  • filter –特定のラムダに適合しないメッセージを除外します
  • map –メッセージがチャネルを流れるときにメッセージを変換します
  • flatMap –マップと同じですが、1つのメッセージを複数のメッセージに変換します
  • reduce reduction関数をチャネルに適用します

たとえば、 ReceivePort 以下を使用して、その中を流れるすべての文字列を逆にするものに変換します。

val transformOnReceive = Channels.map(channel, Function<String, String> { msg: String? -> msg?.reversed() })

これは元のチャネルのメッセージには影響せず、この変換の効果を確認せずに他の場所でメッセージを消費することができます。

または、変換することもできます SendPort 次のようにチャネルに書き込むときに、すべての文字列を大文字に強制するものに変換します。

val transformOnSend = Channels.mapSend(channel, Function<String, String> { msg: String? -> msg?.toUpperCase() })

これは、メッセージが書き込まれるときに影響します。この場合、ラップされたチャネルには、変換されたメッセージのみが表示されます。 ただし、必要に応じて、ラップされているチャネルに直接書き込み、この変換をバイパスすることもできます。

7. データフロー

Quasar Coreは、リアクティブプログラミングをサポートするためのツールをいくつか提供します。 これらはRxJavaのようなものほど強力ではありませんが、ほとんどの場合には十分すぎるほどです。

ValVarの2つの概念にアクセスできます。  Valは一定の値を表し、Varは変化する値を表します

どちらのタイプも、値なし、または値を計算するためにファイバーで使用されるSuspendableCallableのいずれかで構成されます。

val a = Var<Int>()
val b = Val<Int>()

val c = Var<Int> { a.get() + b.get() }
val d = Var<Int> { a.get() * b.get() }

// (a*b) - (a+b)
val initialResult = Val<Int> { d.get() - c.get() }
val currentResult = Var<Int> { d.get() - c.get() }

最初は、initialResultcurrentResultには値がなく、それらから値を取得しようとすると、現在のストランドがブロックされます。 aとbの値を指定するとすぐに、initialResultとcurrentResultの両方から値を読み取ることができます。

これに加えて、さらに変更すると、currentResultはこれを反映するように更新されますが、initialResultは更新されません

a.set(2)
b.set(4)

Assert.assertEquals(2, initialResult.get())
Assert.assertEquals(2, currentResult.get())

a.set(3)

Assert.assertEquals(2, initialResult.get()) // Unchanged
Assert.assertEquals(5, currentResult.get()) // New Value

b を変更しようとすると、代わりに例外がスローされます。これは、aValには単一の値しか割り当てることができないためです

8. 結論

この記事では、非同期プログラミングに使用できるQuasarライブラリについて紹介しました。 ここで見たのは、Quasarで達成できることの基本にすぎません。 次のプロジェクトで試してみませんか?

ここで取り上げたいくつかの概念の例は、GitHubにあります。