1. 序章

このチュートリアルでは、 RSocket と、それがクライアント/サーバー通信を可能にする方法を最初に見ていきます。

2. RSocket とは何ですか?

RSocketは、分散アプリケーションでの使用を目的としたバイナリのポイントツーポイント通信プロトコルです。 その意味で、HTTPのような他のプロトコルに代わるものを提供します。

RSocketと他のプロトコルの完全な比較は、この記事の範囲を超えています。 代わりに、RSocketの主要な機能である相互作用モデルに焦点を当てます。

RSocketは4つの相互作用モデルを提供します。それを念頭に置いて、それぞれを例を挙げて説明します。

3. Mavenの依存関係

この例では、RSocketに必要な直接の依存関係は次の2つだけです。

<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-core</artifactId>
    <version>0.11.13</version>
</dependency>
<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-transport-netty</artifactId>
    <version>0.11.13</version>
</dependency>

rsocket-coreおよびrsocket-transport-nettyの依存関係は、MavenCentralで使用できます。

重要な注意点は、RSocketライブラリがリアクティブストリームを頻繁に使用することですFluxクラスとMonoクラスはこの記事全体で使用されているため、これらの基本的な理解が役立ちます。

4. サーバーのセットアップ

まず、Serverクラスを作成しましょう。

public class Server {
    private final Disposable server;

    public Server() {
        this.server = RSocketFactory.receive()
          .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
          .transport(TcpServerTransport.create("localhost", TCP_PORT))
          .start()
          .subscribe();
    }

    public void dispose() {
        this.server.dispose();
    }

    private class RSocketImpl extends AbstractRSocket {}
}

ここでは、RSocketFactoryを使用してTCPソケットをセットアップし、リッスンします。カスタム RSocketImpl を渡して、クライアントからの要求を処理します。 RSocketImplにメソッドを追加していきます。

次に、サーバーを起動するには、サーバーをインスタンス化する必要があります。

Server server = new Server();

単一のサーバーインスタンスで複数の接続を処理できます。 結果として、1つのサーバーインスタンスだけがすべての例をサポートします。

終了すると、 dispose メソッドはサーバーを停止し、TCPポートを解放します。

4. 相互作用モデル

4.1. 要求/応答

RSocketは要求/応答モデルを提供します–各要求は単一の応答を受け取ります。

このモデルでは、クライアントにメッセージを返す単純なサービスを作成します。

AbstractRSocket、 RSocketImplの拡張機能にメソッドを追加することから始めましょう。

@Override
public Mono<Payload> requestResponse(Payload payload) {
    try {
        return Mono.just(payload); // reflect the payload back to the sender
    } catch (Exception x) {
        return Mono.error(x);
    }
}

requestResponseメソッドは、リクエストごとに1つの結果を返します 、私たちが見ることができるように単核症応答タイプ。

Payloadは、メッセージの内容とメタデータを含むクラスです。 これは、すべての相互作用モデルで使用されます。 ペイロードのコンテンツはバイナリですが、Stringベースのコンテンツをサポートする便利なメソッドがあります。

次に、クライアントクラスを作成できます。

public class ReqResClient {

    private final RSocket socket;

