1. 概要

この記事では、Netflixによって開発されたMantisプラットフォームを見ていきます。

ストリーム処理ジョブを作成、実行、調査することにより、Mantisの主要な概念を探ります。

2. カマキリとは何ですか?

Mantisは、ストリーム処理アプリケーション(ジョブ)を構築するためのプラットフォームです。 これは、ジョブの展開とライフサイクルを管理する簡単な方法を提供します。さらに、は、これらのジョブ間のリソース割り当て、検出、および通信を容易にします。

したがって、開発者は、堅牢でスケーラブルなプラットフォームをサポートしながら、実際のビジネスロジックに集中して、大量、低遅延、ノンブロッキングのアプリケーションを実行できます。

Mantisの仕事は、次の3つの部分で構成されています。

  • source 、外部ソースからのデータの取得を担当
  • 着信イベントストリームの処理を担当する1つ以上のステージ
  • 処理されたデータを収集するシンク

それぞれを調べてみましょう。

3. セットアップと依存関係

mantis-runtimejackson-databindの依存関係を追加することから始めましょう。

<dependency>
    <groupId>io.mantisrx</groupId>
    <artifactId>mantis-runtime</artifactId>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

ここで、ジョブのデータソースを設定するために、Mantis Sourceインターフェイスを実装しましょう。

public class RandomLogSource implements Source<String> {

    @Override
    public Observable<Observable<String>> call(Context context, Index index) {
        return Observable.just(
          Observable
            .interval(250, TimeUnit.MILLISECONDS)
            .map(this::createRandomLogEvent));
    }

    private String createRandomLogEvent(Long tick) {
        // generate a random log entry string
        ...
    }

}

ご覧のとおり、1秒間に複数回ランダムなログエントリを生成するだけです。

4. 私たちの最初の仕事

次に、RandomLogSourceからログイベントを収集するだけのMantisジョブを作成しましょう。 後で、より複雑で興味深い結果を得るために、グループ変換と集計変換を追加します。

まず、LogEventエンティティを作成しましょう。

public class LogEvent implements JsonType {
    private Long index;
    private String level;
    private String message;

    // ...
}

次に、TransformLogStage。を追加しましょう。

これは、ScalarComputationインターフェイスを実装し、ログエントリを分割してLogEventを構築する単純なステージです。 また、間違った形式の文字列を除外します。

public class TransformLogStage implements ScalarComputation<String, LogEvent> {

    @Override
    public Observable<LogEvent> call(Context context, Observable<String> logEntry) {
        return logEntry
          .map(log -> log.split("#"))
          .filter(parts -> parts.length == 3)
          .map(LogEvent::new);
    }

}

4.1. ジョブの実行

この時点で、Mantisの仕事をまとめるのに十分なビルディングブロックがあります。

public class LogCollectingJob extends MantisJobProvider<LogEvent> {

    @Override
    public Job<LogEvent> getJobInstance() {
        return MantisJob
          .source(new RandomLogSource())
          .stage(new TransformLogStage(), new ScalarToScalar.Config<>())
          .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString)))
          .metadata(new Metadata.Builder().build())
          .create();
    }

}

私たちの仕事を詳しく見てみましょう。

ご覧のとおり、 MantisJobProviderを拡張します。最初に、 RandomLogSource からデータをフェッチし、TransformLogStageをフェッチしたデータに適用します。 最後に、処理されたデータを組み込みのシンクに送信します。組み込みのシンクは、SSEを介してデータを熱心にサブスクライブして配信します。

それでは、起動時にローカルで実行するようにジョブを構成しましょう。

@SpringBootApplication
public class MantisApplication implements CommandLineRunner {

    // ...
 
    @Override
    public void run(String... args) {
        LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance());
    }
}

アプリケーションを実行してみましょう。 次のようなログメッセージが表示されます。

...
Serving modern HTTP SSE server sink on port: 86XX

curlを使用してシンクに接続しましょう。

$ curl localhost:86XX
data: {"index":86,"level":"WARN","message":"login attempt"}
data: {"index":87,"level":"ERROR","message":"user created"}
data: {"index":88,"level":"INFO","message":"user created"}
data: {"index":89,"level":"INFO","message":"login attempt"}
data: {"index":90,"level":"INFO","message":"user created"}
data: {"index":91,"level":"ERROR","message":"user created"}
data: {"index":92,"level":"WARN","message":"login attempt"}
data: {"index":93,"level":"INFO","message":"user created"}
...

4.2. シンクの構成

これまで、処理されたデータを収集するために組み込みのシンクを使用してきました。 カスタムシンクを提供することでシナリオに柔軟性を追加できるかどうかを見てみましょう。

たとえば、メッセージでログをフィルタリングしたい場合はどうなりますか?

を作成しましょう LogSink を実装しますシンクインターフェース:

public class LogSink implements Sink<LogEvent> {
    @Override
    public void call(Context context, PortRequest portRequest, Observable<LogEvent> logEventObservable) {
        SelfDocumentingSink<LogEvent> sink = new ServerSentEventsSink.Builder<LogEvent>()
          .withEncoder(LogEvent::toJsonString)
          .withPredicate(filterByLogMessage())
          .build();
        logEventObservable.subscribe();
        sink.call(context, portRequest, logEventObservable);
    }
    private Predicate<LogEvent> filterByLogMessage() {
        return new Predicate<>("filter by message",
          parameters -> {
            if (parameters != null && parameters.containsKey("filter")) {
                return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0));
            }
            return logEvent -> true;
        });
    }
}

