急成長するLINE配信対象ユーザー数にGCPアーキテクチャの改善で立ち向かった話

ogp_image

はじめに

こんにちは、EC基盤本部・MA部・MA基盤チームでマーケティングオートメーションのシステムを開発している長澤(@snagasawa_)です。この記事では、社内で運用しているLINEメッセージ配信基盤の課題を、アーキテクチャ改善によって解決した話をご紹介します。

当時、LINEメッセージ配信基盤では、配信処理を担っていたApp Engineで2つの課題を抱えていました。「メモリ不足による配信処理の中断」と「リクエストタイムアウト後の意図しない処理の継続」です。一時はスケールアップによるメモリ増強を検討しましたが、後者の課題を解決できないためアーキテクチャの変更に着手しました。

結果として、App Engineが担っていた処理をBigQuery・Cloud Storage・Dataflow Batch Jobに置き換えることにより、この2つの課題を解決しました。加えて、配信対象ユーザーの増加にも対応しました。この記事が類似するシステムを開発されているエンジニアの方にとって参考になれば幸いです。

LINEメッセージ配信基盤とは

はじめにLINEメッセージ配信基盤を説明します。

LINEメッセージ配信基盤とは、我々が「LINE Friendship Manager(以下、LFM)」と呼んでいるLINEユーザー向けのメッセージ配信システムです。

ZOZOTOWNではLINEの企業公式アカウントを運用しています。エンドユーザーはこの公式アカウントをLINE友だちに追加することで、ZOZOTOWNのキャンペーン情報やお気に入りアイテムの値下げ情報などをLINE上で受け取ることができます。 LFMはLINE社が提供するMessaging APIを利用して、LINE友だちにメッセージやリッチメニューのバッチ配信を予約配信できます1

LFMの詳細は採用イベントにて発表した登壇資料があります。こちらを参考にしてください。

アーキテクチャの改善に取り組んだ背景

ZOZOTOWNではLINEを重要なユーザーコミュニケーションのチャネルと位置づけて、戦略的なマーケティング施策を展開しています。

www.linebiz.com

その甲斐もあって、昨今のZOZOTOWN LINE公式アカウントでは年々LINE友だちが増加しています。具体的には、累計LINE友だち数(配信対象外のブロックユーザーを含む)が2021年4月に1,000万ユーザーを突破し、2018年年初の約1.4倍になりました。

line_friends_graph

LINE友だち数が増えれば、LFMで扱うユーザー数もまた増えます。そのため、LFMでは冒頭の課題を解決しつつ、今後の配信対象ユーザー数増加に耐えうるスケーラビリティを備えるようアーキテクチャ改善に着手しました。

改善前のアーキテクチャ

LFM改善前のメッセージ配信方法を紹介します。

前提として、配信に必要なデータはそれぞれ以下の通り格納されています。

  • Cloud SQL
    • 配信開始の予約時間
    • 配信対象ユーザーを抽出する条件
    • メッセージのコンテンツ情報(画像のURLやクリック後の遷移先URLなど)
  • BigQuery
    • 配信対象のユーザーIDリスト

処理の流れは以下の通りです。

  • (1) App Engine Cron Jobで予約時間になった配信処理を開始
  • (2) Cloud SQLからメッセージのコンテンツを取得
  • (3) BigQueryからユーザーIDのリストを取得
  • (4) ユーザーIDとメッセージの組み合わせをCloud Pub/Subにpublish
  • (5)〜(8) Cloud FunctionsでLINE Messaging APIにリクエスト

参考までに、以下のコードがこの (2)~(4) の実装を単純化したものになります。

require 'google/cloud/bigquery'
require 'google/cloud/pubsub'
require 'json'

# ユーザーIDのリストを取得
bigquery = Google::Cloud::BigQuery.new
sql = 'SELECT user_id FROM `project_id.dataset_id.users`'
user_ids = bigquery.query(sql)

# メッセージを生成
content_id = 1
content = Content.find(content_id)
messages = user_ids.map do |user_id|
  {
    user_id: user_id,
    messages: [
      {
        type: 'image',
        url: content.image_url,
        size: 'full',
        action: {
          type: 'uri',
          uri: content.action_url,
        },
      }
    ]
  }
end

# メッセージをCloud Pub/Subにpublish
pubsub = Google::Cloud::Pubsub.new
topic_name = 'topic_name'
topic = pubsub.topic(topic_name)
topic.publish do |batch|
  messages.each do |message|
    batch.publish(JSON.dump(message))
  end
end

この実装はMVPとしてファーストリリース当初の要件を問題なく満たすものでしたが、運用中に先述の課題が発覚しました。

課題1: メモリ不足による配信処理の中断

1つ目の「メモリ不足による配信処理の中断」は、上記の (2)〜(4) の処理を行なっていたApp EngineのFlexible Environment(以下、FE)で発生しました2

下のグラフは、実際に配信が中断した直前1週間のメモリ使用量推移です。数日間稼働しているFEのインスタンスのメモリ使用量が右肩上がりに増えていき、グラフの最後ではメモリ不足による502エラーによって配信が中断されて2GiBほど減少しています。このようにメモリリークによって消費メモリが肥大しているため、スケールアップしたとしても502エラーの再発が懸念されました。

memory_usage

課題2: リクエストタイムアウト後の意図しない処理の継続

それでもまだメモリ不足だけであればスケールアップを検討する余地もありましたが、別の課題が存在しました。それが2つ目の「リクエストタイムアウト後の意図しない処理の継続」です。

FEにはリクエストの最大タイムアウトが60分という制約があります。一方で、LFMでは時折このタイムアウトを超過しても配信処理が継続していることに気がつきました。調査の結果、これはバックグラウンドスレッドによるものでした。

App Engineのドキュメントにもある通り、FEではバックグラウンドスレッド・バックグラウンドプロセスが動作します。 LFMのコード上ではバックグラウンド処理を行っていませんでしたが、使用していたCloud Pub/Subのクライアントライブラリで内部的にスレッドを生成していました。この影響で、タイムアウトになると504エラーのレスポンスが返ってくるものの、バックグラウンドで配信処理は継続するという現象が発生していました。

開発チームとしてもこれは事後的に発覚したものであり、実装当初からは想定外の挙動でした。また、バックグラウンド処理自体、推奨されていません。そのため、このバックグラウンド処理を避けるべくFE以外の選択肢を検討し始めました。

まず、同じApp Engineの中で、タイムアウト上限が24時間であるStandard EnvironmentのBasic Scaling・Manual Scalingが候補となりました。しかし、インスタンスクラスの最大メモリが2048MBとスペック不足であり断念しました。

cloud.google.com

以上の理由からApp Engine以外での解決に取り組みました。

改善後のアーキテクチャ

以下が改善後の配信方法です。

new_lfm_architecture

BigQueryへのメッセージ出力

new_lfm_architecture_01

1ステップ目はBigQueryへのメッセージ出力です。変更前の配信開始時の処理では、BigQueryから取得したユーザーIDのリストと、Cloud SQLから取得したメッセージのコンテンツを組み合わせてメッセージを生成していました。しかし、変更後はユーザーIDとメッセージのコンテンツを組み合わせた「クエリ」を生成し、その実行結果をBigQueryの別のテーブルに非構造化データとして出力します。

具体的には以下のような再帰メソッドを使うことで、Rubyの連想配列をBigQueryのStructの文字列に変換し、その結果をクエリに組み込んで非同期のクエリジョブを実行します。

def hash_to_bigquery_struct_string(hash, parent_key: nil)
  array_of_hash_to_bigquery_struct_string =
    proc do |array_of_hash, key|
      array_of_hash
        .map { |hash| hash_to_bigquery_struct_string(hash) }
        .join(",\n")
        .then { |str| "[\n#{str}\n] AS #{key}" }
    end

  case hash
  when Array then array_of_hash_to_bigquery_struct_string.call(hash, parent_key)
  when Hash
    hash
      .map { |key, value|
        case value
        when Array then array_of_hash_to_bigquery_struct_string.call(value, key)
        when Hash then hash_to_bigquery_struct_string(value, parent_key: key)
        else "#{value.is_a?(String) ? "'#{value}'" : value} AS #{key}"
        end
      }
      .join(",\n")
      .then { |str| "STRUCT(#{str}#{parent_key ? "\n) AS #{parent_key}" : ')'}" }
  else raise(ArgumentError, "#{hash} is neither hash nor array of hash.")
  end
