Airflow の流れを制す

最近バックグラウンドのジョブスケジューラとして使用しているのが、 Apache Airflow だ。

https://airflow.apache.org/

Pythonで複数ジョブ(Operator)の依存関係をDAGとしてDSL的に書けるのは魅力的だが、 一方でスケジューラーとしては、UI付きのcronだと思っていたら相当なハマりどころを感じたので、 同じくハマりそうな人や将来の自分に向けてAirfronのスケジューリングの知見を残しておきたいというのが趣旨。

TL;DR

以下の通り、ちゃんと公式でも書いてあるが、正直なところ自分でやってみないとわかりづらく、 ちゃんと補足してある資料があったので紹介しておく。 自分で試しつつ、以下の資料と照らし合わせて自分の認識が一致したことを再確認した。

airflow.apache.org

towardsdatascience.com

Airflow の DAG は期間を処理対象にする

基本的にcron、あるいはJP1などのジョブスケジューラで処理をする時間を設定した場合、 その時刻になったらタイマーで何らかの処理が動くと想定すると思う。

Airflowの一応、DAGで @daily やあるいは cronで記述した時間になったら処理が動くのだが、 それはその時間で閉じたもっとも直近の時間帯に対してDAGを起動するといった感じで動く。

以下のような 2分間間隔で動くようなDAGを作って実際に試すのがわかりやすい。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    "owner": "airflow",
    "start_date": datetime(2020, 5, 6),
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=5),
    "depends_on_past": True,
    "wait_for_downstream": True,
}

dag = DAG("tutorial3", default_args=default_args, schedule_interval="*/2 * * * *")

t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)

t2 = BashOperator(task_id="test1", bash_command="exit 0", dag=dag)

templated_command = """
    {% for i in range(1) %}
        echo "ts={{ ts_nodash_with_tz }}"
        echo "exe_date={{ execution_date }}"
        echo "{{ dag.get_last_dagrun() }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id="templated",
    bash_command=templated_command,
    params={"my_param": "Parameter I passed in"},
    dag=dag,
)

t4 = BashOperator(task_id="test2", bash_command="exit 0", dag=dag)

t1 >> t2 >> t3 >> t4

これで何度かDAGを実行し、ちょうど 9:40分付近での実行状態のスクリーンショットを撮ったものが次のものだ。

f:id:kencharos:20200507094204p:plain

9:40時点で実行中のDAGがあり、これはcronに従って9:40に起動されたものとわかるが、 execution date には 9:38 と記載されていて、まるで一つ前の時間であるように見える。

別のUIで確認すると(スクショを撮るタイミングが遅れて終了してまったが)、 execution date が9:38 に対し、 staredが9:40:05 と実態に即した時間となっている。

f:id:kencharos:20200507094410p:plain

Airflowを始めて、最初にハマるのがこの execution_date が実際の一つ前の時間になっている問題だと思う。

実際の処理開始時刻は exectution_date の時間に schedule_interval (この場合 cronの2分) を足したものだという風に思えばいいし、実際そういう風に書いてある資料も多いのだけど、なぜそうなるのだろうか。

これは、Airflowが期間を意識していることと、UIの表現の悪さが関係してくる。

Airflowのスケジューラは DAGに定義した start_date から現在時刻までの間に 同じく DAGに定義された schedule_interval の従って、期間(period)を作っていく。
期間の開始となる時刻が execution_date であり、 終了となる時刻は execution_date + schedule_interval となる。
execution_date は DAGがいつ実行されたり、リトライされたりしても絶対に変わらない値となる。

Airflow はこの期間一つに対して、DAGのインスタンス一つを割り当てる。 そしてその割り当てるタイミングは、現在時刻に対して期間が閉じている、
つまり execution_date + schedule_interval < 現在時刻 となるものだ。

図にすると次のようになる。

f:id:kencharos:20200507102510p:plain

つまり、前述の execution_date が現在時刻の2分前となるというのは、 現在時刻(9:40:xx)から見て直近の閉じた期間のDAG(9:38-9:40)の開始時刻のみが UI に出ているためだ。
(正直なところ、UIには期間と実際の開始時刻をちゃんと出してもらえれば、そこまでの混乱はない気がする。)

期間に対して処理をするというAirflowのポリシー上、閉じた期間に対してしか処理できないというのは理にかなっているが、 単純なcronを期待して Airflowを導入すると少々痛い目に合うだろう。

Catchup

start_data から現在時刻までの閉じた期間一つに対してDAGインスタンスを割り当てるのが Airflow の基本である。 そのため、 以下のように 複数のDAGが未実行のまま残るという状態がありえる

  • DAGを初めて作って stard_date から現在時刻の間に、複数期間の未実行のDAGがある
  • pause というDAGの実行を止める仕組みを利用して、そのあと pause を解除した場合

この時、 DAG の Catchup の設定によって、過去分のDAGの実行の仕方が異なる。

  • catchup = True の場合、 未実行のすべてのDAG を逐次実行する
    • 上の図の例でいえば、 execution_dateが9:30から9:38の4つのDAGが対象になる
  • catchup = False の場合、未実行のうち、最新の DAG のみを実行する
    • 上の図の例でいえば、 execution_dateが9:38のDAGのみが対象になる

ちなみに Catchupのデフォルトは True で、これはDAGごとの設定か Airflowの設定ファイルの catch_up_default の設定で変更できる。 なので、基本的にはAirflow は未実行のDAGをすべて実行しきるつもりでいる。

これは、AIrflowがETLツールとしての性質を持っているからだろうと思われる。

DAGのコード中に、execution_date を context や マクロとして取得することが可能となっている。 上で示したコードの "ts_nodash_with_tz" や "execution_date" などだ。

templated_command = """
    {% for i in range(1) %}
        echo "ts={{ ts_nodash_with_tz }}"
        echo "exe_date={{ execution_date }}"
        echo "{{ dag.get_last_dagrun() }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id="templated",
    bash_command=templated_command,
    params={"my_param": "Parameter I passed in"},
    dag=dag,
)

