INFORMATION_SCHEMAを用いたBigQueryのストレージ無駄遣い調査

ogp

こんにちは、『地球の歩き方ムー』創刊のニュースに心を踊らせている、データ基盤ブロックの塩崎です。

本記事では、データ基盤の管理者としてBigQueryのストレージコストの削減に取り組んだ事例を紹介します。

BigQuery費用はクエリ費用だけではない

ZOZOのデータ基盤として利用されているBigQueryは、非常にパワフルなDWH(Data WareHouse)です。しかし、それ故に利用者の意図しないところで費用が高騰することもしばしば発生します。よく問題になるのはクエリ費用の高騰であり、以下のQiita記事はBigQuery利用者の中でも有名です。

qiita.com

このクエリ費用の高騰に対し、我々データ基盤ブロックはこれまでに、いくつもの方法で対処してきました。具体的な取り組みの一部は以下の記事で紹介しているので、併せてご覧ください。

techblog.zozo.com techblog.zozo.com

しかし、BigQueryの費用はクエリに関するもののみではありません。以下のドキュメントによると、BigQueryの費用はクエリに関する費用(Analytis)とストレージに関する費用(Storage)の2つがメインであることが分かります。

BigQuery pricing has two main components: Analysis pricing is the cost to process queries, including SQL queries, user-defined functions, scripts, and certain data manipulation language (DML) and data definition language (DDL) statements that scan tables. Storage pricing is the cost to store data that you load into BigQuery.

cloud.google.com

このストレージに関する費用は、USマルチリージョンの場合、1か月1GBあたり0.020 USDであり、90日間変更のないテーブルはその半額の0.010 USDに自動的に割引されます。

この単価は、Google Cloud StorageAmazon S3などと比較しても安価であり、BigQueryの導入初期はあまり気にならないことも多いです。しかし、BigQueryをデータ基盤として長年利用すると、徐々にストレージ利用量が増加することもしばしば発生します。

現在のZOZOのデータ基盤は約100のGCPプロジェクト、約1000のデータセット、数十万以上のテーブルにまたがる大規模なものへと成長しました。これらの全てのデータを1つのチームが把握することは非現実的であるため、各GCPプロジェクト毎に管理者を立て分割統治を行っています。そのため、全てのプロジェクトの中にある、全てのテーブルのデータサイズを一覧で表示して可視化を行うダッシュボードを作成しました。そして、そのダッシュボードに基づき、不必要にストレージコストが高騰している疑いのあるテーブルを洗い出しました。それらのテーブルの情報を個別に管理者に連絡することでコストの削減に成功しました。

以降で、その具体的な流れを説明していきます。

ストレージコストの可視化

本章では、ストレージ利用量の調査から、Data Studioで可視化するまでの流れを説明します。

ストレージ利用量の調査方法

はじめに、BigQueryのストレージ利用量をダンプして1つのテーブルに集約します。BigQueryのストレージ利用量は INFORMATION_SCHEMA.PARTITIONS に格納されているので、それを参照します。

cloud.google.com

このビューの STORAGE_TIER 列を参照すると ACTIVELONG_TERM かが分かり、1GBあたりの単価が分かります。しかし、今回は分かりやすさのために、この部分はあえて無視していることをご了承ください。全てのプロジェクトのPARTITIONSビューを一括で取得する方法があれば楽なのですが、現時点ではそのような仕組みがないため、分割して取得します。大量のテーブルの情報を分割して取得するにあたり、特に以下の2点に注意する必要があります。

  1. PARTITIONSビューから多くのテーブルの情報を取得するとエラーになる
  2. 1つのテーブルに対するINSERTは1日あたり1000回の上限がある

PARTITIONSビューから多くのテーブルの情報を取得するとエラーになる注意点

1点目は、PARTITIONSビューのドキュメントにも記載のない罠であり、特に注意が必要です。多くのテーブルを保持しているデータセットに対して無邪気に以下のようなクエリを実行するとエラーになります。

SELECT * FROM `project_id`.`dataset_name`.INFORMATION_SCHEMA.PARTITIONS
発生するエラー:
INFORMATION_SCHEMA.PARTITIONS query attempted to read too many tables.
Please add more restrictive filters.

このエラーが発生する閾値はドキュメントに記載がないため、正確な値は不明です。テーブルの数を変えながら実験した結果、テーブルの数が1000程度であればエラーが発生しないため、以下のようなWHERE句を使い参照するテーブルの数を限定するようにしました。

WHERE table_id IN (table_name1, table_name2, ..., table_name1000)

1つのテーブルに対するINSERTは1日あたり1000回の上限がある注意点

次に2つ目の注意点です。前述の通り、PARTITIONSビューからの情報取得は1000テーブル毎に分割されます。そのため、ストレージ容量をまとめるテーブルに対するINSERTの回数もそれに応じて増加します。