end


message_contents = [
  {
    type: 'image',
    url: 'https://cdn.sample.jp/images/example_01.png',
    size: 'full',
    action: {
      type: 'uri',
      uri: 'https://zozo.jp/sale/',
    }
  }
]

hash_to_bigquery_struct_string(message_contents, parent_key: 'messages')
# =>
#    [
#       STRUCT('image' AS type,
#              'https://cdn.sample.jp/images/example_01.png' AS url,
#              'full' AS size,
#              STRUCT('uri' AS type,
#                     'https://zozo.jp/sale/' AS uri
#              ) AS action)
#    ] AS messages
CREATE TABLE dataset_id.table_id
OPTIONS (
  expiration_timestamp=TIMESTAMP "2022-01-01 00:00:00"
)
AS
(
  WITH segments AS (
    SELECT '1111' AS user_id UNION ALL
    SELECT '1112' AS user_id
  )

  SELECT
    user_id,
    [
      STRUCT('image' AS type,
             'https://cdn.sample.jp/images/example_01.png' AS url,
             'full' AS size,
             STRUCT('uri' AS type,
                    'https://zozo.jp/sale/' AS uri
             ) AS action)
    ] AS messages
  FROM
    segments
)

上のクエリ実行すると次のテーブルが作成されます。

exported_table_schema exported_table_preview_01

これによってApp Engineのメモリ消費を抑制しつつ、LINE Messaging APIに渡すリクエストボディとほぼ同様の構造のままデータをテーブルに出力できます。実際にこの変更によってStandard Environmentの「B8」インスタンスクラスでも事足りるようになりました。 配信後のテーブルは不要なため、クエリのオプションにexpiration_timestampを指定します。これで自動的にテーブルが削除され、ストレージの課金コストを削減できます。

cloud.google.com

Cloud Storageへのファイル出力

new_lfm_architecture_02

2ステップ目では、BigQuery APIによって前の手順で出力したテーブルをJSONファイルとしてCloud Storageに出力します。

cloud.google.com

注意点としては、Cloud Storageへの単一ファイルの出力には最大1GBという上限があります。このため、ワイルドカードURLを指定することで自動的に複数ファイルに分割して出力されるようにします。

このJSONファイルも配信後は不要なため、 バケットを作成する際にライフサイクルルールを指定します。これで自動的にファイルが削除され、 ストレージの課金コストを削減できます。GCPのリソースはTerraformで管理しており、以下のように記述することでバケットにライフサイクルルールを適用できます。

resource "google_storage_bucket" "pubsub-message" {
  name          = "${var.project}-pubsub-message"
  location      = "US"
  force_destroy = true

  lifecycle_rule {
    action {
      type = "Delete"
    }
    condition {
      age = 1
    }
  }
}

Dataflow Batch Jobによるメッセージのpublish

new_lfm_architecture_03

3ステップ目では、Cloud StorageのJSONファイルを読み込み、Pub/Sub TopicへpublishするDataflow Batch Jobを実行します。このJobもCloud Storageへのファイル出力後にRailsから起動します。

Dataflow Batch Jobは処理対象のデータ量に合わせてワーカーとインスタンスをオートスケールさせるため、大量のデータも高速に処理できます。今回はGoogle Cloudが提供するテンプレートの「Cloud Storage Text to Pub/Sub (Batch)」を利用します。 テンプレートの利用により、Dataflowの実装が不要になります。

今回のケースでは、RubyのGCPクライアントライブラリgoogle-cloud-rubyでAPIがサポートされていなかったため、 google-api-ruby-clientgoogle-auth-library-rubyを利用して実装しました。

require 'googleauth'
require 'google/apis/dataflow_v1b3'
DataflowV1b3 = Google::Apis::DataflowV1b3

