Vertex Feature Storeの機械学習システムへの導入

OGP

こんにちは、データシステム部推薦基盤ブロックの寺崎(@f6wbl6)です。現在、推薦基盤ブロックではデータサイエンス部MLOpsブロックのメンバーと協力しながらMLOps基盤の構築を進めています。本記事ではMLOps基盤構築の一環として進めているVertex Feature Storeの機械学習システムへの導入に関する知見およびVertex Feature Storeを導入する上での制限や課題をご紹介します。

MLOps基盤に関する取り組みについては以下のテックブログでも取り上げていますので、こちらもご参照ください。

techblog.zozo.com

techblog.zozo.com

techblog.zozo.com

推薦基盤ブロックが抱える機械学習システムの課題

Vertex Feature Storeを説明する前に、推薦基盤ブロックでの機械学習システム(以降、MLシステム)に関する課題について紹介します。

私たちのチームではこれまでに様々なMLモデルやシステムを開発し運用してきています。そもそもMLシステムを構築・運用していくことは、今後継続的にメンテナンスコストがかかることとトレードオフです。このことはMLOps領域に携わっているエンジニアなら何度も目にしたことがある例の図がそれを表しています。

MLシステムに関する技術的負債の有名な図

引用:Hidden Technical Debt in Machine Learning Systems

私たちのチームでもその例に漏れず、開発・運用するモデルが増えるにつれて以下のような課題が浮き彫りとなっていました。

  1. 案件の度に様々なコードや特徴量を生成するクエリが流用・改変され、車輪の再発明がなされている
  2. モデルの入力として与えているデータが適切なものであることを保証できていない
  3. モデルの出力として得ている予測結果が適切なものであることを保証できていない

まず、様々な案件で似たようなコードや特徴量が量産されているという課題があります。特に、特徴量に関しては基本的にBigQueryのデータマートやリアルタイムデータ基盤からユーザー情報や商品情報を取得して生成するというのはどの案件でも共通しています。その結果、一度作ったクエリをマイナーアップデートしたものが各所に散らばっている・もしくはそれを認識できていない状態となる傾向があります。つまり、知らず知らずのうちに車輪の再発明をし、メンテナンスの及ばないものが増えていっていることを意味します。

2つ目と3つ目は運用しているMLモデルのモニタリングに関する課題です。MLシステムにおいて、「システムがエラーを起こしていない状態」の定義が困難であることは、MLシステムを運用したことのある方なら容易に想像がつくものと思います。特にMLモデルが正しい振る舞いをしていると保証するには、モデルに与えるデータ(特徴量)が適切であり、またモデルのアウトプットも想定していたものであることを検証する必要があります。

MLモデルの状態や入出力データが正常であるとする定義が非常に難しいということは言うまでもありませんが、まずはそれらを定常的に観測するための仕組みを設けないことには検証自体ができません。

機械学習システムの課題に対する取り組み

前述したもの以外にも様々な課題はありますが、現在は「MLシステム開発の高速化および標準化」を主軸としながら優先順位をつけてそれぞれの課題に取り組んでいます。具体的な取り組みについて以前別の記事で今後の展望として取り上げられているのでこちらもご一読ください。

techblog.zozo.com

私たちのチームでは上記の「案件の度に様々なコードやクエリについて車輪の再発明がされている」という課題に対する取り組みの一環で、Vertex Feature Storeの導入を進めてきました。Vertex Feature Storeを導入することで、特徴量の再発明を防止して適切に再利用し、また特徴量が適切にモニタリングできる状態の実現を目指しました。

Feature Store

概要

Vertex Feature Storeの説明の前に、まずFeature Storeとはどのようなものなのかを簡単に説明します。

Feature Storeは機械学習システムで扱う特徴量を管理・提供するための基盤の総称であり、2017年にUberの以下のブログで紹介されたのが初出とされています。

eng.uber.com

Feature Storeは機械学習で用いる特徴量を共有・再利用することを主な目的としたもので、現在までにOSS・マネージドサービス問わず様々な形でFeature Storeが公開されています。以下にFeature Storeの概念図を示します。

Feature Storeの構成要素

各構成要素の詳細については以下の記事が詳しいので、ぜひ参考にしてみてください。

www.tecton.ai

