1. 概要

このチュートリアルでは、RxKotlinライブラリを使用した慣用的なKotlinでの Reactive Extensions(Rx)の使用を確認します。

RxKotlinは、それ自体がReactiveExtensionsの実装ではありません。 代わりに、それは主に拡張メソッドのコレクションです。 つまり、RxKotlinは、Kotlinを念頭に置いて設計されたAPIを使用してRxJavaライブラリを拡張します。

したがって、この記事の Introduction to RxJava の概念と、専用の記事で紹介したFlowablesの概念を使用します。

2. RxKotlinのセットアップ

MavenプロジェクトでRxKotlinを使用するには、rxkotlin依存関係pom.xml:に追加する必要があります。

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>2.3.0</version>
</dependency>

または、Gradleプロジェクトの場合は、 build.gradle:

implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'

ここでは、RxJava2を対象とするRxKotlin2.xを使用しています。RxJava 1を使用するプロジェクトでは、RxKotlin1.xを使用する必要があります。 同じ概念が両方のバージョンに適用されます。

RxKotlinはRxJavaに依存していますが、依存関係を最新リリースに頻繁に更新しないことに注意してください。 したがって、 RxJavaの記事で詳しく説明されているように、依存する特定のRxJavaバージョンを明示的に含めることをお勧めします。

3. RxKotlinでObservableを作成する

RxKotlinには、コレクションからObservableおよびFlowableオブジェクトを作成するための多数の拡張メソッドが含まれています。

特に、すべてのタイプの配列にはtoObservable()メソッドとtoFlowable()メソッドがあります:

val observable = listOf(1, 1, 2, 3).toObservable()
observable.test().assertValues(1, 1, 2, 3)
val flowable = listOf(1, 1, 2, 3).toFlowable()
flowable.buffer(2).test().assertValues(listOf(1, 1), listOf(2, 3))

3.1. 完了可能

RxKotlinは、Completeableインスタンスを作成するためのいくつかのメソッドも提供します。 特に、アクション、Callables、Futures、およびゼロアリティ関数を、拡張メソッドtoCompletable:を使用してCompletableに変換できます。

var value = 0
val completable = { value = 3 }.toCompletable()
assertFalse(completable.test().isCancelled())
assertEquals(3, value)

4. ObservableおよびFlowableからMapおよびMultimap

ペアインスタンスを生成するObservableまたはFlowableがある場合、それらをを生成するSingleobservableに変換できます。マップ:

val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4))
val observable = list.toObservable()
val map = observable.toMap()
assertEquals(mapOf(Pair("a", 4), Pair("b", 2), Pair("c", 3)), map.blockingGet())

前の例でわかるように、 toMap は、同じキーを持っている場合、以前に発行された値を後の値で上書きします。

キーに関連付けられているすべての値をコレクションに蓄積する場合は、代わりにtoMultimapを使用します。

val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4))
val observable = list.toObservable()
val map = observable.toMultimap()
assertEquals(
  mapOf(Pair("a", listOf(1, 4)), Pair("b", listOf(2)), Pair("c", listOf(3))),
  map.blockingGet())

5. ObservableFlowableの組み合わせ

Rxのセールスポイントの1つは、ObservableFlowableをさまざまな方法で組み合わせることができることです。 実際、RxJavaは、すぐに使用できる多数の演算子を提供します。

それに加えて、RxKotlinには、Observableなどを組み合わせるための拡張メソッドがいくつか含まれています。

5.1. 観測可能な排出量の組み合わせ

他のObservableを放出するObservableがある場合、RxKotlinの拡張メソッドの1つを使用して、放出された値を組み合わせることができます。

特に、 mergeAllは、オブザーバブルをflatMap:と組み合わせます。

val subject = PublishSubject.create<Observable<String>>()
val observable = subject.mergeAll()

これは次と同じになります:

val observable = subject.flatMap { it }

結果のObservableは、ソースObservableのすべての値を不特定の順序で出力します。

同様に、concatAllはconcatMap (値はソースと同じ順序で発行されます)を使用し、switchLatestはswitchMapを使用します(値は最後に発行された Observableから発行されます) )。

これまで見てきたように、上記のすべてのメソッドはFlowableソースにも提供されており、同じセマンティクスを備えています。

5.2. Completable s Maybe s、および Singlesの組み合わせ

