[TIL 45일자] 데브코스 데이터엔지니어링

·2023년 6월 11일
0

데브코스

목록 보기
41/55
post-thumbnail
post-custom-banner

📚 오늘 공부한 내용

1. MySQL 테이블 복사하기

  • INSERT INTO는 내가 적재하려는 데이터의 수가 적을 경우 선호되는 방식이다.
  • 그렇기 때문에 대용량의 데이터를 적재할 때는 COPY 명령어를 사용하는 것이 더 선호된다.

2. MySQL Connections 설정 시 유의할 사항

  • MySQL 모듈이 없다는 오류가 발생할 수 있다. 그럴 경우 이렇게 처리해야 한다.
    • Airflow Scheduler Docker Container에 root 유저로 로그인해야 한다.
    • docker exec --user root -it Scheduler_Container_ID sh
    • sudo apt-get update
    • sudo apt-get install -y default-libmysqlclient-dev
    • sudo apt-get install -y gcc
    • sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"

3. AWS S3 접근은? (Connections)

  • Access Key IDSecret Access Key를 사용하는 것으로 바뀌었다.
  • 루트 사용자의 키를 사용하면 해킹 시 AWS 자원을 마음대로 사용할 수 있게 되기 때문에 루트 사용자를 사용하면 안 된다.
  • IAM을 사용해 별도의 사용자를 만들고 S3 bucket을 읽고 쓸 수 있는 권한을 제공한다.
  • 그리고 그 사용자의 Access Key IDSecret Access Key를 사용하고 주기적으로 변경해 준다.
  • 이때 해당 IAM 사용자에게 S3FullAccess 권한을 주는 것은 위험하기 때문에 policy를 직접 작성해 지정해 준다.
{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Action": [
 "s3:GetBucketLocation",
 "s3:ListAllMyBuckets"
 ],
 "Resource": "arn:aws:s3:::*"
 },
 {
 "Effect": "Allow",
 "Action": "s3:*",
 "Resource": [
 "arn:aws:s3:::s3이름",
 "arn:aws:s3:::s3이름/*"
 ]
 }
 ]

4. Incremental Update 구현 방식 (실습)

1) Incremental Update 시 있어야 할 컬럼

  • created (timestamp): 데이터가 처음 생성된 시간, 꼭 필요한 것은 아님.
  • modified (timestamp): 수정된 시간.
  • deleted (boolean): 테이블에서 레코드가 삭제를 하게 된다면 deleted True를 설정해 줌. (이력을 남기기 위해서)

2) Primary Key Uniqueness

Daily Update이고, 테이블 이름이 A이고 MySQL에서 읽어온다면

  • ROW_NUMBER로 직접 구현하는 경우,
    • Redshift A 테이블 내용을 임시 테이블을 생성해 임시 테이블로 복사
    • A 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어서 임시 테이블에 복사
    • SELECT * FROM A WHERE DATE(MODIFIED) = DATE(EXECUTION_DATE)
    • 임시 테이블 레코드들을 Primary Key 기준으로 PARTITION BY 해 준 다음 modified 값을 기준으로 DESC 정렬해 해당 번호가 1인 것만 A 테이블에 복사
  • S3ToRedshiftOperator로 구현하는 경우,
    • query 파라미터로 아래를 지정해 준다.
    • SELECT * FROM A WHERE DATE(MODIFIED) = DATE(EXECUTION_DATE)
    • method 파라미터UPSERT를 지정
    • upsert_keys 파라미터Primary key로 지정해 준다. 이때 upsert_keys는 리스트이다. 특정 테이블의 PK가 컬럼이 하나가 아니라 여러 개라면 컬럼 이름을 쭉 적어 주면 된다.
s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",    
    method = "UPSERT",
    upsert_keys = ["id"], #id를 기준으로 있는 레코드인지 없는 레코드인지를 확인해 주고 없다면 새로 만드는 것을 redshift에서 알아서 처리해 줌
    dag = dag
)

