AWS DMS(Database Migration Service)는 관계형 데이터베이스, 데이터 웨어하우스, NoSQL 데이터베이스 및 기타 유형의 데이터 스토어를 AWS 클라우드로 안전하게 마이그레이션할 수 있는 관리형 서비스입니다. AWS DMS는 데이터베이스 마이그레이션을 위한 간편하면서도 강력한 솔루션을 제공하며, 마이그레이션 과정에서 서비스 중단 시간을 최소화할 수 있습니다.
AWS DMS의 주요 특징:
AWS DMS는 초기 데이터 로드뿐만 아니라 변경 데이터 캡처(CDC)를 통해 소스 데이터베이스에서 발생하는 지속적인 변경사항을 대상 데이터베이스에 적용할 수 있어, 실시간에 가까운 데이터 동기화가 가능합니다.
AWS DMS는 다음과 같은 다양한 데이터베이스를 지원합니다:
소스 데이터베이스:
대상 데이터베이스:
AWS DMS는 소스 데이터베이스의 가용성에 거의 영향을 주지 않으면서 데이터를 안전하게 마이그레이션합니다. 초기 로드 중에도 소스 데이터베이스의 변경 사항을 캡처하여 대상 데이터베이스에 적용함으로써 다운타임을 최소화합니다.
CDC(Change Data Capture)를 통해 소스 데이터베이스에서 발생하는 지속적인 변경사항을 캡처하여 대상에 적용할 수 있습니다. 이는 다음과 같은 용도로 활용될 수 있습니다:
AWS Schema Conversion Tool(SCT)과 함께 사용하면 소스 데이터베이스 스키마를 대상 데이터베이스의 호환 가능한 형식으로 자동 변환할 수 있습니다.
Delta Lake는 데이터브릭스에서 개발한 오픈 소스 스토리지 레이어로, 데이터 레이크에 신뢰성을 제공합니다. 기존 데이터 레이크 위에서 작동하며 Apache Spark API와 완벽하게 호환됩니다.
Delta Lake의 주요 특징:
AWS DMS와 데이터브릭스를 연동하면 트랜잭션 데이터를 지속적으로 데이터 레이크로 이동시키고, 이를 분석 및 AI에 활용할 수 있는 강력한 파이프라인을 구축할 수 있습니다. 이 연동 과정은 다음과 같은 아키텍처로 구성됩니다:
복제 인스턴스 생성
엔드포인트 생성
복제 작업 구성
트랜잭션 로그 활성화
Copycall mysql.rds_set_configuration('binlog retention hours', 24);
S3 출력 형식 구성
S3 마운트 구성
Copydbutils.fs.mount(
source = "s3a://<your-bucket>",
mount_point = "/mnt/your-mount-name",
extra_configs = {"fs.s3a.access.key": access_key, "fs.s3a.secret.key": secret_key}
)
초기 데이터 로드
Copy# CSV 파일을 읽어 Delta 테이블로 저장
df = spark.read.option("header", True).option("inferSchema", True).csv("/mnt/your-mount/initial-load/")
df.write.format("delta").save("/delta/your-table")
# Delta 테이블 생성
spark.sql("CREATE TABLE your_table USING DELTA LOCATION '/delta/your-table/'")
변경 데이터 처리 (CDC)
다음은 AWS DMS에서 S3로 내보낸 CDC 데이터를 데이터브릭스의 Delta Lake에 적용하는 코드 예제입니다:
Copyimport dlt
from pyspark.sql.functions import *
# CDC 소스 데이터 정의
@dlt.table
def customer_cdc_raw():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/mnt/s3-bucket/customer/")
.withColumn("_ingest_file_name", input_file_name())
)
# CDC 변경 데이터 적용
@dlt.table
def customers():
return (
dlt.apply_changes(
target = "customers",
source = "customer_cdc_raw",
keys = ["customer_id"],
sequence_by = "dms_timestamp",
ignore_null_updates = True,
apply_as_deletes = expr("op = 'D'"),
except_column_list = ["op", "dms_timestamp"]
)
)
CopyCREATE STREAMING LIVE TABLE customer_cdc_raw
COMMENT "Raw customer change data from AWS DMS"
AS SELECT *, input_file_name() AS _ingest_file_name
FROM cloud_files('/mnt/s3-bucket/customer/', 'csv', map('header', 'true'));
CREATE STREAMING LIVE TABLE customers
COMMENT "Customers table with change data applied"
AS
APPLY CHANGES INTO LIVE.customers
FROM STREAM(LIVE.customer_cdc_raw)
KEYS (customer_id)
APPLY AS DELETE WHEN op = 'D'
SEQUENCE BY dms_timestamp
COLUMNS * EXCEPT (op, dms_timestamp);
Copy# CDC 변경 데이터 읽기
changes_df = spark.read.csv("dbfs:/mnt/s3-bucket/customer",
inferSchema=True,
header=True)
changes_df.createOrReplaceTempView("customer_changes")
# MERGE 쿼리 실행
spark.sql("""
MERGE INTO customers target
USING (
SELECT
op,
latest_changes.customer_id,
name,
email,
address,
create_date,
last_update
FROM customer_changes latest_changes
INNER JOIN (
SELECT
customer_id,
max(last_update) AS max_date
FROM customer_changes
GROUP BY customer_id
) cm
ON latest_changes.customer_id = cm.customer_id
AND latest_changes.last_update = cm.max_date
) as source
ON source.customer_id = target.customer_id
WHEN MATCHED AND source.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
AWS DMS와 데이터브릭스 Delta Lake를 연동하면 실시간에 가까운 데이터 파이프라인을 구축할 수 있습니다. 이를 통해 다음과 같은 다양한 이점을 얻을 수 있습니다:
AWS DMS와 데이터브릭스 Delta Lake를 활용하면 데이터 엔지니어링 팀은 복잡한 데이터 파이프라인을 더 쉽게 구축하고 유지보수할 수 있으며, 데이터 과학자와 분석가는 더 신선하고 일관된 데이터에 접근할 수 있게 됩니다.
이제 트랜잭션 데이터베이스의 변경 사항을 실시간으로 추적하고, 이를 데이터 레이크하우스에 반영하여 최신 데이터에 기반한 고급 분석과 AI 모델을 구축할 수 있습니다.
참고 자료: