[Python] 평가 결과 일치도 확인 (2)

Connected Brain·2026년 5월 13일

Kappa-Agreement 분석 Tool

0. Summary

분석 데이터 전처리 및 검수 Tool 요약

목적

  • 비정형 평가 로그를 분석 가능한 표준 데이터(Tidy Data)로 자동 변환하고, 평가자 간 의견 충돌이 발생한 지점(이상치)을 시각화하여 데이터 검수 효율을 극대화함

핵심 기능

  • 동적 데이터 정규화(Normalization): Wide Format의 로그를 분석 최적화된 Long Format으로 자동 전환하며, 가변적인 컬럼 구조에도 에러 없이 대응
  • 마스킹 자동 해제(De-identification): 익명화된 모델 ID와 실제 모델 정보를 매핑 테이블을 통해 결합하여 분석의 데이터 정합성 확보
  • 통계적 이상치 탐지(Outlier Detection): 동일 문항 내 점수 편차(MaxMin3\text{Max} - \text{Min} \ge 3)를 기반으로 재검토가 필요한 문항을 실시간 선별
  • 시각적 검수 자동화(Highlighting): 구글 시트 API 연동을 통해 이상치가 발생한 셀의 최댓값/최솟값을 자동으로 컬러링하여 가독성 증대

기대 효과

  • 데이터 클리닝 가속화: 수작업으로 진행하던 방대한 양의 전처리 과정을 자동화하여 분석 준비 시간 단축
  • 품질 관리(QA) 정밀도 향상: 단순 일치도 확인을 넘어, 실제 갈등 지점을 시각적으로 즉각 파악하여 데이터 신뢰도 제고
  • 확장성 있는 파이프라인: 다양한 평가 양식과 프로젝트 스키마에 유연하게 적용 가능한 견고한 전처리 구조 구축
    (평가 지표명, 로그 상의 Column 명 등이 바뀌어도 대응할 수 있음)

1. Input Format

xlsx 포맷

통합 문서 구성

  • 각각의 영역(Category) 별로 시트가 분리되어 있음

평가 결과 로그

  • 해당 영역(Category)에 속한 세부영역(Sub-category)에 해당하는 문항이 포함
  • ID,script_itn,file_size와 같은 평가에 사용한 모델 음성 관련 정보 포함
  • (모델 명): 응답열에 각 모델별 출력 응답, (모델 명): (평가항목)열에 각 모델별 출력 응답에 대한 평가자의 평가 점수가 위치, 상세 사유는 (모델 명): 비고열에 기록 위치함
  • (모델 명): 이상여부에는 평가자가 모델 오류로 판단한 경우 표시하는 체크 박스의 값을 표시

2. 평가 결과 일치도 확인 툴

0) 필수 라이브러리 설치

import sys
import subprocess
import importlib

#@title 필수 라이브러리 목록 (pip 설치명 : 파이썬 모듈명)
required_packages = {
    'gspread': 'gspread',
    'gspread-dataframe': 'gspread_dataframe',
    'gspread-formatting': 'gspread_formatting',
    'statsmodels': 'statsmodels',
    'ipywidgets': 'ipywidgets'
}

print("--- 필수 라이브러리 점검 시작 ---")
for package_name, module_name in required_packages.items():
    try:
        importlib.import_module(module_name)
        print(f"[OK] {package_name} 라이브러리가 이미 설치되어 있습니다.")
    except ImportError:
        print(f"[INSTALL] {package_name} 모듈을 찾을 수 없어 설치를 진행합니다...")
        # 파이썬 내부에서 pip 명령어를 안전하게 호출하여 조용히(-q) 설치
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package_name])
        print(f" -> {package_name} 설치 완료.")
print("--- 필수 라이브러리 점검 완료 ---\n")

# 데이터 처리 및 GUI용 라이브러리 임포트
import pandas as pd
import numpy as np
import io
import ipywidgets as widgets
from IPython.display import display
from statsmodels.stats.inter_rater import fleiss_kappa

# 구글 시트 연동용 라이브러리 임포트
from google.colab import auth
from google.auth import default
import gspread
from gspread_dataframe import set_with_dataframe
from gspread_formatting import *

# Google 계정 인증 (실행 시 팝업창을 통해 권한 허용 필요)
try:
    auth.authenticate_user()
    creds, _ = default()
    gc = gspread.authorize(creds)
    print("Google 계정 인증이 완료되었습니다.")
except Exception as e:
    print(f"인증 과정에서 오류가 발생했습니다: {e}")

필수 라이브러리 목록

  • gspread : Google Sheets API를 사용하여 구글 스프레드시트를 생성, 데이터 접근 및 제어 기능 제공
  • gspread-dataframe : Pandas데이터프레임(DataFrame) 객체를 구글 시트로 즉시 전송, 또는 반대로 시트의 데이터를 데이터프레임으로 변환하는 기능 제공
  • gspread-formatting : 구글 시트 내의 셀 배경색, 텍스트 스타일 등 조건부 서식을 파이썬 코드로 조작
  • statsmodels : 고급 통계 모델링 및 검정 기능을 제공(Fleiss-kappa 지수를 연산을 위해 사용)
  • ipywidgets : Google Colab 환경에서 텍스트 입력창, 파일 업로드 버튼, 실행 버튼 등 사용자가 직접 조작할 수 있는 대화형 GUI 인터페이스 제공

라이브러리 설치 여부 확인

print("--- 필수 라이브러리 점검 시작 ---")
for package_name, module_name in required_packages.items():
    try:
        importlib.import_module(module_name)
        print(f"[OK] {package_name} 라이브러리가 이미 설치되어 있습니다.")
    except ImportError:
        print(f"[INSTALL] {package_name} 모듈을 찾을 수 없어 설치를 진행합니다...")
        # 파이썬 내부에서 pip 명령어를 안전하게 호출하여 조용히(-q) 설치
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package_name])
        print(f" -> {package_name} 설치 완료.")
print("--- 필수 라이브러리 점검 완료 ---\n")
  • 라이브러리 목록에 있는 라이브러리를 import문이 아닌 importlib.import_module() 함수를 통해 호출해, 현재 런타임 상에 존재하는지 확인
  • 해당 호출에서 에러가 발생할 경우 try-except 구문을 활용해 내부 프로세스 호출을 통한 자동 설치로 분기
  • 별도의 !pip install 명령어 없이 동적으로 미설치된 라이브러리 항목 설치를 진행