5. Daily Incremental DAG에서 2018 년 7월달 데이터를 다 다시 읽어와야 한다면?

  • Airflow에서 추천하는 방식으로 Incremental Update를 구현하면 backfill이 쉬워진다.
  • 하루에 하나씩 365 번 돌리거나,
  • 하루씩 31 번씩 다른 날짜에 대해 실행해 줄 수 있다.
    • 이 방법으로 실행하는 경우 데이터 소스에 들어가는 오버 헤드가 클 수 있다. (동시에 들어가게 되면 동작을 멈출 수 있음) -> 프로덕션 DB라면 큰 문제가 된다.
    • DAG가 어떻게 구현했느냐에 따라서 한 번에 하나 이상의 데이터를 다른 날짜에 대해 돌리면 TASK끼리 충돌이 날 수도 있다. 뭐가 먼저 실행되느냐에 따라 읽어야 하는 값이 오버라이딩 될 수도 있고, 동시에 접근한다면 시작 데이터가 달라야 하는데 같아질 수도 있다.
    • DAG 파라미터 max_active_runs (한 번에 실행할 최대 작업의 수를 설정)

✍ 어떻게 backfill로 구현할 것인가?

  • 제일 중요한 것은 데이터 소스가 backfill 방식을 지원해야 한다.
  • "execution_date"를 사용해서 업데이트할 데이터를 결정한다.
  • "catchup" 필드는 True여야 한다.
  • start_date/end_date를 backfill 하려는 날짜로 설정한다.
  • DAG 구현이 execution_date를 고려해야 하고 멱등성(idempotent)을 고려해야 한다.


🔎 어려웠던 내용 & 새로 알게 된 내용

1. SqlToS3Operator

  • SqlToS3Operator는 MySQL의 SQL 결과를 S3에 적재해 준다.
  • 이때 S3 어떤 bucket에 적재를 할 것인지에 대해 설정해 주어야 한다.
  • s3://s3_bucket/s3_key

2. S3ToRedshiftOperator

  • S3를 Redshift 테이블로 COPY 해 준다.
  • 이때 파라미터를 제대로 넘겨 주어야 한다.
  • (s3://s3_bucket/{ID}-nps) -> Redshift (스키마.nps)

3. DB slave/master

  • MasterSlave는 DB의 구조인데 Replication(복제) 매커니즘을 이용해 DB 데이터를 물리적으로 복사해 다른 곳에 넣어 두는 기술을 의미한다.
  • DB 트래픽 분산과 백업을 위해 도입된 개념으로 Master메인 프로세스를 담당하고, Slave는 그외 보조의 일을 담당하는 것이다.
  • 예를 들어 DB 트래픽 분산 시 Master DB는 데이터의 변경(INSERT, UPDATE, DELETE)을 하고, Slave DB는 데이터를 읽기(SELECT)만 한다. 이후 변경이 일어난 데이터들이 Master DB에서 Slave DB로 전달된다.
  • 즉, Master는 데이터 동시성이 아주 높게 요구되는 트랜잭션을 담당하고, Slave는 읽기만 데이터 동시성이 꼭 보장될 필요 없는 경우에 읽기 전용으로 데이터를 가지고 오게 된다.


✍ 회고

- 회사에서 DB 작업을 할 때 처음 생성된 시간에 대한 컬럼이 존재하고, 데이터가 Update 될 때마다 Update 시간이 기록되는 컬럼이 있고, 삭제된 데이터인지 아닌지를 표현해 주는 DEL_YN과 같은 컬럼들이 대부분의 테이블에 들어갔는데 프로덕션 데이터베이스기는 했지만 Incremental Update와 같은 개념이었던 것 같다. 4 번을 작성하면서 그때 경험이 떠올랐다.

- 마지막 강의가 airflow 배운 것들을 최종 정리하는 부분이라 백지 학습을 해 보는 게 좋을 것 같아서 메모로 따로 질문만 모아 정리해 두고 복습해 보기로 하였다.

profile
송의 개발 LOG
post-custom-banner

0개의 댓글