Airflow KubernetesPodOperator 작업에서 실패한 후 XCom 값을 가져오는 방법은?

저는 bash 스크립트를 실행하는 KubernetesPodOperator(KPO)를 상속한 연산자를 개발 중입니다. 이 bash 스크립트의 다양한 실패 원인을 처리하기 위한 오류 처리를 내장했습니다.

이 오류 처리가 제대로 작동하는지 확인하기 위해 테스트 DAG를 만들고 있습니다. 이 DAG에는 KPO 기반 작업의 종료 코드를 예상된 종료 코드 값과 비교하는 후속 작업이 포함되어 있습니다. 이를 위해 KPO 기반 연산자 작업에서 종료 코드를 XCom으로 푸시하고, 후속 작업에서 이 종료 코드를 가져오려고 시도하고 있습니다.​

성공적인 실행의 경우에는 이 방법이 작동하지만, 작업이 처음에 실패한 경우에는 아직 작동하지 않습니다. KPO 문서에서 "XCom은 State.SUCCESS로 표시된 작업에 대해서만 푸시됩니다."라는 내용을 확인했고, 이에 대한 해결 방법을 찾고 있습니다.​ 작업을 성공으로 표시하는 방법을 시도했지만, 작업 상태는 성공으로 변경되었지만 종료 코드는 여전히 XCom 값으로 존재하지 않아 후속 작업에서 종료 코드를 가져올 수 없습니다.

KubernetesPodOperator는 callbacks 매개변수를 허용합니다. 이를 활용하여 Pod 완료 후, 삭제되기 전에 호출되는 메서드를 생성할 수 있습니다.​

예를 들어, 다음과 같은 클래스를 생성할 수 있습니다:​

python

CopyEdit

class MyKpoCallback(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_completion(*, pod: k8s.V1Pod, client: client_type, mode: str, **kwargs) -> None:
        context = get_current_context()
        exit_code = pod.status.container_statuses[0].state.terminated.exit_code
        context["ti"].xcom_push(key="exit_code", value=exit_code)

이 클래스를 KubernetesPodOperator에 전달하면, Pod 완료 후 종료 코드를 XCom에 푸시할 수 있습니다. 위 코드는 테스트되지 않았으며, k8s 클라이언트 라이브러리에 대한 고수준 검토를 기반으로 한 의사 코드입니다.