1. 概要

java.util.concurrent パッケージは、並行アプリケーションを作成するためのツールを提供します。

この記事では、パッケージ全体の概要を説明します。

2. メインコンポーネント

java.util.concurrent には、1回の記事で説明するには多すぎる機能が含まれています。 この記事では、主に次のようなこのパッケージの最も便利なユーティリティのいくつかに焦点を当てます。

  • エグゼキュータ
  • ExecutorService
  • ScheduledExecutorService
  • 未来
  • CountDownLatch
  • CyclicBarrier
  • セマフォ
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • ロック
  • Phaser

また、ここで個々のクラスに捧げられた多くの記事を見つけることができます。

2.1. エグゼキュータ

Executor は、提供されたタスクを実行するオブジェクトを表すインターフェースです。

タスクを新しいスレッドまたは現在のスレッドで実行する必要があるかどうかは、特定の実装(呼び出しが開始される場所)によって異なります。 したがって、このインターフェイスを使用して、タスク実行フローを実際のタスク実行メカニズムから切り離すことができます。

ここで注意すべき点の1つは、 Executor では、タスクの実行が厳密に非同期である必要はないということです。 最も単純なケースでは、エグゼキュータは、呼び出しスレッドで送信されたタスクを即座に呼び出すことができます。

エグゼキュータインスタンスを作成するには、インボーカを作成する必要があります。

public class Invoker implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

これで、この呼び出し元を使用してタスクを実行できます。

public void execute() {
    Executor executor = new Invoker();
    executor.execute( () -> {
        // task to be performed
    });
}

ここで注意すべき点は、エグゼキュータがタスクの実行を受け入れることができない場合、RejectedExecutionExceptionをスローすることです。

2.2. ExecutorService

ExecutorService は、非同期処理のための完全なソリューションです。 インメモリキューを管理し、スレッドの可用性に基づいて送信されたタスクをスケジュールします。

ExecutorServiceを使用するには、1つのRunnableクラスを作成する必要があります。

public class Task implements Runnable {
    @Override
    public void run() {
        // task details
    }
}

これで、 ExecutorService インスタンスを作成し、このタスクを割り当てることができます。 作成時に、スレッドプールのサイズを指定する必要があります。

ExecutorService executor = Executors.newFixedThreadPool(10);

シングルスレッドのExecutorServiceインスタンスを作成する場合は、 newSingleThreadExecutor(ThreadFactory threadFactory)を使用してインスタンスを作成できます。

エグゼキュータが作成されると、それを使用してタスクを送信できます。

public void execute() { 
    executor.submit(new Task()); 
}

タスクの送信中にRunnableインスタンスを作成することもできます。

executor.submit(() -> {
    new Task();
});

また、すぐに使用できる2つの実行終了メソッドが付属しています。 1つ目はshutdown()です。 送信されたすべてのタスクの実行が完了するまで待機します。 The other method is shutdownNow() which attempts to terminate all actively executing tasks and halts the processing of waiting tasks.

別のメソッドawaitTermination(long timeout、TimeUnit unit)もあります。このメソッドは、シャットダウンイベントがトリガーされた後、実行タイムアウトが発生した後、または実行スレッド自体が中断された後、すべてのタスクの実行が完了するまで強制的にブロックします。

try {
    executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
    e.printStackTrace();
}

2.3. ScheduledExecutorService

ScheduledExecutorService は、 ExecutorService、と同様のインターフェースですが、タスクを定期的に実行できます。

ExecutorおよびExecutorServiceのメソッドは、人為的な遅延を発生させることなくその場でスケジュールされます。ゼロまたは負の値は、リクエストを即座に実行する必要があることを示します。

RunnableCallableの両方のインターフェースを使用してタスクを定義できます。

public void execute() {
    ScheduledExecutorService executorService
      = Executors.newSingleThreadScheduledExecutor();

    Future<String> future = executorService.schedule(() -> {
        // ...
        return "Hello world";
    }, 1, TimeUnit.SECONDS);

    ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
        // ...
    }, 1, TimeUnit.SECONDS);

    executorService.shutdown();
}

ScheduledExecutorService は、特定の固定遅延の後にタスクをスケジュールすることもできます。

executorService.scheduleAtFixedRate(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

ここで、 scheduleAtFixedRate(Runnable command、long initialDelay、long period、TimeUnit unit)メソッドは、最初に指定された初期遅延の後に呼び出され、その後サービスインスタンスまで指定された期間で呼び出される定期的なアクションを作成して実行しますシャットダウンします。

scheduleWithFixedDelay(Runnable command、long initialDelay、long delay、TimeUnit unit)メソッドは、指定された初期遅延の後に最初に呼び出され、実行の終了の間に指定された遅延で繰り返し呼び出される定期的なアクションを作成して実行します1つと次の呼び出し。

2.4. 将来

Futureは、非同期操作の結果を表すために使用されます。 非同期操作が完了したかどうかを確認したり、計算結果を取得したりするためのメソッドが付属しています。

さらに、 cancel(boolean mayInterruptIfRunning) APIは操作をキャンセルし、実行中のスレッドを解放します。 mayInterruptIfRunning の値がtrueの場合、タスクを実行しているスレッドは即座に終了します。

それ以外の場合、進行中のタスクは完了できます。

以下のコードスニペットを使用して、将来のインスタンスを作成できます。

public void invoke() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    Future<String> future = executorService.submit(() -> {
        // ...
        Thread.sleep(10000l);
        return "Hello world";
    });
}

次のコードスニペットを使用して、将来の結果の準備ができているかどうかを確認し、計算が完了したらデータをフェッチできます。

