ZOZOTOWNを支えるリアルタイムデータ連携基盤

概要

こんにちは、SRE部MA基盤チームの谷口です。私達のチームでは、データ連携基盤の開発・運用を行っています。

データ基盤には大きく分けて2種類あり、日次でデータ連携してるものとリアルタイムにデータ連携しているものがあります。本記事ではリアルタイムデータ連携基盤についてご紹介します。

既存のデータ連携基盤の紹介

まず既存のデータ連携基盤について簡単にご紹介させていただきます。

既存のデータ連携基盤ではオンプレ環境やクラウドにあるデータをBigQueryへ日次で1回連携しています。ETLツールはSQL Server専用の「bcp」とTreasure Dataが開発しているOSSである「Embulk」を使っています。まずbcpを使い、オンプレ環境の基幹データベース内のテーブルを中間データベースへ連携します。中間データベースへ連携されたデータはEmbulkを使って、BigQueryへ連携されます。この際秘密情報のハッシュ化なども行っています。

これらの処理はワークフローエンジンで制御されていてTreasure Dataが開発しているOSSであるDigdagを使っています。余談ですがZOZOテクノロジーズにはDigdagのコントリビューターが7人もいます!

概要

リアルタイムデータ連携基盤の紹介

なぜ必要なのか

これまでZOZOテクノロジーズでは日次でBigQueryへデータを連携していました。最近では機械学習を使った案件も増えてきており、リアルタイムなデータを必要とするサービスが増えてきています。機械学習の他にも配信基盤では商品が残り1点になったタイミングで通知を行う仕組みがあります。このような要件に対応するためには商品の在庫状況をリアルタイムで連携する必要があります。その他にも、施策のモニタリングや不正利用を素早く検知したいなど様々な案件があり、ZOZOテクノロジーズでもリアルタイムデータ連携基盤を構築することになりました。

活用事例の紹介

リアルタイムデータ連携基盤は検索パーソナライズ基盤の商品在庫の連携で使われています。検索パーソナライズとはユーザごとに商品をおすすめ順で紹介する機能となります。商品在庫がないとユーザに対してレコメンドしたにもかかわらず、商品が既に売り切れているといった機会損失が起きてしまいます。このような要件から検索パーソナライズ基盤ではリアルタイムに商品の在庫状況を知る必要があります。

概要

データ連携の仕組みと課題

リアルタイムデータ連携基盤はオンプレ環境からGCP環境まで多段にレプリケーションを行いデータ連携をしています。SQL ServerからKafkaへの差分連携にはQlik Replicateを採用してます。

Qlik ReplicateはSQL ServerからCDCを取得し、解析可能なメッセージの形へ変換する役割をになっています。CDCとはChange Data Captureの略で、データベース内で行われた変更履歴を追うことができる機能です。オンプレ環境のSQL Serverはバージョンが古くCDCを使うことができないため、Compute Engine上にSQL Serverを立てて差分データを取得しています。

概要

www.qlik.com techblog.zozo.com

多段にレプリケーションを行うことで高頻度連携を実現しようとしましたが、運用する過程でデータの欠損や遅延が課題としてあがりました。

  • データの遅延
    オンプレ環境からBigQueryまでに多段にレプリケーションを行うことで10分から30分程度の遅延が発生していました。
  • データの欠損
    既存の処理系にメモリリークがあり、定期的な再起動によるデータの欠損も発生していました。
  • コスト
    インフラ費用も月額で約200万円程度かかっており汎用的な基盤として使うには課題がありました。

 リプレイス後のリアルタイムデータ連携基盤

既存のリアルタイムデータ連携基盤の課題を解決するため、私たちのチームでリプレイスをすることになりました。

SQL Serverの差分データの取り方を検討

新規に作成するリアルタイムデータ連携基盤では、差分の取得にSQL ServerのChange Trackingを採用しました。

Change TrackingとはCDCのようにSQL Serverの差分データを取得する仕組みです。CDCとの違いはCDCが非同期的な連携であるのに対しChange Trackingは同期的に連携します。CDCよりもリアルタイム性をもって連携できる一方で、変更履歴などは取得はできません。取得できるのは削除や更新、追加といった更新処理の内容や更新バージョン、それと更新のあった主キーのみです。

