1. 概要

InfluxDBは、時系列データの高性能ストアです。 SQLのようなクエリ言語を介した、データの挿入とリアルタイムクエリをサポートします。

この紹介記事では、InfluxDbサーバーに接続し、データベースを作成し、時系列情報を書き込んでから、データベースにクエリを実行する方法を示します。

2. 設定

データベースに接続するには、pom.xmlファイルにエントリを追加する必要があります。

<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.8</version>
</dependency>

この依存関係の最新バージョンは、 MavenCentralにあります。

InfluxDBインスタンスも必要です。 データベースのダウンロードとインストールの手順は、InfluxDataのWebサイトにあります。

3. サーバーへの接続

3.1. 接続の作成

データベース接続を作成するには、URL Stringとユーザー資格情報を接続ファクトリに渡す必要があります。

InfluxDB influxDB = InfluxDBFactory.connect(databaseURL, userName, password);

3.2. 接続の確認

データベースとの通信はRESTfulAPIを介して実行されるため、永続的ではありません。

APIは、接続が機能していることを確認するための専用の「ping」サービスを提供します。 接続が良好な場合、応答にはデータベースバージョンが含まれます。 そうでない場合は、「不明」が含まれます。

したがって、接続を作成した後、次のようにして接続を確認できます。

Pong response = this.influxDB.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
    log.error("Error pinging server.");
    return;
} 

3.3. データベースの作成

InfluxDBデータベースの作成は、ほとんどのプラットフォームでのデータベースの作成に似ています。 ただし、使用する前に少なくとも1つの保持ポリシーを作成する必要があります。

保持ポリシーは、データを保存する期間をデータベースに指示します。 CPUやメモリの統計などの時系列は、大規模なデータセットに蓄積される傾向があります。

時系列データベースのサイズを制御するための一般的な戦略は、ダウンサンプリングです。 「生」データは高速で保存され、要約され、しばらくすると削除されます。

保持ポリシーは、データの一部を有効期限に関連付けることでこれを簡素化します。 InfluxDataのサイトには、詳細な説明があります。

データベースを作成した後、defaultPolicyという名前の単一のポリシーを追加します。データを30日間保持するだけです。

influxDB.createDatabase("baeldung");
influxDB.createRetentionPolicy(
  "defaultPolicy", "baeldung", "30d", 1, true);

保持ポリシーを作成するには、名前、 データベース、 間隔、 レプリケーション係数(単一インスタンスデータベースの場合は1)、およびbooleanはデフォルトポリシーであることを示します。

3.4. ロギングレベルの設定

内部的には、InfluxDBAPIはRetrofit を使用し、ロギングインターセプターを介してRetrofitのロギング機能へのインターフェイスを公開します。

したがって、次を使用してロギングレベルを設定できます。

influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);

これで、接続を開いてpingを実行するとメッセージが表示されます。

Dec 20, 2017 5:38:10 PM okhttp3.internal.platform.Platform log
INFO: --> GET http://127.0.0.1:8086/ping

利用可能なレベルはベーシック 満杯 ヘッダー 、 となし。

4. データの追加と取得

4.1. ポイント

これで、データの挿入と取得を開始する準備が整いました。

InfluxDBの情報の基本単位は、 Point、であり、これは基本的にタイムスタンプとキー値マップです。

メモリ使用率データを保持するポイントを見てみましょう。

Point point = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743656L)
  .addField("used", 1015096L)
  .addField("buffer", 1010467L)
  .build();

メモリ統計、ホスト名、およびタイムスタンプとして3つのLongsを含むエントリを作成しました。

これをデータベースに追加する方法を見てみましょう。

4.2. バッチの作成

時系列データは多くの小さなポイントで構成される傾向があり、一度に1つずつそれらのレコードを書き込むことは非常に非効率的です。推奨される方法は、レコードをバッチに収集することです。

InfluxDB APIは、BatchPointオブジェクトを提供します。

BatchPoints batchPoints = BatchPoints
  .database(dbName)
  .retentionPolicy("defaultPolicy")
  .build();

Point point1 = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1") 
  .addField("free", 4743656L)
  .addField("used", 1015096L) 
  .addField("buffer", 1010467L)
  .build();