いくつかFeature Storeを紹介すると、OSSとして公開されているものとしてはFeastHopsworksなどがあり、マネージドFeature StoreとしてはTectonや2020年のAWS re:Inventで発表されたAmazon SageMaker Feature Storeなどがあります。その他にもAirbnbのZiplineやFacebookのFBLearner、AppleのOvertonといった、各社で構築しているML基盤にもその要素の1つとしてFeature Storeが組み込まれているようです。

Feature Storeの選定

このように様々なFeature Storeが公開されている中で、私たちのチームでは2021年5月のGoogle I/Oで発表されたVertex AIのコンポーネントの1つであるVertex Feature Storeを使うことにしました。

選定にあたっての理由は大きく以下の2点です。

  1. MLOps基盤の関連資産をGCPに集約させている
  2. Feature Storeの技術検証を高速に実施するため、マネージドサービスが望ましい

1点目については大前提とも言えるのですが、GCPをベースとしてMLOps基盤を構築している以上、GCPの外部に依存関係を極力持ちたくないという意図です。

2点目はFeature Storeという、まだ発展途上の概念を導入する上での保険です。調査を進めていく中で有用そうであることはわかったものの、導入することで何がどこまで解決できるのか・できないのかの技術検証を高速に行いたいという意図からマネージドで手軽に使えるものを選んでいます。

Vertex Feature Store

Vertex AI

Vertex Feature Storeは、GCPが提供しているFeature Storeのフルマネージドサービスです。2022年1月現在でどのような機能が提供されているかを以下の表にまとめます。

機能 提供されている機能の概要
Serving バッチ/オンラインサービング可能。ただしストリーミングデータのサービングは不可。
Storage オフライン/オンラインストアが提供されている。特徴量の取り込み処理は自前で用意する必要がある。
Transform サポートなし
Monitoring Feature Store自体に対する負荷等のメトリクスはCloud Monitoring経由で監視可能。
特徴量の監視は1日単位のスナップショットを確認可能。
Registry GUIベースでの特徴量共有・探索が可能。Python SDKでの取得も可能。

上記表の中で最もボトルネックとなる点はTransformのサポートがないことと、Storageに特徴量を取り込む処理は自前で用意する必要がある点です。Feature Storeは特徴量を管理・提供するための基盤ですが、その特徴量自体をFeature Storeに取り込む処理が必要となります。また特徴量として扱われるデータはほとんどの場合何らかのrawデータを加工したものであるため、rawデータを加工する何らかの変換処理も必要です。Vertex Feature Storeを利用する場合はこのデータを取り込む処理と加工する処理(=Transform)を用意しなければいけません。

次に、Vertex Feature StoreをMLシステムへ導入していく上でこれらのボトルネックをどのように解消したかをご紹介します。

Vertex Feature StoreをMLシステムに導入する

Vertex Feature Storeの導入にあたり、大きく以下の2点に分けて説明します。

  • 特徴量の登録・管理方法
  • 特徴量取り込みのための仕組み作り

特徴量の登録

システム構成

まず、管理対象にしたい特徴量名とそのメタデータをVertex Feature Storeに登録します。システム構成は以下の通りです。

特徴量登録のシステム構成

特徴量の一覧を記述した以下のようなyamlファイルをGitHubでバージョン管理し、新しく特徴量を登録する場合はこのyamlファイルに追記していく形としています。

- feature_id: user_id
  description: zozotown user_id
  value_type: int64
- feature_id: pv_per_day
  description: ...
  value_type: float64
- feature_id: frequent_device_type
  description: ...
  value_type: object
...

登録対象となる特徴量はyamlファイルに記述された特徴量とVertex Feature Storeに登録されている特徴量の差分です。GitHub Actionsでその差分となる特徴量を抽出し、特徴量を登録するためのVertex Pipelinesを実行する構成としています。

登録済み特徴量との比較

Vertex Pipelinesでは指定されたyamlファイルの内容を元に、Feature Storeへ特徴量を登録するようなコンポーネントを作成して実行しています。

実装例

上記yamlファイルを入力として特徴量の登録処理を行うコンポーネントの実装例を以下に示します。

import logging
import time

import fire
from google.api_core.client_options import ClientOptions
from google.cloud.aiplatform_v1 import FeaturestoreServiceClient
from google.cloud.aiplatform_v1.types import feature, featurestore_service


