데이터브릭스의 데이터를 API로 제공하는 방법

GarionNachal·2025년 4월 10일
0

databricks

목록 보기
8/24

데이터브릭스는 빅데이터 처리와 분석을 위한 강력한 플랫폼으로, 기업들이 데이터를 효과적으로 활용할 수 있도록 다양한 기능을 제공합니다. 이 중에서도 데이터브릭스에서 처리된 데이터를 외부 시스템이나 애플리케이션에 API로 제공하는 방법은 매우 중요한 주제입니다. 이 글에서는 데이터브릭스에서 데이터를 API로 제공하는 다양한 방법과 각 방법의 장단점을 살펴보겠습니다.

목차

  1. 데이터브릭스 REST API 개요
  2. Databricks SQL REST API를 통한 데이터 액세스
  3. Delta Sharing을 통한 데이터 공유
  4. 모델 서빙과 REST API
  5. Feature Serving 활용하기
  6. REST API 구현을 위한 실용적인 팁
  7. 결론

데이터브릭스 REST API 개요

데이터브릭스는 광범위한 REST API를 제공하여 다양한 기능에 프로그래밍 방식으로 액세스할 수 있게 합니다. 이러한 API를 통해 데이터브릭스의 워크스페이스, 클러스터, 작업, 노트북 등을 관리하고 데이터에 접근할 수 있습니다.

API 인증 방법

데이터브릭스 API를 호출하기 위해서는 인증이 필요합니다. 가장 일반적인 인증 방법은 개인 액세스 토큰(PAT)을 사용하는 것입니다.

Copyimport requests

# 데이터브릭스 워크스페이스 URL과 개인 액세스 토큰 설정
workspace_url = "https://your-workspace.databricks.com"
token = "your-personal-access-token"

# API 호출을 위한 헤더 설정
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}

# API 호출 예시 (클러스터 목록 가져오기)
response = requests.get(f"{workspace_url}/api/2.0/clusters/list", headers=headers)
clusters = response.json()
print(clusters)

Databricks CLI를 통한 API 호출

Databricks CLI를 사용하면 명령줄에서 데이터브릭스 API를 쉽게 호출할 수 있습니다.

Copy# 클러스터 목록 조회
databricks api get /api/2.0/clusters/list

# 특정 클러스터 정보 조회
databricks api post /api/2.0/clusters/get --json '{"cluster_id": "1234-567890-abcde123"}'

Databricks SQL REST API를 통한 데이터 액세스

데이터브릭스의 SQL REST API를 사용하면 SQL 쿼리를 실행하고 그 결과를 가져올 수 있습니다. 이를 통해 애플리케이션은 데이터브릭스 SQL 웨어하우스를 통해 데이터 레이크의 데이터에 접근할 수 있습니다.

SQL 쿼리 실행 방법

SQL 쿼리를 실행하려면 /sql/statements 엔드포인트로 POST 요청을 보냅니다.

Copyimport requests
import json

