정산 자동화 작업기 [Airflow]

ahn__jh·2023년 1월 11일
0
post-thumbnail

1. 개요

현재 재직중인 회사에서 판매된 물건의 대금 및 수수료의 정산을 운영팀에서 맡아서 수기로 처리하고 있었다.

많은 주문 및 정산 금액을 관리하는 과정에서 시간이 너무나 많이 소요 되고 인력도 많이 투입되어 시간과 인력의 리소스를 낭비하고 있는 문제가 있다.

이러한 문제를 해결하기 위해 파트너센터 팀은 정산을 자동화 하는 프로젝트를 진행 하였다.

1-1. 백엔드 정산 FLOW

  1. 판매된 주문을 수집하여 DB 테이블에 적재

  2. 적재된 주문을 통해 정산 및 상계 데이터 생성

  3. 생성된 정산 데이터를 통해 파트너센터 정산 관리 페이지에서 확인

  4. 파트너에게 정산서 발송

  5. 정산 처리

이렇게 구성 되었으며 저는 이 과정에서 airflow를 통해 2번인 판매된 주문을 통해 수수료 및 정산 금액을 찾아 정산 데이터를 생성하고 DB에 적재하는 작업을 진행 하게 되었습니다.

2. 정산 자동화에 Airflow를 선택하게 된 이유

정산 즉 돈이 달려 있는 문제기 때문에 람다로 작업을 하게 되었을 경우 한번에 모든 작업을 완벽하게 종료 할 수 없는 상태 혹은 상황으로 데이터의 오류가 생길 수 있기 때문에 멱등성을 갖고 오류가 생기 더라도 재실행 하여 해결 할 수 있는 프레임워크인 airflow를 선택 하게 되었다.

람다와 다르게 airflow는 TASK를 잘게 쪼개어 큰 작업을 디버깅 하기가 쉽고 어디에 이슈가 있고 어떤 이유로 에러가 났는지 트래킹이 빠르기 때문에도 있다!

3. 작업

수수료 혹은 주문의 에러가 있는 주문들이 있을 경우를 고려 하여 파트너의 id를 입력 받아 수동 트리거 하여 해당 파트너의 정산 데이터만 재 생성할 수 있도록 설계 하였다.

또한 python 스크립트를 통해 데이터를 가공 하는 로직을 최대한 배제 했다.

3-1. Airflow TASK FLOW

  1. 정산 월 생성 (스케줄 매월 1일 실행)

  2. 정산 파트너 생성

  3. 주문에 대한 수수료, 정산 책정 금액, 판매코드 검증 -> 에러 주문이 있을 시 슬랙 알림 처리

  4. 주문에 대한 정산 주문, 상계 주문 데이터 생성

  5. 정산 주문 데이터의 수수료 or 정산 책정 금액 업데이트

  6. 정산 주문, 상계 데이터를 CSV파일로 s3에 적재

  7. 정산서 데이터 생성(사입의 경우 SKU로 생성되기 때문에 별도로 처리)

  8. 정산 월, 정산 월의 정산 대상인 파트너 (총 정산금액 및 판매 금액 업데이트)

3-2. 작업 과정

위탁 판매와 사입이 달라서 수수료 책정 하는 방법이 달랐다.

위탁 판매의 경우 판매된 금액의 상품 판매코드 별 예를 들어 15%의 수수료를 책정하여 정산 금액을 지급하는 반면
사입의 경우 sku별 정산 책정 금액이 있기 때문에 콤보로 판매 하더라도 각 상품의 정산 책정 금액을 각각 계산 하여 파트너에게 지급 된다.

이러한 문제 때문에 위탁 주문과 사입 주문의 쿼리가 별도로 필요했고 초기 개발하며 복잡해진 쿼리를 작성할 때 몹시 괴로웠다...😞

가장 기억에 남고 괴롭고 어려웠던 TASK (feat.살려줘요)

지금은 잘게 쪼개어져 정산 주문 생성과 수수료 업데이트가 별도의 TASK로 나뉘었지만 초기 작업엔 하나의 TASK로 작업 했었다.

이 때 가장 어렵고 힘들고 괴로웠던 작업이 수수료와 주문을 하나의 쿼리로 생성 하려고 하니 주문을 가져와 위탁 정산 데이터를 생성하는 쿼리와 사입의 정산 데이터를 생성하는 쿼리를 union 해서 한번에 작업을 하려고 하니 미친듯이 길어진 쿼리를 보게 되었다.

이게 무슨 쿼린지 지금 다시봐도 나중에 수정 사항이 생기면 피를 토할 지경이었다.. 왜 그랬을까🤯...

