1. 概要

この記事では、RxJavaObservableのsubscribeOnobserveOnに基づくマルチスレッドプログラムの作成に使用するさまざまなタイプのスケジューラーに焦点を当てます。 ]メソッド。

スケジューラーは、Observableチェーンの操作に関連するタスクを実行する場所と可能性を指定する機会を提供します。

Scheduler は、クラスSchedulers。に記述されているファクトリメソッドから取得できます。

2. デフォルトのスレッド動作

デフォルトでは、 Rxはシングルスレッドです。これは、Observableとそれに適用できる演算子のチェーンが同じスレッドのオブザーバーに通知することを意味しますそのsubscribe()メソッドが呼び出されます。

observeOnおよびsubscribeOnメソッドは、引数として Scheduler、を取ります。これは、名前が示すように、個々のアクションをスケジュールするために使用できるツールです。

create Worker メソッドを使用して、 Scheduler の実装を作成します。このメソッドは、Scheduler.Worker。Aを返します。 ] worker はアクションを受け入れ、それらを単一のスレッドで順番に実行します。

ある意味で、ワーカー S スケジューラーそのものですが、混乱を避けるためにスケジューラーとは呼びません。

2.1. アクションのスケジュール

新しいworkerを作成し、いくつかのアクションをスケジュールすることで、任意のSchedulerでジョブをスケジュールできます。

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> result += "action");
 
Assert.assertTrue(result.equals("action"));

次に、アクションは、ワーカーが割り当てられているスレッドでキューに入れられます。

2.2. アクションのキャンセル

Scheduler.Workerサブスクリプションを拡張します。 workerunsubscribeメソッドを呼び出すと、キューが空になり、保留中のすべてのタスクがキャンセルされます。 例でそれを見ることができます:

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += "First_Action";
    worker.unsubscribe();
});
worker.schedule(() -> result += "Second_Action");
 
Assert.assertTrue(result.equals("First_Action"));

2番目のタスクは、前のタスクが操作全体をキャンセルしたため、実行されません。 実行中のアクションは中断されます。

3. Schedulers.newThread

このスケジューラーは、 subscribeOn()またはobserveOn()を介して要求されるたびに、新しいスレッドを開始するだけです。

スレッドの開始時に待ち時間が発生するだけでなく、このスレッドが再利用されないため、これは決して良い選択ではありません。

Observable.just("Hello")
  .observeOn(Schedulers.newThread())
  .doOnNext(s ->
    result2 += Thread.currentThread().getName()
  )
  .observeOn(Schedulers.newThread())
  .subscribe(s ->
    result1 += Thread.currentThread().getName()
  );
Thread.sleep(500);
Assert.assertTrue(result1.equals("RxNewThreadScheduler-1"));
Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));

Worker が完了すると、スレッドは単に終了します。 このSchedulerは、タスクが粗い場合にのみ使用できます。完了するまでに長い時間がかかりますが、タスクが非常に少ないため、スレッドがまったく再利用される可能性はほとんどありません。

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "_Start";
    worker.schedule(() -> result += "_worker_");
    result += "_End";
});
Thread.sleep(3000);
Assert.assertTrue(result.equals(
  "RxNewThreadScheduler-1_Start_End_worker_"));

NewThreadSchedulerでworkerをスケジュールしたとき、ワーカーが特定のスレッドにバインドされていることがわかりました。

4. Schedulers.immediate

Schedulers.immediate は、非同期ではなく、クライアントスレッド内のタスクをブロックする方法で呼び出し、アクションが完了すると戻る特別なスケジューラーです。

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "_Start";
    worker.schedule(() -> result += "_worker_");
    result += "_End";
});
Thread.sleep(500);
Assert.assertTrue(result.equals(
  "main_Start_worker__End"));

実際、即時スケジューラを介して Observable にサブスクライブすると、通常、特定のSスケジューラにサブスクライブしないのと同じ効果があります。

