RSocketの紹介
1. 序章
このチュートリアルでは、 RSocket と、それがクライアント/サーバー通信を可能にする方法を最初に見ていきます。
2. RSocket とは何ですか?
RSocketは、分散アプリケーションでの使用を目的としたバイナリのポイントツーポイント通信プロトコルです。 その意味で、HTTPのような他のプロトコルに代わるものを提供します。
RSocketと他のプロトコルの完全な比較は、この記事の範囲を超えています。 代わりに、RSocketの主要な機能である相互作用モデルに焦点を当てます。
RSocketは4つの相互作用モデルを提供します。それを念頭に置いて、それぞれを例を挙げて説明します。
3. Mavenの依存関係
この例では、RSocketに必要な直接の依存関係は次の2つだけです。
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>0.11.13</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>0.11.13</version>
</dependency>
rsocket-coreおよびrsocket-transport-nettyの依存関係は、MavenCentralで使用できます。
重要な注意点は、RSocketライブラリがリアクティブストリームを頻繁に使用することです。 FluxクラスとMonoクラスはこの記事全体で使用されているため、これらの基本的な理解が役立ちます。
4. サーバーのセットアップ
まず、Serverクラスを作成しましょう。
public class Server {
private final Disposable server;
public Server() {
this.server = RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
.transport(TcpServerTransport.create("localhost", TCP_PORT))
.start()
.subscribe();
}
public void dispose() {
this.server.dispose();
}
private class RSocketImpl extends AbstractRSocket {}
}
ここでは、RSocketFactoryを使用してTCPソケットをセットアップし、リッスンします。カスタム RSocketImpl を渡して、クライアントからの要求を処理します。 RSocketImplにメソッドを追加していきます。
次に、サーバーを起動するには、サーバーをインスタンス化する必要があります。
Server server = new Server();
単一のサーバーインスタンスで複数の接続を処理できます。 結果として、1つのサーバーインスタンスだけがすべての例をサポートします。
終了すると、 dispose メソッドはサーバーを停止し、TCPポートを解放します。
4. 相互作用モデル
4.1. 要求/応答
RSocketは要求/応答モデルを提供します–各要求は単一の応答を受け取ります。
このモデルでは、クライアントにメッセージを返す単純なサービスを作成します。
AbstractRSocket、 RSocketImplの拡張機能にメソッドを追加することから始めましょう。
@Override
public Mono<Payload> requestResponse(Payload payload) {
try {
return Mono.just(payload); // reflect the payload back to the sender
} catch (Exception x) {
return Mono.error(x);
}
}
requestResponseメソッドは、リクエストごとに1つの結果を返します 、私たちが見ることができるように単核症
Payloadは、メッセージの内容とメタデータを含むクラスです。 これは、すべての相互作用モデルで使用されます。 ペイロードのコンテンツはバイナリですが、Stringベースのコンテンツをサポートする便利なメソッドがあります。
次に、クライアントクラスを作成できます。
public class ReqResClient {
private final RSocket socket;
public ReqResClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
public String callBlocking(String string) {
return socket
.requestResponse(DefaultPayload.create(string))
.map(Payload::getDataUtf8)
.block();
}
public void dispose() {
this.socket.dispose();
}
}
クライアントはRSocketFactory.connect()メソッドを使用して、サーバーとのソケット接続を開始します。 ソケットでrequestResponseメソッドを使用して、サーバーにペイロードを送信します。
ペイロードには、クライアントに渡されたStringが含まれています。 モノのとき
最後に、統合テストを実行して、要求/応答の動作を確認できます。 String をサーバーに送信し、同じStringが返されることを確認します。
@Test
public void whenSendingAString_thenRevceiveTheSameString() {
ReqResClient client = new ReqResClient();
String string = "Hello RSocket";
assertEquals(string, client.callBlocking(string));
client.dispose();
}
4.2. ファイアアンドフォーゲット
ファイアアンドフォーゲットモデルでは、クライアントはサーバーから応答を受信しません。
この例では、クライアントはシミュレートされた測定値を50ミリ秒間隔でサーバーに送信します。 サーバーは測定値を公開します。
RSocketImplクラスのサーバーにファイアアンドフォーゲットハンドラーを追加しましょう。
@Override
public Mono<Void> fireAndForget(Payload payload) {
try {
dataPublisher.publish(payload); // forward the payload
return Mono.empty();
} catch (Exception x) {
return Mono.error(x);
}
}
このハンドラーは、要求/応答ハンドラーと非常によく似ています。 でも、 fireAndForgetはMonoを返します
dataPublisher は、org.reactivestreams.Publisherのインスタンスです。 したがって、ペイロードをサブスクライバーが利用できるようにします。 リクエスト/ストリームの例でそれを利用します。
次に、ファイアアンドフォーゲットクライアントを作成します。
public class FireNForgetClient {
private final RSocket socket;
private final List<Float> data;
public FireNForgetClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
/** Send binary velocity (float) every 50ms */
public void sendData() {
data = Collections.unmodifiableList(generateData());
Flux.interval(Duration.ofMillis(50))
.take(data.size())
.map(this::createFloatPayload)
.flatMap(socket::fireAndForget)
.blockLast();
}
// ...
}
ソケットの設定は以前とまったく同じです。
sendData()メソッドは、Fluxストリームを使用して複数のメッセージを送信します。 メッセージごとに、socket ::fireAndForgetを呼び出します。
モノを購読する必要があります
flatMap オペレーターは、 Void 応答がサブスクライバーに渡されることを確認し、blockLastオペレーターはサブスクライバーとして機能します。
次のセクションまで、ファイアアンドフォーゲットテストを実行するのを待ちます。 その時点で、ファイアアンドフォーゲットクライアントによってプッシュされたデータを受信するためのリクエスト/ストリームクライアントを作成します。
4.3. リクエスト/ストリーム
リクエスト/ストリームモデルでは、単一のリクエストが複数のレスポンスを受信する場合があります。 これが実際に動作していることを確認するために、ファイアアンドフォーゲットの例に基づいて構築することができます。 そのために、前のセクションで送信した測定値を取得するためのストリームをリクエストしましょう。
前と同じように、サーバー上のRSocketImplに新しいリスナーを追加することから始めましょう。
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.from(dataPublisher);
}
requestStreamハンドラーはFluxを返します
次に、リクエスト/ストリームクライアントを作成しましょう。
public class ReqStreamClient {
private final RSocket socket;
public ReqStreamClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
public Flux<Float> getDataStream() {
return socket
.requestStream(DefaultPayload.create(DATA_STREAM_NAME))
.map(Payload::getData)
.map(buf -> buf.getFloat())
.onErrorReturn(null);
}
public void dispose() {
this.socket.dispose();
}
}
以前のクライアントと同じ方法でサーバーに接続します。
の getDataStream() socket.requestStream()を使用してFluxを受信します
それでは、テストしてみましょう。 ファイアアンドフォーゲットからリクエスト/ストリームまでの往復を確認します。
各値は、送信されたのと同じ順序で受信されたと断言できます。 次に、送信されたのと同じ数の値を受信したと断言できます。
@Test
public void whenSendingStream_thenReceiveTheSameStream() {
FireNForgetClient fnfClient = new FireNForgetClient();
ReqStreamClient streamClient = new ReqStreamClient();
List<Float> data = fnfClient.getData();
List<Float> dataReceived = new ArrayList<>();
Disposable subscription = streamClient.getDataStream()
.index()
.subscribe(
tuple -> {
assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
dataReceived.add(tuple.getT2());
},
err -> LOG.error(err.getMessage())
);
fnfClient.sendData();
// ... dispose client & subscription
assertEquals("Wrong data count received", data.size(), dataReceived.size());
}
4.4. チャネル
チャネルモデルは双方向通信を提供します。 このモデルでは、メッセージストリームは両方向に非同期で流れます。
これをテストするための簡単なゲームシミュレーションを作成しましょう。 このゲームでは、チャンネルの両側がプレーヤーになります。 ゲームが実行されると、これらのプレーヤーはランダムな時間間隔で反対側にメッセージを送信します。 反対側はメッセージに反応します。
まず、サーバー上にハンドラーを作成します。 前と同じように、RSocketImplに次のように追加します。
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
Flux.from(payloads)
.subscribe(gameController::processPayload);
return Flux.from(gameController);
}
requestChannelハンドラーには、入力と出力の両方のペイロードストリームがあります。 The 出版社
それに応じて、別のFluxストリームをクライアントに返します。 このストリームは、 gameController から作成されます。これは、Publisherでもあります。
GameControllerクラスの概要は次のとおりです。
public class GameController implements Publisher<Payload> {
@Override
public void subscribe(Subscriber<? super Payload> subscriber) {
// send Payload messages to the subscriber at random intervals
}
public void processPayload(Payload payload) {
// react to messages from the other player
}
}
GameController がサブスクライバーを受信すると、そのサブスクライバーへのメッセージの送信を開始します。
次に、クライアントを作成しましょう。
public class ChannelClient {
private final RSocket socket;
private final GameController gameController;
public ChannelClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
this.gameController = new GameController("Client Player");
}
public void playGame() {
socket.requestChannel(Flux.from(gameController))
.doOnNext(gameController::processPayload)
.blockLast();
}
public void dispose() {
this.socket.dispose();
}
}
前の例で見たように、クライアントは他のクライアントと同じ方法でサーバーに接続します。
クライアントは、GameControllerの独自のインスタンスを作成します。
socket.requestChannel()を使用して、ペイロードストリームをサーバーに送信します。 サーバーは、独自のペイロードストリームで応答します。
サーバーからペイロードを受信すると、それらを gameController ::processPayloadハンドラーに渡します。
私たちのゲームシミュレーションでは、クライアントとサーバーはお互いの鏡像です。 つまり、各側がペイロードのストリームを送信し、もう一方の端からペイロードのストリームを受信しています。
ストリームは、同期せずに独立して実行されます。
最後に、テストでシミュレーションを実行してみましょう。
@Test
public void whenRunningChannelGame_thenLogTheResults() {
ChannelClient client = new ChannelClient();
client.playGame();
client.dispose();
}
5. 結論
この紹介記事では、RSocketが提供する相互作用モデルについて説明しました。 例の完全なソースコードは、Githubリポジトリにあります。
詳細については、RSocketのWebサイトを確認してください。 特に、FAQおよびMotivationsドキュメントは優れた背景を提供します。