class Dataflow
  def launch_gcs_to_pubsub_template(project_id:, job_name:, input_file_pattern:, output_topic:)
    service.launch_project_template(
      project_id,
      DataflowV1b3::LaunchTemplateParameters.new(
        job_name: job_name,
        parameters: {
          inputFilePattern: input_file_pattern,
          outputTopic: "projects/#{project_id}/topics/#{output_topic}",
        },
        environment: DataflowV1b3::RuntimeEnvironment.new(
          temp_location: "gs://#{project_id}-dataflow-template/temp",
        ),
      ),
      gcs_path: 'gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub',
    )
  end

  private

  def service
    @service ||= DataflowV1b3::DataflowService.new.then do |service|
      service.authorization = credentials
      service
    end
  end

  def credentials
    credentials = Google::Auth::ServiceAccountCredentials.make_creds(scope: [DataflowV1b3::AUTH_COMPUTE])
    credentials.fetch_access_token!
    credentials
  end
end

Cloud Pub/Sub & App Engine(フロー制御)

new_lfm_architecture_04

4ステップ目では、App EngineとCloud Pub/Subによるフロー制御をしています。この実装はオプションであり、要件次第では不要です。今回はLINE Messaging APIのレート制限を回避するために実装しています。

LINE Messaging APIには特定のエンドポイントを除いて「2000リクエスト/秒」という制限があり、超過するとエラーが返る仕様になっています。 Dataflowは大量データを高速に処理可能ですが、それゆえにフロー制御をしないスループットだとこのレート制限を超過してしまいます。そのため、今回は2種類のPub/Sub Topicを用意してあります。ひとつはフロー制御用で、もうひとつはCloud Functionsの配信トリガー用です。

フロー制御用のTopicにメッセージがpublishされると、App Engineからメッセージをsubscribeし、1秒ごとに配信用Topicへpublishします。この処理はスループットが一定に保たれるため、配信メッセージ数が増加してもスケーリングは不要になります。なお、別のアプローチとして、Dataflowのパラメータによるワーカー数の固定も試しました。これはJavaのStackOverflowErrorが頻発した事、毎秒制御のほうがより確実であった事からあえなく断念しました。

この処理はApp EngineのCron Jobによって1分間隔で実行しています。 注意点として、このCron Jobは開始時に前の処理を終了していないとその処理がスキップされてしまうため、 次の開始時間を超えないようにレスポンスを返す必要があります。

flow_control

cloud.google.com

Cloud Pub/Sub & Cloud Functions(配信)

new_lfm_architecture_05

最後の5ステップ目に、Pub/SubトリガーのCloud FunctionsがLINE Messaing APIにリクエストを送信し、エンドユーザーのLINE上でメッセージが表示されます。 Cloud FunctionsもPub/Sub Topicのトラフィックによってインスタンスがスケールし、並列処理によって大量メッセージを高速に処理できます。参考値として、フロー制御をしない場合、約500万ユーザーへの配信が35分ほどで完了します。この時間にはBigQueryへのテーブル出力やDataflow Jobの起動による10分程度のオーバーヘッドを含みます。

ここまでの一連の流れによって、先の課題に対応しつつ、GCPのリソースを活用したスケーラブルな配信処理が可能になります。

各GCPサービスのボトルネック

さて、スケーラブルになったとして現実的にはどこまでスケール可能なのでしょうか。ここまで読んでいただいた方も気になる部分かと思います。結論としては、スケール上限はGCPの使い方に準じます。今回使用している各GCPサービスにおけるボトルネックについて紹介します。

BigQuery

BigQueryはよくペタバイト級のデータウェアハウスとして謳われますが、クエリの内容によってはそのスケールメリットを享受できず、ボトルネックになる可能性があります。 下記記事によると、ORDER BY句や分析関数、番号付関数などはすべてのデータを対象にシングルノード上で処理が行われるという性質上、メモリ上限に引っかかりやすいとのことです。

Operations that need to see all the data in the resulting table at once have to operate on a single node. Un-partitioned window functions like RANK() OVER() or ROW_NUMBER() OVER() will operate on a single node. Another operation that is mentioned to face a lot of problems is the ORDER BY clause. It is an operation that requires a single node to see all data, so it may hit the memory limit of a single.

medium.com

公式ドキュメントにおいても、クエリパフォーマンスの最大化について以下のような説明があります。

When you use an ORDER BY clause, it should appear only in the outermost query. Placing an ORDER BY clause in the middle of a query greatly impacts performance unless it is being used in a window (analytic) function. Another technique for ordering your query is to push complex operations, such as regular expressions and mathematical functions to the end of the query. Again, this technique allows the data to be pruned as much as possible before the complex operations are performed.

