1.概要

複数の加入者のデフォルトの動作が常に望ましいとは限りません。この記事では、この動作を変更し、複数の加入者を適切に処理する方法について説明します。

しかし、最初に、複数の加入者のデフォルトの動作を見てみましょう。

2.デフォルトの動作

以下の

観測可能

があるとしましょう。

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        subscriber.onNext(gettingValue(1));
        subscriber.onNext(gettingValue(2));

        subscriber.add(Subscriptions.create(() -> {
            LOGGER.info("Clear resources");
        }));
    });
}

これは

__Subscriber

__sが購読するとすぐに2つの要素を放出します。

この例では、2つの

__Subscriber

__があります。

LOGGER.info("Subscribing");

Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));

s1.unsubscribe();
s2.unsubscribe();

各要素を取得することはコストのかかる操作であると想像してください – それは例えば集中的な計算やURL接続を開くことを含みます。

簡単にするために、数値を返すだけです。

private static Integer gettingValue(int i) {
    LOGGER.info("Getting " + i);
    return i;
}

これが出力です:

Subscribing
Getting 1
subscriber#1 is printing 1
Getting 2
subscriber#1 is printing 2
Getting 1
subscriber#2 is printing 1
Getting 2
subscriber#2 is printing 2
Clear resources
Clear resources

ご覧のとおり、** 各要素の取得とリソースのクリアは、デフォルトでは2回実行されます – 各

Subscriber

に1回。これは私たちが望むことではありません。

ConnectableObservable

クラスは問題を解決するのに役立ちます。

3.

ConnectableObservable



ConnectableObservable


クラスを使用すると、購読を複数の購読者と共有でき、基になる操作を複数回実行することはできません。

しかし、最初に

ConnectableObservable

を作成しましょう。

3.1.

publish()


publish()

メソッドは、

Observable

から

ConnectableObservable

を作成するものです。

ConnectableObservable obs = Observable.create(subscriber -> {
    subscriber.onNext(gettingValue(1));
    subscriber.onNext(gettingValue(2));
    subscriber.add(Subscriptions.create(() -> {
        LOGGER.info("Clear resources");
    }));
}).publish();

しかし今のところ、それは何もしません。それが機能するのは

connect()

メソッドです。

3.2.

connect()


  • ConnectableObservable

    s

    connect()

    メソッドが呼び出されるまで

    Observable

    s

    onSubcribe()

    コールバックは、いくつかのサブスクライバがあっても引き起こされません。

これを実証しましょう。

LOGGER.info("Subscribing");
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));
Thread.sleep(1000);
LOGGER.info("Connecting");
Subscription s = obs.connect();
s.unsubscribe();

購読してから、しばらく待ってから接続します。出力は以下のとおりです。

Subscribing
Connecting
Getting 1
subscriber #1 is printing 1
subscriber #2 is printing 1
Getting 2
subscriber #1 is printing 2
subscriber #2 is printing 2
Clear resources

私たちが見ることができるように:



  • 要素を取得することは我々が望んだように一度だけ起こります

  • ** 決済リソースは一度だけ発生します

  • ** 購読から1秒後に要素の取得が始まります。

  • ** 購読しても、要素の発行は発生しませんのみ


connect()

はこれを行います

この遅延は有益になることがあります – 時には、たとえそれらのうちの1つが他のものより早く購読するとしても、すべての購読者に同じ要素のシーケンスを与える必要があります。

3.3. オブザーバブルの一貫した見方 –

connect()

after

subscribe()

このユースケースは、以前の

Observable

ではうまく動作しないため、両方のサブスクライバがいずれにしても要素のシーケンス全体を取得するため、このユースケースを示すことはできません。

そうではなく、要素の発行が購読の瞬間、たとえばマウスクリックで発生したイベントに依存しないと想像してください。今度は、2番目の

Subscriber

が最初のサブスクリプションの後に1秒サブスクライブすることも想像してください。

最初の

Subscriber

はこの例の間に発行されたすべての要素を取得しますが、2番目の

Subscriber

はいくつかの要素のみを受け取ります。

一方、正しい場所で

connect()

メソッドを使用すると、両方のサブスクライバに

Observable

シーケンスに関する同じビューを与えることができます。

  • ホット

    観測可能

    の例**

ホットなオブザーバブルを作りましょう。それは

JFrame

上でマウスクリックで要素を放出するでしょう。

各要素はクリックのX座標になります。

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        frame.addMouseListener(new MouseAdapter() {
            @Override
            public void mouseClicked(MouseEvent e) {
                subscriber.onNext(e.getX());
            }
        });
        subscriber.add(Subscriptions.create(() {
            LOGGER.info("Clear resources");
            for (MouseListener listener : frame.getListeners(MouseListener.class)) {
                frame.removeMouseListener(listener);
            }
        }));
    });
}

  • Hotのデフォルトの動作

    Observable

    **

2つの

__Subscriber


を2番目の間隔で次々にサブスクライブして、プログラムを実行してクリックを開始すると、最初の

Subscriber__にさらに要素が追加されることがわかります。

