Kotlinの高度なクエーサーの使用法

1. 前書き

link:/kotlin-quasar [最近クエーサーを調べた]は、非同期プログラミングをより使いやすく効率的にするためのツールを提供します。 これでできることの基本を見て、軽量スレッドとメッセージの受け渡しを可能にしました。
このチュートリアルでは、非同期プログラミングをさらに進めるためにQuasarで実行できる、より高度なことをいくつか見ていきます。

2. 俳優

https://en.wikipedia.org/wiki/Actor_model[Actors]は、並行プログラミングの有名なプログラミング手法であり、Erlangで特に人気があります。 * Quasarでは、この形式のプログラミングの基本的な構成要素であるアクターを定義できます*。
俳優は次のことができます。
  • 他の俳優を開始する

  • 他のアクターにメッセージを送信する

  • 反応する他のアクターからメッセージを受信する

    これらの3つの機能は、アプリケーションの構築に必要なすべてを提供します。
    Quasarでは、アクターはストランドとして表されます—通常はファイバーですが、必要に応じてスレッドもオプションです—メッセージを受信するチャネル、およびライフサイクル管理とエラー処理の特別なサポート。

* 2.1。 ビルドにアクターを追加*

俳優はクエーサーの中核概念ではありません。 代わりに、それらへのアクセスを提供するhttps://search.maven.org/search?q=g:co.paralleluniverse%20and%20a:quasar-actors[thedependency]を追加する必要があります*:
<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-actors</artifactId>
    <version>0.8.0</version>
</dependency>
使用中の他のQuasar依存関係と同じバージョンのこの依存関係を使用することが重要です。

* 2.2。 アクターの作成*

  • _Actor_クラスをサブクラス化してアクタを作成し、名前とa_MailboxConfig_を提供し、_doRun()_メソッドを実装します。

val actor = object : Actor<Int, String>("noopActor", MailboxConfig(5, Channels.OverflowPolicy.THROW)) {
    @Suspendable
    override fun doRun(): String {
        return "Hello"
    }
}
名前とメールボックス構成はどちらもオプションです。メールボックス構成を指定しない場合、デフォルトは無制限のメールボックスです。
*アクターのメソッドを手動で_ @ Suspendable_としてマークする必要があることに注意してください*。 Kotlinでは、例外を宣言する必要はまったくありません。つまり、拡張する基本クラスで__SuspendException __thatを宣言しないことを意味します。 これは、Quasarがメソッドを一時停止可能であると見なすには、もう少し助けが必要だということです。
アクターを作成したら、_spawn()_メソッドを使用して新しいファイバーを開始するか、_spawnThread()_を使用して新しいスレッドを開始することで、アクターを開始する必要があります。 繊維と糸の違いを除けば、これら2つは同じように機能します。
アクターをスポーンしたら、他のストランドと同じ程度に扱うことができます。 これには、_join()_を呼び出して実行が完了するのを待つことや、_get()_から値を取得することが含まれます。
actor.spawn()

println("Noop Actor: ${actor.get()}")

* 2.3。* アクターへのメッセージ送信

新しいアクタを生成すると、_spawn()_および_spawnThread()_メソッドは_ActorRef_インスタンスを返します。 *これを使用して、受信者にメッセージを送信することで、アクター自体と対話できます。*
Actor_ActorRef_は_SendPort_インターフェイスを実装するため、a and_Channel_の半分を生成するのと同じように使用できます。 これにより、アクターにメッセージを渡すために使用できる_send_および_trySend_メソッドにアクセスできます。
val actorRef = actor.spawn()

actorRef.send(1)

* 2.4。* アクターによるメッセージの受信

メッセージをアクターに渡すことができるようになったので、それらを使用して何かを実行できるようにする必要があります。 *アクター自体のitself_doRun()_メソッド内でこれを行います。そこで、_receive()_メソッドを呼び出して、処理する次のメッセージを取得できます*:
val actor = object : Actor<Int, Void?>("simpleActor", null) {
    @Suspendable
    override fun doRun(): Void? {
        val msg = receive()
        println("SimpleActor Received Message: $msg")
        return null
    }
}
_receive()_メソッドは、メッセージが利用可能になるまでアクター内でブロックし、アクターが必要に応じてこのメッセージを処理できるようにします。
多くの場合、アクターは多くのメッセージを受信し、それらすべてを処理するように設計されます。 そのため、アクターは通常、_doRun()_メソッド内に無限ループを持ち、着信するすべてのメッセージを処理します。
val actor = object : Actor<Int, Void?>("loopingActor", null) {
    @Suspendable
    override fun doRun(): Void? {
        while (true) {
            val msg = receive()

            if (msg > 0) {
                println("LoopingActor Received Message: $msg")
            } else {
                break
            }
        }

        return null
    }
}
これにより、値0を受け取るまで着信メッセージの処理が継続されます。

