R2DBC

이정원·2024년 11월 16일
post-thumbnail

비동기 어플리케이션에서 처리 절차 중간에 동기가 포함되면 해당 항목에서 병목이 되어 응답 속도가 떨어지고 전체 속도가 저하된다.

해당 그림과 같이 Client요청에 대해 Api서버 외부 요청과 DB쿼리 응답 절차가 있다. 1,2번과 같이 Non-blocking으로 진행되지만 3번 DB와 Communication 하는 부분에서 JDBC,JPA와 같은 ORM을 사용하게 되면 동기 방식으로 처리하게 된다. 이 과정에서 Thread는 대기 상태가 되어 4번 작업을 진행하지 못한다.

따라서 해당 문제를 해결하기 위해 R2DBC(Reactive-Relational-Database-Connectivit)가 나오게 되었다. R2DBC는 기존 JDBC와 다르게 비동기적 데이터베이스 접근을 가능하게 한다. 또한 Reactive-Stream 표준을 따르고 있어 DB쿼리 결과를 stream으로 처리할수 있고 Non-blocking I/O 지원,Spring webflux 생태계와 통합되어 제공된다.
다양한 데이터베이스(MYSQL,MariaDB,MongoDB)에 대한 SPI를 구현하여 호환성을 제공한다.

SPI: 다양한 데이터베이스 드라이버가 R2DBC의 표준 인터페이스를 따라 자신만의 방식으로 비동기 데이터베이스 작업을 처리하도록 만드는 과정

R2DBC

Mysql 초기 설정

CREATE TABLE users (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(128),
    email VARCHAR(255),
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL
);

CREATE TABLE posts (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    title VARCHAR(30),
    content VARCHAR(200),
    user_id BIGINT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL
);

Docker mysql을 실행 후, users,posts 테이블을 생성하였다. 또한 post테이블의 user_id에 인덱스를 생성하여 추후 users 테이블과 빠른 조인을 위해 인덱스를 생성하였다.

CREATE INDEX idx_user_id ON posts(user_id); 

mysql 인코딩 설정 명령어:

SHOW VARIABLES LIKE 'character_set%';

Spring Boot 초기 설정

build.gradle

implementation 'io.asyncer:r2dbc-mysql:1.0.2' //spi 1.0.0 (spring boot3)
compileOnly 'org.projectlombok:lombok'

application.yaml

spring:
  r2dbc:
    url: r2dbc:mysql://localhost:3306/fastsns
    username: root
    password: fastcampus

해당 의존성을 추가하고 application.yaml 파일에 데이터 url을 추가했다.

R2dbcConfig

@Slf4j
@Component
@RequiredArgsConstructor
@EnableR2dbcRepositories
@EnableR2dbcAuditing
public class R2dbcConfig implements ApplicationListener<ApplicationReadyEvent> {

    private final DatabaseClient databaseClient;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        //reactor: publisher,subscriber
        databaseClient.sql("SELECT 1").fetch().one().subscribe(
                success->{
                    log.info("Initialize r2dbc database connection");
                },
                error->{
                    log.error("Failed to Initialize r2dbc database connection");
                    SpringApplication.exit(event.getApplicationContext(),()-> -110);
                }
        );
    }
}

단순 yaml파일 설정은 애플리케이션이 시작할 때 데이터베이스 연결 설정 값을 로드만 하기 때문에 커넥션이 이루어졌는지 확인이 어렵다. 따라서 해당 코드를 작성하여 정상적인 연결을 확인 한다.

  • @EnableR2dbcRepositories: 다양한 DB를 비동기적 논블로킹 방식으로 처리할 수 있도록 R2DBC를 활성화
  • @EnableR2dbcAuditing: R2DBC에서 엔터티 생성 및 수정 시간을 자동으로 관리(@CreatedDate,@LastModifiedDate)

1.User

User

@Data
@Builder
@AllArgsConstructor
@Table("users")
public class User {
    @Id
    private Long id;
    private String name;
    private String email;
    @CreatedDate
    private LocalDateTime createdAt;
    @LastModifiedDate
    private LocalDateTime updatedAt;
}

UserR2dbcRepository

public interface UserR2dbcRepository extends ReactiveCrudRepository<User,Long> {
}

ReactiveCrudRepository를 상속받으면 기본 기능을 제공하여 mysql에 바로 쿼리를 내리고 저장,조회까지 가능하다.


@Service
@RequiredArgsConstructor
public class UserService {
    //private final UserRepository userrepository;
    private final UserR2dbcRepository userR2dbcRepository;
    public Mono<User> create(String name, String email) {
        return userR2dbcRepository.save(User.builder().name(name).email(email).build());
    }

    public Flux<User> findAll() {
        return userR2dbcRepository.findAll();
    }

    public Mono<User> findById(Long id) {
        return userR2dbcRepository.findById(id);
    }

    public Mono<Void> deleteById(Long id) {
        return userR2dbcRepository.deleteById(id);
    }

    public Mono<User> update(Long id, String name, String email) {
        return userR2dbcRepository.findById(id)
                .flatMap(u -> {
                    u.setName(name);
                    u.setEmail(email);
                    return userR2dbcRepository.save(u);
                });
    }

}

2.Post

Post

@Table("posts")
public class Post {
    @Id
    private Long id;

    @Column("user_id")
    private Long userId;

    private String title;
    private String content;

    @Column("created_at")
    @CreatedDate
    private LocalDateTime createdAt;
    
    @Column("updated_at")
    @LastModifiedDate
    private LocalDateTime updatedAt;

PostR2dbcRepository

public interface PostR2dbcRepository extends ReactiveCrudRepository<Post,Long> {
}

PostControllerV2

@RestController
@RequestMapping("/V2/posts")
@RequiredArgsConstructor
public class PostControllerV2 {
    private final PostServiceV2 postServiceV2;

    @PostMapping("")
    public Mono<PostResponseV2> createPost(@RequestBody PostCreateRequest request){
        return postServiceV2.create(request.getUserId(), request.getTitle(), request.getContent())
                .map(PostResponseV2::of);
    }
    @GetMapping("")
    public Flux<PostResponseV2> findAllPost(){
        return postServiceV2.findAll().map(PostResponseV2::of);
    }

    @GetMapping("/{id}")
    public Mono<ResponseEntity<PostResponseV2>> findPost(@PathVariable Long id) {
        return postServiceV2.findById(id)
                .map(p -> ResponseEntity.ok().body(PostResponseV2.of(p)))
                .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
    }
    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<?>> deletePost(@PathVariable Long id){
        return postServiceV2.deleteById(id).then(Mono.just(ResponseEntity.noContent().build()));

    }
}

3.애노태이션(@Query)을 활용한 복잡한 쿼리 처리

UserR2dbcRepository

public interface UserR2dbcRepository extends ReactiveCrudRepository<User,Long> {
    Flux<User> findByName(String name);
    Flux<User> findByNameOrderByIdDesc(String name);
    
    //기본 기능은 deleteById
    @Modifying
    @Query("DELETE FROM users WHERE name = :name")
    Mono<Void> deleteByName(String name);
}

1.메서드 이름 기반 쿼리 생성: 메서드 이름을 분석하여 자동으로 SQL 쿼리를 생성한다. 이름의 규칙을 통해 어떤 작업을 수행할지 파악한다.

