OSS「Coppe」の公開 〜 BigQuery基盤のデータ監視ツールによるデータ品質担保

OGP-image

はじめに

こんにちは、データシステム部データ基盤ブロックの纐纈です。9月から22卒内定者として、チームにジョインしました。

本記事では、弊社のデータ基盤チームが抱えていた課題と、その解決のために公開したOSSツール「Coppe」を紹介します。Coppeは、以下のような方にお勧めできるツールです。

  • BigQueryを使用したデータ基盤の監視に興味がある
  • BigQueryの監視ツールとしてRedashを採用しているが、運用が面倒に感じている
  • インフラの設定なしにBigQueryの監視を行えるツールが欲しい

なお、本OSSはMonotaRO Tech Blogの記事「SQLを使った監視でデータ基盤の品質を向上させる」で紹介されていた仕組みを参考にし、より柔軟に監視項目を設定できるように新規開発しています。

OSSとして公開しているため、本記事と併せてご覧ください。

github.com

開発の経緯

現在、ZOZOはデータ基盤としてBigQueryを採用しています。そこには、オンプレやAWS、アプリケーションのログなど、あらゆるデータを集めており、タイミングも日次収集のものや、リアルタイム収集のものが存在します。その収集時に、遅延やオペレーションミス、意図しないデータの肥大化により、データ品質が下がってしまうことがあります。

その結果、データ基盤を利用した関連サービスに最新の正しい情報を反映できなくなってしまいます。そうなってしまうと、ZOZOが提供するサービスを利用するユーザーに、直接的な影響を与えてしまう可能性もあります。そのため、データの品質劣化には、いち早く気づき、対応する必要があります。

その対応策として、現在はRedashを使用しています。Redashは、SQLの分析結果をダッシュボードに可視化するOSSのBIツールです。これを利用し、BigQueryに定期的な監視クエリを実行し、その結果が期待値から外れる場合には、Slack通知で検知できるようにしています。一見すると、Redashで事足りているように見えますが、監視ツールとしては物足りない部分もあります。

redash.io

1点目の課題は、Redash自体をホスティングするためにWebサーバーやデータベース、Redisなどを自前で用意する必要がある点です。これは導入時に手間がかかるだけでなく、用意した環境の1つに障害が起きた際には、データ品質の監視ができなくなるという欠点があります。加えて、障害が発生したサーバーやサービスを立ち上げ直すのに手間と時間を要する点も懸念点です。

techblog.zozo.com

また、いつ誰によってどんな目的でその監視項目を追加したのかといったことが不明瞭になったり、他チームからの監視項目の追加の要請をRedashを管理する弊チーム以外ができなかったりという課題点もあります。

そこで、Redashよりも気軽に運用が可能で、監視項目の管理をGitHub上で行える監視ツールを開発することにしました。

Coppeの機能

監視ツールCoppe(以下、Coppe)は、BigQueryへの定期的な監視を実施します。また、非機能要件として、以下の点を目的にしています。

  • 障害発生時に、可能な限り自動再生できるインフラ構成
  • 導入時のセットアップや監視項目の追加を気軽に行える仕様

なお、「Coppe」という名前は蜘蛛から着想を得ています。監視項目を「蜘蛛の巣」と見立て、エラーを検知したらすぐに検知して動き出すイメージで名付けました。「Coppe」は英語で昔使われていた蜘蛛を意味する単語です。私は虫が苦手なため、「Spider」のような蜘蛛を直接連想しやすい名前ではないので、この名前を気に入っています。

Coppeは、監視項目をYAMLとSQLで指定することで、指定されたスケジュールに沿ってBigQueryへの定期的なチェックを実行し、データ品質の監視を行います。監視項目が検知された場合には、Slackにアラートメッセージを通知します。アラートメッセージは、監視項目ごとにクエリの実行結果などを設定可能です。また、監視項目の追加は、YAMLとSQLで記述してGitHubにプッシュすると、GitHub ActionsによってGCPに自動デプロイされます。インフラのセットアップも、GitHub ActionsからTerraformを利用して、必要な環境を自動的にセットアップします。詳しいインフラ構成は後述します。

次に、Coppeの監視項目をサンプルを用いて説明します。

監視項目の追加は、以下のようなフォーマットでYAMLファイルに記述します。

- schedule: "*/5 * * * *"
  sql: SELECT COUNT(*) AS error_log_cnt FROM `project.schema.table` WHERE ...
  expect:
    row_count: 0
  description: 直近5分の間にエラーログを検知しました。

上記の例で示したパターン以外にも、様々なオプションを用意しています。基本となる設定項目は以下の4つです。

  • 監視スケジュール
    • crontab形式による指定
  • BigQueryで実行するクエリ
  • 期待するクエリ結果
  • アラートメッセージの内容

