jcloudsを使用したS3でのマルチパートアップロード

  • link:/category/cloud/ [クラウド]

  • AWS

1. ゴール

link:/ 2013/04/01 / upload-on-s3-with-jclouds / [S3アップロードに関する以前の記事]では、jcloudsから汎用Blob APIを使用してコンテンツをS3にアップロードする方法を検討しました。 この記事では、jclouds *の* S3固有の非同期APIを使用してコンテンツをアップロードし、マルチパートアップロード機能http://aws.typepad.com/aws/2010/11/amazon-s3-multipart-upload.html [ S3によって提供されます。

2. 準備

* 2.1。 カスタムAPIをセットアップします*

アップロードプロセスの最初の部分はjclouds APIの作成です。これはAmazon S3のカスタムAPIです。
public AWSS3AsyncClient s3AsyncClient() {
   String identity = ...
   String credentials = ...

   BlobStoreContext context = ContextBuilder.newBuilder("aws-s3").
      credentials(identity, credentials).buildView(BlobStoreContext.class);

   RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
   return providerContext.getAsyncApi();
}

* 2.2。 コンテンツのパーツ数の決定*

Amazon S3には、アップロードする各パーツに5 MBの制限があります。 そのため、最初に行う必要があるのは、この5 MBの制限を下回らないように、コンテンツを分割できる適切なパーツ数を決定することです。
public static int getMaximumNumberOfParts(byte[] byteArray) {
   int numberOfParts= byteArray.length / fiveMB; // 5*1024*1024
   if (numberOfParts== 0) {
      return 1;
   }
   return numberOfParts;
}

* 2.3。 コンテンツをパーツに分割*

バイト配列を設定された数の部分に分割します。
public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {
   List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
   int fullSize = byteArray.length;
   long dimensionOfPart = fullSize / maxNumberOfParts;
   for (int i = 0; i < maxNumberOfParts; i++) {
      int previousSplitPoint = (int) (dimensionOfPart * i);
      int splitPoint = (int) (dimensionOfPart * (i + 1));
      if (i == (maxNumberOfParts - 1)) {
         splitPoint = fullSize;
      }
      byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
      parts.add(partBytes);
   }

   return parts;
}
バイト配列を複数の部分に分割するロジックを*テスト*します。いくつかのバイトを生成し、バイト配列を分割し、グアバを使用して元に戻し、元に戻すことを*検証*します。 :
@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {
   byte[] byteArray = randomByteData(16);

   int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);

   assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length,
      equalTo(byteArray.length));
   byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
   assertThat(byteArray, equalTo(unmultiplexed));
}
データを生成するには、_Random_のサポートを使用するだけです。
byte[] randomByteData(int mb) {
   byte[] randomBytes = new byte[mb * 1024 * 1024];
   new Random().nextBytes(randomBytes);
   return randomBytes;
}

* 2.4。 ペイロードの作成

*
コンテンツの正しいパーツ数を決定し、コンテンツをパーツに分割することができたので、jclouds APIの*ペイロードオブジェクト*を生成する必要があります。
public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
   List<Payload> payloads = Lists.newArrayList();
   for (byte[] filePart : fileParts) {
      byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
      Payload partPayload = Payloads.newByteArrayPayload(filePart);
      partPayload.getContentMetadata().setContentLength((long) filePart.length);
      partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
      payloads.add(partPayload);
   }
   return payloads;
}

3. アップロード

アップロードプロセスは柔軟なマルチステッププロセスです。つまり、次のことを意味します。
  • すべてのデータを取得する前に*アップロードを開始できます –データは
    入ってくるのでアップロード

  • データは*チャンク*でアップロードされます。これらの操作のいずれかが失敗すると、
    簡単に取得できます

  • チャンクは*並行して*アップロードできます。これにより、
    特に大きなファイルの場合、アップロード速度

* 3.1。 アップロード操作の開始*

アップロード操作の最初のステップは、*プロセスの開始*です。 このS3への要求には、標準のHTTPヘッダーが含まれている必要があります。特に_Content _–_ MD5_ヘッダーを計算する必要があります。 ここでGuavaハッシュ関数のサポートを使用します。
Hashing.md5().hashBytes(byteArray).asBytes();
これは、バイトアレイ全体の* md5ハッシュ*であり、まだ一部ではありません。
*アップロードを開始*し、S3とのすべての対話のために、AWSS3AsyncClient(以前に作成した非同期API)を使用します。
ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();
_ * key * _はオブジェクトに割り当てられたハンドルです。これはクライアントによって指定された一意の識別子である必要があります。
また、非同期バージョンのAPIを使用している場合でも、この操作の結果をブロックしていることに注意してください。これは、先に進むために初期化の結果が必要だからです。
操作の結果は、S3によって返される*アップロードID *です。これにより、ライフサイクル全体を通してアップロードが識別され、以降のすべてのアップロード操作に表示されます。

* 3.2。 パーツのアップロード*

次のステップは、*部品のアップロード*です。 ここでの目標は、アップロードパーツ操作がアップロードプロセスの大部分を表すため、これらのリクエストを*並行して*送信することです。
List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
   ListenableFuture<String> future = s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber));
   ongoingOperations.add(future);
}
部品番号は連続している必要がありますが、リクエストが送信される順序は関係ありません。
すべてのアップロードパーツリクエストが送信された後、各パーツの個々のETag値を収集できるように、*レスポンスを待つ*必要があります。
Function<ListenableFuture<String>, String> getEtagFromOp =
  new Function<ListenableFuture<String>, String>() {
   public String apply(ListenableFuture<String> ongoingOperation) {
      try {
         return ongoingOperation.get();
      } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
      }
   }
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);
何らかの理由で、アップロードパーツ操作の1つが失敗した場合、成功するまで*操作を再試行できます*。 上記のロジックには再試行メカニズムが含まれていませんが、それを構築するのは十分簡単です。

* 3.3。 アップロード操作の完了*

アップロードプロセスの最後のステップは、*マルチパート操作の完了*です。 S3 APIは、以前のパーツからの応答を_Map_としてアップロードする必要があります。これは、上記で取得したETagのリストから簡単に作成できます。
Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
   parts.put(i + 1, etagsOfParts.get(i));
}
最後に、完全なリクエストを送信します。
s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();
これにより、完成したオブジェクトの最終ETagが返され、アップロードプロセス全体が完了します。

4. 結論

この記事では、カスタムS3 jclouds APIを使用して、S3へのマルチパート対応の完全並列アップロード操作を構築しました。 この操作はそのまま使用する準備ができていますが、いくつかの方法で*改善*できます。
まず、失敗に対処するために、アップロード操作の周りに*再試行ロジック*を追加する必要があります。
次に、非常に大きなファイルの場合、メカニズムがすべてのアップロードマルチパートリクエストを並行して送信していても、*スロットルメカニズム*が送信される並行リクエストの数を制限する必要があります。 これは、帯域幅がボトルネックにならないようにするためと、Amazon自体がアップロードプロセスに許可されている1秒あたりのリクエストの制限を超えているとフラグを立てないようにするためです。 /master/guava/src/com/google/common/util/concurrent/RateLimiter.java[Guava RateLimiter]は、これに非常に適している可能性があります。