1. 序章

RxJavaの人気により、その機能を拡張する複数のサードパーティライブラリが作成されました。

これらのライブラリの多くは、開発者がRxJavaを使用するときに対処していた典型的な問題に対する答えでした。 RxRelayはこれらのソリューションの1つです。

2. 件名の処理

簡単に言えば、 主題間の架け橋として機能します観察可能観察者。 それは観察者 、1つ以上をサブスクライブできますオブザーバブルそしてそれらからイベントを受け取ります。

また、同時に Observable であるため、イベントを再送信したり、サブスクライバーに新しいイベントを発行したりできます。 Subject の詳細については、このarticleを参照してください。

Subject の問題の1つは、 onComplete()または onError()を受信した後、データを移動できなくなることです。 望ましい動作である場合もあれば、そうでない場合もあります。

このような動作が望ましくない場合は、RxRelayの使用を検討する必要があります。

3. リレー

リレーは基本的にSubjectですが、 onComplete()および onError()を呼び出す機能がないため、は常に可能です。データを出力します。

これにより、誤ってターミナル状態をトリガーすることを心配せずに、さまざまなタイプのAPI間にブリッジを作成できます。

RxRelay を使用するには、プロジェクトに次の依存関係を追加する必要があります。

<dependency>
  <groupId>com.jakewharton.rxrelay2</groupId>
  <artifactId>rxrelay</artifactId>
  <version>1.2.0</version>
</dependency>

4. リレーの種類

ライブラリで使用できるリレーには3つの異なるタイプがあります。 ここでは、3つすべてについて簡単に説明します。

4.1. PublishRelay

このタイプのRelayは、 Observer がサブスクライブすると、すべてのイベントを再送信します。

イベントはすべてのサブスクライバーに発行されます。

public void whenObserverSubscribedToPublishRelay_itReceivesEmittedEvents() {
    PublishRelay<Integer> publishRelay = PublishRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    
    publishRelay.subscribe(firstObserver);
    firstObserver.assertSubscribed();
    publishRelay.accept(5);
    publishRelay.accept(10);
    publishRelay.subscribe(secondObserver);
    secondObserver.assertSubscribed();
    publishRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    
    // second receives only the last event
    secondObserver.assertValue(15);
}

この場合、イベントのバッファリングは行われないため、この動作はコールドObservableに似ています。

4.2. BehaviorRelay

このタイプのRelayは、O bserver がサブスクライブすると、最新の観測イベントと後続のすべてのイベントを再送信します。

public void whenObserverSubscribedToBehaviorRelay_itReceivesEmittedEvents() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    behaviorRelay.accept(5);     
    behaviorRelay.subscribe(firstObserver);
    behaviorRelay.accept(10);
    behaviorRelay.subscribe(secondObserver);
    behaviorRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(10, 15);
}

BehaviorRelay を作成するときに、他に発行するイベントがない場合に発行されるデフォルト値を指定できます。

デフォルト値を指定するには、 createDefault()メソッドを使用できます。

public void whenObserverSubscribedToBehaviorRelay_itReceivesDefaultValue() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertValue(1);
}

デフォルト値を指定したくない場合は、 create()メソッドを使用できます。

public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_itIsEmpty() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

4.3. ReplayRelay

このタイプのRelayは、受信したすべてのイベントをバッファリングしてから、それをサブスクライブしているすべてのサブスクライバーに再送信します。

 public void whenObserverSubscribedToReplayRelay_itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    replayRelay.subscribe(firstObserver);
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.subscribe(secondObserver);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(5, 10, 15);
}

すべての要素がバッファリングされ、すべてのサブスクライバーが同じイベントを受信するため、この動作はcoldObservableに似ています。

ReplayRelay を作成するときに、最大のバッファーサイズとイベントの存続時間を提供できます。

限られたバッファサイズでRelayを作成するには、 createWithSize()メソッドを使用できます。 設定されたバッファサイズよりも多くのバッファされるイベントがある場合、前の要素は破棄されます。

public void whenObserverSubscribedToReplayRelayWithLimitedSize_itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertValues(15, 20);
}

createWithTime()メソッドを使用して、バッファリングされたイベントに残す最大時間でReplayRelayを作成することもできます。

public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents() {
    SingleScheduler scheduler = new SingleScheduler();
    ReplayRelay<Integer> replayRelay =
      ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, scheduler);
    long current =  scheduler.now(TimeUnit.MILLISECONDS);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    Thread.sleep(3000);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

5. カスタムリレー

上記のすべてのタイプは、共通の抽象クラス Relay、を拡張します。これにより、独自のカスタムRelayクラスを作成できます。

カスタムを作成するにはリレー 3つのメソッドを実装する必要があります。 accept()、hasObservers() subscribeActual()。

ランダムに選択されたサブスクライバーの1つにイベントを再送信する単純なリレーを作成しましょう。

public class RandomRelay extends Relay<Integer> {
    Random random = new Random();

    List<Observer<? super Integer>> observers = new ArrayList<>();

    @Override
    public void accept(Integer integer) {
        int observerIndex = random.nextInt() % observers.size();
        observers.get(observerIndex).onNext(integer);
    }

    @Override
    public boolean hasObservers() {
        return observers.isEmpty();
    }

    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
        observers.add(observer);
        observer.onSubscribe(Disposables.fromRunnable(
          () -> System.out.println("Disposed")));
    }
}

これで、1人のサブスクライバーのみがイベントを受信することをテストできます。

public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent() {
    RandomRelay randomRelay = new RandomRelay();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    randomRelay.subscribe(firstObserver);
    randomRelay.subscribe(secondObserver);
    randomRelay.accept(5);
    if(firstObserver.values().isEmpty()) {
        secondObserver.assertValue(5);
    } else {
        firstObserver.assertValue(5);
        secondObserver.assertEmpty();
    }
}

6. 結論

このチュートリアルでは、 Subject に似たタイプですが、ターミナル状態をトリガーする機能がないRxRelayについて説明しました。

詳細については、ドキュメントを参照してください。 そして、いつものように、すべてのコードサンプルはGitHubにあります。