execution_date は常に不変であることは先ほど述べたとおりなので、 DAGごとに期間に対しての取り込みデータの取得、つまり execution_date から execution_date + schedule_interval のデータを取得して処理するようにしていれば(取り込みファイルのタイムスタンプによるフィルタやDBのWhere条件の期間などに使う)、 何らかの理由で止まっていたスケジューラが再開されて、複数DAGが一気に動き出しても問題は起きないだろう。
(ただし、なるべくDAGの処理はべき等に作るべきだろう)

一方で、常に最新の情報だけ取り込めればよいというのであれば、catchup=False にして最新のDAGだけを動かすようにすればよい。
cron の代わりに使うのであれば、こちらのケースの方が多いのではないだろうか。

catcup=False にまつわるやらかしで多いのが、 pause 解除と同時にDAGが動き出す、というものだ。

0時を起点に3時間ごとに動くDAG(catchup=False)があり、 何らかの理由でAM 0:00からAM7:00 の間のジョブの実行を取りやめたい、 つまりAM9:00からDAGを実行したい。

そこで、pause 機能を利用して、 AM0:00 前くらいに 対象DAGのpause を実行する。
そして、AM7:00 に停止の時間帯が終わったからといって、pause を解除すると何が起きるか?

答えは、期間 3:00-6:00 のDAGがその場で実行される、である。 pauseしている間に未実行となっていたDAG(0:00-3:00, 3:00-6:00) のうち、最新のDAGがpause解除と同時に動き出した訳である。

普通のcronのつもりで、次回の実行タイミングから実行して欲しいと思って AM6:00を過ぎた時点で pauseを解除すればOKだろうと思ってたら面食らう挙動だが、Airflowのスケジュールの実行の仕組みとしては、全く正しい(辛い)。

つまり、AM 9:00からDAGを実行したいのであれば、 AM9:00を過ぎた時点でpauseを解除する必要がある(とても辛い)。

現在の時点で、pause解除の次回のタイミングからDAGを実行するという仕組みは Airflowにはなく、 運用でカバーするか、何らかの拡張を用意する必要があり、個人的には カスタム Operator を作ったらいいのではと思っている。

Airflow 1.10.10 から提供された LatestOnlyOperator は、複数DAGが現在実行中の場合に最新以外のDAGの下流タスクをすべてスキップする Operator だ。

airflow.apache.org

これを参考に、停止期間中であれば下流タスクをスキップするような仕組みが作れないかと思っている。

DAG間のタスク制御

Airflow はリトライの仕組みもよくできていて、DAGレベル・タスクレベルで、失敗タスクのリトライや先送り、強制キャンセル、強制成功などがUIやCLIでできる。

