Kubeflow PipelinesからVertex Pipelinesへの移行による運用コスト削減

こんにちは、技術本部 データシステム部 MLOpsブロックの平田(@TrsNium)です。約2年半ぶりの執筆となる今回の記事では、MLOps向け基盤を「Kubeflow Pipelines」から「Vertex Pieplines」へ移行して運用コストを削減した取り組みを紹介します。

目次

はじめに

弊社ではML(Machine Learning)のモデル生成や特徴量生成にGKE(Google Kubernetes Engine)上でセルフホストしたKubeflow Pipelinesを使用していました。しかし、構築・運用コストが大きすぎるという課題感がありました。具体的にはKubeflowの依存するIstioKubernetes Applicationsのバージョンが古く、Kubernetesクラスタのバージョンアップデートをできなかったり、Kubeflowの内部ステートを保持しているMySQLが実際のステートと一致しない状況が発生していました。

詳しくは、中山(@Civitaspo)が過去の記事「KubeflowによるMLOps基盤構築から得られた知見と課題」で、構築や運用に関する課題感を紹介しているので、併せてご覧ください。 techblog.zozo.com

このような運用課題へアプローチしていたところ、Google I/O 2021Vertex AIの発表がありました。その後、Vertex AIのコンポーネントの1つであるVertex Pipelinesを調査し、Kubeflow Pipelinesの恩恵を享受しつつ運用コストを大幅に削減できる確信が得られたため、Kubeflow PipelinesからVertex Pipelinesへの移行を開始しました。

Vertex Pipelinesとは

Vertex Pipelinesは、GCPが提供しているKubeflow Pipelinesのフルマネージドサービスです。似たサービスにCloud AI Platform Pipelinesがありますが、明確に違いがあります。

Cloud AI Platform PipelinesではKubeflow PipelinesをGKEやCloud SQLをプロビジョニングして構築するのに対し、Vertex Pipelinesでは構築が不要です1。これにより、GKEやCloud SQLを管理する必要がなくなります。また、ワークフローが動いてない間の待機時間はCloud AI Platform PipelinesではGKEやCloud SQLの料金が必要なのに対し、Vertex Pipelinesではそれらの料金が発生しません。

つまり、構築や運用コストの面でKubeflow PipelinesやCloud AI Platform Pipelinesと比べ、Vertex Pipelinesには大きなアドバンテージがあります。

また、2つ目の違いは、Kubeflow PipelinesのSDK(kfp)のバージョンが異なる点です。Cloud AI Platform Pipelinesや、私たちがこれまで利用していたKubeflow PipelinesではSDKのバージョンがV1だったのに対し、Vertex PipelinesではV2です。なお、Kubeflow Pipelines 1.6以上のバージョンであればSDK V1はSDK V2と互換性がありますが、それ以外はありません。

Vertex Pipelinesへの移行

本章では、Kubeflow PipelinesからVertex Pipelinesへの移行の流れを説明します。

移行前に運用していたKubeflow Pipelinesのバージョンが1.2であり、SDK V2との互換性がないため、SDK V2でワークフローを記述し直す必要がありました。また、Kubeflow Pipelinesは、AWS(Amazon Web Services)やGCP、オンプレミス等で動作するようにKubernetesの様々な機能を駆使して設計されています。

一方、Vertex Pipelinesでは、それらの機能をGCPのサービスに置き換えているため、ワークフロー実行時の挙動が異なることがあります。提供されて間もないサービスなこともあり、Cloud Monitoringで取得可能なメトリクスが多くなく、ワークフローを外部から監視できる仕組みがありません。

これらの課題に対し、移行時にどのように解決していったのか、説明します。

Vertex Pipelinesへ移行するワークフロー

Vertex Pipelinesへ移行するワークフローは、WEARユーザーのコーディネート画像からアイテム特徴量を抽出し、Firestoreへそれを保存するような処理を行っています。対象のコーディネート画像が3000万件以上と膨大にあるため、日次の差分で処理をしています。

下図がワークフローの全体像です。

このワークフローでは、日次の差分データを取得するためにデータ基盤チームが管理するBigQueryからコーディネート情報を全件取得し、前日の全件取得との差分から新規コーディネート情報を一覧化しています。このコーディネート情報には、ユーザーの情報とコーディネート画像のURLが含まれています。

