1. 概要

このチュートリアルでは、Java SDK を使用してAmazonのSQS(Simple Queue Service)を使用する方法について説明します。

2. 前提条件

Amazon AWS SDK for SQSを使用するために必要なMavenの依存関係、AWSアカウント設定、およびクライアント接続は、こちらの記事と同じです。

前の記事で説明したように、 AWSCredentials、のインスタンスを作成したと仮定すると、先に進んでSQSクライアントを作成できます。

AmazonSQS sqs = AmazonSQSClientBuilder.standard()
  .withCredentials(new AWSStaticCredentialsProvider(credentials))
  .withRegion(Regions.US_EAST_1)
  .build();

3. キューの作成

SQSクライアントをセットアップしたら、キューの作成はかなり簡単です。

3.1. 標準キューの作成

標準キューを作成する方法を見てみましょう。 これを行うには、 CreateQueueRequestのインスタンスを作成する必要があります:

CreateQueueRequest createStandardQueueRequest = new CreateQueueRequest("baeldung-queue");
String standardQueueUrl = sqs.createQueue(createStandardQueueRequest).getQueueUrl();

3.2. FIFOキューの作成

FIFOの作成は、標準キューの作成に似ています。 以前と同様に、CreateQueueRequestのインスタンスを引き続き使用します。 今回のみ、キュー属性を渡し、FifoQueue属性をtrueに設定する必要があります:

Map<String, String> queueAttributes = new HashMap<>();
queueAttributes.put("FifoQueue", "true");
queueAttributes.put("ContentBasedDeduplication", "true");
CreateQueueRequest createFifoQueueRequest = new CreateQueueRequest(
  "baeldung-queue.fifo").withAttributes(queueAttributes);
String fifoQueueUrl = sqs.createQueue(createFifoQueueRequest)
  .getQueueUrl();

4. キューへのメッセージの投稿

キューを設定したら、メッセージの送信を開始できます。

4.1. 標準キューへのメッセージの投稿

標準キューにメッセージを送信するには、 SendMessageRequestのインスタンスを作成する必要があります。

次に、メッセージ属性のマップをこのリクエストに添付します。

Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put("AttributeOne", new MessageAttributeValue()
  .withStringValue("This is an attribute")
  .withDataType("String"));  
    
SendMessageRequest sendMessageStandardQueue = new SendMessageRequest()
  .withQueueUrl(standardQueueUrl)
  .withMessageBody("A simple message.")
  .withDelaySeconds(30)
  .withMessageAttributes(messageAttributes);

sqs.sendMessage(sendMessageStandardQueue);

withDelaySeconds()は、メッセージがキューに到着するまでの時間を指定します。

4.2. FIFOキューへのメッセージの投稿

この場合の唯一の違いは、メッセージが属するグループを指定する必要があることです:

SendMessageRequest sendMessageFifoQueue = new SendMessageRequest()
  .withQueueUrl(fifoQueueUrl)
  .withMessageBody("Another simple message.")
  .withMessageGroupId("baeldung-group-1")
  .withMessageAttributes(messageAttributes);

上記のコード例でわかるように、 withMessageGroupId()。を使用してグループを指定します。

4.3. キューへの複数のメッセージの投稿

私たちもできます単一のリクエストを使用して、複数のメッセージをキューに投稿します。 のリストを作成します SendMessageBatchRequestEntry のインスタンスを使用して送信します SendMessageBatchRequest

List <SendMessageBatchRequestEntry> messageEntries = new ArrayList<>();
messageEntries.add(new SendMessageBatchRequestEntry()
  .withId("id-1")
  .withMessageBody("batch-1")
  .withMessageGroupId("baeldung-group-1"));
messageEntries.add(new SendMessageBatchRequestEntry()
  .withId("id-2")
  .withMessageBody("batch-2")
  .withMessageGroupId("baeldung-group-1"));

SendMessageBatchRequest sendMessageBatchRequest
 = new SendMessageBatchRequest(fifoQueueUrl, messageEntries);
sqs.sendMessageBatch(sendMessageBatchRequest);

5. キューからのメッセージの読み取り

