1. 序章

RxJava は、世の中で最も人気のあるリアクティブプログラミングライブラリの1つです。

また、 Ratpack は、Netty上に構築された無駄のない強力なWebアプリケーションを作成するためのJavaライブラリのコレクションです。

このチュートリアルでは、RatpackアプリケーションにRxJavaを組み込んで、優れたリアクティブWebアプリを作成する方法について説明します。

2. Mavenの依存関係

ここで、最初にratpack-coreおよびratpack-rxの依存関係が必要です。

<dependency>
    <groupId>io.ratpack</groupId>
    <artifactId>ratpack-core</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>io.ratpack</groupId>
    <artifactId>ratpack-rx</artifactId>
    <version>1.6.0</version>
</dependency>

ちなみに、ratpack-rxrxjavaの依存関係をインポートすることに注意してください。

3. 初期設定

RxJavaは、プラグインシステムを使用して、サードパーティライブラリの統合をサポートします。 したがって、さまざまな実行戦略をRxJavaの実行モデルに組み込むことができます。 

Ratpackは、 RxRatpack を介してこの実行モデルにプラグインします。これは、起動時に初期化されます。

RxRatpack.initialise();

ここで、メソッドはJVMの実行ごとに1回だけ呼び出す必要があることに注意することが重要です。

その結果、 RxJavaのObservablesをRxRatpackのPromiseタイプに、またはその逆にマップできるようになります。

4.  Observable s to Promise s

RxJavaのObservableをRatpackPromiseに変換できます。

ただし、多少の不一致があります。 Promise は単一の値を出力しますが、Observableはそれらのストリームを出力できます。

RxRatpackは、次の2つの異なる方法を提供することでこれを処理します。 promiseSingle()約束()。

それで、私たちがという名前のサービスを持っていると仮定しましょう MovieService に単一の約束を発します getMovie()。 使用します promiseSingle() 一度だけ放出されることがわかっているので:

Handler movieHandler = (ctx) -> {
    MovieService movieSvc = ctx.get(MovieService.class);
    Observable<Movie> movieObs = movieSvc.getMovie();
    RxRatpack.promiseSingle(movieObs)
      .then(movie -> ctx.render(Jackson.json(movie)));
};

一方、 getMovies()が映画の結果のストリームを返すことができる場合は、 promise()を使用します。

Handler moviesHandler = (ctx) -> {
    MovieService movieSvc = ctx.get(MovieService.class);
    Observable<Movie> movieObs = movieSvc.getMovies();
    RxRatpack.promise(movieObs)
      .then(movie -> ctx.render(Jackson.json(movie)));
};

次に、これらのハンドラーを通常のようにRatpackサーバーに追加できます。

RatpackServer.start(def -> def.registryOf(rSpec -> rSpec.add(MovieService.class, new MovieServiceImpl()))
  .handlers(chain -> chain
    .get("movie", movieHandler)
    .get("movies", moviesHandler)));

5. PromisesからObservable s

逆に、RatpackのPromiseタイプをRxJavaObservableにマップして戻すことができます。

RxRatpackには、 observe()observeEach()。の2つのメソッドがあります。

この場合、Observableの代わりにPromiseを返す映画サービスがあると想像します。

getMovie()では、 observe()を使用します。

Handler moviePromiseHandler = ctx -> {
    MoviePromiseService promiseSvc = ctx.get(MoviePromiseService.class);
    Promise<Movie> moviePromise = promiseSvc.getMovie();
    RxRatpack.observe(moviePromise)
      .subscribe(movie -> ctx.render(Jackson.json(movie)));
};

そして、 getMovies()のようにリストを取得するときは、 observeEach()を使用します。

Handler moviesPromiseHandler = ctx -> {
    MoviePromiseService promiseSvc = ctx.get(MoviePromiseService.class);
    Promise<List<Movie>> moviePromises = promiseSvc.getMovies();
    RxRatpack.observeEach(moviePromises)
        .toList()
        .subscribe(movie -> ctx.render(Jackson.json(movie)));
};

次に、期待どおりにハンドラーを追加できます。

RatpackServer.start(def -> def.registryOf(regSpec -> regSpec
  .add(MoviePromiseService.class, new MoviePromiseServiceImpl()))
    .handlers(chain -> chain
      .get("movie", moviePromiseHandler)
      .get("movies", moviesPromiseHandler)));

6. 並列処理

RxRatpackは、 fork()および forkEach()メソッドを使用して並列処理をサポートします。

そして、それは私たちがそれぞれですでに見たパターンに従います。

fork()は単一の Observable を取り、はその実行を別の計算スレッドに並列化します。 次に、データを元の実行に自動的にバインドします。

一方、 forkEach()は、Observableの値のストリームによって放出される各要素に対して同じことを行います。

私たちが映画のタイトルを活用したいと思っていて、それが費用のかかる操作であると少し想像してみましょう。

簡単に言うと、 forkEach()を使用して、それぞれの実行をスレッドプールにオフロードできます。

Observable<Movie> movieObs = movieSvc.getMovies();
Observable<String> upperCasedNames = movieObs.compose(RxRatpack::forkEach)
  .map(movie -> movie.getName().toUpperCase())
  .serialize();

7. 暗黙のエラー処理

最後に、暗黙的なエラー処理は、RxJava統合の重要な機能の1つです。

デフォルトでは、RxJavaの監視可能なシーケンスは、例外を実行コンテキスト例外ハンドラーに転送します。 このため、エラーハンドラーを監視可能なシーケンスで定義する必要はありません。

したがって、RxJavaによって発生したこれらのエラーを処理するようにRatpackを構成できます。

たとえば、各エラーをHTTP応答に出力したいとします。

Observable を介してスローする例外は、ServerErrorHandlerによってキャッチおよび処理されることに注意してください。

RatpackServer.start(def -> def.registryOf(regSpec -> regSpec
  .add(ServerErrorHandler.class, (ctx, throwable) -> {
        ctx.render("Error caught by handler : " + throwable.getMessage());
    }))
  .handlers(chain -> chain
    .get("error", ctx -> {
        Observable.<String> error(new Exception("Error from observable")).subscribe(s -> {});
    })));

ただし、サブスクライバーレベルのエラー処理が優先されることに注意してください。 Observable が独自のエラー処理を実行したい場合は、実行できますが、実行しないため、例外が浸透します。 Ratpackに。

8. 結論

この記事では、Ratpackを使用してRxJavaを構成する方法について説明しました。

RxJavaのObservablesからRatpackのPromiseタイプへの変換、およびその逆の変換について検討しました。 また、統合によってサポートされる並列処理と暗黙のエラー処理機能についても調べました。

この記事で使用されているすべてのコードサンプルは、Githubにあります。