select 'SETTLEMENT'                                                          AS settlement_status,
       psg.order_number,
       p.product_name,
       psg.count                                                             as sales_count,
       psg.price_original                                                    as single_price,
       sum(psg.price) * psg.count                                            as sales_price,
       0                                                                     as sales_fee,
       if(sub_criteria.fee_rate, sub_criteria.fee_rate,
          if(sub_default.fee_rate, sub_default.fee_rate, 0))                 as sales_fee_rate,
       if(sub_criteria.fee_rate, psg.price * sub_criteria.fee_rate / 100, 0) as settlement_price,
       c.id                                                                  as channel_id,
       sp.id                                                                 as settlement_partner_id,
       psc.id                                                                AS sales_code_id,
       ordered_at,
       paid_at,
       if(sub_criteria.fee_rate is null and sub_default.fee_rate is null, 'FEE_ERROR',
          null)                                                              as error_status,
       psc.id,
       sub_criteria.fee_rate                                                 as sub,
       sub_default.fee_rate                                                  as default_,
       psg.id                                                                as psgid
from partner_settlement_gomi psg
         left join product_sales_code psc on psc.sales_code = psg.gspc
         left join product_sales_code_detail pscd on psc.id = pscd.product_sales_code_id
         left join product p on p.id = pscd.product_id
         left join channel c on c.channel_name = psg.channel
         left join product_group pg on pg.id = p.product_group_id
         left join company cp on cp.id = pg.company_id
         left join settlement_partner sp on pg.company_id = sp.company_id
         left join settlement_monthly sm on sm.id = sp.settlement_monthly_id
         left join (
    select psg2.id      as psg2_id,
           psc.id,
           scc.start_at as start_at,
           scc.end_at,
           scc.fee_rate,
           c.id         as cid,
           is_default
    from partner_settlement_gomi psg2
             left join product_sales_code psc on psc.sales_code = psg2.gspc
             left join product_sales_code_detail pscd on psc.id = pscd.product_sales_code_id
             left join product p on p.id = pscd.product_id
             left join channel c on c.channel_name = psg2.channel
             left join settlement_criteria_consignment scc on scc.sales_code_id = psc.id and c.id = scc.channel_id
             left join (select sub_scc.sales_code_id, start_at, fee_rate, end_at
                        from settlement_criteria_consignment sub_scc
                        where channel_id = 5) as sub_query on sub_query.sales_code_id = psc.id
    where cancelled_at is null
      and paid_at is not null
      and ordered_at >= scc.start_at
      and is_default = 1
      and scc.deleted_at is null
    group by psg2.id, p.id
) sub_default on psg.id = sub_default.psg2_id

         left join(
    select psg3.id      as psg3_id,
           scc.start_at as start_at,
           scc.end_at,
           scc.fee_rate,
           p.id         as pid,
           c.id         as cid,
           scc.is_default
    from partner_settlement_gomi psg3
             left join product_sales_code psc on psc.sales_code = psg3.gspc
             left join product_sales_code_detail pscd on psc.id = pscd.product_sales_code_id
             left join product p on p.id = pscd.product_id
             left join channel c on c.channel_name = psg3.channel
             left join settlement_criteria_consignment scc on scc.sales_code_id = psc.id and c.id = scc.channel_id
    where cancelled_at is null
      and paid_at is not null
      and scc.deleted_at is null
      and psg3.ordered_at BETWEEN scc.start_at AND scc.end_at
      and scc.is_default = 0
    group by psg3.id, p.id
) sub_criteria on psg.id = sub_criteria.psg3_id
where ordered_at BETWEEN '2022-10-01 00:00:00' AND '2022-11-30 23:59:59'
  AND sm.settlement_year_month = '2022-12'
  and cp.sales_type = 0 #0: 위탁, 1: 사입
  AND cancelled_at is null
  AND paid_at is not null
  AND (ordered_at, order_number, sales_code, price) not in
      (SELECT soi.ordered_at, soi.order_number, psc2.sales_code, single_price
       FROM settlement_order_item soi
                LEFT JOIN settlement_monthly s ON soi.settlement_monthly_id = s.id
                LEFT JOIN product_sales_code psc2 ON soi.sales_code_id = psc2.id
       WHERE s.settlement_year_month = '2022-11'
         AND settlement_status = 'SETTLEMENT')
group by psg.id
having error_status != 'FEE_ERROR'

union

