
제공해주신 소스 파일 "03-30-2.03 데이터플로우_v12.pdf"의 모든 페이지 내용을 생략 없이, 특히 누락되었던 적용 예시와 수식들을 원문 그대로 포함하여 정리해 드립니다. 여러 줄로 된 부분은 합치지 않고 소스 형태를 유지했습니다.
매핑 데이터 플로우는 데이터 변환을 시각적으로 설계할 수 있는 기능으로, 코딩 없이 GUI 기반으로 데이터 흐름을 정의하고 실행할 수 있도록 지원합니다.
| 구분 | 파이프라인 | 데이터 플로우 |
|---|---|---|
| 역할 | 실행 제어 / 전체 흐름 관리 | 데이터 변환 수행 |
input, output 컨테이너의 기존 데이터 삭제 후 StudentsPerformance.csv 업로드.BlobStorage1 (Azure Blob Storage) 생성 및 연결 테스트 성공 확인.StudentsInputDS): input 컨테이너, StudentsPerformance.csv 참조, 첫 번째 행을 머리글로 설정, 스키마 가져오기(연결/저장소에서).StudentsOutputDS): output 컨테이너 참조, 첫 번째 행을 머리글로 설정.
StudentsCleanFlow
Students): StudentsInputDS 연결 후 프로젝션 가져오기를 통해 데이터 형식 검색.

스키마 드리프트: 원본이 아닌 별도의 테이블을 만들어 사용할 때 어느정도의 오류는 ok 하는 기능
AverageScore)컴포넌트 옵션에는 항상 출력, 입력 스트림이 존재함

+ 누른 후 파생 열 추가AverageScoreavg_score(toInteger({math score})+toInteger({reading score})+toInteger({writing score}))/3GradeLevel)
GradeLevelgradeiif(avg_score>=90, 'A',
iif(avg_score>=80, 'B',
iif(avg_score>=70, 'C',
iif(avg_score>=60, 'D', 'F'))))


데이터 세트: StudentsOutputDS
설정: 파티션 설정을 '단일 파티션'으로 변경 후 단일 파일로 출력 선택. (기본적으로는 분산 저장을 지원)

파일명: Students_clean.csv

StudentsPerformancePipeline에 Data Flow 활동(RunStudentsCleanFlow) 추가.output 컨테이너의 Students_clean.csv에서 평균 점수와 등급(A~F) 확인.

AverageScore 단계에서 새 분기(New Branch) 추가.
AverageByGroup)
출력 스트림 이름: AverageByGroup
그룹화 방법: race/ethnicity 열 기준.

집계 컬럼: avg_score_by_group
식: avg(avg_score)

StudentsOutputDSavg_score_by_group.csv


통합 런타임(Integration Runtime)은 Azure Data Factory에서 데이터 이동, 변환, 실행 등의 기능을 수행하기 위한 컴퓨팅 인프라 역할을 수행합니다.
Integration Runtime의 핵심
| IR의 수행 기능 | IR의 수행 내용 |
|---|---|
| 활동 실행 | Copy, 외부 리소스 실행 등 여러 가지 활동의 지원 |
| 데이터 이동 | 클라우드 ↔ 온프레미스 간 안전한 복사 수행 |
| 데이터 흐름 실행 | Mapping Data Flow 등 고급 데이터 변환 지원 * Azure IR 전용 |
IR은 "ADF의 실행 엔진"이며, 어떤 네트워크에서 데이터를 가져오고 어디로 보낼 것인지에 따라 적절한 유형을 선택하는 것이 필수적입니다.
데이터 팩토리에서는 다양한 환경에 맞게 세 가지 유형의 통합 런타임을 제공합니다. 각 유형은 사용자의 네트워크 구조, 데이터 위치, 기존 시스템 여부에 따라 선택해야 합니다.
| 통합 런타임 유형 | 주요 목적 및 특성 |
|---|---|
| Azure Integration Runtime | Azure 내부 서비스 간 데이터 이동 및 변환, 완전 관리형, 서버리스 컴퓨팅 환경 |
| Self-hosted IR (SHIR) | 온프레미스 또는 VNet 내 리소스와 연결, 사용자 컴퓨터/VM에 런타임 설치 필요 |
| Azure-SSIS IR | SSIS 패키지를 Azure에서 실행하기 위한 전용 런타임, SQL Managed Instance 필요 |
| 구분 | Azure IR | SHIR | Azure-SSIS IR |
|---|---|---|---|
| 관리 주체 | Microsoft | 사용자 직접 관리 | Microsoft |
| 설치 필요 여부 | 없음 | 설치 필요 | 설치 필요 |
| 데이터 흐름 지원 여부 | 지원 | 미지원 | 미지원 |
| 사용 위치 | Azure 간 서비스 | 온프레미스, VNet 리소스 | SSIS 기반 데이터 마이그레이션 |
IR 선택은 데이터의 출발지/도착지, 네트워크 형태(공용/사설), 기존 시스템 여부(SSIS 등) 등을 고려하여 결정합니다.
Self-Hosted 통합 런타임은 Azure Data Factory에서 사설 네트워크나 온프레미스 환경의 데이터에 접근할 수 있도록 해주는 소프트웨어 컴포넌트입니다.
| 유형 | Azure Cloud | Private Network |
|---|---|---|
| Azure IR | Activity 실행, 데이터 이동, 데이터 플로우 | 지원 안함 |
| Self-hosted IR | Activity 실행, 데이터 이동 | Activity 실행, 데이터 이동 |
| Azure-SSIS IR | SSIS 패키지 실행 (제한적) | SSIS 패키지 실행 |

