이 글은 MSA 설계를 하면서 모니터링, 로깅을 중앙집중식 로깅 시스템을 구축하기 위한 단계를 기록한 것입니다.
| 여러 서버의 로그를 중앙저장소에 모은 뒤 대시보드를 통해 조회하는 방식으로 Kafka + ElasticSearch 구성을 생각했으나 변경될 수 있다. |
🚩 로깅 시스템 구축 이유
이전에 모놀리식으로 프로젝트를 진행하고 실무에서 사용할때는 로깅 시스템에 대해서 불편함을 느끼지 못했다. 이전 프로젝트에서는 SSH 접속을 통해 log를 조회하고 LogBack을 구현하여 원하는 구성으로 로그를 받고 각 모듈별, 로그 레벨별로 서버 로컬 저장소에 적재하는 방식을 사용했다. 로컬 저장소에 저장하다보니 로그 적재에 문제가 생길때면 서비스에 치명적인 에러가 발생하는 경우도 있었고 MSA 환경 프로젝트를 진행하면서는 서버 종류가 수가 늘어나고 그에 따른 로그를 관리하는 범위가 더 넓고 커진다고 생각하여 빠른 로그 대응과 적재, 수집 등 최적화할 수 있는 방법을 찾아 현재 서비스 구조에 맞는 로깅 시스템을 고민하여 아래와 같은 구성을 설계하게 되었다.
로깅 시스템의 목표 및 요구사항 정의
로깅 시스템을 설계하기 전에 다음과 같은 목표와 요구사항을 명확히 정의해야 합니다.
아래는 MSA 환경에서 로깅 시스템의 전반적인 아키텍처 개요입니다.
build.gradle 파일을 열고 필요한 의존성을 추가합니다.
spring-cloud-dependencies 버전은 최신 버전을 확인하여 사용하세요.
https://techblog.lycorp.co.jp/ko/how-to-migrate-to-spring-boot-3
https://easywritten.com/post/using-spring-boot-3-with-zipkin/
https://bluecheat.medium.com/java-spring-boot-3-zipkin-%ED%99%98%EA%B2%BD%EC%97%90%EC%84%9C-repository-tracing-%EC%B2%98%EB%A6%AC%ED%95%98%EA%B8%B0-7bc9fc3a49f3
https://engineering.linecorp.com/ko/blog/line-ads-msa-opentracing-zipkin
plugins {
id 'org.springframework.boot' version '3.1.2'
id 'io.spring.dependency-management' version '1.1.0'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
// Logback Logstash Encoder
implementation 'net.logstash.logback:logstash-logback-encoder:7.3'
// Spring Cloud Sleuth for distributed tracing
springboot 3 에서 Spring Cloud Sleuth의 기능 Micrometer Tracing 프로젝트로 이관되면서(참고) Spring Cloud Sleuth가 Spring Cloud Release Train에서 제거됐습니다. Zipkin으로 트레이싱
//implementation 'org.springframework.cloud:spring-cloud-starter-sleuth:3.1.3'~~
// 프로젝트에서 Zipkin을 사용하지 않는 경우
implementation("io.micrometer:micrometer-tracing-bridge-brave") {
exclude(group = "io.zipkin.reporter2")
}
// 프로젝트에서 Zipkin을 사용하는 경우
// micrometer
implementation 'io.micrometer:micrometer-tracing-bridge-brave' // Micrometer와 Brave 사이의 연동을 위한 브리지 역할을 하는 라이브러리입니다.
implementation 'io.zipkin.reporter2:zipkin-reporter-brave'
// Kafka dependencies
implementation 'org.springframework.boot:spring-boot-starter-log4j2' // For Log4j2 Kafka appender if needed
implementation 'org.springframework.kafka:spring-kafka:3.0.9'
// Optional: Lombok for reducing boilerplate code
compileOnly 'org.projectlombok:lombok:1.18.24'
annotationProcessor 'org.projectlombok:lombok:1.18.24'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:2022.0.4"
}
}
tasks.named('test') {
useJUnitPlatform()
}
애플리케이션 프로퍼티 설정
src/main/resources/application.yml 파일을 생성하고 다음과 같이 설정합니다.
spring:
application:
name: user-service
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
logging:
level:
root: INFO
com.example: DEBUG
# Logstash 관련 설정 (필요 시)
logstash:
enabled: true
Logback 설정 파일 생성
src/main/resources/logback-spring.xml 파일을 생성하고 다음과 같이 구성합니다.
학습내용 https://velog.io/@may_yun/logback
설명:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- Async Appender for non-blocking logging -->
<appender name="ASYNC_FLUENT" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="FLUENT" />
<queueSize>500</queueSize>
<discardingThreshold>0</discardingThreshold>
<neverBlock>true</neverBlock>
</appender>
<!-- Fluentd Appender to send logs to Kafka -->
<appender name="FLUENT" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>localhost:5000</destination> <!-- Kafka 플러그인 대신 Fluentd 사용 시 -->
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{"service":"${spring.application.name}"}</customFields>
</encoder>
</appender>
<!-- Console Appender for local debugging -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{"service":"${spring.application.name}"}</customFields>
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"traceId": "%X{X-B3-TraceId}",
"spanId": "%X{X-B3-SpanId}",
"userId": "%X{userId}"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
<!-- Rolling File Appender -->
<appender name="ROLLING_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/app.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{"service":"${spring.application.name}"}</customFields>
</encoder>
</appender>
<!-- Root Logger -->
<root level="INFO">
<appender-ref ref="ASYNC_FLUENT" />
<appender-ref ref="CONSOLE" />
<appender-ref ref="ROLLING_FILE" />
</root>
</configuration>
MDC를 활용하여 각 요청에 대한 추가 정보를 로그에 포함시킵니다. 이를 위해 필터를 생성합니다.
package com.yun.loggingservice.filter;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.slf4j.MDC;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;
import java.io.IOException;
import java.util.UUID;
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class MDCLoggingFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response,
FilterChain filterChain) throws ServletException, IOException {
try {
// 고유한 Trace ID 생성
String traceId = UUID.randomUUID().toString();
//MDC.put(): 로그에 포함시킬 컨텍스트 정보 설정
MDC.put("traceId", traceId);
// 사용자 ID 추출 (예: 헤더에서)
String userId = extractUserId(request);
if (userId != null) {
MDC.put("userId", userId);
} else {
MDC.put("userId", "anonymous");
}
// 요청 처리
filterChain.doFilter(request, response);
} finally {
// 요청 후 MDC 데이터 제거
// MDC.remove(): 요청 처리 후 MDC 데이터를 제거하여 메모리 누수를 방지
MDC.remove("traceId");
MDC.remove("userId");
}
}
// 요청에서 사용자 ID를 추출하는 메서드입니다. pre-api-secure 인증 시스템과 연동하여 실제 사용자 ID를 가져올 수 있습니다.
// 인증 서버와 로깅을 ..
private String extractUserId(HttpServletRequest request) {
// ex: 헤더에서 userId 추출
return request.getHeader("X-User-Id");
}
}
설명:
각 HTTP 요청에 대해 고유한 traceId를 생성하고, userId를 추출하여 MDC에 저장합니다.
요청 처리 후 MDC 정보를 제거하여 메모리 누수를 방지합니다.
Spring Boot 필터 등록
Spring Boot에서는 @Component 애노테이션을 사용하여 필터를 자동으로 등록할 수 있습니다. 위의 MDCFilter 클래스가 이를 담당합니다.
Kafka를 로컬에서 실행하거나, Docker를 사용하여 쉽게 배포할 수 있습니다. 여기서는 Docker Compose를 사용하여 Kafka와 Zookeeper를 설정하는 방법을 설명하겠습니다.
4.1.1. Docker Compose 파일 생성
프로젝트 루트 디렉토리에 docker-compose.yml 파일을 생성하고 다음과 같이 작성합니다.
version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.3.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Kafka Producer 설정
Spring Boot 애플리케이션에서 Kafka로 로그를 전송하기 위해 Kafka Producer 설정을 추가합니다. 여기서는 Logback을 통해 Kafka로 직접 로그를 전송하는 대신, Fluentd나 Logstash를 중간에 두는 방식을 권장합니다. 그러나 직접 Logback에서 Kafka로 로그를 전송하려면 Kafka Appender를 사용할 수 있습니다. 여기서는 Logstash를 중간에 두고, Logstash가 Kafka로부터 로그를 수신하도록 설정하는 예제를 계속 사용하겠습니다.
참고: Logback에서 직접 Kafka로 로그를 전송하는 것은 복잡할 수 있으므로, Fluentd나 Logstash를 사용하는 것이 일반적입니다.
Docker Compose에 ELK 추가
docker-compose.yml 파일에 Elasticsearch, Logstash, Kibana 서비스를 추가합니다.
version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.3.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
environment:
- discovery.type=single-node
- ES_JAVA_OPTS=-Xms512m -Xmx512m
- xpack.security.enabled=false
ports:
- "9200:9200"
volumes:
- esdata:/usr/share/elasticsearch/data
kibana:
image: docker.elastic.co/kibana/kibana:8.5.0
ports:
- "5601:5601"
environment:
ELASTICSEARCH_HOSTS: http://elasticsearch:9200
depends_on:
- elasticsearch
logstash:
image: docker.elastic.co/logstash/logstash:8.5.0
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
ports:
- "5000:5000" # Fluentd input port
- "9600:9600" # Logstash monitoring port
environment:
LS_JAVA_OPTS: "-Xmx256m -Xms256m"
depends_on:
- kafka
- elasticsearch
volumes:
esdata:
driver: local
logstash.conf 파일을 프로젝트 루트 디렉토리에 생성하고 다음과 같이 작성합니다.
input {
tcp {
port => 5000
codec => json
}
}
filter {
# 필요한 필터링 및 변환 로직 추가
# 예: 특정 필드 추가, 제거, 변환 등
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "msa-logs-%{+YYYY.MM.dd}"
user => "elastic" # 기본 사용자 (Elasticsearch 보안 활성 시)
password => "changeme" # 기본 비밀번호 (Elasticsearch 보안 활성 시)
}
stdout { codec => rubydebug } # 디버깅용 출력
}
보안이 활성화된 Elasticsearch를 사용하는 경우, 사용자 인증 정보를 설정해야 합니다. 위 예제에서는 보안을 비활성화한 상태를 가정합니다.
Logback 설정 수정 (Kafka 대신 Logstash 사용)
logback-spring.xml을 수정하여 Kafka 대신 Logstash로 로그를 전송하도록 설정합니다.
이전에 FLUENT Appender를 사용했지만, Kafka로 직접 로그를 보내려면 Logstash Input을 Kafka로 변경해야 합니다. 여기서는 간단하게 TCP를 통한 Logstash로의 전송을 계속 사용합니다.
참고: Kafka를 직접 사용하려면 Logstash에서 Kafka Input Plugin을 사용해야 합니다. 그러나 여기서는 Logstash가 TCP를 통해 로그를 수신하도록 설정했습니다.
Logstash Kafka Input 설정 (선택 사항)
만약 Kafka에서 직접 로그를 수신하려면 logstash.conf 파일을 다음과 같이 수정해야 합니다.
input {
kafka {
bootstrap_servers => "kafka:9092"
topics => ["logs"]
codec => json
}
}
filter {
# 필요한 필터링 및 변환 로직 추가
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "msa-logs-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}
참고
- https://blog.barogo.io/%EA%B0%9C%EB%B0%9C%EC%9D%B8%ED%84%B4-%EB%A7%88%EC%9D%B4%ED%81%AC%EB%A1%9C-%EC%84%9C%EB%B9%84%EC%8A%A4-%EC%95%84%ED%82%A4%ED%85%8D%EC%B2%98%EC%9D%98-%EB%93%B1%EC%9E%A5%EA%B3%BC-%EB%A1%9C%EA%B9%85-%EA%B8%B0%EC%88%A0%EC%9D%98-%EB%B0%9C%EC%A0%84-f37106aecfc5
- https://medium.com/@minina1868/%EB%A1%9C%EA%B7%B8-%EA%B0%9C%EC%84%A0-%EA%B3%BC%EC%A0%95-devops-observability-1-logs-3baaf2699546
- https://netmarble.engineering/observability-logging-a/?hilite=%EB%A1%9C%EA%B7%B8