こんにちは、SRE部MA基盤チームの谷口(case-k)です。私達のチームでは、データ連携基盤の開発・運用をしています。
データ基盤には大きく分けて2種類あり、日次でデータ連携してるものとリアルタイムにデータ連携しているものがあります。本記事ではリアルタイムデータ連携基盤についてご紹介します。
既存のデータ連携基盤の紹介
まず既存のデータ連携基盤について簡単にご紹介させていただきます。
既存のデータ連携基盤ではオンプレ環境やクラウドにあるデータをBigQueryへ日次で1回連携しています。ETLツールはSQL Server専用の「bcp」とTreasure Dataが開発しているOSSである「Embulk」を使っています。まずbcpを使い、オンプレ環境の基幹データベース内のテーブルを中間データベースへ連携します。中間データベースへ連携されたデータはEmbulkを使って、BigQueryへ連携されます。この際秘密情報のハッシュ化なども行っています。
これらの処理はワークフローエンジンで制御されていてTreasure Dataが開発しているOSSであるDigdagを使っています。余談ですがZOZOテクノロジーズにはDigdagのコントリビューターが7人もいます!
リアルタイムデータ連携基盤の紹介
なぜ必要なのか
これまでZOZOテクノロジーズでは日次でBigQueryへデータを連携していました。最近では機械学習を使った案件も増えてきており、リアルタイムなデータを必要とするサービスが増えてきています。機械学習の他にも配信基盤では商品が残り1点になったタイミングで通知を行う仕組みがあります。このような要件に対応するためには商品の在庫状況をリアルタイムで連携する必要があります。その他にも、施策のモニタリングや不正利用を素早く検知したいなど様々な案件があり、ZOZOテクノロジーズでもリアルタイムデータ連携基盤を構築することになりました。
活用事例の紹介
リアルタイムデータ連携基盤は検索パーソナライズ基盤の商品在庫の連携で使われています。検索パーソナライズとはユーザごとに商品をおすすめ順で紹介する機能となります。商品在庫がないとユーザに対してレコメンドしたにもかかわらず、商品が既に売り切れているといった機会損失が起きてしまいます。このような要件から検索パーソナライズ基盤ではリアルタイムに商品の在庫状況を知る必要があります。
データ連携の仕組みと課題
リアルタイムデータ連携基盤はオンプレ環境からGCP環境まで多段にレプリケーションを行いデータ連携をしています。SQL ServerからKafkaへの差分連携にはQlik Replicateを採用してます。
Qlik ReplicateはSQL ServerからCDCを取得し、解析可能なメッセージの形へ変換する役割をになっています。CDCとはChange Data Captureの略で、データベース内で行われた変更履歴を追うことができる機能です。オンプレ環境のSQL Serverはバージョンが古くCDCを使うことができないため、Compute Engine上にSQL Serverを立てて差分データを取得しています。
https://www.qlik.com/us/attunitywww.qlik.com techblog.zozo.com
多段にレプリケーションを行うことで高頻度連携を実現しようとしましたが、運用する過程でデータの欠損や遅延が課題としてあがりました。
- データの遅延
オンプレ環境からBigQueryまでに多段にレプリケーションを行うことで10分から30分程度の遅延が発生していました。 - データの欠損
既存の処理系にメモリリークがあり、定期的な再起動によるデータの欠損も発生していました。 - コスト
インフラ費用も月額で約200万円程度かかっており汎用的な基盤として使うには課題がありました。
リプレイス後のリアルタイムデータ連携基盤
既存のリアルタイムデータ連携基盤の課題を解決するため、私たちのチームでリプレイスをすることになりました。
SQL Serverの差分データの取り方を検討
新規に作成するリアルタイムデータ連携基盤では、差分の取得にSQL ServerのChange Trackingを採用しました。
Change TrackingとはCDCのようにSQL Serverの差分データを取得する仕組みです。CDCとの違いはCDCが非同期的な連携であるのに対しChange Trackingは同期的に連携します。CDCよりもリアルタイム性をもって連携できる一方で、変更履歴などは取得はできません。取得できるのは削除や更新、追加といった更新処理の内容や更新バージョン、それと更新のあった主キーのみです。
具体的には、次のようにして差分データの最新の状態を取得しています。
SELECT a.SYS_CHANGE_OPERATION as changetrack_type, a.SYS_CHANGE_VERSION as changetrack_ver, #{columns} FROM CHANGETABLE(CHANGES #{@tablename}, @前回更新したバージョン) AS a LEFT OUTER JOIN #{@tablename} ON a.#{@primary_key} = b.#{@primary_key}
差分データの取得方法としてはChange Tracking以外にも、テーブルの更新タイムスタンプを参照する方法やCDCを使う方法も検討しました。
しかし、更新タイムスタンプは付与されているテーブルが非常に少なく、付与されていても更新されないタイムスタンプが多くありました。CDCもオンプレ環境にCDCが使える2016以降のSQL Serverがほとんどありませんでした。また、非同期的なCDCより同期的なChange Trackingの方が高速にデータを取得できます。
このような理由からChange Trackingを使って差分データを取得することになりました。
アーキテクチャ概要と処理の流れ
ここからはリプレイス後のリアルタイムデータ連携基盤のアーキテクチャ概要と処理の流れについてご紹介します。
アーキテクチャの全体図は次の通りです。
Fluentdのプラグインを使った差分データの取得
Change Trackingの実行からPub/Subへのメッセージ転送はFluentdのプラグインを使っています。
冗長構成を実現するため、Compute Engine 2台にプラグインをデプロイしています。片方のインスタンスに問題が起きても、もう片方が生きていればデータの欠損が起きない仕組みとなっています。
オンプレ環境からGCP環境へ高速にデータ連携できるよう専用線としてDedicated Interconnectを使っています。多段にレプリケーションを行ったことによる遅延が課題だったので、最も根元の基幹データベースからデータを取得するようにしています。取得したデータはPub/Subのアウトプットプラグインを使い転送されます。
プラグインでは次のようにChange Tracking実行時にレコード単位でユニークとなるメッセージIDを生成しています。生成されたメッセージIDはDataflowのメッセージの重複排除で使います。
BigQueryで主キーの最新の状態を集計できるようChange Trackingのバージョンも渡しています。Dataflowでテーブル名を考慮してBigQueryへ書き込みができるようテーブル名も渡しています。
query = """ declare @last_synchronization_version bigint; SET @last_synchronization_version = #{changetrack_ver}; SET lock_timeout #{@lock_timeout} SELECT CONCAT('#{@tablename}','-',a.#{@primary_key.join(',').gsub(',', ',a.')},a.SYS_CHANGE_VERSION) as massage_unique_id, '#{@tablename}' as table_name, '#{@changetrack_interval}' as changetrack_interval, '#{Time.now.utc}' as changetrack_start_time, a.SYS_CHANGE_OPERATION as changetrack_type, a.SYS_CHANGE_VERSION as changetrack_ver, #{columns} FROM CHANGETABLE(CHANGES #{@tablename}, @last_synchronization_version) AS a LEFT OUTER JOIN #{@tablename} ON a.#{@primary_key} = b.#{@primary_key} """
Pub/Subのアウトプットプラグインでは差分データに加えてattributeにDataflowの重複排除で使うメッセージIDを渡しています。Dataflowの重複排除は次章でご紹介します。
<system> workers '<worker count>' </system> <worker 1> <source> // Input Plugin </source> <match '<tag_name>' @type gcloud_pubsub project "#{ENV['PROJECT_ID']}" key /usr/src/app/config/gcp_credential.json topic "projects/#{ENV['PROJECT_ID']}/topics/<topic-name>" autocreate_topic false max_messages 1000 max_total_size 9800000 max_message_size 4000000 attribute_keys ["message_unique_id"] // Buffer Plugin </match> </worker>
Dataflowでメッセージの重複を排除
Fluentd2台の冗長構成によるデータの重複はDataflowのidAttributeを使い重複を排除しています。idAttributeを使うことで、プラグインで付与したメッセージIDを参照して10分以内であれば同じメッセージの重複を排除します。Dataflowを使うとPub/Subで自動的に付与されるメッセージIDの重複は自動で排除でき、at least onceを採用しているPub/Subとは相性の良いツールです。しかし、パブリッシャーが複数回同じメッセージを送った場合、Pub/Subでは異なるメッセージと扱われるため重複の自動排除はできません。このような理由からメッセージのユニーク性を担保したい場合はDatafflowでidAttributeを使います。idAttributeを使うことで2台のFluentdから送られてくるデータの重複を排除しています。
次のサンプルはidAttributeを使ってメッセージの重複排除をする例です。
public static PipelineResult run(Options options) { // Create the pipeline Pipeline pipeline = Pipeline.create(options); pipeline .apply( "Read PubSub Events", PubsubIO.readMessagesWithAttributes() .withIdAttribute("message_unique_id") .fromSubscription(options.getInputSubscription())) .apply( "Filter Events If Enabled", ParDo.of( ExtractAndFilterEventsFn.newBuilder() .withFilterKey(options.getFilterKey()) .withFilterValue(options.getFilterValue()) .build())) .apply("Write PubSub Events", PubsubIO.writeMessages().to(options.getOutputTopic())); return pipeline.run(); }
cloud.google.com cloud.google.com
Dataflowで動的にBigQueryの各テーブルに出力
Pub/Subに送られ重複排除されたメッセージはDataflowを使ってBigQueryのテーブルに書き込まれます。DataflowのDynamic Destinationsを使うとメッセージ内のテーブル名に基づいて、出力先のテーブルを動的に振り分けることが可能です。そのため、Dynamic Destinationsを使うことで、1つのDataflowで複数テーブルのデータ連携ができるようになりインフラコストを抑えることができます。
なお、DataflowのDynamic Destinations機能は現時点だとJavaのみサポートしてます。
次のサンプルはDynamic Destinationsを使ってストリーム内のテーブル名を参照してBigQueryのテーブルに書き込む例です。監視や分析用の遅延時間を計測するため、BigQueryへのインサート時刻も取得しています。
WriteResult writeResult = convertedTableRows.get(TRANSFORM_OUT) .apply( BigQueryIO.<TableRow>write() .to( new DynamicDestinations<TableRow, String>() { @Override public String getDestination(ValueInSingleWindow<TableRow> elem) { return elem.getValue().get("table_name").toString(); } @Override public TableDestination getTable(String destination) { return new TableDestination( new TableReference() .setProjectId("project_id") .setDatasetId("dataset_name") .setTableId("table_prefix" + "_" + destination), // destination: table name "destination table" + destination); } @Override public TableSchema getSchema(String destination) { TableSchema schema = new TableSchema() switch (destination) { case "table_a": schema.setFields(ImmutableList.of(new TableFieldSchema().setName("column").setType("STRING").setMode("NULLABLE"))); break; case "table_b": schema.setFields(ImmutableList.of(new TableFieldSchema().setName("column").setType("STRING").setMode("NULLABLE"))); break; default: } return schema } }) // BigQuery Insert Time .withFormatFunction((TableRow elem) -> elem.set("bigquery_insert_time", Instant.now().toString())) .withoutValidation() .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withWriteDisposition(WriteDisposition.WRITE_APPEND) .withExtendedErrorInfo() .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
Pub/Subのメッセージ管理
重複排除されてPub/Subに送られてきたメッセージは別のサブスクライバーからも参照できるよう7日間メッセージを保持しています。新しくサブスクライバーを作ればBigtableなどBigQuery以外にも出力できるようになっており、Dataflowのウィンドウ処理等でリアルタイムに特徴量生成などもできるようになっています。
次のサンプルはPub/Subでメッセージを7日間保持するTerraformの例となります。retain_acked_messagesをtrueとすることでサブスクライブされたメッセージを破棄せずに保持します。
message_retention_durationはメッセージの保有期間を決めることができます。なお、メッセージの保有期間は最大で7日です。
resource "google_pubsub_subscription" "message_hub" { name = "message_hub" topic = google_pubsub_topic.message_hub.name # subscribe from multiple subscriber message_retention_duration = "604800s" retain_acked_messages = true ack_deadline_seconds = 60 }
cloud.google.com beam.apache.org
イベントログ収集基盤
まだ構想段階ではありますが、データ量の多いイベントログのリアルタイムデータ連携基盤も作ろうとしています。イベントログについてもPub/Subに投げてもらうのが理想的ですが、クライアント側の負担も考慮し現在検討中です。
個人情報の取り扱い
BigQueryとPub/Subに保持される個人情報や秘密情報はアクセスできるユーザを制限しています。
BigQueryではカラムレベルでのアクセス制御を行い、Pub/Subはトピック単位でアクセス制御をしています。カラムレベルのアクセス制御を行うため、ポリシータグを個人情報や秘密情報のカラムに付与しています。ポリシータグとはBigQueryのテーブルに対してカラムレベルのアクセス制御を行うリソースです。
ポリシータグのカラム付与はTerraformで次のようにしてできます。ポリシータグ自体を作ることはまだTerraformではサポートされていないようです。
resource "google_bigquery_table" "table-name" { dataset_id = google_bigquery_dataset.<dataset-name>.dataset_id table_id = "<table-name>" schema = <<EOF [ { "name": "column-name>", "type": "STRING", "mode": "NULLABLE", "policyTags": { "names": [ "projects/<project-id>/locations/<location>/taxonomies/<taxonomies-id>/policyTags/<policy-tag-id>" ] } } ] EOF }
Pub/SubはDataflowで個人情報や秘密情報をNULL置換したトピックを作ろうと考えています。トピック単位で参照ユーザを制限することで、秘密情報を必要としないサブスクライバーからは参照できないようにします。
ビルド・デプロイ戦略
FluentdのプラグインとDataflowのビルド・デプロイ方法についてご紹介できればと思います。CI/CDツールとしてはCircleCIを使っています。Fluentdのプラグインはコンテナイメージを作り、作られたコンテナイメージをContainer RegistryにPUSHしています。Container RegistryのコンテナイメージはCompute Engine起動時にPULLされデプロイされます。データ欠損や遅延が発生しないよう2台のCompute Engineを1台ずつ再起動させ無停止でデプロイできるようにしています。
module "gce-container" { source = "Terraform-google-modules/container-vm/google" version = "~> 2.0" container = { image = "gcr.io/${var.project}/<image-name>" tty : true } restart_policy = "Always" } resource "google_compute_instance" "compute engine" { name = "name" machine_type = "n2-custom-4-10240" zone = "asia-northeast1-a" boot_disk { initialize_params { image = module.gce-container.source_image size = 500 } } metadata_startup_script = "#!/bin/bash /usr/bin/docker-credential-gcr configure-docker EOF" metadata = { gce-container-declaration = module.gce-container.metadata_value google-logging-enabled = "true" google-monitoring-enabled = "true" } service_account { email = "${google_service_account.tracker_app.email}" scopes = [ "https://www.googleapis.com/auth/cloud-platform", ] } }
Dataflowはカスタムテンプレートをビルドし、既存のパイプラインの更新を行います。
DataflowのカスタムテンプレートはGoogle提供のテンプレートをベースにカスタマイズしています。ビルド時にenableStreamingEngineオプションを利用すると使用するディスク容量を420GBから30GBにインフラ費用を抑えることができます。
次のコードはテンプレートをビルドする際にenableStreamingEngineオプションを指定する例です。
mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=com.google.cloud.teleport.templates.PubsubToPubsub \ -Dexec.args="--project=${project_id} \ --tempLocation=gs://${project_id}/tmp \ --templateLocation=gs://${project_id}/templates/<template-name> \ --experiments=enable_stackdriver_agent_metrics \ --enableStreamingEngine \ --runner=DataflowRunner"
cloud.google.com cloud.google.com github.com
ビルドされたテンプレートはupdateオプションを使い既存のパイプラインの更新を行っています。互換性チェックにより、中間状態やバッファデータなどが前のジョブから置換ジョブに確実に転送することが可能です。
次のコードはPythonクライアントを使ってパイプラインを更新する例です。
def create_template_request(self, job_name, template_path, parameters, environment, update_options): request = self.dataflow.projects().templates().launch( projectId = self.project_id, location = 'us-central1', gcsPath = template_path, body = { "jobName": job_name, "parameters": parameters, "environment": environment, "update": update_options } ) return request def deploy_dynamic_destinations_datatransfer(self, active_jobs): job_name='dynamic_destinations_datatransfer' template_name = 'PubSubToBigQueryDynamicDestinationsTemplate' template_path = "gs://{}/templates/{}".format(self.project_id, template_name) input_subscription = 'message_hub' output_default_table = 'streaming_datatransfer.streaming_dynamic_changetracktransfer' parameters = { "inputSubscription": "projects/{}/subscriptions/{}".format(self.project_id, input_subscription), "outputTableSpec": "{}:{}".format(self.project_id, output_default_table), "autoscalingAlgorithm": "THROUGHPUT_BASED" } environment = { "machineType": 'n2-standard-2', "maxWorkers": 5 } update_options='false' if 'dynamic_destinations_datatransfer' in active_jobs: update_options='true' request = self.create_template_request(job_name, template_path, parameters, environment, update_options) request.execute()
監視
監視対象としてはデータの欠損や遅延が発生してないかCloud LoggingやMonitoring、Redashを使い監視しています。
データの欠損
データの欠損はリトライログとメモリの使用率を確認してます。リトライの上限を超えるとデータが欠損してしまうのと、メモリの使用率が100%に達すると基幹データベースへ接続ができなくなるからです。
次のようにしてFluentdのプラグイン側でリトライ時にログを出力しています。
def execute_changetracking(changetrack_ver) try = 0 begin try += 1 query = generate_query(changetrack_ver) changetrack_results = execute_query(query) if !changetrack_results.nil? changetrack_results.each_slice(@batch_size) { |rows| es = MultiEventStream.new rows.each do |r| r["changetrack_end_time"] = Time.now.utc es.add(Fluent::Engine.now, r) if changetrack_ver < r["changetrack_ver"] then changetrack_ver = r["changetrack_ver"] end end router.emit_stream(@output_tag, es) } update_changetrack_version(changetrack_ver) end rescue => e puts "Write Retry Cnt: #{try}, Table Name: #{@tablename}, Error Message: #{e}" sleep try**2 retry if try < @retry_max_times raise end end
リトライ時のログはCompute Engine起動時にデプロイしたCloud Loggingエージェントでログを取得しています。
metadata = { gce-container-declaration = module.gce-container.metadata_value google-logging-enabled = "true" google-monitoring-enabled = "true" }
次のようにしてCloud Loggingでメトリクスを作ります。今回リトライ回数が10回でアラートを通知するように設定しました。
resource "google_logging_metric" "retry_error_tracker_a_metric" { name = "retry-error-tracker-a/metric" filter = "resource.type=\"gce_instance\" severity>=DEFAULT jsonPayload.message: \"Write Retry Cnt: 10\" resource.labels.instance_id: \"${google_compute_instance.streaming_datatransfer_a.instance_id}\"" metric_descriptor { metric_kind = "DELTA" value_type = "INT64" } }
作成したメトリクスを使いCloud Monitoringでアラートを通知します。
resource "google_monitoring_alert_policy" "tracker_a_retry_error_alert_policy" { display_name = "Tracker A Retry Error" depends_on = [google_logging_metric.retry_error_tracker_a_metric] combiner = "OR" conditions { display_name = "condition" condition_threshold { filter = "metric.type=\"logging.googleapis.com/user/retry-error-tracker-a/metric\" resource.type=\"gce_instance\"" duration = "0s" comparison = "COMPARISON_GT" aggregations { alignment_period = "60s" per_series_aligner = "ALIGN_DELTA" } trigger { count = 1 } threshold_value = 0 } } enabled = true # gcloud alpha monitoring policies list --project=streaming-datatransfer-env notification_channels = ["projects/${var.project}/notificationChannels/${var.slack_notification_channel_id}"] }
メモリが枯渇するとプラグインから基幹データベースへのデータ取得が失敗するので、メモリ使用率が80%を超えた場合アラートを投げるよう設定してます。
resource "google_monitoring_alert_policy" "tracker_a_memory_alert_policy" { display_name = "Tracker A Memory Utilization" combiner = "OR" conditions { display_name = "condition" condition_threshold { filter = "metric.type=\"agent.googleapis.com/memory/percent_used\" resource.type=\"gce_instance\" resource.labels.instance_id=\"${google_compute_instance.streaming_datatransfer_a.instance_id}\" metric.label.\"state\"=\"used\"" duration = "60s" comparison = "COMPARISON_GT" aggregations { alignment_period = "60s" per_series_aligner = "ALIGN_MEAN" } trigger { count = 1 } threshold_value = 80 } } enabled = true notification_channels = ["projects/${var.project}/notificationChannels/${var.slack_notification_channel_id}"] }
データの遅延
データの遅延はRedashを使い定期的にクエリを投げて監視しています。
データの遅延はChange Trackingの開始時間とBigQueryのインサート時刻の差分を確認しています。不正なレコードが混在した際はBigQueryの_error_recordsテーブルに書き込まれるため、書き込みを検知してアラートを通知するようにします。
また、CPUの使用状況も遅延に影響があるため、メモリ使用率と同様に監視しています。
filter = "metric.type=\"agent.googleapis.com/cpu/utilization\" resource.type=\"gce_instance\" resource.labels.instance_id=\"${google_compute_instance.streaming_datatransfer_a.instance_id}\" metric.label.\"state\"=\"used\""
性能評価
リプレイスよってデータ欠損もなくなり、遅延時間としても取得のインターバルを除けば数秒程度でデータ連携を行うことができるようになりました。
次の図はChange Trackingで取得したレコード数と遅延時間(秒)の関係となります。取得するレコード数が多いと遅延しますが、40万レコードほどの更新でも5分以内に連携できる基盤を作ることができました。
またコスト面でも月間で約200万円ほどかかっていましたが約5万円程度にできました。
まとめ
今回リアルタイムデータ連携基盤についてご紹介しました。
現在ZOZOTOWNでは、リアルタイムデータを活用した案件が増えてきています。この記事を読んで、もしご興味をもたれた方は是非採用ページからお申し込みください。
https://tech.zozo.com/recruit/tech.zozo.com
また、8/27(木)にリアルタイムデータ連携基盤含めMAの取り組みについてのイベントを行いますのでぜひご参加ください。