序章

Transporterは、さまざまなデータストア間でデータを移動するためのオープンソースツールです。 開発者は、データベース間でのデータの移動、ファイルからデータベースへのデータの移動、またはその逆などのタスクのために1回限りのスクリプトを作成することがよくありますが、Transporterなどのツールを使用することにはいくつかの利点があります。

Transporterでは、パイプラインを構築します。これは、ソース(データが読み取られる場所)からシンク(データが書き込まれる場所)へのデータのフローを定義します。 )。 ソースとシンクは、SQLまたはNoSQLデータベース、フラットファイル、またはその他のリソースです。 Transporterはプラグイン可能な拡張機能であるadaptorsを使用してこれらのリソースと通信し、プロジェクトにはデフォルトで人気のあるデータベース用のいくつかのアダプターが含まれています。

Transporterでは、データの移動に加えて、トランスフォーマーを使用してパイプラインを移動するときにデータを変更することもできます。 アダプターと同様に、デフォルトでいくつかのトランスフォーマーが含まれています。 独自のトランスフォーマーを作成して、データの変更をカスタマイズすることもできます。

このチュートリアルでは、Transporterの組み込みアダプターとJavaScriptで記述されたカスタムトランスフォーマーを使用して、MongoDBデータベースからElasticsearchにデータを移動および処理する例について説明します。

前提条件

このチュートリアルに従うには、次のものが必要です。

トランスポーターパイプラインはJavaScriptで記述されています。 このチュートリアルに従うためにJavaScriptの予備知識や経験は必要ありませんが、これらのJavaScriptチュートリアルで詳細を学ぶことができます。

ステップ1—トランスポーターのインストール

Transporterは、最も一般的なオペレーティングシステム用のバイナリを提供します。 Ubuntuのインストールプロセスには、Linuxバイナリをダウンロードして実行可能にするという2つのステップが含まれます。

まず、GitHubTransporterの最新リリースページから最新バージョンへのリンクを取得します。 で終わるリンクをコピーします -linux-amd64. このチュートリアルでは、執筆時点で最新のv0.5.2を使用しています。

バイナリをホームディレクトリにダウンロードします。

  1. cd
  2. wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64

それを/usr/ local /binまたはお好みのインストールディレクトリに移動します。

  1. mv transporter-*-linux-amd64 /usr/local/bin/transporter

次に、実行可能にして、実行できるようにします。

  1. chmod +x /usr/local/bin/transporter

バイナリを実行することにより、Transporterが正しくセットアップされていることをテストできます。

  1. transporter

使用法ヘルプの出力とバージョン番号が表示されます。

Output
USAGE transporter <command> [flags] COMMANDS run run pipeline loaded from a file . . . VERSION 0.5.2

Transporterを使用してMongoDBからElasticsearchにデータを移動するには、移動するMongoDBのデータと、Transporterに移動方法を指示するパイプラインの2つが必要です。 次のステップでいくつかのサンプルデータを作成しますが、移動するMongoDBデータベースがすでにある場合は、次のステップをスキップして、ステップ3に直接進むことができます。

ステップ2—サンプルデータをMongoDBに追加する(オプション)

このステップでは、MongoDBに単一のコレクションを含むサンプルデータベースを作成し、そのコレクションにいくつかのドキュメントを追加します。 次に、チュートリアルの残りの部分で、このサンプルデータをTransporterパイプラインを使用して移行および変換します。

まず、MongoDBデータベースに接続します。

  1. mongo

これにより、プロンプトが次のように変更されます mongo>、MongoDBシェルを使用していることを示します。

ここから、作業するデータベースを選択します。 私たちは私たちと呼びます my_application.

  1. use my_application

MongoDB、データベースまたはコレクションを明示的に作成する必要はありません。 名前で選択したデータベースへのデータの追加を開始すると、そのデータベースが自動的に作成されます。

だから、作成するには my_application データベース、2つのドキュメントをそのデータベースに保存します users コレクション:1つはサミーシャークを表し、もう1つはギリーグロウフィッシュを表します。 これがテストデータになります。

  1. db.users.save({"firstName": "Sammy", "lastName": "Shark"});
  2. db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});

ドキュメントを追加したら、クエリを実行できます users あなたの記録を見るためのコレクション。

  1. db.users.find().pretty();

出力は以下の出力のようになりますが、 _id 列が異なります。 MongoDBは、コレクション内のドキュメントを一意に識別するためにオブジェクトIDを自動的に追加します。

