In Java, we can use `Semaphore` to limit the number of threads that access a particular resource.

1. What is a Semaphore?

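In short, a semaphore maintains a set of permits (tickets). Each `acquire()` takes a permit (ticket) from the semaphore, and each `release()` returns a permit (ticket) to the semaphore. If no permit (ticket) is available, `acquire()` blocks until one becomes available.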

    // 5 tickets
    Semaphore semaphore = new Semaphore(5);

    // take 1 ticket
    semaphore.acquire();

    // 4
    semaphore.availablePermits();

    // return the ticket
    semaphore.release();

    // 5
    semaphore.availablePermits();
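
The snippet above is a fragment, so here is a minimal runnable sketch of the same permit counting. The class name `SemaphorePermitsDemo` and the 2-permit setup are assumptions made for illustration. It also uses the timed `tryAcquire(long, TimeUnit)` to show what happens when no permit is left, since a plain `acquire()` would block at that point.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class SemaphorePermitsDemo {

    // Returns { permits after 1st acquire, permits after 2nd acquire,
    //           1 if a 3rd permit was obtained else 0, permits after both releases }
    static int[] run() {
        Semaphore semaphore = new Semaphore(2);
        int[] seen = new int[4];
        try {
            semaphore.acquire();
            seen[0] = semaphore.availablePermits();

            semaphore.acquire();
            seen[1] = semaphore.availablePermits();

            // No permits left: acquire() would block here, so use the timed
            // tryAcquire, which gives up after 100 ms and returns false.
            boolean gotThird = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
            seen[2] = gotThird ? 1 : 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // return both tickets
        semaphore.release();
        semaphore.release();
        seen[3] = semaphore.availablePermits();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [1, 0, 0, 2]
    }
}
```

The timed `tryAcquire` is handy when a caller would rather fail fast than wait indefinitely for a ticket.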

2. Java Semaphore


A Java Semaphore example that limits the number of tasks running in an `ExecutorService`. In this example, 5 `Callable` tasks are submitted to the `ExecutorService`, but only 2 tasks run concurrently.

TaskLimitSemaphore.java

package com.mkyong.concurrency.synchronizer.semaphore;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
//Throttle task submission
public class TaskLimitSemaphore {

    private final ExecutorService executor;
    private final Semaphore semaphore;

    public TaskLimitSemaphore(ExecutorService executor, int limit) {
        this.executor = executor;
        this.semaphore = new Semaphore(limit);
    }

    public <T> Future<T> submit(final Callable<T> task) throws InterruptedException {

        semaphore.acquire();
        System.out.println("semaphore.acquire()...");

        return executor.submit(() -> {
            try {
                return task.call();
            } finally {
                semaphore.release();
                System.out.println("semaphore.release()...");
            }
        });

    }

    private static final DateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");

    public static void main(String[]args) throws InterruptedException {

        ExecutorService executor = Executors.newCachedThreadPool();

        // Only 2 tasks are able to run concurrently
        TaskLimitSemaphore obj = new TaskLimitSemaphore(executor, 2);

        obj.submit(() -> {
            System.out.println(getCurrentDateTime() + " : task1 is running!");
            Thread.sleep(2000);
            System.out.println(getCurrentDateTime() + " : task1 is done!");
            return 1;
        });

        obj.submit(() -> {
            System.out.println(getCurrentDateTime() + " : task2 is running!");
            Thread.sleep(2000);
            System.out.println(getCurrentDateTime() + " task2 is done!");
            return 2;
        });

        obj.submit(() -> {
            System.out.println(getCurrentDateTime() + " task3 is running!");
            Thread.sleep(2000);
            System.out.println(getCurrentDateTime() + " task3 is done!");
            return 3;
        });

        obj.submit(() -> {
            System.out.println(getCurrentDateTime() + " task4 is running!");
            Thread.sleep(2000);
            System.out.println(getCurrentDateTime() + " task4 is done!");
            return 4;
        });

        obj.submit(() -> {
            System.out.println(getCurrentDateTime() + " task5 is running!");
            Thread.sleep(2000);
            System.out.println(getCurrentDateTime() + " task5 is done!");
            return 5;
        });

        executor.shutdown();
    }

    private static String getCurrentDateTime() {
        return sdf.format(new Date());
    }
}

Output

semaphore.acquire()...
semaphore.acquire()...
2018/12/06 18:45:22 : task1 is running!
2018/12/06 18:45:22 : task2 is running!
2018/12/06 18:45:24 : task1 is done!
2018/12/06 18:45:24 task2 is done!
semaphore.release()...
semaphore.acquire()...
semaphore.release()...
semaphore.acquire()...
2018/12/06 18:45:24 task3 is running!
2018/12/06 18:45:24 task4 is running!
2018/12/06 18:45:26 task4 is done!
2018/12/06 18:45:26 task3 is done!
semaphore.acquire()...
semaphore.release()...
semaphore.release()...
2018/12/06 18:45:26 task5 is running!
2018/12/06 18:45:28 task5 is done!
semaphore.release()...
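
To check the limit without reading timestamps, one option is to count how many tasks are inside their body at the same moment. The sketch below follows the same acquire-before-submit, release-in-`finally` pattern as `TaskLimitSemaphore`. The class `ConcurrencyLimitCheck`, the `maxConcurrent` helper, and the 200 ms sleep are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not part of the original article.
class ConcurrencyLimitCheck {

    // Submits `tasks` short tasks gated by a semaphore with `limit` permits and
    // returns the highest number of tasks observed running at the same time.
    static int maxConcurrent(int limit, int tasks) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < tasks; i++) {
                // blocks the submitting thread while `limit` tasks are in flight
                semaphore.acquire();
                futures.add(executor.submit(() -> {
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(200);
                        return null;
                    } finally {
                        // leave the counted region before handing the permit back
                        running.decrementAndGet();
                        semaphore.release();
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for every task to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency = " + maxConcurrent(2, 5));
    }
}
```

Because a task increments the counter only after its permit was acquired and decrements it before the permit is released, the reported peak should never exceed `limit`.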

3. Mutex

In short, `new Semaphore(1)` always behaves as a mutex: only one thread at a time is allowed to access the resource.

SshLoginSemaphore.java

package com.mkyong.concurrency.synchronizer.semaphore;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SshLoginSemaphore {

    private final Semaphore mutex;

    // only 1 user is allowed
    public SshLoginSemaphore() {
        this.mutex = new Semaphore(1);
    }

    private void ssh(String user) throws InterruptedException {

        mutex.acquire();
        try {
            System.out.println(getCurrentDateTime() + " : " + user + " mutex.acquire()");

            Thread.sleep(2000);
        } finally {
            // always return the permit, even if the sleep is interrupted
            mutex.release();
            System.out.println(getCurrentDateTime() + " : " + user + " mutex.release()");
        }

    }

    private static final DateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");

    public static void main(String[]args) {

        ExecutorService executor = Executors.newFixedThreadPool(5);

        SshLoginSemaphore task = new SshLoginSemaphore();

        // submit 3 tasks
        executor.submit(() -> {
            try {
                task.ssh("mkyong");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        executor.submit(() -> {
            try {
                task.ssh("yflow");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        executor.submit(() -> {
            try {
                task.ssh("zilap");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        executor.shutdown();

    }

    private static String getCurrentDateTime() {
        return sdf.format(new Date());
    }

}

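Output

Check the timestamps (seconds): only one thread holds the permit at a time, with one `acquire()` followed by one `release()`. Because the `mutex.release()` message is printed after the permit has been returned, the next thread's `mutex.acquire()` message may appear before it.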

2018/12/06 18:54:25 : mkyong mutex.acquire()
2018/12/06 18:54:27 : yflow mutex.acquire()
2018/12/06 18:54:27 : mkyong mutex.release()
2018/12/06 18:54:29 : zilap mutex.acquire()
2018/12/06 18:54:29 : yflow mutex.release()
2018/12/06 18:54:31 : zilap mutex.release()
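
As a complementary sketch, the same `new Semaphore(1)` idea can guard a shared counter instead of a simulated login. The class `MutexCounter` and its `run` helper are assumptions made for illustration. The permit is released in a `finally` block so it is returned even if the guarded code throws.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, not part of the original article.
class MutexCounter {

    private final Semaphore mutex = new Semaphore(1);
    private int count; // only touched while holding the mutex

    void increment() throws InterruptedException {
        mutex.acquire();
        try {
            count++; // critical section: one thread at a time
        } finally {
            mutex.release();
        }
    }

    int get() throws InterruptedException {
        mutex.acquire();
        try {
            return count;
        } finally {
            mutex.release();
        }
    }

    // Runs `threads` workers that each increment the counter
    // `incrementsPerThread` times, then returns the final count.
    static int run(int threads, int incrementsPerThread) {
        MutexCounter counter = new MutexCounter();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            for (int t = 0; t < threads; t++) {
                executor.submit(() -> {
                    for (int i = 0; i < incrementsPerThread; i++) {
                        counter.increment();
                    }
                    return null;
                });
            }
            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
            return counter.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 4000
    }
}
```

Without the mutex, the unsynchronized `count++` could lose updates when several threads run it at once.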