それはよいのだが、一方で前回のDAGのステータスによらず、後続のDAGインスタンスは実行され続ける。 つまり、DAGが失敗していたり、遅延で次回DAGの実行時間に到達してもまだ実行中などの場合に、次のDAGの実行を自動的に止める方法はない。

一方で(なぜか)タスク単位では抑止の手段がある。

  • depends_on_past = True -> 前回DAGの同じタスクが成功している場合のみ、今回DAGのタスクを実行する
  • wait_for_downstream = True -> 前回DAGの同じタスクの直系の子供のDAGが成功している場合のみ、今回DAGのタスクを実行する

そのため、これらのフラグを使えば、ある程度のDAGの実行制御は可能ではある。 しかし、これでDAGの実行が止まるのは、前回DAGの最初のタスクが失敗した場合のみくらいで、DAGの途中でタスクが失敗した場合はそこまでは実行されてしまう。

前述のDAGのサンプルで、最後のタスクを無理やり失敗させ、wait_for_downstream = True を設定して、しばらくスケジュールを回したものがこちらだ。

t4 = BashOperator(task_id="test2", bash_command="exit 2", dag=dag)

f:id:kencharos:20200507150652p:plain

DAGの最後のタスクが失敗しても、次のDAGは投入され続け、wait_for_downstream = True により、DAGごとに停止するタスクが一つずつ上に上がるようになっていく。
UIから失敗タスクのステータス変更などを行えば、停止しているDAGはまた流れ出す。

現状、DAGの開始を最初から確実に止めたい場合は、 試してはいないが ExternalTaskSensor などが使えそうではある。

python - Is it possible for Airflow scheduler to first finish the previous day's cycle before starting the next? - Stack Overflow

manually trigger

最後に、manually trigger の紹介をしておく。 Airflowでは UI あるいはCLIで DAGをその場ですぐに実行できる仕組みがある。

また、 schedule_interval 設定なしのDAGであれば、未来日に対するトリガーの指定も設定により、可能となっている。 ( https://airflow.apache.org/docs/stable/configurations-ref.html#allow-trigger-in-future )

この機能は、schedule_interval を設定しないDAGで使うものだろうが、 一応 schedule_interval を設定してあるDAGでも実行は可能で、DAGのスケジューリングとは独立して実行される。 (つまり、DAGのスケジューリングを先行して動かすというものではない)

ただし、UI上は統合されて見えるため、不思議な見え方をすることがある。

例として、2分間隔で動くDAGの合間に手動実行を行ってみる。

execution_date 15:34 のDAGが完了している状態で、15:37分に UIから手動実行を行う。

f:id:kencharos:20200507154701p:plain

手動実行のDAGは run_id に manual がつく、external_triggerがtrueになる他、execution_date が 現在時刻となる。

f:id:kencharos:20200507154902p:plain

そして、スケジューラが次のDAGを起動した瞬間、おかしなことが起こる。

f:id:kencharos:20200507155008p:plain

execution_date の設定がスケジューラでは閉じた期間の開始時刻、手動では現在時刻となるため、UI上では execution_date の昇順で並ぶので、順序が実際のDAGの投入順とはずれてしまう。

これは、他のUIでも起きてしまう。
こちらのUIでも手動実行したDAGの前に、後続のスケジューリングのDAGが並んでいる。

f:id:kencharos:20200507155255p:plain

AirflowのUIは実際の処理時間ではなく、DAGの期間順に並ぶので間違ってはいないのかもしれないが、それでも混乱する。 (DAGの実際の開始時間の表示と、開始時間のソートオプションが欲しい。Databaseのフィールドとしては存在している)

基本的には手動実行は、専用のDAGを実行してそちらで実行した方が良いだろう。

まとめ

  • Airflow は期間に対して動作する
  • UIが貧弱でAIrflowのコンテキストに合わせた読み替えが現在は必須
  • なるべくDAGは大きめに作りたくさんタスクを入れた方が、一定間隔で動かす方が間違いは起きづらい気がしている
  • スケジュールのDAGは線で、手動のDAGは点
  • 手動DAG は独立して用意する
  • DAGの抑止機能は作りこみが必要

問題ばっかりあげたが、DSLでかけるDAGや豊富なOperator, Sensor 類は魅力ではある。 競合となる Luigi や DigDag なども気にはなるが、乗り換えには勇気がいる。

これを乗り越えた先に幸せが待っているかは、一年後の自分が知るだろう