そして、コーディネート画像はAmazon S3に保存されていますが、データ基盤にはCDN経由のURLを格納しているため、S3へ直接取得するためのパス情報がありません。また、S3から直接画像を取得する料金と、CDN経由で画像をダウンロードする料金にさほど差がないため、CDN経由で画像を取得するようにしています。ただし、CDNに大量のリクエストを送ることになるので、DDoSと誤判定されないように固定の外部アドレスを使用しアクセスします。

上記の要件を満たすようにVertex Pipelinesへ移行した結果、ワークフローは以下の構成になりました。

なお、移行にあたり取り組んだ主な内容は以下の通りです。

  1. ワークフローのKubeflow Pipelines SDK V2への移行
  2. スケジュール実行されているワークフローへ前回実行分が終わるまでの待機処理を追加
  3. Vertex Pipelinesの監視

各取り組みの詳細を紹介します。

1. ワークフローのKubeflow Pipelines SDK V2への移行

Kubeflow Pipelines SDK V1からSDK V2への移行に際し、影響のある変更点として以下の点が挙げられます。

  • コンパイラのデータ型の制約が厳しくなった
  • ContainerOp APIが非推奨になった
  • Kubeflow PipelinesのPlaceholderを使用できなくなった

コンパイラのデータ型の制約が厳しくなった

Kubeflow Pipelinesでは、コンテナ化されたコマンドラインプログラムをコンポーネントとして記述できます。そして、そのコンポーネントはyamlで定義する方法の他に、Pythonで処理を定義しコンポーネントにできます。しかし、SDK V2では、Pythonで記述するコンポーネントの入出力に必ずデータ型を注釈する必要があるよう、仕様が変更されました。そこでサポートされる基本的なデータ型は str, int, float, bool, dict, list です。

他にもGCP関連のコンポーネントで使用される型や、大量のデータをアーティファクトとしてやりとりするための型が用意されています。今回移行するパイプラインでは、中間データはBigQueryに保存しておりアーティファクトは使用していないため、基本的なデータ型とGCP関連の型に関する修正を行いました。

from typing import NamedTuple

# SDK V2では動作しない
# NamedTupleに型を指定しても動かない
#   ref. https://github.com/kubeflow/pipelines/issues/5912#issuecomment-872112664
@component
def example(a: float, b: float) -> NamedTuple(
  'output',
  [
    ('sum', 'product'),
  ]):
  sum_ = a + b
  product_value = a * b

  from collections import namedtuple
  output = namedtuple('output', ['sum', 'product'])
  return output(sum_value, product_value)

# SDK V2で動作する
@component
def example(a: float, b: float) -> typing.Dict:
  sum_value = a + b
  product_value = a * b

  return dict['sum': sum_value, 'product': product_value]

基本的なデータ型は、コンパイラに型を正しく伝えるために全てのPython関数に対して型アノテーションをつけるように変更しました。また、GCP関連の型に関しては、以前str型で値を受け渡しできていたものができなくなり、専用のGCPProjectID型等を使用する必要があります。

しかし、GCPProjectID型等でデータの受け渡しをしてもコンパイラから型が間違っているとエラーが起きる状態になっています。この問題に関しては、メンテナーが改修したりドキュメントを整備しているようなので対応を待っている状態です2。なお、現状の回避策として、GCPProjectID型等で定義されている型をString等にコンポーネントのyamlを書き換え運用をしています。

また、SDK V2移行に伴い、ExitHandler APIが正しく動作しなくなりました。 ExitHandlerは、ExitHandler内に記述したタスクが終了したら終了ハンドラーを呼ぶオペレーターです3。これは、ExitHandlerを使用した際にWITH句内に記述しているタスクへ正しくパラメータが伝わっていないことが原因でした。このコンパイラ起因の問題は、Pull Requestで修正を加え、既にmergeされています。

ContainerOp APIが非推奨になった

SDK V2ではContainerOp APIが非推奨になり、代わりにコンポーネントを使用する必要があります。

ただし、Vertex PipelinesはVPCネイティブではないため、タスク毎に動的に外部アドレスが割り当てられます。これは、CDNへアクセスする際に固定の外部アドレスでアクセスしなければならない要件を満たすことができません。今回は、この問題を回避するためにGKE上でPodとしてタスクを実行するコンポーネントを作成しました。

下図がそのコンポーネントのイメージです。

上図のように、Vertex Pipelinesのワーカー内でGKEとの認証を通しPodを作成します。そして、Podが作成されたらPodが実行を正常または異常終了するまで待っています。

