OSS「Coppe」の公開 〜 BigQuery基盤のデータ監視ツールによるデータ品質担保

OGP-image

はじめに

こんにちは、データシステム部データ基盤ブロックの纐纈です。9月から22卒内定者として、チームにジョインしました。

本記事では、弊社のデータ基盤チームが抱えていた課題と、その解決のために公開したOSSツール「Coppe」を紹介します。Coppeは、以下のような方にお勧めできるツールです。

  • BigQueryを使用したデータ基盤の監視に興味がある
  • BigQueryの監視ツールとしてRedashを採用しているが、運用が面倒に感じている
  • インフラの設定なしにBigQueryの監視を行えるツールが欲しい

なお、本OSSはMonotaRO Tech Blogの記事「SQLを使った監視でデータ基盤の品質を向上させる」で紹介されていた仕組みを参考にし、より柔軟に監視項目を設定できるように新規開発しています。

OSSとして公開しているため、本記事と併せてご覧ください。

github.com

開発の経緯

現在、ZOZOはデータ基盤としてBigQueryを採用しています。そこには、オンプレやAWS、アプリケーションのログなど、あらゆるデータを集めており、タイミングも日次収集のものや、リアルタイム収集のものが存在します。その収集時に、遅延やオペレーションミス、意図しないデータの肥大化により、データ品質が下がってしまうことがあります。

その結果、データ基盤を利用した関連サービスに最新の正しい情報を反映できなくなってしまいます。そうなってしまうと、ZOZOが提供するサービスを利用するユーザーに、直接的な影響を与えてしまう可能性もあります。そのため、データの品質劣化には、いち早く気づき、対応する必要があります。

その対応策として、現在はRedashを使用しています。Redashは、SQLの分析結果をダッシュボードに可視化するOSSのBIツールです。これを利用し、BigQueryに定期的な監視クエリを実行し、その結果が期待値から外れる場合には、Slack通知で検知できるようにしています。一見すると、Redashで事足りているように見えますが、監視ツールとしては物足りない部分もあります。

redash.io

1点目の課題は、Redash自体をホスティングするためにWebサーバーやデータベース、Redisなどを自前で用意する必要がある点です。これは導入時に手間がかかるだけでなく、用意した環境の1つに障害が起きた際には、データ品質の監視ができなくなるという欠点があります。加えて、障害が発生したサーバーやサービスを立ち上げ直すのに手間と時間を要する点も懸念点です。

techblog.zozo.com

また、いつ誰によってどんな目的でその監視項目を追加したのかといったことが不明瞭になったり、他チームからの監視項目の追加の要請をRedashを管理する弊チーム以外ができなかったりという課題点もあります。

そこで、Redashよりも気軽に運用が可能で、監視項目の管理をGitHub上で行える監視ツールを開発することにしました。

Coppeの機能

監視ツールCoppe(以下、Coppe)は、BigQueryへの定期的な監視を実施します。また、非機能要件として、以下の点を目的にしています。

  • 障害発生時に、可能な限り自動再生できるインフラ構成
  • 導入時のセットアップや監視項目の追加を気軽に行える仕様

なお、「Coppe」という名前は蜘蛛から着想を得ています。監視項目を「蜘蛛の巣」と見立て、エラーを検知したらすぐに検知して動き出すイメージで名付けました。「Coppe」は英語で昔使われていた蜘蛛を意味する単語です。私は虫が苦手なため、「Spider」のような蜘蛛を直接連想しやすい名前ではないので、この名前を気に入っています。

Coppeは、監視項目をYAMLとSQLで指定することで、指定されたスケジュールに沿ってBigQueryへの定期的なチェックを実行し、データ品質の監視を行います。監視項目が検知された場合には、Slackにアラートメッセージを通知します。アラートメッセージは、監視項目ごとにクエリの実行結果などを設定可能です。また、監視項目の追加は、YAMLとSQLで記述してGitHubにプッシュすると、GitHub ActionsによってGCPに自動デプロイされます。インフラのセットアップも、GitHub ActionsからTerraformを利用して、必要な環境を自動的にセットアップします。詳しいインフラ構成は後述します。

次に、Coppeの監視項目をサンプルを用いて説明します。

監視項目の追加は、以下のようなフォーマットでYAMLファイルに記述します。

- schedule: "*/5 * * * *"
  sql: SELECT COUNT(*) AS error_log_cnt FROM `project.schema.table` WHERE ...
  expect:
    row_count: 0
  description: 直近5分の間にエラーログを検知しました。

上記の例で示したパターン以外にも、様々なオプションを用意しています。基本となる設定項目は以下の4つです。

  • 監視スケジュール
    • crontab形式による指定
  • BigQueryで実行するクエリ
  • 期待するクエリ結果
  • アラートメッセージの内容

さらに複雑な監視項目を設定したい場合、以下のようなフォーマットにも対応しています。

- schedule: "0 * * * SUN,TUE,WED,THU,FRI"
  sql_file: streaming-datatransfer-delay-alert.sql
  matrix:
    env: [stg, prd]
  params:
    interval_minute: 5 
  expect:
    row_count: 0
  description: |
    次のテーブルで5分以上の遅延が発生しています
    {{ range . }}
    {{ .table_name }} : {{ .cnt }} (cnt) : {{ .delay_avg }} (delay_avg) : {{ .delay_max }} (delay_max)
    {{ end }}
# streaming-datatransfer-delay-alert.sql

SELECT
  label,
  table_name,
  COUNT(*) AS cnt,
  AVG(diff) AS delay_avg,
  MAX(diff) AS delay_max
FROM (
  SELECT
    table_name,
    changetrack_start_time,
    bigquery_insert_time,
    TIMESTAMP_DIFF(bigquery_insert_time, changetrack_start_time, SECOND) AS diff,
    CASE
      WHEN TIMESTAMP_DIFF(bigquery_insert_time, changetrack_start_time, SECOND) > 600 THEN 1
    ELSE
    0
  END
    AS label
  FROM
    `streaming-datatransfer-{{.env}}.streaming_datatransfer.streaming_changetracktransfer_T*`
  WHERE
    bigquery_insert_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {{.interval_minute}} MINUTE))
WHERE
  label = 1
GROUP BY
  label,
  table_name

上記の例は、以下の設定をしています。

  • 日曜日と火〜金曜日に、指定のSQLファイルに記載されたクエリを毎時実行する
  • streaming-datatransfer-stgstreaming-datatransfer-prd のプロジェクトで実行する
  • クエリ結果を評価し、期待されている結果と照らし合わせる
  • アラートメッセージにクエリ結果を展開し、パースした状態で通知する

監視項目のフィールドや書式について、詳しく説明します。

スケジュール

schedule: "* * * * *"

監視のスケジュールは、crontab形式を採用しています。そのため、crontab同様に以下の順で指定します。

  • 曜日

書式に従い、以下のように使用します。

  • 毎分: "* * * * *"
  • 10分おき: "*/10 * * * *"
  • 毎時0分: "0 * * * *"
  • 毎日0時: "0 0 * * *"
  • 毎週月〜金の18時: "* 18 * * MON,TUE,WED,THU,FRI"

クエリ

BigQueryで実行するクエリは、 直接SQLを記載する方法と、SQLを記載した別ファイルへの相対パスを指定する方法があります。

sql: に直接SQLを記載する場合は、以下のようにYAML内にSQLを埋め込みます。

sql: SELECT * FROM ...

また、SQLを記載した別ファイルの相対パスを指定する場合は、sql_file: に記載します。

sql_file: some_dir/file_name.sql

他にも、テキストテンプレートの書式を使用し、以下のように params: を使用してSQL内にパラメータを入れることもできます。さらに、1つの監視項目を複数の組み合わせのパラメータに対して実行したい場合は、GitHub Actionsのマトリックスの仕様と同様に、matrix: に配列を指定することも可能です。

sql: SELECT * FROM `sample-{{ .env }}-svc.some_schema.some_table_{{ .platform }}` WHERE timestamp > {{ .since }}
params:
  since: TIMESTAMP_SUB(CURRENT_TIMESTAMP, INTERVAL 1 HOUR)
matrix: 
  env: [dev, stg, prd]
  platform: [android, ios]

期待するクエリ結果

監視対象となるクエリの期待値は、以下のように指定します。クエリ結果の列数を row_count: に指定したり、クエリ結果を使用した式 expression:を指定可能です。なお、式を利用することで、より複雑な監視条件の設定ができます。

expectation:
  row_count: 0
expectation:
  expression: column_name_1 == "foo" && column_name_2 == 0

アラートメッセージ

監視対象に指定したクエリの期待値から外れた結果を得た場合、Slackチャンネルにアラートを通知します。そのアラートの通知内容は description: にて指定可能です。

なお、アラートメッセージには、以下のように、クエリ結果やSQLで使用した params:matrix: の値を展開して利用可能です。

# サンプルの監視項目
- schedule: "0 * * * *"
  sql: SELECT column_1, column_2 FROM `sample-{{ .env }}-svc.some_schema.some_table_{{ .platform }}` WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP, INTERVAL {{ .since_hour }} HOUR)
  params:
    since_hour: 1
  matrix:
    env: [prd, stg, dev]
    platform: [android, browser, ios, server]
  expect:
    row_count: 0
  description: |
    `sample-{{ .matrix.env }}-svc.some_schema.some_table_{{ .matrix.platform }}`にて、{{ .params.since_hour }}時間前から現在までの間に、以下のレコードが検出されました
    
    クエリ結果
    {{ range .query_result }}
    - column_1: {{ .column_1 }}, column_2: {{ .column_2 }}
    {{ end }}

上記の設定をした場合、実際には以下のようなアラートメッセージが生成されます。

# 実際のアラートメッセージ
`sample-prd-svc.some_schema.some_table_ios`にて、1時間前から現在までの間に、以下のレコードが検出されました

クエリ結果
- column_1: aa, column_2: bb
- column_1: ab, column_2: bc

Coppeのインフラ構成

本章では、Coppeのインフラ構成と、その選定理由を説明します。

Coppeのインフラ構成

Coppeのインフラ構成は、上図に示す通りです。Cloud Functionsをデプロイ先として、Pub/SubやCloud Schedulerを使用します。また、上図では省略していますが、Cloud Functions自体を監視するために、Cloud Monitoringも使用しています。

具体的な仕組みを説明します。Cloud Schedulerにより、毎分の間隔でPub/Subを介して上図左側のCloud Functionsを起動します。ここでは、YAMLファイルを元にスケジュールを実行するか判断し、SQLファイルのパースを行った上で、上図右側のPub/Subに監視項目のデータを渡します。そして、上図右側のCloud Functionsは、Pub/Subから監視項目を受け取り、BigQueryに問い合わせ、その結果が期待される値と等しいかどうかを確認します。期待される値と異なった場合は、指定したSlackチャンネルに通知します。

上記のインフラ構成は、前述のRedash運用の課題も考慮し、以下の選定基準で策定しました。

  • 基準1:運用の手間が可能な限り不要である
  • 基準2:費用が可能な限り抑えられる

実際に、デプロイ環境の候補に挙がったのは、以下の5つでした。なお、クエリの実行先がBigQueryということもあり、今回のインフラ選定ではGCP環境のみを検証対象にしています。

  • Cloud Run
  • App Engine
  • Cloud Functions
  • Compute Engine
  • Kubernetes Engine

この5つの候補から、上記の2つの基準から選定していきます。アプリケーションだけでなく、インフラ面の運用が必要となるCompute Engine、クラスター自体に固定費がかかるKubernetesは基準から外れるため、候補から除外されました。アプリケーションであれば、BigQueryから取得した内容の計算が必要となる場合が多いです。しかし、Coppeでは、ほとんど計算力や拡張性を必要としておらず、2つの基準にある運用面と金銭面で、費用に見合わないため、この判断をしました。

また、Cloud Runも別途コンテナを用意する必要があるため、運用面の基準により候補から除外しました。Cloud Runはコンテナを使うため、Googleがサポートする言語以外も利用可能になるメリットがあります。しかし、今回のアプリケーションの要件では、特定の言語に依存する必要性もなく、Googleがサポートしている言語で十分に開発可能でした。そのため、このメリットの恩恵は受けられないと判断しました。

ここまでの検討の結果で、App EngineとCloud Functionsの2択に絞られました。どちらも、機能面ではCoppeで実現させたいことが可能であり、費用も少額、そして運用の手間も少なくて済む特徴を持っています。

しかし、App Engineは、プロジェクト毎に1つのアプリケーションしかデプロイできない条件があります。これは、「既にApp Engineを使っているプロジェクトでは、Coppeを使うことができない」という制約を発生させます。そのため、最終的にCloud Functionsを選定しました。Cloud Functionsは、ファンアウトパターンを容易に実現でき、コードがシンプルに書けるといった利点も持っています。

また、Coppeに必要なインフラ構成はTerraformを使って管理しています。初回に限り、以下の処理が必要ですが、それ以降はGitHub Actionsによって自動デプロイされる仕組みです。

  • SlackのWebhook URLとGCPのプロジェクト名を環境変数用のファイルに書き込む
  • Slackへの通知をCloud Console上で許可する
  • Terraformによるインフラ構築の下準備に必要なスクリプトを実行する

まとめ

本記事では、データ品質担保のためのBigQuery基盤のデータ監視ツールである「Coppe」を紹介しました。Coppeの開発により、YAMLやSQLを使った監視項目の追加が可能になり、複数の環境を横断する設定も容易に実現可能になりました。その結果、BigQueryのデータに異常がないか、容易に定期的なチェックができるようになりました。

運用を開始してから、まだ日は浅いですが、Redashを活用していた時と同様の監視をCoppeで実現できています。本記事を読んで、使ってみたいと思った方は、是非使ってみてください。IssueやPull Requestもお待ちしております。

ZOZOではデータ基盤のガバナンスを強化し、利用者にとって安全安心なデータ基盤を整備していく仲間を募集しています。ご興味のある方は、以下のリンクからご応募ください。

hrmos.co

INFORMATION_SCHEMAを用いたBigQueryのストレージ無駄遣い調査

ogp

こんにちは、『地球の歩き方ムー』創刊のニュースに心を踊らせている、データ基盤ブロックの塩崎です。

本記事では、データ基盤の管理者としてBigQueryのストレージコストの削減に取り組んだ事例を紹介します。

BigQuery費用はクエリ費用だけではない

ZOZOのデータ基盤として利用されているBigQueryは、非常にパワフルなDWH(Data WareHouse)です。しかし、それ故に利用者の意図しないところで費用が高騰することもしばしば発生します。よく問題になるのはクエリ費用の高騰であり、以下のQiita記事はBigQuery利用者の中でも有名です。

qiita.com

このクエリ費用の高騰に対し、我々データ基盤ブロックはこれまでに、いくつもの方法で対処してきました。具体的な取り組みの一部は以下の記事で紹介しているので、併せてご覧ください。

techblog.zozo.com techblog.zozo.com

しかし、BigQueryの費用はクエリに関するもののみではありません。以下のドキュメントによると、BigQueryの費用はクエリに関する費用(Analytis)とストレージに関する費用(Storage)の2つがメインであることが分かります。

BigQuery pricing has two main components: Analysis pricing is the cost to process queries, including SQL queries, user-defined functions, scripts, and certain data manipulation language (DML) and data definition language (DDL) statements that scan tables. Storage pricing is the cost to store data that you load into BigQuery.

cloud.google.com

このストレージに関する費用は、USマルチリージョンの場合、1か月1GBあたり0.020 USDであり、90日間変更のないテーブルはその半額の0.010 USDに自動的に割引されます。

この単価は、Google Cloud StorageAmazon S3などと比較しても安価であり、BigQueryの導入初期はあまり気にならないことも多いです。しかし、BigQueryをデータ基盤として長年利用すると、徐々にストレージ利用量が増加することもしばしば発生します。

現在のZOZOのデータ基盤は約100のGCPプロジェクト、約1000のデータセット、数十万以上のテーブルにまたがる大規模なものへと成長しました。これらの全てのデータを1つのチームが把握することは非現実的であるため、各GCPプロジェクト毎に管理者を立て分割統治を行っています。そのため、全てのプロジェクトの中にある、全てのテーブルのデータサイズを一覧で表示して可視化を行うダッシュボードを作成しました。そして、そのダッシュボードに基づき、不必要にストレージコストが高騰している疑いのあるテーブルを洗い出しました。それらのテーブルの情報を個別に管理者に連絡することでコストの削減に成功しました。

以降で、その具体的な流れを説明していきます。

ストレージコストの可視化

本章では、ストレージ利用量の調査から、Data Studioで可視化するまでの流れを説明します。

ストレージ利用量の調査方法

はじめに、BigQueryのストレージ利用量をダンプして1つのテーブルに集約します。BigQueryのストレージ利用量は INFORMATION_SCHEMA.PARTITIONS に格納されているので、それを参照します。

cloud.google.com

このビューの STORAGE_TIER 列を参照すると ACTIVELONG_TERM かが分かり、1GBあたりの単価が分かります。しかし、今回は分かりやすさのために、この部分はあえて無視していることをご了承ください。全てのプロジェクトのPARTITIONSビューを一括で取得する方法があれば楽なのですが、現時点ではそのような仕組みがないため、分割して取得します。大量のテーブルの情報を分割して取得するにあたり、特に以下の2点に注意する必要があります。

  1. PARTITIONSビューから多くのテーブルの情報を取得するとエラーになる
  2. 1つのテーブルに対するINSERTは1日あたり1000回の上限がある

PARTITIONSビューから多くのテーブルの情報を取得するとエラーになる注意点

1点目は、PARTITIONSビューのドキュメントにも記載のない罠であり、特に注意が必要です。多くのテーブルを保持しているデータセットに対して無邪気に以下のようなクエリを実行するとエラーになります。

SELECT * FROM `project_id`.`dataset_name`.INFORMATION_SCHEMA.PARTITIONS
発生するエラー:
INFORMATION_SCHEMA.PARTITIONS query attempted to read too many tables.
Please add more restrictive filters.

このエラーが発生する閾値はドキュメントに記載がないため、正確な値は不明です。テーブルの数を変えながら実験した結果、テーブルの数が1000程度であればエラーが発生しないため、以下のようなWHERE句を使い参照するテーブルの数を限定するようにしました。

WHERE table_id IN (table_name1, table_name2, ..., table_name1000)

1つのテーブルに対するINSERTは1日あたり1000回の上限がある注意点

次に2つ目の注意点です。前述の通り、PARTITIONSビューからの情報取得は1000テーブル毎に分割されます。そのため、ストレージ容量をまとめるテーブルに対するINSERTの回数もそれに応じて増加します。

INSERT INTO bq_storage_stats
SELECT * FROM `project_id`.`dataset_name`.INFORMATION_SCHEMA.PARTITIONS
WHERE table_id IN (table_name1, table_name2, ..., table_name1000);