def main(
        project_id: str,
        region: str,
        featurestore_id: str,
        entity_id: str,
        feature_requests: str,
):
    logging.basicConfig(level=logging.INFO)
    vfs_feature_types = {
        "int64": feature.Feature.ValueType.INT64,
        "object": feature.Feature.ValueType.STRING,
        "float64": feature.Feature.ValueType.DOUBLE,
        "array": feature.Feature.ValueType.STRING_ARRAY,
    }

    fs_client = FeaturestoreServiceClient(
        client_options=ClientOptions(api_endpoint=f"{region}-aiplatform.googleapis.com")
    )
    entity_type_path = fs_client.entity_type_path(
        project_id, region, featurestore_id, entity_id
    )
    create_feature_requests = [
        featurestore_service.CreateFeatureRequest(
            feature=feature.Feature(
                value_type=vfs_feature_types[fr["value_type"]],
                description=fr["description"],
            ),
            feature_id=fr["feature_id"],
        )
        for fr in feature_requests
    ]

    # 一度のリクエスト数の上限は 100 features のためミニバッチに分ける
    split_requests = [
        create_feature_requests[i: i + 100]
        for i in range(0, len(create_feature_requests), 100)
    ]
    for requests in split_requests:
        logging.info(f"requests: {requests}")
        lro_response = fs_client.batch_create_features(
            parent=entity_type_path, requests=requests
        )
        create_feature_response = lro_response.result(timeout=3600)
        logging.info("create_feature_response:", create_feature_response)
        time.sleep(60)  # quota


if __name__ == "__main__":
    fire.Fire(main)

実装上の注意点

ドキュメントにも記載されている内容ではありますが、特徴量の登録時には以下の制約に注意する必要があります。

  • 特徴量名はアルファベット小文字・数字・アンダースコアのみで構成されていること
  • 特徴量の登録は1分間に100件まで
  • 特徴量の登録処理はステップ実行される

1点目は既に運用されているモデルの特徴量名でこの制約が守られていないものがあった場合に関する注意点です。Vertex Feature Storeに登録する特徴量名は上記のような制約があるため、特徴量の登録時とFeature Storeから特徴量を取得した時に変換処理を加えることになります。

2点目はまとめて特徴量を登録したいような場合にはFeature Storeのquotaを意識しなければいけないため、以下のように100件ずつのバッチに分ける・かつ1分のsleepを挟むなどの処理が必要となります。

# requests は最大100件
for requests in split_requests:
    logging.info(f"requests: {requests}")
    lro_response = fs_client.batch_create_features(
        parent=entity_type_path, requests=requests
    )
    create_feature_response = lro_response.result(timeout=3600)
    logging.info("create_feature_response:", create_feature_response)
    time.sleep(60)  # quotaによる制約のため

3点目は、Vertex Feature Storeへの特徴量の登録処理としてSDKではバッチ実行用のメソッドが提供されているものの内部では各特徴量が一件ずつ登録されている、ということです。例えば登録対象の特徴量100件があったうちの50件目に不正な特徴量名などがあった場合、100件の登録失敗ではなく50件の登録が成功し残り50件の登録が失敗します。このような場合、特徴量の登録処理をリトライしても同名の特徴量が登録できない旨のエラーで弾かれるため、登録できなかった残りの50件を抽出して登録処理を実行する必要があります。

特徴量取り込みのための仕組み作り

システム構成

続いて、Vertex Feature Storeに対して具体的な特徴量の値を取り込む方法について設計します。設計にあたり、特徴量のサービングはバッチ実行にのみ対応することを前提としています。理由としてVertex Feature Storeではオンラインサービング機能が提供されているものの、特徴量の値をリアルタイムに更新する仕組みが現時点では提供されていません。そのため、まずはバッチサービングという軽めの要件で設計して検証します。

これを念頭に設計した業務フローが以下となります。

特徴量取り込みのシステム構成

クエリ・変換部分と特徴量取り込み部分をそれぞれパイプラインとして実装し、この実行順を制御する大元のパイプラインとしてrecurring ingest pipelineを設けています。query transform pipelineではBigQueryからデータを取得して特徴量へと変換してGCSにCSVファイルとして保存します。ここで得られた前処理済みの特徴量をingest pipelineによってFeature Storeへ取り込みます。これらの一連の流れを管理するワークフローツールにはVertex Pipelinesを利用しています。

