1概要


java.util.concurrent

パッケージは、並行アプリケーションを作成するためのツールを提供します。

この記事では、パッケージ全体の概要を説明します。

** 2メインコンポーネント

**


java.util.concurrent

には、一度の書き込みで説明するには多すぎる機能が含まれています。この記事では、主にこのパッケージの中で最も有用なユーティリティのいくつかに焦点を当てます。


  • Executor


  • ExecutorService


  • ScheduledExecutorService


  • 未来


  • CountDownLatch


  • CyclicBarrier


  • スマホア


  • ThreadFactory


  • BlockingQueue


  • DelayQueue


  • ロック


  • フェイザー

ここには、個々のクラスに関する多数の記事があります。


2.1.

実行者




  • Executor


    は、提供されたタスクを実行するオブジェクトを表すインタフェースです。

タスクを新しいスレッドまたは現在のスレッドで実行するかどうかは、(呼び出しが開始された場所からの)特定の実装によって異なります。

したがって、このインタフェースを使用して、タスク実行フローを実際のタスク実行メカニズムから切り離すことができます。

ここで注意しなければならないのは、

Executor

が厳密にタスクの実行を非同期にする必要がないということです。最も単純な場合では、実行者は実行スレッドで即座に投入されたタスクを呼び出すことができます。

executorインスタンスを作成するために呼び出し側を作成する必要があります。

public class Invoker implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

これで、この呼び出し元を使用してタスクを実行できます。

public void execute() {
    Executor executor = new Invoker();
    executor.execute( () -> {
       //task to be performed
    });
}

ここで注意しなければならないのは、エグゼキュータがタスクを実行のために受け入れられない場合、


https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/RejectedExecutionException.htmlがスローされることです。

]


2.2.

ExecutorService



ExecutorService

は非同期処理のための完全なソリューションです。インメモリキューを管理し、スレッドの可用性に基づいて送信されたタスクをスケジュールします。


ExecutorServiceを使用するには、

Runnable__クラスを1つ作成する必要があります。

public class Task implements Runnable {
    @Override
    public void run() {
       //task details
    }
}

これで

ExecutorService

インスタンスを作成してこのタスクを割り当てることができます。

作成時に、スレッドプールサイズを指定する必要があります。

ExecutorService executor = Executors.newFixedThreadPool(10);

シングルスレッドの

ExecutorService

インスタンスを作成する場合は、


newSingleThreadExecutor(ThreadFactory threadFactory)


を使用してインスタンスを作成できます。

エグゼキュータが作成されたら、それを使ってタスクを送信できます。

public void execute() {
    executor.submit(new Task());
}

タスクを送信しながら

Runnable

インスタンスを作成することもできます。

executor.submit(() -> {
    new Task();
});

2つのすぐに実行可能な実行終了メソッドもあります。最初のものは

shutdown()

です。送信されたすべてのタスクが実行を終了するまで待機します。もう1つのメソッドは__shutdownNow()です。

シャットダウンイベントがトリガされた後、または実行タイムアウトが発生した後、または実行スレッド自体が中断された後、すべてのタスクの実行が完了するまで強制的にブロックする別のメソッド__awaitTermination(long timeout、TimeUnit unit)もあります。

try {
    executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
    e.printStackTrace();
}


2.3.

ScheduledExecutorService



ScheduledExecutorService

は__ExecutorServiceと同様のインターフェースですが、定期的にタスクを実行できます。


  • ExecutorおよびExecutorService

    のメソッドは、人為的な遅延を招くことなくその場でスケジュールされます。** ゼロまたは任意の負の値は、要求を即座に実行する必要があることを意味します。

タスクを定義するには、

Runnable



Callable

の両方のインターフェースを使用できます。

public void execute() {
    ScheduledExecutorService executorService
      = Executors.newSingleThreadScheduledExecutor();

    Future<String> future = executorService.schedule(() -> {
       //...
        return "Hello world";
    }, 1, TimeUnit.SECONDS);

    ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
       //...
    }, 1, TimeUnit.SECONDS);

    executorService.shutdown();
}


