GitHubで管理されたデータマート構築基盤の紹介

f:id:vasilyjp:20181004105553j:plain

こんにちは。バックエンドエンジニアの田島(@katsuyan121)です。

弊社ではデータマートをBigQuery上に構築しています。データマートはデータベース全体のデータのうち、必要なデータだけを使いやすい形にしたデータベースです。データマート作成のためのSQLクエリは日々更新や追加があり、BigQueryのコンソールから自由にデータマートを作ってしまうと管理が大変になってしまいます。

そこで、データマートをすべてGitHub上でバージョン管理し、運用の効率化をしました。また、差分更新の導入や依存関係のあるデータマートへの対応などのデータマート構築に必要な機能を作成しました。

弊社のデータ基盤をざっくり紹介します。まずデータはBigQueryへ集約し、Digdagを用いてデータ基盤を構築しています。以下がその概要図です。S3などの分散ストレージや各種DBからデータをBigQueryへ同期し、BigQuery内部でデータマートを構築します。本ブログではこのうちのデータマート構築について紹介します。

f:id:vasilyjp:20181004105602p:plain

BigQueryでのデータマートの実現方法

データマートをBigQuery上に構築していると紹介しましたがその実態は以下の2つです。

  1. BigQueryのView
  2. BigQueryのTable

計算コストが小さいマートに関しては View を用いることでデータの鮮度を保つようにします。逆に計算コストの高いマートは事前に集計などの計算をし Table として保持することで、参照時にコストの高い計算しなくて済むようにしています。

データマート構築の問題点

冒頭でもいくつか紹介しましたがデータマートの構築には以下のような問題が存在します。

  • データマートは日々更新や追加がされる
    • 現在弊社のデータマートの Table View の数は100を超えており、今でも日々追加されている
  • データマートをBigQueryのWebコンソールから自由に作ってしまうと管理が大変
    • 誰が作ったのかわからない
    • 一時的に作ったデータマートなどがどこから参照されているかわからない
    • 溜まってしまったデータマートがどこから参照されているかわからないので不用意に消せない

またテーブルを用いてデータマートを構築する場合以下のことが問題となります。

  • 更新に時間がかかる
  • データマートがどういったクエリにより作られたかわからない
  • 定期的にテーブルを更新する必要がある
  • データマートの Table View が他のTable View を参照している場合、更新する順番を間違えるとデータに不整合が生じる

そこで、これらの問題を解決するためのシステムを作成しました。

データマートのクエリをGitHubで管理する

BigQueryのViewやTableはBigQueryのWebコンソール画面から簡単に作ることができます。しかしそれでは誰が作ったマートなのかなのかが簡単には分からないなどの運用の問題が生じてしまうと紹介しました。そこでSQLファイルを作成し、それらをAPIからBigQueryに反映をします。実際にはRubyの Google Cloud SDK を利用して反映を行っています。

また、SQLファイルのファイル名を View または Table の名前にすることで、実際のマートとSQLファイルとの対応関係が簡単にわかります。以下がその概要です。

f:id:vasilyjp:20181004105622p:plain

Before After
マート作成方法 BigQueryのコンソールから手動で作成 SQLファイルをGit管理しAPI経由で作成

以下に View Table をデータマートへ反映する方法を示しますが、全てのマートをSELECT文のSQLファイルとして表現できます。 BigQueryにSQLを反映する仕組みを作ったことで、すべてのマートをSQLファイルとしてGit管理できるようになりました。

Viewのマート反映

View のマートに関しては作成したSQLをそのままBigQueryに反映します。

  • 新規作成の場合
require 'google/cloud/bigquery'

sql = File.read('テーブル名.sql')
bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path')
dataset = bq_clent.dataset('データセット名')
dataset.create_view('ビューの名前', sql, standard_sql: true)
  • 更新の場合
require 'google/cloud/bigquery'
bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path')

