1概要

この記事では、JavaでReactive Extensions(Rx)を使用して一連のデータを作成して使用することに焦点を当てます。

一見すると、APIはJava 8 Streamsに似ているように見えますが、実際には、はるかに柔軟で流暢で、強力なプログラミングパラダイムになっています。

RxJavaについてもっと知りたいのなら、リンクをチェックしてください:/rxjava-backpressure[この記事]。


2セットアップ


Maven

プロジェクトでRxJavaを使用するには、

pomに次の依存関係を追加する必要があります。 .xml:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>${rx.java.version}</version>
</dependency>

または、Gradleプロジェクトの場合:

compile 'io.reactivex.rxjava:rxjava:x.y.z'


3機能的反応概念

一方で、「関数型プログラミング」とは、純粋な関数を構成し、共有状態、可変データ、および副作用を回避してソフトウェアを構築するプロセスです。

一方、** リアクティブプログラミングは、データストリームと変更の伝播に関する非同期プログラミングパラダイムです。

まとめると、「機能的リアクティブプログラミング」は、イベントドリブンプログラミングへの洗練されたアプローチを表すことができる機能的手法とリアクティブ手法の組み合わせを形成します。

このテクノロジーは、そのコア原則のさまざまな実装をまとめたもので、新しいタイプのアプリケーションを記述するための共通語彙を定義する文書を作成した著者もいます。


3.1. 反応マニフェスト


Reactive Manifesto

は、ソフトウェア開発業界におけるアプリケーションのための高水準を提示するオンライン文書です。簡単に言うと、反応システムは次のとおりです。

  • レスポンシブ – システムはタイムリーに対応するべきです

  • メッセージ駆動 – システム間で非同期メッセージパッシングを使用する必要があります。

疎結合を確実にするための部品
** 弾力性 – システムは高負荷の下でも反応し続けるべきです

  • 回復力 – 一部のコンポーネントで障害が発生してもシステムは応答し続けるべき


4観測量

__Rxを扱うときに理解する必要がある2つの重要なタイプがあります。


  • Observable

    は、データからデータを取得できるオブジェクトを表します。

ソースとその状態が他のオブジェクトと同じように興味深い場合があります。
興味を登録することができます
**

observer

は、状態が変化したときに通知を受けたいオブジェクトです。

他のオブジェクトの変更


observer



Observable

シーケンスを購読します。 ** シーケンスは一度に1つずつ

observer

にアイテムを送ります。


observer

は次のものを処理する前にそれぞれを処理します。多くのイベントが非同期的に入ってくる場合は、それらをキューに保管するかドロップする必要があります。


Rx

では、

observer

は項目の順序が乱れて呼び出されたり、前の項目に対してコールバックが返される前に呼び出されたりすることはありません。


4.1. __観測可能

の種類

2種類あります:


  • ノンブロッキング –

    非同期実行がサポートされています。

イベントストリームの任意の時点で購読を中止します。この記事では、主にこの種のタイプに焦点を当てます。


  • ブロック –


    すべての

    onNext__オブザーバ呼び出しは同期的になります。

イベントストリームの途中で登録解除することはできません。メソッド

toBlockingを使用して、常に

Observable



Blocking Observable__に変換できます。

BlockingObservable<String> blockingObservable = observable.toBlocking();


4.2. 演算子


operator

は最初の引数として1つの

Observable

(ソース)を取り、別の

Observable

(デスティネーション)を返す関数です。そして、ソースオブザーバブルが発行するすべての項目について、その項目に関数を適用してから発行宛先

Observable

の結果

特定の基準に基づいてイベントをフィルタ処理する複雑なデータフローを作成するために、

演算子

を連鎖させることができます。複数の演算子を同じ

観測可能

に適用することができます。


Observable



operator



observer

がそれらを消費するよりも速くアイテムを放出している状況に入るのは難しくありません。

バックプレッシャーリンクについてもっと読むことができます:/rxjava-backpressure[ここ]。


4.3. 観測量を作成する

基本演算子

just

は、完了する前に単一の汎用インスタンスを生成する