Observable.just("Hello")
  .subscribeOn(Schedulers.immediate())
  .subscribe(s ->
    result += Thread.currentThread().getName()
  );
Thread.sleep(500);
Assert.assertTrue(result.equals("main"));

5. Schedulers.trampoline

トランポリンスケジューラーは、即時と非常によく似ています。これは、同じスレッドでタスクをスケジュールし、効果的にブロックするためです。

ただし、以前にスケジュールされたすべてのタスクが完了すると、次のタスクは実行されますu

Observable.just(2, 4, 6, 8)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Observable.just(1, 3, 5, 7, 9)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Thread.sleep(500);
Assert.assertTrue(result.equals("246813579"));

Immediate は特定のタスクをすぐに呼び出しますが、トランポリンは現在のタスクが終了するのを待ちます。

トランポリンworkerは、最初のタスクをスケジュールしたスレッド上のすべてのタスクを実行します。 schedule への最初の呼び出しは、キューが空になるまでブロックされます。

Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "Start";
    worker.schedule(() -> {
        result += "_middleStart";
        worker.schedule(() ->
            result += "_worker_"
        );
        result += "_middleEnd";
    });
    result += "_mainEnd";
});
Thread.sleep(500);
Assert.assertTrue(result
  .equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Schedulers.from

スケジューラーは、java.util.concurrentExecutorsよりも内部的に複雑であるため、別の抽象化が必要でした。

ただし、概念的には非常に似ているため、当然のことながら、fromファクトリメソッドを使用してExecutorSchedulerに変換できるラッパーがあります。

private ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder()
      .setNameFormat(pattern)
      .build();
}

@Test
public void givenExecutors_whenSchedulerFrom_thenReturnElements() 
 throws InterruptedException {
 
    ExecutorService poolA = newFixedThreadPool(
      10, threadFactory("Sched-A-%d"));
    Scheduler schedulerA = Schedulers.from(poolA);
    ExecutorService poolB = newFixedThreadPool(
      10, threadFactory("Sched-B-%d"));
    Scheduler schedulerB = Schedulers.from(poolB);

    Observable<String> observable = Observable.create(subscriber -> {
      subscriber.onNext("Alfa");
      subscriber.onNext("Beta");
      subscriber.onCompleted();
    });;

    observable
      .subscribeOn(schedulerA)
      .subscribeOn(schedulerB)
      .subscribe(
        x -> result += Thread.currentThread().getName() + x + "_",
        Throwable::printStackTrace,
        () -> result += "_Completed"
      );
    Thread.sleep(2000);
    Assert.assertTrue(result.equals(
      "Sched-A-0Alfa_Sched-A-0Beta__Completed"));
}

SchedulerB は短時間使用されますが、すべての作業を実行するscheduleerAでの新しいアクションのスケジュールはほとんどありません。 したがって、複数の subscribeOnメソッドは無視されるだけでなく、わずかなオーバーヘッドが発生します。

7. Schedulers.io

このSchedulerは、 newThread と似ていますが、すでに開始されているスレッドがリサイクルされ、将来のリクエストを処理できる可能性があるという点が異なります。

この実装は、java.util.concurrentThreadPoolExecutorと同様に、無制限のスレッドプールで機能します。 新しいworkerが要求されるたびに、新しいスレッドが開始されるか(後でしばらくアイドル状態が維持されます)、アイドル状態のスレッドが再利用されます。

Observable.just("io")
  .subscribeOn(Schedulers.io())
  .subscribe(i -> result += Thread.currentThread().getName());
 
Assert.assertTrue(result.equals("RxIoScheduler-2"));

あらゆる種類の無制限のリソースに注意する必要があります。Webサービスのような低速または応答のない外部依存関係の場合、 io scheduleer は膨大な数のスレッドを開始し、非常に多くのスレッドにつながる可能性があります。自分のアプリケーションが応答しなくなります。

実際には、Schedulers.ioに従うのがほとんどの場合より良い選択です。

8. Schedulers.computation

