
Hello.
In this post I'd like to share the details of a recent project to improve our log analysis system.
After evaluating several alternatives, we decided to adopt ClickHouse. The Spring configuration below wires up the data source, a template for query execution, and the thread pool used for log processing.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// legacy ClickHouse JDBC driver
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

@Configuration
@EnableScheduling
public class LogAnalysisConfig {

    @Bean
    public ClickHouseDataSource clickHouseDataSource() {
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setSocketTimeout(300000);   // 5 minutes, for long-running analytical queries
        properties.setCompress(true);          // compress request bodies
        properties.setDecompress(true);        // decompress response bodies
        properties.setUseServerTimeZone(false);
        properties.setUseTimeZone("Asia/Seoul");
        return new ClickHouseDataSource(
                "jdbc:clickhouse://localhost:8123/retail_logs",
                properties
        );
    }

    // ClickHouseTemplate is our project-specific wrapper with
    // NamedParameterJdbcTemplate-style semantics; it is not part of the driver.
    @Bean
    public ClickHouseTemplate clickHouseTemplate(ClickHouseDataSource dataSource) {
        return new ClickHouseTemplate(dataSource);
    }

    @Bean
    public ThreadPoolTaskExecutor logProcessorExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(10000);
        executor.setThreadNamePrefix("log-processor-");
        executor.initialize();
        return executor;
    }
}
On the ClickHouse side, the log table is defined as follows:

CREATE TABLE retail_logs (
    event_timestamp DateTime64(3),
    user_id String,
    session_id String,
    event_type Enum8(
        'page_view' = 1,
        'add_to_cart' = 2,
        'purchase' = 3,
        'payment' = 4
    ),
    ip_address IPv4,
    device_info String,
    location_city String,
    amount Decimal64(2),
    payment_method String,
    transaction_id UUID,
    embedding_vector Array(Float32),
    risk_score Float32,
    -- skip indexes for query optimization
    INDEX idx_user_id user_id TYPE minmax GRANULARITY 4,
    INDEX idx_event_type event_type TYPE set(100) GRANULARITY 1,
    -- experimental ANN index; requires SET allow_experimental_annoy_index = 1
    INDEX idx_embedding embedding_vector TYPE annoy GRANULARITY 32
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_timestamp)
ORDER BY (event_timestamp, user_id)
SETTINGS index_granularity = 8192;
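
Monthly partitioning keeps retention cheap (old months can be dropped as whole partitions) and lets queries prune by month. As a quick sanity check, ClickHouse's built-in system.parts table shows how the data is actually laid out; a short sketch:

-- inspect active parts per monthly partition
SELECT
    partition,
    count() AS parts,
    sum(rows) AS rows,
    formatReadableSize(sum(bytes_on_disk)) AS size_on_disk
FROM system.parts
WHERE table = 'retail_logs' AND active
GROUP BY partition
ORDER BY partition;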
Logs are buffered in an in-memory queue and flushed to ClickHouse in batches of up to 10,000 events once a second:

import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class LogIngestionService {

    private static final int BATCH_SIZE = 10_000;

    private final BlockingQueue<LogEvent> eventQueue = new LinkedBlockingQueue<>(100_000);
    private final ClickHouseTemplate clickHouseTemplate;
    private final ThreadPoolTaskExecutor executor;

    @Autowired
    public LogIngestionService(
            ClickHouseTemplate clickHouseTemplate,
            ThreadPoolTaskExecutor executor
    ) {
        this.clickHouseTemplate = clickHouseTemplate;
        this.executor = executor;
    }

    public void ingestLog(LogEvent event) {
        try {
            // offer() returns false if the queue is still full after the timeout;
            // surface that instead of losing events silently
            if (!eventQueue.offer(event, 100, TimeUnit.MILLISECONDS)) {
                log.warn("Event queue full, dropping log event: {}", event);
            }
        } catch (InterruptedException e) {
            log.error("Interrupted while queueing log event: {}", event, e);
            Thread.currentThread().interrupt();
        }
    }

    @Scheduled(fixedRate = 1000)
    public void processBatch() {
        List<LogEvent> batch = new ArrayList<>(BATCH_SIZE);
        eventQueue.drainTo(batch, BATCH_SIZE);
        if (!batch.isEmpty()) {
            executor.submit(() -> insertBatch(batch));
        }
    }

    private void insertBatch(List<LogEvent> events) {
        try {
            String sql = generateBatchInsertSQL(events);
            clickHouseTemplate.execute(sql);
            log.info("Successfully inserted batch of {} events", events.size());
        } catch (Exception e) {
            log.error("Failed to insert batch", e);
            // reprocess failed events
            handleFailedEvents(events);
        }
    }

    private void handleFailedEvents(List<LogEvent> events) {
        // simplest recovery: re-queue the batch for the next scheduled cycle;
        // a production version should cap retries and divert persistent
        // failures to a dead-letter store
        events.forEach(eventQueue::offer);
    }

    private String generateBatchInsertSQL(List<LogEvent> events) {
        StringBuilder builder = new StringBuilder(events.size() * 256);
        builder.append("INSERT INTO retail_logs VALUES ");
        for (LogEvent event : events) {
            builder.append("(")
                    // epoch seconds + "000" = epoch milliseconds for DateTime64(3)
                    .append(event.getTimestamp().toEpochSecond(ZoneOffset.UTC)).append("000,")
                    .append("'").append(escapeString(event.getUserId())).append("',")
                    .append("'").append(escapeString(event.getSessionId())).append("',")
                    .append("'").append(event.getEventType().name()).append("',")
                    .append("'").append(event.getIpAddress()).append("',")
                    .append("'").append(escapeString(event.getDeviceInfo())).append("',")
                    .append("'").append(escapeString(event.getLocationCity())).append("',")
                    .append(event.getAmount()).append(",")
                    .append("'").append(escapeString(event.getPaymentMethod())).append("',")
                    .append("'").append(event.getTransactionId()).append("',")
                    .append(arrayToString(event.getEmbeddingVector())).append(",")
                    .append(event.getRiskScore())
                    .append("),");
        }
        // drop the trailing comma
        return builder.substring(0, builder.length() - 1);
    }

    private String escapeString(String str) {
        // escape backslashes before quotes, so the quote escape itself
        // is not re-escaped
        return str.replace("\\", "\\\\").replace("'", "\\'");
    }

    private String arrayToString(float[] array) {
        // Arrays.stream() has no float[] overload, so index into the array
        return IntStream.range(0, array.length)
                .mapToObj(i -> Float.toString(array[i]))
                .collect(Collectors.joining(",", "[", "]"));
    }
}
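
For completeness, here is a minimal sketch of the LogEvent class the service assumes. The real class is not shown in this post, so the field types below are my inference from the retail_logs schema:

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.UUID;

import lombok.Value;

// Hypothetical sketch: fields mirror the retail_logs columns used by
// generateBatchInsertSQL(); Lombok's @Value generates the
// getTimestamp()/getUserId()/... accessors the service calls.
@Value
public class LogEvent {

    public enum EventType { page_view, add_to_cart, purchase, payment }

    LocalDateTime timestamp;
    String userId;
    String sessionId;
    EventType eventType;
    String ipAddress;
    String deviceInfo;
    String locationCity;
    BigDecimal amount;
    String paymentMethod;
    UUID transactionId;
    float[] embeddingVector;
    float riskScore;
}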
On top of the ingested data, the anomaly detection service pulls high-risk payment events and rescores them with a Transformer-based model:

import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class AnomalyDetectionService {

    private final ClickHouseTemplate clickHouseTemplate;
    private final TransformerModel model;
    private final RiskScoreCalculator riskScoreCalculator;

    @Value("${anomaly.detection.threshold}")
    private double anomalyThreshold;

    @Autowired
    public AnomalyDetectionService(
            ClickHouseTemplate clickHouseTemplate,
            @Qualifier("fraudDetectionModel") TransformerModel model,
            RiskScoreCalculator riskScoreCalculator
    ) {
        this.clickHouseTemplate = clickHouseTemplate;
        this.model = model;
        this.riskScoreCalculator = riskScoreCalculator;
    }

    @Transactional(readOnly = true)
    public List<TransactionAnomaly> detectAnomalies(
            LocalDateTime startTime,
            LocalDateTime endTime
    ) {
        // Spring-style :name parameters, matching MapSqlParameterSource;
        // our ClickHouseTemplate resolves them like NamedParameterJdbcTemplate
        String sql = """
                SELECT
                    transaction_id,
                    user_id,
                    event_timestamp,
                    amount,
                    payment_method,
                    device_info,
                    location_city,
                    embedding_vector,
                    risk_score
                FROM retail_logs
                WHERE event_timestamp BETWEEN :start AND :end
                  AND event_type = 'payment'
                  AND risk_score >= :threshold
                ORDER BY risk_score DESC
                LIMIT 1000
                """;
        MapSqlParameterSource params = new MapSqlParameterSource()
                .addValue("start", startTime)
                .addValue("end", endTime)
                .addValue("threshold", anomalyThreshold);
        return clickHouseTemplate.query(sql, params, (rs, rowNum) -> {
            // assumes the JDBC driver materializes Array(Float32) as float[]
            float[] vector = (float[]) rs.getArray("embedding_vector").getArray();
            double modelScore = model.predict(vector);
            double riskScore = riskScoreCalculator.calculateScore(
                    modelScore,
                    rs.getDouble("amount"),
                    rs.getString("payment_method"),
                    rs.getString("device_info"),
                    rs.getString("location_city")
            );
            return new TransactionAnomaly(
                    UUID.fromString(rs.getString("transaction_id")),
                    rs.getString("user_id"),
                    rs.getTimestamp("event_timestamp").toLocalDateTime(),
                    riskScore,
                    modelScore,
                    buildAnomalyContext(rs)
            );
        });
    }

    private Map<String, Object> buildAnomalyContext(ResultSet rs)
            throws SQLException {
        Map<String, Object> context = new HashMap<>();
        context.put("amount", rs.getBigDecimal("amount"));
        context.put("payment_method", rs.getString("payment_method"));
        context.put("device_info", rs.getString("device_info"));
        context.put("location_city", rs.getString("location_city"));
        return context;
    }
}
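
The RiskScoreCalculator injected above is not shown in this post. As a hedged sketch of one plausible shape for it, here is a simple weighted combination; every weight and rule below is an illustrative placeholder, not our production logic:

import java.util.Set;

import org.springframework.stereotype.Component;

// Hypothetical sketch: blends the model score with rule-based signals
@Component
public class RiskScoreCalculator {

    private static final Set<String> RISKY_METHODS = Set.of("gift_card", "crypto");

    public double calculateScore(
            double modelScore,
            double amount,
            String paymentMethod,
            String deviceInfo,    // kept for signature parity; unused in this sketch
            String locationCity   // kept for signature parity; unused in this sketch
    ) {
        double score = 0.7 * modelScore;   // model prediction dominates
        if (amount >= 1_000_000) {
            score += 0.15;                 // unusually large payment
        }
        if (RISKY_METHODS.contains(paymentMethod)) {
            score += 0.15;                 // fraud-prone payment method
        }
        return Math.min(score, 1.0);       // clamp to [0, 1]
    }
}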
Finally, the results are exposed through a REST endpoint:

import java.time.LocalDateTime;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import lombok.extern.slf4j.Slf4j;

@RestController
@RequestMapping("/api/v1/anomalies")
@Slf4j
public class AnomalyDetectionController {

    private final AnomalyDetectionService anomalyService;

    @Autowired
    public AnomalyDetectionController(AnomalyDetectionService anomalyService) {
        this.anomalyService = anomalyService;
    }

    @GetMapping
    public ResponseEntity<AnomalyDetectionResponse> detectAnomalies(
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
            LocalDateTime startTime,
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
            LocalDateTime endTime
    ) {
        try {
            List<TransactionAnomaly> anomalies =
                    anomalyService.detectAnomalies(startTime, endTime);
            AnomalyDetectionResponse response = AnomalyDetectionResponse.builder()
                    .anomalies(anomalies)
                    .totalCount(anomalies.size())
                    .startTime(startTime)
                    .endTime(endTime)
                    .build();
            return ResponseEntity.ok(response);
        } catch (Exception e) {
            log.error("Failed to detect anomalies", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }
}
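
A call then looks like this; both parameters are ISO-8601 date-times, as declared with @DateTimeFormat (the values below are just examples):

GET /api/v1/anomalies?startTime=2024-03-01T00:00:00&endTime=2024-03-02T00:00:00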
We compared the performance of our main queries before and after the optimization:
-- Before optimization (avg: 3.2 s)
SELECT *
FROM log_events
WHERE user_id = ?
  AND event_time >= ?
  AND event_type = 'purchase'
ORDER BY event_time DESC
LIMIT 100;

-- After optimization (avg: 0.3 s)
SELECT *
FROM retail_logs
WHERE user_id = ?
  AND event_timestamp >= ?
  AND event_type = 'purchase'
ORDER BY event_timestamp DESC
LIMIT 100;
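
Most of the gain comes from the new sort key: with ORDER BY (event_timestamp, user_id), the time filter prunes whole granules before anything else is read. You can check which indexes a query actually uses by asking EXPLAIN for index information; a sketch with placeholder literals, since EXPLAIN needs concrete values rather than ? parameters:

-- shows primary-key and skip-index pruning decisions for the query plan
EXPLAIN indexes = 1
SELECT *
FROM retail_logs
WHERE user_id = 'u-123'
  AND event_timestamp >= '2024-03-01 00:00:00'
  AND event_type = 'purchase'
ORDER BY event_timestamp DESC
LIMIT 100;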
-- Before optimization (avg: 8.5 s)
SELECT
transaction_id,
user_id,
amount,
risk_score
FROM log_events
WHERE event_type = 'payment'
AND