[Snowflake] Tasks & Streams

차지예·2026년 5월 13일

Snowflake

목록 보기
11/49
post-thumbnail

1. Tasks (태스크)

개념 정의

A Task is an object used to schedule the execution of a SQL statement, a stored procedure, or procedural logic using Snowflake Scripting.
Task는 SQL 문, 저장 프로시저(stored procedure), 또는 Snowflake Scripting을 사용한 절차적 로직(procedural logic)의 실행을 스케줄링(예약 실행) 하기 위해 사용되는 객체이다.

  • SQL 문, 저장 프로시저, Snowflake Scripting 등의 실행을 스케줄링하는 객체
  • Task는 SQL 커맨드 및 JavaScript, Python, Java, Scala, Snowflake Scripting으로 작성된 저장 프로시저를 실행할 수 있음

Task 생성에 필요한 권한

ACCOUNTADMIN role 또는 CREATE TASK 권한 필요

Task 생성 문법

CREATE TASK t1
  WAREHOUSE = mywh
  SCHEDULE = '30 MINUTE'
AS
  COPY INTO my_table
  FROM $my_stage;
구문 요소설명
WAREHOUSE = mywhTask 실행에 사용할 웨어하우스 지정
SCHEDULE = '30 MINUTE'트리거 메커니즘 (실행 주기 설정)
AS ...실행할 SQL 쿼리 정의

Task 시작 / 중지

-- Task 시작 (Suspended → Running 상태로 전환)
ALTER TASK minute_task RESUME;

-- Task 중지
ALTER TASK minute_task SUSPEND;

📌보충: Task는 생성 시 기본적으로 Suspended(중지) 상태로 시작됨. 스케줄에 따라 실행되게 하려면 반드시 ALTER TASK ... RESUME을 실행해야 함.


Task Graph (DAG)

Task들을 연결하여 Directed Acyclic Graph(DAG) 형태의 복잡한 워크플로우를 구성할 수 있음.

        T1  ← Root Task (반드시 SCHEDULE 정의 필요)
       /  \
      T2   T3  ← Child Tasks (SCHEDULE 정의 불가)
       \  /
        T4

Root Task vs Child Task

구분Root TaskChild Task
스케줄 정의필수불가
위치DAG의 시작점Root Task 이후 실행
AFTER 절없음AFTER predecessor_task 명시

Child Task 생성 예시

CREATE TASK t4
  WAREHOUSE = mywh
  AFTER t2, t3   -- Child task 트리거 메커니즘
AS
  COPY INTO my_table FROM $my_stage;

Task Graph 제한 사항

  • 가장 바깥 → Snowflake 계정 (모든 DAG의 resumed Task 합산 30,000개 한계)
  • 그 안 → DAG 하나 (Root + Child 전체 합산 1,000개 한계)
  • DAG 안 → Root Task 1개 + Child Task들로 구성
  • 맨 아래 → 단일 Task 하나를 기준으로 Predecessor/Child 각각 100개 한계

2. Streams (스트림)

개념 정의

A stream is an object created to view & track DML changes to a source table — inserts, updates & deletes.

  • 소스 테이블에 발생한 DML 변경 사항(INSERT, UPDATE, DELETE)을 추적하는 CDC(Change Data Capture) 객체
  • Stream은 두 트랜잭션 시점 사이의 행 수준 변경사항을 쿼리하고 소비할 수 있게 해줌

Stream 생성 및 조회

-- Stream 생성
CREATE STREAM my_stream ON TABLE my_table;

-- Stream 조회
SELECT * FROM my_stream;

Stream 메타데이터 컬럼

Stream을 조회하면 원본 테이블 컬럼 외에 아래 3개의 메타데이터 컬럼이 추가됨:

컬럼명설명
METADATA$ACTION변경 유형: INSERT 또는 DELETE
METADATA$ISUPDATEUPDATE 문의 일부인 경우 TRUE, 아니면 FALSE
METADATA$ROW_ID행의 고유하고 불변하는 ID (변경 추적용)

📌 UPDATE 처리 방식 : 소스 테이블의 UPDATE는 스트림에서 DELETE + INSERT 쌍으로 기록되며, 두 레코드 모두 METADATA$ISUPDATE = TRUE로 표시됨.