Point point2 = Point.measurement("memory")
  .time(System.currentTimeMillis() - 100, TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743696L)
  .addField("used", 1016096L)
  .addField("buffer", 1008467L)
  .build();

batchPoints.point(point1);
batchPoints.point(point2);
influxDB.write(batchPoints);

BatchPoint を作成し、それにPointsを追加します。 タイムスタンプはプライマリインデックスであるため、2番目のエントリのタイムスタンプを過去100ミリ秒に設定しました。 同じタイムスタンプで2つのポイントを送信すると、1つだけが保持されます。

BatchPointsをデータベースおよび保持ポリシーに関連付ける必要があることに注意してください。

4.3. 一度に1つずつ書く

一部のユースケースでは、バッチ処理は実用的でない場合があります。

InfluxDB接続への1回の呼び出しでバッチモードを有効にしましょう。

influxDB.enableBatch(100, 200, TimeUnit.MILLISECONDS);

サーバーに挿入したり、200ミリ秒ごとに送信したりするために、100のバッチ処理を有効にしました。

バッチモードを有効にしても、一度に1つずつ書き込むことができます。 ただし、いくつかの追加設定が必要です。

influxDB.setRetentionPolicy("defaultPolicy");
influxDB.setDatabase(dbName);

さらに、個々のポイントを書き込むことができるようになり、バックグラウンドスレッドによってバッチで収集されています。

influxDB.write(point);

個々のポイントをエンキューする前に、データベース(SQLの use コマンドと同様)を設定し、デフォルトの保持ポリシーを設定する必要があります。複数の保持ポリシーを使用してダウンサンプリングを利用したい場合は、バッチを作成するのが最善の方法です。

バッチモードは、個別のスレッドプールを利用します。 したがって、不要になったときに無効にすることをお勧めします。

influxDB.disableBatch();

接続を閉じると、スレッドプールもシャットダウンされます。

influxDB.close();

4.4. クエリ結果のマッピング

クエリはQueryResultを返します。これは、POJOにマップできます。

クエリ構文を見る前に、メモリ統計を保持するクラスを作成しましょう。

@Measurement(name = "memory")
public class MemoryPoint {

    @Column(name = "time")
    private Instant time;

    @Column(name = "name")
    private String name;

    @Column(name = "free")
    private Long free;

    @Column(name = "used")
    private Long used;

    @Column(name = "buffer")
    private Long buffer;
}

クラスには@Measurement(name =“ memory”)の注釈が付けられ、 Pointsの作成に使用したPoint.measurement(“ memory”)に対応します。 。

QueryResult の各フィールドに、対応するフィールドの名前を使用して @Column(name =“ XXX”)アノテーションを追加します。

QueryResults は、InfluxDBResultMapper。を使用してPOJOにマップされます。

4.5. InfluxDBのクエリ

それでは、2ポイントバッチでデータベースに追加したポイントでPOJOを使用しましょう。

QueryResult queryResult = connection
  .performQuery("Select * from memory", "baeldung");

InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<MemoryPoint> memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743696L == memoryPointList.get(0).getFree());

このクエリは、 memory という名前の測定値が、selectから選択できるPointsのテーブルとしてどのように保存されるかを示しています。

InfluxDBResultMapper は、 QueryResultを使用してMemoryPoint.classへの参照を受け入れ、ポイントのリストを返します。

結果をマッピングした後、クエリから受け取った List の長さをチェックして、2つ受け取ったことを確認します。 次に、リストの最初のエントリを見て、挿入した2番目のポイントの空きメモリサイズを確認します。 InfluxDBからのクエリ結果のデフォルトの順序は、タイムスタンプの昇順です。

それを変えましょう:

queryResult = connection.performQuery(
  "Select * from memory order by time desc", "baeldung");
memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743656L == memoryPointList.get(0).getFree());

order by time desc を追加すると、結果の順序が逆になります。

InfluxDBクエリはSQLと非常によく似ています。 彼らのサイトには広範なリファレンスガイドがあります。

5. 結論

InfluxDBサーバーに接続し、保持ポリシーを使用してデータベースを作成してから、サーバーにデータを挿入して取得しました。

例の完全なソースコードは、GitHuboverです。