Kafka Consumer Grafana 모니터링

dh·2025년 5월 6일
post-thumbnail

Kafka consumer와 메세지 전송 성공/실패 로그를 모니터링하기 위해 Loki, Prometheus, Grafana를 사용했다.

모니터링 docker-compse.yml

dokcer-compose.yml에 모니터링 관련 설정을 추가했다.

services:
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
    networks:
      - app_network

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "7770:3000"
    networks:
      - app_network

  loki:
    container_name: loki
    image: grafana/loki:2.9.2
    ports:
      - "7771:3100"
    volumes:
      - ./loki/config.yaml:/etc/loki/config.yaml
    command: -config.file=/etc/loki/config.yaml
    networks:
      - app_network

  kafka-exporter:
    image: danielqsj/kafka-exporter:latest
    ports:
      - "9308:9308"
    command: [ "--kafka.server=kafka:29092" ]
    depends_on:
      - kafka
    restart: unless-stopped
    networks:
      - app_network
networks:
  app_network:
    driver: bridge

Prometheus

kafka-exporter에서 수집한 kafka내부의 메트릭을 prometheus에서 수집하도록 prometheus.yml 파일 설정을 한다.

global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  - job_name: 'alert-service'
    metrics_path: '/actuator/prometheus'
    scrape_interval: 5s
    static_configs:
      - targets: [ 'host.docker.internal:19015' ]

  - job_name: 'kafka'
    static_configs:
      - targets: [ 'kafka-exporter:9308' ]

Loki

메세지 전송 성공/실패에 관해 로그를 찍을 건데, Loki 로그 수집 서버의 동작 방식, 저장 위치, 수집 속도 등 loki/config.yml설정을 한다.

auth_enabled: false

server:
  http_listen_port: 3100
  grpc_listen_port: 9096

common:
  path_prefix: /tmp/loki
  storage:
    filesystem:
      chunks_directory: /tmp/loki/chunks
      rules_directory: /tmp/loki/rules
  replication_factor: 1
  ring:
    instance_addr: 127.0.0.1
    kvstore:
      store: inmemory

schema_config:
  configs:
    - from: 2020-10-24
      store: boltdb-shipper
      object_store: filesystem
      schema: v11
      index:
        prefix: index_
        period: 24h

ruler:
  alertmanager_url: http://localhost:9093

# 로그를 로컬 파일 시스템에 저장
storage_config:
  boltdb_shipper:
    active_index_directory: /tmp/loki/boltdb-shipper-active
    cache_location: /tmp/loki/boltdb-shipper-cache
    cache_ttl: 24h
    shared_store: filesystem
  filesystem:
    directory: /tmp/loki/chunks

compactor:
  working_directory: /tmp/loki/compactor
  shared_store: filesystem

limits_config:
  enforce_metric_name: false
  reject_old_samples: true
  reject_old_samples_max_age: 168h
  ingestion_rate_mb: 4
  ingestion_burst_size_mb: 6

chunk_store_config:
  max_look_back_period: 0s

table_manager:
  retention_deletes_enabled: false
  retention_period: 0s

query_range:
  results_cache:
    cache:
      embedded_cache:
        enabled: true
        max_size_mb: 100

로그 출력 방식과 Loki와 연동하기 위해 resources/logback-spring.xml설정을 한다.

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- 기본 Spring Boot 로깅 설정을 포함 -->
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
    <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>

    <!-- 애플리케이션 이름 설정 (환경 변수나 속성에서 가져옴) -->
    <springProperty scope="context" name="appName" source="spring.application.name" defaultValue="alert-service"/>

    <!-- 로그 패턴 설정 (트레이스 ID와 스팬 ID 포함) -->
    <property name="CONSOLE_LOG_PATTERN"
              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) [${appName},%X{status:-}] %logger{36} - %msg%n"/>

    <!-- Loki 어펜더 설정 -->
    <appender name="LOKI" class="com.github.loki4j.logback.Loki4jAppender">
        <!-- Loki 서버 URL 설정 -->
        <http>
            <url>http://localhost:3100/loki/api/v1/push</url>
        </http>
        <!-- 로그 포맷 설정 -->
        <format>
            <label>
                <pattern>
                    app=${appName},host=${HOSTNAME},level=%level,trace_id=%X{traceId:-unknown},span_id=%X{spanId:-unknown},status=%X{status:-unknown}
                </pattern>
            </label>
            <message>
                <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            </message>
            <sortByTime>true</sortByTime>
        </format>
        <!-- 배치 설정: 로그를 얼마나 자주 Loki로 전송할지 결정 -->
        <batchProcessing>
            <batchSize>100</batchSize>
            <batchTimeoutMs>1000</batchTimeoutMs>
        </batchProcessing>
    </appender>

    <!-- 로컬 개발 환경을 위한 파일 로깅 설정 (선택 사항) -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>logs/alert-service.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>logs/alert-service.%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- 로그 레벨 설정 -->
    <logger name="com._42195km" level="DEBUG"/>
    <logger name="org.hibernate.SQL" level="DEBUG"/>
    <logger name="org.hibernate.type.descriptor.sql.BasicBinder" level="TRACE"/>
    <logger name="org.springframework.transaction" level="DEBUG"/>
    <logger name="org.springframework.orm.jpa" level="DEBUG"/>

    <!-- 루트 로거 설정 -->
    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="LOKI"/>
        <appender-ref ref="FILE"/>
    </root>
</configuration>

로그를 찍을 때 MDC로 status를 설정해서 로그에 status가 찍히도록 한다.

MDC.put("status", "success");
log.info("전송 완료: {}", response);

logback에 설정한 패턴대로 로그가 찍힌다.

Grafana

kafka consumer를 모니터링 하기 위해 Grafana 대시보드에 7589(Kafka Exporter Overview)를 import한다. Kafka Consuemr Lag, Topic, Partion등을 모니터링할 수 있다.


Loki에서 수집한 로그를 시각화 하기 위해 판넬을 추가한다.
24시간 동안 성공/실패 로그가 몇번 나왔는지 시각화 했다.
Datasource를 Loki로 설정하고 수식을 작성한다. visualization은 stat으로 설정했다.

sum by(status) (count_over_time({app="alert-service", status="success"} [24h]))
sum by(status) (count_over_time({app="alert-service", status="retry"} [24h]))
sum by(status) (count_over_time({app="alert-service", status="DLT"} [24h]))

sum by(status) (count_over_time({app="alert-service", status=~"success|DLT|retry"} [10m]))
sum by(status) (count_over_time({app="alert-service", status=~"success|DLT|retry"} [24h]))

왼쪽은 10분동안 각 success, DLT, retry로그를 그래프로, 오른쪽은 파이차트로 표현했다.

0개의 댓글