1. 序章

AWSは、公式SDKを使用してJavaからアクセスできる多くのAPIを介して多くのサービスを提供しています。 ただし、最近まで、このSDKはリアクティブ操作のサポートを提供せず、非同期アクセスのサポートは限られていました。

AWS SDK for Java 2.0のリリースにより、 Reactive Streams標準を採用したおかげで、これらのAPIを完全にノンブロッキングI/Oモードで使用できるようになりました。

このチュートリアルでは、よく知られているS3サービスをストレージバックエンドとして使用する単純なblobストアRESTAPIをSpringBootに実装することにより、これらの新機能について説明します。

2. AWSS3オペレーションの概要

実装に飛び込む前に、ここで達成したいことの概要を簡単に説明しましょう。 一般的なBLOBストアサービスは、フロントエンドアプリケーションが消費するCRUD操作を公開して、エンドユーザーがオーディオ、写真、ドキュメントなどのいくつかの種類のコンテンツをアップロード、一覧表示、ダウンロード、および削除できるようにします。

従来の実装で対処しなければならない一般的な問題は、大きなファイルや低速の接続を効率的に処理する方法です。 初期のバージョン(servlet 3.0より前)では、JavaEE仕様が提供する必要があるのはブロッキングAPIだけだったため、同時BLOBストアクライアントごとにスレッドが必要でした。 このモデルには、より多くのサーバーリソース(エルゴ、より大きなマシン)を必要とし、DoSタイプの攻撃に対してより脆弱になるという欠点があります。

リアクティブスタックを使用することで、同じ数のクライアントに対してサービスのリソースを大幅に削減できます。 リアクタの実装では、読み取りへの新しいデータの可用性や以前の書き込みの完了など、I/O完了イベントに応答してディスパッチされる少数のスレッドを使用します。

これは、実行可能な作業がなくなるまで、同じスレッドがこれらのイベント(アクティブなクライアント接続のいずれかから発生する可能性がある)を処理し続けることを意味します。 このアプローチにより、コンテキストスイッチの数が大幅に削減され(非常にコストのかかる操作)、利用可能なリソースを非常に効率的に使用できます。

3. プロジェクトの設定

私たちのデモプロジェクトは、LombokやJUnitなどの通常のサポート依存関係を含む標準の Spring BootWebFluxアプリケーションです。

これらのライブラリに加えて、AWS SDK forJavaV2の依存関係を取り込む必要があります。

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>bom</artifactId>
            <version>2.10.1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <artifactId>netty-nio-client</artifactId>
        <groupId>software.amazon.awssdk</groupId>
        <scope>compile</scope>
    </dependency>
</dependencies>

AWS SDKはすべての依存関係に必要なバージョンを定義するBOMを提供するため、POMファイルの依存関係セクションでそれらを指定する必要はありません。

S3クライアントライブラリを追加しました。これにより、SDKから他のコア依存関係がもたらされます。 AWSとのやり取りに非同期APIを使用するため、Nettyクライアントライブラリも必要です。

AWSの公式ドキュメントには、利用可能なトランスポートの詳細が含まれています。

4. AWSS3クライアントの作成

S3操作のエントリポイントはS3AsyncClientクラスであり、これを使用して新しいAPI呼び出しを開始します。

このクラスのインスタンスは1つだけ必要なので、 @ Beanメソッドを使用して@Configurationクラスを作成し、必要な場所に挿入できるようにします。

@Configuration
@EnableConfigurationProperties(S3ClientConfigurarionProperties.class)
public class S3ClientConfiguration {
    @Bean
    public S3AsyncClient s3client(S3ClientConfigurarionProperties s3props, 
      AwsCredentialsProvider credentialsProvider) {
        SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
          .writeTimeout(Duration.ZERO)
          .maxConcurrency(64)
          .build();
        S3Configuration serviceConfiguration = S3Configuration.builder()
          .checksumValidationEnabled(false)
          .chunkedEncodingEnabled(true)
          .build();
        S3AsyncClientBuilder b = S3AsyncClient.builder().httpClient(httpClient)
          .region(s3props.getRegion())
          .credentialsProvider(credentialsProvider)
          .serviceConfiguration(serviceConfiguration);

        if (s3props.getEndpoint() != null) {
            b = b.endpointOverride(s3props.getEndpoint());
        }
        return b.build();
    }
}

