Digdagのワークフローを動的に実行できる仕組みの導入

OGP

はじめに

こんにちは、MA基盤の@gachi-muchi-engineerです。

私達のチームでは、Digdagを利用してユーザーにメールを配信したり、データ連携を定期的に行うような様々なワークフローを運用しています。今回その中でも特定の対象者にポイントを付与したり、メールを配信するなどのビジネス要素が強いワークフローを、エンジニアでない運用者が運用していくなかで課題がいくつか出てきました。そこで、動的にワークフローを起動する仕組みを構築することで課題を解決したので、その方法について紹介します。

目次

Digdag

Digdagはワークフローエンジンと呼ばれるOSSのソフトウェアです。複数個のタスク間の依存関係からなるワークフローを定義し、そのワークフローの実行及び管理をします。ワークフローはdigという拡張子のファイルにワークフローのスケジュールやタスクの定義を記述します。詳しくは、公式サイトを確認してください。MA部では、digファイルをGitHubで管理し、GitHub Actionsを用いてリリースする形で運用しています。

背景

今回動的にワークフローを起動する仕組みを導入した背景には、特にビジネス要素が強いワークフローをエンジニアでない運用者が運用していくなかで、以下の課題があったからです。

  1. スケジュール設定の柔軟性
  2. パラメータ定義の柔軟性

それぞれについて詳しく説明します。

1. スケジュール設定の柔軟性

ビジネス要素が強いワークフローの場合、柔軟にスケジューリングを行いたいケースがあります。例えば、月や週毎に実行する時間や曜日を変更したり、曜日ごとに実行日時を変更したいなどのケースがあります。しかし、Digdagでは以下のようなスケジューリングしかできません。

Syntax Description Example
hourly>: MM:SS Run this job every hour at MM:SS hourly>: 30:00
daily>: HH:MM:SS Run this job every day at HH:MM:SS daily>: 07:00:00
weekly>: DDD,HH:MM:SS Run this job every week on DDD at HH:MM:SS weekly>: Sun,09:00:00
monthly>: D,HH:MM:SS Run this job every month on D at HH:MM:SS monthly>: 1,09:00:00
minutes_interval>: M Run this job every this number of minutes minutes_interval>: 30
cron>: CRON Use cron format for complex scheduling cron>: 42 4 1 * *

setting-up-a-schedule

また、このようなワークフローの運用者はエンジニアではありません。そのため、変更するためにdigファイルを修正しGitHubへPRを作成する作業のハードルが高く、結局運用者が実行したいタイミングでWeb UIから手動でワークフローを実行する状態になっていました。

2. パラメータ定義の柔軟性

Digdagのワークフローはcall!includeを利用して別のワークフローやタスクを呼び出すことができます。この機能を利用して、パラメータ化された共通のワークフローを用意していました。しかし、利用する際のパラメータの組み合わせの追加や変更することがエンジニア以外に実施できず、そのために運用コストがかかってしまうケースがありました。具体例として、対象者やタイトル、本文がパラメータ化されたメールを送るというワークフローがあったとして、それを利用するためにワークフローを増やさなければならない状態になっていました。

課題点のまとめ

これらの課題点をまとめると解決するべきことは以下の通りになります。

  • エンジニアではない運用者がスケジュール設定を簡単かつ柔軟に設定できるようにしたい
  • エンジニアではない運用者が実行パラメータの変更/追加をもっと簡単にできるようにしたい
  • 複数のワークフローで同じ課題を抱えている

解決策

解決策として、Digdagのワークフローを起動するワークフローを開発し、ワークフローを動的に実行できる仕組みを導入することを考えました。具体的には、以下のイメージ図のようにCMSから運用者がワークフローのスケジュールと設定を登録し、それを元に実行するワークフローです。この仕組みの導入によって運用者はdigファイルを編集することなくワークフローを動的に実行できるようになります。Dynamic Workflow Starterと名付けましたが、長いのでワークフロースターターとします。

仕組み

ここでは具体的な仕組みを紹介します。

CMSとDBについて

CMSは今回Google SheetsGoogle Apps Scriptを利用、DBはBigQueryを採用し、CMSと合わせて簡易的なCMSを作りました。

管理するデータについて

ワークフロースターターが実行するワークフローの設定、実行日時を管理し、その値を元にワークフローを実行します。DBで管理するデータは以下のようになります。

テーブル:dynamic_workflow_config

column type null note
uuid string x uuid
start_at timestamp x ワークフローの実行日時
project string x 実行したいワークフローのプロジェクト
workflow string x 実行したいワークフロー
parameters json x 実行するワークフローに渡すパラメータ
session_id string 実行したワークフローのsession_id
attempt_id string 実行したワークフローのattempt_id

parametersをjsonで持つことによってどんなパラメータでも対応できるようにしています。

シーケンス図

ここでは、シーケンス図を元にワークフロースターターの説明します。

1. select dynamic_workflow_config

まずは、シーケンス図の通りのクエリで実行するべきワークフローが存在するか確認します。条件に指定しているsession_id,attempt_idについては後述のシーケンスで説明します。

2. execute workflow by Digdag rest api