具体的には、次のようにして差分データの最新の状態を取得しています。

  SELECT
    a.SYS_CHANGE_OPERATION as changetrack_type,
    a.SYS_CHANGE_VERSION as changetrack_ver,
    #{columns}
  FROM
    CHANGETABLE(CHANGES #{@tablename},
      @前回更新したバージョン) AS a
  LEFT OUTER JOIN #{@tablename} ON a.#{@primary_key} = b.#{@primary_key}

差分データの取得方法としてはChange Tracking以外にも、テーブルの更新タイムスタンプを参照する方法やCDCを使う方法も検討しました。

しかし、更新タイムスタンプは付与されているテーブルが非常に少なく、付与されていても更新されないタイムスタンプが多くありました。CDCもオンプレ環境にCDCが使える2016以降のSQL Serverがほとんどありませんでした。また、非同期的なCDCより同期的なChange Trackingの方が高速にデータを取得できます。

このような理由からChange Trackingを使って差分データを取得することになりました。

アーキテクチャ概要と処理の流れ

ここからはリプレイス後のリアルタイムデータ連携基盤のアーキテクチャ概要と処理の流れについてご紹介します。

アーキテクチャの全体図は次の通りです。

概要

Fluentdのプラグインを使った差分データの取得

Change Trackingの実行からPub/Subへのメッセージ転送はFluentdのプラグインを使っています。

冗長構成を実現するため、Compute Engine 2台にプラグインをデプロイしています。片方のインスタンスに問題が起きても、もう片方が生きていればデータの欠損が起きない仕組みとなっています。

オンプレ環境からGCP環境へ高速にデータ連携できるよう専用線としてDedicated Interconnectを使っています。多段にレプリケーションを行ったことによる遅延が課題だったので、最も根元の基幹データベースからデータを取得するようにしています。取得したデータはPub/Subのアウトプットプラグインを使い転送されます。

cloud.google.com

概要

プラグインでは次のようにChange Tracking実行時にレコード単位でユニークとなるメッセージIDを生成しています。生成されたメッセージIDはDataflowのメッセージの重複排除で使います。

BigQueryで主キーの最新の状態を集計できるようChange Trackingのバージョンも渡しています。Dataflowでテーブル名を考慮してBigQueryへ書き込みができるようテーブル名も渡しています。

query = """
  declare @last_synchronization_version bigint;
  SET @last_synchronization_version = #{changetrack_ver};
  SET lock_timeout #{@lock_timeout}
  SELECT
    CONCAT('#{@tablename}','-',a.#{@primary_key.join(',').gsub(',', ',a.')},a.SYS_CHANGE_VERSION) as massage_unique_id,
    '#{@tablename}' as table_name,
    '#{@changetrack_interval}' as changetrack_interval,
    '#{Time.now.utc}' as changetrack_start_time,
    a.SYS_CHANGE_OPERATION as changetrack_type,
    a.SYS_CHANGE_VERSION as changetrack_ver,
    #{columns}
  FROM
    CHANGETABLE(CHANGES #{@tablename},
      @last_synchronization_version) AS a
  LEFT OUTER JOIN #{@tablename} ON a.#{@primary_key} = b.#{@primary_key}
"""

Pub/Subのアウトプットプラグインでは差分データに加えてattributeにDataflowの重複排除で使うメッセージIDを渡しています。Dataflowの重複排除は次章でご紹介します。

<system>
  workers '<worker count>'
</system>
<worker 1>
  <source>
    // Input Plugin
  </source>
  <match '<tag_name>'
    @type gcloud_pubsub
    project "#{ENV['PROJECT_ID']}"
    key /usr/src/app/config/gcp_credential.json
    topic "projects/#{ENV['PROJECT_ID']}/topics/<topic-name>"
    autocreate_topic false
    max_messages 1000
    max_total_size 9800000
    max_message_size 4000000
    attribute_keys ["message_unique_id"]
    // Buffer Plugin
  </match>