cloud.google.com

LFMでもABテストのグループ分けのために分析関数を使用しており、ユーザーID数が1,400万に達すると以下のメモリ超過エラーに引っかかることが判明しています。直近では問題ありませんが、将来的には適切なバッチサイズに分割してクエリを実行するように修正する必要があります。

Resources exceeded during query execution: The query could not be executed in the allotted memory.

Cloud Storage

Cloud Storageへの出力は先述の通り、ワイルドカードURLにより複数ファイルに分割する必要があります。 しかし、速度的には十分に高速であるため、将来的にボトルネックになりうる懸念はほとんどありません。データ量によって出力完了までの時間は多少増減しますが、参考値として500万ユーザーを対象する配信でも1分程度で完了します。

Dataflow

Dataflowはオートスケールに加えて、パラメータでCompute Engineのマシンタイプを変更可能なため、ボトルネックになりうる懸念は少ないです。しかし、ジョブの同時実行数はプロジェクトにつき上限「25個」と少ないため要注意です。これはソフトリミットのため、サポートへの問い合わせにより上限緩和できます。

cloud.google.com

Cloud Pub/Sub

Cloud Pub/Subはリージョンごとのスループットに上限が設けられているため、メッセージサイズおよびメッセージ数が大きい場合には注意が必要です。

Pub/Subで割り当て可能なリージョンは大規模と小規模の2種類があり、スループットの上限はこの種類によって大きく差があります。例えばLFMのようにPub/SubトリガーのCloud Functionsを利用している場合、pushサブスクライバーのスループットに上限があります。大規模リージョンでは「8,400,000 KB(140 MB/秒)」であるのに対し、小規模リージョンでは「1,200,000 KB(20 MB/秒)」と7分の1しかありません。このため、同時間帯に複数の配信がされたり、メッセージサイズが大きいユースケースには注意が必要です。

前提条件によりますが、Pub/Subのリージョンはリクエストを送信したクライアントのリージョンにルーティングされることがあります。LFMのケースであれば、フロー制御のApp Engineインスタンスのリージョンが配信用Pub/Subのリージョンになります。したがって、Pub/Subのリージョンと合わせてリクエストを送信するクライアントのリージョンにも留意する必要があります。

現在のLFMは小規模リージョンに存在しており、配信によっては上限近くのスループットに達するため、いずれは大規模リージョンへ移行する予定です。

cloud.google.com

Cloud Functions

Cloud FunctionsはPub/Subトリガーで実行される場合、バックグラウンド関数のみに適用される割り当てが存在するため要注意です。

特にLFMのようなフロー制御をしない場合、パフォーマンス要件によっては関数ごとの最大同時呼び出し回数がボトルネックになりうる懸念があります。LFMではフロー制御に加えてトリガーのPub/Sub TopicとCloud Functionsを並列化しているため、この呼び出し回数がボトルネックにはなりません。

cloud.google.com

今後の改善

今後の改善案として検討しているのは、Dataflow SQL Jobの採用です。Dataflow SQL Jobは内部的にBigQuery Storage Read APIを利用します。これによりCloud Storageへの出力を介さずにBigQueryから直接Pub/Sub Topicにメッセージをpublishできます。

現在はまだDataflow SQLクエリの実行方法がCloud Consoleかgcloudコマンドの2択しか存在しないため、クライアントライブラリでのサポート開始後に本格的に採用検討します。 cloud.google.com

まとめ

本記事では、GCPのアーキテクチャ改善によって、既存の課題を解決しつつ更なるスケーラビリティを実現したLINEメッセージ配信基盤について紹介しました。今後もDataflow SQL Jobの採用検討などにより、さらなる改善を取り入れていきたいと考えています。

さいごに

ZOZOテクノロジーズではGCPのアーキテクチャ改善やマーケティングに関連するプロダクトの開発に関心のあるエンジニアを募集しています。ご興味のある方は下記リンクからぜひご応募ください!

tech.zozo.com


  1. 一部のバッチ配信とリアルタイム配信は別システムでも配信しています。詳しくは別記事を参照ください。

  2. FEを使っていた理由は、その当時使っていたRubyのバージョンにStandard Environmentが未対応だったため。

カテゴリー