output
{ "_id" : ObjectId("59299ac7f80b31254a916456"), "firstName" : "Sammy", "lastName" : "Shark" } { "_id" : ObjectId("59299ac7f80b31254a916457"), "firstName" : "Gilly", "lastName" : "Glowfish" }

プレス CTRL+C MongoDBシェルを終了します。

次に、このデータをMongoDBからElasticsearchに移動するためのTransporterパイプラインを作成しましょう。

ステップ3—基本的なパイプラインを作成する

Transporterのパイプラインは、という名前のJavaScriptファイルによって定義されます。 pipeline.js デフォルトでは。 ビルトイン init コマンドは、ソースとシンクを指定して、正しいディレクトリに基本的な構成ファイルを作成します。

スターターを初期化する pipeline.js ソースとしてMongoDB、シンクとしてElasticsearchを使用します。

  1. transporter init mongodb elasticsearch

次の出力が表示されます。

Output
Writing pipeline.js...

変更する必要はありません pipeline.js このステップでは、それがどのように機能するかを見てみましょう。

ファイルは次のようになりますが、コマンドを使用してファイルの内容を表示することもできます cat pipeline.js, less pipeline.js (出口 less を押すことによって q)、またはお気に入りのテキストエディタで開きます。

パイプライン.js
var source = mongodb({
  "uri": "${MONGODB_URI}"
  // "timeout": "30s",
  // "tail": false,
  // "ssl": false,
  // "cacerts": ["/path/to/cert.pem"],
  // "wc": 1,
  // "fsync": false,
  // "bulk": false,
  // "collection_filters": "{}",
  // "read_preference": "Primary"
})

var sink = elasticsearch({
  "uri": "${ELASTICSEARCH_URI}"
  // "timeout": "10s", // defaults to 30s
  // "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
  // "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
  // "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

で始まる行 var sourcevar sink MongoDBアダプターとElasticsearchアダプターにそれぞれJavaScript変数を定義します。 を定義します MONGODB_URIELASTICSEARCH_URI これらのアダプターがこのステップの後半で必要とする環境変数。

で始まる行 // コメントです。 これらは、パイプラインに設定できるいくつかの一般的な構成オプションを強調していますが、ここで作成する基本的なパイプラインには使用していません。

最後の行は、ソースとシンクを接続します。 変数 transporter また t パイプラインにアクセスできます。 を使用します .Source().Save() 関数を使用してソースとシンクを追加します sourcesink 以前にファイルで定義された変数。

の3番目の引数 Source()Save() 関数は namespace. 通過 /.*/ 最後の引数は、MongoDBからすべてのデータを転送し、Elasticsearchの同じ名前空間に保存することを意味します。

このパイプラインを実行する前に、 MongoDBURIおよびElasticsearchURI環境変数を設定する必要があります。 使用している例では、両方ともデフォルト設定でローカルにホストされていますが、既存のMongoDBまたはElasticsearchインスタンスを使用している場合は、これらのオプションをカスタマイズしてください。

  1. export MONGODB_URI='mongodb://localhost/my_application'
  2. export ELASTICSEARCH_URI='http://localhost:9200/my_application'

これで、パイプラインを実行する準備が整いました。

  1. transporter run pipeline.js

次のように終了する出力が表示されます。

Output
. . . INFO[0001] metrics source records: 2 path=source ts=1522942118483391242 INFO[0001] metrics source/sink records: 2 path="source/sink" ts=1522942118483395960 INFO[0001] exit map[source:mongodb sink:elasticsearch] ts=1522942118483396878

最後から2番目と3番目の行で、この出力は、ソースに2つのレコードが存在し、2つのレコードがシンクに移動されたことを示しています。

両方のレコードが処理されたことを確認するために、Elasticsearchにクエリを実行して my_application データベース。これで存在するはずです。

  1. curl $ELASTICSEARCH_URI/_search?pretty=true

The ?pretty=true パラメータを使用すると、出力が読みやすくなります。

Output
{ "took" : 5, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e9c6687d9f638ced4fe", "_score" : 1.0, "_source" : { "firstName" : "Gilly", "lastName" : "Glowfish" } }, { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e986687d9f638ced4fd", "_score" : 1.0, "_source" : { "firstName" : "Sammy", "lastName" : "Shark" } } ] } }

MongoDBのデータベースとコレクションは、Elasticsearchのインデックスとタイプに類似しています。 それを念頭に置いて、次のことを確認してください。

  • The _index フィールドをに設定 my_application, 元のMongoDBデータベースの名前)。
  • The _type フィールドをに設定 users, MongoDBコレクションの名前。
  • The firstNamelastName それぞれ「サミー」「サメ」「ギリー」「グローフィッシュ」で埋められたフィールド。

