데이터브릭스는 빅데이터 처리와 분석을 위한 강력한 플랫폼으로, 기업들이 데이터를 효과적으로 활용할 수 있도록 다양한 기능을 제공합니다. 이 중에서도 데이터브릭스에서 처리된 데이터를 외부 시스템이나 애플리케이션에 API로 제공하는 방법은 매우 중요한 주제입니다. 이 글에서는 데이터브릭스에서 데이터를 API로 제공하는 다양한 방법과 각 방법의 장단점을 살펴보겠습니다.
데이터브릭스는 광범위한 REST API를 제공하여 다양한 기능에 프로그래밍 방식으로 액세스할 수 있게 합니다. 이러한 API를 통해 데이터브릭스의 워크스페이스, 클러스터, 작업, 노트북 등을 관리하고 데이터에 접근할 수 있습니다.
데이터브릭스 API를 호출하기 위해서는 인증이 필요합니다. 가장 일반적인 인증 방법은 개인 액세스 토큰(PAT)을 사용하는 것입니다.
Copyimport requests
# 데이터브릭스 워크스페이스 URL과 개인 액세스 토큰 설정
workspace_url = "https://your-workspace.databricks.com"
token = "your-personal-access-token"
# API 호출을 위한 헤더 설정
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
# API 호출 예시 (클러스터 목록 가져오기)
response = requests.get(f"{workspace_url}/api/2.0/clusters/list", headers=headers)
clusters = response.json()
print(clusters)
Databricks CLI를 사용하면 명령줄에서 데이터브릭스 API를 쉽게 호출할 수 있습니다.
Copy# 클러스터 목록 조회
databricks api get /api/2.0/clusters/list
# 특정 클러스터 정보 조회
databricks api post /api/2.0/clusters/get --json '{"cluster_id": "1234-567890-abcde123"}'
데이터브릭스의 SQL REST API를 사용하면 SQL 쿼리를 실행하고 그 결과를 가져올 수 있습니다. 이를 통해 애플리케이션은 데이터브릭스 SQL 웨어하우스를 통해 데이터 레이크의 데이터에 접근할 수 있습니다.
SQL 쿼리를 실행하려면 /sql/statements
엔드포인트로 POST 요청을 보냅니다.
Copyimport requests
import json
def execute_query(workspace_url, token, warehouse_id, query):
url = f"{workspace_url}/api/2.0/sql/statements"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"warehouse_id": warehouse_id,
"statement": query
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
return response.json()
# 사용 예시
query_result = execute_query(
"https://your-workspace.databricks.com",
"your-token",
"warehouse-id",
"SELECT * FROM database.table LIMIT 10"
)
# 쿼리 ID 확인
statement_id = query_result["statement_id"]
print(f"Statement ID: {statement_id}")
쿼리를 제출한 후에는 해당 쿼리의 상태를 확인하고 완료되면 결과를 가져올 수 있습니다.
Copydef check_query_status(workspace_url, token, statement_id):
url = f"{workspace_url}/api/2.0/sql/statements/{statement_id}"
headers = {
"Authorization": f"Bearer {token}"
}
response = requests.get(url, headers=headers)
return response.json()
# 쿼리 상태 및 결과 확인
query_status = check_query_status(
"https://your-workspace.databricks.com",
"your-token",
statement_id
)
# 쿼리 완료 확인
if query_status["status"]["state"] == "SUCCEEDED":
# 결과 데이터 처리
schema = query_status["schema"]
result_data = query_status["result"]["data_array"]
# 결과 데이터 변환 예시 (칼럼명과 값을 매핑)
column_names = [col["name"] for col in schema]
rows = []
for row_data in result_data:
row = {column_names[i]: value for i, value in enumerate(row_data)}
rows.append(row)
print(f"쿼리 결과: {rows}")
SQL REST API는 다음과 같은 특징이 있습니다:
데이터브릭스는 Delta Sharing이라는 오픈 프로토콜을 통해 조직 내부 또는 외부에 안전하게 데이터를 공유할 수 있는 방법을 제공합니다. 이를 통해 벤더 종속성 없이 Delta Lake와 Apache Parquet 형식의 데이터를 다른 플랫폼과 쉽게 공유할 수 있습니다.
Delta Sharing을 통해 공유를 생성하고 데이터를 등록하는 방법은 다음과 같습니다:
데이터브릭스는 Delta Sharing을 관리하기 위한 REST API를 제공합니다.
Copy# 공유 생성
def create_share(workspace_url, token, share_name):
url = f"{workspace_url}/api/2.0/shares/create"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"name": share_name
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
return response.json()
# 공유에 테이블 추가
def add_table_to_share(workspace_url, token, share_name, table_name):
url = f"{workspace_url}/api/2.0/shares/{share_name}/assets"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"assets": [
{
"name": table_name,
"type": "TABLE"
}
]
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
return response.json()
데이터브릭스의 모델 서빙 기능을 활용하면 머신러닝 모델을 REST API 엔드포인트로 배포하여 실시간 예측을 제공할 수 있습니다. 이는 데이터브릭스에서 학습한 모델을 외부 애플리케이션에서 활용하는 좋은 방법입니다.
Copy# 모델 등록 및 엔드포인트 생성 예시
import mlflow
# 모델 등록
with mlflow.start_run():
# 모델 학습 코드
# ...
# 모델 저장
mlflow.sklearn.log_model(model, "model")
model_uri = f"runs:/{mlflow.active_run().info.run_id}/model"
# 모델 등록
model_name = "my_model"
model_version = mlflow.register_model(model_uri, model_name)
# 모델 서빙 엔드포인트 생성을 위한 REST API 호출
def create_serving_endpoint(workspace_url, token, endpoint_name, model_name, model_version):
url = f"{workspace_url}/api/2.0/serving-endpoints"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"name": endpoint_name,
"config": {
"served_models": [
{
"model_name": model_name,
"model_version": model_version,
"workload_size": "Small",
"scale_to_zero_enabled": True
}
]
}
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
return response.json()
모델 서빙 엔드포인트가 생성되면 다음과 같이 호출할 수 있습니다:
Copydef query_model_endpoint(workspace_url, token, endpoint_name, input_data):
url = f"{workspace_url}/serving-endpoints/{endpoint_name}/invocations"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"dataframe_records": input_data
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
return response.json()
# 예측 요청 예시
input_data = [
{"feature1": 1.0, "feature2": 2.0},
{"feature1": 3.0, "feature2": 4.0}
]
prediction = query_model_endpoint(
"https://your-workspace.databricks.com",
"your-token",
"my-model-endpoint",
input_data
)
print(f"예측 결과: {prediction}")
데이터브릭스의 Feature Serving 기능을 사용하면 사전 계산된 특성(feature)을 단일 REST API를 통해 AI 애플리케이션에 제공할 수 있습니다. 이 기능은 복잡한 인프라 관리 없이도 특성 서빙을 가능하게 합니다.
Copyfrom databricks.feature_store import FeatureStoreClient
# Feature Store 클라이언트 생성
fs = FeatureStoreClient()
# 특성 테이블 생성
feature_df = spark.table("my_features")
# 특성 테이블 등록
fs.create_table(
name="my_feature_table",
primary_keys=["id"],
df=feature_df,
description="My feature table for real-time serving"
)
# 온라인 저장소 활성화
fs.publish_table("my_feature_table")
특성 테이블이 등록되고 온라인 저장소가 활성화되면 REST API를 통해 특성을 실시간으로 조회할 수 있습니다.
Copydef query_feature_serving(workspace_url, token, table_name, entity_ids):
url = f"{workspace_url}/api/2.0/feature-store/features/lookup"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"table_name": table_name,
"lookup_key": {
"id": entity_ids
}
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
return response.json()
# 특성 조회 예시
features = query_feature_serving(
"https://your-workspace.databricks.com",
"your-token",
"my_feature_table",
["user_1", "user_2"]
)
print(f"특성 조회 결과: {features}")
데이터브릭스 노트북에서 직접 REST API 엔드포인트를 구현할 수도 있습니다. 이를 위해 노트북에서 Flask 또는 FastAPI와 같은 웹 프레임워크를 사용할 수 있습니다.
Copy# 노트북에서 Flask 웹 서버 실행 예시
from flask import Flask, request, jsonify
import threading
app = Flask(__name__)
@app.route('/api/data', methods=['GET'])
def get_data():
# Delta 테이블에서 데이터 조회
df = spark.sql("SELECT * FROM my_database.my_table LIMIT 10")
# 결과를 JSON으로 변환
result = [row.asDict() for row in df.collect()]
return jsonify(result)
# 백그라운드에서 Flask 서버 실행
def run_flask():
app.run(host='0.0.0.0', port=8080)
flask_thread = threading.Thread(target=run_flask)
flask_thread.daemon = True
flask_thread.start()
print("API 서버가 시작되었습니다. 클러스터 Public DNS를 통해 액세스할 수 있습니다.")
API로 제공할 데이터를 정기적으로 업데이트하기 위해 데이터브릭스 Jobs를 사용할 수 있습니다.
Copy# 데이터 업데이트 작업 생성
def create_data_update_job(workspace_url, token, job_name, notebook_path):
url = f"{workspace_url}/api/2.0/jobs/create"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"name": job_name,
"schedule": {
"quartz_cron_expression": "0 0 * * * ?", # 매시간 실행
"timezone_id": "Asia/Seoul"
},
"tasks": [
{
"task_key": "update_data",
"notebook_task": {
"notebook_path": notebook_path
},
"existing_cluster_id": "your-cluster-id"
}
]
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
return response.json()
API 요청의 응답을 캐싱하여 성능을 최적화할 수 있습니다.
Copyfrom functools import lru_cache
import time
@lru_cache(maxsize=128)
def get_cached_data(query, expiration=300):
"""
쿼리 결과를 캐싱하는 함수 (기본 만료 시간: 5분)
"""
# 캐시 키에 타임스탬프 포함 (만료 시간 단위로)
cache_key = f"{query}_{int(time.time() / expiration)}"
# 데이터 조회
result = spark.sql(query).collect()
return result
# 사용 예시
@app.route('/api/cached-data', methods=['GET'])
def get_api_data():
query = "SELECT * FROM my_database.my_table LIMIT 10"
result = get_cached_data(query)
return jsonify([row.asDict() for row in result])
데이터브릭스에서 데이터를 API로 제공하는 방법은 다양합니다. 각 방법은 고유한 장점과 사용 사례가 있으며, 비즈니스 요구사항에 맞게 적절한 방법을 선택해야 합니다.
이러한 다양한 방법 중에서 데이터의 특성, 액세스 패턴, 보안 요구사항, 성능 요구사항 등을 고려하여 최적의 접근 방식을 선택하는 것이 중요합니다. 데이터브릭스의 강력한 기능과 풍부한 API를 활용하면 데이터 중심 애플리케이션과 서비스를 효과적으로 구축할 수 있습니다.