[Apache Flink] 기본 문법과 주요 구성 요소 정리하기

궁금하면 500원·2024년 11월 23일

데이터 저장하기

목록 보기
5/23

Apache Flink 기본 문법 및 주요 구성 요소

1. 실행 환경 설정 (StreamExecutionEnvironment)

Flink 프로그램은 StreamExecutionEnvironment를 기반으로 실행됩니다.

환경 설정은 로컬 또는 클러스터 환경에 따라 다르게 구성할 수 있습니다.

// 로컬 환경 설정 (JVM에서 실행)
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// 클러스터 환경 설정 (외부 클러스터에서 실행)
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
    "hostname", // 클러스터 호스트 이름
    8081,       // 클러스터 포트
    "path/to/jar" // 실행할 JAR 파일 경로
);

// 병렬 처리 수준 설정
env.setParallelism(4); // 병렬성 4로 설정

2. 데이터 소스 정의

Flink는 다양한 데이터 소스를 지원하며, 외부 데이터 시스템에서 데이터를 읽어오는 기능을 제공합니다.

(1) 파일에서 데이터 읽기

DataStream<String> text = env.readTextFile("file:///path/to/input/file");

(2) Kafka에서 데이터 읽기

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // Kafka 서버
properties.setProperty("group.id", "test-group"); // 소비자 그룹 ID

FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
    "topic-name",                   // Kafka 토픽 이름
    new SimpleStringSchema(),       // 데이터 포맷
    properties                      // 설정 프로퍼티
);
DataStream<String> kafkaStream = env.addSource(kafkaSource);

3. 데이터 변환 연산자 (Transformation)

Flink는 데이터를 처리하기 위해 다양한 변환 연산자를 제공합니다.

(1) Map 변환

DataStream<Integer> mapped = kafkaStream.map(s -> Integer.parseInt(s));

(2) Filter 변환

DataStream<Integer> filtered = mapped.filter(number -> number > 100);

(3) FlatMap 변환

DataStream<String> words = kafkaStream.flatMap((String line, Collector<String> out) -> {
    for (String word : line.split("\\s")) {
        out.collect(word);
    }
}).returns(Types.STRING); // 반환 타입 명시

(4) KeyBy 및 집계

DataStream<Tuple2<String, Integer>> wordCounts = words
    .map(word -> new Tuple2<>(word, 1))
    .keyBy(value -> value.f0) // 키를 기준으로 그룹화
    .sum(1); // 두 번째 필드(Count)를 합산

4. 윈도우 처리 (Windowing)

스트리밍 데이터는 시간 기반 또는 이벤트 기반의 윈도우로 그룹화하여 처리할 수 있습니다.

(1) Tumbling Window (고정 윈도우)

kafkaStream.keyBy(value -> value.key)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5초 단위 윈도우
    .sum("value");

(2) Sliding Window (슬라이딩 윈도우)

kafkaStream.keyBy(value -> value.key)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 10초 윈도우, 5초 슬라이드
    .aggregate(new MyAggregateFunction());

5. 상태 관리 (State Management)

Flink는 상태를 통해 스트리밍 데이터의 중간 결과를 유지하거나 집계 데이터를 저장할 수 있습니다.

(1) 값 상태(ValueState) 정의 및 사용

// 상태 초기화
ValueState<Integer> state = getRuntimeContext().getState(
    new ValueStateDescriptor<>("stateName", Integer.class)
);

// 상태 업데이트 및 조회
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
    state.update(value); // 상태 저장
    Integer currentValue = state.value(); // 상태 조회
    out.collect(currentValue);
}

6. 싱크(Sink) 정의

처리된 데이터를 외부 시스템에 저장하거나 전송합니다.

(1) 파일에 쓰기

kafkaStream.writeAsText("file:///path/to/output/file");

(2) Kafka로 쓰기

FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
    "output-topic",              // 대상 Kafka 토픽
    new SimpleStringSchema(),    // 데이터 포맷
    properties                   // 설정 프로퍼티
);
kafkaStream.addSink(kafkaSink);

7. 작업 실행

작성된 Flink 작업을 실행합니다.

env.execute("Flink Job Name");

profile
에러가 나도 괜찮아 — 그건 내가 배우고 있다는 증거야.

0개의 댓글