내부 프로세스 설치 과정 상세

subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package_name])
  • subprocess : 파이썬 내부의 시스템 명령어를 실행하는 모듈
    • 종료 대기 여부를 설정할 수 있어 향후 비동기 기능 구현에 사용될 수 있음
  • sys.executable : 실행 중인 파이썬 인터프리터의 절대 결로를 호출
  • check_call : 입력한 명령어 실행이 완료될 때까지 메인 프로그램 실행을 일시정지(Blocking)
    → 정상적으로 실행이 완료된 이후 메인 프로그램 실행
  • "-q" : 설치 과정에서 출력되는 로그를 출력하지 않도록 함

1) 입력 GUI 및 파일 업로드

#@title 1. 입력 GUI 및 파일 업로드 폼 구성
import pandas as pd
import io
import ipywidgets as widgets
from IPython.display import display

# 시트 접두사 입력 위젯 추가
prefix_input = widgets.Text(value='', description='시트 접두사:', placeholder='예: SLM_번역_1차_')
id_input = widgets.Text(value='id', description='ID 컬럼:')
rater_input = widgets.Text(value='tester', description='작업자 컬럼:')
# 그룹 컬럼 기본값을 'category, sub_category'로 변경
group_input = widgets.Text(value='category, sub_category', description='그룹 컬럼(쉼표 구분):', layout=widgets.Layout(width='50%'))
metric_input = widgets.Text(value='유창성, 정확성', description='평가지표 키워드(쉼표 구분):', layout=widgets.Layout(width='50%'))
threshold_input = widgets.FloatText(value=3.0, description='이상치 임계값:')

log_upload_widget = widgets.FileUpload(accept='.csv, .xlsx', multiple=False, description='로그 파일')
mask_upload_widget = widgets.FileUpload(accept='.csv, .xlsx', multiple=False, description='마스킹 파일')

load_button = widgets.Button(description='데이터 로드', button_style='info')
output_log = widgets.Output()

# GUI 화면 배치
display(prefix_input, id_input, rater_input, group_input, metric_input, threshold_input)
display(widgets.HBox([log_upload_widget, mask_upload_widget]), load_button, output_log)

df_raw = pd.DataFrame()
df_masking_raw = pd.DataFrame()

def load_file_to_df(upload_data):
    if isinstance(upload_data, dict):
        filename = list(upload_data.keys())[0]
        content = upload_data[filename]['content']
    else:
        uploaded_file = upload_data[0]
        filename = uploaded_file['name']
        content = uploaded_file['content']

    if filename.endswith('.csv'):
        df = pd.read_csv(io.BytesIO(content))
        # CSV의 경우 파일명을 'category'로 간주하여 추가
        df['category'] = filename.replace('.csv', '')
        return df

    elif filename.endswith(('.xls', '.xlsx')):
        all_sheets = pd.read_excel(io.BytesIO(content), sheet_name=None)
        df_list = []

        # 엑셀 내의 모든 시트를 순회하며 시트 이름을 'category' 컬럼으로 추가
        for sheet_name, df in all_sheets.items():
            df['category'] = sheet_name
            df_list.append(df)

        if len(df_list) > 1:
            return pd.concat(df_list, ignore_index=True)
        else:
            return df_list[0]

    return pd.DataFrame()

def on_load_button_clicked(b):
    global df_raw, df_masking_raw
    with output_log:
        output_log.clear_output()
        if not log_upload_widget.value or not mask_upload_widget.value:
            print("로그 파일과 마스킹 파일을 모두 업로드해주세요.")
            return

        try:
            df_raw = load_file_to_df(log_upload_widget.value)
            df_masking_raw = load_file_to_df(mask_upload_widget.value)
            print(f"로그 데이터({len(df_raw)}행) 및 마스킹 데이터({len(df_masking_raw)}행) 로드 완료.")
            if 'category' in df_raw.columns:
                print(f"-> 인식된 'category'(시트) 목록: {df_raw['category'].unique().tolist()}")
        except Exception as e:
            print(f"로드 중 오류 발생: {e}")

load_button.on_click(on_load_button_clicked)

입력 GUI 요소

# 시트 접두사 입력 위젯 추가
prefix_input = widgets.Text(value='', description='시트 접두사:', placeholder='예: SLM_번역_1차_')
id_input = widgets.Text(value='id', description='ID 컬럼:')
rater_input = widgets.Text(value='tester', description='작업자 컬럼:')
# 그룹 컬럼 기본값을 'category, sub_category'로 변경
group_input = widgets.Text(value='category, sub_category', description='그룹 컬럼(쉼표 구분):', layout=widgets.Layout(width='50%'))
metric_input = widgets.Text(value='유창성, 정확성', description='평가지표 키워드(쉼표 구분):', layout=widgets.Layout(width='50%'))
threshold_input = widgets.FloatText(value=3.0, description='이상치 임계값:')

log_upload_widget = widgets.FileUpload(accept='.csv, .xlsx', multiple=False, description='로그 파일')
mask_upload_widget = widgets.FileUpload(accept='.csv, .xlsx', multiple=False, description='마스킹 파일')

load_button = widgets.Button(description='데이터 로드', button_style='info')
output_log = widgets.Output()
  • prefix_input : 구글 시트 이름 앞에 붙을 문자열 (파일 구분을 쉽게 하기 위함)
  • id_input : 문항을 식별할 때 사용할 고유 번호가 위치한 Cloumn 명
  • rater_input : 평가를 실시한 작업자를 표시하는 이름, ID가 위치한 Cloumn 명
  • group_input : 일치도 분석을 실시할 계층과 포함할 로그 정보 목록
  • metric_input : 평가 점수열을 찾기 위한 키워드 목록
    (해당 목록에 속한 평가지표 명이 포함된 열을 추출해 통계 분석 대상으로 포함)
  • threshold_input : 동일 평가 대상에 대해 점수 차이가 해당 입력값 이상일 경우 이상치로 판정

