Trino는 MPP (Massively Parallel Processing) 아키텍처를 기반으로 하며, 대규모 분산 쿼리를 병렬로 처리하는 데 최적화되어 있다. Trino의 병렬 처리 아키텍처는 다음과 같은 주요 컴포넌트와 흐름으로 구성되어 있다
쿼리 수신 및 분석
사용자가 쿼리를 제출하면 Coordinator가 SQL을 파싱 → 분석 → 논리 계획 생성
분산 쿼리 계획 생성
| 구성 요소 | 설명 |
|---|---|
| Stage | 논리적 쿼리 단계, 종속성을 가진 DAG 형태로 구성 |
| Task | Stage의 실행 단위, 여러 Worker에 분산되어 병렬 수행 |
| Pipeline | Task 내부 처리 단계, 연산 체인 구성 |
| Driver | Pipeline을 실행하는 실제 쓰레드 단위 |
| Operator | Scan, Filter, Join 등 작업 단위, Driver 내부에 포함 |
| Exchange | Stage 간 데이터 전송 (broadcast, shuffle 등) |
SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.region = 'APAC';
Stage 1: customers 테이블에서 조건 필터 → region = 'APAC'
Stage 2: orders 테이블 전처리
Stage 3: 두 데이터 셋을 join → output 전달
각 Stage는 여러 Task로 나뉘며, Worker에 분산되어 동시에 실행된다.
한 테이블이 매우 작고, 다른 테이블이 크다면 작은 테이블을 모든 Worker 노드에 복사(broadcast)해서 join을 수행한다.
Shuffle을 피하고, 각 Worker가 로컬에서 join을 처리하므로 성능이 비약적으로 향상될 수 있다.
SELECT *
FROM large_table l
JOIN small_lookup s ON l.key = s.key;
small_lookup 테이블이 작다면, Trino는 이를 자동으로 broadcast해서 large_table을 처리 중인 모든 Task에 전달
네트워크 I/O 감소 (shuffle 회피), join 수행 시간 단축의 장점을 가지나 Broadcast할 테이블이 너무 크면 Out of Memory (OOM) 발생 가능한다.
Trino는 broadcast 가능한 크기(limit)를 설정값에 따라 판단한다.
join-distribution-type=AUTO # 기본값. 필요시 BROADCAST 또는 PARTITIONED 지정 가능
join.max-broadcast-table-size=100MB # broadcast 허용 최대 테이블 크기
두 테이블 모두 크거나, broadcast하기엔 부담될 때, join key 기준으로 hash partitioning 후 데이터를 각 Worker로 분산한다.
SELECT *
FROM sales s
JOIN transactions t ON s.txn_id = t.txn_id;
Trino는 txn_id 기준으로 두 테이블 모두를 파티셔닝해서 동일 키끼리 join 가능하도록 재분배한다. 대규모 join 처리 가능 (scale-out에 유리)하고, 데이터 양이 많을수록 병렬 처리 효과 극대화한다. 하지만 네트워크 shuffle 비용 발생하고, 불균형한 key 분포(skew) 의 분포로 인해 일부 Worker에 부하 집중 가능한점에 유의해야 한다.
join-distribution-type=PARTITIONED # 강제로 shuffle join 사용
| 기법 | 설명 |
|---|---|
| Dynamic Filtering | join 조건이 실행 중 확정될 때, 이를 기반으로 필터 pushdown을 적용하여 unnecessary scan을 줄임 |
| Table Statistics | Hive Metastore 또는 Iceberg의 metadata를 활용하여 join 전략 결정 |
| Repartitioning Hints | 특정 column으로 데이터 분산을 명시 (REPARTITION, DISTRIBUTE BY 등) |
실제 실행 성능 진단을 하는 예시다. 로컬에서 단일 인스턴스로 트리노를 실행하는 간단한 예시이다.
Trino version: 476
Queued: 374.28us, Analysis: 37.34ms, Planning: 68.90ms, Execution: 1.04s
Fragment 1 [SOURCE]
CPU: 1.82s, Scheduled: 4.27s, Blocked 2.03s (Input: 0.00ns, Output: 2.03s), Input: 5519143 rows (94.74MB); per task: avg.: 5519143.00 std.dev.: 0.00, Output: 5519143 rows (94.74MB)
Peak Memory: 21.75MB, Tasks count: 1; per task: max: 38.69MB
Output layout: [ss_sold_date_sk, ss_sold_time_sk]
Output partitioning: SINGLE []
TableScan[table = iceberg:default.store_sales$data@6054846871047636417 constraint on [d_date]]
Layout: [ss_sold_date_sk:bigint, ss_sold_time_sk:bigint]
Estimates: {rows: 5519143 (93.75MB), cpu: 93.75M, memory: 0B, network: 0B}
CPU: 1.81s (100.00%), Scheduled: 4.26s (100.00%), Blocked: 2.03s (100.00%), Output: 5519143 rows (94.74MB)
Input avg.: 459928.58 rows, Input std.dev.: 52.92%
ss_sold_time_sk := 2:ss_sold_time_sk:bigint
ss_sold_date_sk := 1:ss_sold_date_sk:bigint
24:d_date:date
:: [[2000-01-01, 2000-12-31]]
Input: 5519143 rows (94.74MB), Physical input: 3.73MB, Physical input time: 863.39ms, Splits: 12, Splits generation wait time: 39.04ms
CPU vs Scheduled vs Blocked
- CPU time: 1.82s → 실제 연산에 사용한 시간
- Scheduled: 4.27s → 전체 실행 시간 중에서 리소스 할당 포함
- Blocked: 2.03s → Output 단계에서 Blocked → 결과를 Coordinator 단일 노드로 모으는 동안 대기한 시간 발생
SINGLE output partitioning 때문에 Worker → Coordinator 수집 과정에서 Blocked time이 생김
Input vs Output
- Input: 5519143 rows (94.74MB)
- Physical input: 3.73MB (Iceberg 파일 실제 I/O량)
- Output: 동일한 row/크기
Iceberg의 column pruning + predicate pruning 최적화가 적용됨
Tasks & Splits
Task count: 1
Splits: 12
이 쿼리는 단일 Fragment에 단일 Task만 생성 (즉, Worker 병렬 스케줄링이 안 됨)
Input skew
Iceberg 파일/파티션 크기가 균등하지 않다는 뜻 (작은 파일/큰 파일 혼재)