Completeable Maybe 、またはSingleのインスタンスを発行するObservableがある場合、それらを適切なと組み合わせることができます。たとえば、 mergeAllMaybesのようなmergeAllXsメソッド:

val subject = PublishSubject.create<Maybe<Int>>()
val observable = subject.mergeAllMaybes()
subject.onNext(Maybe.just(1))
subject.onNext(Maybe.just(2))
subject.onNext(Maybe.empty())
subject.onNext(Maybe.error(Exception("error")))
subject.onNext(Maybe.just(3))
observable.test().assertValues(1, 2).assertError(Exception::class.java)

5.3. IterableObservableの組み合わせ

代わりに、ObservableまたはFlowableインスタンスのコレクションの場合、RxKotlinにはmergeおよびmergeDelayErrorという2つの演算子があります。 これらは両方とも、すべてのObservablesまたはFlowablesを1つに結合して、すべての値を順番に出力する効果があります。

val observables = mutableListOf(Observable.just("first", "second"))
val observable = observables.merge()
observables.add(Observable.just("third", "fourth"))
observable.test().assertValues("first", "second", "third", "fourth")

RxJavaの同じ名前の演算子から直接派生した2つの演算子の違いは、エラーの処理です。

merge メソッドは、ソースからエラーが発生するとすぐにエラーを発生します。

// ...
observables.add(Observable.error(Exception("e")))
observables.add(Observable.just("fifth"))
// ...
observable.test().assertValues("first", "second", "third", "fourth")

mergeDelayErrorは、ストリームの最後にそれらを発行します:

// ...
observables.add(Observable.error(Exception("e")))
observables.add(Observable.just("fifth"))
// ...
observable.test().assertValues("first", "second", "third", "fourth", "fifth")

6. さまざまなタイプの値の処理

次に、さまざまなタイプの値を処理するためのRxKotlinの拡張メソッドを見てみましょう。

これらはRxJavaメソッドの変形であり、Kotlinの洗練されたジェネリックを利用します。 特に、次のことができます。

  • 放出された値をあるタイプから別のタイプにキャストする、または
  • 特定のタイプではない値を除外する

したがって、たとえば、 Observable of NumbersをIntの1つにキャストできます。

val observable = Observable.just<Number>(1, 1, 2, 3)
observable.cast<Int>().test().assertValues(1, 1, 2, 3)

ここではキャストは不要です。 ただし、異なるオブザーバブルを組み合わせる場合は、それが必要になることがあります。

代わりにofType、を使用すると、期待するタイプではない値を除外できます。

val observable = Observable.just(1, "and", 2, "and")
observable.ofType<Int>().test().assertValues(1, 2)

いつものように、キャスト ofType は、ObservableFlowableの両方に適用できます。

さらに、多分はこれらのメソッドもサポートしています。 代わりに、シングルクラスはキャストのみをサポートします。

7. その他のヘルパーメソッド

最後に、RxKotlinにはいくつかのヘルパーメソッドが含まれています。 簡単に見てみましょう。

subscribeの代わりにsubscribeByを使用できます。これにより、名前付きパラメーターが許可されます。

Observable.just(1).subscribeBy(onNext = { println(it) })

同様に、サブスクリプションをブロックするには、blockingSubscribeBy。を使用できます。

さらに、RxKotlinには、RxJavaのメソッドを模倣するが、Kotlinの型推論の制限を回避するメソッドがいくつか含まれています。

たとえば、 Observable#zipを使用する場合、zipperを指定しても見栄えがよくありません。

Observable.zip(Observable.just(1), Observable.just(2), BiFunction<Int, Int, Int> { a, b -> a + b })

したがって、 RxKotlinは、より慣用的な使用法のためにObservables#zipを追加します:

Observables.zip(Observable.just(1), Observable.just(2)) { a, b -> a + b }

の最後の「s」に注意してくださいオブザーバブル。 同様に、 フローアブル、シングル、多分。

8. 結論

この記事では、RxJavaを拡張してAPIを慣用的なKotlinのように見せるためのRxKotlinライブラリを徹底的に確認しました。

詳細については、RxKotlinGitHubページを参照してください。 その他の例については、RxKotlinテストをお勧めします。

これらすべての例とコードスニペットの実装は、 GitHubプロジェクトにMavenおよびGradleプロジェクトとして含まれているため、そのままインポートして実行するのは簡単です。