Spring Bootを使用したRSocket

1. 概要

RSocketは、Reactive Streamsセマンティクスを提供するアプリケーションプロトコルです。たとえば、HTTPの代替として機能します。
このチュートリアルでは、https://www.baeldung.com/spring-boot-start [Spring Boot]を使用してlink:/rsocket[RSocket]を調べ、具体的には下位レベルのRSocket APIを抽象化するのに役立ちます。

2. 依存関係

https://mvnrepository.com/search?q=spring-boot-starter-rsocket[_spring-boot-starter-rsocket_]依存関係の追加から始めましょう。
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
これにより、_https://mvnrepository.com/search?q = rsocket-core [rsocket-core] _やhttps://mvnrepository.com/search?q=rsocket-transport-netty [ _rsocket-transport-netty_]。

3. サンプルアプリケーション

次に、サンプルアプリケーションを続行します。 RSocketが提供する相互作用モデルを強調するために、トレーダーアプリケーションを作成します。 トレーダーアプリケーションは、クライアントとサーバーで構成されます。

3.1. サーバーのセットアップ

最初に、RSocketサーバーをブートストラップするSpring Bootアプリケーションになるサーバーをセットアップしましょう。
  • spring-boot-starter-rsocket依存関係があるため、Spring BootはRSocketを自動構成します*通常のSpring Bootでは、RSocketサーバーのデフォルト設定値をプロパティ駆動型で変更できます。

    たとえば、_application.properties_ファイルに次の行を追加して、RSocketサーバーのポートを変更しましょう。
spring.rsocket.server.port=7000
https://docs.spring.io/spring-boot/docs/2.2.0.M2/reference/html/appendix.html#rsocket-properties [その他のプロパティ]を変更して、必要に応じてサーバーをさらに変更することもできます。 。

3.2. クライアント設定

次に、Spring Bootアプリケーションとなるクライアントを設定しましょう。
Spring BootはRSocket関連のコンポーネントのほとんどを自動構成しますが、セットアップを完了するためにいくつかのBeanも定義する必要があります。
@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        return RSocketFactory
          .connect()
          .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
          .frameDecoder(PayloadDecoder.ZERO_COPY)
          .transport(TcpClientTransport.create(7000))
          .start()
          .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}
ここでは、_RSocket_クライアントを作成し、ポート7000でTCPトランスポートを使用するように構成しています。 これは以前に設定したサーバーポートであることに注意してください。
次に、_RSocket_のラッパーである_RSocketRequester_ Beanを定義しています。 このBeanは、RSocketサーバーとやり取りする際に役立ちます。
これらのBean構成を定義すると、ベアボーン構造になります。
次に、*さまざまな相互作用モデル*を調査し、そこでSpring Bootがどのように役立つかを見ていきます。

4. RSocketとSpring Bootを使用したリクエスト/レスポンス

要求/応答から始めましょう。 *これはおそらくHTTPもこのタイプの通信を採用しているため、おそらく最も一般的で馴染みのある相互作用モデルです。*
この相互作用モデルでは、クライアントが通信を開始し、リクエストを送信します。 その後、サーバーは操作を実行し、クライアントに応答を返します。つまり、通信は完了します。
トレーダーアプリケーションでは、クライアントは特定の株式の現在の市場データを要求します。 代わりに、サーバーは要求されたデータを渡します。

4.1. サーバ

サーバー側では、最初にコントローラーを作成してハンドラーメソッドを保持する必要があります。 *ただし、Spring MVCのようなlink:/spring-requestmapping[[email protected]_]またはlink:/spring-new-requestmapping-shortcuts[[email protected]_]アノテーションの代わりに、_ @ MessageMapping_アノテーションを使用します*:
@Controller
public class MarketDataRSocketController {

    private final MarketDataRepository marketDataRepository;

    public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
        this.marketDataRepository = marketDataRepository;
    }

    @MessageMapping("currentMarketData")
    public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
        return marketDataRepository.getOne(marketDataRequest.getStock());
    }
}
それでは、コントローラーを調べてみましょう。
@_Controller_アノテーションを使用して、着信RSocket要求を処理するハンドラーを定義しています。 さらに、_ @ MessageMapping_アノテーションを使用すると、関心のあるルートと、リクエストへの対応方法を定義できます。
この場合、サーバーは_currentMarketData_ルートをリッスンします。これは*単一の結果を** Mono <MarketData> _ *としてクライアントに返します。

4.2. クライアント

次に、RSocketクライアントは株価の現在の価格を尋ね、単一の応答を取得する必要があります。
リクエストを開始するには、_RSocketRequester_クラスを使用する必要があります。
@RestController
public class MarketDataRestController {

    private final RSocketRequester rSocketRequester;

