データ処理パイプラインの Argo Workflows 移行を検討した話

AirflowからArgo Workflowsへ
AirflowからArgo Workflowsへ

freee の AI ラボというチームでエンジニアをしている id:nagomiso と⾔います。好きな飲み物はストロング系チューハイです。オススメはキリン・ザ・ストロングのコーラサワーと SAPPORO 99.99 のクリアレモンです。

さて, あまりイメージがないかも知れませんが実は freee の AI ラボでは機械学習やデータを活用したサービスの検討・開発だけではなく, 開発や運用を効率的に行うためのインフラ整備にも取り組んでいます。(取り組みの一部は 開発スピードを止めない機械学習インフラ基盤――freeeに学ぶAI開発で本質的価値を提供する方法 でも紹介しています)

こうしたインフラ整備の一環としてデータ処理パイプラインの Argo Workflows 移行を進めているので今回はその話をしようと思います。

動機

もともと AI ラボではデータ処理パイプライン用のワークフローエンジンとして Apache Airflow を採用していました。 Airflow は Python スクリプトでワークフロー(= DAG; Directed Acyclic Graph)を定義できるので定義の柔軟性・実装の容易さといった面では優れているのですが, 一方で運用していくうちに

  • DAG の定義が書かれたスクリプトを所定のディレクトリへ配置する以外にワークフローをデプロイする方法がなくモダンな CI/CD フローが構築しづらい(少なくともいまはできていない)
    • Twelve-Factor App で言うところの「単一のコードベースと複数のデプロイ」のような状態を実現しづらい
    • ワークフロー自体は各アプリケーションのリポジトリで分散管理したいがその場合 Airflow の環境とリポジトリの内容を同期する仕組みを整えるのが大変(そのためいまは複数アプリケーションのワークフローを単一のリポジトリで管理している)
  • Python スクリプトで DAG の定義が書けるがゆえにワークフローの定義にロジックが入り込みやすく関心が分離されていない状態に陥りやすい

という点が気になるようになりました。特に 1 点目は少人数で複数アプリケーションのリポジトリを管理している AI ラボの開発効率を向上させるため今のうちに改善しておきたいと考え, ワークフローエンジン移行検討に踏み切ったのでした。

技術選定

移行先のワークフローエンジンを選ぶためにまずは AI ラボの用途と課題感に合わせて要件を考えました。観点が少し散らかっていますがおおよそ以下のようなことができることを要件として技術選定しました。

  • Airflow の現運用と同じようなことができる
    • ワークフローの定義をリポジトリで管理できる
    • ワークフローの実行スケジューラが内蔵されている
    • リトライ機構が備わっている
    • Web UI でワークフローの実行状態を確認できる / 失敗時手動リトライができる
    • 単純なタスクのステップ実行だけではなくある程度の分岐や並列実行があるワークフローが定義できる
  • 単一のコードベースから複数の環境へ簡単にデプロイできる
    • ワークフローは環境(多くの場合は ステージング / 本番)に応じた差分以外の定義を共通化できる
    • デプロイ時はデプロイ先に環境に応じた値を簡単に切り替えられる
  • ワークフローの定義を各アプリケーションのリポジトリで分散管理しやすい
    • 複数のリポジトリで管理されているワークフローを簡単に動作環境へデプロイできる
  • その他
    • ワークフローの定義と各タスクにおけるロジックの関心が分離できる方が嬉しい(programable 過ぎないほうが良い)

特に 2 番目のデプロイと 3 番目の分散管理のことを考えて Cloud Native なワークフローエンジンがよさそうということで候補にあがったのが Argo WorkflowsTekton pipeline でした*1

Argo Workflows も Tekton pipeline もワークフローエンジンとしての基本機能やマニフェストの書き方は大差がなかったのですが

  • 導入に使うマニフェストの構造が比較的単純でチームの用途に合わせた設定のカスタマイズがやりやすそう
  • スケジューラや Web UI が標準で装備されている

という点を鑑みて最終的には Argo Workflows に絞って移行検討を行うことにしました *2