업로드 파일 처리 로직

df_raw = pd.DataFrame()
df_masking_raw = pd.DataFrame()

def load_file_to_df(upload_data):
    if isinstance(upload_data, dict):
        filename = list(upload_data.keys())[0]
        content = upload_data[filename]['content']
    else:
        uploaded_file = upload_data[0]
        filename = uploaded_file['name']
        content = uploaded_file['content']

    if filename.endswith('.csv'):
        df = pd.read_csv(io.BytesIO(content))
        # CSV의 경우 파일명을 'category'로 간주하여 추가
        df['category'] = filename.replace('.csv', '')
        return df

    elif filename.endswith(('.xls', '.xlsx')):
        all_sheets = pd.read_excel(io.BytesIO(content), sheet_name=None)
        df_list = []

        # 엑셀 내의 모든 시트를 순회하며 시트 이름을 'category' 컬럼으로 추가
        for sheet_name, df in all_sheets.items():
            df['category'] = sheet_name
            df_list.append(df)

        if len(df_list) > 1:
            return pd.concat(df_list, ignore_index=True)
        else:
            return df_list[0]

    return pd.DataFrame()
  • isinstance(upload_data, dict) : 입력된 파일 형식이 딕셔너리(dict) 자료형인지 확인
    • 딕셔너리(dict) 자료형일 경우 : keys()를 통해 첫 번째 파일명을 가져오고, 해당 파일 내부 정보 추출
    • 딕셔너리(dict) 자료형이 아닌 경우 : 객체 배열로 간주하고, 첫 번째 요소의 'name''content'를 가져와 정보를 추출
  • 불러온 내부 정보를.csv, .xls, .xlsx 확장자에 따라 대응할 수 있도록 설계
    • .csv 확장자인 경우 : 파일명을 영역(category) 요소로 간주하고 내부 정보를 데이터 프레임으로 불러옴
    • .xls, .xlsx 확장자인 경우 : 엑셀 내 모든 시트를 순회하며 시트 이름을영역(category) 열에 추가

데이터 규모 및 데이터 분류 식별

print(f"로그 데이터({len(df_raw)}행) 및 마스킹 데이터({len(df_masking_raw)}행) 로드 완료.")
if 'category' in df_raw.columns:
	print(f"-> 인식된 'category'(시트) 목록: {df_raw['category'].unique().tolist()}")
  • 업로드된 전체 데이터 규모를 확인하고 엑셀 파일의 모든 시트가 로드되었는지 시각적으로 확인할 수 있도록 함

2) 데이터 전처리 및 통합 데이터 프레임 구성

#@title 2. 전처리: script_itn 보존 및 통합 데이터 프레임 구성
if not df_raw.empty and not df_masking_raw.empty:
    ID_COLUMN = id_input.value.strip()
    RATER_COLUMN = rater_input.value.strip()
    GROUP_COLUMNS = [x.strip() for x in group_input.value.split(',') if x.strip()]
    TARGET_METRIC_KEYWORDS = [x.strip() for x in metric_input.value.split(',') if x.strip()]

    # [수정] script_itn 컬럼이 존재할 경우 필수 변수(essential_cols) 및 ID 변수(id_vars)에 포함
    REFERENCE_COLS = ['script_itn'] if 'script_itn' in df_raw.columns else []
    essential_cols = [ID_COLUMN, RATER_COLUMN] + GROUP_COLUMNS + REFERENCE_COLS

    # [1] 마스킹 데이터 정규화
    mask_id_col = df_masking_raw.columns[0]
    df_mask_lookup = df_masking_raw.melt(id_vars=mask_id_col, var_name='마스킹_모델명', value_name='모델')
    df_mask_lookup.rename(columns={mask_id_col: ID_COLUMN}, inplace=True)
    df_mask_lookup['모델'] = df_mask_lookup['모델'].str.strip()

    # [2] 평가 로그 컬럼 분류
    metric_vars = []
    text_vars = []
    TEXT_KEYWORDS = ["이상여부", "비고", "응답", "오류", "사유"]

    for col in df_raw.columns:
        if any(ex in col for ex in TEXT_KEYWORDS):
            text_vars.append(col)
        elif any(keyword in col for keyword in TARGET_METRIC_KEYWORDS):
            metric_vars.append(col)
            df_raw[col] = pd.to_numeric(df_raw[col], errors='coerce')

    # [3] Long Format 생성 (id_vars에 REFERENCE_COLS 추가하여 데이터 보존)
    id_vars = [ID_COLUMN, RATER_COLUMN] + [c for c in GROUP_COLUMNS if c in df_raw.columns] + REFERENCE_COLS

    # 점수 데이터 처리
    if metric_vars:
        melted_metrics = df_raw.melt(id_vars=id_vars, value_vars=metric_vars, var_name='raw_metric', value_name='score')
        melted_metrics[['모델', '지표']] = melted_metrics['raw_metric'].str.split(':', n=1, expand=True)
        melted_metrics['지표'] = melted_metrics['지표'].str.strip()
        melted_metrics['모델'] = melted_metrics['모델'].str.strip()
        df_scores = melted_metrics.pivot_table(index=id_vars + ['모델'], columns='지표', values='score', aggfunc='first').reset_index()
    else:
        df_scores = pd.DataFrame(columns=id_vars + ['모델'])

    # 텍스트 데이터 처리
    if text_vars:
        melted_texts = df_raw.melt(id_vars=id_vars, value_vars=text_vars, var_name='raw_text_metric', value_name='text_value')
        melted_texts[['모델', '유형']] = melted_texts['raw_text_metric'].str.split(':', n=1, expand=True)
        melted_texts['유형'] = melted_texts['유형'].str.strip()
        melted_texts['모델'] = melted_texts['모델'].str.strip()
        df_texts = melted_texts.pivot_table(index=id_vars + ['모델'], columns='유형', values='text_value', aggfunc='first').reset_index()
        df_final_raw = pd.merge(df_scores, df_texts, on=id_vars + ['모델'], how='outer')
    else:
        df_final_raw = df_scores

    # [4] 마스킹 정보 결합
    df_final = pd.merge(df_final_raw, df_mask_lookup, on=[ID_COLUMN, '모델'], how='left')

    # [5] 문항 번호 카운팅
    if GROUP_COLUMNS:
        unique_q = df_final[GROUP_COLUMNS + [ID_COLUMN]].drop_duplicates().sort_values(GROUP_COLUMNS + [ID_COLUMN])
        unique_q['문항 번호'] = unique_q.groupby(GROUP_COLUMNS).cumcount() + 1
        df_final = pd.merge(df_final, unique_q, on=GROUP_COLUMNS + [ID_COLUMN], how='left')

    # 컬럼 순서 조정 (script_itn을 ID와 문항 번호 근처로 배치)
    cols = df_final.columns.tolist()
    if 'script_itn' in cols:
        target_idx = cols.index('문항 번호') + 1 if '문항 번호' in cols else cols.index(ID_COLUMN) + 1
        cols.insert(target_idx, cols.pop(cols.index('script_itn')))
    df_final = df_final[cols]

    print(f"전처리 완료. 'script_itn'을 포함한 {len(df_final)}행의 통합 데이터를 구성했습니다.")
    df_wide = df_raw[essential_cols + metric_vars].copy()