</worker>

github.com docs.fluentd.org

Dataflowでメッセージの重複を排除

Fluentd2台の冗長構成によるデータの重複はDataflowのidAttributeを使い重複を排除しています。idAttributeを使うことで、プラグインで付与したメッセージIDを参照して10分以内であれば同じメッセージの重複を排除します。Dataflowを使うとPub/Subで自動的に付与されるメッセージIDの重複は自動で排除でき、at least onceを採用しているPub/Subとは相性の良いツールです。しかし、パブリッシャーが複数回同じメッセージを送った場合、Pub/Subでは異なるメッセージと扱われるため重複の自動排除はできません。このような理由からメッセージのユニーク性を担保したい場合はDatafflowでidAttributeを使います。idAttributeを使うことで2台のFluentdから送られてくるデータの重複を排除しています。

概要

次のサンプルはidAttributeを使ってメッセージの重複排除をする例です。

public static PipelineResult run(Options options) {
  // Create the pipeline
  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply(
          "Read PubSub Events",
          PubsubIO.readMessagesWithAttributes()
          .withIdAttribute("message_unique_id")
          .fromSubscription(options.getInputSubscription()))
      .apply(
          "Filter Events If Enabled",
          ParDo.of(
              ExtractAndFilterEventsFn.newBuilder()
                  .withFilterKey(options.getFilterKey())
                  .withFilterValue(options.getFilterValue())
                  .build()))
      .apply("Write PubSub Events", PubsubIO.writeMessages().to(options.getOutputTopic()));
  return pipeline.run();
}

cloud.google.com cloud.google.com

Dataflowで動的にBigQueryの各テーブルに出力

Pub/Subに送られ重複排除されたメッセージはDataflowを使ってBigQueryのテーブルに書き込まれます。DataflowのDynamic Destinationsを使うとメッセージ内のテーブル名に基づいて、出力先のテーブルを動的に振り分けることが可能です。そのため、Dynamic Destinationsを使うことで、1つのDataflowで複数テーブルのデータ連携ができるようになりインフラコストを抑えることができます。

なお、DataflowのDynamic Destinations機能は現時点だとJavaのみサポートしてます。

概要

次のサンプルはDynamic Destinationsを使ってストリーム内のテーブル名を参照してBigQueryのテーブルに書き込む例です。監視や分析用の遅延時間を計測するため、BigQueryへのインサート時刻も取得しています。

