1. 概要

JavaのExecutorFramework は、タスクの送信をタスクの実行から切り離そうとする試みです。 このアプローチでは、タスク実行の詳細が非常にうまく抽象化されますが、場合によっては、さらに最適な実行のために構成する必要があります。

このチュートリアルでは、スレッドプールがこれ以上タスクを受け入れられなくなったときに何が起こるかを見ていきます。 次に、飽和ポリシーを適切に適用して、このコーナーケースを制御する方法を学習します。

2. スレッドプールの再検討

次の図は、エグゼキュータサービスが内部でどのように機能するかを示しています。

新しいタスクをエグゼキュータに送信すると、は次のようになります。

  1. スレッドの1つが使用可能な場合、それはタスクを処理します。
  2. それ以外の場合、エグゼキュータは新しいタスクをキューに追加します。
  3. スレッドが現在のタスクを終了すると、キューから別のタスクを取得します。

2.1. ThreadPoolExecutor

ほとんどのエグゼキュータ実装は、よく知られているThreadPoolExecutorを基本実装として使用します。 したがって、タスクキューイングがどのように機能するかをよりよく理解するには、そのコンストラクターを詳しく調べる必要があります。

public ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  RejectedExecutionHandler handler
)

2.2. コアプールサイズ

corePoolSize パラメーターは、スレッドプールの初期サイズを決定します。 通常、エグゼキュータはスレッドプールに少なくともcorePoolSize数のスレッドが含まれていることを確認します。

ただし、 allowCoreThreadTimeOut パラメーターを有効にすると、スレッド数を減らすことができます。

2.3. 最大プールサイズ

すべてのコアスレッドがいくつかのタスクの実行でビジーであると仮定しましょう。 その結果、エグゼキュータは、後で処理される機会が得られるまで、新しいタスクをキューに入れます。

このキューがいっぱいになると、エグゼキュータはスレッドプールにスレッドを追加できます。 maximumPoolSizeは、スレッドプールに含まれる可能性のあるスレッド数に上限を設定します。

これらのスレッドがしばらくアイドル状態のままである場合、エグゼキュータはそれらをプールから削除できます。 したがって、プールサイズはコアサイズに縮小できます。

2.4. キューイング

前に見たように、すべてのコアスレッドがビジー状態になると、エグゼキュータは新しいタスクをキューに追加します。 キューイングには3つの異なるアプローチがあります

  • 無制限のキュー:キューは無制限の数のタスクを保持できます。 このキューがいっぱいになることはないため、エグゼキュータは最大サイズを無視します。 固定サイズシングルスレッドエグゼキュータはどちらもこのアプローチを使用します。
  • バウンドキューその名前が示すように、キューは限られた数のタスクしか保持できません。 その結果、制限されたキューがいっぱいになると、スレッドプールが大きくなります。
  • 同期ハンドオフ:非常に驚くべきことに、このキューはタスクを保持できません。 このアプローチでは、反対側で同じタスクを同時に選択する別のスレッドがある場合にのみ、タスクをキューに入れることができますキャッシュスレッドプールエグゼキューターは、このアプローチを内部的に使用します。

制限付きキューイングまたは同期ハンドオフのいずれかを使用している場合、次のシナリオを想定します。

  • すべてのコアスレッドがビジーです
  • 内部キューがいっぱいになります
  • スレッドプールは可能な最大サイズに拡大し、それらのスレッドもすべてビジー状態になります

新しいタスクが入ってくるとどうなりますか?

3. 飽和ポリシー

すべてのスレッドがビジーで、内部キューがいっぱいになると、エグゼキュータは飽和状態になります。

エグゼキュータは、飽和状態に達すると、事前定義されたアクションを実行できます。 これらのアクションは、飽和ポリシーと呼ばれます。 RejectedExecutionHandlerのインスタンスをコンストラクターに渡すことで、エグゼキューターの飽和ポリシーを変更できます。

幸い、Javaにはこのクラスの組み込み実装がいくつか用意されており、それぞれが特定のユースケースをカバーしています。 次のセクションでは、これらのポリシーを詳細に評価します。

3.1. ポリシーを中止する

デフォルトのポリシーはabortポリシーです。 アボートポリシーにより、エグゼキュータは RejectedExecutionException をスローします:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.AbortPolicy());

