Embulkを利用したデータ転送基盤の構築

f:id:vasilyjp:20180927090637j:plain

こんにちは。バックエンドエンジニアインターンの田島です。

VASILYでは分析にBigQueryを使用しており、MySQLのデータを毎日BigQueryに同期しています。この同期処理を行うシステムは、約2年前にRubyで書かれたもので、プロダクトの成長に伴うデータ量の増加に耐えることができなくなり始めていました。そのため、同期処理を行うシステムを一から作り直しましたので、その詳細についてご紹介します。

弊社DEVELOPERS BLOGでは以前、『インターン生がデータ転送基盤を一から設計する、VASILYバックエンドインターンの紹介』としてシステムの概要・開発の流れをご紹介しましたが、今回はシステムの詳細についてご紹介します。

f:id:vasilyjp:20170626143637j:plain (photoクレジット *1

データ同期ツールの紹介

新たなデータ同期システムとして、次のように利用する社内ツールを作成しました。 以下のような環境変数を設定し、コマンドを実行するだけで、指定したデータベースの全てのテーブルがMySQLからBigQueryに同期されます。

export DB_HOST=DBホスト名
export DB_USER=DBユーザー名
export DB_PASS=DBパスワード
export DB_DATABASE=DBデータベース名
export GCP_KEY=GCPのkeyfileへのパス
$ mysql_to_bigquery

また、以下のように引数を渡すことで、同期したいテーブルだけを指定することも可能です。

$ mysql_to_bigquery --tables tablename1,tablename2

システム構成

構成は以下のようになっています。

f:id:vasilyjp:20170626143659p:plain (ファイル・YAMLファイル・ほうれん草の画像クレジット *2

実装詳細

Embulkについて

今回、データ同期処理にEmbulkを利用しました。Embulkとは、ログ収集でよく利用されるfluentdのバッチ版のようなツールです。データベースやストレージからデータを吸い出し、別のデータベースやストレージにロードできます。YAML形式で設定ファイルを書くことで、その設定を元にEmbulkがデータを同期します。

Embulkプラグイン

Embulkではプラグインが採用されており、今回の同期のシステムでは以下の2種類のプラグインを利用しています。

  1. embulk-input-mysql
  2. embulk-output-bigquery

embulk-input-mysql

embulk-input-mysqlはembulk-input-jdbcの1つです。embulk-input-mysqlを利用することで、MySQLからのデータの吸い出しが可能です。以下のように設定ファイルに記述することで、テーブルごとに出力することができます。optionsを設定することで、jdbcオプションを渡すことができます。

in:
  type: mysql
  host: ホスト名
  user: ユーザー名
  password: パスワード
  database: 対象のデータベース
  table: 対象のテーブル
  select: "*"
  options: {useLegacyDatetimeCode: false, serverTimezone: Asia/Tokyo}

今回利用したオプションの

{useLegacyDatetimeCode: false, serverTimezone: Asia/Tokyo}

はTime Zoneに関するオプションです。今回のシステムでは、Embulkが動作するサーバーのTime ZoneがAsia/Tokyoとなっています。上記オプションを付けない場合、EmbulkがMySQLのTime ZoneをUTCであると判断してしまい、9時間ずれた状態でTimeStamp型のデータを取得してしまいます。 このオプションについては、以下の記事の中で教えて頂きました。Embulkコミュニティは親切な人が多いのが特徴です。

embulk-input-jdbcのMySQLプラグインで9時間時間がずれる

embulk-output-bigquery

embulk-output-bigqueryを利用することで、BigQueryへのデータ出力が可能です。以下のように設定ファイルを記述することで、任意の場所に吸い出したデータを格納できます。以下のようにgcs_bucketを設定することで、吸い出したデータを一度GCSに格納します。これにより、BigQueryへのinsert処理の高速化とjob数の節約ができます。

out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: json_keyfileへのパス
  project: プロジェクト名
  dataset: データセット名
  table: テーブル名
  gcs_bucket: GCSのバケット名
  auto_create_gcs_bucket: false
  compression: GZIP
  source_format: NEWLINE_DELIMITED_JSON
  default_timezone: "Asia/Tokyo"

Goラッパー

今回のデータ同期システムを実装する上で、Embulkだけでは実現できないことがありました。そこでGoラッパーを作成しました。Goラッパーを作成する主な目的として以下が挙げられます。

Goラッパーを作成する目的

  • 設定ファイル生成とEmbulkの実行
  • logの管理と通知

設定ファイル生成

Goラッパーを作成する1つ目の目的として設定ファイルの生成が挙げられます。Embulkでは1つのテーブルにつき1つの設定ファイルを作成する必要があります。同期するべきテーブルが複数あるので、全ての設定ファイルを手で作成することは現実的ではありません。そこで、Goラッパーによる設定ファイルの自動生成を行うことにしました。

設定ファイル生成と実行の流れ

  1. MySQLからスキーマの情報を取得
  2. 得られた情報から設定ファイルを生成
  3. 各設定ファイルを元に、Embulkを実行

MySQLからスキーマの情報を取得

複数のテーブル同期するために、最初にMySQLのスキーマ情報を取得する必要があります。そこで、

SELECT TABLE_NAME, COLUMN_NAME, COLUMN_TYPE 
FROM INFORMATION_SCHEMA.COLUMNS 
WHERE TABLE_SCHEMA = (SELECT database())

というSQLを発行し、テーブル名、カラム名、カラム型を取得します。この情報を利用し、

{name: '', colomns: [{name: '', type: ''}]}

というデータ構造を作成し保持します。

得られた情報からYAMLファイルを生成

Embulkの設定ファイルにはテンプレートエンジンの"Liquid"がデフォルトで利用できるようになっています。しかし、今回はGoとの相性を考え、Goのテンプレートエンジンである"pongo2"を利用しました。"pongo2"は"jinja"ライクなテンプレートエンジンで以下のように利用しています。

in:
  type: mysql
  host: {{ host }}
  user: {{ user }}
  password: {{ password }}
  database: {{ database }}
  table: {{ table }}
  select: "*"
  options: {useLegacyDatetimeCode: false, serverTimezone: Asia/Tokyo}

out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: json_keyfile_path
  project: project_name
  dataset: dataset_name
  table: {{ table }}
  gcs_bucket: gcs_bucket_name
  auto_create_gcs_bucket: false
  compression: GZIP
  source_format: NEWLINE_DELIMITED_JSON
  default_timezone: "Asia/Tokyo"

上記のテンプレートにMySQLから取得したスキーマ情報をそれぞれ流します。これにより得られた設定をtable_name.ymlとして保存することで、各テーブルの同期設定ファイルを作成します。

各YAMLファイルを元に、Embulkを実行

最後に、作成された設定ファイルを元にEmbulkを実行することで、テーブルが同期されます。

ログ収集と通知

Goラッパーの2つ目の目的としてはログの収集と、通知処理が挙げられます。Embulkだけでは、柔軟なログ収集とエラー発生時の通知ができないので、Go側で管理する必要があります。 弊社ではエラーの収集にhorensoを利用しています。horensoについては弊社DEVELOPERS BLOG『horensoで作るモダンなcronスクリプト監視環境』で紹介しています。horensoに対して適切にエラー発生を通知するには、stderrにログを出力する必要があります。Embulkではエラー発生時のログも合わせて、全てstdoutに出力されるようになっています。その為、Go側でEmbulkの終了ステータスを利用し、適切にログの出力先を変える必要があります。

まとめ

Embulkを利用することで同期処理が安定し、プロダクトの成長を支えるデータ同期基盤を構築することに成功しました。また、Goラッパー作成により、運用の効率化に成功しました。Embulkはまだまだ発展中のプロジェクトでプラグインも増え続けています。Embulkのコミュニティは親切な人が多いので、困ったことがあっても素早く解決することができます。ぜひ、Embulkを利用してみてください。

最後に

このシステムは、短期インターン時に一から作ったものです。VASILYのインターンでは、このような実践的・挑戦的な課題に挑むことができます。 サマーインターンへのご応募をお待ちしております!!

https://www.wantedly.com/projects/103184www.wantedly.com

*1 :
photo:Christopher Michel
*2:
File icon: Icons made by Freepik from www.flaticon.com is licensed by Flaticon Basic License
YML icon: Icons made by Yannick from www.flaticon.com is licensed by CC 3.0 BY
Salad icon: Icons made by Madebyoliver from www.flaticon.com is licensed by Flaticon Basic License

カテゴリー