INSERT INTO bq_storage_stats
SELECT * FROM `project_id`.`dataset_name`.INFORMATION_SCHEMA.PARTITIONS
WHERE table_id IN (table_name1, table_name2, ..., table_name1000);

INSERT INTO bq_storage_stats
SELECT * FROM `project_id`.`dataset_name`.INFORMATION_SCHEMA.PARTITIONS
WHERE table_id IN (table_name1001, table_name1002, ..., table_name2000);

...

一方で、BigQueryは1つのテーブルに対するDML操作の上限が1日あたり1500回に設定されています。

cloud.google.com

そのため、多くのプロジェクト・データセットに関する情報を取得する際には、この上限に気をつける必要があります。我々の環境では上限に達してしまったため、Streaming Insertを行うことでエラーを回避しました。Streaming Insertの上限は先程のDMLの上限とは別であり、閾値がかなり大きいため回避策として利用できます。

cloud.google.com

複数のGCPプロジェクトのBigQueryのストレージ利用量を収集するスクリプトを以下に示します。

from google.cloud import bigquery
from itertools import zip_longest, groupby
import time
import string
import random
import concurrent.futures

# 集計対象のGCPプロジェクトIDの配列
project_ids = [
  'project_id1',
  'project_id2',
  ...
]

# ストレージ利用量を集約するテーブル
destination_table = 'project_id.dataset_id.table_name'

