
MLflow로 ML 실험을 체계적으로 기록하고 관리하는 방법
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score, accuracy_score
import pandas as pd
# 데이터 준비
CATALOG = "3dt016_databricks"
SCHEMA = "wine"
wine_df = spark.table(f"{CATALOG}.{SCHEMA}.wine_quality_lab").toPandas()
X = wine_df.drop(["is_good_quality", "quality"], axis=1, errors="ignore")
y = wine_df["is_good_quality"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print(f"학습: {len(X_train)}개, 테스트: {len(X_test)}개")
# 자동 로깅 활성화
mlflow.autolog()
# 실험 이름 설정
mlflow.set_experiment("/Users/" + spark.sql("SELECT current_user()").first()[0] + "/wine_quality_mlflow_lab")
# 모델 학습 — autolog이 자동으로 모든 것을 기록합니다!
with mlflow.start_run(run_name="autolog_rf_baseline"):
rf = RandomForestClassifier(n_estimators=100, max_depth=5, random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
print(f"F1 Score: {f1_score(y_test, y_pred):.4f}")
print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")

💡 Experiments UI

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
models = {
"rf_shallow": RandomForestClassifier(n_estimators=50, max_depth=3, random_state=42),
"rf_deep": RandomForestClassifier(n_estimators=200, max_depth=10, random_state=42),
"gb_default": GradientBoostingClassifier(n_estimators=100, random_state=42),
"lr_baseline": LogisticRegression(max_iter=1000, random_state=42),
}
for name, model in models.items():
with mlflow.start_run(run_name=name):
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print(f"{name}: F1={f1_score(y_test, y_pred):.4f}")




import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc
import numpy as np
mlflow.autolog(disable=True) # 커스텀 로깅을 위해 자동 로깅 끄기
with mlflow.start_run(run_name="custom_logging_demo"):
# 1️⃣ 커스텀 파라미터 로깅
params = {
"n_estimators": 150,
"max_depth": 7,
"min_samples_split": 5,
"data_version": "v1.0", # 데이터 버전도 추적!
"feature_count": X_train.shape[1],
}
mlflow.log_params(params)
# 2️⃣ 모델 학습
rf = RandomForestClassifier(**{k: v for k, v in params.items()
if k in ["n_estimators", "max_depth", "min_samples_split"]},
random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
y_prob = rf.predict_proba(X_test)[:, 1]
# 3️⃣ 커스텀 메트릭 로깅
metrics = {
"f1_score": f1_score(y_test, y_pred),
"accuracy": accuracy_score(y_test, y_pred),
"positive_ratio": y_test.mean(),
}
mlflow.log_metrics(metrics)
# 4️⃣ 아티팩트 로깅 — 혼동행렬
fig, ax = plt.subplots(figsize=(6, 5))
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(cm, display_labels=["보통", "좋음"])
disp.plot(ax=ax, cmap="Blues")
ax.set_title("Wine Quality Confusion Matrix")
fig.savefig("/tmp/confusion_matrix.png", dpi=150, bbox_inches="tight")
mlflow.log_artifact("/tmp/confusion_matrix.png")
plt.close()
# 5️⃣ 아티팩트 로깅 — ROC 커브
fpr, tpr, _ = roc_curve(y_test, y_prob)
roc_auc = auc(fpr, tpr)
fig, ax = plt.subplots(figsize=(6, 5))
ax.plot(fpr, tpr, color='#FF3621', lw=2, label=f'AUC = {roc_auc:.3f}')
ax.plot([0, 1], [0, 1], 'k--', lw=1)
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve')
ax.legend()
fig.savefig("/tmp/roc_curve.png", dpi=150, bbox_inches="tight")
mlflow.log_artifact("/tmp/roc_curve.png")
plt.close()
# 6️⃣ 태그로 실험 분류
mlflow.set_tags({
"team": "data_school_3기",
"task": "wine_quality_classification",
"stage": "experiment",
})
# 7️⃣ 모델 로깅
mlflow.sklearn.log_model(rf, "model")
print(f"✅ 커스텀 로깅 완료!")
print(f" F1: {metrics['f1_score']:.4f}")
print(f" AUC: {roc_auc:.4f}")

모델의 "사용 설명서"를 함께 기록
from mlflow.models import infer_signature
mlflow.autolog(disable=True)
with mlflow.start_run(run_name="model_with_signature"):
rf = RandomForestClassifier(n_estimators=150, max_depth=7, random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
# 시그니처 자동 추론: 입력/출력 스키마를 모델에 기록
signature = infer_signature(X_train, y_pred)
# 입력 예제: 모델 테스트에 사용할 샘플 데이터
input_example = X_test.head(3)
mlflow.sklearn.log_model(
rf,
"model",
signature=signature,
input_example=input_example,
)
print("✅ 시그니처 포함 모델 로깅 완료!")
print(f"\n📋 시그니처 정보:")
print(signature)

# 저장된 모델 로드 & 예측 테스트
run_id = mlflow.last_active_run().info.run_id
loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
# 시그니처 확인
print("모델 시그니처:", loaded_model.metadata.signature)
# 예측 테스트
sample = X_test.head(5)
predictions = loaded_model.predict(sample)
print(f"\n예측 결과: {predictions}")

시그니처: 모델 입출력 스키마
| 기능 | 코드 | 용도 |
|---|---|---|
| 자동 로깅 | mlflow.autolog() | 빠른 실험, 기본 추적 |
| 파라미터 | mlflow.log_param(key, value) | 하이퍼파라미터, 설정값 |
| 메트릭 | mlflow.log_metric(key, value) | 성능 수치 |
| 아티팩트 | mlflow.log_artifact(path) | 차트, 데이터 파일 |
| 태그 | mlflow.set_tag(key, value) | 실험 분류, 메타데이터 |
| 시그니처 | infer_signature(X, y) | 모델 입출력 스키마 |
Feature Store 를 사용하면 피처 재사용 + 학습-서빙 일관성 + 리니지 추적이 자동
- Feature Store 에 피처 테이블을 생성하고 중앙에서 관리하는 방법
- FeatureLookup 으로 학습 데이터셋을 자동 구성하는 방법
- fe.log_model()로 Feature Store 연동 모델을 로깅하는 방법
- fe.score_batch()와 Spark UDF 로 대규모 배치 추론을 수행하는 방법
머신러닝에서 피처(Feature)란 모델이 예측을 위해 사용하는 입력 변수
free_sulfur_ratio = free_SO2 / total_SO2→ Feature Store을 사용하면 문제 해결. 피처를 중앙 저장소에 한 번 정의하고, 학습과 서빙 모두에서 같은 피처를 사용하게 만듦
학습 시에는 df['a'] / df['b']로 계산하고, 서빙 시에는 조금 다른 로직으로 계산하면 모델 성능이 저하
→ Feature Store 의 해결책: 피처 계산 로직을 한 번만 정의하고, 학습(create_training_set)과 추론(score_batch) 모두 같은 Feature Store 에서 피처를 가져옴
Databricks 의 Feature Store 는 Unity Catalog 에 통합되어 있음
피처 테이블은 일반 Delta 테이블과 동일하지만, 추가로 Primary Key 와 메타데이터가 등록되어 피처 조회/리니지 추적이 가능
| 구성요소 | 설명 |
|---|---|
| FeatureEngineeringClient | Feature Store와 상호작용하는 클라이언트 |
| fe.create_table() | 피처 테이블을 생성하고 등록 |
| FeatureLookup | Primary Key로 피처를 자동 조회하는 설정 |
| fe.create_training_set() | 라벨 DF + 피처 자동 JOIN으로 학습셋 생성 |
| fe.log_model() | 모델 + Feature Store 연동 정보를 함께 로깅 |
| fe.score_batch() | ID만 주면 피처 자동 조회 + 예측 |

from pyspark.sql import functions as F
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
CATALOG = "3dt016_databricks"
SCHEMA = "wine"
# 원본 데이터 로드
wine_raw = spark.table(f"{CATALOG}.{SCHEMA}.wine_quality_lab")
# 고유 키(primary key) 추가 — Feature Store 필수!
wine_with_id = wine_raw.withColumn("wine_id", F.monotonically_increasing_id())
display(wine_with_id.limit(5))
# 피처 엔지니어링: 새로운 피처 생성
wine_features = wine_with_id.select(
"wine_id",
"fixed_acidity",
"volatile_acidity",
"citric_acid",
"residual_sugar",
"chlorides",
"free_sulfur_dioxide",
"total_sulfur_dioxide",
"density",
"pH",
"sulphates",
"alcohol",
# 파생 피처들
(F.col("free_sulfur_dioxide") / F.col("total_sulfur_dioxide")).alias("free_sulfur_ratio"),
(F.col("fixed_acidity") * F.col("alcohol")).alias("acidity_alcohol_interaction"),
# NaN 방지: alcohol이 0인 경우 대비
F.when(F.col("alcohol") != 0, F.col("residual_sugar") / F.col("alcohol"))
.otherwise(0.0).alias("sugar_alcohol_ratio"),
).fillna(0) # NaN을 0으로 채움
display(wine_features.limit(5))
# Feature Store 테이블 생성
FEATURE_TABLE = f"{CATALOG}.{SCHEMA}.wine_features_lab"
try:
fe.create_table(
name=FEATURE_TABLE,
primary_keys=["wine_id"],
df=wine_features,
description="와인 품질 예측을 위한 피처 테이블 (3기 실습)",
)
print(f"✅ Feature Store 테이블 생성 완료: {FEATURE_TABLE}")
except Exception as e:
if "already exists" in str(e).lower():
# 이미 존재하면 덮어쓰기
fe.write_table(name=FEATURE_TABLE, df=wine_features, mode="overwrite")
print(f"✅ Feature Store 테이블 업데이트 완료: {FEATURE_TABLE}")
else:
raise e



from databricks.feature_engineering import FeatureLookup
import mlflow
# 라벨 데이터 (wine_id + target만 포함)
labels_df = wine_with_id.select("wine_id", "is_good_quality")
# Feature Lookup 정의
feature_lookups = [
FeatureLookup(
table_name=FEATURE_TABLE,
lookup_key="wine_id",
)
]
# 학습 데이터셋 생성 (Feature Store에서 자동으로 피처를 조회!)
# exclude_columns: wine_id는 조회 키일 뿐, 학습 피처가 아님!
training_set = fe.create_training_set(
df=labels_df,
feature_lookups=feature_lookups,
label="is_good_quality",
exclude_columns=["wine_id"],
)
training_df = training_set.load_df()
display(training_df.limit(5))
# 모델 학습 + Feature Store 연동 로깅
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from mlflow.models import infer_signature
mlflow.set_experiment("/Users/" + spark.sql("SELECT current_user()").first()[0] + "/wine_feature_store_lab")
# Pandas 변환
train_pdf = training_df.toPandas()
X = train_pdf.drop(["is_good_quality"], axis=1)
y = train_pdf["is_good_quality"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
with mlflow.start_run(run_name="feature_store_rf"):
rf = RandomForestClassifier(n_estimators=150, max_depth=7, random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
f1 = f1_score(y_test, y_pred)
mlflow.log_metric("f1_score", f1)
# Feature Store 연동 모델 로깅
fe.log_model(
model=rf,
artifact_path="model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=f"{CATALOG}.{SCHEMA}.wine_quality_model_lab",
)
print(f"✅ Feature Store 연동 모델 학습 완료! F1: {f1:.4f}")

# 새로운 데이터가 들어왔다고 가정
# (실제로는 같은 데이터를 사용하지만, 실무에서는 새로운 배치 데이터)
new_data = wine_with_id.select(F.col("wine_id").cast("long")).limit(100)
# 등록된 모델의 최신 버전 자동 조회
from mlflow import MlflowClient
client = MlflowClient(registry_uri="databricks-uc")
model_name = f"{CATALOG}.{SCHEMA}.wine_quality_model_lab"
versions = client.search_model_versions(f"name='{model_name}'")
if versions:
latest_version = max(int(v.version) for v in versions)
print(f"✅ 모델 '{model_name}' 최신 버전: {latest_version}")
else:
latest_version = 1
print(f"⚠️ 모델 버전을 찾을 수 없어 기본값 1 사용")
# Feature Store 기반 배치 추론
# wine_id만 있으면 자동으로 Feature Store에서 피처를 조회하여 예측!
try:
predictions = fe.score_batch(
model_uri=f"models:/{model_name}/{latest_version}",
df=new_data,
)
display(predictions.select("wine_id", "prediction"))
except Exception as e:
print(f"⚠️ score_batch 오류: {e}")
print("→ Feature Store 테이블과 모델을 처음부터 다시 생성해주세요 (Step 1부터 재실행)")
print("→ 또는 아래 Spark UDF 방식을 사용해주세요")

# sklearn 모델을 직접 Spark pandas_udf로 변환 — 분산 환경에서 수백만 건도 OK!
import pandas as pd
from pyspark.sql.types import IntegerType
# Step 3에서 학습한 rf 모델을 직접 사용 (Feature Store 래퍼 없이)
feature_cols = [c for c in X.columns]
@F.pandas_udf(IntegerType())
def predict_udf(*cols):
X_input = pd.concat(cols, axis=1)
X_input.columns = feature_cols
X_input = X_input.fillna(0)
return pd.Series(rf.predict(X_input))
# 전체 데이터에 대해 예측 수행
result_df = (
wine_features
.withColumn("prediction", predict_udf(*[F.col(c) for c in feature_cols]))
)
display(result_df.select("wine_id", "prediction").limit(10))

| 방식 | 장점 | 적합한 경우 |
|---|---|---|
| fe.score_batch() | ID만으로 자동 피처 조회 | 범용 배치 추론 |
| Spark UDF | Spark 클러스터 분산 처리 | 수백만 건 + 대용량 |
# 추론 결과를 Delta 테이블로 저장
result_df.select("wine_id", "prediction").write.mode("overwrite").saveAsTable(
f"{CATALOG}.{SCHEMA}.wine_predictions_lab"
)
print(f"✅ 배치 추론 결과 저장 완료: {CATALOG}.{SCHEMA}.wine_predictions_lab")
print(f" 총 {result_df.count():,}건 예측 완료")
cosmos db 데이터파티셔닝 해시함수
→ 높은 카디널리티를 가질 때 해시펑션을 이용
Databricks에서 LLM을 저비용으로 활용하는 방법
| 방식 | 설명 | 용도 |
|---|---|---|
| Foundation Model API | Databricks가 호스팅하는 LLM을 API로 호출 | 개발/프로토타입 |
| External Models | 외부 API (OpenAI, Anthropic 등)를 통합 | 특정 모델 선호 시 |
| Model Serving | 커스텀 모델을 엔드포인트로 배포 | 파인튜닝 모델 서빙 |
CATALOG = "3dt016_databricks"
SCHEMA = "wine"
# LLM 모델 엔드포인트 (사용 가능한 모델로 변경)
LLM_MODEL = "databricks-meta-llama-3-3-70b-instruct"
print(f"📌 카탈로그: {CATALOG}.{SCHEMA}")
print(f"📌 LLM 모델: {LLM_MODEL}")

import mlflow.deployments
# Databricks Foundation Model API 클라이언트
client = mlflow.deployments.get_deploy_client("databricks")
# LLM에 질문하기
response = client.predict(
endpoint=LLM_MODEL,
inputs={
"messages": [
{"role": "system", "content": "You are a helpful data science assistant. Answer in Korean."},
{"role": "user", "content": "머신러닝에서 과적합(overfitting)이 무엇이고, 어떻게 방지하나요? 간단히 설명해주세요."}
],
"max_tokens": 256,
"temperature": 0.7,
}
)
print("🤖 LLM 응답:")
print(response['choices'][0]['message']['content'])

| 모델 | 특징 | 용도 |
|------|------|------|
| Llama 3.3 70B | Meta의 오픈소스 모델, 영어 최적화 | 범용 (이 실습에서 사용) |
| DBRX | Databricks 자체 개발 모델 | 범용 (비활성화 환경 있음) |
| Mixtral | Mistral AI의 MoE 모델 | 빠른 추론 |
| BGE Large EN | 임베딩 모델 | Vector Search 용 |
# Few-shot 프롬프트 예시: 와인 설명 생성
response = client.predict(
endpoint=LLM_MODEL,
inputs={
"messages": [
{"role": "system", "content": """You are a sommelier. Given wine chemical properties, write a tasting note.
Examples:
- Alcohol 12.5%, Residual sugar 5g → "Medium-bodied dry wine. Clean finish."
- Alcohol 9.0%, Residual sugar 45g → "Light-bodied sweet wine. Rich fruit aroma."
"""},
{"role": "user", "content": "Alcohol 11.8%, Residual sugar 1.2g, Acidity 0.32, pH 3.18 — Describe this wine."}
],
"max_tokens": 128,
}
)
print("🍷 Sommelier AI:")
print(response['choices'][0]['message']['content'])


기본 문법
SELECT
column_name,
ai_query(
'model-name', -- 사용할 LLM 모델
CONCAT('prompt: ', column_name) -- 프롬프트 + 데이터 컬럼
) AS result
FROM my_table
모델명, 프롬프트(CONCAT 으로 컬럼 결합), 테이블명.
# 샘플 고객 리뷰 데이터 생성
from pyspark.sql import Row
reviews = spark.createDataFrame([
Row(review_id=1, text="This wine is absolutely fantastic! Best purchase ever.", rating=5),
Row(review_id=2, text="Terrible. Cork taint ruined the entire bottle. Waste of money.", rating=1),
Row(review_id=3, text="Decent for the price. Nothing special but drinkable.", rating=3),
Row(review_id=4, text="Wonderful aroma and balanced flavors. Would buy again!", rating=5),
Row(review_id=5, text="Too acidic for my taste. Not recommended.", rating=2),
])
reviews.write.mode("overwrite").saveAsTable(f"{CATALOG}.{SCHEMA}.wine_reviews_lab")
print(f"✅ 리뷰 테이블 생성: {CATALOG}.{SCHEMA}.wine_reviews_lab")
# 💡 ai_query()로 감성 분석 — SQL 한 줄! (spark.sql로 변수 참조)
display(spark.sql(f"""
SELECT
review_id,
text,
rating,
ai_query(
'{LLM_MODEL}',
CONCAT('Classify the sentiment of this review as POSITIVE, NEGATIVE, or NEUTRAL. Reply with ONLY the classification word. Review: ', text)
) AS ai_sentiment
FROM {CATALOG}.{SCHEMA}.wine_reviews_lab
"""))

# 카테고리 자동 분류
display(spark.sql(f"""
SELECT
review_id,
text,
ai_query(
'{LLM_MODEL}',
CONCAT('Classify this wine review into ONE category: TASTE, AROMA, VALUE, QUALITY, OTHER. Reply with only the category. Review: ', text)
) AS category
FROM {CATALOG}.{SCHEMA}.wine_reviews_lab
"""))



| 구성요소 | 역할 | 비유 |
|---|---|---|
| Endpoint | 컴퓨팅 서버, 한 번 만들면 재사용 | 도서관 건물 |
| Index | 테이블의 텍스트를 벡터로 변환/저장 | 책 목록 |
| Query | 질문과 가장 유사한 문서 반환 | 관련 책 찾기 |
RAG(Retrieval-Augmented Generation)란?
[사용자 질문]
↓
[Vector Search] → 관련 문서 2~3개 검색
↓
[LLM] ← 질문 + 검색된 문서 (컨텍스트)
↓
[답변 생성]
# 소규모 와인 지식 문서 생성
wine_docs = spark.createDataFrame([
Row(doc_id=1, content="Chardonnay is a white wine grape variety with buttery, oaky characteristics. In cool climates it shows apple and citrus flavors, while warm climates bring tropical fruit notes."),
Row(doc_id=2, content="Cabernet Sauvignon is the flagship full-bodied red wine. It features blackcurrant, cedar, and vanilla aromas with high tannin and acidity."),
Row(doc_id=3, content="Wine acidity is crucial for freshness and balance. A pH of 3.0-3.4 is typical, with lower values indicating more tartness. The balance between acidity and residual sugar is key to taste."),
Row(doc_id=4, content="Wine pairing basics: Red wines pair with red meat, white wines with fish and chicken. Sweet wines go with desserts, but the wine should be sweeter than the food."),
Row(doc_id=5, content="In MLOps, model monitoring detects data drift and model performance degradation. Databricks Lakehouse Monitoring provides automatic drift detection using statistical tests."),
])
wine_docs.write.mode("overwrite").saveAsTable(f"{CATALOG}.{SCHEMA}.wine_knowledge_lab")
print(f"✅ 지식 문서 테이블 생성: {CATALOG}.{SCHEMA}.wine_knowledge_lab")
Vector Search의 Delta Sync 인덱스는 테이블 변경 추적이 필요
spark.sql(f"""
ALTER TABLE {CATALOG}.{SCHEMA}.wine_knowledge_lab
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
print("✅ Change Data Feed 활성화 완료")
from databricks.vector_search.client import VectorSearchClient
vsc = VectorSearchClient(disable_notice=True)
vsc.create_endpoint(name="my-endpoint", endpoint_type="STANDARD")
VS_ENDPOINT = "my-endpoint"
# 엔드포인트 상태 확인
try:
ep_info = vsc.get_endpoint(VS_ENDPOINT)
print(f"✅ 엔드포인트 '{VS_ENDPOINT}' 확인됨")
except Exception as e:
print(f"❌ 엔드포인트를 찾을 수 없습니다: {e}")
print("→ VS_ENDPOINT 이름을 확인하거나, 새로 생성해주세요.")
혹은 좌측 compute-vector search-endpoint 만들기

# 인덱스 생성
INDEX_NAME = f"{CATALOG}.{SCHEMA}.wine_knowledge_index"
try:
vsc.create_delta_sync_index(
endpoint_name=VS_ENDPOINT,
index_name=INDEX_NAME,
source_table_name=f"{CATALOG}.{SCHEMA}.wine_knowledge_lab",
pipeline_type="TRIGGERED",
primary_key="doc_id",
embedding_source_column="content",
embedding_model_endpoint_name="databricks-bge-large-en",
)
print(f"✅ Vector Search 인덱스 생성 시작: {INDEX_NAME}")
print("⏳ 인덱스 준비까지 5~10분 소요됩니다.")
except Exception as e:
if "already exists" in str(e).lower():
print(f"ℹ️ 인덱스가 이미 존재합니다: {INDEX_NAME}")
else:
print(f"⚠️ {e}")
# 인덱스 상태 확인 (ready: True 가 될 때까지 이 셀을 반복 실행)
index_info = vsc.get_index(VS_ENDPOINT, INDEX_NAME).describe()
status = index_info.get("status", {})
print(f"상태: {status.get('detailed_state', 'UNKNOWN')}")
print(f"준비 완료: {status.get('ready', False)}")
if not status.get("ready", False):
print("\n⏳ 아직 준비 중입니다. 1~2분 후 이 셀을 다시 실행해주세요.")
else:
print("\n✅ 인덱스가 준비되었습니다! 다음 셀로 진행하세요.")
# 유사 문서 검색 테스트
results = vsc.get_index(
endpoint_name=VS_ENDPOINT,
index_name=INDEX_NAME,
).similarity_search(
query_text="What wine pairs well with steak?",
columns=["doc_id", "content"],
num_results=2,
)
print("🔍 Search results:")
for doc in results.get("result", {}).get("data_array", []):
print(f" - [doc_id={doc[0]}] {doc[1][:100]}...")
RAG 함수는 3 단계로 동작:
할루시네이션 방지
def rag_answer(question):
"""Simple RAG: Search → Build context → LLM answer"""
# 1️⃣ Vector Search로 관련 문서 검색
search_results = vsc.get_index(VS_ENDPOINT, INDEX_NAME).similarity_search(
query_text=question,
columns=["content"],
num_results=2,
)
# 2️⃣ 검색된 문서를 컨텍스트로 구성
context_docs = search_results.get("result", {}).get("data_array", [])
context = "\n".join([doc[0] for doc in context_docs])
# 3️⃣ LLM에게 질문 + 컨텍스트 전달
response = client.predict(
endpoint=LLM_MODEL,
inputs={
"messages": [
{"role": "system", "content": f"""You are a wine expert. Answer the question based ONLY on the reference documents below.
Answer concisely. If the information is not in the documents, say "Information not found in documents."
Reference documents:
{context}"""},
{"role": "user", "content": question}
],
"max_tokens": 256,
}
)
return response['choices'][0]['message']['content']
# RAG 테스트
questions = [
"What are the characteristics of Chardonnay wine?",
"What wine pairs well with steak?",
"How does Databricks detect model drift?",
]
for q in questions:
print(f"\n❓ {q}")
print(f"💬 {rag_answer(q)}")
print("-" * 60)

참고 우리 팀 코드 스타일을 아는 AI 만들기: RAG와 Vector DB 활용기
목표: 사전학습된 Vision Transformer(ViT)로 이미지를 분류하고, Foundation Model API로 이미지 캡셔닝(설명 생성)을 수행
| 개념 | 설명 |
|---|---|
| Vision Transformer (ViT) | 이미지를 패치(조각)로 나눠 Transformer로 처리하는 모델 |
| Transfer Learning | 사전학습된 모델을 새 작업에 재활용하는 기법 |
| 이미지 캡셔닝 | 이미지를 보고 자연어 설명을 생성하는 작업 |
| 멀티모달 LLM | 텍스트 + 이미지를 동시에 이해하는 대규모 언어 모델 |

1. Classification (분류)
이미지 전체를 보고 해당 이미지가 어떤 카테고리에 속하는지 판별
특징: 이미지당 하나의 대표 클래스를 반환
예시: 사진을 보고 '자동차'인지 '사람'인지 분류.
2. Object Detection (객체 탐지/검출)
이미지 내의 객체를 분류하는 것과 동시에, 그 객체가 어디에 위치하는지 박스(Bounding Box)로 표시하여 위치를 파악하는 기술
구성: Classification(분류) + Localization(위치 파악).
특징: 한 이미지 내에 여러 객체가 있어도 각각의 위치와 종류를 탐지
활용: 자율주행 자동차(보행자, 신호등 탐지), 의료 영상 분석 등. 자율주행 자동차를 할거면 segmentation까지 필요
%pip install transformers torch torchvision datasets pillow --quiet
dbutils.library.restartPython()
# 공통 설정 — Python 재시작 후 이 셀부터 실행!
CATALOG = "3dt016_databricks"
SCHEMA = "classification"
print(f"📌 카탈로그: {CATALOG}")
print(f"📌 스키마: {SCHEMA}")
기존 이미지 분류는 CNN(합성곱 신경망)이 주류였지만, 2020년 Google이 발표한 ViT는 NLP에서 성공한 Transformer 구조를 이미지에 적용
장점: 대규모 데이터에서 CNN보다 뛰어난 성능, Transfer Learning에 탁월
from datasets import load_dataset
import matplotlib.pyplot as plt
from PIL import Image
# HuggingFace에서 음식 분류 데이터셋 로드
# Food-101에서 5개 클래스만 사용 (비용 절약)
import os
os.environ["HF_DATASETS_CACHE"] = "/tmp/hf_cache"
os.environ["HF_HOME"] = "/tmp/hf_home"
dataset = load_dataset(
"ethz/food101",
split="train",
trust_remote_code=True,
cache_dir="/tmp/hf_cache"
)
# 5개 음식 카테고리만 선택
target_foods = ["pizza", "sushi", "fried_rice", "ramen", "ice_cream"]
target_labels = {name: idx for idx, name in enumerate(dataset.features["label"].names) if name in target_foods}
print(f"✅ 선택한 음식 카테고리:")
for name, label_id in target_labels.items():
print(f" - {name} (label: {label_id})")
# 선택한 카테고리의 이미지만 필터링 (카테고리당 50장 = 총 250장)
import random
random.seed(42)
label_ids = set(target_labels.values())
filtered_data = {"image": [], "label": [], "label_name": []}
# 카테고리별로 이미지 수집
label_to_name = {v: k for k, v in target_labels.items()}
counts = {label_id: 0 for label_id in label_ids}
MAX_PER_CLASS = 50
for item in dataset:
label = item["label"]
if label in label_ids and counts[label] < MAX_PER_CLASS:
filtered_data["image"].append(item["image"])
filtered_data["label"].append(label)
filtered_data["label_name"].append(label_to_name[label])
counts[label] += 1
if all(c >= MAX_PER_CLASS for c in counts.values()):
break
print(f"\n✅ 총 {len(filtered_data['image'])}장 이미지 준비 완료!")
for name, label_id in target_labels.items():
print(f" {name}: {counts[label_id]}장")
# 📸 샘플 이미지 시각화
fig, axes = plt.subplots(1, 5, figsize=(20, 4))
for i, name in enumerate(target_foods):
# 해당 카테고리의 첫 번째 이미지
idx = filtered_data["label_name"].index(name)
axes[i].imshow(filtered_data["image"][idx])
axes[i].set_title(name, fontsize=14, fontweight="bold")
axes[i].axis("off")
plt.suptitle("Food Image Samples (5 Categories)", fontsize=16)
plt.tight_layout()
plt.show()

Google의 vit-base-patch16-224 모델을 사용. ImageNet(1000개 클래스)으로 사전학습된 모델
from transformers import ViTForImageClassification, ViTFeatureExtractor
import torch
import numpy as np
# 사전학습된 ViT 모델 로드 (ImageNet 1000 클래스)
model_name = "google/vit-base-patch16-224"
feature_extractor = ViTFeatureExtractor.from_pretrained(model_name)
model = ViTForImageClassification.from_pretrained(model_name)
model.eval() # 추론 모드
print(f"✅ ViT 모델 로드 완료: {model_name}")
print(f" - 모델 파라미터 수: {sum(p.numel() for p in model.parameters()):,}")
print(f" - 입력 크기: 224 x 224 픽셀")
print(f" - 출력 클래스: {model.config.num_labels}개 (ImageNet)")
def classify_image(image, top_k=3):
"""이미지를 ViT 모델로 분류하고 상위 k개 결과 반환"""
# 이미지 전처리 (리사이즈, 정규화)
inputs = feature_extractor(images=image, return_tensors="pt")
# 추론 (GPU 불필요)
with torch.no_grad():
outputs = model(**inputs)
probs = torch.softmax(outputs.logits, dim=-1)
# 상위 k개 결과
top_probs, top_indices = torch.topk(probs, top_k)
results = []
for prob, idx in zip(top_probs[0], top_indices[0]):
label = model.config.id2label[idx.item()]
results.append({"label": label, "confidence": prob.item()})
return results
# 각 카테고리별 첫 번째 이미지로 분류 테스트
print("🔍 이미지 분류 결과 (Top-3):\n")
for food_name in target_foods:
idx = filtered_data["label_name"].index(food_name)
image = filtered_data["image"][idx]
results = classify_image(image)
print(f"📷 [{food_name}]")
for r in results:
bar = "█" * int(r["confidence"] * 30)
print(f" {r['label']:30s} {r['confidence']:.1%} {bar}")
print()

이미지가 수천~수만 장이라면? Spark UDF로 분산 처리
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType
import io, base64
# 이미지를 base64로 인코딩하여 Spark DataFrame 생성
rows = []
for i in range(len(filtered_data["image"])):
img = filtered_data["image"][i]
# PIL 이미지 → base64 문자열
buf = io.BytesIO()
img.save(buf, format="JPEG")
img_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
rows.append((i, filtered_data["label_name"][i], img_b64))
image_df = spark.createDataFrame(rows, ["image_id", "true_label", "image_base64"])
print(f"✅ Spark DataFrame 생성: {image_df.count()}행")
display(image_df.select("image_id", "true_label").limit(10))

import mlflow
mlflow.set_experiment("/Users/" + spark.sql("SELECT current_user()").first()[0] + "/image_classification_lab")
# 전체 분류 수행 + 정확도 계산
correct = 0
total = len(filtered_data["image"])
# ImageNet 라벨 → 음식 이름 매핑 (근사 매핑)
food_mapping = {
"pizza": ["pizza", "pizza_pie"],
"sushi": ["sushi"],
"fried_rice": ["fried_rice"],
"ramen": ["ramen", "noodle"],
"ice_cream": ["ice_cream", "ice_lolly", "chocolate_sauce"]
}
with mlflow.start_run(run_name="vit_food_classification"):
mlflow.log_param("model_name", model_name)
mlflow.log_param("num_classes", len(target_foods))
mlflow.log_param("total_images", total)
predictions = []
for i in range(total):
image = filtered_data["image"][i]
true_label = filtered_data["label_name"][i]
result = classify_image(image, top_k=1)
pred_label = result[0]["label"]
pred_confidence = result[0]["confidence"]
# 매핑 체크 (ImageNet 라벨이 음식 이름과 유사한지)
is_correct = any(
mapped in pred_label.lower().replace(" ", "_")
for mapped in food_mapping.get(true_label, [])
)
if is_correct:
correct += 1
predictions.append({
"true": true_label,
"predicted": pred_label,
"confidence": pred_confidence,
"correct": is_correct
})
accuracy = correct / total
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("total_images", total)
mlflow.log_metric("correct_predictions", correct)
print(f"✅ ViT 이미지 분류 완료!")
print(f" 정확도: {accuracy:.1%} ({correct}/{total})")
print(f" * ImageNet 라벨 → 음식 이름 근사 매핑 기준")

이미지 캡셔닝이란?
이미지를 보고 자연어로 설명을 생성하는 작업