1. 概要

この記事では、JavaNIOのSelectorコンポーネントの紹介部分について説明します。

セレクターは、1つ以上のNIOチャネルを監視し、1つ以上がデータ転送に使用できるようになるタイミングを認識するためのメカニズムを提供します。

このように、単一のスレッドを使用して複数のチャネルを管理し、複数のネットワーク接続を管理できます。

2. なぜセレクターを使用するのですか?

セレクターを使用すると、複数のチャネルを管理するために、複数のスレッドではなく1つのスレッドを使用できます。 スレッド間のコンテキスト切り替えはオペレーティングシステムにとってコストがかかり、さらに各スレッドはメモリを消費します。

したがって、使用するスレッドが少ないほど良いです。 ただし、最新のオペレーティングシステムとCPUはマルチタスクで向上し続けているため、マルチスレッドのオーバーヘッドは時間の経過とともに減少し続けることを覚えておくことが重要です。

ここでは、セレクターを使用して、単一のスレッドで複数のチャネルを処理する方法について説明します。

セレクターはデータの読み取りに役立つだけではないことにも注意してください。 また、着信ネットワーク接続をリッスンし、低速チャネルを介してデータを書き込むこともできます。

3. 設定

セレクターを使用するために、特別な設定は必要ありません。 必要なクラスはすべてコアjava.nioパッケージに含まれており、必要なものをインポートするだけです。

その後、セレクターオブジェクトに複数のチャンネルを登録できます。 いずれかのチャネルでI/Oアクティビティが発生すると、セレクターが通知します。 これは、単一のスレッドで多数のデータソースから読み取る方法です。

セレクターに登録するチャネルは、SelectedChannelのサブクラスである必要があります。 これらは、非ブロッキングモードにすることができる特別なタイプのチャネルです。

4. セレクターの作成

セレクターは、Selectorクラスの静的openメソッドを呼び出すことで作成できます。このメソッドは、システムのデフォルトのセレクタープロバイダーを使用して新しいセレクターを作成します。

Selector selector = Selector.open();

5. 選択可能なチャネルの登録

セレクターがチャネルを監視するには、これらのチャネルをセレクターに登録する必要があります。 これを行うには、選択可能なチャネルのregisterメソッドを呼び出します。

ただし、チャネルをセレクターに登録する前に、非ブロッキングモードにする必要があります。

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

つまり、 FileChannel は、ソケットチャネルの場合のように非ブロッキングモードに切り替えることができないため、セレクターで使用することはできません。

最初のパラメーターは前に作成したSelectorオブジェクトで、2番目のパラメーターはインタレストセットを定義します。これは、セレクターを介して、監視対象チャネルでリッスンすることに関心のあるイベントを意味します。

リッスンできるイベントは4つあり、それぞれがSelectionKeyクラスの定数で表されます。

  • Connect クライアントがサーバーに接続しようとしたとき。 SelectionKey.OP_CONNECTで表されます
  • Accept サーバーがクライアントからの接続を受け入れる場合。 SelectionKey.OP_ACCEPTで表されます
  • Read サーバーがチャネルから読み取る準備ができたとき。 SelectionKey.OP_READで表されます
  • Write サーバーがチャネルに書き込む準備ができたとき。 SelectionKey.OP_WRITEで表されます

返されるオブジェクトSelectionKeyは、セレクターへの選択可能なチャネルの登録を表します。 次のセクションで詳しく説明します。

6. SelectionKeyオブジェクト

前のセクションで見たように、チャネルをセレクターに登録すると、SelectionKeyオブジェクトが取得されます。 このオブジェクトは、チャネルの登録を表すデータを保持します。

これには、チャネルでセレクターを使用できるようにするためによく理解する必要のあるいくつかの重要なプロパティが含まれています。 これらのプロパティについては、次のサブセクションで説明します。

6.1. インタレストセット

インタレストセットは、セレクターがこのチャネルで監視するイベントのセットを定義します。 これは整数値です。 この情報は次の方法で取得できます。

まず、SelectionKeyinterestOpsメソッドによって返されるインタレストセットがあります。 次に、前に見たSelectionKeyにイベント定数があります。

これらの2つの値をANDすると、イベントが監視されているかどうかを示すブール値が得られます。

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

6.2. レディセット

レディセットは、チャネルの準備ができているイベントのセットを定義します。 これも整数値です。 この情報は次の方法で取得できます。

SelectionKeyreadyOpsメソッドによって返されるレディセットがあります。 関心セットの場合と同様に、この値をイベント定数とANDすると、チャネルが特定の値の準備ができているかどうかを表すブール値が得られます。

これを行う別の代替のより短い方法は、これと同じ目的でSelectionKey’の便利なメソッドを使用することです。

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWriteable();

6.3. チャンネル

SelectionKeyオブジェクトから監視対象のチャネルにアクセスするのは非常に簡単です。 channelメソッドを呼び出すだけです。

Channel channel = key.channel();

6.4. セレクター

チャネルを取得するのと同じように、SelectionKeyオブジェクトからSelectorオブジェクトを取得するのは非常に簡単です。

