1概要



ExecutorService


フレームワークを使用すると、タスクを複数のスレッドで簡単に処理できます。スレッドが実行を終了するのを待ついくつかのシナリオを例示します。

また、

ExecutorService

を適切にシャットダウンし、既に実行中のスレッドが実行を終了するのを待つ方法も示します。


2 __Executorのシャットダウン後


Executorを使用する場合は、

shutdown()

メソッドまたは

shutdownNow()

メソッドを呼び出すことによって

Executorをシャットダウンできます。ただし、すべてのスレッドが実行を停止するまで待つことはありません。

既存のスレッドが実行を完了するのを待つには、

awaitTermination()

メソッドを使用します。

これは、すべてのタスクが実行を完了するか、または指定されたタイムアウトに達するまでスレッドをブロックします。

public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
    threadPool.shutdown();
    try {
        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
            threadPool.shutdownNow();
        }
    } catch (InterruptedException ex) {
        threadPool.shutdownNow();
        Thread.currentThread().interrupt();
    }
}


3

CountDownLatch


を使用する

次に、この問題を解決するための別のアプローチを見てみましょう – タスクの完了を知らせるために

CountDownLatch

を使うことです。


await()

メソッドを呼び出したすべてのスレッドに通知されるまでにデクリメントできる回数を表す値で初期化できます。

たとえば、現在のスレッドに別の

N

個のスレッドが実行を終了するのを待つ必要がある場合、

N

を使用してラッチを初期化できます。

ExecutorService WORKER__THREAD__POOL
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER__THREAD__POOL.submit(() -> {
        try {
           //...
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}
//wait for the latch to be decremented by the two remaining threads
latch.await();


4

invokeAll()


を使う

スレッドを実行するために使用できる最初のアプローチは

invokeAll()

メソッドです。

このメソッドは、すべてのタスクが終了するかタイムアウトが期限切れになると、

Future

オブジェクトのリストを返します

また、返される

Future

オブジェクトの順序は、提供された

Callable

オブジェクトのリストと同じであることに注意してください。

ExecutorService WORKER__THREAD__POOL = Executors.newFixedThreadPool(10);

List<Callable<String>> callables = Arrays.asList(
  new DelayedCallable("fast thread", 100),
  new DelayedCallable("slow thread", 3000));

long startProcessingTime = System.currentTimeMillis();
List<Future<String>> futures = WORKER__THREAD__POOL.invokeAll(callables);

awaitTerminationAfterShutdown(WORKER__THREAD__POOL);

long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;

assertTrue(totalProcessingTime >= 3000);

String firstThreadResponse = futures.get(0).get();

assertTrue("fast thread".equals(firstThreadResponse));

String secondThreadResponse = futures.get(1).get();
assertTrue("slow thread".equals(secondThreadResponse));


5

ExecutorCompletionService


を使用する

マルチスレッドを実行するもう1つの方法は、

ExecutorCompletionServiceを使用することです。これは、提供された

ExecutorServiceを使用してタスクを実行します。


invokeAll()

との違いの1つは、実行されたタスクを表す

Futures

が返される順序です。


  • ExecutorCompletionService

    は、キューを使用して、結果を終了した順序で格納します** 。一方、

    invokeAll()

    は、指定されたタスクリストの反復子によって生成されたものと同じ順序を持つリストを返します。

CompletionService<String> service
  = new ExecutorCompletionService<>(WORKER__THREAD__POOL);

List<Callable<String>> callables = Arrays.asList(
  new DelayedCallable("fast thread", 100),
  new DelayedCallable("slow thread", 3000));

for (Callable<String> callable : callables) {
    service.submit(callable);
}

結果は

take()

メソッドを使ってアクセスできます。

long startProcessingTime = System.currentTimeMillis();

Future<String> future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime
  = System.currentTimeMillis() - startProcessingTime;

assertTrue("First response should be from the fast thread",
  "fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100
  && totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime
  + " milliseconds");

future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime
  = System.currentTimeMillis() - startProcessingTime;

assertTrue(
  "Last response should be from the slow thread",
  "slow thread".equals(secondThreadResponse));
assertTrue(
  totalProcessingTime >= 3000
  && totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime
  + " milliseconds");

awaitTerminationAfterShutdown(WORKER__THREAD__POOL);


6. 結論

ユースケースに応じて、スレッドが実行を終了するのを待つためのさまざまなオプションがあります。


CountDownLatch

は、他のスレッドによって実行された一連の操作が終了したことを1つ以上のスレッドに通知するメカニズムが必要な場合に役立ちます。


  • ExecutorCompletionService

    は、タスクの結果にできるだけ早くアクセスする必要がある場合や、実行中のすべてのタスクが終了するのを待つ必要がある場合のその他のアプローチに役立ちます。

この記事のソースコードはhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency[over on GitHub]から入手できます。