1. 概要
RSocketは、Reactive Streamsセマンティクスを提供するアプリケーションプロトコルです。たとえば、HTTPの代替として機能します。
このチュートリアルでは、Spring Bootを使用してRSocketを確認し、具体的には、低レベルのRSocketAPIを抽象化するのにどのように役立つかを説明します。
2. 依存関係
spring-boot-starter-rsocket依存関係を追加することから始めましょう。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
これにより、rsocket-coreやrsocket-transport-nettyなどのRSocket関連の依存関係が一時的に取り込まれます。
3. サンプルアプリケーション
次に、サンプルアプリケーションを続行します。 RSocketが提供するインタラクションモデルを強調するために、トレーダーアプリケーションを作成します。 トレーダーアプリケーションは、クライアントとサーバーで構成されます。
3.1. サーバーのセットアップ
まず、サーバーをセットアップしましょう。これは、RSocketサーバーをブートストラップするSpring Bootアプリケーションになります。
spring-boot-starter-rsocketの依存関係があるため、SpringBootはRSocketサーバーを自動構成します。Spring Bootの場合と同様に、プロパティ駆動型の方法でRSocketサーバーのデフォルト構成値を変更できます。 。
たとえば、 application.properties ファイルに次の行を追加して、RSocketサーバーのポートを変更しましょう。
spring.rsocket.server.port=7000
その他のプロパティを変更して、必要に応じてサーバーをさらに変更することもできます。
3.2. クライアントのセットアップ
次に、Spring Bootアプリケーションでもあるクライアントを設定しましょう。
Spring BootはRSocket関連のコンポーネントのほとんどを自動構成しますが、セットアップを完了するためにいくつかのBeanも定義する必要があります。
@Configuration
public class ClientConfiguration {
@Bean
public RSocketRequester getRSocketRequester(){
RSocketRequester.Builder builder = RSocketRequester.builder();
return builder
.rsocketConnector(
rSocketConnector ->
rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2)))
)
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.tcp("localhost", 7000);
}
}
ここでは、 RSocket クライアントを作成し、ポート7000でTCPトランスポートを使用するように構成しています。 これは、以前に構成したサーバーポートであることに注意してください。
このBean構成を定義すると、必要最低限の構造になります。
次に、さまざまな相互作用モデルを調べ、Spring Bootがそこでどのように役立つかを確認します。
4. RSocketとSpringBootを使用した要求/応答
リクエスト/レスポンスから始めましょう。 HTTPもこのタイプの通信を採用しているため、これはおそらく最も一般的で馴染みのある対話モデルです。
このインタラクションモデルでは、クライアントが通信を開始し、リクエストを送信します。 その後、サーバーは操作を実行し、クライアントに応答を返します。これにより、通信が完了します。
私たちのトレーダーアプリケーションでは、クライアントは特定の株式の現在の市場データを要求します。 その見返りに、サーバーは要求されたデータを渡します。
4.1. サーバ
サーバー側では、最初にハンドラーメソッドを保持するコントローラーを作成する必要があります。 ただし、Spring MVCのような@RequestMappingまたは@GetMappingアノテーションの代わりに、@MessageMappingアノテーションを使用します。
@Controller
public class MarketDataRSocketController {
private final MarketDataRepository marketDataRepository;
public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
}
@MessageMapping("currentMarketData")
public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getOne(marketDataRequest.getStock());
}
}
それでは、コントローラーを調べてみましょう。
@ Controller アノテーションを使用して、着信RSocketリクエストを処理するハンドラーを定義しています。 さらに、 @MessageMapping アノテーションを使用すると、関心のあるルートと、リクエストへの対応方法を定義できます。
この場合、サーバーは currentMarketData ルート、 単一の結果をクライアントに返します単核症 。
4.2. クライアント
次に、RSocketクライアントは、株式の現在の価格を尋ねて、単一の応答を取得する必要があります。
リクエストを開始するには、RSocketRequesterクラスを使用する必要があります。
@RestController
public class MarketDataRestController {
private final RSocketRequester rSocketRequester;
public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping(value = "/current/{stock}")
public Publisher<MarketData> current(@PathVariable("stock") String stock) {
return rSocketRequester
.route("currentMarketData")
.data(new MarketDataRequest(stock))
.retrieveMono(MarketData.class);
}
}
この場合、RSocketクライアントはRESTコントローラーでもあり、そこからRSocketサーバーを呼び出すことに注意してください。 したがって、@RestControllerと@GetMappingを使用して、要求/応答エンドポイントを定義しています。
エンドポイントメソッドでは、 RSocketRequester を使用して、ルートを指定しています。 実際、これはRSocketサーバーが期待するルートです。 次に、リクエストデータを渡します。 そして最後に、 retrieveMono()メソッドを呼び出すと、SpringBootは要求/応答の相互作用を開始します。
5. RSocketおよびSpring Bootでファイアアンドフォーゲット
次に、ファイアアンドフォーゲットの相互作用モデルを見ていきます。 名前が示すように、クライアントはサーバーに要求を送信しますが、応答が返されることを期待していません。
私たちのトレーダーアプリケーションでは、一部のクライアントがデータソースとして機能し、市場データをサーバーにプッシュします。
5.1. サーバ
サーバーアプリケーションに別のエンドポイントを作成しましょう。
@MessageMapping("collectMarketData")
public Mono<Void> collectMarketData(MarketData marketData) {
marketDataRepository.add(marketData);
return Mono.empty();
}
ここでも、ルート値collectMarketDataを使用して新しい@MessageMappingを定義しています。 さらに、SpringBootは着信ペイロードをMarketDataインスタンスに自動的に変換します。
ただし、ここでの大きな違いはモノを返しますクライアントは私たちからの応答を必要としないので。
5.2. クライアント
ファイアアンドフォーゲットリクエストを開始する方法を見てみましょう。
別のRESTエンドポイントを作成します。
@GetMapping(value = "/collect")
public Publisher<Void> collect() {
return rSocketRequester
.route("collectMarketData")
.data(getMarketData())
.send();
}
ここではルートを指定しており、ペイロードはMarketplaceDataインスタンスになります。 retrieveMono()の代わりにsend()メソッドを使用してリクエストを開始しているため、インタラクションモデルはファイアアンドフォーゲットになります。
6. RSocketとSpringBootを使用したストリームのリクエスト
リクエストストリーミングは、より複雑なインタラクションモデルであり、クライアントはリクエストを送信しますが、サーバーから時間の経過とともに複数の応答を取得します。
この相互作用モデルをシミュレートするために、クライアントは特定の株式のすべての市場データを要求します。
6.1. サーバ
サーバーから始めましょう。 別のメッセージマッピングメソッドを追加します。
@MessageMapping("feedMarketData")
public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getAll(marketDataRequest.getStock());
}
ご覧のとおり、このハンドラーメソッドは他のメソッドと非常によく似ています。 別の部分はそれですフラックスを返しますモノの代わりに 。 最終的に、RSocketサーバーはクライアントに複数の応答を送信します。
6.2. クライアント
クライアント側では、リクエスト/ストリーム通信を開始するためのエンドポイントを作成する必要があります。
@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable("stock") String stock) {
return rSocketRequester
.route("feedMarketData")
.data(new MarketDataRequest(stock))
.retrieveFlux(MarketData.class);
}
RSocketリクエストを調べてみましょう。
まず、ルートとリクエストのペイロードを定義します。 次に、retrieveFlux()メソッド呼び出しを使用して応答の期待値を定義します。 これは、相互作用モデルを決定する部分です。
また、クライアントはRESTサーバーでもあるため、応答メディアタイプをMediaType.TEXT_EVENT_STREAM_VALUE。として定義していることにも注意してください。
7. 例外処理
次に、サーバーアプリケーションで宣言的な方法で例外を処理する方法を見てみましょう。
要求/応答を行うときは、@MessageExceptionHandlerアノテーションを使用するだけです。
@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e));
}
ここでは、例外ハンドラーメソッドに@MessageExceptionHandlerという注釈を付けています。 その結果、 Exception クラスは他のすべてのスーパークラスであるため、すべてのタイプの例外を処理します。
より具体的に、さまざまな例外タイプに対してさまざまな例外ハンドラーメソッドを作成できます。
これはもちろんリクエスト/レスポンスモデルの場合なので、 単核症 。 ここでのリターンタイプは、インタラクションモデルのリターンタイプと一致する必要があります。
8. 概要
このチュートリアルでは、Spring BootのRSocketサポートと、RSocketが提供するさまざまなインタラクションモデルについて詳しく説明しました。
いつものように、GitHubですべてのコードサンプルをチェックアウトできます。