public static void defaultBehaviour() throws InterruptedException {
    Observable obs = getObservable();

    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe((i) ->
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe((i) ->
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}

subscribing #1
subscriber#1 is printing x-coordinate 280
subscriber#1 is printing x-coordinate 242
subscribing #2
subscriber#1 is printing x-coordinate 343
subscriber#2 is printing x-coordinate 343
unsubscribe#1
clearing resources
unsubscribe#2
clearing resources


  • connect()



    subscribe()

    **

両方のサブスクライバが同じシーケンスを使用するようにするには、この

Observable



ConnectableObservable

に変換し、サブスクリプションの両方の後に

connect()

を呼び出します。

public static void subscribeBeforeConnect() throws InterruptedException {

    ConnectableObservable obs = getObservable().publish();

    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i ->  LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe connected");
    s.unsubscribe();
}

これで、同じシーケンスになります。

subscribing #1
subscribing #2
connecting:
subscriber#1 is printing x-coordinate 317
subscriber#2 is printing x-coordinate 317
subscriber#1 is printing x-coordinate 364
subscriber#2 is printing x-coordinate 364
unsubscribe connected
clearing resources

つまり、すべてのサブスクライバの準備が整ってから

connect()

を呼び出すことが重要です。

Springアプリケーションでは、たとえばアプリケーションの起動時にすべてのコンポーネントをサブスクライブし、

onApplicationEvent()



connect()

を呼び出します。

しかし、私たちの例に戻りましょう。

connect()

メソッドの前のクリックはすべて見逃されていることに注意してください。要素を見逃すのではなく、逆にそれらを処理する場合は、

connect()

をコードの前の方に配置し、

Observable



Subscriber

がないイベントを生成させることができます。

3.4.

Subscriber



connect()


subscribe()

の前に、何も存在しない場合のサブスクリプション

これを実証するために、私たちの例を修正しましょう:

public static void connectBeforeSubscribe() throws InterruptedException {
    ConnectableObservable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish();
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    s.unsubscribe();
}

手順は比較的簡単です。

  • まず、つなげます

  • それから私達は1秒待って最初の

    Subscriber

    を購読

  • 最後に、もう1秒待って2秒目に申し込みます


加入者


doOnNext()

演算子が追加されています。ここでは、たとえばデータベースに要素を格納することができますが、コードでは、単に “saving …​”を印刷します。

コードを起動してクリックを開始すると、__connect()呼び出しの直後に要素が発行および処理されることがわかります。

connecting:
saving 306
saving 248
subscribing #1
saving 377
subscriber#1 is printing x-coordinate 377
saving 295
subscriber#1 is printing x-coordinate 295
saving 206
subscriber#1 is printing x-coordinate 206
subscribing #2
saving 347
subscriber#1 is printing x-coordinate 347
subscriber#2 is printing x-coordinate 347
clearing resources

購読者がいない場合でも、要素は処理されます。

したがって、

connect()メソッドは、誰かが購読しているかどうかにかかわらず、要素を消費する空のアクションを持つ人工的な

Subscriber__があるかのように** 要素の発行と処理を開始します。

そして本物の

__Subscriber

__が購読している場合、この人工的な仲介者はそれらに要素を伝播するだけです。

人工の「購読者」を購読解除するには、次のように実行します。

s.unsubscribe();

どこで:

Subscription s = obs.connect();

3.5.

autoConnect()

  • このメソッドは、

    connect()

    が購読の前後に呼び出されるのではなく、最初の

    Subscriber

    が購読するときに自動的に呼び出されることを意味します。

このメソッドを使用すると、返されるオブジェクトは通常の

Observable

なので、

connect()

を呼び出すことはできません。このメソッドにはありませんが、基礎となる

ConnectableObservable

を使用します。

public static void autoConnectAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
    .doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect();

    LOGGER.info("autoconnect()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription s1 = obs.subscribe((i) ->
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription s2 = obs.subscribe((i) ->
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));

    Thread.sleep(1000);
    LOGGER.info("unsubscribe 1");
    s1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe 2");
    s2.unsubscribe();
}

人工的な

Subscriber

を購読解除することもできません。実際の

Subscribers

をすべて購読解除することはできますが、人工の

Subscriber

がイベントを処理します。

これを理解するために、最後の加入者が退会した後の最後に何が起こっているのかを見てみましょう。

subscribing #1
saving 296
subscriber#1 is printing x-coordinate 296
saving 329
subscriber#1 is printing x-coordinate 329
subscribing #2
saving 226
subscriber#1 is printing x-coordinate 226
subscriber#2 is printing x-coordinate 226
unsubscribe 1
saving 268
subscriber#2 is printing x-coordinate 268
saving 234
subscriber#2 is printing x-coordinate 234
unsubscribe 2
saving 278
saving 268

ご覧のとおり、リソースのクリアは行われず、

doOnNext()

を使用した要素の保存は2回目の登録解除後も続行されます。つまり、人工の

Subscriber

は購読を中止するのではなく、要素を使い続けるということです。

3.6

refCount()


  • refCount()



    autoConnect()

    と似ていますが、最初の

    Subscriber

    がサブスクライブすると同時に接続も自動的に行われます。


autoconnect()

と異なり、最後の

Subscriber

が登録解除したときにも自動的に切断されます。

public static void refCountAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount();

    LOGGER.info("refcount()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));

    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}

refcount()
subscribing #1
saving 265
subscriber#1 is printing x-coordinate 265
saving 338
subscriber#1 is printing x-coordinate 338
subscribing #2
saving 203
subscriber#1 is printing x-coordinate 203
subscriber#2 is printing x-coordinate 203
unsubscribe#1
saving 294
subscriber#2 is printing x-coordinate 294
unsubscribe#2
clearing resources

4.まとめ


ConnectableObservable

クラスは、少しの労力で複数のサブスクライバを処理するのに役立ちます。

そのメソッドは似ているように見えますが、メソッドの順序が重要であることを意味する実装の微妙さのために、加入者の振る舞いを大きく変えます。

この記事で使われているすべての例の完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/rxjava[GitHubプロジェクト]にあります。