1. 概要

ストリーミングを使用すると、通常はメモリに収まらない大きなデータセットまたは無限のデータセットを段階的にロードして、ヒープを壊したりOOMエラーが発生したりすることなくそれらを操作できます。

FS2 ストリーミングライブラリは、関数型プログラミング(FP)パラダイムを利用することにより、完全に機能的な方法でこの目標を達成するのに役立ちます。

2. ストリーム

90 GBを超える大きなデータファイルがあり、ファイル内の単語の総数をカウントして結果を別のファイルに書き込むプログラムを作成する必要があるとします。 単純なアプローチは、ファイルをメモリにロードしてから、単語を数えようとすることです。

これは、機能しない方法で大きなファイル内の単語をカウントしようとする例です。

def readAndWriteFile(readFrom: String, writeTo: String) = {
  val counts = mutable.Map.empty[String, Int]
  val fileSource = scala.io.Source.fromFile(readFrom)
  try {
    fileSource
      .getLines()
      .toList
      .flatMap(_.split("\\W+"))
      .foreach { word =>
        counts += (word -> (counts.getOrElse(word, 0) + 1))
      }
  } finally {
    fileSource.close()
  }
  val fileContent = counts.foldLeft("") {
   case (accumulator, (word, count)) =>
     accumulator + s"$word = $count\n"
 }
  val writer = new PrintWriter(new File(writeTo))
  writer.write(fileContent)
  writer.close()
}

このコードを実行すると、コンパイラからこのエラーが発生します。

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space

このエラーは、ファイルが大きすぎてメモリに収まらないことを示しています。 イテレータでtoListを呼び出し、ファイルからメモリにすべてのデータをフェッチしました。

ScalaのStreamsは、データを段階的にロードし、データの各チャンクを処理するのに役立つAPIを提供することで、この問題を解決するのに役立ちます。 これで、ファイル全体をメモリにロードするのではなく、構成可能なチャンクにデータをロードし、それらが入ってくるときに処理します。

ストリーミングは、迅速かつ応答性の高い方法でデータを取り込み、処理、分析、および保存するのに役立ちます。

ストリームは通常、ListVectorなどの通常のScalaコレクションと同様のAPIを定義します。 これらのストリームには、データ処理用の map flatMap collectなどの高階関数が定義されています。

ストリームは、他のデータソースとやり取りするときにも役立ちます。 上記で説明した大きなデータをダウンロードするために使用されるRESTAPIがあるとします。 クライアントにプッシュする前にすべてのデータをサーバーにロードしようとすると、1つのクライアントにサービスを提供するために使用されるメモリの量が多すぎるため、サーバーのメモリが不足したり、他のクライアントの速度が低下したりする可能性があります。 ただし、ファイルを小さなチャンクでストリーミングすると、メモリが不足する可能性が低くなり、複数のクライアントに同等のサービスを簡単に提供できます。

ストリームが遅延評価されることを知っておくことが重要です。 map などの高階関数を適用する場合、ストリーム全体をすぐに作成するのではなく、ストリームが遅延して作成されます。

上記の例でメモリエラーを防ぐことができた1つの方法は、イテレータでtoListの代わりにtoStreamを呼び出すことでした。

非同期および並列プログラミングの容易さやスロットリングなど、標準ライブラリの Stream が提供しない、日常のプログラミングに必要ないくつかの高度な機能があります。 ここで、 AkkaStreamsFS2などのこの機能を提供するライブラリの機能を活用します。

Fs2ストリームの能力を使用して、特に大規模なデータの問題を機能的に解決する方法を見てみましょう。

3. FS2ストリーム

FS2は、データの増分ロードと変換を含むストリームを定義する機能的な方法を提供します。

FS2ストリームは、タイプ Stream [+ F [_]、+ O] によって定義されます。これは、タイプ F の環境(コンテキストとも呼ばれる)を必要とするストリームを定義します。 Oの値を出力します。

FS2ストリームは、リソースの取得と解放、並列計算などを組み込んだストリームを構築する機能を備えています。 また、モジュール性を提供するように設計されているため、簡単に構成および再利用できるストリームの小さな要素を徐々に構築できます。

