
SNS의 피드를 구성하는 방법 중 하나
pull model 도 있음
A가 신규 게시물을 작성하면 A 팔로워들의 피드 목록에 신규 게시물 ID를 push하는 방식으로 피드를 구성
신규 게시물 작성 과 팔로워들의 피드 목록에 신규 게시물 ID를 push 로직은 서로 다른 관심사임
동기적으로 처리할 시 팔로워들이 많다면 각 팔로워들의 피드 목록에 push하는 로직이 오래 걸려서 게시물 생성 로직이 너무 오래 걸림
implementation 'org.springframework.kafka:spring-kafka'
kafka:
bootstrap:
servers: localhost:9092
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, Long> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Long> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, Long> feedConsumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sns-feed");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConsumerFactory<String, Long> postDeleteConsumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "post");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Long> feedKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(feedConsumerFactory());
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Long> postDeleteKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(postDeleteConsumerFactory());
return factory;
}
}
Component
@RequiredArgsConstructor
@Slf4j
public class FeedUpdateProducer {
private final KafkaTemplate<String, Long> kafkaTemplate;
public void send(String topic, Long userId, Long postId) {
log.info("sending postId = {} to topic = {}", postId, topic);
kafkaTemplate.send(topic, userId.toString(), postId);
}
}
send()를 통해 메시지 생성topic을 처리하는 KafkaConsumer가 메시지 수신해서 로직 처리@Component
@RequiredArgsConstructor
@Slf4j
public class FeedUpdateConsumer {
private final RedisTemplate<String, Object> redisTemplate;
// consumer 설정
@KafkaListener(topics = "feed", groupId = "sns-feed", containerFactory = "feedKafkaListenerContainerFactory")
public void receive(ConsumerRecord<String, Long> consumerRecord) {
Long userId = Long.parseLong(consumerRecord.key());
Long postId = consumerRecord.value();
log.info("=========== received userID = {}, postID = {}", userId, postId);
// Redis에서 userId의 팔로워 ID 목록 가져오기
Set<Object> followerIds = redisTemplate.opsForSet().members("followers:" + userId);
if (followerIds == null) {
log.error("====================== userID {}는 팔로워가 없습니다.", userId);
return;
}
log.info("=========== followerIds = {}", followerIds);
// 내 피드 목록에도 내가 생성한 postId 넣기
redisTemplate.opsForList().leftPush("feed:" + userId, postId);
// 각 팔로워의 피드목록에 postId 넣기
followerIds.forEach(
id -> {
String feedKey = "feed:" + id;
redisTemplate.opsForList().leftPush(feedKey, postId);
});
log.info("=========== feed push success!");
}
}
feed라는 topic을 처리하는 consumer 로직Redis에서 userId의 팔로워 ID 리스트를 가져온다
내 피드 목록에 내가 생성한 postId 넣기
가져온 팔로워 ID 리스트를 순회하면서 각 팔로워의 피드 목록에 postId 넣기
@Transactional
public void insertPost(CreatePostReq createPostReq, Long userId) {
PostDTO newPost = PostDTO.builder()
.contents(createPostReq.getContents())
.userId(userId)
.createDate(LocalDateTime.now())
.updateDate(LocalDateTime.now())
.build();
// 게시물 내용 저장 (insertPost 정상 실행되면, newPost의 id 속성에 id값이 들어 있다)
postMapper.insertPost(newPost);
Long newPostId = newPost.getId();
// 로컬 디렉토리에 이미지 저장 후, DB에 이미지 정보 저장
imageService.saveImages(createPostReq.getImgFiles(),
newPostId);
// kafka에 메시지 발행 : 팔로워들의 피드목록에 내가 작성한 게시물 ID 넣기
feedUpdateProducer.send("feed", userId, newPostId);
}
A user가 게시물을 작성하면 DB에 게시물 저장하고
feedUpdateProducer를 통해 topic이 feed인 메시지를 발행함
FeedUpdateConsumer가 topic이 feed인 메시지 처리
A user의 팔로워 ID 리스트 가져와서 각 피드 목록에 신규 postId를 push
내 피드 목록에도 push
Redis에 저장되어 있는 피드 목록을 조회 (postId 리스트가 저장되어 있음)
피드 목록을 순회하면서 각 게시물의 정보를 가져옴
피드의 경우, 내가 팔로잉 하는 유저가 게시물 생성 시 내 피드 목록에 해당 게시물 ID가 들어옴
그럼 기존 피드 결과물 캐싱해놓은건 의미 없어짐
그래서 각 게시물을 캐싱해놓으면 피드 결과물 전체를 캐싱해놓는건 보단 좀 느리지만, 충분히 빠름
또한, 다른 유저들의 피드 결과물 생성할 때도 다른 유저로 인해 캐싱해놓은 게시물 정보를 사용할 수 있음