DOORS DX

ベストなDXへの入り口が
見つかるメディア

Airflow のログ周りは取扱注意!

公開日
2022.12.20
更新日
2024.03.11

データ活用をシステム面から支援するデータエンジニアリング本部(DE本部)の社員によるエンジニアブログです。今回は、筆者がワークフロー管理ツール「Airflow(Apache Airflow)」を使う中で遭遇したログ周りの課題やその解決策などを幅広めにご紹介します!

こんにちは、DE本部 アナリティクスアプリケーション開発部の家入です。
好きなアーティストは iri です、名前が似ているからではありません。
仕事では機械学習システムのバックエンドやデータパイプラインを担当しています。

私たちのチームでは、プロジェクトの規模にもよりますが、ワークフローツールとして Airflow を採用しています。今回は、プロジェクトで Airflow を運用していて遭遇した、ログ周りの課題とワークアラウンドについて話します。

Airflow とは

Airflow 自体は広く使われるようになっており、ご存知の方も多いかと思いますので、ここでは簡単に概要だけ説明します。

Airflow は Python 製のワークフローツールで、何をいつ、どの順で実行するかを柔軟に設定することができます。

DAG(Directed Acyclig Graph)に、実行したい一連のタスクの依存関係や実行設定を定義します。
タスクには実際の処理内容を定義します。
あとは Airflow Scheduler が設定に従って、DAG を実行してくれます。

ところで

今回紹介する話は、パブリッククラウドで Kubernetes を利用して Airflow を構築したときの話です。
マネージドサービスの場合は一部後述していますが、同様のことが起きるかは要検証です。

用語

  • Composer: Cloud Composer
  • GCS: Google Cloud Storage
  • MWAA: Amazon Managed Workflow Apache Airflow

タスクのログを取得したくってカスタムハンドラを作ったが不採用にした

タスク状況を把握し、状況によってアラートを出すために、Airflow のログを Azure の Log Analytics で収集する必要がありました。
通常、Airflow のログはウェブサーバーの画面や、スケジューラーの標準出力で確認できます。
しかし、個別のワーカーコンテナではログが確認できず、Log Analytics でも取得できませんでした。
これはおそらく、Airflow がデフォルトで使う FileTaskHandler が FileHandler を継承しているためと思われます。

airflow/airflow/utils/log/file_task_handler.py at main · apache/airflow · GitHub

そこで、この FileTaskHandler からさらに、標準出力にもログを出力するようなカスタムハンドラを作成して利用にしたところ、Log Analytics でもログ取得できるようになりました。
しかしこれにより、原因は不明ですが、Airflow がワーカーのログを取得するのに、ランダムで失敗するようになりました。
Log Analytics によるログ取得を優先して調査したところ、動作確認で使っていた BashOperator や Task Decorator と、実際に使う KubernetesPodOperator では挙動が異なることが分かりました。
KubernetesPodOperator であれば FileTaskHandler を使っても、ログ取得ができたのです。
結局、上記のカスタムハンドラを不採用にしました。

なお、パブリッククラウドにログを出力するというだけであれば、下記の手法があります。

Writing logs — apache-airflow-providers Documentation

特に AWS には CloudwatchTaskHandler、GCP には StackdriverTaskHandler があるため、比較的簡単に実装・運用できそうですね。

Composer や MWAA では組み込みのようです。

今回は Log Analytics を使いたいために紆余曲折してしまいました。
もし Airflow のログの詳細を知りたい場合は、以下を参照ください。

ログでディスクスペースが枯渇した

そんなこんなでログと格闘していたのですが、ある日 DAG が全く動かなくなりました。
エラー内容は下記の通りで、ログのためにディスクスペースが枯渇していました。

OSError: [Errno 28] No space left on device: ‘/opt/airflow/logs/scheduler/2021-12-17

Airflow は、ログをタスクごとにファイルに残します。
タスクが起動するとまず、ログファイルを作成しようとしますが、 このファイルに書き込めないと DAG がエラーとなります。
ログファイルは日々蓄積されますが、Airflow には過去のログを削除する機能がありません。
大きなディスクを用意しても、長く運用するとこの問題に当たります。

古い記事ですが、他社様でも同様の事象に遭遇しているようです。

Apache Airflow でタスクスケジューリングしてみた ~ログを退避させる~ – GiXo Ltd.

また、この件に関して、イシューが作成されていますが、対応しないとしてクローズされています。

Log rotation for scheduler logs · Issue #8759 · apache/airflow · GitHub

対策として、定期的にログを削除するようにしました。
ログを削除するためのメンテナンスDAGのサンプルがあり、これを参考に実装しました。

airflow-maintenance-dags/log-cleanup/airflow-log-cleanup.py at master · teamclairvoyant/airflow-maintenance-dags · GitHub

また、ログファイルは前述の通り、クラウドストレージに保存するように設定できます。
ただ、私が検証した範囲では、この設定をした時でも、ローカルにもログを保存していたため、根本的解決ではなさそうです。

https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-tasks.html

幸い Composer ではログを GCS に保存するようにしており、ディスク容量の心配をする必要はないそうです。