* 2.5。 メッセージの送信が速すぎる*

*場合によっては、アクターは送信されるメッセージよりも遅いメッセージを処理します*。 これにより、メールボックスがいっぱいになり、潜在的にオーバーフローします。
デフォルトのメールボックスポリシーには無制限の容量があります。 ただし、_MailboxConfigを提供することにより、アクターを作成するときにこれを構成できます。
代わりに、Quasarは、指定内容に関係なく、_THROW_のポリシーを使用します。
Actor<Int, String>("backlogActor",
    MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
}
メールボックスのサイズを指定し、メールボックスがオーバーフローした場合、アクター内の_receive()_メソッドは、例外をスローしてアクターを中断させます。
これはどんな方法でも処理できるものではありません。
try {
    receive()
} catch (e: Throwable) {
    // This is never reached
}
これが発生すると、アクターの外部からの_get()_メソッドも例外をスローしますが、これは処理可能です*。 この場合、オーバーフローメッセージを追加した_send()_メソッドを指すスタックトレースで__QueueCapacityExceededException __をラップする_ExecutionException_を取得します。
メールボックスのサイズが制限されているアクターを使用していることがわかっている場合は、代わりに_trySend()_メソッドを使用してメッセージを送信できます。 これにより、アクターは失敗しません*。代わりに、メッセージが正常に送信されたかどうかを報告します。
val actor = object : Actor<Int, String>("backlogTrySendActor",
  MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
    @Suspendable
    override fun doRun(): String {
        TimeUnit.MILLISECONDS.sleep(500);
        println("Backlog TrySend Actor Received: ${receive()}")

        return "No Exception"
    }
}

val actorRef = actor.spawn()

actorRef.trySend(1) // Returns True
actorRef.trySend(2) // Returns False

* 2.6。 速すぎるメッセージの読み取り*

反対の場合、*提供されているよりも速くメッセージを読み込もうとしているアクターがあるかもしれません*。 通常、これは問題ありません。アクターはメッセージが利用可能になるまでブロックしてから処理します。
ただし、状況によっては、これを他の方法で処理できるようにしたい場合があります。
メッセージの受信に関しては、次の3つのオプションを使用できます。
  • メッセージが利用可能になるまで無期限にブロックする

  • メッセージが利用可能になるか、タイムアウトが発生するまでブロックする

  • まったくブロックしないでください

    これまで、永久にブロックする_receive()_メソッドを使用しました。
    必要に応じて、* _receive()_メソッドにタイムアウトの詳細を提供できます*。 これにより、返される前にその期間だけブロックされます。受信したメッセージまたはタイムアウトした場合は_null_のいずれかです。
while(true) {
    val msg = receive(1, TimeUnit.SECONDS)
    if (msg != null) {
        // Process Message
    } else {
        println("Still alive")
    }
}
まれに、まったくブロックしないで、代わりにメッセージまたは_null_ですぐに戻ることができます。 代わりに_tryReceive()_メソッドを使用してこれを行うことができます—上記で見た__trySend()__methodのミラーとして:
while(true) {
    val msg = tryReceive()
    if (msg != null) {
        // Process Message
    } else {
        print(".")
    }
}

2.7. メッセージのフィルタリング

*これまでのところ、私たちのアクターは自分に送信されたすべてのメッセージを受信して​​います*。 ただし、必要に応じてこれを調整できます。
_doRun()_メソッドは、アクター機能の大部分を表すように設計されており、これから呼び出される_receive()_メソッドは、作業する次のメソッドを提供します。
*特定のメッセージを処理するかどうかを決定するa_filterMessage()_というメソッドをオーバーライドすることもできます*。 _receive()_メソッドはこれを呼び出し、_null_を返す場合、メッセージはアクターに渡されません。 たとえば、次の例は、奇数のすべてのメッセージを除外します。
override fun filterMessage(m: Any?): Int? {
    return when (m) {
        is Int -> {
            if (m % 2 == 0) {
                m
            } else {
                null
            }
        } else -> super.filterMessage(m)
    }
}
_filterMessage()_メソッドは、メッセージが到着するときに変換することもできます。 返す値はアクターに提供される値であるため、_filter_と_map_の両方として機能します。 唯一の制限は、戻り値の型がアクターの期待されるメッセージの型と一致する必要があるということです。
たとえば、次のコマンドはすべての奇数を除外しますが、すべての偶数に10を掛けます。
override fun filterMessage(m: Any?): Int? {
    return when (m) {
        is Int -> {
            if (m % 2 == 0) {
                m * 10
            } else {
                null
            }
        }
        else -> super.filterMessage(m)
    }
}