  • 기본구조:[prefix][property][operator]
  • Prefix:findBy,readBy,queryBy,getBy
  • Property: 엔티티 클래스의 필드 이름
  • Operator: 조건을 나타내는 키워드 + [property](And,Or,GreaterThan,LessThan,Between,Like,In,Not)
public interface PostR2dbcRepository extends ReactiveCrudRepository<Post,Long> {
	Flux<Post> findByFirstNameAndLastName(String firstName, String lastName)
}

2.@Query를 활용한 커스텀 쿼리: 자동 생성 메서드로는 지원되지 않는 복잡한 쿼리나 DELETE/UPDATE와 같은 데이터 변경 작업에는 @Query 애노테이션을 사용한다. 기본 기능인 deleteBy는 단일 조건으로만 DELETE를 수행한다. 그러나 다중 조건이나 복잡한 삭제 쿼리는 직접 작성해야 한다.

4.Custom Repository 생성

User가 작성한 여러 Post들을 가져와보자.

user는 여러개의 post를 작성할수 있다.{User(1) -> Post(N)}
Post는 한명의 유저로 인해 작성된다.{Post -> User(1)}

UserController에서 userId를 요청하면 작성한 Post들을 반환해보자.
UserController

@GetMapping("/{id}/posts")
    public Flux<UserPostResponse> getUserPosts(@PathVariable Long id){
        return postServiceV2.findByAllUserId(id).map(UserPostResponse::of);
    }

요청한 id와 매칭되는 userId를 가진 Post 인스턴스들의 집합을 Flux타입으로 변환하여 DTO를 통해 반환한다. 하지만 post에 필드 값 username이 존재하지 않아 @Transient를 통해 영속되지 않은 필드를 생성한다.
Post

@Transient
private User user;
//나머지 생략

@Transient 필드를 사용하는 경우: 임시 데이터를 저장하거나, 비즈니스 로직에서만 사용되는 값을 표현할 때

@Transient 필드는 데이터베이스와 매핑되지 않기 때문에 해당 필드를 기준으로 ReactiveCrudRepository 기본 메서드의 CRUD 작업을 수행하거나 데이터베이스에서 값을 조회할 수 없다. 따라서 저장소 인터페이스를 새로 생성하여 확장한다.

PostR2dbcRepository

public interface PostR2dbcRepository extends ReactiveCrudRepository<Post,Long>,PostCustomR2dbcRepository {
    Flux<Post> findByUserId(Long id);
}

ReactiveCrudRepository에서 Custom Repository 확장을 통해 자식 구현체를 자동으로 찾아 내부에 통합한다.

PostCustomR2dbcRepository

public interface PostCustomR2dbcRepository {
    Flux<Post> findAllByUserId(Long userId);
}

PostCustomR2dbcRepositoryImpl

@Repository
@RequiredArgsConstructor
public class PostCustomR2dbcRepositoryImpl implements PostCustomR2dbcRepository {
    private final DatabaseClient databaseClient;

    @Override
    public Flux<Post> findAllByUserId(Long userId) {
        var sql = """
                SELECT p.id as pid, p.user_id as userId, p.title, p.content, p.created_at as createdAt, p.updated_at as updatedAt,
                    u.id as uid, u.name as name, u.email as email, u.created_at as uCreatedAt, u.updated_at as uUpdatedAt
                FROM posts p
                LEFT JOIN users u ON p.user_id = u.id
                WHERE p.user_id = :userId
                """;
        return databaseClient.sql(sql)
                .bind("userId", userId)
                .fetch()
                .all()
                .map(row -> Post.builder()
                        .id((Long) row.get("pid"))
                        .userId((Long) row.get("userId"))
                        .title((String) row.get("title"))
                        .content((String) row.get("content"))
                        .user(User.builder()
                                .id((Long)row.get("uid"))
                                .name((String)row.get("name"))
                                .email((String)row.get("email"))
                                .createdAt(((ZonedDateTime)row.get("uCreatedAt")).toLocalDateTime())
                                .createdAt(((ZonedDateTime)row.get("uUpdatedAt")).toLocalDateTime())
                                .build()
                        )
                        .createdAt(((ZonedDateTime)row.get("createdAt")).toLocalDateTime())
                        .createdAt(((ZonedDateTime)row.get("updatedAt")).toLocalDateTime())
                        .build());
    }
}

posts 테이블에서 user_id가 전달받은 파라미터 userId와 일치하는 데이터를 조회한 뒤, 해당 데이터와 일치하는 users 테이블을 LEFT JOIN하여 결합한다. (필드명 중복을 방지하기 위해 SELECT 절에서 별칭을 지정하였다.) 이후, 조회된 결과의 모든 행을 처리하여 Post 객체를 생성하며, Post의 비영속성 필드로 설정된 User 객체를 builder를 통해 주입하여 사용자 이름(username)을 포함할 수 있도록 하였다.

DatabaseClient: Spring Data R2DBC에서 제공하는 비동기적(non-blocking) SQL 데이터베이스 접근 도구

0개의 댓글