1概要

この記事では、RxJava

ObservableのsubscribeOn

メソッドと

observeOn

メソッドに基づくマルチスレッドプログラムの作成に使用するさまざまなタイプの

スケジューラ

に焦点を当てます。

スケジューラは、オブザーバブルチェーンの操作に関連するタスクをどこで、そしていつ実行するのかを指定する機会を与えます。

クラス


Schedulers

.

に記述されているファクトリメソッドから

Scheduler

を取得できます。


2デフォルトのスレッド動作

  • デフォルトでは、

    Rxはシングルスレッド

    です。これは、

    Observable

    とそれに適用できる一連の演算子が、その

    subscribe()

    メソッドが呼び出されたのと同じスレッド上のオブザーバに通知することを意味します。


observeOn

および

subscribeOn

メソッドは、引数として__Schedulerを取ります。


createWorker

メソッドを使用して

Scheduler

の実装を作成します。このメソッドは__http://reactivex.io/RxJava/javadoc/rx/Scheduler.Worker.html[Scheduler.Worker]を返します。それらはシングルスレッドで順番に実行されます。

ある意味では、

worker



__S


chedulerそのものですが、混乱を避けるために

Scheduler__とは呼びません。


2.1. アクションのスケジューリング

新しい

worker

を作成し、いくつかのアクションをスケジュールすることで、任意の

Scheduler

にジョブをスケジュールできます。

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> result += "action");

Assert.assertTrue(result.equals("action"));

その後、アクションはワーカーが割り当てられているスレッドでキューに入れられます。


2.2. アクションをキャンセルする


Scheduler.Worker



Subscription

を拡張します。

worker



unsubscribe

メソッドを呼び出すと、キューが空になり、保留中のタスクがすべてキャンセルされます。例を見ればわかります。

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += "First__Action";
    worker.unsubscribe();
});
worker.schedule(() -> result += "Second__Action");

Assert.assertTrue(result.equals("First__Action"));

2番目のタスクは、前のタスクが操作全体を取り消したために実行されることはありません。実行中のアクションは中断されます。


3

Schedulers.newThread


このスケジューラは、

subscribeOn()

または

observeOn()

を介して要求されるたびに新しいスレッドを開始するだけです。

スレッド開始時の待ち時間だけでなく、このスレッドが再利用されていないため、これは決して良い選択ではありません。

Observable.just("Hello")
  .observeOn(Schedulers.newThread())
  .doOnNext(s ->
    result2 += Thread.currentThread().getName()
  )
  .observeOn(Schedulers.newThread())
  .subscribe(s ->
    result1 += Thread.currentThread().getName()
  );
Thread.sleep(500);
Assert.assertTrue(result1.equals("RxNewThreadScheduler-1"));
Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));


Worker

が完了すると、スレッドは単に終了します。この

Scheduler

は、タスクが粗粒度の場合にのみ使用できます。完了するまでに時間がかかりますが、スレッドがまったく再利用されないようにするために非常に少数です。

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "__Start";
    worker.schedule(() -> result += "__worker__");
    result += "__End";
});
Thread.sleep(3000);
Assert.assertTrue(result.equals(
  "RxNewThreadScheduler-1__Start__End__worker__"));


NewThreadSchedulerに

worker

をスケジュールしたとき、

ワーカーが特定のスレッドにバインドされていることがわかりました。


4

Schedulers.immediate



Schedulers.immediate

は、非同期的ではなくブロック的にクライアントスレッド内のタスクを呼び出す特別なスケジューラです。

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "__Start";
    worker.schedule(() -> result += "__worker__");
    result += "__End";
});
Thread.sleep(500);
Assert.assertTrue(result.equals(
  "main__Start__worker____End"));

実際、

immediate Scheduler

を介して

Observable

を購読することは、通常、特定の

__S

__chedulerをまったく購読しないことと同じ効果があります。

Observable.just("Hello")
  .subscribeOn(Schedulers.immediate())
  .subscribe(s ->
    result += Thread.currentThread().getName()
  );
