1. ゴール

S3アップロードに関する前回の記事では、jcloudsからの汎用BlobAPIを使用してコンテンツをS3にアップロードする方法について説明しました。 この記事では、jcloudsS3固有の非同期APIを使用してコンテンツをアップロードし、S3が提供するマルチパートアップロード機能を活用します。

2. 準備

2.1. カスタムAPIを設定する

アップロードプロセスの最初の部分は、jclouds APIの作成です。これは、AmazonS3のカスタムAPIです。

public AWSS3AsyncClient s3AsyncClient() {
   String identity = ...
   String credentials = ...

   BlobStoreContext context = ContextBuilder.newBuilder("aws-s3").
      credentials(identity, credentials).buildView(BlobStoreContext.class);

   RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
   return providerContext.getAsyncApi();
}

2.2. コンテンツのパーツ数の決定

Amazon S3には、アップロードするパーツごとに5MBの制限があります。 そのため、最初に行う必要があるのは、コンテンツを分割できる適切な数のパーツを決定して、この5MBの制限を下回るパーツがないようにすることです。

public static int getMaximumNumberOfParts(byte[] byteArray) {
   int numberOfParts= byteArray.length / fiveMB; // 5*1024*1024
   if (numberOfParts== 0) {
      return 1;
   }
   return numberOfParts;
}

2.3. コンテンツをパーツに分割する

バイト配列を設定された数の部分に分割します。

public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {
   List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
   int fullSize = byteArray.length;
   long dimensionOfPart = fullSize / maxNumberOfParts;
   for (int i = 0; i < maxNumberOfParts; i++) {
      int previousSplitPoint = (int) (dimensionOfPart * i);
      int splitPoint = (int) (dimensionOfPart * (i + 1));
      if (i == (maxNumberOfParts - 1)) {
         splitPoint = fullSize;
      }
      byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
      parts.add(partBytes);
   }

   return parts;
}

バイト配列を部分に分割するロジックをテストします。いくつかのバイトを生成し、バイト配列を分割し、Guavaとverifyを使用して再構成します。元の状態に戻すこと:

@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {
   byte[] byteArray = randomByteData(16);

   int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);

   assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length, 
      equalTo(byteArray.length));
   byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
   assertThat(byteArray, equalTo(unmultiplexed));
}

データを生成するには、ランダムのサポートを使用するだけです。

byte[] randomByteData(int mb) {
   byte[] randomBytes = new byte[mb * 1024 * 1024];
   new Random().nextBytes(randomBytes);
   return randomBytes;
}

2.4. ペイロードの作成

コンテンツの正しいパーツ数を決定し、コンテンツをパーツに分割できたので、jcloudsAPIのペイロードオブジェクトを生成する必要があります。

public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
   List<Payload> payloads = Lists.newArrayList();
   for (byte[] filePart : fileParts) {
      byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
      Payload partPayload = Payloads.newByteArrayPayload(filePart);
      partPayload.getContentMetadata().setContentLength((long) filePart.length);
      partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
      payloads.add(partPayload);
   }
   return payloads;
}

3. アップロード

アップロードプロセスは柔軟なマルチステッププロセスです。つまり、次のことを意味します。

  • すべてのデータを取得する前にアップロードを開始できます–データが入ってくるときにアップロードできます
  • データはチャンクにアップロードされます–これらの操作のいずれかが失敗した場合、データを簡単に取得できます
  • チャンクは並列でアップロードできます–これにより、特に大きなファイルの場合、アップロード速度が大幅に向上します。

3.1. アップロード操作の開始

アップロード操作の最初のステップは、プロセスを開始するです。 S3へのこのリクエストには、標準のHTTPヘッダーが含まれている必要があります。特に Content MD5ヘッダーを計算する必要があります。 ここでは、Guavaハッシュ関数のサポートを使用します。

Hashing.md5().hashBytes(byteArray).asBytes();

これは、バイト配列全体の md5 hash であり、まだ一部ではありません。

アップロードを開始する、およびS3との以降のすべての対話には、先に作成した非同期APIであるAWSS3AsyncClientを使用します。

ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();

key は、オブジェクトに割り当てられたハンドルです。これは、クライアントによって指定された一意の識別子である必要があります。

また、APIの非同期バージョンを使用している場合でも、この操作の結果をブロックしていることに注意してください。これは、初期化の結果を移動できるようにする必要があるためです。前方。

操作の結果は、S3によって返される upload id です。これにより、ライフサイクル全体でアップロードが識別され、後続のすべてのアップロード操作に存在します。

3.2. パーツのアップロード

次のステップは、パーツのアップロードです。 ここでの目標は、これらのリクエストを並列で送信することです。これは、パーツのアップロード操作がアップロードプロセスの大部分を表すためです。

List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
   ListenableFuture<String> future = s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber));
   ongoingOperations.add(future);
}

部品番号は連続している必要がありますが、要求が送信される順序は関係ありません。

すべてのパーツのアップロードリクエストが送信されたら、応答を待つ必要があります。これにより、各パーツの個々のETag値を収集できます。

Function<ListenableFuture<String>, String> getEtagFromOp = 
  new Function<ListenableFuture<String>, String>() {
   public String apply(ListenableFuture<String> ongoingOperation) {
      try {
         return ongoingOperation.get();
      } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
      }
   }
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);

何らかの理由でパーツのアップロード操作の1つが失敗した場合、操作は成功するまで再試行できます。 上記のロジックには再試行メカニズムは含まれていませんが、組み込みは十分に簡単なはずです。

3.3. アップロード操作の完了

アップロードプロセスの最後のステップは、マルチパート操作の完了です。 S3 APIでは、前のパーツからの応答を Map としてアップロードする必要があります。これは、上記で取得したETagのリストから簡単に作成できるようになりました。

Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
   parts.put(i + 1, etagsOfParts.get(i));
}

そして最後に、完全なリクエストを送信します。

s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();

これにより、完成したオブジェクトの最終的なETagが返され、アップロードプロセス全体が完了します。

4. 結論

この記事では、カスタムS3 jclouds APIを使用して、S3へのマルチパート対応の完全並列アップロード操作を構築しました。 この操作はそのまま使用できますが、いくつかの方法で改善することができます。

まず、失敗に適切に対処するために、アップロード操作の周囲に再試行ロジックを追加する必要があります。

次に、非常に大きなファイルの場合、メカニズムがすべてのアップロードマルチパートリクエストを並列に送信している場合でも、スロットルメカニズムは送信される並列リクエストの数を制限する必要があります。 これは、帯域幅がボトルネックになるのを防ぐためと、Amazon自体がアップロードプロセスに1秒あたりのリクエストの許可された制限を超えているというフラグを立てないようにするためです。GuavaRateLimiterは潜在的に非常に適しています。これ。