BigQueryの監査ログをリアルタイムに監視して使いすぎを防止してみる

OGP

こんにちは。SRE部の塩崎です。七味唐辛子の粉末を7種類に分類するという趣味を発展させて、おっとっとを新口動物と旧口動物に分類するという趣味を最近発明しました。

BigQueryは非常にパワフルなData WareHouse(DWH) SaaSであり、大容量のデータを一瞬で分析できます。しかし、課金額がスキャンしたデータ量に比例するという特徴があるため、意図せずに大量のデータをスキャンしてしまい大金を溶かしてしまうことを懸念する人もいます。

qiita.com

そのため、課金額が大きすぎるクエリを発見した際にSlackへ通知する仕組みを作りました。GCP Organization内の全プロジェクトで実行されたBigQueryの監査ログをリアルタイムにチェックすることによってこの仕組みは実現されています。本記事では作成したシステムを紹介します。

なお、本記事は以下のQiita記事に着想を得たものであり、@irotorisさんにはこの場を借りてお礼申し上げます。 qiita.com

最初は元記事で公開されているソースコードをそのまま使用することを考えていたのですが、ライセンス表記がなかったため独自で実装し直し、我々のユースケースにおいて不足する機能を付け足しました。

BigQueryのコストを抑える方法

最初にBigQueryのコスト削減に有効な機能の紹介と、それらだけでは不十分であり、監査ログのリアルタイムスキャン機能を作成した理由について説明します。

Custom Quotas

まず、BigQueryのコストを抑える方法として、プロジェクトレベル・ユーザーレベルにQuotaを割り当てる機能があります。

cloud.google.com

この機能を使えば、特定のユーザーが高額なクエリを実行しすぎた際に、それ以上のクエリの実行を停止できます。

しかし、ユーザー毎にQuotaの上限を変えたい場合は、そのユーザーが属するプロジェクトを変える必要があります。プロジェクトを適切に分けてある場合はこの機能の導入がしやすいですが、そうでない場合は事故を防ぐために相当大きめのQuotaとして設定する必要が出てきてしまいます。特に落ちてはいけない大事なバッチが実行されるプロジェクトとアドホックなクエリが実行されるプロジェクトの分離は必須です。そうしない場合はアドホックなクエリによってQuotaが全部使い果たされてしまい、大事なバッチがusageQuotaExceededエラーになってしまいます。

我々の環境ではこれらの分離がまだまだ不十分であったため、この機能の導入は一旦見送りました。分離が十分にできた時点で導入を再度検討することを考えております。

Reservations

次に紹介するのがReservations機能です。

cloud.google.com

この機能を使うと、スキャンしたデータ量によらず毎月固定の金額が課金されます。どれだけの計算量が必要なのかというのをSlotという単位であらかじめコミットします。このSlotはCPUを仮想化した概念で1Slotが1CPUに相当する量です。

cloud.google.com

このReservations機能を使うことで、「お値段定額クエリ実行し放題プラン」になると勘違いしやすいですが、実際は異なります。

実際は「お値段定額『Slotの枠内で』クエリ実行し放題プラン」です。購入したSlot数を超えたパフォーマンスが出ることはないため、Slotを大量に消費するクエリが同時に実行された場合は、クエリの実行時間が伸びるという結果になります。Slotはプロジェクト毎でしか割り当てできないので、大事な集計バッチとアドホッククエリのプロジェクトを変えないと、集計バッチのSLAを担保できません。

我々の環境ではプロジェクトごとにReservations機能の有効・無効を切り替え、コストを最適化することを試みています。定常的にクエリが実行されているプロジェクトではこの機能を有効化しコストの予測可能性を高めます。

一方でクエリの実行頻度が低いプロジェクトやクエリの実行頻度が一定でないプロジェクトでは従来の料金プランと最低60秒の枠でSlotを購入できるFlex Slotsを利用しようとしています。

いずれのプロジェクトに対しても、スキャン量が多すぎるクエリや、Slot使用量が多すぎるクエリを発見しアラートを上げる仕組みが必要です。

そのため、BigQueryの課金ログをリアルタイムにチェックし、スキャン量やSlot使用量などが多すぎるクエリを通知する仕組みを作成しました。

インフラ構成

今回構築したシステムのインフラ構成図を以下に示します。

インフラ図

