안녕하세요.
이번 포스팅에서는 최근 진행한 로그 분석 시스템 개선 프로젝트에 대해
상세히 공유하고자 합니다.

1. 프로젝트 배경 및 문제 상황

1.1 기존 시스템 구성

  • ELK Stack (Elasticsearch 7.10.2, Logstash 7.10.2, Kibana 7.10.2)
  • Spring Boot 2.5.x 기반 백엔드 서버
  • 일일 로그 데이터: 약 5천만 건 (약 50GB)
  • 실시간 이상 거래 탐지 시스템 운영 중

1.2 직면한 문제점들

1. 검색 성능 저하

  • 피크 시간대 평균 조회 시간: 3초 → 15초로 증가
  • 특정 복합 쿼리의 경우 30초 이상 소요
  • 실시간 모니터링 대시보드 지연 발생

2. 스토리지 비용 증가

  • 월 데이터 증가량: 약 1.5TB
  • 압축률 저조: 평균 압축률 2.1:1
  • 보관 기간 제한: 3개월 → 1개월로 축소

3. 운영 비용 증가

  • 월 클라우드 인프라 비용: 약 800만원
  • Elasticsearch 클러스터 운영을 위한 전담 인력 필요
  • 잦은 장애 대응으로 인한 개발 리소스 낭비

4. 이상 거래 탐지 정확도 저하

  • 오탐율(False Positive Rate): 15% 수준
  • 누락율(False Negative Rate): 8% 수준
  • 실시간 처리 지연으로 인한 대응 시간 증가

2. 해결 방안 검토 및 설계

2.1 기술 스택 선정

여러 대안을 검토한 결과, 다음과 같은 이유로 ClickHouse 도입을 결정했습니다.

  • 컬럼 기반 스토리지 구조로 인한 높은 압축률
  • 실시간 분석에 최적화된 쿼리 성능
  • SQL 친화적인 인터페이스
  • 머신러닝 모델 통합 용이성

2.2 시스템 아키텍처 설계

@Configuration
@EnableScheduling
public class LogAnalysisConfig {
    
    @Bean
    public ClickHouseDataSource clickHouseDataSource() {
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setSocketTimeout(300000);
        properties.setCompress(true);
        properties.setDecompress(true);
        properties.setUseServerTimeZone(false);
        properties.setUseTimeZone("Asia/Seoul");
        
        return new ClickHouseDataSource(
            "jdbc:clickhouse://localhost:8123/retail_logs",
            properties
        );
    }
    
    @Bean
    public ClickHouseTemplate clickHouseTemplate(ClickHouseDataSource dataSource) {
        return new ClickHouseTemplate(dataSource);
    }
    
    @Bean
    public ThreadPoolTaskExecutor logProcessorExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(10000);
        executor.setThreadNamePrefix("log-processor-");
        executor.initialize();
        return executor;
    }
}

2.3 데이터 모델 설계

CREATE TABLE retail_logs (
    event_timestamp DateTime64(3),
    user_id String,
    session_id String,
    event_type Enum8(
        'page_view' = 1,
        'add_to_cart' = 2,
        'purchase' = 3,
        'payment' = 4
    ),
    ip_address IPv4,
    device_info String,
    location_city String,
    amount Decimal64(2),
    payment_method String,
    transaction_id UUID,
    embedding_vector Array(Float32),
    risk_score Float32,
    
    -- 최적화를 위한 인덱스 설정
    INDEX idx_user_id user_id TYPE minmax GRANULARITY 4,
    INDEX idx_event_type event_type TYPE set(100) GRANULARITY 1,
    INDEX idx_embedding embedding_vector TYPE nndescent GRANULARITY 32
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_timestamp)
ORDER BY (event_timestamp, user_id)
SETTINGS index_granularity = 8192;

2.4 로그 수집 및 전처리 파이프라인