更にこの特徴量の取り込み処理(=更新処理)は定期的に実行する必要があるため、Vertex Pipelinesの公式ドキュメントで紹介されている、Cloud SchedulerとCloud Functionsでの定期実行の構成を組んでいます。 AI Platform PipelinesにおけるRecurring Runのような機能がVertex Pipelinesには存在しないため、スケジュール実行やパイプラインの重複実行の制御についてはこのように自前で構成することになります。パイプラインの重複実行を制限する方法については以下の記事で取り上げています。

techblog.zozo.com

なぜ特徴量への変換処理をDataflow等へオフロードしていないのか?

これらの構成を検証する上で、既存MLシステムでの特徴量を取得する部分をVertex Feature Storeに切り替えることを最初のステップとして想定しています。そしてこの既存MLシステムでは特徴量を取得する部分がDataflowで処理されていないことが主な理由です。本来はデータの取得と変換をDataflowで完結させる構成を取りたいものの、既存処理をApache Beamに書き換える作業は検証段階としてはコストがかかりすぎるため今回はこのような構成としました。

なぜ定期実行の仕組みにCloud Composerを使っていないのか?

これは理由として以下の2点が挙げられます。

  • 定期実行の仕組みのためだけにCloud Composerを使うのはコストがかかりすぎる
  • Vertex Pipelines + Cloud Scheduler + Cloud Functionsという構成が楽だった

コストという点ではCloud Composerの料金的な面もありますが、私自身がCloud Composerに慣れていなかったことによる学習コストの側面も含んでいます。Cloud SchedulerとCloud Functionsを使うことで依存するサービスが増えて管理が煩雑になるように思えますが、やっていることは定期的にVertex Pipelinesを実行しているだけなので見た目以上にシンプルな構成となっています。

実装例

以下に特徴量の取り込み処理を行うコンポーネントの実装例を示します。BigQueryにクエリを投げて変換処理を加えた結果がCSV形式でGCSに出力された後の状態を想定をしています。

import logging

import fire
import gcsfs
from google.api_core.client_options import ClientOptions
from google.cloud.aiplatform_v1 import FeaturestoreServiceClient
from google.cloud.aiplatform_v1.types import featurestore_service, io


def main(
    project_id: str,
    region: str,
    featurestore_id: str,
    entity_id: str,
    feature_ids: str,
    gcs_ingest_csv_folder: str,
    entity_id_field: str,
    timestamp_field: str,
    worker_count: int = 1,
):
    logging.basicConfig(level=logging.INFO)
    fs = gcsfs.GCSFileSystem()
    csv_files = [f"gs://{path}" for path in fs.glob(f"{gcs_ingest_csv_folder}/*.csv")]

    fs_client = FeaturestoreServiceClient(
        client_options=ClientOptions(api_endpoint=f"{region}-aiplatform.googleapis.com")
    )
    entity_type_path = fs_client.entity_type_path(
        project_id, region, featurestore_id, entity_id
    )
    feature_specs = [
        featurestore_service.ImportFeatureValuesRequest.FeatureSpec(id=feature_id)
        for feature_id in feature_ids
    ]
    import_request = featurestore_service.ImportFeatureValuesRequest(
        entity_type=entity_type_path,
        csv_source=io.CsvSource(gcs_source=io.GcsSource(uris=csv_files)),
        entity_id_field=entity_id_field,
        feature_specs=feature_specs,
        feature_time_field=timestamp_field,
        worker_count=worker_count,
    )
    lro_response = fs_client.import_feature_values(import_request)
    import_feature_values_response = lro_response.result(timeout=3600)
    logging.info("import_feature_values_response:", import_feature_values_response)


if __name__ == "__main__":
    fire.Fire(main)

実装上の注意点

特に特徴量の取り込み部分にいくつかハマりどころがあったのでご紹介します。

  • 取り込み対象のデータソースはVertex Feature Storeと同一リージョンに配置する
  • 特徴量の更新時間を示すタイムスタンプの形式はデータソースごとに異なる
  • 登録した特徴量と型の不整合があっても処理自体は正常終了するが取り込みはされない