사용자 입력값 변수화

ID_COLUMN = id_input.value.strip()
RATER_COLUMN = rater_input.value.strip()
GROUP_COLUMNS = [x.strip() for x in group_input.value.split(',') if x.strip()]
TARGET_METRIC_KEYWORDS = [x.strip() for x in metric_input.value.split(',') if x.strip()]
  • GUI에서 입력한 값 변수화
    • ID평가자strip()으로 공백 제거
    • 그룹평가지표는 공백 제거 후 콤마( , )를 기준으로 분리해 리스트화

마스킹 데이터 정규화

  • 업로드된 평가로그는 실제 모델 데이터를 제공하지 않으며 오로지 마스킹된 모델명만 제시함.
  • 문항별 실제 모델 정보는 마스킹 정보 파일에 기록되어 있으므로 ID별 실제 모델 정보가 포함된 마스킹 파일을 함께 업로드해 문항의 응답별 실제 모델을 확인할 수 있게 함
mask_id_col = df_masking_raw.columns[0]
df_mask_lookup = df_masking_raw.melt(id_vars=mask_id_col, var_name='마스킹_모델명', value_name='모델')
df_mask_lookup.rename(columns={mask_id_col: ID_COLUMN}, inplace=True)
df_mask_lookup['모델'] = df_mask_lookup['모델'].str.strip()
  • 마스킹 파일을 불러온 데이터 프레임이 첫번째 열을 ID 열로 간주
  • 기존에 wide format으로 작성된df_masking_rawmelt()해 long format으로 수정
    • wide format : 하나의 ID 행에 마스킹 모델 명을 헤더로 하는 열에 실제 모델명이 가로로 나열됨
    • long format : 하나의 ID 행에 마스킹 모델명, 실제 모델명이 나열
  • 수정한 모델 마스킹 정보의 ID 열의 이름을 입력한 ID 열의 이름과 동일하게 변경
df_final = pd.merge(df_final_raw, df_mask_lookup, on=[ID_COLUMN, '모델'], how='left')
  • 전체 평가 로그에서 ID와 마스킹 모델명 2가지를 통해 실제 모델명을 찾아 기록
  • 이전에 전체 평가 로그의 모델 및 ID 열 이름과 마스킹 정보의 열 이름을 통일해두었기에, 쉽게 합칠 수 있음

평가 결과 전처리

metric_vars = []
text_vars = []
TEXT_KEYWORDS = ["이상여부", "비고", "응답", "오류", "사유"]

for col in df_raw.columns:
	if any(ex in col for ex in TEXT_KEYWORDS):
    	text_vars.append(col)
	elif any(keyword in col for keyword in TARGET_METRIC_KEYWORDS):
    	metric_vars.append(col)
    	df_raw[col] = pd.to_numeric(df_raw[col], errors='coerce')
  • raw 데이터의 열을 확인하며 입력했던 평가지표 키워드를 포함하는 열은 평가 점수 열로 간주해 숫자로 값을 가져오고, 텍스트 키워드에 해당하는 열은 텍스트 형태로 값을 불러옮
id_vars = [ID_COLUMN, RATER_COLUMN] + [c for c in GROUP_COLUMNS if c in df_raw.columns] + REFERENCE_COLS
  • long format 구성시 사용할 고정 헤더 목록 설정
    • ID와 평가자 + 그룹 목록에서 실제 데이터에 존재하는 요소 + 참고자료 열
  • id_vars +text_vars (텍스트 형태) + metric_vars (숫자 형태)로 최종 long format이 구성됨

3) 계층별 kappa 계산

#@title 3. Fleiss-kappa 계산 (다중 계층 통합 방식 적용)
from statsmodels.stats.inter_rater import fleiss_kappa
import numpy as np
import pandas as pd

def calculate_multi_level_kappa(df_source, group_cols, id_col, metric_columns):
    final_results = []

    # 그룹 컬럼 인식 (인덱스 0: Category, 인덱스 1: Sub-Category)
    cat_col = group_cols[0] if len(group_cols) > 0 else None
    subcat_col = group_cols[1] if len(group_cols) > 1 else None

    # --- 내부 헬퍼 함수: 특정 데이터 집합(Subset)에 대한 단일 Kappa 계산 ---
    def compute_kappa_for_subset(df_subset):
        group_freq_matrix = []
        for id_val, id_group in df_subset.groupby(id_col):
            for col in metric_columns:
                # 결측치만 제거하고 이상여부와 상관없이 모든 점수 반영
                vs = id_group[col].dropna()

                if not vs.empty:
                    counts = vs.value_counts()
                    group_freq_matrix.append([int(counts.get(i, 0)) for i in [1, 2, 3, 4, 5]])

        freq_array = np.array(group_freq_matrix)

        try:
            if len(freq_array) == 0:
                return np.nan, 0, 0
            elif np.all(freq_array == freq_array[0, :]):
                return 1.0, len(freq_array), len(freq_array)
            else:
                row_sums = freq_array.sum(axis=1)
                target_raters = np.max(row_sums)
                valid_freq_array = freq_array[row_sums == target_raters]
                valid_rows = len(valid_freq_array)

                if valid_rows > 0:
                    return fleiss_kappa(valid_freq_array), len(freq_array), valid_rows
                else:
                    return np.nan, len(freq_array), 0
        except:
            return np.nan, len(freq_array), 0

    # 1. 전체 통합 Kappa 연산
    print("1/4. 전체 통합 Kappa 계산 중...")
    k, total, valid = compute_kappa_for_subset(df_source)
    final_results.append({'계산_수준': '1. 전체 통합', 'Kappa': k, '총_추출_문항행수': total, '유효_연산_문항행수': valid})

    # 2. Category 기준 통합 Kappa 연산
    if cat_col and cat_col in df_source.columns:
        print(f"2/4. Category({cat_col}) 기준 통합 Kappa 계산 중...")
        for val, grp in df_source.groupby(cat_col):
            k, total, valid = compute_kappa_for_subset(grp)
            final_results.append({'계산_수준': '2. Category 통합', cat_col: val, 'Kappa': k, '총_추출_문항행수': total, '유효_연산_문항행수': valid})

    # 3. Sub-Category 기준 통합 Kappa 연산
    if cat_col and subcat_col and subcat_col in df_source.columns:
        print(f"3/4. Sub-Category({subcat_col}) 기준 통합 Kappa 계산 중...")
        for (c_val, s_val), grp in df_source.groupby([cat_col, subcat_col]):
            k, total, valid = compute_kappa_for_subset(grp)
            final_results.append({'계산_수준': '3. Sub-Category 통합', cat_col: c_val, subcat_col: s_val, 'Kappa': k, '총_추출_문항행수': total, '유효_연산_문항행수': valid})

    # 4. 개별 문항(ID)별 Kappa 연산
    print("4/4. 개별 문항(ID)별 Kappa 계산 중...")
    for id_val, grp in df_source.groupby(id_col):
        k, total, valid = compute_kappa_for_subset(grp)
        res = {'계산_수준': '4. 개별 문항별', id_col: id_val, 'Kappa': k, '총_추출_문항행수': total, '유효_연산_문항행수': valid}
        if cat_col and cat_col in grp.columns: res[cat_col] = grp[cat_col].iloc[0]
        if subcat_col and subcat_col in grp.columns: res[subcat_col] = grp[subcat_col].iloc[0]
        final_results.append(res)

    # 결과 데이터 프레임 구성 및 열 순서 재정렬
    result_df = pd.DataFrame(final_results)

    ordered_cols = ['계산_수준']
    if cat_col: ordered_cols.append(cat_col)
    if subcat_col: ordered_cols.append(subcat_col)
    ordered_cols.extend([id_col, 'Kappa', '기준미달_여부', '총_추출_문항행수', '유효_연산_문항행수'])

    final_cols = [c for c in ordered_cols if c in result_df.columns]
    result_df = result_df[final_cols]

    result_df['기준미달_여부'] = result_df['Kappa'].apply(lambda x: '미달 (<=0.2)' if pd.notna(x) and x <= 0.2 else '')

    print("모든 수준의 Kappa 연산이 완료되었습니다.")
    return result_df

if 'df_raw' in locals() and not df_raw.empty and 'metric_vars' in locals():
    df_kappa = calculate_multi_level_kappa(df_raw, GROUP_COLUMNS, ID_COLUMN, metric_vars)
else:
    print("먼저 1단계와 2단계를 완료하여 데이터를 로드해야 합니다.")

특정 집단 kappa 연산

    # --- 내부 헬퍼 함수: 특정 데이터 집합(Subset)에 대한 단일 Kappa 계산 ---
    def compute_kappa_for_subset(df_subset):
        group_freq_matrix = []
        for id_val, id_group in df_subset.groupby(id_col):
            for col in metric_columns:
                # 결측치만 제거하고 이상여부와 상관없이 모든 점수 반영
                vs = id_group[col].dropna()

                if not vs.empty:
                    counts = vs.value_counts()
                    group_freq_matrix.append([int(counts.get(i, 0)) for i in [1, 2, 3, 4, 5]])

        freq_array = np.array(group_freq_matrix)

        try:
            if len(freq_array) == 0:
                return np.nan, 0, 0
            elif np.all(freq_array == freq_array[0, :]):
                return 1.0, len(freq_array), len(freq_array)
            else:
                row_sums = freq_array.sum(axis=1)
                target_raters = np.max(row_sums)
                valid_freq_array = freq_array[row_sums == target_raters]
                valid_rows = len(valid_freq_array)

                if valid_rows > 0:
                    return fleiss_kappa(valid_freq_array), len(freq_array), valid_rows
                else:
                    return np.nan, len(freq_array), 0
        except:
            return np.nan, len(freq_array), 0
  • 데이터를 ID를 기준으로 묶고, 하나로 묶인 평가 결과에 대해 여러 지표를 순회하며 빈도를 계산
    • counts = vs.value_counts()를 통해 1~5점 각각의 점수가 등장한 빈도가 계산됨
  • freq_array = np.array(group_freq_matrix)를 통해 기존에 리스트 형태로 저장된 점수별 빈도 데이터를 행렬로 변환
    • [ 0,0,1,1,3 ], [ 1,0,0,2,2 ], ...] → (점수별 빈도가 저장된 2차원 행렬)
  • 계산할 데이터가 없거나, 전부 동일한 경우 예외 처리
    • if len(freq_array) == 0: return np.nan, 0, 0 계산할 데이터가 없어 0을 반환
    • elif np.all(freq_array == freq_array[0, :]): return 1.0, len(freq_array), len(freq_array) 모든 분항에 대해 똑같은 점수를 준 경우 1을 반환(만점 처리)
  • 메인 로직 계산시 target_raters = np.max(row_sums) 특정 행의 전체 빈도값의 합 중 최댓값을 기준으로 평가자 수를 설정
    • 모든 문항이 동일한 평가자 수를 가진다는 것을 전제로 하므로 이를 바탕으로 계산에 사용할 행을 필터링하는 기준으로 해당 행의 답변 수의 합이 평가자 수와 동일한지 확인
  • 계산된 fleiss_kappa와 계산에 사용된 정보를 반환

