はじめに
こんにちはZOZOデータサイエンス部MLOpsブロック松岡です。
本記事では先日リリースされたGCP(Google Cloud Platform)Cloud Composerの最新バージョンCloud Composer 2について紹介します。
ZOZOTOWNでは、多種多様な商品が毎日新たに出品されています。現在MLOpsブロックでは、機械学習で商品情報の登録を補佐するシステムを開発しています。
このシステムでは商品情報を保存するデータベースへ大量の書き込み処理が発生します。このアクセスによる負荷が日常業務に影響を及ぼすリスクを最小限に抑えるため、推論処理は夜間に行います。夜間に処理を完了させるには強力なマシンリソースを使用する必要があります。コストの観点から処理が行われていない時間はマシンリソースを使用停止する必要もあります。また、人手を介さずに安定して稼働出来る仕組みも求められます。
上記の要件を満たすためにワークフローエンジンを使用することになりました。
MLOpsブロックでは当初Vertex AI Pipelinesを検討しました。
しかし、類似アイテム検索機能にCloud Composerを採用していたことや、スケジューリング機能やリトライ処理が充実していることからCloud Composerについても検討することとしました。
類似アイテム検索ではCloud Composer 1を使用していたため、そのバージョンアップにおける技術調査を兼ねて、Cloud Composer 2における変更点について調査しました。併せてApache Airflow(以下Airflowと記述)2についても調査しています。
目次
Airflowはワークフローエンジン
Cloud ComposerはGCP上にてAirflowをマネージドに提供するサービスです。Cloud Composerについて説明する前にまずAirflowについて簡単に紹介します。
ワークフローエンジンを使うメリット
Airflowは様々な処理を行うワークフローをスケジュールして実行出来るワークフローエンジンです。例えばデータを推論するワークフローを次のような複数のタスクに分けて協調動作させることができます。
- 外部APIから必要なデータを取得するタスク
- 取得したデータに対して分類ごとに並列で推論処理を行うタスク
- 推論の結果を出力するタスク
複数のタスクが依存関係に基づく順序で実行され、各タスクを異なるワーカーインスタンスが処理し、エラー時にはリトライさせることが出来ます。これによりネットワークエラーのような不測の事態が発生した場合でも、人手を介さずにワークフローを復旧させることが出来ます。
Airflowの強み
ワークフローの定義にPythonを使用
AirflowはPythonにより実装されており、ワークフローの定義にもPythonを使用します。そのためXMLなどの設定ファイルでワークフローを記述する方式に比べてプログラマーにとって理解しやすく感じます。
多種多様なOperator
タスクはOperatorをインスタンス化することで定義します。Airflowには標準的なOperatorに加えGKEインスタンスを起動するOperatorなど多様なOperatorがすでに用意されていることも魅力です。適切なOperatorを使用することでタスクを簡単に記載できます。
スケジューリング機能を備えている
Airflow自身にスケジューリング機能を備えており、特定の時間や一定時間ごとにワークフローを自動実行することが出来ます。
ワークフローの流れ
ワークフローはタスクを組み合わせてDAG(有向非巡回グラフ)として定義します。実行可能となったタスクはスケジューラーによりワーカーと呼ばれる実行用のマシンインスタンスに割り当てられます。ワーカーがタスクを完了するとDAGに基づいて次のタスクが実行可能となります。実行中のタスクと実行可能なタスクが全て無くなればDAGが終了します。
Cloud Composer 2について
Cloud ComposerはAirflowをGCP上で実行するマネージドなサービスです。詳細なアーキテクチャはCloud Composerのバージョンによって異なりますが、どちらも大部分はGKE(Google Kubernetes Engine)上で動作しています。
Cloud Composer 2では更にGKEに寄ったアーキテクチャとなっています。例えばAirflow Web ServerはGAE(Google App Engine)からGKE上のDeployment上で動作するよう変更されています。
現在はCloud Composer 1とCloud Composer 2が提供されています。Cloud Composer 2になっての変更点はCloud Composer のバージョニングの概要に記載されていますが、ここではその中でも特に便利に感じた部分を紹介します。
柔軟なマシンスペックの指定
Cloud Composer 2では実行環境がより柔軟に指定出来るようになりました。
より細かいマシンスペックを指定可能
Cloud Composer 1のGKE環境はGKE Standard上に構築されていました。そのため、KubernetesのNodeを実行するvCPUやメモリーを予め決められたマシンタイプの中から選ぶ必要がありました。
Cloud Composer 2のGKE環境はGKE Autopilot上に構築されています。スケジューラー/ウェブサーバー/ワーカーのPodsで使うvCPU、メモリ、ストレージをそれぞれ個別に指定出来るようになりました。
Cloud Composer 1では環境構築後は変更不可であったワーカー、管理用Webサーバー、データベースのマシンスペックも後から変更出来るようになりました。このため環境構築時に将来のスケーリングについて正確に見積もる必要がなくなります。
また、ワークフローの性質に基づいて次のような運用も可能です。
- 安定稼働しているワークフローではウェブサーバーの性能を落とし、緊急対応時のみウェブサーバーの性能を上げる
- DAGは単純だが個別の処理が重たい場合、スケジューラーの性能を落としてワーカーの性能を上げる
ワーカーの水平スケールが可能
ワーカー数を負荷に応じて自動でスケールさせられるようになりました。特別な操作や設定は不要で、必要な性能に応じて性能を保ったまま低負荷時のコストを削減できます。
実際にCloud Composer 2の環境を用意して水平スケールを試してみます。Cloud Composer 2の水平スケールはCustom Metrics - Stackdriver Adapterを用いて得られる、未割り当てタスクと現在のワーカー状況を指標として使用します。
未割り当てタスク数とワーカーにより実行可能なタスク数が一致しなくなると次のようにスケーリングが実行されます。
- cluster-autoscalerとnode-auto-provisioningによりノード数とノードサイズをスケーリング
- HPA(HorizontalPodAutoscaler)によってPods数をスケーリング
今回はワーカーに0.5個のvCPU、1.875GBメモリ、1GBストレージを使用します。ワーカーの自動スケーリングは、ワーカーの最小数を1、最大数が3で試します。
Cloud Composer 2ではワーカー1vCPUあたりデフォルトで12個のタスクを同時に実行します。これではワーカーへの割当が0.5vCPUの場合でさえ、6タスクまで同時に実行可能となり、なかなかスケーリングが発生しません。
そこで1vCPUあたりの同時実行タスク数を減らしてスケーリングが起こりやすくします。この設定は「AIRFLOW構成のオーバーライド」タブから celery
の worker_concurrency
を書き換えることで変更ができます。
値に1
を設定することでそれぞれのワーカーは一度に1つのタスクしか処理しなくなり、ワーカーが枯渇しやすくなります。
実行するDAGは次のとおりです。タスクを12並列で実行するDAGを用意しました。スケーリングは時間がかかるためスケール前にDAGが完了しないよう各タスクで60秒待機します。
import logging import time from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago from airflow import models default_dag_args = {"start_date": days_ago(2)} from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator with models.DAG( "parallel_tasks", default_args=default_dag_args, schedule_interval=None, ) as dag: def task_method(i: int, **context): time.sleep(60) logging.info(f"Hello: {i} Task") start_task = DummyOperator(task_id="start", dag=dag) for i in range(0, 12): task = PythonOperator( task_id=f"hello{i}", python_callable=task_method, provide_context=True, dag=dag, op_kwargs={"i": i}, ) start_task >> task
DAGのGraphは次のとおりです。
上記のDAGを実行すると、最初はワーカーが1つしかないので1タスクずつ実行されます。
しばらくするとワーカーがスケーリングされ3つのタスクを同時実行するようになります。
kubectlの kubectl get -w pods --all-namespaces
を実行して airflow-worker
で始まるワーカーPodsが増えるのを確認できます。
実行前はワーカーのPodsは1つだけです。
未割り当てのタスクが増えるとともにPodsも増加します。
タスクが消化され未割り当てのタスクが減ると、次第に過剰となったワーカーPodsは破棄されます。
Cloud Composerで測定する環境を選びモニタリングでアクティブワーカー数を見て、ワーカーがオートスケールされたことを時系列で確認することも出来ます。
注意点として、ワーカーの立ち上がりと終了には時間がかかります。
今回の検証ではPodsが立ち上がって実際にタスクが振られるまでに3分程度かかりました。試しに上記のDAGの内容を書き換え、各タスクの待ち時間を60秒から10秒に短縮して試してみました。この場合オートスケールは行われますが、オートスケールされたワーカーへタスクが割り振られる前にDAG全体が終了してしまいました。これではオートスケールの恩恵を受けられず、料金だけ掛かってしまうことになります。オートスケールが効率よく働くようにするには同時実行するタスク数が短時間で増減しないようにDAGを組むと良さそうです。
Airflow 2での強化点
Cloud Composer 2へ移行する時に考慮すべき点はCloud Composer 2がAirflow 2しかサポートしていないことです。Cloud Composer1は当初Airflow 2系をサポートしていなかったので、Cloud Composer 2へ移行するにはAirflow 2への移行も必要となる場合が多いと思われます。
Airflow2への移行コストを理由にCloud Composer 2の移行を悩まれている人、移行コストを払ってでもAirflow2に移行したくなるAirflow2の改善点をいくつか紹介します。
スケジューラーの強化
Airflowのスケジューラは、DAGを解析して実行可能なタスクをワーカーに割り当てます。Airflow2になってからこのスケジューラが大幅に強化されています。
スケジューラーのパフォーマンス改善
Airflow1ではスケジューラーが一定時間ごとに未割り当ての実行可能なタスクを探しワーカーを割り当てており、タスクが終了後次のタスクにワーカーが割り当てられるまで一定の待ち時間が発生していました。タスクを分割するほどワーカーの割り当て待ち時間が増えるため、タスクの粒度をあまり細かくできませんでした。
Airflow2ではワーカーがタスクを終了時に後続の実行可能なタスクの存在を確認するようになりました。実行可能なタスクがあった場合mini schedulingでワーカーが自身をそのまま即時スケジューリングします。 これによりワーカーがスケジュールされるのを待つ必要がなくなり、ワーカーはすでにDAGを解析済みであることから再解析も不要となるため後続タスクを速やかに実行できます。
実際に、どれくらいスケジューラーのパフォーマンスが向上されているかを調べてみます。Cloud Composerのバージョンにより使用するマシン性能の指定方法が変わっているので可能な限り性能を合わせました。Cloud Composer 1ではノード数3、マシンタイプはn1-standard-1を指定します。Cloud Compsoer 2ではマシンスペックをスケジューラー、ウェブサーバー、ワーカーの性能をそれぞれで選べるのでn1-standardに合わせて1vCPU、メモリ3.75GBとします。ワーカーの最大数は1としてオートスケールによるメリットが発生しないようにしています。
実行するDAGは次のとおり、ログを出力するだけの簡単なタスクを10回繰り返します。
import logging from airflow import models from airflow.utils.dates import days_ago from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator default_dag_args = {"start_date": days_ago(2)} with models.DAG( "serial_tasks", default_args=default_dag_args, schedule_interval=None, ) as dag: def task_method(i: int, **context): logging.info(f"Hello: {i} Task") start_task = DummyOperator(task_id="start", dag=dag) latest_task = start_task for i in range(0, 10): task = PythonOperator( task_id=f"hello{i}", python_callable=task_method, provide_context=True, dag=dag, op_kwargs={"i": i}, ) latest_task >> task latest_task = task
測定は5回繰り返して、平均と最速、最遅のデータを取得しました。
環境 | 最速 | 平均 | 最遅 |
---|---|---|---|
Cloud Composer1.17.7 Airflow1.10.15 | 03:46 | 04:10 | 04:47 |
Cloud Composer 2.0.0 Airflow2.1.4 | 00:28 | 00:31 | 00:35 |
Cloud Composer 1とCloud Composer 2の処理時間の差(少ないほうが高速)
約8倍も高速にDAGを完了することが出来ました。今回の例では3分半程度の差ですが、タスクの数が増えるほどこの差は比例して広がっていくことになります。
例えばタスク数を20に増やすと7分ほどの差が付きました。
環境 | 最速 | 平均 | 最遅 |
---|---|---|---|
Cloud Composer1.17.7 Airflow1.10.15 | 07:39 | 08:08 | 08:29 |
Cloud Composer 2.0.0 Airflow2.1.4 | 00:56 | 00:59 | 01:08 |
スケジューラーがHA(High Availability)構成に対応
Airflow2ではスケジューラーを複数起動して、HA構成を取ることが出来るようになりました。各スケジューラーはデータベースのロック機能を使って作業を同期しているため、スケジューラ同士は直接通信せず独立して動作します。このためスケジューラーの1つが障害をおこしても、別スケジューラーへは影響なく、別スケジューラーにより作業を継続できます。これにより可用性を向上させられます。
実際にスケジューラーをクラッシュさせてHA構成が有効に働くか試してみます。10秒待つタスクを20回繰り返すワークフローを用意しました。
import logging import time from airflow.utils.dates import days_ago from airflow.operators.python_operator import PythonOperator from airflow import models default_dag_args = {"start_date": days_ago(2)} from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator with models.DAG( "serial_wait_tasks", default_args=default_dag_args, schedule_interval=None, ) as dag: def task_method(i: int, **context): time.sleep(10) logging.info(f"Hello: {i} Task") start_task = DummyOperator(task_id="start", dag=dag) latest_task = start_task for i in range(0, 20): task = PythonOperator( task_id=f"hello{i}", python_callable=task_method, provide_context=True, dag=dag, op_kwargs={"i": i}, ) latest_task >> task latest_task = task
※注意:この操作は最悪の場合、環境を壊す可能性があります。自己責任でお試しください。特に全てのスケジューラーを同時に落とさないように注意してください。
ワークロードの構成でスケジューラーの数に2を設定してスケジューラーのHA構成を有効にします。
airflow-scheduler podsが2つ起動するのを待ちます。
ワークフローを実行します。黄緑色の点がスケジューリングされて実行しているタスクです。
ワークフローを実行中にkubectl delete pod
でairflow-schedulerを1つ止めます。
スケジューラーが1つ止まった後も、別のスケジューラーによりタスクが正常にスケジューリングされます。
たまたま動作していなかったスケジューラーを止めてしまっただけではないことを確認するために、もう片方のスケジューラーも停止します。これで当初スケジューリングしていたスケジューラーはいなくなったことになります。
それでもタスクはスケジュールされ続けます。新たに起動したスケジューラーによりタスクが正常に動作し続けていることがわかります。
このように、HA構成を取ることでタスクスケジューラーの可用性が上がるのを確認できました。
先程はスケジューラーを明示的に停止しましたが、スケジューラーを停止しなくても複数のスケジューラーは常に動作しています。
スケジューラーのログを見るとDAGを実行時にスケジューラーを停止しなくても複数のスケジューラーがタスクをスケジューリングしていることがわかります。
このことからスケジューラーはActive-Active構成で動いていることがわかります。これはスケジューラーを複数建てることで可用性だけでなく性能を向上させることが出来ることを意味します。
更新されたAIRFLOW UI
AirflowはWebでワークフローの管理ができるAIRFLOW UIを備えています。Airflow2ではこのAIRFLOW UIが全面的に作り直されモダンな構造となりました。新しいAIRFLOW UIは単に見た目が良くなっただけでなく、よく使う機能がより探しやすくなっています。それでいながらAirflow1を使い慣れているユーザーが違和感ない操作性を実現できており、秀逸なデザインとなっています。
実際に見比べてみましょう。
DAGs一覧画面
Airflow1では、Linksに大量のアイコンが並んでいました。アイコンは直感的と言い難く、アイコンの説明もカーソルを乗せるまで表示されませんでした。
Airflow2では、アイコンが整理されよく使うタスクの実行/更新/削除のみがActionsとして表示されるようになりました。
その他の機能はLinks内に収められアイコンとラベル表示でわかりやすくなりました。直感的ではなかったアイコン表示も改められ初見でも機能を理解しやすくなっています。
また、管理画面のタイムゾーンを指定することが出来るようになったのも見逃せないポイントです。Airflow1ではAIRFLOW UIの時刻表示がUTCから変更できずJSTで動いている業務とのマッチングが手間でした。Airflow2はAIRFLOW UIで表示するタイムゾーンを指定出来るようになりJSTも指定可能になりました。
DAG詳細画面
DAGの詳細画面もモダンになっています。
やはりここでもDAGの実行/更新/削除のアイコンが分離して見やすくなりました。加えてAuto-refresh
機能が追加されDAG実行時にリロード不要でタスクの進行具合がリアルタイム更新されるようになりました。
シンプルなDAGの記法に対応
DAGを構築する上では、シンプルに記述出来るようになったのも嬉しいポイントです。特に Python Operator
の記載法は一般的なPythonで関数を記載するスタイルに近くなりわかりやすくなりました。
それでは、実際にDAGを作って見比べてみます。次のようにそれぞれのタスク間で依存関係があるDAGを作ります。
get_week
:曜日のCSVを作成する。parse_week_(0〜6)
:get_week
が作成したCSVから曜日ごとに文字列を作成する。print_week
:parse_week_(0〜6)
が作成した曜日の文字列を表示する。
Airflow1では次のように記載していました。
from airflow import models from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago default_dag_args = {"start_date": days_ago(2), "provide_context": True} with models.DAG( "parallel_tasks_dag1", default_args=default_dag_args, schedule_interval=None, ) as dag: def get_week(**kwargs): return "Sunday, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday" def parse_week(**kwargs): ti = kwargs["task_instance"] week = ti.xcom_pull(task_ids="get_week_task") return week.split()[kwargs["index"]] def print_week(**kwargs): ti = kwargs["task_instance"] for i in range(7): day = ti.xcom_pull(task_ids=f"parse_week_{i}") print(day) get_week_task = PythonOperator( task_id="get_week_task", python_callable=get_week, provide_context=True, dag=dag, ) print_week_task = PythonOperator( task_id="print_week", python_callable=print_week, provide_context=True, dag=dag, ) for i in range(7): parse_week_task = PythonOperator( task_id=f"parse_week_{i}", python_callable=parse_week, provide_context=True, dag=dag, op_kwargs={"index": i}, ) get_week_task >> parse_week_task >> print_week_task
タスクを記述するには、Python Operator
のインスタンスを生成して行います。タスクに値を渡したり、他タスクの戻り値を受け取るには dict
を使う必要があります。このようにやや特殊で冗長な記法が必要でした。また、タスクの順序は>>
で明示する必要がありました。
同じタスクをAirflow2では次のように記述出来ます。
from airflow.decorators import dag, task from airflow.utils.dates import days_ago @dag(default_args={"owner": "airflow"}, schedule_interval=None, start_date=days_ago(2)) def parallel_tasks_dag2(): @task def get_week() -> str: return "Sunday, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday" @task def parse_week(week, index): return week.split()[index] @task def print_week(day_list: dict): for day in day_list: print(day) week = get_week() day_list = [] for i in range(7): day_list.append(parse_week(week=week, index=i)) print_week(day_list=day_list) dags = parallel_tasks_dag2()
タスクと引数や戻り値をやり取りする際も、まるで普通にPythonの関数を呼ぶように記述できます。タスクの順序は明示する必要がなく、変数のやり取りを通じて自動的に解析されます。タスク間で依存するデータが見やすいこと、冗長で特殊な記法が減ったことでDAGが書きやすく読みやすくなりました。
RBAC UIに標準対応
Cloud Composer上ではIAM(Identity and Access Management)によるAIRFLOW UIまでのアクセス権限がおこなわれます。
Airflow2では加えてRBAC(Role Based Access Control) UI機能が追加されました。RBAC UIによりAIRFLOW UI上でのユーザーの権限を個別にロールとして付与できるようになりました。
たとえば Viewer
ロールのみが付与されたユーザーの場合、DAGの実行状況を見れるだけで、実行や停止は出来なくなります。
デフォルトで作成されたロールの設定
試しに Viewer
ロールを付与してみます。
Cloud Composer 2では新規アカウントに標準でOpロールが付与されるため Op
ロールを剥奪します。 ロールを剥奪するには gcloud
コマンドを使用します。
gcloud beta composer environments run ENVIRONMENT_NAME \ --location LOCATION \ users remove-role -- -e USER_EMAIL -r ROLE
大文字部分は次のように置き換えます。
- ENVIRONMENT_NAME:Cloud Composerの環境名
- LOCATION:Cloud Composerのロケーション
- USER_EMAIL:ユーザーのメールアドレス
- ROLE:剥奪するロール
今回はROLEに Op
を指定します。
ロールを付与するには剥奪時と同様に gcloud
コマンドを実行します。指定方法はロールの剥奪時と同じで remove-role
の代わりに add-role
とします。
gcloud beta composer environments run ENVIRONMENT_NAME \ --location LOCATION \ users add-role -- -e USER_EMAIL -r ROLE
ROLE
にデフォルトで用意されている Viewer
を指定します。
Viewer
ロールのみを設定したユーザーでAIRFLOW UIを開くと、DAGの実行状況は見えますがActionsはグレーアウトされ実行などは出来なくなっています。
カスタムロールの作成
Op
や Viewer
のようなデフォルトで設定されているロールだけでなく、より細かな制御を行えるカスタムロールの作成もできます。
次のDAGsから normal_tasks
のみが表示可能なカスタムロールを作ってみます。
カスタムロールを作るにはまず、gcloud
コマンドを使用して管理者ユーザーに Admin
ロールを付与します。
Admin権限が付与されたユーザーでAIRFLOW UIの Security
- List Roles
を開きます。
Permissions
に次の権限を追加します。
Viewer
ロールを付与したときと同じ要領で作成したロールを対象ユーザーへ付与します。
このロールが付与されたユーザーでAIRFLOW UIにアクセスすると normal_tasks
が表示はできるが実行は出来ないこと、他のタスクはその存在自体が見えなくなっていることがわかります。
RBAC UIはAIRFLOW UI上でのみ有効
RBAC UIによるロールの制御はAIRFLOW UI上でのみで有効であることに注意が必要です。
そのため上記ユーザーであってもIAMで roles/composer.environmentAndStorageObjectViewer
を保持していた場合。gcloud
コマンドを使って、次のコマンドを実行すると非表示にしたかったDAGsも見えてしまいます。
gcloud beta composer environments storage dags list --environment=ENVIRONMENT_NAME --location LOCATION
ユーザーを管理するにはユーザーに対して roles/composer.user
のみを設定するというようにIAMの権限設定も必要です。IAMでの権限設定については、IAM を使用したアクセス制御が参考になります。
最後に
本記事では新しくなったCloud Composer 2とAirflow2の特性を紹介しました。
Cloud Composer 2とAirflow2を組み合わせることで、可用性が高く低コストで高速なワークフロー環境を簡単に作ることが出来るようになりました。
ZOZOではこの他にもVertex AI Pipelinesなど複数の仕組みを採用してワークフローを実装しています。
ZOZOでは一緒にサービスを作り上げてくれる仲間を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください!