RxJavaの紹介
1概要
この記事では、JavaでReactive Extensions(Rx)を使用して一連のデータを作成して使用することに焦点を当てます。
一見すると、APIはJava 8 Streamsに似ているように見えますが、実際には、はるかに柔軟で流暢で、強力なプログラミングパラダイムになっています。
RxJavaについてもっと知りたいのなら、リンクをチェックしてください:/rxjava-backpressure[この記事]。
2セットアップ
Maven
プロジェクトでRxJavaを使用するには、
pomに次の依存関係を追加する必要があります。 .xml:
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java.version}</version>
</dependency>
または、Gradleプロジェクトの場合:
compile 'io.reactivex.rxjava:rxjava:x.y.z'
3機能的反応概念
一方で、「関数型プログラミング」とは、純粋な関数を構成し、共有状態、可変データ、および副作用を回避してソフトウェアを構築するプロセスです。
一方、** リアクティブプログラミングは、データストリームと変更の伝播に関する非同期プログラミングパラダイムです。
まとめると、「機能的リアクティブプログラミング」は、イベントドリブンプログラミングへの洗練されたアプローチを表すことができる機能的手法とリアクティブ手法の組み合わせを形成します。
このテクノロジーは、そのコア原則のさまざまな実装をまとめたもので、新しいタイプのアプリケーションを記述するための共通語彙を定義する文書を作成した著者もいます。
3.1. 反応マニフェスト
Reactive Manifesto
は、ソフトウェア開発業界におけるアプリケーションのための高水準を提示するオンライン文書です。簡単に言うと、反応システムは次のとおりです。
-
レスポンシブ – システムはタイムリーに対応するべきです
-
メッセージ駆動 – システム間で非同期メッセージパッシングを使用する必要があります。
疎結合を確実にするための部品
** 弾力性 – システムは高負荷の下でも反応し続けるべきです
-
回復力 – 一部のコンポーネントで障害が発生してもシステムは応答し続けるべき
4観測量
__Rxを扱うときに理解する必要がある2つの重要なタイプがあります。
-
Observable
は、データからデータを取得できるオブジェクトを表します。
ソースとその状態が他のオブジェクトと同じように興味深い場合があります。
興味を登録することができます
**
observer
は、状態が変化したときに通知を受けたいオブジェクトです。
他のオブジェクトの変更
observer
は
Observable
シーケンスを購読します。 ** シーケンスは一度に1つずつ
observer
にアイテムを送ります。
observer
は次のものを処理する前にそれぞれを処理します。多くのイベントが非同期的に入ってくる場合は、それらをキューに保管するかドロップする必要があります。
Rx
では、
observer
は項目の順序が乱れて呼び出されたり、前の項目に対してコールバックが返される前に呼び出されたりすることはありません。
4.1. __観測可能
の種類
2種類あります:
-
ノンブロッキング –
非同期実行がサポートされています。
イベントストリームの任意の時点で購読を中止します。この記事では、主にこの種のタイプに焦点を当てます。
-
ブロック –
すべての
onNext__オブザーバ呼び出しは同期的になります。
イベントストリームの途中で登録解除することはできません。メソッド
toBlockingを使用して、常に
Observable
を
Blocking Observable__に変換できます。
BlockingObservable<String> blockingObservable = observable.toBlocking();
4.2. 演算子
operator
は最初の引数として1つの
Observable
(ソース)を取り、別の
Observable
(デスティネーション)を返す関数です。そして、ソースオブザーバブルが発行するすべての項目について、その項目に関数を適用してから発行宛先
Observable
の結果
特定の基準に基づいてイベントをフィルタ処理する複雑なデータフローを作成するために、
演算子
を連鎖させることができます。複数の演算子を同じ
観測可能
に適用することができます。
Observable
が
operator
や
observer
がそれらを消費するよりも速くアイテムを放出している状況に入るのは難しくありません。
バックプレッシャーリンクについてもっと読むことができます:/rxjava-backpressure[ここ]。
4.3. 観測量を作成する
基本演算子
just
は、完了する前に単一の汎用インスタンスを生成する
Observable
を生成します。文字列__“ Hello”です。
Observable<String> observable = Observable.just("Hello");
observable.subscribe(s -> result = s);
assertTrue(result.equals("Hello"));
4.4.
OnNext、OnError、
、および
OnCompleted
observer
インターフェースには、知りたい3つのメソッドがあります。
-
新しいイベントが発生するたびに、
OnNext
が
observer
で呼び出されます.
添付の
Observable
に公開されています。これが私たちが目指す方法です。
各イベントに対して何らかのアクションを実行する
。
OnCompleted
は、関連するイベントのシーケンスが
Observable
は完了しています。これ以上期待できないことを示しています
onNext
がオブザーバを呼び出します
。処理中に未処理の例外がスローされると、
OnError
が呼び出されます。
RxJava
フレームワークコードまたは当社のイベント処理コード
Observables
subscribe
メソッドの戻り値は
subscribe
インタフェースです。
String[]letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters);
observable.subscribe(
i -> result += i, //OnNext
Throwable::printStackTrace,//OnError
() -> result += "__Completed"//OnCompleted
);
assertTrue(result.equals("abcdefg__Completed"));
5観測可能な変換と条件付き演算子
5.1.
地図
m
__ap operator
は、各アイテムに関数を適用することによって、
Observable__によって発行されたアイテムを変換します。
アルファベットの文字を含む宣言された文字列の配列があり、それらを大文字モードで出力したいとしましょう。
Observable.from(letters)
.map(String::toUpperCase)
.subscribe(letter -> result += letter);
assertTrue(result.equals("ABCDEFG"));
Observablesを入れ子にしたときはいつでも、flatMap
を使用して
Observables
を平坦化することができます。
map
と
flatMap
の違いについての詳細は
ここ
を見てください。
文字列のリストから
Observable <String>
を返すメソッドがあるとします。今度は、新しい
Observable
から各文字列について、__Subscriberが見たものに基づいてタイトルのリストを出力します。
Observable<String> getTitle() {
return Observable.from(titleList);
}
Observable.just("book1", "book2")
.flatMap(s -> getTitle())
.subscribe(l -> result += l);
assertTrue(result.equals("titletitle"));
5.2.
スキャン
__scan演算子は、
Observable__によって発行された各項目に順に関数を適用し、連続した各値を発行します。
イベントからイベントへ状態を引き継ぐことができます。
String[]letters = {"a", "b", "c"};
Observable.from(letters)
.scan(new StringBuilder(), StringBuilder::append)
.subscribe(total -> result += total.toString());
assertTrue(result.equals("aababc"));
5.3.
GroupBy
Group by演算子を使用すると、入力Observable内のイベントを出力カテゴリに分類できます。
0から10までの整数の配列を作成した後、
group by
を適用して、それらをカテゴリ
even
と
odd
に分割します。
Observable.from(numbers)
.groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
.subscribe(group ->
group.subscribe((number) -> {
if (group.getKey().toString().equals("EVEN")) {
EVEN[0]+= number;
} else {
ODD[0]+= number;
}
})
);
assertTrue(EVEN[0].equals("0246810"));
assertTrue(ODD[0].equals("13579"));
5.4.
フィルタ
演算子
filter
は
observable
からの
predicate
テストに合格したアイテムだけを出力します。
それでは、奇数の整数配列をフィルタリングしましょう。
Observable.from(numbers)
.filter(i -> (i % 2 == 1))
.subscribe(i -> result += i);
assertTrue(result.equals("13579"));
5.5. 条件付き演算子
DefaultIfEmpty
は、ソース
Observable
からアイテムを生成します。ソース
Observable
が空の場合はデフォルトのアイテムを生成します。
Observable.empty()
.defaultIfEmpty("Observable is empty")
.subscribe(s -> result += s);
assertTrue(result.equals("Observable is empty"));
次のコードは、配列
letters
が空ではなく、これが最初の位置に含まれるものであるため、アルファベットの最初の文字「__a」を出力します。
Observable.from(letters)
.defaultIfEmpty("Observable is empty")
.first()
.subscribe(s -> result += s);
assertTrue(result.equals("a"));
TakeWhile
演算子は、指定された条件が偽になった後に
Observable
によって発行されたアイテムを破棄します。
Observable.from(numbers)
.takeWhile(i -> i < 5)
.subscribe(s -> sum[0]+= s);
assertTrue(sum[0]== 10);
もちろん、__Contain、SkipWhile、SkipUntil、TakeUntilなど、私たちのニーズを満たすことができる他の演算子が他にもあります。
6. 接続可能な観測量
-
ConnectableObservable
は通常の
Observable
と似ていますが、購読時にアイテムの発行を開始しない点で、ただし
connect
演算子が適用されている場合のみです。
このようにして、
Observable
がアイテムの発行を開始する前に、すべての対象オブザーバが
Observable
にサブスクライブするのを待つことができます。
String[]result = {""};
ConnectableObservable<Long> connectable
= Observable.interval(200, TimeUnit.MILLISECONDS).publish();
connectable.subscribe(i -> result[0]+= i);
assertFalse(result[0].equals("01"));
connectable.connect();
Thread.sleep(500);
assertTrue(result[0].equals("01"));
7.シングル
Single
は
Observable
のようなもので、一連の値を発行するのではなく、1つの値またはエラー通知を発行します。
このデータソースでは、購読には2つの方法しか使用できません。
-
OnSuccess
は、指定したメソッドも呼び出す
Single
を返します。 -
OnError
は、ただちに通知する
Single
も返します。
エラーの購読者
String[]result = {""};
Single<String> single = Observable.just("Hello")
.toSingle()
.doOnSuccess(i -> result[0]+= i)
.doOnError(error -> {
throw new RuntimeException(error.getMessage());
});
single.subscribe();
assertTrue(result[0].equals("Hello"));
8科目
Subject
は同時に2つの要素、
subscriber
と
observable
です。サブスクライバーとして、サブジェクトを使用して複数の観察可能物から来るイベントを公開することができます。
またそれは観測可能なので、複数の加入者からのイベントは、それを観測している人にはそのイベントとして再送信できます。
次の例では、購読後に発生するイベントをオブザーバがどのように確認できるかを見ていきます。
Integer subscriber1 = 0;
Integer subscriber2 = 0;
Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {
@Override
public void onNext(Integer value) {
subscriber1 += value;
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onCompleted() {
System.out.println("Subscriber1 completed");
}
};
}
Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onNext(Integer value) {
subscriber2 += value;
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onCompleted() {
System.out.println("Subscriber2 completed");
}
};
}
PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(getFirstObserver());
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.subscribe(getSecondObserver());
subject.onNext(4);
subject.onCompleted();
assertTrue(subscriber1 + subscriber2 == 14)
9資源管理
Using
操作を使用すると、JDBCデータベース接続、ネットワーク接続、開いているファイルなどのリソースを観察可能ファイルに関連付けることができます。
ここでは、この目標を達成するために必要な手順と、実装例を解説しています。
String[]result = {""};
Observable<Character> values = Observable.using(
() -> "MyResource",
r -> {
return Observable.create(o -> {
for (Character c : r.toCharArray()) {
o.onNext(c);
}
o.onCompleted();
});
},
r -> System.out.println("Disposed: " + r)
);
values.subscribe(
v -> result[0]+= v,
e -> result[0]+= e
);
assertTrue(result[0].equals("MyResource"));
10結論
この記事では、RxJavaライブラリーの使い方と、その最も重要な機能を探る方法について説明しました。
ここで使用されているすべてのコードサンプルを含むプロジェクトの完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/rxjava[Githubに追加]で見つけることができます。