실습을 위해 다음과 같은 리소스를 순차적으로 생성합니다.
1. Azure Virtual Network 생성: vnet (주소 공간: 10.0.0.0/16).

2. Subnet 생성: subnet (주소 범위: 10.0.1.0/24)

3. SQL Server용 VM 생성: SQL-vm

이미지: SQL Server 2019 Developer on Windows Server 2019

모든 이미지 보기 선택

만들기 후 2세대 선택
크기: Standard_B2ms (2 vcpu, 8 GiB 메모리)



SHIR-vm이미지: Windows Server 2019 Datacenter

디스크 표준으로 설정

네트워크: vnet / subnet 연결


들어가서 RDP 파일 다운로드
SHIR-vm 접속: RDP를 통해 가상 머신에 접속합니다.
런타임 다운로드: VM 내부 브라우저에서 'Microsoft Integration Runtime'을 검색하여 설치 파일을 다운로드합니다.

ADF에서 SHIR 생성: ADF Studio의 [관리] > [통합 런타임]에서 '자체 호스팅' 유형으로 shir를 생성하고 인증 키를 복사합니다.



노드 등록: VM에 설치된 Configuration Manager를 실행하고 복사한 인증 키를 입력하여 등록을 완료합니다.

ADF 관리탭에서도 표시된다.

input 컨테이너에 StudentsPerformance.csv 업로드.SQL-vm 내 SSMS를 실행하여 데이터베이스와 테이블을 생성합니다.(동일하게 SQL-vm의 rdp 파일 설치 후 실행)


CREATE DATABASE StudentsDB;
GO
USE StudentsDB;
CREATE TABLE StudentsPerformance (
gender NVARCHAR(10),
race_ethnicity NVARCHAR(20),
parental_level_of_education NVARCHAR(50),
lunch NVARCHAR(20),
test_preparation_course NVARCHAR(20),
math_score INT,
reading_score INT,
writing_score INT
);
링크드 서비스 (원본): BlobStorage1 (AutoResolveIntegrationRuntime 사용).

링크드 서비스 (싱크): vnetSqlServer1
shir 선택10.0.1.4)StudentsDB


StudentsInputDS (DelimitedText)vnetStudentsDS1 (SQL Server 테이블)



Copy Blob to SQL) 추가.스키마 가져오기 후 매핑

SELECT TOP 50 * FROM StudentsPerformance; 쿼리를 통해 데이터 복사 여부를 검증합니다.
실습 완료 후 비용 발생을 방지하기 위해 다음 리소스들을 반드시 삭제해야 합니다.
SHIR-vm, SQL-vmvnet-demo (가상 네트워크)주의: SHIR 노드가 설치된 VM을 삭제하면 ADF Studio에서 해당 통합 런타임 상태가 '사용할 수 없음'으로 표시됩니다. 필요하지 않은 경우 ADF 내의 통합 런타임과 연결된 서비스도 함께 삭제하여 정리합니다.
이후 연결된 서비스에서 vnetSqlServer1의 통합 런타임 연결을 AutoResolveIntegrationRuntime으로 변경하여 적용 후 데이터세트의 vnetStudentsDS1, 파이프라인의 CopyStudentPerformance 삭제, 게시 후 연결된서비스 마저 삭제
Lookup 활동을 통해 다양한 데이터 소스에서 목록이나 데이터를 조회하여, 파이프라인의 동적인 데이터 처리 흐름을 설계할 수 있습니다. Lookup 활동은 반복 작업의 시작점이 되는 정보를 제공합니다.
ForEach 활동을 사용하면, 조회된 데이터 목록을 활용하여 각 항목별로 동일하거나 다양한 작업을 반복 실행할 수 있습니다. ForEach 활동을 통해 대량 데이터의 일괄 처리와 자동화가 가능합니다.

winequality-red.csv (또는 wine.csv)titanic.csvadult.csvoutput 컨테이너를 준비합니다.SQL 데이터베이스에서 다음 쿼리를 실행하여 테이블을 생성합니다.
wine 테이블 생성
CREATE TABLE wine (
fixed_acidity FLOAT,
volatile_acidity FLOAT,
citric_acid FLOAT,
residual_sugar FLOAT,
chlorides FLOAT,
free_sulfur_dioxide FLOAT,
total_sulfur_dioxide FLOAT,
density FLOAT,
pH FLOAT,
sulphates FLOAT,
alcohol FLOAT,
quality INT
);
titanic 테이블 생성
CREATE TABLE titanic (
PassengerId INT,
Survived INT,
Pclass INT,
Name NVARCHAR(100),
Sex NVARCHAR(10),
Age FLOAT,
SibSp INT,
Parch INT,
Ticket NVARCHAR(20),
Fare FLOAT,
Cabin NVARCHAR(20),
Embarked NVARCHAR(5)
);
adult 테이블 생성
CREATE TABLE adult (
age INT,
workclass NVARCHAR(20),
fnlwgt INT,
education NVARCHAR(20),
education_num INT,
marital_status NVARCHAR(30),
occupation NVARCHAR(20),
relationship NVARCHAR(20),
race NVARCHAR(20),
sex NVARCHAR(10),
capital_gain INT,
capital_loss INT,
hours_per_week INT,
native_country NVARCHAR(30),
income NVARCHAR(10)
);
이후 컨테이너에 csv파일을 올리고, 이전 매개변수 실습에서 사용했던 파이프라인을 이용하여 해당 테이블에 csv의 데이터들을 넣고 input 스토리지에서 파일 삭제

연결된 서비스(Linked Service)
데이터세트(Dataset)


SourceTableDS: 매개변수(schemaName, tableName)를 사용하여 동적으로 테이블을 지정함
schemaName 식: @dataset().schemaNametableName 식: @dataset().tableName



SinkCsvDS: 매개변수(fileName)를 사용하여 동적으로 출력 파일명을 지정함
* fileName 식: @dataset().fileName







ListTablesTableListDSSELECT TABLE_SCHEMA, TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
AND TABLE_SCHEMA = 'dbo'









ForEachTable@activity('ListTables').output.valueExportTableSourceTableDStableName: @item().TABLE_NAMEschemaName: @item().TABLE_SCHEMASinkCsvDSfileName: @concat(item().TABLE_NAME, '.csv')파이프라인 실행: '모두 게시' 후 파이프라인을 실행합니다.

모니터링: ListTables 성공 후 ForEachTable 내에서 각 테이블(adult, titanic, wine)에 대한 ExportTable 활동이 성공했는지 확인합니다.
출력 확인: 스토리지의 output 컨테이너에 adult.csv, titanic.csv, wine.csv 파일이 생성되었는지 확인합니다.

