
데이터 웨어하우스(Data Warehouse)는 운영 시스템에 분산된 데이터를 분석 목적에 최적화된 형태로 통합·정제·보관하는 저장소이다.
Bill Inmon은 데이터 웨어하우스를 다음 네 가지 특성으로 정의했다.
| 특성 | 의미 | OLTP와의 차이 |
|---|---|---|
| 주제 지향(Subject-Oriented) | 거래 단위가 아니라 고객, 상품, 이용 같은 분석 주제 중심으로 구성 | OLTP는 트랜잭션 중심 |
| 통합(Integrated) | 여러 시스템의 코드·단위·정의를 표준화 | 시스템별 자체 코드 사용 |
| 시간 가변(Time-Variant) | 과거 시점 데이터와 이력을 유지 | OLTP는 현재값 중심 |
| 비휘발성(Non-Volatile) | 적재 후 수정·삭제보다 추가 중심 | OLTP는 UPDATE/DELETE 빈번 |
OLTP가 “현재 거래를 빠르고 정확하게 처리”하는 시스템이라면,
DW는 “장기간 데이터를 누적해 패턴과 추세를 분석”하는 시스템이다.
따라서:
모두 OLTP와 다르게 접근해야 한다.
| 관점 | OLTP | OLAP / DW |
|---|---|---|
| 주 사용자 | 거래 시스템(POS, 예약, 결제) | 분석가, BI, 경영진 |
| 쿼리 특성 | 단건 조회·갱신 | 대규모 집계·스캔 |
| 인덱스 전략 | B-Tree 중심 | Columnstore 중심 |
| 스키마 | 정규화(3NF) | 스타/스노우플레이크 |
| 데이터 신선도 | 초·밀리초 | 분·시간·일 단위 |
| 동시성 | 짧은 트랜잭션 다수 | 긴 분석 쿼리 소수 |
| 저장 방식 | Row Store 중심 | Column Store 권장 |
차원 모델링은 Ralph Kimball이 정립한 DW 설계 방식이다.
핵심 목표는:
“사용자가 어떤 측정값을 어떤 관점에서 보고 싶어하는가?”
를 직관적으로 표현하는 것이다.
| 테이블 종류 | 역할 | 따릉이 예시 |
|---|---|---|
| 팩트(Fact) | 측정값 저장. 행 수가 매우 많음 | FactRental |
| 디멘션(Dimension) | 분석 관점 제공 | DimStation, DimDate |
| 브릿지/팩트리스 팩트 | 다대다 관계 표현 | 본 과정 미사용 |
등 숫자형 측정값 중심.
같은 분석 관점을 제공한다.
팩트를 중심에 두고 디멘션이 한 단계로 연결되는 구조.
DimDate
|
DimUserType - FactRental - DimStation(대여)
|
DimStation(반납)
|
DimTime
특징:
디멘션 내부를 다시 정규화한 구조.
예:
DimStation
DimDistrict
장점:
단점:
실무에서는:
스타 스키마를 기본으로 하고,
디멘션 규모가 매우 클 때만 부분 스노우플레이크를 적용한다.
디멘션 데이터는 시간이 지나며 변경된다.
예:
이런 변경 이력을 어떻게 관리할지 정의하는 것이 SCD 전략이다.
| 타입 | 동작 | 특징 | 본 과정 사용 |
|---|---|---|---|
| Type 0 | 변경 금지 | 단순 | DimDate |
| Type 1 | 현재값 덮어쓰기 | 이력 없음 | 코드 정정 |
| Type 2 | 행 추가 + 이력 유지 | 가장 중요 | DimStation |
| Type 3 | 이전값 컬럼 유지 | 1단계 이력만 | 미사용 |
| Type 6 | Hybrid | 복잡 | 미사용 |
Type 2는 기존 행을 수정하지 않고 새로운 행을 추가한다.
주요 컬럼:
| 컬럼 | 역할 |
|---|---|
| EffectiveStart | 시작 시점 |
| EffectiveEnd | 종료 시점 |
| IsCurrent | 현재 유효 여부 |
예:
| StationId | RackCount | EffectiveStart | EffectiveEnd | IsCurrent |
|---|---|---|---|---|
| ST-001 | 10 | 2025-01-01 | 2025-09-01 | 0 |
| ST-001 | 15 | 2025-09-01 | NULL | 1 |
이 방식으로:
해진다.
| 항목 | Lambda | Kappa |
|---|---|---|
| 처리 방식 | Batch + Stream | Stream Only |
| 실시간성 | 좋음 | 매우 좋음 |
| 정확성 | 매우 높음 | 높음 |
| 구조 복잡도 | 높음 | 낮음 |
| 개발 난이도 | 높음 | 상대적으로 쉬움 |
| 재처리 방식 | Batch 재계산 | Kafka replay |
| 대표 기술 | Hadoop + Spark | Kafka + Flink |
Event Hub
├─ Azure Stream Analytics → 실시간 대시보드
└─ Databricks Batch → 정산/통계
사용자 구조랑 비슷하게 보면:
KMA/AirKorea
→ Azure Function
→ Event Hub
→ ASA (실시간)
→ PostgreSQL
Kafka/Event Hub
→ Flink/ASA
→ PostgreSQL/Power BI
배치 없이:
Azure에서 DW를 구축할 때는 다양한 선택지가 존재한다.
등 전체 기능 사용 가능.
| 조건 | 추천 |
|---|---|
| SQL Server 전체 기능 필요 | SQL on VM |
| 온프레미스 거의 그대로 이전 | Managed Instance |
| 비용 최적 + 간헐적 사용 | SQL DB Serverless |
| 일정한 응답 성능 필요 | Provisioned |
| Power BI 중심 SaaS 환경 | Fabric |
Serverless는 사용한 만큼만 과금되는 컴퓨트 모델이다.
핵심 기능은 다음 세 가지다.
일정 시간 동안 쿼리나 연결이 없으면:
스토리지 비용만 유지된다.
교육/개발 환경에서 매우 유리하다.
새 연결이 들어오면:
운영 환경에서는:
등이 필요하다.
최소·최대 vCore 범위를 설정하면:
한다.
DW 적재에서 가장 일반적인 패턴은:
CSV → Storage Account → DW
이다.
본 과정에서는 두 가지 표준 패턴을 사용한다.
외부 CSV 파일을 대량 적재하는 전통적 방식.
외부 파일을 가상 테이블처럼 SELECT 하는 방식.
INSERT INTO staging.RentalRaw (rental_id, station_id, started_at, ended_at, duration_min)
SELECT
JSON_VALUE(c.line, '$.rental_id'),
CAST(c.station_id AS INT),
TRY_CONVERT(datetime2, c.started_at),
TRY_CONVERT(datetime2, c.ended_at),
c.duration_min
FROM OPENROWSET(
BULK '2024/2024-01-rental.csv',
DATA_SOURCE = 'BlobDS',
FORMAT = 'CSV',
FIRSTROW = 2,
FIELDTERMINATOR = ','
) WITH (
rental_id varchar(40),
station_id varchar(20),
started_at varchar(30),
ended_at varchar(30),
duration_min int
) AS c
WHERE TRY_CONVERT(datetime2, c.started_at) IS NOT NULL;
| 항목 | BULK INSERT | OPENROWSET |
|---|---|---|
| 속도 | 매우 빠름 | 상대적으로 느림 |
| 변환 | 낮음 | 높음 |
| 에러 처리 | 제한적 | TRY_CONVERT 가능 |
| 권장 용도 | staging 적재 | 정제·변환 적재 |
| 레이어 | 스키마 | 책임 | 예시 객체 |
|---|---|---|---|
| Raw | Storage raw/ | 원본 보관, 변환·삭제 금지 | 2024/2024-01-rental.csv |
| Staging (DB) | staging | 원형 그대로 적재된 1차 테이블, 클렌징·중복 제거 단계 | staging.RentalRaw |
| Warehouse (DB) | dw | 스타 스키마, 팩트·디멘션 정규 모델 | dw.FactRental, dw.DimStation |
| Mart (DB) | mart | 분석 사용자용 비정규 집계, 뷰·성능 최우선 | mart.vw_HourlyDemand |
배치 적재 이후 단계는 자동화이다.
Azure에서는:
를 조합한다.
Azure 전체 이벤트 라우팅 백본.
예:
Blob 업로드
↓
Event Grid
↓
Function App
↓
Stored Procedure 실행
CSV Upload
↓
Blob Storage
↓ BlobCreated Event
Event Grid
↓
Function App
↓
EXEC sp_LoadFactFromBlob
↓
Azure SQL Database
이 구조의 핵심은:
“Storage가 진실의 원본(Source of Truth)”
이라는 점이다.
| 상황 | 권장 방식 |
|---|---|
| 하루 1회 대량 적재 | 배치 |
| 실시간 데이터 도착 | 이벤트 |
| 대량 + 실시간 혼합 | 하이브리드 |
실무에서는:
조합이 가장 흔하다.
이번 실습의 전체 구조는 다음과 같다.
서울시 따릉이 CSV
↓
Azure Storage Account
(raw / staging / archive)
↓
BULK INSERT / OPENROWSET
↓
Azure SQL Database Serverless
(staging / dw / mart)
↓
분석 쿼리 / Power BI
추가 자동화:
BlobCreated Event
↓
Event Grid
↓
Azure Function
↓
Stored Procedure 실행
↓
DW 자동 적재
핵심은 Storage Account를 원본 데이터 저장소로 두고, Azure SQL Database Serverless를 분석용 DW로 사용하는 것이다.
먼저 실습에서 사용할 변수들을 정의한다.
RG=rg-dwlab-$USER
LOC=koreacentral
SQL_SRV=sql-dwlab-$USER-$RANDOM
SQL_DB=dw_seoulbike
ADMIN_USER=dwadmin
ADMIN_PASS='Dw!Lab2026Secure'
리소스 그룹과 SQL Server를 생성한다.
az group create -n $RG -l $LOC
az sql server create \
--name $SQL_SRV \
--resource-group $RG \
--location $LOC \
--admin-user $ADMIN_USER \
--admin-password $ADMIN_PASS
현재 접속 IP를 방화벽에 등록한다.
MY_IP=$(curl -s https://api.ipify.org)
az sql server firewall-rule create \
--resource-group $RG \
--server $SQL_SRV \
--name allow-me \
--start-ip-address $MY_IP \
--end-ip-address $MY_IP
az sql db create \
--resource-group $RG \
--server $SQL_SRV \
--name $SQL_DB \
--edition GeneralPurpose \
--family Gen5 \
--compute-model Serverless \
--min-capacity 0.5 \
--capacity 2 \
--auto-pause-delay 60 \
--backup-storage-redundancy Local \
--collation Korean_Wansung_CI_AS
주요 옵션은 다음과 같다.
| 옵션 | 의미 |
|---|---|
--compute-model Serverless | Serverless 계층 사용 |
--min-capacity 0.5 | 최소 0.5 vCore |
--capacity 2 | 최대 2 vCore |
--auto-pause-delay 60 | 60분 미사용 시 자동 일시 중지 |
--backup-storage-redundancy Local | 교육용 비용 절감 |
접속 확인:
sqlcmd -S $SQL_SRV.database.windows.net \
-d $SQL_DB \
-U $ADMIN_USER \
-P "$ADMIN_PASS" \
-Q "SELECT @@VERSION;"
교육 환경에서는 자동 일시 중지를 빠르게 확인하기 위해 15분으로 변경한다.
az sql db update \
-g $RG \
-s $SQL_SRV \
-n $SQL_DB \
--auto-pause-delay 15
SQL Database에서 다음 DMV를 조회해 CPU, 메모리, IO 사용률을 확인한다.
SELECT TOP 5
avg_cpu_percent,
avg_memory_usage_percent,
avg_data_io_percent,
end_time
FROM sys.dm_db_resource_stats
ORDER BY end_time DESC;
확인할 내용은 다음과 같다.
| 항목 | 확인 내용 |
|---|---|
| Auto Pause | 일정 시간 미사용 시 DB가 Paused 상태가 되는지 |
| Auto Resume | 다시 접속했을 때 자동으로 Online 상태가 되는지 |
| Cold Start | 첫 연결까지 30~120초 정도 지연되는지 |
운영 환경에서는 Cold Start가 사용자 경험에 영향을 줄 수 있으므로, 재시도 로직이나 Keep-alive 전략을 고려해야 한다.
STO_ACC=stodwlab$USER$RANDOM
az storage account create \
-g $RG \
-n $STO_ACC \
-l $LOC \
--sku Standard_LRS \
--kind StorageV2 \
--access-tier Hot \
--allow-blob-public-access false \
--min-tls-version TLS1_2
Storage Key를 가져온다.
STO_KEY=$(az storage account keys list \
-g $RG \
-n $STO_ACC \
--query [0].value \
-o tsv)
컨테이너 3개를 생성한다.
for c in raw staging archive; do
az storage container create \
--name $c \
--account-name $STO_ACC \
--account-key $STO_KEY
done
| 컨테이너 | 역할 |
|---|---|
raw | 원본 CSV 보관 |
staging | 정제 중간 결과 |
archive | 적재 완료 파일 보관 |
생성된 샘플 CSV 파일을 raw/seoul_bike/ 경로에 업로드한다.
az storage blob upload-batch \
--account-name "$STO_ACC" \
--account-key $STO_KEY \
--destination raw \
--destination-path seoul_bike/ \
--source ./seoul_bike_data \
--pattern "*.csv" \
--overwrite
업로드 확인:
az storage blob list \
--account-name "$STO_ACC" \
--account-key $STO_KEY \
--container-name raw \
--prefix "seoul_bike/" \
--query "[].{name:name, size:properties.contentLength}" \
-o table
Azure SQL Database에서 Blob을 읽기 위해 SAS 토큰을 생성한다.
EXPIRY=$(date -u -d "+7 days" '+%Y-%m-%dT%H:%MZ')
SAS=$(az storage container generate-sas \
--account-name $STO_ACC \
--name raw \
--permissions rl \
--expiry $EXPIRY \
--https-only \
--output tsv)
echo $SAS
SAS는 비밀번호와 같은 민감 정보이므로 외부에 노출되지 않도록 관리해야 한다.
CREATE SCHEMA staging AUTHORIZATION dbo;
GO
CREATE SCHEMA dw AUTHORIZATION dbo;
GO
CREATE SCHEMA mart AUTHORIZATION dbo;
GO
CREATE TABLE dw.DimDate (
DateKey INT NOT NULL PRIMARY KEY,
[Date] DATE NOT NULL,
[Year] SMALLINT NOT NULL,
Quarter TINYINT NOT NULL,
[Month] TINYINT NOT NULL,
MonthName NVARCHAR(10) NOT NULL,
[Day] TINYINT NOT NULL,
DayOfWeek TINYINT NOT NULL,
DayName NVARCHAR(10) NOT NULL,
IsWeekend BIT NOT NULL,
IsHoliday BIT NOT NULL DEFAULT 0
);
CREATE TABLE dw.DimTime (
TimeKey INT NOT NULL PRIMARY KEY,
[Hour] TINYINT NOT NULL,
[Minute] TINYINT NOT NULL,
TimeBucket NVARCHAR(10) NOT NULL
);
CREATE TABLE dw.DimUserType (
UserTypeKey INT IDENTITY PRIMARY KEY,
UserTypeCode VARCHAR(20) NOT NULL UNIQUE,
UserTypeName NVARCHAR(40) NOT NULL
);
CREATE TABLE dw.FactRental (
RentalKey BIGINT IDENTITY,
RentalId VARCHAR(40) NOT NULL,
StartDateKey INT NOT NULL,
StartTimeKey INT NOT NULL,
EndDateKey INT NULL,
EndTimeKey INT NULL,
StartStationKey BIGINT NULL,
EndStationKey BIGINT NULL,
UserTypeKey INT NULL,
DurationMin INT NULL,
DistanceMeter INT NULL,
CarbonGramSaved DECIMAL(10,2) NULL,
LoadedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
SourceFile VARCHAR(256) NULL,
CONSTRAINT PK_FactRental PRIMARY KEY NONCLUSTERED (RentalKey)
);
CREATE CLUSTERED COLUMNSTORE INDEX CCI_FactRental
ON dw.FactRental;
CREATE UNIQUE INDEX UX_FactRental_RentalId
ON dw.FactRental(RentalId);
FactRental은 대량 집계가 주 목적이므로 Clustered Columnstore Index를 적용한다.
WITH d AS (
SELECT CAST('2015-01-01' AS DATE) AS dt
UNION ALL
SELECT DATEADD(DAY, 1, dt)
FROM d
WHERE dt < '2030-12-31'
)
INSERT dw.DimDate (
DateKey, [Date], [Year], Quarter, [Month], MonthName,
[Day], DayOfWeek, DayName, IsWeekend
)
SELECT
CONVERT(INT, FORMAT(dt,'yyyyMMdd')),
dt,
YEAR(dt),
DATEPART(QUARTER, dt),
MONTH(dt),
DATENAME(MONTH, dt),
DAY(dt),
DATEPART(WEEKDAY, dt),
DATENAME(WEEKDAY, dt),
CASE WHEN DATEPART(WEEKDAY, dt) IN (1,7) THEN 1 ELSE 0 END
FROM d
OPTION (MAXRECURSION 0);
WITH m AS (
SELECT 0 AS n
UNION ALL
SELECT n + 1
FROM m
WHERE n < 1439
)
INSERT dw.DimTime (TimeKey, [Hour], [Minute], TimeBucket)
SELECT
(n / 60) * 100 + (n % 60),
n / 60,
n % 60,
CASE
WHEN n/60 BETWEEN 0 AND 5 THEN N'심야'
WHEN n/60 BETWEEN 6 AND 11 THEN N'오전'
WHEN n/60 BETWEEN 12 AND 17 THEN N'오후'
WHEN n/60 BETWEEN 18 AND 22 THEN N'저녁'
ELSE N'심야'
END
FROM m
OPTION (MAXRECURSION 0);
사용자 유형도 기본 적재한다.
INSERT dw.DimUserType (UserTypeCode, UserTypeName) VALUES
('MEMBER', N'정기권 회원'),
('NONMEMBER', N'일일권 비회원'),
('UNKNOWN', N'미상');
검증 기준:
| 테이블 | 기대값 |
|---|---|
dw.DimDate | 5,844행 |
dw.DimTime | 1,440행 |
dw.DimUserType | 3행 |
dw.FactRental | 빈 상태 |
SQL Database에서 Blob Storage에 접근하기 위한 3종 객체를 만든다.
IF NOT EXISTS (
SELECT 1
FROM sys.symmetric_keys
WHERE name = '##MS_DatabaseMasterKey##'
)
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'Lab!MasterKey2026';
CREATE DATABASE SCOPED CREDENTIAL StorageCred
WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
SECRET = '<SAS_TOKEN>';
CREATE EXTERNAL DATA SOURCE BlobRaw
WITH (
TYPE = BLOB_STORAGE,
LOCATION = 'https://<STO_ACC>.blob.core.windows.net/raw',
CREDENTIAL = StorageCred
);
<SAS_TOKEN>에는 앞의 ?를 제외한 SAS 본문만 넣는다.
DROP TABLE IF EXISTS staging.RentalRaw;
CREATE TABLE staging.RentalRaw (
RentalId VARCHAR(20) NOT NULL,
BikeId VARCHAR(20) NOT NULL,
StartTime DATETIME2(0) NOT NULL,
EndTime DATETIME2(0) NOT NULL,
StartStationId VARCHAR(20) NOT NULL,
EndStationId VARCHAR(20) NOT NULL,
DurationMin INT NOT NULL,
DistanceMeter INT NOT NULL,
UserType VARCHAR(20) NOT NULL
);
TRUNCATE TABLE staging.RentalRaw;
DECLARE @i INT = 1, @sql NVARCHAR(MAX);
WHILE @i <= 7
BEGIN
SET @sql = N'
BULK INSERT staging.RentalRaw
FROM ''seoul_bike/rentals_2025090' + CAST(@i AS VARCHAR(1)) + '.csv''
WITH (
DATA_SOURCE = ''BlobRaw'',
FORMAT = ''CSV'',
FIRSTROW = 2,
FIELDTERMINATOR = '','',
ROWTERMINATOR = ''0x0d0a'',
CODEPAGE = ''65001'',
TABLOCK,
MAXERRORS = 100
);';
EXEC sp_executesql @sql;
SET @i += 1;
END
검증:
SELECT COUNT(*) AS staging_rows
FROM staging.RentalRaw;
기대값은 21,600행이다.
TRUNCATE TABLE dw.FactRental;
INSERT INTO dw.FactRental (
RentalId, StartDateKey, StartTimeKey, EndDateKey, EndTimeKey,
StartStationKey, EndStationKey, UserTypeKey,
DurationMin, DistanceMeter, CarbonGramSaved,
LoadedAt, SourceFile
)
SELECT
s.RentalId,
CONVERT(INT, CONVERT(VARCHAR(8), s.StartTime, 112)) AS StartDateKey,
DATEPART(HOUR, s.StartTime) * 60 + DATEPART(MINUTE, s.StartTime) AS StartTimeKey,
CONVERT(INT, CONVERT(VARCHAR(8), s.EndTime, 112)) AS EndDateKey,
DATEPART(HOUR, s.EndTime) * 60 + DATEPART(MINUTE, s.EndTime) AS EndTimeKey,
CAST(NULL AS BIGINT) AS StartStationKey,
CAST(NULL AS BIGINT) AS EndStationKey,
COALESCE(ut.UserTypeKey, ut_unk.UserTypeKey) AS UserTypeKey,
s.DurationMin,
s.DistanceMeter,
CAST(s.DistanceMeter * 0.21 AS DECIMAL(10,2)) AS CarbonGramSaved,
SYSUTCDATETIME() AS LoadedAt,
'seoul_bike/rentals_' + CONVERT(VARCHAR(8), s.StartTime, 112) + '.csv' AS SourceFile
FROM staging.RentalRaw AS s
LEFT JOIN dw.DimUserType AS ut
ON ut.UserTypeCode = s.UserType
LEFT JOIN dw.DimUserType AS ut_unk
ON ut_unk.UserTypeCode = 'UNKNOWN';
여기서는 아직 StartStationKey, EndStationKey를 채우지 않는다.
대여소 디멘션은 Lab 06에서 SCD Type 2로 처리한 뒤 백필한다.
SELECT COUNT(*) AS fact_rows
FROM dw.FactRental;
SELECT StartDateKey, COUNT(*) AS c
FROM dw.FactRental
GROUP BY StartDateKey
ORDER BY StartDateKey;
SELECT COUNT(*) AS unmapped_user_type
FROM dw.FactRental
WHERE UserTypeKey IS NULL;
검증 포인트:
| 항목 | 기대값 |
|---|---|
| Fact 행수 | 21,600 |
| 날짜 분포 | 7일치 |
| UserTypeKey NULL | 0 |
| StationKey | 아직 NULL |
| CarbonGramSaved | DistanceMeter * 0.21 |
CREATE TABLE dw.DimStation (
StationKey BIGINT IDENTITY(1,1) NOT NULL,
StationId VARCHAR(20) NOT NULL,
StationName NVARCHAR(100) NOT NULL,
Gu NVARCHAR(50) NOT NULL,
Lat DECIMAL(9,5) NOT NULL,
Lng DECIMAL(9,5) NOT NULL,
RackCount INT NOT NULL,
RowHash BINARY(32) NOT NULL,
ValidFrom DATE NOT NULL,
ValidTo DATE NULL,
IsCurrent BIT NOT NULL,
LoadedAt DATETIME2(0) NOT NULL DEFAULT SYSUTCDATETIME(),
CONSTRAINT PK_DimStation PRIMARY KEY NONCLUSTERED (StationKey)
);
CREATE UNIQUE INDEX UX_DimStation_BusinessVersion
ON dw.DimStation (StationId, ValidFrom);
CREATE INDEX IX_DimStation_Current
ON dw.DimStation (StationId)
WHERE IsCurrent = 1;
CREATE OR ALTER FUNCTION dw.fn_StationRowHash(
@StationName NVARCHAR(100),
@Gu NVARCHAR(50),
@Lat DECIMAL(9,5),
@Lng DECIMAL(9,5),
@RackCount INT
) RETURNS BINARY(32)
WITH SCHEMABINDING
AS
BEGIN
RETURN HASHBYTES('SHA2_256',
CONCAT_WS(N'|',
@StationName,
@Gu,
CONVERT(NVARCHAR(20), @Lat, 1),
CONVERT(NVARCHAR(20), @Lng, 1),
CAST(@RackCount AS NVARCHAR(20))
)
);
END
RowHash를 사용하면 여러 컬럼을 각각 비교하지 않고 해시값 하나로 변경 여부를 판단할 수 있다.
DROP TABLE IF EXISTS staging.StationRaw;
CREATE TABLE staging.StationRaw (
StationId VARCHAR(20) NOT NULL,
StationName NVARCHAR(100) NOT NULL,
Gu NVARCHAR(50) NOT NULL,
Lat DECIMAL(9,5) NOT NULL,
Lng DECIMAL(9,5) NOT NULL,
RackCount INT NOT NULL,
OpenedDate DATE NOT NULL
);
TRUNCATE TABLE staging.StationRaw;
BULK INSERT staging.StationRaw
FROM 'seoul_bike/stations.csv'
WITH (
DATA_SOURCE='BlobRaw',
FORMAT='CSV',
FIRSTROW=2,
FIELDTERMINATOR=',',
ROWTERMINATOR='0x0d0a',
CODEPAGE='65001',
TABLOCK,
MAXERRORS=0
);
INSERT INTO dw.DimStation (
StationId, StationName, Gu, Lat, Lng, RackCount,
RowHash, ValidFrom, ValidTo, IsCurrent
)
SELECT
s.StationId,
s.StationName,
s.Gu,
s.Lat,
s.Lng,
s.RackCount,
dw.fn_StationRowHash(s.StationName, s.Gu, s.Lat, s.Lng, s.RackCount),
s.OpenedDate,
NULL,
1
FROM staging.StationRaw s;
UPDATE f
SET f.StartStationKey = s.StationKey
FROM dw.FactRental f
INNER JOIN staging.RentalRaw r
ON r.RentalId = f.RentalId
INNER JOIN dw.DimStation s
ON s.StationId = r.StartStationId
AND CAST(r.StartTime AS DATE) >= s.ValidFrom
AND (s.ValidTo IS NULL OR CAST(r.StartTime AS DATE) < s.ValidTo)
WHERE f.StartStationKey IS NULL;
UPDATE f
SET f.EndStationKey = s.StationKey
FROM dw.FactRental f
INNER JOIN staging.RentalRaw r
ON r.RentalId = f.RentalId
INNER JOIN dw.DimStation s
ON s.StationId = r.EndStationId
AND CAST(r.EndTime AS DATE) >= s.ValidFrom
AND (s.ValidTo IS NULL OR CAST(r.EndTime AS DATE) < s.ValidTo)
WHERE f.EndStationKey IS NULL;
검증:
SELECT
SUM(CASE WHEN StartStationKey IS NULL THEN 1 ELSE 0 END) AS null_start,
SUM(CASE WHEN EndStationKey IS NULL THEN 1 ELSE 0 END) AS null_end
FROM dw.FactRental;
기대값은 0 / 0이다.
SELECT
t.TimeBucket,
d.DayName,
COUNT(*) AS rentals,
AVG(f.DurationMin) AS avg_minutes
FROM dw.FactRental f
JOIN dw.DimTime t
ON t.TimeKey = f.StartTimeKey
JOIN dw.DimDate d
ON d.DateKey = f.StartDateKey
GROUP BY t.TimeBucket, d.DayName
ORDER BY rentals DESC;
이 쿼리로 시간대와 요일별 이용 패턴을 확인할 수 있다.
WITH dep AS (
SELECT s.Gu, COUNT(*) AS departures
FROM dw.FactRental f
JOIN dw.DimStation s
ON s.StationKey = f.StartStationKey
GROUP BY s.Gu
),
arr AS (
SELECT s.Gu, COUNT(*) AS arrivals
FROM dw.FactRental f
JOIN dw.DimStation s
ON s.StationKey = f.EndStationKey
GROUP BY s.Gu
)
SELECT
COALESCE(d.Gu, a.Gu) AS Gu,
d.departures,
a.arrivals,
a.arrivals - d.departures AS net_flow
FROM dep d
FULL OUTER JOIN arr a
ON a.Gu = d.Gu
ORDER BY ABS(a.arrivals - d.departures) DESC;
net_flow가 크면 해당 자치구에서 자전거 적체 또는 부족이 발생할 가능성이 높다.
SELECT TOP 10
st.StationName AS start_station,
en.StationName AS end_station,
COUNT(*) AS trips,
AVG(f.DurationMin) AS avg_min
FROM dw.FactRental f
JOIN dw.DimStation st
ON st.StationKey = f.StartStationKey
JOIN dw.DimStation en
ON en.StationKey = f.EndStationKey
WHERE f.StartStationKey IS NOT NULL
AND f.EndStationKey IS NOT NULL
AND f.StartStationKey <> f.EndStationKey
GROUP BY st.StationName, en.StationName
ORDER BY trips DESC;
SELECT
COALESCE(s.Gu, '<<TOTAL_GU>>') AS Gu,
COALESCE(CAST(t.Hour AS VARCHAR(8)), '<<TOTAL_HOUR>>') AS Hour,
COUNT(*) AS rides
FROM dw.FactRental f
JOIN dw.DimStation s
ON s.StationKey = f.StartStationKey
JOIN dw.DimTime t
ON t.TimeKey = f.StartTimeKey
GROUP BY ROLLUP(s.Gu, t.Hour)
ORDER BY GROUPING(s.Gu), s.Gu, GROUPING(t.Hour), t.Hour;
FUNC_APP=func-dwlab-$USER-$RANDOM
FUNC_LOC="koreacentral"
az functionapp create \
-g $RG \
-n $FUNC_APP \
--consumption-plan-location $FUNC_LOC \
--runtime python \
--runtime-version 3.11 \
--functions-version 4 \
--storage-account $STO_ACC \
--os-type Linux
Managed Identity를 활성화한다.
az functionapp identity assign \
-g $RG \
-n $FUNC_APP
Storage 읽기 권한을 부여한다.
FUNC_PRINCIPAL=$(az functionapp identity show \
-g $RG \
-n $FUNC_APP \
--query principalId \
-o tsv)
STO_ID=$(az storage account show \
-g $RG \
-n $STO_ACC \
--query id \
-o tsv)
az role assignment create \
--assignee "$FUNC_PRINCIPAL" \
--role 'Storage Blob Data Reader' \
--scope "$STO_ID"
CREATE USER [func-dwlab-...] FROM EXTERNAL PROVIDER;
ALTER ROLE db_datareader ADD MEMBER [func-dwlab-...];
ALTER ROLE db_datawriter ADD MEMBER [func-dwlab-...];
GRANT EXECUTE ON SCHEMA :: staging TO [func-dwlab-...];
GRANT EXECUTE ON SCHEMA :: dw TO [func-dwlab-...];
운영 환경에서는 Function 코드에 SQL 비밀번호를 넣지 않는 것이 중요하다.
Managed Identity를 사용하면 Function App 자체의 신원으로 SQL Database에 접근할 수 있다.
import logging, os, struct
import azure.functions as func
from azure.identity import DefaultAzureCredential
import pyodbc
app = func.FunctionApp()
@app.event_grid_trigger(arg_name='event')
def blob_loaded(event: func.EventGridEvent):
data = event.get_json()
blob_url = data.get('url')
logging.info(f'BlobCreated: {blob_url}')
if '/raw/' not in blob_url or not blob_url.endswith('.csv'):
logging.info('Skip non-target blob')
return
rel = blob_url.split('/raw/')[-1]
cred = DefaultAzureCredential()
token = cred.get_token(
'https://database.windows.net/.default'
).token.encode('utf-16-le')
token_struct = struct.pack(f'=i{len(token)}s', len(token), token)
SQL_COPT_SS_ACCESS_TOKEN = 1256
conn_str = (
'Driver={ODBC Driver 18 for SQL Server};'
f'Server=tcp:{os.environ["SQL_SERVER"]},1433;'
f'Database={os.environ["SQL_DB"]};'
'Encrypt=yes;TrustServerCertificate=no;'
)
with pyodbc.connect(
conn_str,
attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}
) as cn:
cn.cursor().execute(
'EXEC dw.sp_LoadFactFromBlob @blobPath = ?',
rel
).commit()
logging.info('sp_LoadFactFromBlob OK')
CREATE OR ALTER PROCEDURE dw.sp_LoadFactFromBlob
@blobPath NVARCHAR(500)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @sql NVARCHAR(MAX);
TRUNCATE TABLE staging.RentalRaw;
SET @sql = N'BULK INSERT staging.RentalRaw
FROM ''' + @blobPath + N'''
WITH (
DATA_SOURCE=''BlobRaw'',
FORMAT=''CSV'',
FIRSTROW=2,
FIELDTERMINATOR='','',
ROWTERMINATOR=''0x0a'',
CODEPAGE=''65001'',
MAXERRORS=100
)';
EXEC sp_executesql @sql;
EXEC dw.sp_TransformAndLoad @sourceFile = @blobPath;
END
FUNC_KEY=$(az functionapp keys list \
-g $RG \
-n $FUNC_APP \
--query systemKeys.eventgrid_extension \
-o tsv)
ENDPOINT="https://$FUNC_APP.azurewebsites.net/runtime/webhooks/EventGrid?functionName=blob_loaded&code=$FUNC_KEY"
az eventgrid event-subscription create \
--name sub-blob-to-func \
--source-resource-id $STO_ID \
--endpoint-type webhook \
--endpoint "$ENDPOINT" \
--included-event-types Microsoft.Storage.BlobCreated \
--subject-begins-with /blobServices/default/containers/raw/
이제 raw 컨테이너에 CSV가 업로드되면 Event Grid가 Function을 호출하고, Function이 SQL 저장 프로시저를 실행해 적재한다.
Logic Apps는 GUI 기반으로 ETL 흐름을 구성할 수 있다.
워크플로우 구조는 다음과 같다.
Recurrence: 매일 02:00 KST
↓
List blobs in raw
↓
Filter array
↓
For each
↓
Execute stored procedure
↓
성공: archive 이동
실패: 이메일 알림
Function이 “파일이 올라오자마자 즉시 처리”에 적합하다면, Logic Apps는 “정해진 시간에 여러 작업을 순서대로 실행”하는 데 적합하다.
RESTORE_TS=$(date -u -d "-5 min" '+%Y-%m-%dT%H:%M:%S')
az sql db restore \
-g $RG \
-s $SQL_SRV \
-n dw_seoulbike \
--dest-name dw_seoulbike_pitr \
--time $RESTORE_TS \
--edition GeneralPurpose \
--family Gen5 \
--capacity 2
복원된 DB 행수 확인:
sqlcmd -S $SQL_SRV.database.windows.net \
-d dw_seoulbike_pitr \
-U $ADMIN_USER \
-P "$ADMIN_PASS" \
-Q "SELECT COUNT(*) FROM dw.FactRental;"
az monitor metrics alert create \
-g $RG \
-n alert-dw-cpu-high \
--scopes $(az sql db show -g $RG -s $SQL_SRV -n $SQL_DB --query id -o tsv) \
--condition "avg cpu_percent > 80" \
--window-size 5m \
--evaluation-frequency 1m \
--severity 2
CREATE ROLE dw_analyst;
GRANT SELECT ON SCHEMA :: dw TO dw_analyst;
GRANT SELECT ON SCHEMA :: mart TO dw_analyst;
DENY SELECT ON SCHEMA :: staging TO dw_analyst;
CREATE ROLE dw_loader;
GRANT SELECT, INSERT, UPDATE, DELETE ON SCHEMA :: staging TO dw_loader;
GRANT SELECT, INSERT, UPDATE ON SCHEMA :: dw TO dw_loader;
GRANT EXECUTE ON SCHEMA :: dw TO dw_loader;
CREATE ROLE dw_admin;
GRANT CONTROL ON DATABASE :: dw_seoulbike TO dw_admin;
| Role | 역할 |
|---|---|
dw_analyst | 분석가 읽기 권한 |
dw_loader | ETL 적재 권한 |
dw_admin | 운영 관리자 권한 |
실습 종료 후에는 리소스 그룹을 삭제해 비용 누적을 막는다.
az sql db delete \
-g $RG \
-s $SQL_SRV \
-n dw_seoulbike_pitr \
--yes 2>/dev/null
az group delete \
-n $RG \
--yes \
--no-wait
| 단계 | 내용 |
|---|---|
| 1 | Azure SQL Database Serverless 생성 |
| 2 | Serverless auto-pause / auto-resume 검증 |
| 3 | Storage Account 생성 및 CSV 업로드 |
| 4 | 스타 스키마 생성 |
| 5 | BULK INSERT로 staging 적재 |
| 6 | staging → FactRental 변환 적재 |
| 7 | SCD Type 2로 대여소 디멘션 관리 |
| 8 | 시간대·자치구·OD 분석 |
| 9 | Event Grid + Function으로 자동 적재 |
| 10 | Logic Apps로 일일 적재 워크플로우 구성 |
| 11 | PITR, Monitor, Role, 비용 정리 |
이번 실습의 핵심은 다음과 같다.
BULK INSERT는 빠른 1차 적재에 적합하다.