Selector selector = key.selector();

6.5. オブジェクトの添付

オブジェクトをにアタッチできます SelectionKey。 チャネルにカスタムIDを指定したり、追跡したい任意の種類のJavaオブジェクトを添付したりする場合があります。

オブジェクトをアタッチすることは、それを行うための便利な方法です。 SelectionKeyからオブジェクトをアタッチおよび取得する方法は次のとおりです。

key.attach(Object);

Object object = key.attachment();

または、チャネル登録時にオブジェクトをアタッチすることもできます。 次のように、チャネルのregisterメソッドに3番目のパラメータとして追加します。

SelectionKey key = channel.register(
  selector, SelectionKey.OP_ACCEPT, object);

7. チャンネルキーの選択

これまで、セレクターを作成し、それにチャネルを登録し、セレクターへのチャネルの登録を表すSelectionKeyオブジェクトのプロパティを検査する方法を見てきました。

これはプロセスの半分に過ぎません。今度は、前に見たレディセットを選択する連続プロセスを実行する必要があります。 次のように、セレクターのselectメソッドを使用して選択を行います。

int channels = selector.select();

このメソッドは、少なくとも1つのチャネルが操作の準備ができるまでブロックします。 返される整数は、チャネルが操作の準備ができているキーの数を表します。

次に、通常、処理のために選択したキーのセットを取得します。

Set<SelectionKey> selectedKeys = selector.selectedKeys();

取得したセットはSelectionKeyオブジェクトであり、各キーは操作の準備ができている登録済みチャネルを表します。

この後、通常、このセットを繰り返し処理し、キーごとにチャネルを取得して、関心のあるセットに表示される操作を実行します。

チャネルの存続期間中、そのキーがさまざまなイベントの準備完了セットに表示されるため、チャネルが数回選択される場合があります。 これが、チャネルイベントが発生したときにそれをキャプチャして処理するための継続的なループが必要な理由です。

8. 完全な例

前のセクションで得た知識を強化するために、完全なクライアントサーバーの例を作成します。

コードのテストを簡単にするために、エコーサーバーとエコークライアントを構築します。 この種のセットアップでは、クライアントはサーバーに接続し、サーバーへのメッセージの送信を開始します。 サーバーは、各クライアントから送信されたメッセージをエコーバックします。

サーバーは、 end などの特定のメッセージを検出すると、それを通信の終了として解釈し、クライアントとの接続を閉じます。

8.1. サーバー

EchoServer.javaのコードは次のとおりです。

public class EchoServer {

    private static final String POISON_PILL = "POISON_PILL";

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        ByteBuffer buffer = ByteBuffer.allocate(256);

        while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {

                SelectionKey key = iter.next();

                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }

                if (key.isReadable()) {
                    answerWithEcho(buffer, key);
                }
                iter.remove();
            }
        }
    }

    private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
      throws IOException {
 
        SocketChannel client = (SocketChannel) key.channel();
        client.read(buffer);
        if (new String(buffer.array()).trim().equals(POISON_PILL)) {
            client.close();
            System.out.println("Not accepting client messages anymore");
        }
        else {
            buffer.flip();
            client.write(buffer);
            buffer.clear();
        }
    }

    private static void register(Selector selector, ServerSocketChannel serverSocket)
      throws IOException {
 
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }

    public static Process start() throws IOException, InterruptedException {
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
        String classpath = System.getProperty("java.class.path");
        String className = EchoServer.class.getCanonicalName();

        ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);

        return builder.start();
    }
}

これが起こっていることです。 静的なopenメソッドを呼び出して、Selectorオブジェクトを作成します。 次に、静的 open メソッド、具体的にはServerSocketChannelインスタンスを呼び出してチャネルを作成します。

これは、 ServerSocketChannelが選択可能であり、ストリーム指向のリスニングソケットに適しているためです。

次に、それを選択したポートにバインドします。 選択可能なチャネルをセレクターに登録する前に、まずそれを非ブロッキングモードに設定する必要があることを先に述べたことを思い出してください。 次に、これを実行してから、チャネルをセレクターに登録します。

この段階では、このチャネルの SelectionKey インスタンスは必要ないため、覚えていません。

Java NIOは、ストリーム指向モデル以外のバッファ指向モデルを使用します。 したがって、ソケット通信は通常、バッファへの書き込みとバッファからの読み取りによって行われます。

したがって、サーバーが書き込みおよび読み取りを行う新しいByteBufferを作成します。 256バイトに初期化します。これは、転送するデータの量に応じて、任意の値になります。

最後に、選択プロセスを実行します。 準備ができているチャネルを選択し、それらの選択キーを取得し、キーを反復処理して、各チャネルの準備ができている操作を実行します。

サーバーは通常、アクティビティがあるかどうかに関係なく実行を継続する必要があるため、これは無限ループで実行されます。