다양한 상황에서 Azure Data Factory의 데이터 처리 상태를 사용자에게 알릴 필요가 있습니다. 이메일 알림은 이런 요구를 충족시키기 위한 유용한 수단입니다.
| 분류 | 사례 | 설명 |
|---|---|---|
| 개발/운영 | ETL 오류 또는 성능 지연 발생 | 운영팀에 오류 발생 사실 실시간 전달 |
| 보안/감사 | 민감한 데이터 이동 완료 시 | 감사 로그 목적의 알림 전송 |
| 데이터 분석 | 외부 시스템에서 데이터 수집 시작 시 | 자동화된 분석 시작 시점 감지 가능 |
| 보고서 갱신 | 데이터 변환 후 Power BI 리프레시 완료 | 사용자에게 최신 리포트 반영 시점 안내 |
Azure Data Factory는 이메일 알림을 위한 두 가지 구현 방식을 고려할 수 있습니다. 각 방식은 사용자의 목적에 따라 선택 가능합니다.
| 옵션 | 설명 | 장점 | 권장 시나리오 |
|---|---|---|---|
| Azure Monitor Alerts | Azure에서 기본 제공하는 모니터링 및 경보 기능을 통해 이메일 알림을 전송 | 설정 간편, 추가 비용 없음 | 단순 성공/실패 모니터링, 장애 감지 시 알림 |
| Web Activity + Logic Apps | ADF의 Web Activity에서 Logic Apps 호출 후, HTTP 트리거 기반의 이메일 전송 | 유연한 구성, 세부 커스터마이징 가능, HTML 이메일 가능 | 알림 내용 및 대상 사용자 지정, 포맷 설정 필요 시 |
Azure Monitor는 Azure에서 기본 제공되는 종합 모니터링 및 알림 솔루션으로, 다양한 리소스 상태를 감시하고 이벤트 발생 시 알림을 보냅니다.
파이프라인 준비: Lab - Email Alert 내에 AzureMonitorAlert 파이프라인을 생성하고 실패를 유도하는 복사 활동을 구성합니다
경고 규칙 생성: ADF 모니터링 탭의 [경고 및 메트릭]에서 [새로운 경고 규칙]을 클릭합니다.
조건 구성:
Failed pipeline runs metrics.FailureType (UserError, SystemError, BadGateway 선택).알림 및 작업 그룹 구성:
Test group.테스트 및 확인: 파이프라인 실행 후 실패가 발생하면 설정한 메일로 알림이 오는지 확인합니다. (디버그하면 안됨, 트리거 사용)
[수신 이메일 예시 - Activated]
Your Azure Monitor alert was triggered
Rule: copy-pipeline failure alert
Metric: PipelineFailedRuns
Value: 1
,
[수신 이메일 예시 - Deactivated]
Your Azure Monitor alert was resolved
Alert deactivated because one of the following conditions is no longer true.


Copy iris, Copy penguins 활동을 포함하는 LogicAppAlert 파이프라인을 생성합니다,.iris.csv, penguins.csv 파일을 스토리지에 준비합니다,.
{
"type": "object",
"properties": {
"title": {
"type": "string"
},
"message": {
"type": "string"
},
"AdfName": {
"type": "string"
},
"pipelineName": {
"type": "string"
},
"pipelineRunID": {
"type": "string"
},
"time": {
"type": "string"
}
}
}

Send OK Email 이름의 Web 활동을 추가합니다.POST. {
"title": "파이프라인 실행 완료 알림",
"message": "데이터 복사 파이프라인이 성공적으로 완료되었습니다.",
"AdfName": "@{pipeline().DataFactory}",
"pipelineName": "@{pipeline().Pipeline}",
"pipelineRunID": "@{pipeline().RunId}",
"time": "@{utcNow()}"
}


title- 데이터 팩토리 이름: AdfName
- 파이프라인 이름: pipelineName
- 파이프라인 Run ID: pipelineRunID
- 실행 완료 시각: time파이프라인 실행 완료 알림.성공 알림뿐만 아니라 실패 시에도 알림을 보내기 위해 파이프라인을 확장합니다.
send NOK Email 이름의 Web 활동을 생성하고 실패 경로(빨간색 선)로 연결합니다.{
"title": "파이프라인 실행 실패 알림",
"message": "데이터 복사 파이프라인의 실행이 실패하였습니다.",
"AdfName": "@{pipeline().DataFactory}",
"pipelineName": "@{pipeline().Pipeline}",
"pipelineRunID": "@{pipeline().RunId}",
"time": "@{utcNow()}"
}실제 운영 시에는 개별 파이프라인에 매번 알림을 넣기보다 MasterAlertPipeline을 구축하여 호출된 파이프라인의 에러 메시지를 전달하는 방식이 효율적입니다,.

```json
{
"title": "파이프라인 실행 실패 알림",
"message": "@{activity('Execute Pipeline1').error.message}",
"AdfName": "@{pipeline().DataFactory}",
"pipelineName": "@{pipeline().Pipeline}",
"pipelineRunID": "@{pipeline().RunId}",
"time": "@{utcNow()}"
}
```

실습이 끝나면 불필요한 비용 발생을 방지하기 위해 생성한 모든 리소스(Logic App, API Connection, Action Group 등)와 리소스 그룹을 삭제합니다,.
데이터 변환 과정에서 특정 조건에 맞는 데이터를 선별하거나, 분석 효율을 높이기 위해 데이터를 정렬하는 과정은 필수적입니다. Azure Data Factory의 매핑 데이터 플로우(MDF)는 이를 시각적으로 구성할 수 있는 기능을 제공합니다.
필터 변환은 소스 데이터에서 지정한 조건을 만족하는 행(row)만 선택해 통과시키는 변환 단계입니다.
Score > 80, Category == 'A' and Region != 'Seoul'정렬 변환은 입력된 데이터 세트를 하나 이상의 컬럼을 기준으로 오름차순 또는 내림차순으로 정렬하는 변환 단계입니다.