このデモでは、S3サービスにアクセスするために必要な次の情報を保持する最小限の @ConfigurationProperties クラス(リポジトリで入手可能)を使用しています。

  • region: us-east-1などの有効なAWSリージョン識別子
  • accessKeyId / secretAccessKey :AWSAPIキーと識別子
  • エンドポイント:S3のデフォルトのサービスエンドポイントをオーバーライドするために使用できるオプションのURI。 主な使用例は、S3互換のAPIを提供する代替ストレージソリューションでデモコードを使用することです(minioとlocalstackが例です)
  • バケット:アップロードされたファイルを保存するバケットの名前

クライアントの初期化コードについて言及する価値のあるいくつかのポイントがあります。 まず、書き込みタイムアウトを無効にし、最大同時実行数を増やしているため、低帯域幅の状況でもアップロードを完了できます。

次に、チェックサム検証を無効にし、チャンクエンコーディングを有効にします。 これは、データがストリーミング方式でサービスに到着したらすぐにバケットへのデータのアップロードを開始するためです。

最後に、バケットの作成自体については説明していません。これは、バケットがすでに管理者によって作成および構成されていることを前提としているためです。

クレデンシャルについては、Springプロパティからクレデンシャルを回復できるカスタマイズされたAwsCredentialsProviderを提供しています。 これにより、Springの環境抽象化およびサポートされているすべての PropertySource 実装(VaultやConfig Serverなど)を介してこれらの値を挿入できるようになります。

@Bean
public AwsCredentialsProvider awsCredentialsProvider(S3ClientConfigurarionProperties s3props) {
    if (StringUtils.isBlank(s3props.getAccessKeyId())) {
        return DefaultCredentialsProvider.create();
    } else {
        return () -> {
            return AwsBasicCredentials.create(
              s3props.getAccessKeyId(),
              s3props.getSecretAccessKey());
        };
    }
}

5. アップロードサービスの概要

次に、アップロードサービスを実装します。このサービスには、 /inboxパスでアクセスできます。

このリソースパスへのPOSTは、ランダムに生成されたキーの下でS3バケットにファイルを保存します。 元のファイル名をメタデータキーとして保存するので、それを使用してブラウザ用の適切なHTTPダウンロードヘッダーを生成できます。

単純なアップロードとマルチパートのアップロードという2つの異なるシナリオを処理する必要があります。 先に進み、 @RestController を作成して、これらのシナリオを処理するメソッドを追加しましょう。

@RestController
@RequestMapping("/inbox")
@Slf4j
public class UploadResource {
    private final S3AsyncClient s3client;
    private final S3ClientConfigurarionProperties s3config;

    public UploadResource(S3AsyncClient s3client, S3ClientConfigurarionProperties s3config) {
        this.s3client = s3client;
        this.s3config = s3config;        
    }
    
    @PostMapping
    public Mono<ResponseEntity<UploadResult>> uploadHandler(
      @RequestHeader HttpHeaders headers, 
      @RequestBody Flux<ByteBuffer> body) {
      // ... see section 6
    }

    @RequestMapping(
      consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
      method = {RequestMethod.POST, RequestMethod.PUT})
    public Mono<ResponseEntity<UploadResult>> multipartUploadHandler(
      @RequestHeader HttpHeaders headers,
      @RequestBody Flux<Part> parts ) {
      // ... see section 7
    }
}

