Kotlinの高度なQuasarの使用法
1. 序章
私たちは最近Quasarを調べました。これは、非同期プログラミングをよりアクセスしやすく、より効率的にするためのツールを提供します。 軽量スレッドとメッセージパッシングを可能にする、それを使ってできることの基本を見てきました。
このチュートリアルでは、非同期プログラミングをさらに進めるためにQuasarで実行できるいくつかのより高度なことを見ていきます。
2. 俳優
Actors は、並行プログラミングでよく知られているプログラミング手法であり、Erlangで特に人気があります。 Quasarを使用すると、この形式のプログラミングの基本的な構成要素であるアクターを定義できます。
アクターは次のことができます。
- 他の俳優を始める
- 他のアクターにメッセージを送信する
- 彼らが反応する他のアクターからのメッセージを受信する
これらの3つの機能により、アプリケーションの構築に必要なすべてのものが得られます。
Quasarでは、アクターはストランド(通常はファイバーですが、必要に応じてスレッドもオプション)として表され、メッセージを取り込むためのチャネルと、ライフサイクル管理とエラー処理の特別なサポートがあります。
2.1. ビルドへのアクターの追加
アクターはQuasarのコアコンセプトではありません。 代わりに、それらへのアクセスを可能にする依存関係を追加する必要があります:
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-actors</artifactId>
<version>0.8.0</version>
</dependency>
使用中の他のQuasar依存関係と同じバージョンのこの依存関係を使用することが重要です。
2.2. アクターの作成
アクタークラスをサブクラス化し、名前と MailboxConfig を指定し、 doRun()メソッドを実装することでアクターを作成します。
val actor = object : Actor<Int, String>("noopActor", MailboxConfig(5, Channels.OverflowPolicy.THROW)) {
@Suspendable
override fun doRun(): String {
return "Hello"
}
}
名前とメールボックス構成はどちらもオプションです。メールボックス構成を指定しない場合、デフォルトは無制限のメールボックスです。
アクターのメソッドを手動で@Suspendableとしてマークする必要があることに注意してください。 Kotlinでは、例外を宣言する必要はまったくありません。つまり、拡張する基本クラスにあるSuspendExceptionを宣言しません。 これは、Quasarが、もう少し助けがなければ、メソッドを一時停止可能と見なさないことを意味します。
アクターを作成したら、それを開始する必要があります。 spawn()メソッドを使用して新しいファイバーを開始するか、 spawnThread()を使用して新しいスレッドを開始します。 ファイバーとスレッドの違いを除けば、これら2つは同じように機能します。
アクターをスポーンすると、他のストランドと同じ程度に扱うことができます。 これには、 join()を呼び出して実行が終了するのを待つことや、 get()を呼び出して値を取得できることが含まれます。
actor.spawn()
println("Noop Actor: ${actor.get()}")
2.3. アクターへのメッセージの送信
新しいアクターを生成すると、 spawn()メソッドと spawnThread()メソッドはActorRefインスタンスを返します。 これを使用して、アクターが受信するメッセージを送信することにより、アクター自体と対話できます。
ActorRefはSendPortインターフェイスを実装しているため、Channelの半分を生成する場合と同じように使用できます。 これにより、アクターにメッセージを渡すために使用できるsendおよびtrySendメソッドにアクセスできます。
val actorRef = actor.spawn()
actorRef.send(1)
2.4. アクターとのメッセージの受信
アクターにメッセージを渡すことができるようになったので、それらを使用して何かを実行できるようにする必要があります。 これは、アクター自体のdoRun()メソッド内で行います。ここで、receive()メソッドを呼び出して、次のメッセージを処理することができます:
val actor = object : Actor<Int, Void?>("simpleActor", null) {
@Suspendable
override fun doRun(): Void? {
val msg = receive()
println("SimpleActor Received Message: $msg")
return null
}
}
receive()メソッドは、メッセージが使用可能になるまでアクター内でブロックし、その後、アクターが必要に応じてこのメッセージを処理できるようにします。
多くの場合、アクターは多くのメッセージを受信してそれらすべてを処理するように設計されます。 そのため、アクターは通常、 doRun()メソッド内に無限ループを持ち、次のメッセージをすべて処理します。
val actor = object : Actor<Int, Void?>("loopingActor", null) {
@Suspendable
override fun doRun(): Void? {
while (true) {
val msg = receive()
if (msg > 0) {
println("LoopingActor Received Message: $msg")
} else {
break
}
}
return null
}
}
これにより、値0を受信するまで着信メッセージの処理が続行されます。
2.5. メッセージの送信が速すぎる
場合によっては、アクターはメッセージが送信されるよりも遅く処理します。 これにより、メールボックスがいっぱいになり、オーバーフローする可能性があります。
デフォルトのメールボックスポリシーには無制限の容量があります。 ただし、アクターを作成するときにこれを構成するには、
代わりに、Quasarは、指定内容に関係なく、THROWのポリシーを使用します。
Actor<Int, String>("backlogActor",
MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
}
メールボックスのサイズを指定し、メールボックスがオーバーフローした場合、アクター内の receive()メソッドにより、アクターは例外をスローして中止します。
これは、私たちが処理できるものではありません。
try {
receive()
} catch (e: Throwable) {
// This is never reached
}
これが発生すると、アクターの外部からの get()メソッドも例外をスローしますが、これは処理できます。 この場合、オーバーフローメッセージを追加した send()メソッドを指すスタックトレースでQueueCapacityExceededExceptionをラップするExecutionExceptionを取得します。
メールボックスのサイズが制限されているアクターを使用していることがわかっている場合は、
val actor = object : Actor<Int, String>("backlogTrySendActor",
MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
@Suspendable
override fun doRun(): String {
TimeUnit.MILLISECONDS.sleep(500);
println("Backlog TrySend Actor Received: ${receive()}")
return "No Exception"
}
}
val actorRef = actor.spawn()
actorRef.trySend(1) // Returns True
actorRef.trySend(2) // Returns False
2.6. メッセージを読むのが速すぎる
逆の場合、提供されているよりも速くメッセージを読み取ろうとしているアクターがいる可能性があります。 通常、これは問題ありません。アクターは、メッセージが利用可能になるまでブロックしてから処理します。
ただし、状況によっては、これを他の方法で処理できるようにしたい場合もあります。
メッセージの受信に関しては、次の3つのオプションを利用できます。
- メッセージが利用可能になるまで無期限にブロックする
- メッセージが利用可能になるまで、またはタイムアウトが発生するまでブロックします
- まったくブロックしないでください
これまで、永久にブロックする receive()メソッドを使用してきました。
必要に応じて、 receive()メソッドにタイムアウトの詳細を提供できます。 これにより、返される前にその期間だけブロックされます—受信したメッセージまたはタイムアウトした場合はnullのいずれかです。
while(true) {
val msg = receive(1, TimeUnit.SECONDS)
if (msg != null) {
// Process Message
} else {
println("Still alive")
}
}
まれに、まったくブロックせずに、メッセージまたはnullですぐに戻ることが必要になる場合があります。 これは、代わりに tryReceive()メソッドを使用して行うことができます—上記で見た trySend()メソッドのミラーとして:
while(true) {
val msg = tryReceive()
if (msg != null) {
// Process Message
} else {
print(".")
}
}
2.7. メッセージのフィルタリング
これまでのところ、アクターは送信されたすべてのメッセージを受信しています。 ただし、必要に応じてこれを調整できます。
doRun()メソッドは、アクター機能の大部分を表すように設計されており、これから呼び出される receive()メソッドは、次に使用するメソッドを提供します。
特定のメッセージを処理する必要があるかどうかを判断するfilterMessage()というメソッドをオーバーライドすることもできます。 receive()メソッドはこれを呼び出し、 null を返す場合、メッセージはアクターに渡されません。 たとえば、以下は奇数であるすべてのメッセージを除外します。
override fun filterMessage(m: Any?): Int? {
return when (m) {
is Int -> {
if (m % 2 == 0) {
m
} else {
null
}
} else -> super.filterMessage(m)
}
}
filterMessage()メソッドは、メッセージが届くときにメッセージを変換することもできます。 返される値はアクターに提供された値であるため、filterとmapの両方として機能します。 唯一の制限は、リターンタイプがアクターの予想されるメッセージタイプと一致する必要があることです。
たとえば、次の例では、すべての奇数を除外しますが、すべての偶数に10を掛けます。
override fun filterMessage(m: Any?): Int? {
return when (m) {
is Int -> {
if (m % 2 == 0) {
m * 10
} else {
null
}
}
else -> super.filterMessage(m)
}
}
2.8. アクターのリンクとエラー処理
これまでのところ、私たちのアクターはすべて厳密に孤立して働いてきました。 一方が他方のイベントに反応できるように、お互いを監視するアクターを持つ機能があります。 必要に応じて、対称または非対称の方法でこれを行うことができます。
現在、処理できる唯一のイベントは、アクターが意図的に、または何らかの理由で失敗したために終了したときです。
watch()メソッドを使用してアクターをリンクすると、一方のアクター(ウォッチャー)に、もう一方のアクター(ウォッチャー)のライフサイクルイベントを通知できます。 これは厳密には一方向の問題であり、監視対象のアクターは監視者について何も通知されません。
val watcherRef = watcher.spawn()
val watchedRef = watched.spawn()
watcher.watch(watchedRef)
または、対称バージョンである link()メソッドを使用することもできます。 この場合、ウォッチャーと監視対象のアクターを用意する代わりに、両方のアクターにもう一方のライフサイクルイベントが通知されます。
val firstRef = first.spawn()
val secondRef = second.spawn()
first.watch(secondRef)
どちらの場合も、効果は同じです。 監視対象のアクターで発生するライフサイクルイベントにより、LifecycleMessageタイプの特別なメッセージが監視対象のアクターの入力チャネルに追加されます。 次に、これは前述のように filterMessage()メソッドによって処理されます。
デフォルトの実装は、これを代わりにアクターの handleLifecycleMessage()メソッドに渡します。これにより、必要に応じてこれらのメッセージを処理できます。
override fun handleLifecycleMessage(m: LifecycleMessage?): Int? {
println("WatcherActor Received Lifecycle Message: ${m}")
return super.handleLifecycleMessage(m)
}
ここでは、 link()と watch()の間に微妙な違いがあります。 watch()では、標準の handleLifecycleMessage ()メソッドはリスナー参照を削除するだけですが、 link()を使用すると、 doRun()メッセージで受信される例外もスローされます。 receive()呼び出しに。
つまり、 link()を使用すると、リンクされたアクターが終了したときに、アクターの doRun()メソッドに例外が自動的に表示されますが、 watch()は強制されます handleLifecycleMessage()メソッドを実装して、メッセージに反応できるようにします。
2.9. アクターの登録と取得
これまでのところ、アクターを作成した直後にのみアクターと対話したことがあるため、スコープ内の変数を使用してアクターと対話することができました。 ただし、場合によっては、アクターを生成した場所から遠く離れた場所でアクターと対話できる必要があります。
これを行う1つの方法は、標準のプログラミング手法を使用することです。 ActorRef 変数を渡して、必要な場所からアクセスできるようにします。
クエーサーは、これを達成するための別の方法を提供します。 アクターを中央のActorRegistryに登録し、後で名前でアクセスすることができます。
val actorRef = actor.spawn()
actor.register()
val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("theActorName")
assertEquals(actorRef, retrievedRef)
これは、アクターを作成したときにアクターに名前を付け、その名前で登録したことを前提としています。 アクターに名前が付けられていない場合(たとえば、最初のコンストラクター引数が null の場合)、代わりに register()メソッドに名前を渡すことができます。
actor.register("renamedActor")
ActorRegistry.getActor()は静的であるため、アプリケーションのどこからでもこれにアクセスできます。
不明な名前を使用してアクターを取得しようとすると、Quasarはそのようなアクターが存在するまでブロックします。これは永久に発生する可能性があるため、取得時にタイムアウトを指定することもできます。これを避けるために俳優。 これにより、要求されたアクターが見つからない場合、タイムアウト時にnullが返されます。
val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("unknownActor", 1, TimeUnit.SECONDS)
Assert.assertNull(retrievedRef)
3. アクターテンプレート
これまで、私たちは第一原理からアクターを書きました。 ただし、何度も使用される一般的なパターンがいくつかあります。 そのため、Quasarは、これらを簡単に再利用できるようにパッケージ化しました。
これらのテンプレートは、Erlangで使用されているのと同じ概念の用語を借用して、Behaviorsと呼ばれることがよくあります。
これらのテンプレートの多くは、ActorおよびActorRefのサブクラスとして実装されており、使用する機能が追加されています。これにより、 Actor クラス内に、オーバーライドまたは呼び出し元の追加メソッドが提供されます。実装された機能の内部、および呼び出し元のコードがアクターと対話するためのActorRefクラスの追加メソッド。
3.1. リクエスト/リプライ
アクターの一般的な使用例は、呼び出し元のコードがメッセージを送信し、次にアクターが作業を行って結果を送り返すことです。呼び出し元のコードは応答を受信し、処理を続行します。 Quasarは、 RequestReplyHelper を提供して、これの両面を簡単に実現できるようにします。
これを使用するには、メッセージがすべてRequestMessageクラスのサブクラスである必要があります。 これにより、Quasarは追加情報を保存して、正しい呼び出しコードに応答を返すことができます。
data class TestMessage(val input: Int) : RequestMessage<Int>()
呼び出し元のコードとして、 RequestReplyHelper.call()を使用してアクターにメッセージを送信し、必要に応じて応答または例外を取得できます。
val result = RequestReplyHelper.call(actorRef, TestMessage(50))
アクター自体の内部で、メッセージを受信して処理し、 RequestReplyHelper.reply()を使用して結果を送り返します。
val actor = object : Actor<TestMessage, Void?>() {
@Suspendable
override fun doRun(): Void {
while (true) {
val msg = receive()
RequestReplyHelper.reply(msg, msg.input * 100)
}
return null
}
}
3.2. サーバ
ServerActorは、リクエスト/リプライ機能がアクター自体の一部である上記の拡張機能です。これにより、アクターへの同期呼び出しを行い、アクターから応答を取得する機能が提供されます。 call()メソッド—または応答が不要なアクターへの非同期呼び出しを行う— cast()メソッドを使用します。
ServerActor クラスを使用し、 ServerHandler のインスタンスをコンストラクターに渡すことにより、この形式のアクターを実装します。 これは、同期呼び出しで処理するメッセージ、同期呼び出しから戻るメッセージ、および非同期呼び出しで処理するメッセージのタイプに対して一般的です。
ServerHandler を実装する場合、実装する必要のあるメソッドがいくつかあります。
- init —起動するアクターを処理します
- terminate —アクターのシャットダウンを処理します
- handleCall —同期呼び出しを処理し、応答を返します
- handleCast —非同期呼び出しを処理します
- handleInfo — CallでもCastでもないメッセージを処理します
- handleTimeout —設定された期間メッセージを受信しなかった場合に処理します
これを実現する最も簡単な方法は、 AbstractServerHandler をサブクラス化することです。これには、すべてのメソッドのデフォルトの実装があります。 これにより、ユースケースに必要なビットのみを実装できるようになります。
val actor = ServerActor(object : AbstractServerHandler<Int, String, Float>() {
@Suspendable
override fun handleCall(from: ActorRef<*>?, id: Any?, m: Int?): String {
println("Called with message: " + m + " from " + from)
return m.toString() ?: "None"
}
@Suspendable
override fun handleCast(from: ActorRef<*>?, id: Any?, m: Float?) {
println("Cast message: " + m + " from " + from)
}
})
handleCall()メソッドとhandleCast()メソッドは、handle のメッセージとともに呼び出されますが、重要な場合は、メッセージの送信元への参照と、呼び出しを識別するための一意のIDも提供されます。 ソースActorRefとIDはどちらもオプションであり、存在しない場合があります。
ServerActor を生成すると、Serverインスタンスが返されます。 これはActorRefのサブクラスであり、 call()および cast()、に応じてメッセージを送信するための追加機能、およびシャットダウンするメソッドを提供します。サーバーがダウンしています:
val server = actor.spawn()
val result = server.call(5)
server.cast(2.5f)
server.shutdown()
3.3. プロキシサーバー
Server パターンは、メッセージと特定の応答を処理するための特定の方法を提供します。 これに代わるものはProxyServerで、これは同じ効果がありますが、より使いやすい形式です。これは、 Java動的プロキシを使用して、アクターを使用して標準のJavaインターフェースを実装できるようにします。
このパターンを実装するには、機能を説明するインターフェースを定義する必要があります。
@Suspendable
interface Summer {
fun sum(a: Int, b: Int) : Int
}
これは、必要な機能を備えた標準のJavaインターフェイスであればどれでもかまいません。
次に、このインスタンスを ProxyServerActor コンストラクターに渡して、アクターを作成します。
val actor = ProxyServerActor(false, object : Summer {
override fun sum(a: Int, b: Int): Int {
return a + b
}
})
val summerActor = actor.spawn()
ProxyServerActor にも渡されるブール値は、voidメソッドにアクターのストランドを使用するかどうかを示すフラグです。 true に設定すると、呼び出し側のストランドはメソッドが完了するまでブロックされますが、メソッドからの戻りはありません。
その後、Quasarは、呼び出し側ストランドではなく、必要に応じてアクター内でメソッド呼び出しを実行するようにします。 spawn()またはspawnThread()から返されたインスタンスは、Java動的プロキシの機能のおかげで、サーバー(上記のように)とインターフェイスの両方を実装します。
// Calling the interface method
val result = (summerActor as Summer).sum(1, 2)
// Calling methods on Server
summerActor.shutdown()
内部的には、Quasarは以前に見た Server の動作を使用して、 ProxyServerActor を実装し、同じように使用できます。 動的プロキシを使用すると、メソッドの呼び出しが簡単になります。
3.4. イベントソース
イベントソースパターンを使用すると、送信されたメッセージが複数のイベントハンドラーによって処理されるアクターを作成できます。 これらのハンドラーは、必要に応じて追加および削除されます。 これは、非同期イベントを処理するために何度か見たパターンに従います。 ここでの唯一の本当の違いは、イベントハンドラーが呼び出し側ストランドではなく、アクターストランドで実行されることです。
特別なコードなしでEventSourceActorを作成し、標準的な方法で実行を開始します。
val actor = EventSourceActor<String>()
val eventSource = actor.spawn()
アクターが生成されたら、それに対してイベントハンドラーを登録できます。 これらのハンドラーの本体は、アクターのストランドで実行されますが、アクターの外部で登録されます。
eventSource.addHandler { msg ->
println(msg)
}
Kotlinを使用すると、イベントハンドラーをラムダ関数として記述できるため、ここにあるすべての機能を使用できます。 これには、ラムダ関数の外部からの値へのアクセスが含まれますが、これらは異なるストランド間でアクセスされるため、マルチスレッドシナリオの場合と同様に、これを行う場合は注意が必要です。
val name = "Baeldung"
eventSource.addHandler { msg ->
println(name + " " + msg)
}
また、イベント処理コードの主な利点は、必要なときに必要な数のハンドラーを登録できることです。各ハンドラーは1つのタスクに集中しています。 すべてのハンドラーは、アクターが実行されるストランドと同じストランドで実行されるため、ハンドラーは、実行する処理でこれを考慮する必要があります。
そのため、これらのハンドラーに別のアクターに渡すことによって重い処理を実行させるのが一般的です。
3.5. 有限状態マシン
有限状態マシンは、可能な状態の数が固定されており、ある状態の処理を別の状態に切り替えることができる標準的な構成です。 このようにして多くのアルゴリズムを表すことができます。
Quasarを使用すると、有限状態マシンをアクターとしてモデル化できるため、アクター自体が現在の状態を維持し、各状態は基本的にメッセージハンドラーになります。
これを実装するには、アクターをFiniteStateMachineActorのサブクラスとして作成する必要があります。 次に、必要な数のメソッドがあり、それぞれがメッセージを処理し、新しい状態を返して次の状態に遷移します。
@Suspendable
fun lockedState() : SuspendableCallable<SuspendableCallable<*>> {
return receive {msg ->
when (msg) {
"PUSH" -> {
println("Still locked")
lockedState()
}
"COIN" -> {
println("Unlocking...")
unlockedState()
}
else -> TERMINATE
}
}
}
次に、 initialState()メソッドを実装して、アクターにどこから開始するかを指示する必要もあります。
@Suspendable
override fun initialState(): SuspendableCallable<SuspendableCallable<*>> {
return SuspendableCallable { lockedState() }
}
各stateメソッドは、必要な処理を実行し、必要に応じて3つの可能な値のいずれかを返します。
- 使用する新しい状態
- アクターがシャットダウンする必要があることを示す特別なトークンTERMINATE
- null は、この特定のメッセージを消費しないことを示します—この場合、メッセージは、遷移する次の状態で使用できます
4. リアクティブストリーム
Reactive Streams は比較的新しい標準であり、多くの言語とプラットフォームで普及しつつあります。 このAPIを使用すると、非同期I / Oをサポートするさまざまなライブラリとフレームワーク(RxJava、Akka、Quasarなど)間の相互運用が可能になります。
Quasarの実装により、ReactiveストリームとQuasarチャネルを変換できます。これにより、これらのストリームからのイベントをストランドにフィードしたり、ストランドからのメッセージをストリームにフィードしたりできます。
リアクティブストリームには、 出版社と
これらは同様の概念であり、Quasarを使用すると一方を他方に変換できます。
4.1. ビルドへのリアクティブストリームの追加
リアクティブストリームは、Quasarのコアコンセプトではありません。 代わりに、それらへのアクセスを可能にする依存関係を追加する必要があります:
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-reactive-streams</artifactId>
<version>0.8.0</version>
</dependency>
使用中の他のQuasar依存関係と同じバージョンのこの依存関係を使用することが重要です。 依存関係が、アプリケーションで使用しているReactiveStreamsAPIと一致していることも重要です。 たとえば、 quasar-reactive-streams:0.8.0はreactive-streams:1.0.2に依存します。
まだリアクティブストリームに依存していない場合、これは問題ではありません。 ローカル依存関係はQuasarが依存する依存関係をオーバーライドするため、すでにリアクティブストリームに依存している場合にのみ、これに注意する必要があります。
4.2. リアクティブストリームへの公開
Quasarを使用すると、チャネルをパブリッシャーに変換できるため、標準のQuasarチャネルを使用してメッセージを生成できますが、受信コードはそれをリアクティブなパブリッシャーとして扱うことができます。
val inputChannel = Channels.newChannel<String>(1)
val publisher = ReactiveStreams.toPublisher(inputChannel)
これを実行すると、 Publisherを他のPublisherインスタンスのように扱うことができます。つまり、クライアントコードはQuasarをまったく認識する必要がありません。または、コードが非同期であることさえあります。
inputChannel に送信されるすべてのメッセージは、サブスクライバーがプルできるように、このストリームに追加されます。
この時点で、ストリームへのサブスクライバーは1人だけです。 2番目のサブスクライバーを追加しようとすると、代わりに例外がスローされます。
複数のサブスクライバーをサポートする場合は、代わりにトピックを使用できます。これは、Reactive Streams側からは同じように見えますが、複数のサブスクライバーをサポートするPublisherになります。
val inputTopic = Topic<String>()
val publisher = ReactiveStreams.toPublisher(inputTopic)
4.3. リアクティブストリームのサブスクライブ
これの反対側は、パブリッシャーをチャネルに変換することです。 これにより、他のチャネルと同じように、標準のQuasarチャネルを使用してReactiveストリームからのメッセージを消費できます。
val channel = ReactiveStreams.subscribe(10, Channels.OverflowPolicy.THROW, publisher)
これにより、チャネルのReceivePort部分が得られます。 完了したら、標準のQuasar構造を使用してメッセージを消費することで、他のチャネルと同じように扱うことができます。 これらのメッセージは、どこから来たとしても、リアクティブストリームから発信されます。
5. 結論
Quasarを使用して達成できるいくつかのより高度な手法を見てきました。 これらにより、より優れた、より保守しやすい非同期コードを記述し、さまざまな非同期ライブラリから出力されるストリームとより簡単に対話できるようになります。
ここで取り上げたいくつかの概念の例は、GitHubのにあります。