지난 한 달동안 진행한 프로젝트에서 우수상을 받았다. 프로젝트를 진행하면서 다른 사람들은 어떻게 개발할지, 최우수 작품은 어떻게 구현되어 있을지 궁금했다. 여러 복합적인 이유로 우리는 아키텍처를 최대한 단순하게 가져갔다. 프로젝트가 마무리 되고나서도 메시지 큐잉과 캐시 저장소를 적용하지 못한게 아쉬웠다. 해서 이번 리뷰는 메시지 큐잉과 캐시 저장소를 어떻게 적용했는지를 중점적으로 살펴 볼 예정이다.
본 코드들은 제가 작성한 코드가 아닌, D2 Fest mini Timeline부문 최우수 작품 리뷰입니다.
Kafka는 아파치 재단의 pub-sub 모델의 메시지 큐, 분산환경에 특화되어 설계되어 있어 다른 메세지 큐와의 성능차이가 있다. 그 외에도 클러스터 구성, fail-over, replication와 같은 여러가지 특징을 가지고 있다.
Kafka는 pub-sub 모델이기에, producer
와 consumer
가 있다. 둘은 서로의 존재를 모르고 오직 topic
을 통해서 쓰고 읽고 한다. 그럼 당연히 코드 level에서도 이 3가지 요소에 대한 configration이 있을 거라고 쉽게 예상 할 수 있다. 하나씩 살펴보자
Topic
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value(value = "${message.topic.name}")
private String topicName;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic(topicName, 1, (short) 1);
}
}
spring-kafka 문서에 따르면 KafkaAdmin
Been은 broker에게 자동으로 topic을 추가해준다. KafkaAdmin
은 kafka 주소를 가지는 config 파일을 파라미터로 받는다.
NewTopic
의 생성자는 아래와 같다,
public NewTopic(String name, int numPartitions, short replicationFactor)
위 코드에서는 1개의 파티션과 1개의 레플리카를 가지는 topic을 생성했다.
Producer Config
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, PostingMessageModel> postingProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, PostingMessageModel> postingKafkaTemplate() {
return new KafkaTemplate<>(postingProducerFactory());
}
}
KafkaTemplate
은 Topic에 데이터를 보내기 위한 메소드들을 제공한다.
Template을 이용하기 위해서 producerFactory
를 이용 할 수 있다. 위 코드에서는 postingProducerFactory
를 빈 등록하고 이를 이용해 postingKafkaTemplate
를 빈 등록했다.
StringSerializer.class 와 JsonSerializer.class는 공식문서에서도 별 다른 설명없이 등록해준다.
@Service
public class KafkaProducer {
private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
@Value(value = "${message.topic.name}")
private String postingTopicName;
private final KafkaTemplate<String, PostingMessageModel> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, PostingMessageModel> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}
public void sendMessage(PostingMessageModel postingMessageModel) {
ListenableFuture<SendResult<String, PostingMessageModel>> future = kafkaTemplate.send(postingTopicName, postingMessageModel);
future.addCallback(new ListenableFutureCallback<SendResult<String, PostingMessageModel>>() {
@Override
public void onSuccess(SendResult<String, PostingMessageModel> result) {
log.info("Sent message= [" + postingMessageModel.toString() + "] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
log.info("Message 전달 오류 " + postingMessageModel.toString() + "] due to : " + ex.getMessage());
}
});
}
}
Producer
는 KafkaTemplate
을 이용해서 데이터를 Topic에 전송한다.
send
메소드로 데이터를 전송하고 나면 ListenableFuture<SendResult>
객체가 리턴되고, 이 객체에 callback을 등록 할 수 있다.
ConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value(value = "${kafka.groupId}")
private String groupId;
@Bean
public ConsumerFactory<String, PostingMessageModel> postingConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.timeline.api.application.model");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PostingMessageModel> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PostingMessageModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(postingConsumerFactory());
return factory;
}
}
KafkaListener
로 구독할 토픽과 처리할 메소드를 지정한다.
Counsumer
@Service
public class KafkaConsumer {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
private final TimelineRepository timelineRepository;
private final PostService postService;
public KafkaConsumer(TimelineRepository timelineRepository, PostService postService) {
this.timelineRepository = timelineRepository;
this.postService = postService;
}
// 카프카 리스너, PostId와 팔로워 리스트를 타임라인 테이블에 저장하는 async 함수로 넘겨준다
@KafkaListener(topics = "${message.topic.name}", groupId = "${kafka.groupId}")
public void postingListener(PostingMessageModel postingMessageModel) {
log.info("Kafka Consumer Listening");
UUID stringId = postingMessageModel.getPostId();
List<String> followers = postingMessageModel.getFollowerId();
for (String follower : followers) {
postService.insertToTimelineTable(stringId, follower);
}
}
}
KafkaListener
어노테이션을 통해 구독할 topic과 consumer groupId를 지정해준다. 해당 topic에 메시지가 들어올 경우 어노테이션이 달린 메소드가 수행된다.
kafka에 대한 기본 설정들을 확인 했으니 게시물이 작성되는 로직을 살펴보자
@CacheEvict(value = "home", key = "#userId")
@Override
@Transactional
public PostingResponse savePost(String userId, String content) {
Post post = new Post();
post.setUserId(userId);
post.setUserName(accountRepository.findByUserId(userId).orElseThrow(ServiceException.UserIsNotExistException::new).getUserName());
post.setContent(content);
postRepository.save(post);
savePostToUserHome(userId, post.getPostId());
List<FollowerListResponse> followerList = followService.getFollowerList(userId);
PostingMessageModel postingMessageModel = new PostingMessageModel();
postingMessageModel.setPostId(post.getPostId());
postingMessageModel.setFollowerId(followerList.stream()
.map(FollowerListResponse::getUserId)
.collect(Collectors.toList()));
kafkaProducer.sendMessage(postingMessageModel);
return modelMapper.map(post, PostingResponse.class);
}
전체 로직은 아래와 같다.
Build Pattern을 적용하지 않고 setter를 사용한 점은 조금 아쉽다.
@EntityListeners
@EntityListeners
는 엔티티를 DB에 적용하기 이전, 이후에 커스텀 콜백을 요청 할 수 있는 어노테이션이다. 본 코드에서는 어노테이션을 이용해 AuditingEntityListener
를 지정해주는데, 이 클래스는 spring-data-jpa
에서 audit 기능을 제공하는 클래스이다. 우리 프로젝트에서는 lombok
을 이용해서 이 부분을 처리를 했다.
@Table(name = "ACCOUNT", indexes = {@Index(columnList = "USER_NAME")})
@Table
에서 테이블 인덱싱을 설정할 수 있다. 나는 이 방법을 모르고 있어 실제 DBMS에서 튜닝을 했다.
기본키, 참조키 는 자동으로 인덱싱 해준다.
@PreAuthorize
Spring Security
는 기본적으로 ROLE 기반의 권한처리가 기본
아래와 같이 @Secured
를 사용해서 사용자 권한정보에 따라 메소드 접근을 제한 할 수 있다.
@Secured("ROLE_ADMIN")
@RequestMapping( value = "/my/api/address", method = RequestMethod.GET )
public String somthingMethod( HttpServletRequest request, HttpServletResponse response ){
return "Hello, World!";
}
@PostAuthorize
는 함수를 실행하고 클라이언트에게 응답을 보내기 전에 권한을 확인한다.
@PostAuthorize("isAuthenticated() and (( returnObject.name == principal.name ) or hasRole('ROLE_ADMIN'))")
@RequestMapping( value = "/{seq}", method = RequestMethod.GET )
public User getuser( @PathVariable("seq") long seq ){
return userService.findOne(seq);
}
위 코드에 쓰인 표현식을 알아보자
isAuthenticated()
: 현재 유저가 익명유저(로그인이 되지 않은)가 아니라면 true 반환
principal
: 사용자를 증명하는 주요객체(User)에 접근 할 수 있다.
hasRole([role])
: 현재 사용자의 권한이 파라미터의 권한과 동일한 경우 true
그렇다면 위 메소드는 함수를 실행하고 클라이언트에게 응답을 보내기 전에 아래와 같은 검증을 수행한다.
로그인된 유저인가 And ( 반환객체의 이름과 현재 사용자의 이름이 같은가 OR 관리자 권한을 가진 유저인가)
실제 코드에서 쓰인 @PreAuthorize
는 함수를 수행하기 전에 권한을 확인한다.
@PreAuthorize("hasRole('ROLE_USER')")
@GetMapping("/api/user/search")
public List<UserListResponse> getUserList(@RequestParam("name") String name) {
return userService.getUserList(name);
}
실제 코드에서는 회원만 유저 조회를 할 수 있게 하는데 사용되었다.
원래는 메시지 큐잉과 더불어서 캐시저장을 어떻게 구현했는지까지 살펴볼려고 했으나 메시지 큐잉만 살펴보는데도 생각보다 많은 시간이 걸렸다. 빠른 시간내에 다시 코드를 살펴보는 시간을 갖도록 해야겠다.
kafka에 대한 한글로 된 예제가 많이 없어 공식문서를 참고하면서 코드리뷰를 진행했다. 처음부터 이 코드를 작성한 분은 얼마나 괴물인거야....
@PreAuthorize : https://steemit.com/kr-dev/@igna84/spring-security-preauthorize-postauthorize
Kafka : https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#reference
안녕하세요. D2 Fest mini Timeline부분 최우수상을 수상한 한승우라고 합니다 :) 제 Repo를 보다가 어쩌다보니 이 글 까지 보게되었네요. 부족한 제 코드를 이렇게 봐주시고 리뷰해주셔서 감사하다는 말씀 전하고 싶어서 댓글을 달게 되었습니다. 저도 이 블로그를 통해 새로운 지식 많이 얻고 갑니다. 감사합니다!