1前書き

RxJavaの人気は、その機能を拡張する複数のサードパーティライブラリの作成をもたらしました。

これらのライブラリの多くは、RxJavaを使用するときに開発者が対処していた典型的な問題に対する答えです。


  • RxRelay

    ** はこれらの解決策の1つです。


2

Subject


を扱う

簡単に言うと、

Subject



Observable

と__Observerの間のブリッジとして機能します。

また、同時に

Observable

であれば、イベントを再発行したり、新しいイベントをその購読者に送信することもできます。

Subject

に関する詳細は、このリンクにあります:/rx-java[article]。


Subject

の問題の1つは、

onComplete()

または

onError()

を受け取った後、データを移動できなくなったことです。

時にはそれは望ましい振る舞いですが、時にはそうではありません。

このような動作が望ましくない場合は、

RxRelay

を使用することを検討してください。


3

リレー



Relay

は基本的には

Subject

ですが、

onComplete()

および__onError()を呼び出すことができないため、常にデータを送信することができます。**

これにより、誤ってターミナル状態が引き起こされることを心配することなく、異なるタイプのAPI間のブリッジを作成することができます。


RxRelay

を使用するには、プロジェクトに次の依存関係を追加する必要があります。

<dependency>
  <groupId>com.jakewharton.rxrelay2</groupId>
  <artifactId>rxrelay</artifactId>
  <version>1.2.0</version>
</dependency>


4

Relay


の種類

ライブラリには3種類の

Relay

があります。

ここでは、3つすべてについてすぐに説明します。

4.1.

PublishRelay

このタイプの

Relay

は、

Observer

がそれを購読すると、すべてのイベントを再発行します。

イベントはすべての加入者に発行されます。

public void whenObserverSubscribedToPublishRelay__itReceivesEmittedEvents() {
    PublishRelay<Integer> publishRelay = PublishRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();

    publishRelay.subscribe(firstObserver);
    firstObserver.assertSubscribed();
    publishRelay.accept(5);
    publishRelay.accept(10);
    publishRelay.subscribe(secondObserver);
    secondObserver.assertSubscribed();
    publishRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);

   //second receives only the last event
    secondObserver.assertValue(15);
}

この場合、イベントのバッファリングはありませんので、この動作はコールド

Observable.

に似ています。

4.2.

BehaviorRelay

このタイプの

Relay

は、O

__bserver

__が購読した時点で最新の監視対象イベントとそれ以降のすべてのイベントを再発行します。

public void whenObserverSubscribedToBehaviorRelay__itReceivesEmittedEvents() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    behaviorRelay.accept(5);
    behaviorRelay.subscribe(firstObserver);
    behaviorRelay.accept(10);
    behaviorRelay.subscribe(secondObserver);
    behaviorRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(10, 15);
}


BehaviorRelay

を作成するときに、デフォルト値を指定できます。デフォルト値を他に発行するイベントがない場合に発行されます。

デフォルト値を指定するには、

createDefault()

メソッドを使用します。

public void whenObserverSubscribedToBehaviorRelay__itReceivesDefaultValue() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertValue(1);
}

デフォルト値を指定したくない場合は、

create()

メソッドを使用できます。

public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue__itIsEmpty() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

4.3.

ReplayRelay

このタイプの

Relay

は、受信したすべてのイベントをバッファしてから、それを購読しているすべての購読者に再送信します。

 public void whenObserverSubscribedToReplayRelay__itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    replayRelay.subscribe(firstObserver);
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.subscribe(secondObserver);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(5, 10, 15);
}

すべての要素はバッファされ、すべてのサブスクライバは同じイベントを受け取ります。


ReplayRelay

を作成するときに、最大のバッファサイズとイベントの有効期間を指定できます。

バッファサイズを制限して

Relay

を作成するには、

createWithSize()

メソッドを使用します。設定したバッファサイズよりも多くのイベントをバッファリングする必要がある場合は、以前の要素は破棄されます。

public void whenObserverSubscribedToReplayRelayWithLimitedSize__itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertValues(15, 20);
}


createWithTime()

メソッドを使用して、バッファされたイベントに残す最大時間で

ReplayRelay

を作成することもできます。

public void whenObserverSubscribedToReplayRelayWithMaxAge__thenItReceivesEmittedEvents() {
    SingleScheduler scheduler = new SingleScheduler();
    ReplayRelay<Integer> replayRelay =
      ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, scheduler);
    long current =  scheduler.now(TimeUnit.MILLISECONDS);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    Thread.sleep(3000);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}


5カスタム

リレー


上記のすべての型は共通の抽象クラス

Relayを継承します。これにより、独自のカスタム

Relay__クラスを作成することができます。

カスタムの

Relay

を作成するには、3つのメソッドを実装する必要があります。


accept()、hasObservers()

、および

subscribeActual().

無作為に選ばれた購読者の一人にイベントを再送信する簡単なRelayを書きましょう:

public class RandomRelay extends Relay<Integer> {
    Random random = new Random();

    List<Observer<? super Integer>> observers = new ArrayList<>();

    @Override
    public void accept(Integer integer) {
        int observerIndex = random.nextInt() % observers.size();
        observers.get(observerIndex).onNext(integer);
    }

    @Override
    public boolean hasObservers() {
        return observers.isEmpty();
    }

    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
        observers.add(observer);
        observer.onSubscribe(Disposables.fromRunnable(
          () -> System.out.println("Disposed")));
    }
}

これで、1人の加入者だけがイベントを受け取ることをテストできます。

public void whenTwoObserversSubscribedToRandomRelay__thenOnlyOneReceivesEvent() {
    RandomRelay randomRelay = new RandomRelay();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    randomRelay.subscribe(firstObserver);
    randomRelay.subscribe(secondObserver);
    randomRelay.accept(5);
    if(firstObserver.values().isEmpty()) {
        secondObserver.assertValue(5);
    } else {
        firstObserver.assertValue(5);
        secondObserver.assertEmpty();
    }
}


6. 結論

このチュートリアルでは、

Subject

に似たタイプのRxRelayを見ましたが、端末状態をトリガーする機能はありません。

より多くの情報はhttp://jakewharton.github.io/RxRelay/[documentation]にあります。そして、いつものように、すべてのコードサンプルはhttps://github.com/eugenp/tutorials/tree/master/rxjava-2[over on GitHub]にあります。