
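In Java, you can use `BlockingQueue` to create a queue that is shared by both producers and consumers.

  1. Producer - generates data and puts it into the queue.

  2. Consumer - removes data from the queue.

`BlockingQueue` implementations are thread-safe and can safely be used with multiple producers and multiple consumers.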

=== 1. BlockingQueue

In this simple `BlockingQueue` example, a producer generates data and puts it into a queue while, at the same time, a consumer takes data from the same queue.

1.1 Producer - a `Runnable` object that puts 20 integers into the queue.

Producer.java

package com.mkyong.concurrency.queue.simple.raw;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private final BlockingQueue<Integer> queue;

    @Override
    public void run() {

        try {
            process();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    private void process() throws InterruptedException {

       //Put 20 ints into Queue
        for (int i = 0; i < 20; i++) {
            System.out.println("[Producer]Put : " + i);
            queue.put(i);
            System.out.println("[Producer]Queue remainingCapacity : " + queue.remainingCapacity());
            Thread.sleep(100);
        }

    }

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
}

1.2 Consumer - a `Runnable` object that takes items from the queue.

Consumer.java

package com.mkyong.concurrency.queue.simple.raw;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private final BlockingQueue<Integer> queue;

    @Override
    public void run() {

        try {
            while (true) {
                Integer take = queue.take();
                process(take);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    private void process(Integer take) throws InterruptedException {
        System.out.println("[Consumer]Take : " + take);
        Thread.sleep(500);
    }

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
}

1.3 Run it. Create a queue of size 10, then start one producer and one consumer.

Main.java

package com.mkyong.concurrency.queue.simple;

import com.mkyong.concurrency.queue.simple.raw.Consumer;
import com.mkyong.concurrency.queue.simple.raw.Producer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    public static void main(String[] args) {

        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();

    }

}

Output

When the queue is full, the producer stops adding data: `put()` blocks until the consumer frees a slot.

[Producer]Put : 0
[Producer]Queue remainingCapacity : 9
[Consumer]Take : 0
[Producer]Put : 1
[Producer]Queue remainingCapacity : 9
[Producer]Put : 2
[Producer]Queue remainingCapacity : 8
[Producer]Put : 3
[Producer]Queue remainingCapacity : 7
[Producer]Put : 4
[Producer]Queue remainingCapacity : 6
[Producer]Put : 5
[Producer]Queue remainingCapacity : 5
[Consumer]Take : 1
[Producer]Put : 6
[Producer]Queue remainingCapacity : 5
[Producer]Put : 7
[Producer]Queue remainingCapacity : 4
[Producer]Put : 8
[Producer]Queue remainingCapacity : 3
[Producer]Put : 9
[Producer]Queue remainingCapacity : 2
[Producer]Put : 10
[Producer]Queue remainingCapacity : 1
[Consumer]Take : 2
[Producer]Put : 11
[Producer]Queue remainingCapacity : 1
[Producer]Put : 12
[Producer]Queue remainingCapacity : 0
[Producer]Put : 13
[Consumer]Take : 3
[Producer]Queue remainingCapacity : 0
[Producer]Put : 14
[Consumer]Take : 4
[Producer]Queue remainingCapacity : 0
[Producer]Put : 15
[Consumer]Take : 5
[Producer]Queue remainingCapacity : 0
[Producer]Put : 16
[Consumer]Take : 6
[Producer]Queue remainingCapacity : 0
[Producer]Put : 17
[Consumer]Take : 7
[Producer]Queue remainingCapacity : 0
[Producer]Put : 18
[Consumer]Take : 8
[Producer]Queue remainingCapacity : 0
[Producer]Put : 19
[Consumer]Take : 9
[Producer]Queue remainingCapacity : 0
[Consumer]Take : 10
[Consumer]Take : 11
[Consumer]Take : 12
[Consumer]Take : 13
[Consumer]Take : 14
[Consumer]Take : 15
[Consumer]Take : 16
[Consumer]Take : 17
[Consumer]Take : 18
[Consumer]Take : 19
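The capacity limit seen in this output can also be observed without any threads. The following is a minimal sketch, not part of the original example: it uses the non-blocking `offer()` and `poll()` methods of `BlockingQueue` instead of `put()` and `take()`, so a full queue is reported as a `false` return value rather than by blocking. The class and method names (`BoundedQueueSketch`, `extraElementRejected`, `acceptedAfterPoll`) are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper class, for illustration only.
class BoundedQueueSketch {

    // Fills a bounded queue, then tries to add one more element.
    // Returns true if the extra element was rejected because the queue was full.
    static boolean extraElementRejected(int capacity) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(i); // succeeds: there is still room
        }
        // offer() returns false immediately when the queue is full;
        // put() would block here instead.
        boolean accepted = queue.offer(capacity);
        return !accepted && queue.remainingCapacity() == 0;
    }

    // Fills a bounded queue, removes one element, then adds one more.
    // Returns true if the new element was accepted into the freed slot.
    static boolean acceptedAfterPoll(int capacity) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(i);
        }
        queue.poll(); // frees one slot, like a consumer calling take()
        return queue.offer(capacity);
    }

    public static void main(String[] args) {
        System.out.println("Rejected when full  : " + extraElementRejected(10));
        System.out.println("Accepted after poll : " + acceptedAfterPoll(10));
    }
}
```

A producer that must not lose data would normally keep using `put()`, as in the example above; `offer()` is convenient when the caller prefers to drop or retry instead of waiting.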

The program never stops or exits. It keeps running because the consumer stays blocked in `BlockingQueue.take()`, waiting for more data.
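One way to end such a consumer from the outside is to interrupt its thread, because `take()` throws `InterruptedException` when the waiting thread is interrupted. The sketch below is an assumption-laden illustration rather than the article's own code: the class name `InterruptConsumerSketch`, the method `consumeThenInterrupt`, and the polling loop that waits for the queue to drain are all invented for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, for illustration only.
class InterruptConsumerSketch {

    // Starts a consumer that loops on take(), feeds it `items` elements,
    // waits until all of them are consumed, then interrupts the consumer.
    // Returns the number of elements the consumer processed.
    static int consumeThenInterrupt(int items) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicInteger consumed = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    queue.take(); // blocks while the queue is empty
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // Restore the flag and leave the loop: the thread ends here.
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 0; i < items; i++) {
            queue.offer(i); // unbounded queue, always succeeds
        }

        try {
            while (consumed.get() < items) {
                Thread.sleep(10); // crude wait until everything is consumed
            }
            consumer.interrupt(); // wakes the consumer blocked in take()
            consumer.join();      // returns once the consumer has exited
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("Consumed before interrupt: " + consumeThenInterrupt(20));
    }
}
```

Interruption requires a reference to the consumer thread. When the producer should signal the end of the data through the queue itself, the poison pill described next is the usual choice.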

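=== 2. BlockingQueue Poison Pill

The "poison pill" is a common solution for stopping both the producer and the consumer threads. The idea is that the producer puts a "poison pill" into the queue and exits, and a consumer that sees the "poison pill" stops and exits as well.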

2.1 A producer with a poison pill.

ProducerPoison.java

package com.mkyong.concurrency.queue.simple.poison;

import java.util.concurrent.BlockingQueue;

public class ProducerPoison implements Runnable {

    private final BlockingQueue<Integer> queue;
    private final Integer POISON;

    @Override
    public void run() {

        try {
            process();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            while (true) {
                try {
                    queue.put(POISON);
                    break;
                } catch (InterruptedException e) {
                    //...
                }
            }
        }

    }

    private void process() throws InterruptedException {

        //Put 20 elements into Queue
        for (int i = 0; i < 20; i++) {
            System.out.println("[Producer]Put : " + i);
            queue.put(i);
            System.out.println("[Producer]Queue remainingCapacity : " + queue.remainingCapacity());
            Thread.sleep(100);
        }

    }

    public ProducerPoison(BlockingQueue<Integer> queue, Integer POISON) {
        this.queue = queue;
        this.POISON = POISON;
    }

}

2.2 A consumer with a poison pill.

ConsumerPoison.java

package com.mkyong.concurrency.queue.simple.poison;

import java.util.concurrent.BlockingQueue;

public class ConsumerPoison implements Runnable {

    private final BlockingQueue<Integer> queue;
    private final Integer POISON;

    @Override
    public void run() {

        try {
            while (true) {
                Integer take = queue.take();
                process(take);

                //if this is a poison pill, break, exit
                if (take == POISON) {
                    break;
                }

            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    private void process(Integer take) throws InterruptedException {
        System.out.println("[Consumer]Take : " + take);
        Thread.sleep(500);
    }

    public ConsumerPoison(BlockingQueue<Integer> queue, Integer POISON) {
        this.queue = queue;
        this.POISON = POISON;
    }
}

2.3 Start two producers and two consumers.

Main.java

package com.mkyong.concurrency.queue.simple;

import com.mkyong.concurrency.queue.simple.poison.ConsumerPoison;
import com.mkyong.concurrency.queue.simple.poison.ProducerPoison;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    public static void main(String[] args) {

        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

        //new Thread(new Producer(queue)).start();
        //new Thread(new Consumer(queue)).start();

        Integer poison = -1;
        new Thread(new ProducerPoison(queue, poison)).start();
        new Thread(new ProducerPoison(queue, poison)).start();

        new Thread(new ConsumerPoison(queue, poison)).start();
        new Thread(new ConsumerPoison(queue, poison)).start();

    }

}

Output

[Producer]Put : 0
[Producer]Put : 0
//...
[Consumer]Take : 18
[Consumer]Take : 18
[Consumer]Take : 19
[Consumer]Take : 19
[Consumer]Take : -1
[Consumer]Take : -1

Process finished with exit code 0
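For reference, the poison pill pattern from this section can be condensed into a single file. This is a rough sketch under a few assumptions, not the code from the repository: it uses one producer and one consumer, picks `Integer.MIN_VALUE` as a hypothetical pill value that never occurs as real data, compares with `equals()`, and checks for the pill before processing an item (the `ConsumerPoison` class above processes first and checks afterwards). The names `PoisonPillSketch` and `run` are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper class, for illustration only.
class PoisonPillSketch {

    // Assumed sentinel value; it must never appear as real data.
    static final Integer POISON = Integer.MIN_VALUE;

    // Runs one producer and one consumer and returns the items the consumer
    // received before it saw the poison pill.
    static List<Integer> run(int items) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i);
                }
                queue.put(POISON); // signal "no more data"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take();
                    if (POISON.equals(value)) {
                        break; // pill seen: stop consuming
                    }
                    received.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, `received` is safe to read
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("Received " + run(20).size() + " items, then stopped.");
    }
}
```

With several consumers, each one needs its own pill, which is why the `Main` class above starts as many producers as consumers.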

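=== 3. BlockingQueue File Indexing

A `BlockingQueue` example that builds a simple file indexing engine. A producer crawls a directory and puts the file names into the queue; at the same time, the consumers take the file names from the same queue and index them.

3.1 Producer.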

FileCrawlerProducer.java

package com.mkyong.concurrency.queue.crawler;

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
// Producer
// Crawl file system and put the filename in BlockingQueue.
public class FileCrawlerProducer implements Runnable {

    private final BlockingQueue<File> fileQueue;
    private final FileFilter fileFilter;
    private final File file;
    private final File POISON;
    private final int N_POISON_PILL_PER_PRODUCER;

    @Override
    public void run() {

        try {
            crawl(file);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            while (true) {
                try {
                    System.out.println(Thread.currentThread().getName()
                            + " - FileCrawlerProducer is done, try poison all the consumers!");
                    //poison all threads
                    for (int i = 0; i < N_POISON_PILL_PER_PRODUCER; i++) {
                        System.out.println(Thread.currentThread().getName() + " - puts poison pill!");
                        fileQueue.put(POISON);
                    }
                    break;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public FileCrawlerProducer(BlockingQueue<File> fileQueue,
                               FileFilter fileFilter, File file,
                               File POISON, int n_POISON_PILL_PER_PRODUCER) {
        this.fileQueue = fileQueue;
        this.fileFilter = fileFilter;
        this.file = file;
        this.POISON = POISON;
        N_POISON_PILL_PER_PRODUCER = n_POISON_PILL_PER_PRODUCER;
    }
    }

    private void crawl(File root) throws InterruptedException {

        File[] entries = root.listFiles(fileFilter);
        if (entries != null) {
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    crawl(entry);
                } else if (!isIndexed(entry)) {
                    System.out.println("[FileCrawlerProducer]- Found..."
                            + entry.getAbsoluteFile());
                    fileQueue.put(entry);
                }
            }
        }

    }

    private boolean isIndexed(File f) {
        return false;
    }

}

3.2 Consumer.

IndexerConsumer.java

package com.mkyong.concurrency.queue.crawler;

import java.io.File;
import java.util.concurrent.BlockingQueue;
//Consumer
public class IndexerConsumer implements Runnable {

    private final BlockingQueue<File> fileQueue;
    private final File POISON;

    @Override
    public void run() {

        try {
            while (true) {
                File take = fileQueue.take();
                if (take == POISON) {
                    System.out.println(Thread.currentThread().getName() + " die");
                    break;
                }
                indexFile(take);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    public void indexFile(File file) {
        if (file.isFile()) {
            System.out.println(Thread.currentThread().getName()
                    + "[IndexerConsumer]- Indexing..." + file.getAbsoluteFile());
        }

    }

    public IndexerConsumer(BlockingQueue<File> fileQueue, File POISON) {
        this.fileQueue = fileQueue;
        this.POISON = POISON;
    }
}

3.3 Start one producer and two consumers.

Main.java

package com.mkyong.concurrency.queue.crawler;

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    private static final File POISON = new File("This is a POISON PILL");

    public static void main(String[] args) {

        int N_PRODUCERS = 1;
        int N_CONSUMERS = 2; //Runtime.getRuntime().availableProcessors();
        int N_POISON_PILL_PER_PRODUCER = N_CONSUMERS / N_PRODUCERS;
        int N_POISON_PILL_REMAIN = N_CONSUMERS % N_PRODUCERS;

        System.out.println("N_PRODUCERS : " + N_PRODUCERS);
        System.out.println("N_CONSUMERS : " + N_CONSUMERS);
        System.out.println("N_POISON_PILL_PER_PRODUCER : " + N_POISON_PILL_PER_PRODUCER);
        System.out.println("N_POISON_PILL_REMAIN : " + N_POISON_PILL_REMAIN);

        //unbound queue, no limit
        BlockingQueue<File> queue = new LinkedBlockingQueue<>();

        FileFilter filter = new FileFilter() {
            public boolean accept(File file) {
                return true;
            }
        };

        File root = new File("C:\\users");

        for (int i = 0; i < N_PRODUCERS - 1; i++) {
            new Thread(new FileCrawlerProducer(queue, filter, root,
                    POISON, N_POISON_PILL_PER_PRODUCER)).start();
        }
        new Thread(new FileCrawlerProducer(queue, filter, root, POISON,
                N_POISON_PILL_PER_PRODUCER + N_POISON_PILL_REMAIN)).start();

        for (int i = 0; i < N_CONSUMERS; i++) {
            new Thread(new IndexerConsumer(queue, POISON)).start();
        }

    }
}

Output

//...
[FileCrawlerProducer]- Found...C:\users\Public\Videos\desktop.ini
Thread-2[IndexerConsumer]- Indexing...C:\users\Public\Videos\desktop.ini
Thread-0 - FileCrawlerProducer is done, try poison all the consumers!
Thread-0 - puts poison pill!
Thread-0 - puts poison pill!
Thread-1 die
Thread-2 die

Process finished with exit code 0
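The arithmetic that `Main` uses to share the poison pills among the producers can be isolated and checked on its own. The sketch below mirrors that calculation as read from the listing above: every producer gets `consumers / producers` pills and the last producer additionally gets the remainder, so the total always equals the number of consumers. The class name `PoisonPillDistribution` and the method `pillsPerProducer` are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper class, for illustration only.
class PoisonPillDistribution {

    // Returns how many pills each producer must put into the queue so that
    // every consumer receives exactly one. Assumes producers >= 1.
    static int[] pillsPerProducer(int producers, int consumers) {
        int perProducer = consumers / producers; // equal share
        int remainder = consumers % producers;   // pills left over

        int[] pills = new int[producers];
        Arrays.fill(pills, perProducer);
        pills[producers - 1] += remainder;       // last producer takes the rest
        return pills;
    }

    public static void main(String[] args) {
        // 1 producer, 2 consumers: the setup used in this section
        System.out.println(Arrays.toString(pillsPerProducer(1, 2))); // [2]
        // 2 producers, 5 consumers: uneven split
        System.out.println(Arrays.toString(pillsPerProducer(2, 5))); // [2, 3]
    }
}
```

Note that a consumer may receive a pill from one producer while another producer is still crawling. The remaining consumers keep working until they receive their own pills, so no queued file is lost.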

=== Download Source Code

git clone https://github.com/mkyong/java-concurrency.git

=== References

. https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem[Producer-consumer problem]
. https://docs.oracle.com/javase/8/docs/api/?java/util/concurrent/BlockingQueue.html[BlockingQueue JavaDoc]