FS2ストリームを使用して、単語数の例を実行するストリームを定義できます。

  def readAndWriteFile(readFrom: String, writeTo: String): Stream[IO, Unit] =
    Stream.resource(Blocker[IO]).flatMap { blocker =>
    val source: Stream[IO, Byte] =   io.file.readAll[IO](Paths.get(readFrom), blocker, 4096)
 
  val pipe : Pipe[IO,Byte,Byte] = src =>
    src.through(text.utf8Decode)
    .through(text.lines)
      .flatMap(line => Stream.apply(line.split("\\W+"): _*))
      .fold(Map.empty[String, Int]) {
        (count, word) =>
          count + (word -> (count.getOrElse(word, 0) + 1))
      }
      .map (_.foldLeft("") {
          case (accumulator, (word, count)) =>
            accumulator + s"$word = $count\n"
        }
      )
      .through(text.utf8Encode)

  val sink : Pipe[IO,Byte,Unit] = io.file.writeAll(Paths.get(writeTo), blocker)

  source
    .through(pipe)
    .through(sink)
}

FS2を使用することの利点は、機能的な方法で同じ目標を達成できたことです。どこにも変更可能な変数はありません。

この例は、特にFS2を初めて使用する場合は、少し気が遠くなるように見えることがあります。 そこで、ストリームの各要素について説明します。

ストリームを理解する良い方法は、 Akka Streams と同様に、 Source Pipe 、およびSinkの観点からストリームを考えることです。 ]、適切に接続されたETLパイプラインを形成します。

3.1. ソース

Source は、ファイル、データベース、またはソケットに至るまでのあらゆるものからデータを段階的にフェッチする役割を担う1つのオープン出力を持つストリームの開始点またはエントリポイントと考えることができます。 これは、相互に接続された任意の数の内部ソースと変換で構成できます。

FS2ストリームモデルは「プル」モデルです。つまり、「ダウンストリーム」関数またはパーツが「アップストリーム」関数を呼び出して、必要に応じてデータを取得します。

これが意味するのは、データのロードを担当する Source が、処理パイプラインのさらに下流でデータが必要な場合にのみデータをロードするということです。 たとえば、ダウンストリームでエラーが発生した場合、または例外がスローされて処理されなかった場合、 Source はそれ以上のデータのロードを停止し、通常は取得したリソースを解放します。

Source を定義する最も簡単な方法は、applyメソッドを使用することです。

val intStream: Stream[Pure, Int] = Stream(1,2,3,4,5)

コンテキストを必要としないストリーム、つまり Pure F タイプを定義し、Intを発行しました。

単語数の例では、読み取りたいファイルパスを取得するFS 2.io.file.readAllメソッドを使用してSourceを定義しました。これは、 ExecutionContext、と、ダウンストリームがデータをプルしてバイトを継続的に生成するたびに読み取るデータのサイズ:

val source: Stream[IO, Byte] =   io.file.readAll[IO](Paths.get(readFrom), blocker, 4096)

定義したSourceの型署名を見ると、最初のパラメーター IO がコンテキストまたは環境タイプであり、2番目のパラメーターByteであることがわかります。 ]は、このSourceがダウンストリームで送信するデータのタイプです。

すべてのストリームに少なくとも1つのソースが必要です。

3.2. パイプ

Pipe は、入力と出力が開いているストリーミング要素です。 これは、通常Sourceから取得されるデータを変換するための処理ステップとして使用されます。 これは、単一または複数の処理ステップで構成できます。

Pipeには型署名Pipe[F [_]、-I、+ O] があり、これは Stream [F、I]=>Streamの型エイリアスです。 [F、O] ここで、 F はエフェクトタイプを表し、 I は入力要素のタイプを表し、Oは出力のタイプを表します。エレメント。

簡単に言うと、FS2の Pipe は、特定のタイプのストリームを受け取り、同じタイプまたは異なるタイプの別のストリームを返す関数です。

IntStreamを取り、それらに1を追加する単純なPipeを定義しましょう。

val add1Pipe : Pipe[Pure, Int,Int] = stream => stream.map(_ + 1)

これは簡単に次のように書き直すことができます。

val add1Pipe : Pipe[Pure, Int,Int] = _.map(_ + 1)

単語数の例では、カウントロジックをパイプに入れます。

  • バイトのストリームを取得します
  • それらをStringsに変換します
  • それらを行で分割します
  • 行を単語に分割し、ストリームにフラット化します
  • 単語と出現回数をマップに保存します
  • 単語とその出現回数を表すStringを作成します
  • 作成されたStringByteのストリームに変換します
