Trino, 병렬 처리

Jeonghak Cho·2025년 8월 31일

Bigdata

목록 보기
28/30

Trino 병렬 처리 개요

Trino는 MPP (Massively Parallel Processing) 아키텍처를 기반으로 하며, 대규모 분산 쿼리를 병렬로 처리하는 데 최적화되어 있다. Trino의 병렬 처리 아키텍처는 다음과 같은 주요 컴포넌트와 흐름으로 구성되어 있다

Coordinator와 Worker

Coordinator (조정자)

  • 클러스터에서 단 하나만 존재
  • 사용자의 쿼리를 수신하고, SQL 파싱, 분석, 최적화를 수행
  • 전체 쿼리를 여러 단계의 Stage로 나누고 각 Stage를 Worker에 분배
  • 쿼리 상태와 진행 상황을 관리

Worker (작업자)

  • 실질적인 쿼리 실행을 담당 (스캔, 필터, 조인, 집계 등)
  • Coordinator로부터 받은 Task를 병렬로 처리
  • 서로 직접 데이터를 교환 (shuffle, broadcast 등)

쿼리 실행 흐름과 병렬 처리 구조

  1. 쿼리 수신 및 분석
    사용자가 쿼리를 제출하면 Coordinator가 SQL을 파싱 → 분석 → 논리 계획 생성

  2. 분산 쿼리 계획 생성

  • 쿼리는 DAG 형태의 여러 Stage로 나뉨
  • 각 Stage는 하나 이상의 Task로 분할됨
  • Task는 Worker 노드에 배치되고 병렬로 실행됨
  1. Pipeline 기반 Task 처리
  • 각 Task는 내부적으로 여러 Pipeline을 가짐
  • 각 Pipeline은 Driver라고 불리는 단위로 실행됨 (멀티 스레드 처리 가능)
  • Driver는 Operator 체인을 통해 데이터를 처리 (예: Scan → Filter → Join → Aggregate)
  1. 데이터 교환
  • Stage 간에는 Exchange Operator를 통해 데이터를 주고받음
  • 이는 일반적으로 shuffle 또는 broadcast 방식으로 병렬 수행됨

병렬 처리 핵심 요소

구성 요소설명
Stage논리적 쿼리 단계, 종속성을 가진 DAG 형태로 구성
TaskStage의 실행 단위, 여러 Worker에 분산되어 병렬 수행
PipelineTask 내부 처리 단계, 연산 체인 구성
DriverPipeline을 실행하는 실제 쓰레드 단위
OperatorScan, Filter, Join 등 작업 단위, Driver 내부에 포함
ExchangeStage 간 데이터 전송 (broadcast, shuffle 등)

JOIN 쿼리 처리 예

SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.region = 'APAC';

Stage 1: customers 테이블에서 조건 필터 → region = 'APAC'
Stage 2: orders 테이블 전처리
Stage 3: 두 데이터 셋을 join → output 전달

각 Stage는 여러 Task로 나뉘며, Worker에 분산되어 동시에 실행된다.

병렬 처리 최적화 기법

Broadcast Join

한 테이블이 매우 작고, 다른 테이블이 크다면 작은 테이블을 모든 Worker 노드에 복사(broadcast)해서 join을 수행한다.
Shuffle을 피하고, 각 Worker가 로컬에서 join을 처리하므로 성능이 비약적으로 향상될 수 있다.

SELECT * 
FROM large_table l
JOIN small_lookup s ON l.key = s.key;

small_lookup 테이블이 작다면, Trino는 이를 자동으로 broadcast해서 large_table을 처리 중인 모든 Task에 전달

네트워크 I/O 감소 (shuffle 회피), join 수행 시간 단축의 장점을 가지나 Broadcast할 테이블이 너무 크면 Out of Memory (OOM) 발생 가능한다.
Trino는 broadcast 가능한 크기(limit)를 설정값에 따라 판단한다.

join-distribution-type=AUTO  # 기본값. 필요시 BROADCAST 또는 PARTITIONED 지정 가능
join.max-broadcast-table-size=100MB  # broadcast 허용 최대 테이블 크기

Partitioned Join (Shuffle Join)

두 테이블 모두 크거나, broadcast하기엔 부담될 때, join key 기준으로 hash partitioning 후 데이터를 각 Worker로 분산한다.

SELECT * 
FROM sales s
JOIN transactions t ON s.txn_id = t.txn_id;

Trino는 txn_id 기준으로 두 테이블 모두를 파티셔닝해서 동일 키끼리 join 가능하도록 재분배한다. 대규모 join 처리 가능 (scale-out에 유리)하고, 데이터 양이 많을수록 병렬 처리 효과 극대화한다. 하지만 네트워크 shuffle 비용 발생하고, 불균형한 key 분포(skew) 의 분포로 인해 일부 Worker에 부하 집중 가능한점에 유의해야 한다.

