TalendをDigdagとEmbulkに移行した

f:id:vasilyjp:20190319014359p:plain

はじめまして! ZOZOテクノロジーズ開発部の平田(@TrsNium)と申します。 業務ではデータ基盤の開発・運用を行っています。 よろしくお願いいたします。

今回複数のツールが混在していたデータ基盤を「Digdag・Embulk」に統一したので、その取り組みを紹介します。

概要

弊社のデータ基盤は注文情報や顧客情報などをSQL Serverから取得しBigQueryに転送しています。 以前のデータ基盤では「Talend」と「Embulk・Digdag」でデータの収集と転送をしていました。

Talendは、タスクのスケジューリングとデータ転送を行うツールです。 Digdagはタスクのスケジューリングをするツールで、Embulkはデータを転送を実行するツールです。 「Talend」と「Digdag・Embulk」は別々のチームが管理・運用をしており、運用負荷が高いという問題がありました。

そこでデータの収集と転送をするツールを、「Digdag・Embulk」に統一することで運用負荷を下げました。

はじめに、移行前後のデータ基盤について紹介します。

以前のデータ基盤

以下の図が移行前のデータ基盤です。

f:id:vasilyjp:20190315165559p:plain

TalendがSQL Serverからデータを収集し、S3にデータを配置します。 その後、DigdagとEmbulkがS3に配置されたデータをBigQueryに転送します。 なお、途中にS3を介しているのはDWHとしてRedshiftを使用していた時代の名残です。 現在はDWHとしてBigQueryを使用しているので、S3が存在する必然性はありません。

移行後のデータ基盤

以下の図が移行後のデータ基盤です。

f:id:vasilyjp:20190315165628p:plain

SQL ServerからS3、S3からBigQueryまでの転送をDigdagとEmbulkで行います。

ツールの紹介

Talendとは

ここで、Talendが何かご存じでない方もいると思うので、紹介させていただきます。 TalendとはETLツール兼ワークフローエンジンです。 ETLとはあるシステムからデータの抽出・加工をし、別のシステムにエクスポートする工程のことです。 またTalendには以下の様な特徴があります。

  • ビジュアルスクリプトにより、プログラミングの知識がなくとも直感的にデータ転送をできる
  • 多種多様なDBやクラウドストレージ、csvなどのファイルフォーマットをサポートしている
  • データ加工処理をJavaで拡張可能

ビジュアルスクリプトにより、プログラミングの知識がなくとも直感的にデータ転送をできる

Talendは主にビジュアルスクリプトで操作を行うことができます。 ビジュアルスクリプトは、エンジニア以外の人にも直感的で分かり易いマウスのドロップ&ドラッグで操作ができます。

多種多様なDBやクラウドストレージ、csvなどのファイルフォーマットをサポートしている

Talendは主要なクラウドストレージ(S3, GCS, Azure Data Lake Storeなど)に対応しています。 それに加え、データフォーマットも多岐にわたり対応しているため、クラウドストレージにあるデータのフォーマットを気にせず利用できます。 そして様々なデータベース(MySQL, SQL Server, Netezzaなど)をサポートしています。 これらを組み合わせることで、データ転送や、データベース・クラウドストレージへのデータの参照と書き込みができます。

データ加工処理をJavaで拡張可能

Talendではデータ転送だけではなく、ルーチンと呼ばれるデータを加工する機能があります。 例えば日付型の値に対しては、TalendDate.diffDateで2つの日付データから時間の差を求められます。 多くのルーチンがデフォルトで提供されているため、様々なデータの加工ができます。 またルーチンはユーザーが独自定義できるようになっています。 Javaでルーチンを拡張できるため、より柔軟にデータの加工ができます。

先の通り、拡張性・柔軟性・敷居の低さからTalendが利用されていました。

Digdag・Embulkとは

DigdagとEmbulkの紹介をさせていただきます。 Digdagとはオープンソースのワークフローエンジンです。 複数のタスクをワークフローとして定義し、Digdagがワークフローを実行します。

Digdagには以下の様な特徴があります。

  • YAMLフォーマットでワークフローを定義する
  • タスクに依存関係を持たせることができる
  • タスクの実行環境が柔軟である

