Webflux에서 Mongo Aggregation 사용하기

dev-well-being·2023년 5월 11일
1
post-thumbnail
post-custom-banner

MongoDB에서 RDB에서 사용하는 것처럼 collection을 join 할 수 있도록 지원하는 aggregation 기능을 제공한다.

기존 JPA에서는 native Query를 spring에서 사용할 수 있도록 @Query annotation을 지원하고, QueryDSL이라는 별도 라이브러리도 존재한다.

MongoDB도 Spring에서 Aggregation을 사용할 수 있도록 @Aggregation annotation을 지원한다.

Webflux와 Reactive Mongo의 개발환경 설정이 되어 있다는 전제하에 Document Entity와 Repository를 생성한다.

  • Document Entity
// 부서 Collection
@Document(collection = "department")
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Department {
    @Id
    private String id;
    @Field(name = "name")
    private String name;
}
// 임직원 Collection
@Document(collection = "employee")
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Employee {
    @Id
    private String id;
    @Field(name = "department_id")
    private String departmentId;
    @Field(name = "name")
    private String name;
}

  • Repository
public interface DepartmentRepository extends ReactiveMongoRepository<Department, String> {
    Mono<Department> findByName(String name);
}
public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String> {
}

테스트를 위한 샘플데이터를 생성해 준다.
@SpringBootTest
@Slf4j
class AggregationTest {
    @Autowired
    DepartmentRepository departmentRepository;
    @Autowired
    EmployeeRepository employeeRepository;

    @Test
    void sampleDataCreate(){

        //create department
        List<Department> departments = new ArrayList<>();
        departments.add(Department.builder().name("인사팀").build());
        departments.add(Department.builder().name("총무팀").build());
        departments.add(Department.builder().name("구매팀").build());
        Flux<Department> departmentFlux = departmentRepository.deleteAll()
                .thenMany(departmentRepository.insert(departments));
        StepVerifier.create(departmentFlux)
                .expectNextCount(3)
                .verifyComplete();

        Mono<Department> hrPart = departmentRepository.findByName("인사팀");
        Mono<Department> affairPart = departmentRepository.findByName("총무팀");
        Mono<Department> purchasePart = departmentRepository.findByName("구매팀");
        
        //create employee
        Flux<Employee> employeeFlux =  employeeRepository.deleteAll()
                .thenMany(hrPart.flatMapMany(department -> {
                    List<Employee> employees = new ArrayList<>();
                    employees.add(Employee.builder().departmentId(department.getId()).name("홍길동").build());
                    employees.add(Employee.builder().departmentId(department.getId()).name("임꺽정").build());
                    return employeeRepository.insert(employees);
                }))
                .thenMany(affairPart.flatMapMany(department -> {
                    List<Employee> employees = new ArrayList<>();
                    employees.add(Employee.builder().departmentId(department.getId()).name("박서준").build());
                    employees.add(Employee.builder().departmentId(department.getId()).name("강동원").build());
                    employees.add(Employee.builder().departmentId(department.getId()).name("박보검").build());
                    return employeeRepository.insert(employees);
                }))
                .thenMany(purchasePart.flatMapMany(department -> {
                    List<Employee> employees = new ArrayList<>();
                    employees.add(Employee.builder().departmentId(department.getId()).name("박연진").build());
                    return employeeRepository.insert(employees);
                }))
                .thenMany(employeeRepository.findAll());

        StepVerifier.create(employeeFlux)
                .expectNextCount(6)
                .verifyComplete();
    }
}

DB에도 정상적으로 저장되었다.

db.department.find()
{
  _id: ObjectId("645c4216b90e216a6aa54ea6"),
  name: '인사팀',
  _class: 'com.seegene.pda.domain.entity.Department'
}
{
  _id: ObjectId("645c4216b90e216a6aa54ea7"),
  name: '총무팀',
  _class: 'com.seegene.pda.domain.entity.Department'
}
{
  _id: ObjectId("645c4216b90e216a6aa54ea8"),
  name: '구매팀',
  _class: 'com.seegene.pda.domain.entity.Department'
}
db.employee.find()
{
  _id: ObjectId("645c4216b90e216a6aa54ea9"),
  department_id: '645c4216b90e216a6aa54ea6',
  name: '홍길동',
  _class: 'com.seegene.pda.domain.entity.Employee'
}
{
  _id: ObjectId("645c4216b90e216a6aa54eaa"),
  department_id: '645c4216b90e216a6aa54ea6',
  name: '임꺽정',
  _class: 'com.seegene.pda.domain.entity.Employee'
}
{
  _id: ObjectId("645c4216b90e216a6aa54eab"),
  department_id: '645c4216b90e216a6aa54ea7',
  name: '박서준',
  _class: 'com.seegene.pda.domain.entity.Employee'
}
{
  _id: ObjectId("645c4216b90e216a6aa54eac"),
  department_id: '645c4216b90e216a6aa54ea7',
  name: '강동원',
  _class: 'com.seegene.pda.domain.entity.Employee'
}
{
  _id: ObjectId("645c4216b90e216a6aa54ead"),
  department_id: '645c4216b90e216a6aa54ea7',
  name: '박보검',
  _class: 'com.seegene.pda.domain.entity.Employee'
}
{
  _id: ObjectId("645c4216b90e216a6aa54eae"),
  department_id: '645c4216b90e216a6aa54ea8',
  name: '박연진',
  _class: 'com.seegene.pda.domain.entity.Employee'
}

