JavaでのInfluxDBの使用
1. 概要
InfluxDBは、時系列データの高性能ストアです。 SQLのようなクエリ言語を介した、データの挿入とリアルタイムクエリをサポートします。
この紹介記事では、InfluxDbサーバーに接続し、データベースを作成し、時系列情報を書き込んでから、データベースにクエリを実行する方法を示します。
2. 設定
データベースに接続するには、pom.xmlファイルにエントリを追加する必要があります。
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.8</version>
</dependency>
この依存関係の最新バージョンは、 MavenCentralにあります。
InfluxDBインスタンスも必要です。 データベースのダウンロードとインストールの手順は、InfluxDataのWebサイトにあります。
3. サーバーへの接続
3.1. 接続の作成
データベース接続を作成するには、URL Stringとユーザー資格情報を接続ファクトリに渡す必要があります。
InfluxDB influxDB = InfluxDBFactory.connect(databaseURL, userName, password);
3.2. 接続の確認
データベースとの通信はRESTfulAPIを介して実行されるため、永続的ではありません。
APIは、接続が機能していることを確認するための専用の「ping」サービスを提供します。 接続が良好な場合、応答にはデータベースバージョンが含まれます。 そうでない場合は、「不明」が含まれます。
したがって、接続を作成した後、次のようにして接続を確認できます。
Pong response = this.influxDB.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
log.error("Error pinging server.");
return;
}
3.3. データベースの作成
InfluxDBデータベースの作成は、ほとんどのプラットフォームでのデータベースの作成に似ています。 ただし、使用する前に少なくとも1つの保持ポリシーを作成する必要があります。
保持ポリシーは、データを保存する期間をデータベースに指示します。 CPUやメモリの統計などの時系列は、大規模なデータセットに蓄積される傾向があります。
時系列データベースのサイズを制御するための一般的な戦略は、ダウンサンプリングです。 「生」データは高速で保存され、要約され、しばらくすると削除されます。
保持ポリシーは、データの一部を有効期限に関連付けることでこれを簡素化します。 InfluxDataのサイトには、詳細な説明があります。
データベースを作成した後、defaultPolicyという名前の単一のポリシーを追加します。データを30日間保持するだけです。
influxDB.createDatabase("baeldung");
influxDB.createRetentionPolicy(
"defaultPolicy", "baeldung", "30d", 1, true);
保持ポリシーを作成するには、名前、 データベース、 間隔、 レプリケーション係数(単一インスタンスデータベースの場合は1)、およびbooleanはデフォルトポリシーであることを示します。
3.4. ロギングレベルの設定
内部的には、InfluxDBAPIはRetrofit を使用し、ロギングインターセプターを介してRetrofitのロギング機能へのインターフェイスを公開します。
したがって、次を使用してロギングレベルを設定できます。
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
これで、接続を開いてpingを実行するとメッセージが表示されます。
Dec 20, 2017 5:38:10 PM okhttp3.internal.platform.Platform log
INFO: --> GET http://127.0.0.1:8086/ping
利用可能なレベルはベーシック 、 満杯 、 ヘッダー 、 と
4. データの追加と取得
4.1. ポイント
これで、データの挿入と取得を開始する準備が整いました。
InfluxDBの情報の基本単位は、 Point、であり、これは基本的にタイムスタンプとキー値マップです。
メモリ使用率データを保持するポイントを見てみましょう。
Point point = Point.measurement("memory")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("name", "server1")
.addField("free", 4743656L)
.addField("used", 1015096L)
.addField("buffer", 1010467L)
.build();
メモリ統計、ホスト名、およびタイムスタンプとして3つのLongsを含むエントリを作成しました。
これをデータベースに追加する方法を見てみましょう。
4.2. バッチの作成
時系列データは多くの小さなポイントで構成される傾向があり、一度に1つずつそれらのレコードを書き込むことは非常に非効率的です。推奨される方法は、レコードをバッチに収集することです。
InfluxDB APIは、BatchPointオブジェクトを提供します。
BatchPoints batchPoints = BatchPoints
.database(dbName)
.retentionPolicy("defaultPolicy")
.build();
Point point1 = Point.measurement("memory")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("name", "server1")
.addField("free", 4743656L)
.addField("used", 1015096L)
.addField("buffer", 1010467L)
.build();
Point point2 = Point.measurement("memory")
.time(System.currentTimeMillis() - 100, TimeUnit.MILLISECONDS)
.addField("name", "server1")
.addField("free", 4743696L)
.addField("used", 1016096L)
.addField("buffer", 1008467L)
.build();
batchPoints.point(point1);
batchPoints.point(point2);
influxDB.write(batchPoints);
BatchPoint を作成し、それにPointsを追加します。 タイムスタンプはプライマリインデックスであるため、2番目のエントリのタイムスタンプを過去100ミリ秒に設定しました。 同じタイムスタンプで2つのポイントを送信すると、1つだけが保持されます。
BatchPointsをデータベースおよび保持ポリシーに関連付ける必要があることに注意してください。
4.3. 一度に1つずつ書く
一部のユースケースでは、バッチ処理は実用的でない場合があります。
InfluxDB接続への1回の呼び出しでバッチモードを有効にしましょう。
influxDB.enableBatch(100, 200, TimeUnit.MILLISECONDS);
サーバーに挿入したり、200ミリ秒ごとに送信したりするために、100のバッチ処理を有効にしました。
バッチモードを有効にしても、一度に1つずつ書き込むことができます。 ただし、いくつかの追加設定が必要です。
influxDB.setRetentionPolicy("defaultPolicy");
influxDB.setDatabase(dbName);
さらに、個々のポイントを書き込むことができるようになり、バックグラウンドスレッドによってバッチで収集されています。
influxDB.write(point);
個々のポイントをエンキューする前に、データベース(SQLの use コマンドと同様)
バッチモードは、個別のスレッドプールを利用します。 したがって、不要になったときに無効にすることをお勧めします。
influxDB.disableBatch();
接続を閉じると、スレッドプールもシャットダウンされます。
influxDB.close();
4.4. クエリ結果のマッピング
クエリはQueryResultを返します。これは、POJOにマップできます。
クエリ構文を見る前に、メモリ統計を保持するクラスを作成しましょう。
@Measurement(name = "memory")
public class MemoryPoint {
@Column(name = "time")
private Instant time;
@Column(name = "name")
private String name;
@Column(name = "free")
private Long free;
@Column(name = "used")
private Long used;
@Column(name = "buffer")
private Long buffer;
}
クラスには@Measurement(name =“ memory”)の注釈が付けられ、 Pointsの作成に使用したPoint.measurement(“ memory”)に対応します。 。
QueryResult の各フィールドに、対応するフィールドの名前を使用して @Column(name =“ XXX”)アノテーションを追加します。
QueryResults は、InfluxDBResultMapper。を使用してPOJOにマップされます。
4.5. InfluxDBのクエリ
それでは、2ポイントバッチでデータベースに追加したポイントでPOJOを使用しましょう。
QueryResult queryResult = connection
.performQuery("Select * from memory", "baeldung");
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<MemoryPoint> memoryPointList = resultMapper
.toPOJO(queryResult, MemoryPoint.class);
assertEquals(2, memoryPointList.size());
assertTrue(4743696L == memoryPointList.get(0).getFree());
このクエリは、 memory という名前の測定値が、selectから選択できるPointsのテーブルとしてどのように保存されるかを示しています。
InfluxDBResultMapper は、 QueryResultを使用してMemoryPoint.classへの参照を受け入れ、ポイントのリストを返します。
結果をマッピングした後、クエリから受け取った List の長さをチェックして、2つ受け取ったことを確認します。 次に、リストの最初のエントリを見て、挿入した2番目のポイントの空きメモリサイズを確認します。 InfluxDBからのクエリ結果のデフォルトの順序は、タイムスタンプの昇順です。
それを変えましょう:
queryResult = connection.performQuery(
"Select * from memory order by time desc", "baeldung");
memoryPointList = resultMapper
.toPOJO(queryResult, MemoryPoint.class);
assertEquals(2, memoryPointList.size());
assertTrue(4743656L == memoryPointList.get(0).getFree());
order by time desc を追加すると、結果の順序が逆になります。
InfluxDBクエリはSQLと非常によく似ています。 彼らのサイトには広範なリファレンスガイドがあります。
5. 結論
InfluxDBサーバーに接続し、保持ポリシーを使用してデータベースを作成してから、サーバーにデータを挿入して取得しました。
例の完全なソースコードは、GitHubのoverです。