Thread.sleep(500);
Assert.assertTrue(result.equals("main"));


5

Schedulers.trampoline



trampoline


Scheduler



immediate

と非常によく似ていますが、それは同じスレッド内でタスクをスケジュールし、事実上ブロックするからです。

ただし、次のタスクは、以前にスケジュールされたすべてのタスクが完了したときに実行されます。

Observable.just(2, 4, 6, 8)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Observable.just(1, 3, 5, 7, 9)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Thread.sleep(500);
Assert.assertTrue(result.equals("246813579"));


Immediate

は与えられたタスクをすぐに起動しますが、

trampoline

は現在のタスクが終了するのを待ちます。


trampoline

s

worker

は、最初のタスクをスケジュールしたスレッド上のすべてのタスクを実行します。

schedule

の最初の呼び出しは、キューが空になるまでブロックされています。

Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "Start";
    worker.schedule(() -> {
        result += "__middleStart";
        worker.schedule(() ->
            result += "__worker__"
        );
        result += "__middleEnd";
    });
    result += "__mainEnd";
});
Thread.sleep(500);
Assert.assertTrue(result
  .equals("mainStart__mainEnd__middleStart__middleEnd__worker__"));


6.

Schedulers.from



Schedulers

は、

java.util.concurrent



Executors

よりも内部的に複雑です。したがって、個別の抽象化が必要でした。

しかし、それらは概念的に非常に似ているので、驚くことに

from

factoryメソッドを使って

Executor



Scheduler

に変えることができるラッパーがあります。

private ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder()
      .setNameFormat(pattern)
      .build();
}

@Test
public void givenExecutors__whenSchedulerFrom__thenReturnElements()
 throws InterruptedException {

    ExecutorService poolA = newFixedThreadPool(
      10, threadFactory("Sched-A-%d"));
    Scheduler schedulerA = Schedulers.from(poolA);
    ExecutorService poolB = newFixedThreadPool(
      10, threadFactory("Sched-B-%d"));
    Scheduler schedulerB = Schedulers.from(poolB);

    Observable<String> observable = Observable.create(subscriber -> {
      subscriber.onNext("Alfa");
      subscriber.onNext("Beta");
      subscriber.onCompleted();
    });;

    observable
      .subscribeOn(schedulerA)
      .subscribeOn(schedulerB)
      .subscribe(
        x -> result += Thread.currentThread().getName() + x + "__",
        Throwable::printStackTrace,
        () -> result += "__Completed"
      );
    Thread.sleep(2000);
    Assert.assertTrue(result.equals(
      "Sched-A-0Alfa__Sched-A-0Beta____Completed"));
}


SchedulerB

は短期間使用されますが、

schedulerA

に対する新しいアクションをほとんどスケジュールしないため、すべての作業が実行されます。したがって、複数の

subscribeOnメソッド

が無視されるだけではなく、わずかなオーバーヘッドも発生します。


7.

Schedulers.io


この

Scheduler



newThread

と似ていますが、既に開始されているスレッドはリサイクルされており、将来の要求を処理できる可能性があります。

この実装は、無制限のスレッドプールを持つ

java.util.concurrent



ThreadPoolExecutor

と同様に機能します。新しい

worker

が要求されるたびに、新しいスレッドが開始され(その後しばらくの間アイドル状態に保たれる)、またはアイドル状態のスレッドが再利用されます。

Observable.just("io")
  .subscribeOn(Schedulers.io())
  .subscribe(i -> result += Thread.currentThread().getName());

Assert.assertTrue(result.equals("RxIoScheduler-2"));

私たちはあらゆる種類の無限のリソースに注意する必要があります – Webサービスのような遅いまたは無応答な外部依存関係の場合、

io


scheduler

は膨大な数のスレッドを開始し、私たち自身のアプリケーションが応答しなくなります。

実際には、

Schedulers.io

に従うことがほとんどの場合より良い選択です。


8

Schedulers.computation



Runtime.getRuntime()

ユーティリティクラスにあるように、

Computation

S

__cheduler


はデフォルトで

