Cloudwatch
aws cli를 통해 설정해준다.
aws logs create-log-group --log-group-name=[로그 그룹 경로]
예시 > aws logs create-log-group --log-group-name=/emr-on-eks/emr-eks-dev-logs
S3
넘겨주는 변수가 없는 Spark Job
Spark script (경로 : s3://emr-on-eks-examples/spark-script/job-without-var.py)
- 임의의 spark dataframe을 만들어 'S3://emr-on-eks-examples/result/job-without-var/' 경로에 저장하는 스크립트이다.
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql import types as T
import argparse
spark = SparkSession.builder.appName("sample_script").getOrCreate()
def main():
df_spark = spark.createDataFrame([
Row(a=1, b=11.2, c='apple'),
Row(a=2, b=3.5, c='banana'),
Row(a=3, b=7.3, c='tomato'),
])
df_spark.write.mode('overwrite').parquet('s3://emr-on-eks-examples/result/job-without-var/')
if __name__ == "__main__":
main()
실행할 Json 파일 (sparkjob-without-var.json)
{
"name": "[Spark Job 이름]",
"virtualClusterId": "[VIRTUAL CLUSTER ID]",
"executionRoleArn": "[EMR ROLE ARN]",
"releaseLabel": "emr-6.2.0-latest",
"jobDriver": {
"sparkSubmitJobDriver": {
"entryPoint": "s3://emr-on-eks-examples/spark-script/job-without-var.py",
"sparkSubmitParameters": "--conf spark.executor.instances=1 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
}
},
"configurationOverrides": {
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"spark.dynamicAllocation.enabled": "false",
"spark.kubernetes.executor.deleteOnTermination": "true"
}
}
],
"monitoringConfiguration": {
"cloudWatchMonitoringConfiguration": {
"logGroupName": "[로그 그룹 경로]",
"logStreamNamePrefix": "sample"
},
"s3MonitoringConfiguration": {
"logUri": "[로그 저장될 S3 버킷 경로]"
}
}
}
}
VIRTUAL CLUSTER ID -> $(aws emr-containers list-virtual-clusters --query "virtualClusters[?state=='RUNNING'].id" --output text)
EMR ROLE ARN -> $(aws iam get-role --role-name EMRContainers-JobExecutionRole --query Role.Arn --output text)
aws emr-containers start-job-run --cli-input-json file://[sparkjob-without-var.json 파일 경로]
넘겨주는 변수가 있는 Spark Job
Spark script (경로 : s3://emr-on-eks-examples/spark-script/job-with-var.py)
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql import types as T
import sys
spark = SparkSession.builder.appName("sample_script").getOrCreate()
def main(var1, var2, var3):
df_spark = spark.createDataFrame([
Row(a=1, b=11.2, c=var1),
Row(a=2, b=3.5, c=var2),
Row(a=3, b=7.3, c=var3),
])
df_spark.write.mode('overwrite').parquet('s3://emr-on-eks-examples/result/job-with-var/')
if __name__ == "__main__":
main(sys.argv[1], sys.argv[2], sys.argv[3])
실행할 Json 파일 (sparkjob-with-var.json)
{
"name": "[Spark Job 이름]",
"virtualClusterId": "[VIRTUAL CLUSTER ID]",
"executionRoleArn": "[EMR ROLE ARN]",
"releaseLabel": "emr-6.2.0-latest",
"jobDriver": {
"sparkSubmitJobDriver": {
"entryPoint": "s3://developer-personal-storage/hyunsoo/emr_on_eks/sample-4.py",
"entryPointArguments": ["apple", "banana", "tomato"],
"sparkSubmitParameters": "--conf spark.executor.instances=1 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
}
},
"configurationOverrides": {
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"spark.dynamicAllocation.enabled": "false",
"spark.kubernetes.executor.deleteOnTermination": "true"
}
}
],
"monitoringConfiguration": {
"cloudWatchMonitoringConfiguration": {
"logGroupName": "[로그 그룹 경로]",
"logStreamNamePrefix": "sample"
},
"s3MonitoringConfiguration": {
"logUri": "[로그 저장될 S3 버킷 경로]"
}
}
}
}
수정 필요한 인자들 추가 설명
실행 명령
aws emr-containers start-job-run --cli-input-json file://[sparkjob-with-var.json 파일 경로]
명령을 통해 작업을 실행하게 되면 아래와 같이 EKS 클러스터에 Pod들이 차례대로 떴다가 종료되었다 하면서 작업이 수행되며 Virtual Cluster에서 작업 완료 여부 확인이 가능하다.
작업이 성공적으로 Completed 되었다면 스크립트에서 지정한 저장 경로에 dataframe이 저장되어 있는 것을 확인 할 수 있다.
Cloudwatch의 Log Group 및 S3 경로에도 해당 작업에 대한 로그가 잘 적재된 것을 확인할 수 있다.