Observable

を生成します。文字列__“ Hello”です。

Observable<String> observable = Observable.just("Hello");
observable.subscribe(s -> result = s);

assertTrue(result.equals("Hello"));


4.4.

OnNext、OnError、

、および

OnCompleted



observer

インターフェースには、知りたい3つのメソッドがあります。

  1. 新しいイベントが発生するたびに、

    OnNext



    observer

    で呼び出されます.

添付の

Observable

に公開されています。これが私たちが目指す方法です。
各イベントに対して何らかのアクションを実行する


OnCompleted

は、関連するイベントのシーケンスが


Observable

は完了しています。これ以上期待できないことを示しています

onNext

がオブザーバを呼び出します
。処理中に未処理の例外がスローされると、

OnError

が呼び出されます。


RxJava

フレームワークコードまたは当社のイベント処理コード


Observables


subscribe

メソッドの戻り値は

subscribe

インタフェースです。

String[]letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters);
observable.subscribe(
  i -> result += i, //OnNext
  Throwable::printStackTrace,//OnError
  () -> result += "__Completed"//OnCompleted
);
assertTrue(result.equals("abcdefg__Completed"));


5観測可能な変換と条件付き演算子


5.1.

地図


m

__ap operator


は、各アイテムに関数を適用することによって、

Observable__によって発行されたアイテムを変換します。

アルファベットの文字を含む宣言された文字列の配列があり、それらを大文字モードで出力したいとしましょう。

Observable.from(letters)
  .map(String::toUpperCase)
  .subscribe(letter -> result += letter);
assertTrue(result.equals("ABCDEFG"));


Observablesを入れ子にしたときはいつでも、flatMap

を使用して

Observables

を平坦化することができます。


map



flatMap

の違いについての詳細は

ここ

を見てください。

文字列のリストから

Observable <String>

を返すメソッドがあるとします。今度は、新しい

Observable

から各文字列について、__Subscriberが見たものに基づいてタイトルのリストを出力します。

Observable<String> getTitle() {
    return Observable.from(titleList);
}
Observable.just("book1", "book2")
  .flatMap(s -> getTitle())
  .subscribe(l -> result += l);

assertTrue(result.equals("titletitle"));


5.2.

スキャン



__scan演算子は、

Observable__によって発行された各項目に順に関数を適用し、連続した各値を発行します。

イベントからイベントへ状態を引き継ぐことができます。

String[]letters = {"a", "b", "c"};
Observable.from(letters)
  .scan(new StringBuilder(), StringBuilder::append)
  .subscribe(total -> result += total.toString());
assertTrue(result.equals("aababc"));


5.3.

GroupBy


Group by演算子を使用すると、入力Observable内のイベントを出力カテゴリに分類できます。

0から10までの整数の配列を作成した後、

group by

を適用して、それらをカテゴリ

even



odd

に分割します。

Observable.from(numbers)
  .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
  .subscribe(group ->
    group.subscribe((number) -> {
        if (group.getKey().toString().equals("EVEN")) {
            EVEN[0]+= number;
        } else {
            ODD[0]+= number;
        }
    })
  );
assertTrue(EVEN[0].equals("0246810"));
assertTrue(ODD[0].equals("13579"));


5.4.

フィルタ


演算子

filter



observable

からの

predicate

テストに合格したアイテムだけを出力します。

それでは、奇数の整数配列をフィルタリングしましょう。

Observable.from(numbers)
  .filter(i -> (i % 2 == 1))
  .subscribe(i -> result += i);

assertTrue(result.equals("13579"));


5.5. 条件付き演算子


DefaultIfEmpty

は、ソース

Observable

からアイテムを生成します。ソース

Observable

が空の場合はデフォルトのアイテムを生成します。

Observable.empty()
  .defaultIfEmpty("Observable is empty")
  .subscribe(s -> result += s);

assertTrue(result.equals("Observable is empty"));

次のコードは、配列

letters

が空ではなく、これが最初の位置に含まれるものであるため、アルファベットの最初の文字「__a」を出力します。