dbo.adult 테이블 사용 (age, workclass, income 등 포함).income == ">50K" (고소득자 데이터 선별).adult_over50K_sorted.csv 파일로 저장.CREATE TABLE adult (
age INT,
workclass NVARCHAR(20),
fnlwgt INT,
education NVARCHAR(20),
education_num INT,
marital_status NVARCHAR(30),
occupation NVARCHAR(20),
relationship NVARCHAR(20),
race NVARCHAR(20),
sex NVARCHAR(10),
capital_gain INT,
capital_loss INT,
hours_per_week INT,
native_country NVARCHAR(30),
income NVARCHAR(10)
);select count(*) from [dbo].[adult] 실행 시 32,561건 확인.output 컨테이너 준비.outputSQL (Azure SQL DB), BlobStorage1 (Azure Blob Storage).AdultSqlInput_DS): SQL DB의 dbo.adult 테이블 연결.AdultCsvOutput_DS): Blob Storage의 output 컨테이너 연결.
FilterSort_DF.adult.AdultSqlInput_DS.FilterHighIncome)Filter 활동 연결.FilterHighIncome.adult.income == ">50K">50K인 행만 남는지 확인.SortByAge)Sort 활동 연결.SortByAge.FilterHighIncome.age.오름차순 (Ascending).AdultHighIncomeSortedSink 추가 및 AdultCsvOutput_DS 연결.adult_highincome_sorted.csv 입력.FilterSortAdult_PL 생성 후 데이터 플로우 활동 추가.output 컨테이너에서 결과 파일 확인.
고정된 값이 아닌, 실행 시점에 입력받은 값으로 필터링하고 정렬할 수 있도록 구성을 변경합니다.
매핑 데이터 플로우의 [매개 변수] 탭에서 다음 항목을 추가합니다.

filterColumn (string): 필터링할 컬럼 이름.filterValue (string): 필터링할 기준 값.sortColumn (string): 정렬할 컬럼 이름.FilterAdult):case(
type(byName($filterColumn))=='Integer', toInteger(byName($filterColumn)) == toInteger($filterValue),
toString(byName($filterColumn)) == $filterValue
)SortByParam):byName($sortColumn)
pFilterColumn, pFilterValue, pSortColumn 생성.filterColumn = @pipeline().parameters.pFilterColumnfilterValue = @pipeline().parameters.pFilterValuesortColumn = @pipeline().parameters.pSortColumn
pFilterColumn: agepFilterValue: 52pSortColumn: hours_per_weekadult_filtered_sorted.csv 파일이 생성됩니다.
실습 마무리: 모든 작업이 완료되면 비용 발생 방지를 위해 데이터 흐름 디버그 모드를 반드시 중지하십시오.
Get Metadata 활동은 데이터 소스(파일, 폴더, 테이블 등)의 메타데이터(크기, 수정일, 컬럼 목록 등)를 조회하는 데이터팩토리의 처리 단계입니다.
childItems: 폴더 내 하위 항목 목록exists: 데이터 소스 존재 여부lastModified: 마지막 수정 시간size: 파일 크기structure: 데이터 구조(컬럼 목록) 등winequality-red.csv)과 화이트 와인(winequality-white.csv)의 화학적 테스트 결과 데이터입니다.;)을 구분자로 사용하는 CSV 파일입니다.wine-quality 컨테이너 내에 위 두 파일을 업로드합니다.GetMetadataWine_PL 파이프라인을 생성하고 메타데이터 가져오기 활동을 추가합니다.wineContainer_DS):BlobStorage1.wine-quality 컨테이너 지정.
existslastModifiedchildItems

조회된 메타데이터 목록을 바탕으로 특정 조건(파일명에 'red' 포함)에 맞는 파일만 다른 컨테이너로 복사하는 실습입니다.
wine-quality-output 컨테이너로 복사.ForEachWineFiles)@activity('WineFiles').output.childItemsIf Red)@contains(item().name, 'red')Copy Red Wine)wineContainerInput_DS):fileName 생성.@dataset().fileName.@item().name.wineContainerOutput_DS):wine-quality-output.fileName 생성.@item().name.wine-quality-output 컨테이너에 winequality-red.csv 파일이 정상적으로 복사되었음을 확인합니다.