* 2.8。 アクターとエラー処理のリンク*

これまでのところ、私たちの俳優はすべて厳密に孤立して働いてきました。 *私たちは、お互いを監視するアクターを持ち、一方が他方のイベントに反応できるようにします*。 これは、必要に応じて対称または非対称の方法で実行できます。
現在、処理できる唯一のイベントは、アクターが終了したときです-故意に、または何らかの理由で失敗したためです。
_watch()_メソッドを使用してアクターをリンクすると、一方のアクター(ウォッチャー)に他方のアクター(監視対象)のライフサイクルイベントを通知できます。 これは厳密に一方向の問題であり、監視対象のアクターは監視者に関する通知を受け取りません。
val watcherRef = watcher.spawn()
val watchedRef = watched.spawn()
watcher.watch(watchedRef)
または、対称バージョンである_link()_メソッドを使用できます。 この場合、両方のアクターは、ウォッチャーとウォッチされたアクターを持つ代わりに、他方のライフサイクルイベントについて通知されます。
val firstRef = first.spawn()
val secondRef = second.spawn()
first.watch(secondRef)
どちらの場合も、効果は同じです。 *監視対象のアクターで発生するライフサイクルイベントは、_LifecycleMessage_タイプの特別なメッセージを監視者アクターの入力チャネルに追加します*。 これは、前述のように_filterMessage()_メソッドによって処理されます。
デフォルトの実装は、代わりにアクターの_handleLifecycleMessage()_メソッドにこれを渡し、必要に応じてこれらのメッセージを処理できます。
override fun handleLifecycleMessage(m: LifecycleMessage?): Int? {
    println("WatcherActor Received Lifecycle Message: ${m}")
    return super.handleLifecycleMessage(m)
}
ここで、_link()_と_watch()。には微妙な違いがあります。例外は、_receive()_呼び出しに応答してin_doRun()_メッセージで受信されます。
つまり、_link()_を使用すると、リンクされたアクターが終了したときに、アクターの_doRun()_メソッドが自動的に例外を認識し、_watch()_がreact_handleLifecycleMessage()_メソッドを実装してメッセージ。

* 2.9。 アクターの登録と取得*

これまでのところ、作成した直後にアクタとやり取りしたことがあるため、スコープ内の変数を使用してアクタとやり取りすることができました。 *ただし、場合によっては、アクタとスポーンした場所から遠く離れた場所でアクタとやり取りできる必要があります。*
これを行う1つの方法は、標準のプログラミング手法を使用することです。必要な場所からアクセスできるように、_ActorRef_変数を渡します。
クエーサーはこれを実現する別の方法を提供します。 アクタをcentral _ActorRegistry_に登録し、後で名前でアクセスできます。
val actorRef = actor.spawn()
actor.register()

val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("theActorName")

assertEquals(actorRef, retrievedRef)
これは、作成時にアクターに名前を付け、その名前で登録することを前提としています。 アクターに名前が付けられていない場合(たとえば、最初のコンストラクター引数が_null_である場合)、代わりに_register()_メソッドに名前を渡すことができます。
actor.register("renamedActor")
_ActorRegistry.getActor()_は静的なので、アプリケーションのどこからでもアクセスできます。
*未知の名前を使用してアクターを取得しようとすると、クエーサーはそのようなアクターが存在するまでブロックします。*これは潜在的に永久になる可能性があるため、回避するためにアクターを取得するときにタイムアウトを指定することもできますこの。 これは、要求されたアクターが見つからない場合、タイムアウト時に_null_を返します。
val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("unknownActor", 1, TimeUnit.SECONDS)

Assert.assertNull(retrievedRef)

3. アクターテンプレート