    public ReqResClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    public String callBlocking(String string) {
        return socket
          .requestResponse(DefaultPayload.create(string))
          .map(Payload::getDataUtf8)
          .block();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

クライアントはRSocketFactory.connect()メソッドを使用して、サーバーとのソケット接続を開始します。 ソケットでrequestResponseメソッドを使用して、サーバーにペイロードを送信します

ペイロードには、クライアントに渡されたStringが含まれています。 モノのとき応答が到着し、使用できます getDataUtf8() アクセスする方法応答の内容。

最後に、統合テストを実行して、要求/応答の動作を確認できます。 String をサーバーに送信し、同じStringが返されることを確認します。

@Test
public void whenSendingAString_thenRevceiveTheSameString() {
    ReqResClient client = new ReqResClient();
    String string = "Hello RSocket";

    assertEquals(string, client.callBlocking(string));

    client.dispose();
}

4.2. ファイアアンドフォーゲット

ファイアアンドフォーゲットモデルでは、クライアントはサーバーから応答を受信しません。

この例では、クライアントはシミュレートされた測定値を50ミリ秒間隔でサーバーに送信します。 サーバーは測定値を公開します。

RSocketImplクラスのサーバーにファイアアンドフォーゲットハンドラーを追加しましょう。

@Override
public Mono<Void> fireAndForget(Payload payload) {
    try {
        dataPublisher.publish(payload); // forward the payload
        return Mono.empty();
    } catch (Exception x) {
        return Mono.error(x);
    }
}

このハンドラーは、要求/応答ハンドラーと非常によく似ています。 でも、 fireAndForgetはMonoを返しますそれ以外の単核症

dataPublisher は、org.reactivestreams.Publisherのインスタンスです。 したがって、ペイロードをサブスクライバーが利用できるようにします。 リクエスト/ストリームの例でそれを利用します。

次に、ファイアアンドフォーゲットクライアントを作成します。

public class FireNForgetClient {
    private final RSocket socket;
    private final List<Float> data;

    public FireNForgetClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    /** Send binary velocity (float) every 50ms */
    public void sendData() {
        data = Collections.unmodifiableList(generateData());
        Flux.interval(Duration.ofMillis(50))
          .take(data.size())
          .map(this::createFloatPayload)
          .flatMap(socket::fireAndForget)
          .blockLast();
    }

    // ... 
}

ソケットの設定は以前とまったく同じです。

sendData()メソッドは、Fluxストリームを使用して複数のメッセージを送信します。 メッセージごとに、socket ::fireAndForgetを呼び出します。

モノを購読する必要があります各メッセージの応答 。 サブスクライブを忘れると、 socket ::fireAndForgetは実行されません。

flatMap オペレーターは、 Void 応答がサブスクライバーに渡されることを確認し、blockLastオペレーターはサブスクライバーとして機能します。

次のセクションまで、ファイアアンドフォーゲットテストを実行するのを待ちます。 その時点で、ファイアアンドフォーゲットクライアントによってプッシュされたデータを受信するためのリクエスト/ストリームクライアントを作成します。

4.3. リクエスト/ストリーム

リクエスト/ストリームモデルでは、単一のリクエストが複数のレスポンスを受信する場合があります。 これが実際に動作していることを確認するために、ファイアアンドフォーゲットの例に基づいて構築することができます。 そのために、前のセクションで送信した測定値を取得するためのストリームをリクエストしましょう。

前と同じように、サーバー上のRSocketImplに新しいリスナーを追加することから始めましょう。

@Override
public Flux<Payload> requestStream(Payload payload) {
    return Flux.from(dataPublisher);
}

requestStreamハンドラーはFluxを返しますストリーム 。 前のセクションで思い出したように、 fireAndForgetハンドラーは着信データをdataPublisherに公開しました。次に、同じを使用してFluxストリームを作成します。 ]dataPublisherをイベントソースとして使用します。 これを行うことにより、測定データはファイアアンドフォーゲットクライアントからリクエスト/ストリームクライアントに非同期的に流れます。

次に、リクエスト/ストリームクライアントを作成しましょう。

public class ReqStreamClient {

    private final RSocket socket;

    public ReqStreamClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    public Flux<Float> getDataStream() {
        return socket
          .requestStream(DefaultPayload.create(DATA_STREAM_NAME))
          .map(Payload::getData)
          .map(buf -> buf.getFloat())
          .onErrorReturn(null);
    }