WriteResult writeResult = convertedTableRows.get(TRANSFORM_OUT)
.apply(
    BigQueryIO.<TableRow>write()
        .to(
            new DynamicDestinations<TableRow, String>() {
            @Override
            public String getDestination(ValueInSingleWindow<TableRow> elem) {
                return elem.getValue().get("table_name").toString();
            }
            @Override
            public TableDestination getTable(String destination) {
                return new TableDestination(
                    new TableReference()
                        .setProjectId("project_id")
                        .setDatasetId("dataset_name")
                        .setTableId("table_prefix" + "_" + destination), // destination: table name
                    "destination table" + destination);
            }
            @Override
            public TableSchema getSchema(String destination) {
                TableSchema schema =  new TableSchema()
                switch (destination) {
                case "table_a":
                    schema.setFields(ImmutableList.of(new TableFieldSchema().setName("column").setType("STRING").setMode("NULLABLE")));
                    break;
                case "table_b":
                    schema.setFields(ImmutableList.of(new TableFieldSchema().setName("column").setType("STRING").setMode("NULLABLE")));
                    break;
                default:
                }
                return schema
            }
        })
        // BigQuery Insert Time
        .withFormatFunction((TableRow elem) -> elem.set("bigquery_insert_time", Instant.now().toString()))
        .withoutValidation()
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withExtendedErrorInfo()
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

www.case-k.jp beam.apache.org

Pub/Subのメッセージ管理

重複排除されてPub/Subに送られてきたメッセージは別のサブスクライバーからも参照できるよう7日間メッセージを保持しています。新しくサブスクライバーを作ればBigtableなどBigQuery以外にも出力できるようになっており、Dataflowのウィンドウ処理等でリアルタイムに特徴量生成などもできるようになっています。

概要

次のサンプルはPub/Subでメッセージを7日間保持するTerraformの例となります。retain_acked_messagesをtrueとすることでサブスクライブされたメッセージを破棄せずに保持します。

message_retention_durationはメッセージの保有期間を決めることができます。なお、メッセージの保有期間は最大で7日です。

resource "google_pubsub_subscription" "message_hub" {
  name  = "message_hub"
  topic = google_pubsub_topic.message_hub.name
  # subscribe from multiple subscriber
  message_retention_duration = "604800s"
  retain_acked_messages      = true
  ack_deadline_seconds = 60
}

cloud.google.com beam.apache.org

イベントログ収集基盤

まだ構想段階ではありますが、データ量の多いイベントログのリアルタイムデータ連携基盤も作ろうとしています。イベントログについてもPub/Subに投げてもらうのが理想的ですが、クライアント側の負担も考慮し現在検討中です。

概要

個人情報の取り扱い

BigQueryとPub/Subに保持される個人情報や秘密情報はアクセスできるユーザを制限しています。

BigQueryではカラムレベルでのアクセス制御を行い、Pub/Subはトピック単位でアクセス制御をしています。カラムレベルのアクセス制御を行うため、ポリシータグを個人情報や秘密情報のカラムに付与しています。ポリシータグとはBigQueryのテーブルに対してカラムレベルのアクセス制御を行うリソースです。

ポリシータグのカラム付与はTerraformで次のようにしてできます。ポリシータグ自体を作ることはまだTerraformではサポートされていないようです。

resource "google_bigquery_table" "table-name" {
  dataset_id = google_bigquery_dataset.<dataset-name>.dataset_id
  table_id   = "<table-name>"
  schema = <<EOF
  [
    {
      "name": "column-name>",
      "type": "STRING",
      "mode": "NULLABLE",
      "policyTags": {
        "names": [
          "projects/<project-id>/locations/<location>/taxonomies/<taxonomies-id>/policyTags/<policy-tag-id>"
        ]
      }
    }
  ]
  EOF
}

cloud.google.com github.com

Pub/SubはDataflowで個人情報や秘密情報をNULL置換したトピックを作ろうと考えています。トピック単位で参照ユーザを制限することで、秘密情報を必要としないサブスクライバーからは参照できないようにします。

cloud.google.com

ビルド・デプロイ戦略

FluentdのプラグインとDataflowのビルド・デプロイ方法についてご紹介できればと思います。CI/CDツールとしてはCircleCIを使っています。Fluentdのプラグインはコンテナイメージを作り、作られたコンテナイメージをContainer RegistryにPUSHしています。Container RegistryのコンテナイメージはCompute Engine起動時にPULLされデプロイされます。データ欠損や遅延が発生しないよう2台のCompute Engineを1台ずつ再起動させ無停止でデプロイできるようにしています。

module "gce-container" {
  source  = "Terraform-google-modules/container-vm/google"
  version = "~> 2.0"

  container = {
    image = "gcr.io/${var.project}/<image-name>"
    tty : true
  }
  restart_policy = "Always"
}

resource "google_compute_instance" "compute engine" {
  name         = "name"
  machine_type = "n2-custom-4-10240"
  zone         = "asia-northeast1-a"

  boot_disk {
    initialize_params {
      image = module.gce-container.source_image
      size  = 500
    }
  }

  metadata_startup_script = "#!/bin/bash /usr/bin/docker-credential-gcr configure-docker EOF"

  metadata = {
    gce-container-declaration = module.gce-container.metadata_value
    google-logging-enabled    = "true"
    google-monitoring-enabled = "true"
  }

  service_account {
    email = "${google_service_account.tracker_app.email}"
    scopes = [
      "https://www.googleapis.com/auth/cloud-platform",
    ]
  }
}

Dataflowはカスタムテンプレートをビルドし、既存のパイプラインの更新を行います。

DataflowのカスタムテンプレートはGoogle提供のテンプレートをベースにカスタマイズしています。ビルド時にenableStreamingEngineオプションを利用すると使用するディスク容量を420GBから30GBにインフラ費用を抑えることができます。

次のコードはテンプレートをビルドする際にenableStreamingEngineオプションを指定する例です。

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=com.google.cloud.teleport.templates.PubsubToPubsub \
  -Dexec.args="--project=${project_id} \
              --tempLocation=gs://${project_id}/tmp \
              --templateLocation=gs://${project_id}/templates/<template-name> \
              --experiments=enable_stackdriver_agent_metrics \
              --enableStreamingEngine \
              --runner=DataflowRunner"

cloud.google.com cloud.google.com github.com

ビルドされたテンプレートはupdateオプションを使い既存のパイプラインの更新を行っています。互換性チェックにより、中間状態やバッファデータなどが前のジョブから置換ジョブに確実に転送することが可能です。

次のコードはPythonクライアントを使ってパイプラインを更新する例です。

def create_template_request(self, job_name, template_path, parameters, environment, update_options):
    request = self.dataflow.projects().templates().launch(
        projectId = self.project_id,
        location = 'us-central1',
        gcsPath = template_path,
        body = {
            "jobName": job_name,
            "parameters": parameters,
            "environment": environment,
            "update": update_options
        }
    )
    return request

def deploy_dynamic_destinations_datatransfer(self, active_jobs):
    job_name='dynamic_destinations_datatransfer'
    template_name = 'PubSubToBigQueryDynamicDestinationsTemplate'
    template_path = "gs://{}/templates/{}".format(self.project_id, template_name)
    input_subscription = 'message_hub'
    output_default_table = 'streaming_datatransfer.streaming_dynamic_changetracktransfer'
    parameters = {
        "inputSubscription": "projects/{}/subscriptions/{}".format(self.project_id, input_subscription),
        "outputTableSpec": "{}:{}".format(self.project_id, output_default_table),
        "autoscalingAlgorithm": "THROUGHPUT_BASED"
    }
    environment = {
        "machineType": 'n2-standard-2',
        "maxWorkers": 5
    }
    update_options='false'
    if 'dynamic_destinations_datatransfer' in active_jobs:
        update_options='true'
    request = self.create_template_request(job_name, template_path, parameters, environment, update_options)
    request.execute()

cloud.google.com

監視

監視対象としてはデータの欠損や遅延が発生してないかCloud LoggingやMonitoring、Redashを使い監視しています。

データの欠損

データの欠損はリトライログとメモリの使用率を確認してます。リトライの上限を超えるとデータが欠損してしまうのと、メモリの使用率が100%に達すると基幹データベースへ接続ができなくなるからです。

次のようにしてFluentdのプラグイン側でリトライ時にログを出力しています。

def execute_changetracking(changetrack_ver)
  try = 0
  begin
    try += 1
    query = generate_query(changetrack_ver)
    changetrack_results = execute_query(query)
    if !changetrack_results.nil?
      changetrack_results.each_slice(@batch_size) { |rows|
        es = MultiEventStream.new
        rows.each do |r|
          r["changetrack_end_time"] = Time.now.utc
          es.add(Fluent::Engine.now, r)
          if changetrack_ver < r["changetrack_ver"] then
            changetrack_ver = r["changetrack_ver"]
          end
        end
        router.emit_stream(@output_tag, es)
      }
      update_changetrack_version(changetrack_ver)
    end
  rescue => e
    puts "Write Retry Cnt: #{try}, Table Name: #{@tablename}, Error Message: #{e}"
    sleep try**2
    retry if try < @retry_max_times
    raise
  end
end

リトライ時のログはCompute Engine起動時にデプロイしたCloud Loggingエージェントでログを取得しています。

metadata = {
  gce-container-declaration = module.gce-container.metadata_value
  google-logging-enabled    = "true"
  google-monitoring-enabled = "true"
}

次のようにしてCloud Loggingでメトリクスを作ります。今回リトライ回数が10回でアラートを通知するように設定しました。

resource "google_logging_metric" "retry_error_tracker_a_metric" {
  name   = "retry-error-tracker-a/metric"
  filter = "resource.type=\"gce_instance\" severity>=DEFAULT jsonPayload.message: \"Write Retry Cnt: 10\" resource.labels.instance_id: \"${google_compute_instance.streaming_datatransfer_a.instance_id}\""
  metric_descriptor {
    metric_kind = "DELTA"
    value_type  = "INT64"
  }
}

作成したメトリクスを使いCloud Monitoringでアラートを通知します。

resource "google_monitoring_alert_policy" "tracker_a_retry_error_alert_policy" {
  display_name = "Tracker A Retry Error"
  depends_on   = [google_logging_metric.retry_error_tracker_a_metric]
  combiner     = "OR"
  conditions {
    display_name = "condition"
    condition_threshold {
      filter     = "metric.type=\"logging.googleapis.com/user/retry-error-tracker-a/metric\" resource.type=\"gce_instance\""
      duration   = "0s"
      comparison = "COMPARISON_GT"
      aggregations {
        alignment_period   = "60s"
        per_series_aligner = "ALIGN_DELTA"
      }
      trigger {
        count = 1
      }
      threshold_value = 0
    }
  }
  enabled = true
  # gcloud alpha monitoring policies list --project=streaming-datatransfer-env
  notification_channels = ["projects/${var.project}/notificationChannels/${var.slack_notification_channel_id}"]
}

メモリが枯渇するとプラグインから基幹データベースへのデータ取得が失敗するので、メモリ使用率が80%を超えた場合アラートを投げるよう設定してます。

resource "google_monitoring_alert_policy" "tracker_a_memory_alert_policy" {
  display_name = "Tracker A Memory Utilization"
  combiner     = "OR"
  conditions {
    display_name = "condition"
    condition_threshold {
      filter     = "metric.type=\"agent.googleapis.com/memory/percent_used\" resource.type=\"gce_instance\" resource.labels.instance_id=\"${google_compute_instance.streaming_datatransfer_a.instance_id}\" metric.label.\"state\"=\"used\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      aggregations {
        alignment_period   = "60s"
        per_series_aligner = "ALIGN_MEAN"
      }
      trigger {
        count = 1
      }
      threshold_value = 80
    }
  }
  enabled = true
  notification_channels = ["projects/${var.project}/notificationChannels/${var.slack_notification_channel_id}"]
}

