1. 概要

複数のサブスクライバーのデフォルトの動作が常に望ましいとは限りません。 この記事では、この動作を変更し、複数のサブスクライバーを適切な方法で処理する方法について説明します。

ただし、最初に、複数のサブスクライバーのデフォルトの動作を見てみましょう。

2. デフォルトの動作

次のObservableがあるとします。

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        subscriber.onNext(gettingValue(1));
        subscriber.onNext(gettingValue(2));

        subscriber.add(Subscriptions.create(() -> {
            LOGGER.info("Clear resources");
        }));
    });
}

これは、サブスクライバーがサブスクライブするとすぐに2つの要素を放出します。

この例では、2つのサブスクライバーがあります。

LOGGER.info("Subscribing");

Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));

s1.unsubscribe();
s2.unsubscribe();

各要素の取得はコストのかかる操作であると想像してください。たとえば、集中的な計算やURL接続の開始が含まれる場合があります。

簡単にするために、数値を返します。

private static Integer gettingValue(int i) {
    LOGGER.info("Getting " + i);
    return i;
}

出力は次のとおりです。

Subscribing
Getting 1
subscriber#1 is printing 1
Getting 2
subscriber#1 is printing 2
Getting 1
subscriber#2 is printing 1
Getting 2
subscriber#2 is printing 2
Clear resources
Clear resources

ご覧のとおり、各要素の取得とリソースのクリアは、デフォルトで2回サブスクライバーごとに1回実行されます。 これは私たちが望んでいることではありません。 ConnectableObservable クラスは、問題の修正に役立ちます。

3. ConnectableObservable

ConnectableObservable クラスを使用すると、サブスクリプションを複数のサブスクライバーと共有でき、基礎となる操作を複数回実行する必要がなくなります。

ただし、最初にConnectableObservableを作成しましょう。

3.1. publish()

publish()メソッドは、ObservableからConnectableObservableを作成するものです。

ConnectableObservable obs = Observable.create(subscriber -> {
    subscriber.onNext(gettingValue(1));
    subscriber.onNext(gettingValue(2));
    subscriber.add(Subscriptions.create(() -> {
        LOGGER.info("Clear resources");
    }));
}).publish();

しかし今のところ、それは何もしません。 それを機能させるのは、 connect()メソッドです。

3.2. connect()

ConnectableObservableのconnect()メソッドが呼び出されないまでObservableのonSubcribe()コールバックはトリガーされません一部のサブスクライバーが存在する場合でも。

これをデモンストレーションしましょう:

LOGGER.info("Subscribing");
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));
Thread.sleep(1000);
LOGGER.info("Connecting");
Subscription s = obs.connect();
s.unsubscribe();

サブスクライブしてから、接続する前に1秒待ちます。 出力は次のとおりです。

Subscribing
Connecting
Getting 1
subscriber #1 is printing 1
subscriber #2 is printing 1
Getting 2
subscriber #1 is printing 2
subscriber #2 is printing 2
Clear resources

私たちが見ることができるように:

    • 要素の取得は、必要に応じて1回だけ行われます
    • リソースのクリアも1回だけ行われます
    • 要素の取得は、サブスクライブの1秒後に開始されます。
    • サブスクライブしても、要素の放出はトリガーされなくなりました。 connect()のみがこれを行います

この遅延は有益な場合があります。場合によっては、サブスクライバーの1つが別のサブスクライバーよりも早くサブスクライブする場合でも、すべてのサブスクライバーに同じ要素のシーケンスを与える必要があります。

3.3. オブザーバブルの一貫したビュー– connect() subscribe()の後

このユースケースは、以前の Observable で動作し、両方のサブスクライバーがいずれにせよ要素のシーケンス全体を取得するため、実証できません。

代わりに、放出する要素がサブスクリプションの瞬間に依存しないことを想像してください。たとえば、マウスクリックで放出されるイベントです。 ここで、2番目のサブスクライバーが最初のサブスクライバーの1秒後にサブスクライブすることも想像してください。

最初のSubscriberは、この例で発行されたすべての要素を取得しますが、2番目の Subscriber は、一部の要素のみを受信します。

一方、適切な場所で connect()メソッドを使用すると、Observableシーケンスで両方のサブスクライバーに同じビューを与えることができます。

ホットObservableの例

ホットなObservableを作成しましょう。 JFrameをマウスでクリックすると要素が放出されます。

各要素はクリックのx座標になります。

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        frame.addMouseListener(new MouseAdapter() {
            @Override
            public void mouseClicked(MouseEvent e) {
                subscriber.onNext(e.getX());
            }
        });
        subscriber.add(Subscriptions.create(() {
            LOGGER.info("Clear resources");
            for (MouseListener listener : frame.getListeners(MouseListener.class)) {
                frame.removeMouseListener(listener);
            }
        }));
    });
}

ホットObservableのデフォルトの動作

ここで、2つのサブスクライバーを2番目の間隔で次々にサブスクライブし、プログラムを実行してクリックを開始すると、最初のサブスクライバーがより多くの要素を取得することがわかります。

public static void defaultBehaviour() throws InterruptedException {
    Observable obs = getObservable();

    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}
subscribing #1
subscriber#1 is printing x-coordinate 280
subscriber#1 is printing x-coordinate 242
subscribing #2
subscriber#1 is printing x-coordinate 343
subscriber#2 is printing x-coordinate 343
unsubscribe#1
clearing resources
unsubscribe#2
clearing resources

connect() subscribe()の後

両方のサブスクライバーが同じシーケンスを取得できるようにするには、このObservableConnectableObservableに変換し、両方の Subscriberサブスクリプションの後にconnect()を呼び出します。 ] s:

public static void subscribeBeforeConnect() throws InterruptedException {

    ConnectableObservable obs = getObservable().publish();

    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i ->  LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe connected");
    s.unsubscribe();
}

これで、同じシーケンスが得られます。

subscribing #1
subscribing #2
connecting:
subscriber#1 is printing x-coordinate 317
subscriber#2 is printing x-coordinate 317
subscriber#1 is printing x-coordinate 364
subscriber#2 is printing x-coordinate 364
unsubscribe connected
clearing resources

したがって、重要なのは、すべてのサブスクライバーの準備が整うまで待ってから、 connect()を呼び出すことです。

Springアプリケーションでは、たとえば、アプリケーションの起動時にすべてのコンポーネントをサブスクライブし、 onApplicationEvent() connect()を呼び出すことがあります。

しかし、例に戻りましょう。 connect()メソッドの前のすべてのクリックが失われることに注意してください。 要素を見逃したくないが、逆にそれらを処理する場合は、コードの前に connect()を配置し、ObservableにObservableに[X X191X]サブスクライバー。

3.4. サブスクライバー connect()の前に subscribe()がない場合にサブスクリプションを強制する

これを示すために、例を修正しましょう。

public static void connectBeforeSubscribe() throws InterruptedException {
    ConnectableObservable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish();
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    s.unsubscribe();
}

手順は比較的簡単です。

  • まず、接続します
  • 次に、1秒間待って、最初のサブスクライバーをサブスクライブします。
  • 最後に、もう1秒待って、2番目のサブスクライバーをサブスクライブします。

doOnNext()演算子を追加したことに注意してください。 ここでは、たとえばデータベースに要素を格納できますが、コードでは「saving…」と出力するだけです。

コードを起動してクリックを開始すると、 connect()呼び出しの直後に要素が出力されて処理されることがわかります。

connecting:
saving 306
saving 248
subscribing #1
saving 377
subscriber#1 is printing x-coordinate 377
saving 295
subscriber#1 is printing x-coordinate 295
saving 206
subscriber#1 is printing x-coordinate 206
subscribing #2
saving 347
subscriber#1 is printing x-coordinate 347
subscriber#2 is printing x-coordinate 347
clearing resources

サブスクライバーがなかった場合でも、要素は処理されます。

したがって、connect()メソッドは、誰かがサブスクライブされているかどうかに関係なく、要素を消費する空のアクションを持つ人工のサブスクライバーがあるかのように、要素の発行と処理を開始します

また、実際のサブスクライバーがサブスクライブしている場合、この人工メディエーターは要素をそれらに伝達するだけです。

人工的なSubscriberの購読を解除するには、次のようにします。

s.unsubscribe();

どこ:

Subscription s = obs.connect();

3.5. autoConnect()

このメソッドは、connect()がサブスクリプションの前後ではなく、最初のサブスクライバーがサブスクライブしたときに自動的に呼び出されることを意味します

このメソッドを使用すると、返されるオブジェクトは通常の Observable であり、このメソッドはありませんが、基になる ConnectableObservable を使用するため、 connect()を自分で呼び出すことはできません。 ]:

public static void autoConnectAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
    .doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect();

    LOGGER.info("autoconnect()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription s1 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription s2 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));

    Thread.sleep(1000);
    LOGGER.info("unsubscribe 1");
    s1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe 2");
    s2.unsubscribe();
}

人工的なSubscriberの購読を解除することもできないことに注意してください。 すべての実際のサブスクライバーのサブスクライブを解除できますが、人工のサブスクライバーは引き続きイベントを処理します。

これを理解するために、最後のサブスクライバーがサブスクライブを解除した後、最後に何が起こっているかを見てみましょう。

subscribing #1
saving 296
subscriber#1 is printing x-coordinate 296
saving 329
subscriber#1 is printing x-coordinate 329
subscribing #2
saving 226
subscriber#1 is printing x-coordinate 226
subscriber#2 is printing x-coordinate 226
unsubscribe 1
saving 268
subscriber#2 is printing x-coordinate 268
saving 234
subscriber#2 is printing x-coordinate 234
unsubscribe 2
saving 278
saving 268

ご覧のとおり、リソースのクリアは行われず、 doOnNext()を使用した要素の保存は、2回目のサブスクライブ解除後も続行されます。 これは、人工のサブスクライバーがサブスクライブを解除せず、要素を消費し続けることを意味します。

3.6. refCount()

refCount()は、 autoConnect()に似ており、最初のサブスクライバーがサブスクライブするとすぐに接続も自動的に行われます。

autoconnect()とは異なり、最後のサブスクライバーがサブスクライブを解除すると、切断も自動的に行われます。

public static void refCountAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount();

    LOGGER.info("refcount()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));

    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}
refcount()
subscribing #1
saving 265
subscriber#1 is printing x-coordinate 265
saving 338
subscriber#1 is printing x-coordinate 338
subscribing #2
saving 203
subscriber#1 is printing x-coordinate 203
subscriber#2 is printing x-coordinate 203
unsubscribe#1
saving 294
subscriber#2 is printing x-coordinate 294
unsubscribe#2
clearing resources

4. 結論

ConnectableObservable クラスは、わずかな労力で複数のサブスクライバーを処理するのに役立ちます。

そのメソッドは似ていますが、メソッドの順序さえ重要であることを意味する実装の微妙さのために、サブスクライバーの動作を大幅に変更します。

この記事で使用されているすべての例の完全なソースコードは、GitHubプロジェクトにあります。