ドキュメントに記載されている内容がほとんではありますが、これらについて1つずつ説明していきます。

取り込み対象のデータソースはFeature Storeと同一リージョンに配置する

2022年1月時点ではVertex Feature StoreのデータソースとしてBigQueryのテーブルかGCSのCSVまたはAvro形式のファイルを利用できます。これらのデータを読み込んでVertex Feature Storeに特徴量を取り込みますが、このデータを配置しているリージョンをVertex Feature Storeと一致させる必要があります。

Feature Storeを特徴量と同一Regionに配置する

これは特徴量のバッチ取得の際には大きな問題になりませんが、オンラインサービングをする場合には構成を工夫する必要があります。例えば日本だけで展開しているサービスを考えた場合、レイテンシを下げるためにasia-northeast1(東京リージョン)にVertex Feature Storeを配置したとします。こうした場合データソースとして指定できるのはasia-northeast1のみとなるため、特徴量取り込み用のデータの配置先が制限されることになります。

例えば「BigQueryのデータセットはUSリージョンのみを使っていた」というユーザーも多いかと思うので、上記のような場合には例外的に別なリージョンへデータセットを作るといった運用が必要となります。

特徴量の更新時間を示すタイムスタンプの形式はデータソースごとに異なる

これはドキュメントに記載されている内容ではありますが、データソースごとに以下のような制約があります。

BigQuery テーブルの場合、タイムスタンプは TIMESTAMP 列に入ります。 Avro の場合、タイムスタンプは long 型かつ、論理型が timestamp-micros でなければなりません。 CSV ファイルの場合、タイムスタンプは RFC 3339 形式にする必要があります。

例えばCSVファイルの場合は%Y-%m-%dT%H:%M:%SZといった形式でタイムスタンプを持たせることになります。様々なデータソースから特徴量の取り込みを行いたい場合はデータソースに合ったタイムスタンプへの変換処理が必要となります。

登録した特徴量と型の不整合があっても処理自体は正常終了するが取り込みはされない

当然と言えば当然ではありますが、取り込む特徴量の型は事前に登録した特徴量の型と合致させなければいけません。以下の図で正しく特徴量が取り込まれているジョブは一番下のもので、それ以外のジョブはFinishedとなっているものの実際には特徴量の取り込みに失敗しています。

失敗した取り込みジョブの例

初めは一度に取り込む特徴量数・ジョブ数の量が原因かと思って調査していましたが、結論はタイトルの通り特徴量の型の不整合が原因でした。例えば特徴量としてint64型を期待していても、pandasではnull値が存在しているとカラムの型はfloat64となってしまいます。

import pandas as pd
df = pd.DataFrame([[1,2],[3,None]])
df.dtypes
"""
0      int64
1    float64
dtype: object
"""

これと同様の現象が起こり、期待した型での取り込みが行われていない状態でした。 この事象の暫定対策として、特徴量の登録で使用したyamlファイルを元に特徴量の型を参照してキャストする方法を取りました。ここで保存した特徴量を取り込み対象として指定するイメージです。

import yaml

with open("path-to-yaml/feature_set.yaml", "r") as f:
    feature_set = yaml.safe_load(f)

# features は前処理済みの特徴量セット
# 既に null のレコードは drop した後である想定
for fs in feature_set:
    features[fs["feature_id"]] = features[fs["feature_id"]].astype(fs["value_type"])
features.to_csv("path-to-csv/features.csv", index=False)

上記の例ではVertex Feature Storeに登録した内容と辻褄を合わせるために型を変換していますが、本来はここで特徴量の型不一致としてアラートを投げるか否かの検証が入ります。今回はVertex Feature Storeの検証が目的なのでそういった監視は入れていませんが、今後の課題として検討していきたいと考えています。

特徴量の管理基盤を設計・運用してみてわかったこと

ここで、冒頭に述べた弊チームでのMLシステムに対する課題を再掲します。

私たちのチームでは上記の「案件の度に様々なコードやクエリについて車輪の再発明がされている」という課題に対する取り組みの一環で、特徴量の再発明を防止して適切に再利用できること・特徴量が適切にモニタリングすることを目的として、Vertex Feature Storeの導入を検討することとしました。