executor.execute(() -> waitFor(250));

assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected")))
  .isInstanceOf(RejectedExecutionException.class);

最初のタスクの実行には長い時間がかかるため、エグゼキュータは2番目のタスクを拒否します。

3.2. 発信者-ポリシーを実行します

このポリシーは、別のスレッドでタスクを非同期で実行する代わりに、呼び出し元のスレッドにタスクを実行させます。

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.CallerRunsPolicy());

executor.execute(() -> waitFor(250));

long startTime = System.currentTimeMillis();
executor.execute(() -> waitFor(500));
long blockedDuration = System.currentTimeMillis() - startTime;

assertThat(blockedDuration).isGreaterThanOrEqualTo(500);

最初のタスクを送信した後、エグゼキュータはそれ以上新しいタスクを受け入れることができなくなります。 したがって、呼び出し元のスレッドは、2番目のタスクが戻るまでブロックします。

呼び出し元実行ポリシーを使用すると、単純な形式のスロットルを簡単に実装できます。 つまり、遅いコンシューマーは速いプロデューサーを遅くしてタスク送信フローを制御できます。

3.3. ポリシーを破棄する

破棄ポリシー送信に失敗した場合、新しいタスクをサイレントに破棄します

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.DiscardPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("Discarded Result"));

assertThat(queue.poll(200, MILLISECONDS)).isNull();

ここで、2番目のタスクは単純なメッセージをキューに公開します。 実行する機会がないため、しばらくの間ブロックしていても、キューは空のままです。

3.4. 破棄-最も古いポリシー

破棄-最も古いポリシーは、最初にキューの先頭からタスクを削除し、次に新しいタスクを再送信します

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new ArrayBlockingQueue<>(2), 
  new ThreadPoolExecutor.DiscardOldestPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);

List<String> results = new ArrayList<>();
queue.drainTo(results);

assertThat(results).containsExactlyInAnyOrder("Second", "Third");

今回は、2つのタスクのみを保持できる制限付きキューを使用しています。 これらの4つのタスクを送信すると次のようになります。

  • 最初のタスクは、単一のスレッドを100ミリ秒占有します
  • エグゼキュータは2番目と3番目のタスクを正常にキューに入れます
  • 4番目のタスクが到着すると、最も古い破棄ポリシーによって最も古いタスクが削除され、この新しいタスク用のスペースが確保されます。

破棄-最も古いポリシーと優先度キューは一緒にうまく機能しません。 優先キューの先頭が最も優先度が高いため、 最も重要なタスクを失う可能性があります

3.5. カスタムポリシー

RejectedExecutionHandler インターフェースを実装するだけで、カスタム飽和ポリシーを提供することもできます。

class GrowPolicy implements RejectedExecutionHandler {

    private final Lock lock = new ReentrantLock();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        lock.lock();
        try {
            executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 1);
        } finally {
            lock.unlock();
        }

        executor.submit(r);
    }
}

この例では、エグゼキュータが飽和状態になったら、最大プールサイズを1つ増やしてから、同じタスクを再送信します。

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new ArrayBlockingQueue<>(2), 
  new GrowPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);

List<String> results = new ArrayList<>();
queue.drainTo(results);

assertThat(results).contains("First", "Second", "Third");

予想どおり、4つのタスクすべてが実行されます。

3.6. シャットダウン

オーバーロードされたエグゼキュータに加えて、飽和ポリシーはシャットダウンされたすべてのエグゼキュータにも適用されます

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.shutdownNow();

assertThatThrownBy(() -> executor.execute(() -> {}))
  .isInstanceOf(RejectedExecutionException.class);

シャットダウンの最中のすべてのエグゼキュータについても同じことが言えます。

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> waitFor(100));
executor.shutdown();

assertThatThrownBy(() -> executor.execute(() -> {}))
  .isInstanceOf(RejectedExecutionException.class);

4. 結論

このチュートリアルでは、最初に、Javaのスレッドプールについてかなり簡単に復習しました。 次に、飽和したエグゼキュータを導入した後、さまざまな飽和ポリシーをいつどのように適用するかを学びました。

いつものように、サンプルコードはGitHubから入手できます。