さらに複雑な監視項目を設定したい場合、以下のようなフォーマットにも対応しています。

- schedule: "0 * * * SUN,TUE,WED,THU,FRI"
  sql_file: streaming-datatransfer-delay-alert.sql
  matrix:
    env: [stg, prd]
  params:
    interval_minute: 5 
  expect:
    row_count: 0
  description: |
    次のテーブルで5分以上の遅延が発生しています
    {{ range . }}
    {{ .table_name }} : {{ .cnt }} (cnt) : {{ .delay_avg }} (delay_avg) : {{ .delay_max }} (delay_max)
    {{ end }}
# streaming-datatransfer-delay-alert.sql

SELECT
  label,
  table_name,
  COUNT(*) AS cnt,
  AVG(diff) AS delay_avg,
  MAX(diff) AS delay_max
FROM (
  SELECT
    table_name,
    changetrack_start_time,
    bigquery_insert_time,
    TIMESTAMP_DIFF(bigquery_insert_time, changetrack_start_time, SECOND) AS diff,
    CASE
      WHEN TIMESTAMP_DIFF(bigquery_insert_time, changetrack_start_time, SECOND) > 600 THEN 1
    ELSE
    0
  END
    AS label
  FROM
    `streaming-datatransfer-{{.env}}.streaming_datatransfer.streaming_changetracktransfer_T*`
  WHERE
    bigquery_insert_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {{.interval_minute}} MINUTE))
WHERE
  label = 1
GROUP BY
  label,
  table_name

上記の例は、以下の設定をしています。

  • 日曜日と火〜金曜日に、指定のSQLファイルに記載されたクエリを毎時実行する
  • streaming-datatransfer-stgstreaming-datatransfer-prd のプロジェクトで実行する
  • クエリ結果を評価し、期待されている結果と照らし合わせる
  • アラートメッセージにクエリ結果を展開し、パースした状態で通知する

監視項目のフィールドや書式について、詳しく説明します。

スケジュール

schedule: "* * * * *"

監視のスケジュールは、crontab形式を採用しています。そのため、crontab同様に以下の順で指定します。

  • 曜日

書式に従い、以下のように使用します。

  • 毎分: "* * * * *"
  • 10分おき: "*/10 * * * *"
  • 毎時0分: "0 * * * *"
  • 毎日0時: "0 0 * * *"
  • 毎週月〜金の18時: "* 18 * * MON,TUE,WED,THU,FRI"

クエリ

BigQueryで実行するクエリは、 直接SQLを記載する方法と、SQLを記載した別ファイルへの相対パスを指定する方法があります。

sql: に直接SQLを記載する場合は、以下のようにYAML内にSQLを埋め込みます。

sql: SELECT * FROM ...

また、SQLを記載した別ファイルの相対パスを指定する場合は、sql_file: に記載します。

sql_file: some_dir/file_name.sql

他にも、テキストテンプレートの書式を使用し、以下のように params: を使用してSQL内にパラメータを入れることもできます。さらに、1つの監視項目を複数の組み合わせのパラメータに対して実行したい場合は、GitHub Actionsのマトリックスの仕様と同様に、matrix: に配列を指定することも可能です。

sql: SELECT * FROM `sample-{{ .env }}-svc.some_schema.some_table_{{ .platform }}` WHERE timestamp > {{ .since }}
params:
  since: TIMESTAMP_SUB(CURRENT_TIMESTAMP, INTERVAL 1 HOUR)
matrix: 
  env: [dev, stg, prd]
  platform: [android, ios]

期待するクエリ結果

監視対象となるクエリの期待値は、以下のように指定します。クエリ結果の列数を row_count: に指定したり、クエリ結果を使用した式 expression:を指定可能です。なお、式を利用することで、より複雑な監視条件の設定ができます。

expectation:
  row_count: 0
expectation:
  expression: column_name_1 == "foo" && column_name_2 == 0

アラートメッセージ

監視対象に指定したクエリの期待値から外れた結果を得た場合、Slackチャンネルにアラートを通知します。そのアラートの通知内容は description: にて指定可能です。

なお、アラートメッセージには、以下のように、クエリ結果やSQLで使用した params:matrix: の値を展開して利用可能です。

# サンプルの監視項目
- schedule: "0 * * * *"
  sql: SELECT column_1, column_2 FROM `sample-{{ .env }}-svc.some_schema.some_table_{{ .platform }}` WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP, INTERVAL {{ .since_hour }} HOUR)
  params:
    since_hour: 1
  matrix:
    env: [prd, stg, dev]
    platform: [android, browser, ios, server]
  expect:
    row_count: 0
  description: |
    `sample-{{ .matrix.env }}-svc.some_schema.some_table_{{ .matrix.platform }}`にて、{{ .params.since_hour }}時間前から現在までの間に、以下のレコードが検出されました
    
    クエリ結果
    {{ range .query_result }}
    - column_1: {{ .column_1 }}, column_2: {{ .column_2 }}
    {{ end }}