このシンクの実装では、 filter パラメーターを使用して、filterパラメーターに設定されたテキストを含むログのみを取得する述語を構成しました。

$ curl localhost:8874?filter=login
data: {"index":93,"level":"ERROR","message":"login attempt"}
data: {"index":95,"level":"INFO","message":"login attempt"}
data: {"index":97,"level":"ERROR","message":"login attempt"}
...

注Mantisは、強力なクエリ言語 MQL も提供します。これは、SQL方式でストリームデータのクエリ、変換、および分析に使用できます。

5. ステージチェーン

ここで、特定の時間間隔にある ERROR WARN、、またはINFOログエントリの数を知りたいとしましょう。 このために、ジョブにさらに2つのステージを追加し、それらをチェーンします。

5.1. グループ化

まず、GroupLogStage。を作成しましょう。

このステージは、既存のTransformLogStageからLogEventストリームデータを受信するToGroupComputation実装です。 その後、ログレベルごとにエントリをグループ化し、次のステージに送信します。

public class GroupLogStage implements ToGroupComputation<LogEvent, String, LogEvent> {

    @Override
    public Observable<MantisGroup<String, LogEvent>> call(Context context, Observable<LogEvent> logEvent) {
        return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log));
    }

    public static ScalarToGroup.Config<LogEvent, String, LogEvent> config(){
        return new ScalarToGroup.Config<LogEvent, String, LogEvent>()
          .description("Group event data by level")
          .codec(JacksonCodecs.pojo(LogEvent.class))
          .concurrentInput();
    }
    
}

また、説明、出力のシリアル化に使用するコーデックを提供することでカスタムステージ構成を作成し、 constantInput()。を使用してこのステージの呼び出しメソッドを同時に実行できるようにしました。

注意すべき点の1つは、このステージは水平方向にスケーラブルであるということです。 つまり、このステージのインスタンスを必要な数だけ実行できます。 また、Mantisクラスターにデプロイすると、 このステージはデータを次のステージに送信し、特定のグループに属するすべてのイベントが次のステージの同じワーカーに到達するようにします。

5.2. 集約

次のステージに進んで作成する前に、まずLogAggregateエンティティを追加しましょう。

public class LogAggregate implements JsonType {

    private final Integer count;
    private final String level;

}

それでは、チェーンの最後のステージを作成しましょう。

このステージは、 GroupToScalarComputation を実装し、ロググループのストリームをスカラーLogAggregateに変換します。 これは、各タイプのログがストリームに表示される回数をカウントすることによって行われます。 さらに、 LogAggregationDuration パラメーターもあり、これを使用して集計ウィンドウのサイズを制御できます。

public class CountLogStage implements GroupToScalarComputation<String, LogEvent, LogAggregate> {

    private int duration;

    @Override
    public void init(Context context) {
        duration = (int)context.getParameters().get("LogAggregationDuration", 1000);
    }

    @Override
    public Observable<LogAggregate> call(Context context, Observable<MantisGroup<String, LogEvent>> mantisGroup) {
        return mantisGroup
          .window(duration, TimeUnit.MILLISECONDS)
          .flatMap(o -> o.groupBy(MantisGroup::getKeyValue)
            .flatMap(group -> group.reduce(0, (count, value) ->  count = count + 1)
              .map((count) -> new LogAggregate(count, group.getKey()))
            ));
    }

    public static GroupToScalar.Config<String, LogEvent, LogAggregate> config(){
        return new GroupToScalar.Config<String, LogEvent, LogAggregate>()
          .description("sum events for a log level")
          .codec(JacksonCodecs.pojo(LogAggregate.class))
          .withParameters(getParameters());
    }

    public static List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> params = new ArrayList<>();

        params.add(new IntParameter()
          .name("LogAggregationDuration")
          .description("window size for aggregation in milliseconds")
          .validator(Validators.range(100, 10000))
          .defaultValue(5000)
          .build());

        return params;
    }
    
}

5.3. ジョブの構成と実行

今やらなければならないことは、ジョブを構成することだけです。

public class LogAggregationJob extends MantisJobProvider<LogAggregate> {

    @Override
    public Job<LogAggregate> getJobInstance() {

        return MantisJob
          .source(new RandomLogSource())
          .stage(new TransformLogStage(), TransformLogStage.stageConfig())
          .stage(new GroupLogStage(), GroupLogStage.config())
          .stage(new CountLogStage(), CountLogStage.config())
          .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)))
          .metadata(new Metadata.Builder().build())
          .create();
    }
}

アプリケーションを実行して新しいジョブを実行するとすぐに、ログカウントが数秒ごとに取得されていることがわかります。

$ curl localhost:8133
data: {"count":3,"level":"ERROR"}
data: {"count":13,"level":"INFO"}
data: {"count":4,"level":"WARN"}

data: {"count":8,"level":"ERROR"}
data: {"count":5,"level":"INFO"}
data: {"count":7,"level":"WARN"}
...

6. 結論

要約すると、この記事では、Netflix Mantisとは何か、そしてそれが何に使用できるかを見てきました。 さらに、主要な概念を確認し、それらを使用してジョブを構築し、さまざまなシナリオのカスタム構成を検討しました。

いつものように、完全なコードはGitHubから入手できます。