1. 概要

このチュートリアルでは、RxJava2演算子を使用して従来の同期および非同期APIをObservablesに変換する方法を学習します。

これらの演算子について詳しく説明するのに役立ついくつかの簡単な関数を作成します。

2. Mavenの依存関係

まず、Mavenの依存関係としてRxJava2RxJava2Extensionsを追加する必要があります。

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.2</version>
</dependency>
<dependency>
    <groupId>com.github.akarnokd</groupId>
    <artifactId>rxjava2-extensions</artifactId>
    <version>0.20.4</version>
</dependency>

3. オペレーター

RxJava2は、リアクティブプログラミングのさまざまなユースケースに対して多数の演算子を定義します。

ただし、同期メソッドまたは非同期メソッドをその性質に基づいてObservablesに変換するために一般的に使用されるいくつかの演算子についてのみ説明します。 これらの演算子は関数を引数として取り、その関数から返された値を出力します

RxJava2は、通常の演算子に加えて、拡張機能用にさらにいくつかの演算子を定義しています。

これらの演算子を使用して同期メソッドと非同期メソッドを変換する方法を見てみましょう。

4. 同期メソッド変換

4.1. fromCallable()を使用する

この演算子はObservableを返します。これは、サブスクライバーがサブスクライブすると、引数として渡された関数を呼び出し、その関数から返された値を出力します。 整数を返す関数を作成して変換してみましょう。

AtomicInteger counter = new AtomicInteger();
Callable<Integer> callable = () -> counter.incrementAndGet();

それでは、それを Observable に変換し、サブスクライブしてテストしてみましょう。

Observable<Integer> source = Observable.fromCallable(callable);

for (int i = 1; i < 5; i++) {
    source.test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(i);
    assertEquals(i, counter.get());
}

fromCallable()オペレーターは、ラップされたObservableがサブスクライブされるたびに、指定された関数を遅延実行します。この動作をテストするために、ループを使用して複数のサブスクライバーを作成しました。

リアクティブストリームはデフォルトで非同期であるため、サブスクライバーはすぐに戻ります。 ほとんどの実際のシナリオでは、呼び出し可能な関数は、その実行を完了するために何らかの遅延があります。 そのため、呼び出し可能な関数の結果をテストする前に、最大待機時間を5秒追加しました。

Observabletest()メソッドを使用したことにも注意してください。 この方法は、Observablesをテストするときに便利です。 それは作成します TestObserver と私たちの購読観察可能。

4.2. start()を使用する

start()演算子は、RxJava2Extensionモジュールの一部です。 指定された関数を非同期的に呼び出し、結果を出力するObservableを返します。

Observable<Integer> source = AsyncObservable.start(callable);

for (int i = 1; i < 5; i++) {
    source.test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(1);
    assertEquals(1, counter.get());
}

関数は、サブスクライバーが結果の Observable にサブスクライブするたびにではなく、すぐに呼び出されます。このオブザーバブルへの複数のサブスクリプションは、同じ戻り値を監視します。

5. 非同期メソッド変換

5.1. fromFuture()を使用する

ご存知のように、Javaで非同期メソッドを作成する最も一般的な方法は、Future実装を使用することです。 fromFuture メソッドは、引数として Future を取り、 Future.get()メソッドから取得した値を出力します。

まず、以前に作成した関数を非同期にします。

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(callable);

次に、それを変換してテストを行いましょう。

Observable<Integer> source = Observable.fromFuture(future);

for (int i = 1; i < 5; i++) {
    source.test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(1);
    assertEquals(1, counter.get());
}
executor.shutdown();

また、各サブスクリプションが同じ戻り値を監視していることにもう一度注意してください。

現在、 Observabledispose()メソッドは、メモリリークの防止に関して非常に役立ちます。 ただし、この場合、 Future.get()のブロックの性質により、futureはキャンセルされません。

したがって、 observable source のdoOnDispose()関数とfuturecancelメソッドを組み合わせることで、将来を確実にキャンセルできます。 ]:

source.doOnDispose(() -> future.cancel(true));

5.2. startFuture()を使用する

名前が示すように、このオペレーターは指定された Future をすぐに開始し、サブスクライバーがサブスクライブすると戻り値を出力します。 次の使用のために結果をキャッシュするfromFuture演算子とは異なり、この演算子は、サブスクライブされるたびに非同期メソッドを実行します

ExecutorService executor = Executors.newSingleThreadExecutor();
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));

for (int i = 1; i < 5; i++) {
    source.test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(i);
    assertEquals(i, counter.get());
}
executor.shutdown();

5.3. deferFuture()の使用

この演算子は、Futureメソッドから返された複数のObservableを集約し、各Observableから取得した戻り値のストリームを返します。これにより、新しいサブスクライバーがサブスクライブするたびに、渡された非同期ファクトリ関数が開始されます。

それでは、最初に非同期ファクトリ関数を作成しましょう。

List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(), 
  counter.incrementAndGet(), counter.incrementAndGet() });
ExecutorService exec = Executors.newSingleThreadExecutor();
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);

そして、簡単なテストを行うことができます。

Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
for (int i = 1; i < 4; i++) {
    source.test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(1,2,3);
}
exec.shutdown();

6. 結論

このチュートリアルでは、同期メソッドと非同期メソッドをRxJava2オブザーバブルに変換する方法を学習しました。

もちろん、ここで示した例は基本的な実装です。 ただし、RxJava2は、ビデオストリーミングなどのより複雑なアプリケーションや、大量のデータを部分的に送信する必要があるアプリケーションに使用できます。

いつものように、ここで説明したすべての短い例は、Githubプロジェクトにあります。