
main 디렉토리에 있는 application.properties를 삭제하고 application.yml 파일을 생성하고 작성
server:
port: 7000
spring:
datasource:
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mariadb://localhost:3306/springcqrs
username: root
password: mypassword
jpa:
hibernate:
ddl-auto: update
properties:
hibernate:
format_sql: true
show_sql: true
설정 클래스를 추가하고 작성(WebConfig)
package com.bh.cqrswrite;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configurable
public class WebConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry){
registry.addMapping("/**")
.allowedOrigins("http:/localhost:3000");
}
}
ddl-auto 가 update로 설정되어 있으면 이 클래스를 수정하면 관계형 데이터베이스에 자동 반영됩니다.
package com.bh.cqrswrite;
import jakarta.persistence.*;
import lombok.*;
import java.util.Date;
//이 클래스가 데이터베이스 와 연동되는 클래스라는 것을 명시
@Entity
//테이블 이름
@Table(name="book")
//인스턴스 출력할 때 사용할 메서드
@ToString
//접근자 메서드
@Getter
//생성자를 이용해서 값을 대입하지 않고 .을 이용해서 메서드를 연속적으로 호출해서 값 설정
@Builder
//모든 속성을 대입받아서 생성하는 생성자
@AllArgsConstructor
//매개변수 없는 생성자를 생성
@NoArgsConstructor
public class Book {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Long bid;
@Column(length = 50, nullable = false)
private String title;
@Column(length = 50, nullable = false)
private String author;
@Column(length = 50, nullable = false)
private String category;
@Column
private int pages;
@Column
private int price;
@Column
private Date published_Date;
@Column(length = 50, nullable = false)
private String description;
}
클래스를 만들고 프로젝트를 실행하면 데이터베이스에 테이블이 자동으로 생성됩니다.
use springcqrs;
show tables;
Django는 Model 이 테이블 과 매핑이 되고 데이터베이스 작업도 수행할 수 있도록 만들어지는데 Spring에서는 Entity는 데이터 모델의 역할만 하고 작업은 Repository 인터페이스가 담당합니다.
인터페이스 생성(BookRepository)
package com.bh.cqrswrite;
import org.springframework.data.jpa.repository.JpaRepository;
//Book 테이블에 CRUD 작업을 수행할 수 있는 인스턴스를 만들어서 사용할 수 있도록 해주는 인터페이스
public interface BookRepository extends JpaRepository<Book, Long> {
}
Controller 와 Service 계층 사이에 데이터 전달을 위한 클래스를 생성(BookDTO)
package com.bh.cqrswrite;
import lombok.Data;
@Data
public class BookDTO {
private String title;
private String author;
private String category;
private int pages;
private int price;
private String publishedDate;
private String description;
}
Service 클래스 생성(BookService)
package com.bh.cqrswrite;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
@Service
@RequiredArgsConstructor
//django 에서는 views.py 파일의 역할
public class BookService {
private final BookRepository bookRepository;
//데이터 저장
//파라미터를 받아서 엔티티를 생성하고 repository를 이용해서 삽입
public void saveBook(BookDTO bookDTO){
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
//ParseException 처리를 강제합니다.
Date publishedDate = simpleDateFormat.parse(bookDTO.getPublishedDate());
//빌더 패턴을 이용한 Entity 생성
Book book = Book.builder().
title(bookDTO.getTitle()).
author(bookDTO.getAuthor()).
category(bookDTO.getCategory()).
pages(bookDTO.getPages()).
price(bookDTO.getPrice()).
published_Date(publishedDate).
description(bookDTO.getDescription()).
build();
//데이터베이스에 데이터 삽입
bookRepository.save(book);
}
catch (Exception e){
System.out.println(e.getMessage());
}
}
}
package com.bh.cqrswrite;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
public class BookController {
private final BookService bookService;
@PostMapping("/cqrs/book")
public String saveBook(@RequestBody BookDTO bookDTO) {
bookService.saveBook(bookDTO);
return "Saved";
}
}
실행한 후 POSTMAN을 이용하여
http://127.0.0.7000/cqrs/book 에 데이터 삽입 요청
{
"title":"spring cqrs write",
"author":"bh",
"category":"IT",
"pages":160,
"price":16000,
"publishedDate":"2024-10-10",
"description":"없음"
}

데이터베이스에서 데이터 삽입 확인
select * from book;