ScheduledExecutorService

は、一定の遅延** の後にタスクをスケジュールすることもできます。

executorService.scheduleAtFixedRate(() -> {
   //...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
   //...
}, 1, 10, TimeUnit.SECONDS);

ここで、


scheduleAtFixedRate(Runnableコマンド、long initialDelay、long period、TimeUnit単位)


メソッドは、最初に提供された初期遅延の後、次にサービスインスタンスがシャットダウンするまでの指定期間に呼び出される定期的なアクションを作成および実行します。



scheduleWithFixedDelay(Runnableコマンド、long initialDelay、long delay、TimeUnit単位)


メソッドは、指定された初期遅延の後に最初に呼び出され、実行中のアクションが終了するまでの間に、指定された遅延で繰り返し実行される次の呼び出し


2.4.

未来



  • Future

    は、非同期操作の結果を表すために使用されます** 非同期操作が完了したかどうかをチェックし、計算結果を取得するなどのためのメソッドが付属しています

さらに、

cancel(boolean mayInterruptIfRunning)

APIは操作をキャンセルし、実行中のスレッドを解放します。

mayInterruptIfRunning

の値がtrueの場合、タスクを実行しているスレッドは即座に終了します。

それ以外の場合は、進行中のタスクを完了することが許可されます。

以下のコードスニペットを使用して将来のインスタンスを作成できます。

public void invoke() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    Future<String> future = executorService.submit(() -> {
       //...
        Thread.sleep(10000l);
        return "Hello world";
    });
}

次のコードスニペットを使用して、将来の結果の準備ができているかどうかを確認し、計算が完了したらデータを取得できます。

if (future.isDone() && !future.isCancelled()) {
    try {
        str = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

特定の操作に対してタイムアウトを指定することもできます。タスクがこの時間以上かかると、

TimeoutException

がスローされます。

try {
    future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}


2.5.

CountDownLatch



CountDownLatch



JDK 5

で導入)は、何らかの操作が完了するまでスレッドのセットをブロックするユーティリティクラスです。


CountDownLatch



counter(Integer

type)で初期化されます。従属スレッドが実行を完了すると、このカウンタは減少します。しかしカウンターがゼロに達すると、他のスレッドは解放されます。


CountDownLatch

linkについてさらに学ぶことができます:/java-countdown-latch[ここ]。


2.6.

CyclicBarrier



CyclicBarrier

は、再利用できることを除けば、

CountDownLatch

とほぼ同じです。

CountDownLatch

とは異なり、最終タスクを呼び出す前に

await()

メソッド(バリア条件と呼ばれる)を使用して複数のスレッドが互いに待機することができます。

バリア条件を開始する

Runnable

タスクインスタンスを作成する必要があります。

public class Task implements Runnable {

    private CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            LOG.info(Thread.currentThread().getName() +
              " is waiting");
            barrier.await();
            LOG.info(Thread.currentThread().getName() +
              " is released");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

}

これでいくつかのスレッドを呼び出してバリア条件を競うことができます。

public void start() {

    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
       //...
        LOG.info("All previous tasks are completed");
    });

    Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
    Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
    Thread t3 = new Thread(new Task(cyclicBarrier), "T3");

    if (!cyclicBarrier.isBroken()) {
        t1.start();
        t2.start();
        t3.start();
    }
}

ここで、

isBroken()

メソッドは、実行時間中にスレッドのいずれかが中断されたかどうかを確認します。実際のプロセスを実行する前に、必ずこのチェックを実行してください。


2.7.

セマフォ



Semaphore

は、物理リソースまたは論理リソースの一部へのスレッドレベルのアクセスをブロックするために使用されます。セマフォには一連の許可が含まれています。スレッドがクリティカルセクションに入ろうとする時はいつでも、許可が利用可能かどうかセマフォをチェックする必要があります。

  • 許可が利用できない場合(

    tryAcquire()

    を介して)、スレッドはクリティカルセクションにジャンプすることを許可されません。ただし、許可が利用可能な場合はアクセスが許可され、許可カウンタは減少します。

