1. 概要
JGroups は、信頼性の高いメッセージ交換のためのJavaAPIです。 これは、以下を提供するシンプルなインターフェースを備えています。
- TCPとUDPを含む柔軟なプロトコルスタック
- 大きなメッセージの断片化と再構築
- 信頼性の高いユニキャストとマルチキャスト
- 障害検出
- フロー制御
他の多くの機能と同様に。
このチュートリアルでは、アプリケーション間で String メッセージを交換し、新しいアプリケーションがネットワークに参加するときに共有状態を提供するための簡単なアプリケーションを作成します。
2. 設定
2.1. Mavenの依存関係
pom.xmlに単一の依存関係を追加する必要があります。
<dependency>
<groupId>org.jgroups</groupId>
<artifactId>jgroups</artifactId>
<version>4.0.10.Final</version>
</dependency>
ライブラリの最新バージョンは、MavenCentralで確認できます。
2.2. ネットワーキング
JGroupsは、デフォルトでIPV6を使用しようとします。 システム構成によっては、アプリケーションが通信できなくなる場合があります。
これを回避するために、ここでアプリケーションを実行するときに、 java.net.preferIPv4Stackをtrueプロパティに設定します。
java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger
3. JChannels
JGroupsネットワークへの接続はJChannelです。チャネルはクラスターに参加し、メッセージとネットワークの状態に関する情報を送受信します。
3.1. チャネルの作成
構成ファイルへのパスを使用してJChannelを作成します。 ファイル名を省略すると、現在の作業ディレクトリでudp.xmlが検索されます。
明示的に名前が付けられた構成ファイルを使用してチャネルを作成します。
JChannel channel = new JChannel("src/main/resources/udp.xml");
JGroupsの構成は非常に複雑になる可能性がありますが、ほとんどのアプリケーションではデフォルトのUDPおよびTCP構成で十分です。 コードにUDPのファイルを含め、このチュートリアルで使用します。
トランスポートの構成の詳細については、JGroupsのマニュアルここを参照してください。
3.2. チャネルの接続
チャネルを作成したら、クラスターに参加する必要があります。 クラスターは、メッセージを交換するノードのグループです。
クラスターに参加するには、クラスター名が必要です。
channel.connect("Baeldung");
クラスターに参加しようとする最初のノードは、クラスターが存在しない場合にクラスターを作成します。 このプロセスの実際の動作を以下に示します。
3.3. チャネルの命名
ノードは名前で識別されるため、ピアはダイレクトメッセージを送信し、クラスターに出入りするユーザーに関する通知を受信できます。 JGroupsは自動的に名前を割り当てますが、独自の名前を設定することもできます。
channel.name("user1");
以下では、これらの名前を使用して、ノードがクラスターに出入りするタイミングを追跡します。
3.4. チャネルを閉じる
終了したという通知をピアにタイムリーに受信させたい場合は、チャネルのクリーンアップが不可欠です。
JChannelをcloseメソッドで閉じます。
channel.close()
4. クラスタービューの変更
JChannelが作成されたら、クラスター内のピアの状態を確認し、それらとメッセージを交換する準備が整いました。
JGroupsは、Viewクラス内のクラスター状態を維持します。各チャネルには、ネットワークの単一のViewがあります。 ビューが変更されると、 viewAccepted()コールバックを介して配信されます。
このチュートリアルでは、アプリケーションに必要なすべてのインターフェイスメソッドを実装する ReceiverAdaptorAPIクラスを拡張します。
これは、コールバックを実装するための推奨される方法です。
viewAcceptedをアプリケーションに追加しましょう。
public void viewAccepted(View newView) {
private View lastView;
if (lastView == null) {
System.out.println("Received initial view:");
newView.forEach(System.out::println);
} else {
System.out.println("Received new view.");
List<Address> newMembers = View.newMembers(lastView, newView);
System.out.println("New members: ");
newMembers.forEach(System.out::println);
List<Address> exMembers = View.leftMembers(lastView, newView);
System.out.println("Exited members:");
exMembers.forEach(System.out::println);
}
lastView = newView;
}
各Viewには、クラスターの各メンバーを表すAddressオブジェクトのListが含まれています。 JGroupsは、あるビューを別のビューと比較するための便利なメソッドを提供します。これを使用して、クラスターの新しいメンバーまたは終了したメンバーを検出します。
5. メッセージの送信
JGroupsでのメッセージ処理は簡単です。 メッセージには、送信者と受信者に対応するbyte配列とAddressオブジェクトが含まれています。
このチュートリアルでは、コマンドラインから読み取った文字列を使用していますが、アプリケーションが他のデータ型を交換する方法を簡単に確認できます。
5.1. ブロードキャストメッセージ
メッセージは、宛先とバイト配列を使用して作成されます。 JChannel は、送信者を設定します。ターゲットがヌルの場合、クラスター全体がメッセージを受信します。
コマンドラインからテキストを受け取り、クラスターに送信します。
System.out.print("Enter a message: ");
String line = in.readLine().toLowerCase();
Message message = new Message(null, line.getBytes());
channel.send(message);
プログラムの複数のインスタンスを実行してこのメッセージを送信すると(以下の receive()メソッドを実装した後)、送信者を含むを含むすべてのインスタンスがメッセージを受信します。
5.2. メッセージのブロック
メッセージを表示したくない場合は、そのためのプロパティを設定できます。
channel.setDiscardOwnMessages(true);
前のテストを実行すると、メッセージ送信者はブロードキャストメッセージを受信しません。
5.3. ダイレクトメッセージ
ダイレクトメッセージを送信するには、有効なアドレスが必要です。 ノードを名前で参照している場合は、アドレスを検索する方法が必要です。 幸い、そのためのViewがあります。
現在のViewは、JChannelから常に利用できます。
private Optional<address> getAddress(String name) {
View view = channel.view();
return view.getMembers().stream()
.filter(address -> name.equals(address.toString()))
.findAny();
}
Address の名前は、クラス toString()メソッドを介して使用できるため、クラスターメンバーのListで目的の名前を検索するだけです。
したがって、コンソールから名前を受け入れ、関連する宛先を見つけて、ダイレクトメッセージを送信できます。
Address destination = null;
System.out.print("Enter a destination: ");
String destinationName = in.readLine().toLowerCase();
destination = getAddress(destinationName)
.orElseThrow(() -> new Exception("Destination not found");
Message message = new Message(destination, "Hi there!");
channel.send(message);
6. メッセージの受信
メッセージを送信できるので、今すぐ受信してみましょう。
ReceiverAdaptorの空の受信メソッドをオーバーライドしてみましょう。
public void receive(Message message) {
String line = Message received from: "
+ message.getSrc()
+ " to: " + message.getDest()
+ " -> " + message.getObject();
System.out.println(line);
}
メッセージにStringが含まれていることがわかっているので、 getObject()をSystem.outに安全に渡すことができます。
7. 州の交換
ノードがネットワークに入るとき、クラスターに関する状態情報を取得する必要がある場合があります。 JGroupsは、このための状態転送メカニズムを提供します。
ノードがクラスターに参加すると、 getState()を呼び出すだけです。 クラスターは通常、グループ内の最も古いメンバーであるコーディネーターから状態を取得します。
アプリケーションにブロードキャストメッセージカウントを追加しましょう。 新しいメンバー変数を追加し、 receive()内でインクリメントします。
private Integer messageCount = 0;
public void receive(Message message) {
String line = "Message received from: "
+ message.getSrc()
+ " to: " + message.getDest()
+ " -> " + message.getObject();
System.out.println(line);
if (message.getDest() == null) {
messageCount++;
System.out.println("Message count: " + messageCount);
}
}
null の宛先を確認します。これは、ダイレクトメッセージをカウントすると、ノードごとに番号が異なるためです。
次に、ReceiverAdaptorのさらに2つのメソッドをオーバーライドします。
public void setState(InputStream input) {
try {
messageCount = Util.objectFromStream(new DataInputStream(input));
} catch (Exception e) {
System.out.println("Error deserialing state!");
}
System.out.println(messageCount + " is the current messagecount.");
}
public void getState(OutputStream output) throws Exception {
Util.objectToStream(messageCount, new DataOutputStream(output));
}
メッセージと同様に、JGroupsは状態をbytesの配列として転送します。
JGroupsは、状態を書き込むための InputStream と、新しいノードが読み取るためのOutputStreamをコーディネーターに提供します。 APIは、データをシリアル化および逆シリアル化するための便利なクラスを提供します。
本番コードでは、状態情報へのアクセスはスレッドセーフでなければならないことに注意してください。
最後に、クラスターに接続した後、 getState()の呼び出しをスタートアップに追加します。
channel.connect(clusterName);
channel.getState(null, 0);
getState()は、状態を要求する宛先とミリ秒単位のタイムアウトを受け入れます。 null 宛先はコーディネーターを示し、0はタイムアウトしないことを意味します。
このアプリをノードのペアで実行し、ブロードキャストメッセージを交換すると、メッセージ数が増加します。
次に、3番目のクライアントを追加するか、そのうちの1つを停止して開始すると、新しく接続されたノードが正しいメッセージ数を出力するのがわかります。
8. 結論
このチュートリアルでは、JGroupsを使用して、メッセージを交換するためのアプリケーションを作成しました。 APIを使用して、クラスターに接続およびクラスターから離脱したノードを監視し、クラスターが参加したときにクラスターの状態を新しいノードに転送しました。
コードサンプルは、いつものように、GitHubのにあります。