# Ref: https://docs.python.org/3/library/itertools.html#itertools-recipes
def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def create_destination_table(client, destination_table):
    schema = [
        bigquery.SchemaField("project_id", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("dataset_name", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("table_name", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("table_rows", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("total_logical_bytes", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("total_billable_bytes", "INTEGER", mode="NULLABLE"),
    ]
    table = bigquery.Table(destination_table, schema=schema)
    client.create_table(table)

def get_dataset_names(client, project_id):
    query = f"SELECT SCHEMA_NAME AS dataset_name FROM `{project_id}`.`region-us`.INFORMATION_SCHEMA.SCHEMATA ORDER BY SCHEMA_NAME ASC"
    rows = client.query(query)

    return [row['dataset_name'] for row in rows]

def get_table_names(client, project_id):
    query = f"SELECT table_schema AS dataset_name, table_name from `{project_id}`.`region-us`.INFORMATION_SCHEMA.TABLES ORDER BY table_schema ASC, table_name ASC"
    rows = client.query(query)
    return rows

def generate_table_size_job(client, project_id, dataset_name, table_names, destination_table):
    table_names_str = ",".join(['"' + t + '"' for t in table_names])
    query = f"""
        SELECT
            table_catalog AS project_id,
            table_schema AS dataset_name,
            table_name,
            sum(total_rows) AS total_rows,
            sum(total_logical_bytes) AS total_logical_bytes,
            sum(total_billable_bytes) AS total_billable_bytes
        FROM `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.PARTITIONS
        WHERE table_name IN ({table_names_str})
        GROUP BY table_catalog, table_schema, table_name
    """

    return client.query(query, bigquery.job.QueryJobConfig(priority="BATCH"))

def retvieve_rows(query_job):
    exception = query_job.exception()
    if exception is not None:
        print(exception)
        print("Error occurred during the execution of the following query")
        print(query_job.query)
        raise exception

    results = []
    for row in query_job.result():
        results.append(row)

    return results

client = bigquery.Client()

temp_destination_table = destination_table + "_" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))

print(f"Temp Table: {temp_destination_table}")

create_destination_table(client, temp_destination_table)
print(f"created {temp_destination_table}")

# 高速化のためにBigQueryへのJobを並列して投げる
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    for project_id in project_ids:
        dataset_names = get_dataset_names(client, project_id)
        dataset_count = len(dataset_names)
        print(f"{dataset_count} dataset(s) found in {project_id}.")

        dataset_table_names = get_table_names(client, project_id)

        query_job_futures = []
        for dataset_name, rows in groupby(dataset_table_names, lambda r: r['dataset_name']):
            table_names = [row['table_name'] for row in rows]
            table_count = len(table_names)
            print(f"{table_count} table(s) found in {project_id}.{dataset_name}.")

            # 1000テーブル毎に分割してPARTITIONSビューにクエリを投げる
            for table_names_chunk in grouper(table_names, 1000):
                table_names_chunk = [t for t in table_names_chunk if t is not None]
                query_job = executor.submit(generate_table_size_job, client, project_id, dataset_name, table_names_chunk, temp_destination_table)
                query_job_futures.append(query_job)

        print(f"waiting for all query jobs have been created")
        concurrent.futures.wait(query_job_futures)

        query_jobs = [f.result() for f in query_job_futures]
        query_job_count = len(query_jobs)
        print(f"{query_job_count} query jobs has been created")

        while not all([q.done() for q in query_jobs]):
            print("waiting for all jobs completed")
            time.sleep(1)

        print(f"{query_job_count} query jobs has been completed")

        rows_futures = []
        for query_job in query_jobs:
            rows_future = executor.submit(retvieve_rows, query_job)
            rows_futures.append(rows_future)

        concurrent.futures.wait(rows_futures)
        results = []
        for rows_future in rows_futures:
            results += rows_future.result()

        result_count = len(results)
        print(f"{result_count} rows retvieved.")

        if results:
            for results_chunk in grouper(results, 1000):
                results_chunk = [r for r in results_chunk if r is not None]
                insert_errors = client.insert_rows(client.get_table(temp_destination_table), results_chunk)
                results_chunk_count = len(results_chunk)
                print(f"{results_chunk_count} rows inserted.")
                if insert_errors:
                    print("Error occured during inserting the following rows")
                    print(insert_errors)

        print(f"saved storage stats of {project_id}")

copy_job = client.copy_table(temp_destination_table, destination_table)
copy_job.result()
print("Copy Temp table to Destination table")

delete_job = client.delete_table(temp_destination_table, not_found_ok=True)
delete_job.result()
print("Delete Temp table")

Data Studioで可視化

上記の手順で、複数のGCPプロジェクト内のストレージ利用量を1つのテーブルに集約しました。次に、この情報を可視化し、大量のストレージを利用しているGCPプロジェクト・データセット・テーブルを見つけていきます。可視化には、Google Data Studioを利用します。

datastudio.google.com

完成したダッシュボードを以下に示します。画面上部のフィルターでプロジェクト・データセットを絞り込み、その中でストレージ利用量の多いTOP 5のデータセット・テーブルを確認できるようにしました。

BigQueryストレージダッシュボード

これは余談ですが、せっかくなので下図のようなバブルチャートを利用し、「見た目がカッコ良いダッシュボード」を作ろうともしました。しかし、上図の表形式のダッシュボードの方が役に立ちました。「見た目がカッコ良いダッシュボード」が必ずしも実用的だとは言えないことを実感しました。

BigQueryストレージバブル

テーブル利用状況の調査

ストレージ利用量の大きいテーブルが発見できたので、次に利用状況を調査します。ストレージを大量に利用していたとしても、それが利用されているテーブルであれば無闇に消すことはできません。そのために、テーブルが最近どの程度参照されたのかを確認します。

INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION ビューを用いると、特定のテーブルに対して実行されたクエリを確認できます。なお、JOBS_BY_ORGANIZATION ビューには実行されたSQL文が格納されていないので、必要に応じてJOBS_BY_PROJECT ビューも併用してクエリの利用状況を確認します。そして、BigQueryのストレージを多く消費しており、かつ最近の利用実績が乏しいテーブルを「無駄遣い疑惑」のテーブルとしてリストアップしていきます。

SELECT
  job_id,
  creation_time,
  project_id,
  user_email,
  job_type,
  statement_type,
  destination_table,
  referenced_tables
FROM
  `region-us`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
WHERE
  creation_time > TIMESTAMP('2021-09-01')
  AND (
    SELECT
      LOGICAL_OR(rt.dataset_id = "データセット名" AND rt.table_id = "テーブル名")
    FROM UNNEST(referenced_tables) AS rt
  )
ORDER BY creation_time DESC

担当者への対応依頼

最後に「無駄遣い疑惑」テーブルの削除を前提とした対応を担当者に依頼しました。その際には、テーブル名・ストレージ利用量だけではなく、以下の情報も併せて伝えました。それにより、迅速に対処をしてもらうように心がけました。

  • 1年あたりのストレージコスト見積もり
  • 最近数カ月間でそのテーブルに実行されたクエリ
  • 実は必要だということが後から判明しても、削除から7日以内であれば復元可能であること

巨大テーブル棚卸し表

関係部署の協力も得られ、結果として合計で約1000TBの無駄遣いテーブルを削除できました。再発防止策として、アドホック分析の中間結果を配置する場合には、データセット毎にテーブルのデフォルト有効期間を設定するように働きかけました。

cloud.google.com

まとめ

BigQueryはコンピュートだけではなく、ストレージも非常にパワフルなDWHです。そのため、利用者の意図しないところでストレージ費用が高騰する恐れもあります。ZOZOのデータ基盤は多くの部署が利用しているため、それぞれの利用者の努力に依存するだけでは発生を抑制することは困難です。そのため、 INFORMATION_SCHEMA というBigQueryに備わっている仕組みを活用することで、横断的かつ効率的に費用を無駄遣いしているテーブルの発見・削除をしました。

ZOZOではデータ基盤のガバナンスを強化し、利用者にとって安全安心なデータ基盤を整備していく仲間を募集中です。ご興味のある方は以下のリンクからご応募ください。

hrmos.co

カテゴリー