これまで、私たちは第一原理から俳優を書きました。 ただし、繰り返し使用される一般的なパターンがいくつかあります。 そのため、Quasarはこれらを簡単に再利用できるようにパッケージ化しました。
これらのテンプレートは、しばしばビヘイビアと呼ばれ、Erlangで使用されているのと同じ概念の用語を借用しています。
*これらのテンプレートの多くは、使用する追加機能を追加する_Actor_およびof_ActorRef_のサブクラスとして実装されています。アクターと対話する呼び出しコードの_ActorRef_クラス。

* 3.1。 リクエスト/返信*

*アクターの一般的な使用例は、呼び出し元のコードがメッセージを送信し、その後、アクターが何らかの作業を行って結果を送り返すことです。 Quasarは、_RequestReplyHelper_を提供して、この両方を簡単に実現できるようにします。
これを使用するには、メッセージはすべて_RequestMessage_クラスのサブクラスである必要があります。 これにより、Quasarは追加情報を保存して、正しい呼び出しコードへの返信を取得できます。
data class TestMessage(val input: Int) : RequestMessage<Int>()
呼び出しコードとして、_RequestReplyHelper.call()_を使用してアクターにメッセージを送信し、必要に応じて応答または例外を取得できます。
val result = RequestReplyHelper.call(actorRef, TestMessage(50))
アクター自体の内部で、メッセージを受信して​​処理し、_RequestReplyHelper.reply()_を使用して結果を送り返します。
val actor = object : Actor<TestMessage, Void?>() {
    @Suspendable
    override fun doRun(): Void {
        while (true) {
            val msg = receive()

            RequestReplyHelper.reply(msg, msg.input * 100)
        }

        return null
    }
}

* 3.2。 サーバ*

  • _ServerActor_は、要求/応答機能がアクター自体の一部である上記の拡張機能です。*これにより、アクターに同期呼び出しを行い、アクターから応答を取得することができます— _call()_を使用して_cast()_メソッドを使用して、応答を必要としないアクターに非同期呼び出しを行います。

    form_ServerActor_クラスを使用し、_ServerHandler_のインスタンスをコンストラクターに渡すことにより、この形式のアクターを実装します。 これは、同期呼び出しを処理するメッセージ、同期呼び出しから戻るメッセージ、および非同期呼び出しを処理するメッセージのタイプに共通です。
    a _ServerHandler_を実装する場合、実装する必要があるいくつかのメソッドがあります。
  • init —アクターの起動を処理する

  • terminate —アクターのシャットダウンの処理

  • handleCall —同期呼び出しを処理し、応答を返す

  • handleCast —非同期呼び出しを処理する

  • handleInfo — _Call_でも_Cast_でもないメッセージを処理する

  • handleTimeout —のメッセージを受信して​​いないときに処理します
    設定された期間

    これを実現する最も簡単な方法は、サブクラスmethods_AbstractServerHandler_です。これには、すべてのメソッドのデフォルト実装があります。 これにより、ユースケースに必要なビットのみを実装することができます。
val actor = ServerActor(object : AbstractServerHandler<Int, String, Float>() {
    @Suspendable
    override fun handleCall(from: ActorRef<*>?, id: Any?, m: Int?): String {
        println("Called with message: " + m + " from " + from)
        return m.toString() ?: "None"
    }

    @Suspendable
    override fun handleCast(from: ActorRef<*>?, id: Any?, m: Float?) {
        println("Cast message: " + m + " from " + from)
    }
})
  • Our handleCall()and _handleCast()_メソッドは、処理するメッセージで呼び出されますが、重要な場合に、メッセージの送信元への参照と、呼び出しを識別するための一意のIDも提供されます。 ソース_ActorRef_とIDの両方はオプションであり、存在しない場合があります。

    a _ServerActor_を生成すると、a _Server_インスタンスが返されます。 これは_ActorRef_のサブクラスであり、_call()_および__cast()、__の追加機能を提供し、必要に応じてメッセージを送信し、サーバーをシャットダウンするメソッドを提供します。
val server = actor.spawn()

val result = server.call(5)
server.cast(2.5f)

server.shutdown()

* 3.3。 プロキシサーバー*

