AWS Cloudtrail 서비스는 AWS 계정을 통해 접속한 User들의 활동 기록들을 남겨주는 서비스이다. AWS 공식 문서에는 아래와 같이 나온다.
AWS CloudTrail은 AWS 계정의 운영 및 위험 감사, 거버넌스 및 규정 준수를 활성화하는 데 도움이 되는 AWS 서비스입니다. 사용자, 역할 또는 AWS 서비스가 수행하는 작업들은 CloudTrail에 이벤트로 기록됩니다. 이벤트에는 AWS Management Console, AWS Command Line Interface 및 AWS SDK, API에서 수행되는 작업들이 포함됩니다.
단지 이용자들이 수행한 활동만이 아닌, AWS 서비스들끼리 통신하는 기록, API나 SDK를 통한 기록들까지 모두 저장이 되고 Log를 탐색해보면 알 수 있겠지만 정말 세세한 부분 하나하나까지 다 기록으로 저장이 된다. 이를 통해 AWS 리소스 이용에 대한 세세한 로그 기록들을 통하여 특정 서비스별 사용 인원, 리소스 사용 비율, 시간대별 사용량 등 다양한 방향으로 분석을 진행할 수 있으며 이를 통해 AWS 클라우드 관리나 이용, 보안 등에 관한 유의미한 데이터들을 추출해낼 수 있다. 아래는 Cloudtrail Data Usage에 대한 공식 문서의 설명 중 일부이다.
AWS 계정 활동에 대한 가시성은 보안 및 운영 모범 사례에서 중요한 측면입니다. CloudTrail을 사용하여 AWS 인프라 전반에서 계정 활동을 확인, 응답할 수 있습니다. 누가 또는 무엇이 어떤 작업을 수행했는지, 어떤 리소스에 대해 조치가 취해졌는지, 언제 이벤트가 발생했는지, AWS 계정에서 활동 분석 및 응답에 도움이 되는 기타 세부 정보를 식별할 수 있습니다.
현재 회사에서는 AWS Cloud 서비스들을 활용하여 데이터 분석을 진행하고 있으며 하나의 계정 아래 여러명의 IAM User가 Group별로 나뉘어 등록되어 있는 구조이다. 일과중에는 약 15명 남짓의 직원이 동시에 접속하여 사용하고 있으며 타 그룹 유저들까지 포함한다면 20명 이상으로 올라갈 것이다. 이러한 AWS 계정에 대한 Cloudtrail Log 분석을 진행하며 느꼈던 점들과 분석에 대한 포인트들을 공유해보고자 한다.
Athena를 통한 Query
Spark를 통한 데이터 가공
Cloudtrail > Event History > Create Athena Table 항목에서 아래와 같은 Create Table 쿼리 포맷을 얻을 수 있다. 해당 쿼리에서 [TABLE_NAME] 입력, [S3_BUCKET_NAME], [S3_BUCKET_URL] 부분만 로그 쌓이는 S3 버킷 정보에 맞게 변경해준 후 Athena 쿼리 창에 명령을 실행하면 로그 테이블이 생성된다.
CREATE EXTERNAL TABLE [TABLE_NAME] (
eventVersion STRING,
userIdentity STRUCT<
type: STRING,
principalId: STRING,
arn: STRING,
accountId: STRING,
invokedBy: STRING,
accessKeyId: STRING,
userName: STRING,
sessionContext: STRUCT<
attributes: STRUCT<
mfaAuthenticated: STRING,
creationDate: STRING>,
sessionIssuer: STRUCT<
type: STRING,
principalId: STRING,
arn: STRING,
accountId: STRING,
username: STRING>,
ec2RoleDelivery: STRING,
webIdFederationData: MAP<STRING,STRING>>>,
eventTime STRING,
eventSource STRING,
eventName STRING,
awsRegion STRING,
sourceIpAddress STRING,
userAgent STRING,
errorCode STRING,
errorMessage STRING,
requestParameters STRING,
responseElements STRING,
additionalEventData STRING,
requestId STRING,
eventId STRING,
resources ARRAY<STRUCT<
arn: STRING,
accountId: STRING,
type: STRING>>,
eventType STRING,
apiVersion STRING,
readOnly STRING,
recipientAccountId STRING,
serviceEventDetails STRING,
sharedEventID STRING,
vpcEndpointId STRING,
tlsDetails STRUCT<
tlsVersion: STRING,
cipherSuite: STRING,
clientProvidedHostHeader: STRING>
)
COMMENT 'CloudTrail table for [S3_BUCKET_NAME] bucket'
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '[S3_BUCKET_URL]'
TBLPROPERTIES ('classification'='cloudtrail');
분석 필요한 Data를 Schema와 함께 Read.
_path_list = ["작업일_기준_2일전_cloudtrail_s3_path", "작업일_기준_1일전_cloudtrail_s3_path"]
cloudTrailSchema = T.StructType() \
.add("Records", T.ArrayType(T.StructType() \
.add("additionalEventData", T.StringType()) \
.add("apiVersion", T.StringType()) \
.add("awsRegion", T.StringType()) \
.add("errorCode", T.StringType()) \
.add("errorMessage", T.StringType()) \
.add("eventID", T.StringType()) \
.add("eventName", T.StringType()) \
.add("eventSource", T.StringType()) \
.add("eventTime", T.StringType()) \
.add("eventType", T.StringType()) \
.add("eventVersion", T.StringType()) \
.add("readOnly", T.BooleanType()) \
.add("recipientAccountId", T.StringType()) \
.add("requestID", T.StringType()) \
.add("requestParameters", T.MapType(T.StringType(), T.StringType())) \
.add("resources", T.ArrayType(T.StructType() \
.add("ARN", T.StringType()) \
.add("accountId", T.StringType()) \
.add("type", T.StringType()) \
)) \
.add("responseElements", T.MapType(T.StringType(), T.StringType())) \
.add("sharedEventID", T.StringType()) \
.add("sourceIPAddress", T.StringType()) \
.add("serviceEventDetails", T.MapType(T.StringType(), T.StringType())) \
.add("userAgent", T.StringType()) \
.add("userIdentity", T.StructType() \
.add("accessKeyId", T.StringType()) \
.add("accountId", T.StringType()) \
.add("arn", T.StringType()) \
.add("invokedBy", T.StringType()) \
.add("principalId", T.StringType()) \
.add("sessionContext", T.StructType() \
.add("attributes", T.StructType() \
.add("creationDate", T.StringType()) \
.add("mfaAuthenticated", T.StringType()) \
) \
.add("sessionIssuer", T.StructType() \
.add("accountId", T.StringType()) \
.add("arn", T.StringType()) \
.add("principalId", T.StringType()) \
.add("type", T.StringType()) \
.add("userName", T.StringType()) \
)
) \
.add("type", T.StringType()) \
.add("userName", T.StringType()) \
.add("webIdFederationData", T.StructType() \
.add("federatedProvider", T.StringType()) \
.add("attributes", T.MapType(T.StringType(), T.StringType())) \
)
) \
.add("vpcEndpointId", T.StringType())))
# Schema와 함께 이틀치 로그 path Read
sdf = \
spark \
.read \
.schema(cloudTrailSchema) \
.json(_path_list)
최상위 컬럼 Flatten 작업 및 정규화 컬럼 이름 추출
# Array, Struct 형으로 들어가 있는 data들을 depth 탐색하여 정규화될 수 있도록 column을 생성해주는 함수
def column_explosion(df):
exploded_columns = []
for col_name, col_type in df.dtypes:
if col_type.startswith('array'):
exploded_columns += [f"{col_name}.{i}" for i in df.select(F.explode(col_name).alias(col_name.lower())).select(f"{col_name.lower()}.*").columns]
elif col_type.startswith('struct'):
exploded_columns += [f"{col_name}.{i}" for i in column_explosion(df.select(f"{col_name}.*"))]
else:
exploded_columns.append(col_name)
return exploded_columns
# 최상위 컬럼 펼치기 작업(explode)
table = \
table\
.select(F.explode("Records").alias("record")).select("record.*")
# Nested 된 data 내의 컬럼들 포함하여 가장 안쪽에 있는 컬럼들 이름 추출
exploded_columns = column_explosion(table)
# Nested data의 depth에 따른 구분자를 .에서 _으로 변경 및 컬럼명 소문자 처리
column_name = [i.replace('.','_').lower() for i in exploded_columns]
# 최대한 정규화 할 수 있도록 추출한 column들을 기준으로 1차 explode된 테이블의 nested 데이터 내용 추출 및 컬럼명 일괄 변경
cloudtrail_table = \
table \
.select(exploded_columns) \
.toDF(*column_name)
KST 컬럼 및 기준일자(proc_ymd) 컬럼 추가 후 최종 테이블 생성
# 정규화된 테이블, 작업일자를 변수로 넘겨준다.
def cloudtrail_log_processing(sdf_cloudtrail, yyyymmdd: str):
_sdf_cloudtrail = sdf_cloudtrail
# 필요한 컬럼들 생성
# eventtime_kst : utc를 한국시간으로 변경
# proc_ymd : 기준일자 컬럼 생성
_sdf_cloudtrail = \
_sdf_cloudtrail \
.withColumn('eventtime_kst', _sdf_cloudtrail.eventtime + F.expr('INTERVAL 9 HOURS')) \
.withColumn('proc_ymd', F.date_format('eventtime_kst', 'yyyyMMdd'))
# eventtime_kst이 yyyymmdd에 해당하는 날짜에 발생된 데이터만 필터링
_sdf_cloudtrail = \
_sdf_cloudtrail \
.filter(_sdf_cloudtrail.eventtime_kst.contains(f'{yyyymmdd[0:4]}-{yyyymmdd[4:6]}-{yyyymmdd[6:8]}'))
# 컬럼 순서 선언
_sdf_cloudtrail_columns = ['proc_ymd', 'additionaleventdata', 'apiversion', 'awsregion', 'errorcode', 'errormessage'
, 'eventid', 'eventname', 'eventsource', 'eventtime', 'eventtime_kst', 'eventtype', 'eventversion'
, 'readonly', 'recipientaccountid', 'requestid', 'requestparameters', 'resources_arn', 'resources_accountid'
, 'resources_type', 'responseelements', 'sharedeventid', 'sourceipaddress', 'serviceeventdetails'
, 'useragent', 'useridentity_accesskeyid', 'useridentity_accountid', 'useridentity_arn'
, 'useridentity_invokedby', 'useridentity_principalid', 'useridentity_sessioncontext_attributes_creationdate'
, 'useridentity_sessioncontext_attributes_mfaauthenticated', 'useridentity_sessioncontext_sessionissuer_accountid'
, 'useridentity_sessioncontext_sessionissuer_arn', 'useridentity_sessioncontext_sessionissuer_principalid'
, 'useridentity_sessioncontext_sessionissuer_type', 'useridentity_sessioncontext_sessionissuer_username'
, 'useridentity_type', 'useridentity_username', 'useridentity_webidfederationdata_federatedprovider'
, 'useridentity_webidfederationdata_attributes', 'vpcendpointid']
_sdf_cloudtrail = \
_sdf_cloudtrail \
.select(_sdf_cloudtrail_columns)
return _sdf_cloudtrail
final_cloudtrail_log_table = cloudtrail_log_processing(cloudtrail_table, '작업일자')
Data 저장 및 마무리
Cloudtrail 원천 데이터와 이를 정제하는 Spark 코드는 최종적으로 Airflow Dag의 Work Process에 맞게 스케쥴링되어 작업되고 매일매일 지속적인 정제작업이 일어나 특정 S3 디렉토리 내에 파티셔닝되어 저장되게 된다.
이렇게 정제되어 쌓이는 매일매일의 데이터들을 목적에 맞게 시각화 될 수 있도록 Athena를 통해 Custom Query를 짜고 QuickSight를 통해 해당 Custom Query 데이터를 바라보고 시각화한다. 아래는 최종적으로 시각화 된 화면들 중 일부이다.
Athena 및 Quicksight 월간 이용자 수 및 Event 횟수
AWS 서비스별 일일 이용자 수 및 일평균 Event