임직원과 부서정보를 join에서 가져오기 위해서는 아래와 같이 Aggregation을 사용해야한다.
db.employee.aggregate([
{$addFields: {o_department_id: {$toObjectId: "$department_id"}}}
,{$lookup: {from: "department",localField: "o_department_id",foreignField: "_id",as: "part"}}
,{$unwind: "$part"}
,{$project: {name:"$name", department:"$part.name"}}
])

{
  _id: ObjectId("645c4216b90e216a6aa54ea9"),
  name: '홍길동',
  department: '인사팀'
}
{
  _id: ObjectId("645c4216b90e216a6aa54eaa"),
  name: '임꺽정',
  department: '인사팀'
}
{
  _id: ObjectId("645c4216b90e216a6aa54eab"),
  name: '박서준',
  department: '총무팀'
}
{
  _id: ObjectId("645c4216b90e216a6aa54eac"),
  name: '강동원',
  department: '총무팀'
}
{
  _id: ObjectId("645c4216b90e216a6aa54ead"),
  name: '박보검',
  department: '총무팀'
}
{
  _id: ObjectId("645c4216b90e216a6aa54eae"),
  name: '박연진',
  department: '구매팀'
}

위 Aggregation을 webflux에서 사용하기 위해서는 @Aggregation을 사용하면 된다.
일단 Aggregation에서 설정한 project에 매칭될 수 있는 class를 하나 생성해보자.

@Getter
@Setter
@ToString
public class EmployeeDepart {
    private String id;
    private String name;
    private String department;
}

그리고 아래와 같이 @Aggregation을 선언하여 메소드를 생성하자.

public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String> {

    @Aggregation({"{$addFields: {o_department_id: {$toObjectId: \"$department_id\"}}}"
    , "{$lookup: {from: \"department\",localField: \"o_department_id\",foreignField: \"_id\",as: \"part\"}}"
    , "{$unwind: \"$part\"}"
    , "{$project: {name:\"$name\", department:\"$part.name\"}}"})
    Flux<EmployeeDepart> aggreFindAll();
}

그러면 정상적으로 값이 넘어오는지 확인해보자.

@Test
    void aggreTest(){
        Flux<EmployeeDepart> employeeDepartFlux = employeeRepository.aggreFindAll();
        StepVerifier.create(employeeDepartFlux)
                .assertNext(employeeDepart -> log.info("{}",employeeDepart.toString()))
                .assertNext(employeeDepart -> log.info("{}",employeeDepart.toString()))
                .assertNext(employeeDepart -> log.info("{}",employeeDepart.toString()))
                .assertNext(employeeDepart -> log.info("{}",employeeDepart.toString()))
                .assertNext(employeeDepart -> log.info("{}",employeeDepart.toString()))
                .assertNext(employeeDepart -> log.info("{}",employeeDepart.toString()))
                .verifyComplete()
        ;
    }
INFO  23-05-11 11:22:54.380[nioEventLoopGroup-2-3] [AggregationTest:76] - EmployeeDepart(id=645c4216b90e216a6aa54ea9, name=홍길동, department=인사팀)
INFO  23-05-11 11:22:54.385[nioEventLoopGroup-2-3] [AggregationTest:77] - EmployeeDepart(id=645c4216b90e216a6aa54eaa, name=임꺽정, department=인사팀)
INFO  23-05-11 11:22:54.387[nioEventLoopGroup-2-3] [AggregationTest:78] - EmployeeDepart(id=645c4216b90e216a6aa54eab, name=박서준, department=총무팀)
INFO  23-05-11 11:22:54.388[nioEventLoopGroup-2-3] [AggregationTest:79] - EmployeeDepart(id=645c4216b90e216a6aa54eac, name=강동원, department=총무팀)
INFO  23-05-11 11:22:54.390[nioEventLoopGroup-2-3] [AggregationTest:80] - EmployeeDepart(id=645c4216b90e216a6aa54ead, name=박보검, department=총무팀)
INFO  23-05-11 11:22:54.392[nioEventLoopGroup-2-3] [AggregationTest:81] - EmployeeDepart(id=645c4216b90e216a6aa54eae, name=박연진, department=구매팀)

값이 잘 매칭되서 넘어왔다.

parameter도 적용 가능하다. ?숫자로 @Aggregation 표현식 안에 선언하면 동적으로 변수를 받아 수행할 수 있다.

public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String> {

    @Aggregation({"{$addFields: {o_department_id: {$toObjectId: \"$department_id\"}}}"
    , "{$lookup: {from: \"department\",localField: \"o_department_id\",foreignField: \"_id\",as: \"part\"}}"
    , "{$unwind: \"$part\"}"
    , "{$project: {name:\"$name\", department:\"$part.name\"}}"})
    Flux<EmployeeDepart> aggreFindAll();

    @Aggregation({"{$match: {name: ?0}}"
    , "{$addFields: {o_department_id: {$toObjectId: \"$department_id\"}}}"
    , "{$lookup: {from: \"department\",localField: \"o_department_id\",foreignField: \"_id\",as: \"part\"}}"
    , "{$unwind: \"$part\"}"
    , "{$project: {name:\"$name\", department:\"$part.name\"}}"})
    Mono<EmployeeDepart> aggreFindByName(String name);
}
@Test
    void aggreTestParam(){
        Mono<EmployeeDepart> employeeDepartMono = employeeRepository.aggreFindByName("홍길동");
        StepVerifier.create(employeeDepartMono)
                .assertNext(employeeDepart -> assertThat(employeeDepart.getName()).isEqualTo("홍길동"))
                .verifyComplete()
        ;
    }
profile
안녕하세요!! 좋은 개발 문화를 위해 노력하는 dev-well-being 입니다.
post-custom-banner

0개의 댓글