@Service
@Slf4j
public class LogIngestionService {
    private static final int BATCH_SIZE = 10_000;
    private final BlockingQueue<LogEvent> eventQueue = new LinkedBlockingQueue<>(100_000);
    private final ClickHouseTemplate clickHouseTemplate;
    private final ThreadPoolTaskExecutor executor;
    
    @Autowired
    public LogIngestionService(
        ClickHouseTemplate clickHouseTemplate,
        ThreadPoolTaskExecutor executor
    ) {
        this.clickHouseTemplate = clickHouseTemplate;
        this.executor = executor;
    }
    
    public void ingestLog(LogEvent event) {
        try {
            eventQueue.offer(event, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Failed to queue log event: {}", event, e);
            Thread.currentThread().interrupt();
        }
    }
    
    @Scheduled(fixedRate = 1000)
    public void processBatch() {
        List<LogEvent> batch = new ArrayList<>(BATCH_SIZE);
        eventQueue.drainTo(batch, BATCH_SIZE);
        
        if (!batch.isEmpty()) {
            executor.submit(() -> insertBatch(batch));
        }
    }
    
    private void insertBatch(List<LogEvent> events) {
        try {
            String sql = generateBatchInsertSQL(events);
            clickHouseTemplate.execute(sql);
            log.info("Successfully inserted batch of {} events", events.size());
        } catch (Exception e) {
            log.error("Failed to insert batch", e);
            // 실패한 이벤트 재처리 로직
            handleFailedEvents(events);
        }
    }
    
    private String generateBatchInsertSQL(List<LogEvent> events) {
        StringBuilder builder = new StringBuilder(1000);
        builder.append("INSERT INTO retail_logs FORMAT Values ");
        
        for (LogEvent event : events) {
            builder.append("(")
                   .append(event.getTimestamp().toEpochSecond(ZoneOffset.UTC)).append("000,")
                   .append("'").append(escapeString(event.getUserId())).append("',")
                   .append("'").append(escapeString(event.getSessionId())).append("',")
                   .append("'").append(event.getEventType().name()).append("',")
                   .append("'").append(event.getIpAddress()).append("',")
                   .append("'").append(escapeString(event.getDeviceInfo())).append("',")
                   .append("'").append(escapeString(event.getLocationCity())).append("',")
                   .append(event.getAmount()).append(",")
                   .append("'").append(escapeString(event.getPaymentMethod())).append("',")
                   .append("'").append(event.getTransactionId()).append("',")
                   .append(arrayToString(event.getEmbeddingVector())).append(",")
                   .append(event.getRiskScore())
                   .append("),");
        }
        
        return builder.substring(0, builder.length() - 1);
    }
    
    private String escapeString(String str) {
        return str.replace("'", "\\'").replace("\\", "\\\\");
    }
    
    private String arrayToString(float[] array) {
        return "[" + Arrays.stream(array)
                          .mapToObj(String::valueOf)
                          .collect(Collectors.joining(",")) + "]";
    }
}

3. AI 모델 통합 및 이상 거래 탐지

3.1 이상 거래 탐지 모델 구현

@Service
@Slf4j
public class AnomalyDetectionService {
    private final ClickHouseTemplate clickHouseTemplate;
    private final TransformerModel model;
    private final RiskScoreCalculator riskScoreCalculator;
    
    @Value("${anomaly.detection.threshold}")
    private double anomalyThreshold;
    
    @Autowired
    public AnomalyDetectionService(
        ClickHouseTemplate clickHouseTemplate,
        @Qualifier("fraudDetectionModel") TransformerModel model,
        RiskScoreCalculator riskScoreCalculator
    ) {
        this.clickHouseTemplate = clickHouseTemplate;
        this.model = model;
        this.riskScoreCalculator = riskScoreCalculator;
    }
    
    @Transactional(readOnly = true)
    public List<TransactionAnomaly> detectAnomalies(
        LocalDateTime startTime,
        LocalDateTime endTime
    ) {
        String sql = """
            SELECT 
                transaction_id,
                user_id,
                event_timestamp,
                amount,
                payment_method,
                device_info,
                location_city,
                embedding_vector,
                risk_score
            FROM retail_logs 
            WHERE event_timestamp BETWEEN {start} AND {end}
                AND event_type = 'payment'
                AND risk_score >= {threshold}
            ORDER BY risk_score DESC
            LIMIT 1000
            """;
            
        MapSqlParameterSource params = new MapSqlParameterSource()
            .addValue("start", startTime)
            .addValue("end", endTime)
            .addValue("threshold", anomalyThreshold);
            
        return clickHouseTemplate.query(sql, params, (rs, rowNum) -> {
            float[] vector = (float[]) rs.getArray("embedding_vector").getArray();
            double modelScore = model.predict(vector);
            double riskScore = riskScoreCalculator.calculateScore(
                modelScore,
                rs.getDouble("amount"),
                rs.getString("payment_method"),
                rs.getString("device_info"),
                rs.getString("location_city")
            );
            
            return new TransactionAnomaly(
                UUID.fromString(rs.getString("transaction_id")),
                rs.getString("user_id"),
                rs.getTimestamp("event_timestamp").toLocalDateTime(),
                riskScore,
                modelScore,
                buildAnomalyContext(rs)
            );
        });
    }
    
    private Map<String, Object> buildAnomalyContext(ResultSet rs) 
        throws SQLException {
        Map<String, Object> context = new HashMap<>();
        context.put("amount", rs.getBigDecimal("amount"));
        context.put("payment_method", rs.getString("payment_method"));
        context.put("device_info", rs.getString("device_info"));
        context.put("location_city", rs.getString("location_city"));
        return context;
    }
}

3.2 실시간 모니터링 API 구현

@RestController
@RequestMapping("/api/v1/anomalies")
@Slf4j
public class AnomalyDetectionController {
    private final AnomalyDetectionService anomalyService;
    
    @Autowired
    public AnomalyDetectionController(AnomalyDetectionService anomalyService) {
        this.anomalyService = anomalyService;
    }
    
    @GetMapping
    public ResponseEntity<AnomalyDetectionResponse> detectAnomalies(
        @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) 
        LocalDateTime startTime,
        
        @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) 
        LocalDateTime endTime
    ) {
        try {
            List<TransactionAnomaly> anomalies = 
                anomalyService.detectAnomalies(startTime, endTime);
                
            AnomalyDetectionResponse response = AnomalyDetectionResponse.builder()
                .anomalies(anomalies)
                .totalCount(anomalies.size())
                .startTime(startTime)
                .endTime(endTime)
                .build();
                
            return ResponseEntity.ok(response);
        } catch (Exception e) {
            log.error("Failed to detect anomalies", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }
}

4. 성능 측정 및 최적화 결과

4.1 쿼리 성능 개선

최적화 전후의 주요 쿼리 성능을 비교했습니다

1. 사용자별 거래 내역 조회

-- 최적화 전 (avg: 3.2초)
SELECT * 
FROM log_events 
WHERE user_id = ? 
  AND event_time >= ?
  AND event_type = 'purchase'
ORDER BY event_time DESC
LIMIT 100;

-- 최적화 후 (avg: 0.3초)
SELECT * 
FROM retail_logs 
FINAL 
WHERE user_id = ? 
  AND event_timestamp >= ?
  AND event_type = 'purchase'
ORDER BY event_timestamp DESC
LIMIT 100;

2. 이상 거래 탐지 쿼리

-- 최적화 전 (avg: 8.5초)
SELECT 
    transaction_id,
    user_id,
    amount,
    risk_score
FROM log_events
WHERE event_type = 'payment'
  AND
profile
에러가 나도 괜찮아 — 그건 내가 배우고 있다는 증거야.

0개의 댓글