def execute_query(workspace_url, token, warehouse_id, query):
    url = f"{workspace_url}/api/2.0/sql/statements"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {
        "warehouse_id": warehouse_id,
        "statement": query
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    return response.json()

# 사용 예시
query_result = execute_query(
    "https://your-workspace.databricks.com",
    "your-token",
    "warehouse-id",
    "SELECT * FROM database.table LIMIT 10"
)

# 쿼리 ID 확인
statement_id = query_result["statement_id"]
print(f"Statement ID: {statement_id}")

쿼리 상태 확인 및 결과 가져오기

쿼리를 제출한 후에는 해당 쿼리의 상태를 확인하고 완료되면 결과를 가져올 수 있습니다.

Copydef check_query_status(workspace_url, token, statement_id):
    url = f"{workspace_url}/api/2.0/sql/statements/{statement_id}"
    headers = {
        "Authorization": f"Bearer {token}"
    }

    response = requests.get(url, headers=headers)
    return response.json()

# 쿼리 상태 및 결과 확인
query_status = check_query_status(
    "https://your-workspace.databricks.com",
    "your-token",
    statement_id
)

# 쿼리 완료 확인
if query_status["status"]["state"] == "SUCCEEDED":
    # 결과 데이터 처리
    schema = query_status["schema"]
    result_data = query_status["result"]["data_array"]

    # 결과 데이터 변환 예시 (칼럼명과 값을 매핑)
    column_names = [col["name"] for col in schema]
    rows = []
    for row_data in result_data:
        row = {column_names[i]: value for i, value in enumerate(row_data)}
        rows.append(row)

    print(f"쿼리 결과: {rows}")

SQL REST API는 다음과 같은 특징이 있습니다:

  • 동시에 최대 10개의 쿼리만 처리할 수 있으나, 클러스터를 추가하여 동시 접근 수를 늘릴 수 있습니다.
  • 대용량 테이블 쿼리보다는 적정 규모의 데이터 쿼리에 적합합니다.
  • 현재는 개인 액세스 토큰(PAT)만 인증에 사용 가능하여 행 수준 보안(RLS)이나 열 수준 보안(CLS) 적용에 제한이 있습니다.

Delta Sharing을 통한 데이터 공유

데이터브릭스는 Delta Sharing이라는 오픈 프로토콜을 통해 조직 내부 또는 외부에 안전하게 데이터를 공유할 수 있는 방법을 제공합니다. 이를 통해 벤더 종속성 없이 Delta Lake와 Apache Parquet 형식의 데이터를 다른 플랫폼과 쉽게 공유할 수 있습니다.

공유 생성 및 데이터 등록

Delta Sharing을 통해 공유를 생성하고 데이터를 등록하는 방법은 다음과 같습니다:

  1. 데이터브릭스 콘솔에서 '데이터' 탭으로 이동
  2. 'Delta Sharing' 섹션으로 이동하여 새 공유 생성
  3. 공유할 테이블이나 데이터를 등록
  4. 수신자를 등록하고 액세스 권한 부여

REST API를 통한 Delta Sharing 관리

데이터브릭스는 Delta Sharing을 관리하기 위한 REST API를 제공합니다.

Copy# 공유 생성
def create_share(workspace_url, token, share_name):
    url = f"{workspace_url}/api/2.0/shares/create"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {
        "name": share_name
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    return response.json()

# 공유에 테이블 추가
def add_table_to_share(workspace_url, token, share_name, table_name):
    url = f"{workspace_url}/api/2.0/shares/{share_name}/assets"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {
        "assets": [
            {
                "name": table_name,
                "type": "TABLE"
            }
        ]
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    return response.json()

모델 서빙과 REST API

데이터브릭스의 모델 서빙 기능을 활용하면 머신러닝 모델을 REST API 엔드포인트로 배포하여 실시간 예측을 제공할 수 있습니다. 이는 데이터브릭스에서 학습한 모델을 외부 애플리케이션에서 활용하는 좋은 방법입니다.

모델 서빙 엔드포인트 생성

Copy# 모델 등록 및 엔드포인트 생성 예시
import mlflow

# 모델 등록
with mlflow.start_run():
    # 모델 학습 코드
    # ...

    # 모델 저장
    mlflow.sklearn.log_model(model, "model")
    model_uri = f"runs:/{mlflow.active_run().info.run_id}/model"

# 모델 등록
model_name = "my_model"
model_version = mlflow.register_model(model_uri, model_name)

# 모델 서빙 엔드포인트 생성을 위한 REST API 호출
def create_serving_endpoint(workspace_url, token, endpoint_name, model_name, model_version):
    url = f"{workspace_url}/api/2.0/serving-endpoints"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {
        "name": endpoint_name,
        "config": {
            "served_models": [
                {
                    "model_name": model_name,
                    "model_version": model_version,
                    "workload_size": "Small",
                    "scale_to_zero_enabled": True
                }
            ]
        }
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    return response.json()

모델 서빙 엔드포인트 호출

모델 서빙 엔드포인트가 생성되면 다음과 같이 호출할 수 있습니다:

Copydef query_model_endpoint(workspace_url, token, endpoint_name, input_data):
    url = f"{workspace_url}/serving-endpoints/{endpoint_name}/invocations"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {
        "dataframe_records": input_data
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    return response.json()

# 예측 요청 예시
input_data = [
    {"feature1": 1.0, "feature2": 2.0},
    {"feature1": 3.0, "feature2": 4.0}
]

prediction = query_model_endpoint(
    "https://your-workspace.databricks.com",
    "your-token",
    "my-model-endpoint",
    input_data
)
print(f"예측 결과: {prediction}")

Feature Serving 활용하기

데이터브릭스의 Feature Serving 기능을 사용하면 사전 계산된 특성(feature)을 단일 REST API를 통해 AI 애플리케이션에 제공할 수 있습니다. 이 기능은 복잡한 인프라 관리 없이도 특성 서빙을 가능하게 합니다.

Feature Store 설정 및 특성 등록

Copyfrom databricks.feature_store import FeatureStoreClient

# Feature Store 클라이언트 생성
fs = FeatureStoreClient()

# 특성 테이블 생성
feature_df = spark.table("my_features")

# 특성 테이블 등록
fs.create_table(
    name="my_feature_table",
    primary_keys=["id"],
    df=feature_df,
    description="My feature table for real-time serving"
)

# 온라인 저장소 활성화
fs.publish_table("my_feature_table")

Feature Serving 엔드포인트 호출

특성 테이블이 등록되고 온라인 저장소가 활성화되면 REST API를 통해 특성을 실시간으로 조회할 수 있습니다.

Copydef query_feature_serving(workspace_url, token, table_name, entity_ids):
    url = f"{workspace_url}/api/2.0/feature-store/features/lookup"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {
        "table_name": table_name,
        "lookup_key": {
            "id": entity_ids
        }
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    return response.json()

# 특성 조회 예시
features = query_feature_serving(
    "https://your-workspace.databricks.com",
    "your-token",
    "my_feature_table",
    ["user_1", "user_2"]
)
print(f"특성 조회 결과: {features}")

REST API 구현을 위한 실용적인 팁

1. 사용자 정의 API 엔드포인트 생성

데이터브릭스 노트북에서 직접 REST API 엔드포인트를 구현할 수도 있습니다. 이를 위해 노트북에서 Flask 또는 FastAPI와 같은 웹 프레임워크를 사용할 수 있습니다.

Copy# 노트북에서 Flask 웹 서버 실행 예시
from flask import Flask, request, jsonify
import threading

app = Flask(__name__)

@app.route('/api/data', methods=['GET'])
def get_data():
    # Delta 테이블에서 데이터 조회
    df = spark.sql("SELECT * FROM my_database.my_table LIMIT 10")

    # 결과를 JSON으로 변환
    result = [row.asDict() for row in df.collect()]
    return jsonify(result)

# 백그라운드에서 Flask 서버 실행
def run_flask():
    app.run(host='0.0.0.0', port=8080)

flask_thread = threading.Thread(target=run_flask)
flask_thread.daemon = True
flask_thread.start()

print("API 서버가 시작되었습니다. 클러스터 Public DNS를 통해 액세스할 수 있습니다.")

2. 데이터브릭스 Jobs를 통한 정기적인 데이터 업데이트

API로 제공할 데이터를 정기적으로 업데이트하기 위해 데이터브릭스 Jobs를 사용할 수 있습니다.

Copy# 데이터 업데이트 작업 생성
def create_data_update_job(workspace_url, token, job_name, notebook_path):
    url = f"{workspace_url}/api/2.0/jobs/create"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {
        "name": job_name,
        "schedule": {
            "quartz_cron_expression": "0 0 * * * ?",  # 매시간 실행
            "timezone_id": "Asia/Seoul"
        },
        "tasks": [
            {
                "task_key": "update_data",
                "notebook_task": {
                    "notebook_path": notebook_path
                },
                "existing_cluster_id": "your-cluster-id"
            }
        ]
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    return response.json()

3. API 응답 캐싱 및 성능 최적화

API 요청의 응답을 캐싱하여 성능을 최적화할 수 있습니다.

Copyfrom functools import lru_cache
import time

@lru_cache(maxsize=128)
def get_cached_data(query, expiration=300):
    """
    쿼리 결과를 캐싱하는 함수 (기본 만료 시간: 5분)
    """
    # 캐시 키에 타임스탬프 포함 (만료 시간 단위로)
    cache_key = f"{query}_{int(time.time() / expiration)}"

    # 데이터 조회
    result = spark.sql(query).collect()
    return result

# 사용 예시
@app.route('/api/cached-data', methods=['GET'])
def get_api_data():
    query = "SELECT * FROM my_database.my_table LIMIT 10"
    result = get_cached_data(query)
    return jsonify([row.asDict() for row in result])

결론

데이터브릭스에서 데이터를 API로 제공하는 방법은 다양합니다. 각 방법은 고유한 장점과 사용 사례가 있으며, 비즈니스 요구사항에 맞게 적절한 방법을 선택해야 합니다.

  • SQL REST API는 SQL 쿼리를 통해 데이터에 접근할 수 있는 간편한 방법을 제공합니다.
  • Delta Sharing은 조직 내부 또는 외부에 안전하게 데이터를 공유할 수 있는 오픈 프로토콜을 제공합니다.
  • 모델 서빙은 머신러닝 모델을 REST API 엔드포인트로 배포하여 실시간 예측을 제공합니다.
  • Feature Serving은 사전 계산된 특성을 실시간으로 제공하여 AI 애플리케이션 개발을 촉진합니다.
  • 사용자 정의 API 엔드포인트는 특수한 요구사항에 맞는 맞춤형 API를 구현할 수 있게 합니다.

이러한 다양한 방법 중에서 데이터의 특성, 액세스 패턴, 보안 요구사항, 성능 요구사항 등을 고려하여 최적의 접근 방식을 선택하는 것이 중요합니다. 데이터브릭스의 강력한 기능과 풍부한 API를 활용하면 데이터 중심 애플리케이션과 서비스를 효과적으로 구축할 수 있습니다.

참고 자료

profile
AI를 꿈꾸는 BackEnd개발자

0개의 댓글