ハンドラーの署名は、両方の場合の主な違いを反映しています。単純な場合、本文にはファイルコンテンツ自体が含まれますが、マルチパートの場合、それぞれがファイルまたはフォームデータに対応する複数の「パーツ」を持つことができます。

便宜上、POSTまたはPUTメソッドを使用したマルチパートアップロードをサポートします。 この理由は、一部のツール( cURL 、特に)は、 -F オプションを使用してファイルをアップロードするときに、デフォルトで後者を使用するためです。

どちらの場合も、操作の結果と、クライアントが元のファイルを復元するために使用する必要のある生成されたファイルキーを含むUploadResultを返します。これについては後で詳しく説明します。

6. 単一ファイルのアップロード

この場合、クライアントは、生データを含むリクエスト本文を使用して、単純なPOST操作でコンテンツを送信します。 リアクティブWebアプリケーションでこのコンテンツを受信するには、宣言するだけです。 @PostMapping を取るメソッドフラックス口論。

この場合、このフラックスを新しいS3ファイルにストリーミングするのは簡単です。

必要なのは、生成されたキー、ファイル長、MIMEコンテンツタイプを使用して PutObjectRequest を構築し、それをS3クライアントの putObject()メソッドに渡すことです。

@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers,
  @RequestBody Flux<ByteBuffer> body) {
    // ... some validation code omitted
    String fileKey = UUID.randomUUID().toString();
    MediaType mediaType = headers.getContentType();

    if (mediaType == null) {
        mediaType = MediaType.APPLICATION_OCTET_STREAM;
    }
    CompletableFuture future = s3client
      .putObject(PutObjectRequest.builder()
        .bucket(s3config.getBucket())
        .contentLength(length)
        .key(fileKey.toString())
        .contentType(mediaType.toString())
        .metadata(metadata)
        .build(), 
      AsyncRequestBody.fromPublisher(body));

    return Mono.fromFuture(future)
      .map((response) -> {
        checkResult(response);
        return ResponseEntity
          .status(HttpStatus.CREATED)
          .body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
        });
}

ここで重要なのは、着信 FluxputObject()メソッドに渡す方法です。

このメソッドは、オンデマンドでコンテンツを提供するAsyncRequestBodyオブジェクトを想定しています。 基本的に、これは通常の Publisher であり、いくつかの便利なメソッドが追加されています。 この例では、 fromPublisher ()メソッドを利用して、Fluxを必要なタイプに変換します。

また、クライアントが Content-LengthHTTPヘッダーを正しい値で送信すると想定しています。 この情報がないと、これは必須フィールドであるため、呼び出しは失敗します。

SDK V2の非同期メソッドは、常にCompleteableFutureオブジェクトを返します。 それを取得し、 fromFuture()ファクトリメソッドを使用してMonoに適合させます。 これは、最終的なUploadResultオブジェクトにマップされます。

7. 複数のファイルをアップロードする

multipart /form-dataアップロードmayの処理は、特にすべての詳細を処理するライブラリを使用する場合、簡単に思えます。 では、アップロードされたファイルごとに前の方法を単純に使用できますか? ええ、そうですが、これには代償が伴います:バッファリング。

前の方法を使用するには、パーツの長さが必要ですが、チャンク化されたファイル転送には常にこの情報が含まれているとは限りません。 1つのアプローチは、パーツを一時ファイルに保存してからAWSに送信することですが、これにより合計アップロード時間が遅くなります。 また、サーバー用の追加ストレージも意味します。

別の方法として、ここではAWSマルチパートアップロードを使用します。 この機能により、単一のファイルのアップロードを複数のチャンクに分割して、並行して順序を変えて送信することができます。

手順は次のとおりです。送信する必要があります。

  • createMultipartUpload リクエスト– AWSは、次の呼び出しで使用するuploadIdで応答します
  • uploadId 、シーケンス番号、コンテンツを含むファイルチャンク–AWSは各パーツのETag識別子で応答します
  • uploadIdと受信したすべてのETagを含むcompleteUploadリクエスト