Cloud Composer environment architecture  |  Google Cloud

MWAA では、下記のようなアーキテクチャで、S3 や CloudWatchと 連携しているようです。
2022年11月11日時点では、logs ディレクトリを S3 にマウントしている、といった明確な記述は公式ドキュメントに見つからなかったものの、社内で確認した限り、ログをローカルに残さないようでした。


メタデータ DB のログを削除しすぎて DAG が再実行された

こうして、ログの蓄積が問題となったので、古いログファイルを削除しました。
また、メタデータ DB にも DAG やタスクに関するデータが格納されています。

Understanding the Airflow metadata database | Astronomer Documentation

そのため、メタデータ DB も肥大化の恐れがあるため、一定期間より前のログを削除したところ、すでに実行済みの DAG を再実行してしまうことになりました。
この原因は、一部の DAG は実行間隔が長く、最新の実行ログも削除されてしまったためで、考えてみれば当たり前かもしれません。
しかし、catchup = False で、次の実行日時に至っていない DAG でも再実行されていたのは想定外で、挙動を調査することにしました。

まず、次の実行日時に至っていない DAG の実行についてです。
例えば DAG の start_date を2022年1月1日、実行スケジュールを毎日00:00時 ( 0 0 * * * ) に、今が2022年7月1日13:00時としましょう。
この時、次の実行予定時刻は7月2日00:00時ですが、DAGの実行ログがないと、すぐにDAGを再実行してしまいます。
この原因は、Airflow は「DAG の実行タイミング」をスケジュールするのではなく、「data interval と呼ばれる DAG で扱うデータの収集間隔」をスケジュールするという考え方に基づいているためです。

https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval

A DAG run is usually scheduled after its associated data interval has ended, to ensure the run is able to collect all the data within the time period. In other words, a run covering the data period of 2020-01-01 generally does not start to run until 2020-01-01 has ended, i.e. after 2020-01-02 00:00:00.

このため、7月1日までの全ての実行ログが削除されていると、6月30日から1日の data interval が過ぎているため、対応する DAG が実行されてしまうのです。
具体的な解説はこちらも参照ください。

Airflowのstart_dateとexecution_date (logical date)を把握する|Dentsu Digital Tech Blog

この問題があるため、前述のメンテナンス DAG では最新の実行ログを残す設計になっているようです。

airflow-maintenance-dags/db-cleanup/airflow-db-cleanup.py at master · teamclairvoyant/airflow-maintenance-dags · GitHub

そして、バージョン2.3でファーストクラスサポートとなった db clean コマンドでは、最新の実行ログを残すような実装になっています。

メタデータDBのログを削除することは、公式にもベストプラクティスとして記載されています。

Best Practices — Airflow Documentation

さて、自前で実装したメタデータ DB を整理する DAG は、前述のメンテナンス DAG を参考にしつつも、ナイーブに一定期間より前のデータを消すようにしていました。
結局、メンテナンス DAG にならって、最新の DAG の実行履歴は消さないように修正しました。

Composer や MWAA でもメタデータ DB を定期的にメンテナンスすることが推奨されています。
ただ、この記述を見る限り、マネージドとして実行してくれるわけではなく、ユーザー側が DAG を用意しないといけないようです。

終わりに: 使ってみたい Airflow の機能

Airflow を運用していて遭遇した課題は、上記の通りです。
ログの肥大化は、開発時にはなかなか遭遇しないため、見逃しがちなのではないでしょうか。
加えて、ベストプラクティスをあらためて確認する重要性を認識しました。

さて、Airflow はユーザーの増加もあって、いろいろと機能が追加されています。
注目している機能の1つが、Dynamic な DAG や Task の定義、実行です。

通常 DAG や Task は静的なのですが、テナントや取得されたデータに応じて、動的に DAG やタスクを増減させたいケースがあります。
そうした時に、これらの機能を活用できれば、類似したコードの重複を避けられて、開発しやすくなると期待できます。

もう一つの機能が Deferrable Operator です。
通常、1つのワーカーには1つのタスクが割り当てられるのですが、タスクによっては、ネットワーク待ちなどで CPU がアイドル状態になっている場合があります。
利用可能な条件があるものの、Deferrable Operator を使えば、ワーカー内でうまくリソースを各タスク処理に割り当ててくれるようです。
リソースを無駄にすることなく処理ができればコスト節約にもなるので、期待したいですね。

https://airflow.apache.org/docs/apache-airflow/stable/concepts/deferring.html

現場からは以上です。
それでは良いクリスマスを!


このページをシェアする

株式会社ブレインパッドについて

2004年の創業以来、「データ活用の促進を通じて持続可能な未来をつくる」をミッションに掲げ、データの可能性をまっすぐに信じてきたブレインパッドは、データ活用を核としたDX実践経験により、あらゆる社会課題や業界、企業の課題解決に貢献してきました。 そのため、「DXの核心はデータ活用」にあり、日々蓄積されるデータをうまく活用し、データドリブン経営に舵を切ることであると私達は考えています。

メールマガジン

Mail Magazine