[Airflow]KubernetesPodOperatorのXComのサイドカーに独自コンテナを使う

KubernetesPodOperatorのXComsサイドカーに独自コンテナを使う

タイトルの通りです。通常はalpine:latestが用いられますが、閉域網で実行している場合など、独自のコンテナを使用する際の設定を備忘録として残しておきます。

検証時のairflowのversionは2.9.2になります。

XComとは

airflowの機能の一つでcross-communicationsの略。task間でデータを連携できる便利機能です。

公式サイトをご参照ください。
do_xcom_pushTrueにすることでインターネットにつながる環境下では普通に使うことができます。以下は公式のコードより転載。

write_xcom = KubernetesPodOperator(
        namespace="default",
        image="alpine",
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        name="write-xcom",
        do_xcom_push=True,
        on_finish_action="delete_pod",
        in_cluster=True,
        task_id="write-xcom",
        get_logs=True,
    )

    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    write_xcom >> pod_task_xcom_result

この際にalpine:latestdocker.ioからpullされて使用されますが、閉域環境などでは独自のコンテナを使用することになります。

独自コンテナを使用する

githubのコードを見るとPodDefaultsでxcomsのデフォルトコンテナを指定していることが分かります。これを上書きするコードをairflow_local_settings.pyに書くことでコンテナを変更できます。

from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
PodDefaults.SIDECAR_CONTAINER.image = private.registry.com/alpine

詳細は公式ドキュメントをご確認ください。

コメント

タイトルとURLをコピーしました