これにより、MongoDBの両方のレコードがTransporterを介して正常に処理され、Elasticsearchに読み込まれたことが確認されます。 この基本的なパイプラインに基づいて構築するために、入力データを変換できる中間処理ステップを追加します。

ステップ4—トランスフォーマーを作成する

名前が示すように、トランスフォーマーは、ソースデータをシンクにロードする前に変更します。 たとえば、新しいフィールドを追加したり、フィールドを削除したり、フィールドのデータを変更したりできます。 Transporterには、事前定義されたトランスフォーマーとカスタムトランスポーターのサポートが付属しています。

通常、カスタムトランスフォーマーはJavaScript関数として記述され、別のファイルに保存されます。 それらを使用するには、トランスフォーマーファイルへの参照を追加します。 pipeline.js . Transporterには、OttoとGojaの両方のJavaScriptエンジンが含まれています。 Gojaは新しく、一般的に高速であるため、ここで使用します。 唯一の機能上の違いは構文です。

というファイルを作成します transform.js、これを使用して変換関数を記述します。

  1. nano transform.js

これが使用する関数です。これにより、という新しいフィールドが作成されます。 fullName、その値は firstNamelastName スペースで区切られた、連結されたフィールド( Sammy Shark).

transform.js
function transform(msg) {
	msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;
	return msg
}

このファイルの行を見ていきましょう。

  • ファイルの最初の行、 function transform(msg), 関数定義です。
  • msg ソースドキュメントの詳細を含むJavaScriptオブジェクトです。 このオブジェクトを使用して、パイプラインを通過するデータアクセスします。
  • 関数の最初の行は2つの既存のフィールドを連結し、はその値を新しいフィールドに割り当てます fullName 分野。
  • 関数の最後の行は、新しく変更されたものを返します msg パイプラインの残りの部分が使用するオブジェクト。

ファイルを保存して閉じます。

次に、このトランスフォーマーを使用するようにパイプラインを変更する必要があります。 を開きます pipeline.js 編集用のファイル。

  1. nano pipeline.js

最後の行で、に呼び出しを追加する必要があります Transform() 呼び出し間のパイプラインにトランスフォーマーを追加する関数 Source()Save()、 このような:

〜/transporter/pipeline.js
. . .
t.Source("source", source, "/.*/")
.Transform(goja({"filename": "transform.js"}))
.Save("sink", sink, "/.*/")

に渡された引数 Transform() は変換のタイプであり、この場合はGojaです。 を使用して goja 関数では、相対パスを使用してトランスフォーマーのファイル名を指定します。

ファイルを保存して閉じます。 パイプラインを再実行してトランスフォーマーをテストする前に、Elasticsearchの既存のデータを前のテストからクリアしましょう。

  1. curl -XDELETE $ELASTICSEARCH_URI

コマンドの成功を確認するこの出力が表示されます。

Output
{"acknowledged":true}

次に、パイプラインを再実行します。

  1. transporter run pipeline.js

出力は前のテストと非常によく似ており、パイプラインが以前と同じように正常に完了したかどうかを最後の数行で確認できます。 確かに、Elasticsearchを再度チェックして、データが期待する形式で存在するかどうかを確認できます。

  1. curl $ELASTICSEARCH_URI/_search?pretty=true

あなたは見ることができます fullName 新しい出力のフィールド:

Output
{ "took" : 9, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e9c6687d9f638ced4fe", "_score" : 1.0, "_source" : { "firstName" : "Gilly", "fullName" : "Gilly Glowfish", "lastName" : "Glowfish" } }, { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e986687d9f638ced4fd", "_score" : 1.0, "_source" : { "firstName" : "Sammy", "fullName" : "Sammy Shark", "lastName" : "Shark" } } ] } }

に注意してください fullName 値が正しく設定されたフィールドが両方のドキュメントに追加されました。 これで、トランスポーターパイプラインにカスタム変換を追加する方法がわかりました。

結論

MongoDBからElasticsearchにデータをコピーおよび変更するためのトランスフォーマーを備えた基本的なTransporterパイプラインを構築しました。 同じ方法でより複雑な変換を適用したり、同じパイプラインで複数の変換をチェーンしたりすることができます。 MongoDBとElasticsearchは、Transporterがサポートするアダプターの2つにすぎません。 また、フラットファイル、PostgresなどのSQLデータベース、およびその他の多くのデータソースもサポートしています。

GitHubTransporterプロジェクトをチェックして、APIの最新の変更を更新し、 Transporter wiki にアクセスして、アダプター、トランスフォーマー、およびトランスフォーマーの他の機能。