静的な外部アドレスをCloud NATにアタッチしたネットワーク環境下でGKEを構築することにより、外部へアクセスする際の外部アドレスを固定化できます。そして、そのGKEのPod上でタスクを実行することにより固定の外部アドレスでCDNへアクセスすることが可能となります。今回は既にセットアップされたGKEがあったためこの方法をとりましたが、Cloud RunのVPCコネクタを使用することで外部アドレスを固定しアクセスできます。

Kubeflow PipelinesのPlaceholderを使用できなくなった

Vertex PipelinesではKubeflow Pipelinesで使用できていたPlaceholderが使用不可能になりました。例えば、Placeholderには次のようなものがあります。

{{workflow.uid}}, {{workflow.name}}, {{workflow.status}}, {{workflow.creationTimestamp.Y}}, {{workflow.creationTimestamp.m}}, {{workflow.creationTimestamp.d}}

これらはワークフローの名前、終了ステータス、実行時間を取得するものです。しかし、このPlaceholderが使用できなくなったため、自前で代わりになるものを実装したり運用でカバーする必要が出てきました。

例えば、ワークフローの終了ステータスを取得するPlaceholderは、exit_handler内でSlackへ通知をする処理をしていました。しかし、ステータスの取得が不可能になったので、後述するCloud Scheduler + Cloud Functionsで代替機能を作りました。また、実行日時の取得もPythonのdatetimeモジュール等を使用して置き換えています。

2. スケジュール実行されているワークフローへ前回実行分が終わるまでの待機処理を追加

Kubeflow Pipelinesのスケジュールドワークフロー(Recurring Run)には、前回実行分が終わっていない場合に、後続のワークフローを待機させる機能がありました。

しかし、Vertex Pipelinesのスケジューリング機能はCloud Scheduler + Cloud Functionで構成されており、前回実行分を考慮せずに後続のワークフローをキックするようになっています。そこで、ワークフローのタスク内部から前回実行分のワークフローが終了しているかを確認し、終了していなければsleepして待つ実装をし、同等の機能を担保します。

def wait_previous_execution(pipeline_name: str, project: str, region: str):
    from google.cloud.aiplatform_v1.services.pipeline_service import PipelineServiceClient
    from google.api_core.client_options import ClientOptions
    from google.cloud.aiplatform_v1.types.pipeline_service import ListPipelineJobsRequest
    from datetime import datetime

    import time
    import pytz

    CURRENT_TIME = pytz.UTC.localize(datetime.utcnow())

    option = ClientOptions(api_endpoint=f"{region}-aiplatform.googleapis.com")
    client = PipelineServiceClient(client_options=option)


    REQUEST = ListPipelineJobsRequest(parent=f"projects/{project}/locations/{region}", filter='state="PIPELINE_STATE_RUNNING"')
    def _get_running_pipelines():
        result = client.list_pipeline_jobs(REQUEST)

        pipelines = [pipeline for pipeline in result if pipeline.pipeline_spec['pipelineInfo']['name']==pipeline_name]

        sorted_pipelines = sorted(
            pipelines, key=lambda pipeline: pipeline.create_time, reverse=True
        )
        # Ignore pipelines created after this one.
        filtered_pipelines = [
            pipeline
            for pipeline in sorted_pipelines
            if CURRENT_TIME > pipeline.create_time
        ]
        return filtered_pipelines

    running_pipelines = _get_running_pipelines()
    # Wait for the other pipelines to finish
    # The pipeline executing this function is also counted, so the condition is greater than 1
    while len(running_pipelines) > 1:
        time.sleep(120)
        running_pipelines = _get_running_pipelines()
    return None

3. Vertex Pipelinesの監視

私たちのチームでは普段からサービスの監視等にはCloud Monitoringを使用しています。しかし、Cloud Monitoringで利用できるVertex Pipelinesのメトリクスに有用なものが少ないため、監視の仕組みを内製しています。監視はCloud Scheduler + Cloud Functionsで行っており、Cloud Schedulerから定期的にCloud Functionsを叩き、アラートの閾値に達していないかの確認しています。以下が監視の仕組みのイメージです。

Cloud Functions内では以下のようなコードを使用し、Vertex PipelinesのAPIを叩き監視対象のパイプラインの成功可否と実行時間SLOが満たされているかをチェックします。

import crontab
from datetime import datetime, timedelta
import json
import requests
import os
import pytz
import math
import typing

