こんにちは。バックエンドエンジニアの田島(@katsuyan121)です。
弊社ではデータマートをBigQuery上に構築しています。データマートはデータベース全体のデータのうち、必要なデータだけを使いやすい形にしたデータベースです。データマート作成のためのSQLクエリは日々更新や追加があり、BigQueryのコンソールから自由にデータマートを作ってしまうと管理が大変になってしまいます。
そこで、データマートをすべてGitHub上でバージョン管理し、運用の効率化をしました。また、差分更新の導入や依存関係のあるデータマートへの対応などのデータマート構築に必要な機能を作成しました。
弊社のデータ基盤をざっくり紹介します。まずデータはBigQueryへ集約し、Digdagを用いてデータ基盤を構築しています。以下がその概要図です。S3などの分散ストレージや各種DBからデータをBigQueryへ同期し、BigQuery内部でデータマートを構築します。本ブログではこのうちのデータマート構築について紹介します。
BigQueryでのデータマートの実現方法
データマートをBigQuery上に構築していると紹介しましたがその実態は以下の2つです。
- BigQueryのView
- BigQueryのTable
計算コストが小さいマートに関しては View
を用いることでデータの鮮度を保つようにします。逆に計算コストの高いマートは事前に集計などの計算をし Table
として保持することで、参照時にコストの高い計算しなくて済むようにしています。
データマート構築の問題点
冒頭でもいくつか紹介しましたがデータマートの構築には以下のような問題が存在します。
- データマートは日々更新や追加がされる
- 現在弊社のデータマートの
Table
View
の数は100を超えており、今でも日々追加されている
- 現在弊社のデータマートの
- データマートをBigQueryのWebコンソールから自由に作ってしまうと管理が大変
- 誰が作ったのかわからない
- 一時的に作ったデータマートなどがどこから参照されているかわからない
- 溜まってしまったデータマートがどこから参照されているかわからないので不用意に消せない
またテーブルを用いてデータマートを構築する場合以下のことが問題となります。
- 更新に時間がかかる
- データマートがどういったクエリにより作られたかわからない
- 定期的にテーブルを更新する必要がある
- データマートの
Table
View
が他のTable
View
を参照している場合、更新する順番を間違えるとデータに不整合が生じる
そこで、これらの問題を解決するためのシステムを作成しました。
データマートのクエリをGitHubで管理する
BigQueryのViewやTableはBigQueryのWebコンソール画面から簡単に作ることができます。しかしそれでは誰が作ったマートなのかなのかが簡単には分からないなどの運用の問題が生じてしまうと紹介しました。そこでSQLファイルを作成し、それらをAPIからBigQueryに反映をします。実際にはRubyの Google Cloud SDK を利用して反映を行っています。
また、SQLファイルのファイル名を View
または Table
の名前にすることで、実際のマートとSQLファイルとの対応関係が簡単にわかります。以下がその概要です。
Before | After | |
---|---|---|
マート作成方法 | BigQueryのコンソールから手動で作成 | SQLファイルをGit管理しAPI経由で作成 |
以下に View
Table
をデータマートへ反映する方法を示しますが、全てのマートをSELECT文のSQLファイルとして表現できます。
BigQueryにSQLを反映する仕組みを作ったことで、すべてのマートをSQLファイルとしてGit管理できるようになりました。
Viewのマート反映
View
のマートに関しては作成したSQLをそのままBigQueryに反映します。
- 新規作成の場合
require 'google/cloud/bigquery' sql = File.read('テーブル名.sql') bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path') dataset = bq_clent.dataset('データセット名') dataset.create_view('ビューの名前', sql, standard_sql: true)
- 更新の場合
require 'google/cloud/bigquery' bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path') sql = File.read('テーブル名.sql') dataset = bq_clent.dataset('データセット名') view = dataset.table('テーブル名) view.set_query(sql, standard_sql: true)
テーブルのマート反映
テーブルのマートについてはBigQueryの Destination Table という機能を使うことで、SELECT文の結果をそのままテーブルに反映できます。
以下のようなコードを実行することでBigQueryにSELECT文の結果がテーブルとして作成されます。この処理は、BigQueryによってアトミックにテーブルが作成されます。
require 'google/cloud/bigquery' bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path') sql = File.read('テーブル名.sql') job = bq_client.query_job(sql, standart_sql: true, table: 'テーブル名', write: 'truncate', create: 'needed') job.wait_until_done!
差分更新
計算コストの高いマートは事前に集計などの計算をし Table
として保持すると紹介しました。しかし集計するデータが大きすぎる場合、毎回全量のデータを集計・更新すると時間がかかりすぎてしまいます。
例えば日付ごとに集計するテーブルの場合、毎日全量のデータを集計し直していると最新の日付以外の集計処理は前日と同じ計算をしてしまうことになり集計処理が増えて行きます。そこで差分更新をすることでこの問題を解決します。
本システムでは差分更新にappendとoverwriteの2種類を行っています。
append
appendは集計した結果を既存のテーブルに対してinsertします。
以下のようなテンプレートに対しSQLファイルの中身をそのまま埋め込むことで、「テーブルのマート反映までの流れ」で紹介した処理と同じようにBigQueryに反映できます。
SELECT * FROM `既存のテーブル` union all ( <%= sql %> )
overwrite
一部集計した結果を既存のマートに上書きしなければならない場合appendだけでは対応できません。overwriteの処理ではそのようなケースに対応するため pk
などのユニークキーを利用して集計結果をマージします。
こちらもappendのときと同じように以下のようなテンプレートに対してSQLを埋め込みBigQueryに反映します。
SELECT * except(priority, row_number) FROM ( SELECT *, row_number() over (partition by <%= pk %> order by priority) as row_number FROM ( SELECT *, 1 AS priority FROM ( <%= sql %> ) union all SELECT *, 2 FROM `既存のテーブル` ) ) WHERE row_number = 1
以上のように差分更新する対象のテーブルは統一した方法で差分更新が行われるようになりました。
append更新の改善
最初は以上のようにappendとoverwriteの処理を通常の更新処理と同じように行っていました。しかし運用しているうちに SELECT * FROM
により全件のデータを毎回取得していることが以下の2点で問題になりました。
- 処理に時間がかかる
- BigQueryはクエリの使用量に対して課金されるためお金がかかってしまう
そこでBigQueryのappendの機能を利用するように変更しました。この、 append
によるデータの追記もBigQueryによりアトミックに行われることが保証されています。以下のコードで、それを実現しています。
require 'google/cloud/bigquery' bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path') sql = File.read('テーブル名.sql') job = bq_client.query_job(sql, standard_sql: true, table: 'テーブル名', write: 'append', create: 'needed') job.wait_until_done!
これにより、SELECT * FROM
の処理をなくすことに成功しました。ただし、overwriteは以上の解決策は適用できないため上で紹介した通りに今も更新を行っており、課題となっています。
冪等性の担保
差分更新の仕組みを紹介しましたが、差分更新の処理があると何かの問題で処理を再実行した場合データが重複するといったデータの整合性に対する問題が生じてしまいます。
例えば、既存のテーブルに対して append
の処理をします。その後再実行したい場合、もう一度 append
の処理を行うと append
により追加されたデータが重複してしまいます。
そこで、以下のようにすることで冪等性を担保し、何回同じ処理をしても同じ結果になるよう工夫しました。 また、 overwrite
でも同じように処理を行います。
マート更新時に、マートと同じデータを保持したバックアップを作成します。そして、差分更新を行う場合はバックアップに対して差分更新を行い、その結果を実際のマートに反映します。
2018-01-06
にマートを更新すると以下のような処理になります。
これをコードにしたものが以下になります。
require 'google/cloud/bigquery' bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path') dataset = bq_clent.dataset('データセット名') table = dataset.table('mart_table', skip_lookup: true) backup_table_yesterday = dataset.table('mart_table_20180105', skip_lookup: true) backup_table_today = dataset.table('mart_table_20180106', skip_lookup: true) backup_table.copy_job(destination_table, create: 'needed', write: 'truncate') # バックアップテーブルを実際のテーブルにコピー job = bq_client.query_job(sql, standard_sql: true, table: table, write: 'append', create: 'needed', &job_configuration) job.wait_until_done! raise "Fail BigQuery job: #{job.error}" if job.failed? table.copy_job('バックアップテーブル.sql', create: 'needed', write: 'truncate') # 更新されたテーブルをバックアップ
以上のように差分更新をすることで、何回リトライしてもデータの整合性が保たれるうようになりました。
マートの依存関係
マートが他のマートを参照している場合、更新する順番を間違えるとデータに不整合が生じると紹介しました。例えば existing_table1
existing_table2
のテーブルが存在するとし、以下のような4つのマート構築を考えます。その場合、「table3の前にtable1」「table4の前にtable1とtable3」が更新されている必要があります。
table1.sql
SELECT * FROM `project.dataset.existing_table1`;
table2.sql
SELECT * FROM `project.dataset.xisting_table2`;
table3.sql
SELECT * FROM `project.dataset.table2`;
table4.sql
SELECT * FROM `project.dataset.table1` UNION ALL SELECT * FROM `project.dataset.table4`;
そこで以下のようなグラフを作成し、実行順を確認します。このグラフは、SQLの FROM
または JOIN
の後ろのテーブル名とSQLファイル名を利用します。FROMの後ろに書かれているテーブルはSQLファイル名のマートよりも前に実行しなければなりません。そのため「FROMの後ろのテーブル名」->「SQLファイルのファイル名」というグラフを作成します。
例えば上の4つのSQLからマートのグラフを作成すると以下のようになります。
以下のようなコードでグラフを生成します。
graph = {} sql_files = [table1.sql, table2.sql, table3.sql, table4.sql] sql_files.each do |sql_file| sql_file_table_name = sql_file.split('.').first sql = File.open(sql_file, 'rt:UTF-8') { |file| file.read.chomp } related_tables = sql.scan(/(?:FROM|JOIN)[\s \n]+`(.+?)`/i) graph[sql_file_table_name] = related_tables.map do |str| _, table = str.first.split('.') next if table == sql_file_table_name # 自己参照は除外 table end.compact end
この結果に対してマート以外のテーブルを削除すると、以下のような結果になります。
[ {'table3' => ['table2']}, {'table4' => ['table1', 'table3']} ]
並列実行
以上のグラフを利用することで、マート作成のクエリの実行の順番を担保したまま並列化できます。並列実行を簡易化するために以上のようなグラフを、以下のようなテーブルのリストのリストに変換します。各テーブルのリストは先頭から順番に実行でき、テーブルのリストの中身はそれぞれ並列実行することが可能になります。
[[table1, table2], [table3], [table4]]
この変換は、まず親ノードがないテーブル一覧をリストに追加します。そして追加したテーブルをグラフから削除し、もう一度親ノードがないテーブル一覧をリストに追加します。これを繰り返すことで、以上のようなリストが生成されます。
以下のようなコードでグラフからリストを生成します。
def sort_tables(graph) graph = graph.dup result = [] until graph.empty? nodes = graph.keys.select { |node| graph[node].empty? } result << nodes raise 'cyclic path detected!' if nodes.empty? nodes.each { |node| graph.delete(node) } graph.transform_values! { |v| v - nodes } end return result end
このリストを利用し、最初に「table1, table2」続いて「table3」最後に「table4」を実行します。 並列実行はDigdagで実現しており、以下のようなconfigファイルになっています。
+set_refresh_views: call> set_refresh_views # REFRESH_TABLESESにテーブルのリストのリストを格納 +update_views: for_each>: REFRESH_TABLES: ${REFRESH_TABLESES} _parallel: false _do: for_each>: REFRESH_TABLE: ${REFRESH_TABLES} _parallel: true # ここをtrueにすることで並列実行を実現 _do: _retry: 3 call> refresh # 実際にマートを更新する処理
以上のように実行する順番を考える必要があった所を、システムにより自動解決することに成功しました。さらに、並列実行をすることで更新スピードの短縮にも成功しました。
このシステムを作ってどうなったか
最後に本システムによりデータマートの運用に関してどのようになったか紹介します。
- GitHubでマート管理できるようになり日々追加されるマートの管理を統一したルールで管理できるようになった
- 差分更新や並列実行を導入したことで、データマートの更新の時間短縮に成功した
- データマートのすべてのクエリをGitHubで参照できるようになった
- システムを定期的に実行することでテーブルの更新が自動化された
- データマート同士の参照が自動解決されるようになった
以上のように冒頭に挙げた問題を解決することに成功しました。またその他に以下のようなメリットが見られました。
- 冪等性の担保をしたことで、マート更新の再実行を気軽にできるようになった
- マートのSQLをGitHubで管理することによりコードレビューが活発になった
- GitHub上でレビューができるため誰でも気軽にマートを追加できるようになった
以上のようにデータ基盤のコード管理とマート管理をGIthubに統一、システム化することで様々なメリットが得られました。
最後に
本文でも紹介しましたが、本システムにはまだまだ問題が残されています。弊社では一緒にデータ基盤を作ってくれる方を大募集しています。 ご興味がある方は以下のリンクから是非ご応募ください!