AWS Glue Data quality check 을 위한 Glue ETL Job 개발하기(with.

김재민·2025년 3월 18일

aws-glue

목록 보기
1/1
post-thumbnail

Background


  • AWS Glue Data catalog 에서 관리중인 테이블 데이터 품질 검사 방법을 알아본다.
  • 데이터 품질 검사를 위한 여러 방법이 있겠지만, Glue data catalog 에서 관리되는 테이블과 호환성이 좋은 Glue data quality check 서비를 이용하여 data quality ruleset 을 관리하고 데이터 품질 검사하는 방법을 기록해본다.

Spec


Glue 를 이용한 데이터 품질검사 및 Slack 알람 연동 기본 구성도는 아래와 같이 작업 하였다.

작업 순서

  1. IAM Policy, Role 생성(for. Glue ETL Job, Glue DQC 등...)
  2. Glue data quality rules 생성
  3. 데이터 품질 검사를 위한 Glue ETL Job 개발(with. boto3)
  4. 데이터 품질 검사 결과를 Slack 으로 알람 연동(with. CloudTrail, EventBridge)

IAM 관리


IAM Policy 생성하기

  • Glue data quality ruleset 을 조회할 수 있도록 권한을 추가한다.
{
	"Version": "2012-10-17",
	"Statement": [
    {
      "Sid": "AllowGlueGetDataQualityRuleset",
      "Effect": "Allow",
      "Action": [
        "glue:GetDataQualityRuleset"
      ],
      "Resource": "arn:aws:glue:ap-northeast-2:{my_account}:dataQualityRuleset/*"
    },
    {
      "Sid": "AllowGlueRulesetEvaluationRunActions",
      "Effect": "Allow",
      "Action": [
        "glue:GetDataQualityRulesetEvaluationRun",
        "glue:PublishDataQuality"
      ],
      "Resource": "arn:aws:glue:ap-northeast-2:{my_account}:dataQualityRuleset/*"
    }
	]
}
  • Glue job 에서 data catalog 에 접근하여 품질검사 진행을 위해 추가한다.
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "AllowCatalogPermissions",
			"Effect": "Allow",
			"Action": [
				"glue:GetPartitions",
				"glue:GetTable"
			],
			"Resource": [
				"arn:aws:glue:ap-northeast-2:{my_account}:database/*",
				"arn:aws:glue:ap-northeast-2:{my_account}:catalog",
				"arn:aws:glue:ap-northeast-2:{my_account}:table/*/*"
			]
		}
	]
}
  • Glue job 에서 EvaluateDataQuality class 사용 시 S3 aws-glue-ml-data-quality-assets-ap-northeast-2 버킷 접근을 위한 권한이다.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::aws-glue-*/*"
        }
    ]
}
  • DQC 관련 Metric 을 CloudWatch 로 Put 하기 위한 권한이다.
{
    "Version": "2012-10-17",
    "Statement":
    [
        {
            "Sid": "AllowCloudWatchPutMetricDataToPublishTaskMetrics",
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*",
            "Condition":
            {
                "StringEquals":
                {
                    "cloudwatch:namespace": "Glue Data Quality"
                }
            }
        }
    ]
}
  • DQC 결과를 S3 에 Write 하기 위한 권한이다.
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "AllowS3PutObjectToWriteTaskResults",
			"Effect": "Allow",
			"Action": [
				"s3:PutObject",
				"s3:PutObjectRetention",
				"s3:PutObjectVersionAcl",
				"s3:PutObjectVersionTagging",
				"s3:PutObjectTagging",
				"s3:PutObjectLegalHold",
				"s3:PutObjectAcl"
			],
			"Resource": "arn:aws:s3:::{my_bucket}/*"
		}
	]
}
  • CloudWatch 로그를 생성하기 위한 권한이다.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}

IAM Role 생성하기

  • 앞서 생성한 각 Policy 들을 권한 추가 해둔 IAM Role 을 생성한다.
  • 이후 Glue job 설정 시 해당 IAM Role 을 사용하도록 하면 된다.

Ruleset 관리


Glue table - Data Quality Ruleset 추가하기

  • Glue table 콘솔 화면에서Data Quality 탭을 누른 후 Create data quality rules 를 선택한다.

# 데이터 품질 규칙 예제
# my_column_1 컬럼은 PK 와 같이 모두 유니크하고 NULL 이 없어야하며, 시작 글자가 A 또는 B 여야 한다.
# my_column_2 컬럼은 NULL 없이 모두 존재해야한다.
# 테이블의 행이 1개 이상이여야 한다.
Rules = [
    IsPrimaryKey "my_column_1",
    ColumnValues "my_column_1" matches "^(A|B)",
    IsComplete "my_column_2",
	RowCount > 0
]

Glue ETL Job 코드


아래 코드와 같이 개발하면 Glue data catalog 의 Database, Table, Glue data quality ruleset 정보를 파라미터로 받는 구조로, 하나의 스크립트로 여러 테이블의 데이터 품질 검사를 진행할 수 있다.

import sys
from awsglue.transforms import *
# from awsglue.transforms import SelectFromCollection
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsgluedq.transforms import EvaluateDataQuality


def resolve_args(args_list):
    """
    Glue Job parameters 핸들링 함수
    """
    available_args_list = []
    for item in args_list:
        try:
            args = getResolvedOptions(
                sys.argv, [f'{item}']
            )
            available_args_list.append(item)
        except Exception as e:
            print(f"WARNING: Missing argument, {e}")
    return available_args_list
    
def get_ruleset_content(ruleset_name, region='ap-northeast-2'):
    """
    Glue Data Catalog에 등록된 데이터 품질 룰셋을 조회하는 함수.
    refs: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/get_data_quality_ruleset.html
    """
    import boto3
    glue_client = boto3.client('glue', region_name=region)
    response = glue_client.get_data_quality_ruleset(Name=ruleset_name)
    ruleset_content = response.get("Ruleset")
    
    if not ruleset_content:
        raise Exception(f"Ruleset content not found for ruleset: {ruleset_name}")
    
    return ruleset_content

    
## 파라미터 세팅
args_list = ['JOB_NAME', 'database', 'table', 'ruleset_name', 'push_down_predicate']
available_args_list = resolve_args(args_list)
args = getResolvedOptions(
    sys.argv, available_args_list
)

database = args['database'] # required
table = args['table'] # required
ruleset_name = args['ruleset_name'] # required
push_down_predicate = args.get('push_down_predicate', '') # optional


## Spark Context 생성
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## Spark 세션 타임존 설정
spark.conf.set("spark.sql.session.timeZone", "Asia/Seoul")

## Logger 생성
mark = '@'*5
logger = glueContext.get_logger()
logger.info(mark + f" START!!! {args['JOB_NAME']} glue job " + mark)


## 카탈로그 Dynamic Frame 생성
df = glueContext.create_dynamic_frame.from_catalog(
    database=database,
    table_name=table,
    push_down_predicate=push_down_predicate,
    transformation_ctx="dqc_target_table_ctx"
)

## DQC 룰셋 조회
dqc_ruleset = get_ruleset_content(ruleset_name)
logger.info(mark + f"\nData Quality Ruleset:\n{dqc_ruleset}\n" + mark)

## DQC 실행(return: DynamicFrameCollection)
dqc_results = EvaluateDataQuality().process_rows(
    frame=df,
    ruleset=dqc_ruleset,
    publishing_options={
        "dataQualityEvaluationContext": "dqc_target_table_ctx",
        "enableDataQualityResultsPublishing": True,
    },
    additional_options={"performanceTuning.caching": "CACHE_NOTHING"}
)

## 평가 결과에서 ruleOutcomes 추출
rule_outcomes = SelectFromCollection.apply(
    dfc=dqc_results,
    key="ruleOutcomes",
    transformation_ctx="rule_outcomes_ctx"
)
logger.info(mark + f"\nRule Outcomes:\n{rule_outcomes.toDF().show()}\n" + mark)

job.commit()

Slack 알람 전송


  1. AWS CloudTrail 에는 AWS Account 에 해당하는 모든 Log 가 수집된다고 한다.
  2. 그리고 EventBridge 서비스 deafult 이벤트 버스에는 모든 Log 가 흐른다고 한다.
  3. default 이벤트 버스 중 eventName 값이 PublishDataQualityResult 인 데이터만 Lambda 로 전송하여 동작 되도록 트리깅 한 후
  4. Lambda 에서 Slack webhook 을 통해 알람을 보내주는 형태로 개발할 수 있다.

EventBridge 규칙 생성하기

  • 아래 이미지와 같이 이벤트 규칙을 생성한다.

  • 아래 예제는 eventName 값이 PublishDataQualityResult 인 것중 jobName 이 prod 로 시작하면서 DQC 결과 점수(score) 값이 1 미만일 때만 트리깅 되도록 규칙을 설정한 것이다. 즉, 품질 검사 결과가 모두 만족하지 못했을 때에만 트리깅 하는 것이다.
{
  "source": ["aws.glue"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventSource": ["glue.amazonaws.com"],
    "eventName": ["PublishDataQualityResult"],
    "requestParameters": {
      "jobName": [{
        "prefix": "prod"
      }],
      "score": [{
        "numeric": ["<", 1]
      }]
    }
  }
}

Lambda python 코드 개발하기

  • 아래 예제 코드와 같이 개발하면 lambda 로 전달 받은 event 데이터를 파싱하여 slack 으로 전달할 수 있다.
  • slack 관련 채널 정보와 webhook url 정보는 lambda 환경 변수에 등록해 두었다.
import boto3
import json
import logging
import os

from base64 import b64decode
from datetime import datetime, timedelta
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
   
# Get values from Environments variables
SLACK_CHANNEL = os.environ['SLACK_CHANNEL']
HOOK_URL = os.environ['HOOK_URL']
   
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def send_message(message):
    
    req = Request(HOOK_URL, data = json.dumps(message).encode('utf-8'))

    try:
        response = urlopen(req)
        response.read()
        logger.info("Message posted to %s", SLACK_CHANNEL)
    except HTTPError as e:
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)

   
def lambda_handler(event, context):
    
    # Event detail
    detail = event['detail']
    
    # Event - CloudTrail Log URL
    event_id = detail['eventID']
    event_log_url = f'https://ap-northeast-2.console.aws.amazon.com/cloudtrailv2/home?region=ap-northeast-2#/events/{event_id}'
    
    # Event - Createtime
    event_time_utc_str = detail['eventTime']
    event_time_utc = datetime.strptime(event_time_utc_str, "%Y-%m-%dT%H:%M:%SZ")
    event_time_kst = event_time_utc + timedelta(hours=9)
    event_time_kst_str = event_time_kst.strftime("%Y-%m-%d %H:%M:%S")
    
    # Event - Name
    event_name = detail['eventName']
    
    # Event - Glue Job Properties
    job_name = detail['requestParameters']['jobName']                                           # Glue Job 이름
    dqc_rulesetname = detail['requestParameters']['rulesetName']                                # 데이터품질검사 Ruleset 이름
    dqc_score = detail['requestParameters']['score']                                            # 데이터품질검사 점수
    dqc_rule_results = detail['requestParameters']['ruleResults']                               # 데이터품질검사 결과 리스트
    dqc_fail_count = sum([ 1 for result in dqc_rule_results if result['result'] == 'FAIL' ])    # 데이터품질검사 결과 중 실패한 규칙 개수
    

    slack_message = {
        "channel": SLACK_CHANNEL,
        "attachments": [{
            "blocks": [
                {
                    "type": "section",
                    "fields": [
                        {
                            "type": "mrkdwn",
                            "text": '*🏷️ Job Name :*\n' + job_name
                        },
                        {
                            "type": "mrkdwn",
                            "text": '*DQC Rule Name :*\n' + dqc_rulesetname
                        },
                        {
                            "type": "mrkdwn",
                            "text": '*DQC Score :*\n' + str(dqc_score)
                        },
                        {
                            "type": "mrkdwn",
                            "text": '*DQC Fail Count :*\n' + str(dqc_fail_count)
                        },
                        {
                            "type": "mrkdwn",
                            "text": '*DQC Time :*\n' + str(event_time_kst_str) + '(KST)'
                        }
                    ]
                },
                {
                    "type": "section",
                    "fields": [
                        {
                            "type": "mrkdwn",
                            "text": f'*🏷️ Event Log :*\n{event_log_url}'
                        }
                    ]
                }
            ]
        }],
        "blocks": [
            {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": '*🚫 Glue Job - Data Quality Check Failed Alert*'
            }
            },
            {
            "type": "divider"
            }
        ]
    }
    
    send_message(slack_message)
  • Slack 알람 전송 예시


References


profile
안녕하세요. 데이터 엔지니어 김재민 입니다.

0개의 댓글