GCPのマネージドサービスを活用した、イベントドリブンかつサーバレスな構成です。

BigQueryの監査ログはデフォルトではONになっているため、Cloud Loggingに送られています。Organization内の全プロジェクトから送られてくる監査ログをAggregated sinks機能で集約し、Cloud Pub/Subの1つのTopicに集めます。

その後、Cloud Pub/SubのPush SubscriptionでCloud Runを起動します。HTTPリクエストのBodyの中に監査ログが含まれているため、Cloud Run上に実装したアプリケーションでクエリがリソースを使いすぎていないかどうかをチェックし、Slackに通知します。

ここからは各サービスの連携方法をTerraformのtfファイルを交えながら説明していきます。

Cloud Logging → Cloud Pub/Sub

一番最初にCloud Pub/SubのTopicを作成します。これについては特に詳しい説明は不要かと思います。

resource "google_pubsub_topic" "bq-police" {
  name = "bq-police"
}

次に先程作成したTopicに対して、Cloud LoggingからpublishするためのLog Sinkを作成します。Organization内の全てのログを出力するために、Aggregated sinks機能を使っています。このSinkを作成するためにはOrganizationの logging.configWriter を付与したアカウントで terraform apply をする必要があります。

cloud.google.com

org_id パラメーターは、gcloud organizations list コマンドを実行すると確認できます。filterで指定しているクエリは以下のページを参考にして作成しました。古いタイプの課金ログ(AuditData)と新しいタイプの課金ログ(BigQueryAuditMetadata)の2種類があり、新しい側の利用が推奨されている点に注意が必要です。

cloud.google.com

resource "google_logging_organization_sink" "bq-police-org" {
  name = "bq-police-org"
  destination = "pubsub.googleapis.com/${google_pubsub_topic.bq-police.id}"
  org_id = "123456789012" # 各自の環境で変える
  include_children = true

  filter = <<-EOT
  protoPayload.metadata."@type"="type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata"
  protoPayload.metadata.jobChange.job.jobConfig.type = "QUERY"
  EOT
}

そして、Cloud Loggingのサービスアカウント(writer_identity)にTopicへのpublish権限を付与します。Log Sinkはそれぞれ固有のサービスアカウントで実行されており、そのサービスアカウントに対する書き込み権限の付与が必要です。

cloud.google.com

resource "google_pubsub_topic_iam_member" "bq-police-org" {
  project = google_pubsub_topic.bq-police.project
  topic = google_pubsub_topic.bq-police.name
  role = "roles/pubsub.publisher"
  member = google_logging_organization_sink.bq-police-org.writer_identity
}

Cloud Pub/Sub → Cloud Run

次にCloud Pub/SubとCloud Runの連携について説明します。

まずは、Cloud Runのサービスを作成します。アラートを発報するための閾値(THRESHOLD_*)やアラート対象から除外するユーザー(EXEMPTED_USERS)などを環境変数で設定できるようにしています。また、Slackに通知するためのWeb hookのURLは後述するBerglasで設定できるようにしています。

resource "google_service_account" "bq-police-cloud-run" {
  account_id = "bq-police-cloud-run"
  display_name = "BQ Police(Cloud Run)"
}

resource "google_cloud_run_service" "bq-police" {
  name = "bq-police"
  location = "us-central1"

  template {
    spec {
      containers {
        image = "gcr.io/プロジェクトID/bq-police:latest"
        resources {
          limits = {
            cpu = "1000m"
            memory = "256Mi"
          }
        }
        env {
          name = "TZ"
          value = "Asia/Tokyo"
        }
        env {
          name = "THRESHOLD_TOTAL_BILLED_BYTES"
          value = "0"
        }
        env {
          name = "THRESHOLD_TOTAL_SLOT_MS"
          value = "0"
        }
        env {
          name = "EXEMPTED_USERS"
          value = ""
        }
        env {
          name = "SLACK_WEBHOOK_URL"
          value = "sm://プロジェクトID/slack_webhook_url"
        }
      }

      container_concurrency = 10
      service_account_name = google_service_account.bq-police-cloud-run.email
    }
    metadata {
      annotations = {
        "autoscaling.knative.dev/maxScale" = "10"
        "client.knative.dev/user-image"    = "gcr.io/プロジェクトID/bq-police:latest"
        "run.googleapis.com/client-name"   = "terraform"
      }
    }
  }
  autogenerate_revision_name = true
}