実行スレッドがクリティカルセクションを解放すると、再度許可カウンタが増加します(__release()メソッドによって行われます)。


tryAcquire(long timeout、TimeUnit unit)

メソッドを使ってアクセスを取得するためのタイムアウトを指定することができます。

利用可能な許可の数やセマフォを獲得するのを待っているスレッドの数も確認できます。

次のコードスニペットを使用してセマフォを実装することができます。

static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {

    LOG.info("Available permit : " + semaphore.availablePermits());
    LOG.info("Number of threads waiting to acquire: " +
      semaphore.getQueueLength());

    if (semaphore.tryAcquire()) {
        semaphore.acquire();
       //...
        semaphore.release();
    }

}


Semaphore

を使って

Mutex

のようなデータ構造を実装することができます。このリンクに関するより詳細な情報:/java-semaphore[ここにあります。]

** 2.8.

ThreadFactory

**

名前が示すように、

ThreadFactory

はオンデマンドで新しいスレッドを作成するスレッド(存在しない)プールとして機能します。それは効率的なスレッド作成メカニズムを実装するための多くの定型的なコーディングの必要性を排除します。


ThreadFactory

を定義できます。

public class BaeldungThreadFactory implements ThreadFactory {
    private int threadId;
    private String name;

    public BaeldungThreadFactory(String name) {
        threadId = 1;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, name + "-Thread__" + threadId);
        LOG.info("created new thread with id : " + threadId +
            " and name : " + t.getName());
        threadId++;
        return t;
    }
}

実行時に新しいスレッドを作成するには、この

newThread(Runnable r)

メソッドを使用します。

BaeldungThreadFactory factory = new BaeldungThreadFactory(
    "BaeldungThreadFactory");
for (int i = 0; i < 10; i++) {
    Thread t = factory.newThread(new Task());
    t.start();
}


2.9.


BlockingQueue

非同期プログラミングでは、最も一般的な統合パターンの1つがhttps://en.wikipedia.org/wiki/Producer%E2%80%93consumer

problem[producer-consumerパターン]です。

java.util.concurrent

パッケージには、

BlockingQueue__と呼ばれるデータ構造が付属しています。これは、これらの非同期シナリオで非常に役立ちます。

これに関するより多くの情報と実用的な例は利用可能なリンクです:/java-blocking-queue[ここ]。


2.10.

DelayQueue



DelayQueue

は、有効期限(ユーザー定義の遅延と呼ばれる)が完了した場合にのみ要素を取得できる、無限サイズの要素ブロックキューです。したがって、一番上の要素(

head

)が最も遅延が大きくなり、最後にポーリングされます。

これに関するより多くの情報と実用的な例は利用可能なリンクです:/java-delay-queue[ここ]。


2.11.

Locks


当然のことながら、

Lock

は、現在実行中のスレッドとは別に、他のスレッドが特定のコードセグメントにアクセスするのをブロックするためのユーティリティです。

LockブロックとSynchronizedブロックの主な違いは、synchronizedブロックがメソッドに完全に含まれていることです。ただし、Lock APIのlock()操作とunlock()操作を別々のメソッドに含めることができます。

これに関するより多くの情報と実用的な例は利用可能なリンクです:/java-concurrent-locks[ここ]。


2.12.

フェイザー



Phaser

は、

CyclicBarrier



CountDownLatch

よりも柔軟性の高いソリューションです。動的なスレッド数が実行を続行する前に待機する必要がある、再利用可能なバリアとして機能します。各プログラムフェーズに

Phaser

インスタンスを再利用して、実行の複数フェーズを調整できます。

これに関するより多くの情報と実用的な例は利用可能なリンクです:/java-phaser[ここ]。


3結論

この概要の概要記事では、

java.util.concurrent

パッケージで利用可能なさまざまなユーティリティに焦点を当てました。

いつものように、完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency[over on GitHub]から入手できます。