DigdagはYAMLフォーマットでワークフローを記述ができるため、Git上でDigDagのワークフローを管理できます。 タスクは依存関係を持つことができ「データの加工→データの集計→通知」の様なワークフローを定義をできます。 この機能により、どの様なタスクでもワークフローとして定義できます。 またDigdagのタスク実行環境は自由に選択をできます。 これはタスクをDockerコンテナ内で行うことができるためです。

Embulkはバルクデータをバッチ処理するオープンソースのバルクデータローダーです。 Embulkはプラグインベースのバルクローダーであり、以下の様な特徴があります。

  • バッチ処理をYAMLフォーマットで定義をできる
  • 様々なプラグインがオープンソースとして公開されている

EmbulkはYAMLフォーマットでバッチ処理を定義できるため、Git上でワークフローの管理をできます。 また多くのプラグインがオープンソースとして公開されています。 プラグインは「input」「filter」「output」「parser」「formatter」などの複数種類があります。 これらのプラグインを組み合わせることで、多様なのデータソースからデータを読み込み、加工し、ターゲットソースに書き込むことができます。 更にEmbulkはデータを並列処理をするため、バッチの実行時間を短くできます。

弊社にはembulk-output-bigqueryembulk-filter-columnembulk-filter-timestamp_formatなどの作者でもある@sonots氏が所属しています。 Embulkに詳しいエンジニアが在籍していることで、Embulkに関する問題解決速度が速くなりました。

Talendの運用上の問題点

Talendを使ってきましたが運用する上で問題がありました。 実際に起きた問題は以下の通りです。

  • 無償版のTalendはビジュアルスクリプトで書かれている部分をGitで管理できない
  • CUI操作を行うことができない

Talendには製品版のTalend PlatformとTalend Enterprise、無償版のTalend Open Studioがあります。 弊社で運用されていたTalendは無償版のTalend Open Studioでした。 無償版のTalendでは、開発管理の機能やジョブ実行機能などに制限があります。 そのうちの一つにプロジェクトの管理をGitで行うことができません。 Git上でプロジェクト管理ができないために「どのETLを変更したのかの把握」や「機能の追加・変更による障害の対応」などの難しさがありました。 また、GUI操作で基本操作をするため、反復的な操作を行う業務が困難でした。

この様なことを踏まえて他のツールへ置き換えることを検討しました。 EmbulkとDigdagでは以上の問題を解決できることや、Talendと同様にデータを処理できる点から選択しました。 またチーム内で、で複数の運用実績があるのも選択の決め手となりました。

Digdag・Embulkへの移行

Digdag・Embulkへの移行手順について紹介します。 Talendのリプレイスは以下の様な手順で行いました。

  1. TalendのETLを把握する
  2. Talendと同様にEmbulkでデータを転送する
  3. Digdagでデータ転送のタスクをスケジューリングする
  4. Talendと並走してDigdagとEmbulkを実行する
  5. 移行前後のデータの差分を確認する
  6. DigdagとEmbulkへ移行する

1. TalendのETLを把握する

TalendのETLを把握するため「Talendが参照しているテーブル」「どの様なルーチンを使用しているのか」「データ転送スケジュール」を調べました。 参照しているテーブルは200程度でした。 ルーチンは14種類あり「対象のカラムをコピーし別のカラムへ移す」「データの秘匿化」「文字列の正規化」「日付のフォーマットを変更する」などがありました。 またS3へのデータ転送は、1日に1回AM4:00〜AM7:30の間に終えられるようにスケジューリングされていました。

Embulkでは、Talendで行っていた14のルーチンと同様な処理をしなければなりません。 Digdagでは、Talendと同様にAM4:00〜AM7:30の間にデータを転送できるようにスケジューリングする必要がありました。

2. Talendと同様にEmbulkでデータを転送できるようにする

データ転送ではEmbulkを使用します。 EmbulkでSQL Serverのデータを参照するには、embulk-input-sqlserverプラグインを使用しました。 ルーチンで行われている処理は、embulk-filter-ruby_procプラグインで代替えしました。 embulk-filter-ruby_procでルーチンと同じ処理ができているかは、手順5の方法で確認をしました。