join-distribution-type=PARTITIONED  # 강제로 shuffle join 사용

추가 최적화 기법

기법설명
Dynamic Filteringjoin 조건이 실행 중 확정될 때, 이를 기반으로 필터 pushdown을 적용하여 unnecessary scan을 줄임
Table StatisticsHive Metastore 또는 Iceberg의 metadata를 활용하여 join 전략 결정
Repartitioning Hints특정 column으로 데이터 분산을 명시 (REPARTITION, DISTRIBUTE BY 등)

EXPLAIN ANALYZE 예시

실제 실행 성능 진단을 하는 예시다. 로컬에서 단일 인스턴스로 트리노를 실행하는 간단한 예시이다.

Trino version: 476
Queued: 374.28us, Analysis: 37.34ms, Planning: 68.90ms, Execution: 1.04s
Fragment 1 [SOURCE]
    CPU: 1.82s, Scheduled: 4.27s, Blocked 2.03s (Input: 0.00ns, Output: 2.03s), Input: 5519143 rows (94.74MB); per task: avg.: 5519143.00 std.dev.: 0.00, Output: 5519143 rows (94.74MB)
    Peak Memory: 21.75MB, Tasks count: 1; per task: max: 38.69MB
    Output layout: [ss_sold_date_sk, ss_sold_time_sk]
    Output partitioning: SINGLE []
    TableScan[table = iceberg:default.store_sales$data@6054846871047636417 constraint on [d_date]]
        Layout: [ss_sold_date_sk:bigint, ss_sold_time_sk:bigint]
        Estimates: {rows: 5519143 (93.75MB), cpu: 93.75M, memory: 0B, network: 0B}
        CPU: 1.81s (100.00%), Scheduled: 4.26s (100.00%), Blocked: 2.03s (100.00%), Output: 5519143 rows (94.74MB)
        Input avg.: 459928.58 rows, Input std.dev.: 52.92%
        ss_sold_time_sk := 2:ss_sold_time_sk:bigint
        ss_sold_date_sk := 1:ss_sold_date_sk:bigint
        24:d_date:date
            :: [[2000-01-01, 2000-12-31]]
        Input: 5519143 rows (94.74MB), Physical input: 3.73MB, Physical input time: 863.39ms, Splits: 12, Splits generation wait time: 39.04ms
  • Queued: 쿼리 대기 시간 (≈0.3ms → 무시 가능)
  • Analysis: 쿼리 구문 및 카탈로그 메타데이터 분석 (≈37ms, 정상)
  • Planning: 실행 계획 최적화 (≈68ms, 정상)
  • Execution: 실제 실행 시간 (≈1.04초)

CPU vs Scheduled vs Blocked

  • CPU time: 1.82s → 실제 연산에 사용한 시간
  • Scheduled: 4.27s → 전체 실행 시간 중에서 리소스 할당 포함
  • Blocked: 2.03s → Output 단계에서 Blocked → 결과를 Coordinator 단일 노드로 모으는 동안 대기한 시간 발생

SINGLE output partitioning 때문에 Worker → Coordinator 수집 과정에서 Blocked time이 생김

Input vs Output

  • Input: 5519143 rows (94.74MB)
  • Physical input: 3.73MB (Iceberg 파일 실제 I/O량)
  • Output: 동일한 row/크기

Iceberg의 column pruning + predicate pruning 최적화가 적용됨

  • Iceberg 파일 총 크기 >> 3.73MB인데, 실제 읽은 물리적 데이터는 매우 작음, Iceberg의 Metadata/Manifest pushdown 최적화 성공

Tasks & Splits

Task count: 1

Splits: 12

이 쿼리는 단일 Fragment에 단일 Task만 생성 (즉, Worker 병렬 스케줄링이 안 됨)

  • Iceberg connector가 파일 단위로 12 splits을 만들었으나, Coordinator에서 단일 Task로 실행됨
    따라서 진정한 클러스터 병렬성은 거의 활용되지 않음

Input skew

  • Input avg.: 459928.58 rows, Input std.dev.: 52.92%
  • 평균적으로 스플릿당 약 46만 row 처리
  • std.dev. 52.9% → 분포 불균형이 꽤 있음, 어떤 Split은 훨씬 많은 데이터를 처리했고, 어떤 Split은 적었음

Iceberg 파일/파티션 크기가 균등하지 않다는 뜻 (작은 파일/큰 파일 혼재)

0개의 댓글