1概要

簡単に言うと、https://github.com/bennidi/mbassador[MBassador]は、https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe__pattern[publish-subscribeを利用した** 高性能イベントバスです]セマンティクス

メッセージは1人以上のピアにブロードキャストされ、ユーザの数やメッセージの使用方法に関する事前の知識はありません。


2 Mavenの依存関係

ライブラリを使用する前に、https://search.maven.org/classic/#search%7Cga%7C1%7Ca%3A%22mbassador%22[mbassador]依存関係を追加する必要があります。

<dependency>
    <groupId>net.engio</groupId>
    <artifactId>mbassador</artifactId>
    <version>1.3.1</version>
</dependency>


3基本イベント処理


3.1. 簡単な例

メッセージを公開する簡単な例から始めましょう。

private MBassador<Object> dispatcher = new MBassador<>();
private String messageString;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenStringDispatched__thenHandleString() {
    dispatcher.post("TestString").now();

    assertNotNull(messageString);
    assertEquals("TestString", messageString);
}

@Handler
public void handleString(String message) {
    messageString = message;
}

このテストクラスの冒頭で、デフォルトのコンストラクタを使った

MBassador

の作成を見ています。次に、

@ Before

メソッドで、

subscribe()

を呼び出し、クラス自体への参照を渡します。


subscribe()では、ディスパッチャは

@ Handler__アノテーションについてサブスクライバを調べます。

そして最初のテストでは、メッセージをディスパッチするために

dispatcher.post(…​)。now()

を呼び出します – その結果

handleString()

が呼び出されます。

この初期テストはいくつかの重要な概念を実証します。 **

@Handler

でアノテーションが付けられた1つ以上のメソッドがある限り、任意の

Object

をサブスクライバーにすることができます。

わかりやすくするために自分自身を購読するテストオブジェクトを使用していますが、ほとんどのプロダクションシナリオでは、メッセージディスパッチャはコンシューマとは異なるクラスになります。

  • ハンドラメソッドは一つの入力パラメータ – メッセージのみを持ち、チェックされた例外を投げることはできません。


subscribe()

メソッドと同様に、postメソッドは任意の

Object

を受け入れます。この

Object

は購読者に配信されます。

メッセージが投稿されると、そのメッセージタイプを購読しているリスナーに配信されます。

別のメッセージハンドラを追加して、異なるメッセージタイプを送信しましょう。

private Integer messageInteger;

@Test
public void whenIntegerDispatched__thenHandleInteger() {
    dispatcher.post(42).now();

    assertNull(messageString);
    assertNotNull(messageInteger);
    assertTrue(42 == messageInteger);
}

@Handler
public void handleInteger(Integer message) {
    messageInteger = message;
}

予想通り、

Integer

をディスパッチすると、

handleInteger()

が呼び出され、

handleString()

は呼び出されません。単一のディスパッチャを使用して複数のメッセージタイプを送信できます。


3.2. デッドメッセージ

それでそれのためのハンドラがないときメッセージはどこに行きますか?新しいイベントハンドラを追加してから、3番目のメッセージタイプを送信しましょう。

private Object deadEvent;

@Test
public void whenLongDispatched__thenDeadEvent() {
    dispatcher.post(42L).now();

    assertNull(messageString);
    assertNull(messageInteger);
    assertNotNull(deadEvent);
    assertTrue(deadEvent instanceof Long);
    assertTrue(42L == (Long) deadEvent);
}

@Handler
public void handleDeadEvent(DeadMessage message) {
    deadEvent = message.getMessage();
}

このテストでは、

Integerではなく

Long__をディスパッチします。

  • メッセージのハンドラがない場合、

    DeadMessage

    オブジェクトにラップされます。**

    Deadmessage

    のハンドラを追加したので、キャプチャします。


DeadMessage

は無視しても問題ありません。アプリケーションがデッドメッセージを追跡する必要がなければ、どこにも行かないようにすることができます。


4イベント階層の使用


String

および

Integer

イベントを送信することは制限的です。メッセージクラスをいくつか作成しましょう。

public class Message {}

public class AckMessage extends Message {}

public class RejectMessage extends Message {
    int code;

   //setters and getters
}

単純な基本クラスとそれを拡張する2つのクラスがあります。


4.1. 基本クラスを送信する

Message


私たちは

Message

イベントから始めます。