Cloud Pub/SubからCloud Runを呼び出すために、Push Subscriptionを作成します。今回作成したCloud Runサービスは呼び出すための認証が必要なため、Pushする際にHTTPヘッダーへ認証情報を埋め込む設定をします。このSubscription専用のサービスアカウントを作成し、serviceAccountTokenCreatorのロールを与えます。これにより、このサービスアカウントは自身のOIDCトークンを取得できるようになります。push_config で生成されたOIDCトークンをHTTPヘッダーに埋め込むように設定します。

cloud.google.com

なお、このOIDCトークンを埋め込んだリクエストを生成するという方式はCloud Pub/Sub以外のプロダクトでも活用できます。以下の記事で詳しく解説されています。

medium.com

そして、Cloud Pub/SubのサービスアカウントにCloud Runサービスの run.invoker ロールを付与することで、この認証済みリクエストに対する認可をします。

resource "google_service_account" "bq-police-pubsub" {
  account_id = "bq-police-pubsub"
  display_name = "BQ Police(Cloud Pub/Sub)"
}

resource "google_project_iam_member" "bq-police-pubsub" {
  member = "serviceAccount:${google_service_account.bq-police-pubsub.email}"
  role = "roles/iam.serviceAccountTokenCreator"
}

resource "google_pubsub_subscription" "bq-police" {
  name  = "bq-police"
  topic = google_pubsub_topic.bq-police.name

  push_config {
    push_endpoint = google_cloud_run_service.bq-police.status[0].url
    oidc_token {
      service_account_email = google_service_account.bq-police-pubsub.email
    }
  }
}

resource "google_cloud_run_service_iam_member" "bq-police-pubsub" {
  service = google_cloud_run_service.bq-police.name
  location = google_cloud_run_service.bq-police.location
  role = "roles/run.invoker"
  member = "serviceAccount:${google_service_account.bq-police-pubsub.email}"
}

アプリケーション

今回はアプリケーションの実行基盤にCloud Runを使っています。そのため、App Engine(Standard Environment)やCloud Functionsと比較して使用できる言語・フレームワークの自由度が高いです。使い慣れているという理由で、Ruby + Sinatraを使って実装してみました。特別なことはしていないので、詳しい説明は省略します。

require 'json'
require 'yaml'
require 'erb'
require 'sinatra'
require 'faraday'

THRESHOLD = {
  total_billed_bytes: ENV.fetch('THRESHOLD_TOTAL_BILLED_BYTES', '0').to_i,
  total_slot_ms: ENV.fetch('THRESHOLD_TOTAL_SLOT_MS', '0').to_i,
}
EXEMPTED_USERS = ENV.fetch('EXEMPTED_USERS', '').split(',')
WEBHOOK_URL = ENV['SLACK_WEBHOOK_URL']

class AuditLog
  attr_reader :project_id, :total_billed_bytes, :total_slot_ms, :principal_email, :start_time, :end_time, :query

  def self.from_pubsub_format(data)
    # Cloud Pub/Subから送られてくるBigQueryの監査ログはBase64でエンコードされたJSON
    self.new(JSON.load(Base64.decode64(data['message']['data'])))
  end

  def initialize(log)
    # Ref: https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata
    @project_id         = log['resource']['labels']['project_id']
    @principal_email    = log['protoPayload']['authenticationInfo']['principalEmail']
    @total_billed_bytes = log['protoPayload']['metadata']['jobChange']['job']['jobStats']['queryStats']['totalBilledBytes'].to_i
    @total_slot_ms      = log['protoPayload']['metadata']['jobChange']['job']['jobStats']['totalSlotMs'].to_i
    @start_time         = Time.parse(log['protoPayload']['metadata']['jobChange']['job']['jobStats']['startTime'])
    @end_time           = Time.parse(log['protoPayload']['metadata']['jobChange']['job']['jobStats']['endTime'])
    @query              = log['protoPayload']['metadata']['jobChange']['job']['jobConfig']['queryConfig']['query']
  end

  def duration
    end_time - start_time
  end

  def total_billed_gb
    total_billed_bytes.to_f / 1024 / 1024 / 1024
  end

  def cost
    # Ref: https://cloud.google.com/bigquery/pricing/#on_demand_pricing
    total_billed_gb.to_f / 1024 * 5
  end

  def total_slot_s
    total_slot_ms.to_f / 1000
  end
