4. airflow-day3-4

data_hamster·2023년 6월 11일
0

학습주제
야후 파이낸스 API 대그 작성

학습내용

부록

https://velog.io/@sophi_e/Airflow-docker-compose%EB%A1%9C-%EC%97%90%EC%96%B4%ED%94%8C%EB%A1%9C%EC%9A%B0-%EC%84%A4%EC%B9%98%ED%95%98%EA%B8%B0
에어플로우 유저 권한 셋팅

mkdir -p ./dags ./logs ./plugins # mkdir -p 옵션 : 여러 하위 디렉토리를 생성시에 사용한다.
echo -e "AIRFLOW_UID=$(id -u)" > .env

AIRFLOW_UID=50000

에어플로우 init 하기 전에 실행시켜주면 됨. 윈도우, 우분투에서 8080 unhealty 이슈 해결


애플 주식을 읽어오는 풀리프레쉬 기반의 DAG 작성

풀리프레쉬 형태로 만들고 이후 인크리멘탈 업데이트 형식으로 바꿔본다


파이썬 모듈을 사용할 수 있음
yfinance를 설치

도커라면 도커에 로그인해서 그 안에서 설치.

기본적으로 지난 30일 주식정보를 리턴, 과거 어느 시점을 가져오진 않음.
풀리프레쉬 구현 의미 매일 지난 30일 정보를 가져온다.

인크리멘탈 업데이트시, 테이블 날리는게 아니라, 적재 하고, 중복을 제거하는 형태로 감.
매일 레코드가 한개씩 늘어나는 형태.

stock_info라는 테이블에 적재함.
풀리프레쉬로 갈꺼기에 레코드 날리고, 재적재.
이번엔 DROP TABLE, CREATE TABLE
두개의 태스크로 구현
Extract/Transform
Load


https://github.com/learndataeng/learn-airflow/blob/main/dags/UpdateSymbol.py#L20
aws 서버에는
pip3 install yfinance 설치함

import yfinance as yf

@task
def get_historical_prices(symbol):
	ticket = yf.Ticker(symbol)
    data = tickey.history()
    records =[]
    
    for index, row in data.iterrows();
    	date = index.strftime("%Y-%m-%d %H:%M:%S)
        records.append([date, row["Open"], row["High"], row["Close"], row["Volume"]])
    return records

태스크 데코레이터를 사용해서 함수를 만듦
심볼. 미국주식이기 때문이 심볼이 4개로 구성되어 있음. 애플의 경우 AAPL 구글은GOOG 이런 심볼은 받아서 Ticker라는 함수에 스톡 심볼을 넣으면 티켓 정보가 생기고, 히스토리 함수를 부르면 지난 30일 간의 해당 주식의 정보를 판다스 데이터프레임 형태로 리턴해줌. 판다스 데이터프레임에는 30일 주식정보. 휴일은 안하기에 20~22개의 레코드가 있을 예정. 이를 루프를 돌면서, 각 날짜에 해당하는 정보를 하나의 아이템으로 만들어서 리스트에 append.
인덱스가 날짜 정보. 판다스의 타임스탬프 정보라 그대로 쓸수 없음. 이에 연월일 시분초 형태의 스트링으로 변환.
date가 해당 날짜. 시작 주가, 최고, 최저, 끝날때, 매수매도 물량
최종적으로 records 리스트 리턴

뒷부분이 transform이 됨.


풀리프레쉬 형태로 테이블 삭제하고
테이블 만들고 INSERT INTO로 레코드 적재.
아무문제 없으면 트랜잭션 닫기

name, gender와 흡사함.
다만 테이블이 만들어져 있다는 전제하에 DELETE를 썼던거와는 다름
https://github.com/learndataeng/learn-airflow/blob/main/dags/UpdateSymbol.py

도커에소 yfinace 모듈이 필요함
이 대그를 실행해본다
airflow dags test UpdateSymbol 2023-05-20

또 도커 컨테이너에 루트 유저로 로그인하는 방법이 어떤건지. 지금 로그인 방식은 슈퍼유저가 아닌 airflow 유저로 로그인. 일부 오퍼레이션을 할 수 없는게 있음. 가끔은 루트 유저로 로그인해야함.

해당코드 UpdateSymbol.py임
task 모듈을 임포트하고
postgresHook 사용
에어플로우 커넥션스 매커니즘을 사용. 연결정보를 감춤
get_historical_price 그대로
load의 경우
name, gender 때와 인자 받는게 비슷
cur 받아서 try 블록 안에서 트랜잭션 열고
테이블 날리고 새로 만듦.
바깥에서 테이블을 만들지 않아도 만들고 시작할 수 있음. 조금 더 완전한 형태의 구현.
테이블 스키마를 보면,
extract의 리스트 만드는 순서와 동일한 것을 볼 수 있다.

6개의 필드로 구성된 테이블이 보인다. 지금은 애플주식만 대상으로 하기에 별다른 주식에 대한 정보는 없다. 다양한 회사에 대해 주가정보를 기록하고 싶다면 company, stock symbol같은 필드가 있을 수 있다.

레코드 받은걸 가지고
INSERT INTO 가지고 적재시도.
루프 마무리하면
COMMIT 시도
에러나면
ROLLBACK

DAG로 가면 별다른게 없음
AAPL 스톡심볼 넘기고, 주가정보 리스트를 load함수 인자로 넘김.
실행시킬 때는 스키마 이름을 내껄로 바꾸기.

도커 컨테이너 스케줄러로 로그인
docker ps
스케줄러 아이디 복사
docker exec -it 컨테이너아이디 sh
winpty docker exec -it afafa358637c sh
git bash에선 앞에 winpty를 붙여야 실행됨
들어가서
yfinance를 여기서 설치
pip3 install yfinance


dags 폴더로 들어가
저 대그의 태크스들을 확인해봄
airflow tasks list UpdateSymbol


대그 전체를 커맨드라인에서 실행시켜본다
airflow dags test UpdateSymbol 2023-05-30
5월 30일 부터 시작이기에 이날짜로 줘본다.
로그들이 생성됨.
태스크 2개가 순차실행

잘 실행이 된거 같음.

yfinance 모듈이 제대로 설치되지 않아도 에러 뜸.
airflow유저로 접속해서 모듈 설치하면 정상 작동.

이번에는 루트 유저로 로그인해본다.
docker exec --user root -it 컨테이너아이디 sh
루트 유저로 로그인. 특별한 차이가 없어보인다.

깃배쉬에서 winpty 앞에 붙이기
그러나 이 안에서 무슨일인지 수행 가능
exit 로 나가본다.

profile
반갑습니다 햄스터 좋아합니다

0개의 댓글