Source Transformation Sink Operator - Flink

Andy (Yoon Yong) Shin·2021년 9월 26일
1
post-thumbnail

개요

Flink로 stream processing program을 만든다면, operator라는 개념으로 프로그램을 구성하게 되며, Operator는 source, transformation 그리고 sink라는 3개의 종류로 분류 되어 있습니다. Apache flink라는 framework 맹목적이 쉽고 빠르게 대용량 데이터를 병렬처리할 수 있게 도와주는 것이기 때문에, operator들의 분류 자체가 Extract Transform Load (ETL)에 가깝게 분류 되어 있습니다.

해당 operator에 대한 설명을 시작하기 전에 Flink에서 정의 하는 2가지의 data stream인 bounded stream 그리고 unbounded stream에 대해 간단하게 정의하고 넘어 가겠습니다.

Unbounded Stream - 끝이 없는 stream으로 데이터가 무한으로 들어오는 stream입니다. 일반적으로는 실시간으로 데이터를 바로 처리한다면, 언제 끝이 나는지 알수 없기에 unbounded stream이라고 이야기할 수 있습니다.

Bounded Stream - 끝을 알고 있는 stream입니다. 데이터를 새벽에 1일치식만 처리 한다면, 이것은 bounded stream일라고 할 수 있습니다.

Flink는 stream에 종류에 따라 데이터 처리의 방식이 달라지게 만들어졌기때문에, Flink는 최초 환경설정에 선택된 stream 종류에 따라 해당 환경에 최적화된 방식으로 구동되며, 선택된 stream에 따라 제공되는 operator 종류도 조금 달라집니다. 간단한 예로, 온도 sensor data를 취합해 평균치를 구한다면, 1일 bounded stream 같은 경우 전체 데이터에 끝을 알기에 모두 취합하여 평균을 구할수 있는 반면, unbounded stream은 끝을 알수 없기에, 해당 데이터를 한번에 취합하게 된다면, OOM이 날수도 있어, 보통 windowing이라는 기술을 사용하여, 현시점 기준 마지막 1일치 평균을 구하는 형식으로 처리하는 방식이 달라질것입니다.

Source

Source는 ETL에 extract와 같다고 보시면 될거 같으며, data를 정해진 곳에서 stream 형태로 가져옵니다. 이 정해진 곳은 Kafka, Cassandra, Elasticsearch, File 또는 java collection 등 다양하며, user defined source로 사용자가 직접 만든 source operator로 원하는 데이터를 아무곳에서나 가져올수 있습니다. User defined source operator도 만들기가 쉬운 편으로, 간단한 for loop 만드는 것과 같이 설계되어 있습니다.

User defined source를 만드려면 먼저 SourceFunction 또는 RichSourceFunction을 implment해야 합니다. Rich-이냐 아니냐는 추가 적인 operator life cycle을 구현 해야 하느냐 마느냐에 차이가 있는데 Rich-는 open, close라는 추가 method도 구현 해야 합니다. Open, close는 이미 예상하셔겟지만, operator가 flink job에 의해 생성 될때 최초와 끝에 한번식 실행 되는 method로 database connection pool 또는 외부 io stream 같은 객체들을 정리할때 용이하게 사용 됩니다. 추후 transformation 또는 sink operator 에서도 Rich- 패턴은 동일하게 사용됩니다. 추가로 Generic type으로 source operator이기 때문에 stream으로 내보낼 element type을 지정 하고, run이라는 method에 for loop 형식에 코드만 작성하면 끝입니다. Run methond에 있는 SourceContext를 통해 Flink는 다음 operator로 데이터를 전달 합니다.

아래 간단한 예제 코드는 지정한 파일을 line별로 읽는 user defined source 입니다.

public class LineReaderSource extends RichSourceFunction<String> {

    private String filePath;
    private FileReader fr;
    private BufferedReader br;

    public LineReaderSource(String filePath) {
        this.filePath = filePath;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.fr = new FileReader(this.filePath);
        this.br = new BufferedReader(this.fr);
    }

    @Override
    public void close() throws Exception {
        super.close();
        this.br.close();
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        String line = this.br.readLine();
        while (line != null) {
            sourceContext.collect(line);
            line = this.br.readLine();
        }
    }

