RxJavaを使用したRatpack

1. 前書き

link:/rxjava-tutorial[RxJava]は、最も人気のあるリアクティブプログラミングライブラリの1つです。
そして、https://www.baeldung.com/ratpack [_Ratpack_]は、Netty上に構築された無駄のない強力なWebアプリケーションを作成するためのJavaライブラリのコレクションです。
このチュートリアルでは、RtJavaをRatpackアプリケーションに組み込み、反応の良いWebアプリを作成する方法について説明します。

2. Mavenの依存関係

さて、最初に__https://search.maven.org/search?q = g:io.ratpack%20AND%20a:ratpack-core [ratpack-core] __and __https://search.maven.org/searchが必要です?q = g:io.ratpack%20AND%20a:ratpack-rx [ratpack-rx] __dependencies:
<dependency>
    <groupId>io.ratpack</groupId>
    <artifactId>ratpack-core</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>io.ratpack</groupId>
    <artifactId>ratpack-rx</artifactId>
    <version>1.6.0</version>
</dependency>
ちなみに、_ratpack-rx_は_rxjava_依存関係をインポートします。

3. 初期設定

RxJavaは、プラグインシステムを使用して、サードパーティライブラリの統合をサポートしています。 *したがって、異なる実行戦略をRxJavaの実行モデルに組み込むことができます。 *
Ratpackは、_RxRatpack_を介してこの実行モデルにプラグインし、起動時に初期化します。
RxRatpack.initialise();
ここで、* JVMの実行ごとに1回だけメソッドを呼び出す必要があることに注意することが重要です*。
その結果、* RxJavaの__Observable__sをRxRatpackの_Promise_タイプにマップできるようになり、その逆も可能です。

4.  ObservablesからPromises

RxJavaの_Observable_をRatpack _Promise._に変換できます
*ただし、少し不一致があります。
RxRatpackは、_promiseSingle()_と_promise()._の2つの異なるメソッドを提供することでこれを処理します。
それで、__ getMovie()で単一のpromiseを発行する_MovieService_という名前のサービスがあるとしましょう。 __一度だけしか放出しないことがわかっているため、_promiseSingle()_を使用します。
Handler movieHandler = (ctx) -> {
    MovieService movieSvc = ctx.get(MovieService.class);
    Observable<Movie> movieObs = movieSvc.getMovie();
    RxRatpack.promiseSingle(movieObs)
      .then(movie -> ctx.render(Jackson.json(movie)));
};
一方、_getMovies()_が映画の結果のストリームを返すことができる場合、_promise()_を使用します。
Handler moviesHandler = (ctx) -> {
    MovieService movieSvc = ctx.get(MovieService.class);
    Observable<Movie> movieObs = movieSvc.getMovies();
    RxRatpack.promise(movieObs)
      .then(movie -> ctx.render(Jackson.json(movie)));
};
次に、これらのハンドラーを通常のようにRatpackサーバーに追加します。
RatpackServer.start(def -> def.registryOf(rSpec -> rSpec.add(MovieService.class, new MovieServiceImpl()))
  .handlers(chain -> chain
    .get("movie", movieHandler)
    .get("movies", moviesHandler)));

5. オブザーバブルへの約束

逆に、Ratpackの_Promise_タイプをRxJava _Observable_にマップし直すことができます。* *
  • RxRatpackにも2つのメソッドがあります:observe()_および_observeEach(). *

    この場合、__Observable__sではなく__Promise__sを返すムービーサービスがあると想像できます。
    __getMovie()で、____ observe()_を使用します:
Handler moviePromiseHandler = ctx -> {
    MoviePromiseService promiseSvc = ctx.get(MoviePromiseService.class);
    Promise<Movie> moviePromise = promiseSvc.getMovie();
    RxRatpack.observe(moviePromise)
      .subscribe(movie -> ctx.render(Jackson.json(movie)));
};
そして、_getMovies()_のようにリストを取得するとき、_observeEach()_を使用します。
Handler moviesPromiseHandler = ctx -> {
    MoviePromiseService promiseSvc = ctx.get(MoviePromiseService.class);
    Promise<List<Movie>> moviePromises = promiseSvc.getMovies();
    RxRatpack.observeEach(moviePromises)
        .toList()
        .subscribe(movie -> ctx.render(Jackson.json(movie)));
};
次に、期待どおりにハンドラーを追加できます。
RatpackServer.start(def -> def.registryOf(regSpec -> regSpec
  .add(MoviePromiseService.class, new MoviePromiseServiceImpl()))
    .handlers(chain -> chain
      .get("movie", moviePromiseHandler)
      .get("movies", moviesPromiseHandler)));

6. 並列処理

  • RxRatpackは、_fork()_および_forkEach()_メソッドを使用して並列処理をサポートします。*

    そして、それはそれぞれについてすでに見たパターンに従います。
    _fork()_は単一の_Observable_を取り、その実行を異なる計算スレッドに並列化します*。 次に、データを元の実行に自動的にバインドします。
    一方、_forkEach()_は、_Observable_’sの値のストリームによって発行される各要素に対して同じことを行います。
    私たちが映画のタイトルを大文字にしたいと思い、それが高価な操作であることを少し想像してみましょう。
    簡単に言えば、_forkEach()_を使用して、それぞれの実行をスレッドプールにオフロードできます。
Observable<Movie> movieObs = movieSvc.getMovies();
Observable<String> upperCasedNames = movieObs.compose(RxRatpack::forkEach)
  .map(movie -> movie.getName().toUpperCase())
  .serialize();

7. 暗黙的なエラー処理

最後に、暗黙的なエラー処理はRxJava統合の重要な機能の1つです。
デフォルトでは、RxJavaオブザーバブルシーケンスは、例外を実行コンテキスト例外ハンドラーに転送します。 このため、エラーハンドラーを監視可能なシーケンスで定義する必要はありません。
したがって、RxJavaによって発生したこれらのエラーを処理するようにRatpackを構成できます。*
たとえば、HTTP応答に各エラーを出力したいとします。
_Observable_を介してスローする例外は、_ServerErrorHandler_によってキャッチおよび処理されることに注意してください。
RatpackServer.start(def -> def.registryOf(regSpec -> regSpec
  .add(ServerErrorHandler.class, (ctx, throwable) -> {
        ctx.render("Error caught by handler : " + throwable.getMessage());
    }))
  .handlers(chain -> chain
    .get("error", ctx -> {
        Observable.<String> error(new Exception("Error from observable")).subscribe(s -> {});
    })));
*ただし、サブスクライバレベルのエラー処理が優先されることに注意してください。

8. 結論

この記事では、Ratpackを使用してRxJavaを構成する方法について説明しました。
RxJavaでの_Observables_のRatpackでの_Promise_型への変換、およびその逆の変換について検討しました。 また、統合によってサポートされる並列処理と暗黙的なエラー処理機能についても調べました。
この記事で使用されているすべてのコードサンプルは、https://github.com/eugenp/tutorials/tree/master/ratpack [Githubで]にあります。