sql = File.read('テーブル名.sql')
dataset = bq_clent.dataset('データセット名')
view = dataset.table('テーブル名)
view.set_query(sql, standard_sql: true)

テーブルのマート反映

テーブルのマートについてはBigQueryの Destination Table という機能を使うことで、SELECT文の結果をそのままテーブルに反映できます。

以下のようなコードを実行することでBigQueryにSELECT文の結果がテーブルとして作成されます。この処理は、BigQueryによってアトミックにテーブルが作成されます。

require 'google/cloud/bigquery'

bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path')
sql =  File.read('テーブル名.sql')
job = bq_client.query_job(sql, standart_sql: true,
                               table: 'テーブル名',
                               write: 'truncate',
                               create: 'needed')
job.wait_until_done!

差分更新

計算コストの高いマートは事前に集計などの計算をし Table として保持すると紹介しました。しかし集計するデータが大きすぎる場合、毎回全量のデータを集計・更新すると時間がかかりすぎてしまいます。

例えば日付ごとに集計するテーブルの場合、毎日全量のデータを集計し直していると最新の日付以外の集計処理は前日と同じ計算をしてしまうことになり集計処理が増えて行きます。そこで差分更新をすることでこの問題を解決します。

本システムでは差分更新にappendとoverwriteの2種類を行っています。

append

appendは集計した結果を既存のテーブルに対してinsertします。

f:id:vasilyjp:20181004105633p:plain

以下のようなテンプレートに対しSQLファイルの中身をそのまま埋め込むことで、「テーブルのマート反映までの流れ」で紹介した処理と同じようにBigQueryに反映できます。

SELECT * FROM `既存のテーブル`
union all
(
<%= sql %>
)

overwrite

一部集計した結果を既存のマートに上書きしなければならない場合appendだけでは対応できません。overwriteの処理ではそのようなケースに対応するため pk などのユニークキーを利用して集計結果をマージします。

f:id:vasilyjp:20181004105642p:plain

こちらもappendのときと同じように以下のようなテンプレートに対してSQLを埋め込みBigQueryに反映します。

SELECT * except(priority, row_number) FROM (
  SELECT
    *,
    row_number() over (partition by <%= pk %> order by priority) as row_number
    FROM (
      SELECT *, 1 AS priority FROM (
        <%= sql %>
        )
      union all
      SELECT *, 2  FROM `既存のテーブル`
    )
)
WHERE row_number = 1

以上のように差分更新する対象のテーブルは統一した方法で差分更新が行われるようになりました。

append更新の改善

最初は以上のようにappendとoverwriteの処理を通常の更新処理と同じように行っていました。しかし運用しているうちに SELECT * FROM により全件のデータを毎回取得していることが以下の2点で問題になりました。

  • 処理に時間がかかる
  • BigQueryはクエリの使用量に対して課金されるためお金がかかってしまう

そこでBigQueryのappendの機能を利用するように変更しました。この、 append によるデータの追記もBigQueryによりアトミックに行われることが保証されています。以下のコードで、それを実現しています。

require 'google/cloud/bigquery'

bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path')
sql =  File.read('テーブル名.sql')
job = bq_client.query_job(sql, standard_sql: true,
                               table: 'テーブル名',
                               write: 'append',
                               create: 'needed')
job.wait_until_done!

これにより、SELECT * FROM の処理をなくすことに成功しました。ただし、overwriteは以上の解決策は適用できないため上で紹介した通りに今も更新を行っており、課題となっています。

冪等性の担保

差分更新の仕組みを紹介しましたが、差分更新の処理があると何かの問題で処理を再実行した場合データが重複するといったデータの整合性に対する問題が生じてしまいます。

例えば、既存のテーブルに対して append の処理をします。その後再実行したい場合、もう一度 append の処理を行うと append により追加されたデータが重複してしまいます。 そこで、以下のようにすることで冪等性を担保し、何回同じ処理をしても同じ結果になるよう工夫しました。 また、 overwrite でも同じように処理を行います。

マート更新時に、マートと同じデータを保持したバックアップを作成します。そして、差分更新を行う場合はバックアップに対して差分更新を行い、その結果を実際のマートに反映します。

2018-01-06 にマートを更新すると以下のような処理になります。

f:id:vasilyjp:20181004105653p:plain

これをコードにしたものが以下になります。

require 'google/cloud/bigquery'

bq_client = Google::Cloud::Bigquery.new(project: 'project', credentials: 'path')
dataset = bq_clent.dataset('データセット名')

table = dataset.table('mart_table', skip_lookup: true)
backup_table_yesterday = dataset.table('mart_table_20180105', skip_lookup: true)
backup_table_today = dataset.table('mart_table_20180106', skip_lookup: true)

backup_table.copy_job(destination_table, create: 'needed', write: 'truncate') # バックアップテーブルを実際のテーブルにコピー

job = bq_client.query_job(sql, standard_sql: true,
                               table: table,
                               write: 'append',
                               create: 'needed',
                               &job_configuration)
job.wait_until_done!
raise "Fail BigQuery job: #{job.error}" if job.failed?

table.copy_job('バックアップテーブル.sql', create: 'needed', write: 'truncate') # 更新されたテーブルをバックアップ

以上のように差分更新をすることで、何回リトライしてもデータの整合性が保たれるうようになりました。

マートの依存関係

マートが他のマートを参照している場合、更新する順番を間違えるとデータに不整合が生じると紹介しました。例えば existing_table1 existing_table2 のテーブルが存在するとし、以下のような4つのマート構築を考えます。その場合、「table3の前にtable1」「table4の前にtable1とtable3」が更新されている必要があります。

table1.sql

SELECT * FROM `project.dataset.existing_table1`;

table2.sql

SELECT * FROM `project.dataset.xisting_table2`;

table3.sql

SELECT * FROM `project.dataset.table2`;

table4.sql

SELECT * FROM `project.dataset.table1`
UNION ALL
SELECT * FROM `project.dataset.table4`;

そこで以下のようなグラフを作成し、実行順を確認します。このグラフは、SQLの FROM または JOIN の後ろのテーブル名とSQLファイル名を利用します。FROMの後ろに書かれているテーブルはSQLファイル名のマートよりも前に実行しなければなりません。そのため「FROMの後ろのテーブル名」->「SQLファイルのファイル名」というグラフを作成します。

例えば上の4つのSQLからマートのグラフを作成すると以下のようになります。

f:id:vasilyjp:20181004105705p:plain

以下のようなコードでグラフを生成します。

graph = {}
sql_files = [table1.sql, table2.sql, table3.sql, table4.sql]

sql_files.each do |sql_file|
  sql_file_table_name = sql_file.split('.').first
  sql = File.open(sql_file, 'rt:UTF-8') { |file| file.read.chomp }

  related_tables = sql.scan(/(?:FROM|JOIN)[\s \n]+`(.+?)`/i)

  graph[sql_file_table_name] = related_tables.map do |str|
    _, table = str.first.split('.')
    next if table == sql_file_table_name # 自己参照は除外
    table
  end.compact
end

この結果に対してマート以外のテーブルを削除すると、以下のような結果になります。

[
  {'table3' => ['table2']},
  {'table4' => ['table1', 'table3']}
]

並列実行

以上のグラフを利用することで、マート作成のクエリの実行の順番を担保したまま並列化できます。並列実行を簡易化するために以上のようなグラフを、以下のようなテーブルのリストのリストに変換します。各テーブルのリストは先頭から順番に実行でき、テーブルのリストの中身はそれぞれ並列実行することが可能になります。

[[table1, table2], [table3], [table4]]

この変換は、まず親ノードがないテーブル一覧をリストに追加します。そして追加したテーブルをグラフから削除し、もう一度親ノードがないテーブル一覧をリストに追加します。これを繰り返すことで、以上のようなリストが生成されます。

f:id:vasilyjp:20181004105718p:plain

以下のようなコードでグラフからリストを生成します。

def sort_tables(graph)
  graph = graph.dup

  result = []

  until graph.empty?
    nodes = graph.keys.select { |node| graph[node].empty? }
    result << nodes

    raise 'cyclic path detected!' if nodes.empty?

    nodes.each { |node| graph.delete(node) }
    graph.transform_values! { |v| v - nodes }
  end
  return result
end

このリストを利用し、最初に「table1, table2」続いて「table3」最後に「table4」を実行します。 並列実行はDigdagで実現しており、以下のようなconfigファイルになっています。

+set_refresh_views:
  call> set_refresh_views # REFRESH_TABLESESにテーブルのリストのリストを格納

+update_views:
  for_each>:
    REFRESH_TABLES: ${REFRESH_TABLESES}
  _parallel: false
  _do:
    for_each>:
      REFRESH_TABLE: ${REFRESH_TABLES}
    _parallel: true # ここをtrueにすることで並列実行を実現
    _do:
      _retry: 3
      call> refresh # 実際にマートを更新する処理

以上のように実行する順番を考える必要があった所を、システムにより自動解決することに成功しました。さらに、並列実行をすることで更新スピードの短縮にも成功しました。

このシステムを作ってどうなったか

最後に本システムによりデータマートの運用に関してどのようになったか紹介します。

  • GitHubでマート管理できるようになり日々追加されるマートの管理を統一したルールで管理できるようになった
  • 差分更新や並列実行を導入したことで、データマートの更新の時間短縮に成功した
  • データマートのすべてのクエリをGitHubで参照できるようになった
  • システムを定期的に実行することでテーブルの更新が自動化された
  • データマート同士の参照が自動解決されるようになった

以上のように冒頭に挙げた問題を解決することに成功しました。またその他に以下のようなメリットが見られました。

  • 冪等性の担保をしたことで、マート更新の再実行を気軽にできるようになった
  • マートのSQLをGitHubで管理することによりコードレビューが活発になった
  • GitHub上でレビューができるため誰でも気軽にマートを追加できるようになった

以上のようにデータ基盤のコード管理とマート管理をGIthubに統一、システム化することで様々なメリットが得られました。

最後に

本文でも紹介しましたが、本システムにはまだまだ問題が残されています。弊社では一緒にデータ基盤を作ってくれる方を大募集しています。 ご興味がある方は以下のリンクから是非ご応募ください!

www.wantedly.com

「品質」の基準とは?

f:id:vasilyjp:20180927112742j:plain

こんにちは。品質管理部エンジニアリングチームの高橋です。

今回は品質管理部として初のTECH BLOG投稿ということもあり、
「品質 / Quality」について掘り下げてみたいと思います。

「品質」の意味

「品質」という言葉の語源は古代ギリシャにまで遡ります。
「万学の祖」と称されるアリストテレスは、物質(Substance)を「量的側面」と「質的側面」に分けて定義しました。
その際に量的な側面を表現する言葉として英語の質量(Quantitiy)に相当するギリシャ語を用いました。 そして対立概念である質的な側面を表現する言葉として、英語のQualityの語源となったラテン語のQualitasを用いたと言われています。

月日は流れ現在、ISO 9000シリーズにて「品質」は「本来備わっている特性の集まりが、要求事項を満たす程度」と述べられています。※ISO 9000シリーズとは、国際標準化機構が定めた品質マネジメントシステムに関する規格の総称です。

『プログラミングの心理学』や『一般システム思考入門』を著した、ソフトウェア開発の人類学者であるジェラルド・ワインバーグは、「品質は誰かにとっての価値である」と自著内で述べています。

続きを読む

Kotlin Fest2018参加レポート

f:id:vasilyjp:20180927112737j:plain

Kotlin Fest2018参加レポート

福岡研究所の渡辺(しかじろう @shikajiro)です。Kotlinのおっきなイベントが東京で開催されるということで福岡から飛んで✈いきました。

福岡でもFukuoka.ktという名前で過去に2回ほどイベントを主催しており、KotlinFest主催の太郎さんに登壇していただいたこともありました。僕自身3か月ほどKotlinから離れてましたが、直近の技術情報などをフォローできたらいいなと思い参加しました。

続きを読む

iOSでリストに種類の違うデータを挿入する実装方法

f:id:vasilyjp:20180927112735j:plain

こんにちは、新事業創造部の遠藤です。現在WEARの開発を行っています。
最近はWEARのコーディネート一覧やユーザー一覧など、リスト画面にバナー型の広告を実装をしました。

リストにデータを挿入する実装は簡単なように思えますが、種類の違うデータを扱う場合には、考慮するべきポイントがいくつかあります。
本記事ではリストに広告を表示することを例に、種類の違うデータをリストに挿入する際のデータの持ち方・実装ついて紹介したいと思います。

続きを読む

MIRU 2018本会議・若手プログラム参加報告

f:id:vasilyjp:20180927112732j:plain

こんにちは。スタートトゥデイ研究所の真木です。 8月5日から8月8日にかけて開催されたMIRU 2018という学会に行ってきました。また、5月下旬から約2か月間にわたって実施されてきた「MIRU若手プログラム」という 若手研究者同士 の交流プログラムにも参加してきたので、今回はその報告をします。

続きを読む

AWS CloudFormationをフル活用してAmazon RDS for MySQLからAmazon Auroraへ移行する

f:id:vasilyjp:20180927112728j:plain

こんにちは。新事業創造部インフラチームの内山(@k4ri474)です。

弊社が運営するIQONというサービスでは、長らくMySQLのバージョン5.6.27を利用していました。これは2018年9月にEOLを迎えるため、RDSの方針として強制アップグレードがアナウンスされています。
MySQLを継続する選択肢もありましたが、Auroraの運用知見が溜まっていたということもあり、これをキッカケにMySQLからのAurora移行を実施しました。

新事業創造部ではほぼ全てのAWSリソースをCloudFormationを使って宣言しているため、例に漏れずAuroraもテンプレートへ落とし込むことにしました。
ただ、CloudFormationだけで完結しない作業があったので、今回はCloudFormationでの宣言とコンソールからの手作業を織り交ぜるという対応を取っています。

執筆時点の公式ドキュメントではカバーできていない需要を満たすべく、CloudFormationを使ったAmazon Auroraへの移行の手順を皆さんに共有したいと思います。

続きを読む

Google Cloud Next '18 参加レポート

f:id:vasilyjp:20180927112730j:plain

こんにちは! スタートトゥデイテクノロジーズ新事業創造部の塩崎です。 2018年7月24日〜26日にかけてサンフランシスコでGoogle Cloud Next '18が開催されました。 このイベントに新事業創造部の塩崎、今村、そして代表取締役CIOの金山の3名で参加してきました。

この記事では多数あった講演の中で特に印象に残ったものをいくつか紹介いたします。

続きを読む

iOSアプリの多言語対応について

f:id:vasilyjp:20180927112725j:plain

こんにちは。新事業創造部の荒井です。 今回はiOSアプリの多言語対応について紹介します。

はじめに

私は今までいくつかのiOSアプリを運営してきましたが、どのアプリも日本語のみのサポートでした。現在関わっているWEARでは、すでに多言語対応が進められており、良い機会ですので個人的に知見がなかった多言語対応について調査をしました。今回は基本となる文字列の翻訳について触れていきたいと思います。

続きを読む

集合データを学習するモデルの紹介

f:id:vasilyjp:20180927112723j:plain

(Icon Credit *1

こんにちは。スタートトゥデイ研究所の後藤です。

今回は、集合を入力として扱うネットワークモデルの紹介をしたいと思います。機械学習の多くのモデルは、固定長の入出力や順序のある可変長の入出力を扱うように設計されます。画像データやテーブルデータは各サンプルの入出力の次元を合わせて学習しますし、自然言語処理のコーパスや時系列データは入出力の順序を保持して利用します。

その一方で、可変長で順序のない集合データを扱うモデルの研究は最近になって取り組み始められたばかりです。我々が研究しているファッションの領域において、入力データを集合として扱いたくなる状況がたびたびあるため、理解を深めておきたい問題設定です。

続きを読む

CodeDeployで構築するAutoScalingに追従可能なデプロイ環境

f:id:vasilyjp:20180927112721j:plain

こんにちは。新事業創造部インフラチームの光野(kotatsu360)です。

先日、VASILY時代1から長らく使われていたCapistranoによるデプロイを見直し、CodePipeline+CodeDeployによるデプロイフローを導入しました。

CodeDeployはEC2 AutoScalingとよく統合されており、この新しいデプロイフローによって最新のアプリケーションコードをどう反映するかという悩みから開放されました。この記事ではそのフローについて設計と運用を交えつつ紹介します。


  1. 株式会社スタートトゥデイテクノロジーズはスタートトゥデイ工務店 + VASILY + カラクルの3社を統合し発足されました

続きを読む

Lambda@EdgeでCloudFrontへのアクセスをいい感じに振り分ける

f:id:vasilyjp:20180927112718j:plain

(Icon Credit *1

こんにちは。PB開発部インフラチームの@inductorです。最近はすっかりインフラ勉強会というオンライン勉強会の運営が趣味になっています。

今回はLambda@EdgeというAWSのサービスを使って、CloudFrontへのアクセスを「細かいルール」を設定して振り分けてみたいと思います。

Lambda@Edgeについてもう詳しく知っているよ! という方は、次のセクションはスキップしてもらって構いません。もしよく知らないという方は、一緒に勉強してみましょう!

AWS Lambda@Edgeとは

Lambda@Edgeは以下の2つのサービスから成り立ちます。

  • AWS Lambda
  • Amazon CloudFront

CloudFrontのエッジロケーションにおいて、Lambdaで定義した任意のコードを実行できるというのがLambda@Edgeで、具体的には以下のような恩恵を得ることができます。

  • エッジロケーション(地域)に応じて表示させるコンテンツを変えたい
  • User-Agentなどに応じて取得するコンテンツを変えたい
  • アクセス元のIPアドレスによって表示させるコンテンツを変えたい
    • 開発環境に、アクセス元IPが自社でない場合のみBasic認証を導入したい
  • 実際に叩かれるURLと、S3などから実際に取得する資源のURIを変更したい

この他にも、CloudFrontから取得できるデータに応じて様々な対応を柔軟に行えるのが特徴です。

続きを読む
カテゴリー