저는 현재 연극/뮤지컬/콘서트 예매 사이트를 만드는 프로젝트를 진행 중인데,
이번에는 Redis와 Kafka를 이용해서 사용자별로 작품을 추천해주는 기능을 만들고자 합니다.
ProductController.java
// 상품 목록 조회 API (단일 조회) @GetMapping("/products/{productId}") public ResponseEntity<ProductResponseDto> getProduct(@PathVariable Long productId, @CookieValue(value = "jwt", required = false) String token) { if (token == null) { throw new CustomException(CustomErrorCode.INVALID_TOKEN); // 401 에러 반환 } String email = jwtUtil.extractEmail(token); User user = userRepository.findByEmail(email) .orElseThrow(() -> new CustomException(CustomErrorCode.USER_NOT_FOUND)); // 연쇄 클릭 이벤트 저장 (이전 상품 → 현재 상품) userRecommendService.saveUserClickSequence(user.getId(), productId); userRecommendService.saveUserClick(user.getId(), productId); // kafka에 클릭 이벤트 저장 ProductResponseDto response = productService.getProduct(productId); return ResponseEntity.ok(response); }UserRecommendService.java
// 사용자의 클릭 이벤트 저장 @Transactional public void saveUserClick(Long userId, Long productId) { kafkaTemplate.send("user-click-events", userId + "," + productId); }
userRecommendService.saveUserClick(user.getId(), productId); 라인을 통해 상품을 조회할 때 마다 클릭 이벤트를 Kafka에 저장해줍니다.
ClickEventConsumer.java
@KafkaListener(topics = "user-click-events", groupId = "recommendation-group", containerFactory = "kafkaBatchListenerContainerFactory") public void consumeClickEvents(List<String> messages) { Map<String, Integer> categoryClicksMap = new HashMap<>(); Map<String, Integer> productClicksMap = new HashMap<>(); Map<String, String> lastViewedMap = new HashMap<>(); for (String message : messages) { String[] data = message.split(","); Long userId = Long.parseLong(data[0]); Long productId = Long.parseLong(data[1]); Product product = productRepository.findById(productId) .orElseThrow(() -> new CustomException(CustomErrorCode.PRODUCT_NOT_FOUND)); Long categoryId = product.getCategory().getId(); // 마지막 본 상품 저장 lastViewedMap.put(LAST_VIEWED_PRODUCT_KEY + userId, productId.toString()); // 카테고리 클릭 수 누적 incrementClickCount(categoryClicksMap, CATEGORY_CLICKS_KEY, userId, categoryId); // 상품 클릭 수 누적 incrementClickCount(productClicksMap, PRODUCT_CLICKS_KEY, userId, productId); } // Redis에 일괄 저장 (배치 처리) if (!lastViewedMap.isEmpty()) { redisTemplate.opsForValue().multiSet(lastViewedMap); } // 카테고리 클릭 수 saveClicks(categoryClicksMap); // 상품 클릭 수 saveClicks(productClicksMap); } // 클릭 카운트를 누적 private void incrementClickCount(Map<String, Integer> clicksMap, String baseKey, Long userId, Long itemId) { String key = baseKey + userId + ":" + itemId; clicksMap.put(key, clicksMap.getOrDefault(key, 0) + 1); } // ClicksMap 별로 Redis에 저장 private void saveClicks(Map<String, Integer> clicksMap) { clicksMap.forEach((fullKey, count) -> { String[] splitKey = fullKey.split(":"); String redisKey = String.join(":", Arrays.copyOf(splitKey, splitKey.length - 1)); String field = splitKey[splitKey.length - 1]; redisTemplate.opsForHash().increment(redisKey, field, count); }); }
KafkaListener를 이용해 배치 시스템을 설정하여 설정 시간 or 설정 메시지량에 따라 Kafka에 누적된 메시지를 일정 시간마다 Redis로 전송합니다.
이렇게 하면 설정은 완료됩니다.
이제 프론트에서 UserRecommendController로 요청을 보내면,
UserRecommendController.java
// 사용자가 많이 클릭한 카테고리의 인기 상품 추천 @GetMapping("/click") public ResponseEntity<List<ProductResponseDto>> getRecommendedProductsByCategory(@CookieValue(value = "jwt", required = false) String token) { String email = jwtUtil.extractEmail(token); User user = userRepository.findByEmail(email) .orElseThrow(() -> new CustomException(CustomErrorCode.USER_NOT_FOUND)); List<Product> recommendedProducts = userRecommendService.getRecommendedProductsByCategory(user.getId()); List<ProductResponseDto> responseDtos = recommendedProducts.stream() .map(ProductResponseDto::fromEntity) .collect(Collectors.toList()); return ResponseEntity.ok(responseDtos); }
userRecommendService.getRecommendedProductsByCategory(user.getId()); 를 통해 상품 리스트들을 반환해줍니다.
UserRecommendService.java
// 인기 장르 추출 @Transactional public Long getMostClickedCategory(Long userId) { String redisKey = "user:clicks:categories:" + userId; // Redis에서 가장 많이 클릭된 카테고리 찾기 Map<Object, Object> categoryClicks = redisTemplate.opsForHash().entries(redisKey); return categoryClicks.entrySet().stream() .max(Comparator.comparingLong(e -> Long.parseLong(e.getValue().toString()))) .map(entry -> Long.parseLong(entry.getKey().toString())) .orElse(null); }// 가장 많이 클릭한 카테고리의 인기 작품 추출 @Transactional public List<Product> getRecommendedProductsByCategory(Long userId) { // 가장 많이 클릭한 카테고리 가져오기 Long mostClickedCategory = getMostClickedCategory(userId); if (mostClickedCategory == null) { return Collections.emptyList(); // 클릭한 카테고리가 없으면 빈 리스트 반환 } // 해당 카테고리에서 판매량이 높은 상품을 가져오기 return productRepository.findTop5ByCategoryIdOrderBySalesCountDesc(mostClickedCategory); }
넘겨준 UserId를 기반으로 Redis에서 가장 많이 클릭된 카테고리를 찾아서 해당 카테고리의 판매량이 높은 상품 5개를 List형태로 넘겨줍니다.

이렇게 하면 사용자별 자주 클릭한 장르의 인기 작품을 추천해 줄 수 있습니다. (현재 작품이 장르별로 많이 등록되어 있지 않아 3개만 추천됩니다.)
이번에는 다른 사용자들이 연쇄적으로 클릭한 작품 로그를 수집해서, 사용자가 많이 클릭한 상품의 연관 상품들을 추천해주려고 합니다.
ProductController.java
// 상품 목록 조회 API (단일 조회) @GetMapping("/products/{productId}") public ResponseEntity<ProductResponseDto> getProduct(@PathVariable Long productId, @CookieValue(value = "jwt", required = false) String token) { if (token == null) { throw new CustomException(CustomErrorCode.INVALID_TOKEN); // 401 에러 반환 } String email = jwtUtil.extractEmail(token); User user = userRepository.findByEmail(email) .orElseThrow(() -> new CustomException(CustomErrorCode.USER_NOT_FOUND)); // 연쇄 클릭 이벤트 저장 (이전 상품 → 현재 상품) userRecommendService.saveUserClickSequence(user.getId(), productId); userRecommendService.saveUserClick(user.getId(), productId); // kafka에 클릭 이벤트 저장 ProductResponseDto response = productService.getProduct(productId); return ResponseEntity.ok(response); }UserRecommendService.java
// 연쇄 클릭 이벤트 저장 (이전 상품 A → 현재 상품 B) @Transactional public void saveUserClickSequence(Long userId, Long currentProductId) { String redisKey = LAST_VIEWED_PRODUCT_KEY + userId; ValueOperations<String, String> ops = redisTemplate.opsForValue(); String lastViewedProductId = ops.get(redisKey); // 만약 이전에 봤던 상품이 있다면, A → B 관계 저장 if (lastViewedProductId != null && !lastViewedProductId.equals(currentProductId.toString())) { Long previousProductId = Long.parseLong(lastViewedProductId); kafkaTemplate.send("related-product-events", userId + "," + previousProductId + "," + currentProductId); } // Redis에 현재 상품을 최신값으로 갱신 ops.set(redisKey, currentProductId.toString()); }
먼저 이번에도 상품을 클릭하면 해당 내용을 Kafka에 저장합니다.
저장 방식은 이전에 ClickEventConsumer.java 파일을 보면 사용자 별로 마지막에 본 작품을 저장하는데, 이를 이용해서 그 기록이 남아있고, 현재 다른 작품을 클릭한다면(A -> B) 그 상품을 연관관계로 묶어 Kafka에 저장합니다.
RelatedProductConsumer.java
@KafkaListener(topics = "related-product-events", groupId = "recommendation-group", containerFactory = "kafkaBatchListenerContainerFactory") public void consumeClickChain(List<String> messages) { Map<String, Map<String, Double>> clickChainMap = new HashMap<>(); for (String message : messages) { String[] parts = message.split(","); if (parts.length != 3) continue; String productA = parts[1]; String productB = parts[2]; String redisKey = "related:clicks:" + productA; clickChainMap .computeIfAbsent(redisKey, k -> new HashMap<>()) .merge(productB, 1.0, Double::sum); } // Redis에 배치 저장 clickChainMap.forEach((key, relatedProducts) -> relatedProducts.forEach((productB, score) -> redisTemplate.opsForZSet().incrementScore(key, productB, score) )); }
또한 앞전에 설명한 것과 같이 배치 시스템을 이용하여 일정 기간동안 모았던 메시지를 Redis로 전송합니다.
UserRecommendController.java
// 다른 사용자들이 연쇄적으로 클릭한 상품 추천 @GetMapping("/chain") public ResponseEntity<List<ProductResponseDto>> getRecommendedProductsByChain(@CookieValue(value = "jwt", required = false) String token) { String email = jwtUtil.extractEmail(token); User user = userRepository.findByEmail(email) .orElseThrow(() -> new CustomException(CustomErrorCode.USER_NOT_FOUND)); List<Product> recommendedProducts = userRecommendService.getRecommendedProductsByChain(user.getId()); List<ProductResponseDto> responseDtos = recommendedProducts.stream() .map(ProductResponseDto::fromEntity) .collect(Collectors.toList()); return ResponseEntity.ok(responseDtos); }
UserRecommendService.java
@Transactional public List<Product> getRecommendedProductsByChain(Long userId) { // 사용자가 가장 많이 클릭한 상품 찾기 String userClicksKey = "user:clicks:products:" + userId; Map<Object, Object> clickedProducts = redisTemplate.opsForHash().entries(userClicksKey); if (clickedProducts.isEmpty()) { return Collections.emptyList(); // 클릭 기록이 없으면 빈 리스트 반환 } // 가장 많이 클릭한 상품 찾기 Long mostClickedProductId = clickedProducts.entrySet().stream() .max(Comparator.comparingLong(entry -> Long.parseLong(entry.getValue().toString()))) .map(entry -> Long.parseLong(entry.getKey().toString())) .orElse(null); if (mostClickedProductId == null) { return Collections.emptyList(); } // 해당 상품의 연관 상품 ID 목록 가져오기 String redisKey = "related:clicks:" + mostClickedProductId; Set<String> recommendedProductIds = redisTemplate.opsForZSet() .reverseRange(redisKey, 0, 4); // 상위 5개 추천 if (recommendedProductIds == null || recommendedProductIds.isEmpty()) { return Collections.emptyList(); } // 연관 상품 ID 리스트를 Product 엔티티로 변환 List<Long> productIds = recommendedProductIds.stream() .map(Long::parseLong) .collect(Collectors.toList()); return productRepository.findAllById(productIds); }
위와 같은 방법으로 프론트에서 UserRecommendController를 호출하면 가장 많이 클릭한 상품을 추출해서 해당 상품의 연관 상품을 리스트 형태로 보내주어 추천해줍니다.

이번에는 Redis에 저장되는 메시지를 정리해 보려고 합니다
- String 타입
- 유저별로 가장 마지막에 본 상품 ID를 저장, 연관 상품 저장을 위해 쓰임
- GET user:lastViewed:1 을 통해 값 확인 가능
- Hash 타입
- 유저별 카테고리 클릭 수 확인
- HGETALL user:clicks:categories:1 를 통해 값 확인 가능
- 홀수 값들은 카테고리 ID, 짝수 값들은 해당 카테고리 ID의 조회수를 의미
- Hash 타입
- 유저별 상품 클릭 수 확인
- HGETALL user:clicks:products:1 을 통해 값 확인 가능
- 홀수 값들은 상품 ID, 짝수 값들은 해당 상품의 조회수를 의미
- ZSET 타입
- 해당 상품의 연관 상품들을 확인
- ZREVRANGE related:clicks:5 0 -1 WITHSCORES 을 통해 값 확인 가능
- 홀수 값들은 상품 ID, 짝수 값들은 해당 상품의 조회수를 의미 (내림차순 정렬)
Redis 사용에 미숙치 않아 처음에는 여러 값들을 저장할때 Hash 타입을 사용했는데, 이번 프로젝트를 하면서 ZSET 타입에 대해 알게 됐습니다.
ZSET은 Sorted Set이라고도 불리며, 각 요소에 score(점수)를 부여하여 점수 기준으로 자동 정렬을 해줍니다. 따라서 가장 자주 연쇄 클릭된 상품을 부가 구현 없이 쉽게 찾을 수 있었습니다.
다른 Hash 타입의 경우에도 ZSET 타입을 쓰게 되면 더 편리하게 상위 값들을 조회하고 추출할 수 있으므로 앞으로 더 애용할 것 같습니다.