_Server_パターンは、メッセージと指定された応答を処理する特定の方法を提供します。 *これに代わるものは_ProxyServer_です。これは同じ効果がありますが、より使いやすい形式です。*これはlink:/java-dynamic-proxies[Java動的プロキシ]を使用して標準を実装できるようにしますアクターを使用したJavaインターフェイス。
このパターンを実装するには、機能を説明するインターフェイスを定義する必要があります。
@Suspendable
interface Summer {
    fun sum(a: Int, b: Int) : Int
}
これは、必要な機能を備えた任意の標準Javaインターフェイスにすることができます。
次に、このインスタンスを_ProxyServerActor_コンストラクターに渡して、アクターを作成します。
val actor = ProxyServerActor(false, object : Summer {
    override fun sum(a: Int, b: Int): Int {
        return a + b
    }
})

val summerActor = actor.spawn()
_ProxyServerActor_にも渡されるブール値は、_void_メソッドにアクターのストランドを使用するかどうかを示すフラグです。 _true_に設定すると、メソッドが完了するまで呼び出し側のストランドはブロックされますが、メソッドからの戻りはありません。
Quasarは、呼び出し側ストランドではなく、必要に応じてアクター内でメソッド呼び出しを実行するようにします。 * _spawn()_または_spawnThread()_から返されるインスタンスは、上記のように_Server_とJava動的プロキシの力のおかげでインターフェイスの両方を実装します*:
// Calling the interface method
val result = (summerActor as Summer).sum(1, 2)

// Calling methods on Server
summerActor.shutdown()
内部では、Quasarは以前見た_Server_動作を使用して_ProxyServerActor_を実装し、同じように使用できます。 動的プロキシを使用すると、そのメソッドの呼び出しが簡単になります。

* 3.4。 イベントソース*

*イベントソースパターンにより、送信されたメッセージが複数のイベントハンドラーによって処理されるアクターを作成できます*。 これらのハンドラーは、必要に応じて追加および削除されます。 これは、非同期イベントを処理するために何度か見たパターンに従います。 ここでの唯一の本当の違いは、イベントハンドラーが呼び出し側ストランドではなく、アクターストランドで実行されることです。
特別なコードなしで_EventSourceActor_を作成し、標準的な方法で実行を開始します。
val actor = EventSourceActor<String>()
val eventSource = actor.spawn()
アクターが生成されたら、それに対してイベントハンドラーを登録できます。 これらのハンドラーの本体は、アクターのストランドで実行されますが、外部で登録されます:
eventSource.addHandler { msg ->
    println(msg)
}
  • Kotlinを使用すると、イベントハンドラーをラムダ関数*として記述できるため、ここにあるすべての機能を使用できます。 これには、ラムダ関数の外部から値にアクセスすることが含まれますが、これらは異なるストランド間でアクセスされます。したがって、マルチスレッドシナリオのようにこれを行う場合は注意する必要があります。

val name = "Baeldung"
eventSource.addHandler { msg ->
    println(name + " " + msg)
}
また、イベント処理コードの大きな利点も得られます。必要なときにいつでも必要な数のハンドラーを登録でき、各ハンドラーはそれぞれ1つのタスクに焦点を当てています。 すべてのハンドラーは同じストランド(アクターが実行されるストランド)で実行されるため、ハンドラーは処理する際にこれを考慮する必要があります。
そのため、これらのハンドラーに別のアクターに渡すことで重い処理を実行させるのが一般的です。

* 3.5。 有限状態マシン*

  • finite-state machineは、固定数の可能な状態があり、1つの状態の処理が別の状態に切り替えることができる標準の構成体です。 *。 この方法で多くのアルゴリズムを表現できます。

    Quasarでは、有限状態マシンをアクターとしてモデル化することができるため、アクター自体が現在の状態を維持し、各状態は本質的にメッセージハンドラーです。
    これを実装するには、アクターを_FiniteStateMachineActor_のサブクラスとして記述する必要があります。 その後、必要な数のメソッドを使用します。各メソッドはメッセージを処理し、次の状態に移行する新しい状態を返します。
@Suspendable
fun lockedState() : SuspendableCallable<SuspendableCallable<*>> {
    return receive {msg ->
        when (msg) {
            "PUSH" -> {
                println("Still locked")
                lockedState()
            }
            "COIN" -> {
                println("Unlocking...")
                unlockedState()
            }
            else -> TERMINATE
        }
    }
}
次に、_initialState()_メソッドを実装して、アクターに開始位置を指示する必要があります。
@Suspendable
override fun initialState(): SuspendableCallable<SuspendableCallable<*>> {
    return SuspendableCallable { lockedState() }
}
各stateメソッドは、必要なことを何でも行い、必要に応じて3つの可能な値のいずれかを返します。
  • 使用する新しい状態

  • 特別なトークン_TERMINATE_は、アクターがすべきことを示します
    シャットダウン

  • この特定のメッセージを消費しないことを示す_null_ —これで
    場合、メッセージは次の状態に移行できます