private MBassador<Message> dispatcher = new MBassador<>();

private Message message;
private AckMessage ackMessage;
private RejectMessage rejectMessage;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenMessageDispatched__thenMessageHandled() {
    dispatcher.post(new Message()).now();
    assertNotNull(message);
    assertNull(ackMessage);
    assertNull(rejectMessage);
}

@Handler
public void handleMessage(Message message) {
    this.message = message;
}

@Handler
public void handleRejectMessage(RejectMessage message) {
   rejectMessage = message;
}

@Handler
public void handleAckMessage(AckMessage message) {
    ackMessage = message;
}

高性能のパブサブイベントバス – MBassadorを発見してください。これは

Messages

の使用に限定しますが、型安全の層を追加します。


Message

を送信すると、

handleMessage()

がそれを受け取ります。他の2つのハンドラはそうしません。


4.2. サブクラスメッセージを送信する


RejectMessage

を送信しましょう。

@Test
public void whenRejectDispatched__thenMessageAndRejectHandled() {
    dispatcher.post(new RejectMessage()).now();

    assertNotNull(message);
    assertNotNull(rejectMessage);
    assertNull(ackMessage);
}


RejectMessage

を送信すると、

handleRejectMessage()と

handleMessage()の両方がそれを受け取ります。


RejectMessage



Messageを拡張しているので、

RejectMessage

ハンドラーに加えて

Message__ハンドラーがそれを受け取りました。

この動作を

AckMessage

で検証しましょう。

@Test
public void whenAckDispatched__thenMessageAndAckHandled() {
    dispatcher.post(new AckMessage()).now();

    assertNotNull(message);
    assertNotNull(ackMessage);
    assertNull(rejectMessage);
}

予想通り、

AckMessage

を送信すると、

handleAckMessage()

と__handleMessage()の両方がそれを受け取ります。


5メッセージのフィルタリング

メッセージをタイプ別に整理することはすでに強力な機能ですが、それらをさらに絞り込むことができます。


5.1. クラスとサブクラスで絞り込む


RejectMessage

または

AckMessage

を投稿すると、特定の型のイベントハンドラと基本クラスの両方でイベントを受け取りました。


Message

を抽象化し、

GenericMessage

などのクラスを作成することで、この型階層の問題を解決できます。しかし、この贅沢がなければどうなりますか?

メッセージフィルタを使用できます。

private Message baseMessage;
private Message subMessage;

@Test
public void whenMessageDispatched__thenMessageFiltered() {
    dispatcher.post(new Message()).now();

    assertNotNull(baseMessage);
    assertNull(subMessage);
}

@Test
public void whenRejectDispatched__thenRejectFiltered() {
    dispatcher.post(new RejectMessage()).now();

    assertNotNull(subMessage);
    assertNull(baseMessage);
}

@Handler(filters = { @Filter(Filters.RejectSubtypes.class) })
public void handleBaseMessage(Message message) {
    this.baseMessage = message;
}

@Handler(filters = { @Filter(Filters.SubtypesOnly.class) })
public void handleSubMessage(Message message) {
    this.subMessage = message;
}


  • @ Handler

    アノテーションの

    filters

    パラメータは

    IMessageFilter

    ** を実装する

    Class

    を受け入れます。ライブラリには2つの例があります。


Filters.RejectSubtypes

は、その名前が示すように機能します。サブタイプを除外します。この場合、

RejectMessage

は__handleBaseMessage()によって処理されません。


Filters.SubtypesOnly

も、その名前が示すとおりに機能します。つまり、基本型を除外します。この場合、

Message

は__handleSubMessage()によって処理されないことがわかります。


5.2.

IMessageFilter



Filters.RejectSubtypes



Filters.SubtypesOnly

はどちらも

IMessageFilter

を実装しています。


RejectSubTypes

は、メッセージのクラスを定義済みのメッセージタイプと比較し、サブクラスとは対照的に、そのタイプの1つに等しいメッセージの通過を許可します。


5.3. 条件で絞り込む

幸い、メッセージをより簡単にフィルタリングする方法があります。

メッセージをフィルタリングするための条件として、MBassadorはhttps://en.wikipedia.org/wiki/Unified

Expression

Language[Java EL式]のサブセットをサポートしています。

長さに基づいて

String

メッセージをフィルタリングしましょう。

private String testString;

