나는 개인적으로 회사의 데이터 관리를 위해 테이블 명세서를 생성하고 관리하는 것은 매우 중요하다고 생각한다.
테이블 명세서는 개발자, 기획자, 회사의 다른 이해관계자들이 회사에서 관리하는 데이터를 빠르고 구체적으로 파악할 수 있게 해주는 핵심 문서이다. 그리고 유지보수나 시스템 확장 시에도 정확한 정보를 제공해 오류를 방지하고 효율적인 협업을 가능하게 한다.
이러한 테이블 명세서를 일일이 수동으로 업데이트하는 대신, 자동화를 통해 주기적으로 업데이트하여 시간을 절약하고자 해당 프로젝트를 진행했다.
따라서 Google Sheets에서 테이블 명세서(Data Catalog) 작업을 더 빠르고 효율적으로 진행하기 위해 Google Sheets API를 활용하여 시트(sheet)를 쉽게 추가하고 업데이트할 수 있는 프로젝트를 기획했다.
Google Sheet API 란
코드 상에서 구글 스프레드시트를 조작하여 스프레드시트를 읽고 수정하고 삭제할 수 있는 인터페이스이다.
※ 아래 글을 참고하여 설정 했다.
Google Sheets API를 통해 스프레트시트 읽기/쓰기
파이썬으로 구글 스프레드시트 편집 자동화하기 3단계
Data Catalog 생성 자동화
💡 DB별 테이블명, 테이블 설명, 데이터 수, 컬럼 수를 알 수 있다.
관련 쿼리
SELECT TABLE_NAME, TABLE_ROWS, TABLE_COMMENT
FROM information_schema.tables
WHERE TABLE_SCHEMA = '{dbName}' ORDER BY TABLE_NAME;
// Column Count 구할 때 필요한 쿼리
SELECT TABLE_NAME, COLUMN_NAME
FROM information_schema.columns
WHERE table_schema='{dbName}' ORDER BY TABLE_NAME;
💡 table별 테이블명, 필드 정보들을 알 수 있다.
관련 쿼리
SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, COLUMN_DEFAULT, IS_NULLABLE, COLUMN_TYPE, COLUMN_KEY, COLUMN_COMMENT
FROM information_schema.columns
WHERE TABLE_SCHEMA = '{dbName}' ORDER BY TABLE_NAME;
Table Sheets
Master Sheets
pymysql을 사용해서 DB와 python 연결, 쿼리 결과를 pandas dataframe으로 받는다.
# prod db에 연결하는 함수
def connectProdDB(companyName):
dbInfo = openDBInfo()
connect = pymysql.connect(
host = dbInfo[f'{companyName}_PROD_DB']['HOST'],
port = dbInfo[f'{companyName}_PROD_DB']['PORT'],
user = dbInfo[f'{companyName}_PROD_DB']['USERNAME'],
password = dbInfo[f'{companyName}_PROD_DB']['PASSWORD'],
use_unicode = True,
charset = 'utf8',
cursorclass=pymysql.cursors.DictCursor
)
cursor = connect.cursor()
return cursor
cursor = connectProdDB(companyName)
cursor.execute(query)
res = cursor.fetchall()
df = pd.DataFrame(res)
위 설정글 참고
gspread는 Google Sheets를 위한 라이브러리로, 쉬운 사용법이 장점이다.
https://docs.gspread.org/en/latest/user-guide.html
# -- 사용 예시 --
# 인증
gc = gspread.authorize(credentials)
# Create
gc.create('sheet_name', folder_id = 'folder_id')
# Open - open / open_by_key / open_by_url
worksheet = gc.open('sheet_name').worksheet('Sheet1')
# update
worksheet.update('A1', [[1, 2], [3, 4]])
4.1 dataframe 삽입
gspread_dataframe
라이브러리의 set_with_dataframe
함수로 worksheet에 dataframe(쿼리 결과) 삽입set_with_dataframe(worksheet, dataframe)
4.2 Create sheet
newSheetFile = gc.create(tableName, folder_id = folderId)
prodSheet = newSheetFile.sheet1
prodSheet.update_title("PROD")
# 시트에 DF 삽입
set_with_dataframe(worksheet = prodSheet, dataframe = df[df['TABLE_NAME'] == tableName])
# 새로 생성된 sheet의 ID 추가
sheetId = newSheetFile.id
idDict = openSheetIdInfo(dbName)
idDict[tableName] = sheetId
addSheetIdAndSave(dbName, idDict)
5.1 기존 sheet와 비교
# folder에 존재하는 각 sheet의 id와 name을 가져온다.
results = driveService.files().list(q=f"'{folderId}' in parents and trashed=false", fields="files(id, name)").execute()
items = results.get('files', [])
df['COLUMN_NOTE'] = np.nan
for tableName in tqdm(df.TABLE_NAME.unique()):
existingFile = None
itemsCopy = items.copy()
for item in items:
if item['name'] == tableName:
existingFile = item # item에 없는 것이 df에 있을 때 → 파일 추가
itemsCopy.remove(item)
break
5.2 기존 Sheet의 데이터 가져오기
gspread_dataframe
라이브러리의 get_as_dataframe
함수로 worksheet에 dataframe(쿼리 결과) 삽입prodSheet = gc.open_by_key(existingFile['id']).get_worksheet(0)
existingDF = get_as_dataframe(prodSheet)
5.3 업데이트 수행
# 1. item에 있는데 df.TABLE_NAME에 없음 -> existingFile = None (삭제된 테이블)
# 2. item에 없는데 df.TABLE_NAME에 있음 -> existinfFile = None (새로 추가된 테이블)
# 3. item에 있고 df.TABLE_NAME에 있음 -> existongFile = Exist
# 3
if existingFile:
prodSheet = gc.open_by_key(existingFile['id']).get_worksheet(0)
existingDF = get_as_dataframe(prodSheet).dropna(how = 'all').iloc[:,:9]
# 새로운 컬럼 확인 후 업데이트
newRows = df[(df['TABLE_NAME'] == tableName) & (~df['COLUMN_NAME'].isin(existingDF['COLUMN_NAME']))]
if not newRows.empty:
set_with_dataframe(worksheet = prodSheet, dataframe = newRows, row = existingDF.shape[0] + 2, include_index = False, include_column_header = False)
# 2
else:
newSheetFile = gc.create(tableName, folder_id = folderId)
prodSheet = newSheetFile.sheet1
prodSheet.update_title("PROD")
set_with_dataframe(worksheet = prodSheet, dataframe = df[df['TABLE_NAME'] == tableName])
# sheet ID 딕셔너리 업데이트
sheetId = newSheetFile.id
idDict = openSheetIdInfo(dbName)
idDict[tableName] = sheetId
addSheetIdAndSave(dbName, idDict)
# 1
if len(itemsCopy) != 0:
for item in itemsCopy:
deletingFile = item # df에 없는 것이 item에 있을 떄 → 파일 삭제
driveService.files().delete(fileId = deletingFile['id']).execute()
# Master Sheet 파일 생성 및 새 워크시트 생성하는 함수 (새 워크시트를 생성하는 경우: 새로운 DB가 추가되었을 떄)
def makeMasterCatalog(orderType, dbName, df, sheetName = "Master_data_catalog", folderId = "1ZSp_OrzjBun9eqXW*************"):
idDict = openSheetIdInfo(dbName)
query = f'''select TABLE_NAME, TABLE_ROWS, TABLE_COMMENT
from information_schema.tables
where table_schema = '{dbName}' order by TABLE_NAME;'''
masterDF = queryReturnDf("PROD", query,'Company1')
masterCatalog = df.groupby(['TABLE_SCHEMA','TABLE_NAME']).agg({
'COLUMN_NAME': lambda x:x.count()
}).reset_index().rename(columns={'COLUMN_NAME':'COLUMN_COUNT'})
masterCatalog['ROW_COUNT'] = masterDF['TABLE_ROWS']
masterCatalog['TABLE_COMMENT'] = masterDF['TABLE_COMMENT']
masterCatalog['TABLE_DESCRIPTION'] = ''
masterCatalog['TABLE_SHEET'] = masterCatalog['TABLE_NAME'].apply(lambda x:"https://docs.google.com/spreadsheets/d/%s" % idDict[x])
masterCatalog = masterCatalog[['TABLE_SCHEMA','TABLE_NAME', 'ROW_COUNT', 'COLUMN_COUNT', 'TABLE_COMMENT','TABLE_DESCRIPTION','TABLE_SHEET']]
gc = getCertified()
if orderType == "CREATE": # Master_data_catalog sheet 파일 생성
newSheet = gc.create(sheetName, folder_id=folderId)
newMasterSheet = newSheet.sheet1
newMasterSheet.update_title(dbName)
else: # "ADD" , # Master_data_catalog sheet의 새로운 worksheet 생성
gc.open(sheetName, folder_id=folderId)
newMasterSheet = gc.open(sheetName).add_worksheet(title=dbName, rows=100, cols=20)
set_with_dataframe(worksheet=newMasterSheet, dataframe=masterCatalog)
TABLE_SHEET
는 DB별 모든 테이블의 sheetID가 저장된 idDict 딕셔너리 데이터를 활용하여, 테이블별 시트 URL을 저장orderType == “CREATE” : (Master data catalog 파일 없을 시) 새 파일 생성
gc.create(sheetName, folder_id=folderId)
orderType == “ADD” : 새 DB가 추가됨에 따른 기존 파일에 새 워크시트 생성
gc.open(sheetName, folder_id=folderId) # 파일명을 sheetName으로
newMasterSheet = gc.open(sheetName).add_worksheet(title=dbName, rows=100, cols=20)
# 워크시트명을 dbName으로
ROWS_COUNT
, COLUMN_COUNT
ROWS_COUNT
, COLUMN_COUNT
는 수치가 매번 달라질 가능성이 높으므로, 두 컬럼만 업데이트하는 로직을 추가했다,7-1. 먼저 테이블 변경사항이 있는지 확인
# 먼저 테이블 변경사항이 있는지 확인
gc = getCertified()
sheet = gc.open(sheetName, folderId).worksheet(dbName)
maxRow = len(sheet.get_all_values()) - 1
masterDF = get_as_dataframe(sheet)[:maxRow]
masterDF = masterDF.drop(columns = [c for c in masterDF.columns if "Unnamed" in c]).dropna(how="all")
query = f'''select TABLE_SCHEMA, TABLE_NAME, TABLE_COMMENT
from information_schema.tables
where TABLE_SCHEMA = '{dbName}' order by TABLE_NAME;
'''
df = queryReturnDf("PROD", query, company)
newRows = df[(df['TABLE_SCHEMA'] == dbName) & (~df['TABLE_NAME'].isin(masterDF['TABLE_NAME']))]
7-2. 새로운 테이블 및 삭제된 테이블 확인 후 업데이트
# 새로운 테이블 확인 후 업데이트
if not newRows.empty:
idDict = openSheetIdInfo(dbName)
newRows['ROW_COUNT'] = np.nan
newRows['COLUMN_COUNT'] = np.nan
newRows['TABLE_DESCRIPTION'] = np.nan
newRows['TABLE_SHEET'] = newRows['TABLE_NAME'].apply(lambda x:"https://docs.google.com/spreadsheets/d/%s" % idDict[x])
newRows = newRows[['TABLE_SCHEMA','TABLE_NAME','ROW_COUNT','COLUMN_COUNT','TABLE_COMMENT','TABLE_DESCRIPTION', 'TABLE_SHEET']]
set_with_dataframe(worksheet = sheet, dataframe = newRows, row = masterDF.shape[0] + 2, include_index = False, include_column_header = False)
masterDF = masterDF.append(newRows, ignore_index = True)
# 삭제된 테이블 확인 후 masterDF에서 제거
dfTableNames = df['TABLE_NAME'].tolist()
deletedTables = masterDF[~masterDF['TABLE_NAME'].isin(dfTableNames)]
if not deletedTables.empty:
idDict = openSheetIdInfo(dbName)
# 삭제할 행 번호와 TABLE_NAME 리스트 생성
rowsToDelete = []
for index, row in deletedTables.iterrows():
rowsToDelete.append((index + 2, row['TABLE_NAME']))
for rowNumber, tableName in sorted(rowsToDelete, key=lambda x: x[0], reverse=True):
sheet.delete_rows(rowNumber)
idDict.pop(tableName)
masterDF = masterDF[masterDF['TABLE_NAME'].isin(dfTableNames)]
deleteSheetIDAndSave(dbName, idDict)
7-3. ROW_COUNT , COLUMN_COUNT 업데이트
# ROW_COUNT, COLUMN_COUNT 업데이트
rowQuery = f'''select TABLE_NAME, TABLE_ROWS
from information_schema.tables
where table_schema = '{dbName}' order by TABLE_NAME;
'''
cursor = connectProdDB(company)
cursor.execute(rowQuery)
rowRes = cursor.fetchall()
rowCountDict = {}
for i in rowRes:
rowCountDict[i['TABLE_NAME']] = int(i['TABLE_ROWS'])
colQuery = f'''
select TABLE_NAME, COLUMN_NAME
from information_schema.columns
where table_schema='{dbName}' order by TABLE_NAME;
'''
cursor.execute(colQuery)
colRes = cursor.fetchall()
colCountDict = pd.DataFrame(colRes).groupby('TABLE_NAME')['COLUMN_NAME'].count().to_dict()
masterDF['ROW_COUNT'] = masterDF['TABLE_NAME'].apply(lambda x: rowCountDict[x])
masterDF['COLUMN_COUNT'] = masterDF['TABLE_NAME'].apply(lambda x: colCountDict[x])
set_with_dataframe(sheet, masterDF)