1. 序章

Kinesisは、Amazonで開発された、データストリームをリアルタイムで収集、処理、分析するためのツールです。 その主な利点の1つは、イベント駆動型アプリケーションの開発に役立つことです。

このチュートリアルでは、SpringアプリケーションがKinesisStreamからレコードを生成および消費できるようにするいくつかのライブラリについて説明します。 コード例は基本的な機能を示していますが、本番環境に対応したコードを表すものではありません。

2. 前提条件

先に進む前に、2つのことを行う必要があります。

1つ目は、Springプロジェクトを作成することです。ここでの目標はSpringプロジェクトのKinesisと対話することです。

2つ目は、Kinesisデータストリームを作成することです。 これは、AWSアカウントのウェブブラウザから実行できます。 AWS CLIファンの代替案の1つは、コマンドラインを使用することです。 コードから操作するため、AWS IAM Credentials 、アクセスキーとシークレットキー、およびリージョンも手元に用意する必要があります。

すべてのプロデューサーはダミーのIPアドレスレコードを作成し、コンシューマーはそれらの値を読み取ってアプリケーションコンソールに一覧表示します。

3. AWS SDK for Java

使用する最初のライブラリは、AWS SDKforJavaです。 その利点は、Kinesisデータストリームの操作の多くの部分を管理できることです。 データの読み取り、データの生成、データストリームの作成、およびデータストリームの再シャーディングを行うことができます。 欠点は、本番環境に対応したコードを作成するには、リシャーディング、エラー処理、デーモンなどの側面をコーディングして、コンシューマーを存続させる必要があることです。

3.1. Mavenの依存関係

amazon-kinesis-client Mavenの依存関係により、実用的な例を作成するために必要なすべてのものが提供されます。 これをpom.xmlファイルに追加します。

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.11.2</version>
</dependency>

3.2. 春のセットアップ

Kinesisストリームとのやり取りに必要なAmazonKinesisオブジェクトを再利用しましょう。 @SpringBootApplicationクラス内に@Beanとして作成します。

@Bean
public AmazonKinesis buildAmazonKinesis() {
    BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
    return AmazonKinesisClientBuilder.standard()
      .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
      .withRegion(Regions.EU_CENTRAL_1)
      .build();
}

次に、ローカルマシンに必要なaws.access.keyaws.secret.keyapplication.propertiesで定義しましょう。

aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here

そして、@Valueアノテーションを使用してそれらを読み取ります。

@Value("${aws.access.key}")
private String accessKey;

@Value("${aws.secret.key}")
private String secretKey;

簡単にするために、@Scheduledメソッドを使用してレコードを作成および使用します。

3.3. 消費者

AWS SDK Kinesisコンシューマーはプルモデルを使用します。つまり、コードはKinesisデータストリームのシャードからレコードを描画します。

GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);

GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
    recordsResult.getRecords().stream()
      .map(record -> new String(record.getData().array()))
      .forEach(System.out::println);

    recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
    recordsResult = kinesis.getRecords(recordsRequest);
}

GetRecordsRequestオブジェクトは、ストリームデータのリクエストを作成します。 この例では、リクエストごとに25レコードの制限を定義し、読み取るものがなくなるまで読み取りを続けます。

また、反復では、GetShardIteratorResultオブジェクトを使用したことにも気付くでしょう。 このオブジェクトは@PostConstruc tメソッド内に作成したため、レコードの追跡をすぐに開始できます。

private GetShardIteratorResult shardIterator;

@PostConstruct
private void buildShardIterator() {
    GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
    readShardsRequest.setStreamName(IPS_STREAM);
    readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
    readShardsRequest.setShardId(IPS_SHARD_ID);

    this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}

3.4. プロデューサー

次に、Kinesisデータストリームのレコードの作成を処理する方法を見てみましょう。

PutRecordsRequestオブジェクトを使用してデータを挿入します。 この新しいオブジェクトには、複数のPutRecordsRequestEntryオブジェクトで構成されるリストを追加します。

List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
    PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
    entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
    entry.setPartitionKey(IPS_PARTITION_KEY);
    return entry;
}).collect(Collectors.toList());

PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName(IPS_STREAM);
createRecordsRequest.setRecords(entries);

kinesis.putRecords(createRecordsRequest);

基本的なコンシューマーとシミュレートされたIPレコードのプロデューサーを作成しました。 あとは、Springプロジェクトを実行して、アプリケーションコンソールに表示されるIPを確認するだけです。

4. KCLとKPL

Kinesisクライアントライブラリ(KCL)は、レコードの消費を簡素化するライブラリです。 これは、Kinesisデータストリーム用のAWS SDKJavaAPIの抽象化レイヤーでもあります。 舞台裏では、ライブラリは多くのインスタンス間での負荷分散、インスタンスの障害への対応、処理されたレコードのチェックポイント設定、およびリシャーディングへの対応を処理します。

Kinesisプロデューサーライブラリ(KPL)は、Kinesisデータストリームへの書き込みに役立つライブラリです。 また、Kinesisデータストリーム用のAWS SDKJavaAPIの上にある抽象化レイヤーも提供します。 パフォーマンスを向上させるために、ライブラリはバッチ処理、マルチスレッド化、および再試行ロジックを自動的に処理します。

KCLとKPLはどちらも使いやすいという主な利点があるため、レコードの作成と消費に集中できます。

4.1. Mavenの依存関係

