Airflow에 직접 기여한 후기

박지은·2023년 9월 11일
0

프로젝트를 진행하면서 Airflow의 소스코드를 직접 수정하고 기여하게 된 이야기에 대해 작성해보려고 한다.

배경 상황

현재 데이터 엔지니어링 사이드 프로젝트를 진행하며, 프로젝트 배포를 위해 Naver Cloud Platform을 이용 중이다.

프로젝트 초기에 Airflow 상에서 pandas를 이용해 간단히 데이터를 처리하고 저장하던 그런 시기에 Object Storage에 저장하기 위해 Airflow와 Object Storage 사이에 Connection을 만들어 줘야 하는 상황이 있었다.

지금은 데이터 처리 작업을 Kubernetes Cluster에서 수행하기 때문에 Airflow와 Object Storage 사이에 직접 connection을 만들어줄 필요가 없어 더이상 문제가 되지는 않지만 말이다.

문제 발생

Naver Cloud Platform의 Object Storage는 Boto3 라이브러리를 이용해 python으로 쉽게 데이터를 업로드, 삭제, 다운로드 등의 작업을 수행할 수 있게 지원하고 있었다.
기존의 AWS connection에서 endpoint url만 naver cloud에서 제공하는 url로 바꾸면 되는 거였다!

그렇게 잘 될거라 생각하고 connection 정보를 입력하고 test connection을 수행해 보았는데.... 무한 실패 루프에 빠져버렸다 ㅎㅎ
그런데, 이런 실패한 connection으로 작업을 수행해 봤는데, 작업이 잘 수행이 되는거다 ???~

원인 분석

이걸 그냥 넘어갈 수 없었던 필자는 Airflow 코드를 살펴보기로 하였다.

def test_connection(self):
        """Test the AWS connection by call AWS STS (Security Token Service) GetCallerIdentity API.

        .. seealso::
            https://docs.aws.amazon.com/STS/latest/APIReference/API_GetCallerIdentity.html
        """
        try:
            session = self.get_session()
            conn_info = session.client("sts").get_caller_identity()
            metadata = conn_info.pop("ResponseMetadata", {})
            ...

AWS connection을 위해 Test connection을 수행할 때 AWS의 Security Token Service의 get caller identity 를 이용한다.

이때, get_caller_identity API를 이용해 계정이 유효한지 확인을 하는데, url을 지정해주지 않으면 무조건 https://sts.amazonaws.com로 확인 요청을 보내게 된다. 필자는 Naver Cloud에서 발급받은 key들을 이용하고 있으니 저 Test에서 유효한 결과가 나올리가 없었다.

문제 해결

이 될 줄 알았으나... 결론부터 말하자면 해결이 되지는 않았다.

Naver Cloud에도 AWS랑 비슷한 Secure Token Service를 제공해 여기에서 이용하는 endpoint인 https://sts.apigw.ntruss.com/api/v1를 이용해 전송을 해보기로 하였다.
따라서, 위 코드를 아래와 같이 직접 지정한 test_endpoint_url을 이용해 수행을 할 수 있도록 수정을 진행했다.
만약 test_endpoint_url을 지정해 주지 않았다면 aws의 default endpoint를 이용하게 된다.

def test_connection(self):
        """Test the AWS connection by call AWS STS (Security Token Service) GetCallerIdentity API.

        .. seealso::
            https://docs.aws.amazon.com/STS/latest/APIReference/API_GetCallerIdentity.html
        """
        try:
            session = self.get_session()
            test_endpoint_url = self.conn_config.extra_config.get("test_endpoint_url")
            conn_info = session.client(
                "sts",
                endpoint_url=test_endpoint_url,
            ).get_caller_identity()
            ...

그럼 이제 될까...?

그래도 Test connection은 수행되지 않았다.
다시금 문서를 읽어보니 둘이 같은 기능을 수행하는 API는 맞으나, API 호출 방법과 그에 따른 응답 사항이 달라서 수행할 수가 없었다.
Naver Cloud는 Object Storage만 Boto3와의 호환을 지원하는것 같다.

그래도 Naver처럼 Boto3를 이용하는 다른 Cloud Service Provider가 있지 않을까? 라는 생각에 일단 Issue를 등록해 두었고, assign도 받았겠다 구현 사항을 pull request를 해봤다.

커밋을 향하여

그렇게 열심히 작성해서 pull request를 진행했고, 첫 리뷰를 받게 되었다...!
코드 수정에 대한 요청과 테스트 코드 추가에 대한 요청이였다.

얼른 코드를 수정하고, 테스트 코드를 구현했다.
테스트 상에서 검증해야 할 건 두가지였다.

  1. test_endpoint=None일때, default endpoint로 실행이 되는가?
  2. test_endpoint를 설정했을 경우 해당 url로 실행이 되는가?

따라서 Mock connection을 생성해 주고, client에서 이용된 endpoint url과 예상했던 url이 같은지 확인할 수 있도록 구현하였다.

    @mock_sts
    @pytest.mark.parametrize(
        "test_endpoint_url, result_url",
        [
            (None, "https://sts.amazonaws.com"),
            ("https://sts.us-east-1.amazonaws.com", "https://sts.us-east-1.amazonaws.com"),
        ],
    )
    def test_hook_connection_endpoint_url_valid(self, test_endpoint_url, result_url):
        """Test if test_endpoint_url is valid in test connection"""
        conn = AwsConnectionWrapper.from_connection_metadata(conn_id=None)
        sf = BaseSessionFactory(conn=conn)
        session = sf.create_session()
        client = session.client("sts", endpoint_url=test_endpoint_url)

        assert client._endpoint.host == result_url

그리고...

Merge 되었다 ㅎㅎㅎ

결론

결론적으로 본래의 문제는 해결하지 못했지만, 보완 사항을 제안해서 처음으로 오픈소스에 기여할 수 있는 기회를 가지게 되었다.

어쨌든 처음으로 오픈소스에 기여하게 된 소중한 기회가 되었기도 하고, 잘 끝나서 기분이 좋다 ㅎㅎ

이를 시작으로 프로젝트 진행하다 오픈소스에 보완할 점이 보인다면 또 해볼것 같고, 그 외에도 직접 기능을 구현해서 기여해보고 싶기도 하다.

profile
Today I learned...

0개의 댓글