INSERT INTO bq_storage_stats
SELECT * FROM `project_id`.`dataset_name`.INFORMATION_SCHEMA.PARTITIONS
WHERE table_id IN (table_name1001, table_name1002, ..., table_name2000);

...

一方で、BigQueryは1つのテーブルに対するDML操作の上限が1日あたり1500回に設定されています。

cloud.google.com

そのため、多くのプロジェクト・データセットに関する情報を取得する際には、この上限に気をつける必要があります。我々の環境では上限に達してしまったため、Streaming Insertを行うことでエラーを回避しました。Streaming Insertの上限は先程のDMLの上限とは別であり、閾値がかなり大きいため回避策として利用できます。

cloud.google.com

複数のGCPプロジェクトのBigQueryのストレージ利用量を収集するスクリプトを以下に示します。

from google.cloud import bigquery
from itertools import zip_longest, groupby
import time
import string
import random
import concurrent.futures

# 集計対象のGCPプロジェクトIDの配列
project_ids = [
  'project_id1',
  'project_id2',
  ...
]

# ストレージ利用量を集約するテーブル
destination_table = 'project_id.dataset_id.table_name'

# Ref: https://docs.python.org/3/library/itertools.html#itertools-recipes
def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def create_destination_table(client, destination_table):
    schema = [
        bigquery.SchemaField("project_id", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("dataset_name", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("table_name", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("table_rows", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("total_logical_bytes", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("total_billable_bytes", "INTEGER", mode="NULLABLE"),
    ]
    table = bigquery.Table(destination_table, schema=schema)
    client.create_table(table)

def get_dataset_names(client, project_id):
    query = f"SELECT SCHEMA_NAME AS dataset_name FROM `{project_id}`.`region-us`.INFORMATION_SCHEMA.SCHEMATA ORDER BY SCHEMA_NAME ASC"
    rows = client.query(query)

    return [row['dataset_name'] for row in rows]

def get_table_names(client, project_id):
    query = f"SELECT table_schema AS dataset_name, table_name from `{project_id}`.`region-us`.INFORMATION_SCHEMA.TABLES ORDER BY table_schema ASC, table_name ASC"
    rows = client.query(query)
    return rows

def generate_table_size_job(client, project_id, dataset_name, table_names, destination_table):
    table_names_str = ",".join(['"' + t + '"' for t in table_names])
    query = f"""
        SELECT
            table_catalog AS project_id,
            table_schema AS dataset_name,
            table_name,
            sum(total_rows) AS total_rows,
            sum(total_logical_bytes) AS total_logical_bytes,
            sum(total_billable_bytes) AS total_billable_bytes
        FROM `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.PARTITIONS
        WHERE table_name IN ({table_names_str})
        GROUP BY table_catalog, table_schema, table_name
    """

    return client.query(query, bigquery.job.QueryJobConfig(priority="BATCH"))

def retvieve_rows(query_job):
    exception = query_job.exception()
    if exception is not None:
        print(exception)
        print("Error occurred during the execution of the following query")
        print(query_job.query)
        raise exception

    results = []
    for row in query_job.result():
        results.append(row)

    return results

client = bigquery.Client()

temp_destination_table = destination_table + "_" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))

print(f"Temp Table: {temp_destination_table}")

create_destination_table(client, temp_destination_table)
print(f"created {temp_destination_table}")

# 高速化のためにBigQueryへのJobを並列して投げる
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    for project_id in project_ids:
        dataset_names = get_dataset_names(client, project_id)
        dataset_count = len(dataset_names)
        print(f"{dataset_count} dataset(s) found in {project_id}.")

        dataset_table_names = get_table_names(client, project_id)

        query_job_futures = []
        for dataset_name, rows in groupby(dataset_table_names, lambda r: r['dataset_name']):
            table_names = [row['table_name'] for row in rows]
            table_count = len(table_names)
            print(f"{table_count} table(s) found in {project_id}.{dataset_name}.")

            # 1000テーブル毎に分割してPARTITIONSビューにクエリを投げる
            for table_names_chunk in grouper(table_names, 1000):
                table_names_chunk = [t for t in table_names_chunk if t is not None]
                query_job = executor.submit(generate_table_size_job, client, project_id, dataset_name, table_names_chunk, temp_destination_table)
                query_job_futures.append(query_job)

        print(f"waiting for all query jobs have been created")
        concurrent.futures.wait(query_job_futures)

        query_jobs = [f.result() for f in query_job_futures]
        query_job_count = len(query_jobs)
        print(f"{query_job_count} query jobs has been created")

        while not all([q.done() for q in query_jobs]):
            print("waiting for all jobs completed")
            time.sleep(1)

        print(f"{query_job_count} query jobs has been completed")

        rows_futures = []
        for query_job in query_jobs:
            rows_future = executor.submit(retvieve_rows, query_job)
            rows_futures.append(rows_future)

        concurrent.futures.wait(rows_futures)
        results = []
        for rows_future in rows_futures:
            results += rows_future.result()

        result_count = len(results)
        print(f"{result_count} rows retvieved.")

        if results:
            for results_chunk in grouper(results, 1000):
                results_chunk = [r for r in results_chunk if r is not None]
                insert_errors = client.insert_rows(client.get_table(temp_destination_table), results_chunk)
                results_chunk_count = len(results_chunk)
                print(f"{results_chunk_count} rows inserted.")
                if insert_errors:
                    print("Error occured during inserting the following rows")
                    print(insert_errors)

        print(f"saved storage stats of {project_id}")

copy_job = client.copy_table(temp_destination_table, destination_table)
copy_job.result()
print("Copy Temp table to Destination table")

delete_job = client.delete_table(temp_destination_table, not_found_ok=True)
delete_job.result()
print("Delete Temp table")

Data Studioで可視化

上記の手順で、複数のGCPプロジェクト内のストレージ利用量を1つのテーブルに集約しました。次に、この情報を可視化し、大量のストレージを利用しているGCPプロジェクト・データセット・テーブルを見つけていきます。可視化には、Google Data Studioを利用します。

datastudio.google.com

完成したダッシュボードを以下に示します。画面上部のフィルターでプロジェクト・データセットを絞り込み、その中でストレージ利用量の多いTOP 5のデータセット・テーブルを確認できるようにしました。

BigQueryストレージダッシュボード

これは余談ですが、せっかくなので下図のようなバブルチャートを利用し、「見た目がカッコ良いダッシュボード」を作ろうともしました。しかし、上図の表形式のダッシュボードの方が役に立ちました。「見た目がカッコ良いダッシュボード」が必ずしも実用的だとは言えないことを実感しました。

BigQueryストレージバブル

テーブル利用状況の調査

ストレージ利用量の大きいテーブルが発見できたので、次に利用状況を調査します。ストレージを大量に利用していたとしても、それが利用されているテーブルであれば無闇に消すことはできません。そのために、テーブルが最近どの程度参照されたのかを確認します。

INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION ビューを用いると、特定のテーブルに対して実行されたクエリを確認できます。なお、JOBS_BY_ORGANIZATION ビューには実行されたSQL文が格納されていないので、必要に応じてJOBS_BY_PROJECT ビューも併用してクエリの利用状況を確認します。そして、BigQueryのストレージを多く消費しており、かつ最近の利用実績が乏しいテーブルを「無駄遣い疑惑」のテーブルとしてリストアップしていきます。

SELECT
  job_id,
  creation_time,
  project_id,
  user_email,
  job_type,
  statement_type,
  destination_table,
  referenced_tables
FROM
  `region-us`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
WHERE
  creation_time > TIMESTAMP('2021-09-01')
  AND (
    SELECT
      LOGICAL_OR(rt.dataset_id = "データセット名" AND rt.table_id = "テーブル名")
    FROM UNNEST(referenced_tables) AS rt
  )
ORDER BY creation_time DESC

担当者への対応依頼

最後に「無駄遣い疑惑」テーブルの削除を前提とした対応を担当者に依頼しました。その際には、テーブル名・ストレージ利用量だけではなく、以下の情報も併せて伝えました。それにより、迅速に対処をしてもらうように心がけました。

  • 1年あたりのストレージコスト見積もり
  • 最近数カ月間でそのテーブルに実行されたクエリ
  • 実は必要だということが後から判明しても、削除から7日以内であれば復元可能であること

巨大テーブル棚卸し表

関係部署の協力も得られ、結果として合計で約1000TBの無駄遣いテーブルを削除できました。再発防止策として、アドホック分析の中間結果を配置する場合には、データセット毎にテーブルのデフォルト有効期間を設定するように働きかけました。

cloud.google.com

まとめ

BigQueryはコンピュートだけではなく、ストレージも非常にパワフルなDWHです。そのため、利用者の意図しないところでストレージ費用が高騰する恐れもあります。ZOZOのデータ基盤は多くの部署が利用しているため、それぞれの利用者の努力に依存するだけでは発生を抑制することは困難です。そのため、 INFORMATION_SCHEMA というBigQueryに備わっている仕組みを活用することで、横断的かつ効率的に費用を無駄遣いしているテーブルの発見・削除をしました。

ZOZOではデータ基盤のガバナンスを強化し、利用者にとって安全安心なデータ基盤を整備していく仲間を募集中です。ご興味のある方は以下のリンクからご応募ください。

hrmos.co

コンピュートとストレージの分離から紐解くBigQueryの権限モデル

OGP

こんにちは。SRE部DATA-SREチームの塩崎です。Software Design誌の2021年9月号に弊社でのBigQuery活用事例を寄稿しましたので、書店などで見かけた際は購入していただけますと嬉しいです。

gihyo.jp

さて、BigQueryはコンピュートとストレージを分離することで高いスケーラビリティを達成しているData WareHouse(DWH)です。しかし、そのアーキテクチャを採用したがゆえに権限モデルが複雑化し、初心者にとって理解の難しい挙動をすることもあります。この記事ではBigQueryの権限モデルをコンピュートとストレージの分離という観点から紐解きます。

なお、記事中に記載している費用は全てUS Multi Regionにおけるものです。asia-northeast-1 Resion(東京)とは異なりますので、ご注意ください。

よくあるエラーとそこから湧く疑問

BigQueryを使い始めた人が高確率で遭遇する問題として、「BigQuery Data Viewerロールを割り当てたのにも関わらずSELECT文が実行できない」というものがあります。MySQLなどのRDBにおけるGRANT SELECT ON 〜と同じような感覚で権限を割り当てると発生しやすい問題です。

権限エラー

このケースでは、上記の権限に加えてBigQuery Job User権限の付与で問題なくSELECTが実行できます。ここで以下の疑問が浮かびます。

  • なぜ片方の権限だけではエラーになってしまうのか
  • これらの権限はセットで使うことが必須なのだろうか
  • 片方の権限のみで問題ないケースはどのような時だろうか

cloud.google.com

以降ではBigQueryのアーキテクチャに触れながらこれらの疑問に答えていきます。

BigQueryのアーキテクチャについて

BigQueryのアーキテクチャとして特徴的なものは、下図に示すコンピュートとストレージの分離です。それぞれが独立してスケールすることで、高いスケーラビリティが実現されています。

実は権限を考えるときにもそれらが分離されているということを念頭に置くと理解しやすいです。そのため、ここからはBigQueryの権限をストレージに関するものとコンピュートに関するものに分けて解説していきます。

コンピュートとストレージの分離

cloud.google.com

Dremel: A Decade of Interactive SQL Analysis at Web Scale

ストレージに関する権限

まずはストレージに関する権限です。ストレージに関する読み出し権限があると、ストレージからデータを読み出してコンピュート部分に送ることができます。注意するところは、読みだしたデータを処理して返すのがコンピュート部分という点です。ストレージの権限だけではコンピュート部分を操作できずエラーになります。ストレージの権限のみを持っている場合の典型的なエラーメッセージは以下のものです。

Access Denied: Project XXX: User does not have bigquery.jobs.create permission in project XXX.

さきほど例に挙げたBigQuery Data Viewerロール1はストレージに関する権限のみを持っているため、これ単独では権限不足のエラーになっていました。

ストレージに関する権限

コンピュートに関する権限

次がコンピュートに関する権限です。この権限があると、ストレージ部分から送られてきたデータを処理して、SELECT文の実行結果を作ることができます。前述したストレージに関する権限と同様に、この権限だけを持っていても権限不足のエラーになります。コンピュートの権限のみを持っている場合の典型的なエラーメッセージは以下のものです。

Access Denied: Table XXX: User does not have permission to query table XXX.

BigQuery Job Userロール2が代表的なコンピュートに関する権限です。

コンピュートに関する権限

課金との兼ね合い

ここからは少し話題を変えて、課金モデルについて説明します。一見すると権限と課金は無関係なように見えますが、この後に説明するマルチテナント構成を考える上で、課金モデルを知っていると理解がスムーズになるためここで説明します。

ストレージに関する課金

まずはストレージに関する課金です。ストレージは従量課金制で、1GB毎、1か月毎に0.020USDの費用がかかります3。この費用はそのデータを保持しているプロジェクトが支払います4

cloud.google.com

コンピュートに関する課金

次にコンピュートに関する課金です。こちらも従量課金制で、1TBのデータをスキャンする毎に5USDの費用がかかります5。この費用はコンピュートリソースを保有しているプロジェクトが支払います。

文章だけですと、分かりづらいかもしれないので、具体例を出して説明します。

課金

ここでは、プロジェクトAが保有しているデータに対してSELECTすることを考えます。クエリを実行するユーザーは、プロジェクトAのストレージの権限と、複数プロジェクト(A, B)のコンピュートに関する権限を保持しているとします。この時、プロジェクトA側のコンピュートリソースでクエリを実行した場合(図中1の経路)は、プロジェクトAが費用を負担します。同様に、図中2の経路でクエリを実行した場合は、プロジェクトBが費用を負担します。

なお、複数のプロジェクトのコンピュートに関する権限を保持している場合は、どちらのコンピュートリソースを利用するかを選択できます。bqコマンドで実行する場合は --project_id オプションで指定できます。Webコンソールからの実行の場合は、画面上部の青いバーでプロジェクトを指定できます。

webコンソールからプロジェクトを指定

先ほど、BigQuery Job Userロールが代表的なコンピュートに関する権限と説明しましたが、正確には言葉足らずな表現です。正しくは、その権限に加えて、コンピュート部分で発生した費用をそのプロジェクトに対して請求する権限を持ったロールです。この事が、以降の事例にて重要になります。

事例紹介

ここからは、BigQuery利用の拡大に伴う権限管理について、具体的な例を使って紹介します。

使い始め:シンプルに1プロジェクトを管理する

まずは、一番シンプルに、プロジェクトが1つだけパターンです。BigQueryを使い始めた時点では、この構成になっていることが多いかと思います。この場合はBigQuery Data ViewerとBigQuery Job Userの両方のロールが必要です。

単一プロジェクト

規模拡大:複数部署のBQ利用を管理会計する

次に紹介するのはプロジェクトが複数あるパターンです。BigQueryの利用者が多くなり、複数部署がBigQueryを使用するようになりました。この時、組織によっては部署毎のBigQuery利用費を分離して管理したいかもしれません。このパターンでは権限付与の方法がやや難しいため、注意が必要です。利用者には、「データを保持しているプロジェクトすべて」のBigQuery Data Viewerロールと、「クエリを実行するプロジェクト」のBigQuery Job Userロールの両方を付与します。

複数プロジェクト

なお、図には載せませんでしたが、実際のケースでは部署横断的なプロジェクトも分離すると管理がしやすくなります。例えば、ETL用のプロジェクトや専用線・VPNなどのネットワークリソースをホスティングするプロジェクトなどがこれに該当します。

外部連携:社外へデータ提供する

BigQueryの利用が更に進むと、社外に対してデータを提供することがあるかもしれません。社外に対するデータ提供も上記の複数部署パターンとほぼ同じです。重要なのは、自社管理のプロジェクトに対するBigQuery Data Viewerロールのみを付与し、BigQuery Job User権限は付与しないという部分です。

社外へのデータ共有

社外のユーザーに対して、自社管理のプロジェクトに対するBigQuery Job Userロールを付与しないのは、いわゆる「タダ乗り」を防止するためです。仮にBigQuery Job Userロールを付与してしまうと、以下の図に示すように、社外ユーザーの保持しているデータを自分たちのコンピュートリソースで処理できてしまいます。この場合のコンピュートに関する費用は自社側に請求されるため、「タダ乗り」となります。

社外へのデータ共有(ミス)

厳格化:特定のデータセットのみの閲覧権限をつける

最後はアクセス権限の厳格化です。今まではプロジェクトレベルの権限を考えていましたが、BigQuery Data Viewerロールはリソースレベルでも付与できます。その場合も今までと同様に考えれば問題ありません。「閲覧をしたいデータセットのみ」にBigQuery Data Viewerロールを付与し、「プロジェクト全体」のBigQuery Job Userロールを付与すればクエリを実行できます。

データセット単位の権限付与

まとめ

BigQueryの権限について、ストレージとコンピュートの分離という観点から解説しました。一見すると不思議に見える権限セットも内部アーキテクチャから理解することで体系的に理解しやすくなります。

ZOZOテクノロジーズでは、一緒にサービスを作り上げてくれる仲間を募集中です。ご興味のある方は、以下のリンクからご応募ください!

tech.zozo.com


  1. より具体的にはbigquery.tables.getDataパーミッション

  2. より具体席にはbigquery.jobs.createパーミッション

  3. 90日間変更されていないデータはLongTerm Storageという区分に自動的に変更され、料金が半額の0.010USDになります。

  4. より正確にはそのプロジェクトに紐付いている請求アカウントが支払います。

  5. Reservation機能を使えば定額にもできます。

急成長するLINE配信対象ユーザー数にGCPアーキテクチャの改善で立ち向かった話

ogp_image

はじめに

こんにちは、EC基盤本部・MA部・MA基盤チームでマーケティングオートメーションのシステムを開発している長澤(@snagasawa_)です。この記事では、社内で運用しているLINEメッセージ配信基盤の課題を、アーキテクチャ改善によって解決した話をご紹介します。

続きを読む

Flex Slotsを用いたBigQueryのコストパフォーマンス改善と運用

ogp

こんにちは、SRE部の谷口(case-k)です。私たちのチームではデータ基盤の開発や運用をしています。1年ほど前からBigQueryのコストパフォーマンス改善を目的にFlex Slotsを導入しています。

本記事ではFlex Slotsの導入効果や運用における注意点、ワークフロー設計についてご紹介します。BigQueryのコストやパフォーマンスで課題を抱えているチームや管理業務を行っている方の参考になれば幸いです。

BigQuery Reservationsとは

Flex Slotsを紹介する前に、まずBigQueryの費用を管理するプラットフォームであるBigQuery Reservationsをご紹介します。

BigQuery ReservationsとはBigQueryの費用や組織・プロジェクトのワークロードを管理するプラットフォームです。

BigQueryの料金モデルには「オンデマンド料金モデル」と「定額料金モデル」の2種類あります。オンデマンド料金モデルはBigQueryのクエリスキャン量に基づいた料金モデルです。一方、定額料金モデルの場合は事前にBigQueryのコンピューティングリソースであるスロットを購入する料金モデルです。

デフォルトではオンデマンド料金モデルが適用されます。オンデマンド料金モデルでは、1プロジェクトあたり2000スロットまで保証されますが、2000スロット以上は保証されていません。そのため、BigQuery全体で空きがあれば2000以上も使えますが、なければ使えません。つまり、データ量が多くなるにつれ、BigQuery Reservationsを使って定額料金モデルにする方が、コストメリットやパフォーマンスの恩恵を受けやすいと言えます。

Maximum concurrent slots per project for on-demand pricing — 2,000

引用:Quotas and limits  |  BigQuery  |  Google Cloud

BigQuery Reservationsを使って定額料金モデルにするには「コミットメント」「予約」「割り当て」が必要です。この操作をすることで、スロットの購入からプロジェクトへの割り当てが可能となります。

各操作の関係は以下のようになっています。

  1. コミットメントでスロットを購入し、予約でプロジェクトに対して割り当てるスロットを決める
  2. 予約したスロットに対して割り当てを行い、プロジェクトを紐付ける
  3. 紐付いたプロジェクトで、予約で確保したスロットを利用できるようになる

image 引用:Workload management using Reservations  |  BigQuery  |  Google Cloud

続いて、各操作の詳細を説明します。

コミットメント

コミットメントではBigQueryのコンピューティングリソースであるスロットを購入します。スロットの購入には次の3つの方法があります。

  • 月次コミットメント
    • 30日単位でスロットを購入
    • 購入後30日間はキャンセルできない
  • 年次コミットメント
    • 365日単位でスロットを購入
    • 購入後365日間はキャンセルできない
  • Flex Slots
    • 60秒単位でスロットを購入
    • 購入から60秒後にはキャンセル可能なため、より柔軟にスロットを購入できる

スロットは100スロットから購入でき、100スロット単位で増やせます。なお、オンデマンド料金モデルは1プロジェクトあたり、2000スロットまでは保証されます。一方、定額料金モデルで2000スロット以下にするとパフォーマンスが落ちる可能性もあります。

BigQueryの利用状況によっては、2000スロット以上必要とすることはなく、その分コストを抑えたい場合もあるかと思います。その場合には、Cloud MonitoringやInformation Schemaを用いてBigQueryのスロット利用状況を確認することで、利用状況に応じた最適なスロット数を決めることができます。

image

Estimating how many slots to purchase

引用:Workload management using Reservations  |  BigQuery  |  Google Cloud

The minimum commitment size is 100 slots,

引用:Reservations details and limitations  |  BigQuery  |  Google Cloud

Note: On-demand pricing gives you access to 2,000 slots per Google Cloud project. With flat-rate pricing, you can commit to fewer than 2000 slots, but your queries might be less performant, depending on your workload demands.

引用:Introduction to Reservations  |  BigQuery  |  Google Cloud

予約

購入したコミットメントはバケットに対して割り当てられます。この操作を「予約」と呼びます。予約の操作では、どのくらいスロットを割り当てるか決めます。これをすることにより、購入したスロットをプロジェクトや組織に適用できるようになります。

例えば、2000スロット購入している場合、1000スロットを予約すれば、割り当てられたプロジェクトで1000スロットまで使えます。2000スロット全ての予約も可能です。なお、購入したスロット数以上の予約はできません。

image

割り当て

予約したスロットに対してGCPプロジェクトなどのリソースを割り当てます。この操作を「割り当て」と呼びます。リソースには組織、フォルダ、プロジェクトを割り当てることができます。企業で利用する場合に当てはめると、組織には会社名、フォルダには部署名、プロジェクトには部署で管理するGCPプロジェクトが該当します。なお、このリソースは継承関係を持ち、プロジェクトはフォルダの割り当てを継承し、フォルダは「組織」の割り当てを継承しています。この「割り当て」の有無で料金モデルが決まります。もし、オンデマンド料金モデルに戻したい場合は割り当てを削除します。 image

なぜFlex Slotsを使う必要があるのか

次にどうしてFlex Slotsを使う必要があったのか、弊社のデータ基盤が抱えていた課題を紹介します。

弊社でもデータの利活用が進んでおり、全てのデータをBigQueryに集める方針があります。集められたデータはデータ分析やML案件など幅広く用いられています。データの利活用が進むにつれ、BigQueryのデータ量やそれを扱う利用者も増加しました。データ量や利用者、BigQueryを使う案件も増えたことで「パフォーマンス」や「コスト」の課題が見えてきました。

  • パフォーマンスの課題
    • データマートなどのバッチ処理の集計が課題になっていた
    • データ量が少ない時は必要な時間までにバッチ処理の集計を終えることができていたが、データ量や必要なデータマートが増えるにつれ、必要な時間までに集計を終えることができなくなった
    • オンデマンド料金モデルではプロジェクトあたり2000スロットまでしか保証されないため、安定したパフォーマンスを得るためにはもっとスロットが必要だった
  • コストの課題
    • クエリ実行に伴うコストも上がっていた
    • データの利活用が進むにつれ、データ量に加えてBigQueryの利用者も増え、オンデマンド料金モデルなので当然費用も上がった
    • 利用者のBigQueryのリテラシーも人それぞれなので、あまり詳しくない人でもコストを意識せずに使えるように管理する必要があった

これらの課題を解決するためにFlex Slotsを導入しました。先に述べたようにFlex Slotsを用いることで、より柔軟にスロットを購入できます。60秒単位で購入のキャンセルができるため、重いバッチ処理など一時的に多くのスロットを使う場合に有効です。

従来の定額料金モデルには、年次コミットメントと月次コミットメントの2種類しかありませんでした。月次コミットメントを使えばコストを抑えることができますが、スロットを多く使うバッチ処理の集計を時間内に終えることができません。一方、月次コミットメントでバッチ集計に必要な量のスロットを購入すれば、集計を時間内に終えることができますが、コスト面でのデメリットが大きくなります。定額料金モデルが2種類しかなかった頃は、私たちのユースケースとは相性が良くありませんでした。

そこに、スロットの購入方法として新たにFlex Slotsが導入され、より柔軟なスロットの購入が可能となりました。スロットをあまり必要としないアドホックなクエリなどは、月次コミットメントでコストを抑えます。一方でバッチ処理の集計など多くのスロットが必要な時は、バッチ集計中のみFlex Slotsで必要なスロットを追加で購入し、割り当てることができます。Flex Slotsを導入することでパフォーマンス、コスト面の課題を解決することが可能になりました。

Flex Slotsを用いたコストパフォーマンス改善設計

私たちは月次コミットメント2000スロット、データマート集計前にFlex Slots 7000スロットを購入しています。データマート集計時は月次コミットメントで購入した2000スロットとFlex Slotsにて購入した7000スロット、合計9000スロットを割り当てます。本章では、管理プロジェクトの作成からFlex Slotsを用いたワークフロー設計まで、弊社の活用事例をご紹介します。

image

Icons made by irasutoya from www.irasutoya.com

管理プロジェクトの作成

まず、BigQuery Reservations用の管理プロジェクトを作成します。既存のプロジェクトとは分けて管理用のプロジェクトを作ります。

そして、この管理プロジェクトを用いて、組織内の各プロジェクトにあるBigQueryのスロットを管理できるようにします。なお、管理プロジェクトではBigQuery Reservations APIを有効化する必要があります。

弊社では複数のGCPプロジェクトが存在しますが、現在BigQuery Reservationsを用いて定額モデルを採用しているのはデータ基盤のプロジェクトのみです。そして、権限は管理用のプロジェクトと割り当てプロジェクト両方で必要な点に注意しましょう。

image

Icons made by irasutoya from www.irasutoya.com

Project-to-reservation assignment requires that you grant permission in both the administration project and the assignee projects. We recommend that you grant administrators the bigquery.resourceAdmin role at the organization or folder level.

引用:Reservations details and limitations  |  BigQuery  |  Google Cloud

月次コミットメントの活用

月次コミットメントを用いて、スロットの購入から予約、プロジェクトに対する割り当てまでの流れを説明します。Flex Slotsと違い、一度操作するだけで良いものなので、GCPコンソールから設定します。

image

Icons made by irasutoya from www.irasutoya.com

最初にスロットの購入をします。GCPコンソールから、月次コミットメントを2000スロット購入します。

image

次に予約です。先ほど購入した2000スロットを全て予約します。予約では手持ちのスロットのうち何スロット割り当てるか決めます。弊社の場合、データ基盤用のGCPプロジェクト1つのみが対象なので、購入したスロット全てを予約しています。

image

最後に割り当てです。予約から割り当てを作り、スロットを割り当てるプロジェクトを選択します。そして、割り当てが完了すると、スロットを割り当てたプロジェクトはオンデマンド料金モデルから定額料金モデルに切り替わります。その結果、割り当て完了後には、弊社の環境の場合はデータ基盤のプロジェクトで2000スロットまで使うことができるようになります。

image

Flex Slotsの活用とワークフロー設計

コストパフォーマンスを改善させるために、Flex Slotsを用いてバッチ処理の前にスロットを購入し、完了後にスロットを破棄させるようにします。この制御はワークフローエンジンであるDigdagを用います。ここでは、Digdagのワークフローや実行してるタスクもご紹介します。

ワークフロー

Digdagで実行しているワークフローを紹介します。Digdagではコンテナイメージ内部でオペレータのタスクを実行できます。それを利用するために、GCPのサービスアカウントなどの秘密情報をDigdag Secretに登録し、タスク実行時に環境変数として渡しています。

タスクの流れを順に説明します。まず、バッチ処理の実行前にFlex Slotsで7000スロット購入します。次に、月次コミットメントで購入した2000スロットと合わせて9000スロット割り当てます。なお、割り当ては既に月次コミットメント適用時に作成済みなので、再度作る必要はありません。

image

Icons made by irasutoya from www.irasutoya.com
+bigquery_flex_slots_up:
  +bigquery_flex_slots_commitment:
    _retry:
      limit: 5
      interval: 10
      interval_type: exponential
    _env:
      GCP_CREDENTIAL: ${secret:gcp.credential}
    _export:
      docker:
        image: ${docker_cloudsdk.image}
        pull_always: ${docker_cloudsdk.pull_always}
    sh>: tasks/bigquery_flex_slots_purchase_commitment.sh 7000
  +bigquery_flex_slots_reservation:
    _retry: 3
    _env:
      GCP_CREDENTIAL: ${secret:gcp.credential}
    _export:
      docker:
        image: ${docker_cloudsdk.image}
        pull_always: ${docker_cloudsdk.pull_always}
    sh>: tasks/bigquery_flex_slots_reservation.sh batch 9000

続いて、購入完了後にワークフローエンジンでバッチ処理を実行します。バッチ処理完了後は再度予約で2000スロットに戻してから、Flex Slotsで購入した7000スロットを削除します。なお、2000スロットへ戻す前にFlex Slotsを削除してしまうと、予約で割り当てたスロットを満たすことができなくなり、エラーが発生します。

image

Icons made by irasutoya from www.irasutoya.com
+bigquery_flex_slots_down:
  +bigquery_flex_slots_reservation:
    _retry: 3
    _env:
      GCP_CREDENTIAL: ${secret:gcp.credential}
    _export:
      docker:
        image: ${docker_cloudsdk.image}
        pull_always: ${docker_cloudsdk.pull_always}
    sh>: tasks/bigquery_flex_slots_reservation.sh batch 2000
  +bigquery_flex_slots_removement:
    _retry: 3
    _env:
      GCP_CREDENTIAL: ${secret:gcp.credential}
    _export:
      docker:
        image: ${docker_cloudsdk.image}
        pull_always: ${docker_cloudsdk.pull_always}
    sh>: tasks/bigquery_flex_slots_remove_commitment.sh

タスク

コミットメント、予約、割り当て実行しているタスクをご紹介します。

「コミットメントの購入」タスク

コミットメントの購入は以下のスクリプト bigquery_flex_slots_purchase_commitment.sh で行います。当時はbqコマンドしかサポートされていなかったため、bqコマンドを使っています。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID
slots=$1

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

bq mk --project_id=${admin_project_id} --location=US --capacity_commitment --plan=FLEX --slots=${slots}

そして、スロットの購入に成功すると以下のようなログが出力されます。stateがACTIVEであれば、問題なく購入できていることを示しています。

                     name                       slotCount   plan   renewalPlan   state        commitmentStartTime            commitmentEndTime       
 --------------------------------------------- ----------- ------ ------------- -------- ----------------------------- ----------------------------- 
  admin-gcp-project:US.12697877815420638341   7000        FLEX                 ACTIVE   2021-07-06T01:03:56.570385Z   2021-07-06T01:04:56.570385Z

この状態でGCPコンソールを確認すると、月次コミットメントに加え、購入したFlex Slotsが表示されていることを確認できます。 image

「コミットメントの削除」タスク

コミットメントの削除は以下のスクリプト bigquery_flex_slots_remove_commitment.sh で行います。なお、削除対象のコミットメントIDを指定する必要があります。そのため、Flex SlotsのコミットメントIDを取得し、そのIDを利用して対象のコミットメントを削除します。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID
# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

capacity_commitment_id=$(bq ls --capacity_commitment --location US --format prettyjson --project_id=${admin_project_id} | jq 'map(select(.["plan"] | startswith("FLEX"))) | .[] | .name | split("/") | .[5]'| sed 's/"//g')
# removement
bq rm --project_id=${admin_project_id} --location=US --capacity_commitment ${admin_project_id}:US.${capacity_commitment_id}

「予約」タスク

予約は以下のスクリプト bigquery_flex_slots_reservation.sh で行います。予約により、プロジェクトに対し割り当てるスロットを確保します。ここでは、月次コミットメント2000とFlex Slots 7000、合計9000スロットを予約します。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID
reservation=$1
assignment_project_slot=$2

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

# reservation
bq update --project_id=${admin_project_id} --location=US --slots=${assignment_project_slot} --reservation ${reservation}

そして、予約に成功すると以下のようなログが出力されます。

              name               slotCapacity   ignoreIdleSlots          creationTime                   updateTime           
 ------------------------------ -------------- ----------------- ----------------------------- ----------------------------- 
  admin-gcp-project:US.batch   9000           False             2020-06-01T01:48:43.480961Z   2021-07-06T01:06:55.357921Z  

また、GCPコンソールの予約を確認すると2000スロットから9000スロットに更新されていることも確認できます。 image

運用におけるFlex Slotsの注意点とワークフロー設計

上記の運用を実際に行ってみると、Flex Slotsを購入できないことが1〜2か月に1度程度発生します。スロットが購入できない場合、そのままでは月次コミットメントで購入した2000スロットを用いてバッチ処理の集計をすることになります。

本来9000スロットを必要とするバッチ処理の集計を2000スロットで実施することになり、完了していなければいけない想定の時間までに集計を終えることができません。そのため、Flex Slotsを購入できなかった場合には、割り当てを削除し、定額料金モデルからオンデマンド料金モデルに切り替える必要があります。

それらを考慮し、以下のワークフロー設計で運用をしています。

image

運用上の注意点2選

前述のフロー設計で運用していく場合にも注意点が存在するので、2つ紹介します。どちらもFlex Slotsが購入できない場合に必要となる対応です。

コミットメント購入時は冪等にする

コミットメント購入時には冪等性を意識する必要があります。

実際に次のようなエラーで購入できない場合があります。

BigQuery error in mk operation: Failed to create capacity commitment in '': The
service is currently unavailable.

このエラーの場合、リトライすることで購入できます。リトライによりFlex Slotsを2回購入してしまうと費用がその分加算されてしまうので、以下のスクリプト例のように、既に購入したコミットメントを削除して冪等性を意識した処理にします。なお、PENDING状態でもしばらくするとACTIVE状態になるため、スロット分の費用が発生します。

#!/bin/bash
slots=$1

bash tasks/bigquery_flex_slots_remove_commitment_wrapper.sh
bash tasks/bigquery_flex_slots_purchase_commitment.sh $slots

オンデマンド料金モデルに自動で切り替える

Flex Slotsで購入したコミットメントの状態がPENDING状態の場合、次の購入が長時間できない状態になることが多いです。Exponential Backoffでリトライしても購入できません。そのため、後続処理が遅延しないように定額料金モデルからオンデマンド料金モデルへ切り替える必要があります。公式ドキュメントでもそのような対応を勧めています。

発生頻度や遅延の影響を考慮すると、自動切り替えが可能なワークフロー設計にする必要があります。それには、割り当てを削除することで切り替え可能な設計にできます。月次コミットメントは削除できませんが、PENDING状態のFlex Slotsは削除できるので、割り当てと一緒に削除します。なお、PENDING状態の場合には費用は請求されませんが、ACTIVE状態になると請求されてしまいます。

Slots are subject to available capacity. When you purchase slots and BigQuery allocates them, then the Status column shows a check mark. If BigQuery can't allocate the requested slots immediately, then the Status column remains pending. You might have to wait several hours for the slots to become available. If you need access to slots sooner, try the following: If a slot commitment fails or takes a long time to complete, consider using on-demand pricing temporarily. With this solution, you might need to run critical queries on a different project that's not assigned to any reservations, or you might need to remove the project assignment altogether.

引用:Working with Reservations  |  BigQuery  |  Google Cloud

ワークフロー

運用上の注意点で述べたように、Flex Slotsを購入できなかった場合には、オンデマンド料金モデルに切り替える必要があります。冒頭で紹介したワークフローと違い、各タスクで冪等性を考慮したり、コミットメントの状態を確認し、オンデマンド料金モデルに切り替えられるよう設計し直しています。実行してるタスクはラッパータスクとして後ほどご紹介します。

image

+bigquery_flex_slots_up:
  +bigquery_flex_slots_commitment:
    _retry:
      limit: 5
      interval: 10
      interval_type: exponential
    _env:
      GCP_CREDENTIAL: ${secret:gcp.credential}
    _export:
      docker:
        image: ${docker_cloudsdk.image}
        pull_always: ${docker_cloudsdk.pull_always}
    sh>: tasks/bigquery_flex_slots_purchase_commitment_wrapper.sh 7000
  +bigquery_flex_slots_verify_for_ondemand_planning:
    _retry: 3
    _env:
      GCP_CREDENTIAL: ${secret:gcp.credential}
    _export:
      docker:
        image: ${docker_cloudsdk.image}
        pull_always: ${docker_cloudsdk.pull_always}
    sh>: tasks/bigquery_flex_slots_verify_for_ondemand_planning.sh
  +bigquery_flex_slots_reservation:
    _retry: 3
    _env:
      GCP_CREDENTIAL: ${secret:gcp.credential}
    _export:
      docker:
        image: ${docker_cloudsdk.image}
        pull_always: ${docker_cloudsdk.pull_always}
    sh>: tasks/bigquery_flex_slots_reservation.sh batch 9000

これにより、オンデマンド料金モデルへ切り替えた場合、割り当ては削除されるようになります。バッチ処理完了後に割り当ての有無を確認し、割り当てがなければ割り当てを作る点が、前述のタスクとの違いです。

image

+bigquery_flex_slots_down:
  +bigquery_flex_slots_reservation:
    _retry: 3
    _env:
      GCP_CREDENTIAL: ${secret:gcp.credential}
    _export:
      docker:
        image: ${docker_cloudsdk.image}
        pull_always: ${docker_cloudsdk.pull_always}
    sh>: tasks/bigquery_flex_slots_reservation.sh batch 2000
  +bigquery_flex_slots_removement:
    _retry: 3
    _env:
      GCP_CREDENTIAL: ${secret:gcp.credential}
    _export:
      docker:
        image: ${docker_cloudsdk.image}
        pull_always: ${docker_cloudsdk.pull_always}
    sh>: tasks/bigquery_flex_slots_remove_commitment_wrapper.sh
  +bigquery_flex_slots_reserve_assignment:
    _retry: 3
    _env:
      GCP_CREDENTIAL: ${secret:gcp.credential}
    _export:
      docker:
        image: ${docker_cloudsdk.image}
        pull_always: ${docker_cloudsdk.pull_always}
    sh>: tasks/bigquery_flex_slots_reserve_assignment_wrapper.sh

ラッパータスク

次に、ワークフローから実行しているタスクを紹介します。冪等性やエラーのハンドリングをする必要があるため、ラッパータスクを用意しています。なお、ラッパータスクから呼び出しているタスクは後述します。

コミットメント

最初に、コミットメントのラッパータスクをご紹介します。

コミットメントの購入

コミットメントの購入時に、以下のエラーが出力される場合もあります。このエラーがでた場合、購入したコミットメントはPENDING状態になります。

BigQuery error in mk operation: Failed to create capacity commitment in '': The
service is currently unavailable.

PENDING状態になった場合、しばらくするとACTIVEになり請求対象となります。そのため、リトライの際は対象のコミットメントを削除してから再購入しています。

Flex Slotsを運用してみたところ、Flex Slotsでコミットメントを購入できないケースが2パターン見つかりました。

1つ目は、コミットメント購入時に上記のエラーが出力されて購入できない場合です。この場合は、Exponential Backoffなどでリトライすることで購入できます。

2つ目は、コミットメント購入時にエラーは出力されないものの、購入したコミットメントの状態がPENDINGになる場合です。

コミットメントの購入に失敗することは、前述の通り1〜2か月に1度程度発生し、ほとんどが上記の2つ目のパターンです。2つ目の場合、Exponential Backoffなどでリトライしても購入できないため、即時オンデマンド料金モデルへ切り替える必要があります。

image

上記の内容を、以下のスクリプト bigquery_flex_slots_purchase_commitment_wrapper.sh で行います。

#!/bin/bash
slots=$1

bash tasks/bigquery_flex_slots_remove_commitment_wrapper.sh
bash tasks/bigquery_flex_slots_purchase_commitment.sh $slots

コミットメントの削除

コミットメントを削除するタスクです。まず、オンデマンド料金モデルへの切り替えを考慮し、割り当ての有無を確認します。その後、割り当ての確認ができたらFlex SlotsのコミットメントIDを取得して、対象のコミットメントIDを削除しています。

image

上記の内容を、以下のスクリプト bigquery_flex_slots_remove_commitment_wrapper.sh で行います。

#!/bin/bash

# removement
assignment=$(bash tasks/bigquery_flex_slots_assignment_status.sh)
if [ -n "$assignment" ]; then
  capacity_commitment_id=$(bash tasks/bigquery_flex_slots_fetch_commitment.sh)
  if [ -n "$capacity_commitment_id" ]; then
    bash tasks/bigquery_flex_slots_remove_commitment.sh $capacity_commitment_id
  fi
fi

なお、コミットメントを削除する際に、以下のエラーが出力される場合もあります。一見、コミットメントの削除に失敗していそうですが、実際には削除できています。そのため、リトライした際に削除済みであるか、コミットメントの有無を確認しています。

BigQuery error in rm operation: Failed to delete capacity commitment 'admin-gcp-project:US.11812766842974244240': The service is currently unavailable.

予約

次に予約のタスクをご紹介します。

予約の作成

このタスクではFlex Slotsで購入した7000スロットと月次コミットメントを合わせた、合計9000スロットをデータ基盤用のプロジェクトへ割り当てるために利用しています。

image

上記の内容を、以下のスクリプト bigquery_flex_slots_reservation.sh で行います。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID
reservation=$1
assignment_project_slot=$2

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

# reservation
assignment=$(bash tasks/bigquery_flex_slots_assignment_status.sh)
if [ -n "$assignment" ]; then
  bq update --project_id=${admin_project_id} --location=US --slots=${assignment_project_slot} --reservation ${reservation}
fi

割り当て

定額料金モデルからオンデマンド料金モデルに切り替えるには、作成した割り当てを削除する必要があります。割り当てがあるか確認し、もしなければ割り当てを作ります。

割り当ての作成

このタスクはワークフローでオンデマンド料金モデルに切り替えたのち、バッチ集計完了後に実行しています。割り当てがない場合は割り当てを作ります。

image

上記の内容を、以下のスクリプト bigquery_flex_slots_reserve_assignment_wrapper.sh で行います。

#!/bin/bash

# reservation_assignment
assignment=$(bash tasks/bigquery_flex_slots_assignment_status.sh)
if [ -z "$assignment" ]; then
  bash tasks/bigquery_flex_slots_reserve_assignment.sh
  bash tasks/bigquery_flex_slots_alert_notice.sh 2
fi

割り当ての削除

割り当てを削除するタスクです。具体的には、作成した割り当てを取得して削除します。これは、定額料金からオンデマンド料金モデルへ切り替えるために利用しています。

image

上記の内容を、以下のスクリプト bigquery_flex_slots_remove_assignment_wrapper.sh で行います。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID

assignment=$(bash tasks/bigquery_flex_slots_fetch_assignment.sh)
bash tasks/bigquery_flex_slots_remove_assignment.sh $assignment

オンデマンド料金モデルへ切り替え

Flex Slotsで購入したコミットメントの状態を確認してPENDING状態だった場合、即時にオンデマンド料金モデルへ切り替えています。Flex SlotsはPENDING状態であれば請求されませんが、しばらくするとACTIVE状態になり、請求対象になります。そのため、購入したスロットをまず削除します。

その後、割り当てを削除し、定額料金モデルからオンデマンド料金モデルへ切り替えます。前述の通り、この場合はExponential Backoffなどでリトライしても購入できないため、即時オンデマンド料金モデルに切り替えています。しかし、オンデマンド料金モデルの場合、確実に遅延します。遅延の原因調査をやりやすくするために、料金モデルを切り替えた際には通知を飛ばしています。

image

上記の内容を、以下のスクリプト bigquery_flex_slots_verify_for_ondemand_planning.sh で行います。

#!/bin/bash

# verification
flex_slots_status=$(bash tasks/bigquery_flex_slots_fetch_commitment_status.sh)
if [ $flex_slots_status = "PENDING" ]; then
  # change plan from flex slots to ondemand
  bash tasks/bigquery_flex_slots_remove_commitment_wrapper.sh
  bash tasks/bigquery_flex_slots_remove_assignment_wrapper.sh
  bash tasks/bigquery_flex_slots_alert_notice.sh 1
fi

その結果、購入したスロットを見るとPENDING状態であることが分かります。

Capacity commitment admin-gcp-project:US.
                     name                      slotCount   plan   renewalPlan    state    commitmentStartTime   commitmentEndTime
 -------------------------------------------- ----------- ------ ------------- --------- --------------------- -------------------
  admin-gcp-project:US.9638566938320457134   7000        FLEX                 PENDING

タスク

次にラッパータスクから実行しているタスクを紹介します。

コミットメント

コミットメントの購入、削除、取得、状態確認をするスクリプトをご紹介します。

コミットメントの購入

まずはコミットメントを購入するスクリプトです。その際に、コミットメントでは購入したいスロット数を指定します。弊社では前述の通り7000スロット購入しています。

上記の内容を、以下のスクリプト bigquery_flex_slots_purchase_commitment.sh で行います。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID
slots=$1

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

bq mk --project_id=${admin_project_id} --location=US --capacity_commitment --plan=FLEX --slots=${slots}

コミットメントの削除

次に、コミットメントを削除するスクリプトです。購入したコミットメントは削除しないかぎり請求されるため、バッチ処理が終わったら、購入したコミットメントを即時削除する必要があります。

上記の内容を、以下のスクリプト bigquery_flex_slots_remove_commitment.sh で行います。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID
capacity_commitment_id=$1

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

# removement
bq rm --project_id=${admin_project_id} --location=US --capacity_commitment ${admin_project_id}:US.${capacity_commitment_id}

コミットメントの取得

コミットメントを取得するスクリプトです。コミットメントを削除する際にはコミットメントIDを取得する必要があるため、その用途で利用するものです。

上記の内容を、以下のスクリプト bigquery_flex_slots_fetch_commitment.sh で行います。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

bq ls --capacity_commitment --location US --format prettyjson --project_id=${admin_project_id} | jq 'map(select(.["plan"] | startswith("FLEX"))) | .[] | .name | split("/") | .[5]'| sed 's/"//g'

コミットメントの状態取得

コミットメントの状態を確認するスクリプトです。PENDING状態の場合に、コミットメントと割り当てを削除し、定額料金モデルからオンデマンド料金モデルへ切り替えるために利用します。

上記の内容を、以下のスクリプト bigquery_flex_slots_fetch_commitment_status.sh で行います。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

bq ls --capacity_commitment --location US --format prettyjson --project_id=${admin_project_id} | jq 'map(select(.["plan"] | startswith("FLEX"))) | .[] | .state | split("/") | .[0]'| sed 's/"//g'

予約

次に予約を作成するスクリプトをご紹介します。

予約の作成

予約を作成するスクリプトです。Flex Slotsで購入した7000スロットと月次コミットメントを合わせた、合計9000スロットをデータ基盤用のプロジェクトへ割り当てるために利用しています。

上記の内容を、以下のスクリプト bigquery_flex_slots_reservation.sh で行います。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID
reservation=$1
assignment_project_slot=$2

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

# reservation
assignment=$(bash tasks/bigquery_flex_slots_assignment_status.sh)
if [ -n "$assignment" ]; then
  bq update --project_id=${admin_project_id} --location=US --slots=${assignment_project_slot} --reservation ${reservation}
fi

割り当て

次に割り当ての作成、削除、取得、作成有無を確認するスクリプトをご紹介します。

割り当ての作成

割り当てを作成するスクリプトです。割り当てを作成することにより、BigQueryの料金モデルがオンデマンド料金モデルから定額料金モデルに切り替わります。

上記の内容を、以下のスクリプト bigquery_flex_slots_reserve_assignment.sh で行います。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

# reservation_assignment
bq mk --reservation_assignment --project_id=${admin_project_id} --assignee_id=assignee-gcp-project --location=US --assignee_type=PROJECT  --job_type=QUERY --reservation_id=${admin_project_id}:US.batch

割り当ての削除

割り当てを削除するスクリプトです。割り当てを削除することにより、BigQueryの料金モデルが定額料金からオンデマンド料金モデルへ切り替わります。

上記の内容を、以下のスクリプト bigquery_flex_slots_remove_assignment.sh で行います。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID
assignment=$1

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

# removement
bq rm --project_id=${admin_project_id} --location=US --reservation_assignment $assignment

割り当ての取得

割り当てたリソース名を取得するスクリプトです。オンデマンド料金モデルへ切り替える際に、リソース名を取得して削除するために利用しています。

上記の内容を、以下のスクリプト bigquery_flex_slots_fetch_assignment.sh で行います。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

bq ls --project_id=${admin_project_id} --location=US --reservation_assignment --format prettyjson | jq 'map(select(.["assignee"] | startswith("projects/assignee-gcp-project"))) | .[] | .name'| sed 's/projects//g' | sed 's/locations/:/g' | sed 's/reservations/./g' | sed 's/assignments/./g'  | sed 's/\///g' | sed 's/"//g'

割り当ての有無を確認するために、以下のスクリプト bigquery_flex_slots_assignment_status.sh も利用しています。

#!/bin/bash
admin_project_id=$ADMIN_PROJECT_ID

# authorization
echo $GCP_CREDENTIAL > GCP_CREDENTIAL.json
gcloud auth activate-service-account --key-file=GCP_CREDENTIAL.json
export BIGQUERYRC=/root/.bigqueryrc

# flex slots status
bq ls --project_id=${admin_project_id} --location=US --reservation_assignment  | sed 's/No reservation assignments found.//g'

Flex Slotsの導入効果

本章では、Flex Slotsの導入効果をご紹介します。

パフォーマンス面の効果

Flex Slots導入により、2〜6時間ほどのパフォーマンス改善が実現されました。スロットを購入できた場合、1.5時間前後でバッチ処理は完了します。スロットを購入できなかった場合は、即時オンデマンド料金モデルへ切り替えています。オンデマンド料金モデルだと、改善された分のパフォーマンスが得られないため、スロットを購入できた場合と比べ2〜6時間ほど遅延します。

コスト面の効果

現時点で毎月数十万円ほどのコストメリットを得られています。

オンデマンド料金モデルでは1TBあたり日本円にして500円ほどで、毎月1TBまでは無料で利用できます。なお、クエリで処理しているバイト数はBigQueryのInformation Schemaより確認できます。

SELECT
  SUM(total_bytes_processed) /(1024 * 1024 * 1024 * 1024) AS total_terabyte_processed
FROM
  `assignee-gcp-project`.`region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE
  creation_time BETWEEN '2021-06-01 00:00:00.000 UTC'
  AND '2021-06-30 23:59:59.000 UTC'

cloud.google.com

Flex Slotsのメリット・デメリット

Flex Slotsを導入して実感したメリットとデメリットをご紹介します。

メリット

Flex Slotsを利用するメリットは以下の通りです。

パフォーマンスが改善される

Flex Slots導入により、オンデマンド料金モデルに比べて格段に集計時間が早くなりました。3〜6時間かかっていた集計が1.5時間程度で終わります。

BigQueryのオンデマンド料金モデルのままでパフォーマンス要件を満たせない場合、Flex Slotsを導入して得られるメリットは大きいでしょう。

コストを削減できる

Flex Slotsの導入効果で述べたように、弊社の利用状況だと月々数十万円ほどのコスト削減を実現できています。なお、既に述べたようにコストメリットはBigQueryの利用状況にも依存します。コストとパフォーマンスはトレードオフなので、Cloud MonitoringやInformation Schemaを使い、適切なスロット数を割り当てることでコスト削減が可能です。

デメリット

Flex Slotsを利用するデメリットは以下の通りです。どれもスロットを購入できないことに起因するものです。

リトライしても購入できない場合がある

Flex Slotsが購入できない場合、そのまま数時間にわたり購入できないことが多いです。月次コミットメントだと購入したスロット数以上のスロットは割り当てられないため、オンデマンド料金モデルに切り替えています。発生頻度は前述の通り1〜2か月に1度程度ですが、最長だと5日間連続で購入できないことも過去に発生しました。発生頻度が低いとは言えないため、手動ではなく自動で切り替わるようにしておかないと、運用負荷が大きくなります。

Slots are subject to available capacity. When you purchase slots and BigQuery allocates them, then the Status column shows a check mark. If BigQuery can't allocate the requested slots immediately, then the Status column remains pending. You might have to wait several hours for the slots to become available. If you need access to slots sooner, try the following:

引用:Working with Reservations  |  BigQuery  |  Google Cloud

オンデマンド料金モデル切り替えに伴いコストが発生する

定額料金モデルからオンデマンド料金モデルへ切り替える場合、割り当てとFlex Slotsで購入したコミットメントを削除します。

しかし、Flex Slotsのコミットメントは削除できますが、月次コミットメントは購入から30日間は削除できません。そのため、切り替えに伴ってオンデマンド料金に加え、月次コミットメント分の費用が余分に発生します。

ワークフローが煩雑化する

Flex Slotsが購入できない場合を考慮したワークフロー設計にする必要があるため、それに伴ってコードも煩雑化します。サポート状況などの確認が必要ですが、必要に応じてOSSにPull Requestを投げつつ、Pythonのクライアントライブラリに置き換えたいと考えています。

googleapis.dev

今後の活用

現在は、社内でもデータ基盤用のプロジェクトのみ、定額料金で運用しています。今後は全社展開を行い、コスト・パフォーマンス改善を全社規模で行いたいと考えています。

社内では、機械学習の活用に伴いBigQueryの利活用が進んでおり、推論のバッチやBigQuery MLなど、Flex Slotsを使ってパフォーマンス面で解決できる部分は多く存在しています。さらに、検索系の案件など、全社のBigQuery費用の上位に該当するプロジェクトもあります。これらのプロジェクトでコスト、パフォーマンスの両面で最適化を行えば、その効果は大きなものになるでしょう。

また、現時点ではBigQueryのジョブユーザ管理ができていません。BigQueryはストレージとコンピューティングリソースが分離されており、IAMもBigQuery閲覧権限とBigQueryジョブユーザと別れています。クエリ実行に伴う請求はこのBigQueryジョブユーザ権限を付与したプロジェクトに対して行われます。下図の場合、プロジェクトBでジョブユーザを管理し、プロジェクトAのテーブルやビューを参照できます。 image

引用:BigQuery for data warehouse practitioners  |  Cloud Architecture Center

しかし、現時点では社内でジョブユーザの管理をうまくできていないため、重複管理やオンデマンド料金モデルのプロジェクトに対しても付与されています。ユーザに紐付くアドホックなクエリなど、定額料金モデルを適用したプロジェクトで管理できるとBigQueryのコストを下げることができるので、この点は今後の課題としています。

おわりに

Flex Slotsを導入することで、BigQueryのコストやパフォーマンスの改善が可能であること、その運用上の注意点をご紹介しました。Flex Slotsの購入に失敗することを考慮したワークフロー設計が必要になるなどの煩雑さはありますが、欠点を大きく上回る利点があります。今後もFlex Slotsを積極的に使っていく予定です。

データの利活用を促進する際に、本記事が同じような課題を抱えている方の参考にれば幸いです。

私たちのチームの業務内容は、以下の記事で紹介しているのでご覧ください。今回紹介した内容以外にも、ログ収集基盤の開発など、データ基盤に関する業務全般を行っています。

https://it-career.blm.co.jp/interviews/zozotechnologies-it-taniguchi-interviewit-career.blm.co.jp

ZOZOテクノロジーズでは、一緒にサービスを作り上げてくれる方を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください!

https://tech.zozo.com/recruit/tech.zozo.com

BigQueryでの集計結果をノーコードでSlackに定期投稿してみた

f:id:vasilyjp:20210412120801j:plain

こんにちは、DATA-SREチームの塩崎です。最近気になるニュースは「ネコがマタタビを好む理由が蚊を避けるためだった1」です。

さて、皆さんはデータ基盤で集計した結果をどのようにして確認していますか。LookerやPower BIなどのBIツールを使って綺麗なダッシュボードを作成している方も多いかと思います。しかし、全員が毎日確認すべき数値はSlackなどの全員が日常的に目にする場所へ掲げたいです。本記事ではBigQueryとSlackを連携させる機能をノーコードで作成する方法を紹介します。

従来手法

BigQueryで集計した結果をSlackに通知するためにはGoogle Apps Script(以下、GAS)を用いるやり方が現在では主流です。GASの文法はJavaScriptとほぼ同じであり、普段分析をメインで担当している人たちには馴染みの薄い言語です。また、Cloud FunctionsとCloud Schedulerを組み合わせて定期的に集計結果をSlackへ通知できますが、これも同様に分析メインな人たちにとっては難易度が高いです。

そのため、Slack通知するためのBotの作成と運用をエンジニアに依頼するという業務フローを採っている組織もあるかと思います。この工数が非常に大きいわけではありませんが、可能ならばエンジニアリソースを使わずにSlackへの通知を実現させたいです。

提案手法

今回提案する手法の全体図を以下に示します。

全体図

BigQuery→Google Sheetsの連携にはConnected Sheetsを使い、Google Sheets→Slackへの連携にはSlack Workflow Builderを使います。Google Sheetsを仲介させることで、SQLのみで集計結果をSlackに通知することが実現できます。

Connected Sheets

Connected SheetsはBigQueryとGoogle Sheetsを繋ぐ機能です。BigQueryに対してクエリを実行した結果をGoogle Sheetsに挿入したり、Google Sheetsにおけるピボットテーブルを自動的にSQLに変換したりできます。今回はクエリの実行結果をGoogle Sheetsへ挿入するために使用しています。

cloud.google.com

support.google.com

Slack Workflow Builder

Slack Workflow Builderは定型的なプロセスをワークフロー化して、Slackで実行するための機能です。デフォルトの状態では、メッセージの送信やフォームの表示などしかできませんが、サードパーティ製のアプリを導入すると外部サービスと連携できます。

slack.com

今回は以下のアプリを使ってGoogle Sheetsとの連携をします。

slack.com

手順

それでは、実際にやってみましょう。今回はお題として「毎朝10時にBigQueryのログを確認し、昨日の利用費が多い人Top3を通知する」を実現させます。

BigQueryでのジョブの実行履歴は、INFORMATION_SCHEMAJOBS_BY_ORGANIZATIONから取得します。

cloud.google.com

完成したクエリを以下に示します。第1列に行番号を入れているのは、Google SheetsとSlackを連携させる時に必要なためです。

select
  row_number() over (order by sum(total_bytes_billed) desc) as row_num,
  user_email,
  cast(sum(total_bytes_billed) / 1024 / 1024 / 1024 / 1024 * 5 as int64) as total_cost_in_usd
from `region-us`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
where date(creation_time, 'Asia/Tokyo') = current_date('Asia/Tokyo') - 1 and reservation_id is null
group by user_email
order by total_cost_in_usd desc

BigQueryとGoogle Sheetsの連携

まずはBigQueryとGoogle Sheetsを連携させます。

メニューバーから「Data」→「Data connectors」→「Connect to BigQuery」を選択します。

手順1

課金プロジェクトの選択画面が表示されるので、適当なプロジェクトを選択したあとに「Write Custom Query」を選択してクエリエディタを開きます。ここに先程のクエリを入力して、「Connect」を選択します。

f:id:vasilyjp:20210412120628p:plain

すると、クエリを実行した結果がGoogle Sheetsに挿入されます。

手順3

次に、「Refresh options」から定期実行の設定をします。実行時刻を詳細に指定できず、4時間程度の幅の中から選ぶ必要があります。今回はSlackへの通知時刻が10時なので、それ以前の時間帯であればどの時間でも大丈夫です。

手順4

最後に「Extract」ボタンを選択して、別シートへ結果の書き出しを行います。Data Connectorで自動的に作成されたシートは直接Slackに連携できないので、一旦通常のシートへの書き出しが必要です。

手順5

Google SheetsからSlackへの連携

次にSlackへ連携させます。Google SheetsとSlack Workflow Builderを連携させるためには以下のアプリが必要なので、予めSlackのワークスペースにインストールする必要があります。

slack.com

Slack Workflow Builderを起動し、新規のワークフローを作成します。トリガーは「Scheduled date & time」に設定し、毎日AM 10:00に起動するように設定します。

f:id:vasilyjp:20210412120742p:plain

ここから、Google SheetsとSlack Workflow Builderを連携するための設定を入れていきます。「Add Step」を選択して、Google Sheetsからデータを取得するStepを追加します。

手順7

「Select a spreadsheet row」を選択します。もし、この時にGoogle Sheets関連のStepが見つからない場合は「Google Sheets for Workflow Builder」のインストールが必要です。

手順8

このStepの設定は以下のようにします。「Sheet」はData Connectorが自動的に作成したシートではなく、Extractをして生成したシートにする必要があります。このStepは「Choose a column to search」に設定した列の値が「Define a cell value to find」になっている行をシートから読み取ります。この例ではrow_numが1の列を読み取ることになるので、前述したクエリと併せると、BigQueryの課金額が1番多い人の情報を読み取っています。

手順9

同様に「Add Step」であと2つのStepを作成します。「Define a cell value to find」をそれぞれ2と3の値にする以外は、1つ目と同じ設定のStepにします。これにより、BigQueryの課金額が2番目と3番目に多い人の情報を読み取ります。

手順10

最後に、取得したデータをSlackに投稿するためのStepを作成します。「Add Step」から「Send a message」を選択します。

この時に「Insert a variable」をクリックすると以前のStepで読み取った値を参照できます。同じ名前が3つずつあり少し分かりにくいですが、上のものから順に1番目、2番目、3番目のStepで読み取った値を表しています。

手順11

これらの変数を埋め込み、メッセージを整えていきます。最終的には以下のようなメッセージが出来上がりました。

手順12

あとは、このWorkflowをPublishすれば毎日定期的にBigQueryの高額課金者を通知するBotが完成します。実際に動作している様子を以下に示します。

動作イメージ

メリット・デメリット

従来のGASを使ったやり方に対する、今回の手法のメリット・デメリットをまとめます。

メリット

メリットとして挙げられるのは、SQLだけを知っていればOKという点です。

GASを使う手法で必要だったJavaScriptに関する知識が今回の手法では不要になります。そのため、エンジニアの工数を消費することなく、集計結果の定期的な通知機能を実現できます。このような通知機能は1回作ったらおしまいになることは少なく、プロダクトの成長に併せて確認すべき数値が変わることもしばしばあります。最近では非エンジニアでもSQLを書ける人材が多いので、SQLさえ知っていればOKである仕組みにすると継続的にエンジニアの工数を削減できます。

デメリット

一方で、デメリットもあります。デメリットは大きく分けると2つあり、柔軟性と信頼性が劣るという点です。

まず柔軟性が劣る点について説明します。GASを使ってSlackに連携する場合はSlackのIncoming Webhook機能を使っているケースが多いかと思います。Incoming Webhookで送信するメッセージはBlock Kitに対応しているためリッチな通知ができます。今回の手法でも多少のメッセージの装飾はできますが、標準的なメッセージで可能なものに限ります。

api.slack.com

また、クエリの実行時刻についても柔軟性が劣っています。Connected Sheetsの仕様により、クエリの実行時刻は最大で4時間の誤差が生じることを考慮に入れる必要があります。

さらに、通知の頻度についても柔軟性が劣ります。Slack Workflow BuilderとConnected Sheets両方の仕様により、日次よりも高い頻度では通知できません。

次に信頼性が劣る点について説明します。Slack WorkflowがGoogle Sheetsから値を読み取る時に、BigQueryで実行されているクエリの完了を待ち合わせることができません。クエリの実行が完了していない場合は前日分の集計結果をSlackに通知されてしまいます。そのため、クエリの実行タイミングとSlack Workflowの起動タイミングの間に十分なバッファを用意する必要があります。

また、クエリの実行中にエラーが発生したことを検知する方法がありません。そのため、「通知が来ない」ことによってしかエラーの検知ができません。

これらのデメリットは今後Connected SheetsやSlack Workflowの機能が充実することで解消される可能性があるので、今後に期待したいです。

まとめ

BigQueryで集計をした結果を定期的にSlackに通知する機能をノーコードで作ることができました。GASで作成する場合に比べると柔軟性や信頼性では劣りますが、エンジニアの工数を使わずに通知が実現可能という点が大きなメリットです。簡単な通知Botならば非エンジニアでも作れるようになるので、データ基盤を社内の多くの職種に解放してデータ活用を更にすすめることに貢献できる機能です。

最後に

ZOZOテクノロジーズでは一緒にサービスを作り上げてくれる仲間を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください!

tech.zozo.com

Cloud Composerによるデータバリデーション ~常に正確なデータ集計を実現するために~

OGP

こんにちは。ECプラットフォーム部データエンジニアの遠藤です。現在、私は推薦基盤チームに所属して、データ集計基盤の運用やDMP・広告まわりのデータエンジニアリングなどに従事しています。

以前、私たちのチームではクエリ管理にLookerを導入することで、データガバナンスを効かせたデータ集計基盤を実現しました。詳細は、以前紹介したデータ集計基盤については以下の過去記事をご覧ください。

techblog.zozo.com

本記事では、データ集計基盤に「データバリデーション」の機能を加えて常に正確なデータ集計を行えるように改良する手段をお伝えします。

続きを読む

BigQueryの監査ログをリアルタイムに監視して使いすぎを防止してみる

OGP

こんにちは。SRE部の塩崎です。七味唐辛子の粉末を7種類に分類するという趣味を発展させて、おっとっとを新口動物と旧口動物に分類するという趣味を最近発明しました。

BigQueryは非常にパワフルなData WareHouse(DWH) SaaSであり、大容量のデータを一瞬で分析できます。しかし、課金額がスキャンしたデータ量に比例するという特徴があるため、意図せずに大量のデータをスキャンしてしまい大金を溶かしてしまうことを懸念する人もいます。

qiita.com

そのため、課金額が大きすぎるクエリを発見した際にSlackへ通知する仕組みを作りました。GCP Organization内の全プロジェクトで実行されたBigQueryの監査ログをリアルタイムにチェックすることによってこの仕組みは実現されています。本記事では作成したシステムを紹介します。

なお、本記事は以下のQiita記事に着想を得たものであり、@irotorisさんにはこの場を借りてお礼申し上げます。 qiita.com

最初は元記事で公開されているソースコードをそのまま使用することを考えていたのですが、ライセンス表記がなかったため独自で実装し直し、我々のユースケースにおいて不足する機能を付け足しました。

BigQueryのコストを抑える方法

最初にBigQueryのコスト削減に有効な機能の紹介と、それらだけでは不十分であり、監査ログのリアルタイムスキャン機能を作成した理由について説明します。

Custom Quotas

まず、BigQueryのコストを抑える方法として、プロジェクトレベル・ユーザーレベルにQuotaを割り当てる機能があります。

cloud.google.com

この機能を使えば、特定のユーザーが高額なクエリを実行しすぎた際に、それ以上のクエリの実行を停止できます。

しかし、ユーザー毎にQuotaの上限を変えたい場合は、そのユーザーが属するプロジェクトを変える必要があります。プロジェクトを適切に分けてある場合はこの機能の導入がしやすいですが、そうでない場合は事故を防ぐために相当大きめのQuotaとして設定する必要が出てきてしまいます。特に落ちてはいけない大事なバッチが実行されるプロジェクトとアドホックなクエリが実行されるプロジェクトの分離は必須です。そうしない場合はアドホックなクエリによってQuotaが全部使い果たされてしまい、大事なバッチがusageQuotaExceededエラーになってしまいます。

我々の環境ではこれらの分離がまだまだ不十分であったため、この機能の導入は一旦見送りました。分離が十分にできた時点で導入を再度検討することを考えております。

Reservations

次に紹介するのがReservations機能です。

cloud.google.com

この機能を使うと、スキャンしたデータ量によらず毎月固定の金額が課金されます。どれだけの計算量が必要なのかというのをSlotという単位であらかじめコミットします。このSlotはCPUを仮想化した概念で1Slotが1CPUに相当する量です。

cloud.google.com

このReservations機能を使うことで、「お値段定額クエリ実行し放題プラン」になると勘違いしやすいですが、実際は異なります。

実際は「お値段定額『Slotの枠内で』クエリ実行し放題プラン」です。購入したSlot数を超えたパフォーマンスが出ることはないため、Slotを大量に消費するクエリが同時に実行された場合は、クエリの実行時間が伸びるという結果になります。Slotはプロジェクト毎でしか割り当てできないので、大事な集計バッチとアドホッククエリのプロジェクトを変えないと、集計バッチのSLAを担保できません。

我々の環境ではプロジェクトごとにReservations機能の有効・無効を切り替え、コストを最適化することを試みています。定常的にクエリが実行されているプロジェクトではこの機能を有効化しコストの予測可能性を高めます。

一方でクエリの実行頻度が低いプロジェクトやクエリの実行頻度が一定でないプロジェクトでは従来の料金プランと最低60秒の枠でSlotを購入できるFlex Slotsを利用しようとしています。

いずれのプロジェクトに対しても、スキャン量が多すぎるクエリや、Slot使用量が多すぎるクエリを発見しアラートを上げる仕組みが必要です。

そのため、BigQueryの課金ログをリアルタイムにチェックし、スキャン量やSlot使用量などが多すぎるクエリを通知する仕組みを作成しました。

インフラ構成

今回構築したシステムのインフラ構成図を以下に示します。

インフラ図

GCPのマネージドサービスを活用した、イベントドリブンかつサーバレスな構成です。

BigQueryの監査ログはデフォルトではONになっているため、Cloud Loggingに送られています。Organization内の全プロジェクトから送られてくる監査ログをAggregated sinks機能で集約し、Cloud Pub/Subの1つのTopicに集めます。

その後、Cloud Pub/SubのPush SubscriptionでCloud Runを起動します。HTTPリクエストのBodyの中に監査ログが含まれているため、Cloud Run上に実装したアプリケーションでクエリがリソースを使いすぎていないかどうかをチェックし、Slackに通知します。

ここからは各サービスの連携方法をTerraformのtfファイルを交えながら説明していきます。

Cloud Logging → Cloud Pub/Sub

一番最初にCloud Pub/SubのTopicを作成します。これについては特に詳しい説明は不要かと思います。

resource "google_pubsub_topic" "bq-police" {
  name = "bq-police"
}

次に先程作成したTopicに対して、Cloud LoggingからpublishするためのLog Sinkを作成します。Organization内の全てのログを出力するために、Aggregated sinks機能を使っています。このSinkを作成するためにはOrganizationの logging.configWriter を付与したアカウントで terraform apply をする必要があります。

cloud.google.com

org_id パラメーターは、gcloud organizations list コマンドを実行すると確認できます。filterで指定しているクエリは以下のページを参考にして作成しました。古いタイプの課金ログ(AuditData)と新しいタイプの課金ログ(BigQueryAuditMetadata)の2種類があり、新しい側の利用が推奨されている点に注意が必要です。

cloud.google.com

resource "google_logging_organization_sink" "bq-police-org" {
  name = "bq-police-org"
  destination = "pubsub.googleapis.com/${google_pubsub_topic.bq-police.id}"
  org_id = "123456789012" # 各自の環境で変える
  include_children = true

  filter = <<-EOT
  protoPayload.metadata."@type"="type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata"
  protoPayload.metadata.jobChange.job.jobConfig.type = "QUERY"
  EOT
}

そして、Cloud Loggingのサービスアカウント(writer_identity)にTopicへのpublish権限を付与します。Log Sinkはそれぞれ固有のサービスアカウントで実行されており、そのサービスアカウントに対する書き込み権限の付与が必要です。

cloud.google.com

resource "google_pubsub_topic_iam_member" "bq-police-org" {
  project = google_pubsub_topic.bq-police.project
  topic = google_pubsub_topic.bq-police.name
  role = "roles/pubsub.publisher"
  member = google_logging_organization_sink.bq-police-org.writer_identity
}

Cloud Pub/Sub → Cloud Run

次にCloud Pub/SubとCloud Runの連携について説明します。

まずは、Cloud Runのサービスを作成します。アラートを発報するための閾値(THRESHOLD_*)やアラート対象から除外するユーザー(EXEMPTED_USERS)などを環境変数で設定できるようにしています。また、Slackに通知するためのWeb hookのURLは後述するBerglasで設定できるようにしています。

resource "google_service_account" "bq-police-cloud-run" {
  account_id = "bq-police-cloud-run"
  display_name = "BQ Police(Cloud Run)"
}

resource "google_cloud_run_service" "bq-police" {
  name = "bq-police"
  location = "us-central1"

  template {
    spec {
      containers {
        image = "gcr.io/プロジェクトID/bq-police:latest"
        resources {
          limits = {
            cpu = "1000m"
            memory = "256Mi"
          }
        }
        env {
          name = "TZ"
          value = "Asia/Tokyo"
        }
        env {
          name = "THRESHOLD_TOTAL_BILLED_BYTES"
          value = "0"
        }
        env {
          name = "THRESHOLD_TOTAL_SLOT_MS"
          value = "0"
        }
        env {
          name = "EXEMPTED_USERS"
          value = ""
        }
        env {
          name = "SLACK_WEBHOOK_URL"
          value = "sm://プロジェクトID/slack_webhook_url"
        }
      }

      container_concurrency = 10
      service_account_name = google_service_account.bq-police-cloud-run.email
    }
    metadata {
      annotations = {
        "autoscaling.knative.dev/maxScale" = "10"
        "client.knative.dev/user-image"    = "gcr.io/プロジェクトID/bq-police:latest"
        "run.googleapis.com/client-name"   = "terraform"
      }
    }
  }
  autogenerate_revision_name = true
}

Cloud Pub/SubからCloud Runを呼び出すために、Push Subscriptionを作成します。今回作成したCloud Runサービスは呼び出すための認証が必要なため、Pushする際にHTTPヘッダーへ認証情報を埋め込む設定をします。このSubscription専用のサービスアカウントを作成し、serviceAccountTokenCreatorのロールを与えます。これにより、このサービスアカウントは自身のOIDCトークンを取得できるようになります。push_config で生成されたOIDCトークンをHTTPヘッダーに埋め込むように設定します。

cloud.google.com

なお、このOIDCトークンを埋め込んだリクエストを生成するという方式はCloud Pub/Sub以外のプロダクトでも活用できます。以下の記事で詳しく解説されています。

medium.com

そして、Cloud Pub/SubのサービスアカウントにCloud Runサービスの run.invoker ロールを付与することで、この認証済みリクエストに対する認可をします。

resource "google_service_account" "bq-police-pubsub" {
  account_id = "bq-police-pubsub"
  display_name = "BQ Police(Cloud Pub/Sub)"
}

resource "google_project_iam_member" "bq-police-pubsub" {
  member = "serviceAccount:${google_service_account.bq-police-pubsub.email}"
  role = "roles/iam.serviceAccountTokenCreator"
}

resource "google_pubsub_subscription" "bq-police" {
  name  = "bq-police"
  topic = google_pubsub_topic.bq-police.name

  push_config {
    push_endpoint = google_cloud_run_service.bq-police.status[0].url
    oidc_token {
      service_account_email = google_service_account.bq-police-pubsub.email
    }
  }
}

resource "google_cloud_run_service_iam_member" "bq-police-pubsub" {
  service = google_cloud_run_service.bq-police.name
  location = google_cloud_run_service.bq-police.location
  role = "roles/run.invoker"
  member = "serviceAccount:${google_service_account.bq-police-pubsub.email}"
}

アプリケーション

今回はアプリケーションの実行基盤にCloud Runを使っています。そのため、App Engine(Standard Environment)やCloud Functionsと比較して使用できる言語・フレームワークの自由度が高いです。使い慣れているという理由で、Ruby + Sinatraを使って実装してみました。特別なことはしていないので、詳しい説明は省略します。

require 'json'
require 'yaml'
require 'erb'
require 'sinatra'
require 'faraday'

THRESHOLD = {
  total_billed_bytes: ENV.fetch('THRESHOLD_TOTAL_BILLED_BYTES', '0').to_i,
  total_slot_ms: ENV.fetch('THRESHOLD_TOTAL_SLOT_MS', '0').to_i,
}
EXEMPTED_USERS = ENV.fetch('EXEMPTED_USERS', '').split(',')
WEBHOOK_URL = ENV['SLACK_WEBHOOK_URL']

class AuditLog
  attr_reader :project_id, :total_billed_bytes, :total_slot_ms, :principal_email, :start_time, :end_time, :query

  def self.from_pubsub_format(data)
    # Cloud Pub/Subから送られてくるBigQueryの監査ログはBase64でエンコードされたJSON
    self.new(JSON.load(Base64.decode64(data['message']['data'])))
  end

  def initialize(log)
    # Ref: https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata
    @project_id         = log['resource']['labels']['project_id']
    @principal_email    = log['protoPayload']['authenticationInfo']['principalEmail']
    @total_billed_bytes = log['protoPayload']['metadata']['jobChange']['job']['jobStats']['queryStats']['totalBilledBytes'].to_i
    @total_slot_ms      = log['protoPayload']['metadata']['jobChange']['job']['jobStats']['totalSlotMs'].to_i
    @start_time         = Time.parse(log['protoPayload']['metadata']['jobChange']['job']['jobStats']['startTime'])
    @end_time           = Time.parse(log['protoPayload']['metadata']['jobChange']['job']['jobStats']['endTime'])
    @query              = log['protoPayload']['metadata']['jobChange']['job']['jobConfig']['queryConfig']['query']
  end

  def duration
    end_time - start_time
  end

  def total_billed_gb
    total_billed_bytes.to_f / 1024 / 1024 / 1024
  end

  def cost
    # Ref: https://cloud.google.com/bigquery/pricing/#on_demand_pricing
    total_billed_gb.to_f / 1024 * 5
  end

  def total_slot_s
    total_slot_ms.to_f / 1000
  end
end

class BqPolice < Sinatra::Base
  def alert?(audit_log, threshold, exempted_users)
    if exempted_users.include?(audit_log.principal_email)
      return false
    end

    if audit_log.total_billed_bytes >= threshold[:total_billed_bytes] || audit_log.total_slot_ms >= threshold[:total_slot_ms]
      true
    else
      false
    end
  end

  def format_message(audit_log)
    YAML.load(ERB.new(File.read('slack_message.yml.erb')).result_with_hash(audit_log: audit_log))
  end

  def post_to_slack(webhook_url, message)
    Faraday.post(webhook_url, JSON.dump(message), 'Content-Type' => 'application/json')
  end

  post '/' do
    audit_log = AuditLog.from_pubsub_format(JSON.load(request.body.read))

    if alert?(audit_log, THRESHOLD, EXEMPTED_USERS)
      message = format_message(audit_log)
      post_to_slack(WEBHOOK_URL, message)
    end

    status 200
  end
end

上記のRubyコードで参照している slack_message.yml.erb を以下に示します。

username: 'BigQuery Police'
icon_emoji: ':cop:'
channel: '#投稿したいチャンネル'
attachments:
  - text: "BigQuery cost alert"
    fallback: "BigQuery cost alert"
    color: 'danger'
    fields:
      - title: 'User'
        value: <%= audit_log.principal_email %>
        short: true
      - title: 'Project'
        value: <%= audit_log.project_id %>
        short: true
      - title: 'Billed bytes (GB)'
        value: <%= audit_log.total_billed_gb.round(2) %>
        short: true
      - title: 'Cost (USD)'
        value: <%= audit_log.cost.round(2) %>
        short: true
      - title: 'Start time'
        value: <%= audit_log.start_time.getlocal.to_s %>
        short: true
      - title: 'End time'
        value: <%= audit_log.end_time.getlocal.to_s %>
        short: true
      - title: 'Duration (sec)'
        value: <%= audit_log.duration.round(2) %>
        short: true
      - title: 'Slot time (sec)'
        value: <%= audit_log.total_slot_s.round(2) %>
        short: true
      - title: 'Query'
        value: |-
          ```
          <%= audit_log.query.slice(0, 3000).gsub("\n", "\n          ") %>
          ```
        short: false

このアプリケーションを動かすためのDockerイメージを作成します。SlackのWeb hook URLはSecret Managerに格納されており、それをBerglas経由で取得しています。そのため、Berglasのバイナリをコンテナ内に入れ、Berglas経由でPumaを起動しています。Pumaの設定ファイルやGemfileは省略します。

FROM ruby:2.7-slim

COPY --from=us-docker.pkg.dev/berglas/berglas/berglas:latest /bin/berglas /bin/berglas
RUN apt-get -qq update && \
    apt-get -qq -y install build-essential --fix-missing --no-install-recommends

WORKDIR /usr/src/app
COPY Gemfile Gemfile.lock ./
ENV BUNDLE_FROZEN=true
RUN gem install bundler && bundle config set --local without 'test' && bundle install

COPY . ./

CMD ["/bin/berglas", "exec", "--", "/usr/local/bundle/bin/puma", "-C", "puma.rb"]

Cloud RunがBerglas経由でSecret ManagerからSlack Web hook URLを取得できるように、以下のコマンドを実行しておきます。

berglas create sm://プロジェクト名/slack_webhook_url "SlackのWeb hook URL"
berglas grant sm://プロジェクト名/slack_webhook_url --member serviceAccount:Cloud Runのサービスアカウント

まとめ

BigQueryの監査ログをリアルタイムにCloud Runで処理することによって、BigQueryで高額なクエリを実行されたときにいち早く気づくことができるようになりました。スキャン量に対するアラート条件だけでなく使用したSlot数に対する条件を指定できるようにし、オンデマンド課金のプロジェクトでもFlat rate課金のプロジェクトでも使用できました。

動作イメージ

なお、上図は意図的に閾値を厳しくした時の通知であり、実際にはこのレベルのクエリでアラートを発報させてはいません。

ZOZOテクノロジーズでは多数の社員から使われるデータ基盤のデータガバナンスを高められる人材を募集しています。ご興味のある方は以下のリンクからご応募ください。

https://hrmos.co/pages/zozo/jobs/0000017hrmos.co

10TB超えのBigQuery巨大データを高速にS3に同期する

ogp

こんにちは。SRE部MA基盤チームの川津です。

私たちのチームでは今年サービスを終了した「IQON」の10TBを超える大規模データをBigQueryからS3へ移行しました。本記事ではデータ移行を行った際に検討したこと、実際にどのようにデータ移行を行ったかを紹介します。

データ移行の経緯

IQONは2020年4月6日をもってサービスを終了しました。そのIQONではデータ分析にBigQueryを利用していましたが、Amazon Web Services(AWS)上にもIQONに関するリソースが存在します。そのため、IQONはGCPとAWSの2つのクラウドで運用していました。

しかし、サービス終了に伴いGCP・AWSどちらかにリソースを統一する必要が出てきました。統一する意図としては、終了したサービスが利用する取引先を減らし、請求対応などの事務的なコストを減らしたい意図がありました。そのためGCPとAWSの両方にあったデータをどちらか片方に寄せて、もう片方を解約することにしました。

解約するためにGCP・AWSどちらがIQONのリソースを多く利用しているのか確認を行いました。その結果、AWSではクローズ告知ページ用Webサーバーやドメインの管理を行っており、GCP側はBigQueryのみリソースを使用していました。移行の手間を考えた結果、AWSではなくGCPのリソースを消すことにしました。

以上の経緯からBigQueryのデータをAWS上に移行を行うことに決めました。

AWSにデータを移行するにあたり以下の要件を満たす必要がありました。

  • データにアクセスする頻度は年に数回程度なので、維持費を抑えたい

  • データ量が多く、ローカルにダウンロードしてクライアントPCで検索すると時間がかかるため、BigQueryのようにSQLを使いクラウド上でデータ検索をしたい

以上の要件よりデータを保存するリソースとしてS3を選択しました。S3にはS3の料金表にある通り、低頻度アクセス向けの料金プランがあり、データの取り出しに料金がかかる代わりにストレージの料金を抑えるプランがあります。

またAmazon Athenaを利用することでS3に保存したデータに対してSQLを利用して検索できます。Amazon AthenaはAmazon Athenaの概要にある通り、CSV、JSON、ORC、Parquetなどのデータフォーマットに対応しています。加えて圧縮されたファイル形式に対してもSQLが実行でき、データを確認できます。現在はドキュメントに記載のあるSnappy、zlib、LZO、gzip、bzip2形式がサポートされています。

事前準備

アーキテクチャの選定

まずBigQueryからS3へ移行するにあたりどのような方法があるか調べることにしました。

アーキテクチャを選定する際、下記の項目を考慮しました。

  • 費用をなるべく抑える
  • 導入コストをなるべく小さくする
  • データの欠損が起こらないようにする

以上を踏まえて検討をした結果、以下のようなアーキテクチャを採用しました。

architechture

下記の手順で作業を行います。

  1. bq extractコマンドを用いてGCSへテーブルデータを転送する
  2. gsutil rsyncコマンドを利用しGCSからS3へ転送する
  3. GCSからS3へ移行できたか確認する

ポイントはgsutil rsyncコマンドを使いS3へデータを移行する点です。

gsutilコマンドはGoogle Cloud Storage(GCS)へアクセスできるコマンドラインツールで、Google Cloud PlatformがOSSとして公開されています。そして、gsutil rsyncコマンドはgsutilコマンドで提供されているコマンドの1つです。

gsutil rsyncコマンドはバケット間の同期ができる機能であり、この機能はGCP間のバケットの同期だけでなく、S3とGCS間のバケットの同期もサポートしています。また、オプションに-mをつけることにより並列でデータを同期することもできます。

gsutil rsyncコマンドを使う際には気をつける点があります。gsutil rsyncの注意点にも記載があるので引用して紹介します。

Note 2: If you are synchronizing a large amount of data between clouds you might consider setting up a Google Compute Engine account and running gsutil there. Since cross-provider gsutil data transfers flow through the machine where gsutil is running, doing this can make your transfer run significantly faster than running gsutil on your local workstation.

この注釈の文脈から、gustil rsyncコマンドを利用する際、クラウド間の転送を行うと一度コマンドを実行した環境内にデータ転送される仕組みであることがわかります。そのため、転送するデータのサイズが大きい場合には、手元のPC環境から転送する際に注意する必要があると言えます。

IQONのデータサイズをBigQueryのinformation schemaから取得したところ、およそ10TBのデータ量がありました。データ量的に手元のPCで転送を行うのは難しいのでAWS環境のEC2インスタンスを利用し転送を行うことにしました。GCPではなくAWS環境を利用した理由は、会社としてAWSを活用しており、契約面においてEC2の利用料金を抑えることができるからです。

AWSのアーキテクチャの選定

次に、gsutil rsync実行のためのEC2インスタンスを用意するためのインフラ構成について考えます。

まずGCS→EC2、EC2→S3の2つの経路に分けてどのような構成にするか考えました。

前半の経路:GCS→EC2

GCS→EC2の経路に関して考えられる経路としては下記の経路が考えられます。

  1. GCS → Internet → Internet Gateway → EC2(public subnet)
  2. GCS → Internet → Internet Gateway → NAT Gateway → EC2(private subnet)

1つ目の経路はEC2をpublic subnetに配置する構成です。

price_ec2_1

この構成にかかる費用はGCPネットワーク料金を元に算出しています。

EC2をpublic subnetに置く場合、グローバルIPを直接割り当てることができるのでインターネットと通信を行うことができます。しかし不特定多数のサーバーから通信を行うことが可能になるため、EC2のセキュリティグループの設定に気をつける必要があります。

2つ目の経路はEC2をprivate subnetに配置する構成です。

price_ec2_2

こちらの構成では、GCPネットワーク料金NAT Gateway料金を参考にして費用を参考にしています。

private subnetにEC2を配置することで外部のトラフィックを遮断できます。しかし、EC2単体だとGCSへ通信を行えないのでNAT Gatewayを配置してGCSへ通信を行うことができるようにしています。NAT Gatewayを配置することで外向きの通信を行うことができます。そしてprivate subnetにEC2を配置しているので内向きの通信を遮断できます。よって外部からの通信を必要とする攻撃を防ぐことができます。

ここで、10TBのデータを転送すると仮定してGCS→EC2の経路でかかる費用を算出してみます。

1つ目の経路を図と照らし合わせるとGCS → Internet Gateway間で料金が発生します。GCS → Internet Gateway間の料金はGCPネットワーク料金表より0.11(USD/GB)なので1つ目の経路は約1100USDかかることがわかります。

2つ目の経路を図と照らし合わせるとGCS → Internet Gateway、NAT Gatewayで料金が発生します。NAT Gatewayの通信はNAT Gatewayの料金表から0.062(USD/GB)と確認できるので約600USDかかることになります。1つ目の経路で計算したGCS → Internet Gateway間の料金と合計すると約1700USDです。

後半の経路:EC2→S3

次にEC2→S3の経路です。こちらは下記の経路が考えられます。

  1. EC2(public subnet) → Internet Gateway → S3
  2. EC2(public subnet or private subnet) → VPC Endpoint → S3
  3. EC2(private subnet) → NAT Gateway → Internet Gateway → S3

1つ目の経路としてはInternet Gatewayを通る経路です。

price_s3_1

この構成にかかる費用はAWSネットワーク料金を元に算出しています。

2つ目の経路はVPC Endpointを経由する経路です。こちらはEC2をpublic subnetかprivate subnetに置く2つの方法が存在します。通信経路の部分はこの2つに相違点がありますが、後述する料金に関しては同じなのでまとめて考えます。

price_s3_2

この構成にかかる費用はVPC Endpoint料金を元に算出しています。

VPC Endpointを利用することで料金を抑えながらEC2 → S3の経路を内部の通信で完結できます。VPC EndpointはInterface EndpointとGateway Endpointの2種類存在します。今回はAmazon S3で利用可能なGateway Endpointを利用しています。

料金に関してはGateway Endpointの料金説明に記載のある通り追加料金なしで利用できます。しかし、用途次第ではVPC Endpointの料金とは別に、通常のAWSデータ転送料金が発生します。今回の用途の場合、AWSのデータ転送料金に説明があるので引用します。

Data transferred between Amazon S3, Amazon Glacier, Amazon DynamoDB, Amazon SES, Amazon SQS, Amazon Kinesis, Amazon ECR, Amazon SNS or Amazon SimpleDB and Amazon EC2 instances in the same AWS Region is free.

つまり同一リージョンにあるEC2とS3間の通信は無料です。今回はEC2のリージョンとS3のリージョンを同じにしているので0USDで利用できます。

3つ目の通信はNAT Gatewayを経由し、Internet Gatewayを経てS3へ通信する経路です。

price_s3_3

この構成にかかる費用はNAT Gateway料金AWSネットワーク料金を元に算出しています。

3つ目の経路に使っているNAT GatewayはGCS→EC2に用いたNAT Gatewayと同じ用途で使っています。

EC2→S3の部分でも10TBを転送すると仮定してEC2→S3の経路までの費用を算出してみます。

1つ目の費用を図から辿るとEC2 → Internet Gatewayの間で料金が発生します。そのためAWSネットワーク料金によると0.114(USD/GB)発生することになります。合計すると約1200USD発生することがわかります。

2つ目はVPC Endpointを経由してS3と通信します。Gateway Endpointの料金説明からEC2がpublic subnet、private subnetに配置しても0USDであることがわかります。

3つ目の費用はNAT Gateway、NAT Gateway → Internet Gatewayで発生していることがわかります。NAT Gatewayの料金表からNAT Gatewayを通る際0.062(USD/GB)発生し、AWSネットワーク料金も別途発生するのでAWSネットワーク料金表から0.114(USD/GB)かかることがわかります。合計すると約1700USDです。

経路全体:GCS→S3

単純に考えるとGCS→EC2とEC2→S3の経路の組み合わせで6通り考えられますがpublic subnet or private subnetの条件で以下の4つの経路の組み合わせに絞られます。

  1. GCS → Internet → Internet Gateway → NAT Gateway → EC2(private subnet) → VPC Endpoint → S3
  2. GCS → Internet → Internet Gateway → NAT Gateway → EC2(private subnet) → NAT Gateway → Internet Gateway → S3
  3. GCS → Internet → Internet Gateway → EC2(public subnet) → VPC Endpoint → S3
  4. GCS → Internet → Internet Gateway → EC2(public subnet) → Internet Gateway → S3

各経路で発生する費用をGCS→EC2、EC2→S3の2つの経路に分けて計算した費用を組み合わせると下記の通りです。

  1. 1700USD
  2. 3400USD
  3. 1100USD
  4. 2300USD

最後にNAT GatewayとVPC Endpointの有無での組み合わせを表にまとめます。

VPC Endpointあり VPC Endpointなし
Private Subnet, NAT Gatewayあり 料金:1700USD
セキュリティ:内向きの通信を遮断できる
通信経路: 1つ目
料金:3400USD
セキュリティ:内向きの通信を遮断できる
通信経路: 2つ目
Public Subnet, NAT Gatewayなし 料金:1100USD
セキュリティ:アクセス元をより厳格に管理する必要がある
通信経路: 3つ目
料金:2300USD
セキュリティ:アクセス元をより厳格に管理する必要がある
通信経路: 4つ目

上記の表より、1番コストが低いのは左下の項目です。懸念点はセキュリティなのですが、今回用意したEC2インスタンスはgsutil rsyncを実行するだけで、内向きの通信はオペレーション用のSSHしかありません。public subnetに置く際、アクセス元を限定したSSHだけを許可して露出を最低限にしました。

これらの考察から、コストが一番安く、セキュリティも設定をしっかりすれば担保できる3つ目の経路の構成を採用することにしました。最終的な構成は下記の図の通りです。

architecture

移行手順

今回移行するBigQueryのデータはテーブルの数とテーブルサイズが大きいので、スレッドプールを作ってJOBを効率的に処理するためRubyを用いて自動化しています。

データ転送:BigQuery → GCS

まずgsutil rsyncを扱うにはBigQueryに存在するデータをGCSに移行する必要があります。GoogleはRubyに対してBigQueryのSDKを提供しており、extract_jobメソッドを使うことによって対象のテーブルをGCSに転送できます。extract_jobを使う際にポイントが2つあります。

1つ目のポイントとして、extract_jobメソッドは転送するデータの圧縮形式が指定できる点です。圧縮形式はCSVであれはgzip形式がサポートされています。今回のIQONのデータは10TB以上あることがわかっています。そのためファイルを圧縮して転送できれば先程計算したデータ転送の料金を抑えることができます。また最終的にAmazon Athenaを利用する際もgzip形式でクエリを実行することが可能です。しかしgzipでどれだけ料金が抑えられるかわからないので、いくつかのファイルをgzipで圧縮し確認しました。適当にCSVのファイルを5ファイルほど用意しgzipで圧縮しました。下記の表が圧縮結果です。

圧縮前(Byte) 圧縮後(Byte) 圧縮率(%)
81920 7168 92
11264 2048 82
1266989 49177 62
23552 6843 71
57344 11787 80

gzipに圧縮するとおよそ70〜80%ほど圧縮できました。そのため、10TBのデータも7〜8割ほど圧縮できると予想できます。結論として、データ移行の料金は転送したデータの量に比例するので7〜8割ほどgzipで料金コストを削減できることがわかりました。他にgzipで圧縮した際に起こるデメリットはAmazon Athenaでクエリを実行する際、gzipを解凍する必要があるので速度低下が考えられます。しかしgzipにすることでクエリを実行する際のデータ量を削減できるのでAmazon Athenaの利用料金を抑えることができます。クエリを実行する頻度として年1〜2回程度実行する程度なのでS3の利用料金を抑える利点やAmazon Athenaの利用料金を抑える点を考慮するとgzipで圧縮するメリットが大きいのでgzipで転送しました。

2つ目のポイントはextract_jobを用いてファイルを転送する際、転送するファイルを分割する必要がある点です。分割する必要があるファイルの条件はサイズが1GB以上あるファイルです。そのため転送する元データのサイズが1GB以上の場合は別名を付けてファイルを分割する必要があります。公式ドキュメントによるとワイルドカードで指定ができます。今回は下記のようなURIで分割を行いました。

定義するURI
gs://hoge/file-*.csv

出力されるファイル名
gs://hoge/file-000000000000.csv
gs://hoge/file-000000000001.csv
gs://hoge/file-000000000002.csv
.
.
.

実際に移行で利用したコードを以下に示します。これをEC2上でバックグラウンド実行しました。

require "google/cloud/bigquery"
require "google/cloud/storage"
require "logger"
require "parallel"
def import project_id = ""
  bigquery = Google::Cloud::Bigquery.new(project: project_id)
  storage = Google::Cloud::Storage.new(project: project_id)
  bucket_name = ""
  bucket = storage.bucket("")
  bq_table_name = []
  bigquery.datasets.all.each do |dataset|
    # 100並列で転送を行う
    Parallel.map(dataset.tables.all, in_threads: 100) do |table|
      if (table.bytes_count / (1024.0 * 1024.0 * 1024.0)) < 1
        import_gcs(table, bucket_name, dataset.dataset_id, "-*.csv.gz", "CSV")
      else
        import_gcs(table, bucket_name, dataset.dataset_id, "-*.csv.gz", "CSV")
      end
    end
  end
end

def import_gcs(table, bucket_name, dataset_name, extend, extension)
  log = Logger.new("log.txt")
  uri = "gs://#{bucket_name}/#{dataset_name}/#{table.table_id}/#{table.table_id}#{extend}"
  extract_job = table.extract_job uri, compression: "GZIP", format: extension do |config|
    config.location = "US"
  end
  extract_job.wait_until_done!
  if extract_job.failed?
    log.debug("#{table.table_id} failed") 
    log.debug("#{extract_job.error}")
  end
  return extract_job.failed?
end

import()

実装のポイントはParallelを用いて並列で転送を行っている点です。最初は並列に行わず直列で処理を行っていたのですが1日経っても終わりませんでした。CloudWatchでEC2のメトリクスを確認するとネットワークの帯域やCPU使用率は余裕がありそうでした。BigQuery → GCSの転送自体はGCP側で行っているので100並列で様子を見ながら転送を行いました。その結果、半日かからず終了させることができました。

また、念のためRubyのloggerで簡単なログを取っています。extract_jobの戻り値のfailed?でJobの成功、失敗を確認できます。最初はログを取っておらず、途中でプログラムが落ちた際どのテーブルで失敗したのかがわからず原因を突き止めるのに苦労しました。結論としては、特にJobが失敗したログは発生しませんでした。

最終的にgzipで転送した結果、元のデータサイズと比較すると7〜8割ほどデータを圧縮できました。さらに、料金に関しても7〜8割コストを削減できました。

データ転送:GCS → S3

GCS→S3に関してはgsutil rsyncコマンドを使い転送を行いました。

S3のディレクトリは下記の構成にしました。

├── BigQueryのdataset名
│   ├── BigQueryのtable名
│       ├── (BigQueryのtable名).csv.gz

gsutil rsyncで転送する際は、ルートprefixからgsutil rsyncを行うとエラーが出た際に始点が最初からになってしまうので、今回はdatasetのprefix毎に分けて転送します。

転送には時間がかかるので、ログを残す点やバックグラウンドで動かす点などに注意し、下記のコードを実装しました。

require "google/cloud/bigquery"
require "google/cloud/storage"
require "parallel"
require "logger"
def gcs_to_s3 project_id = ""
  storage = Google::Cloud::Storage.new(project: project_id)
  log = Logger.new("log.txt")
  bucket = storage.bucket("")
  s3_bucket_name = ""
  gcs_bucket_name = ""
  directory_name = bucket.files(delimiter: "/")
  directory_name.prefixes.each do |directory|
    log.debug("start #{directory}")
    success = system("gsutil -m rsync -r gs://#{gcs_bucket_name}/#{directory.gsub("/", "")} s3://#{s3_bucket_name}/#{directory.gsub("/", "")}")
    if not success
      log.debug("failed #{directory}")
      next
    end
    log.debug("success #{directory}")
  end
end

gcs_to_s3()

上記のプログラムで想定通りにgsutil rsync側で転送が行われているか確認を行いました。CloudWatchでEC2のメトリクスを確認するとCPU使用率が飽和している状態でした。CPU使用率が飽和している場合の対策としてEC2インスタンスのインスタンスタイプを上げたり、EC2インスタンスを複数作成して処理を分散する対策が考えられます。しかし、S3とGCPバケットの総データ量を都度確認しファイルの転送速度を確認すると対策をするほど遅くなかったのでこのままの状態で転送を行いました。

ファイルの確認作業

最後の作業として、GCSからS3にデータを転送する際、欠損が起きていないか確認を行います。

確認する項目は以下の通りです。

  • ファイルの存在確認

  • GCSとS3のチェックサム検証

  • GCSとS3のサイズ比較

上記の各項目の確認方法について説明します。

ファイルの存在確認

GCSとS3にあるファイルの存在確認をするためにはコンソール上で確認する方法があります。しかし、ファイル数が数千ファイル存在するのでファイルを1つずつ確認するためには時間と労力が必要です。

そのため、GCS、S3に対象のprefixが存在するか比較し存在の有無を確認します。GCSに存在するファイルはすでにBigQueryから全て転送できていることが確認できているのでGCSのprefixを起点としてS3のprefixを確認します。objectメソッドの戻り値のexits?メソッドで対象のファイルが存在するか確認できます。

GCSとS3のチェックサム検証

チェックサムを確認することによってGCSから送られてきたファイルはGCSと同一のファイルであるか確認できます。hash値の確認に関しては手元に対象のファイルをダウンロードして確認する方法でも可能ですが、こちらも時間と労力が必要です。hash値はGCS、AWSのSDKを使用して確認可能なので各環境のSDKを使用し確認します。

S3に関してはAws::S3::Objectクラスのetagメソッドで確認できます。

GCPではGoogle::Cloud::Storage::Fileクラスのmd5メソッドで確認できます。こちらはbase64でエンコードされた値が返ってくるのでmd5で比較するために一度デコードして比較します。デコードした値はbinaryなのでunpackを行う必要があります。

GCSとS3のサイズ比較

GCSとS3のサイズ比較はgsutil duコマンドを用いることで確認できます。下記のようなコマンドを入力するとbyte表記でバケットの合計サイズを確認できます。

# GCSのバケットの容量の確認する場合
$ gsutil du -s gs://bucket_name
123456
# S3のバケットの容量の確認する場合
$ gsutil du -s s3://bucket_name
123456

GCSとS3のサイズ比較に関してはバケット単位での比較なのでコマンドを複数回実行すれば確認できます。しかし、ファイルの存在確認とチェックサムの検証はファイル単位なので下記のスクリプトを利用して確認します。

require "google/cloud/storage"
require "aws-sdk"
require "parallel"
require "logger"
require "google/cloud/bigquery"
require "digest/md5"
require "base64"

def check_file project_id = "iqon-data-mining"
  storage = Google::Cloud::Storage.new(project: project_id)
  resource = Aws::S3::Resource.new(region: "ap-northeast-1")
  log = Logger.new("log.txt")
  s3 = resource.bucket("iqon-backup")
  bucket = storage.bucket("export-s3-failed")
  files = bucket.files()
  Parallel.map(files.all, in_threads: 100) do |obj|
    dataset_name = obj.name.sub(/\/.+/, "")
    file_name = obj.name.sub(/.+\//, "")
    directory_name = file_name.sub(/-\d+.csv.+/, "").sub(/.csv.+/, "")

    s3_directory = "#{dataset_name}/#{directory_name}/#{file_name}"

    s3object = s3.object(obj.name)

    if (s3object.exists?)
      # gcsのetagとs3のetagを比較
      if (s3object.etag.gsub("\"", "") == Base64.decode64(obj.md5).unpack("H*")[0])
        puts("checked")
      else
        log.debug("file etag validation failed at gcs: #{obj.name}/s3: #{s3_directory}")
      end
    else
      log.debug("file not found gcs: #{obj.name}/s3: #{s3_directory}")
    end
  end
end

check_file()

今回、上記のプログラムで確認作業を行った際、GCSとS3のファイルのhash値が合わない問題に遭遇しました。原因としては、GCS → S3へ転送済みのテーブルに対してBigQuery → GCSへファイル転送を再び行いGCSのファイルを上書きしてしまったことでした。BigQuery → GCSへ転送する場合、CSVの行の順序が保証されていません。そのためBigQuery → GCSへ転送するたびにhash値が変わってしまいます。結局S3に保存されているhash値が一致しないファイルを削除し、削除したファイルをGCSから再転送を行いました。

最後にAmazon Athenaを使ってS3に転送完了したファイルに対してクエリを実行してみました。結果としてgzipで圧縮されたファイルでも問題なく中身を確認できました。

まとめ

BigQueryからS3に移行するまでの手順を紹介しました。S3へデータ移行が完了したのでGCP側のリソースを削除できました。今回のデータ移行は転送するデータ量がかなり多く、転送完了するまで数日かかりました。また移行するデータ量が多い場合は転送時に発生する料金も多く発生し、選定するアーキテクチャによって料金も大きく変わることがわかりました。そのため、たった1回のデータ転送でもデータを移行する前に移行でかかる時間と料金を見積もることが重要になると感じました。

移行が完了した後、継続的にS3の料金が発生します。現在S3の料金プランはS3標準プランに設定していますが徐々にプランを変更し最終的にS3 Glacier Deep Archiveプランへ移行する予定です。気をつけるべき点としてAmazon AthenaがS3にアクセスできるプランはストレージタイプがスタンダードかスタンダードIAであることがあげられます。S3 Glacierプランまで変更するとAmazon Athenaでアクセスするにはファイルをrestoreする必要があります。そのため今後Amazon Athenaでクエリを打つ必要がなくなったタイミングでS3 Glacier Deep Archiveプランへ移行します。

aws.amazon.com

料金を比較するとBigQueryの場合長期保存プランは0.010(USD/GB)に対してS3のS3 Glacier Deep Archiveプランは0.002(USD/GB)です。そのためS3 Glacier Deep Archiveプランに移行すると月およそ82USD削減できます。

MA基盤チームではデータ転送に関わる業務が多く、他のチームと連携しながら仕事をすることがあります。今回のタスクをこなすことで他部署と関わりながらデータの転送方法について知ることができました。

最後に

ZOZOテクノロジーズではより良いサービスを提供するための基盤作りを開発したい仲間を募集中です。以下のリンクからご応募ください。

https://tech.zozo.com/recruit/tech.zozo.com

ZOZOTOWNを支えるリアルタイムデータ連携基盤

概要

こんにちは、SRE部MA基盤チームの谷口(case-k)です。私達のチームでは、データ連携基盤の開発・運用をしています。

データ基盤には大きく分けて2種類あり、日次でデータ連携してるものとリアルタイムにデータ連携しているものがあります。本記事ではリアルタイムデータ連携基盤についてご紹介します。

既存のデータ連携基盤の紹介

まず既存のデータ連携基盤について簡単にご紹介させていただきます。

既存のデータ連携基盤ではオンプレ環境やクラウドにあるデータをBigQueryへ日次で1回連携しています。ETLツールはSQL Server専用の「bcp」とTreasure Dataが開発しているOSSである「Embulk」を使っています。まずbcpを使い、オンプレ環境の基幹データベース内のテーブルを中間データベースへ連携します。中間データベースへ連携されたデータはEmbulkを使って、BigQueryへ連携されます。この際秘密情報のハッシュ化なども行っています。

これらの処理はワークフローエンジンで制御されていてTreasure Dataが開発しているOSSであるDigdagを使っています。余談ですがZOZOテクノロジーズにはDigdagのコントリビューターが7人もいます!

概要

リアルタイムデータ連携基盤の紹介

なぜ必要なのか

これまでZOZOテクノロジーズでは日次でBigQueryへデータを連携していました。最近では機械学習を使った案件も増えてきており、リアルタイムなデータを必要とするサービスが増えてきています。機械学習の他にも配信基盤では商品が残り1点になったタイミングで通知を行う仕組みがあります。このような要件に対応するためには商品の在庫状況をリアルタイムで連携する必要があります。その他にも、施策のモニタリングや不正利用を素早く検知したいなど様々な案件があり、ZOZOテクノロジーズでもリアルタイムデータ連携基盤を構築することになりました。

活用事例の紹介

リアルタイムデータ連携基盤は検索パーソナライズ基盤の商品在庫の連携で使われています。検索パーソナライズとはユーザごとに商品をおすすめ順で紹介する機能となります。商品在庫がないとユーザに対してレコメンドしたにもかかわらず、商品が既に売り切れているといった機会損失が起きてしまいます。このような要件から検索パーソナライズ基盤ではリアルタイムに商品の在庫状況を知る必要があります。

概要

データ連携の仕組みと課題

リアルタイムデータ連携基盤はオンプレ環境からGCP環境まで多段にレプリケーションを行いデータ連携をしています。SQL ServerからKafkaへの差分連携にはQlik Replicateを採用してます。

Qlik ReplicateはSQL ServerからCDCを取得し、解析可能なメッセージの形へ変換する役割をになっています。CDCとはChange Data Captureの略で、データベース内で行われた変更履歴を追うことができる機能です。オンプレ環境のSQL Serverはバージョンが古くCDCを使うことができないため、Compute Engine上にSQL Serverを立てて差分データを取得しています。

概要

https://www.qlik.com/us/attunitywww.qlik.com techblog.zozo.com

多段にレプリケーションを行うことで高頻度連携を実現しようとしましたが、運用する過程でデータの欠損や遅延が課題としてあがりました。

  • データの遅延
    オンプレ環境からBigQueryまでに多段にレプリケーションを行うことで10分から30分程度の遅延が発生していました。
  • データの欠損
    既存の処理系にメモリリークがあり、定期的な再起動によるデータの欠損も発生していました。
  • コスト
    インフラ費用も月額で約200万円程度かかっており汎用的な基盤として使うには課題がありました。

 リプレイス後のリアルタイムデータ連携基盤

既存のリアルタイムデータ連携基盤の課題を解決するため、私たちのチームでリプレイスをすることになりました。

SQL Serverの差分データの取り方を検討

新規に作成するリアルタイムデータ連携基盤では、差分の取得にSQL ServerのChange Trackingを採用しました。

Change TrackingとはCDCのようにSQL Serverの差分データを取得する仕組みです。CDCとの違いはCDCが非同期的な連携であるのに対しChange Trackingは同期的に連携します。CDCよりもリアルタイム性をもって連携できる一方で、変更履歴などは取得はできません。取得できるのは削除や更新、追加といった更新処理の内容や更新バージョン、それと更新のあった主キーのみです。

具体的には、次のようにして差分データの最新の状態を取得しています。

  SELECT
    a.SYS_CHANGE_OPERATION as changetrack_type,
    a.SYS_CHANGE_VERSION as changetrack_ver,
    #{columns}
  FROM
    CHANGETABLE(CHANGES #{@tablename},
      @前回更新したバージョン) AS a
  LEFT OUTER JOIN #{@tablename} ON a.#{@primary_key} = b.#{@primary_key}

差分データの取得方法としてはChange Tracking以外にも、テーブルの更新タイムスタンプを参照する方法やCDCを使う方法も検討しました。

しかし、更新タイムスタンプは付与されているテーブルが非常に少なく、付与されていても更新されないタイムスタンプが多くありました。CDCもオンプレ環境にCDCが使える2016以降のSQL Serverがほとんどありませんでした。また、非同期的なCDCより同期的なChange Trackingの方が高速にデータを取得できます。

このような理由からChange Trackingを使って差分データを取得することになりました。

アーキテクチャ概要と処理の流れ

ここからはリプレイス後のリアルタイムデータ連携基盤のアーキテクチャ概要と処理の流れについてご紹介します。

アーキテクチャの全体図は次の通りです。

概要

Fluentdのプラグインを使った差分データの取得

Change Trackingの実行からPub/Subへのメッセージ転送はFluentdのプラグインを使っています。

冗長構成を実現するため、Compute Engine 2台にプラグインをデプロイしています。片方のインスタンスに問題が起きても、もう片方が生きていればデータの欠損が起きない仕組みとなっています。

オンプレ環境からGCP環境へ高速にデータ連携できるよう専用線としてDedicated Interconnectを使っています。多段にレプリケーションを行ったことによる遅延が課題だったので、最も根元の基幹データベースからデータを取得するようにしています。取得したデータはPub/Subのアウトプットプラグインを使い転送されます。

cloud.google.com

概要

プラグインでは次のようにChange Tracking実行時にレコード単位でユニークとなるメッセージIDを生成しています。生成されたメッセージIDはDataflowのメッセージの重複排除で使います。

BigQueryで主キーの最新の状態を集計できるようChange Trackingのバージョンも渡しています。Dataflowでテーブル名を考慮してBigQueryへ書き込みができるようテーブル名も渡しています。

query = """
  declare @last_synchronization_version bigint;
  SET @last_synchronization_version = #{changetrack_ver};
  SET lock_timeout #{@lock_timeout}
  SELECT
    CONCAT('#{@tablename}','-',a.#{@primary_key.join(',').gsub(',', ',a.')},a.SYS_CHANGE_VERSION) as massage_unique_id,
    '#{@tablename}' as table_name,
    '#{@changetrack_interval}' as changetrack_interval,
    '#{Time.now.utc}' as changetrack_start_time,
    a.SYS_CHANGE_OPERATION as changetrack_type,
    a.SYS_CHANGE_VERSION as changetrack_ver,
    #{columns}
  FROM
    CHANGETABLE(CHANGES #{@tablename},
      @last_synchronization_version) AS a
  LEFT OUTER JOIN #{@tablename} ON a.#{@primary_key} = b.#{@primary_key}
"""

Pub/Subのアウトプットプラグインでは差分データに加えてattributeにDataflowの重複排除で使うメッセージIDを渡しています。Dataflowの重複排除は次章でご紹介します。

<system>
  workers '<worker count>'
</system>
<worker 1>
  <source>
    // Input Plugin
  </source>
  <match '<tag_name>'
    @type gcloud_pubsub
    project "#{ENV['PROJECT_ID']}"
    key /usr/src/app/config/gcp_credential.json
    topic "projects/#{ENV['PROJECT_ID']}/topics/<topic-name>"
    autocreate_topic false
    max_messages 1000
    max_total_size 9800000
    max_message_size 4000000
    attribute_keys ["message_unique_id"]
    // Buffer Plugin
  </match>
</worker>

github.com docs.fluentd.org

Dataflowでメッセージの重複を排除

Fluentd2台の冗長構成によるデータの重複はDataflowのidAttributeを使い重複を排除しています。idAttributeを使うことで、プラグインで付与したメッセージIDを参照して10分以内であれば同じメッセージの重複を排除します。Dataflowを使うとPub/Subで自動的に付与されるメッセージIDの重複は自動で排除でき、at least onceを採用しているPub/Subとは相性の良いツールです。しかし、パブリッシャーが複数回同じメッセージを送った場合、Pub/Subでは異なるメッセージと扱われるため重複の自動排除はできません。このような理由からメッセージのユニーク性を担保したい場合はDatafflowでidAttributeを使います。idAttributeを使うことで2台のFluentdから送られてくるデータの重複を排除しています。

概要

次のサンプルはidAttributeを使ってメッセージの重複排除をする例です。

public static PipelineResult run(Options options) {
  // Create the pipeline
  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply(
          "Read PubSub Events",
          PubsubIO.readMessagesWithAttributes()
          .withIdAttribute("message_unique_id")
          .fromSubscription(options.getInputSubscription()))
      .apply(
          "Filter Events If Enabled",
          ParDo.of(
              ExtractAndFilterEventsFn.newBuilder()
                  .withFilterKey(options.getFilterKey())
                  .withFilterValue(options.getFilterValue())
                  .build()))
      .apply("Write PubSub Events", PubsubIO.writeMessages().to(options.getOutputTopic()));
  return pipeline.run();
}

cloud.google.com cloud.google.com

Dataflowで動的にBigQueryの各テーブルに出力

Pub/Subに送られ重複排除されたメッセージはDataflowを使ってBigQueryのテーブルに書き込まれます。DataflowのDynamic Destinationsを使うとメッセージ内のテーブル名に基づいて、出力先のテーブルを動的に振り分けることが可能です。そのため、Dynamic Destinationsを使うことで、1つのDataflowで複数テーブルのデータ連携ができるようになりインフラコストを抑えることができます。

なお、DataflowのDynamic Destinations機能は現時点だとJavaのみサポートしてます。

概要

次のサンプルはDynamic Destinationsを使ってストリーム内のテーブル名を参照してBigQueryのテーブルに書き込む例です。監視や分析用の遅延時間を計測するため、BigQueryへのインサート時刻も取得しています。

WriteResult writeResult = convertedTableRows.get(TRANSFORM_OUT)
.apply(
    BigQueryIO.<TableRow>write()
        .to(
            new DynamicDestinations<TableRow, String>() {
            @Override
            public String getDestination(ValueInSingleWindow<TableRow> elem) {
                return elem.getValue().get("table_name").toString();
            }
            @Override
            public TableDestination getTable(String destination) {
                return new TableDestination(
                    new TableReference()
                        .setProjectId("project_id")
                        .setDatasetId("dataset_name")
                        .setTableId("table_prefix" + "_" + destination), // destination: table name
                    "destination table" + destination);
            }
            @Override
            public TableSchema getSchema(String destination) {
                TableSchema schema =  new TableSchema()
                switch (destination) {
                case "table_a":
                    schema.setFields(ImmutableList.of(new TableFieldSchema().setName("column").setType("STRING").setMode("NULLABLE")));
                    break;
                case "table_b":
                    schema.setFields(ImmutableList.of(new TableFieldSchema().setName("column").setType("STRING").setMode("NULLABLE")));
                    break;
                default:
                }
                return schema
            }
        })
        // BigQuery Insert Time
        .withFormatFunction((TableRow elem) -> elem.set("bigquery_insert_time", Instant.now().toString()))
        .withoutValidation()
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withExtendedErrorInfo()
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

www.case-k.jp beam.apache.org

Pub/Subのメッセージ管理

重複排除されてPub/Subに送られてきたメッセージは別のサブスクライバーからも参照できるよう7日間メッセージを保持しています。新しくサブスクライバーを作ればBigtableなどBigQuery以外にも出力できるようになっており、Dataflowのウィンドウ処理等でリアルタイムに特徴量生成などもできるようになっています。

概要

次のサンプルはPub/Subでメッセージを7日間保持するTerraformの例となります。retain_acked_messagesをtrueとすることでサブスクライブされたメッセージを破棄せずに保持します。

message_retention_durationはメッセージの保有期間を決めることができます。なお、メッセージの保有期間は最大で7日です。

resource "google_pubsub_subscription" "message_hub" {
  name  = "message_hub"
  topic = google_pubsub_topic.message_hub.name
  # subscribe from multiple subscriber
  message_retention_duration = "604800s"
  retain_acked_messages      = true
  ack_deadline_seconds = 60
}

cloud.google.com beam.apache.org

イベントログ収集基盤

まだ構想段階ではありますが、データ量の多いイベントログのリアルタイムデータ連携基盤も作ろうとしています。イベントログについてもPub/Subに投げてもらうのが理想的ですが、クライアント側の負担も考慮し現在検討中です。

概要

個人情報の取り扱い

BigQueryとPub/Subに保持される個人情報や秘密情報はアクセスできるユーザを制限しています。

BigQueryではカラムレベルでのアクセス制御を行い、Pub/Subはトピック単位でアクセス制御をしています。カラムレベルのアクセス制御を行うため、ポリシータグを個人情報や秘密情報のカラムに付与しています。ポリシータグとはBigQueryのテーブルに対してカラムレベルのアクセス制御を行うリソースです。

ポリシータグのカラム付与はTerraformで次のようにしてできます。ポリシータグ自体を作ることはまだTerraformではサポートされていないようです。

resource "google_bigquery_table" "table-name" {
  dataset_id = google_bigquery_dataset.<dataset-name>.dataset_id
  table_id   = "<table-name>"
  schema = <<EOF
  [
    {
      "name": "column-name>",
      "type": "STRING",
      "mode": "NULLABLE",
      "policyTags": {
        "names": [
          "projects/<project-id>/locations/<location>/taxonomies/<taxonomies-id>/policyTags/<policy-tag-id>"
        ]
      }
    }
  ]
  EOF
}

cloud.google.com github.com

Pub/SubはDataflowで個人情報や秘密情報をNULL置換したトピックを作ろうと考えています。トピック単位で参照ユーザを制限することで、秘密情報を必要としないサブスクライバーからは参照できないようにします。

cloud.google.com

ビルド・デプロイ戦略

FluentdのプラグインとDataflowのビルド・デプロイ方法についてご紹介できればと思います。CI/CDツールとしてはCircleCIを使っています。Fluentdのプラグインはコンテナイメージを作り、作られたコンテナイメージをContainer RegistryにPUSHしています。Container RegistryのコンテナイメージはCompute Engine起動時にPULLされデプロイされます。データ欠損や遅延が発生しないよう2台のCompute Engineを1台ずつ再起動させ無停止でデプロイできるようにしています。

module "gce-container" {
  source  = "Terraform-google-modules/container-vm/google"
  version = "~> 2.0"

  container = {
    image = "gcr.io/${var.project}/<image-name>"
    tty : true
  }
  restart_policy = "Always"
}

resource "google_compute_instance" "compute engine" {
  name         = "name"
  machine_type = "n2-custom-4-10240"
  zone         = "asia-northeast1-a"

  boot_disk {
    initialize_params {
      image = module.gce-container.source_image
      size  = 500
    }
  }

  metadata_startup_script = "#!/bin/bash /usr/bin/docker-credential-gcr configure-docker EOF"

  metadata = {
    gce-container-declaration = module.gce-container.metadata_value
    google-logging-enabled    = "true"
    google-monitoring-enabled = "true"
  }

  service_account {
    email = "${google_service_account.tracker_app.email}"
    scopes = [
      "https://www.googleapis.com/auth/cloud-platform",
    ]
  }
}

Dataflowはカスタムテンプレートをビルドし、既存のパイプラインの更新を行います。

DataflowのカスタムテンプレートはGoogle提供のテンプレートをベースにカスタマイズしています。ビルド時にenableStreamingEngineオプションを利用すると使用するディスク容量を420GBから30GBにインフラ費用を抑えることができます。

次のコードはテンプレートをビルドする際にenableStreamingEngineオプションを指定する例です。

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=com.google.cloud.teleport.templates.PubsubToPubsub \
  -Dexec.args="--project=${project_id} \
              --tempLocation=gs://${project_id}/tmp \
              --templateLocation=gs://${project_id}/templates/<template-name> \
              --experiments=enable_stackdriver_agent_metrics \
              --enableStreamingEngine \
              --runner=DataflowRunner"

cloud.google.com cloud.google.com github.com

ビルドされたテンプレートはupdateオプションを使い既存のパイプラインの更新を行っています。互換性チェックにより、中間状態やバッファデータなどが前のジョブから置換ジョブに確実に転送することが可能です。

次のコードはPythonクライアントを使ってパイプラインを更新する例です。

def create_template_request(self, job_name, template_path, parameters, environment, update_options):
    request = self.dataflow.projects().templates().launch(
        projectId = self.project_id,
        location = 'us-central1',
        gcsPath = template_path,
        body = {
            "jobName": job_name,
            "parameters": parameters,
            "environment": environment,
            "update": update_options
        }
    )
    return request

def deploy_dynamic_destinations_datatransfer(self, active_jobs):
    job_name='dynamic_destinations_datatransfer'
    template_name = 'PubSubToBigQueryDynamicDestinationsTemplate'
    template_path = "gs://{}/templates/{}".format(self.project_id, template_name)
    input_subscription = 'message_hub'
    output_default_table = 'streaming_datatransfer.streaming_dynamic_changetracktransfer'
    parameters = {
        "inputSubscription": "projects/{}/subscriptions/{}".format(self.project_id, input_subscription),
        "outputTableSpec": "{}:{}".format(self.project_id, output_default_table),
        "autoscalingAlgorithm": "THROUGHPUT_BASED"
    }
    environment = {
        "machineType": 'n2-standard-2',
        "maxWorkers": 5
    }
    update_options='false'
    if 'dynamic_destinations_datatransfer' in active_jobs:
        update_options='true'
    request = self.create_template_request(job_name, template_path, parameters, environment, update_options)
    request.execute()

cloud.google.com

監視

監視対象としてはデータの欠損や遅延が発生してないかCloud LoggingやMonitoring、Redashを使い監視しています。

データの欠損

データの欠損はリトライログとメモリの使用率を確認してます。リトライの上限を超えるとデータが欠損してしまうのと、メモリの使用率が100%に達すると基幹データベースへ接続ができなくなるからです。

次のようにしてFluentdのプラグイン側でリトライ時にログを出力しています。

def execute_changetracking(changetrack_ver)
  try = 0
  begin
    try += 1
    query = generate_query(changetrack_ver)
    changetrack_results = execute_query(query)
    if !changetrack_results.nil?
      changetrack_results.each_slice(@batch_size) { |rows|
        es = MultiEventStream.new
        rows.each do |r|
          r["changetrack_end_time"] = Time.now.utc
          es.add(Fluent::Engine.now, r)
          if changetrack_ver < r["changetrack_ver"] then
            changetrack_ver = r["changetrack_ver"]
          end
        end
        router.emit_stream(@output_tag, es)
      }
      update_changetrack_version(changetrack_ver)
    end
  rescue => e
    puts "Write Retry Cnt: #{try}, Table Name: #{@tablename}, Error Message: #{e}"
    sleep try**2
    retry if try < @retry_max_times
    raise
  end
end

リトライ時のログはCompute Engine起動時にデプロイしたCloud Loggingエージェントでログを取得しています。

metadata = {
  gce-container-declaration = module.gce-container.metadata_value
  google-logging-enabled    = "true"
  google-monitoring-enabled = "true"
}

次のようにしてCloud Loggingでメトリクスを作ります。今回リトライ回数が10回でアラートを通知するように設定しました。

resource "google_logging_metric" "retry_error_tracker_a_metric" {
  name   = "retry-error-tracker-a/metric"
  filter = "resource.type=\"gce_instance\" severity>=DEFAULT jsonPayload.message: \"Write Retry Cnt: 10\" resource.labels.instance_id: \"${google_compute_instance.streaming_datatransfer_a.instance_id}\""
  metric_descriptor {
    metric_kind = "DELTA"
    value_type  = "INT64"
  }
}

作成したメトリクスを使いCloud Monitoringでアラートを通知します。

resource "google_monitoring_alert_policy" "tracker_a_retry_error_alert_policy" {
  display_name = "Tracker A Retry Error"
  depends_on   = [google_logging_metric.retry_error_tracker_a_metric]
  combiner     = "OR"
  conditions {
    display_name = "condition"
    condition_threshold {
      filter     = "metric.type=\"logging.googleapis.com/user/retry-error-tracker-a/metric\" resource.type=\"gce_instance\""
      duration   = "0s"
      comparison = "COMPARISON_GT"
      aggregations {
        alignment_period   = "60s"
        per_series_aligner = "ALIGN_DELTA"
      }
      trigger {
        count = 1
      }
      threshold_value = 0
    }
  }
  enabled = true
  # gcloud alpha monitoring policies list --project=streaming-datatransfer-env
  notification_channels = ["projects/${var.project}/notificationChannels/${var.slack_notification_channel_id}"]
}

メモリが枯渇するとプラグインから基幹データベースへのデータ取得が失敗するので、メモリ使用率が80%を超えた場合アラートを投げるよう設定してます。

resource "google_monitoring_alert_policy" "tracker_a_memory_alert_policy" {
  display_name = "Tracker A Memory Utilization"
  combiner     = "OR"
  conditions {
    display_name = "condition"
    condition_threshold {
      filter     = "metric.type=\"agent.googleapis.com/memory/percent_used\" resource.type=\"gce_instance\" resource.labels.instance_id=\"${google_compute_instance.streaming_datatransfer_a.instance_id}\" metric.label.\"state\"=\"used\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      aggregations {
        alignment_period   = "60s"
        per_series_aligner = "ALIGN_MEAN"
      }
      trigger {
        count = 1
      }
      threshold_value = 80
    }
  }
  enabled = true
  notification_channels = ["projects/${var.project}/notificationChannels/${var.slack_notification_channel_id}"]
}

データの遅延

データの遅延はRedashを使い定期的にクエリを投げて監視しています。

データの遅延はChange Trackingの開始時間とBigQueryのインサート時刻の差分を確認しています。不正なレコードが混在した際はBigQueryの_error_recordsテーブルに書き込まれるため、書き込みを検知してアラートを通知するようにします。

また、CPUの使用状況も遅延に影響があるため、メモリ使用率と同様に監視しています。

filter     = "metric.type=\"agent.googleapis.com/cpu/utilization\" resource.type=\"gce_instance\" resource.labels.instance_id=\"${google_compute_instance.streaming_datatransfer_a.instance_id}\" metric.label.\"state\"=\"used\""

性能評価

リプレイスよってデータ欠損もなくなり、遅延時間としても取得のインターバルを除けば数秒程度でデータ連携を行うことができるようになりました。

次の図はChange Trackingで取得したレコード数と遅延時間(秒)の関係となります。取得するレコード数が多いと遅延しますが、40万レコードほどの更新でも5分以内に連携できる基盤を作ることができました。 概要

またコスト面でも月間で約200万円ほどかかっていましたが約5万円程度にできました。

まとめ

今回リアルタイムデータ連携基盤についてご紹介しました。

現在ZOZOTOWNでは、リアルタイムデータを活用した案件が増えてきています。この記事を読んで、もしご興味をもたれた方は是非採用ページからお申し込みください。

https://tech.zozo.com/recruit/tech.zozo.com

また、8/27(木)にリアルタイムデータ連携基盤含めMAの取り組みについてのイベントを行いますのでぜひご参加ください。

zozotech-inc.connpass.com

BigQueryでユーザー定義関数(UDF)は武器になるという話

OGP

はじめに

こんにちは。ZOZOTOWN部サービスグロースチームでアナリティクスをしている井ノ口です。

この記事ではBigQueryで使える、ユーザー定義関数(UDF)という便利な武器をご紹介します。「UDFって何?」「何のために使うの?」という方に向けた記事のため、高度な分析などはこの記事では扱いません。

続きを読む

ZOZOTOWNのインハウス広告運用を支援するデータと仕組みの話

ogp

こんにちは。ECプラットフォーム部 推薦基盤チームで、DWH・DMP・広告まわりのデータエンジニアリングを担当している大谷です。

本記事では、マーケティング部門の広告運用のインハウス化に伴ってこれまで取り組んできた広告データの収集と活用、その仕組みにフォーカスして事例をご紹介します。

続きを読む
カテゴリー