현재 webflux 와 r2dbc 를 활용하여 다중 api를 개발하던 중, r2dbc 에서 batch insert 를 어떻게 사용하는지 찾아보게되었고, 이를 공유해 보고자 한다.
우선 R2dbcCrudRepository
를 사용하고 있었기 때문에, 단순히 batch insert를 위해서는 제공하는 saveAll
메서드를 사용해도된다.
궁금하니 다른 방법도 알아보자.
MariaDB의 공식문서에 r2dbc batch insert 의 예시가 잘 나와있다.
try {
conf = MariadbConnectionConfiguration.builder()
.host("127.0.0.1").port(3306)
.username("username").password("password")
.database("db").build();
connFactory = new MariadbConnectionFactory(conf);
}
catch (Exception e) {
e.printStackTrace();
}
try {
conn = connFactory.create().block();
Batch batch = conn.createBatch();
for(Rule rule : ruleList) {
batch.add("INSERT INTO rule (route_path_id, user_id) VALUES (" +
rule.getRoutePathId() + ",'" + rule.getUserId() + "')");
}
return Flux.from(batch.execute()).subscribe();
}
catch (Exception e) {
}
finally {
conn.close();
}
공식 문서를 참고해서 batch insert 로직을 작성했다.
for(Rule rule : ruleList) {
batch.add("INSERT INTO rule (route_path_id, user_id) VALUES (" +
rule.getRoutePathId() + ",'" + rule.getUserId() + "')");
}
위와 같이 VALUES 에 문자열 변수를 넣을 땐 ''
작은 따옴표로 감싸는 것을 잊지말자.
그렇지 않으면 unknown column 컬럼명 in field list
라는 에러를 만날 수 있다...
이렇게 작성 후 실행을 해보면 아마 에러가 날 것이다.
왜냐하면 block()
메서드 때문이다.
논블로킹 처리 중 block()
메서드를 사용했기 때문이다.
비동기 작업을 하는 스레드에 block 을 걸어서 동기식으로 만든다면, 비동기 작업의 의미가 없어지게 된다.
커넥션은 동기화가 필요한 작업이므로 block이 필요하다. 그러면 어떻게 해아할까?
비동기 작업 중 blocking call을 하고싶다면 다음과 같은 방식을 사용하면 된다.
Mono blockingWrapper = Mono.fromCallable(() -> {
return // blocking 코드 작성
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic());
동기식 코드를 Mono.fromCallable
로 감싼 후, Schedulers.boundedElastic()
을 사용하면, subscribe 가 발생했을 때 동기식 코드를 별도의 스레드에서 동작하도록 하는 것이다.
그러므로 동기식 코드가 다른 스레드에서 동작하므로, 에러가 발생하지 않는다.
Mono.fromCallable
Schedulers.boundedElastic
elastic
은 생성할 수 있는 스레드 개수의 제한을 두지 않는다는 문제로 현재 deprecated 되었다.최종적으로 작성한 코드는 다음과 같다.
Mono blocking = Mono.fromCallable(() -> {
try {
conf = MariadbConnectionConfiguration.builder()
.host("127.0.0.1").port(3306)
.username("username").password("password")
.database("db").build();
connFactory = new MariadbConnectionFactory(conf);
}
catch (Exception e) {
e.printStackTrace();
}
conn = connFactory.create().block();
Batch batch = conn.createBatch();
for(Rule rule : ruleList) {
batch.add("INSERT INTO rule (route_path_id, user_id) VALUES (" +
rule.getRoutePathId() + ",'" + rule.getUserId() + "')");
}
return Flux.from(batch.execute()).subscribe();
});
return blocking.subscribeOn(Schedulers.boundedElastic());
https://mariadb.com/docs/connect/programming-languages/java-r2dbc/native/batch/#batch-operations
https://godekdls.github.io/Reactor%20Core/appendixbfaqbestpracticesandhowdoi/#b1-how-do-i-wrap-a-synchronous-blocking-call
https://binux.tistory.com/135?category=907689