
Finance data reader를 사용해서 매일 실시간으로 전세계의 데이터를 나의 로컬 데이터베이스에 수집하고 싶은 경우에 쿼리문을 사용해서 데이터를 취득해야 하는 상황이 있다.
Market은 오차가 있지만 KOSPI(958 종목), KOSDAQ(약 1700 종목), NASDAQ(약 3300 종목) 뿐만 아니라 매우 많은 종목이 있다. 이렇게 종목도 많은데 데이터베이스에 수집하려면 조회 성능을 필수적으로 좋게 만들어야 한다.
첫번째로 시도해본 것은 일단 테이블이 없는 경우에는 만들어야 하기 때문에 IF NOT EXISTS 옵셥을 넣어서 테이블을 만들었다. 이것으로 테이블 없는 경우에 대한 불편함은 해결되었다.
두번째로 시도해본 것은 데이터를 삽입할 때 기존에 수집되어 있는 데이터 처리가 중복으로 수집되는 것을 방지 해야하는 문제에 들어왔다. 이런경우에는 데이터를 제거(DELECTE)하고 새롭게 넣는 것(INSERT)이 좋은지 아니면 데이터 REPLACE INTO로 하는 것이 좋은지를 비교해 보았다. 실제로 비교를 해보니까 REPLACE INTO를 쓰는 방법이 월등히 조회 성능이 좋았다.
세번째로 조회 성능을 높이기 위해서 문게가 되었던 것은 commit을 너무 많이하면 안된다는 것을 알게 되었는데, 아래에 첫번째 코드를 보면 commit이 쿼리문 한번 실행할 때마다 실행되고 있어서 한 1종목 지날때마다 너무 많은 리소스를 잡아 먹어서 30번 반복했을 때는 거의 다음 단계로 못넘어가고 있는 것을 확인했다.
def dataInsert(self, data):
data = data.replace({np.nan: None})
data = data.reset_index()
# Cursor
with self.conn.cursor() as cursor:
# Create if not exists
columns_sql = ", ".join([f"`{col}` TEXT" for col in self.col_label]) # 주식 데이터 컬럼명 지정
# 테이블리 없는 경우에 생성
create_table_query = "CREATE TABLE IF NOT EXISTS `{}_log` ({}) CHARACTER SET utf8mb4;".format(
self.table_name, columns_sql)
cursor.execute(create_table_query)
# self.conn.commit()
# 계산한 날 제거 쿼리 (중복을 방지하려고)
delete_query = f"""
DELETE FROM `{self.table_name}_log`
WHERE `Time_stamp` = %s AND `{self.Code_index}` = %s;
"""
keys_to_delete = data[['Time_stamp', self.Code_index]].values.tolist()
# 삭제 조건에 부합한 것을 적용한다.
cursor.executemany(delete_query, keys_to_delete)
# self.conn.commit()
# 제거된 부분에 INSERT 쿼리 (새 행 삽입)
insert_query = f"""
INSERT INTO `{self.table_name}_log` ({', '.join([f'`{col}`' for col in self.col_label])})
VALUES ({', '.join(['%s' for _ in self.col_label])});
"""
# 값을 집어넣는다.
cursor.executemany(insert_query, data.values.tolist())
self.conn.commit()
print(f"Inserted {len(data)} rows into '{self.table_name}_log'.")
def dataInsert(self, data):
data = data.replace({np.nan: None})
data = data.reset_index()
with self.conn.cursor() as cursor:
# 1) 최초 한 번만 생성해야 하는 코드 → 여기는 호출마다 넣지 않기
columns_sql = ", ".join([f"`{col}` TEXT" for col in self.col_label])
create_table_query = f"""
CREATE TABLE IF NOT EXISTS `{self.table_name}_log` (
{columns_sql},
UNIQUE KEY unique_key (`Time_stamp`, `{self.Code_index}`)
) CHARACTER SET utf8mb4;
"""
cursor.execute(create_table_query)
# 2) DELETE + INSERT → REPLACE INTO 로 통합
replace_query = f"""
REPLACE INTO `{self.table_name}_log`
({', '.join([f'`{col}`' for col in self.col_label])})
VALUES ({', '.join(['%s' for _ in self.col_label])});
"""
cursor.executemany(replace_query, data.values.tolist())
self.conn.commit() # 단 한 번만 커밋
print(f"Upserted {len(data)} rows into '{self.table_name}_log'.")
```