select 'SETTLEMENT'                                                          AS settlement_status,
       psg.order_number,
       p.product_name,
       psg.count                                                             as sales_count,
       psg.price_original                                                    as single_price,
       sum(psg.price) * psg.count                                            as sales_price,
       0                                                                     as sales_fee,
       if(sub_default.fee_rate, sub_default.fee_rate,
          if(sub_default.fee_rate, sub_default.fee_rate, 0))                 as sales_fee_rate,
       if(sub_default.fee_rate, psg.price * sub_default.fee_rate / 100, 0) as settlement_price,
       c.id                                                                  as channel_id,
       sp.id                                                                 as settlement_partner_id,
       psc.id                                                                AS sales_code_id,
       ordered_at,
       paid_at,
       null as error_status,
       psc.id,
       sub_default.fee_rate                                                 as sub,
       sub_default.fee_rate                                                  as default_,
       psg.id                                                                as psgid
from partner_settlement_gomi psg
         left join product_sales_code psc on psc.sales_code = psg.gspc
         left join product_sales_code_detail pscd on psc.id = pscd.product_sales_code_id
         left join product p on p.id = pscd.product_id
         left join channel c on c.channel_name = psg.channel
         left join product_group pg on pg.id = p.product_group_id
         left join company cp on cp.id = pg.company_id
         left join settlement_partner sp on pg.company_id = sp.company_id
         left join settlement_monthly sm on sm.id = sp.settlement_monthly_id
         left join (
    select psg2.id      as psg2_id,
           psc.id,
           scc.start_at as start_at,
           scc.end_at,
           scc.fee_rate,
           is_default,
           scc.channel_id
    from partner_settlement_gomi psg2
             left join product_sales_code psc on psc.sales_code = psg2.gspc
             left join product_sales_code_detail pscd on psc.id = pscd.product_sales_code_id
             left join product p on p.id = pscd.product_id
             left join settlement_criteria_consignment scc on scc.sales_code_id = psc.id
    where cancelled_at is null
      and paid_at is not null
      and ordered_at >= scc.start_at
      and is_default = 1
      and scc.channel_id = 5
      and scc.deleted_at is null
    group by psg2.id, p.id
) sub_default on psg.id = sub_default.psg2_id
where ordered_at BETWEEN '2022-10-01 00:00:00' AND '2022-11-30 23:59:59'
  AND sm.settlement_year_month = '2022-12'
  and cp.sales_type = 0 #0: 위탁, 1: 사입
  AND cancelled_at is null
  AND paid_at is not null
  AND (ordered_at, order_number, sales_code, price) not in
      (SELECT soi.ordered_at, soi.order_number, psc2.sales_code, single_price
       FROM settlement_order_item soi
                LEFT JOIN settlement_monthly s ON soi.settlement_monthly_id = s.id
                LEFT JOIN product_sales_code psc2 ON soi.sales_code_id = psc2.id
       WHERE s.settlement_year_month = '2022-11'
         AND settlement_status = 'SETTLEMENT')
and fee_rate is not null
group by psg.id;코드를 입력하세요

이후 TASK를 분리하여 정산 주문을 생성하는 쿼리 그리고 수수료를 업데이트 하는 쿼리를 분리하니 훨씬 보기 좋아졌다.

  • 정산 주문 생성 쿼리
select 'SETTLEMENT'                     AS settlement_status,
        psg.order_number,
        psg.count                       as sales_count,
        p.product_name,
        psg.price_original              as single_price,
        psg.price_original * psg.count  as sales_price,
        0                               as sales_fee,
        0                               as sales_fee_rate,
        0                               as settlement_price,
        c.id                            as channel_id,
        sp.id                           as settlement_partner_id,
        %s                              as settlement_monthly_id,
        psc.id                          as sales_code_id,
        ordered_at,
        paid_at
from partner_settlement_gomi psg
        left join product_sales_code psc on psc.sales_code = psg.gspc
        left join product_sales_code_detail pscd on psc.id = pscd.product_sales_code_id
        left join product p on p.id = pscd.product_id
        left join channel c on c.channel_name = psg.channel
        left join product_group pg on pg.id = p.product_group_id
        left join company cp on cp.id = pg.company_id
        left join settlement_partner sp on pg.company_id = sp.company_id
where ordered_at BETWEEN %s AND %s
    AND sp.settlement_monthly_id = %s
    AND cancelled_at is null
    AND paid_at is not null
    # 정산 대상 주문에 포함 되어 있지 않은 주문을 조회
    AND (ordered_at, order_number, sales_code) not in
        (SELECT soi.ordered_at, soi.order_number, psc2.sales_code
            FROM settlement_order_item soi
                LEFT JOIN settlement_monthly s ON soi.settlement_monthly_id = s.id
                LEFT JOIN product_sales_code psc2 ON soi.sales_code_id = psc2.id
        WHERE s.settlement_year_month = %s
            AND settlement_status = 'SETTLEMENT')

  • 수수료 업데이트 쿼리