注意:受け取ったFilePartごとにこれらの手順を繰り返します!

7.1. トップレベルのパイプライン

@ControllerクラスのmultipartUploadHandlerは、当然のことながら、マルチパートファイルのアップロードを処理します。 このコンテキストでは、各部分は、そのMIMEタイプによって識別される任意の種類のデータを持つことができます。 Reactive Webフレームワークは、これらのパーツをPartインターフェイスを実装するオブジェクトのFluxとしてハンドラーに配信します。これを順番に処理します:

return parts
  .ofType(FilePart.class)
  .flatMap((part)-> saveFile(headers, part))
  .collect(Collectors.toList())
  .map((keys)-> new UploadResult(HttpStatus.CREATED, keys)));

このパイプラインは、実際にアップロードされたファイルに対応するパーツをフィルタリングすることから始まります。このファイルは、常にFilePartインターフェイスを実装するオブジェクトになります。 次に、各部分が saveFile メソッドに渡されます。このメソッドは、単一ファイルの実際のアップロードを処理し、生成されたファイルキーを返します。

すべてのキーをリストに収集し、最後に最終的なUploadResultを作成します。 常に新しいリソースを作成しているため、通常の OKではなく、よりわかりやすい CREATED ステータス(202)を返します。

7.2. 単一ファイルのアップロードの処理

AWSのマルチパートメソッドを使用してファイルをアップロードするために必要な手順の概要はすでに説明しました。 ただし、落とし穴があります。S3サービスでは、最後の部分を除く各部分に、現在5Mバイトの最小サイズが必要です。

これは、受信したチャンクを取得してすぐに送信することはできないことを意味します。 代わりに、最小サイズまたはデータの終わりに達するまで、それらをローカルでバッファリングする必要があります。 送信したパーツの数と結果のCompletedPartの結果を追跡する場所も必要なので、この状態を保持するための単純なUploadState内部クラスを作成します。

class UploadState {
    String bucket;
    String filekey;
    String uploadId;
    int partCounter;
    Map<Integer, CompletedPart> completedParts = new HashMap<>();
    int buffered = 0;
    // ... getters/setters omitted
    UploadState(String bucket, String filekey) {
        this.bucket = bucket;
        this.filekey = filekey;
    }
}

必要な手順とバッファリングを考えると、実装は一見すると少し威圧的に見えるかもしれません。

Mono<String> saveFile(HttpHeaders headers,String bucket, FilePart part) {
    String filekey = UUID.randomUUID().toString();
    Map<String, String> metadata = new HashMap<String, String>();
    String filename = part.filename();
    if ( filename == null ) {
        filename = filekey;
    }       
    metadata.put("filename", filename);    
    MediaType mt = part.headers().getContentType();
    if ( mt == null ) {
        mt = MediaType.APPLICATION_OCTET_STREAM;
    }
    UploadState uploadState = new UploadState(bucket,filekey);     
    CompletableFuture<CreateMultipartUploadResponse> uploadRequest = s3client
      .createMultipartUpload(CreateMultipartUploadRequest.builder()
        .contentType(mt.toString())
        .key(filekey)
        .metadata(metadata)
        .bucket(bucket)
        .build());

    return Mono
      .fromFuture(uploadRequest)
      .flatMapMany((response) -> {
          checkResult(response);              
          uploadState.uploadId = response.uploadId();
          return part.content();
      })
      .bufferUntil((buffer) -> {
          uploadState.buffered += buffer.readableByteCount();
          if ( uploadState.buffered >= s3config.getMultipartMinPartSize() ) {
              uploadState.buffered = 0;
              return true;
          } else {
              return false;
          }
      })
      .map((buffers) -> concatBuffers(buffers))
      .flatMap((buffer) -> uploadPart(uploadState,buffer))
      .reduce(uploadState,(state,completedPart) -> {
          state.completedParts.put(completedPart.partNumber(), completedPart);              
          return state;
      })
      .flatMap((state) -> completeUpload(state))
      .map((response) -> {
          checkResult(response);
          return  uploadState.filekey;
      });
}

