MBassadorの紹介
1. 概要
簡単に言うと、 MBassador は、パブリッシュ/サブスクライブセマンティクスを利用した高性能イベントバスです。
メッセージは、サブスクライバーの数やメッセージの使用方法を事前に知らなくても、1つ以上のピアにブロードキャストされます。
2. Mavenの依存関係
ライブラリを使用する前に、mbassador依存関係を追加する必要があります。
<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.3.1</version>
</dependency>
3. 基本的なイベント処理
3.1. 簡単な例
メッセージを公開する簡単な例から始めましょう。
private MBassador<Object> dispatcher = new MBassador<>();
private String messageString;
@Before
public void prepareTests() {
dispatcher.subscribe(this);
}
@Test
public void whenStringDispatched_thenHandleString() {
dispatcher.post("TestString").now();
assertNotNull(messageString);
assertEquals("TestString", messageString);
}
@Handler
public void handleString(String message) {
messageString = message;
}
このテストクラスの上部に、デフォルトのコンストラクターを使用してMBassadorが作成されていることがわかります。 次に、 @Before メソッドで、 subscribe()を呼び出し、クラス自体への参照を渡します。
subscribe()では、ディスパッチャは、@Handlerアノテーションについてサブスクライバを検査します。
そして、最初のテストでは、 dispatcher.post(…).now()を呼び出してメッセージをディスパッチします。これにより、 handleString()が呼び出されます。
この最初のテストは、いくつかの重要な概念を示しています。 @Handlerで注釈が付けられた1つ以上のメソッドがある限り、任意のオブジェクトをサブスクライバーにすることができます。サブスクライバーは任意の数のハンドラーを持つことができます。
簡単にするために、自分自身をサブスクライブするテストオブジェクトを使用していますが、ほとんどの本番シナリオでは、メッセージディスパッチャーはコンシューマーとは異なるクラスになります。
ハンドラーメソッドには、メッセージという1つの入力パラメーターしかなく、チェックされた例外をスローすることはできません。
subscribe()メソッドと同様に、postメソッドは任意のObjectを受け入れます。 このオブジェクトはサブスクライバーに配信されます。
メッセージが投稿されると、メッセージタイプをサブスクライブしているすべてのリスナーに配信されます。
別のメッセージハンドラーを追加して、別のメッセージタイプを送信してみましょう。
private Integer messageInteger;
@Test
public void whenIntegerDispatched_thenHandleInteger() {
dispatcher.post(42).now();
assertNull(messageString);
assertNotNull(messageInteger);
assertTrue(42 == messageInteger);
}
@Handler
public void handleInteger(Integer message) {
messageInteger = message;
}
予想どおり、 an Integer をディスパッチすると、 handleInteger()が呼び出されますが、 handleString()は呼び出されません。 1つのディスパッチャを使用して、複数のメッセージタイプを送信できます。
3.2. デッドメッセージ
では、ハンドラーがない場合、メッセージはどこに送られますか? 新しいイベントハンドラーを追加してから、3番目のメッセージタイプを送信しましょう。
private Object deadEvent;
@Test
public void whenLongDispatched_thenDeadEvent() {
dispatcher.post(42L).now();
assertNull(messageString);
assertNull(messageInteger);
assertNotNull(deadEvent);
assertTrue(deadEvent instanceof Long);
assertTrue(42L == (Long) deadEvent);
}
@Handler
public void handleDeadEvent(DeadMessage message) {
deadEvent = message.getMessage();
}
このテストでは、Integerの代わりにLongをディスパッチします。handleInteger()も handleString()も呼び出されませんが、[ X151X] handleDeadEvent()はです。
メッセージのハンドラーがない場合、メッセージはDeadMessageオブジェクトにラップされます。 Deadmessage のハンドラーを追加したので、それをキャプチャします。
DeadMessageは安全に無視できます。 アプリケーションがデッドメッセージを追跡する必要がない場合、それらはどこにも行かないようにすることができます。
4. イベント階層の使用
StringおよびIntegerイベントの送信には制限があります。 いくつかのメッセージクラスを作成しましょう。
public class Message {}
public class AckMessage extends Message {}
public class RejectMessage extends Message {
int code;
// setters and getters
}
単純な基本クラスとそれを拡張する2つのクラスがあります。
4.1. 基本クラスの送信メッセージ
メッセージイベントから始めます。
private MBassador<Message> dispatcher = new MBassador<>();
private Message message;
private AckMessage ackMessage;
private RejectMessage rejectMessage;
@Before
public void prepareTests() {
dispatcher.subscribe(this);
}
@Test
public void whenMessageDispatched_thenMessageHandled() {
dispatcher.post(new Message()).now();
assertNotNull(message);
assertNull(ackMessage);
assertNull(rejectMessage);
}
@Handler
public void handleMessage(Message message) {
this.message = message;
}
@Handler
public void handleRejectMessage(RejectMessage message) {
rejectMessage = message;
}
@Handler
public void handleAckMessage(AckMessage message) {
ackMessage = message;
}
MBassador –高性能のパブサブイベントバスをご覧ください。 これにより、メッセージの使用が制限されますが、タイプセーフティのレイヤーが追加されます。
Message を送信すると、 handleMessage()が受信します。 他の2つのハンドラーはそうではありません。
4.2. サブクラスメッセージの送信
RejectMessage を送信しましょう:
@Test
public void whenRejectDispatched_thenMessageAndRejectHandled() {
dispatcher.post(new RejectMessage()).now();
assertNotNull(message);
assertNotNull(rejectMessage);
assertNull(ackMessage);
}
RejectMessage を送信すると、 handleRejectMessage()と handleMessage()の両方がそれを受信します。
RejectMessageはMessageを拡張するため、 R ejectMessage ハンドラーに加えて、 Messageハンドラーがそれを受信しました。
AckMessageを使用してこの動作を確認しましょう。
@Test
public void whenAckDispatched_thenMessageAndAckHandled() {
dispatcher.post(new AckMessage()).now();
assertNotNull(message);
assertNotNull(ackMessage);
assertNull(rejectMessage);
}
予想どおり、 AckMessage を送信すると、 handleAckMessage()と handleMessage()の両方がそれを受信します。
5. メッセージのフィルタリング
タイプ別にメッセージを整理することはすでに強力な機能ですが、さらにフィルタリングすることができます。
5.1. クラスとサブクラスのフィルター
RejectMessageまたはAckMessageを投稿すると、特定のタイプのイベントハンドラーと基本クラスの両方でイベントを受信しました。
Message を抽象化し、 GenericMessage などのクラスを作成することで、この型階層の問題を解決できます。 しかし、この贅沢がなければどうなるでしょうか。
メッセージフィルターを使用できます。
private Message baseMessage;
private Message subMessage;
@Test
public void whenMessageDispatched_thenMessageFiltered() {
dispatcher.post(new Message()).now();
assertNotNull(baseMessage);
assertNull(subMessage);
}
@Test
public void whenRejectDispatched_thenRejectFiltered() {
dispatcher.post(new RejectMessage()).now();
assertNotNull(subMessage);
assertNull(baseMessage);
}
@Handler(filters = { @Filter(Filters.RejectSubtypes.class) })
public void handleBaseMessage(Message message) {
this.baseMessage = message;
}
@Handler(filters = { @Filter(Filters.SubtypesOnly.class) })
public void handleSubMessage(Message message) {
this.subMessage = message;
}
@Handlerアノテーションのfiltersパラメーターは、IMessageFilterを実装するクラスを受け入れます。 ライブラリには2つの例があります。
Filters.RejectSubtypes は、その名前が示すように、すべてのサブタイプを除外します。 この場合、 RejectMessageはhandleBaseMessage()。によって処理されないことがわかります。
Filters.SubtypesOnly も、その名前が示すように機能します。つまり、すべての基本タイプをフィルターで除外します。 この場合、 MessageはhandleSubMessage()。によって処理されないことがわかります。
5.2. IMessageFilter
Filters.RejectSubtypesとFilters.SubtypesOnlyはどちらもIMessageFilterを実装しています。
RejectSubTypes は、メッセージのクラスを定義されたメッセージタイプと比較し、サブクラスとは対照的に、そのタイプの1つに等しいメッセージのみを許可します。
5.3. 条件付きフィルター
幸い、メッセージをフィルタリングする簡単な方法があります。 MBassadorは、メッセージをフィルタリングするための条件としてJavaEL式のサブセットをサポートします。
Stringメッセージをその長さに基づいてフィルタリングしてみましょう。
private String testString;
@Test
public void whenLongStringDispatched_thenStringFiltered() {
dispatcher.post("foobar!").now();
assertNull(testString);
}
@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
this.testString = message;
}
「foobar!」 メッセージの長さは7文字で、フィルタリングされます。 短い文字列を送信しましょう:
@Test
public void whenShortStringDispatched_thenStringHandled() {
dispatcher.post("foobar").now();
assertNotNull(testString);
}
現在、「foobar」はわずか6文字の長さであり、通過します。
RejectMessage には、アクセサーのあるフィールドが含まれています。 そのためのフィルターを書いてみましょう:
private RejectMessage rejectMessage;
@Test
public void whenWrongRejectDispatched_thenRejectFiltered() {
RejectMessage testReject = new RejectMessage();
testReject.setCode(-1);
dispatcher.post(testReject).now();
assertNull(rejectMessage);
assertNotNull(subMessage);
assertEquals(-1, ((RejectMessage) subMessage).getCode());
}
@Handler(condition = "msg.getCode() != -1")
public void handleRejectMessage(RejectMessage rejectMessage) {
this.rejectMessage = rejectMessage;
}
ここでも、オブジェクトのメソッドをクエリして、メッセージをフィルタリングするかどうかを指定できます。
5.4. フィルタリングされたメッセージをキャプチャする
DeadEvents、と同様に、フィルタリングされたメッセージをキャプチャして処理したい場合があります。 フィルタリングされたイベントをキャプチャするための専用のメカニズムもあります。 フィルタリングされたイベントは「デッド」イベントとは異なる方法で処理されます。
これを説明するテストを書いてみましょう。
private String testString;
private FilteredMessage filteredMessage;
private DeadMessage deadMessage;
@Test
public void whenLongStringDispatched_thenStringFiltered() {
dispatcher.post("foobar!").now();
assertNull(testString);
assertNotNull(filteredMessage);
assertTrue(filteredMessage.getMessage() instanceof String);
assertNull(deadMessage);
}
@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
this.testString = message;
}
@Handler
public void handleFilterMessage(FilteredMessage message) {
this.filteredMessage = message;
}
@Handler
public void handleDeadMessage(DeadMessage deadMessage) {
this.deadMessage = deadMessage;
}
FilteredMessage ハンドラーを追加すると、長さのためにフィルター処理されたStringsを追跡できます。 filterMessage には長すぎる文字列が含まれていますが、deadMessageはnullのままです。
6. 非同期メッセージのディスパッチと処理
これまでのところ、すべての例で同期メッセージディスパッチを使用しています。 post.now()を呼び出すと、メッセージは post()を呼び出したのと同じスレッドの各ハンドラーに配信されました。
6.1. 非同期ディスパッチ
MBassador.post()は、SyncAsyncPostCommandを返します。 このクラスは、次のようないくつかのメソッドを提供します。
- now() –メッセージを同期的にディスパッチします。 すべてのメッセージが配信されるまで、通話はブロックされます
- asynchronously() –メッセージパブリケーションを非同期で実行します
サンプルクラスで非同期ディスパッチを使用してみましょう。 これらのテストではAwaitilityを使用して、コードを簡略化します。
private MBassador<Message> dispatcher = new MBassador<>();
private String testString;
private AtomicBoolean ready = new AtomicBoolean(false);
@Test
public void whenAsyncDispatched_thenMessageReceived() {
dispatcher.post("foobar").asynchronously();
await().untilAtomic(ready, equalTo(true));
assertNotNull(testString);
}
@Handler
public void handleStringMessage(String message) {
this.testString = message;
ready.set(true);
}
このテストではasynchronously()を呼び出し、 AtomicBoolean をフラグとして使用し、 await()を使用して、配信スレッドがメッセージを配信するのを待ちます。
await()の呼び出しをコメントアウトすると、配信スレッドが完了する前に testString をチェックするため、テストが失敗するリスクがあります。
6.2. 非同期ハンドラーの呼び出し
非同期ディスパッチを使用すると、メッセージプロバイダーは、メッセージが各ハンドラーに配信される前にメッセージ処理に戻ることができますが、それでも各ハンドラーを順番に呼び出し、各ハンドラーは前のハンドラーが終了するのを待つ必要があります。
これは、1つのハンドラーが高価な操作を実行する場合に問題を引き起こす可能性があります。
MBassadorは、非同期ハンドラー呼び出しのメカニズムを提供します。 このために構成されたハンドラーは、スレッドでメッセージを受信します。
private Integer testInteger;
private String invocationThreadName;
private AtomicBoolean ready = new AtomicBoolean(false);
@Test
public void whenHandlerAsync_thenHandled() {
dispatcher.post(42).now();
await().untilAtomic(ready, equalTo(true));
assertNotNull(testInteger);
assertFalse(Thread.currentThread().getName().equals(invocationThreadName));
}
@Handler(delivery = Invoke.Asynchronously)
public void handleIntegerMessage(Integer message) {
this.invocationThreadName = Thread.currentThread().getName();
this.testInteger = message;
ready.set(true);
}
ハンドラーは、Handlerアノテーションのdelivery=Invoke.Asynchronouslyプロパティを使用して非同期呼び出しを要求できます。 テストでは、ディスパッチメソッドとハンドラーのThread名を比較してこれを確認します。
7. MBassadorのカスタマイズ
これまで、デフォルト構成でMBassadorのインスタンスを使用してきました。 ディスパッチャの動作は、これまでに見たものと同様に、注釈を使用して変更できます。 このチュートリアルを終了するために、さらにいくつか説明します。
7.1. 例外処理
ハンドラーは、チェックされた例外を定義できません。 代わりに、ディスパッチャーには、コンストラクターへの引数としてIPublicationErrorHandlerを提供できます。
public class MBassadorConfigurationTest
implements IPublicationErrorHandler {
private MBassador dispatcher;
private String messageString;
private Throwable errorCause;
@Before
public void prepareTests() {
dispatcher = new MBassador<String>(this);
dispatcher.subscribe(this);
}
@Test
public void whenErrorOccurs_thenErrorHandler() {
dispatcher.post("Error").now();
assertNull(messageString);
assertNotNull(errorCause);
}
@Test
public void whenNoErrorOccurs_thenStringHandler() {
dispatcher.post("Error").now();
assertNull(errorCause);
assertNotNull(messageString);
}
@Handler
public void handleString(String message) {
if ("Error".equals(message)) {
throw new Error("BOOM");
}
messageString = message;
}
@Override
public void handleError(PublicationError error) {
errorCause = error.getCause().getCause();
}
}
handleString()が Errorをスローすると、はerrorCauseに保存されます。
7.2. ハンドラーの優先順位
ハンドラーは、追加された順序とは逆の順序で呼び出されますが、これは信頼できる動作ではありません。スレッドでハンドラーを呼び出す機能があっても、ハンドラーの順序を知る必要がある場合があります。で呼び出されます。
ハンドラーの優先順位を明示的に設定できます。
private LinkedList<Integer> list = new LinkedList<>();
@Test
public void whenRejectDispatched_thenPriorityHandled() {
dispatcher.post(new RejectMessage()).now();
// Items should pop() off in reverse priority order
assertTrue(1 == list.pop());
assertTrue(3 == list.pop());
assertTrue(5 == list.pop());
}
@Handler(priority = 5)
public void handleRejectMessage5(RejectMessage rejectMessage) {
list.push(5);
}
@Handler(priority = 3)
public void handleRejectMessage3(RejectMessage rejectMessage) {
list.push(3);
}
@Handler(priority = 2, rejectSubtypes = true)
public void handleMessage(Message rejectMessage)
logger.error("Reject handler #3");
list.push(3);
}
@Handler(priority = 0)
public void handleRejectMessage0(RejectMessage rejectMessage) {
list.push(1);
}
ハンドラーは、優先度の高いものから低いものへと呼び出されます。 デフォルトの優先度であるゼロのハンドラーは、最後に呼び出されます。 ハンドラー番号pop()が逆の順序でオフになっていることがわかります。
7.3. サブタイプを拒否する、簡単な方法
上記のテストでhandleMessage()はどうなりましたか? サブタイプをフィルタリングするためにRejectSubTypes.classを使用する必要はありません。
RejectSubTypes は、クラスと同じフィルタリングを提供するブールフラグですが、IMessageFilter実装よりも優れたパフォーマンスを提供します。
ただし、サブタイプのみを受け入れるために、フィルターベースの実装を使用する必要があります。
8. 結論
MBassadorは、オブジェクト間でメッセージを渡すためのシンプルでわかりやすいライブラリです。 メッセージはさまざまな方法で編成でき、同期または非同期でディスパッチできます。
そして、いつものように、この例はこのGitHubプロジェクトで利用できます。