1. 序章

このチュートリアルでは、SpringBootとオープンソースの分散メッセージングおよびストリーミングデータプラットフォームであるApacheRocketMQを使用して、メッセージプロデューサーとコンシューマーを作成します。

2. 依存関係

Mavenプロジェクトの場合、 RocketMQ Spring BootStarter依存関係を追加する必要があります。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

3. メッセージの作成

この例では、ユーザーがショッピングカートにアイテムを追加または削除するたびにイベントを送信する、基本的なメッセージプロデューサーを作成します。

まず、application.propertiesでサーバーの場所とグループ名を設定しましょう。

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=cart-producer-group

複数のネームサーバーがある場合は、 host:port; host:portのようにリストできることに注意してください。

ここで、簡単にするために、 CommandLineRunner アプリケーションを作成し、アプリケーションの起動中にいくつかのイベントを生成します。

@SpringBootApplication
public class CartEventProducer implements CommandLineRunner {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(CartEventProducer.class, args);
    }

    public void run(String... args) throws Exception {
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1));
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2));
        rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1));
    }
}

CartItemEvent は、アイテムのIDと数量の2つのプロパティで構成されています。

class CartItemEvent {
    private String itemId;
    private int quantity;

    // constructor, getters and setters
}

上記の例では、 convertAndSend()メソッドを使用します。これは、 AbstractMessageSendingTemplate 抽象クラスによって定義された汎用メソッドであり、カートイベントを送信します。 これには2つのパラメーターが必要です。宛先(この場合はトピック名)とメッセージペイロードです。

4. メッセージ消費者

RocketMQメッセージの使用は、 @RocketMQMessageListener で注釈が付けられたSpringコンポーネントを作成し、RocketMQListenerインターフェイスを実装するのと同じくらい簡単です。

@SpringBootApplication
public class CartEventConsumer {

    public static void main(String[] args) {
        SpringApplication.run(CartEventConsumer.class, args);
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-add-topic",
      consumerGroup = "cart-consumer_cart-item-add-topic"
    )
    public class CardItemAddConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent addItemEvent) {
            log.info("Adding item: {}", addItemEvent);
            // additional logic
        }
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-removed-topic",
      consumerGroup = "cart-consumer_cart-item-removed-topic"
    )
    public class CardItemRemoveConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent removeItemEvent) {
            log.info("Removing item: {}", removeItemEvent);
            // additional logic
        }
    }
}

聞いているメッセージトピックごとに個別のコンポーネントを作成する必要があります。 これらの各リスナーでは、 @ RocketMQMessageListenerアノテーションを使用してトピックの名前とコンシューマーグループの名前を定義します。

5. 同期および非同期送信

前の例では、convertAndSendメソッドを使用してメッセージを送信しました。 ただし、他にもいくつかのオプションがあります。

たとえば、 SendResult オブジェクトを返すため、convertAndSendとは異なるsyncSendを呼び出すことができます。

たとえば、メッセージが正常に送信されたかどうかを確認したり、IDを取得したりするために使用できます。

public void run(String... args) throws Exception { 
    SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("bike", 1)); 
    SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("computer", 2)); 
    SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", 
      new CartItemEvent("bike", 1)); 
}

convertAndSendと同様に、このメソッドは、送信手順が完了したときにのみ返されます。

重要な通知メッセージやSMS通知など、信頼性が必要な場合は同期送信を使用する必要があります。

一方、代わりにメッセージを非同期で送信し、送信が完了したときに通知を受け取ることもできます。

これは、 asyncSend を使用して行うことができます。これは、 SendCallback をパラメーターとして受け取り、すぐに戻ります。

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.error("Successfully sent cart item");
    }

    @Override
    public void onException(Throwable throwable) {
        log.error("Exception during cart item sending", throwable);
    }
});

高スループットが必要な場合は、非同期送信を使用します。

最後に、スループット要件が非常に高いシナリオでは、asyncSendの代わりにsendOneWayを使用できます。  sendOneWay は、メッセージが送信されることを保証しないという点でasyncSendとは異なります。

一方向伝送は、ログの収集などの通常の信頼性の場合にも使用できます。

6. トランザクションでのメッセージの送信

RocketMQは、トランザクション内でメッセージを送信する機能を提供します。 sendInTransaction()メソッドを使用してこれを行うことができます。

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build();
rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

また、RocketMQLocalTransactionListenerインターフェイスを実装する必要があります。

@RocketMQTransactionListener(txProducerGroup="test-transaction")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
      @Override
      public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
          // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.UNKNOWN;
      }

      @Override
      public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
          // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.COMMIT;
      }
}

sendMessageInTransaction()では、最初のパラメーターはトランザクション名です。 これ @RocketMQTransactionListenerのメンバーフィールドtxProducerGroup。と同じである必要があります

7. メッセージプロデューサーの構成

メッセージプロデューサー自体の側面を構成することもできます。

  • rocketmq.producer.send-message-timeout :ミリ秒単位のメッセージ送信タイムアウト–デフォルト値は3000です
  • rocketmq.producer.compress-message-body-threshold :それを超えると、RocketMQはメッセージを圧縮します。デフォルト値は1024です。
  • rocketmq.producer.max-message-size :バイト単位の最大メッセージサイズ–デフォルト値は4096です。
  • rocketmq.producer.retry-times-when-send-async-failed :失敗を送信する前に非同期モードで内部的に実行する再試行の最大数–デフォルト値は2です。
  • rocketmq.producer.retry-next-server :内部での送信の失敗時に別のブローカーを再試行するかどうかを示します-デフォルト値はfalseです。
  • rocketmq.producer.retry-times-when-send-failed :失敗を送信する前に非同期モードで内部的に実行する再試行の最大数–デフォルト値は2です。

8. 結論

この記事では、ApacheRocketMQとSpringBootを使用してメッセージを送信および消費する方法を学習しました。 いつものように、すべてのソースコードはGitHubで利用できます。