jcloudsを使ったS3でのマルチパートアップロード
1ゴール
リンク:/2013/04/01/upload-on-s3-with-jclouds/[S3アップロードに関する前の記事]では、jcloudsから汎用Blob APIを使用してコンテンツをS3にアップロードする方法について説明しました。この記事では、jclouds
の
S3固有の非同期APIを使用してコンテンツをアップロードし、マルチパートアップロード機能を活用します。http://aws.typepad.com/aws/2010/11/amazon-s3-multipart-upload.html[S3によって提供される。
2準備
2.1. カスタムAPIを設定する
アップロードプロセスの最初の部分はjclouds APIの作成です。これはAmazon S3用のカスタムAPIです。
public AWSS3AsyncClient s3AsyncClient() {
String identity = ...
String credentials = ...
BlobStoreContext context = ContextBuilder.newBuilder("aws-s3").
credentials(identity, credentials).buildView(BlobStoreContext.class);
RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
return providerContext.getAsyncApi();
}
2.2. コンテンツのパーツ数の決定
Amazon S3では、アップロードされる各パートに対して5 MBの制限があります。そのため、最初にする必要があるのは、この5 MBの制限を下回るパーツがないように、コンテンツを分割できる適切なパーツ数を決定することです。
public static int getMaximumNumberOfParts(byte[]byteArray) {
int numberOfParts= byteArray.length/fiveMB;//5** 1024** 1024
if (numberOfParts== 0) {
return 1;
}
return numberOfParts;
}
2.3. コンテンツを複数の部分に分割する
バイト配列を設定された数の部分に分割します。
public static List<byte[]> breakByteArrayIntoParts(byte[]byteArray, int maxNumberOfParts) {
List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
int fullSize = byteArray.length;
long dimensionOfPart = fullSize/maxNumberOfParts;
for (int i = 0; i < maxNumberOfParts; i++) {
int previousSplitPoint = (int) (dimensionOfPart ** i);
int splitPoint = (int) (dimensionOfPart ** (i + 1));
if (i == (maxNumberOfParts - 1)) {
splitPoint = fullSize;
}
byte[]partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
parts.add(partBytes);
}
return parts;
}
バイト配列を複数の部分に分割するロジックを
テスト
します。いくつかのバイトを生成し、バイト配列を分割し、Guavaを使用してそれを再構成して、元の状態に戻ったことを確認します。
@Test
public void given16MByteArray__whenFileBytesAreSplitInto3__thenTheSplitIsCorrect() {
byte[]byteArray = randomByteData(16);
int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);
assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length,
equalTo(byteArray.length));
byte[]unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
assertThat(byteArray, equalTo(unmultiplexed));
}
データを生成するには、単に
Random
のサポートを使用します。
byte[]randomByteData(int mb) {
byte[]randomBytes = new byte[mb ** 1024 ** 1024];
new Random().nextBytes(randomBytes);
return randomBytes;
}
** 2.4. ペイロードを作成する
**
コンテンツの正しいパーツ数を決定し、コンテンツをいくつかの部分に分割することができたので、jclouds API用にPayloadオブジェクトを生成する必要があります。
public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
List<Payload> payloads = Lists.newArrayList();
for (byte[]filePart : fileParts) {
byte[]partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
Payload partPayload = Payloads.newByteArrayPayload(filePart);
partPayload.getContentMetadata().setContentLength((long) filePart.length);
partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
payloads.add(partPayload);
}
return payloads;
}
3アップロードする
アップロードプロセスは柔軟なマルチステッププロセスです。つまり、
-
すべてのデータを取得する前に
アップロードを開始することができます
– データを保存することができます
入ってくるのでアップロード
データは
チャンク** でアップロードされます – これらの操作の1つが失敗した場合、それは
単純に取得することができます
チャンクは
並行して** アップロードすることができます – これは大幅に増加することができます
特に大きなファイルの場合、アップロード速度
3.1. アップロード操作の開始
アップロード操作の最初のステップは、
プロセスを開始することです
。
S3へのこの要求は標準のHTTPヘッダを含まなければなりません –
Content
–
MD5
ヘッダは特に計算される必要があります。ここでは、Guavaハッシュ関数サポートを使用します。
Hashing.md5().hashBytes(byteArray).asBytes();
これはバイト配列全体の
md5ハッシュ
であり、部分ごとのものではありません。
-
アップロードを開始** し、さらにS3とやり取りするために、AWSS3AsyncClient(以前に作成した非同期API)を使用します。
ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();
key
はオブジェクトに割り当てられたハンドルです – これはクライアントによって指定されたユニークな識別子である必要があります。
また、非同期バージョンのAPIを使用しているにもかかわらず、この操作の結果をブロックしているのです。
操作の結果は、S3から返される
upload id
です。これは、ライフサイクル全体を通してアップロードを識別し、それ以降のすべてのアップロード操作に表示されます。
3.2. パーツをアップロードする
次のステップは
パーツのアップロード
です。ここでの目標は、パーツアップロード操作がアップロードプロセスの大部分を占めるので、これらの要求を
並行して
送信することです。
List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
ListenableFuture<String> future = s3AsyncApi.uploadPart(
container, key, partNumber + 1, uploadId, payloads.get(partNumber));
ongoingOperations.add(future);
}
部品番号は連続している必要がありますが、要求が送信される順序は関係ありません。
すべてのアップロード部分の要求が送信された後、各部分の個々のETag値を収集できるように、
それらの応答を待つ
必要があります。
Function<ListenableFuture<String>, String> getEtagFromOp =
new Function<ListenableFuture<String>, String>() {
public String apply(ListenableFuture<String> ongoingOperation) {
try {
return ongoingOperation.get();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);
何らかの理由で一部のアップロード操作が失敗した場合は、成功するまで** 操作を再試行できます。上記のロジックには再試行メカニズムは含まれていませんが、それを組み込むことは十分に簡単なはずです。
3.3. アップロード操作を完了する
アップロードプロセスの最後のステップは、
マルチパート操作の完了
です。 S3 APIでは、前のパートからの応答を
Map
としてアップロードする必要があります。これは、上記で取得したETagのリストから簡単に作成できます。
Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
parts.put(i + 1, etagsOfParts.get(i));
}
そして最後に、完全なリクエストを送信します。
s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();
これにより、完成したオブジェクトの最終的なETagが返され、アップロードプロセス全体が完了します。
4結論
この記事では、カスタムS3 jclouds APIを使用して、マルチパート対応のS3への完全並列アップロード操作を作成しました。この操作はそのまま使うことができますが、いくつかの方法で
改善
することができます。
まず、失敗に対処するために、アップロード操作の前後に
再試行ロジック
を追加する必要があります。
次に、本当に大きなファイルの場合、メカニズムがすべてのアップロードマルチパート要求を並行して送信していても、
調整メカニズムによって
送信される並行要求の数が制限されるはずです。これは、帯域幅がボトルネックになるのを防ぐためと、Amazon自身が1秒あたりのリクエストの許容制限を超えているとアップロードプロセスがフラグを立てないようにするための両方です。/guava/src/com/google/common/util/concurrent/RateLimiter.java[Guava RateLimiter]は、これに非常に適している可能性があります。