[Livy] livy 로 spark application submit

Woong·2025년 4월 2일
0

Apache Spark

목록 보기
24/25

Livy 에 대해서

  • spark cluster 에 REST API 로 상호작용하는 프로젝트
    • 글 작성 시점에선 apache incubator 프로젝트

Livy 로 spark submit 하기

  • POST /batches API 로 spark 앱을 submit 할 수 있다.
    • file: main 파일 경로 (required)
    • className: 실행할 class name (jar 로 submit 할시 required)
    • queue: YARN queue name
    • pyFiles: pyspark 앱일 경우 .py 파일들
    • files: config 등 사용될 파일들
    • args: main 에 전달할 sys.argv 인자

HDFS 에 python 코드 업로드하고 livy 로 submit하기

  • pyspark 코드
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# SparkSession 생성
spark = SparkSession.builder \
    .appName("ZeppelinEquivalentJob") \
    .enableHiveSupport() \
    .getOrCreate()


# Hive DB 선택 및 쿼리 수행
spark.sql('USE dw')
df = spark.table("user_profile")

# 필터링 및 컬럼 선택
df2 = df.select(col('user_id'), col('nickname').alias('name'))

# 결과 테이블에 저장
df2.write.mode("append").saveAsTable("feature_store.demo_user_table")
  • hdfs 에 webhdfs API 로 python 코드 업로드
import requests
from requests.auth import HTTPBasicAuth

KNOX_URL = "https://<host>:<port>/gateway/default/webhdfs/v1"
AUTH = HTTPBasicAuth("<id>", "<password>")

HDFS_PATH = "/tmp/anjinwoong/etl_job_test.py"

# Step 1: 파일 생성 요청
# 실제 데이터를 업로드하지 않고, 클러스터 내부의 HDFS DataNode 중 하나로 redirect URL을 응답
# WebHDFS는 NameNode가 아닌 DataNode에 직접 데이터 업로드하도록 설계되어 있어, 클라이언트에게 "어디에 업로드해야 하는지"를 알려주는 방식

r1 = requests.put(
    f"{KNOX_URL}{HDFS_PATH}",
    params={'op': 'CREATE', 'overwrite': 'true', 'user.name': 'jungahn'},
    allow_redirects=False,
    auth=AUTH,
    verify=False
)

assert r1.status_code == 307, f"CREATE 단계 실패: {r1.text}"


# Step 2: redirect location으로 파일 업로드
with open("src/etl_job_test.py", "rb") as f:
    r2 = requests.put(r1.headers['Location'], data=f, auth=AUTH, verify=False)
    print(r2.status_code)
    assert r2.status_code == 201, f"업로드 실패: {r2.text}"

print("HDFS 업로드 완료")
  • POST /batches API 로 pyspark 실행
    • log 조회의 경우 직후에 조회시 accpeted 되기 전 혹은 직후일 가능성이 높으므로, 따로 분리하여 id 로 조회 권장
import requests
import json
from requests.auth import HTTPBasicAuth

livy_url = "https://<host>:<port>/gateway/default/livy/v1/batches"
auth = HTTPBasicAuth("<id>", "<password>")

payload = {
    "file": "hdfs:///tmp/anjinwoong/etl_job_test.py",
    "name": "pyspark-hdfs-job",
    "proxyUser": "<id>",
    "executorCores": 2,
    "executorMemory": "1g",
    "numExecutors": 2,
    "driverMemory": "1g",
    "conf": {
        "spark.yarn.submit.waitAppCompletion": "true",
        "spark.pyspark.python": "/usr/bin/python3",
        "spark.yarn.queue": "default",
    }
}

headers = {
    "Content-Type": "application/json"
}

response = requests.post(
    livy_url,
    auth=auth,
    headers=headers,
    data=json.dumps(payload),
    verify=False 
)

print("Status:", response.status_code)
print("Response:", response.json())


# 실행 결과 체크

batch_id = response.json()["id"]

log_resp = requests.get(
    f"{livy_url}/{batch_id}/log?from=0&size=1000",
    auth=auth,
    verify=False
)
for line in log_resp.json().get("log", []):
    print(line)

reference

0개의 댓글

Powered by GraphCDN, the GraphQL CDN