계층적 kappa 계산

    # 1. 전체 통합 Kappa 연산
    print("1/4. 전체 통합 Kappa 계산 중...")
    k, total, valid = compute_kappa_for_subset(df_source)
    final_results.append({'계산_수준': '1. 전체 통합', 'Kappa': k, '총_추출_문항행수': total, '유효_연산_문항행수': valid})

    # 2. Category 기준 통합 Kappa 연산
    if cat_col and cat_col in df_source.columns:
        print(f"2/4. Category({cat_col}) 기준 통합 Kappa 계산 중...")
        for val, grp in df_source.groupby(cat_col):
            k, total, valid = compute_kappa_for_subset(grp)
            final_results.append({'계산_수준': '2. Category 통합', cat_col: val, 'Kappa': k, '총_추출_문항행수': total, '유효_연산_문항행수': valid})

    # 3. Sub-Category 기준 통합 Kappa 연산
    if cat_col and subcat_col and subcat_col in df_source.columns:
        print(f"3/4. Sub-Category({subcat_col}) 기준 통합 Kappa 계산 중...")
        for (c_val, s_val), grp in df_source.groupby([cat_col, subcat_col]):
            k, total, valid = compute_kappa_for_subset(grp)
            final_results.append({'계산_수준': '3. Sub-Category 통합', cat_col: c_val, subcat_col: s_val, 'Kappa': k, '총_추출_문항행수': total, '유효_연산_문항행수': valid})

    # 4. 개별 문항(ID)별 Kappa 연산
    print("4/4. 개별 문항(ID)별 Kappa 계산 중...")
    for id_val, grp in df_source.groupby(id_col):
        k, total, valid = compute_kappa_for_subset(grp)
        res = {'계산_수준': '4. 개별 문항별', id_col: id_val, 'Kappa': k, '총_추출_문항행수': total, '유효_연산_문항행수': valid}
        if cat_col and cat_col in grp.columns: res[cat_col] = grp[cat_col].iloc[0]
        if subcat_col and subcat_col in grp.columns: res[subcat_col] = grp[subcat_col].iloc[0]
        final_results.append(res)
  • 평가 플랫폼에서 제공하는 kappa 값과 동일한 값을 도출하기 위해서는 영역, 세부영역, 문항 등 계층별로 kappa를 계산해야 했으므로 앞선 부분 집단 kappa 계산을 여러 단계로 적용해 계층적 kappa 계산을 실시
  • 데이터를 영역, 세부영역 등 기준 단위로 분할하여 순회
    • 같은 구분명을 가진 단위로 데이터를 그룹화
      예시) 기기_제어라는 영역을 가진 답변을 하나로 묶음
    • 그룹화한 답변 내용을 특정 집단 kappa 계산 함수에 입력해 해당 집단의 kappa를 계산

4) 이상치 확인

#@title 4. 이상치 연산, 통계 보고서 생성 및 모델 오류 목록 취합
if 'df_final' in locals() and not df_final.empty:
    print("이상치 연산 및 모델 오류 목록 취합 시작...")
    outlier_df = df_final.copy()

    # 1) 순수 수치 지표 추출 (script_itn 제외)
    exclude_keys = ['모델', '마스킹_모델명', '응답', '비고', '모델 오류', '오류', '이상여부', '사유', '문항 번호', 'script_itn']
    pure_metrics = [c for c in df_final.columns if c not in essential_cols and c not in exclude_keys]

    outlier_groups = set() # 이상치가 발생한 (ID, 모델) 그룹 저장용

    # 2) 통계적 이상치(Outlier) 계산: 최댓값 - 최솟값 >= 3
    for metric in pure_metrics:
        if metric not in outlier_df.columns: continue

        # 그룹별 max, min 계산
        grouped = outlier_df.groupby([ID_COLUMN, '모델'])[metric]
        metric_max = grouped.transform('max')
        metric_min = grouped.transform('min')

        # 판정 기준 적용
        is_out_group = (metric_max - metric_min) >= 3
        outlier_df[f'{metric}_이상치판정'] = is_out_group

        # 이상치 그룹 수집
        outlier_rows = outlier_df[is_out_group]
        for _, row in outlier_rows.iterrows():
            outlier_groups.add((row[ID_COLUMN], row['모델']))

    outlier_masks = [outlier_df[f'{m}_이상치판정'] for m in pure_metrics if f'{m}_이상치판정' in outlier_df.columns]
    outlier_df['총_이상치_발생수'] = pd.concat(outlier_masks, axis=1).sum(axis=1) if outlier_masks else 0

    # 3) 통계 보고서 생성
    analysis_dims = list(dict.fromkeys([RATER_COLUMN, ID_COLUMN, '모델'] + GROUP_COLUMNS))
    summary_list = []
    for dim in analysis_dims:
        if dim in outlier_df.columns:
            summary = outlier_df.groupby(dim).agg(총_평가=(ID_COLUMN, 'count'), 이상치_합=('총_이상치_발생수', 'sum')).reset_index()
            summary.columns = ['분석대상', '총_평가_건수', '이상치_발생_총합']
            summary.insert(0, '분석차원', dim)
            summary_list.append(summary)
    outlier_summary = pd.concat(summary_list, ignore_index=True)

    # 4) 모델 오류 목록 취합 (script_itn 위치 지정)
    error_col = next((c for c in df_final.columns if any(k in c for k in ['이상여부', '오류'])), None)
    df_error_list = pd.DataFrame()

    if error_col:
        is_error = outlier_df[error_col].map(lambda x: str(x).lower() in ['true', '1', '1.0', 't'])
        df_filtered = outlier_df[is_error].copy()

        # 열 순서 재배치: 세부영역, id, 문항 번호, script_itn, 테스터, 마스킹 모델명, 비고, 실제 모델명, 응답
        ordered_cols = []
        ordered_cols.extend([c for c in GROUP_COLUMNS if c in df_filtered.columns])
        if ID_COLUMN in df_filtered.columns: ordered_cols.append(ID_COLUMN)
        if '문항 번호' in df_filtered.columns: ordered_cols.append('문항 번호')
        if 'script_itn' in df_filtered.columns: ordered_cols.append('script_itn') # [추가]
        if RATER_COLUMN in df_filtered.columns: ordered_cols.append(RATER_COLUMN)
        if '마스킹_모델명' in df_filtered.columns: ordered_cols.append('마스킹_모델명')
        if '비고' in df_filtered.columns: ordered_cols.append('비고')
        if '모델' in df_filtered.columns: ordered_cols.append('모델')
        if '응답' in df_filtered.columns: ordered_cols.append('응답')

        extra_cols = [error_col]
        for c in df_filtered.columns:
            if any(k in str(c) for k in ['사유']) and c not in ordered_cols and c not in extra_cols:
                extra_cols.append(c)

        df_error_list = df_filtered[[c for c in ordered_cols + extra_cols if c in df_filtered.columns]].copy()
        df_error_list.rename(columns={'모델': '실제 모델명'}, inplace=True)

    print("4단계 완료.")