"""
cron: "*/5 * * * *"
project: something-dev
pipelines:
- name: something
  slo_execution_time: 4h
  slo_time_format: '%Hh'
  region: asia-east1
environment: dev
slack_webhook: something
...省略
"""

SLACK_MESSAGE_FORMAT = """
{{
    "text": "{text}",
    "attachments": [
        {{
            "color": "{color}",
            "text": "{attachment_text}",
            "fields": {fields}
        }}
    ]
}}
"""

CRON = os.environ.get("cron")
ENV = os.environ.get("environment")
PROJECT = os.environ.get("project")
PIPELINES = os.environ.get("pipelines")
SLACK_WEBHOOK = os.environ.get("slack_webhook")

from google.cloud.aiplatform_v1.services.pipeline_service import PipelineServiceClient
from google.api_core.client_options import ClientOptions
from google.cloud.aiplatform_v1.types.pipeline_service import ListPipelineJobsRequest


class MonitorVertexPipelines:
    def __init__(
        self,
        project: str,
        monitor_schedule: crontab._crontab.CronTab,
        monitor_targets: dict,
    ):
        self.project = project
        self.monitor_schedule = monitor_schedule
        self.monitor_targets = monitor_targets

    def __n_times_previous_time(self, start, n):
        assert n > 0, "n must be greater than or equal to 1"

        def _previous(p, cnt):
            if cnt == 0:
                return p
            else:
                return _previous(
                    p
                    + timedelta(
                        seconds=math.floor(self.monitor_schedule.previous(now=p))
                    ),
                    cnt - 1,
                )

        return _previous(start, n)

    def __get_pipelines(self, region: str, name: str, filter: str):
        option = ClientOptions(api_endpoint=f"{region}-aiplatform.googleapis.com")
        client = PipelineServiceClient(client_options=option)

        request = ListPipelineJobsRequest(
            parent=f"projects/{self.project}/locations/{region}", filter=filter
        )
        pipelines = client.list_pipeline_jobs(request)
        return [
            pipeline
            for pipeline in pipelines
            if pipeline.pipeline_spec["pipelineInfo"]["name"] == name
        ]

    def finished_pipelines(self, current_time: datetime = datetime.utcnow()):
        previous_schedule_time = pytz.UTC.localize(
            self.__n_times_previous_time(current_time, 2)
        )

        result = []
        for target in self.monitor_targets:
            if not all(
                [
                    must_included_key in target.keys()
                    for must_included_key in ("region", "name")
                ]
            ):
                continue

            pipeline_name = target.get("name")
            pipelines = self.__get_pipelines(
                target.get("region"),
                pipeline_name,
                filter='state!="PIPELINE_STATE_RUNNING"',
            )
            pipelines = [
                pipeline for pipeline in pipelines if pipeline.end_time is not None
            ]

            # Ignore pipelines ended before previous monitoring time.
            pipelines = [
                pipeline
                for pipeline in pipelines
                if previous_schedule_time < pipeline.end_time
            ]
            result += pipelines
        return result

    def pipelines_not_satisfy_slo(
        self, current_time: datetime = datetime.utcnow()
    ):
        # NOTE default strptime value is 1900-01-01T00:00:00.000
        # ref. https://docs.python.org/3/library/datetime.html#technical-detail
        DEFAULT_STRPTIME = datetime.strptime("", "")
        previous_schedule_time = pytz.UTC.localize(
            self.__n_times_previous_time(current_time, 2)
        )
        current_schedule_time = pytz.UTC.localize(
            self.__n_times_previous_time(current_time, 1)
        )

        result = []
        for target in self.monitor_targets:
            if not all(
                [
                    must_included_key in target.keys()
                    for must_included_key in (
                        "region",
                        "name",
                        "slo_execution_time",
                        "slo_time_format",
                    )
                ]
            ):
                continue

            pipeline_name = target.get("name")
            slo_execution_time = target.get("slo_execution_time")
            slo_format = target.get("slo_time_format")
            slo = datetime.strptime(slo_execution_time, slo_format) - DEFAULT_STRPTIME
            pipelines = self.__get_pipelines(
                target.get("region"),
                pipeline_name,
                filter='state="PIPELINE_STATE_RUNNING"',
            )

            pipelines = [
                pipeline
                for pipeline in pipelines
                if previous_schedule_time < pipeline.create_time + slo
                and pipeline.create_time + slo < current_schedule_time
            ]
            result += pipelines
        return result


