はじめまして! ZOZOテクノロジーズ開発部の平田(@TrsNium)と申します。 業務ではデータ基盤の開発・運用を行っています。 よろしくお願いいたします。
今回複数のツールが混在していたデータ基盤を「Digdag・Embulk」に統一したので、その取り組みを紹介します。
概要
弊社のデータ基盤は注文情報や顧客情報などをSQL Serverから取得しBigQueryに転送しています。 以前のデータ基盤では「Talend」と「Embulk・Digdag」でデータの収集と転送をしていました。
Talendは、タスクのスケジューリングとデータ転送を行うツールです。 Digdagはタスクのスケジューリングをするツールで、Embulkはデータを転送を実行するツールです。 「Talend」と「Digdag・Embulk」は別々のチームが管理・運用をしており、運用負荷が高いという問題がありました。
そこでデータの収集と転送をするツールを、「Digdag・Embulk」に統一することで運用負荷を下げました。
はじめに、移行前後のデータ基盤について紹介します。
以前のデータ基盤
以下の図が移行前のデータ基盤です。
TalendがSQL Serverからデータを収集し、S3にデータを配置します。 その後、DigdagとEmbulkがS3に配置されたデータをBigQueryに転送します。 なお、途中にS3を介しているのはDWHとしてRedshiftを使用していた時代の名残です。 現在はDWHとしてBigQueryを使用しているので、S3が存在する必然性はありません。
移行後のデータ基盤
以下の図が移行後のデータ基盤です。
SQL ServerからS3、S3からBigQueryまでの転送をDigdagとEmbulkで行います。
ツールの紹介
Talendとは
ここで、Talendが何かご存じでない方もいると思うので、紹介させていただきます。 TalendとはETLツール兼ワークフローエンジンです。 ETLとはあるシステムからデータの抽出・加工をし、別のシステムにエクスポートする工程のことです。 またTalendには以下の様な特徴があります。
- ビジュアルスクリプトにより、プログラミングの知識がなくとも直感的にデータ転送をできる
- 多種多様なDBやクラウドストレージ、csvなどのファイルフォーマットをサポートしている
- データ加工処理をJavaで拡張可能
ビジュアルスクリプトにより、プログラミングの知識がなくとも直感的にデータ転送をできる
Talendは主にビジュアルスクリプトで操作を行うことができます。 ビジュアルスクリプトは、エンジニア以外の人にも直感的で分かり易いマウスのドロップ&ドラッグで操作ができます。
多種多様なDBやクラウドストレージ、csvなどのファイルフォーマットをサポートしている
Talendは主要なクラウドストレージ(S3, GCS, Azure Data Lake Storeなど)に対応しています。 それに加え、データフォーマットも多岐にわたり対応しているため、クラウドストレージにあるデータのフォーマットを気にせず利用できます。 そして様々なデータベース(MySQL, SQL Server, Netezzaなど)をサポートしています。 これらを組み合わせることで、データ転送や、データベース・クラウドストレージへのデータの参照と書き込みができます。
データ加工処理をJavaで拡張可能
Talendではデータ転送だけではなく、ルーチンと呼ばれるデータを加工する機能があります。
例えば日付型の値に対しては、TalendDate.diffDate
で2つの日付データから時間の差を求められます。
多くのルーチンがデフォルトで提供されているため、様々なデータの加工ができます。
またルーチンはユーザーが独自定義できるようになっています。
Javaでルーチンを拡張できるため、より柔軟にデータの加工ができます。
先の通り、拡張性・柔軟性・敷居の低さからTalendが利用されていました。
Digdag・Embulkとは
DigdagとEmbulkの紹介をさせていただきます。 Digdagとはオープンソースのワークフローエンジンです。 複数のタスクをワークフローとして定義し、Digdagがワークフローを実行します。
Digdagには以下の様な特徴があります。
- YAMLフォーマットでワークフローを定義する
- タスクに依存関係を持たせることができる
- タスクの実行環境が柔軟である
DigdagはYAMLフォーマットでワークフローを記述ができるため、Git上でDigDagのワークフローを管理できます。 タスクは依存関係を持つことができ「データの加工→データの集計→通知」の様なワークフローを定義をできます。 この機能により、どの様なタスクでもワークフローとして定義できます。 またDigdagのタスク実行環境は自由に選択をできます。 これはタスクをDockerコンテナ内で行うことができるためです。
Embulkはバルクデータをバッチ処理するオープンソースのバルクデータローダーです。 Embulkはプラグインベースのバルクローダーであり、以下の様な特徴があります。
- バッチ処理をYAMLフォーマットで定義をできる
- 様々なプラグインがオープンソースとして公開されている
EmbulkはYAMLフォーマットでバッチ処理を定義できるため、Git上でワークフローの管理をできます。 また多くのプラグインがオープンソースとして公開されています。 プラグインは「input」「filter」「output」「parser」「formatter」などの複数種類があります。 これらのプラグインを組み合わせることで、多様なのデータソースからデータを読み込み、加工し、ターゲットソースに書き込むことができます。 更にEmbulkはデータを並列処理をするため、バッチの実行時間を短くできます。
弊社にはembulk-output-bigqueryやembulk-filter-column、embulk-filter-timestamp_formatなどの作者でもある@sonots氏が所属しています。 Embulkに詳しいエンジニアが在籍していることで、Embulkに関する問題解決速度が速くなりました。
Talendの運用上の問題点
Talendを使ってきましたが運用する上で問題がありました。 実際に起きた問題は以下の通りです。
- 無償版のTalendはビジュアルスクリプトで書かれている部分をGitで管理できない
- CUI操作を行うことができない
Talendには製品版のTalend PlatformとTalend Enterprise、無償版のTalend Open Studioがあります。 弊社で運用されていたTalendは無償版のTalend Open Studioでした。 無償版のTalendでは、開発管理の機能やジョブ実行機能などに制限があります。 そのうちの一つにプロジェクトの管理をGitで行うことができません。 Git上でプロジェクト管理ができないために「どのETLを変更したのかの把握」や「機能の追加・変更による障害の対応」などの難しさがありました。 また、GUI操作で基本操作をするため、反復的な操作を行う業務が困難でした。
この様なことを踏まえて他のツールへ置き換えることを検討しました。 EmbulkとDigdagでは以上の問題を解決できることや、Talendと同様にデータを処理できる点から選択しました。 またチーム内で、で複数の運用実績があるのも選択の決め手となりました。
Digdag・Embulkへの移行
Digdag・Embulkへの移行手順について紹介します。 Talendのリプレイスは以下の様な手順で行いました。
- TalendのETLを把握する
- Talendと同様にEmbulkでデータを転送する
- Digdagでデータ転送のタスクをスケジューリングする
- Talendと並走してDigdagとEmbulkを実行する
- 移行前後のデータの差分を確認する
- DigdagとEmbulkへ移行する
1. TalendのETLを把握する
TalendのETLを把握するため「Talendが参照しているテーブル」「どの様なルーチンを使用しているのか」「データ転送スケジュール」を調べました。 参照しているテーブルは200程度でした。 ルーチンは14種類あり「対象のカラムをコピーし別のカラムへ移す」「データの秘匿化」「文字列の正規化」「日付のフォーマットを変更する」などがありました。 またS3へのデータ転送は、1日に1回AM4:00〜AM7:30の間に終えられるようにスケジューリングされていました。
Embulkでは、Talendで行っていた14のルーチンと同様な処理をしなければなりません。 Digdagでは、Talendと同様にAM4:00〜AM7:30の間にデータを転送できるようにスケジューリングする必要がありました。
2. Talendと同様にEmbulkでデータを転送できるようにする
データ転送ではEmbulkを使用します。 EmbulkでSQL Serverのデータを参照するには、embulk-input-sqlserverプラグインを使用しました。 ルーチンで行われている処理は、embulk-filter-ruby_procプラグインで代替えしました。 embulk-filter-ruby_procでルーチンと同じ処理ができているかは、手順5の方法で確認をしました。
またEmbulkの設定ファイルはYAMLで記述されます。 約200のテーブルを手書きでEmbulkのYAMLフォーマットに変換するのは、現実的ではないと考えました。 そこでTalendのプロジェクトファイル中にある、マークアップ言語で記述されているETLの情報を利用しました。 TalendのETLの情報をパースするスクリプトを書き、EmbulkのYAMLをしました。 副次的なメリットとしてデータを参照するためのSQLや、データ加工の対象となるカラム名のtypoが発生しませんでした。
3. Digdagでデータ転送のタスクをスケジューリングする
Digdagは、Embulkが行うデータ転送をスケジューリングするために使用します。 Digdagでは繰り返し処理を並列に実行をできます。 繰り返し処理は複数のテーブルを転送する際に利用されています。 しかし、並列にテーブルのデータ転送を行う処理を行うことは、SQL ServerのDBリソースを占有してしまう恐れがあります。 そこで以下の様なことに気をつけました。
- データベースの許可されている同時接続数や計算リソースを使い切らないようにする
- 同時に並列で処理する各テーブルのデータサイズが均一になるようにグルーピングすることで、従来よりデータの転送所要時間を少なくする
SQL Serverの同時接続数を超えないように処理するために、以下の図のようにスケジューリングしました。
上の図では、複数のデータ転送を纏めて幾つかのグループにしています。 グループ内のデータ転送は並列に処理されますが、グループ自体は逐次的に処理されます。 つまり、同時にデータベースへアクセスするEmbulkプロセス数の上限は、1グループあたりのタスク数に制限されます。 グループ内の数を減らすことによりデータベースへの負荷を軽減ができますが、同時に全体を通したデータ転送の時間は長くなるトレードオフの関係になります。
更に最適化されていないスケジューリングでは以下の図の様に、データ転送の所要時間が大きくなります。
この図のテーブルサイズは、各テーブルのデータサイズを表しています。 Costは全テーブルの転送にかかる時間を示しています。 1グループの転送時間はグループ内の最大転送時間に依存するため、Costは並列で転送されるグループの最大テーブルサイズに依存します。 つまり、全体のグループ内のテーブルサイズを均一にすることでCostの最適化ができます。 しかし上の図では、グループ内のテーブルサイズが均一になってないため最適化されていません。 そこで、テーブルサイズで全体をソートすることで、Costを以下の図のように最適化しました。
テーブルサイズでソートすることによりCostが最適化されました。 これによりデータベースの負荷を制限しつつ、転送時間を短くできました。
4. Talendと並走してDigdagとEmbulkを走らせる
次に、Digdagサーバーにワークフローを登録します。 ワークフローは以下の図のようになっています。
以上の図では「Talend」と「Digdag・Embulk」はデータの干渉をしないために別々のバケットとデータセットを利用しています。 またDigdagとEmbulkを動作させる際には以下のことを気をつけました。
- SQL Serverから読み込んだデータをS3にデータを転送できること
- S3からBigQueryにデータを転送できること
SQL ServerからS3にデータを転送できない場合には、YAMLのフォーマットエラーやembulk-filter-ruby_procで使用するRubyコードが原因でした。
例えば、S3からBigQueryでデータの転送できないケースとして、日付型のデータフォーマットの違いによりデータを転送できないことがありました。
Talendでは2019/04/01 12:00:00
の様にフォーマットされているのに対し、Embulkでは2019/04/01 12:00:00.000
の様にデータを送っていました。
Talendの2019/04/01 12:00:00
の様なフォーマットで送られることが期待されているので、Embulkで送ったデータは日付をうまく読み取れず転送に失敗します。
これらの問題がないことを確認することで、正常にDigdag・Embulkが動作していること、Talendとのデータフォーマットに違いがないことを把握できます。
5. 移行前後のデータの差分を確認する
データに差分がある場合、分析で行なっている集計処理が正しく行われないため、差分がないことを確認します。 データの差分の確認はBigQuery上で行いました。
BigQuery上でデータの差分の確認を行うのは、以下の様なメリットがあります。
- 差分があった際に、どのカラムから差分が発生しているかの確認が容易
- 複数の差分を同時に求めることができる
当初は、データの差分をS3上にPutされているデータをもとにPythonスクリプトで求めていました。 しかし、データサイズの大きなテーブルでは差分を求めるのに時間がかかることや、一度に大量のテーブルの差分を求めることができませんでした。 そこで、BigQueryへ複数リクエストすることで、一度に複数テーブルの差分結果を短時間で取得をできました。
BigQueryでは、以下のクエリで差分結果を取得しました。
(select * from `production.Order` except distinct select * from `test.Order`) union all (select * from `test.Order` except distinct select * from `production.Order`)
production.OrderはTalendで転送しているデータセットのテーブルで、test.OrderがEmbulkで転送しているデータセットのテーブルです。
except演算子では、左側の入力クエリに存在し、右側の入力クエリには存在しない行を返します。
例えばselect * from `production.Order` except distinct select * from `test.Order`
をベン図で表すと以下の様になります。
以上の図の斜線部ではproduction.Orderにはあるが、test.Orderにはないレコードを表しています。 全体の差分を求めるには上記クエリのExcept前後のテーブルを入れ替え、unionで両方の差分を結合し求めることができます。
6. TalendのジョブをDigdagとEmbulkに置き換える
TalendをDigdagとEmbulkに移行する際の手順を紹介します。 移行手順は以下の2つです。
- Talendの設定を変更して、移行前ジョブを無効化する
- EmbulkのS3へ転送するBucketを変更する
以上の手順は以下の図の様なイメージです。
以上の図ではTalendの設定を無効化した後に、Embulkのデータ転送の向き先をTalendが転送していたProductionバケットに向けています。
また、安全に移行するために、1日で移行するのではなく複数日に分けました。 なぜなら、移行を全て同日にするのはリスクがあるからです。 例えば、1日で移行する際に複数の折り重なった障害が起きた場合、本番環境に与える障害の影響が大きくなります。 そこで、同日にすべての転送するのをやめ、複数日に分けることでリスクを分配しました。
統一したことによるメリット
EmbulkとDigdagに統一することで、以下の様なメリットがありました。
- Gitを用いてDigdagとEmbulkの設定が管理できるようになった
- 時間に余裕を持ってデータ転送を終了できるようになった
Git上でDigdagとEmbulkの設定を管理できるようになった
DigdagとEmbulkに置き換わったことにより、スケジューリングやデータ転送の設定をGit上で管理できるようになりました。 Git上で管理できるようになったことで、複数人に情報共有をしやすくなりました。 またGitのリポジトリ運用はGitHubFlowのため、レビュー段階でミスや改善点に気付けます。
時間に余裕を持ってデータ転送を終了できるようになった
EmbulkとDigdagに置き換えることで余裕を持って、バッチを終了できるようになりました。 TalendがAM4:00 ~ AM7:30の間にデータ転送を終えていたので、それと同様にできる必要がありました。 結果的にデータ転送を1時間30分程度で終了できるようになり、データ転送に余裕を持つことができました。 これはEmbulkとDigdagがデータ転送を並列処理できるからです。
次のデータ基盤のステージ
今回データ基盤のツールの統一をしました。 しかし、データフローの最適化がまだなされていません。 現在のデータ基盤のデータフローを最適化した図は以下のようになります。
以上の図ではデータフローを最適化することで、SQL ServerからS3を経由するフローがなくなりました。 データフローの最適化を行うことでデータフロー全体の転送時間を短くでき、ワークフローの数が減るためDigdagとEmbulkの設定ファイルを少なくできます。 S3を経由するのを止め直接BigQueryにデータを転送すれば、全体のデータ転送時間は1〜2時間程度短くできます。 そしてEmbulkとDigdagの設定ファイルを少なくできるため、新規テーブルの転送設定を行う手間を軽減できます。
今回は紹介ができませんが、データベースのレプリケーションなどまだまだ改善の余地があります。
まとめ
本記事では、「Talend」を「Digdag・Embulk」に移行したことについて紹介しました。 データ基盤の構築や、リプレイスをする際の参考になれば幸いです。
ZOZOテクノロジーズでは、一緒にサービスを作り上げてくれる方を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください。