Kafka 정리 - 2

유호준·2024년 1월 13일

Kafka

목록 보기
2/6

설계

  • 가상의 컨설팅 회사 설정
  • 센서는 모니터링하는 내부 장비 상태와 이 상태 이벤트를 지속적으로 제공하는 자전거 전체에 설치된다.

내장 기능

  • 카프카 커넥트의 용도는 자체 프로듀서와 컨슈머를 작성하지 않고 카프카 안팎으로의 데이터 이동을 돕는 것이다.
  • 가장 쉬운 방법은 독립 실행형 모드이다.
    - config 디럭터리 아래에서 아래 파일들을 찾을 수 있다.
    - connect-standalone.properties
    - connect-file-source.properties

파일 소스에 대해 커넥트 구성하기

name=alert-source
connector.class=FileStreamSource
tasks.max=1 # 독립 실행형으로 테스트
file=alert.txt # 이 파일을 모니터링
topic=kinaction_alert_connect # 데이터가 전송될 토픽

하기 전에 Kafka Connector를 설치해야한다.
https://docs.confluent.io/platform/7.5/connect/quickstart.html

bin/connect-standalone.sh config/connect-standalone.properties config/alert-standalone.properties

토픽 확인

  • 파일의 내용이 메시지로 온다

파일 싱크 커넥트 구성하기

name=alert-sink
connector.class=FileStreamSink
tasks.max=1 # 독립 실행형으로 테스트
file=alert-sink.txt # 이 파일을 모니터링
topic=kinaction_alert_connect # 데이터가 전송될 토픽
bin/connect-standalone.sh config/connect-standalone.properties config/alert-standalone.properties alert-sink.properties

결과

  • 메시지가 파일에 싱크된다.

데이터 사일로 다루기

  • 데이터와 그 데이터의 처리는 애플리케이션이 소유한다
    - 다른 사람이 사용하려면 소유자와 대화해야한다

복구 가능성

  • 카프카는 실패를 고려하고 설계되었다
  • 예상되는 실패는 통제가 가능하다
    - --fom-beginning으로토픽 처음부터 소비를 할수있다
    • 데이터 보존기간을 통해 계속해서 사용할 수 있다
  • 이벤트는 인스턴스의 소시에서 한번만 생성하기 떄문에 메시지 브로커는 소비 패턴에서 중요한 역할을 할 수 있다.
    - 큐잉 시스템의 구독자가 메시지를 읽은 후 브로커에서 제거되면 시스템에서 사라진다.
    - 로직의 결함이 발견되면 원래 이벤트가 발생하지 않기 때문에 이벤트의 처리에서 남아 있던 것을 사용해 데이터를 보정할 수 있는지 확인하는 분석이 필요하다.
    • 이벤트를 다시 처리할 수 있는 기회를 통해 데이터 손실이나 손상 없이 애플리케이션을 개선할 수 있다

데이터를 언제 변경해야 하는가?

  • 데이터는 카프카로 먼저 가져오는 것이 좋다
    - 오염되지 않은 형태로 사용할 수 있다
    - 실패가 발생해도 데이터를 리플레이 할 수 있다

카프카가 적합한 이유

  • 데이터베이스가 수직으로 확장하는데 비용이 많이든다
  • 지속적으로 생산되는 이벤트가 있다
    - 이러한 이벤트를 지속적으로 처리할 준비가 되어야 한다

설계에 대한 생각 시작점

  • 시스템이 메시지를 잃어도 괜찮은가
  • 어떤 방식으로 데이터를 그룹화해야 하는가
  • 특정순서로 데이터를 전달해야 하는가
  • 특정 항목의 마지막 값만 필요한가? 아니면 이력이 중요한가?
  • 얼마나 많은 컨슈머를 가질 것인가

사용자 데이터 요구 사항

  • 소비 서비스가 다운된 경우에도 메시지를 캡처할 수 있는 기능을 원한다
    - 원격 공장에서 다운된 경우 메시지가 완전히 삭제되지 않고 나중에 이벤트를 처리할 수 있도록 하고 싶다
  • 유지 관리가 중단되거나 오류가 발생한 후 다시 시작될 떄 필요한 데이터가 계속 유지되기를 바란다
  • 이전 정보와 함께 센서의 얼럿 상태 기록도 유지하려고 한다
  • 센서에 대해 업데이트 하거나 쿼리를 푸시하는 모든 사용자의 감사 로그를 유지하려고 한다

상위 수준 계획

감사 데이터

  • 관리 API에 들어오는 모든 이벤트를 캡처해야할 것 같다
  • 접근 권한이 있는 사용자만 센서에 대해 작업을 수행할 수 있다
  • 모든 이벤트가 없으면 감사가 완료되지 않으므로 메시지를 잃지 않아야 한다
  • 이벤트를 독립적으로 처리할 수 있으므로 그룹화 키가 필요하지는 않다
  • 자체 타임스탭프가 있기 때문에 순서가 중요하지는 않다
  • 감사 프로듀서는 데이터가 손실되지 않도록 하고 소비하는 애플리케이션이 데이터의 순서나 조정에 대한 걱정이 없도록 해야한다.

데이터 형식

데이터를 위한 계획

  • 카프카를 사용해 얻을 수 있는 중요한 이점 중 하나는 프로듀서와 컨슈머가 서로 직접적으로 연결되어 있지 않다는 점이다
  • 스키마를 사용해 애플리케이션 개발자가 데이터의 구조와 의도를 이해할 수 있는 방법을 제공한다
{
	"type": "record",
	"name": "kinaction_libraryCheckout",
    "fields": [{...}]
}

의존성 설정

  • 에이브로는 스키마를 사용해 데이터를 직렬화한다
  • 시간이 지남에 따라 변경되는 스키마를 처리하는 규칙도 적용할 수 있다

직렬화 & 역직렬화

				.... 중략 ....
//직렬화
properties.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer")
//역직렬화
properties.put("value.deserializer","io.confluent.kafka.serializers.KafkaAvroDeserializer")

설정

plugins {
   kotlin("jvm") version "1.9.0"
   id("com.github.davidmc24.gradle.plugin.avro") >version "1.9.1"
}

dependencies {
  implementation("org.apache.avro:avro:1.11.3")
  implementation("io.confluent:kafka-avro-> serializer:7.5.1") 
}

avro {
   isCreateSetters.set(true)
   isCreateOptionalGetters.set(false)
   isGettersReturnOptional.set(false)
   isOptionalGettersForNullableFieldsOnly.set(false)
   fieldVisibility.set("PUBLIC")
   outputCharacterEncoding.set("UTF-8")
   stringType.set("String")
   templateDirectory.set(null as String?)
   isEnableDecimalLogicalType.set(true)
}

tasks.withType(JavaCompile::class).configureEach > {
   options.encoding = "UTF-8"
}

tasks {
   register("generateAvro", > GenerateAvroJavaTask::class){
       source("src/avro")
       setOutputDir(file("src/main/avro"))
   }
}

profile
포트폴리오 - https://drive.google.com/file/d/152OM9p7JQorjUfvR4BaxqGuP5xtQ8-fM/view?usp=sharing

0개의 댓글