이전 포스팅에서는 lichess에서 제공한 체스 오프닝 데이터를 간단하게 전처리 했다. 이번엔 본격적으로 우리가 데이터베이스에 저장할 포맷으로 전처리를 진행한다.
데이터를 전처리하기전에 어떤식으로 구조를 짜야 쿼리를 쉽고 빠르게 할 수 있을지에 대해서 고민을 해보다 2가지 선택지를 생각했다.
결론부터 말하자면 hash table (dict)의 구조를 채용하여 전처리 하기로 하였다.
double linked list의 구조도 역시 활용할만 하였으나 여러 예외처리들과 오프닝 데이터를 쿼리를 할 때 특정 오프닝의 이름을 알기 위해서 끝까지 탐색해야 한다는 점이 비효율적이라고 생각했다.
체스 기보를 나타내는 방식엔 pgn방식과 uci방식이 있는데 후에 stockfish를 활용하기 위하여 stockfish와 호환이 잘되는 uci방식으로 데이터를 저장한다.
스톡피시는 꽤 오랜 기간동안 사용된 대중에게 공개된 가장 강력한 체스 엔진입니다. 무료 오픈 소스 엔진인 스톡피시는 현재 사람들이 자발적으로 참여하여 개발되고 있습니다. 스톡피시는 2004년 토르드 롬스타드(Tord Romstad)가 개발한 엔진을 기반으로 하며, 이후 마르코 코스탈바(Marco Costalba)가 2008년 이를 발전시켰습니다. 주나 키이스키(Joona Kiiski)와 게리 린스콧(Gary Linscott)또한 창립자로 여겨집니다.
출처 (https://www.chess.com/ko/terms/stockfish-chess-engine-ko#what)
#pipelines.py
import pandas as pd
from .utils import _revmove_number
FPATH = "<path>"
FNAME = 'pre_data.csv'
class PreProcessing:
def __init__(self, fpath) -> None:
self.fname = FNAME
self.fpath = FPATH
self.__fpath = fpath
self.raw_data = None
self.data_preprocessing()
def data_preprocessing(self):
df =self.import_data_csv(fpath=self.__fpath)
self.raw_data = df
df.to_csv(f'{self.fpath}{self.fname}')
def import_data_csv(self, fpath, sep = '\t'):
data = pd.read_csv(fpath, sep=sep)
df = self.split_data(data)
df = pd.DataFrame(df, columns=['pgn','uci','name','wdl'])
return df
def split_data(self, data):
df = []
for i in data.index:
val = data.loc[i, 'uci'].split(' ')
pgn_val = data.loc[i, 'pgn'].split(' ')
k = _revmove_number(pgn_val)
name = data.loc[i, 'name']
df.append([k,val,name,''])
df.sort(key=lambda x: len(x[0]))
return df
#process.py
import pandas as pd
from ast import literal_eval
import csv
DEFAULT_FILE_NAME = 'preprocessing_opening.csv'
FPATH = "C:\\Users\\aaa57\\chess\\dist\\"
class Hash:
def __init__(self) -> None:
self.default_dic = {
'next' : [],
'name' : 'default',
}
class ProcessData:
def __init__(self,data) -> None:
self.data = data
self.hash_data = {
'start' : Hash().default_dic
}
for i in self.data.index:
try:
self.to_hash(literal_eval(self.data.loc[i,'uci']), self.data.loc[i,'name'])
except:
self.to_hash(self.data.loc[i,'uci'], self.data.loc[i,'name'])
self.to_csv()
def to_hash(self, uci, name):
if len(uci) == 1:
self.hash_data[str(uci)] = Hash().default_dic
self.hash_data['start']['next'].append(uci[-1])
self.hash_data[str(uci)]['name'] = name
else:
try:
self.hash_data[str(uci[:-1])]['next'].append(uci[-1])
except:
self.to_hash(uci[:-1],name)
self.hash_data[str(uci[:-1])]['next'].append(uci[-1])
self.hash_data[str(uci)] = Hash().default_dic
self.hash_data[str(uci)]['name'] = name
def to_csv(self):
with open(f'{FPATH}{DEFAULT_FILE_NAME}', "w", newline="", encoding='utf-8') as csv_file:
# Create a CSV writer object
writer = csv.writer(csv_file)
# Write the header row
writer.writerow(["Move", "Next Moves", "Opening Name"])
# Iterate over the dictionary and write each row to the CSV file
for move, details in self.hash_data.items():
# Convert the string representation of the list back to a list
if move != 'start':
move = eval(move)
next_moves = ",".join(details["next"])
writer.writerow([move, next_moves, details["name"]])
class ProcessUtil:
def to_hash(self, row):
if len(row['pgn']) == 1:
return
데이터가 생성되면 pipelines.py 에서 1차적으로 데이터를 추출 변형시키고 process.py에서 1차적으로 처리된 데이터를 데이터베이스에 저장할 수 있도록 hash table형태로 저장한다.
#Processor.py
from .pipelines import PreProcessing
from process.process import ProcessData
class DataProcessor:
def __init__(self):
self.data = None
self.data_processor("C:\\Users\\aaa57\\chess\\dist\\chess_opening.csv")
self.process()
def data_processor(self, fpath):
self.data = PreProcessing(fpath=fpath).raw_data
def process(self):
ProcessData(self.data)
후에 지속적으로 체스 기보 데이터가 들어올 때마다 코드를 일일히 실행하는 것은 비효율적이기 때문에 DataProcessor라는 객체를 생성해 코드를 최소한으로 작성해 실행할 수 있도록 하였다.