D2 Timeline 최우수작품 훑어보기

지난 한 달동안 진행한 프로젝트에서 우수상을 받았다. 프로젝트를 진행하면서 다른 사람들은 어떻게 개발할지, 최우수 작품은 어떻게 구현되어 있을지 궁금했다. 여러 복합적인 이유로 우리는 아키텍처를 최대한 단순하게 가져갔다. 프로젝트가 마무리 되고나서도 메시지 큐잉과 캐시 저장소를 적용하지 못한게 아쉬웠다. 해서 이번 리뷰는 메시지 큐잉과 캐시 저장소를 어떻게 적용했는지를 중점적으로 살펴 볼 예정이다.

본 코드들은 제가 작성한 코드가 아닌, D2 Fest mini Timeline부문 최우수 작품 리뷰입니다.

Kafka

Kafka는 아파치 재단의 pub-sub 모델의 메시지 큐, 분산환경에 특화되어 설계되어 있어 다른 메세지 큐와의 성능차이가 있다. 그 외에도 클러스터 구성, fail-over, replication와 같은 여러가지 특징을 가지고 있다.

Kafka는 pub-sub 모델이기에, producerconsumer가 있다. 둘은 서로의 존재를 모르고 오직 topic을 통해서 쓰고 읽고 한다. 그럼 당연히 코드 level에서도 이 3가지 요소에 대한 configration이 있을 거라고 쉽게 예상 할 수 있다. 하나씩 살펴보자

Topic

@Configuration
public class KafkaTopicConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${message.topic.name}")
    private String topicName;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic(topicName, 1, (short) 1);
    }
}

spring-kafka 문서에 따르면 KafkaAdmin Been은 broker에게 자동으로 topic을 추가해준다. KafkaAdmin은 kafka 주소를 가지는 config 파일을 파라미터로 받는다.
NewTopic 의 생성자는 아래와 같다,

public NewTopic(String name, int numPartitions, short replicationFactor) 

위 코드에서는 1개의 파티션과 1개의 레플리카를 가지는 topic을 생성했다.

Producer Config

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, PostingMessageModel> postingProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, PostingMessageModel> postingKafkaTemplate() {
        return new KafkaTemplate<>(postingProducerFactory());
    }

}

KafkaTemplate은 Topic에 데이터를 보내기 위한 메소드들을 제공한다.
Template을 이용하기 위해서 producerFactory를 이용 할 수 있다. 위 코드에서는 postingProducerFactory를 빈 등록하고 이를 이용해 postingKafkaTemplate를 빈 등록했다.

StringSerializer.class 와 JsonSerializer.class는 공식문서에서도 별 다른 설명없이 등록해준다.

@Service
public class KafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

    @Value(value = "${message.topic.name}")
    private String postingTopicName;

    private final KafkaTemplate<String, PostingMessageModel> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, PostingMessageModel> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}

    public void sendMessage(PostingMessageModel postingMessageModel) {
        ListenableFuture<SendResult<String, PostingMessageModel>> future = kafkaTemplate.send(postingTopicName, postingMessageModel);

        future.addCallback(new ListenableFutureCallback<SendResult<String, PostingMessageModel>>() {

            @Override
            public void onSuccess(SendResult<String, PostingMessageModel> result) {
                log.info("Sent message= [" + postingMessageModel.toString() + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                log.info("Message 전달 오류 " + postingMessageModel.toString() + "] due to : " + ex.getMessage());
            }
        });

    }
}

ProducerKafkaTemplate을 이용해서 데이터를 Topic에 전송한다.
send 메소드로 데이터를 전송하고 나면 ListenableFuture<SendResult> 객체가 리턴되고, 이 객체에 callback을 등록 할 수 있다.

ConsumerConfig

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groupId}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, PostingMessageModel> postingConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.timeline.api.application.model");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PostingMessageModel> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PostingMessageModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postingConsumerFactory());
        return factory;
    }
}

KafkaListener로 구독할 토픽과 처리할 메소드를 지정한다.

Counsumer

@Service
public class KafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    private final TimelineRepository timelineRepository;
    private final PostService postService;

    public KafkaConsumer(TimelineRepository timelineRepository, PostService postService) {
        this.timelineRepository = timelineRepository;
        this.postService = postService;
    }

    // 카프카 리스너, PostId와 팔로워 리스트를 타임라인 테이블에 저장하는 async 함수로 넘겨준다
    @KafkaListener(topics = "${message.topic.name}", groupId = "${kafka.groupId}")
    public void postingListener(PostingMessageModel postingMessageModel) {
        log.info("Kafka Consumer Listening");
        UUID stringId = postingMessageModel.getPostId();
        List<String> followers = postingMessageModel.getFollowerId();
        for (String follower : followers) {
            postService.insertToTimelineTable(stringId, follower);
        }
    }
}

KafkaListener 어노테이션을 통해 구독할 topic과 consumer groupId를 지정해준다. 해당 topic에 메시지가 들어올 경우 어노테이션이 달린 메소드가 수행된다.

kafka에 대한 기본 설정들을 확인 했으니 게시물이 작성되는 로직을 살펴보자