if (future.isDone() && !future.isCancelled()) {
    try {
        str = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

特定の操作のタイムアウトを指定することもできます。 タスクにこの時間がかかる場合は、TimeoutExceptionがスローされます。

try {
    future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

2.5. CountDownLatch

CountDownLatch JDK 5 で導入)は、いくつかの操作が完了するまでスレッドのセットをブロックするユーティリティクラスです。

CountDownLatch は、 counter(Integer type);で初期化されます。 このカウンターは、依存スレッドが実行を完了すると減少します。 ただし、カウンターがゼロに達すると、他のスレッドが解放されます。

CountDownLatchの詳細についてはこちらをご覧ください。

2.6. CyclicBarrier

CyclicBarrier は、再利用できることを除いて、CountDownLatchとほぼ同じように機能します。 CountDownLatch とは異なり、 await()メソッド(バリア条件と呼ばれる)を使用して複数のスレッドが互いに待機してから、最終タスクを呼び出すことができます。

バリア条件を開始するには、Runnableタスクインスタンスを作成する必要があります。

public class Task implements Runnable {

    private CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            LOG.info(Thread.currentThread().getName() + 
              " is waiting");
            barrier.await();
            LOG.info(Thread.currentThread().getName() + 
              " is released");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

}

これで、いくつかのスレッドを呼び出して、バリア条件を競うことができます。

public void start() {

    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
        // ...
        LOG.info("All previous tasks are completed");
    });

    Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); 
    Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); 
    Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); 

    if (!cyclicBarrier.isBroken()) { 
        t1.start(); 
        t2.start(); 
        t3.start(); 
    }
}

ここで、 isBroken()メソッドは、実行時にスレッドのいずれかが中断されたかどうかを確認します。 実際のプロセスを実行する前に、常にこのチェックを実行する必要があります。

2.7. セマフォ

セマフォは、物理リソースまたは論理リソースの一部へのスレッドレベルのアクセスをブロックするために使用されます。 セマフォには一連の許可が含まれています。 スレッドがクリティカルセクションに入ろうとするときはいつでも、許可が利用可能かどうかセマフォをチェックする必要があります。

許可が利用できない場合( tryAcquire()を介して)、スレッドはクリティカルセクションにジャンプできません。 ただし、許可証が利用可能な場合はアクセスが許可され、許可証カウンターが減少します。

実行中のスレッドがクリティカルセクションを解放すると、許可カウンターが再び増加します( release()メソッドによって実行されます)。

tryAcquire(long timeout、TimeUnit unit)メソッドを使用して、アクセスを取得するためのタイムアウトを指定できます。

また、使用可能な許可の数や、セマフォの取得を待機しているスレッドの数を確認することもできます。

次のコードスニペットを使用して、セマフォを実装できます。

static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {

    LOG.info("Available permit : " + semaphore.availablePermits());
    LOG.info("Number of threads waiting to acquire: " + 
      semaphore.getQueueLength());

    if (semaphore.tryAcquire()) {
        try {
            // ...
        }
        finally {
            semaphore.release();
        }
    }

}

セマフォを使用して、Mutexのようなデータ構造を実装できます。 this の詳細については、こちらをご覧ください。

2.8. ThreadFactory

名前が示すように、 ThreadFactory は、オンデマンドで新しいスレッドを作成するスレッド(存在しない)プールとして機能します。 これにより、効率的なスレッド作成メカニズムを実装するための多くの定型コーディングが不要になります。

ThreadFactoryを定義できます。

public class BaeldungThreadFactory implements ThreadFactory {
    private int threadId;
    private String name;

    public BaeldungThreadFactory(String name) {
        threadId = 1;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, name + "-Thread_" + threadId);
        LOG.info("created new thread with id : " + threadId +
            " and name : " + t.getName());
        threadId++;
        return t;
    }
}

このnewThread(Runnable r)メソッドを使用して、実行時に新しいスレッドを作成できます。

BaeldungThreadFactory factory = new BaeldungThreadFactory( 
    "BaeldungThreadFactory");
for (int i = 0; i < 10; i++) { 
    Thread t = factory.newThread(new Task());
    t.start(); 
}

2.9. BlockingQueue

非同期プログラミングでは、最も一般的な統合パターンの1つは、生産者/消費者パターンです。 java.util.concurrent パッケージには、 BlockingQueue と呼ばれるデータ構造が付属しています。これは、これらの非同期シナリオで非常に役立ちます。

詳細とこれに関する実用的な例は、こちらで入手できます。

2.10. DelayQueue

DelayQueue は、要素の無限サイズのブロッキングキューであり、有効期限(ユーザー定義の遅延と呼ばれる)が完了した場合にのみ要素をプルできます。 したがって、最上位の要素( head )の遅延が最も大きくなり、最後にポーリングされます。

詳細とこれに関する実用的な例は、こちらで入手できます。

2.11. ロック

当然のことながら、 Lock は、現在実行しているスレッドとは別に、他のスレッドがコードの特定のセグメントにアクセスするのをブロックするためのユーティリティです。

LockとSynchronizedブロックの主な違いは、同期ブロックがメソッドに完全に含まれていることです。 ただし、Lock APIのlock()操作とunlock()操作を別々のメソッドで実行できます。

詳細とこれに関する実用的な例は、こちらで入手できます。

2.12. フェイザー

Phaser は、CyclicBarrierおよびCountDownLatchよりも柔軟なソリューションです。これは、実行を続行する前に動的な数のスレッドが待機する必要がある再利用可能なバリアとして機能するために使用されます。 プログラムフェーズごとにPhaserインスタンスを再利用して、実行の複数のフェーズを調整できます。

詳細とこれに関する実用的な例は、こちらで入手できます。

3. 結論

この高レベルの概要記事では、 java.util.concurrentパッケージで利用可能なさまざまなユーティリティに焦点を当てました。

いつものように、完全なソースコードはGitHubから入手できます。