シーケンス1のクエリの結果で実行するべきワークフローがあればDidgagのAPI(PUT /api/attempts) を利用してワークフローを実行します。リクエストボディに指定するsessionTimeにはstart_atを指定しparamsにparametersを指定しています。sessionTimeにstart_atを指定するのは、2以降のシーケンスで失敗した際にリトライ時のワークフローの重複実行を防ぐためです。これは「sessionTimeはワークフローの履歴で一意になる」というDigdagの仕様を利用するためです。

該当のDigdagの仕様は以下のようになります。

まず、Digdagのワークフローとセッション、アテンプトの関係は下図のようになっています。

ワークフローにおいて、セッションは実行計画を表しており、アテンプトは実際の実行を表します。sessionTimeはセッションが実行される時間を表しており、ワークフロー単位で一意になります。

参考 * sessions-and-attempts * scheduled-execution-and-session-time

また、DigdagのAPI(PUT /api/attempts) は同じsessionTimeを指定した場合は、すでに実行されているアテンプトがレスポンスされます。今回この仕様を利用して2以降のシーケンスで失敗しても、次回実行時や自動リトライを行った際に同じsessionTimeを指定することによって重複実行が行われないようにしました。

3. response session_id attempt_id

シーケンス2で実行したsession_id,attempt_idを後続のBigQueryに保存するため保持します。

4. running workflow

DigdagのAPIを利用して実行されたワークフローは非同期で実行されています。

5. update dynamic_workflow_config with session_id and attempt_id

シーケンス3で取得したsession_id,attempt_idをBigQueryに保存します。ここでdynamic_workflow_configのsession_id,attempt_idに保存することによって次回の実行時にシーケンス1で実行済みと判断されるようにしています。

ワークフローでの工夫

ここでは、実際に開発したワークフローの一部抜粋から工夫した点を紹介します。

...
+start_workflows:
  +get_dynamic_workflow_config_list:
    _retry: 5
    ...
    py>: get_dynamic_workflow_config

  +loop:
    for_each>:
      workflow_config: ${workflow_config_list}
    _parallel: true
    _do:
      +execute_workflow_and_update_table:
        _retry: 5
        _export:
          start_at: ${workflow_config.start_at}
          project_name: ${workflow_config.project_name}
          workflow_name: ${workflow_config.workflow_name}
          parameters: ${workflow_config.parameters}
        py>: execute_workflow_and_update_table
...

+get_dynamic_workflow_config_list

このタスクはシーケンス図の1にあたるタスクになります。

+loop

このタスクをパラレルで実行することによりシーケンス図2以降が並列で動作するようにしました。理由は、もしシーケンシャルに実行した場合途中で失敗してしまうと後続のワークフローの実行に影響があるためです。

導入結果

もともとの課題だったスケジュール設定の柔軟性とパラメータ定義の柔軟性の課題を、ワークフロースターターを導入したことによって解決できました。これらの課題によって手動実行しないといけなかったり、パラメータ変更のためにdigファイルを変更しリリースするなどの作業がなくなり運用コストをさげることがきました。また、汎用的に利用できる仕組みにできたので、今後ワークフローを設計する時の実行方法の選択肢を増やすことができました。

今後の展望

運用管理ツール

CMSやDB部分に関して簡単に実装できる方法を選択しましたが、やはり今後運用が増えていくにあたってちゃんとした管理ツールの準備が必要だと感じております。管理ツールを用意しなかった理由としては、ワークフロースターターをスモールスタートさせたかったためです。実際に導入してから利用するケースが増えてきたので、管理ツール準備の検討を進めていきたいと思います。

スケジュール設定の分散

スケジュール設定が外部のDBとDigdagで分散してしまった点はデメリットになってしまったと感じました。今後、Digdagのスケジューリングされたワークフローの一覧と、ワークフロースターターで実行されるワークフローの一覧をあわせて確認できる仕組みを検討しています。

これは、そもそも現状のDigdagのUIで、スケジューリングされたワークフローの確認などができない課題も一緒に解決したい思いがあります。一覧で確認したい理由は、新しくワークフローを作成する際や依存するシステムを停止させる際に、いつ/どのワークフローが実行されるのかを知りたいときがあるためです。

リトライ方法

今までパラメータが原因でワークフローが失敗した場合、基本的にはdigファイルを修正してリトライする運用をしていました。ですが、今回導入したワークフロースターターで実行したものはAPIでリトライが必要になってしまいました。この課題に関しては、DigdagのCLIのretry commandで少しでも楽ができるように以下のようなPRを提案しております。

github.com

まとめ

今回MA部で導入した、動的にワークフローを実行する仕組みを紹介しました。この仕組みを導入したことによって課題だった点が解決され、汎用的に利用できるようにしたため同様の課題を抱えたワークフローに対しても課題を解決できました。また、ワークフローを起動する方法に選択肢を増やすことができたため、とても良い改善を行えたと思っています。

さいごに

ZOZOでは一緒にプロダクトを開発してくれるエンジニアを募集しています。ご興味のある方は下記リンクからぜひご応募ください!

hrmos.co

カテゴリー