1편에서는 대용량 교통 데이터 처리 시스템의 배경과 아키텍처 설계에 대해 살펴보았다. 이번 편에서는 실제 구현 과정에서 만난 문제점들과 그 해결 방법, 성능 최적화에 대해 이야기하고자 한다.
새로운 아키텍처를 구현하고 운영하면서 몇 가지 중요한 문제점을 발견했다.
처음 시스템을 구축했을 때, 다음과 같은 로그를 발견했다.
25/03/20 10:19:07 INFO SparkAggregator: 원본 데이터 샘플:
+--------------------+-----------+-------+---------------+-------------+-----+
| create_date|create_hour|link_id|road_mgr_agency|traffic_speed|count|
+--------------------+-----------+-------+---------------+-------------+-----+
|"20250315","0000"...| null| null| null| null| null|
CSV 파일이 공백으로 구분되어 있었지만, Spark의 기본 구분자는 쉼표(,)다. 또한 데이터 내에 큰따옴표가 포함되어 있었다.
데이터 처리 자체는 빠르게 완료되었지만, 데이터베이스 저장 단계에서 심각한 병목 현상이 발생했다.
SQL : UPDATE szms.spark_job_status SET status = 'PROCESSING', progress = 80, message = '시간별 집계 완료. 140728383개 집계 레코드 생성됨'
...
SQL : UPDATE szms.spark_job_status SET status = 'COMPLETED', progress = 100, message = '작업이 성공적으로 완료되었습니다'
2시간 7분이 소요된 작업(1654a3ce-d17e-4194-b7d6-612ce88e3e16)에서 데이터베이스 저장 단계만 1시간 40분이 소요되었다.
데이터 로드 중 다음과 같은 타입 변환 오류가 발생했다.
ERROR: invalid input syntax for type integer: "116.12903225806451"
Where: COPY vs_its_hourly, line 2, column count: "116.12903225806451"
PostgreSQL 테이블의 count 열이 정수형(integer)으로 정의되어 있는데, CSV에는 소수점 값이 포함되어 있었다.
SparkAggregator 클래스의 CSV 파일 읽기 부분을 다음과 같이 수정했다.
// 스키마 정의: 명시적으로 각 컬럼 타입 지정
StructType schema = new StructType()
.add("create_date", DataTypes.StringType) // 날짜 형식 (20250315)
.add("create_hour", DataTypes.IntegerType) // 시간 (0-23)
.add("link_id", DataTypes.StringType) // 링크 ID (과학적 표기법 포함 가능)
.add("road_mgr_agency", DataTypes.StringType) // 도로관리기관
.add("traffic_speed", DataTypes.DoubleType) // 속도
.add("count", DataTypes.IntegerType); // 카운트
// 공백 구분자로 CSV 파일 읽기
Dataset<Row> rawData = spark.read()
.option("delimiter", " ") // 공백 구분자 사용
.option("inferSchema", "false") // 스키마 자동 추론 비활성화
.option("header", "false") // 헤더 없음
.option("quote", "\"") // 따옴표 처리
.option("escape", "\"") // 이스케이프 문자
.option("mode", "PERMISSIVE") // 오류 허용 모드
.schema(schema) // 위에서 정의한 스키마 적용
.csv(extractedFile.toString());
이러한 수정을 통해 데이터가 정상적으로 파싱되어 처리되기 시작했다.
DB 저장 성능 최적화를 위해 여러 단계적 접근 방식을 적용했다.
처음에는 JDBC 설정과 Spark의 병렬 처리 능력을 활용한 기본적인 최적화를 시도했다:
// 병렬 저장을 위한 설정
hourlyData
.repartition(10, functions.col("create_date"), functions.col("link_id")) // 파티션 키로 분산
.write()
.mode(SaveMode.Append)
.option("numPartitions", 10) // 병렬 연결 수
.jdbc(dbUrl, tableName, connectionProperties);
// 대량 삽입을 위한 JDBC 연결 속성 설정
Properties connectionProperties = new Properties();
connectionProperties.put("user", dbUser);
connectionProperties.put("password", dbPassword);
connectionProperties.put("driver", "org.postgresql.Driver");
connectionProperties.put("reWriteBatchedInserts", "true"); // PostgreSQL 배치 삽입 최적화
connectionProperties.put("batchsize", "10000"); // 더 큰 배치 크기
하지만 이러한 최적화로는 저장 시간이 1시간 40분에서 약 한시간으로만 단축되었지만 여전히 너무 오래 걸렸다. 특히 원격 데이터베이스의 경우 네트워크 병목이 주요 원인이었다. 특히 제일 오래걸리는 이유가 스파크 하나당 8GB 램을 할당했기때문에 한번에 전송할 수 있는 양이 제한되어 있었고, 이는 수많은 JDBC 통신 > 인덱스생성의 반복으로 이어졌기 때문이다.
아까 2번 문제를 자세히 살펴보면, 테이블 크기와 관련하여 흥미로운 발견이 있었다. 원본 데이터는 CSV 형태로 약 6GB이지만, 데이터베이스에 저장 시 인덱스를 포함하면 16GB 이상으로 증가했다. 인덱스가 데이터베이스 크기에 미치는 영향을 분석한 결과
이런 문제점들이 있었다.
데이터 로드 성능을 더욱 개선하기 위해 인덱스 비동기 생성 방식을 도입했다.
CREATE TABLE IF NOT EXISTS tems_test.vs_its_hourly (
create_date DATE NOT NULL,
create_hour INTEGER NOT NULL,
link_id VARCHAR NOT NULL,
-- 다른 컬럼들...
);
CompletableFuture.runAsync(() -> {
try (Connection conn = DriverManager.getConnection(dbUrl, dbUser, dbPassword);
Statement stmt = conn.createStatement()) {
logger.info("백그라운드에서 인덱스 생성 시작...");
// 기본키 생성
stmt.execute("ALTER TABLE " + tableName +
" ADD PRIMARY KEY (create_date, create_hour, link_id)");
// 인덱스 생성
stmt.execute("CREATE INDEX idx_vs_its_hourly_date_hour ON " +
tableName + " (create_date, create_hour)");
stmt.execute("CREATE INDEX idx_vs_its_hourly_link_id ON " +
tableName + " (link_id)");
logger.info("모든 인덱스 생성 완료");
} catch (SQLException e) {
logger.error("인덱스 생성 중 오류 발생", e);
}
});
이러한 접근 방식은 초기 데이터 로드 속도를 크게 향상시키면서도, 나중에 인덱스를 통한 쿼리 성능 이점을 얻을 수 있게 했다.
PostgreSQL 서버 설정 최적화를 통해 대량 삽입 성능을 추가로 개선했다
-- PostgreSQL 임시 최적화 설정 (작업 전/후 적용)
SET maintenance_work_mem = '1GB';
SET synchronous_commit = OFF;
SET wal_buffers = '16MB';
SET checkpoint_timeout = '30min';
SET max_wal_size = '4GB';
데이터 파싱 신중히 처리하기
단계적 성능 최적화
분산 시스템 설계 고려사항
데이터베이스 최적화 기법
플랫폼 종속성 이해와 해결
데이터 파티셔닝 및 인덱싱 전략
데이터 압축 및 저장 형식 최적화
비동기 인덱스 생성의 DB 커넥션 관리