まず、ファイルメタデータを収集し、それを使用して createMultipartUpload()API呼び出しに必要なリクエストオブジェクトを作成します。 この呼び出しは、 CompleteableFuture を返します。これは、ストリーミングパイプラインの開始点です。

このパイプラインの各ステップが何をするかを確認しましょう。

  • S3で生成されたuploadIdを含む初期結果を受け取った後、それをアップロード状態オブジェクトに保存し、ファイルの本文のストリーミングを開始します。 ここでflatMapManyを使用していることに注意してください。これにより、MonoFluxに変わります。
  • bufferUntil()を使用して、必要なバイト数を累積します。 この時点でのパイプラインはフラックス DataBuffer オブジェクトをフラックスリストオブジェクト
  • それぞれを変換するリスト ByteBuffer
  • ByteBuffer をS3に送信し(次のセクションを参照)、結果のCompletedPart値をダウンストリームに返します
  • 結果のCompletedPart値をuploadStateに減らします
  • アップロードが完了したことをS3に通知します(これについては後で詳しく説明します)
  • 生成されたファイルキーを返します

7.3. ファイルパーツのアップロード

繰り返しになりますが、ここでの「ファイル部分」とは、たまたまメッセージの一部ではなく、単一のファイルの一部(たとえば、100MBファイルの最初の5MB)を意味することを明確にしましょう。トップレベルのストリームにあるので、ファイル!

ファイルアップロードパイプラインは、アップロード状態と ByteBufferの2つの引数を使用してuploadPart()メソッドを呼び出します。 そこから、 UploadPartRequest インスタンスを作成し、 S3AsyncClientで使用可能なuploadPart()メソッドを使用してデータを送信します。

private Mono<CompletedPart> uploadPart(UploadState uploadState, ByteBuffer buffer) {
    final int partNumber = ++uploadState.partCounter;
    CompletableFuture<UploadPartResponse> request = s3client.uploadPart(UploadPartRequest.builder()
        .bucket(uploadState.bucket)
        .key(uploadState.filekey)
        .partNumber(partNumber)
        .uploadId(uploadState.uploadId)
        .contentLength((long) buffer.capacity())
        .build(), 
        AsyncRequestBody.fromPublisher(Mono.just(buffer)));
    
    return Mono
      .fromFuture(request)
      .map((uploadPartResult) -> {              
          checkResult(uploadPartResult);
          return CompletedPart.builder()
            .eTag(uploadPartResult.eTag())
            .partNumber(partNumber)
            .build();
      });
}

ここでは、 uploadPart()リクエストからの戻り値を使用して、CompletedPartインスタンスを構築します。 これは、アップロードを閉じる最終リクエストを作成するときに後で必要になるAWSSDKタイプです。

7.4. アップロードの完了

最後になりましたが、S3に completeMultipartUpload()リクエストを送信して、マルチパートファイルのアップロードを完了する必要があります。 アップロードパイプラインが必要なすべての情報を引数として渡すことを考えると、これは非常に簡単です。

private Mono<CompleteMultipartUploadResponse> completeUpload(UploadState state) {        
    CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder()
        .parts(state.completedParts.values())
        .build();
    return Mono.fromFuture(s3client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
        .bucket(state.bucket)
        .uploadId(state.uploadId)
        .multipartUpload(multipartUpload)
        .key(state.filekey)
        .build()));
}

8. AWSからファイルをダウンロードする

マルチパートアップロードと比較して、S3バケットからのオブジェクトのダウンロードははるかに簡単なタスクです。 この場合、チャンクなどについて心配する必要はありません。 SDK APIは、次の2つの引数を取る getObject()メソッドを提供します。

  • 要求されたバケットとファイルキーを含むGetObjectRequestオブジェクト
  • AsyncResponseTransformer 。これにより、着信ストリーミング応答を他の何かにマッピングできます。