上記の設定をした場合、実際には以下のようなアラートメッセージが生成されます。

# 実際のアラートメッセージ
`sample-prd-svc.some_schema.some_table_ios`にて、1時間前から現在までの間に、以下のレコードが検出されました

クエリ結果
- column_1: aa, column_2: bb
- column_1: ab, column_2: bc

Coppeのインフラ構成

本章では、Coppeのインフラ構成と、その選定理由を説明します。

Coppeのインフラ構成

Coppeのインフラ構成は、上図に示す通りです。Cloud Functionsをデプロイ先として、Pub/SubやCloud Schedulerを使用します。また、上図では省略していますが、Cloud Functions自体を監視するために、Cloud Monitoringも使用しています。

具体的な仕組みを説明します。Cloud Schedulerにより、毎分の間隔でPub/Subを介して上図左側のCloud Functionsを起動します。ここでは、YAMLファイルを元にスケジュールを実行するか判断し、SQLファイルのパースを行った上で、上図右側のPub/Subに監視項目のデータを渡します。そして、上図右側のCloud Functionsは、Pub/Subから監視項目を受け取り、BigQueryに問い合わせ、その結果が期待される値と等しいかどうかを確認します。期待される値と異なった場合は、指定したSlackチャンネルに通知します。

上記のインフラ構成は、前述のRedash運用の課題も考慮し、以下の選定基準で策定しました。

  • 基準1:運用の手間が可能な限り不要である
  • 基準2:費用が可能な限り抑えられる

実際に、デプロイ環境の候補に挙がったのは、以下の5つでした。なお、クエリの実行先がBigQueryということもあり、今回のインフラ選定ではGCP環境のみを検証対象にしています。

  • Cloud Run
  • App Engine
  • Cloud Functions
  • Compute Engine
  • Kubernetes Engine

この5つの候補から、上記の2つの基準から選定していきます。アプリケーションだけでなく、インフラ面の運用が必要となるCompute Engine、クラスター自体に固定費がかかるKubernetesは基準から外れるため、候補から除外されました。アプリケーションであれば、BigQueryから取得した内容の計算が必要となる場合が多いです。しかし、Coppeでは、ほとんど計算力や拡張性を必要としておらず、2つの基準にある運用面と金銭面で、費用に見合わないため、この判断をしました。

また、Cloud Runも別途コンテナを用意する必要があるため、運用面の基準により候補から除外しました。Cloud Runはコンテナを使うため、Googleがサポートする言語以外も利用可能になるメリットがあります。しかし、今回のアプリケーションの要件では、特定の言語に依存する必要性もなく、Googleがサポートしている言語で十分に開発可能でした。そのため、このメリットの恩恵は受けられないと判断しました。

ここまでの検討の結果で、App EngineとCloud Functionsの2択に絞られました。どちらも、機能面ではCoppeで実現させたいことが可能であり、費用も少額、そして運用の手間も少なくて済む特徴を持っています。

しかし、App Engineは、プロジェクト毎に1つのアプリケーションしかデプロイできない条件があります。これは、「既にApp Engineを使っているプロジェクトでは、Coppeを使うことができない」という制約を発生させます。そのため、最終的にCloud Functionsを選定しました。Cloud Functionsは、ファンアウトパターンを容易に実現でき、コードがシンプルに書けるといった利点も持っています。

また、Coppeに必要なインフラ構成はTerraformを使って管理しています。初回に限り、以下の処理が必要ですが、それ以降はGitHub Actionsによって自動デプロイされる仕組みです。

  • SlackのWebhook URLとGCPのプロジェクト名を環境変数用のファイルに書き込む
  • Slackへの通知をCloud Console上で許可する
  • Terraformによるインフラ構築の下準備に必要なスクリプトを実行する

まとめ

本記事では、データ品質担保のためのBigQuery基盤のデータ監視ツールである「Coppe」を紹介しました。Coppeの開発により、YAMLやSQLを使った監視項目の追加が可能になり、複数の環境を横断する設定も容易に実現可能になりました。その結果、BigQueryのデータに異常がないか、容易に定期的なチェックができるようになりました。

運用を開始してから、まだ日は浅いですが、Redashを活用していた時と同様の監視をCoppeで実現できています。本記事を読んで、使ってみたいと思った方は、是非使ってみてください。IssueやPull Requestもお待ちしております。

ZOZOではデータ基盤のガバナンスを強化し、利用者にとって安全安心なデータ基盤を整備していく仲間を募集しています。ご興味のある方は、以下のリンクからご応募ください。

hrmos.co

カテゴリー