@Test
public void whenLongStringDispatched__thenStringFiltered() {
    dispatcher.post("foobar!").now();

    assertNull(testString);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

「foobar!」というメッセージは7文字の長さで、フィルタリングされています。もっと短い

String

を送りましょう:

@Test
public void whenShortStringDispatched__thenStringHandled() {
    dispatcher.post("foobar").now();

    assertNotNull(testString);
}

今、「foobar」の長さはわずか6文字で、通過します。


RejectMessage

には、アクセサを含むフィールドが含まれています。そのためのフィルタを書きましょう。

private RejectMessage rejectMessage;

@Test
public void whenWrongRejectDispatched__thenRejectFiltered() {

    RejectMessage testReject = new RejectMessage();
    testReject.setCode(-1);

    dispatcher.post(testReject).now();

    assertNull(rejectMessage);
    assertNotNull(subMessage);
    assertEquals(-1, ((RejectMessage) subMessage).getCode());
}

@Handler(condition = "msg.getCode() != -1")
public void handleRejectMessage(RejectMessage rejectMessage) {
    this.rejectMessage = rejectMessage;
}

ここでもまた、オブジェクトのメソッドをクエリして、メッセージをフィルタリングするかどうかを選択できます。


5.4. フィルタリングされたメッセージをキャプチャする

__DeadEventsと同様に、フィルタ処理されたメッセージをキャプチャして処理することをお勧めします。フィルタリングされたイベントをキャプチャするための専用のメカニズムもあります。フィルタリングされたイベントは「デッド」イベントとは異なる方法で処理されます。

これを説明するテストを書きましょう。

private String testString;
private FilteredMessage filteredMessage;
private DeadMessage deadMessage;

@Test
public void whenLongStringDispatched__thenStringFiltered() {
    dispatcher.post("foobar!").now();

    assertNull(testString);
    assertNotNull(filteredMessage);
    assertTrue(filteredMessage.getMessage() instanceof String);
    assertNull(deadMessage);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

@Handler
public void handleFilterMessage(FilteredMessage message) {
    this.filteredMessage = message;
}

@Handler
public void handleDeadMessage(DeadMessage deadMessage) {
    this.deadMessage = deadMessage;
}


FilteredMessage

ハンドラを追加すると、長さのためにフィルタ処理された

Strings

を追跡できます。

filterMessage

には長すぎる

String

が含まれていますが、

deadMessage



null.

のままです。


6. 非同期メッセージのディスパッチと処理

これまでのすべての例では同期メッセージのディスパッチを使用しました。

post.now()

を呼び出したとき、メッセージは

post()

fromと同じスレッドの各ハンドラに配信されました。


6.1. 非同期ディスパッチ


MBassador.post()

はhttp://bennidi.github.io/mbassador/net/engio/mbassy/bus/publication/SyncAsyncPostCommand.html[SyncAsyncPostCommand]を返します。

このクラスには、以下のようないくつかのメソッドがあります。


  • now()

    – メッセージを同期的にディスパッチします。コールはまでブロックされます

すべてのメッセージが配信されました
**

asynchronously()

– メッセージパブリケーションを非同期的に実行します。

サンプルクラスで非同期ディスパッチを使用しましょう。これらのテストでは、コードを簡素化するために

Awaitility

を使用します。

private MBassador<Message> dispatcher = new MBassador<>();
private String testString;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenAsyncDispatched__thenMessageReceived() {
    dispatcher.post("foobar").asynchronously();

    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testString);
}

@Handler
public void handleStringMessage(String message) {
    this.testString = message;
    ready.set(true);
}

このテストでは

asynchronously()

を呼び出し、

await()

と共に

AtomicBoolean

をフラグとして使用して、配信スレッドがメッセージを配信するのを待ちます。


await()

の呼び出しをコメントアウトすると、配信スレッドが完了する前に

testString

をチェックするため、テストが失敗する危険性があります。


6.2. 非同期ハンドラ呼び出し

非同期ディスパッチは、メッセージが各ハンドラに配信される前にメッセージプロバイダがメッセージ処理に戻ることを可能にしますが、それでも各ハンドラを順番に呼び出し、各ハンドラは前のハンドラが終了するのを待たなければなりません。

1つのハンドラが高価な操作を実行すると、これが問題につながる可能性があります。

MBassadorは非同期ハンドラ呼び出しのためのメカニズムを提供します。

これに対して設定されたハンドラは、スレッド内でメッセージを受信します。

private Integer testInteger;
private String invocationThreadName;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenHandlerAsync__thenHandled() {
    dispatcher.post(42).now();

    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testInteger);
    assertFalse(Thread.currentThread().getName().equals(invocationThreadName));
}