def monitor_vertex_pipelines(request):
    """Responds to any HTTP request.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text or any set of values that can be turned into a
        Response object using
        `make_response <https://flask.palletsprojects.com/en/1.1.x/api/#flask.Flask.make_response>`.
    """

    now = datetime.utcnow()
    monitor_schedule = crontab.CronTab(CRON)
    monitor_targets = json.loads(PIPELINES)

    monitor = MonitorVertexPipelines(PROJECT, monitor_schedule, monitor_targets)
    finished_pipelines = monitor.finished_pipelines(now)
    pipelines_not_satisfy_slo = monitor.pipelines_not_satisfy_slo(now)

    for pipeline in finished_pipelines:
        notify_to_slack(
            ENV,
            pipeline,
            "monitor pipeline status",
            lambda pipeline: pipeline.state.name != "PIPELINE_STATE_SUCCEEDED",
        )

    for pipeline in pipelines_not_satisfy_slo:
        notify_to_slack(ENV, pipeline, "monitor pipeline slo", lambda _: True)

    return None


def notify_to_slack(
    env: str, pipeline: dict, attachment_text: str, danger_condition: typing.Callable
):
    display_name = pipeline.display_name
    pipeline_name = pipeline.pipeline_spec["pipelineInfo"]["name"]
    state = pipeline.state.name
    start_time = pipeline.start_time
    end_time = pipeline.end_time

    fields = [
        {"title": "Display Name", "value": display_name, "short": False},
        {"title": "Pipeline Name", "value": pipeline_name, "short": False},
        {"tile": "State", "value": state, "short": False},
        {"title": "Start Time", "value": str(start_time), "short": False},
        {"title": "End Time", "value": str(end_time), "short": False},
    ]

    if danger_condition(pipeline):
        text = "<!channel>" if env in ("stg", "prd") else ""
        color = "danger"
    else:
        text = ""
        color = "good"

    DATA = SLACK_MESSAGE_FORMAT.format(
        text=text,
        color=color,
        attachment_text=attachment_text,
        fields=json.dumps(fields),
    )
    requests.post(SLACK_WEBHOOK, data=DATA)
    return None

今後の展望

以上のように、Kubeflow PipelinesからVertex Pipelinesへの移行を実施してきました。現在は、よりVertex Pipelinesを快適に使えるよう、以下のことに取り組んでいます。

  • 各プロジェクトで使える便利共通コンポーネント集の作成
  • Vertex Pipelines用のテンプレートリポジトリの作成

各プロジェクトで使える便利共通コンポーネント集の作成

前述のGKE上でPodとしてjobを実行するコンポーネントであったり、GCPのSecret Managerから秘匿情報を取得するような便利コンポーネントをまとめたリポジトリです。このリポジトリをフェッチし、コンポーネントをロードするだけで、それらを利用できるような世界観を目指しています。リポジトリのCI/CD・テスト等の基本的な仕組みはできており、後はコンポーネントを追加するだけの状態まで到達しています。

Vertex Pipelines用のテンプレートリポジトリの作成

Vertex PipelinesのCI/CD、監視、スクリプト類がまとまったテンプレートリポジトリを用いて「開発の高速化/SREのキャッチアップコストの低下」を実現させるための取り組みです。前述の監視やSDK V2でパイプラインを記述する知見は社内に多くないので、先回りをし、より便利な環境を整えていくことで開発者/SREがストレスフリーにVertex Pipelinesを利用できる環境を目指しています。

まとめ

本記事ではKubeflow PipelinesからVertex Pipelinesへの移行により運用コストを削減させる取り組みを紹介をしました。Kubeflow PipelinesからVertex Pipelinesへ移行するコストは高いですが、Kubeflow Pipelinesをセルフホストした際の構築・運用コストからは解放されました。

現在、私たちのチームではバッチ処理の実行環境の整備以外にも、汎用的なML系サービスのサービング環境も構築中です。ZOZOでは一緒にサービスを作り上げてくれる仲間を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください!

corp.zozo.com hrmos.co

参考


  1. GCPのコンソールなどから確認する方法はありませんが、gVisor上に構築されるようです

  2. 同様のIssue(convert string to GCSPath object or create one #4710)と、それに対するメンテナー動向(Update KFP samples to use types that are compatible in v2. #5801

  3. ExitHandlerの例: exit_handler/exit_handler.py

カテゴリー