Flink 프로그램은 StreamExecutionEnvironment를 기반으로 실행됩니다.
환경 설정은 로컬 또는 클러스터 환경에 따라 다르게 구성할 수 있습니다.
// 로컬 환경 설정 (JVM에서 실행)
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// 클러스터 환경 설정 (외부 클러스터에서 실행)
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
"hostname", // 클러스터 호스트 이름
8081, // 클러스터 포트
"path/to/jar" // 실행할 JAR 파일 경로
);
// 병렬 처리 수준 설정
env.setParallelism(4); // 병렬성 4로 설정
Flink는 다양한 데이터 소스를 지원하며, 외부 데이터 시스템에서 데이터를 읽어오는 기능을 제공합니다.
DataStream<String> text = env.readTextFile("file:///path/to/input/file");
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // Kafka 서버
properties.setProperty("group.id", "test-group"); // 소비자 그룹 ID
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
"topic-name", // Kafka 토픽 이름
new SimpleStringSchema(), // 데이터 포맷
properties // 설정 프로퍼티
);
DataStream<String> kafkaStream = env.addSource(kafkaSource);
Flink는 데이터를 처리하기 위해 다양한 변환 연산자를 제공합니다.
DataStream<Integer> mapped = kafkaStream.map(s -> Integer.parseInt(s));
DataStream<Integer> filtered = mapped.filter(number -> number > 100);
DataStream<String> words = kafkaStream.flatMap((String line, Collector<String> out) -> {
for (String word : line.split("\\s")) {
out.collect(word);
}
}).returns(Types.STRING); // 반환 타입 명시
DataStream<Tuple2<String, Integer>> wordCounts = words
.map(word -> new Tuple2<>(word, 1))
.keyBy(value -> value.f0) // 키를 기준으로 그룹화
.sum(1); // 두 번째 필드(Count)를 합산
스트리밍 데이터는 시간 기반 또는 이벤트 기반의 윈도우로 그룹화하여 처리할 수 있습니다.
kafkaStream.keyBy(value -> value.key)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5초 단위 윈도우
.sum("value");
kafkaStream.keyBy(value -> value.key)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 10초 윈도우, 5초 슬라이드
.aggregate(new MyAggregateFunction());
Flink는 상태를 통해 스트리밍 데이터의 중간 결과를 유지하거나 집계 데이터를 저장할 수 있습니다.
// 상태 초기화
ValueState<Integer> state = getRuntimeContext().getState(
new ValueStateDescriptor<>("stateName", Integer.class)
);
// 상태 업데이트 및 조회
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
state.update(value); // 상태 저장
Integer currentValue = state.value(); // 상태 조회
out.collect(currentValue);
}
처리된 데이터를 외부 시스템에 저장하거나 전송합니다.
kafkaStream.writeAsText("file:///path/to/output/file");
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
"output-topic", // 대상 Kafka 토픽
new SimpleStringSchema(), // 데이터 포맷
properties // 설정 프로퍼티
);
kafkaStream.addSink(kafkaSink);
작성된 Flink 작업을 실행합니다.
env.execute("Flink Job Name");