データの遅延

データの遅延はRedashを使い定期的にクエリを投げて監視しています。

データの遅延はChange Trackingの開始時間とBigQueryのインサート時刻の差分を確認しています。不正なレコードが混在した際はBigQueryの_error_recordsテーブルに書き込まれるため、書き込みを検知してアラートを通知するようにします。

また、CPUの使用状況も遅延に影響があるため、メモリ使用率と同様に監視しています。

filter     = "metric.type=\"agent.googleapis.com/cpu/utilization\" resource.type=\"gce_instance\" resource.labels.instance_id=\"${google_compute_instance.streaming_datatransfer_a.instance_id}\" metric.label.\"state\"=\"used\""

性能評価

リプレイスよってデータ欠損もなくなり、遅延時間としても取得のインターバルを除けば数秒程度でデータ連携を行うことができるようになりました。

次の図はChange Trackingで取得したレコード数と遅延時間(秒)の関係となります。取得するレコード数が多いと遅延しますが、40万レコードほどの更新でも5分以内に連携できる基盤を作ることができました。 概要

またコスト面でも月間で約200万円ほどかかっていましたが約5万円程度にできました。

まとめ

今回リアルタイムデータ連携基盤についてご紹介しました。

現在ZOZOTOWNでは、リアルタイムデータを活用した案件が増えてきています。この記事を読んで、もしご興味をもたれた方は是非採用ページからお申し込みください。

tech.zozo.com

また、8/27(木)にリアルタイムデータ連携基盤含めMAの取り組みについてのイベントを行いますのでぜひご参加ください。

zozotech-inc.connpass.com

カテゴリー