end

class BqPolice < Sinatra::Base
  def alert?(audit_log, threshold, exempted_users)
    if exempted_users.include?(audit_log.principal_email)
      return false
    end

    if audit_log.total_billed_bytes >= threshold[:total_billed_bytes] || audit_log.total_slot_ms >= threshold[:total_slot_ms]
      true
    else
      false
    end
  end

  def format_message(audit_log)
    YAML.load(ERB.new(File.read('slack_message.yml.erb')).result_with_hash(audit_log: audit_log))
  end

  def post_to_slack(webhook_url, message)
    Faraday.post(webhook_url, JSON.dump(message), 'Content-Type' => 'application/json')
  end

  post '/' do
    audit_log = AuditLog.from_pubsub_format(JSON.load(request.body.read))

    if alert?(audit_log, THRESHOLD, EXEMPTED_USERS)
      message = format_message(audit_log)
      post_to_slack(WEBHOOK_URL, message)
    end

    status 200
  end
end

上記のRubyコードで参照している slack_message.yml.erb を以下に示します。

username: 'BigQuery Police'
icon_emoji: ':cop:'
channel: '#投稿したいチャンネル'
attachments:
  - text: "BigQuery cost alert"
    fallback: "BigQuery cost alert"
    color: 'danger'
    fields:
      - title: 'User'
        value: <%= audit_log.principal_email %>
        short: true
      - title: 'Project'
        value: <%= audit_log.project_id %>
        short: true
      - title: 'Billed bytes (GB)'
        value: <%= audit_log.total_billed_gb.round(2) %>
        short: true
      - title: 'Cost (USD)'
        value: <%= audit_log.cost.round(2) %>
        short: true
      - title: 'Start time'
        value: <%= audit_log.start_time.getlocal.to_s %>
        short: true
      - title: 'End time'
        value: <%= audit_log.end_time.getlocal.to_s %>
        short: true
      - title: 'Duration (sec)'
        value: <%= audit_log.duration.round(2) %>
        short: true
      - title: 'Slot time (sec)'
        value: <%= audit_log.total_slot_s.round(2) %>
        short: true
      - title: 'Query'
        value: |-
          ```
          <%= audit_log.query.slice(0, 3000).gsub("\n", "\n          ") %>
          ```
        short: false

このアプリケーションを動かすためのDockerイメージを作成します。SlackのWeb hook URLはSecret Managerに格納されており、それをBerglas経由で取得しています。そのため、Berglasのバイナリをコンテナ内に入れ、Berglas経由でPumaを起動しています。Pumaの設定ファイルやGemfileは省略します。

FROM ruby:2.7-slim

COPY --from=us-docker.pkg.dev/berglas/berglas/berglas:latest /bin/berglas /bin/berglas
RUN apt-get -qq update && \
    apt-get -qq -y install build-essential --fix-missing --no-install-recommends

WORKDIR /usr/src/app
COPY Gemfile Gemfile.lock ./
ENV BUNDLE_FROZEN=true
RUN gem install bundler && bundle config set --local without 'test' && bundle install

COPY . ./

CMD ["/bin/berglas", "exec", "--", "/usr/local/bundle/bin/puma", "-C", "puma.rb"]

Cloud RunがBerglas経由でSecret ManagerからSlack Web hook URLを取得できるように、以下のコマンドを実行しておきます。

berglas create sm://プロジェクト名/slack_webhook_url "SlackのWeb hook URL"
berglas grant sm://プロジェクト名/slack_webhook_url --member serviceAccount:Cloud Runのサービスアカウント

まとめ

BigQueryの監査ログをリアルタイムにCloud Runで処理することによって、BigQueryで高額なクエリを実行されたときにいち早く気づくことができるようになりました。スキャン量に対するアラート条件だけでなく使用したSlot数に対する条件を指定できるようにし、オンデマンド課金のプロジェクトでもFlat rate課金のプロジェクトでも使用できました。

動作イメージ

なお、上図は意図的に閾値を厳しくした時の通知であり、実際にはこのレベルのクエリでアラートを発報させてはいません。

ZOZOテクノロジーズでは多数の社員から使われるデータ基盤のデータガバナンスを高められる人材を募集しています。ご興味のある方は以下のリンクからご応募ください。

hrmos.co

カテゴリー