GitHubで管理されたデータマート構築基盤の紹介

f:id:vasilyjp:20181004105553j:plain

こんにちは。バックエンドエンジニアの田島(@katsuyan121)です。

弊社ではデータマートをBigQuery上に構築しています。データマートはデータベース全体のデータのうち、必要なデータだけを使いやすい形にしたデータベースです。データマート作成のためのSQLクエリは日々更新や追加があり、BigQueryのコンソールから自由にデータマートを作ってしまうと管理が大変になってしまいます。

そこで、データマートをすべてGitHub上でバージョン管理し、運用の効率化をしました。また、差分更新の導入や依存関係のあるデータマートへの対応などのデータマート構築に必要な機能を作成しました。

弊社のデータ基盤をざっくり紹介します。まずデータはBigQueryへ集約し、Digdagを用いてデータ基盤を構築しています。以下がその概要図です。S3などの分散ストレージや各種DBからデータをBigQueryへ同期し、BigQuery内部でデータマートを構築します。本ブログではこのうちのデータマート構築について紹介します。

f:id:vasilyjp:20181004105602p:plain

BigQueryでのデータマートの実現方法

データマートをBigQuery上に構築していると紹介しましたがその実態は以下の2つです。

  1. BigQueryのView
  2. BigQueryのTable

計算コストが小さいマートに関しては View を用いることでデータの鮮度を保つようにします。逆に計算コストの高いマートは事前に集計などの計算をし Table として保持することで、参照時にコストの高い計算しなくて済むようにしています。

データマート構築の問題点

冒頭でもいくつか紹介しましたがデータマートの構築には以下のような問題が存在します。

  • データマートは日々更新や追加がされる
    • 現在弊社のデータマートの Table View の数は100を超えており、今でも日々追加されている
  • データマートをBigQueryのWebコンソールから自由に作ってしまうと管理が大変
    • 誰が作ったのかわからない
    • 一時的に作ったデータマートなどがどこから参照されているかわからない
    • 溜まってしまったデータマートがどこから参照されているかわからないので不用意に消せない

またテーブルを用いてデータマートを構築する場合以下のことが問題となります。

  • 更新に時間がかかる
  • データマートがどういったクエリにより作られたかわからない
  • 定期的にテーブルを更新する必要がある
  • データマートの Table View が他のTable View を参照している場合、更新する順番を間違えるとデータに不整合が生じる

そこで、これらの問題を解決するためのシステムを作成しました。

データマートのクエリをGitHubで管理する

BigQueryのViewやTableはBigQueryのWebコンソール画面から簡単に作ることができます。しかしそれでは誰が作ったマートなのかなのかが簡単には分からないなどの運用の問題が生じてしまうと紹介しました。そこでSQLファイルを作成し、それらをAPIからBigQueryに反映をします。実際にはRubyの Google Cloud SDK を利用して反映を行っています。

また、SQLファイルのファイル名を View または Table の名前にすることで、実際のマートとSQLファイルとの対応関係が簡単にわかります。以下がその概要です。

f:id:vasilyjp:20181004105622p:plain

Before After
マート作成方法 BigQueryのコンソールから手動で作成 SQLファイルをGit管理しAPI経由で作成

以下に View Table をデータマートへ反映する方法を示しますが、全てのマートをSELECT文のSQLファイルとして表現できます。 BigQueryにSQLを反映する仕組みを作ったことで、すべてのマートをSQLファイルとしてGit管理できるようになりました。

Viewのマート反映

View のマートに関しては作成したSQLをそのままBigQueryに反映します。

  • 新規作成の場合
require 'google/cloud/bigquery'

sql = File.read('テーブル名.sql')
bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path')
dataset = bq_clent.dataset('データセット名')
dataset.create_view('ビューの名前', sql, standard_sql: true)
  • 更新の場合
require 'google/cloud/bigquery'
bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path')