결론적으로 일반 배치였다면 40~50분 걸리는 작업을 20~30분 걸리는 작업으로 단축 시킬 수 있었다.
전자정부 시스템에서 교통 데이터 처리를 위해 Kafka와 Spark를 도입한 이번 시도는 다양한 기술적 도전과 학습 기회가 되었다. 처음에는 데이터 파싱 문제와 데이터베이스 저장 성능 이슈라는 두 가지 주요 어려움에 직면했지만, 단계적인 문제 해결 접근 방식을 통해 안정적이고 효율적인 시스템을 구축할 수 있었다.
특히 Spark의 분산 처리 능력과 Kafka의 메시지 큐 기능을 활용한 아키텍처는 기존 배치 시스템의 한계를 극복하고, 대용량 데이터를 더 유연하게 처리할 수 있는 기반을 마련했다. 데이터베이스 저장 성능 최적화 과정에서 얻은 교훈, 특히 PostgreSQL COPY 명령의 활용과 인덱스 비동기 생성 전략 부분쪽은 다른 유사한 대용량 데이터 처리 시스템에도 적용할 수 있는 귀중한 지식이 되었다.
이번 최적화를 직접 고민해보고 공부하면서 구축해보니 시간이 가는줄 몰랐고, 너무 재밌었다.
개발자 하기 잘한거 같다..
향후에는 클러스터 자원 활용 최적화, 그리고 더 정교한 데이터 관리 전략을 통해 시스템을 계속 발전시켜 나갈 계획이다.실제 운영에 적용된다면 말이다
이번 경험이 앞으로의 다른 프로젝트들에서의 문제 해결에 많은 도움이 될 것 같다.