else:
    print("먼저 이전 단계를 완료해야 합니다.")

이상치 확인 방법

  • 순수 평가 지표가 기록된 열만 골라냄(숫자가 기록된 영역만 확인하기 위함)
  • ID와 모델명 조합을 통해 단일 평가 대상을 구분해 해당 대상에 대한 평가를 그룹화
  • 그룹 내에서 grouped.transform('max'),grouped.transform('min')으로 최대, 최소 도출
  • 도출된 최대 metric_max, 최소 metric_min의 차가 threshold_input보다 크거나 같으면 이상치로 판단
    • metric_maxmetric_min은 그룹 내에서 각각 최대,최소 값을 뽑아 원본과 동일한 크기로 생성한 것임
    • 동일한 대상에 대해 평가자 1은 2점, 평가자 2는 5점을 부여한 상황은 정상적이지 않은 평가로 간주
  • 이상치를 포함한 문항 전체를 true로 표기할 수 있도록 is_out_group을 구성한 뒤, 전체 응답 목록에 (평가 지표명)_이상치판정열을 추가해 이상치 포함 여부를 기록

5) 이상치 목록 작성

#@title 5. 이상치 목록 작성
if 'outlier_df' in locals() and 'outlier_groups' in locals():
    print("이상치 목록 작성을 시작합니다...")
    df_outlier_list = pd.DataFrame()

    if outlier_groups:
        # 이상치 그룹에 해당하는 전체 행 필터링
        mask = outlier_df.apply(lambda x: (x[ID_COLUMN], x['모델']) in outlier_groups, axis=1)
        df_filtered = outlier_df[mask].copy()

        # 열 순서 정의 (세부영역, ID, 문항 번호, 작업자, 마스킹 정보, 실제 모델, 지표, 기타 응답)
        ordered_cols = []
        ordered_cols.extend([c for c in GROUP_COLUMNS if c in df_filtered.columns])
        if ID_COLUMN in df_filtered.columns: ordered_cols.append(ID_COLUMN)
        if '문항 번호' in df_filtered.columns: ordered_cols.append('문항 번호')
        if RATER_COLUMN in df_filtered.columns: ordered_cols.append(RATER_COLUMN)
        if '마스킹_모델명' in df_filtered.columns: ordered_cols.append('마스킹_모델명')
        if '모델' in df_filtered.columns: ordered_cols.append('모델')

        ordered_cols.extend(pure_metrics)

        extra_cols = []
        for c in df_filtered.columns:
            if any(k in str(c) for k in ['응답', '비고', '사유', '이상여부', '오류']) and c not in ordered_cols:
                extra_cols.append(c)

        final_target_cols = [c for c in ordered_cols + extra_cols if c in df_filtered.columns]
        df_outlier_list = df_filtered[final_target_cols].copy()

        # 정렬 및 명칭 변경
        df_outlier_list.sort_values(by=[ID_COLUMN, '모델', RATER_COLUMN], inplace=True)
        df_outlier_list.rename(columns={'모델': '실제 모델명'}, inplace=True)
        # 6단계 셀 서식 매핑을 위해 인덱스 초기화 필수
        df_outlier_list.reset_index(drop=True, inplace=True)

        print(f"이상치 목록 {len(df_outlier_list)}건 취합 완료 (총 {len(outlier_groups)}개 문항:모델 그룹).")
    else:
        print("이상치 조건(최댓값-최솟값 >= 3)을 만족하는 항목이 없습니다.")

    print("5단계 완료.")
else:
    print("4단계가 정상적으로 실행되지 않았습니다.")
  • 앞선 과정에서 확인한 이상치를 따로 확인할 수 있도록 별도의 목록을 작성

6) 서식 적용 및 최종 시트 저장

#@title 6. 최종 구글 시트 저장
from datetime import datetime

required_vars = ['outlier_df', 'outlier_summary', 'df_kappa', 'df_error_list', 'df_outlier_list']
missing_vars = [var for var in required_vars if var not in globals()]

