1.はじめに

このチュートリアルでは、まずhttp://rsocket.io/[RSocket]を見て、それがどのようにしてクライアントサーバー通信を可能にするかを見ていきます。

2. RSocketとは何ですか?

  • RSocketは、分散アプリケーションでの使用を目的としたバイナリのポイントツーポイント通信プロトコルです。その意味で、それはHTTPのような他のプロトコルに代わるものを提供します。

RSocketと他のプロトコルとの完全な比較はこの記事の範囲を超えています。代わりに、RSocketの重要な機能に焦点を当てます。

その相互作用モデル

  • RSocketは4つのインタラクションモデルを提供しています。

3. Mavenの依存関係

この例では、RSocketは2つの直接的な依存関係しか必要ありません。

<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-core</artifactId>
    <version>0.11.13</version>
</dependency>
<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-transport-netty</artifactId>
    <version>0.11.13</version>
</dependency>


rsocket-core

およびhttps://search.maven.org/search?q=rsocket-transport-netty[rsocket-transport-netty]依存関係はMaven Centralで利用できます。

  • 重要な注意点は、RSocketライブラリはhttps://projectreactor.io/[リアクティブストリーム]を頻繁に使用することです。


    Flux


    および


    M

    ]

    クラスが使用されているので、それらの基本的な理解に役立ちます。

4.サーバーの設定

まず、

Server

クラスを作成しましょう。

public class Server {
    private final Disposable server;

    public Server() {
        this.server = RSocketFactory.receive()
          .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
          .transport(TcpServerTransport.create("localhost", TCP__PORT))
          .start()
          .subscribe();
    }

    public void dispose() {
        this.server.dispose();
    }

    private class RSocketImpl extends AbstractRSocket {}
}

  • ここでは、TCPソケットを設定して待機するために

    RSocketFactory

    を使用しています。** クライアントからの要求を処理するために、カスタムの

    RSocketImpl

    を渡します。

それでは、

RSocketImpl

にメソッドを追加します。

次に、サーバーを起動するには、インスタンスを作成するだけです。

Server server = new Server();

  • 単一のサーバーインスタンスが複数の接続を処理できます** 。結果として、1つのサーバーインスタンスだけが私たちの例のすべてをサポートするでしょう。

終了したら、

dispose

メソッドでサーバーを停止し、TCPポートを解放します。

4.インタラクションモデル

4.1. リクエスト/レスポンス

RSocketはリクエスト/レスポンスモデルを提供します – 各リクエストは単一のレスポンスを受け取ります。

このモデルでは、メッセージをクライアントに返す簡単なサービスを作成します。


AbstractRSocket、RSocketImpl

の拡張にメソッドを追加することから始めましょう:

@Override
public Mono<Payload> requestResponse(Payload payload) {
    try {
        return Mono.just(payload);//reflect the payload back to the sender
    } catch (Exception x) {
        return Mono.error(x);
    }
}


requestResponse

メソッドは、

Mono <Payload>

応答型でわかるように、各要求に対して単一の結果を返します** 。


  • Payload

    は、メッセージの内容とメタデータ** を含むクラスです。

これはすべてのインタラクションモデルで使用されています。ペイロードのコンテンツはバイナリですが、

String

ベースのコンテンツをサポートする便利なメソッドがあります。

次に、クライアントクラスを作成します。

public class ReqResClient {

    private final RSocket socket;

