1. 概要

RxJava は、非同期およびイベントベースのプログラムを作成するための人気のあるライブラリであり、 ReactiveExtensionsイニシアチブによって提唱された主要なアイデアからインスピレーションを得ています。

Vert.x は、 Eclipse の傘下にあるプロジェクトであり、リアクティブパラダイムを完全に活用するためにゼロから設計されたいくつかのコンポーネントを提供します。

これらを一緒に使用すると、リアクティブである必要があるJavaプログラムの有効な基盤となる可能性があります。

この記事では、都市名のリストを含むファイルをロードし、それぞれについて、日の出から日の入りまでの1日の長さを印刷します。

公開されているwww.metaweather.com REST API から公開されたデータを使用して、Vertで日光とRxJavaの長さを計算します。 x純粋に反応的な方法でそれを行います。

2. Mavenの依存関係

vertx-rx-java2をインポートすることから始めましょう。

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-rx-java2</artifactId>
    <version>3.5.0-Beta1</version>
</dependency>

執筆時点では、Vert.xと新しいRxJava2 の統合はベータリリースとしてのみ利用可能ですが、私たちが使用しているプログラムに対しては十分に安定しています。建物。

io.vertx:vertx-rx-java2io.reactivex.rxjava2:rxjava に依存しているため、RxJava関連のパッケージを明示的にインポートする必要はありません。

Vert.xRxJavaの統合の最新バージョンは、 MavenCentralにあります。

3. 設定

Vert.x、を使用するすべてのアプリケーションと同様に、すべてのVert.x機能へのメインエントリポイントであるvertxオブジェクトの作成を開始します。

Vertx vertx = io.vertx.reactivex.core.Vertx.vertx();

vertx-rx-java2 ライブラリは、io.vertx.core.Vertxio.vertx.reactivex.core.Vertxの2つのクラスを提供します。 前者はVert.xに一意に基づくアプリケーションの通常のエントリポイントですが、後者はRxJavaとの統合を取得するために使用するものです。

後で使用するオブジェクトの定義に進みます。

FileSystem fileSystem = vertx.fileSystem();
HttpClient httpClient = vertx.createHttpClient();

Vert.xFileSystemはリアクティブな方法でファイルシステムへのアクセスを提供し、Vert.xHttpClientはリアクティブな方法でファイルシステムへのアクセスを提供します。 HTTPについても同じです。

4. リアクティブチェーン

リアクティブコンテキストでは、意味のある計算を取得するために、いくつかのより単純なリアクティブ演算子を連結するのは簡単です。

私たちの例のためにそれをやってみましょう:

fileSystem
  .rxReadFile("cities.txt").toFlowable()
  .flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
  .flatMap(city -> searchByCityName(httpClient, city))
  .flatMap(HttpClientResponse::toFlowable)
  .map(extractingWoeid())
  .flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
  .flatMap(toBufferFlowable())
  .map(Buffer::toJsonObject)
  .map(toCityAndDayLength())
  .subscribe(System.out::println, Throwable::printStackTrace);

次に、コードの各論理チャンクがどのように機能するかを調べてみましょう。

5. 都市名

最初のステップは、都市名のリストを含むファイルを読み取ることです。1行に1つの名前が含まれます。

fileSystem
 .rxReadFile("cities.txt").toFlowable()
 .flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))

方法 rxReadFile() 反応的にファイルを読み取り、 RxJava独身 。 これで、探している統合が得られました。RxJavaのデータ構造におけるVert.xの非同期性です。

ファイルは1つしかないため、ファイルの全内容を含むBufferが1回放出されます。 その入力をRxJavaFlowableに変換し、ファイルの行をフラットマップして、各都市名のイベントを発行するFlowableを作成します。代わりは。

6. JSON City Descriptor

都市名を取得したら、次のステップは Metaweather REST API を使用して、その都市のIDコードを取得することです。 この識別子は、都市の日の出と日の入りの時刻を取得するために使用されます。 呼び出しのチェーンを続けましょう:

呼び出しのチェーンを続けましょう:

.flatMap(city -> searchByCityName(httpClient, city))
.flatMap(HttpClientResponse::toFlowable)

searchByCityName()メソッドは、最初のステップで作成した HttpClient を使用して、都市の識別子を提供するRESTサービスを呼び出します。 次に、2番目の flatMap()、を使用して、応答を含むBufferを取得します。

searchByCityName()の本体を記述して、このステップを完了しましょう。