実際にVertex Feature Storeをシステムの一部に導入してみた結果、上記の課題感全てがクリアになるというわけではなく、同時に運用上の新たな課題が生まれてきました。以下ではその課題のいくつかを紹介します。

再利用できる特徴量を作成・更新することの難しさ

例えばあるMLモデルAで利用する、「直近30日間でサイトにアクセスした回数を示す特徴量」としてaccess_count_30daysを作成するとします。別なMLモデルBではより長いスパンでの行動傾向を見るために集計期間を60日間に広げてaccess_count_60daysを作成…となると、結局似たような特徴量が量産されることになります。

似たような特徴量が量産されている例

ここでは集計期間を変更した例を挙げていますが、集計定義を微修正した特徴量を作る場合は同様の問題が生じます。また似たような特徴量を作りすぎると、検索性が悪くなる上に更新の度に集計期間が重複している似たようなクエリを大量に実行することになるため、コストがかかりすぎてしまう可能性もあります。

こうした問題に対するアプローチは予め決めた複数のウィンドウサイズで集計し、そのウィンドウの組み合わせで特徴量を定義するという方法があります。特にオンラインで特徴量をサービングするような場合だと、リアルタイムに特徴量を集計して更新する仕組みが必要となります。具体的な実現方法について、マネージドFeature Storeを提供しているTecton.aiの以下の記事が参考になるかと思います。

towardsdatascience.com

バッチ実行のMLモデルに与える特徴量の鮮度

本記事で紹介した特徴量更新の仕組みを利用してFeature Storeに保存されている特徴量をデイリー更新するとします。更にデイリーバッチで実行される予測モデルがあった場合、予測モデルには予測する直前までの最新の特徴量を与えるかと思います。このような場合、特徴量の更新状態を確認した後にFeature Storeの特徴量を更新する必要があり、結果として予測バッチがシステムの外部に依存関係を持つことになります。

様々なパイプラインがFeature Storeに対して依存関係を持つ

特徴量の更新処理は並行で実行できないので、こうした予測バッチが増えてくるとFeature Storeの更新処理の実行タイミングがMLシステムのボトルネックとなってしまう可能性があります。 また、特徴量を共用する場合はどの予測バッチでその特徴量を更新するか検討するなど、各処理の依存関係が複雑になることが予想されます。

このようにFeature Storeの導入を検討すると、MLシステム単独で動いている際には問題にならなかったことが新たな問題として出てきます。 そのため、既存システムでできていたことを完全に再現するのではなく、特徴量の一元管理による制約が生じることを念頭に置かなければいけません。

システムがFeature Storeに対する依存関係を持つことは避けられないことなので、この依存性は許容しつつFeature Storeを利用する上で何を制約とするのかが設計の鍵となるでしょう。

まとめと今後の展望

ここまでの内容を見てきてわかるように、Feature Storeはまだ「とりあえず使えるものの、長期的に特徴量基盤として運用を続けていくには一朝一夕で使えるものではない」状況と言えます。近頃Feature Storeの話は色々な場所で耳にするものの、いざ導入してみるとML基盤の課題を解決する銀の弾丸というわけではないことを実感しました。

上述した課題はこれまでに名だたる会社が導入してきた先行事例があるため、これからはそれらを参考にしながら課題解決に向けて改善を進めていこうと思っています。また、今回はFeature StoreとしてマネージドサービスのVertex Feature Storeを利用していますが、並行してOSSのFeastをセルフホスティングする技術検証も進めています。Feature Storeとしての課題を解決するためにどちらが適しているか、今後判断していきたいと思います。

MLOps基盤構築をもっと高速に進めたいと思っているものの全く人手が足りていないので、ご興味のある方は以下のリンクからぜひご応募ください。一緒に最高のMLOps基盤を作りましょう!

hrmos.co hrmos.co

参考

  1. Vertex AI Feature Store の概要
  2. Cloud Scheduler でパイプライン実行をスケジュールする
  3. Vertex Feature Store で特徴量管理の MLOps はこう変わる
  4. Feast
  5. What is a Feature Store?
  6. Feature Store Milestones
  7. 【書き起こし】Vertex PipelinesとFeature Storeを活用した不正防止システム – Liu Songjie【Merpay Tech Fest 2021】
  8. How to implement CI/CD for your Vertex AI Machine Learning Pipeline
カテゴリー