    public ReqResClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP__PORT))
          .start()
          .block();
    }

    public String callBlocking(String string) {
        return socket
          .requestResponse(DefaultPayload.create(string))
          .map(Payload::getDataUtf8)
          .block();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

クライアントは

RSocketFactory.connect()

メソッドを使用してサーバーとのソケット接続を開始します。

ソケットに

requestResponse

メソッドを使用してペイロードをサーバーに送信します

ペイロードには、クライアントに渡された

String

が含まれています。



Mono





_ <Payload>


レスポンスが到着したら、レスポンスの

String

コンテンツにアクセスするために


getDataUtf8()

_

methodを使用できます。

最後に、統合テストを実行してリクエスト/レスポンスの動作を確認します。

String

をサーバーに送信し、同じ

String

が返されることを確認します。

@Test
public void whenSendingAString__thenRevceiveTheSameString() {
    ReqResClient client = new ReqResClient();
    String string = "Hello RSocket";

    assertEquals(string, client.callBlocking(string));

    client.dispose();
}

4.2. 火と忘れ

消し忘れモデルでは、クライアントはサーバーから応答を受信しません。

この例では、クライアントはシミュレートされた測定値を50ミリ秒間隔でサーバーに送信します。サーバーは測定値を公開します。


RSocketImpl

クラスのサーバーにfire-and-forgetハンドラを追加しましょう。

@Override
public Mono<Void> fireAndForget(Payload payload) {
    try {
        dataPublisher.publish(payload);//forward the payload
        return Mono.empty();
    } catch (Exception x) {
        return Mono.error(x);
    }
}

このハンドラは、要求/応答ハンドラと非常によく似ています。

ただし、


fireAndForget

は、

Mono <Payload>

の代わりに

Mono <Void>


を返します。


dataPublisher

はhttp://www.reactive-streams.org/reactive-streams-1.0.2-javadoc/org/reactivestreams/Publisher.html?is-external=true[

org.reactivestreams.Publisher

]のインスタンスです。

したがって、ペイロードが加入者に利用可能になります。 request/streamの例でそれを利用します。

次に、忘れ去りクライアントを作成します。

public class FireNForgetClient {
    private final RSocket socket;
    private final List<Float> data;