val pipe : Pipe[IO,Byte,Byte] = src =>
     src.through(text.utf8Decode) // bytes to String
     .through(text.lines) // split Strings to lines 
       .flatMap(line => Stream.apply(line.split("\\W+"): _*)) // split lines to words and flatten
       .fold(Map.empty[String, Int]) {
         (count, word) =>
           count + (word -> (count.getOrElse(word, 0) + 1))
       }
       .map (_.foldLeft("") {
           case (accumulator, (word, count)) =>
             accumulator + s"$word = $count\n"
         }
       )
       .through(text.utf8Encode) // transform String to stream of bytes 

パイプの型署名はPipe[IO、Byte、Byte] です。これは、バイトのストリームから別のバイトのストリームへの関数として表されるストリーム変換です。

パイプソースに接続されている場合、結果は新しいソースになります。 同様に、パイプシンクに接続すると、新しいシンクが作成されます。

3.3. シンク

シンクは、主にストリームの終わりを表すストリーム処理要素と考えることができます。 これは通常、ファイル、データベース、またはソケットへの書き込みやコンソールへの印刷など、効果的な操作が実行される場所です。

FS2では、シンクはシンク[F [_]、-I] として表されます。これは、パイプ[F、I、ユニット]のタイプエイリアスです。 前者は後者のために非推奨になりました。 シンクは、ストリームをストリーム[F、ユニット]に変換する特殊なパイプです。

これは、入力値を受け取り、通常はエフェクトを実行し、その入力値を使用する可能性のある効果的な操作を示すUnitを返す関数です。 

単語数の例では、 Byte のストリームを取得し、それらのByteをファイルに書き込むシンクを定義しました。

val sink : Pipe[IO,Byte,Unit] = io.file.writeAll(Paths.get(writeTo), blocker)

シンクは任意の値を返すことができる単なる通常のパイプであることに注意することが重要ですが、通常は効果的な計算を表すため、Unitの戻りタイプです。

3.4. ストリームを接続する

ストリーム要素を定義したので、これらの要素がどのように接続されてストリーム全体を形成するかについて説明しましょう。

単語数の例では、 Byteを出力するSource、それらのバイトへの一連の変換を実行するパイプ、およびそれらのバイトを取得して書き込むシンクを定義しました。ファイル。

これらの要素を接続する主な方法の1つは、からと呼ばれるFS2ストリームのメソッドを使用することです。これは、パイプを指定して特定のストリームを変換します。

val stream : Stream[IO,Unit] = 
   source
    .through(pipe)
    .through(sink)

これは、要素を接続して、単語数を実行する実行可能なストリームを形成する場所です。

4. 高度な機能

FS2ストリーミングライブラリには、他にもたくさんの機能があり、エキサイティングです。 非同期計算またはバッチ処理のサポートが組み込まれています。 このセクションでは、このストリーミングライブラリが提供するより高度な機能のいくつかを見ていきます。

4.1. バッチ処理

バッチ処理は、要素をグループ化し、処理のためにそれらをダウンストリームに送信するプロセスです。 FS2では、これらの要素のグループはチャンクと呼ばれます。

チャンクは、FS2ストリームによって内部的に使用される値の有限シーケンスと考えることができます。

Stream((1 to 100) : _*)
  .chunkN(10) // group 10 elements together
  .map(println)
  .compile
  .drain

4.2. 非同期性

FS2ストリームを使用して、並行性と並列性の力を活用する非同期コードを記述できます。

これは、ある範囲の数値を同時にソケットに書き込むという不自然な例です。

def writeToSocket[F[_] : Async](chunk: Chunk[String]): F[Unit] =
  Async[F].async { callback =>
     println(s"[thread: ${Thread.currentThread().getName}] :: Writing $chunk to socket")
     callback(Right(()))
  }

Stream((1 to 100).map(_.toString): _*)
  .chunkN(10)
  .covary[IO]
  .parEvalMapUnordered(10)(writeToSocket[IO])
  .compile
  .drain

parEvalMapUnordered は効果を並行して評価し、結果を順序付けられていない方法でダウンストリームに出力します。 このコードを実行すると、数値が並行して出力されることがわかります。 同時効果の数は、提供されたmaxConcurrentパラメーターによって制限されます。

ライブラリは、エラー処理やスロットリングなどの他の機能も提供します。

5. 結論

この記事では、ストリームが大規模または無限のデータセットの処理にどのように役立つかを見てきました。 また、 FS2 ライブラリを調べて、ストリーミングアプリケーションを機能的に作成する方法を確認し、ライブラリが提供するいくつかの追加機能について学習しました。

いつものように、ソースコードはGitHubにあります。