열명 변경 처리를 하기 위해 AWS glue 서비스를 활용하여 spark와 notebook을 사용하는 방법에 대해 정리합니다.
# report db와 연결하기 위한 연결 정보
connection_mysql8_options_source_mapping = {
"url": "jdbc:mysql://data.ap-northeast-2.rds.amazonaws.com:3306/mapping",
"dbtable": "tb_col",
"user": "hello",
"password": "1234",
"customJdbcDriverS3Path": "s3://s3-data/service=market/mysql-connector-java-8.0.26.jar",
"customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}
glueContext=GlueContext(sc)
df_mapping = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options_source_mapping)
S3_IN = "s3://s3주소/파일명.csv"
start = datetime.datetime.now()
csv_df = (
spark.read.format("org.apache.spark.csv")
.option("header", True)
.option("quote", '"')
.option("escape", '"')
.option("inferSchema", True)
.option("ignoreLeadingWhiteSpace", True)
.option("ignoreTrailingWhiteSpace", True)
.csv(S3_IN, multiLine=False)
)
end = datetime.datetime.now()
print("##########total_time : " + str(end-start))
# s3에 업로드 하기 위해 spark dataframe을 glue dynamicframe으로 변환
dynamic_csv_df = DynamicFrame.fromDF(csv_df, glueContext, "dynamic_csv_df")
S3_location = "s3://s3-data-dev/service=market"
datasink = glueContext.write_dynamic_frame_from_options(
frame=dynamic_csv_df,
connection_type="s3",
connection_options={
"path": S3_location,
"partitionKeys": ["Date_"]
},
format="glueparquet",
format_options = {"compression": "snappy"},
transformation_ctx ="datasink")
분석 방법관련 추가사항은 아래를 참고하실 수 있습니다.
저 연결 정보는 언제 어떻게 활용 되나요?