    public FireNForgetClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP__PORT))
          .start()
          .block();
    }

   /** **  Send binary velocity (float) every 50ms ** /    public void sendData() {
        data = Collections.unmodifiableList(generateData());
        Flux.interval(Duration.ofMillis(50))
          .take(data.size())
          .map(this::createFloatPayload)
          .flatMap(socket::fireAndForget)
          .blockLast();
    }

   //...
}

ソケットの設定は以前とまったく同じです。


sendData()

メソッドは

Flux

ストリームを使用して複数のメッセージを送信します。 ** 各メッセージについて、__socket

fireAndForget__ ** を呼び出します。


メッセージごとに

Mono <Void>

レスポンスを購読する必要があります

。購読を忘れた場合、__socket

fireAndForget__は実行されません。


flatMap

演算子は、

Void

応答がサブスクライバに渡されるようにします。一方、

blockLast

演算子は、サブスクライバとして機能します。

次のセクションまで、忘却テストを実行するのを待ちます。その時点で、fire-and-forgetクライアントによってプッシュされたデータを受信するためのrequest/streamクライアントを作成します。

4.3. リクエスト/ストリーム

要求/ストリームモデルでは、単一の要求が複数の応答を受け取る可能性があります。実際に動作していることを確認するには、火と忘れの例を基にします。そのためには、前のセクションで送信した測定値を取得するためのストリームをリクエストしましょう。

前と同様に、サーバーの

RSocketImpl

に新しいリスナーを追加することから始めましょう。

@Override
public Flux<Payload> requestStream(Payload payload) {
    return Flux.from(dataPublisher);
}


  • requestStream

    ハンドラは

    Flux <Payload>

    stream ** を返します。前のセクションで思い出したように、

    fireAndForget

    ハンドラーは受信データを

    dataPublisherに公開しました。今度は、同じ

    dataPublisher

    をイベントソースとして使用して

    Flux__ストリームを作成します。これを行うことによって、測定データは私たちの忘れ去りクライアントから私たちの要求/ストリームクライアントに非同期に流れます。

次にリクエスト/ストリームクライアントを作成しましょう:

public class ReqStreamClient {

    private final RSocket socket;

    public ReqStreamClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP__PORT))
          .start()
          .block();
    }

    public Flux<Float> getDataStream() {
        return socket
          .requestStream(DefaultPayload.create(DATA__STREAM__NAME))
          .map(Payload::getData)
          .map(buf -> buf.getFloat())
          .onErrorReturn(null);
    }

    public void dispose() {
        this.socket.dispose();
    }
}

以前のクライアントと同じ方法でサーバーに接続します。


getDataStream()


では、サーバーからFlux <Payload>ストリームを受け取るために

socket.requestStream()

を使用しています

。そのストリームから、バイナリデータから

Float

値を抽出します。最後に、ストリームは呼び出し元に返され、呼び出し元はそれをサブスクライブして結果を処理できます。

それではテストしましょう。往復のリクエストからストリーミングまでの往復を確認します。

それぞれの値が送信されたのと同じ順序で受信されたと主張できます。それから、送信されたのと同じ数の値を受け取ったと主張できます。

@Test
public void whenSendingStream__thenReceiveTheSameStream() {
    FireNForgetClient fnfClient = new FireNForgetClient();
    ReqStreamClient streamClient = new ReqStreamClient();

    List<Float> data = fnfClient.getData();
    List<Float> dataReceived = new ArrayList<>();

    Disposable subscription = streamClient.getDataStream()
      .index()
      .subscribe(
        tuple -> {
            assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
            dataReceived.add(tuple.getT2());
        },
        err -> LOG.error(err.getMessage())
      );

    fnfClient.sendData();

   //... dispose client & subscription

    assertEquals("Wrong data count received", data.size(), dataReceived.size());
}

4.4. チャネル

  • チャネルモデルは双方向通信を提供します** 。このモデルでは、メッセージストリームは両方向に非同期に流れます。

これをテストするために簡単なゲームシミュレーションを作成しましょう。このゲームでは、チャンネルの両側がプレイヤーになります。ゲームが実行されると、これらのプレイヤーはランダムな時間間隔で相手にメッセージを送ります。

反対側はメッセージに反応します。

まず、サーバー上にハンドラーを作成します。前と同様に、

RSocketImpl

に追加します。

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    Flux.from(payloads)
      .subscribe(gameController::processPayload);
    return Flux.from(gameController);
}


  • requestChannel

    ハンドラには、入力と出力の両方に

    Payload

    ストリームがあります** 。

    Publisher <Payload>

    入力パラメータは、クライアントから受信したペイロードのストリームです。到着すると、これらのペイロードは

    gameController :: processPayload

    関数に渡されます。

それに応じて、異なる

Flux

ストリームをクライアントに返します。

このストリームは、

Publisher

でもある

gameController

から作成されます。

これは

GameController

クラスの概要です。

public class GameController implements Publisher<Payload> {

    @Override
    public void subscribe(Subscriber<? super Payload> subscriber) {
       //send Payload messages to the subscriber at random intervals
    }

    public void processPayload(Payload payload) {
       //react to messages from the other player
    }
}


GameController

が購読者を受信すると、その購読者へのメッセージ送信を開始します。

次に、クライアントを作成しましょう。

public class ChannelClient {

    private final RSocket socket;
    private final GameController gameController;

    public ChannelClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP__PORT))
          .start()
          .block();

        this.gameController = new GameController("Client Player");
    }

    public void playGame() {
        socket.requestChannel(Flux.from(gameController))
          .doOnNext(gameController::processPayload)
          .blockLast();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

前の例で見たように、クライアントは他のクライアントと同じ方法でサーバーに接続します。

クライアントは

GameController

の独自のインスタンスを作成します。


  • Payload

    ストリームをサーバーに送信するために

    socket.requestChannel()

    を使用します。サーバーはそれ自身のペイロードストリームで応答します。

    ペイロードがサーバから受信されると、それらを__gameController

    processPayload__ハンドラに渡します。

私たちのゲームシミュレーションでは、クライアントとサーバーはお互いの鏡像です。つまり、** それぞれの側が

Payload

のストリームを送信し、もう一方の端から

Payload

のストリームを受信して​​います。

ストリームは同期せずに独立して実行されます。

最後に、テストでシミュレーションを実行しましょう。

@Test
public void whenRunningChannelGame__thenLogTheResults() {
    ChannelClient client = new ChannelClient();
    client.playGame();
    client.dispose();
}

5.まとめ

この紹介記事では、RSocketが提供するインタラクションモデルについて説明しました。例の完全なソースコードは、https://github.com/eugenp/tutorials/tree/master/rsocket[Githubレポジトリ]にあります。

より深い議論については、必ずhttp://rsocket.io/[RSSocket Webサイト]をチェックしてください。特に、http://rsocket.io/docs/FAQ[FAQ]およびhttp://rsocket.io/docs/Motivations[Motivations]の文書が優れた背景を提供します。