sql = File.read('テーブル名.sql')
dataset = bq_clent.dataset('データセット名')
view = dataset.table('テーブル名)
view.set_query(sql, standard_sql: true)

テーブルのマート反映

テーブルのマートについてはBigQueryの Destination Table という機能を使うことで、SELECT文の結果をそのままテーブルに反映できます。

以下のようなコードを実行することでBigQueryにSELECT文の結果がテーブルとして作成されます。この処理は、BigQueryによってアトミックにテーブルが作成されます。

require 'google/cloud/bigquery'

bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path')
sql =  File.read('テーブル名.sql')
job = bq_client.query_job(sql, standart_sql: true,
                               table: 'テーブル名',
                               write: 'truncate',
                               create: 'needed')
job.wait_until_done!

差分更新

計算コストの高いマートは事前に集計などの計算をし Table として保持すると紹介しました。しかし集計するデータが大きすぎる場合、毎回全量のデータを集計・更新すると時間がかかりすぎてしまいます。

例えば日付ごとに集計するテーブルの場合、毎日全量のデータを集計し直していると最新の日付以外の集計処理は前日と同じ計算をしてしまうことになり集計処理が増えて行きます。そこで差分更新をすることでこの問題を解決します。

本システムでは差分更新にappendとoverwriteの2種類を行っています。

append

appendは集計した結果を既存のテーブルに対してinsertします。

f:id:vasilyjp:20181004105633p:plain

以下のようなテンプレートに対しSQLファイルの中身をそのまま埋め込むことで、「テーブルのマート反映までの流れ」で紹介した処理と同じようにBigQueryに反映できます。

SELECT * FROM `既存のテーブル`
union all
(
<%= sql %>
)

overwrite

一部集計した結果を既存のマートに上書きしなければならない場合appendだけでは対応できません。overwriteの処理ではそのようなケースに対応するため pk などのユニークキーを利用して集計結果をマージします。

f:id:vasilyjp:20181004105642p:plain

こちらもappendのときと同じように以下のようなテンプレートに対してSQLを埋め込みBigQueryに反映します。

SELECT * except(priority, row_number) FROM (
  SELECT
    *,
    row_number() over (partition by <%= pk %> order by priority) as row_number
    FROM (
      SELECT *, 1 AS priority FROM (
        <%= sql %>
        )
      union all
      SELECT *, 2  FROM `既存のテーブル`
    )
)
WHERE row_number = 1

以上のように差分更新する対象のテーブルは統一した方法で差分更新が行われるようになりました。

append更新の改善

最初は以上のようにappendとoverwriteの処理を通常の更新処理と同じように行っていました。しかし運用しているうちに SELECT * FROM により全件のデータを毎回取得していることが以下の2点で問題になりました。

  • 処理に時間がかかる
  • BigQueryはクエリの使用量に対して課金されるためお金がかかってしまう

そこでBigQueryのappendの機能を利用するように変更しました。この、 append によるデータの追記もBigQueryによりアトミックに行われることが保証されています。以下のコードで、それを実現しています。

require 'google/cloud/bigquery'

bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path')
sql =  File.read('テーブル名.sql')
job = bq_client.query_job(sql, standard_sql: true,
                               table: 'テーブル名',
                               write: 'append',
                               create: 'needed')
job.wait_until_done!

これにより、SELECT * FROM の処理をなくすことに成功しました。ただし、overwriteは以上の解決策は適用できないため上で紹介した通りに今も更新を行っており、課題となっています。

冪等性の担保

差分更新の仕組みを紹介しましたが、差分更新の処理があると何かの問題で処理を再実行した場合データが重複するといったデータの整合性に対する問題が生じてしまいます。

例えば、既存のテーブルに対して append の処理をします。その後再実行したい場合、もう一度 append の処理を行うと append により追加されたデータが重複してしまいます。 そこで、以下のようにすることで冪等性を担保し、何回同じ処理をしても同じ結果になるよう工夫しました。 また、 overwrite でも同じように処理を行います。

マート更新時に、マートと同じデータを保持したバックアップを作成します。そして、差分更新を行う場合はバックアップに対して差分更新を行い、その結果を実際のマートに反映します。

2018-01-06 にマートを更新すると以下のような処理になります。

f:id:vasilyjp:20181004105653p:plain

これをコードにしたものが以下になります。

require 'google/cloud/bigquery'

bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path')
dataset = bq_clent.dataset('データセット名')

table = dataset.table('mart_table', skip_lookup: true)
backup_table_yesterday = dataset.table('mart_table_20180105', skip_lookup: true)
backup_table_today = dataset.table('mart_table_20180106', skip_lookup: true)

backup_table.copy_job(destination_table, create: 'needed', write: 'truncate') # バックアップテーブルを実際のテーブルにコピー

job = bq_client.query_job(sql, standard_sql: true,
                               table: table,
                               write: 'append',
                               create: 'needed',
                               &job_configuration)
job.wait_until_done!
raise "Fail BigQuery job: #{job.error}" if job.failed?

table.copy_job('バックアップテーブル.sql', create: 'needed', write: 'truncate') # 更新されたテーブルをバックアップ

以上のように差分更新をすることで、何回リトライしてもデータの整合性が保たれるうようになりました。

マートの依存関係

マートが他のマートを参照している場合、更新する順番を間違えるとデータに不整合が生じると紹介しました。例えば existing_table1 existing_table2 のテーブルが存在するとし、以下のような4つのマート構築を考えます。その場合、「table3の前にtable1」「table4の前にtable1とtable3」が更新されている必要があります。

table1.sql

SELECT * FROM `project.dataset.existing_table1`;

table2.sql

SELECT * FROM `project.dataset.xisting_table2`;

table3.sql

SELECT * FROM `project.dataset.table2`;

table4.sql

SELECT * FROM `project.dataset.table1`
UNION ALL
SELECT * FROM `project.dataset.table4`;

そこで以下のようなグラフを作成し、実行順を確認します。このグラフは、SQLの FROM または JOIN の後ろのテーブル名とSQLファイル名を利用します。FROMの後ろに書かれているテーブルはSQLファイル名のマートよりも前に実行しなければなりません。そのため「FROMの後ろのテーブル名」->「SQLファイルのファイル名」というグラフを作成します。

例えば上の4つのSQLからマートのグラフを作成すると以下のようになります。

f:id:vasilyjp:20181004105705p:plain

以下のようなコードでグラフを生成します。

graph = {}
sql_files = [table1.sql, table2.sql, table3.sql, table4.sql]

sql_files.each do |sql_file|
  sql_file_table_name = sql_file.split('.').first
  sql = File.open(sql_file, 'rt:UTF-8') { |file| file.read.chomp }

  related_tables = sql.scan(/(?:FROM|JOIN)[\s \n]+`(.+?)`/i)

  graph[sql_file_table_name] = related_tables.map do |str|
    _, table = str.first.split('.')
    next if table == sql_file_table_name # 自己参照は除外
    table
  end.compact
end

この結果に対してマート以外のテーブルを削除すると、以下のような結果になります。

[
  {'table3' => ['table2']},
  {'table4' => ['table1', 'table3']}
]

並列実行

以上のグラフを利用することで、マート作成のクエリの実行の順番を担保したまま並列化できます。並列実行を簡易化するために以上のようなグラフを、以下のようなテーブルのリストのリストに変換します。各テーブルのリストは先頭から順番に実行でき、テーブルのリストの中身はそれぞれ並列実行することが可能になります。

[[table1, table2], [table3], [table4]]

この変換は、まず親ノードがないテーブル一覧をリストに追加します。そして追加したテーブルをグラフから削除し、もう一度親ノードがないテーブル一覧をリストに追加します。これを繰り返すことで、以上のようなリストが生成されます。

f:id:vasilyjp:20181004105718p:plain

以下のようなコードでグラフからリストを生成します。

def sort_tables(graph)
  graph = graph.dup

  result = []

  until graph.empty?
    nodes = graph.keys.select { |node| graph[node].empty? }
    result << nodes

    raise 'cyclic path detected!' if nodes.empty?

    nodes.each { |node| graph.delete(node) }
    graph.transform_values! { |v| v - nodes }
  end
  return result
end

このリストを利用し、最初に「table1, table2」続いて「table3」最後に「table4」を実行します。 並列実行はDigdagで実現しており、以下のようなconfigファイルになっています。

+set_refresh_views:
  call> set_refresh_views # REFRESH_TABLESESにテーブルのリストのリストを格納

+update_views:
  for_each>:
    REFRESH_TABLES: ${REFRESH_TABLESES}
  _parallel: false
  _do:
    for_each>:
      REFRESH_TABLE: ${REFRESH_TABLES}
    _parallel: true # ここをtrueにすることで並列実行を実現
    _do:
      _retry: 3
      call> refresh # 実際にマートを更新する処理

以上のように実行する順番を考える必要があった所を、システムにより自動解決することに成功しました。さらに、並列実行をすることで更新スピードの短縮にも成功しました。

このシステムを作ってどうなったか

最後に本システムによりデータマートの運用に関してどのようになったか紹介します。

  • GitHubでマート管理できるようになり日々追加されるマートの管理を統一したルールで管理できるようになった
  • 差分更新や並列実行を導入したことで、データマートの更新の時間短縮に成功した
  • データマートのすべてのクエリをGitHubで参照できるようになった
  • システムを定期的に実行することでテーブルの更新が自動化された
  • データマート同士の参照が自動解決されるようになった

以上のように冒頭に挙げた問題を解決することに成功しました。またその他に以下のようなメリットが見られました。

  • 冪等性の担保をしたことで、マート更新の再実行を気軽にできるようになった
  • マートのSQLをGitHubで管理することによりコードレビューが活発になった
  • GitHub上でレビューができるため誰でも気軽にマートを追加できるようになった

以上のようにデータ基盤のコード管理とマート管理をGIthubに統一、システム化することで様々なメリットが得られました。

最後に

本文でも紹介しましたが、本システムにはまだまだ問題が残されています。弊社では一緒にデータ基盤を作ってくれる方を大募集しています。 ご興味がある方は以下のリンクから是非ご応募ください!

www.wantedly.com

カテゴリー