    public void dispose() {
        this.socket.dispose();
    }
}

以前のクライアントと同じ方法でサーバーに接続します。

getDataStream() socket.requestStream()を使用してFluxを受信しますサーバーからのストリーム 。 そのストリームから、バイナリデータからFloat値を抽出します。 最後に、ストリームが呼び出し元に返され、呼び出し元がストリームをサブスクライブして結果を処理できるようになります。

それでは、テストしてみましょう。 ファイアアンドフォーゲットからリクエスト/ストリームまでの往復を確認します。

各値は、送信されたのと同じ順序で受信されたと断言できます。 次に、送信されたのと同じ数の値を受信したと断言できます。

@Test
public void whenSendingStream_thenReceiveTheSameStream() {
    FireNForgetClient fnfClient = new FireNForgetClient(); 
    ReqStreamClient streamClient = new ReqStreamClient();

    List<Float> data = fnfClient.getData();
    List<Float> dataReceived = new ArrayList<>();

    Disposable subscription = streamClient.getDataStream()
      .index()
      .subscribe(
        tuple -> {
            assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
            dataReceived.add(tuple.getT2());
        },
        err -> LOG.error(err.getMessage())
      );

    fnfClient.sendData();

    // ... dispose client & subscription

    assertEquals("Wrong data count received", data.size(), dataReceived.size());
}

4.4. チャネル

チャネルモデルは双方向通信を提供します。 このモデルでは、メッセージストリームは両方向に非同期で流れます。

これをテストするための簡単なゲームシミュレーションを作成しましょう。 このゲームでは、チャンネルの両側がプレーヤーになります。  ゲームが実行されると、これらのプレーヤーはランダムな時間間隔で反対側にメッセージを送信します。 反対側はメッセージに反応します。

まず、サーバー上にハンドラーを作成します。 前と同じように、RSocketImplに次のように追加します。

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    Flux.from(payloads)
      .subscribe(gameController::processPayload);
    return Flux.from(gameController);
}

requestChannelハンドラーには、入力と出力の両方のペイロードストリームがあります。 The 出版社 inputパラメータは、クライアントから受信したペイロードのストリームです。 それらが到着すると、これらのペイロードは gameController ::processPayload関数に渡されます。

それに応じて、別のFluxストリームをクライアントに返します。 このストリームは、 gameController から作成されます。これは、Publisherでもあります。

GameControllerクラスの概要は次のとおりです。

public class GameController implements Publisher<Payload> {
    
    @Override
    public void subscribe(Subscriber<? super Payload> subscriber) {
        // send Payload messages to the subscriber at random intervals
    }

    public void processPayload(Payload payload) {
        // react to messages from the other player
    }
}

GameController がサブスクライバーを受信すると、そのサブスクライバーへのメッセージの送信を開始します。

次に、クライアントを作成しましょう。

public class ChannelClient {

    private final RSocket socket;
    private final GameController gameController;

    public ChannelClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();

        this.gameController = new GameController("Client Player");
    }

    public void playGame() {
        socket.requestChannel(Flux.from(gameController))
          .doOnNext(gameController::processPayload)
          .blockLast();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

前の例で見たように、クライアントは他のクライアントと同じ方法でサーバーに接続します。

クライアントは、GameControllerの独自のインスタンスを作成します。

socket.requestChannel()を使用して、ペイロードストリームをサーバーに送信します。  サーバーは、独自のペイロードストリームで応答します。

サーバーからペイロードを受信すると、それらを gameController ::processPayloadハンドラーに渡します。

私たちのゲームシミュレーションでは、クライアントとサーバーはお互いの鏡像です。 つまり、各側がペイロードのストリームを送信し、もう一方の端からペイロードのストリームを受信しています

ストリームは、同期せずに独立して実行されます。

最後に、テストでシミュレーションを実行してみましょう。

@Test
public void whenRunningChannelGame_thenLogTheResults() {
    ChannelClient client = new ChannelClient();
    client.playGame();
    client.dispose();
}

5. 結論

この紹介記事では、RSocketが提供する相互作用モデルについて説明しました。 例の完全なソースコードは、Githubリポジトリにあります。

詳細については、RSocketのWebサイトを確認してください。 特に、FAQおよびMotivationsドキュメントは優れた背景を提供します。