
시스템이 복잡해 짐에 따라 데이터를 보내고 받는 시스템 간의 관계가 더욱 복잡해짐. protocol, data format, data schema 등이 다 다르다면 복잡도는 더욱 올라가게 되었음.
이를 해결하기 위해 링크드인에서 2011년에 Kafka 를 개발하였음.

위 아키텍처는 시스템간 의존도를 떨어뜨려 모두 Kafka 를 통해 데이터를 주고받게 되어있음.
특화 프로젝트 - 빅데이터 분산 (데이터 스트림 처리)

카프카를 통해 Youtube Url 을 Spark 로 쏴주고, Spark 에서 데이터 분산 분석을 진행한 뒤 다시 카프카로 Spring 에 전송.
public void sendYoutubeUrl(String url) {
try (KafkaProducer<String, String> kafkaProducer = createKafkaProducer()) {
ProducerRecord<String, String> record = new ProducerRecord<>(youtubeUrlTopic, url);
RecordMetadata metadata = kafkaProducer.send(record).get();
System.out.printf("Produced record (key=%s, value=%s) meta(partition=%d, offset=%d)%n",
record.key(), record.value(), metadata.partition(), metadata.offset());
} catch (Exception e) {
e.printStackTrace();
}
}
public Payload consumeYoutubeAnalyze() {
try (KafkaConsumer<String, String> kafkaConsumer = createKafkaConsumer()) {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
System.out.printf("Received record (key=%s, value=%s, partition=%d, offset=%d)%n",
record.key(), value, record.partition(), record.offset());
// 메시지 처리
JSONObject jsonObject = new JSONObject(value);
List<Payload.Comment> commentList = Arrays.stream(jsonObject.getJSONArray("comment_df").toList().toArray())
.map(obj -> {
JSONObject commentObj = new JSONObject((String) obj);
return new Payload.Comment(
commentObj.getString("id"),
commentObj.getString("comments"),
commentObj.getInt("likes"),
commentObj.getInt("dislikes"),
commentObj.getDouble("sentiment"),
commentObj.getInt("label")
);
})
.collect(Collectors.toList());
List<Payload.AnalyzeResult> resultList = Arrays.stream(jsonObject.getJSONArray("cnt_df").toList().toArray())
.map(obj -> {
JSONObject resultObj = new JSONObject((String) obj);
return new Payload.AnalyzeResult(
resultObj.getInt("label"),
resultObj.getInt("count"),
resultObj.getDouble("ratio")
);
})
.collect(Collectors.toList());
JSONObject videoInfoJson = jsonObject.getJSONObject("video_info");
Payload.VideoInfo videoInfo = new Payload.VideoInfo(
videoInfoJson.getString("channel_title"),
videoInfoJson.getString("subscriber_count"),
videoInfoJson.getString("comment_count"),
videoInfoJson.getString("like_count"),
videoInfoJson.getString("title"),
videoInfoJson.getString("view_count")
);
Payload payload = new Payload(commentList, resultList, videoInfo);
// 잘 들어갔는지 확인하려고 출력해봄
ObjectMapper mapper = new ObjectMapper();
String jsonString = mapper.writeValueAsString(payload);
System.out.println(jsonString);
return payload;
}
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
Spark Streaming
consumer = KafkaConsumer(
'youtube_url',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest', # earliest or latest
enable_auto_commit=True,
group_id=None
)
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:
json.dumps(x).encode('utf-8'))
자율 프로젝트 - MSA

public NoticeRequest sendUserNotice(String promptUuid, String crntMemberUuid) {
Prompt prompt = promptRepository.findByPromptUuid(UUID.fromString(promptUuid))
.orElseThrow(PromptNotFoundException::new);
NoticeRequest newNotice = new NoticeRequest(crntMemberUuid,
"사용해본 프롬프트를 평가하세요 : " + prompt.getTitle(),
"평가 url");
kafkaProducer.sendNotification("send-notification", newNotice);
return newNotice;
}
SSAFY GPT - Kafka Connect

Kafka Connect 를 사용하여 MySQL (소스 DB) 에 변경이 일어날 때 마다 자동으로 감지하여 ES와 정합성을 맞추어줌. (코드 작성 불필요)
{
"name": "prompt-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "labgptbe.p.ssafy.io",
"database.port": "3306",
"database.user": "ssafy",
"database.password": "ssafy",
"database.server.id": "102132",
"database.server.name": "GPT_EXTENSION",
"connectionTimeZone": "Asia/Seoul",
"db.timezone": "Asia/Seoul",
"database.whitelist": "ssafyv2",
"table.include.list": "ssafyv2.prompt",
"topic.prefix": "ssafyv2-mysql",
"include.schema.changes": "false",
"schema.history.internal.kafka.topic": "dbhistory.prompt",
"schema.history.internal.kafka.bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092",
"database.history.kafka.bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092",
"snapshot.mode": "when_needed",
"binary.handling.mode": "base64",
"database.history.kafka.topic": "dbhistory.prompt",
"transforms": "unwrap,convertPromptUuid,convertMemberUuid",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.convertPromptUuid.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.convertPromptUuid.uuid.field.name": "prompt_uuid",
"transforms.convertMemberUuid.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.convertMemberUuid.uuid.field.name": "member_uuid"
}
}
{
"name": "es-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elk-elasticsearch-1:9200",
"connection.username": "elastic",
"connection.password": "gptextension",
"connectionTimeZone": "Asia/Seoul",
"tasks.max": "1",
"topics": "ssafyv2-mysql.ssafyv2.prompt",
"type.name": "_doc",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"transforms": "ExtractField, unwrap",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractField.field": "prompt_id",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"behavior.on.null.values": "IGNORE",
"db.timezone": "Asia/Seoul"
}
}



브로커는 모든 토픽과 파티션에 대한 메타데이터를 주키퍼에서 읽어야 하며, 메타데이터의 업데이트는 주키퍼에서 동기방식으로 일어나고, 브로커에는 비동기방식으로 전달되었습니다.
이 과정에서 메타데이터의 불일치가 발생할 수도 있으며, 컨트롤러 재식자 시 모든 메타데이터를 주키퍼로부터 읽어야 하는 것도 부담이었습니다.
특히 토픽과 파티션이 많은 대규모 카프카 클러스터에서는 오랜 시간이 걸리는 등 병목 현상이 발생할 수 있습니다.
주키퍼와 카프카는 완전히 서로 다른 애플리케이션으로, 서로 다른 구성 파일, 환경, 서비스 데몬을 가지고 있습니다.
결국 관리자는 동시에 서로 다른 애플리케이션을 운영해야 합니다. 동시에 두 가지 애플리케이션을 운영한다는 것은 관리자에게 큰 부담이 됩니다.
예를 들어, 주키퍼의 릴리스 노트 확인, 버전 업그레이드, 구성 파일 변경과 동시에 카프카의 릴리즈 노트 확인, 버전 업그레이드, 구성 파일 변경을 해야 합니다.
모든 서비스는 모니터링이 필수이며, 주키퍼와 카프카 둘 다 모니터링을 해야 합니다.
두 애플리케이션은 서로 다른 애플리케이션이므로, 모니터링을 적용하는 방법과 각 애플리케이션에서 보여주는 주요 메트릭도 다릅니다.
또한 모니터링에 필요한 필수 메트릭을 이해하고 모니터링하는 방법까지 완전히 다릅니다.
그 외에도 각 애플리케이션에서 빈번하게 발생하는 이슈 또는 장애 상황에 개별적으로 대처해야 하며,
두 애플리케이션 간 통신 이슈라도 발생한다면, 매우 곤혹스러울 것입니다.