availableProcessors()__の値に並行して実行されるスレッドの数を制限します。

そのため、タスクが完全にCPUバウンドの場合は、計算スケジューラを使用してください。つまり、それらは計算能力を必要とし、ブロックコードを持ちません。

すべてのスレッドの前に無制限のキューを使用するので、タスクがスケジュールされていてもすべてのコアが占有されている場合は、キューに入れられます。ただし、各スレッドの直前のキューは増え続けます。

Observable.just("computation")
  .subscribeOn(Schedulers.computation())
  .subscribe(i -> result += Thread.currentThread().getName());
Assert.assertTrue(result.equals("RxComputationScheduler-1"));

何らかの理由でデフォルトとは異なる数のスレッドが必要な場合は、常に

rx.scheduler.max-computing-threads

システムプロパティを使用できます。

スレッド数を減らすことで、常にアイドル状態のCPUコアが1つ以上あることを確認できます。負荷が高い場合でも、スレッドプールがサーバーを飽和させることはありません。コアよりも多くのスレッドを計算することは不可能です。


9

Schedulers.test


この

Scheduler

はテスト目的でのみ使用され、本番コードでは決して見られません。その主な利点は、時間の経過を任意にシミュレートして時計を進めることができることです。

List<String> letters = Arrays.asList("A", "B", "C");
TestScheduler scheduler = Schedulers.test();
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<Long> tick = Observable
  .interval(1, TimeUnit.SECONDS, scheduler);

Observable.from(letters)
  .zipWith(tick, (string, index) -> index + "-" + string)
  .subscribeOn(scheduler)
  .subscribe(subscriber);

subscriber.assertNoValues();
subscriber.assertNotCompleted();

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");

scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(3);
assertThat(
  subscriber.getOnNextEvents(),
  hasItems("0-A", "1-B", "2-C"));


10デフォルトスケジューラ

RxJavaの

Observable

演算子の中には、演算子がその演算に使用する

Scheduler

を設定することを可能にする代替形式を持つものがあります。他の人は特定の

Scheduler

を操作したり、特定のデフォルト

Scheduler

を操作したりしません。

たとえば、

delay

演算子はアップストリームイベントを受け取り、一定時間後にそれらをダウンストリームにプッシュします。明らかに、その期間中は元のスレッドを保持することはできません。したがって、別の

Scheduler

を使用する必要があります。

ExecutorService poolA = newFixedThreadPool(
  10, threadFactory("Sched1-"));
Scheduler schedulerA = Schedulers.from(poolA);
Observable.just('A', 'B')
  .delay(1, TimeUnit.SECONDS, schedulerA)
  .subscribe(i -> result+= Thread.currentThread().getName() + i + " ");

Thread.sleep(2000);
Assert.assertTrue(result.equals("Sched1-A Sched1-B "));

カスタム

schedulerA

を指定しないと、

delay

より下のすべての演算子で

computation Scheduler

が使用されます。

カスタム

Schedulers

をサポートする他の重要な演算子は、

buffer、


interval



range



timer



skip



take



timeout

、およびその他のいくつかです。そのようなオペレータに

Scheduler

を提供しない場合、

computation

schedulerが利用されます。これはほとんどの場合安全なデフォルトです。


11結論

長時間にわたって実行されるすべての操作が非同期である真に反応的なアプリケーションでは、非常に少数のスレッド、したがって

スケジューラー

が必要です。

マスタリングスケジューラは、RxJavaを使用してスケーラブルで安全なコードを書くために不可欠です。

subscribeOn



observeOn

の違いは、予想されるときにすべてのタスクを正確に実行する必要がある高負荷下で特に重要です。

大事なことを言い忘れましたが、ダウンストリームで使用される

Schedulers

がlomに追いつくことができることを確認する必要があります。詳細については、

backpressure

に関する記事があります。

これらすべての例とコードスニペットの実装はhttps://github.com/eugenp/tutorials/tree/master/rxjava[GitHubプロジェクト]で見つけることができます – これはMavenプロジェクトです。そのまま実行します。