Computing S cheduler は、デフォルトで、 Runtime.getRuntime( )ユーティリティクラス。

したがって、タスクが完全にCPUにバインドされている場合は、計算スケジューラを使用する必要があります。 つまり、計算能力が必要であり、ブロッキングコードはありません。

すべてのスレッドの前で無制限のキューを使用するため、タスクがスケジュールされているが、すべてのコアが占有されている場合は、キューに入れられます。 ただし、各スレッドの直前のキューは増え続けます。

Observable.just("computation")
  .subscribeOn(Schedulers.computation())
  .subscribe(i -> result += Thread.currentThread().getName());
Assert.assertTrue(result.equals("RxComputationScheduler-1"));

何らかの理由でデフォルトとは異なるスレッド数が必要な場合は、いつでもrx.scheduler.max-computation-threadsシステムプロパティを使用できます。

使用するスレッドを少なくすることで、常に1つ以上のCPUコアがアイドル状態になり、負荷が高い場合でも、Computingスレッドプールがサーバーを飽和させないようにすることができます。 コアよりも多くの計算スレッドを持つことは不可能です。

9. Schedulers.test

このSchedulerはテスト目的でのみ使用され、本番コードでは表示されません。 その主な利点は、任意に通過する時間をシミュレートして、時計を進めることができることです。

List<String> letters = Arrays.asList("A", "B", "C");
TestScheduler scheduler = Schedulers.test();
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<Long> tick = Observable
  .interval(1, TimeUnit.SECONDS, scheduler);

Observable.from(letters)
  .zipWith(tick, (string, index) -> index + "-" + string)
  .subscribeOn(scheduler)
  .subscribe(subscriber);

subscriber.assertNoValues();
subscriber.assertNotCompleted();

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");

scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(3);
assertThat(
  subscriber.getOnNextEvents(), 
  hasItems("0-A", "1-B", "2-C"));

10. デフォルトのスケジューラ

RxJavaの一部のObservableオペレーターには、オペレーターが操作に使用するSchedulerを設定できる代替形式があります。 その他は、特定のスケジューラーで動作しないか、特定のデフォルトのスケジューラーで動作します。

たとえば、 delay オペレーターは、アップストリームイベントを取得し、指定された時間の後にそれらをダウンストリームにプッシュします。 明らかに、その期間中は元のスレッドを保持できないため、別のスケジューラーを使用する必要があります。

ExecutorService poolA = newFixedThreadPool(
  10, threadFactory("Sched1-"));
Scheduler schedulerA = Schedulers.from(poolA);
Observable.just('A', 'B')
  .delay(1, TimeUnit.SECONDS, schedulerA)
  .subscribe(i -> result+= Thread.currentThread().getName() + i + " ");

Thread.sleep(2000);
Assert.assertTrue(result.equals("Sched1-A Sched1-B "));

カスタムschedulerAを提供しない場合、delayより下のすべてのオペレーターはcomputationSchedulerを使用します。

カスタムスケジューラーをサポートするその他の重要な演算子は、バッファー、 間隔範囲タイマースキップです。 take timeout 、その他いくつか。 そのようなオペレーターにSchedulerを提供しない場合、計算スケジューラーが使用されます。これはほとんどの場合安全なデフォルトです。

11. 結論

すべての長時間実行操作が非同期である真にリアクティブなアプリケーションでは、スレッドが非常に少ないため、スケジューラーが必要です。

RxJavaを使用してスケーラブルで安全なコードを作成するには、スケジューラーの習得が不可欠です。 subscribeOnobserveOnの違いは、すべてのタスクを期待どおりに正確に実行する必要がある高負荷の下で特に重要です。

最後になりましたが、ダウンストリームで使用されるスケジューラーが、スケジューラー upstreamによって生成されるloadに対応できることを確認する必要があります。 詳細については、背圧に関するこの記事があります。

これらすべての例とコードスニペットの実装は、 GitHubプロジェクトにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。