ServerSocketChannel が処理できる唯一の操作は、ACCEPT操作です。 クライアントからの接続を受け入れると、読み取りと書き込みを実行できるSocketChannelオブジェクトを取得します。 これを非ブロッキングモードに設定し、READ操作用にセレクターに登録します。

後続の選択の1つで、この新しいチャネルは読み取り可能になります。 それを取得し、その内容をバッファーに読み込みます。 エコーサーバーとしての事実であるため、このコンテンツをクライアントに書き戻す必要があります。

読み取っているバッファに書き込みたい場合は、flip()メソッドを呼び出す必要があります。

最後に、 flip メソッドを呼び出してバッファを書き込みモードに設定し、単純に書き込みます。

start()メソッドは、ユニットテスト中にエコーサーバーを個別のプロセスとして開始できるように定義されています。

8.2. クライアント

EchoClient.javaのコードは次のとおりです。

public class EchoClient {
    private static SocketChannel client;
    private static ByteBuffer buffer;
    private static EchoClient instance;

    public static EchoClient start() {
        if (instance == null)
            instance = new EchoClient();

        return instance;
    }

    public static void stop() throws IOException {
        client.close();
        buffer = null;
    }

    private EchoClient() {
        try {
            client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            buffer = ByteBuffer.allocate(256);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public String sendMessage(String msg) {
        buffer = ByteBuffer.wrap(msg.getBytes());
        String response = null;
        try {
            client.write(buffer);
            buffer.clear();
            client.read(buffer);
            response = new String(buffer.array()).trim();
            System.out.println("response=" + response);
            buffer.clear();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;

    }
}

クライアントはサーバーよりも単純です。

シングルトンパターンを使用して、start静的メソッド内でインスタンス化します。 このメソッドからプライベートコンストラクターを呼び出します。

プライベートコンストラクターでは、サーバーチャネルがバインドされているのと同じポートで、同じホスト上で接続を開きます。

次に、書き込みと読み取りが可能なバッファーを作成します。

最後に、 sendMessage メソッドがあります。このメソッドは、渡した文字列をバイトバッファーに読み取り、チャネルを介してサーバーに送信します。

次に、クライアントチャネルから読み取り、サーバーから送信されたメッセージを取得します。 これをメッセージのエコーとして返します。

8.3. テスト

EchoTest.java というクラス内で、サーバーを起動し、サーバーにメッセージを送信し、サーバーから同じメッセージが受信された場合にのみ合格するテストケースを作成します。 最後のステップとして、テストケースは完了する前にサーバーを停止します。

これで、テストを実行できます。

public class EchoTest {

    Process server;
    EchoClient client;

    @Before
    public void setup() throws IOException, InterruptedException {
        server = EchoServer.start();
        client = EchoClient.start();
    }

    @Test
    public void givenServerClient_whenServerEchosMessage_thenCorrect() {
        String resp1 = client.sendMessage("hello");
        String resp2 = client.sendMessage("world");
        assertEquals("hello", resp1);
        assertEquals("world", resp2);
    }

    @After
    public void teardown() throws IOException {
        server.destroy();
        EchoClient.stop();
    }
}

9. Selector.wakeup()

前に見たように、 selector.select()を呼び出すと、監視対象のチャネルの1つが操作可能になるまで現在のスレッドがブロックされます。 別のスレッドからselector.wakeup()を呼び出すことで、これをオーバーライドできます。

その結果、ブロッキングスレッドは、チャネルの準備ができているかどうかに関係なく、待機を継続するのではなく、すぐに戻ります

CountDownLatch を使用し、コード実行ステップを追跡することで、これを実証できます。

@Test
public void whenWakeUpCalledOnSelector_thenBlockedThreadReturns() {
    Pipe pipe = Pipe.open();
    Selector selector = Selector.open();
    SelectableChannel channel = pipe.source();
    channel.configureBlocking(false);
    channel.register(selector, OP_READ);

    List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());

    CountDownLatch latch = new CountDownLatch(1);

    new Thread(() -> {
        invocationStepsTracker.add(">> Count down");
        latch.countDown();
        try {
            invocationStepsTracker.add(">> Start select");
            selector.select();
            invocationStepsTracker.add(">> End select");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }).start();

    invocationStepsTracker.add(">> Start await");
    latch.await();
    invocationStepsTracker.add(">> End await");

    invocationStepsTracker.add(">> Wakeup thread");
    selector.wakeup();
    //clean up
    channel.close();

    assertThat(invocationStepsTracker)
      .containsExactly(
        ">> Start await",
        ">> Count down",
        ">> Start select",
        ">> End await",
        ">> Wakeup thread",
        ">> End select"
    );
}

この例では、JavaNIOのPipe クラスを使用して、テスト目的でチャネルを開きます。 スレッドセーフリストでコード実行ステップを追跡します。 これらの手順を分析することにより、 selector.wakeup() selector.select()によってブロックされたスレッドを解放する方法を確認できます。

10. 結論

この記事では、JavaNIOSelectorコンポーネントの基本的な使用法について説明しました。

この記事の完全なソースコードとすべてのコードスニペットは、私のGitHubプロジェクトで入手できます。