이제 기존의 Server(Spring) To Server(FastAPI) 소켓 통신을 Kafka 통신으로 변경하려고 한다.
기존 Socket 통신
Socker 을 활용한 Server To Server를 구성할때는
한쪽이 서버 한쪽이 클라이언트로 구성되어 연동을 시도해야 한다.
저는 Spring Boot를 서버로 (Stomp)
FastAPI를 클라이언트로 구성하여 Server To Server with socket 연동을 구현했습니다.
따라서 FastAPI는 Socket 지속적으로 연동을 시도합니다.
Spring Server의 stomp 설정이다.
package com.ssafy.today.global.config;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@RequiredArgsConstructor
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/sub");
registry.setApplicationDestinationPrefixes("/pub");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*");
.withSockJS();
}
}
이후 Fast API로 메시지 요청시에는
Controller 일부
@PostMapping
public ResponseEntity<?> createDiary(HttpServletRequest request, @RequestBody DiaryContentRequest diaryContentRequest) {
Long memberId = (Long) request.getAttribute("memberId");
diaryContentRequest.setMemberId(memberId);
// 이미지를 제외한 diary 생성
DiaryResponse diaryResponse = diaryService.createDiary(memberId, diaryContentRequest);
diaryContentRequest.setCreatedAt(diaryResponse.getCreatedAt());
diaryContentRequest.setDiaryId(diaryResponse.getId());
// gpu 서버에 소켓통신을 통한 이미지 생성 요청 보내기
simpMessagingTemplate.convertAndSend("/sub/fastapi", diaryContentRequest);
System.out.println("Diary 생성 요청");
return getResponseEntity(SuccessCode.OK, diaryResponse);
}
simpMessagingTemplate.convertAndSend("/sub/fastapi", diaryContentRequest);
다음과 같이 템플릿을 가져와 소켓 통신을 진행할 수 있다.
Fast API의 메시지를 수신할때는
Controller 일부
public void createdDiary(DiaryContentCreated diaryContentCreated){
System.out.println("Diary 생성 완료");
// Analysis 에 저장, Diary 테이블에 저장, tempImg 테이블에 저장
analysisService.createOrUpdateAnalysis(diaryContentCreated.getMemberId(), diaryContentCreated);
diaryService.updateAfterCreateImg(diaryContentCreated);
tempImgService.createTempImages(diaryContentCreated);
// TODO : 클라이언트 알람 전송
}
@MessageMapping("/diary/created")
소켓이 연동된 이후 해당 ws 통신으로 오는 요청을 수신합니다.
FastAPI Socket 클라이언트 구성
async def connect_socket():
ws_url = f"wss://dangil.store/api/ws"
isConnected = False
websocket = {}
while True:
print("=================== ws connect start ===================")
if not isConnected:
try:
websocket = await websockets.connect(ws_url)
isConnected = True
await websocket.send("CONNECT\naccept-version:1.0,1.1,2.0\n\n\x00\n")
sub_offer = stomper.subscribe("/sub/fastapi", idx="fastapi", ack="auto")
await websocket.send(sub_offer)
while True:
try:
# Recv Listen
message = await websocket.recv()
# Get current event_loop
loop = asyncio.get_event_loop()
# Response Split
msg_type = message.split("\n")
# Destination /sub/fatapi 체크
if "/sub/fastapi" in msg_type[1]:
# Dict to JSON
# Execute AI Flow
result = await loop.run_in_executor(None, lambda: make_image(loads(msg_type[7].replace("\x00", ""))))
result = dumps(result, default=datetime_to_json_formatting)
send = stomper.send("/pub/diary/created", result, None, "application/json")
await websocket.send(send)
except websockets.ConnectionClosed as e:
print(f"WebSocket Disconnected", e)
isConnected = False
break
except:
isConnected = False
print("WebSocket Connection Failed")
time.sleep(10)
print("=================== ws connect end ===================")
FastAPI 쪽에서 Spring Server로 핸드쉐이크 연동을 시도합니다.
첫번째 while은 Spring 재로드시에 지속된 연동을 잡기 위해서 사용하고 있습니다.
두번째 while은 await을 걸어서 소켓의 메세지가 존재할때 한번의 루프가 실행된다.
하지만 Stomp 브로커는 Spring Server 에서 관리하고 해당 토픽(큐)에 들어있는 메세지는 Spring 서버가 재 로드되면 사라진다.
이때문에 결합성도 너무 높다는 문제점이 있다
더더욱 fast API에서 GPU 성능이 좋지 않아 병렬 처리를 제한하고 있고, 하나의 스레드로만 처리하고 있기 때문에 브로커에 많은 pub이 쌓여 있는데 서버가 죽는다면 이건 서비스에 큰 타격을 가져왔다.
해당 방법으로 Server To Server 양방향 통신을 사용하고 있었습니다.
하지만 Spring Version 바뀌고 새로운 컨테이너가 로드 될때 일시적으로 소켓 통신망이 끊어지고,
해당 상황에서 데이터가 유실되는 문제점이 존재했습니다.
따라서 -> Kafka의 장점인 데이터 무결성, 확장성, 탈중앙화을 활용해보고자 합니다.
Kafka 통신
저번 포스팅에서 학습한 내용을 바탕으로 Kafka 통신을 구축하고자 합니다.
Kafka 통신을 사용하기 때문에 이제 어느 쪽이 Server 가 되는 개념이 아니고, 모든 통신을 Kafka 에서 관리할수 있습니다.
따라서 각 Spring Server와 FastAPI 서버에 각 각 프로듀서와 컨슈머를 설정해놓고 사용하려고 합니다.
Spring Server
Producer 설정입니다.
package com.ssafy.today.global.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.server}")
private String KafkaServerIp;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
// Broker 서버 설정
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
// Key & Value 직렬화 설정
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Consumer 설정입니다.
해당 프로젝트에서 토픽은 2개로 Spring Server는 하나의 컨슈머에 하나의 객체만 수신하므로 JsonDeserializer 를 사용했다.
package com.ssafy.today.global.config;
import com.ssafy.today.domain.diary.dto.request.DiaryContentCreated;
import com.ssafy.today.domain.diary.dto.request.DiaryContentRequest;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.server}")
private String KafkaServerIp;
@Value("${kafka.group}")
private String KafkaSpringGroup;
@Bean
public ConsumerFactory<String, DiaryContentCreated> consumerFactory() {
Map<String, Object> config = new HashMap<>();
// Broker 서버 설정
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
// consumer 그룹 설정
config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaSpringGroup);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(DiaryContentCreated.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DiaryContentCreated> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, DiaryContentCreated> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
FastAPI
init 파일의 일부입니다.
def connect_kafka():
print("=================== kafka connect start ===================")
producer = KafkaProducer(
bootstrap_servers=[os.getenv("KAFKA_SERVER")],
value_serializer=lambda x:dumps(x, default=datetime_to_json_formatting).encode('utf-8')
)
consumer = KafkaConsumer(
'image-request',
bootstrap_servers=[os.getenv("KAFKA_SERVER")],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id=os.getenv("KAFKA_GROUP"),
value_deserializer=lambda x: loads(x.decode('utf-8')),
max_poll_records=1,
max_poll_interval_ms=600000
)
app.utils.global_vars.producer = producer
app.utils.global_vars.consumer = consumer
print("=================== kafka connect end ===================")
async def consumer_listener():
consumer = app.utils.global_vars.consumer
producer = app.utils.global_vars.producer
loop = asyncio.get_running_loop()
while True:
message = consumer.poll()
if len(message) == 0:
continue
for topic_partition, records in message.items():
for record in records:
print(record.value)
result = await loop.run_in_executor(None, lambda: make_image(record.value))
producer.send('image-created', value=result)
producer.flush()
카프카의 기본 연결 설정과 컨슈머 listener 설정을 진행합니다.
해당 Fast API 에서는 수신후 송신 기능만 있기 때문에,
consumer_listener() 에서 데이터를 수신하고
이미지 로드가 완료된 이후에 Producer를 활용해서 Kafka에 데이터를 송신합니다.
카프카를 적용해도 GPU 서버에서 사진을 생성하는 과정을 하나의 스레드만 사용해서 처리했기에 속도적인 큰 증가는 가져올수 없었습니다.
하지만 FastAPI에서 Offset을 확인하며 데이터를 가져오기에 중단된 순서부터 이어서 사진 생성 처리가 가능했고, 실제 프로젝트에서 한번의 오류 없이 서비스를 가동할 수 있었습니다.