
필터 활동은 필터 변환과 달리, 파이프라인 내에서 배열(Array) 데이터(예: Lookup, Get Metadata, Web Activity 등에서 반환된 값)를 조건에 따라 필터링하는 활동(Activity)입니다.
| 구분 | 필터 활동 | 필터 변환 |
|---|---|---|
| 적용 위치 | 파이프라인(Activity) | 매핑 데이터 플로우(Transformation) |
| 적용 대상 | 배열 데이터(Array, JSON 등) | 테이블 데이터(행/컬럼 기반) |
| 조건 작성 방식 | 파이프라인 식(표현식) | 데이터 플로우 내 식(식 편집기) |
| 용도 | 리스트, 메타데이터 등 구조화된 배열 | 레코드(행) 기반 데이터 처리 |
| 주요 사용 예시 | 파일/테이블/객체 리스트 조건 분기 | 데이터 필터링(컬럼 조건에 따라 행 추출) |

a000storagedemobaseball-hitter2000_2001_hitter.csv2000_2001_hitter.xlsx2002_2013_hitter.csv2002_2013_hitter.xlsx2014_hitter.csv2014_hitter.xlsxBlobStorage1AutoResolveIntegrationRuntimeGet Metadata - List Files)baseballinput_DS (baseball-hitter 컨테이너 연결)existschildItems{
"exists": true,
"itemName": "baseball-hitter",
"itemType": "Folder",
"childItems": [
{ "name": "2000_2001_hitter.csv", "type": "File" },
{ "name": "2000_2001_hitter.xlsx", "type": "File" },
...
]
}If Exist)@activity('Get Metadata - List Files').output.exists
fileListToProcessArrayFilter CSV)@pipeline().parameters.fileListToProcess@and(
equals(item().type, 'File'),
endswith(item().name, '.csv')
)ForEach Files)@activity('Filter CSV').output.value
Copy Files)baseballCopyInput_DSfileName 사용: @dataset().fileName@item().namebaseballCopyOutput_DSbaseball-hitter/csvfileName 사용: @dataset().fileName@item().nameExecute Pipeline 활동을 추가합니다.FilterForEachCopy_PLfileListToProcess) 전달:@activity('Get Metadata - List Files').output.childItems.csv와 .xlsx 파일이 모두 포함된 배열 전달.csv 파일 3개만 필터링)baseball-hitter/csv 폴더 내에 필터링된 파일들이 정상 복사되었는지 확인합니다:2000_2001_hitter.csv2002_2013_hitter.csv2014_hitter.csv마무리: Filter 활동을 통해 파이프라인 흐름 제어 단계에서 배열 데이터를 정교하게 제어할 수 있으며, If Condition과 결합 시 자식 파이프라인 호출 방식을 활용해야 함을 유의하시기 바랍니다.
파생열 변환은 기존 데이터 컬럼을 가공하거나 새로운 컬럼을 추가할 때 사용하는 변환 단계입니다. 입력된 데이터의 컬럼 값을 수식/표현식으로 가공하여 파생 컬럼을 생성하거나 기존 컬럼 값을 대체할 수 있습니다.
조건부 분할 변환은 입력 데이터 행(Row)을 지정한 조건(수식/표현식)에 따라 여러 그룹(분기)으로 나누어 주는 변환 단계입니다.
employee.csv)ID, Name, Salary, Address, Location, Email
1, 김철수, 2750000, 서울 강남구, Korea, chulsu.kim@example1.com
2, 이영희,, Irvine CA, US, younghee.lee@example3.com
3, 박민준, 3820000, 인천 연수구, Korea, minjun.park@example1.com
4, 최지영, 4500000, 부산 해운대구, Korea, jiyoung.choi@example2.com
5, 정윤화, 2810000, 대구 수성구, Korea, yoonhwa.chung@example2.com
6, 강서준, 3540000,, China, seojun.kang@example2.com
7, 윤아영,, 제주 제주시, Korea, ayoung.yoon@example3.com
a000storagedemo2 내 employee 컨테이너 생성 및 employee.csv 업로드.sourceCsv_DS): employee.csv 연결, 첫 번째 행을 머리글로 설정.missingValueAddress)Addressiif(isNull(Address), 'unknown', Address)missingValueSalary)Salaryiif(isNull(Salary), '2500000', Salary)upperLocationToCountry)Country (새로 만들기)upper(Location)sink1): employee_processed.csv로 단일 파일 출력.employee1_PL 생성 후 데이터 흐름 실행.employee_processed.csv 생성 완료.기존 파이프라인에 추가 실습 구성을 연결합니다.
addSalaryGrade)SalaryGradeiif(toInteger(Salary) <= 3000000, 'Low',
iif(toInteger(Salary) <= 4000000, 'Mid', 'High'))addEmailDomain)EmailDomainsplit(Email,'@')SplitByLocation)headquarters):Location == 'Korea'Branch):headquaterSink):headquarters-employee.csvbranchSink):branch-employee.csvheadquarters-employee.csv 예시:
ID, Name, Salary, Address, Location, Email, Country, SalaryGrade, EmailDomain
1, 김철수, 2750000, 서울 강남구, Korea, chulsu.kim@example1.com, KOREA, Low, example1.com
3, 박민준, 3820000, 인천 연수구, Korea, minjun.park@example1.com, KOREA, Mid, example1.com
4, 최지영, 4500000, 부산 해운대구, Korea, jiyoung.choi@example2.com, KOREA, High, example2.com
...
branch-employee.csv 예시:
ID, Name, Salary, Address, Location, Email, Country, SalaryGrade, EmailDomain
2, 이영희, 2500000, Irvine CA, US, younghee.lee@example3.com, US, Low, example3.com
6, 강서준, 3540000, unknown, China, seojun.kang@example2.com, CHINA, Mid, example2.com
실습이 완료된 후에는 불필요한 비용이 발생하지 않도록 데이터 흐름 디버그 모드를 반드시 중지하고 게시(Publish)를 확인합니다.
Until 활동은 지정한 조건이 만족될 때까지 내부에 정의한 액티비티를 반복 실행하는 파이프라인 제어 단계입니다.
@equals(variables('fileFound'), true))Set Variable 활동은 파이프라인 변수(Pipeline Variable)의 값을 동적으로 변경하는 액티비티입니다.
@item().name, @add(variables('count'),1))
employee_batch.csv)ID, Name, Department, Salary, Location
1, 오준호, IT, 69190, Gwangju
2, 최예린, Finance, 61538, Daegu
3, 강다은, Finance, 55729, Seoul
4, 정우성, Marketing, 46409, Gwangju
5, 오준호, Marketing, 57249, Seoul
6, 서지아, HR, 45784, Incheon
7, 박지훈, HR, 53096, Seoul
8, 오준호, Marketing, 52560, Seoul
9, 강다은, Finance, 57533, Seoul
10, 정우성, IT, 52343, Daegu
11, 최예린, Marketing, 52206, Seoul
12, 강다은, HR, 66980, Gwangju
13, 강다은, Finance, 50801, Incheon
14, 박지훈, Marketing, 64190, Incheon
15, 한수진, Marketing, 61921, Seoul
16, 정우성, Finance, 50986, Daegu
17, 이서연, Marketing, 63225, Daegu
18, 강다은, Finance, 55647, Gwangju
19, 한수진, IT, 53716, Seoul
20, 이서연, Finance, 68355, Incheon
21, 정우성, Finance, 67009, Daegu
22, 김민준, Finance, 64334, Seoul
23, 서지아, Marketing, 69376, Daegu
24, 한수진, Marketing, 57323, Daegu
25, 윤도현, Finance, 49780, Incheon
26, 김민준, Marketing, 47368, Busan
27, 서지아, HR, 57039, Gwangju
28, 박지훈, HR, 51655, Gwangju
29, 오준호, Marketing, 53173, Incheon
30, 최예린, Marketing, 49495, Daegu
31, 윤도현, Finance, 55893, Seoul
32, 박지훈, IT, 67386, Daegu
33, 정우성, Marketing, 67998, Incheon
34, 박지훈, HR, 58403, Gwangju
35, 오준호, Finance, 58121, Daegu
36, 정우성, Marketing, 67303, Gwangju
37, 윤도현, HR, 55966, Seoul
38, 오준호, HR, 45853, Gwangju
39, 이서연, IT, 65530, Gwangju
40, 최예린, Finance, 65153, Busan
41, 윤도현, Finance, 61958, Busan
42, 이서연, IT, 62532, Busan
43, 서지아, IT, 67677, Gwangju
employee_batch.csv 업로드 완료GetTotalCount_DF)전체 레코드 수를 계산하여 파일로 저장하는 데이터 플로우를 구성합니다,.
employeeBatchData): employee_batch.csv 연결.aggregateCount):totalRecordscount(ID).sinkTotalCount)total_count.csv.ProcessEmployeeBatches_DF)데이터를 배치 단위로 읽어 부서별로 분기하여 저장합니다,.
offset (integer): 시작 지점limit (integer): 읽어올 행 수employeeBatchData2)$offset$limitSplitByDepartment)Department=='HR'Department=='IT'concat('employee_hr_',toString($offset),'.csv')concat('employee_it_',toString($offset),'.csv')concat('employee_others_',toString($offset),'.csv')ProcessEmployeeBatch_PL)batchSize (Int, 기본값: 50)batchOffset (Integer, 기본값: 0)totalCount (Integer, 기본값: 0)tempOffset (Integer, 기본값: 0)GetDataFlowTotalCount): GetTotalCount_DF 실행.SetTotalCount): 조회된 행 수를 변수에 저장.@activity('GetDataFlowTotalCount').output.runStatus.metrics.sinkTotalCount.sources.employeeBatchData.rowsRead
```,UntilAllBatches)@greaterOrEquals(variables('batchOffset'), variables('totalCount'))
```,ProcessBatchDataFlow): 데이터를 배치 단위로 처리.offset 파라미터 매핑: variables('batchOffset')limit 파라미터 매핑: pipeline().parameters.batchSizeSetOffset): 현재 오프셋 임시 저장.tempOffset = variables('batchOffset')SetOffset2): 다음 실행을 위한 오프셋 갱신.batchOffset = @add(variables('tempOffset'), pipeline().parameters.batchSize)GetDataFlowTotalCount: 성공 (200개 레코드 조회 확인)SetTotalCount: 성공 (totalCount = 200 설정)UntilAllBatches: 성공 (배치 크기 50 기준 총 4회 반복 실행)ProcessBatchDataFlow (각 회차): 성공output 컨테이너)total_count.csv (200 저장 확인)employee_hr_0.csv, employee_hr_50.csv, employee_hr_100.csv, employee_hr_150.csvemployee_it_0.csv, employee_it_50.csv, employee_it_100.csv, employee_it_150.csvemployee_others_0.csv, employee_others_50.csv, employee_others_100.csv, employee_others_150.csv실습 완료 후에는 비용 발생 방지를 위해 데이터 흐름 디버그 모드를 반드시 중지하십시오.
Join 변환(Join Transformation)은 두 개의 입력 스트림을 지정된 키를 기준으로 병합(Join)하는 데이터 변환 단계입니다.,
두 테이블 모두에 조인 키가 존재하는 데이터만 반환합니다.
| A.ID | A.이름 | A.도시 | B.주문ID | B.고객ID | B.상품명 |
|---|---|---|---|---|---|
| 1 | 김철수 | 서울 | 101 | 1 | 노트북 |
| 1 | 김철수 | 서울 | 104 | 1 | 모니터 |
| 2 | 이영희 | 부산 | 102 | 2 | 마우스 |
양쪽 테이블의 모든 데이터를 반환하며, 짝이 없는 경우 NULL로 표시됩니다.
| A.이름 | A.ID | A.도시 | B.주문ID | B.고객ID | B.상품명 |
|---|---|---|---|---|---|
| 김철수 | 1 | 서울 | 101 | 1 | 노트북 |
| 김철수 | 1 | 서울 | 104 | 1 | 모니터 |
| 이영희 | 2 | 부산 | 102 | 2 | 마우스 |
| 박민준 | 3 | 서울 | NULL | NULL | NULL |
| 최지우 | 5 | 인천 | NULL | NULL | NULL |
| NULL | NULL | NULL | 103 | 4 | 키보드 |
왼쪽 테이블(고객 정보)의 모든 데이터와 오른쪽 테이블(주문 정보)의 매칭되는 데이터를 반환합니다.
| A.ID | A.이름 | A.도시 | B.주문ID | B.고객ID | B.상품명 |
|---|---|---|---|---|---|
| 1 | 김철수 | 서울 | 101 | 1 | 노트북 |
| 1 | 김철수 | 서울 | 104 | 1 | 모니터 |
| 2 | 이영희 | 부산 | 102 | 2 | 마우스 |
| 3 | 박민준 | 서울 | NULL | NULL | NULL |
| 5 | 최지우 | 인천 | NULL | NULL | NULL |
오른쪽 테이블(주문 정보)의 모든 데이터와 왼쪽 테이블(고객 정보)의 매칭되는 데이터를 반환합니다.
| A.ID | A.이름 | A.도시 | B.주문ID | B.고객ID | B.상품명 |
|---|---|---|---|---|---|
| 1 | 김철수 | 서울 | 101 | 1 | 노트북 |
| 1 | 김철수 | 서울 | 104 | 1 | 모니터 |
| 2 | 이영희 | 부산 | 102 | 2 | 마우스 |
| NULL | NULL | NULL | 103 | 4 | 키보드 |
ID, Name, Address
1, 김철수, 서울
2, 이영희, 부산
3, 박민준, 서울
5, 최지우, 인천OrderID, CustomerID, ProductName
101, 1, 노트북
102, 2, 마우스
103, 4, 키보드
104, 1, 모니터customers.csv, orders.csv 업로드customerData): CustomersInput_DS 연결 (customers.csv).ordersData): Orderinput_DS 연결 (orders.csv).joinCustomerOrders)customerDataordersData내부(Inner)IDCustomerIDsinkJoinedData)JoinedOutputCSV_DSjoined_customer_orders.csvJoinCustomerOrders_PLExecuteJoin_DF (Join_DF 실행)output 컨테이너에 3개의 행이 포함된 파일 생성 확인.,aggregateTotalOrdersByCustomer)joinCustomerOrdersID, NametotalOrderscount(OrderID)customer_total_orders.csvfilterSeoulCustomers)joinCustomerOrdersAddress == '서울'seoul_customer_orders.csv,실습 마무리: 모든 작업이 완료되면 비용 발생 방지를 위해 데이터 흐름 디버그 모드를 종료합니다.