SDKは後者の実装をいくつか提供しており、ストリームをフラックスに適合させることができますが、コストがかかります。データを配列バッファーに内部的にバッファリングします。 。 このバッファリングにより、デモサービスのクライアントの応答時間が遅くなるため、独自のアダプタを実装します。これは、後で説明するように、大したことではありません。

8.1. コントローラをダウンロード

私たちのダウンロードコントローラーは、標準のSpring Reactive @RestController であり、ダウンロード要求を処理する単一の@GetMappingメソッドを備えています。 @PathVariable 引数を介してファイルキーを期待し、ファイルの内容を含む非同期ResponseEntityを返します。

@GetMapping(path="/{filekey}")
Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) {    
    GetObjectRequest request = GetObjectRequest.builder()
      .bucket(s3config.getBucket())
      .key(filekey)
      .build();
    
    return Mono.fromFuture(s3client.getObject(request,new FluxResponseProvider()))
      .map(response -> {
        checkResult(response.sdkResponse);
        String filename = getMetadataItem(response.sdkResponse,"filename",filekey);            
        return ResponseEntity.ok()
          .header(HttpHeaders.CONTENT_TYPE, response.sdkResponse.contentType())
          .header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.sdkResponse.contentLength()))
          .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
          .body(response.flux);
      });
}

ここで、 getMetadataItem()は、大文字と小文字を区別しない方法で応答内の特定のメタデータキーを検索する単なるヘルパーメソッドです。

これは重要な詳細です。S3は特別なHTTPヘッダーを使用してメタデータ情報を返しますが、これらのヘッダーでは大文字と小文字が区別されません(RFC 7230、セクション3.2を参照)。 これは、実装が特定のアイテムの大文字と小文字を自由に変更できることを意味します。これは、MinIOを使用するときに実際に発生します。

8.2. FluxResponseProviderの実装

FluxReponseProvider は、 AsyncResponseTransformer インターフェイスを実装する必要があります。このインターフェイスには、次の4つのメソッドしかありません。

  • prepare()、必要なセットアップを行うことができます
  • onResponse()、S3が応答ステータスとメタデータを返すときに呼び出されます
  • onStream()応答に本文がある場合、常に onResponse()の後に呼び出されます
  • エラーが発生した場合に呼び出されるexceptionOccurred()

このプロバイダーの仕事は、これらのイベントを処理し、提供されたGetObjectResponseインスタンスと応答本文の両方をストリームとして含むFluxResponseインスタンスを作成することです。

class FluxResponseProvider implements AsyncResponseTransformer<GetObjectResponse,FluxResponse> {    
    private FluxResponse response;
    @Override
    public CompletableFuture<FluxResponse> prepare() {
        response = new FluxResponse();
        return response.cf;
    }

    @Override
    public void onResponse(GetObjectResponse sdkResponse) {            
        this.response.sdkResponse = sdkResponse;
    }

    @Override
    public void onStream(SdkPublisher<ByteBuffer> publisher) {
        response.flux = Flux.from(publisher);
        response.cf.complete(response);            
    }

    @Override
    public void exceptionOccurred(Throwable error) {
        response.cf.completeExceptionally(error);
    }
}

最後に、FluxResponseクラスを簡単に見てみましょう。

class FluxResponse {
    final CompletableFuture<FluxResponse> cf = new CompletableFuture<>();
    GetObjectResponse sdkResponse;
    Flux<ByteBuffer> flux;
}

9. 結論

このチュートリアルでは、AWSSDKV2ライブラリで利用可能なリアクティブエクステンションの使用の基本について説明しました。 ここで焦点を当てたのはAWSS3サービスでしたが、DynamoDBなどの他のリアクティブ対応サービスにも同じ手法を拡張できます。

いつものように、すべてのコードはGitHubを介して利用できます。