server:
port: 8000
spring:
datasource:
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mariadb://localhost:3306/springcqrs
username: root
password: mypassword
jpa:
hibernate:
ddl-auto: update
properties:
hibernate:
format_sql: true
show_sql: true
package com.bh.cqrsread;
import jakarta.persistence.*;
import lombok.*;
import java.util.Date;
//이 클래스가 데이터베이스 와 연동되는 클래스라는 것을 명시
@Entity
//테이블 이름
@Table(name="book")
//인스턴스 출력할 때 사용할 메서드
@ToString
//접근자 메서드
@Getter
//생성자를 이용해서 값을 대입하지 않고 .을 이용해서 메서드를 연속적으로 호출해서 값 설정
@Builder
//모든 속성을 대입받아서 생성하는 생성자
@AllArgsConstructor
//매개변수 없는 생성자를 생성
@NoArgsConstructor
public class Book {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Long bid;
@Column(length = 50, nullable = false)
private String title;
@Column(length = 50, nullable = false)
private String author;
@Column(length = 50, nullable = false)
private String category;
@Column
private int pages;
@Column
private int price;
@Column
private Date published_date;
@Column(length = 50, nullable = false)
private String description;
}
package com.bh.cqrsread;
import org.springframework.data.jpa.repository.JpaRepository;
//Book 테이블에 CRUD 작업을 수행할 수 있는 인스턴스를 만들어서 사용할 수 있도록 해주는 인터페이스
public interface BookRepository extends JpaRepository<Book, Long> {
}
package com.bh.cqrsread;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configurable
public class WebConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry){
registry.addMapping("/**")
.allowedOrigins("http:/localhost:3000");
}
}
: MariaDB의 데이터가 없었다면 MariaDB 설정이나 Book, BookRepository 그리고 이 작업은 수행할 필요가 없습니다.
ApplicationListener 인터페이스의 onApplicationEvent 메서드를 오버라이딩 하면 됩니다.(StartListener)
package com.bh.cqrsread;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import lombok.RequiredArgsConstructor;
import org.bson.Document;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.List;
// 싱글톤 패턴의 인스턴스를 자동 생성해주는 어노테이션
@Component
@RequiredArgsConstructor
public class StartListener implements ApplicationListener<ApplicationStartedEvent> {
private final BookRepository bookRepository;
@Override
// 어플리케이션이 시작하면 한 번만 호출되는 메서드
public void onApplicationEvent(ApplicationStartedEvent event) {
//MariaDB의 데이터 가져오기
List<Book> books = bookRepository.findAll();
//MongoDB 연결
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
//컬렉션 연결
MongoDatabase mongoDatabase = mongoClient.getDatabase("springcqrs");
MongoCollection<Document> mongoBooks = mongoDatabase.getCollection("books");
//기존의 데이터 삭제
mongoBooks.drop();
mongoBooks = mongoDatabase.getCollection("books");
// 데이터 복사
for (Book book : books) {
Document mongoBook = new Document();
mongoBook.append("bid", book.getBid());
mongoBook.append("title", book.getTitle());
mongoBook.append("author", book.getAuthor());
mongoBook.append("category", book.getCategory());
mongoBook.append("pages", book.getPages());
mongoBook.append("price", book.getPrice());
mongoBook.append("publishedDate", book.getPublished_date());
mongoBook.append("description", book.getDescription());
mongoBooks.insertOne(mongoBook);
}
}
}
use springcqrs
db.books.find({})

package com.bh.cqrsread;
import com.mongodb.client.*;
import org.bson.Document;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
@RestController
public class BookController {
@GetMapping("/cqrs/book")
public ResponseEntity<List> getBooks(){
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
//컬렉션 연결
MongoDatabase mongoDatabase = mongoClient.getDatabase("springcqrs");
MongoCollection<Document> mongo_books = mongoDatabase.getCollection("books");
List<Document> list = new ArrayList<Document>();
try{
try(MongoCursor<Document> cur = mongo_books.find().iterator()){
while (cur.hasNext()){
Document doc = cur.next();
list.add(doc);
}
}
}catch(Exception e){
System.out.println(e.getMessage());
}finally{
mongoClient.close();
}
return ResponseEntity.status(HttpStatus.OK).body(list);
}
}

