[MariaDB] r2dbc batch insert, webflux blocking call 처리

이현지·2022년 8월 29일
0

현재 webflux 와 r2dbc 를 활용하여 다중 api를 개발하던 중, r2dbc 에서 batch insert 를 어떻게 사용하는지 찾아보게되었고, 이를 공유해 보고자 한다.

우선 R2dbcCrudRepository 를 사용하고 있었기 때문에, 단순히 batch insert를 위해서는 제공하는 saveAll 메서드를 사용해도된다.

궁금하니 다른 방법도 알아보자.
MariaDB의 공식문서에 r2dbc batch insert 의 예시가 잘 나와있다.

        try {
            conf = MariadbConnectionConfiguration.builder()
                .host("127.0.0.1").port(3306)
                .username("username").password("password")
                .database("db").build();

            connFactory = new MariadbConnectionFactory(conf);
        }
        catch (Exception e) {
            e.printStackTrace();
        }

        try {
            conn = connFactory.create().block();
            Batch batch = conn.createBatch();

            for(Rule rule : ruleList) {
                batch.add("INSERT INTO rule (route_path_id, user_id) VALUES (" +
                    rule.getRoutePathId() + ",'" + rule.getUserId() + "')");
            }

            return Flux.from(batch.execute()).subscribe();
        }
        catch (Exception e) {

        }
        finally {
            conn.close();
        }

공식 문서를 참고해서 batch insert 로직을 작성했다.

  for(Rule rule : ruleList) {
                batch.add("INSERT INTO rule (route_path_id, user_id) VALUES (" +
                    rule.getRoutePathId() + ",'" + rule.getUserId() + "')");
            }

위와 같이 VALUES 에 문자열 변수를 넣을 땐 '' 작은 따옴표로 감싸는 것을 잊지말자.
그렇지 않으면 unknown column 컬럼명 in field list 라는 에러를 만날 수 있다...

이렇게 작성 후 실행을 해보면 아마 에러가 날 것이다.
왜냐하면 block() 메서드 때문이다.

논블로킹 처리 중 block() 메서드를 사용했기 때문이다.
비동기 작업을 하는 스레드에 block 을 걸어서 동기식으로 만든다면, 비동기 작업의 의미가 없어지게 된다.
커넥션은 동기화가 필요한 작업이므로 block이 필요하다. 그러면 어떻게 해아할까?

비동기 작업 중 blocking call을 하고싶다면 다음과 같은 방식을 사용하면 된다.

Mono blockingWrapper = Mono.fromCallable(() -> { 
    return // blocking 코드 작성
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic()); 

동기식 코드를 Mono.fromCallable 로 감싼 후, Schedulers.boundedElastic() 을 사용하면, subscribe 가 발생했을 때 동기식 코드를 별도의 스레드에서 동작하도록 하는 것이다.
그러므로 동기식 코드가 다른 스레드에서 동작하므로, 에러가 발생하지 않는다.

  • Mono.fromCallable
    데이터의 방출을 구독 전까지 지연시킨다.
    Cold Publisher로, subscribe가 일어나지 않으면 동작하지 않는다.
  • Schedulers.boundedElastic
    별도의 스레드를 생성하며, 생성할 수 있는 스레드 개수의 제한을 둔다.
    그러므로 요청이 많을 경우 blocking 작업을 잠시 큐에 넣어놓을 수 있다.
    기존에 elastic 은 생성할 수 있는 스레드 개수의 제한을 두지 않는다는 문제로 현재 deprecated 되었다.

최종적으로 작성한 코드는 다음과 같다.

Mono blocking = Mono.fromCallable(() -> {
            try {
                conf = MariadbConnectionConfiguration.builder()
                    .host("127.0.0.1").port(3306)
                    .username("username").password("password")
                    .database("db").build();

                connFactory = new MariadbConnectionFactory(conf);
            }
            catch (Exception e) {
                e.printStackTrace();
            }

            conn = connFactory.create().block();
            Batch batch = conn.createBatch();

            for(Rule rule : ruleList) {
                batch.add("INSERT INTO rule (route_path_id, user_id) VALUES (" +
                    rule.getRoutePathId() + ",'" + rule.getUserId() + "')");
            }

            return Flux.from(batch.execute()).subscribe();
        });

        return blocking.subscribeOn(Schedulers.boundedElastic());

Reference

https://mariadb.com/docs/connect/programming-languages/java-r2dbc/native/batch/#batch-operations
https://godekdls.github.io/Reactor%20Core/appendixbfaqbestpracticesandhowdoi/#b1-how-do-i-wrap-a-synchronous-blocking-call
https://binux.tistory.com/135?category=907689

profile
Backend Developer👩‍💻

0개의 댓글