スプリングブートを備えたApacheRocketMQ
1. 序章
このチュートリアルでは、SpringBootとオープンソースの分散メッセージングおよびストリーミングデータプラットフォームであるApacheRocketMQを使用して、メッセージプロデューサーとコンシューマーを作成します。
2. 依存関係
Mavenプロジェクトの場合、 RocketMQ Spring BootStarter依存関係を追加する必要があります。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
3. メッセージの作成
この例では、ユーザーがショッピングカートにアイテムを追加または削除するたびにイベントを送信する、基本的なメッセージプロデューサーを作成します。
まず、application.propertiesでサーバーの場所とグループ名を設定しましょう。
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=cart-producer-group
複数のネームサーバーがある場合は、 host:port; host:portのようにリストできることに注意してください。
ここで、簡単にするために、 CommandLineRunner アプリケーションを作成し、アプリケーションの起動中にいくつかのイベントを生成します。
@SpringBootApplication
public class CartEventProducer implements CommandLineRunner {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(CartEventProducer.class, args);
}
public void run(String... args) throws Exception {
rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1));
rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2));
rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1));
}
}
CartItemEvent は、アイテムのIDと数量の2つのプロパティで構成されています。
class CartItemEvent {
private String itemId;
private int quantity;
// constructor, getters and setters
}
上記の例では、 convertAndSend()メソッドを使用します。これは、 AbstractMessageSendingTemplate 抽象クラスによって定義された汎用メソッドであり、カートイベントを送信します。 これには2つのパラメーターが必要です。宛先(この場合はトピック名)とメッセージペイロードです。
4. メッセージ消費者
RocketMQメッセージの使用は、 @RocketMQMessageListener で注釈が付けられたSpringコンポーネントを作成し、RocketMQListenerインターフェイスを実装するのと同じくらい簡単です。
@SpringBootApplication
public class CartEventConsumer {
public static void main(String[] args) {
SpringApplication.run(CartEventConsumer.class, args);
}
@Service
@RocketMQMessageListener(
topic = "cart-item-add-topic",
consumerGroup = "cart-consumer_cart-item-add-topic"
)
public class CardItemAddConsumer implements RocketMQListener<CartItemEvent> {
public void onMessage(CartItemEvent addItemEvent) {
log.info("Adding item: {}", addItemEvent);
// additional logic
}
}
@Service
@RocketMQMessageListener(
topic = "cart-item-removed-topic",
consumerGroup = "cart-consumer_cart-item-removed-topic"
)
public class CardItemRemoveConsumer implements RocketMQListener<CartItemEvent> {
public void onMessage(CartItemEvent removeItemEvent) {
log.info("Removing item: {}", removeItemEvent);
// additional logic
}
}
}
聞いているメッセージトピックごとに個別のコンポーネントを作成する必要があります。 これらの各リスナーでは、 @ RocketMQMessageListenerアノテーションを使用してトピックの名前とコンシューマーグループの名前を定義します。
5. 同期および非同期送信
前の例では、convertAndSendメソッドを使用してメッセージを送信しました。 ただし、他にもいくつかのオプションがあります。
たとえば、 SendResult オブジェクトを返すため、convertAndSendとは異なるsyncSendを呼び出すことができます。
たとえば、メッセージが正常に送信されたかどうかを確認したり、IDを取得したりするために使用できます。
public void run(String... args) throws Exception {
SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic",
new CartItemEvent("bike", 1));
SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic",
new CartItemEvent("computer", 2));
SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic",
new CartItemEvent("bike", 1));
}
convertAndSendと同様に、このメソッドは、送信手順が完了したときにのみ返されます。
重要な通知メッセージやSMS通知など、信頼性が必要な場合は同期送信を使用する必要があります。
一方、代わりにメッセージを非同期で送信し、送信が完了したときに通知を受け取ることもできます。
これは、 asyncSend を使用して行うことができます。これは、 SendCallback をパラメーターとして受け取り、すぐに戻ります。
rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.error("Successfully sent cart item");
}
@Override
public void onException(Throwable throwable) {
log.error("Exception during cart item sending", throwable);
}
});
高スループットが必要な場合は、非同期送信を使用します。
最後に、スループット要件が非常に高いシナリオでは、asyncSendの代わりにsendOneWayを使用できます。 sendOneWay は、メッセージが送信されることを保証しないという点でasyncSendとは異なります。
一方向伝送は、ログの収集などの通常の信頼性の場合にも使用できます。
6. トランザクションでのメッセージの送信
RocketMQは、トランザクション内でメッセージを送信する機能を提供します。 sendInTransaction()メソッドを使用してこれを行うことができます。
MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build();
rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);
また、RocketMQLocalTransactionListenerインターフェイスを実装する必要があります。
@RocketMQTransactionListener(txProducerGroup="test-transaction")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
return RocketMQLocalTransactionState.COMMIT;
}
}
sendMessageInTransaction()では、最初のパラメーターはトランザクション名です。 これ @RocketMQTransactionListenerのメンバーフィールドtxProducerGroup。と同じである必要があります
7. メッセージプロデューサーの構成
メッセージプロデューサー自体の側面を構成することもできます。
- rocketmq.producer.send-message-timeout :ミリ秒単位のメッセージ送信タイムアウト–デフォルト値は3000です
- rocketmq.producer.compress-message-body-threshold :それを超えると、RocketMQはメッセージを圧縮します。デフォルト値は1024です。
- rocketmq.producer.max-message-size :バイト単位の最大メッセージサイズ–デフォルト値は4096です。
- rocketmq.producer.retry-times-when-send-async-failed :失敗を送信する前に非同期モードで内部的に実行する再試行の最大数–デフォルト値は2です。
- rocketmq.producer.retry-next-server :内部での送信の失敗時に別のブローカーを再試行するかどうかを示します-デフォルト値はfalseです。
- rocketmq.producer.retry-times-when-send-failed :失敗を送信する前に非同期モードで内部的に実行する再試行の最大数–デフォルト値は2です。
8. 結論
この記事では、ApacheRocketMQとSpringBootを使用してメッセージを送信および消費する方法を学習しました。 いつものように、すべてのソースコードはGitHubで利用できます。