update settlement_order_item as soi, (
    select 
        soi.id,
        soi.sales_price - sum(sub_default.criteria_price * pscd.count * soi.sales_count) as sales_fee,
        sum(sub_default.criteria_price * pscd.count * soi.sales_count)                   as settlement_price,
        soi.sales_price                                                                  as sales_price,
        if(sub_default.criteria_price, null, 'FEE_ERROR')                                as error_status
    from settlement_order_item soi
        left join settlement_partner sp on soi.settlement_partner_id = sp.id
        left join company cp on sp.company_id = cp.id
        left join product_sales_code_detail pscd on soi.sales_code_id = pscd.product_sales_code_id
        left join product p on p.id = pscd.product_id
        left join (select max(start_at) as start_at, criteria_price, soi3.id, p.id as pid
                    from settlement_order_item soi3
                            left join product_sales_code_detail pscd on soi3.sales_code_id = pscd.product_sales_code_id
                            left join product p on p.id = pscd.product_id
                            left join settlement_criteria_purchase_of_sales scpos on p.id = scpos.product_id
                    where soi3.ordered_at >= scpos.start_at
                    group by soi3.id, p.id) as sub_default on soi.id = sub_default.id and p.id = sub_default.pid
    where soi.settlement_monthly_id = %s
        and cp.sales_type = 1
        and settlement_status = 'SETTLEMENT'
    group by soi.id) as sub
set soi.sales_fee        = sub.sales_fee,
    soi.settlement_price = sub.settlement_price,
    soi.error_status = sub.error_status
where soi.id = sub.id

쿼리 다 짜고 테스트하면 어..? 이런 예외 케이스가 또 생겼네..

쿼리 수정..
2차 수정..
3차 수정..

어?.. 왜 안돼?

저... 살려주세요...

이 쿼리의 고난과 역경을 살려줘요 했을 때 같이 도와주신 백엔드 팀원들께 감사합니다...

4. 파이썬 그리고 Airflow를 접하며

1. 파이썬

원래 파이썬 장고로 첫 개발을 시작했었던 점도 있고 타 팀에서 Airflow를 운영 했던분이 계셔서
그래서 였을까 팀 리더 분이 " 정현님 파이썬 어때요 Airflow로 하는게 좋을 것 같은데 어때요 한번 해보실래요? " 라고 물었었다.

그 땐 당차게 아 좋죠 파이썬 좋아요 하시죠! 했었다.

파이썬은 재밌었다 읽기 쉽고 뭔가 친숙하긴 했다.

그러나

작업 하면서 ORM이 그렇게 그립고 그립고 또 그리웠다.

2. Airflow

Airflow를 사용하기 위해 학습이 필요했다.

혼자 슬슬 DB연결 하고 여러 Airflow에서 지원하는 Operator들을 찾아보고 사용하며 학습하는데 진짜 공식문서가 불친절 한게 예제가 하나도없네.. 라고 느꼈다.

Operator가 있는데 어떡하라고ㅋㅋㅋㅋ 쓰는 방법은 결국 몸빵했다.
믿을 수 있는건 스태오버플로우와 구글링 뿐
그렇게 하나하나 터득 해가며 첫 TASK를 완료 되었을 땐 어찌어찌 굴러는 가네... 하며 기뻐했다.

각각의 DAG를 폴더로 구성하고 관련된 쿼리와 dag파일을 묶었다.

공통으로 사용되는 날짜 관련 util 함수 enum을 includes 폴더를 구성해 사용했다.

솔직히 처음 접하는거며 Airflow 폴더 구성도 잘한지 못 한지 잘 모르겠다.

5. QA과정

TASK를 QA하기 위한 TASK의 기능과 TEST CASE 문서를 노션으로 작성했다.

가장 중점적으로 생각하고 제일 많이 테스트 한 경우는 정산서와 정산 주문 생성 TASK였다.

콤보(묶음)인 주문이 쿼리를 작성 할 때 생각보다 예외 케이스가 너무 많았어서.

6. 모든 작업 이후 현재 Airflow!

병렬로 처리할 수 있는 TASK들은 병렬로 실행 되도록!
TASK가 실패하면 슬랙으로 알림이 오도록!
중간중간 작업 할 데이터가 없어서 생성 할 부분이 없다면 skip하고 다음 TASK를 실행 할 수 있게

0개의 댓글