ReceiveMessageRequest:のインスタンスでreceiveMessage()メソッドを呼び出すことにより、キューからメッセージを受信できます。

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(fifoQueueUrl)
  .withWaitTimeSeconds(10)
  .withMaxNumberOfMessages(10);

List<Message> sqsMessages = sqs.receiveMessage(receiveMessageRequest).getMessages();

withMaxNumberOfMessages()を使用して、キューから取得するメッセージの数を指定します—最大は10であることに注意してください。

メソッドwithWaitTimeSeconds()は、long-pollingを有効にします。longpollingは、SQSに送信する受信メッセージリクエストの数を制限する方法です。

簡単に言えば、これは、メッセージを取得するために指定された秒数まで待機することを意味します。 その期間キューにメッセージがない場合、リクエストは空を返します。 その間にメッセージがキューに到着すると、メッセージが返されます。

特定のメッセージの属性と本文を取得できます:

sqsMessages.get(0).getAttributes();
sqsMessages.get(0).getBody();

6. キューからのメッセージの削除

メッセージを削除するには、DeleteMessageRequestを使用します。

sqs.deleteMessage(new DeleteMessageRequest()
  .withQueueUrl(fifoQueueUrl)
  .withReceiptHandle(sqsMessages.get(0).getReceiptHandle()));

7. デッドレターキュー

デッドレターキューは、そのベースキューと同じタイプである必要があります— ベースキューがFIFOの場合はFIFOであり、ベースキューが標準の場合は標準である必要があります。 この例では、標準キューを使用します。

最初に行う必要があるのは、デッドレターキューとなるものを作成することです:

String deadLetterQueueUrl = sqs.createQueue("baeldung-dead-letter-queue").getQueueUrl();

次に、新しく作成されたキューのARN(Amazonリソース名)を取得します:

GetQueueAttributesResult deadLetterQueueAttributes = sqs.getQueueAttributes(
  new GetQueueAttributesRequest(deadLetterQueueUrl)
    .withAttributeNames("QueueArn"));

String deadLetterQueueARN = deadLetterQueueAttributes.getAttributes()
  .get("QueueArn");

最後に、この新しく作成されたキューを元の標準キューのデッドレターキューに設定します:

SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest()
  .withQueueUrl(standardQueueUrl)
  .addAttributesEntry("RedrivePolicy",
    "{\"maxReceiveCount\":\"2\", "
      + "\"deadLetterTargetArn\":\"" + deadLetterQueueARN + "\"}");

sqs.setQueueAttributes(queueAttributesRequest);

SetQueueAttributesRequestインスタンスを構築するときにaddAttributesEntry()メソッドで設定したJSONパケットには、必要な情報が含まれていますmaxReceiveCount2です。つまり、メッセージはこれまで何度も受信され、正しく処理されていないと見なされ、デッドレターキューに送信されます。

deadLetterTargetArn 属性は、標準キューが新しく作成されたデッドレターキューを指すようにします。

8. モニタリング

私たちはできる特定のキューに現在あるメッセージの数と、SDKで処理中のメッセージの数を確認します。 まず、作成する必要があります GetQueueAttributesRequest。 

そこから、キューの状態を確認します。

GetQueueAttributesRequest getQueueAttributesRequest 
  = new GetQueueAttributesRequest(standardQueueUrl)
    .withAttributeNames("All");
GetQueueAttributesResult getQueueAttributesResult 
  = sqs.getQueueAttributes(getQueueAttributesRequest);
System.out.println(String.format("The number of messages on the queue: %s", 
  getQueueAttributesResult.getAttributes()
    .get("ApproximateNumberOfMessages")));
System.out.println(String.format("The number of messages in flight: %s", 
  getQueueAttributesResult.getAttributes()
    .get("ApproximateNumberOfMessagesNotVisible")));

Amazon Cloud Watch を使用すると、より詳細なモニタリングを実現できます。

9. 結論

この記事では、AWSJavaSDKを使用してSQSキューを管理する方法を説明しました。

いつものように、この記事で使用されているすべてのコードサンプルは、GitHubにあります。