슬라이드 예시 데이터

EMP_IDEMP_NAMEEMP_DOBEMP_POSITIONMETADATA$ACTIONMETADATA$ISUPDATEMETADATA$ROW_ID
AC1909Ramesh Aravind10/09/1964ActorINSERTFALSEcc576bf4fee43b88c4fd03...

Stream의 동작 원리 (Offset 개념)

Action          Table Version    Stream 상태
─────────────────────────────────────────────
                    V1           SELECT * → 빈 스트림 (Empty stream)
CREATE STREAM
  MYSTREAM ON
  TABLE MYTABLE;
                    │
UPDATE 2 ROWS  →   V2           SELECT * → 2 Updates
                    │
DELETE 1 ROW   →   V3           SELECT * → 2 Updates + 1 Delete
                    ●  (Stream offset)
  • Stream은 offset(기준 시점) 을 저장하며, offset 이후의 변경사항만 반환
  • DML 트랜잭션에서 Stream이 사용되면 offset이 전진(advance) 되고, 소비된 데이터는 더 이상 조회되지 않음
  • Stream 자체는 실제 테이블 데이터를 저장하지 않으며, offset 정보만 저장

즉, offset은 "여기까지는 이미 읽었어요" 표시
Stream은 테이블에 변경이 생길 때마다 그걸 기록하는데, 어디까지 읽었는지 표시하는 책갈피가 바로 Offset입니다.
Offset 이후에 생긴 변경사항만 Stream에서 보여줘요.


Stream Offset 진행 (Progress stream offset)

-- 스트림 소비 예시: Stream의 데이터를 다른 테이블로 이동하면 offset이 진행됨
INSERT INTO mytable2 SELECT * FROM mystream;

📌 Stream offset은 DML 트랜잭션 내에서 Stream이 사용될 때만 전진함. SELECT만으로는 offset이 변경되지 않음.


3. Tasks + Streams 조합

Tasks와 Streams를 결합하면 새로운 데이터나 변경된 데이터를 지속적으로(continuously) 처리하는 파이프라인 구성 가능.

SYSTEM$STREAM_HAS_DATA 함수

  • Stream에 처리할 CDC 레코드가 있는지 확인
  • Task의 WHEN 절과 함께 사용하여 데이터가 있을 때만 Task를 실행

Tasks & Streams 예시 코드 (슬라이드)

CREATE TASK mytask1
  WAREHOUSE = mywh
  SCHEDULE = '5 MINUTE'
  WHEN
    SYSTEM$STREAM_HAS_DATA('MYSTREAM')
AS
  INSERT INTO mytable3(id, name)
    SELECT id, name
    FROM mystream
    WHERE METADATA$ACTION = 'INSERT';
구문 요소설명
SCHEDULE = '5 MINUTE'5분마다 실행 여부 확인
WHEN SYSTEM$STREAM_HAS_DATA(...)Stream에 데이터가 있을 때만 실행
WHERE METADATA$ACTION = 'INSERT'INSERT 변경 사항만 필터링하여 처리

📌 보충: SYSTEM$STREAM_HAS_DATA는 Stream의 offset과 현재 트랜잭션 시각 사이의 테이블 버전 메타데이터를 비교하여 CDC 레코드 유무를 판별함.


4. 핵심 요약

항목핵심 내용
Task 생성 권한ACCOUNTADMIN 또는 CREATE TASK 권한
Task 기본 상태생성 시 Suspended (반드시 RESUME 필요)
Root Task스케줄 필수, DAG의 시작점
Child Task스케줄 불가, AFTER 절로 선행 Task 지정
DAG 최대 Task 수1,000개 (Root 포함)
Stream이 추적하는 변경INSERT, UPDATE, DELETE
UPDATE의 Stream 표현DELETE + INSERT 쌍, METADATA$ISUPDATE = TRUE
Stream 메타데이터 컬럼METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
Stream offset 전진 조건DML 트랜잭션 내 Stream 사용 시 (SELECT 제외)
Tasks + Streams 연동WHEN SYSTEM$STREAM_HAS_DATA('stream_name')

0개의 댓글