Airflow と Argo Workflows の比較

技術選定が終わったので次は Airflow 上で稼働していたデータ処理パイプラインのひとつを実際に Argo Workflows 化して利用感を比較してみました。ここでは「使用感が結構違うな」と思った部分に触れていこうと思います。

なお検証時点での Argo Workflows のバージョンは v3.1.8 です。今後は状況が変わるということも十分にありえるということを予めお断りしておきます。

日時操作

ワークフローエンジンを使っていると「トリガーされた時刻の前日」や「トリガーされた時刻の3時間前」などトリガーされた時刻基点での日時操作をしたくなることがあります。このような操作が必要になったときの使用感が Airflow と Argo Workflows では大きく異なりました。

Airflow の場合

トリガー時刻は {{ next_execution_date }} で取得できるかつその操作も組み込みのマクロ関数が用意されているので簡単に操作ができます。例えば「トリガーされた日付の 1 週間前」を取得したいときは以下のように書けば取得できます。簡単ですね *3

one_week_ago = "{{ macros.ds_add(next_execution_date, -7) }}"

Argo Workflows の場合

トリガー時刻自体は {{ workflow.creationTimestamp }} で取得できるのですがそれを操作する関数までは用意されていません。Workflow Variables を読むと一見 Sprig 関数が使えるように見えるのですが

# ❌ これは動かない
{{= workflow.creationTimestamp | date_modify '-168h' }}
# ❌ これも動かない
{{= sprig.dateModify('-168h', workflow.creationTimestamp) }}

のように書いても日付操作してくれません。

これは workflow.creationTimestampk8s.io/apimachinery/pkg/apis/meta/v1metav1.Time 型の値を返しているのに対して Spring 関数の sprig.dateModify() は Go 標準の time.Time 型の値を受け取ることを想定しているという実装のミスマッチが原因のようです*4

以下のように sprig.toDate()sprig.date() を駆使すれば一応操作はできるようになりますが記述が冗長です。

# ⭕ こうすると一応動く
{{= sprig.dateModify('-168h', sprig.toDate('2006-01-02T15:04:05Z07:00', sprig.date('2006-01-02T15:04:05Z07:00', workflow.creationTimestamp))) }}

上記以外の方法で解決する場合は「実行する Step/Task 側のロジック」で解決するか別途操作用のテンプレートを作る必要があります。

Argo Workflows は WorkflowTemplate を使うと共通のテンプレートを定義できるのでそれを使って次のようなテンプレートを作成してそれを使うことで対応しました。

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: utilities
spec:
  templates:
  - name: date-operator
    inputs:
      parameters:
      - name: baseDate
      - name: interval
      - name: outputLevel
        default: date
    script:
      image: ubuntu:20.04
      env:
      - name: TZ
        value: Asia/Tokyo
      command: [bash]
      source: |
        date \
          --date='{{ inputs.parameters.baseDate }} {{ inputs.parameters.interval }}' \
          --iso-8601='{{ inputs.parameters.outputLevel }}'

タスクの定義

ワークフローエンジンは実行の最小単位(ここではタスクと呼びます)を定義してその依存関係を記述します。このタスクの定義方法が Airflow と Argo Workflows ではかなり違います。

Airflow の場合

XXOperator のインスタンスを生成することでタスクを定義します。PythonOperator などを使用する際は Airflow 環境に全 PythonOperator で使用する依存関係をインストールする必要があるため各タスク間の依存関係が競合しやすくなります。

また他の Operator を使ったとしてもその処理内容を直接 DAG 定義に書き込むことになるのでワークフローとロジックの関心の分離が難しいです。

Argo Workflows の場合

template オブジェクトを定義することでタスク(あるいはステップ)を定義します。基本的にはあるコンテナイメージを参照して実行する形なので各タスク間で依存関係が競合することは殆どありえません。

