계약 금액 계산을 최적화 하는 과정을 적어봤습니다.
💡 특정 청구월에 대한 변동관리비 부과금액 계산 시 해당 사용월에 유효했던 계약들에 대한 부과금액을 일할 계산하여 데이터 생성.
재 계산 시에도 특정 계산 데이터에 대한 pk 를 기준으로 하지 않고, 특정 청구월에 대한 전체 재 계산을 해야하기에 업데이트 시에도 pk 를 확인하지 못함
이에 따라 특정 계산이 기존에 진행 됐는지 여부를 확인하고, 등록/수정 여부를 판단해서 저장해야함.
특정 계약의 일할 계산 시에는 해당 계약이 사용한 일자 마다 유효한 계약들의 계약 면적을 모두 합하여 모수로 두고, 해당 계약의 계약면적을 분자로 둬 해당 계약의 전체 변동 관리비에 대한 사용율을 구하여 계산함.
# 비즈니스 로직 구성
'특정 월의 일별 총 계약 면적 합 dict' = 특정_월의_일별_총_계약_면적_합_산정()
""" 데이터 예시
'특정 월의 일별 총 계약 면적 합 dict' = {
'2024-01-01': 1000,
'2024-01-02': 2000,
...
}
"""
'부과 금액' = 0
'해당 월 일별 사용 금액' = '해당 월 총 부과 금액' / '해당 월의 일수'
for '계약 공간' in '계약 공간 리스트':
for '사용 일' in '해당 계약 공간이 사용한 일자 리스트':
'부과 금액' += ('계약 공간'.'계약면적' / '특정 월의 일별 총 계약 면적 합 dict'['사용 일']) * '해당 월 일별 사용 금액'
# 이후 각 계산 내역에 대한 insert/update 처리 및 로그 insert
💡 테스트 환경
python 3.11.0
fastapi 0.88.0
aiomysql 0.2.0
db : 8.0.mysql_aurora.3.04.1
db spec : AWS RDS db.t3.medium (core 1, vCPU 2, 4GB RAM)
계약 1,000건에 대한 특정 변동관리비 1개의 특정 청구월에 대한 부과금액 계산 로직 실행 시 74초가 소요됨.
사용성에도 문제가 있고, 계산 로직이 진행되는 동안 해당 테이블에 락이 걸리는 것도 문제.
효율성을 개선해보자
💡 일자별 유효한 계약의 계약 면적의 합을 구해야함. 구하고자 하는 월의 총 일수를 구하고, 해당 일자에 대한 데이터 조회 후 일자 별 계약 면적 합 산정.
기존 로직에서는 일자별 유효한 계약의 계약면적 합을 구할 때, 해당 월의 일자 별로 반복문을 돌면서 해당 일자의 계약 면적을 구했습니다. 때문에 약 30번의 쿼리가 개별적으로 실행 되었죠.
num_days = 특정 월의 일자 수
daily_sum_of_계약면적 = {}
# 일자별로 일자를 파라미터로 넘겨 쿼리 실행
for day in range(1, num_days + 1):
specific_date = f'{year_month}-{day:02}' # ex) "2024-04-01"
# 특정 일의 계약면적 합 조회
result = await db.select_one(id=xxx, param={..., 'specificDate': specific_date})
daily_sum_of_계약면적.update({specific_data: result.get('계약면적 합'})
->
daily_sum_of_계약면적 = {
'2024-04-01': 100,
'2024-04-02': 200,
...
}
-- 특정 일자의 특정 동의 게약면적 합산
SELECT
SUM(cs.contract_area) as sum_contract_area
FROM `계약공간` as cs
INNER JOIN `계약` as c
ON cs.contract_no = c.contract_no
...
WHERE
...
AND
-- 특정 일에 해당 계약이 유효한 날짜 였는지에 따른 조건
c.계약시작일<= '2024-04-28'
AND
c.계약종료일 >= '2024-04-28'
;
이 로직을 한 번의 쿼리로 실행할 수 있다면 매번 발생했던 30번의 네트워크 비용이 줄어 들겠죠?
이를 가능하게 해주는게 Recursive Common Table Expression (Recursive CTE) 문법 입니다.
💡 Common Table Expression(CTE)?
A common table expression (CTE) is a named temporary result set that exists within the scope of a single statement and that can be referred to later within that statement, possibly multiple times. The following discussion describes how to write statements that use CTEs.
하나의 쿼리를 실행시키는 scope 내에서 임시로 만들어 사용하는 공용 테이블이라고 보면 됩니다.
출처 : mysql-CTE
CTE 를 이용해서 일자 데이터와 계산 대상이 되는 계약 공간 데이터에 대한 임시 테이블을 만들고, 이를 조합해 각 일자에 대한 계약 면적 합을 조회합니다.
변경 결과 해당 부분의 소요 시간은 0.9s에서 0.38s로 줄었습니다. (속도 약 42% 개선)
줄긴 줄었지만 전체적으로 보면 아직 티끌만큼 줄어든 수준 입니다.
-- 특정 일자의 특정 동의 게약면적 합산 with recursive
WITH RECURSIVE target_date AS ( -- recursive 하며 해당 월의 일자 데이터로 이루어진 임시 테이블 생성
SELECT CONCAT(%(prefixDate)s, '01') AS dt, 1 AS dayNum -- '2024-05-01'
UNION ALL
SELECT CONCAT(%(prefixDate)s, LPAD(dayNum + 1, 2, '0')), dayNum + 1
FROM target_date
WHERE dayNum < %(lastDay)s
)
, aggregated_contract_area AS ( -- 조회 타겟이 되는 계약공간 데이터 조회 후 임시 테이블 생성
SELECT
c.계약시작일,
c.계약종료일,
...
FROM 계약 공간 AS cs
INNER JOIN 계약 AS c ON cs.contract_no = c.contract_no
...
WHERE
...
)
SELECT -- 임시테이블을 조합하여 특정 일자에 유효했던 계약들을 구해서 합계 조회
td.dt, IFNULL(SUM(ac.계약면적), 0) AS sum_contract_area
FROM target_date td
LEFT JOIN aggregated_contract_area ac ON
ac.contract_start_date <= td.dt
AND ac.contract_end_date >= td.dt
GROUP BY td.dt
;
num_days = 특정 월의 일자 수
prefix_date = f"{year_month}-"
# 특정 일의 계약면접 합계 조회
sum_data_list = await db.select_list(
id = xxx,
param = {
...,
'prefixDate': prefix_date,
'lastDay': num_days,
}
)
for sum_data in sum_data_list:
daily_sum_of_contract_area_data.update({
sum_data.get('dt'): sum_data.get('sum_contract_area')
})
->
daily_sum_of_계약면적 = {
'2024-04-01': 100,
'2024-04-02': 200,
...
}
이번에는 비즈니스 로직을 일부 개선했습니다. 기존에는 개별 계약 공간 단위로 로직 처리를 해서 (계약 공간 수) x (필요 DB 쿼리 실행 수)
만큼의 쿼리가 실행되고 있었습니다.
예를 들어 아래처럼 로직이 이루어졌었습니다.
for '계약 공간' in '계약 공간 리스트':
# 계약 공간 별 계산 로직 진행
# [SELECT] 계약 공간의 기존 계산 내역이 있는지 조회
if 계산 내역 있으면:
# [UPDATE] 계산 내역 업데이트
else:
# [INSERT] 계산 내역 등록
# [SELECT INSERT] pk 기준 조회 후 로그 등록
좀 더 효율적으로 처리하기 위해 위의 계약 공간 별 구분 된 로직을 되도록 한 번에 처리하기 위해 그룹을 지었습니다.
group_data = {}
'계약공간리스트' = []
##############
# 계산 로직 처리
##############
for '계약 공간' in '계약 공간 리스트':
# 계약 공간 별 계산 로직 진행
group_data['계약 공간'] = upsert_params # 계약 공간 별 insert, update 에 필요한 데이터를 key 조회를 할 수 있는 dictionary 에 모읍니다.
'계약공간리스트'.append('계약 공간'.pk)
##############
# DB 로직 처리
##############
'insert 대상 리스트' = []
'update 대상 리스트' = []
'계약공간 계산 내역 조회 리스트' = 조회()
for '계산 내역' in '계약공간 계산 내역 조회 리스트':
if '계산 내역'이 있으면:
'update 대상 리스트'.append(group_data[pk])
else:
'insert 대상 리스트'.append(group_data[pk])
if 'update 대상 리스트':
# update 처리
if 'insert 대상 리스트':
# bulk insert 처리
# [SELECT INSERT] log 처리
위의 코드처럼 계산 로직 부분과 DB 로직 부분을 1차로 분리하고, DB 로직 부분도 INSERT/UPDATE
용 데이터를 리스트에 담아놓은 뒤, 전체 데이터가 담긴 후 각각 한 번에 로직을 처리했습니다.
계약 공간 수 만큼 곱해졌던 DB 처리 수가 확연히 줄었습니다. 일단 계산 내역 조회도 N → 1건으로 줄었고, UPDATE
쿼리는 N → 3건, INSERT
쿼리는 N → 1건(으로 줄었어야 하는데, 이건 아래에서 따로 설명할게요.)이 되었습니다. 아직 log 처리에서는 개선이 없었구요.
UPDATE
관련해서는 부가적인 설명이 필요한데요. MYSQL 의 UPDATE
는 bulk 처리를 지원하지 않기 때문에 약간의 변칙으로 bulk 처리를 진행해야합니다.
UPDATE
문에 대한 일괄 처리(batch processing)가INSERT
문처럼 널리 최적화되지 않는 이유는 몇 가지 기술적 및 사용 사례의 차이 때문입니다.
- 최적화의 복잡성:
INSERT
연산은 새로운 데이터를 테이블에 추가하는 단순한 작업이며, 많은 데이터베이스 시스템들은 다중 행 삽입을 지원하여 이 과정을 효율적으로 만듭니다. 반면,UPDATE
연산은 조건에 따라 기존의 행을 찾아 해당 필드를 수정해야 합니다. 각각의UPDATE
연산은 다른 조건을 가질 수 있고, 수정할 행의 위치를 찾는 것이 많은 계산을 요구할 수 있습니다. 이로 인해 일괄 처리가 더 복잡해집니다.- 사용 사례의 차이: 대량의 데이터를 한 번에 삽입하는 경우는 매우 흔합니다(예: 로그 데이터, 배치 작업 등). 이에 비해 대량의 데이터를 한 번에
UPDATE
하는 경우는 상대적으로 드뭅니다. 대부분의 애플리케이션에서는UPDATE
작업이INSERT
작업보다 덜 빈번하게 발생하며, 일반적으로 더 세밀한 조건과 로직을 포함합니다.- 성능 고려사항:
executemany
와 같은 메서드를 사용하여UPDATE
문을 일괄 처리할 때, 각각의UPDATE
문에 대해 적절한 인덱스가 없다면 데이터베이스의 성능에 큰 부담을 줄 수 있습니다. 데이터베이스가 각UPDATE
연산을 위해 풀 스캔(full scan)을 수행해야 하는 경우가 발생할 수 있습니다.- 트랜잭션 관리: 대량의
UPDATE
연산을 하나의 트랜잭션으로 처리하려고 할 때, 롤백(rollback)이 필요한 상황이 발생하면 데이터베이스에 상당한 부담을 줄 수 있습니다. 이는 특히 대규모 데이터를 처리하는 경우에 더욱 문제가 될 수 있습니다.
변칙을 사용하는 방법은 임시 테이블을 사용하는 것인데요, 이 방법은 먼저 임시 테이블에 변경하고자 하는 데이터를 삽입한 후, 메인 테이블과 임시 테이블을 조인하여 한 번의 UPDATE
연산으로 필요한 변경을 반영하는 접근 방식을 사용합니다.
먼저 임시 데이터를 담아 놓을 임시 테이블을 생성합니다. 기존 데이터와의 섞임을 방지하기 위해 DROP TABLE IF EXISTS
를 먼저 실행해줍니다.
-- 다량 업데이트를 위한 임시 테이블 생성
DROP TABLE IF EXISTS 임시_계산내역_테이블;
CREATE TABLE 임시_계산내역_테이블
(
...
계산내역 pk int unsigned
, 청구연월 char(7)
, 계약공간 pk int unsigned
, 부과금액 bigint DEFAULT 0
...
)
;
앞 단에서 UPDATE
용으로 모아뒀던 데이터를 bulk INSERT
해줍니다.
-- 다량 업데이트를 위한 임시 테이블에 데이터 insert
INSERT INTO 임시_계산내역_테이블
(
...
계산내역 pk
, 청구연월
, 계약공간 pk
, 부과금액
...
) VALUES (
...
, %(계산내역 pk)s
, %(청구연월)s
, %(계약공간 pk)s
, %(부과금액)s
...
)
;
마지막으로 계산 내역 테이블과 임시 테이블을 pk 기준으로 조인해서 데이터를 업데이트 해줍니다.
기존에는 N 건에 대해 업데이트 쿼리를 날려야했다면 지금은 아래 쿼리 한 번으로 업데이트를 처리할 수 있습니다.
-- 임시테이블과 join 으로 본 테이블에 다량 업데이트
UPDATE 계산내역 테이블 as ct
INNER JOIN 테이블 as tct
ON ct.계산내역 pk = tct.계산내역 pk
SET
ct.부과금액 = tct.부과금액
, ...
;
이렇게 해서 실행 속도는 52초로 약 30% 정도 빨라졌는데요. 뭔가 이상합니다.
지금까지 알던 상식과 다른 일이 벌어지고 있거든요. DB 스펙이 낮다고는 하지만 겨우 천 건의 데이터를 조작하는데에 52초나 걸린다고…?
그래서 로직을 파보다가 aiomysql 에서 제공하고 있는 executemany
사용에 문제가 있는걸 발견했습니다.
executemany
란 무엇이냐. 아래 블로그에 자세히 적어놨지만 간단히 알아보면, aiomysql 에서 bulk INSERT
를 편리하게 말아주는 메서드라고 할 수 있습니다.
[python] aiomysql > excutemany 로 multi insert 쿼리 효율성 개선
아래와 같이 INSERT INTO ~ VALUES ~
문과 여기에 매칭할 파라미터 값 list[dict] 를 넘기면, 정규표현식 확인 및 escaping, 인코딩 처리를 해 bulk INSERT
를 할 수 있게 해줍니다.
INSERT INTO `테이블명` (
필드명,
...
) VALUES (
%(필드명 매칭값)s,
...
)
-- ON DUPLICATE 문도 사용 가능
;
그래서 전 bulk INSERT
가 되고 있는줄 알았습니다…?! 그런데 아니었습니다.
executemany
는 정규표현식으로 우리가 제공한 sql 문이 처리가능한 형식인지 확인하는데, 잘 보면 맨 앞 그룹은 무조건 INSERT 로 시작해야하고(REPLACE 문은 여기서 논외하고), 그 앞에는 아무 것도 존재하면 안됩니다. 그런데 관습적으로 sql 문 맨 윗줄에 주석으로 설명을 적어놓는 것 때문에 이 정규표현식을 통과 안하고 있었습니다.
RE_INSERT_VALUES = re.compile(
r"\s*((?:INSERT|REPLACE)\s.+\sVALUES?\s+)" +
r"(\(\s*(?:%s|%\(.+\)s)\s*(?:,\s*(?:%s|%\(.+\)s)\s*)*\))" +
r"(\s*(?:ON DUPLICATE.*)?);?\s*\Z",
re.IGNORECASE | re.DOTALL)
그리고 친절한 우리 메서드는 정규식을 통과하지 못하면, for 문으로 쿼리를 한땀한땀 실행시켜줍니다.
# aiomysql/cursors.py
m = RE_INSERT_VALUES.match(query)
if m:
q_prefix = m.group(1) % ()
q_values = m.group(2).rstrip()
q_postfix = m.group(3) or ''
assert q_values[0] == '(' and q_values[-1] == ')'
return (await self._do_execute_many(
q_prefix, q_values, q_postfix, args, self.max_stmt_length,
self._get_db().encoding))
else:
rows = 0
for arg in args:
await self.execute(query, arg)
rows += self._rowcount
정규식을 통과하지 못해도 에러를 뱉지 않고 친절하게 실행시켜줘버려 데이터가 적을 때는 인지하지 못하고 있다가, 이번에 알게 되었네요.
우린 상단에 주석을 써야하니까, 주석이 있을 때 이를 제거하고 sql 문을 실행할 수 있도록 코드를 한 줄 추가해줍니다.
# 주석 제거 (pymysql executemany 정규식에 맞도록 변경)
sql = '\n'.join(line for line in sql.split('\n') if not line.strip().startswith('--'))
이러고 보니, 2번에서 개선했던 내용 중 INSERT/UPDATE
를 처리하는 부분은 N → 1 건이 아니라 여전히 N 건으로 처리되고 있었네요.
개선 시간은 하나 더 개선해보고 확인해 보겠습니다.
executemany
로 개선은 했는데, 아직 뭔가 아쉽습니다.
INSERT 를 bulk 로 한 번에 하다보니, INSERT 된 계산내역의 pk 를 바로 얻어낼 수 없어 따로 조회를 해야하고, 이 값을 가지고 로그를 등록할 때도 SELECT INSERT 를 N 번 호출
해야하는 상황 입니다.
임시 테이블을 사용하는 방법을 해보니, 비슷한 방법으로 INSERT/UPDATE
와 로그 처리까지 좀 더 효율적으로 할 수 있어 보여 진행시켜봤습니다.
이전에는 해당되는 계산 내역을 LEFT JOIN
으로 모두 조회해서, 내역이 존재하면 UPDATE
, 없으면 INSERT
로 데이터를 구분하고, 각각의 구분에 따라 DB 처리를 따로 진행했는데요.
이번에는 구분 없이 임시 테이블에 모두 때려박습니다. 다만 이번에는 내역 존재 여부를 확인하지 않았기 때문에 계산 내역 pk 는 담을 수 없습니다.
먼저 임시 데이터를 담아 놓을 임시 테이블을 생성합니다. 이전과 다르게 계산내역 pk 는 Nullable 로 두고, INSERT/UPDATE 구분을 위한 플래그타입을 둡니다.
플래그타입은 디폴트 값으로 insert
를 줍니다. 플래그 타입은 등록/수정을 구분하기 위한 용도이고, 로그 테이블의 로그타입과도 동일하게 적용했습니다.
-- 다량 업데이트를 위한 임시 테이블 생성
DROP TABLE IF EXISTS 임시_계산내역_테이블;
CREATE TABLE 임시_계산내역_테이블
(
...
계산내역 pk int unsigned NULL
, 청구연월 char(7)
, 계약공간 pk int unsigned
, 부과금액 bigint DEFAULT 0
, 플래그 타입 varchar(20) NOT NULL DEFAULT 'insert'
...
)
;
앞 단에서 모아뒀던 데이터를 INSERT/UPDATE 상관 없이 모두 INSERT 해줍니다.
-- 다량 업데이트를 위한 임시 테이블에 데이터 insert
INSERT INTO 임시_계산내역_테이블
(
...
, 청구연월
, 계약공간 pk
, 부과금액
...
) VALUES (
...
, %(청구연월)s
, %(계약공간 pk)s
, %(부과금액)s
...
)
;
이번에는 본 테이블에서 계산내역 존재 여부를 확인해서 임시 테이블의 업데이트 관련 컬럼을 업데이트 해줍니다.
이렇게 하면 임시 테이블에서 등록용인지 업데이트용인지 데이터 구분이 되고, 로그 등록을 위한 데이터도 모두 쌓을 수 있습니다.
-- 임시테이블과 join 으로 임시 테이블에 업데이트 관련 필드 업데이트
UPDATE 임시 계산내역 테이블 as tct
INNER JOIN 계산내역 테이블 as ct
ON tct.청구연월 = ct.청구연월 -- 계산내역 존재 확인 관련 필드 조건
...
SET
tct.계산내역 pk = ct.계산내역 pk
, tct.플래그타입 = 'update'
;
이제 계산내역 INSERT 를 합니다. 임시 테이블에서 플래그 타입이 insert 인 데이터를 조회해서 다 넣어줍니다.
insert into 계산내역 테이블 (
필드1,
...
)
select
필드1,
...
from 임시 계산내역 테이블
where 플래그 타입 = 'insert'
;
업데이트는 전과 같이 pk 를 기준으로 매칭해서 넣어줍니다.
-- 임시테이블과 join 으로 본 테이블에 다량 업데이트
UPDATE 계산내역 테이블 as ct
INNER JOIN 테이블 as tct
ON ct.계산내역 pk = tct.계산내역 pk
SET
ct.부과금액 = tct.부과금액
, ...
;
임시 테이블과 최신 데이터를 가지고 있는 본 테이블을 조인하여 로그에 필요한 데이터 조회하여 INSERT 합니다.
-- 로그 insert
INSERT INTO
로그 테이블
(
로그타입,
필드1,
...
)
SELECT
tct.플래그 타입 AS 로그타입,
필드1,
...
FROM 계산내역 테이블 as tct
INNER JOIN 임시 계산내역 테이블 as ct
ON tct.청구연월 = ct.청구연월
...
;
이렇게 개선한 결과 처음에는 74초가 걸렸던 로직이 1.2초로 줄었습니다.
그런데, 금액을 계산하는 로직들과 4번에서 임시 테이블에 데이터를 넣는 프로세스는 굳이 트랜잭션에 넣지 않아도 될 것 같습니다. 어짜피 임시 테이블에 넣는 것이고, 그 외에는 조회만 있으니까요.
await 임시테이블 데이터 처리
# 임시 데이터 처리 후 트랜잭션 시작
await db.begin
...
# 계산내역 insert / update
await upsert_계산내역
# 트랜잭션 종료
await db.commit()
이렇게 꽤 많은 시간을 들여서 계약의 부과금액을 계산하는 로직을 최적화 해봤습니다.
라이브러리를 사용할 때 좀 더 깊숙이 파봐야겠다는 것과 생각보다 SQL 로 비즈니스 로직을 대체할 수 있는게 많고, 이로 인해 네트워크 지연을 줄여 전체적인 성능 개선을 할 수 있다. 라는 좋은 깨달음을 얻은 경험이었습니다.
-끝-