@Handler(delivery = Invoke.Asynchronously)
public void handleIntegerMessage(Integer message) {

    this.invocationThreadName = Thread.currentThread().getName();
    this.testInteger = message;
    ready.set(true);
}

ハンドラーは、

Handler

アノテーションの

delivery = Invoke.Asynchronously

プロパティーを使用して非同期呼び出しを要求できます。私たちのテストでは、ディスパッチメソッドとハンドラの

Thread

名を比較することによってこれを確認します。

** 7. MBassadorのカスタマイズ

これまでのところ、デフォルト設定のMBassadorのインスタンスを使用しています。これまで見てきたように、ディスパッチャの振る舞いはアノテーションで変更することができます。このチュートリアルを終えるために、もう少しカバーします。


7.1. 例外処理

ハンドラはチェック済み例外を定義できません。代わりに、ディスパッチャにそのコンストラクタへの引数として

IPublicationErrorHandler

を渡すことができます。

public class MBassadorConfigurationTest
  implements IPublicationErrorHandler {

    private MBassador dispatcher;
    private String messageString;
    private Throwable errorCause;

    @Before
    public void prepareTests() {
        dispatcher = new MBassador<String>(this);
        dispatcher.subscribe(this);
    }

    @Test
    public void whenErrorOccurs__thenErrorHandler() {
        dispatcher.post("Error").now();

        assertNull(messageString);
        assertNotNull(errorCause);
    }

    @Test
    public void whenNoErrorOccurs__thenStringHandler() {
        dispatcher.post("Error").now();

        assertNull(errorCause);
        assertNotNull(messageString);
    }

    @Handler
    public void handleString(String message) {
        if ("Error".equals(message)) {
            throw new Error("BOOM");
        }
        messageString = message;
    }

    @Override
    public void handleError(PublicationError error) {
        errorCause = error.getCause().getCause();
    }
}


handleString()が

Errorをスローすると、__errorCauseに保存されます。


7.2. ハンドラの優先順位


  • Handlers

    は追加された順番と逆の順番で呼ばれますが、これは私たちが頼りにしたい動作ではありません。 。

ハンドラの優先順位を明示的に設定できます。

private LinkedList<Integer> list = new LinkedList<>();

@Test
public void whenRejectDispatched__thenPriorityHandled() {
    dispatcher.post(new RejectMessage()).now();

   //Items should pop() off in reverse priority order
    assertTrue(1 == list.pop());
    assertTrue(3 == list.pop());
    assertTrue(5 == list.pop());
}

@Handler(priority = 5)
public void handleRejectMessage5(RejectMessage rejectMessage) {
    list.push(5);
}

@Handler(priority = 3)
public void handleRejectMessage3(RejectMessage rejectMessage) {
    list.push(3);
}

@Handler(priority = 2, rejectSubtypes = true)
public void handleMessage(Message rejectMessage)
    logger.error("Reject handler #3");
    list.push(3);
}

@Handler(priority = 0)
public void handleRejectMessage0(RejectMessage rejectMessage) {
    list.push(1);
}

ハンドラは最高の優先度から最低の優先度まで呼び出されます。デフォルトの優先順位がゼロであるハンドラーは、最後に呼び出されます。ハンドラ番号が

pop()

offの逆順になっているのがわかります。


7.3. サブタイプを拒否する、簡単な方法

上記のテストで

handleMessage()

はどうなりましたか?サブタイプをフィルタするために

RejectSubTypes.class

を使用する必要はありません。


RejectSubTypes

は、クラスと同じフィルタリングを提供するブール値のフラグですが、

IMessageFilter

実装よりもパフォーマンスが向上します。

ただし、サブタイプのみを受け入れるためにフィルタベースの実装を使用する必要があります。


8結論

MBassadorは、オブジェクト間でメッセージをやり取りするためのシンプルで直接的なライブラリです。メッセージはさまざまな方法で編成でき、同期的または非同期的に送信できます。

そして、いつものように、この例はhttps://github.com/eugenp/tutorials/tree/master/libraries/[このGitHubプロジェクト]にあります。