Flowable<HttpClientResponse> searchByCityName(HttpClient httpClient, String cityName) {
    HttpClientRequest req = httpClient.get(
        new RequestOptions()
          .setHost("www.metaweather.com")
          .setPort(443)
          .setSsl(true)
          .setURI(format("/api/location/search/?query=%s", cityName)));
    return req
      .toFlowable()
      .doOnSubscribe(subscription -> req.end());
}

Vert.xHttpClientは、リアクティブHTTP応答を発行するRxJavaFlowableを返します。 これにより、Buffersに分割された応答の本体が出力されます。

適切なURLへの新しいリアクティブリクエストを作成しましたが、 Vert.xでは、リクエストを送信できることを通知するためにHttpClientRequest.end()メソッドを呼び出す必要があり、少なくとも1つのサブスクリプションが必要であることに注意してください。 end()が正常に呼び出される前。

これを実現するための解決策は、 RxJavadoOnSubscribe()を使用して、コンシューマーがサブスクライブするとすぐに end()を呼び出すことです。

7. 都市識別子

ここで、返されたJSONオブジェクトのwoeidプロパティの値を取得する必要があります。これは、カスタムメソッドを介して都市を一意に識別します。

.map(extractingWoeid())

extractWoeid()メソッドは、RESTサービス応答に含まれるJSONから都市識別子を抽出する関数を返します。

private static Function<Buffer, Long> extractingWoeid() {
    return cityBuffer -> cityBuffer
      .toJsonArray()
      .getJsonObject(0)
      .getLong("woeid");
}

Bufferが提供する便利なtoJson…()メソッドを使用して、必要なプロパティにすばやくアクセスできることに注意してください。

8. 都市の詳細

REST API から必要な詳細を取得するために、リアクティブチェーンを続けましょう。

.flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
.flatMap(toBufferFlowable())

getDataByPlaceId()メソッドの詳細を見てみましょう。

static Flowable<HttpClientResponse> getDataByPlaceId(
  HttpClient httpClient, long placeId) {
 
    return autoPerformingReq(
      httpClient,
      format("/api/location/%s/", placeId));
}

ここでは、前の手順で行ったのと同じアプローチを使用しました。 getDataByPlaceId() を返します流動性 。 次に、 HttpClientResponse は、数バイトより長い場合、API応答をチャンクで出力します。

toBufferFlowable()メソッドを使用して、応答チャンクを1つに減らし、完全なJSONオブジェクトにアクセスできるようにします。

static Function<HttpClientResponse, Publisher<? extends Buffer>>
  toBufferFlowable() {
    return response -> response
      .toObservable()
      .reduce(
        Buffer.buffer(),
        Buffer::appendBuffer).toFlowable();
}

9. 日没と日の出の時間

JSON オブジェクトから関心のある情報を取得して、リアクティブチェーンに追加し続けましょう。

.map(toCityAndDayLength())

toCityAndDayLength()メソッドを書いてみましょう。

static Function<JsonObject, CityAndDayLength> toCityAndDayLength() {
    return json -> {
        ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise"));
        ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set"));
        String cityName = json.getString("title");
        return new CityAndDayLength(
          cityName, sunSet.toEpochSecond() - sunRise.toEpochSecond());
    };
}

JSON に含まれる情報をマップする関数を返し、日の出から日の入りまでの時間を時間単位で計算するPOJOを作成します。

10. サブスクリプション

反応連鎖が完了します。 これで、 CityAndDayLength の発行されたインスタンス、またはエラーの場合はスタックトレースを出力するハンドラーを使用して、結果のFlowableをサブスクライブできます。

.subscribe(
  System.out::println, 
  Throwable::printStackTrace)

アプリケーションを実行すると、リストに含まれている都市とアプリケーションが実行された日付に応じて、次のような結果が表示されます。

In Chicago there are 13.3 hours of light.
In Milan there are 13.5 hours of light.
In Cairo there are 12.9 hours of light.
In Moscow there are 14.1 hours of light.
In Santiago there are 11.3 hours of light.
In Auckland there are 11.2 hours of light.

HTTP API へのすべてのリクエストは非同期で実行されるため、都市はファイルで指定されている順序とは異なる順序で表示される可能性があります。

11. 結論

この記事では、Vert.xリアクティブモジュールをRxJavaによって提供される演算子および論理構造と簡単に組み合わせることができることを確認しました。

私たちが構築したリアクティブチェーンは、長いものの、複雑なシナリオをかなり簡単に作成できることを示しています。

いつものように、完全なソースコードはGitHubから入手できます。