Glue data quality check 서비를 이용하여 data quality ruleset 을 관리하고 데이터 품질 검사하는 방법을 기록해본다.Glue 를 이용한 데이터 품질검사 및 Slack 알람 연동 기본 구성도는 아래와 같이 작업 하였다.

Glue data quality rules 생성
- DQC 를 위한 최소 IAM 권한 문서 자료를 참고하여 아래와 같이 Policy 를 생성해주었다.
- 작업 상황에 따라 권한을 유연하게 추가해서 사용하면 된다.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowGlueGetDataQualityRuleset",
"Effect": "Allow",
"Action": [
"glue:GetDataQualityRuleset"
],
"Resource": "arn:aws:glue:ap-northeast-2:{my_account}:dataQualityRuleset/*"
},
{
"Sid": "AllowGlueRulesetEvaluationRunActions",
"Effect": "Allow",
"Action": [
"glue:GetDataQualityRulesetEvaluationRun",
"glue:PublishDataQuality"
],
"Resource": "arn:aws:glue:ap-northeast-2:{my_account}:dataQualityRuleset/*"
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowCatalogPermissions",
"Effect": "Allow",
"Action": [
"glue:GetPartitions",
"glue:GetTable"
],
"Resource": [
"arn:aws:glue:ap-northeast-2:{my_account}:database/*",
"arn:aws:glue:ap-northeast-2:{my_account}:catalog",
"arn:aws:glue:ap-northeast-2:{my_account}:table/*/*"
]
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::aws-glue-*/*"
}
]
}
{
"Version": "2012-10-17",
"Statement":
[
{
"Sid": "AllowCloudWatchPutMetricDataToPublishTaskMetrics",
"Effect": "Allow",
"Action": "cloudwatch:PutMetricData",
"Resource": "*",
"Condition":
{
"StringEquals":
{
"cloudwatch:namespace": "Glue Data Quality"
}
}
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowS3PutObjectToWriteTaskResults",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectRetention",
"s3:PutObjectVersionAcl",
"s3:PutObjectVersionTagging",
"s3:PutObjectTagging",
"s3:PutObjectLegalHold",
"s3:PutObjectAcl"
],
"Resource": "arn:aws:s3:::{my_bucket}/*"
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
Data Quality 탭을 누른 후 Create data quality rules 를 선택한다.

# 데이터 품질 규칙 예제
# my_column_1 컬럼은 PK 와 같이 모두 유니크하고 NULL 이 없어야하며, 시작 글자가 A 또는 B 여야 한다.
# my_column_2 컬럼은 NULL 없이 모두 존재해야한다.
# 테이블의 행이 1개 이상이여야 한다.
Rules = [
IsPrimaryKey "my_column_1",
ColumnValues "my_column_1" matches "^(A|B)",
IsComplete "my_column_2",
RowCount > 0
]
아래 코드와 같이 개발하면 Glue data catalog 의
Database,Table,Glue data quality ruleset정보를 파라미터로 받는 구조로, 하나의 스크립트로 여러 테이블의 데이터 품질 검사를 진행할 수 있다.
import sys
from awsglue.transforms import *
# from awsglue.transforms import SelectFromCollection
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsgluedq.transforms import EvaluateDataQuality
def resolve_args(args_list):
"""
Glue Job parameters 핸들링 함수
"""
available_args_list = []
for item in args_list:
try:
args = getResolvedOptions(
sys.argv, [f'{item}']
)
available_args_list.append(item)
except Exception as e:
print(f"WARNING: Missing argument, {e}")
return available_args_list
def get_ruleset_content(ruleset_name, region='ap-northeast-2'):
"""
Glue Data Catalog에 등록된 데이터 품질 룰셋을 조회하는 함수.
refs: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/get_data_quality_ruleset.html
"""
import boto3
glue_client = boto3.client('glue', region_name=region)
response = glue_client.get_data_quality_ruleset(Name=ruleset_name)
ruleset_content = response.get("Ruleset")
if not ruleset_content:
raise Exception(f"Ruleset content not found for ruleset: {ruleset_name}")
return ruleset_content
## 파라미터 세팅
args_list = ['JOB_NAME', 'database', 'table', 'ruleset_name', 'push_down_predicate']
available_args_list = resolve_args(args_list)
args = getResolvedOptions(
sys.argv, available_args_list
)
database = args['database'] # required
table = args['table'] # required
ruleset_name = args['ruleset_name'] # required
push_down_predicate = args.get('push_down_predicate', '') # optional
## Spark Context 생성
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## Spark 세션 타임존 설정
spark.conf.set("spark.sql.session.timeZone", "Asia/Seoul")
## Logger 생성
mark = '@'*5
logger = glueContext.get_logger()
logger.info(mark + f" START!!! {args['JOB_NAME']} glue job " + mark)
## 카탈로그 Dynamic Frame 생성
df = glueContext.create_dynamic_frame.from_catalog(
database=database,
table_name=table,
push_down_predicate=push_down_predicate,
transformation_ctx="dqc_target_table_ctx"
)
## DQC 룰셋 조회
dqc_ruleset = get_ruleset_content(ruleset_name)
logger.info(mark + f"\nData Quality Ruleset:\n{dqc_ruleset}\n" + mark)
## DQC 실행(return: DynamicFrameCollection)
dqc_results = EvaluateDataQuality().process_rows(
frame=df,
ruleset=dqc_ruleset,
publishing_options={
"dataQualityEvaluationContext": "dqc_target_table_ctx",
"enableDataQualityResultsPublishing": True,
},
additional_options={"performanceTuning.caching": "CACHE_NOTHING"}
)
## 평가 결과에서 ruleOutcomes 추출
rule_outcomes = SelectFromCollection.apply(
dfc=dqc_results,
key="ruleOutcomes",
transformation_ctx="rule_outcomes_ctx"
)
logger.info(mark + f"\nRule Outcomes:\n{rule_outcomes.toDF().show()}\n" + mark)
job.commit()
- AWS CloudTrail 에는 AWS Account 에 해당하는 모든 Log 가 수집된다고 한다.
- 그리고 EventBridge 서비스 deafult 이벤트 버스에는 모든 Log 가 흐른다고 한다.
- default 이벤트 버스 중 eventName 값이
PublishDataQualityResult인 데이터만 Lambda 로 전송하여 동작 되도록 트리깅 한 후- Lambda 에서 Slack webhook 을 통해 알람을 보내주는 형태로 개발할 수 있다.

PublishDataQualityResult 인 것중 jobName 이 prod 로 시작하면서 DQC 결과 점수(score) 값이 1 미만일 때만 트리깅 되도록 규칙을 설정한 것이다. 즉, 품질 검사 결과가 모두 만족하지 못했을 때에만 트리깅 하는 것이다.{
"source": ["aws.glue"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["glue.amazonaws.com"],
"eventName": ["PublishDataQualityResult"],
"requestParameters": {
"jobName": [{
"prefix": "prod"
}],
"score": [{
"numeric": ["<", 1]
}]
}
}
}
import boto3
import json
import logging
import os
from base64 import b64decode
from datetime import datetime, timedelta
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
# Get values from Environments variables
SLACK_CHANNEL = os.environ['SLACK_CHANNEL']
HOOK_URL = os.environ['HOOK_URL']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def send_message(message):
req = Request(HOOK_URL, data = json.dumps(message).encode('utf-8'))
try:
response = urlopen(req)
response.read()
logger.info("Message posted to %s", SLACK_CHANNEL)
except HTTPError as e:
logger.error("Request failed: %d %s", e.code, e.reason)
except URLError as e:
logger.error("Server connection failed: %s", e.reason)
def lambda_handler(event, context):
# Event detail
detail = event['detail']
# Event - CloudTrail Log URL
event_id = detail['eventID']
event_log_url = f'https://ap-northeast-2.console.aws.amazon.com/cloudtrailv2/home?region=ap-northeast-2#/events/{event_id}'
# Event - Createtime
event_time_utc_str = detail['eventTime']
event_time_utc = datetime.strptime(event_time_utc_str, "%Y-%m-%dT%H:%M:%SZ")
event_time_kst = event_time_utc + timedelta(hours=9)
event_time_kst_str = event_time_kst.strftime("%Y-%m-%d %H:%M:%S")
# Event - Name
event_name = detail['eventName']
# Event - Glue Job Properties
job_name = detail['requestParameters']['jobName'] # Glue Job 이름
dqc_rulesetname = detail['requestParameters']['rulesetName'] # 데이터품질검사 Ruleset 이름
dqc_score = detail['requestParameters']['score'] # 데이터품질검사 점수
dqc_rule_results = detail['requestParameters']['ruleResults'] # 데이터품질검사 결과 리스트
dqc_fail_count = sum([ 1 for result in dqc_rule_results if result['result'] == 'FAIL' ]) # 데이터품질검사 결과 중 실패한 규칙 개수
slack_message = {
"channel": SLACK_CHANNEL,
"attachments": [{
"blocks": [
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": '*🏷️ Job Name :*\n' + job_name
},
{
"type": "mrkdwn",
"text": '*DQC Rule Name :*\n' + dqc_rulesetname
},
{
"type": "mrkdwn",
"text": '*DQC Score :*\n' + str(dqc_score)
},
{
"type": "mrkdwn",
"text": '*DQC Fail Count :*\n' + str(dqc_fail_count)
},
{
"type": "mrkdwn",
"text": '*DQC Time :*\n' + str(event_time_kst_str) + '(KST)'
}
]
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": f'*🏷️ Event Log :*\n{event_log_url}'
}
]
}
]
}],
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": '*🚫 Glue Job - Data Quality Check Failed Alert*'
}
},
{
"type": "divider"
}
]
}
send_message(slack_message)