またEmbulkの設定ファイルはYAMLで記述されます。 約200のテーブルを手書きでEmbulkのYAMLフォーマットに変換するのは、現実的ではないと考えました。 そこでTalendのプロジェクトファイル中にある、マークアップ言語で記述されているETLの情報を利用しました。 TalendのETLの情報をパースするスクリプトを書き、EmbulkのYAMLをしました。 副次的なメリットとしてデータを参照するためのSQLや、データ加工の対象となるカラム名のtypoが発生しませんでした。

3. Digdagでデータ転送のタスクをスケジューリングする

Digdagは、Embulkが行うデータ転送をスケジューリングするために使用します。 Digdagでは繰り返し処理を並列に実行をできます。 繰り返し処理は複数のテーブルを転送する際に利用されています。 しかし、並列にテーブルのデータ転送を行う処理を行うことは、SQL ServerのDBリソースを占有してしまう恐れがあります。 そこで以下の様なことに気をつけました。

  • データベースの許可されている同時接続数や計算リソースを使い切らないようにする
  • 同時に並列で処理する各テーブルのデータサイズが均一になるようにグルーピングすることで、従来よりデータの転送所要時間を少なくする

SQL Serverの同時接続数を超えないように処理するために、以下の図のようにスケジューリングしました。

f:id:vasilyjp:20190315130042p:plain

上の図では、複数のデータ転送を纏めて幾つかのグループにしています。 グループ内のデータ転送は並列に処理されますが、グループ自体は逐次的に処理されます。 つまり、同時にデータベースへアクセスするEmbulkプロセス数の上限は、1グループあたりのタスク数に制限されます。 グループ内の数を減らすことによりデータベースへの負荷を軽減ができますが、同時に全体を通したデータ転送の時間は長くなるトレードオフの関係になります。

更に最適化されていないスケジューリングでは以下の図の様に、データ転送の所要時間が大きくなります。

f:id:vasilyjp:20190315130057p:plain

この図のテーブルサイズは、各テーブルのデータサイズを表しています。 Costは全テーブルの転送にかかる時間を示しています。 1グループの転送時間はグループ内の最大転送時間に依存するため、Costは並列で転送されるグループの最大テーブルサイズに依存します。 つまり、全体のグループ内のテーブルサイズを均一にすることでCostの最適化ができます。 しかし上の図では、グループ内のテーブルサイズが均一になってないため最適化されていません。 そこで、テーブルサイズで全体をソートすることで、Costを以下の図のように最適化しました。

f:id:vasilyjp:20190315130115p:plain

テーブルサイズでソートすることによりCostが最適化されました。 これによりデータベースの負荷を制限しつつ、転送時間を短くできました。

4. Talendと並走してDigdagとEmbulkを走らせる

次に、Digdagサーバーにワークフローを登録します。 ワークフローは以下の図のようになっています。

f:id:vasilyjp:20190315130129p:plain

以上の図では「Talend」と「Digdag・Embulk」はデータの干渉をしないために別々のバケットとデータセットを利用しています。 またDigdagとEmbulkを動作させる際には以下のことを気をつけました。

  • SQL Serverから読み込んだデータをS3にデータを転送できること
  • S3からBigQueryにデータを転送できること

SQL ServerからS3にデータを転送できない場合には、YAMLのフォーマットエラーやembulk-filter-ruby_procで使用するRubyコードが原因でした。 例えば、S3からBigQueryでデータの転送できないケースとして、日付型のデータフォーマットの違いによりデータを転送できないことがありました。 Talendでは2019/04/01 12:00:00の様にフォーマットされているのに対し、Embulkでは2019/04/01 12:00:00.000の様にデータを送っていました。 Talendの2019/04/01 12:00:00の様なフォーマットで送られることが期待されているので、Embulkで送ったデータは日付をうまく読み取れず転送に失敗します。

これらの問題がないことを確認することで、正常にDigdag・Embulkが動作していること、Talendとのデータフォーマットに違いがないことを把握できます。

5. 移行前後のデータの差分を確認する

データに差分がある場合、分析で行なっている集計処理が正しく行われないため、差分がないことを確認します。 データの差分の確認はBigQuery上で行いました。

f:id:vasilyjp:20190315130144p:plain

BigQuery上でデータの差分の確認を行うのは、以下の様なメリットがあります。

  • 差分があった際に、どのカラムから差分が発生しているかの確認が容易
  • 複数の差分を同時に求めることができる