また script を使わない限りはワークフロー定義内にロジックが入り込むことがないので関心の分離がしやすいです。一方でこの利点と表裏一体で各 steps/tasks 内で処理を直接記述する方法がなく, まずは spec 内でテンプレートを定義してからその実行時の順序を記述することになるので 1 回しか参照されないような タスクが多いワークフローの場合は定義がかなり冗長になります。ただし v3.2 系では inline template(steps/tasks 内で直接実行コマンドなどが指定できるテンプレート)が使えるようになる予定なので将来的には解消される内容かと思います。

ワークフローデプロイ方法

移行の動機ともなったワークフローのデプロイ方法です。Cloud Native な Argo Workflows と Airflow とではかなり使い勝手が異なります。

Airflow の場合

所定のディレクトリに DAG 定義が書かれた Python スクリプトを配置することでワークフローがデプロイされます。 GitOps ライクに Git リポジトリの中身を環境に反映させる方法を採用すると Airflow 環境を分けるかブランチを分けない限りステージングデプロイと本番デプロイを分離することが難しいです。まさに同一のコードベースから複数のデプロイを実現するのが難しい状況にあると思います。

Argo Workflows の場合

kubectl コマンドか argo コマンドでワークフローをデプロイします。コマンド実行時に Namespace や Context を切り替えれば同一のワークフロー定義を複数の環境にデプロイするのが Airflow と比較して容易に実現できます。

デプロイ環境毎に設定の差分がある場合は Kustomize や任意のテンプレートエンジンを使えば共通部分と最小限の差分を定義するだけで複数の環境にデプロイすることができます。AI ラボでは Kustomize を使って適宜値を切り替えることで宣言的に複数の環境へのデプロイができるようにしました。

また一つの実行環境に対して kubectl コマンドか argo コマンドを実行すればデプロイ出来るのでワークフロー定義を分散管理しやすいというメリットがあります。

結論

Airflow と Argo Workflows の比較の項で触れたように日付操作に若干のクセがありましたが

  • ワークフロー定義を分散管理しやすい
  • 単一のコードベースから複数の環境へデプロイしやすい

という利点が大きくAI ラボの用途においては Airflow を運用し続けるよりも Argo Workflows に移行した方が中長期的にメリットが大きいという判断になりました。

まとめ

検証の結果データ処理パイプラインに使用するワークフローエンジンを Argo Workflows に移行することに決めました。今後は既存のワークフロー定義を Argo Workflows に移行したり自動 CI/CD の仕組みを整備していこうと思います。

個人的な見解ですが Kubernetes を使うことによって得られるパイプラインのスケーラビリティや定義の分散管理可能性, デプロイの容易さを考えると今後は Tekton を含めた Cloud Native なワークフローエンジンをメインで使用するケースが増えてくるのではないでしょうか。Airflow にも Kubernetes Executor がありますが元々 Kubenetes での利用を想定した Argo Workflows や Tekton pipeline を使うほうが素直な運用になるでしょう。Airflow や Digdag といったワークフローエンジンを使っている方は用途にもよりますが一度移行を考えてみても良いかも知れません。


*1:Airflow とよく比較されるワークフローエンジンとして Digdag がありますがこちらはワークフローの定義とロジックの関心が分離できるものの Cloud Native なワークフローエンジンでなく, 分散管理と CI/CD パイプライン構築の容易さを考えて今回は選外としました。また Cloud Native なワークフローエンジンとして Kubeflow Pipelines という選択肢もありましたがこちらは ML 用の色合いが強く, ML 以外のデータ処理用途でも使用できる汎用ワークフローエンジンを求めていたのでこちらも選外にしています。

*2:Tekton はスケジューラは Triggers(の Cron), Web UI は Dashboard をそれぞれ別個に導入する必要があるので構築ハードルが少し高そうな印象を持ちました

*3:トリガーされた日付が欲しいと言っているのに next_execution_date?と思われる方もいるかと思いますがこれは Airflow のスケジューリングの考え方が特殊なことに由来しています。こちらの内容は https://kencharos.hatenablog.com/entry/2020/05/07/161558 が詳しいです。

*4:原因については我らが頼れるマネージャーのRoyさんが調べてくれました。感謝!