if not missing_vars:
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")

    # [수정] 사용자가 입력한 접두사 가져오기 및 시트명 적용
    prefix = prefix_input.value.strip() if 'prefix_input' in globals() else ""
    if prefix and not prefix.endswith('_'):
        prefix += " " # 자연스러운 연결을 위해 스페이싱 자동 추가

    sheet_title = f"{prefix}평가로그_통합_결과_{current_time}"
    print(f"'{sheet_title}' 시트 생성 및 데이터 기록 중...")

    spreadsheet = gc.create(sheet_title)

    # 공통 서식 설정
    red_fmt = cellFormat(backgroundColor=color(1, 0.78, 0.8), textFormat=textFormat(bold=True, foregroundColor=color(0.61, 0, 0.02)))
    max_fmt = cellFormat(backgroundColor=color(0.8, 0.9, 1.0), textFormat=textFormat(bold=True, foregroundColor=color(0.0, 0.2, 0.8)))
    gray_bg_fmt = cellFormat(backgroundColor=color(0.95, 0.95, 0.95))

    # [시트 1] 평가 결과 취합
    ws_main = spreadsheet.sheet1
    ws_main.update_title("평가_결과_취합")
    set_with_dataframe(ws_main, outlier_df)

    main_reqs = []
    for i, col in enumerate(outlier_df.columns):
        if f'{col}_이상치판정' in outlier_df.columns:
            for r_idx, is_out in enumerate(outlier_df[f'{col}_이상치판정']):
                if is_out:
                    main_reqs.append({"repeatCell": {"range": {"sheetId": ws_main.id, "startRowIndex": r_idx+1, "endRowIndex": r_idx+2, "startColumnIndex": i, "endColumnIndex": i+1}, "cell": {"userEnteredFormat": red_fmt.to_props()}, "fields": "userEnteredFormat"}})
    if main_reqs: spreadsheet.batch_update({"requests": main_reqs})

    # [시트 2] Kappa 결과
    ws_kappa = spreadsheet.add_worksheet(title="Kappa_계산결과", rows="1000", cols="20")
    set_with_dataframe(ws_kappa, df_kappa)

    # [시트 3] 이상치 통계
    ws_summary = spreadsheet.add_worksheet(title="이상치_통계보고서", rows="1000", cols="20")
    set_with_dataframe(ws_summary, outlier_summary)

    # [시트 4] 모델 오류 목록
    ws_error = spreadsheet.add_worksheet(title="모델오류_목록", rows="1000", cols="20")
    set_with_dataframe(ws_error, df_error_list)

    # [시트 5] 이상치 목록
    if not df_outlier_list.empty:
        ws_outlier = spreadsheet.add_worksheet(title="이상치_목록", rows=str(len(df_outlier_list)+100), cols="30")
        set_with_dataframe(ws_outlier, df_outlier_list)

        outlier_reqs = []

        # 1) 문항 그룹 교차 배경색
        current_group = None
        is_gray = False

        for row_idx, row in df_outlier_list.iterrows():
            group_key = (row[ID_COLUMN], row['실제 모델명'])
            if group_key != current_group:
                current_group = group_key
                is_gray = not is_gray

            if is_gray:
                outlier_reqs.append({
                    "repeatCell": {
                        "range": {
                            "sheetId": ws_outlier.id,
                            "startRowIndex": row_idx + 1, "endRowIndex": row_idx + 2,
                            "startColumnIndex": 0, "endColumnIndex": len(df_outlier_list.columns)
                        },
                        "cell": {"userEnteredFormat": gray_bg_fmt.to_props()},
                        "fields": "userEnteredFormat.backgroundColor"
                    }
                })

        # 2) 최댓값/최솟값 하이라이트
        grouped = df_outlier_list.groupby([ID_COLUMN, '실제 모델명'])
        for metric in pure_metrics:
            if metric not in df_outlier_list.columns: continue
            col_idx = df_outlier_list.columns.get_loc(metric)

            for _, group_df in grouped:
                metric_series = group_df[metric].dropna()
                if metric_series.empty: continue

                g_max = metric_series.max()
                g_min = metric_series.min()

                if (g_max - g_min) >= 3:
                    for row_idx, val in metric_series.items():
                        if val == g_max:
                            outlier_reqs.append({"repeatCell": {"range": {"sheetId": ws_outlier.id, "startRowIndex": row_idx+1, "endRowIndex": row_idx+2, "startColumnIndex": col_idx, "endColumnIndex": col_idx+1}, "cell": {"userEnteredFormat": max_fmt.to_props()}, "fields": "userEnteredFormat"}})
                        elif val == g_min:
                            outlier_reqs.append({"repeatCell": {"range": {"sheetId": ws_outlier.id, "startRowIndex": row_idx+1, "endRowIndex": row_idx+2, "startColumnIndex": col_idx, "endColumnIndex": col_idx+1}, "cell": {"userEnteredFormat": red_fmt.to_props()}, "fields": "userEnteredFormat"}})

        if outlier_reqs:
            spreadsheet.batch_update({"requests": outlier_reqs})
    else:
        ws_outlier = spreadsheet.add_worksheet(title="이상치_목록", rows="10", cols="10")

    print(f"\n작업 완료! 구글 시트 URL: {spreadsheet.url}")
else:
    print(f"오류: 데이터 메모리 누락 -> {missing_vars}")
  • 앞선 과정에서 구성한 데이터를 취합해 구글 시트로 구성
  • 가시성 강화를 위해 색상 변경 및 서식 적용

최댓값/최솟값 하이라이트

        # 2) 최댓값/최솟값 하이라이트
        grouped = df_outlier_list.groupby([ID_COLUMN, '실제 모델명'])
        for metric in pure_metrics:
            if metric not in df_outlier_list.columns: continue
            col_idx = df_outlier_list.columns.get_loc(metric)

            for _, group_df in grouped:
                metric_series = group_df[metric].dropna()
                if metric_series.empty: continue

                g_max = metric_series.max()
                g_min = metric_series.min()

                if (g_max - g_min) >= 3:
                    for row_idx, val in metric_series.items():
                        if val == g_max:
                            outlier_reqs.append({"repeatCell": {"range": {"sheetId": ws_outlier.id, "startRowIndex": row_idx+1, "endRowIndex": row_idx+2, "startColumnIndex": col_idx, "endColumnIndex": col_idx+1}, "cell": {"userEnteredFormat": max_fmt.to_props()}, "fields": "userEnteredFormat"}})
                        elif val == g_min:
                            outlier_reqs.append({"repeatCell": {"range": {"sheetId": ws_outlier.id, "startRowIndex": row_idx+1, "endRowIndex": row_idx+2, "startColumnIndex": col_idx, "endColumnIndex": col_idx+1}, "cell": {"userEnteredFormat": red_fmt.to_props()}, "fields": "userEnteredFormat"}})
  • 앞서 이상치 검색할 때처럼 Id-모델을 단위로 그룹화해 최댓값, 최솟값을 확인할 집단을 지정
  • g_maxg_min가 최댓값과 최솟값에 해당하는 셀로, 해당 셀을 outlier_reqs에 추가해 서식 적용 목록을 구성 (batch 단위로 업데이트)
  • outlier_reqs의 셀들을 최댓값과 최솟값에 따라 정해진 서식으로 변경

3. 평가 로그 분석 구글 시트

  1. 평가 로그 입력
  2. 전처리 및 지표 계산
  3. 통합 분석 구글 시트 생성
  • 해당 기능을 통해 간편하게 평가 로그를 분석하고, 문제점을 확인할 수 있었음

0개의 댓글