当初は、データの差分をS3上にPutされているデータをもとにPythonスクリプトで求めていました。 しかし、データサイズの大きなテーブルでは差分を求めるのに時間がかかることや、一度に大量のテーブルの差分を求めることができませんでした。 そこで、BigQueryへ複数リクエストすることで、一度に複数テーブルの差分結果を短時間で取得をできました。

BigQueryでは、以下のクエリで差分結果を取得しました。

(select * from  `production.Order`
except distinct
select * from  `test.Order`)
union all
(select * from `test.Order`
except distinct
select * from `production.Order`)

production.OrderはTalendで転送しているデータセットのテーブルで、test.OrderがEmbulkで転送しているデータセットのテーブルです。 except演算子では、左側の入力クエリに存在し、右側の入力クエリには存在しない行を返します。 例えばselect * from `production.Order` except distinct select * from `test.Order`をベン図で表すと以下の様になります。

f:id:vasilyjp:20190315125754p:plain
production.Orderにのみあるデータ

以上の図の斜線部ではproduction.Orderにはあるが、test.Orderにはないレコードを表しています。 全体の差分を求めるには上記クエリのExcept前後のテーブルを入れ替え、unionで両方の差分を結合し求めることができます。

6. TalendのジョブをDigdagとEmbulkに置き換える

TalendをDigdagとEmbulkに移行する際の手順を紹介します。 移行手順は以下の2つです。

  1. Talendの設定を変更して、移行前ジョブを無効化する
  2. EmbulkのS3へ転送するBucketを変更する

以上の手順は以下の図の様なイメージです。

f:id:vasilyjp:20190315125907p:plain

以上の図ではTalendの設定を無効化した後に、Embulkのデータ転送の向き先をTalendが転送していたProductionバケットに向けています。

また、安全に移行するために、1日で移行するのではなく複数日に分けました。 なぜなら、移行を全て同日にするのはリスクがあるからです。 例えば、1日で移行する際に複数の折り重なった障害が起きた場合、本番環境に与える障害の影響が大きくなります。 そこで、同日にすべての転送するのをやめ、複数日に分けることでリスクを分配しました。

統一したことによるメリット

EmbulkとDigdagに統一することで、以下の様なメリットがありました。

  • Gitを用いてDigdagとEmbulkの設定が管理できるようになった
  • 時間に余裕を持ってデータ転送を終了できるようになった

Git上でDigdagとEmbulkの設定を管理できるようになった

DigdagとEmbulkに置き換わったことにより、スケジューリングやデータ転送の設定をGit上で管理できるようになりました。 Git上で管理できるようになったことで、複数人に情報共有をしやすくなりました。 またGitのリポジトリ運用はGitHubFlowのため、レビュー段階でミスや改善点に気付けます。

時間に余裕を持ってデータ転送を終了できるようになった

EmbulkとDigdagに置き換えることで余裕を持って、バッチを終了できるようになりました。 TalendがAM4:00 ~ AM7:30の間にデータ転送を終えていたので、それと同様にできる必要がありました。 結果的にデータ転送を1時間30分程度で終了できるようになり、データ転送に余裕を持つことができました。 これはEmbulkとDigdagがデータ転送を並列処理できるからです。

次のデータ基盤のステージ

今回データ基盤のツールの統一をしました。 しかし、データフローの最適化がまだなされていません。 現在のデータ基盤のデータフローを最適化した図は以下のようになります。

f:id:vasilyjp:20190315165759p:plain

以上の図ではデータフローを最適化することで、SQL ServerからS3を経由するフローがなくなりました。 データフローの最適化を行うことでデータフロー全体の転送時間を短くでき、ワークフローの数が減るためDigdagとEmbulkの設定ファイルを少なくできます。 S3を経由するのを止め直接BigQueryにデータを転送すれば、全体のデータ転送時間は1〜2時間程度短くできます。 そしてEmbulkとDigdagの設定ファイルを少なくできるため、新規テーブルの転送設定を行う手間を軽減できます。

今回は紹介ができませんが、データベースのレプリケーションなどまだまだ改善の余地があります。

まとめ

本記事では、「Talend」を「Digdag・Embulk」に移行したことについて紹介しました。 データ基盤の構築や、リプレイスをする際の参考になれば幸いです。

ZOZOテクノロジーズでは、一緒にサービスを作り上げてくれる方を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください。

www.wantedly.com

カテゴリー