Observable.from(letters)
  .defaultIfEmpty("Observable is empty")
  .first()
  .subscribe(s -> result += s);

assertTrue(result.equals("a"));


TakeWhile

演算子は、指定された条件が偽になった後に

Observable

によって発行されたアイテムを破棄します。

Observable.from(numbers)
  .takeWhile(i -> i < 5)
  .subscribe(s -> sum[0]+= s);

assertTrue(sum[0]== 10);

もちろん、__Contain、SkipWhile、SkipUntil、TakeUntilなど、私たちのニーズを満たすことができる他の演算子が他にもあります。


6. 接続可能な観測量


  • ConnectableObservable

    は通常の

    Observable

    と似ていますが、購読時にアイテムの発行を開始しない点で、ただし

    connect

    演算子が適用されている場合のみです。

このようにして、

Observable

がアイテムの発行を開始する前に、すべての対象オブザーバが

Observable

にサブスクライブするのを待つことができます。

String[]result = {""};
ConnectableObservable<Long> connectable
  = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
connectable.subscribe(i -> result[0]+= i);
assertFalse(result[0].equals("01"));

connectable.connect();
Thread.sleep(500);

assertTrue(result[0].equals("01"));

7.シングル


Single



Observable

のようなもので、一連の値を発行するのではなく、1つの値またはエラー通知を発行します。

このデータソースでは、購読には2つの方法しか使用できません。


  • OnSuccess

    は、指定したメソッドも呼び出す

    Single

    を返します。


  • OnError

    は、ただちに通知する

    Single

    も返します。

エラーの購読者

String[]result = {""};
Single<String> single = Observable.just("Hello")
  .toSingle()
  .doOnSuccess(i -> result[0]+= i)
  .doOnError(error -> {
      throw new RuntimeException(error.getMessage());
  });
single.subscribe();

assertTrue(result[0].equals("Hello"));


8科目


Subject

は同時に2つの要素、

subscriber



observable

です。サブスクライバーとして、サブジェクトを使用して複数の観察可能物から来るイベントを公開することができます。

またそれは観測可能なので、複数の加入者からのイベントは、それを観測している人にはそのイベントとして再送信できます。

次の例では、購読後に発生するイベントをオブザーバがどのように確認できるかを見ていきます。

Integer subscriber1 = 0;
Integer subscriber2 = 0;
Observer<Integer> getFirstObserver() {
    return new Observer<Integer>() {
        @Override
        public void onNext(Integer value) {
           subscriber1 += value;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("error");
        }

        @Override
        public void onCompleted() {
            System.out.println("Subscriber1 completed");
        }
    };
}

Observer<Integer> getSecondObserver() {
    return new Observer<Integer>() {
        @Override
        public void onNext(Integer value) {
            subscriber2 += value;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("error");
        }

        @Override
        public void onCompleted() {
            System.out.println("Subscriber2 completed");
        }
    };
}

PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(getFirstObserver());
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.subscribe(getSecondObserver());
subject.onNext(4);
subject.onCompleted();

assertTrue(subscriber1 + subscriber2 == 14)


9資源管理


Using

操作を使用すると、JDBCデータベース接続、ネットワーク接続、開いているファイルなどのリソースを観察可能ファイルに関連付けることができます。

ここでは、この目標を達成するために必要な手順と、実装例を解説しています。

String[]result = {""};
Observable<Character> values = Observable.using(
  () -> "MyResource",
  r -> {
      return Observable.create(o -> {
          for (Character c : r.toCharArray()) {
              o.onNext(c);
          }
          o.onCompleted();
      });
  },
  r -> System.out.println("Disposed: " + r)
);
values.subscribe(
  v -> result[0]+= v,
  e -> result[0]+= e
);
assertTrue(result[0].equals("MyResource"));


10結論

この記事では、RxJavaライブラリーの使い方と、その最も重要な機能を探る方法について説明しました。

ここで使用されているすべてのコードサンプルを含むプロジェクトの完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/rxjava[Githubに追加]で見つけることができます。