4. リアクティブストリーム

link:/java-9-reactive-streams[Reactive Streams]は、多くの言語とプラットフォームで一般的になっている比較的新しい標準です。 このAPIにより、RxJava、Akka、Quasarなど、非同期I / Oをサポートするさまざまなライブラリとフレームワーク間の相互運用が可能になります。
  • Quasarの実装により、リアクティブストリームとQuasarチャネルを変換できます*。これにより、これらのストリームからのイベントをストランドにフィードしたり、ストランドからのメッセージをストリームにフィードしたりすることができます。

    リアクティブストリームの概念はa _Publisher_およびa __Subscriberです。 __パブリッシャーは、サブスクライバーにメッセージをパブリッシュできるものです。 逆に、Quasarは_SendPort_と_ReceivePort_の概念を使用します。ここでは、_SendPort_を使用してメッセージを送信し、_ReceivePort_を使用して同じメッセージを受信します。 Quasarには、_Topic_という概念もあります。これは、複数のチャネルにメッセージを送信できるようにするための単なるメカニズムです。
    これらは同様の概念であり、Quasarは一方を他方に変換します。

* 4.1。 ビルドにリアクティブストリームを追加する*

リアクティブストリームは、クエーサーのコアコンセプトではありません。 代わりに、* https://search.maven.org/search?q = g:co.paralleluniverse%20and%20a:quasar-reactive-streams [dependency]を追加して、それらにアクセスできるようにする必要があります*:
<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-reactive-streams</artifactId>
    <version>0.8.0</version>
</dependency>
使用中の他のQuasar依存関係と同じバージョンのこの依存関係を使用することが重要です。 また、アプリケーションで使用しているReactive Streams APIと依存関係が一致していることも重要です。 たとえば、_quasar-reactive-streams:0.8.0_は_reactive-streams:1.0.2_に依存します。
既にReactive Streamsに依存していない場合、これは問題になりません。 すでにReactiveストリームに依存している場合にのみ、これを気にする必要があります。これは、ローカルの依存関係がQuasarが依存するものをオーバーライドするためです。

* 4.2。 リアクティブストリームへの公開*

  • Quasarはa Channel_をa _Publisher *に変換する機能を提供するため、標準のQuasarチャネルを使用してメッセージを生成できますが、受信コードはそれをリアクティブ_Publisher_として処理できます。

val inputChannel = Channels.newChannel<String>(1)
val publisher = ReactiveStreams.toPublisher(inputChannel)
これが完了したら、_Publisher_を他の__Publisher ___instanceのように扱うことができます。これは、クライアントコードがQuasarをまったく意識する必要がないこと、またはコードが非同期であることを意味します。
_inputChannel_に送信されるメッセージはすべて、このストリームに追加されるため、サブスクライバーがプルできます。
この時点では、ストリームのサブスクライバーは1人のみです。 2番目のサブスクライバーを追加しようとすると、代わりに例外がスローされます。
*複数のサブスクライバーをサポートする場合は、代わりに_Topic_を使用できます。*これは、Reactive Streamsエンドからは同じように見えますが、複数のサブスクライバーをサポートする_Publisher_になります。
val inputTopic = Topic<String>()
val publisher = ReactiveStreams.toPublisher(inputTopic)

* 4.3。 リアクティブストリームのサブスクライブ*

*これの反対側は、a _Publisher_をa _Channel_ *に変換します。 これにより、標準のQuasarチャネルを他のチャネルであるかのように使用して、Reactiveストリームからのメッセージを消費できます。
val channel = ReactiveStreams.subscribe(10, Channels.OverflowPolicy.THROW, publisher)
これにより、チャネルの_ReceivePort_部分が得られます。 完了したら、標準のQuasarコンストラクトを使用してメッセージを消費し、他のチャネルと同じように処理できます。 これらのメッセージは、どこから来たとしても、Reactiveストリームから発信されます。

5. 結論

Quasarを使用して達成できる、より高度な手法を見てきました。 *これらにより、より良い、より保守性の高い非同期コードを記述し、異なる非同期ライブラリから出力されるストリームとより簡単に対話できるようになります*。
ここで取り上げたいくつかの概念の例は、https://github.com/eugenp/tutorials/tree/master/kotlin-quasar [GitHubで]にあります。