application.yml 파일에 사용하고자 하는 카프카에 대한 정보를 추가
server:
port: 7000
spring:
datasource:
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mariadb://localhost:3306/springcqrs
username: root
password: mypassword
jpa:
hibernate:
ddl-auto: update
properties:
hibernate:
format_sql: true
show_sql: true
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: bh
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.StringDeserializer
카프카 환경 설정 클래스를 추가(KafkaConfiguration)
package com.bh.cqrswrite;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
BookDTO 클래스 수정
package com.bh.cqrswrite;
import lombok.Data;
@Data
public class BookDTO {
private Long bid;
private String title;
private String author;
private String category;
private int pages;
private int price;
private String publishedDate;
private String description;
}
카프카 메시지 전송 클래스 추가(KafkaProducer)
package com.bh.cqrswrite;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private static final String TOPIC = "cqrs-topic";
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(BookDTO bookDTO){
String message =
"{\"bid\":" + "\"" + bookDTO.getBid() + "\""
+ ", \"title\":" + "\"" + bookDTO.getTitle() + "\""
+ ", \"author\":" + "\"" + bookDTO.getAuthor() + "\""
+ ", \"category\":" + "\"" + bookDTO.getCategory() + "\""
+ ", \"pages\":" + "\"" + bookDTO.getPages() + "\""
+ ", \"price\":" + "\"" + bookDTO.getPrice() + "\""
+ ", \"published_date\":" + "\"" + bookDTO.getPublishedDate().toString() + "\""
+ ", \"description\":" + "\"" + bookDTO.getDescription() + "\""
+ "}";
kafkaTemplate.send(TOPIC, message);
}
}
Service 클래스 수정
package com.bh.cqrswrite;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
@Service
@RequiredArgsConstructor
//django 에서는 views.py 파일의 역할
public class BookService {
private final BookRepository bookRepository;
private final KafkaProducer kafkaProducer;
//데이터 저장
//파라미터를 받아서 엔티티를 생성하고 repository를 이용해서 삽입
public void saveBook(BookDTO bookDTO){
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
//ParseException 처리를 강제합니다.
Date publishedDate = simpleDateFormat.parse(bookDTO.getPublishedDate());
//빌더 패턴을 이용한 Entity 생성
Book book = Book.builder().
title(bookDTO.getTitle()).
author(bookDTO.getAuthor()).
category(bookDTO.getCategory()).
pages(bookDTO.getPages()).
price(bookDTO.getPrice()).
published_Date(publishedDate).
description(bookDTO.getDescription()).
build();
//데이터베이스에 데이터 삽입
bookRepository.save(book);
//kafka 에 메세지를 전송
bookDTO.setBid(book.getBid());
kafkaProducer.sendMessage(bookDTO);
}
catch (Exception e){
System.out.println(e.getMessage());
}
}
}

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cqrs-topic --from-beginning
implementation 'org.json:json:20231013'application.yml 파일에 카프카 설정 추가
server:
port: 8000
spring:
datasource:
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mariadb://localhost:3306/springcqrs
username: root
password: mypassword
jpa:
hibernate:
ddl-auto: update
properties:
hibernate:
format_sql: true
show_sql: true
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: bh
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.StringDeserializer
카프카 환경 설정 클래스를 추가(KafkaConfiguration)
-> write 프로젝트와 코드 동일
KafkaConsumer 클래스를 추가해서 메시지가 오면 메시지를 파싱한 후 몽고 데이터베이스에 저장
package com.bh.cqrsread;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import org.json.JSONObject;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "cqrs-topic", groupId = "bh")
public void consume(String message) throws IOException {
JSONObject messageObj = new JSONObject(message);
// mongoDB에 삽입
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
// 컬렉션 연결
MongoDatabase database = mongoClient.getDatabase("springcqrs");
MongoCollection<Document> mongo_books = database.getCollection("books");
// 받은 데이터를 가지고 Document를 만들어서 mongoDB에 삽입
Document mongoBook = new Document();
mongoBook.append("bid", messageObj.getLong("bid"));
mongoBook.append("title", messageObj.getString("title"));
mongoBook.append("author", messageObj.getString("author"));
mongoBook.append("category", messageObj.getString("category"));
mongoBook.append("pages", messageObj.getInt("pages"));
mongoBook.append("price", messageObj.getInt("price"));
mongoBook.append("publishedDate", messageObj.getString("published_date"));
mongoBook.append("description", messageObj.getString("description"));
mongo_books.insertOne(mongoBook);
mongoClient.close();
}
}
읽기 프로젝트 실행하고, 데이터를 한 개 삽입한 후 GET 요청을 보내서 데이터가 보이는 지 확인
데이터 POST 요청 : http://localhost:7000/cqrs/book

데이터 GET 요청 : http://localhost:8000/cqrs/book