    @Override
    public void cancel() {
        try {
            this.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Transformation

ETL의 transformation과 동일 하지만, 재사용 가능한 operator의 단위로 구성이 가능하며, bounded stream이냐 아니냐에 따라 아래와 같은 operator들을 기본으로 제공합니다.

DataStream API (Unbounded stream)

  • Map
  • FlatMap
  • Filter
  • KeyBy
  • Reduce
  • Window
  • WindowAll
  • Window Apply
  • WindowReduce
  • Union
  • Window Join
  • Interval Join
  • Window CoGroup
  • Connect
  • CoMap, CoFlatMap
  • Iterate
  • Partitioning

DataSet API (Bounded stream)

  • Map
  • FlatMap
  • MapPartition
  • Filter
  • Projection of Tuple DataSet
  • Transformations on Grouped DataSet
  • Reduce on Grouped DataSet
  • GroupReduce on Grouped DataSet
  • GroupCombine on a Grouped DataSet
  • Aggregate on Grouped Tuple DataSet
  • MinBy / MaxBy on Grouped Tuple DataSet
  • Reduce on full DataSet
  • GroupReduce on full DataSet
  • GroupCombine on a full DataSet
  • Aggregate on full Tuple DataSet
  • MinBy / MaxBy on full Tuple DataSet
  • Distinct
  • Join
  • OuterJoin
  • Cross
  • CoGroup
  • Union
  • Rebalance
  • Hash-Partition
  • Range-Partition
  • Sort Partition
  • First-n

위와 같이 bounded stream에게 제공 되는 기본 transformation operator가 많습니다. 각 operator에 자세한 사용법은 여기 에서 확인이 가능합니다. (다음 블로그에 해당 operator를 자세히 들여다 볼 생각입니다 ㅎ)

여기서는 간단하게, Map을 사용하여, 위 source operator에서 넘겨 받은 stream data를 모두 uppercase로 변경하는 transformation map operator를 예제로 보여드리겟습니다.

public class UpperCaseMap implements MapFunction<String, String> {

    @Override
    public String map(String s) throws Exception {
        return s.toUpperCase(Locale.ROOT);
    }
}

간단하게 MapFuncrtion의 interface를 implement 하여, generic type <IN, OUT>으로 어떤 data stream을 받아서 어떤 stream으로 보낼지 정의하면 끝입니다.

Sink

마지막인 Load와 동일한 sink operator입니다. 보통 Flink는 sink operator를 통해, 외부로 처리된 데이터를 내보냅니다. 이는 source operator와 동일하며, 기본적으로 Kafka, Cassandra, Elasticsearch, File 또는 java collection 등 다양한 sink operator를 기본적으로 제공 해주시만, 간단한 user defined sink operator 구성을 위해 File로 저장 하는 sink operator를 만들어 보겟습니다. Source operator 와 다른점은, run method 대신 invoke가 있는 것 말고는 크게 차이가 없습니다. Sink operator에 invoke 같은 경우, source에 run과 다르게, stream에서 데이터가 넘어 올때 마다, invoke 실행되는 형식이라, for loop처럼 구성하면 안됩니다.

public class FileWriteSink extends RichSinkFunction<String> {

    private String filePath;
    private FileWriter fw;
    private BufferedWriter bw;
    
    public FileWriteSink(String filePath) {
        this.filePath = filePath;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.fw = new FileWriter(this.filePath);
        this.bw = new BufferedWriter(this.fw);
    }

    @Override
    public void close() throws Exception {
        super.close();
        this.bw.close();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        super.invoke(value, context);
        this.bw.write(value);
    }
}

Job

위 user defined operator들로 job을 아래와 같이 역으면, file을 line 별로 읽어서 모두 영문 대문자로 변환하는 job 이됩니다.

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.NONE);
        env.setParallelism(1);
        
        env.addSource(new LineReaderSource("input.txt"))
                .map(new UpperCaseMap())
                .addSink(new FileWriteSink("output.txt"));

        env.execute("Flink Streaming Java API Skeleton");
    }
}

결과

이번 글에서는 source, transformation 그리고 sink를 모두 user defined operator로 구성하여, text file을 모두 대문자로 변경하는 stream job을 만들어 보았습니다. 다음 글에서는 보다 depth 잇는 transformation operator에 대해 알아볼 예정입니다.

0개의 댓글