必要に応じて、2つのライブラリをプロジェクトに別々に持ち込むことができます。 KPLKCLをMavenプロジェクトに含めるには、pom.xmlファイルを更新する必要があります。

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.13.1</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.11.2</version>
</dependency>

4.2. 春のセットアップ

必要なSpringの準備は、IAMクレデンシャルが手元にあることを確認することだけです。 aws.access.keyaws.secret.keyの値は、 application.properties ファイルで定義されているため、@で読み取ることができます。必要に応じて値

4.3. 消費者

まず、 IRecordProcessorインターフェースを実装するクラスを作成し、Kinesisデータストリームレコードを処理するためのロジックを定義します。これは、コンソールでそれらを出力することです。

public class IpProcessor implements IRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) { }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.getRecords()
          .forEach(record -> System.out.println(new String(record.getData().array())));
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) { }
}

次のステップは、IRecordProcessorFactoryインターフェイスを実装し、以前に作成されたIpProcessorオブジェクトを返すファクトリクラスを定義することです。

public class IpProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new IpProcessor();
    }
}

そして最後のステップとして、ワーカーオブジェクトを使用してコンシューマパイプラインを定義します。 必要に応じて、IAMクレデンシャルとAWSリージョンを定義するKinesisClientLibConfigurationオブジェクトが必要です。

KinesisClientLibConfigurationIpProcessorFactoryオブジェクトをWorkerに渡し、別のスレッドで開始します。 Worker クラスを使用して、レコードを消費するというこのロジックを常に維持しているため、現在、新しいレコードを継続的に読み取っています。

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
  APP_NAME, 
  IPS_STREAM,
  new AWSStaticCredentialsProvider(awsCredentials), 
  IPS_WORKER)
    .withRegionName(Regions.EU_CENTRAL_1.getName());

final Worker worker = new Worker.Builder()
  .recordProcessorFactory(new IpProcessorFactory())
  .config(consumerConfig)
  .build();
CompletableFuture.runAsync(worker.run());

4.4. プロデューサー

次に、 KinesisProducerConfiguration オブジェクトを定義し、IAMクレデンシャルとAWSリージョンを追加します。

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
  .setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
  .setVerifyCertificate(false)
  .setRegion(Regions.EU_CENTRAL_1.getName());

this.kinesisProducer = new KinesisProducer(producerConfig);

以前に@Scheduledジョブで作成されたkinesisProducerオブジェクトを含め、Kinesisデータストリームのレコードを継続的に生成します。

IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
  .forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));

5. Spring Cloud Stream Binder Kinesis

すでに2つのライブラリを見てきましたが、どちらもSpringエコシステムの外部で作成されています。 ここで、 Spring Cloud Stream Binder Kinesisが、 Spring Cloud Stream の上に構築しながら、私たちの生活をさらに簡素化する方法を見ていきます。

5.1. Mavenの依存関係

Spring Cloud Stream BinderKinesisのアプリケーションで定義する必要のあるMavenの依存関係は次のとおりです。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
    <version>1.2.1.RELEASE</version>
</dependency>

5.2. 春のセットアップ

EC2で実行する場合、必要なAWSプロパティが自動的に検出されるため、それらを定義する必要はありません。 サンプルはローカルマシンで実行しているため、AWSアカウントのIAMアクセスキー、シークレットキー、リージョンを定義する必要があります。 また、アプリケーションのCloudFormationスタック名の自動検出を無効にしました。

cloud.aws.credentials.access-key=my-aws-access-key
cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false

Spring Cloud Streamには、ストリームバインディングで使用できる3つのインターフェイスがバンドルされています。

  • シンクはデータ取り込み用です
  • Sourceはレコードの公開に使用されます
  • プロセッサは両方の組み合わせです

必要に応じて、独自のインターフェイスを定義することもできます。

5.3. 消費者

コンシューマーの定義は2つの部分からなる作業です。 まず、 application.properties で、消費するデータストリームを定義します。

spring.cloud.stream.bindings.input.destination=live-ips
spring.cloud.stream.bindings.input.group=live-ips-group
spring.cloud.stream.bindings.input.content-type=text/plain

次に、Spring @Componentクラスを定義しましょう。 アノテーション@EnableBinding(Sink.class)を使用すると、@ StreamListener(Sink.INPUT)でアノテーションが付けられたメソッドを使用してKinesisストリームから読み取ることができます。

@EnableBinding(Sink.class)
public class IpConsumer {

    @StreamListener(Sink.INPUT)
    public void consume(String ip) {
        System.out.println(ip);
    }
}

5.4. プロデューサー

プロデューサーは2つに分割することもできます。 まず、application.properties内でストリームプロパティを定義する必要があります。

spring.cloud.stream.bindings.output.destination=live-ips
spring.cloud.stream.bindings.output.content-type=text/plain

次に、 Spring @Componentに@EnableBinding(Source.class)を追加し、数秒ごとに新しいテストメッセージを作成します。

@Component
@EnableBinding(Source.class)
public class IpProducer {

    @Autowired
    private Source source;

    @Scheduled(fixedDelay = 3000L)
    private void produce() {
        IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix)
          .forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build()));
    }
}

Spring Cloud StreamBinderKinesisが機能するために必要なのはこれだけです。 これでアプリケーションを起動できます。

6. 結論

この記事では、Kinesisデータストリームとやり取りするために、Springプロジェクトを2つのAWSライブラリと統合する方法を見てきました。 また、Spring Cloud Stream Binder Kinesisライブラリを使用して、実装をさらに簡単にする方法も確認しました。

この記事のソースコードは、Githubにあります。