1ゴール

リンク:/2013/04/01/upload-on-s3-with-jclouds/[S3アップロードに関する前の記事]では、jcloudsから汎用Blob APIを使用してコンテンツをS3にアップロードする方法について説明しました。この記事では、jclouds



S3固有の非同期APIを使用してコンテンツをアップロードし、マルチパートアップロード機能を活用します。http://aws.typepad.com/aws/2010/11/amazon-s3-multipart-upload.html[S3によって提供される。


2準備


2.1. カスタムAPIを設定する

アップロードプロセスの最初の部分はjclouds APIの作成です。これはAmazon S3用のカスタムAPIです。

public AWSS3AsyncClient s3AsyncClient() {
   String identity = ...
   String credentials = ...

   BlobStoreContext context = ContextBuilder.newBuilder("aws-s3").
      credentials(identity, credentials).buildView(BlobStoreContext.class);

   RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
   return providerContext.getAsyncApi();
}


2.2. コンテンツのパーツ数の決定

Amazon S3では、アップロードされる各パートに対して5 MBの制限があります。そのため、最初にする必要があるのは、この5 MBの制限を下回るパーツがないように、コンテンツを分割できる適切なパーツ数を決定することです。

public static int getMaximumNumberOfParts(byte[]byteArray) {
   int numberOfParts= byteArray.length/fiveMB;//5** 1024** 1024
   if (numberOfParts== 0) {
      return 1;
   }
   return numberOfParts;
}


2.3. コンテンツを複数の部分に分割する

バイト配列を設定された数の部分に分割します。

public static List<byte[]> breakByteArrayIntoParts(byte[]byteArray, int maxNumberOfParts) {
   List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
   int fullSize = byteArray.length;
   long dimensionOfPart = fullSize/maxNumberOfParts;
   for (int i = 0; i < maxNumberOfParts; i++) {
      int previousSplitPoint = (int) (dimensionOfPart **  i);
      int splitPoint = (int) (dimensionOfPart **  (i + 1));
      if (i == (maxNumberOfParts - 1)) {
         splitPoint = fullSize;
      }
      byte[]partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
      parts.add(partBytes);
   }

   return parts;
}

バイト配列を複数の部分に分割するロジックを

テスト

します。いくつかのバイトを生成し、バイト配列を分割し、Guavaを使用してそれを再構成して、元の状態に戻ったことを確認します。

@Test
public void given16MByteArray__whenFileBytesAreSplitInto3__thenTheSplitIsCorrect() {
   byte[]byteArray = randomByteData(16);

   int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);

   assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length,
      equalTo(byteArray.length));
   byte[]unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
   assertThat(byteArray, equalTo(unmultiplexed));
}

データを生成するには、単に

Random

のサポートを使用します。

byte[]randomByteData(int mb) {
   byte[]randomBytes = new byte[mb **  1024 **  1024];
   new Random().nextBytes(randomBytes);
   return randomBytes;
}

** 2.4. ペイロードを作成する

**

コンテンツの正しいパーツ数を決定し、コンテンツをいくつかの部分に分割することができたので、jclouds API用にPayloadオブジェクトを生成する必要があります。

public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
   List<Payload> payloads = Lists.newArrayList();
   for (byte[]filePart : fileParts) {
      byte[]partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
      Payload partPayload = Payloads.newByteArrayPayload(filePart);
      partPayload.getContentMetadata().setContentLength((long) filePart.length);
      partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
      payloads.add(partPayload);
   }
   return payloads;
}


3アップロードする

アップロードプロセスは柔軟なマルチステッププロセスです。つまり、

  • すべてのデータを取得する前に

    アップロードを開始することができます

    – データを保存することができます

入ってくるのでアップロード

データは

チャンク** でアップロードされます – これらの操作の1つが失敗した場合、それは

単純に取得することができます

チャンクは

並行して** アップロードすることができます – これは大幅に増加することができます

特に大きなファイルの場合、アップロード速度


3.1. アップロード操作の開始

アップロード操作の最初のステップは、

プロセスを開始することです

S3へのこの要求は標準のHTTPヘッダを含まなければなりません –

Content



MD5

ヘッダは特に計算される必要があります。ここでは、Guavaハッシュ関数サポートを使用します。

Hashing.md5().hashBytes(byteArray).asBytes();

これはバイト配列全体の

md5ハッシュ

であり、部分ごとのものではありません。

  • アップロードを開始** し、さらにS3とやり取りするために、AWSS3AsyncClient(以前に作成した非同期API)を使用します。

ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();



key


はオブジェクトに割り当てられたハンドルです – これはクライアントによって指定されたユニークな識別子である必要があります。

また、非同期バージョンのAPIを使用しているにもかかわらず、この操作の結果をブロックしているのです。

操作の結果は、S3から返される

upload id

です。これは、ライフサイクル全体を通してアップロードを識別し、それ以降のすべてのアップロード操作に表示されます。


3.2. パーツをアップロードする

次のステップは

パーツのアップロード

です。ここでの目標は、パーツアップロード操作がアップロードプロセスの大部分を占めるので、これらの要求を

並行して

送信することです。

List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
   ListenableFuture<String> future = s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber));
   ongoingOperations.add(future);
}

部品番号は連続している必要がありますが、要求が送信される順序は関係ありません。

すべてのアップロード部分の要求が送信された後、各部分の個々のETag値を収集できるように、

それらの応答を待つ

必要があります。

Function<ListenableFuture<String>, String> getEtagFromOp =
  new Function<ListenableFuture<String>, String>() {
   public String apply(ListenableFuture<String> ongoingOperation) {
      try {
         return ongoingOperation.get();
      } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
      }
   }
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);

何らかの理由で一部のアップロード操作が失敗した場合は、成功するまで** 操作を再試行できます。上記のロジックには再試行メカニズムは含まれていませんが、それを組み込むことは十分に簡単なはずです。


3.3. アップロード操作を完了する

アップロードプロセスの最後のステップは、

マルチパート操作の完了

です。 S3 APIでは、前のパートからの応答を

Map

としてアップロードする必要があります。これは、上記で取得したETagのリストから簡単に作成できます。

Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
   parts.put(i + 1, etagsOfParts.get(i));
}

そして最後に、完全なリクエストを送信します。

s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();

これにより、完成したオブジェクトの最終的なETagが返され、アップロードプロセス全体が完了します。


4結論

この記事では、カスタムS3 jclouds APIを使用して、マルチパート対応のS3への完全並列アップロード操作を作成しました。この操作はそのまま使うことができますが、いくつかの方法で

改善

することができます。

まず、失敗に対処するために、アップロード操作の前後に

再試行ロジック

を追加する必要があります。

次に、本当に大きなファイルの場合、メカニズムがすべてのアップロードマルチパート要求を並行して送信していても、

調整メカニズムによって

送信される並行要求の数が制限されるはずです。これは、帯域幅がボトルネックになるのを防ぐためと、Amazon自身が1秒あたりのリクエストの許容制限を超えているとアップロードプロセスがフラグを立てないようにするための両方です。/guava/src/com/google/common/util/concurrent/RateLimiter.java[Guava RateLimiter]は、これに非常に適している可能性があります。