@CacheEvict(value = "home", key = "#userId")
    @Override
    @Transactional
    public PostingResponse savePost(String userId, String content) {
        Post post = new Post();
        post.setUserId(userId);
        post.setUserName(accountRepository.findByUserId(userId).orElseThrow(ServiceException.UserIsNotExistException::new).getUserName());
        post.setContent(content);
        postRepository.save(post);
        savePostToUserHome(userId, post.getPostId());

        List<FollowerListResponse> followerList = followService.getFollowerList(userId);
        PostingMessageModel postingMessageModel = new PostingMessageModel();
        postingMessageModel.setPostId(post.getPostId());
        postingMessageModel.setFollowerId(followerList.stream()
                                                      .map(FollowerListResponse::getUserId)
                                                      .collect(Collectors.toList()));
        kafkaProducer.sendMessage(postingMessageModel);
        return modelMapper.map(post, PostingResponse.class);
    }

전체 로직은 아래와 같다.

  • 게시물을 post 테이블에 저장
  • home 테이블에 저장
  • FollowerList를 가져와 Topic에 담을 메시지 생성
  • kafka 큐에 메시지 저장
  • 큐에 저장된 메시지를 cousumer가 소비하면서 timeline 테이블에 저장
    여기서 조금 많이 감탄했다. 프로젝트를 진행하면서 여러 자료를 찾아 봤는데 그 중 LINE 소셜 네트워크 서비스의 아키텍처의 글을 거의 완벽하게 구현했기 때문이다. 나는 저 글을 보면서 머리로 이해하기 조차 버거웠고, 설마 이정도까지 구현하는 사람이 있을까 싶었다. 해서 전체적인 컨셉만 참고하는 방향으로 프로젝트를 진행했는데, 저걸 완벽하게 이해하고 구현을 해내다니... 대단한 사람이다. 역시 아직도 많이 부족하다는 생각을 하게 되었다.

    Build Pattern을 적용하지 않고 setter를 사용한 점은 조금 아쉽다.

그 외 새로 알게 된 것

@EntityListeners

@EntityListeners 는 엔티티를 DB에 적용하기 이전, 이후에 커스텀 콜백을 요청 할 수 있는 어노테이션이다. 본 코드에서는 어노테이션을 이용해 AuditingEntityListener 를 지정해주는데, 이 클래스는 spring-data-jpa 에서 audit 기능을 제공하는 클래스이다. 우리 프로젝트에서는 lombok 을 이용해서 이 부분을 처리를 했다.

@Table(name = "ACCOUNT", indexes = {@Index(columnList = "USER_NAME")})

@Table 에서 테이블 인덱싱을 설정할 수 있다. 나는 이 방법을 모르고 있어 실제 DBMS에서 튜닝을 했다.

기본키, 참조키 는 자동으로 인덱싱 해준다.

@PreAuthorize

Spring Security 는 기본적으로 ROLE 기반의 권한처리가 기본

아래와 같이 @Secured 를 사용해서 사용자 권한정보에 따라 메소드 접근을 제한 할 수 있다.

@Secured("ROLE_ADMIN")
@RequestMapping( value = "/my/api/address", method = RequestMethod.GET )
public String somthingMethod( HttpServletRequest request, HttpServletResponse response ){
    return "Hello, World!";
}

@PostAuthorize는 함수를 실행하고 클라이언트에게 응답을 보내기 전에 권한을 확인한다.

@PostAuthorize("isAuthenticated() and (( returnObject.name == principal.name ) or hasRole('ROLE_ADMIN'))")
@RequestMapping( value = "/{seq}", method = RequestMethod.GET )
public User getuser( @PathVariable("seq") long seq ){
    return userService.findOne(seq);
}

위 코드에 쓰인 표현식을 알아보자

  • isAuthenticated() : 현재 유저가 익명유저(로그인이 되지 않은)가 아니라면 true 반환

  • principal : 사용자를 증명하는 주요객체(User)에 접근 할 수 있다.

  • hasRole([role]) : 현재 사용자의 권한이 파라미터의 권한과 동일한 경우 true

그렇다면 위 메소드는 함수를 실행하고 클라이언트에게 응답을 보내기 전에 아래와 같은 검증을 수행한다.

로그인된 유저인가 And ( 반환객체의 이름과 현재 사용자의 이름이 같은가 OR 관리자 권한을 가진 유저인가)

실제 코드에서 쓰인 @PreAuthorize 는 함수를 수행하기 전에 권한을 확인한다.

@PreAuthorize("hasRole('ROLE_USER')")
    @GetMapping("/api/user/search")
    public List<UserListResponse> getUserList(@RequestParam("name") String name) {
        return userService.getUserList(name);
    }

실제 코드에서는 회원만 유저 조회를 할 수 있게 하는데 사용되었다.

마무리

원래는 메시지 큐잉과 더불어서 캐시저장을 어떻게 구현했는지까지 살펴볼려고 했으나 메시지 큐잉만 살펴보는데도 생각보다 많은 시간이 걸렸다. 빠른 시간내에 다시 코드를 살펴보는 시간을 갖도록 해야겠다.

kafka에 대한 한글로 된 예제가 많이 없어 공식문서를 참고하면서 코드리뷰를 진행했다. 처음부터 이 코드를 작성한 분은 얼마나 괴물인거야....

Reference

@PreAuthorize : https://steemit.com/kr-dev/@igna84/spring-security-preauthorize-postauthorize
Kafka : https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#reference