저번 글에 이어서 Youtube Data Analysis를 이어가겠다. 
Glue에 crawler를 하나 추가해보자. 이때의 Data Source는 S3 버킷 중 raw data > statistics 로 설정한다. 이후 IAM role은 이전에 만들어두었던 glue 전용 role을 선택해주고, Database도 raw 파일 전용으로 선택해준다. Glue Crawler가 생성되면 RUN 시키고, 자동적으로 멈출때까지 기다린다.
그런 다음 Athena로 돌아가서 raw DB를 선택하고, Tables에서 raw_statistics를 선택한다. (Partitioned) 테이블을 우클릭하고 Preview Table을 선택하면 자동적으로 쿼리가 실행되고, 결과도 보여준다.
이때 중요한 것은 현재 쿼리를 실행하고 있는 테이블이 Partitioned table이라는 것이다. 따라서 클렌징이 완료된 reference_data 테이블과 join을 시켜줄 것이다. cleaned DB를 선택하고 다음의 쿼리를 입력 후 실행해본다.
SELECT a.title, a.category_id, b.snippet_title FROM "de_youtube_rqw"."raw_statistics" a
INNER JOIN "db_youtube_cleaned"."cleaned_statistics_reference_data" b ON a.category_id = cast(b.id as int);a.category_id는 bigint이고 b.id는 varchar이기때문에 cast를 이용해서 int로 변환했다. 사실 모든 데이터를 pre-processing을 마쳐야 하기 때문에 쿼리에서 cast를 쓰는 것은 그렇게 좋은 방법은 아니다. 데이터의 양이 많아질수록 cast는 더 사용하기 어려워진다. 따라서 테이블 자체의 id라는 컬럼을 int로 바꿀 것이다.
Glue의 Tables로 돌아가서 cleaned_statistics_reference_data를 선택하면 id라는 컬럼은 string 타입 인것을 알 수 있다. 하지만 Edit Schema를 통해 string 타입을 bigint 타입으로 변경할 수 있다. 변경을 저장하고, cast 부분을 지운 다음 쿼리를 실행하면 오류가 뜬다. parquet과 metastore schema 사이에 match가 되지 않는다는 것이다.
즉, 실제 parquet은 string인데, metadata에 따르면 bigint여야 한다는 mismatch가 발생한다. 이를 해결하기 위해서는 다음을 따라야 한다.
1. Keep the data type change in the data catalogue
2. Delete our testing JSON file
3. Confirm APPEND in Lambda
4. Run Test event in Lambda
5. Copy again our data, from our laptops (AWS CLI)
6. Add the S3 Trigger to Lambda
1번은 그냥 현재 상태를 유지하는 것이니 2번부터 천천히 따라해보자. S3 버킷 중 cleaned를 모아두는 버킷에서 lambda를 사용해서 만들었던 parquet 파일을 삭제한다. 3번을 하기 위해 Lambda로 들어가 사용했던 함수를 열고 새로운 event를 생성한다. S3 put 템플릿을 이용하고, 이전에 만들었던것과 같이 raw reference data에 대해서 버킷과 key를 설정하고, TEST를 돌린다. 코드가 정상적으로 실행되고 나면 이전과 같이 path가 나오면서 parquet이 존재하는 경로를 알려준다. 그런 다음 위에서 오류가 발생했던 Athena 쿼리를 실행하면 정상적으로 실행됨을 알 수 있다.
Glue Catalog에서 " id 컬럼은 bigint 타입이다. "로 설정해두고 나서 lambda를 이용해 다시 parquet 파일을 생성했다. 다시 만들어진 parquet 파일은 카탈로그를 따라서 id를 bigint로 생성한 것이다.
이제 raw 데이터를 cleaned reference data가 있는 버킷으로 옮기기 위해 Glue에서 Job을 생성할 것이다. 사실 유튜브 영상에서 사용하는 Glue Job과 현재의 Glue가 많이 달라져서, 영상 그대로 따라할 수는 없었다. 그래도 Reference Github에 올라와있는 코드를 활용해나가며 완성했다.
현재로써는 Glue에서 ETL Job을 생성할 때 "Spark script editor"를 선택하고, Github의 코드를 나의 DB이름, table 이름, path 등에 맞게 수정한 뒤 Script로 사용했다. 새로 나온 방법인 Visual ETL로도 시도를 해봤지만, region을 세 가지만 선택하는 부분이 도저히 어려울 것 같아서 그냥 spark script editior로 최대한 Reference tutorial을 따라갔다. 그 과정에서 다양한 오류들과 만났는데 이에 대해 간단하게 덧붙이고 넘어가겠다.
아무튼 위의 방법대로 Job을 수행하고 나면 지정한 폴더 내에 3가지 region에 대한 폴더가 새롭게 생성되어있는 것과, 폴더 내에는 parquet format의 자료들이 들어있음을 알 수 있다. 나는 이 3가지 폴더를 1가지 폴더 내로 이동시켜 정리했다. Job 코드를 실행하기 전에 폴더를 미리 만들어 두고 connection_type의 path를 수정해도 같은 결과가 나올 것이다.
이제 정말로 데이터는 준비됐으니 Crawler를 생성해 Data catalog를 만들어보자.
Create Crawler 를 선택하고 이름을 적당히 지어준다. data source는 위에서 생성한 3가지 폴더가 들어있는 폴더로 지정, IAM role은 glue 전용으로 만들어 둔 role로 설정, target DB는 cleaned로 설정한다. crawler를 완성하면 run을 돌려주고 작업이 완료될 때까지 기다린다.
완료되기를 기다리면서 Lambda에 접속한다. 지금까지 우리는 lambda에서 "configure test event"에서 한가지 json파일을 지정하고 이를 통해 test event를 돌렸다. 하지만 우리는 이제 모든 json파일에 대해서 함수를 돌리고 싶고, 언제든 json 파일이 버킷 안으로 들어오면 자동적으로 함수가 돌아가서 output을 생성하게끔 만들고 싶은 것이다. 이를 위해서는 생성해둔 lambda function에 트리거를 추가하면 된다.
트리거 추가를 누르고 S3를 선택하면 현재 내가 생성해둔 버킷을 모두 볼 수 있다. 여기서 raw data가 들어있는 버킷을 선택한다. 이벤트 유형은 자동적으로 모든 객체 생성 이벤트가 선택되어있다. 이는 create, copy 등의 모든 작업을 포함하는 것이기 때문에 그대로 두면 된다. 접두사와 접미사의 경우는 파일이 저장되는 경로와 파일 형식을 지정할 수 있는데, 접두사로는 버킷 내에서 json 파일이 들어있는 곳까지의 경로를(띄어쓰기 없이), 접미사는 ".json"으로 설정하면 된다.
트리거가 잘 작동하는지 알아보기 위해서 기존의 json파일들을 전부 삭제하고, 다시 업로드해볼것이다. 일단 json 파일을 전부 삭제하고, 이전에 test event를 통해 lambda function이 생성한 parquet 파일도 삭제해준다. 이제 json 파일들을 업로드 해줄 것인데, 이는 처음에 했던 것과 같이 하면 된다. 다음의 코드를 cmd에서 실행하면 된다.
aws s3 cp . s3://de-on-youtube-raw-useast1-dev/youtube/raw_statistics_reference_data/ --recursive --exclude "*" --include "*.json"데이터 업로드가 끝나면 parquet 파일들이 잘 생성되었는지 확인해보자.
이제 데이터의 준비는 끝냈으니 쉽게 사용할 수 있도록 pipeline을 만들어보자. Glue로 다시 돌아가서 Job을 만들 것이다. 이번에는 "Visual with a source and target"을 선택하여 생성한다. 일단 화면이 뜨면 적당한 이름을 붙여주고 Visual에 존재하는 모든 노드들을 삭제한다.
이제 노드들을 생성해보자. 일단 +를 눌러서 2개의 AWS Glue Data Catalog를 만들고, cleaned DB의 두가지 테이블로 설정한다. 그런 다음 다시 +를 눌러서 join을 불러오고, Node parent로 두개의 AWS Glue Data Catalog를 설정하고 Condition을 추가해서 id와 category_id를 맞춰준다. 이렇게 두 개의 table을 Athena 없이 JOIN시킬 수 있는 JOB을 만든다.
이제 이 JOIN의 결과값을 저장할 S3버킷을 하나 생성해야 한다. 적당한 이름을 붙여 생성하고 그에 해당하는 노드를 생성해 JOIN과 연결해준다. 이때 주의할 것은 Source인 S3가 아니라 Target인 S3로 생성해야 하는 것이다. 위에서 AWS data catalog는 source가 맞지만, 여기에서의 s3은 target이기 때문에 구별해주어야 한다. 추가로 "Data Catalog update options"에서 "Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions"를 선택해준다. 이렇게되면 data catalog를 저장할 새로운 DB가 필요해지는데, 당연히 새로 하나 만들어주면 된다. Athena에서 "Create Database"를 이용하면 간단히 만들 수 있다. 생성하고, 노드에서 선택해주고 나서 table 이름은 적당히 지어주면 된다. partition으로는 region과 category_id를 선택해준다.
Job details에서 IAM role을 선택해주고 나면 Job 생성 완료이다. Run을 눌러 실행시키고 성공했다는 메세지가 뜨면 S3 버킷에서 category_id와 region으로 잘 partitioned되었는지, region은 3개가 맞는지 등을 확인할 수 있다. 이제 SQL Query를 사용할 때 join을 사용하지 않고 쉽게 데이터에 접근할 수 있다.
데이터 시각화는 AWS QuickSight를 사용할 것이다. AWS console에서 검색한 후 접속해보면 요금 청구가 나와있는데, 우리는 사업체가 아니라 개인으로 프로젝트를 진행하는 것이기 때문에 Standard를 사용할 것이다. 이름과 이메일을 설정해주고 다음을 눌러 계정 생성을 완료하면 된다.
접속하고 나서 가장 먼저 할 일은 오른쪽 상단의 프로필을 눌러 "Quicksight 관리하기"를 들어가 보안 및 권한에서 S3에 대한 권한을 부여하는 것이다. S3를 추가하고, 프로젝트와 관련된 모든 버킷들에 대한 권한을 허용하면 된다.
권한 허용이 끝났다면 왼쪽의 데이터세트를 눌러 시각화를 시작해보자. 적당한 이름을 붙이고 연결확인을 하면 새로운 Athena 데이터 세트를 추가할 수 있다. 데이터 미리보기를 통해 데이터를 확인하고 우측 상단의 저장 및 게시를 하면 데이터셋에 athena 데이터셋이 추가된 것을 볼 수 있다.
다시 Quicksight 메인화면으로 돌아가 데이터셋>athena데이터셋>분석에 사용>생성 을 하면 dashboard를 만들 수 있다. 왼쪽에서 필드를 선택하고, 합계 / 개수 / 평균 등 다양한 설정을 조작할 수 있다. 필드의 아래부분에서 그래프의 형식도 선택할 수 있다. 필드 부분에서 필드를 선택하거나 상단의 박스 안으로 넣어 Group by처럼 사용할수도 있다.