    public MarketDataRestController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @GetMapping(value = "/current/{stock}")
    public Publisher<MarketData> current(@PathVariable("stock") String stock) {
        return rSocketRequester
          .route("currentMarketData")
          .data(new MarketDataRequest(stock))
          .retrieveMono(MarketData.class);
    }
}
この場合、RSocketクライアントは、Rsocketサーバーを呼び出すRESTコントローラーでもあることに注意してください。 したがって、_ @ RestController_および_ @ GetMapping_を使用して、要求/応答エンドポイントを定義しています。
エンドポイントメソッドでは、_RSocketRequester_を使用し、ルートを指定しています。 実際、これはRSocketサーバーが期待するルートです。 次に、リクエストデータを渡します。 そして最後に、* _ retrieveMono()_メソッドを呼び出すと、Spring Bootは要求/応答の相互作用を開始します*。

5. RSocketとSpring BootでFire And Forget

次に、火と忘れの相互作用モデルを見ていきます。 名前が示すように、クライアントはサーバーに要求を送信しますが、応答が返されることを期待していません。
トレーダーアプリケーションでは、一部のクライアントがデータソースとして機能し、市場データをサーバーにプッシュします。

5.1. サーバ

サーバーアプリケーションに別のエンドポイントを作成しましょう。
@MessageMapping("collectMarketData")
public Mono<Void> collectMarketData(MarketData marketData) {
    marketDataRepository.add(marketData);
    return Mono.empty();
}
繰り返しますが、_collectMarketData_のルート値で新しい_ @ MessageMapping_を定義しています。 さらに、Spring Bootは着信ペイロードを_MarketData_インスタンスに自動的に変換します。
ただし、ここでの大きな違いは、クライアントが私たちからの応答を必要としないため、* Mono <Void> _を返すことです*。

5.2. クライアント

消火忘れ要求を開始する方法を見てみましょう。
別のRESTエンドポイントを作成します。
@GetMapping(value = "/collect")
public Publisher<Void> collect() {
    return rSocketRequester
      .route("collectMarketData")
      .data(getMarketData())
      .send();
}
ここでは、ルートを指定しており、ペイロードは_MarketData_インスタンスになります。 * _retrieveMono()_の代わりに_send()_メソッドを使用してリクエストを開始しているため、相互作用モデルは完全に忘れられます*。

6. RSocketとSpring Bootを使用したリクエストストリーム

要求のストリーミングは、クライアントが要求を送信しますが、サーバーから時間の経過とともに複数の応答を取得する、より複雑な対話モデルです。
この相互作用モデルをシミュレートするために、クライアントは特定の株式のすべての市場データを要求します。

6.1. サーバ

サーバーから始めましょう。 別のメッセージマッピングメソッドを追加します。
@MessageMapping("feedMarketData")
public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) {
    return marketDataRepository.getAll(marketDataRequest.getStock());
}
ご覧のとおり、このハンドラーメソッドは他のメソッドと非常によく似ています。 異なる部分は、* a _Mono <MarketData> _ *ではなくa _Flux <MarketData> _を返すことです。 最終的に、RSocketサーバーはクライアントに複数の応答を送信します。

6.2. クライアント

クライアント側では、リクエスト/ストリーム通信を開始するエンドポイントを作成する必要があります。
@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable("stock") String stock) {
    return rSocketRequester
      .route("feedMarketData")
      .data(new MarketDataRequest(stock))
      .retrieveFlux(MarketData.class);
}
RSocketリクエストを調べてみましょう。
まず、ルートとリクエストペイロードを定義しています。 *そして、_retrieveFlux()_メソッド呼び出しで応答の期待値を定義しています*。 これは、相互作用モデルを決定する部分です。
また、クライアントはRESTサーバーでもあるため、応答メディアタイプを_MediaType.TEXT_EVENT_STREAM_VALUE._として定義することにも注意してください。

7. 例外処理

次に、宣言的な方法でサーバーアプリケーションで例外を処理する方法を見てみましょう。
リクエスト/レスポンスを行うとき、単に_ @ MessageExceptionHandler_アノテーションを使用できます:
@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
    return Mono.just(MarketData.fromException(e));
}
ここで、例外ハンドラメソッドに_ @ MessageExceptionHandler_の注釈を付けました。 その結果、_Exception_クラスは他のすべてのクラスのスーパークラスであるため、すべてのタイプの例外を処理します。
より具体的に、異なる例外タイプに対して異なる例外ハンドラメソッドを作成できます。
これはもちろんリクエスト/レスポンスモデルのためです。したがって、_Mono <MarketData> ._を返します。

8. 概要

このチュートリアルでは、Spring BootのRSocketサポートと、RSocketが提供するさまざまな相互作用モデルについて詳しく説明しました。
いつものように、すべてのコードサンプルhttps://github.com/eugenp/tutorials/tree/master/spring-5-webflux[over on GitHub]をチェックアウトできます。